ActiveContext OpenSearch indexer
What does this MR do and why?
Builds on !178705 (merged) to handle indexing for OpenSearch which is just a copy of Elasticsearch.
References
Please include cross links to any resources that are relevant to this MR. This will give reviewers and future readers helpful context to give an efficient review of the changes introduced.
MR acceptance checklist
Please evaluate this MR against the MR acceptance checklist. It helps you analyze changes to reduce risks in quality, performance, reliability, security, and maintainability.
How to set up and validate locally
- Add an initializer with an Opensearch adapter
ActiveContext.configure do |config|
config.enabled = true
config.logger = ::Gitlab::Elasticsearch::Logger.build
config.databases = {
es1: {
adapter: 'ActiveContext::Databases::Opensearch::Adapter',
prefix: 'gitlab_active_context',
options: ::Gitlab::CurrentSettings.elasticsearch_config
}
}
end
- Get a ref, e.g.
ref = ::Search::Elastic::References::WorkItem.new(1, "group_22")`
- Initialize a bulk processor
bulk_processor = ActiveContext::BulkProcessor.new
- Process the ref
bulk_processor.process(ref)
- You can see that the ref was added to the adapter because the total byte size is below the threshold
bulk_processor.adapter.all_refs
- Flush the processor. Note that no failures are returned
bulk_processor.flush
- [Optional] Decrease the max bulk size
# frozen_string_literal: true
ActiveContext.configure do |config|
config.enabled = true
config.databases = {
es1: {
adapter: 'ActiveContext::Databases::Opensearch::Adapter',
prefix: 'gitlab_active_context',
options: ::Gitlab::CurrentSettings.elasticsearch_config.merge(max_bulk_size_bytes: 5)
}
}
end
- Test that adding any ref immediately executes the bulk process
ref = ::Search::Elastic::References::WorkItem.new(1, "group_22")
bulk_processor = ActiveContext::BulkProcessor.new
bulk_processor.process(ref)
=> I, [2025-01-22T13:51:48.324363 #97058] INFO -- : {"message"=>"bulk_submitted", "meta.indexing.bulk_count"=>1, "meta.indexing.errors_count"=>0}
- [Optional] Force an error during indexing and see that flush returns the failed refs. Add
data['non-existing'] = trueto thebuild_indexed_jsonfor the ref. This will cause a mapping exception.
ref = ::Search::Elastic::References::WorkItem.new(1, "group_22")
bulk_processor = ActiveContext::BulkProcessor.new
bulk_processor.process(ref)
bulk_processor.flush
=> I, [2025-01-22T13:56:03.470259 #2577] INFO -- : {"message"=>"bulk_submitted", "meta.indexing.bulk_count"=>1, "meta.indexing.errors_count"=>1}
=> [#<Search::Elastic::References::WorkItem:0x0000000169417818 @database_record=#<WorkItem id:1 toolbox/gitlab-smoke-tests#1>, @identifier=1, @routing="group_22">]
- [Optional] Add more refs to the process and some delete operations and verify that the change has been actioned in OpenSearch.
Related to #507974 (closed)
Edited by Madelein van Niekerk