Add bulk processor for ActiveContext and handle Elastic operations
What does this MR do and why?
Introduce a bulk processor for ActiveContext and handle Elastic operations.
The `BulkProcessor` will be called from `BulkProcessQueue` after refs are pulled from the queue, to actually process them and re-add any failures. It mimics the `BulkIndexer`, except that much of the handling is now offloaded to the adapters because they index and delete differently.
`BulkProcessQueue` will call (see the sketch after this list):

- `process(ref)` to build up ref operations for the bulk request. If the adapter returns `true` here, it will force a bulk operation; for Elasticsearch that happens when the built-up operations exceed the configured bulk size.
- `flush` to call bulk on the built-up operations, return any failures, and reset the state of the adapter so it builds up ref operations from scratch. `BulkProcessQueue` will then re-add these failures to the queue.
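For orientation, here is a minimal sketch of how a caller like `BulkProcessQueue` might drive this API. `refs` and `re_enqueue` are placeholders for illustration only, not real names from the codebase:

```ruby
# Minimal sketch, assuming refs have already been pulled from the queue.
# `re_enqueue` is a hypothetical helper standing in for however
# BulkProcessQueue re-adds failed refs.
processor = ActiveContext::BulkProcessor.new

refs.each do |ref|
  processor.process(ref) # may force an early bulk if the adapter requests it
end

failures = processor.flush # returns the refs that failed to index/delete
failures.each { |ref| re_enqueue(ref) }
```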
The adapters now need to define the following additional methods:
```ruby
def add_ref(ref)
def empty?
def bulk
def process_bulk_errors
def reset
```
The Elasticsearch adapter implements these methods the same way the `BulkIndexer` currently does.
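As a rough illustration of the adapter contract, a skeleton could look like the following. The class name and method bodies are placeholders, not the actual Elasticsearch adapter:

```ruby
# Illustrative skeleton only; the real Elasticsearch adapter builds
# operations the way the BulkIndexer does today.
class ExampleAdapter
  def add_ref(ref)
    # accumulate index/delete operations for the ref;
    # return true to force an immediate bulk (e.g. size threshold reached)
  end

  def empty?
    # true when no operations have been accumulated
  end

  def bulk
    # send the accumulated operations as a single bulk request
  end

  def process_bulk_errors
    # inspect the bulk response and return the refs that failed
  end

  def reset
    # clear accumulated operations so building can start from scratch
  end
end
```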
References
Please include cross links to any resources that are relevant to this MR. This will give reviewers and future readers helpful context to give an efficient review of the changes introduced.
- Handle bulk ref processing for Elasticsearch / ... (#507974 - closed)
- Execute refs from all queues async (#507973 - closed)
MR acceptance checklist
Please evaluate this MR against the MR acceptance checklist. It helps you analyze changes to reduce risks in quality, performance, reliability, security, and maintainability.
Screenshots or screen recordings
Screenshots are required for UI changes, and strongly recommended for all other merge requests.
| Before | After |
|---|---|
How to set up and validate locally
- Add an initializer with an Elastic adapter
```ruby
ActiveContext.configure do |config|
  config.enabled = true
  config.logger = ::Gitlab::Elasticsearch::Logger.build
  config.databases = {
    es1: {
      adapter: 'ActiveContext::Databases::Elasticsearch::Adapter',
      prefix: 'gitlab_active_context',
      options: ::Gitlab::CurrentSettings.elasticsearch_config
    }
  }
end
```
- Get a ref, e.g.

```ruby
ref = ::Search::Elastic::References::WorkItem.new(1, "group_22")
```
- Initialize a bulk processor

```ruby
bulk_processor = ActiveContext::BulkProcessor.new
```
- Process the ref

```ruby
bulk_processor.process(ref)
```
- Check that the ref was added to the adapter rather than flushed, because the total byte size is still below the threshold

```ruby
bulk_processor.adapter.bulk_size
bulk_processor.adapter.all_refs
```
- Flush the processor. Note that no failures are returned

```ruby
bulk_processor.flush
```
- [Optional] Decrease the max bulk size

```ruby
# frozen_string_literal: true

ActiveContext.configure do |config|
  config.enabled = true
  config.databases = {
    es1: {
      adapter: 'ActiveContext::Databases::Elasticsearch::Adapter',
      prefix: 'gitlab_active_context',
      options: ::Gitlab::CurrentSettings.elasticsearch_config.merge(max_bulk_size_bytes: 5)
    }
  }
end
```
- Test that adding any ref immediately executes the bulk process

```ruby
ref = ::Search::Elastic::References::WorkItem.new(1, "group_22")
bulk_processor = ActiveContext::BulkProcessor.new
bulk_processor.process(ref)
=> I, [2025-01-22T13:51:48.324363 #97058] INFO -- : {"message"=>"bulk_submitted", "meta.indexing.bulk_count"=>1, "meta.indexing.errors_count"=>0}
```
- [Optional] Force an error during indexing and see that `flush` returns the failed refs. Add `data['non-existing'] = true` to `build_indexed_json` for the ref. This will cause a mapping exception.
```ruby
ref = ::Search::Elastic::References::WorkItem.new(1, "group_22")
bulk_processor = ActiveContext::BulkProcessor.new
bulk_processor.process(ref)
bulk_processor.flush
=> I, [2025-01-22T13:56:03.470259 #2577] INFO -- : {"message"=>"bulk_submitted", "meta.indexing.bulk_count"=>1, "meta.indexing.errors_count"=>1}
=> [#<Search::Elastic::References::WorkItem:0x0000000169417818 @database_record=#<WorkItem id:1 toolbox/gitlab-smoke-tests#1>, @identifier=1, @routing="group_22">]
```
- [Optional] Add more refs to the processor, along with some delete operations, and verify that the changes have been applied in Elasticsearch (see the sketch below)
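A rough sketch of that last step, assuming you have more work item IDs available locally; how a delete is represented depends on the reference class, so that part is only described in a comment:

```ruby
# Sketch only: the IDs and routing below are examples, adjust to your local data.
bulk_processor = ActiveContext::BulkProcessor.new

[
  ::Search::Elastic::References::WorkItem.new(2, "group_22"),
  ::Search::Elastic::References::WorkItem.new(3, "group_22")
].each { |ref| bulk_processor.process(ref) }

# Process a ref representing a delete operation in the same way, using
# whatever form your reference class supports for deletes, then flush.
bulk_processor.flush

# Afterwards, confirm in Elasticsearch (for example, by fetching the documents
# by ID from the index) that the adds and deletes were applied.
```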
Related to #507974 (closed)