Make BulkImport NDJsonExtractor idempotent

Max Fan requested to merge 424954-make-workers-idempotent into master

What does this MR do and why?

This is part 1 of a 2-part series of MRs that will make `BulkImports::PipelineWorker` idempotent.

It tackles the NDJsonExtractor and RestExtractor, and lays the groundwork in `Pipeline::Runner`.

In this MR, we introduce caching of processed entries, so that when workers are retried after stopping midway through, they won't create duplicate records.
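Conceptually, the change looks like this. The sketch below is a minimal in-memory illustration, not the actual implementation: `IdempotentRunner` and its `Set`-based cache are hypothetical stand-ins for the real pipeline runner and its processed-entries cache.

```ruby
require 'set'

# Hypothetical sketch: a runner that records which entry indexes it has
# already processed, so a retry after a mid-run failure skips them.
class IdempotentRunner
  def initialize(cache = Set.new)
    @cache = cache    # stands in for the real processed-entries cache
    @created = []     # records created so far (simulates DB writes)
  end

  attr_reader :created

  def run(entries, fail_at: nil)
    entries.each_with_index do |entry, index|
      next if @cache.include?(index)  # already processed on a previous attempt

      raise "worker killed" if fail_at == index  # simulate a mid-run failure

      @created << entry               # "load" the entry (create the record)
      @cache.add(index)               # mark as processed only after success
    end
  end
end

runner = IdempotentRunner.new
entries = %w[schedule_1 schedule_2 schedule_3]

begin
  runner.run(entries, fail_at: 1)  # first attempt dies on the second entry
rescue RuntimeError
end

runner.run(entries)  # retry skips the cached entry and completes the rest
runner.created       # => ["schedule_1", "schedule_2", "schedule_3"]
```

Because an index is only cached after its record is created, a crash between the two steps is retried rather than skipped, which errs on the side of re-processing over silently dropping an entry.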

Screenshots or screen recordings

n/a

How to set up and validate locally

This is quite hard to reproduce, as we have to kill the worker at just the right moment.

  1. Check out this branch
  2. I find pipelineSchedules the easiest to test with, as there's no implicit uniqueness constraint, so go ahead and create 5 of those.
  3. Here, we'll manipulate the code so it fails the worker, and test it without our change:
```diff
--- a/lib/bulk_imports/pipeline/runner.rb
+++ b/lib/bulk_imports/pipeline/runner.rb
@@ -16,6 +16,10 @@ def run
 
         if extracted_data
           extracted_data.each_with_index do |entry, index|
+            if index == 1 && context.tracker.relation == "BulkImports::Projects::Pipelines::PipelineSchedulesPipeline"
+              raise BulkImports::RetryPipelineError.new("oh no", 1.second)
+            end
+
             transformers.each do |transformer|
               entry = run_pipeline_step(:transformer, transformer.class.name) do
                 transformer.transform(context, entry)
```
  4. Now, running a direct transfer for groups should give you many pipelineSchedules.
  5. Enable the feature flag: `Feature.enable(:bulk_import_idempotent_workers)`
  6. Now running the same code will only generate 3 pipelineSchedules, as the duplicate ones are not created.
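The duplication seen without the feature flag comes from each retry restarting at index 0. A minimal sketch of that pre-fix behavior (hypothetical names, with an in-memory array standing in for database records):

```ruby
# Hypothetical illustration of the pre-fix behavior: every retry restarts
# from the first entry, so records created before the failure are created again.
def run_without_cache(entries, created, fail_at: nil)
  entries.each_with_index do |entry, index|
    raise "worker killed" if fail_at == index  # simulate a mid-run failure

    created << entry  # create the record, with no memory of prior attempts
  end
end

created = []
entries = %w[schedule_1 schedule_2 schedule_3 schedule_4 schedule_5]

begin
  run_without_cache(entries, created, fail_at: 2)  # dies on the third entry
rescue RuntimeError
end

run_without_cache(entries, created)  # retry replays everything from index 0
created.count                        # => 7: all 5 records, plus 2 duplicates
```

With the processed-entries cache from this MR, the retry would skip the first two entries instead of re-creating them.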

MR acceptance checklist

This checklist encourages us to confirm any changes have been analyzed to reduce risks in quality, performance, reliability, security, and maintainability.

Related to #424954 (closed)
