
Draft: [Spike] BulkImports: Run migration in stages

Kassio Borges requested to merge georgekoltsov+kassio/bulkimports-stages into master

What does this MR do?

BulkImport background jobs now run in stages. Stages run serially (one after the other), while the pipelines within a stage run in parallel to each other.
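
The stage layout shown in the diagram below can be thought of as an ordered list of pipeline groups. The snippet is a minimal sketch with illustrative names (the `STAGES` constant, `pipelines_for`, and the symbol names are assumptions), not the code in this MR:

```ruby
# Minimal sketch (illustrative names only, not the code in this MR):
# stages as an ordered list of pipeline groups. Stages run one after the
# other; the pipelines within a stage are enqueued in parallel.
module BulkImports
  STAGES = [
    %i[group],                        # stage 0: all later stages depend on the group
    %i[subgroups members labels],     # stage 1
    %i[epics],                        # stage 2: epics depend on labels
    %i[epic_award_emoji epic_events]  # stage 3: epic relations depend on epics
  ].freeze

  # Pipelines to run for a given stage (nil once every stage has run).
  def self.pipelines_for(stage)
    STAGES[stage]
  end
end
```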

Simplified flow
This is a simplified flow of the pipelines currently in place.
The stages run sequentially, but the pipelines within the same stage run in parallel.
flowchart LR
    subgraph stage0
        StartS0["Start"] --> Group --> FinishS0["Finished"]

        S0Note["All the followin <br> Stages depends <br> on Group"]
        S0Note:::note
    end
    subgraph stage1
        StartS1["Start"] --> Subgroup --> FinishSubgroup["Finished"]
        StartS1 --> Members --> FinishMembers["Finished"]
        StartS1 --> Labels --> FinishLabels["Finished"]
    end
    subgraph stage2
        StartS2["Start"] --> Epic --> FinishEpic["Finished"]

        S2Note["Epics depends <br> on Labels"]
        S2Note:::note
    end
    subgraph stage3
        StartS3["Start"] --> EpicAwardEmoji --> FinishEpicAwardEmoji["Finish"]
        StartS3 --> EpicEvents --> FinishEpicEvents["Finish"]

        S3Note["Epics realtions <br> depends on Epics"]
        S3Note:::note
    end

    Start --> stage0
    stage0 --> stage1
    stage1 --> stage2
    stage2 --> stage3

    classDef note fill:#ffd,stroke-width:0;
Sequence diagram

Large diagrams don't load by default; use the "Display" button below the code to render it.

sequenceDiagram
    User ->>+ Controller: Import Group1 and Group2
    Controller ->>+ BulkImportService: Import Group1 and Group2
    BulkImportService ->> BulkImportService: Creates BulkImport and <br> one entity for each Group
    BulkImportService -->> BulkImportWorker: perform_async
    BulkImportService ->>- User: Import Scheduled

    par
        BulkImportWorker -->> BulkImportEntityWorker: perform_async (Group1 Entity)
    and
        BulkImportWorker -->> BulkImportEntityWorker: perform_async (Group2 Entity)
    end

    BulkImportWorker -->> BulkImportWorker: perform_async
    Note over BulkImportWorker, BulkImportWorker: Check if there are more <br> entities to process <br> (subgroups)

    BulkImportEntityWorker ->> BulkImportGroupImporter: execute(entity)
    BulkImportGroupImporter ->> BulkImportGroupImporter: Creates PipelineStatuses
    BulkImportGroupImporter -->> BulkImportStageWorker: perform_async(entity)
    Note over BulkImportGroupImporter, BulkImportStageWorker: Fetch all the pipelines from <br>the next stage to run

    par For Each pipeline from stage
    BulkImportStageWorker -->> BulkImportsPipelineWorker: perform_async(pipeline, entity)
    and
    BulkImportStageWorker -->> BulkImportsPipelineWorker: perform_async(pipeline, entity)
    end

    BulkImportStageWorker -->> BulkImportStageWorker: perform_async(entity, stage)
    Note over BulkImportStageWorker, BulkImportStageWorker: Check if all the pipelines from the stage finished.<br>If so, start the next stage; otherwise wait


    BulkImportsPipelineWorker ->>+ Pipeline: run
    Pipeline ->>- BulkImportsPipelineWorker: 
    Note over BulkImportsPipelineWorker, Pipeline: PipelineStatus is updated <br> (started, finished/failed)

    alt All pipelines finished
         par For Each pipeline of the next stage
            BulkImportStageWorker -->> BulkImportsPipelineWorker: perform_async(pipeline, entity)
        and
            BulkImportStageWorker -->> BulkImportsPipelineWorker: perform_async(pipeline, entity)
        end
    else One or more pipelines from stage still running
        BulkImportStageWorker -->> BulkImportStageWorker: perform_async(entity, stage)
    end


    participant BulkImportEntityWorker as BulkImports::EntityWorker
    participant BulkImportGroupImporter as BulkImports::Importers::GroupImporter
    participant BulkImportStageWorker as BulkImports::StageWorker
    participant BulkImportsPipelineWorker as BulkImports::PipelineWorker
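
The re-enqueue loop shown in the diagram above can be sketched roughly as below. The class names and argument order come from the diagram; the method bodies and helper methods are assumptions for illustration only, not this MR's actual implementation:

```ruby
require 'sidekiq'

# Hypothetical sketch of the stage/pipeline workers in the sequence diagram.
class PipelineWorker
  include Sidekiq::Worker

  def perform(pipeline, entity_id)
    # Run the given pipeline for the entity, updating its status
    # record from started to finished/failed.
  end
end

class StageWorker
  include Sidekiq::Worker

  def perform(entity_id, stage)
    statuses = pipeline_statuses(entity_id, stage)

    if statuses.all? { |status| %w[finished failed].include?(status) }
      # The current stage is done: fan out the next stage's pipelines in parallel.
      pipelines_for(stage + 1).each do |pipeline|
        PipelineWorker.perform_async(pipeline, entity_id)
      end
    else
      # One or more pipelines are still running: check again in a few seconds.
      self.class.perform_in(10, entity_id, stage)
    end
  end

  private

  # Placeholder: the MR reads these from the pipeline status records.
  def pipeline_statuses(entity_id, stage)
    []
  end

  # Placeholder: the ordered list of pipelines for a stage
  # (see the STAGES sketch earlier in this description).
  def pipelines_for(stage)
    []
  end
end
```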

Approach

To keep track of the jobs, the BulkImports::EntityPipelineStatus model was created.
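
A rough sketch of what such a model could look like. The table name, columns, and enum values below are assumptions inferred from this description (created → started → finished/failed), not the MR's actual schema:

```ruby
module BulkImports
  # Hypothetical sketch of the status model.
  class EntityPipelineStatus < ApplicationRecord
    self.table_name = 'bulk_import_entity_pipeline_statuses'

    belongs_to :entity, class_name: 'BulkImports::Entity'

    # `pipeline` would store the pipeline name, `stage` its stage number.
    enum status: { created: 0, started: 1, finished: 2, failed: 3 }
  end
end
```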


  • Pros of a more concurrent model (smaller background jobs)
    • Easier to handle retries (#271233 (closed))
    • Easier to handle rate limits (#262024 (closed))
      • In a follow-up, we could handle each request page of the same relation in a separate background job, which would further improve rate-limit handling (see the sketch after this list)
    • Single-responsibility principle
    • More parallelization of import parts, which will result in a faster migration
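
A hypothetical sketch of the per-page follow-up idea mentioned above; the worker name, arguments, and pagination handling are assumptions, not code from this MR:

```ruby
require 'sidekiq'

# Each page becomes a small, independently retryable and rate-limitable job.
class RelationPageWorker
  include Sidekiq::Worker
  sidekiq_options retry: 3

  def perform(entity_id, relation, page)
    # Fetch only this page of the relation from the source instance and import it.
  end
end

# Enqueue one job per page instead of one job for the whole relation.
entity_id   = 1 # placeholder values for illustration
total_pages = 5
(1..total_pages).each do |page|
  RelationPageWorker.perform_async(entity_id, 'labels', page)
end
```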

Related to:

Why keep the status in the database

These statuses will eventually be presented to the user to provide better feedback and a sense of progress during large migrations. More information in: &2771

Screenshots (strongly suggested)

  • What is this?
    • A live view of a database query following the progress of a migration (see the query sketch after this list)
  • What to see here:
    • All the pipelines of the same stage start together; the status column changes from created to started
    • The next stage only starts when all the pipeline statuses of the previous stage are either finished or failed
    • When the subgroup pipeline finishes, more entities are created
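
A hypothetical sketch of the kind of query behind the recording; it assumes the EntityPipelineStatus columns sketched earlier (stage, status):

```ruby
# Count pipeline statuses per stage for one entity (placeholder entity id).
BulkImports::EntityPipelineStatus
  .where(entity_id: 1)
  .group(:stage, :status)
  .count
# => { [0, "finished"] => 1, [1, "started"] => 3, ... }
```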

[Screen recording: tracker-status_speeder]

Does this MR meet the acceptance criteria?

Conformity

Availability and Testing

Security

If this MR contains changes to processing or storing of credentials or tokens, authorization and authentication methods and other items described in the security review guidelines:

  • Label as security and @ mention @gitlab-com/gl-security/appsec
  • The MR includes necessary changes to maintain consistency between UI, API, email, or other methods
  • Security reports checked/validated by a reviewer from the AppSec team
Edited by Kassio Borges
