ActiveContext Reference class and processing
What does this MR do and why?
Creates a new ActiveContext::Reference class for ActiveContext per this discussion and updates BulkProcessQueue to use the ref class.
The Reference class:
- Mostly mimics Search::Elastic::Reference
- Tries to find a reference class inheriting from ActiveContext::Reference, otherwise falls back to Search::Elastic::Reference
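The lookup-with-fallback behaviour can be pictured with the minimal sketch below. It is not the code from this MR: Sketch::Reference and Sketch::LegacyReference are hypothetical stand-ins for ActiveContext::Reference and Search::Elastic::Reference, and the "ClassName|id" format with a "|" delimiter is an assumption inferred from the queued item strings shown in the validation steps.

```ruby
# frozen_string_literal: true

module Sketch
  # Hypothetical stand-in for Search::Elastic::Reference (the fallback).
  class LegacyReference
    def self.deserialize(string)
      [:legacy, string]
    end
  end

  # Hypothetical stand-in for ActiveContext::Reference.
  class Reference
    DELIMITER = '|' # assumed from strings like "Ai::Context::References::MergeRequest|1"

    # Assumed behaviour: prefix the serialized parts with the class name.
    def self.join_delimited(parts)
      [name, *parts].join(DELIMITER)
    end

    # Use the class named in the first segment if it inherits from Reference,
    # otherwise hand the whole string to the legacy class.
    def self.deserialize(string)
      class_name, *args = string.split(DELIMITER)
      klass = resolve(class_name)
      return LegacyReference.deserialize(string) unless klass

      klass.new(*args)
    end

    # Returns the class only when it exists and is a Reference subclass.
    def self.resolve(class_name)
      klass = Object.const_get(class_name.to_s)
      klass if klass.is_a?(Class) && klass < Reference
    rescue NameError
      nil
    end
  end

  class MergeRequest < Reference
    attr_reader :identifier

    def initialize(identifier)
      @identifier = identifier.to_i
    end

    def serialize
      self.class.join_delimited([identifier])
    end
  end
end

ref = Sketch::Reference.deserialize('Sketch::MergeRequest|1')
legacy = Sketch::Reference.deserialize('Unknown::Thing|1')
```

Here ref is a Sketch::MergeRequest instance, while the unknown class name falls through to the legacy deserializer.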
BulkProcessQueue now:
- deserializes refs
- preloads references
- calls the BulkProcessor to process each ref
- flushes the BulkProcessor to return failures
- re-tracks failures
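The five steps above can be sketched roughly as follows. This is an illustration under stated assumptions, not the implementation in this MR: Sketch::Ref, Sketch::FakeQueue, and Sketch::FakeBulkProcessor are hypothetical in-memory stand-ins, and the [processed count, failure count] return value is a guess based on the [1, 1] result shown in the optional failure scenario.

```ruby
# frozen_string_literal: true

module Sketch
  # Hypothetical reference, serialized as "Ref|<id>".
  class Ref
    attr_reader :id

    def self.deserialize(string)
      new(string.split('|').last)
    end

    # Placeholder for batch-loading the underlying records; a no-op here.
    def self.preload(refs)
      refs
    end

    def initialize(id)
      @id = id.to_i
    end

    def serialize
      "Ref|#{id}"
    end
  end

  # Hypothetical in-memory queue holding serialized refs.
  class FakeQueue
    attr_reader :items

    def initialize(serialized)
      @items = serialized.dup
    end

    def pop_all
      popped = @items
      @items = []
      popped
    end

    def track!(refs)
      @items.concat(refs.map(&:serialize))
    end
  end

  # Hypothetical processor: buffers refs and reports failures on flush.
  class FakeBulkProcessor
    def initialize(failing_ids)
      @failing_ids = failing_ids
      @pending = []
    end

    def process(ref)
      @pending << ref
    end

    def flush
      failures = @pending.select { |ref| @failing_ids.include?(ref.id) }
      @pending = []
      failures
    end
  end

  def self.process!(queue, processor)
    refs = queue.pop_all.map { |string| Ref.deserialize(string) } # deserialize refs
    Ref.preload(refs)                                             # preload references
    refs.each { |ref| processor.process(ref) }                    # process each ref
    failures = processor.flush                                    # flush returns failures
    queue.track!(failures)                                        # re-track failures
    [refs.count, failures.count]
  end
end

queue = Sketch::FakeQueue.new(['Ref|1', 'Ref|2'])
processor = Sketch::FakeBulkProcessor.new([2])
result = Sketch.process!(queue, processor)
```

After the call, only the failed ref is back on the queue, which mirrors the re-tracking seen in the optional failure scenario.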
A simple example ref class looks like:
# frozen_string_literal: true

module Ai
  module Context
    module References
      class MergeRequest < ::ActiveContext::Reference
        def self.serialize(record)
          new(record.id).serialize
        end

        attr_reader :identifier

        def initialize(identifier)
          @identifier = identifier.to_i
        end

        def serialize
          self.class.join_delimited([identifier].compact)
        end

        def as_indexed_json
          {
            id: identifier
          }
        end

        def operation
          :index
        end

        def index_name
          'ai_context_merge_requests'
        end
      end
    end
  end
end
References
- Execute refs from all queues async (#507973 - closed)
- Handle bulk ref processing for Elasticsearch / ... (#507974 - closed)
MR acceptance checklist
Please evaluate this MR against the MR acceptance checklist. It helps you analyze changes to reduce risks in quality, performance, reliability, security, and maintainability.
How to set up and validate locally
- Configure the initializer to use Elasticsearch
- Create a queue class
- Create a reference class, defining the necessary methods
- Create a collection class and use the new ref class, e.g.
# frozen_string_literal: true

module Ai
  module Context
    module Collections
      class MergeRequest
        include ::ActiveContext::Concerns::Collection

        def self.queue
          Queues::MergeRequest
        end

        def references
          [References::MergeRequest.serialize(object)]
        end
      end
    end
  end
end
- Track a reference: Ai::Context::Collections::MergeRequest.track!(MergeRequest.first)
- View the tracked items: Ai::Context::Queues::MergeRequest.queued_items
- Process the queue and shard: ActiveContext::BulkProcessQueue.process!(Ai::Context::Queues::MergeRequest, 0)
- Note that the document was indexed into Elasticsearch in the index specified by the index_name
- You can also view the logs:
{"severity":"INFO","time":"2025-01-29T14:27:52.157Z","queue":"Ai::Context::Queues::MergeRequest","message":"bulk_indexing_start","meta.indexing.redis_set":"ai_context_queues:{merge_request}:0:zset","meta.indexing.records_count":1,"meta.indexing.first_score":7.0,"meta.indexing.last_score":7.0}
{"severity":"INFO","time":"2025-01-29T14:27:52.177Z","message":"bulk_submitted","meta.indexing.bulk_count":1,"meta.indexing.errors_count":0}
{"severity":"INFO","time":"2025-01-29T14:27:52.177Z","class":"ActiveContext::BulkProcessQueue","message":"bulk_indexer_flushed","meta.indexing.search_flushing_duration_s":0.020589000021573156}
{"severity":"INFO","time":"2025-01-29T14:27:52.178Z","class":"ActiveContext::BulkProcessQueue","message":"bulk_indexing_end","meta.indexing.redis_set":"ai_context_queues:{merge_request}:0:zset","meta.indexing.records_count":1,"meta.indexing.first_score":7.0,"meta.indexing.last_score":7.0,"meta.indexing.failures_count":0,"meta.indexing.bulk_execution_duration_s":0.03743999998550862}
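For readers unfamiliar with the tracking step, the sketch below shows one way track! on a collection could turn an object into queued, serialized references. Everything here is an assumption for illustration: Sketch::Collection is a hypothetical stand-in for ActiveContext::Concerns::Collection, and Sketch::MemoryQueue replaces the Redis sorted set named in the logs with a plain hash keyed by shard.

```ruby
# frozen_string_literal: true

module Sketch
  # Hypothetical in-memory queue; items are stored per shard as [ref, score].
  class MemoryQueue
    @items = Hash.new { |hash, shard| hash[shard] = [] }
    @score = 0.0

    class << self
      def push(serialized_refs, shard: 0)
        serialized_refs.each do |ref|
          @score += 1.0
          @items[shard] << [ref, @score]
        end
      end

      def queued_items
        @items
      end
    end
  end

  # Hypothetical stand-in for ActiveContext::Concerns::Collection.
  module Collection
    def self.included(base)
      base.extend(ClassMethods)
    end

    module ClassMethods
      # Wrap each object in the collection and queue its references.
      def track!(*objects)
        objects.each { |object| queue.push(new(object).references) }
      end
    end

    attr_reader :object

    def initialize(object)
      @object = object
    end
  end

  class MergeRequestCollection
    include Collection

    def self.queue
      MemoryQueue
    end

    def references
      ["Sketch::MergeRequestReference|#{object.id}"]
    end
  end
end

record = Struct.new(:id).new(1)
Sketch::MergeRequestCollection.track!(record)
queued = Sketch::MemoryQueue.queued_items
```

The resulting queued hash has the same shard-to-[ref, score] shape as the queued_items output shown in the validation steps.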
Optional: force the index process to fail by creating a mapping exception, or similar.
- Ai::Context::Collections::MergeRequest.track!(MergeRequest.first)
- Ai::Context::Queues::MergeRequest.queued_items => {0=>[["Ai::Context::References::MergeRequest|1", 9.0]]}
- ActiveContext::BulkProcessQueue.process!(Ai::Context::Queues::MergeRequest, 0) => [1, 1]
- Ai::Context::Queues::MergeRequest.queued_items => {0=>[["Ai::Context::References::MergeRequest|1", 10.0]]}
Related to #507973 (closed)
Edited by Madelein van Niekerk
