ActiveContext Reference class and processing

What does this MR do and why?

Creates a new ActiveContext::Reference class for ActiveContext per this discussion and updates BulkProcessQueue to use the ref class.

The Reference class:

  • Mostly mimics Search::Elastic::Reference
  • Tries to find reference class inheriting from ActiveContext::Reference, otherwise falls back to Search::Elastic::Reference

BulkProcessQueue now:

  • deserializes refs
  • preloads references
  • calls the BulkProcessor to process each ref
  • flushes the BulkProcessor to return failures
  • re-tracks failures

A simple example ref class looks like:

# frozen_string_literal: true

module Ai
  module Context
    module References
      class MergeRequest < ::ActiveContext::Reference
        def self.serialize(record)
          new(record.id).serialize
        end

        attr_reader :identifier

        def initialize(identifier)
          @identifier = identifier.to_i
        end

        def serialize
          self.class.join_delimited([identifier].compact)
        end

        def as_indexed_json
          {
            id: identifier
          }
        end

        def operation
          :index
        end

        def index_name
          'ai_context_merge_requests'
        end
      end
    end
  end
end

References

Please include cross links to any resources that are relevant to this MR. This will give reviewers and future readers helpful context to give an efficient review of the changes introduced.

MR acceptance checklist

Please evaluate this MR against the MR acceptance checklist. It helps you analyze changes to reduce risks in quality, performance, reliability, security, and maintainability.

How to set up and validate locally

  1. Configure the initializer to use elasticsearch
  2. Create a queue class
  3. Create a reference class, defining the necessary methods
  4. Create a collection class and use the new ref class, e.g.
# frozen_string_literal: true

module Ai
  module Context
    module Collections
      class MergeRequest
        include ::ActiveContext::Concerns::Collection

        def self.queue
          Queues::MergeRequest
        end

        def references
          [References::MergeRequest.serialize(object)]
        end
      end
    end
  end
end
  1. Track a reference: Ai::Context::Collections::MergeRequest.track!(MergeRequest.first)
  2. View the tracked items: Ai::Context::Queues::MergeRequest.queued_items
  3. Process the queue and shard: ActiveContext::BulkProcessQueue.process!(Ai::Context::Queues::MergeRequest, 0)
  4. Note that the document was indexed into Elasticsearch in the index specified by the index_name
  5. You can also view the logs:
{"severity":"INFO","time":"2025-01-29T14:27:52.157Z","queue":"Ai::Context::Queues::MergeRequest","message":"bulk_indexing_start","meta.indexing.redis_set":"ai_context_queues:{merge_request}:0:zset","meta.indexing.records_count":1,"meta.indexing.first_score":7.0,"meta.indexing.last_score":7.0}
{"severity":"INFO","time":"2025-01-29T14:27:52.177Z","message":"bulk_submitted","meta.indexing.bulk_count":1,"meta.indexing.errors_count":0}
{"severity":"INFO","time":"2025-01-29T14:27:52.177Z","class":"ActiveContext::BulkProcessQueue","message":"bulk_indexer_flushed","meta.indexing.search_flushing_duration_s":0.020589000021573156}
{"severity":"INFO","time":"2025-01-29T14:27:52.178Z","class":"ActiveContext::BulkProcessQueue","message":"bulk_indexing_end","meta.indexing.redis_set":"ai_context_queues:{merge_request}:0:zset","meta.indexing.records_count":1,"meta.indexing.first_score":7.0,"meta.indexing.last_score":7.0,"meta.indexing.failures_count":0,"meta.indexing.bulk_execution_duration_s":0.03743999998550862}

Optional: force the index process to fail by creating a mapping exception, or similar.

  1. Ai::Context::Collections::MergeRequest.track!(MergeRequest.first)
  2. Ai::Context::Queues::MergeRequest.queued_items => {0=>[["Ai::Context::References::MergeRequest|1", 9.0]]}
  3. ActiveContext::BulkProcessQueue.process!(Ai::Context::Queues::MergeRequest, 0) => [1, 1] Screenshot_2025-01-29_at_16.34.30
  4. Ai::Context::Queues::MergeRequest.queued_items => {0=>[["Ai::Context::References::MergeRequest|1", 10.0]]}

Related to #507973 (closed)

Edited by Madelein van Niekerk

Merge request reports

Loading