Add subscribe_events to Gitlab::Kas::Client for events platform
What does this MR do and why?
Adds Gitlab::Kas::Client#subscribe_events to support subscribing to CloudEvents from the events_platform service on GitLab Relay (KAS).
The method opens a bidirectional gRPC stream, yields each received CloudEvent to the caller's block, and acknowledges the broker after each successful block return. If the block raises, the in-flight event is not acknowledged and will be redelivered (at-least-once semantics).
This is the symmetric counterpart to Gitlab::Kas::Client#publish_events added in !238362 (merged).
Notes on the implementation
- Per-call deadline. The shared
Gitlab::Kas::Clientstub is constructed withtimeout: Gitlab::Kas.client_timeout_seconds(5 seconds by default). gRPC Ruby applies that timeout as a deadline on every call, including streaming RPCs — which would kill any long-running subscription after a few seconds.subscribe_eventsoverrides this with a bounded per-call deadline that defaults to 110 minutes (configurable via thedeadline:kwarg). The default sits slightly below KAS's server-sidemax_connection_age(2 hours, seeinternal/cmd/kas/defaulting.go:50) so the client closes the stream cleanly before the server cutoff. - No auto-reconnect. When the stream closes (server-side max-age, deadline expiration, network error), the method returns and the caller decides whether to reconnect. A resilient wrapper (
subscribe_events_resilient) that adds reconnect-with-backoff, observability, and graceful-stop semantics is planned as a follow-up MR (Phase B-1.5), before the bridge worker. - No custom stop API. The loop terminates when (a) the server closes the stream, or (b) the block raises. Long-running callers (e.g. the future bridge worker) signal shutdown by raising a sentinel exception from inside the block.
SizedQueuefor ack handoff. The ack queue between the response loop and the request enumerator is aSizedQueue.new(1)rather than an unboundedQueue. This expresses the synchronous-handoff intent explicitly: at most one ack is in flight at a time, and back-pressure surfaces immediately if the response loop ever outpaces the request enumerator.SubscribeResponse#event. The yielded CloudEvent comes from the proto'seventfield (notcloud_event— that field name doesn't exist on the proto).
References
Related to #594102 (closed)
Roll out issue: #602413
Phase A (publish): !238362 (merged)
Screenshots or screen recordings
How to set up and validate locally
- In your GDK directory run
If it's not yet enabled in
gdk config set gitlab_k8s_agent.enabled true gdk reconfiguregdk.yml. - Edit
gitlab-k8s-agent-config.ymland addevents_platform: {}at the end. - Run
gdk restart. - The simplest way to publish test events is to use
relay-playground. Follow "Quick start" to configure. Use a 2nd terminal window for that. - Start the Relay Playground TUI.
- In the TUI, hit
ctrl+pto open settings. Set the Publish topic totest.subscribeand consumer group torails-test. - In the 1st terminal window, from the
gitlabdirectory, rungdk rails c. - Enable the feature flag:
Feature.enable(:subscribe_events_from_relay) - Start a subscribe loop (this blocks the console — that's expected):
Gitlab::Kas::Client.new.subscribe_events( topic: 'test.subscribe', consumer_group: 'rails-test' ) do |event| puts "Received: id=#{event.id} type=#{event.type} source=#{event.source} data=#{event.text_data.inspect}" end - In the Relay Playground TUI, type a CloudEvent JSON into the input area and hit
ctrl+sto publish it. For example:{ "type": "com.example.test", "source": "relay.playground", "textData": "{\"message\": \"hello from playground\"}" } - The Rails console should print the received event within a second or two. Publish a few more from the TUI to confirm subsequent events arrive and previously-acked events are not redelivered.
Verifying the no-ack-on-raise contract
To confirm that a raising block does not acknowledge the in-flight event:
- In Rails console, run the subscribe loop with a
raiseinside the block:Gitlab::Kas::Client.new.subscribe_events( topic: 'test.subscribe', consumer_group: 'rails-test' ) do |event| puts "Saw: #{event.id}" raise 'simulated handler failure' end - Publish a fresh event from the TUI. The block prints
Saw: <id>then raises. - Re-enter the subscribe loop without the raise. The same event arrives again — proving it was not acknowledged.
MR acceptance checklist
Evaluate this MR against the MR acceptance checklist. It helps you analyze changes to reduce risks in quality, performance, reliability, security, and maintainability.
