Skip to content
Snippets Groups Projects
Commit d5d91112 authored by Madelein van Niekerk's avatar Madelein van Niekerk
Browse files

ActiveContext add preprocessors for references

Changelog: added
parent 72f90fbb
No related branches found
No related tags found
2 merge requests!181671ActiveContext: Add preprocessing for references,!180727Resolve "Extend job archival mechanism to the whole pipeline"
......@@ -48,7 +48,7 @@ def process(redis)
refs = deserialize_all(specs_buffer)
Reference.preload(refs).each do |ref| # rubocop: disable Rails/FindEach -- not ActiveRecord
Reference.preprocess_references(refs).each do |ref|
bulk_processor.process(ref)
end
......
# frozen_string_literal: true
module ActiveContext
module Concerns
module Preprocessor
def preprocessors
@preprocessors ||= []
end
def add_preprocessor(name, &block)
preprocessors << { name: name, block: block }
end
def preprocess(refs)
refs_by_class = refs.group_by(&:class)
refs_by_class.flat_map do |klass, class_refs|
klass.preprocessors.reduce(class_refs) do |processed_refs, preprocessor|
preprocessor[:block].call(processed_refs)
end
end
end
end
end
end
......@@ -3,9 +3,9 @@
module ActiveContext
class Reference
extend Concerns::ReferenceUtils
extend Concerns::Preprocessor
DELIMITER = '|'
PRELOAD_BATCH_SIZE = 1_000
class << self
def deserialize(string)
......@@ -22,27 +22,17 @@ def instantiate(string)
new(*deserialize_string(string))
end
def preload(refs)
refs.group_by(&:class).each do |klass, class_refs|
class_refs.each_slice(PRELOAD_BATCH_SIZE) do |group_slice|
klass.preload_refs(group_slice)
end
end
refs
end
def serialize
raise NotImplementedError
end
def preload_refs(refs)
refs
end
def klass
name.demodulize
end
def preprocess_references(refs)
preprocess(refs)
end
end
def klass
......
......@@ -19,7 +19,7 @@
describe '#process' do
let(:specs) { [['spec1', 1], ['spec2', 2]] }
let(:reference_class) { class_double("ActiveContext::Reference", preload_refs: nil).as_stubbed_const }
let(:reference_class) { class_double("ActiveContext::Reference").as_stubbed_const }
let(:references) { [instance_double('ActiveContext::Reference'), instance_double('ActiveContext::Reference')] }
before do
......@@ -29,8 +29,7 @@
allow(bulk_process_queue).to receive(:deserialize_all).and_return(references)
allow(redis).to receive(:zremrangebyscore)
allow(references).to receive(:group_by).and_return({ reference_class => references })
allow(reference_class).to receive(:preload_refs)
allow(ActiveContext::Reference).to receive(:preload).and_return(references)
allow(reference_class).to receive(:preprocess_references).and_return(references)
end
it 'processes specs and flushes the bulk processor' do
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment