ActiveContext: Allow flexible throughput adjustment for bulk processing

What does this MR do and why?

During initial staging tests for gitlab-org/gitlab, we found that embedding generation is very slow, mostly because the BulkProcessWorker processes only 1000 documents at a time with no parallelization. (See staging test findings: #554923 (comment 2657881221))

In this MR, we introduce flexible throughput adjustment by making the following values configurable (a rough sketch of the idea follows the list):

  • queue_shard_count = the number of parallel queues (shards) per collection
  • queue_shard_limit = the batch size, i.e. the number of queued items to process per shard at a time
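
For intuition, here is a minimal sketch of how refs could be distributed across shards by hashing each ref modulo queue_shard_count. This is an illustrative assumption (shard_for and the CRC32 choice are made up for this example), not the MR's actual routing code:

    # Illustrative sketch only: route a ref to one of `queue_shard_count`
    # shards via a stable hash. The real implementation may differ.
    require 'zlib'

    def shard_for(ref, queue_shard_count)
      Zlib.crc32(ref) % queue_shard_count
    end

    shard_for("Ai::ActiveContext::References::Code|2|1|7", 3)
    # => an integer between 0 and 2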

References

Screenshots or screen recordings

N/A

How to set up and validate locally

queue_shard_count / queue parallelization

  1. Update the Code collection record's queue_shard_count

    ::Ai::ActiveContext::Collections::Code.collection_record.update_options!({ queue_shard_count: 3 })
  2. Queue some refs for the Code collection

    refs_for_queue = (1..10).map(&:to_s)
    ::Ai::ActiveContext::Collections::Code.track_refs!(routing: "1", hashes: refs_for_queue)
  3. Verify that the refs/items were added to 3 queues

    ::Ai::ActiveContext::Queues::Code.queued_items
    => {
      0=>[
        ["Ai::ActiveContext::References::Code|2|1|7", 1.0], 
        ["Ai::ActiveContext::References::Code|2|1|9", 2.0], 
        ["Ai::ActiveContext::References::Code|2|1|10", 3.0]
      ],
      1=>[
        ["Ai::ActiveContext::References::Code|2|1|1", 1.0],
        ["Ai::ActiveContext::References::Code|2|1|5", 2.0],
        ["Ai::ActiveContext::References::Code|2|1|6", 3.0],
        ["Ai::ActiveContext::References::Code|2|1|8", 4.0]
      ],
      2=>[
        ["Ai::ActiveContext::References::Code|2|1|2", 1.0], 
        ["Ai::ActiveContext::References::Code|2|1|3", 2.0], 
        ["Ai::ActiveContext::References::Code|2|1|4", 3.0]
      ]
    }
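
To quickly confirm how many items landed in each shard without reading the full payloads, you can count them per shard (assuming queued_items returns a plain Hash keyed by shard number, as in the output above):

    ::Ai::ActiveContext::Queues::Code.queued_items.transform_values(&:count)
    # => {0=>3, 1=>4, 2=>3}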

queue_shard_limit / bulk processing

This is a bit trickier to test because it involves actual embedding generation, but you can run Code Indexing for a project and then run bulk processing.

  1. Follow the "End to end using Index State Management tasks" guide up to the "Run process_pending_enabled_namespace task" section

  2. Ensure that the Ai::ActiveContext::Code::Repository record created is still in the pending state

  3. Run the Indexer for that repository, e.g.:

    r = Ai::ActiveContext::Code::Repository.pending.first
    Ai::ActiveContext::Code::RepositoryIndexWorker.new.perform(r.id)
  4. Verify that there are items in the Code queue:

    ::Ai::ActiveContext::Queues::Code.queued_items
  5. Update the queue_shard_limit to a small value for easier checking, e.g.:

    ::Ai::ActiveContext::Collections::Code.collection_record.update_options!({ queue_shard_limit: 2 })
  6. Run BulkProcessWorker#perform for one of the Code queue shards, e.g.:

    shard_number = 0
    ::Ai::ActiveContext::BulkProcessWorker.new.perform("Ai::ActiveContext::Queues::Code", shard_number)

    Verify that the items have been processed by checking the queues again. The number of items in the specified shard's queue should be reduced.

    ::Ai::ActiveContext::Queues::Code.queued_items

    Note: Since we are testing synchronously, this will only pick up items up to the queue_shard_limit from the given shard. The cron-scheduled BulkProcessWorker should process the queue shards in parallel.
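
    For illustration only, that scheduled fan-out could look roughly like the sketch below; perform_async and iterating queued_items for the shard numbers are assumptions here, not the actual scheduler code:

    # Hypothetical sketch: enqueue one BulkProcessWorker job per shard so
    # the shards are drained in parallel. Names/APIs are assumptions.
    queue = ::Ai::ActiveContext::Queues::Code

    queue.queued_items.each_key do |shard_number|
      ::Ai::ActiveContext::BulkProcessWorker.perform_async(queue.name, shard_number)
    end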

MR acceptance checklist

Evaluate this MR against the MR acceptance checklist. It helps you analyze changes to reduce risks in quality, performance, reliability, security, and maintainability.
