# ActiveContext: Allow flexible throughput adjustment for bulk processing
## What does this MR do and why?
During initial staging tests for gitlab-org/gitlab, we found that embeddings generation is very slow, mostly because `BulkProcessWorker` processes only 1000 documents at a time with no parallelization. (See staging test findings: #554923 (comment 2657881221))
In this MR, we introduce flexible throughput adjustment by allowing changes to the following values:

- `number_of_shards`: the number of parallel queues per collection
- `shard_limit`: the batch size, i.e. the number of queued items to process per shard at a time
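To make the two knobs concrete, here is a minimal, hypothetical sketch of how they interact. The names `number_of_shards` and `shard_limit` come from this MR, but the `route_to_shard`/`process_shard` helpers are illustrative only and are not the actual implementation:

```ruby
require "digest"

# Hypothetical illustration only: assign a queued ref to one of the
# parallel shard queues by hashing it modulo the number of shards.
def route_to_shard(ref_hash, number_of_shards)
  Digest::SHA1.hexdigest(ref_hash.to_s).to_i(16) % number_of_shards
end

# Hypothetical illustration only: each shard pops at most `shard_limit`
# items per run, so throughput per cycle is roughly
# number_of_shards * shard_limit.
def process_shard(queues, shard_number, shard_limit)
  queues.fetch(shard_number, []).shift(shard_limit)
end
```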
## References
- Issue: [ActiveContext] Adjust throughput for BulkProce... (#558004 - closed)
  - See the "Context" section of the issue for more information on throughput control
- Solution discussion: !199853 (comment 2660475880)
## Screenshots or screen recordings
N/A
## How to set up and validate locally
### `queue_shard_count` / queue parallelization
- Update the `Code` collection record's `queue_shard_count`:

  ```ruby
  ::Ai::ActiveContext::Collections::Code.collection_record.update_options!({ queue_shard_count: 3 })
  ```

- Queue some refs for the `Code` collection:

  ```ruby
  refs_for_queue = (1..10).map(&:to_s)
  ::Ai::ActiveContext::Collections::Code.track_refs!(routing: "1", hashes: refs_for_queue)
  ```

- Verify that the refs/items were added to 3 queues:

  ```ruby
  ::Ai::ActiveContext::Queues::Code.queued_items
  => {
    0 => [
      ["Ai::ActiveContext::References::Code|2|1|7", 1.0],
      ["Ai::ActiveContext::References::Code|2|1|9", 2.0],
      ["Ai::ActiveContext::References::Code|2|1|10", 3.0]
    ],
    1 => [
      ["Ai::ActiveContext::References::Code|2|1|1", 1.0],
      ["Ai::ActiveContext::References::Code|2|1|5", 2.0],
      ["Ai::ActiveContext::References::Code|2|1|6", 3.0],
      ["Ai::ActiveContext::References::Code|2|1|8", 4.0]
    ],
    2 => [
      ["Ai::ActiveContext::References::Code|2|1|2", 1.0],
      ["Ai::ActiveContext::References::Code|2|1|3", 2.0],
      ["Ai::ActiveContext::References::Code|2|1|4", 3.0]
    ]
  }
  ```
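As an optional sanity check (not part of the official steps), you can count the items per shard at a glance; this assumes `queued_items` returns the shard-keyed hash shown above:

```ruby
# Count queued items per shard to confirm they were spread across all 3 queues
::Ai::ActiveContext::Queues::Code.queued_items.transform_values(&:size)
# => { 0 => 3, 1 => 4, 2 => 3 }
```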
### `queue_shard_limit` / bulk processing
This is a bit trickier to test because it involves actual embeddings generation, but you can run Code Indexing for a project and then run bulk processing.
- Follow the "End to end using Index State Management tasks" guide up to the "Run `process_pending_enabled_namespace` task" section
- Ensure that the `Ai::ActiveContext::Code::Repository` record created is still in a pending state
- Run the Indexer for that repository, e.g.:

  ```ruby
  r = Ai::ActiveContext::Code::Repository.pending.first
  Ai::ActiveContext::Code::RepositoryIndexWorker.new.perform(r.id)
  ```

- Verify that there are items in the Code queue:

  ```ruby
  ::Ai::ActiveContext::Queues::Code.queued_items
  ```

- Update the `queue_shard_limit` to a small value for easier checking, e.g.:

  ```ruby
  ::Ai::ActiveContext::Collections::Code.collection_record.update_options!({ queue_shard_limit: 2 })
  ```

- Run `BulkProcessWorker#perform` for one of the `Code` queue shards:

  ```ruby
  shard_number = 0
  ::Ai::ActiveContext::BulkProcessWorker.new.perform("Ai::ActiveContext::Queues::Code", shard_number)
  ```

- Verify that the items have been processed by checking the queues again. The number of items in the queue for the specified shard should be reduced:

  ```ruby
  ::Ai::ActiveContext::Queues::Code.queued_items
  ```

  Note: Since we are testing synchronously, this only picks up items up to the `queue_shard_limit` from the given shard. In the cron-scheduled `BulkProcessWorker`, the queues should be processed in parallel.
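If you want to drain every shard synchronously in one go, here is a minimal sketch, assuming three shards from the `queue_shard_count: 3` step above (the cron-scheduled worker would instead fan these runs out in parallel):

```ruby
# Process each shard's batch synchronously, up to queue_shard_limit items each.
# Assumes queue_shard_count was set to 3 earlier in this guide.
shard_count = 3
shard_count.times do |shard_number|
  ::Ai::ActiveContext::BulkProcessWorker.new.perform("Ai::ActiveContext::Queues::Code", shard_number)
end
```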
## MR acceptance checklist
Evaluate this MR against the MR acceptance checklist. It helps you analyze changes to reduce risks in quality, performance, reliability, security, and maintainability.