Skip to content
Snippets Groups Projects
Commit abff40cd authored by Max Fan's avatar Max Fan :two: Committed by Alex Pooley
Browse files

Updating stale logic checking for FinishBatchedPipelineWorker

parent f977ea87
No related branches found
No related tags found
1 merge request!136621Updating stale logic checking for FinishBatchedPipelineWorker
......@@ -8,6 +8,8 @@ class BatchTracker < ApplicationRecord
validates :batch_number, presence: true, uniqueness: { scope: :tracker_id }
scope :by_last_updated, -> { order(updated_at: :desc) }
state_machine :status, initial: :created do
state :created, value: 0
state :started, value: 1
......
......@@ -24,7 +24,6 @@ class BulkImports::Tracker < ApplicationRecord
delegate :file_extraction_pipeline?, to: :pipeline_class
DEFAULT_PAGE_SIZE = 500
STALE_AFTER = 4.hours
scope :next_pipeline_trackers_for, -> (entity_id) {
entity_scope = where(bulk_import_entity_id: entity_id)
......@@ -88,8 +87,4 @@ def pipeline_class
transition [:created, :started] => :timeout
end
end
def stale?
created_at < STALE_AFTER.ago
end
end
......@@ -6,6 +6,7 @@ class FinishBatchedPipelineWorker
include ExceptionBacktrace
REQUEUE_DELAY = 5.seconds
STALE_AFTER = 4.hours
idempotent!
deduplicate :until_executing
......@@ -18,24 +19,21 @@ def perform(pipeline_tracker_id)
@tracker = Tracker.find(pipeline_tracker_id)
@context = ::BulkImports::Pipeline::Context.new(tracker)
return unless tracker.batched?
return unless tracker.started?
return unless tracker.batched? && tracker.started?
@sorted_batches = tracker.batches.by_last_updated
return fail_stale_tracker_and_batches if most_recent_batch_stale?
return re_enqueue if import_in_progress?
if tracker.stale?
logger.error(log_attributes(message: 'Tracker stale. Failing batches and tracker'))
tracker.batches.map(&:fail_op!)
tracker.fail_op!
else
tracker.pipeline_class.new(@context).on_finish
logger.info(log_attributes(message: 'Tracker finished'))
tracker.finish!
end
tracker.pipeline_class.new(@context).on_finish
logger.info(log_attributes(message: 'Tracker finished'))
tracker.finish!
end
private
attr_reader :tracker
attr_reader :tracker, :sorted_batches
def re_enqueue
with_context(bulk_import_entity_id: tracker.entity.id) do
......@@ -44,7 +42,19 @@ def re_enqueue
end
def import_in_progress?
tracker.batches.any? { |b| b.started? || b.created? }
sorted_batches.any? { |b| b.started? || b.created? }
end
def most_recent_batch_stale?
return false unless sorted_batches.any?
sorted_batches.first.updated_at < STALE_AFTER.ago
end
def fail_stale_tracker_and_batches
logger.error(log_attributes(message: 'Batch stale. Failing batches and tracker'))
sorted_batches.map(&:fail_op!)
tracker.fail_op!
end
def logger
......
......@@ -33,6 +33,8 @@ def on_finish; end
)
end
let!(:batch_1) { create(:bulk_import_batch_tracker, :finished, tracker: pipeline_tracker) }
subject(:worker) { described_class.new }
describe '#perform' do
......@@ -45,27 +47,27 @@ def on_finish; end
end
end
context 'when import is in progress' do
it 'marks the tracker as finished' do
expect_next_instance_of(BulkImports::Logger) do |logger|
expect(logger).to receive(:info).with(
a_hash_including('message' => 'Tracker finished')
)
end
expect { subject.perform(pipeline_tracker.id) }
.to change { pipeline_tracker.reload.finished? }
.from(false).to(true)
it 'marks the tracker as finished' do
expect_next_instance_of(BulkImports::Logger) do |logger|
expect(logger).to receive(:info).with(
a_hash_including('message' => 'Tracker finished')
)
end
it "calls the pipeline's `#on_finish`" do
expect_next_instance_of(pipeline_class) do |pipeline|
expect(pipeline).to receive(:on_finish)
end
expect { subject.perform(pipeline_tracker.id) }
.to change { pipeline_tracker.reload.finished? }
.from(false).to(true)
end
subject.perform(pipeline_tracker.id)
it "calls the pipeline's `#on_finish`" do
expect_next_instance_of(pipeline_class) do |pipeline|
expect(pipeline).to receive(:on_finish)
end
subject.perform(pipeline_tracker.id)
end
context 'when import is in progress' do
it 're-enqueues for any started batches' do
create(:bulk_import_batch_tracker, :started, tracker: pipeline_tracker)
......@@ -88,14 +90,14 @@ def on_finish; end
end
context 'when pipeline tracker is stale' do
let(:pipeline_tracker) { create(:bulk_import_tracker, :started, :batched, :stale, entity: entity) }
before do
batch_1.update!(updated_at: 5.hours.ago)
end
it 'fails pipeline tracker and its batches' do
create(:bulk_import_batch_tracker, :finished, tracker: pipeline_tracker)
expect_next_instance_of(BulkImports::Logger) do |logger|
expect(logger).to receive(:error).with(
a_hash_including('message' => 'Tracker stale. Failing batches and tracker')
a_hash_including('message' => 'Batch stale. Failing batches and tracker')
)
end
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment