Updates to models go into sorted sets in Redis rather than being queued as sidekiq jobs.
A periodic sidekiq worker runs every 1 minute and pops the top 1k update jobs from the sorted set.
The 1k update jobs are transformed into a bulk update for Elasticsearch.
Failed updates are retried.
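A minimal sketch of the flow above, using an in-memory dict as a stand-in for the Redis sorted set. The names `UpdateQueue`, `track`, `pop_batch`, `run_once` and the `send_bulk` callback (assumed to return the refs that failed) are hypothetical and only illustrate the idea; they are not the actual implementation.

```python
import itertools


class UpdateQueue:
    """In-memory stand-in for a Redis sorted set (member -> score)."""

    def __init__(self):
        self._scores = {}
        self._counter = itertools.count(1)

    def track(self, record_type, record_id):
        # Adding a member that is already present is a no-op,
        # so repeated updates to one record are deduplicated.
        self._scores.setdefault((record_type, record_id), next(self._counter))

    def pop_batch(self, limit=1000):
        # Take the lowest-scored (oldest) members and remove them.
        batch = sorted(self._scores, key=self._scores.get)[:limit]
        for member in batch:
            del self._scores[member]
        return batch

    def __len__(self):
        return len(self._scores)


def run_once(queue, send_bulk, limit=1000):
    """One run of the periodic worker. Returns the number of successful updates."""
    batch = queue.pop_batch(limit)
    # send_bulk is assumed to return the list of refs that failed.
    failed = send_bulk(batch) if batch else []
    for record_type, record_id in failed:
        queue.track(record_type, record_id)  # failed updates are retried next run
    return len(batch) - len(failed)
```

Because a failed ref is simply tracked again, it is picked up by the next scheduled run along with any new updates.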
Likely follow up issues
Timeouts for updates are retried in increasingly smaller batches, similar to #195774 (closed). This may be preferable to do in a follow up issue, since running this every 1 minute likely won't build up large payloads (i.e. we're unlikely to hit 1k jobs in a minute until we roll this out more widely).
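One possible reading of "increasingly smaller batches" is to halve a batch whenever it times out and retry each half. The sketch below assumes that reading; `BulkTimeout`, `send_bulk` and `send_with_shrinking_batches` are hypothetical names, and the approach taken in #195774 is not reproduced here.

```python
class BulkTimeout(Exception):
    """Raised by the (hypothetical) send_bulk callback when a request times out."""


def send_with_shrinking_batches(items, send_bulk, min_size=1):
    """Send items in one batch; on timeout, split the batch in half and retry.

    Returns the items that still timed out at the minimum batch size.
    """
    failed = []
    pending = [list(items)]
    while pending:
        batch = pending.pop()
        if not batch:
            continue
        try:
            send_bulk(batch)
        except BulkTimeout:
            if len(batch) <= min_size:
                failed.extend(batch)  # cannot shrink further; give up for now
            else:
                mid = len(batch) // 2
                # Push the second half first so the first half is retried first.
                pending.append(batch[mid:])
                pending.append(batch[:mid])
    return failed
```

Anything returned as failed could be put back on the sorted set for the next run.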
This functionality is gated behind a default-off feature flag: :elastic_bulk_incremental_updates. An issue to roll out the feature flag and subsequently remove the FF is at: #208717 (closed)
@nick.thomas I wasn't able to find out if there was an existing related issue so I've just created this now and will use this for now to compile thoughts and solutions on this.
This data structure seems to map well to our problem space and we already have Redis. We can have a sidekiq worker that is scheduled to run periodically (perhaps hourly or every 10 mins if we really care about it being up to date) and pull all of the messages from the queue.
We want to batch, though, rather than sending arbitrarily large payloads to ES. So the worker pulls (for example) at most 1000 messages from the queue, and if there are still more than 100 left it requeues itself to handle those immediately. Or it loops over batches of 1000 at a time. But we want to avoid the situation where we just keep processing because more messages keep coming in during the processing. We may choose to only loop if there are more than 1000 in the queue. We need to always process at least once every 10 mins, though, so we should always pull from the queue once regardless of whether there are fewer than 1000 in the queue.
The article does not seem to describe the above problem, so maybe I'm overthinking it. It just describes pulling 1000 messages from the queue and performing the bulk update. The one concern I have is that the queue may keep growing if we are consistently getting more than 1000 updates during the interval. But maybe then we have our interval set wrong anyway?
It's possibly just a tradeoff between 2 possible risks: either overloading Elasticsearch or never finishing processing your queue. I think in either case you find that you need to intervene to rescale things (either automatically or manually), and depending on which way you go you will be monitoring either ES being overloaded or your queue growing too large. It's also possible that overloading ES just has the same effect, with slow writes creating back pressure so your queue grows anyway, so maybe it's preferable to just let your queue grow too long and monitor that.
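A rough sketch of the "always pull once, only loop while more than a full batch remains" idea. The `max_rounds` cap is my own assumption, added so that a steady stream of incoming messages cannot keep one run going forever; `drain` and `process` are hypothetical names, and a `deque` stands in for the Redis queue.

```python
from collections import deque


def drain(queue, process, batch_size=1000, max_rounds=10):
    """Process the queue in bounded batches. Returns the number of rounds run."""
    rounds = 0
    while True:
        # Always pull at least once, even if fewer than batch_size are queued.
        batch = [queue.popleft() for _ in range(min(batch_size, len(queue)))]
        if batch:
            process(batch)
        rounds += 1
        # Only keep looping while more than a full batch is still waiting,
        # and never beyond max_rounds in a single run.
        if len(queue) <= batch_size or rounds >= max_rounds:
            return rounds
```

Whatever is left over waits for the next scheduled run, so the queue length is the thing to monitor.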
I've previously implemented a similar approach to this in a ruby gem that stores buckets in redis https://github.com/dgvz/em-bucketer . The idea is that when a bucket reaches a certain size we pop the bucket and process all the messages. Or you can set a max time to wait and if that time elapses again we pop the bucket. This means that you have a maximum delay of things getting indexed and they may get indexed sooner if we make a large enough batch beforehand. That gem doesn't perfectly match what we're trying to do here because it doesn't de-dupe things but the concept of filling buckets and setting max wait times could certainly apply.
we would create a queue entry with the data {record_type: "post", record_id: 123}. If the document is modified a second time, it is important that no duplicate queue entry be made. This deduplication of multiple changes over a given time period effectively insulates Elasticsearch from excessive write churn.
To avoid missing an update we should dequeue before looking up the record in our DB. That way, if a new update comes in after we look up the record in the DB, it will again get added to the queue for later processing.
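To make the ordering concrete, here is a small simulation of "dequeue first, then read the DB". Everything here (`Indexer`, `track`, `process_one`, the `after_db_read` hook used to simulate a concurrent write) is hypothetical, and plain dicts stand in for Redis, the database and the index.

```python
class Indexer:
    def __init__(self, db):
        self.db = db          # ref -> current record contents
        self.queue = {}       # stand-in for the sorted set: ref -> score
        self.index = {}       # stand-in for the Elasticsearch index
        self._next_score = 0

    def track(self, ref):
        self._next_score += 1
        # No duplicate entry is created if the ref is already queued.
        self.queue.setdefault(ref, self._next_score)

    def process_one(self, after_db_read=None):
        ref = min(self.queue, key=self.queue.get)
        del self.queue[ref]       # 1. dequeue first
        record = self.db[ref]     # 2. then read the record
        if after_db_read:
            after_db_read()       # simulate a write landing after the read
        self.index[ref] = record  # 3. index what we read
```

If a write lands after step 2, the ref is no longer in the queue, so `track` adds it again and the next run indexes the newer version. Had we dequeued after the read, that second `track` would have been deduplicated away and then deleted, losing the update.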
Queues are remarkably poor at handling a full-reindex of data. This requires inserting a queue entry for every record in the database
Batching Based on Ranges
Not every dataset needs to deal with updates to documents. Some datasets only insert, and never update data. An example of this is a logging application. Logs are generally immutable
It seems like this may be applicable to indexing commits but we would need special handling for deleting commits which is necessary under certain circumstances for security reasons.
Some of the time based advantages described in this don't seem like they'd be valuable in any GitLab features since we're never really providing users the ability to limit the time ranges of their searches. This is more useful for when people are querying things that happened in the last month etc. and none of our current queries work like this. It's possible, though, that we may wish to model comment search like this but then comments can be updated so it may not really be useful to do this anyway.
Combining Queues and Ranges
The key here is to retroactively update rows via the range based approach for infrequent full table ‘scrubs’, while relying on hooks in the ORM lifecycle to account for any changes since then. That approach allows one to periodically issue full table reindexes with little cost
I don't actually understand what is meant by range based in this context because I believe we're talking about reindexing everything from scratch (hence the range of recent additions doesn't seem applicable).
I think this is kinda how we do things today as we do have a way to trigger a full reindex that uses the Bulk API and does not queue up indexing per record. I'm trying to figure out if there is anything particularly insightful to think about from this section but I think it's all kinda obvious and we already do it. One thing that maybe seems insightful here is the idea of reindexing everything from scratch but right now all our data lives in one index so I think what we have in place to recreate the whole index is the best we can do in this regard.
There is a slight risk of a race condition between the two systems, but the probabilities of it being an actual issue are slim, and can be avoided with careful locking if required.
I believe we have some locking in place today to prevent these 2 things interacting with each other. Though I do wonder if our locking is correctly handling when updates come in as the bulk insert is happening. Do we lose updates or do we re-queue those updates with some delay? How would we handle that if we moved to using the batching approach described here? Can we put things back into the queue with some delay if a bulk process is underway for that project/group? Or perhaps we just stick it straight back onto the back of the queue and it will try again in the next time interval which is the delay we want.
Error Handling in Replication
Additionally, it is important to note that the Queue entries should be transactionally locked during processing, and only deleted once all records have been successfully sent to the bulk API
If we use redis sorted sets how do we reconcile this with wanting to remove things from the sorted set straight away so that new updates will definitely be requeued? I'm not familiar with redis transactions and not sure if they will help here. Perhaps we need to move the in process messages to a separate sorted set where they will be deleted after successfully processing. If the process dies before completing we can just re-add them to the original redis sorted set after the process is restarted.
So I think that on our schedule the process would be:
Read 1000 elements from <main_sorted_set_queue>
Store elements in <backup_sorted_set_queue>
Delete elements from <main_sorted_set_queue>
Read records from DB
Process all bulk inserts
Delete elements from <backup_sorted_set_queue>
This seems safe due to all messages in the queue being idempotent.
To handle the errors of crashing before deleting from <backup_sorted_set_queue> we could read from this queue first in our periodic worker and copy any elements from there to <main_sorted_set_queue>.
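The steps above, including the recovery pass, could be sketched roughly like this. Plain dicts stand in for the two sorted sets, and `run_once`, `send_bulk` and `db` are hypothetical names; this is an illustration of the ordering rather than a real Redis implementation.

```python
def run_once(main, backup, db, send_bulk, limit=1000):
    """One scheduled run. main and backup map member -> score."""
    # Recovery: anything still in backup was left behind by a crashed run.
    # Copy it back, keeping the score of any newer entry already in main.
    for member, score in backup.items():
        main.setdefault(member, score)
    backup.clear()

    batch = sorted(main, key=main.get)[:limit]       # read up to `limit` elements
    for member in batch:
        backup[member] = main[member]                # store them in the backup set
    for member in batch:
        del main[member]                             # delete them from the main set
    docs = [(m, db[m]) for m in batch if m in db]    # read records from the DB
    send_bulk(docs)                                  # process all bulk inserts
    for member in batch:
        backup.pop(member, None)                     # delete from the backup set
    return len(batch)
```

If `send_bulk` raises (or the process dies) the elements stay in the backup set, and the next run moves them back before reading. Reprocessing them is harmless because the messages are idempotent.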
It's worth noting that this kind of reliability is usually accomplished easily in other queueing systems by acknowledging the messages after processing, or by incrementing a counter of where you are in your processing of the queue only after processing. In either case I don't know if other message queue systems will be able to handle the deduplication that Redis handles well with sorted sets.
Why Marking Source Records is an Anti-Pattern
We are kinda doing this with index_statuses that we are using for repo indexing. Given that re-indexing all the source code of a project would be inefficient after every commit this seems to be somewhat necessary. It's worth noting that since we store it in a separate table we avoid some of the performance problems described in that article. That being said, if we start indexing project repos in bulk there may still be some performance costs to bulk updating the index_status table. Given it's a narrow table that is not being read or written by any other processes, though, it's probably not really a problem here.
The alternative approach here that seems to align better with treating each file in a repo as a single record would be that each time a push occurs we look at the changed files and we store the filename for every changed file into the sorted sets. This means that we just re-read all those files from the default branch when it comes to doing the indexing. This means we don't actually need to calculate the diff at indexing time because we used the diff from the pushed commits. This approach would mean there is no need to store the index status of a project anymore. The other clear advantage here would be that the bulk updating of project repos could be handled exactly the same as any other record (i.e. using the sorted sets).
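A small sketch of what tracking changed files per push might look like. The names `track_push`, `build_actions` and `read_blob` are hypothetical, a dict stands in for the sorted set, and treating a file that no longer exists on the default branch as a delete is my own assumption about how removals would be handled.

```python
def track_push(queue, project_id, changed_paths):
    """Record every file changed by a push; repeated paths are deduplicated."""
    for path in changed_paths:
        next_score = max(queue.values(), default=0) + 1
        queue.setdefault((project_id, path), next_score)


def build_actions(queue, read_blob):
    """Turn queued paths into bulk actions by re-reading each file.

    read_blob(project_id, path) is assumed to return the file contents at the
    head of the default branch, or None if the file no longer exists there.
    """
    actions = []
    for project_id, path in sorted(queue, key=queue.get):
        content = read_blob(project_id, path)
        if content is None:
            actions.append(("delete", project_id, path))
        else:
            actions.append(("index", project_id, path, content))
    queue.clear()
    return actions
```

No per-project index status is needed here: the queue itself records what is out of date.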
I really like this approach - using a sidekiq job per issue or note to be updated has always been a lot of overhead, and switching to the bulk API necessarily means that we move away from that. A lot of our prior work has focused on making backfill scale; this proposal starts to zero in on making incremental index scale as well.
@DylanGriffith's notes above point the way to an alternative system where we replace "enqueue sidekiq jobs for incremental indexing" with "push an ID that needs updating/deleting to a set in redis". So we get deduplication for free. We'd then have a regular sidekiq-cron job (runs every minute?) that takes a subset of those IDs, generates the elasticsearch documents for them, and writes those documents to the elasticsearch cluster. Deletes can be handled similarly. It has a lot of parallels with how we run enqueued updates on Geo secondaries, which is comforting.
In this model, most of the application work stays in Sidekiq, and we're actually doing less work in unicorn. We can unlock the bulk API for doing those updates, and scheduling overhead is greatly reduced. Control over parallelism and batch size is also enhanced significantly. If elasticsearch is down, pending updates accumulate in redis, but they do so much more efficiently than at present, and clearing the backlog is much cheaper. It's also less likely to happen, since we're no longer bombarding elasticsearch with individual updates 😅. As long as the redis cluster is not lost, we can bring the index back up to date.
I've had an alternative idea in the back of my mind for a while now, which achieves the same but with slightly different components; I'll lay it out here for completeness, but I think I've convinced myself that "now is not the time". Details below the fold for posterity.
Some years ago I suggested that gitlab-elasticsearch-indexer should be a daemon rather than a subprocess: gitlab-elasticsearch-indexer#3
We'd run N of these per gitlab cluster, and they would monitor redis for indexing jobs. For repository updates, this means replacing the existing sidekiq job with "write an ID to a set in redis".
All very well, but what's the relevance to incremental updates for databases? We can't let the go daemon read from postgres, and it's definitely not a good idea to put it in charge of the transformations from database row to elasticsearch document!
However, if it's there, and reading from redis, we can take the proposal above and modify it slightly so that, instead of pushing an id to redis, unicorn/puma pushes the entire generated elasticsearch document to redis instead. The daemon then reads it from redis and passes it onto elasticsearch as part of a bulk call unmodified.
This moves the application work out of sidekiq and into puma/unicorn. This can be viewed as a positive or a negative, depending on your point of view, but in general, generating these documents is not a large cost. The main benefit I see is in having much better control over parallelism and latency than sidekiq can give us.
We do a little more work in the "many updates show up quickly" case, since each time we do an update we're pushing the whole document to redis, but we'd still only push the most recent one to elasticsearch.
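For illustration, the "push the whole document, only the most recent one reaches elasticsearch" behaviour can be sketched with a keyed buffer where each push overwrites the previous document for that ID. `DocumentBuffer`, `push` and `drain` are hypothetical names and the dict is only a stand-in for whatever structure would be used in redis.

```python
class DocumentBuffer:
    """Keeps only the latest generated document per ID until it is drained."""

    def __init__(self):
        self._docs = {}

    def push(self, doc_id, document):
        # Each update pushes the full document; an earlier one is overwritten.
        self._docs[doc_id] = document

    def drain(self, limit=1000):
        # The daemon would pass these on to the bulk API unmodified.
        ids = list(self._docs)[:limit]
        return [(doc_id, self._docs.pop(doc_id)) for doc_id in ids]
```

The extra cost is on the write side (every update serialises a full document), while the bulk call still carries one document per ID.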
I may come back to this in the future, but for now, I think I'm happy not pursuing it.