Add subscribe_events to Gitlab::Kas::Client for events platform

What does this MR do and why?

Adds Gitlab::Kas::Client#subscribe_events to support subscribing to CloudEvents from the events_platform service on GitLab Relay (KAS).

The method opens a bidirectional gRPC stream, yields each received CloudEvent to the caller's block, and acknowledges the broker after each successful block return. If the block raises, the in-flight event is not acknowledged and will be redelivered (at-least-once semantics).
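
The ack-after-block contract can be illustrated without gRPC. The following is a minimal sketch, not the MR's implementation: `FakeBroker` and its `subscribe` method are hypothetical stand-ins for the broker and for `subscribe_events`, and events are plain strings rather than CloudEvents.

```ruby
# Hypothetical in-memory stand-in for the broker; not part of Gitlab::Kas.
class FakeBroker
  attr_reader :acked

  def initialize(events)
    @pending = events.dup # events that have not been acknowledged yet
    @acked = []
  end

  # Yields each pending event and acknowledges it only after the block
  # returns normally. If the block raises, the exception propagates and the
  # in-flight event stays pending, so a later subscribe sees it again.
  def subscribe
    until @pending.empty?
      event = @pending.first
      yield event
      @acked << @pending.shift
    end
  end
end

broker = FakeBroker.new(%w[e1 e2 e3])
seen = []

begin
  broker.subscribe do |event|
    seen << event
    raise 'simulated handler failure' if event == 'e2'
  end
rescue RuntimeError
  # The caller decides what to do next; here it simply resubscribes.
end

# Second subscription: the unacknowledged event is delivered again.
broker.subscribe { |event| seen << event }
```

In this sketch `e2` is seen twice, which is why handlers under at-least-once delivery generally need to be idempotent.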

This is the symmetric counterpart to Gitlab::Kas::Client#publish_events added in !238362 (merged).

Notes on the implementation

  • Per-call deadline. The shared Gitlab::Kas::Client stub is constructed with timeout: Gitlab::Kas.client_timeout_seconds (5 seconds by default). gRPC Ruby applies that timeout as a deadline on every call, including streaming RPCs, so without an override any long-running subscription would be terminated after about 5 seconds. subscribe_events overrides this with a bounded per-call deadline that defaults to 110 minutes (configurable via the deadline: kwarg). The default sits slightly below KAS's server-side max_connection_age (2 hours, see internal/cmd/kas/defaulting.go:50) so the client closes the stream cleanly before the server cutoff.
  • No auto-reconnect. When the stream closes (server-side max-age, deadline expiration, network error), the method returns and the caller decides whether to reconnect. A resilient wrapper (subscribe_events_resilient) that adds reconnect-with-backoff, observability, and graceful-stop semantics is planned as a follow-up MR (Phase B-1.5), before the bridge worker.
  • No custom stop API. The loop terminates when (a) the stream closes (server-side close, deadline expiration, or network error), or (b) the block raises. Long-running callers (e.g. the future bridge worker) signal shutdown by raising a sentinel exception from inside the block.
  • SizedQueue for ack handoff. The ack queue between the response loop and the request enumerator is a SizedQueue.new(1) rather than an unbounded Queue. This expresses the synchronous-handoff intent explicitly: at most one ack is in flight at a time, and back-pressure surfaces immediately if the response loop ever outpaces the request enumerator.
  • SubscribeResponse#event. The yielded CloudEvent comes from the proto's event field (not cloud_event — that field name doesn't exist on the proto).
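
The SizedQueue handoff described above can be sketched in plain Ruby. This is an illustration under stated assumptions, not the code in this MR: requests are modelled as arrays instead of proto messages, the `writer` thread stands in for gRPC consuming the request enumerator, and the `:done` marker is a hypothetical way to end the enumerator.

```ruby
# Capacity 1: a second push blocks until the request side has taken the
# previous ack, so at most one ack is ever waiting in the queue.
ack_queue = SizedQueue.new(1)

# Request side: one initial subscribe message, then one ack per handled event.
requests = Enumerator.new do |yielder|
  yielder << [:subscribe, 'test.subscribe']
  loop do
    id = ack_queue.pop # blocks until the response loop hands over an ack
    break if id == :done
    yielder << [:ack, id]
  end
end

# Stand-in for the gRPC writer: drains the request enumerator on its own thread.
sent = []
writer = Thread.new { requests.each { |request| sent << request } }

# Response loop: after the handler block returns, hand the ack over.
%w[e1 e2 e3].each do |event_id|
  # The caller's block would run here; an exception would skip the push.
  ack_queue.push(event_id)
end

ack_queue.push(:done) # hypothetical end-of-stream marker
writer.join
```

With an unbounded Queue the response loop could run arbitrarily far ahead of the writer; the bounded queue makes that impossible by construction.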

References

Related to #594102 (closed)

Roll out issue: #602413

Phase A (publish): !238362 (merged)

Screenshots or screen recordings

Screenshot_2026-06-09_at_12.30.51

How to set up and validate locally

  1. If gitlab_k8s_agent is not yet enabled in gdk.yml, run the following in your GDK directory:
    gdk config set gitlab_k8s_agent.enabled true
    gdk reconfigure
  2. Edit gitlab-k8s-agent-config.yml and add events_platform: {} at the end.
  3. Run gdk restart.
  4. The simplest way to publish test events is to use relay-playground. Follow its "Quick start" to configure it, using a second terminal window.
  5. Start the Relay Playground TUI.
  6. In the TUI, hit ctrl+p to open settings. Set the Publish topic to test.subscribe and consumer group to rails-test.
  7. In the 1st terminal window, from the gitlab directory, run gdk rails c.
  8. Enable the feature flag: Feature.enable(:subscribe_events_from_relay)
  9. Start a subscribe loop (this blocks the console — that's expected):
    Gitlab::Kas::Client.new.subscribe_events(
      topic: 'test.subscribe',
      consumer_group: 'rails-test'
    ) do |event|
      puts "Received: id=#{event.id} type=#{event.type} source=#{event.source} data=#{event.text_data.inspect}"
    end
  10. In the Relay Playground TUI, type a CloudEvent JSON into the input area and hit ctrl+s to publish it. For example:
    {
      "type": "com.example.test",
      "source": "relay.playground",
      "textData": "{\"message\": \"hello from playground\"}"
    }
  11. The Rails console should print the received event within a second or two. Publish a few more from the TUI to confirm subsequent events arrive and previously-acked events are not redelivered.

Verifying the no-ack-on-raise contract

To confirm that a raising block does not acknowledge the in-flight event:

  1. In Rails console, run the subscribe loop with a raise inside the block:
    Gitlab::Kas::Client.new.subscribe_events(
      topic: 'test.subscribe',
      consumer_group: 'rails-test'
    ) do |event|
      puts "Saw: #{event.id}"
      raise 'simulated handler failure'
    end
  2. Publish a fresh event from the TUI. The block prints Saw: <id> then raises.
  3. Re-enter the subscribe loop without the raise. The same event arrives again — proving it was not acknowledged.
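
The same raise-to-exit behaviour is what the sentinel-exception shutdown in the implementation notes relies on. A minimal sketch, assuming a hypothetical `StopSubscription` class and a `fake_subscribe` stand-in for `subscribe_events` (neither is defined by this MR):

```ruby
# Hypothetical sentinel; the MR does not define this class.
StopSubscription = Class.new(StandardError)

# Hypothetical stand-in for subscribe_events: yields events in order and
# records an ack only after the block returns normally.
def fake_subscribe(events, acked)
  events.each do |event|
    yield event
    acked << event
  end
end

acked = []
handled = []
stop_requested = false
stopped_cleanly = false

begin
  fake_subscribe(%w[e1 e2 e3], acked) do |event|
    # Check the stop flag first so no work starts after shutdown is requested.
    raise StopSubscription if stop_requested

    handled << event
    stop_requested = true if event == 'e2' # simulate a shutdown signal
  end
rescue StopSubscription
  # The sentinel is rescued outside the subscribe call and treated as a normal exit.
  stopped_cleanly = true
end
```

The event in flight when the sentinel is raised (`e3` here) is neither handled nor acknowledged, so it would be redelivered to the next subscriber in the consumer group.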

MR acceptance checklist

Evaluate this MR against the MR acceptance checklist. It helps you analyze changes to reduce risks in quality, performance, reliability, security, and maintainability.

Edited by Vitali Tatarintev
