
Bulk publish events with Gitlab::EventStore

Problem

When publishing events using Gitlab::EventStore we can sometimes end up publishing many similar events. Since each event schedules at least one Sidekiq job, this can cause a burst of Sidekiq jobs in the queue.

Proposal

One idea proposed by @reprazent is to have a mechanism to publish and consume events in batches, so that fewer Sidekiq jobs are scheduled and each Sidekiq job does more work by iterating through its batch.

One way we could implement that would be:

```ruby
Gitlab::EventStore.publish_group(events)

store.subscribe SomeWorker, to: SomeEvent, group_size: 100
store.subscribe AnotherWorker, to: SomeEvent # default `group_size: 10`
```
```diff
diff --git a/lib/gitlab/event_store/subscriber.rb b/lib/gitlab/event_store/subscriber.rb
index da95d3cfcfa8..1e03af005b55 100644
--- a/lib/gitlab/event_store/subscriber.rb
+++ b/lib/gitlab/event_store/subscriber.rb
@@ -29,11 +29,15 @@ module Subscriber
       def perform(event_type, data)
         raise InvalidEvent, event_type unless self.class.const_defined?(event_type)
 
-        event = event_type.constantize.new(
-          data: data.with_indifferent_access
-        )
+        event_type_class = event_type.constantize
 
-        handle_event(event)
+        Array.wrap(data).each do |single_event_data|
+          event = event_type_class.new(
+            data: single_event_data.with_indifferent_access
+          )
+
+          handle_event(event) # TODO: error handling for a single event?
+        end
       end
 
       def handle_event(event)
```
  • The default group_size is 10 when not specified.
  • A single-event publish is implemented as iterating through an array of 1 event. This lets us treat group publishing and single publishing the same way.
  • Sidekiq workers are scheduled with at most group_size events in the payload.
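To make the publish side of the proposal concrete, here is a minimal sketch of how publish_group could slice events into per-subscriber chunks. The Subscription struct, EventStoreSketch class, and the direct worker.perform call are simplified stand-ins for illustration, not the real Gitlab::EventStore internals (which would schedule the worker asynchronously via Sidekiq):

```ruby
# Hypothetical sketch: publish_group slices events into chunks of each
# subscriber's group_size, so one Sidekiq job receives an array of event
# payloads instead of a single hash.
Subscription = Struct.new(:worker, :group_size, keyword_init: true)

class EventStoreSketch
  def initialize
    # Maps an event class to its list of subscriptions.
    @subscriptions = Hash.new { |hash, key| hash[key] = [] }
  end

  def subscribe(worker, to:, group_size: 10)
    @subscriptions[to] << Subscription.new(worker: worker, group_size: group_size)
  end

  # A single-event publish is just a group of one, so both paths share code.
  def publish(event)
    publish_group([event])
  end

  def publish_group(events)
    event_class = events.first.class

    @subscriptions[event_class].each do |subscription|
      events.each_slice(subscription.group_size) do |slice|
        # In the real implementation this would be Worker.perform_async,
        # scheduling one Sidekiq job per slice.
        subscription.worker.perform(event_class.name, slice.map(&:data))
      end
    end
  end
end
```

With a group_size of 2, publishing 5 events would schedule 3 jobs (payloads of 2, 2, and 1 events), matching the "max number of events in the payload" behavior described above.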

Challenges

  • An event could, in theory, have a large payload. What would be a reasonable batch size?
    • Answer: since group_size is set on the subscriber side, each subscriber can tune it based on the payload size and the processing it is going to do.
Edited by Fabio Pitino