
Create a new sidekiq deduplication strategy that accumulates arguments

During the review of !151541 (merged), it was noted that a more complete fix for the issue at hand would be a new Sidekiq job de-duplication strategy (thread)


The full details of the scenario can be read in #452005 (closed)

The short version is that:

  • We have a job where we need the pipeline_id of the pipeline that scheduled it
  • We want to de-duplicate based on project_id, to prevent the issues described in the linked issue/MR

We are currently getting around this in a fairly roundabout way:

  1. The pipeline saves its id in the Redis cache and schedules the job, passing only project_id [1]
  2. project_id works with the existing de-duplication strategy [2]
  3. When the job runs, it obtains the needed pipeline_id from the cache [3]
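The workaround above might look roughly like the following sketch. This is illustrative only, not the actual code from the linked files: the method names are made up, and a plain Hash stands in for the Redis cache.

```ruby
# Illustrative sketch of the current workaround (the real code lives in the
# files referenced by footnotes [1]-[3]). A Hash stands in for Redis here.
CACHE = {}

def cache_key(project_id)
  "security:latest_pipeline:#{project_id}"
end

# Step 1: the pipeline saves its id, then schedules the job with project_id only.
def schedule(project_id, pipeline_id)
  CACHE[cache_key(project_id)] = pipeline_id
  # MyWorker.perform_async(project_id)  # de-duplicated on project_id alone
end

# Step 3: when the job finally runs, it reads the pipeline id back from the cache.
def perform(project_id)
  CACHE[cache_key(project_id)]
end

schedule(42, 100)
schedule(42, 101) # overwrites: only the latest pipeline id survives
perform(42)       # => 101
```

Note the limitation this sketch makes visible: because the cache key holds a single value, a second pipeline overwrites the first, so only the latest pipeline is ever processed.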

Adding the ability to de-duplicate by [project_id, pipeline_id] might help, but it has the complications described in this comment:

If we accept 2 arguments for this job (project_id, pipeline_id) and then de-duplicate the job by project_id alone, we have the following problem:

  • Pipeline A completes and we schedule job A for [project, pipeline A]
  • Pipeline B completes and we try to schedule job B for [project, pipeline B]
  • Since job A has not started yet, we drop job B.

This can be a huge problem if the queue latency grows.

With the current implementation, we will always process the latest completing pipeline (there can still be race conditions, though) no matter what the queue latency is.
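The failure mode quoted above can be simulated in a few lines. This is a toy model of de-duplication (a Hash of pending jobs keyed by project_id), not Sidekiq's actual mechanism:

```ruby
# Toy model: de-duplicate on project_id while the job also carries a
# pipeline_id argument. The second schedule is dropped, losing pipeline B.
pending = {} # dedup key (project_id) => job args

def try_schedule(pending, project_id, pipeline_id)
  # Duplicate detected: job A for this project has not started yet.
  return :dropped if pending.key?(project_id)

  pending[project_id] = [project_id, pipeline_id]
  :scheduled
end

try_schedule(pending, 1, 100) # job A for [project 1, pipeline A] => :scheduled
try_schedule(pending, 1, 101) # job B for [project 1, pipeline B] => :dropped
```

Pipeline B's id is simply gone: it was an argument of the dropped job, and nothing else recorded it. The longer the queue latency, the longer the window in which completing pipelines are silently lost.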

Implementation Plan (needs investigation, needs refining)

If we want:

  1. only one job scheduled per project_id
  2. to run the job for every pipeline_id (not drop any pipelines as described in the quote above)

Maybe we should investigate whether it is possible to have some sort of "accumulator" argument as a job param.

So this:

MyWorker.perform_async(project_id: project.id, accumulator: pipeline1.id)
MyWorker.perform_async(project_id: project.id, accumulator: pipeline2.id)
MyWorker.perform_async(project_id: project.id, accumulator: pipeline3.id)

would result in one job being run with the following arguments:

class MyWorker
  include ApplicationWorker

  deduplicate :until_executing, with_accumulator: :pipeline_ids

  def perform(project_id:, pipeline_ids: [])
    # project_id == project.id
    # pipeline_ids == [pipeline1.id, pipeline2.id, pipeline3.id]
  end
end
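One way such an accumulator strategy could work — a sketch only, with invented names, not a proposal for the actual Sidekiq internals — is to push each call's accumulator value onto a list keyed by the dedup key, and have the single scheduled job drain that list when it runs. A Hash of Arrays stands in for Redis here (LPUSH/LRANGE/DEL would be the real operations):

```ruby
# Sketch of an "accumulating" de-duplication strategy. Every schedule call
# records its accumulator value; only the first enqueues a job per dedup key.
class AccumulatingDeduplicator
  def initialize
    @accumulators = Hash.new { |h, k| h[k] = [] } # dedup key => accumulated values
    @scheduled = {}                               # dedup key => job enqueued?
  end

  # Called on every perform_async: always accumulate, enqueue at most one job.
  def schedule(project_id, pipeline_id)
    @accumulators[project_id] << pipeline_id
    return :accumulated if @scheduled[project_id] # duplicate: value kept, no new job

    @scheduled[project_id] = true
    :scheduled
  end

  # When the single job runs, it drains everything accumulated so far.
  def run(project_id)
    @scheduled.delete(project_id)
    @accumulators.delete(project_id) || []
  end
end

dedup = AccumulatingDeduplicator.new
dedup.schedule(1, 100) # => :scheduled
dedup.schedule(1, 101) # => :accumulated
dedup.schedule(1, 102) # => :accumulated
dedup.run(1)           # => [100, 101, 102]
```

The real strategy would need to handle the race between draining the list and a concurrent schedule call (likely via an atomic Redis operation), which this sketch ignores.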

Related to: gitlab-com/gl-infra/scalability#195


  [1] https://gitlab.com/gitlab-org/gitlab/-/blob/master/ee/app/services/security/store_scans_service.rb?ref_type=heads#L63-68

  [2] https://gitlab.com/gitlab-org/gitlab/-/blob/master/ee/app/workers/security/store_security_reports_by_project_worker.rb?ref_type=heads#L17-18

  [3] https://gitlab.com/gitlab-org/gitlab/-/blob/master/ee/app/workers/security/store_security_reports_by_project_worker.rb?ref_type=heads#L42-49

Edited by Michael Becker