The middleware from #165 (closed) should be extended to drop jobs if the worker is marked as idempotent and a job with the same identity is already enqueued.
It should be possible to disable dropping the jobs using a feature flag.
Retries should run, even if there was a duplicate already enqueued.
Original proposal:
# Introduction
During the "October 13 / Sunday night Crypto Miner Limit Takedown" incident, (RCA https://gitlab.com/gitlab-com/gl-infra/infrastructure/issues/8153) a very poorly performing SQL query led to two queues pipeline_processing:stage_update and pipeline_cache:expire_job_cache congest with over one hundred thousand jobs in each queue.
Since the SQL query in these jobs was leading to CPU saturation on the Postgres primary, and each query was taking several seconds to complete, the backlog quickly built up.
After a while, it became clear that the arguments for the jobs in these queues were in fact identical, and since these jobs are idempotent (i.e., calling the job more than once has the same effect as calling it a single time), most of the processing happening was completely wasteful.
# Proposal
Step 1: Explicitly mark jobs as being idempotent. This could follow the pattern established in the Sidekiq attribution work being carried out in gitlab-org/gitlab!18462 (merged). As @reprazent has pointed out, Sidekiq workers should all be idempotent, but in reality, GitLab's are not.
We should start with pipeline_cache:expire_job_cache as it's already idempotent and was mentioned in the incident.
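As a rough illustration of Step 1, a class-level marker along these lines could work; the module name, method names and `perform` signature below are assumptions, not the final API:

```ruby
require 'sidekiq'

# Illustrative sketch only: a class-level `idempotent!` marker, loosely modelled
# on the worker attribute pattern mentioned above. Names are assumptions.
module WorkerIdempotency
  def idempotent!
    @idempotent = true
  end

  def idempotent?
    !!@idempotent
  end
end

class ExpireJobCacheWorker
  include Sidekiq::Worker
  extend WorkerIdempotency

  idempotent! # running this job twice with the same args has the same effect as running it once

  def perform(job_id)
    # ...expire the cached JSON for the job...
  end
end
```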
Step 2: Add a client-side middleware that checks whether a job is idempotent and has identical arguments/class to an already enqueued job; if such a job is enqueued but not running, drop the new job.
Having this in place would have reduced the backlog of 100k+ jobs down to a handful.
This approach has a built-in negative feedback loop, which helps make our queueing infrastructure less fragile: when an idempotent job starts taking longer and backing up a queue, there is a higher likelihood of it being dropped as a duplicate. This has the effect of damping down poorly performing idempotent jobs.
# Details

## Duplicate check mechanism
Sidekiq implements its queues as Redis lists (unsurprisingly!). Checking the queues for duplicates would be an O(n) operation and too expensive for production usage. Instead, I propose a client-side and server-side middleware combination, with the following roles. This approach is O(1), so it should not have a big impact on production performance:
### On client-side job creation (client middleware)

1. The middleware checks whether the job is marked as idempotent; if not, it passes through as normal.
2. The middleware calculates the "idempotency string" for the job. This contains the worker class and the arguments for the job.
3. The middleware calculates the "idempotency hash" for the job. This is simply a hash of the "idempotency string", using SHA256 or similar.
4. The middleware will use this hash as the key of a Redis hash. Using the hash keeps the keys short, while storing the full idempotency string as the hash field guards against the (incredibly unlikely, but possible) chance of hash collisions.
5. The middleware queries the Redis hash: `HEXISTS gitlab:sidekiq:duplicate:<queue_name>:<idempotency hash> <idempotency string>`
6. If the result exists, the middleware silently drops the job and returns to the client.
7. If the result does not exist, the middleware continues with the following steps:
8. Adds the key to the hash: `HSET gitlab:sidekiq:duplicate:<queue_name>:<idempotency hash> <idempotency string> 1`
9. Sets a TTL of one day on the hash (this is a safety clean-up mechanism which would only be required during incidents): `EXPIRE gitlab:sidekiq:duplicate:<queue_name>:<idempotency hash> 86400`
10. The middleware now continues as normal.
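A minimal Ruby sketch of this client-side flow, assuming the `idempotent?` marker from the Step 1 sketch and the key layout above (the class name is illustrative, not the final implementation):

```ruby
require 'sidekiq'
require 'digest'
require 'json'

# Sketch of the proposed client middleware; not the final implementation.
class DuplicateJobClientMiddleware
  DUPLICATE_KEY_TTL = 86_400 # one day, the safety clean-up from step 9

  def call(worker_class, job, queue, _redis_pool)
    return yield unless idempotent?(worker_class)

    idempotency_string = "#{job['class']}:#{job['args'].to_json}"
    key = "gitlab:sidekiq:duplicate:#{queue}:#{Digest::SHA256.hexdigest(idempotency_string)}"

    duplicate = Sidekiq.redis do |redis|
      if redis.hexists(key, idempotency_string)
        true # an identical job is already enqueued
      else
        redis.hset(key, idempotency_string, 1)
        redis.expire(key, DUPLICATE_KEY_TTL)
        false
      end
    end

    duplicate ? false : yield # not yielding silently drops the job
  end

  private

  # Assumed hook from Step 1; `worker_class` can arrive as a String.
  def idempotent?(worker_class)
    klass = worker_class.is_a?(Class) ? worker_class : Object.const_get(worker_class.to_s)
    klass.respond_to?(:idempotent?) && klass.idempotent?
  rescue NameError
    false
  end
end
```

It would be registered through the usual Sidekiq middleware chain, e.g. `Sidekiq.configure_client { |config| config.client_middleware { |chain| chain.add DuplicateJobClientMiddleware } }`.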
### On the server-side (server middleware)

The first three steps are the same as the client side:

1. The middleware checks whether the job is marked as idempotent; if not, it passes through as normal.
2. The middleware calculates the "idempotency string" for the job.
3. The middleware calculates the "idempotency hash" for the job.
4. The middleware removes the field from the Redis hash: `HDEL gitlab:sidekiq:duplicate:<queue_name>:<idempotency hash> <idempotency string>`
5. The middleware now continues as normal.
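And a matching server-side sketch, clearing the marker when the job starts running (names are again illustrative; the key calculation mirrors the client sketch):

```ruby
require 'sidekiq'
require 'digest'
require 'json'

# Sketch of the proposed server middleware; it only clears the duplicate marker.
# The idempotency check is omitted for brevity: HDEL on a missing key is harmless.
class DuplicateJobServerMiddleware
  def call(_worker, job, queue)
    idempotency_string = "#{job['class']}:#{job['args'].to_json}"
    key = "gitlab:sidekiq:duplicate:#{queue}:#{Digest::SHA256.hexdigest(idempotency_string)}"

    # Remove the field so new duplicates can be scheduled once this job is running.
    Sidekiq.redis { |redis| redis.hdel(key, idempotency_string) }

    yield
  end
end
```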
This approach will drop duplicates but maintain performance. For some workers, such as pipeline_processing:stage_update, and especially during times of bad performance (due to a Postgres, Redis or Gitaly issue), this could lead to a big reduction in Sidekiq activity, helping reduce the impact of the incident.
# Guarantees
It's important that this middleware is not treated by developers as a guarantee that the job will be dropped. There are race conditions that may lead to jobs being added twice (as could happen before the introduction of this change), but these should be rare. Additionally, we would need to consider how to handle retries. The two options would be:
1. Always retry: simple to implement, but could lead to some duplicates.
2. Check whether a job was added while another instance was running and drop the retry in favour of the more recently queued job. This has some observability issues (such as retries effectively having different jids and correlation_ids, which is not ideal).
Personally, for retries I think we should just allow duplicates.
The only guarantee we should make is that the change will never drop a job unless another job with matching identity is queued up (but not running).
Andrew Newdigate changed title from Sidekiq Client Middleware/Mixin for dropping duplicate jobs from a queue to Sidekiq Client Middleware/Mixin for dropping duplicate idempotent jobs from a queue
Andrew Newdigate changed title from Sidekiq Client Middleware/Mixin for dropping duplicate idempotent jobs from a queue to Sidekiq middleware for dropping duplicate idempotent jobs from a queue
For Pipelines specifically, we likely need a more targeted approach: https://gitlab.com/gitlab-org/gitlab-ce/issues/65538. As described there, this is needed to perform efficient processing of pipelines, especially with DAG.
Currently, we use a single approach to drop duplicates: removing concurrent duplicates, which is achieved with ExclusiveLease.
We will not be able to use sidekiq-unique-jobs, as this will clash with reliable-fetch.
Dropping duplicates should be based on the desired outcome: https://github.com/mhenrixon/sidekiq-unique-jobs#locks. This describes the different locking mechanisms very well. I wish we could adopt that gem; it would likely be the simplest option. The idempotent case seems to map to :until_expired.
As far as I understand, the gem maintains its own locks on queuing, starting and stopping, similar to what Andrew is describing in his deduplication proposal. But I'm not familiar with how reliable fetch hooks into Sidekiq.
> Dropping duplicates should be based on the desired outcome: https://github.com/mhenrixon/sidekiq-unique-jobs#locks. This describes the different locking mechanisms very well. I wish we could adopt that gem; it would likely be the simplest option. The idempotent case seems to map to :until_expired.
Wouldn't we want :until_and_while_executing so we cannot schedule new jobs when the same job (same class + arguments) is already scheduled? Though as soon as the job starts, there could be a state change somewhere that warrants a re-run after the first completes.
As an example: Push mirrors get scheduled on each push, we don't need duplicates, but every time a new push comes we need a new job as the currently running one might not have included all changes.
> As an example: Push mirrors get scheduled on each push, we don't need duplicates, but every time a new push comes we need a new job as the currently running one might not have included all changes.
Oh this is a neat example! So as you said, it's semantically safe to discard duplicate idempotent jobs only until the first job (the one we didn't discard) starts running. At that point we need to keep the next identical job request, even if it arrives while the 1st one is running, because the scope of the running job is ambiguous.
Do we also need to ensure the 2nd of the identical job requests doesn't start running until the 1st one completes? If we're not certain that the jobs are both idempotent and concurrency-safe, then it seems like they'd need some serialization mechanism, either in the job scheduler or internal to the job itself. (I know we're talking about a specific example here, but I'm asking the concurrency safety question in general, not just in the context of the steps and stateful data of push-mirror jobs.)
I think @reprazent is right about sidekiq-unique-jobs: it can be used with the reliable fetcher because it creates a client and server middleware as this issue proposes. The reliable fetcher hooks into the Sidekiq processor.
> Oh this is a neat example! So as you said, it's semantically safe to discard duplicate idempotent jobs only until the first job (the one we didn't discard) starts running. At that point we need to keep the next identical job request, even if it arrives while the 1st one is running, because the scope of the running job is ambiguous.
Yep, that's right, we don't know where in the process the running job is. Something might have changed since its start, causing a new job to be scheduled.
> Do we also need to ensure the 2nd of the identical job requests doesn't start running until the 1st one completes? If we're not certain that the jobs are both idempotent and concurrency-safe, then it seems like they'd need some serialization mechanism, either in the job scheduler or internal to the job itself. (I know we're talking about a specific example here, but I'm asking the concurrency safety question in general, not just in the context of the steps and stateful data of push-mirror jobs.)
I think we should be certain that the jobs are concurrency-safe and idempotent. But as a job with identical params should be doing the same thing, we might spend our compute time better on things that aren't running already.
To use the push-mirror example: suppose the first job is pushing a lot of data and takes a long time to complete, and the second job was scheduled for a single updated branch. If the second job starts before the first one completes, it's going to do part of the work that the first job is already doing.
@andrewn the approach sounds great to me. Using an O(1) hash lookup to avoid the O(N) linear search makes great sense.
This is a small thing, but figured it might be worth asking about:
For the following steps in the client-side middleware:
> 5. The middleware queries the Redis hash: `HEXISTS gitlab:sidekiq:duplicate:<queue_name>:<idempotency hash> <idempotency string>`
> 6. If the result exists, the middleware silently drops the job and returns to the client.
> 7. If the result does not exist, the middleware continues with the following steps:
> 8. Adds the key to the hash: `HSET gitlab:sidekiq:duplicate:<queue_name>:<idempotency hash> <idempotency string> 1`
It may be a bit more efficient to combine the above HEXISTS and HSET (steps 5 and 8) into an atomic test-and-set HSETNX. Then use the return value to determine if the hash key existed prior to the call, so your logic in steps 6 and 7 remain the same. This closes one of the windows for a race condition in adding duplicates, and eliminates one Redis call per job.
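A sketch of that variant, assuming the same key and field layout as the middleware sketches above (the helper name is hypothetical):

```ruby
# Hypothetical helper combining the existence check and the write into one
# atomic HSETNX call, as suggested above.
def first_to_enqueue?(key, idempotency_string, ttl: 86_400)
  Sidekiq.redis do |redis|
    if redis.hsetnx(key, idempotency_string, 1)
      redis.expire(key, ttl) # safety TTL, as in step 9 of the proposal
      true                   # first writer: enqueue as normal
    else
      false                  # field already existed: a duplicate is queued, drop the job
    end
  end
end
```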
The job arguments may not match because of some jobs which add a second argument for the JobWaiter class. #86 (closed) has a bit more detail here.
The spikes may be more down to many distinct small jobs rather than duplicates. However, gitlab-org/gitlab#20126 (closed) describes a case where a GitLab instance had more jobs in the queue than users, so we definitely can get duplicates
To decide if this is a good queue to start with, we can try to look at the recent spikes in job size in that queue and validate both of those concerns: how many duplicate jobs did we have? How many duplicate jobs did we have if we only consider the first argument?
I think that because of this, authorized_projects isn't the best queue to start with. Many of the jobs are waited on, which means that we will block another action - typically an HTTP request - to wait for them to happen, or for 10 seconds to pass, whichever is shorter.
We could still handle this. We could only consider the first argument and make other calls wait 10 seconds. Or we could store a list of waiter keys in the only job that's going to be executed and notify all waiters when that job completes. Or something else.
We could also look at whether we need to wait in all cases, and how often we hit the timeout instead of waiting successfully.
However, both of those would add complexity to what's already a pretty involved project. This is a shame, because @andrewn helped me with some Elasticsearch magic to find that over that period, 319,761 authorized_projects jobs used a job waiter, and 106,358 did not. Deduplicating the jobs using a job waiter would be a big win. It's just not the best candidate for the first iteration of this project.
To effectively validate this issue, I think we need a job that:
1. Is idempotent.
2. Is called with identical arguments.
3. Frequently has multiple jobs with the same arguments enqueued.
authorized_projects fails on 2. project_daily_statistics seems to fail on 3, because it has lots of duplicate jobs that aren't in the queue at the same time.
I'm open to ideas for a good candidate job, because this sounds like a fun project otherwise. Maybe we could revisit the jobs mentioned in the description?
- mailers, pipeline_hooks:build_hooks, and web_hook - these all fire some external action, making them not idempotent (and also we can't drop duplicates).
- post_receive - this isn't idempotent but is also very unlikely to have duplicates, as one of the arguments is a list of changes received.

Eyeballing this suggests pipeline_cache:expire_job_cache might be the best candidate, which was also mentioned in the description.
It looks like they are: the message_id field in those logs is the same. @andrewn do you know what would cause that?
I'm not sure how to check if it often is scheduled multiple times, did you have a query handy for that?
Looking at those logs, I think we won't be able to de-duplicate: The jobs run within milliseconds, and from eyeballing that log, they're not overlapping.
If we implement this and it does work well, our task then is to make other Sidekiq queues that back up take the same arguments, and have idempotent jobs, so that we can reuse this elsewhere.
> If we implement this and it does work well, our task then is to make other Sidekiq queues that back up take the same arguments, and have idempotent jobs, so that we can reuse this elsewhere.
Should we start implementing from the other side though: Mark all jobs as not_idempotent and default to considering them idempotent if nothing is specified? As in theory all jobs should be idempotent... We could delegate removing the non_idempotent declaration to the feature category the worker belongs to.
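A sketch of that inversion, reusing the hypothetical marker module from the Step 1 sketch above (again, names are assumptions):

```ruby
# Hypothetical opt-out marker: workers count as idempotent unless they declare otherwise.
module WorkerIdempotency
  def not_idempotent!
    @idempotent = false
  end

  def idempotent?
    @idempotent.nil? ? true : @idempotent
  end
end
```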
@DylanGriffith this could also be interesting to consider from the point of view of existing Elasticsearch indexing jobs. If someone updates the same issue 5 times in a row, we'd be creating Sidekiq jobs like:
Yes, this would help us indeed, but it's not quite as good as your example since, sadly, the updated fields are also part of the Sidekiq arguments, so we will still have waste if someone updates a title and a description field straight after. Here is an example of args for an updated issue:
Since there is an array of changed fields as well it has additional issues with matching for checksums.
As far as I know, the only reason changed_fields is there is that when permissions change for issues and merge requests, the permission change needs to cascade down to all the notes. So there may be other options, like just moving that logic up to the web worker to queue more jobs.
Ahh, I'd forgotten we had changed_fields in there too. And since that's a serialised json string, trying to filter on it generically is going to be very painful.
Given what we use changed_fields for, I wonder if we could make it less busy / remove it entirely? We can do the "have relevant fields changed?" check before enqueuing the sidekiq job. Handling issue notes remains awkward - but could we reduce that to a boolean value, to make these jobs more amenable to being dropped as duplicates?
Looking at the list of changed fields, we might be lucky with some objects already, just because the same fields tend to change each time, but it's definitely not as easy a win as I thought.
Is it possible to get the de-duplication logic for free if Sidekiq queues were implemented using Redis sorted sets instead of lists? If the payload was identical, Redis would just throw away the new job. I also think that Redis sorted sets support all the relevant operations needed by the Sidekiq queues, though this may be trickier to implement than using a middleware, as it may require changes to Sidekiq itself.
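For illustration, sorted sets deduplicate identical members by design: adding the same payload twice only updates its score. The key name and payload below are made up, and this is not how Sidekiq stores its queues today:

```ruby
require 'sidekiq'
require 'json'

# Illustration only: identical members collapse into a single entry in a sorted set.
Sidekiq.redis do |redis|
  payload = { 'class' => 'ExpireJobCacheWorker', 'args' => [42] }.to_json

  redis.zadd('example:sorted-queue', Time.now.to_f, payload)
  redis.zadd('example:sorted-queue', Time.now.to_f, payload) # score updated, no new member

  redis.zcard('example:sorted-queue') # => 1, the duplicate was absorbed
end
```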
> If the result exists, the middleware silently drops the job and returns to the client.
Adding structured logs to this piece of code (when bypassing the enqueuing) might be an interesting way to validate the dropped jobs, before possibly enabling it using a feature flag (which should add little overhead).
> Should we start implementing from the other side though: Mark all jobs as not_idempotent and default to considering them if nothing is provided? As in theory all jobs should be idempotent... We could delegate removing the non_idempotent declaration to the feature category the worker belongs to.
That sounds like part of the effort for automating the check for idempotent workers, but might be OK to do as part of this issue too, considering we pick a queue/worker we're confident about.
Not sure if I follow the "and default to considering them if nothing is provided? As in theory all jobs should be idempotent." part. Defaulting every job as not idempotent and applying the duplicate filter just to the ones that we mark as idempotent (in the initial case ExpireJobCacheWorker) should be enough. WDYT @reprazent?
@smcgivern @reprazent Do you have further points for this one? It's a good candidate for ~"workflow::Ready".
> Not sure if I follow the "and default to considering them if nothing is provided? As in theory all jobs should be idempotent." part. Defaulting every job as not idempotent and applying the duplicate filter just to the ones that we mark as idempotent
@oswaldo I've updated my comment; some words were missing. I meant that I'd prefer to have a not_idempotent marker, and that by default we consider every job to be idempotent.
> Adding structured logs to this piece of code (when bypassing the enqueuing) might be an interesting way to validate the dropped jobs, before possibly enabling it using a feature flag (which should add little overhead).
I really like this idea! So if I understand correctly, we'll start by logging every time a job would be enqueued that already lives in the queue with the same arguments.
In pseudo code, to see if I understood correctly:
```ruby
# So we can track that this job was a duplicate, perhaps figure out the results, look at failures
job['duplicate'] = already_exists_for_args?

if Feature.enabled?(:drop_duplicates) && already_exists_for_args?
  # log a message that we've swallowed the job
else
  yield
end
```
I think this is something we can start on, I'll move this to ~"workflow::Ready". Please move back if we missed something.
@oswaldo During deduplication of the queue, I don't think we need to check if a worker is idempotent. If we include the arguments in the decision to schedule in the client middleware, any job scheduled (but not started) with the same arguments would be the same work, correct?
The client skips pushing a job on the queue, if there was already an unperformed job scheduled but not started.
Remind me again why we need to check for idempotency at the time of scheduling?
It might not come out as a requirement here. Though non-idempotent jobs, even with the same arguments, might have different side-effects depending on when they actually run, which might make them unsafe to drop from the queue?
Maybe @andrewn can clarify if that's what he had in mind.
This would be the first manipulation of a job in client middleware that wouldn't show up in the STDOUT logs of the Sidekiq process, if it is run from a client that isn't Sidekiq itself, for example Puma or Unicorn (so most schedulings).
I'll write this out to what we currently call Gitlab::SidekiqLogger, which writes to `Rails.root.join('log', 'sidekiq.log')` and is currently unstructured.
This isn't necessarily blocked by those issues yet; development can continue, but we can't properly test this (enable the feature flag) without visibility.
I've marked the AuthorizedProjectsWorker as idempotent in gitlab-org/gitlab!26794 (merged) so when that is deployed, we can try flipping the switch and see if we're creating less duplicate jobs.
This shows the limited list of idempotent workers being deduplicated. Most notably: AuthorizedProjectsWorker gets a handful of duplicates; we'll need to see what this looks like during one of those peaks.
Marking this as ~"workflow::Done", but leaving it open for a while until we remove the feature flags; I'd keep them in place until the end of the week.
@rnienaber The feature flags are being removed in gitlab-org/gitlab!29116 (merged) but that hasn't hit production yet, which it needs to, so we can remove the flags from the database as well. That's why this is still open.