Events Platform Phase 1 first iteration

What does this MR do and why?

POC: Events Platform

It introduces the first iteration of the GitLab Events Platform (Phase 1), building an internal event-driven architecture on top of the existing Gitlab::EventStore. This MR uses the merge request reviewer assignment flow as the initial use case to validate the platform design.

The Events Platform enables decoupled, asynchronous communication between GitLab subsystems by publishing and consuming CloudEvents v1.0 compliant events through Sidekiq workers.

events_platform.drawio

What's included

  • Relay::CloudEvent - A new event class extending Gitlab::EventStore::Event that enforces the CloudEvents v1.0 schema
  • Relay::PublishEventService - Constructs and publishes CloudEvents, validating event payloads against schema before publishing to the EventStore
  • Relay::ConsumeEventService - Receives CloudEvents, parses the event type, resolves the appropriate dispatcher, and delegates processing
  • Relay::ConsumeEventsWorker - Sidekiq worker subscribed to Relay::CloudEvent that triggers the consume service
  • Driver layer (Relay::Drivers::MergeRequests) - Generates event-specific data and source/subject paths for merge request events
  • Dispatcher layer (Relay::Dispatchers::MergeRequests) - Routes consumed events to the correct handler based on event type
  • JSON schema validation - Schema definition for the assigned_reviewers event payload
  • Feature flag events_platform_trigger_cloud_events (gitlab_com_derisk) - at the organization level

Architecture

The event flow follows a publish-subscribe pattern:

  1. MergeRequests::BaseService calls Relay::PublishEventService when reviewers change
  2. The Driver generates a CloudEvents-compliant payload with JSON schema validation
  3. The event is published to Gitlab::EventStore
  4. Relay::ConsumeEventsWorker picks up the event (gated by feature flag)
  5. Relay::ConsumeEventService parses the event type and delegates to the appropriate Dispatcher
  6. The Dispatcher executes the domain-specific handler

Feature flag

  • Name: events_platform_trigger_cloud_events
  • Type: gitlab_com_derisk
  • Scoped to: Project
  • Default: disabled
  • Rollout issue: #595528

Next Step

  • Implement 1-3 trigger events and wire up to the events platform to complete and verify the full end-to-end flow.

How to set up and validate locally (using Rails console)

Prerequisites: Two terminal windows - one for watching logs, one for Rails console.

1. Terminal 1 (Logs: be sure to be in gdk root)

tail -f gitlab/log/application_json.log | jq -r 'select(.class? == "Relay::ConsumeEventService" or .message? == "Processing assigned_reviewers") | "\(.severity // "INFO") \(.message) \(del(.severity, .message, .time, .correlation_id) | to_entries | map("\(.key)=\(.value)") | join(" "))"'

2. Terminal 2: Rails console (gdk rails c):

  1. Set up test data (see details for script)
user = User.find_by(username: 'root') || User.first
org = Organizations::Organization.first || Organizations::Organization.default_organization

project = user.projects.first

branch_name = "test-branch-#{Time.current.to_i}"
project.repository.create_branch(branch_name, project.default_branch)

mr = MergeRequests::CreateService.new(
  project: project,
  current_user: user,
  params: {
    title: 'Test MR',
    source_branch: branch_name,
    target_branch: project.default_branch
  }
).execute

r1_result = Users::CreateService.new(user, {
  name: "Reviewer One",
  username: "r1_#{Time.current.to_i}",
  email: "r1_#{Time.current.to_i}@test.com",
  password: 'P@ssw0rd!123',
  skip_confirmation: true,
  organization_id: org.id
}).execute
r1 = r1_result.payload[:user]

r2_result = Users::CreateService.new(user, {
  name: "Reviewer Two",
  username: "r2_#{Time.current.to_i}",
  email: "r2_#{Time.current.to_i}@test.com",
  password: 'P@ssw0rd!123',
  skip_confirmation: true,
  organization_id: org.id
}).execute
r2 = r2_result.payload[:user]

project.add_developer(r1)
project.add_developer(r2)

puts "MR ID: #{mr.id}, Reviewers: #{r1.id}, #{r2.id}"
  1. Enable the feature flag: Feature.enable(:events_platform_trigger_cloud_events)
  2. Trigger PublishEventService (see details for script)
begin
  Relay::PublishEventService.new(
    :merge_requests,
    :assigned_reviewers,
    user,
    org,
    mr,
    data: { new_reviewers: [r1] }
  ).execute
  puts "PublishEventService executed successfully"
rescue => e
  puts "Error: #{e.class} - #{e.message}"
end

sleep 2

puts ""
  1. Switch to Terminal 1 and verify event publishing and consuming logs appear (see screenshot below)

Log examples

INFO Starting event consumption meta.feature_category=code_suggestions meta.caller_id=Relay::ConsumeEventsWorker class=Relay::ConsumeEventService event_id=b66b9ec8-ac0c-41d0-83fc-de7fbdd48167 event_type=com.gitlab.merge_requests.assigned_reviewers event_source=/projects/3/merge_requests/61
INFO Processing assigned_reviewers meta.feature_category=code_suggestions meta.caller_id=Relay::ConsumeEventsWorker class=Relay::ConsumeEventService event_id=b66b9ec8-ac0c-41d0-83fc-de7fbdd48167 merge_request_id=321 reviewer_ids=[128]
INFO Event consumption completed meta.feature_category=code_suggestions meta.caller_id=Relay::ConsumeEventsWorker class=Relay::ConsumeEventService event_id=b66b9ec8-ac0c-41d0-83fc-de7fbdd48167 event_type=com.gitlab.merge_requests.assigned_reviewers

Screenshot

Screenshot_2026-04-07_at_17.15.10

MR acceptance checklist

Evaluate this MR against the MR acceptance checklist. It helps you analyze changes to reduce risks in quality, performance, reliability, security, and maintainability.

Edited by Shola Quadri

Merge request reports

Loading