Add bulk processor for ActiveContext and handle Elastic operations

What does this MR do and why?

Introduce bulk processor for ActiveContext and handle Elastic operations.

The BulkProcessor will be called from the BulkProcessQueue after we get refs from the queue to actually process them and re-add any failures. It mimics the BulkIndexer except that now we offload much of the handling to the adapters because they index and delete differently.

BulkProcessQueue will call:

  • process(ref) to build up ref operations for bulk. If the adapter returns true here, it will force a bulk operation - for Elaticsearch that will happen if the operations exceed the configured bulk size
  • flush to call bulk on the built-up operations, return any failures and reset the state of the adapter to build up ref operations from scratch. BulkProcessQueue will then re-add these to the queue

The adapters now need to define the following additional methods:

  • def add_ref(ref)
  • def empty?
  • def bulk
  • def process_bulk_errors
  • def reset

The Elasticsearch adapter implements these methods according to how it is done in the BulkIndexer currently.

References

Please include cross links to any resources that are relevant to this MR. This will give reviewers and future readers helpful context to give an efficient review of the changes introduced.

MR acceptance checklist

Please evaluate this MR against the MR acceptance checklist. It helps you analyze changes to reduce risks in quality, performance, reliability, security, and maintainability.

Screenshots or screen recordings

Screenshots are required for UI changes, and strongly recommended for all other merge requests.

Before After

How to set up and validate locally

  • Add an initializer with an Elastic adapter
ActiveContext.configure do |config|
  config.enabled = true
  config.logger = ::Gitlab::Elasticsearch::Logger.build

  config.databases = {
    es1: {
      adapter: 'ActiveContext::Databases::Elasticsearch::Adapter',
      prefix: 'gitlab_active_context',
      options: ::Gitlab::CurrentSettings.elasticsearch_config
    }
  }
end
  • Get a ref, e.g.
ref = ::Search::Elastic::References::WorkItem.new(1, "group_22")`
  • Initialize a bulk processor
bulk_processor = ActiveContext::BulkProcessor.new
  • Process the ref
bulk_processor.process(ref)
  • You can see that the ref was added to the adapter because the total byte size is below the threshold
bulk_processor.adapter.bulk_size
bulk_processor.adapter.all_refs
  • Flush the processor. Note that no failures are returned
bulk_processor.flush
  • [Optional] Decrease the max bulk size
# frozen_string_literal: true

ActiveContext.configure do |config|
  config.enabled = true
  config.databases = {
    es1: {
      adapter: 'ActiveContext::Databases::Elasticsearch::Adapter',
      prefix: 'gitlab_active_context',
      options: ::Gitlab::CurrentSettings.elasticsearch_config.merge(max_bulk_size_bytes: 5)
    }
  }
end
  • Test that adding any ref immediately executes the bulk process
ref = ::Search::Elastic::References::WorkItem.new(1, "group_22")
bulk_processor = ActiveContext::BulkProcessor.new
bulk_processor.process(ref)
=> I, [2025-01-22T13:51:48.324363 #97058]  INFO -- : {"message"=>"bulk_submitted", "meta.indexing.bulk_count"=>1, "meta.indexing.errors_count"=>0}
  • [Optional] Force an error during indexing and see that flush returns the failed refs. Add data['non-existing'] = true to the build_indexed_json for the ref. This will cause a mapping exception.
ref = ::Search::Elastic::References::WorkItem.new(1, "group_22")
bulk_processor = ActiveContext::BulkProcessor.new
bulk_processor.process(ref)
bulk_processor.flush
=> I, [2025-01-22T13:56:03.470259 #2577]  INFO -- : {"message"=>"bulk_submitted", "meta.indexing.bulk_count"=>1, "meta.indexing.errors_count"=>1}
=> [#<Search::Elastic::References::WorkItem:0x0000000169417818 @database_record=#<WorkItem id:1 toolbox/gitlab-smoke-tests#1>, @identifier=1, @routing="group_22">]
  • [Optional] Add more refs to the process and some delete operations and verify that the change has been actioned in Elasticsearch

Related to #507974 (closed)

Edited by Madelein van Niekerk

Merge request reports

Loading