Make BulkImport NDJsonExtractor idempotent
What does this MR do and why?
This is part 1 of a 2-part series of MRs that will make BulkImports::PipelineWorker idempotent.
It covers the NDJsonExtractor and RestExtractor, and lays the groundwork in Pipeline::Runner.
In this MR, we introduce caching of processed entries, so that when workers are retried after stopping midway through, they won't create duplicate records.
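The idea can be sketched as follows. This is a minimal, in-memory illustration with hypothetical names (`ProcessedEntriesCache`, `run_pipeline`), not the actual GitLab implementation, which uses a shared cache keyed per tracker:

```ruby
require "set"

# Hypothetical stand-in for a per-tracker cache of processed entry indexes.
class ProcessedEntriesCache
  def initialize
    @store = Hash.new { |h, k| h[k] = Set.new } # tracker_id => processed indexes
  end

  def processed?(tracker_id, index)
    @store[tracker_id].include?(index)
  end

  def mark_processed(tracker_id, index)
    @store[tracker_id] << index
  end
end

# Simplified pipeline loop: skip entries the tracker already processed,
# so a retried worker does not load the same entry twice.
def run_pipeline(entries, cache, tracker_id, created)
  entries.each_with_index do |entry, index|
    next if cache.processed?(tracker_id, index) # idempotency guard

    created << entry # stand-in for the transform/load steps
    cache.mark_processed(tracker_id, index)
  end
end
```

Running `run_pipeline` twice with the same cache and tracker id creates each entry only once, which is the behavior the retried worker needs.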
Screenshots or screen recordings
n/a
How to set up and validate locally
This is quite hard to reproduce, as we have to kill the worker in just the right way:
- Check out this branch
- I find pipelineSchedules the easiest to test with, as there's no implicit duplication constraint, so go ahead and create 5 of those.
- Here, we'll patch the code so the worker fails partway through, and test the behavior without our change:
```diff
+++ b/lib/bulk_imports/pipeline/runner.rb
@@ -16,6 +16,10 @@ def run
       if extracted_data
         extracted_data.each_with_index do |entry, index|
+          if index == 1 && context.tracker.relation == "BulkImports::Projects::Pipelines::PipelineSchedulesPipeline"
+            raise BulkImports::RetryPipelineError.new("oh no", 1.second)
+          end
+
           transformers.each do |transformer|
             entry = run_pipeline_step(:transformer, transformer.class.name) do
               transformer.transform(context, entry)
```
- Now, running direct transfer for groups should give you many pipelineSchedules, as the retried worker re-creates entries it had already processed.
- Enable the feature flag in a Rails console:

```ruby
Feature.enable(:bulk_import_idempotent_workers)
```
- Now, running the same transfer will only generate 3 pipelineSchedules, as the duplicate ones are not created.
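The validation steps above can be simulated end to end in plain Ruby. This is a toy reproduction with hypothetical stand-ins (an in-memory `Set` for the cache, a `RuntimeError` for `BulkImports::RetryPipelineError`), not the worker's real retry machinery:

```ruby
require "set"

processed = Set.new # stand-in for the shared processed-entries cache
created = []

# One worker attempt: optionally fail at a given index, mirroring the
# patched runner.rb above.
run_attempt = lambda do |fail_at = nil|
  %w[s1 s2 s3 s4 s5].each_with_index do |schedule, index|
    next if processed.include?(index)  # idempotency guard
    raise "oh no" if index == fail_at  # simulated RetryPipelineError

    created << schedule
    processed << index
  end
end

begin
  run_attempt.call(1) # first attempt stops midway, after creating s1
rescue RuntimeError
end

run_attempt.call # the retry skips s1 and completes without duplicates
```

After the retry, `created` contains each schedule exactly once; without the `processed` guard, the retry would create `s1` a second time.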
MR acceptance checklist
This checklist encourages us to confirm any changes have been analyzed to reduce risks in quality, performance, reliability, security, and maintainability.
- [x] I have evaluated the MR acceptance checklist for this MR.
Related to #424954 (closed)