Events Platform Phase 1 first iteration
What does this MR do and why?
POC: Events Platform
- Addressing: #593564 (closed)
- Parent epic: &21345
It introduces the first iteration of the GitLab Events Platform (Phase 1), building an internal event-driven architecture on top of the existing Gitlab::EventStore. This MR uses the merge request reviewer assignment flow as the initial use case to validate the platform design.
The Events Platform enables decoupled, asynchronous communication between GitLab subsystems by publishing and consuming CloudEvents v1.0 compliant events through Sidekiq workers.
What's included
Relay::CloudEvent- A new event class extendingGitlab::EventStore::Eventthat enforces theCloudEventsv1.0schemaRelay::PublishEventService- Constructs and publishesCloudEvents, validating event payloads against schema before publishing to the EventStoreRelay::ConsumeEventService- ReceivesCloudEvents, parses the event type, resolves the appropriate dispatcher, and delegates processingRelay::ConsumeEventsWorker- Sidekiq worker subscribed toRelay::CloudEventthat triggers the consume service- Driver layer (
Relay::Drivers::MergeRequests) - Generates event-specific data and source/subject paths for merge request events - Dispatcher layer (
Relay::Dispatchers::MergeRequests) - Routes consumed events to the correct handler based on event type - JSON schema validation - Schema definition for the
assigned_reviewersevent payload - Feature flag
events_platform_trigger_cloud_events(gitlab_com_derisk) - at the organization level
Architecture
The event flow follows a publish-subscribe pattern:
MergeRequests::BaseServicecallsRelay::PublishEventServicewhen reviewers change- The Driver generates a CloudEvents-compliant payload with JSON schema validation
- The event is published to
Gitlab::EventStore Relay::ConsumeEventsWorkerpicks up the event (gated by feature flag)Relay::ConsumeEventServiceparses the event type and delegates to the appropriate Dispatcher- The Dispatcher executes the domain-specific handler
Feature flag
- Name:
events_platform_trigger_cloud_events - Type:
gitlab_com_derisk - Scoped to: Project
- Default: disabled
- Rollout issue: #595528
Next Step
- Implement 1-3 trigger events and wire up to the events platform to complete and verify the full end-to-end flow.
- Work Item and discussion: #594482 (comment 3199161397)
How to set up and validate locally (using Rails console)
Prerequisites: Two terminal windows - one for watching logs, one for Rails console.
1. Terminal 1 (Logs: be sure to be in gdk root)
tail -f gitlab/log/application_json.log | jq -r 'select(.class? == "Relay::ConsumeEventService" or .message? == "Processing assigned_reviewers") | "\(.severity // "INFO") \(.message) \(del(.severity, .message, .time, .correlation_id) | to_entries | map("\(.key)=\(.value)") | join(" "))"'2. Terminal 2: Rails console (gdk rails c):
- Set up test data (see details for script)
user = User.find_by(username: 'root') || User.first
org = Organizations::Organization.first || Organizations::Organization.default_organization
project = user.projects.first
branch_name = "test-branch-#{Time.current.to_i}"
project.repository.create_branch(branch_name, project.default_branch)
mr = MergeRequests::CreateService.new(
project: project,
current_user: user,
params: {
title: 'Test MR',
source_branch: branch_name,
target_branch: project.default_branch
}
).execute
r1_result = Users::CreateService.new(user, {
name: "Reviewer One",
username: "r1_#{Time.current.to_i}",
email: "r1_#{Time.current.to_i}@test.com",
password: 'P@ssw0rd!123',
skip_confirmation: true,
organization_id: org.id
}).execute
r1 = r1_result.payload[:user]
r2_result = Users::CreateService.new(user, {
name: "Reviewer Two",
username: "r2_#{Time.current.to_i}",
email: "r2_#{Time.current.to_i}@test.com",
password: 'P@ssw0rd!123',
skip_confirmation: true,
organization_id: org.id
}).execute
r2 = r2_result.payload[:user]
project.add_developer(r1)
project.add_developer(r2)
puts "MR ID: #{mr.id}, Reviewers: #{r1.id}, #{r2.id}"- Enable the feature flag:
Feature.enable(:events_platform_trigger_cloud_events) - Trigger
PublishEventService(see details for script)
begin
Relay::PublishEventService.new(
:merge_requests,
:assigned_reviewers,
user,
org,
mr,
data: { new_reviewers: [r1] }
).execute
puts "PublishEventService executed successfully"
rescue => e
puts "Error: #{e.class} - #{e.message}"
end
sleep 2
puts ""- Switch to Terminal 1 and verify event publishing and consuming logs appear (see screenshot below)
Log examples
INFO Starting event consumption meta.feature_category=code_suggestions meta.caller_id=Relay::ConsumeEventsWorker class=Relay::ConsumeEventService event_id=b66b9ec8-ac0c-41d0-83fc-de7fbdd48167 event_type=com.gitlab.merge_requests.assigned_reviewers event_source=/projects/3/merge_requests/61
INFO Processing assigned_reviewers meta.feature_category=code_suggestions meta.caller_id=Relay::ConsumeEventsWorker class=Relay::ConsumeEventService event_id=b66b9ec8-ac0c-41d0-83fc-de7fbdd48167 merge_request_id=321 reviewer_ids=[128]
INFO Event consumption completed meta.feature_category=code_suggestions meta.caller_id=Relay::ConsumeEventsWorker class=Relay::ConsumeEventService event_id=b66b9ec8-ac0c-41d0-83fc-de7fbdd48167 event_type=com.gitlab.merge_requests.assigned_reviewersScreenshot
MR acceptance checklist
Evaluate this MR against the MR acceptance checklist. It helps you analyze changes to reduce risks in quality, performance, reliability, security, and maintainability.

