Skip to content

Fix Direct Transfer stage incorrect execution order due to failures

What does this MR do and why?

Updates pipeline runner not to mark pipeline trackers as failed when a StandardError exception occurs to fix the stage execution order.

BulkImports::EntityWorker advances to the next stage when there are no pipeline trackers running for the previous stage, in other works no pipeline tracker with status enqueued or started. This means we should only mark the tracker's status as a final status when the pipeline stops processing records, otherwise, pipelines from the previous and next stage could be executed at the same time.

Before this change, the tracker status was set to failed when an exception occurred which sometimes could cause the problem described above as the tracker would be assigned a final status and the pipeline would continue to run.

For example, during the migration of merge requests an exception could occur, the pipeline tracker marked as failed and the pipeline continued to run and process the remaining merge requests. And because EntityWorker would start the next stage, CI pipelines would run concurrently and cause failures.

Also marking trackers as failed can cause batched migrations to skip batches as BulkImports::PipelineBatchWorker skip them if the tracker has a failed status.

Related to: Direct Transfer - Stages executed out of order (#425716 - closed)

Screenshots or screen recordings

Screenshots are required for UI changes, and strongly recommended for all other merge requests.

Before After

How to set up and validate locally

To test, we must raise an error while a pipeline executes. The easiest way is to call raise in the runner

diff --git a/lib/bulk_imports/pipeline/runner.rb b/lib/bulk_imports/pipeline/runner.rb
index 2300e78a719e..aca5dcce5e84 100644
--- a/lib/bulk_imports/pipeline/runner.rb
+++ b/lib/bulk_imports/pipeline/runner.rb
@@ -26,6 +26,9 @@ def run
             end

             run_pipeline_step(:loader, loader.class.name) do
+              raise if index == 0 && instance_of?(BulkImports::Projects::Pipelines::IssuesPipeline)
+              raise if index == 0 && instance_of?(BulkImports::Projects::Pipelines::MergeRequestsPipeline)
+
               loader.load(context, entry)
             end
  1. Enable Direct Transfer following these steps
  2. Perform a Direct Transfer migration
  3. verify if the stages are executed in order during the migration by checking the bulk_import_trackers table.

MR acceptance checklist

This checklist encourages us to confirm any changes have been analyzed to reduce risks in quality, performance, reliability, security, and maintainability.

Edited by Rodrigo Tomonari

Merge request reports