# Proposal: Scalable data sync/replication strategy
This issue gives a high-level overview of a scalable PostgreSQL data replication strategy that can be implemented at the application layer.
## Terminology
- Upstream database: our PostgreSQL database (CI or Main). The source of truth.
- Downstream database: a database/datastore where we'd like to replicate some data from the upstream database.
  - Examples: another PostgreSQL database, ClickHouse, Redis, Elasticsearch, a simple CSV file.
- Cursor: an `(updated_at, id)` pair. Uniquely identifies one database row.
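In Ruby terms, such a cursor can be represented as a two-element array, since arrays compare element by element, which mirrors PostgreSQL's row-value comparison (a small illustrative sketch):

```ruby
require "time"

# A cursor is an (updated_at, id) pair. Ruby arrays compare element by
# element via <=>, matching PostgreSQL's (updated_at, id) > (:ts, :id)
# row-value comparison.
a = [Time.parse("2022-01-05 12:00"), 5]
b = [Time.parse("2022-01-05 12:00"), 9]
c = [Time.parse("2022-01-05 12:01"), 1]

puts(a <=> b) # -1: same timestamp, so the lower id comes first (tie-breaker)
puts(b <=> c) # -1: the earlier timestamp comes first regardless of id
```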
## Problem statement
We have several cases where moving and transforming existing data from our upstream database would improve the overall performance of the application. A few use cases:
- Use a different database specialized for analytical queries such as ClickHouse.
- Keep data up-to-date in another PostgreSQL database.
- Replicate some data to a different database table for faster querying: self-built materialized views.
Note: the replication/sync is one-way: upstream -> downstream.
Keeping the data up-to-date and consistent is a challenging task, so we have to accept the following trade-offs:
- Data lag. Changes within the upstream database will not be available right away within the downstream database.
- Eventual consistency. Inconsistencies could happen from time to time; they will be corrected automatically (with some delay).
Requirements:
- Works on all environments: .com, self-managed.
- Depending on the data volume, the strategy can be scaled horizontally (Sidekiq + read-only replicas).
- Generic implementation, there can be different downstream databases.
## Change detection logic
### INSERT, UPDATE

Changing data: the detection uses the `updated_at` column on the given database table. This column is usually available on all database tables we have within GitLab. For single-record updates, Rails automatically bumps the `updated_at` column. When a bulk update happens, the `updated_at` column update is often skipped, which means the strategy wouldn't be able to detect the changes. To fix this, we install a row-level trigger to ensure that the `updated_at` column is updated (touched). Based on some local performance testing, adding a trigger does not affect `UPDATE` performance considerably (±5%).
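Such a row-level trigger could look like the following sketch, generated here as a DDL string from Ruby. The function and trigger names are illustrative, not the actual GitLab implementation:

```ruby
# Builds DDL for a BEFORE UPDATE trigger that bumps updated_at whenever
# an UPDATE did not change it itself (e.g. bulk updates that skip the
# Rails timestamp handling). Names are illustrative.
def touch_updated_at_ddl(table)
  <<~SQL
    CREATE OR REPLACE FUNCTION touch_updated_at() RETURNS trigger AS $$
    BEGIN
      NEW.updated_at := NOW();
      RETURN NEW;
    END;
    $$ LANGUAGE plpgsql;

    CREATE TRIGGER #{table}_touch_updated_at
    BEFORE UPDATE ON #{table}
    FOR EACH ROW
    WHEN (NEW.updated_at IS NOT DISTINCT FROM OLD.updated_at)
    EXECUTE FUNCTION touch_updated_at();
  SQL
end

puts touch_updated_at_ddl("events")
```

The `WHEN` clause ensures the trigger only fires when the statement itself left `updated_at` untouched, so single-record Rails updates are not double-written.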
Edge cases:

- The `updated_at` column is not part of the table.
  - We add the `updated_at` column to the table.
- The `updated_at` column has some specific application behaviour: automatic bumping of `updated_at` is prohibited.
  - Add a separate column for tracking row updates.
When processing a database table, we keep track of the last replicated `(updated_at, id)` combination (`id` acts as the tie-breaker). This way we can replicate changes in batches.

Example keyset pagination query to load 20 rows from a specific point:

```sql
SELECT * FROM events WHERE (updated_at, id) > ('2022-01-05 12:00', 5) ORDER BY updated_at, id LIMIT 20;
```

This solution requires an index on the `(updated_at, id)` columns. The benefit of this approach is that we can use read-only replicas for loading the data, so we don't overload the upstream database's primary. Additionally, we can slice the data by time ranges, which makes parallel processing possible.
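A minimal in-memory sketch of the batching logic. The helper is hypothetical; in the real implementation the filtering and ordering happen in the SQL query above:

```ruby
require "time"

Row = Struct.new(:id, :updated_at, keyword_init: true)

# Returns the next batch of rows strictly after the given cursor,
# ordered by (updated_at, id), plus the new cursor to persist.
def next_batch(rows, cursor, limit: 20)
  batch = rows
    .select { |r| ([r.updated_at, r.id] <=> cursor).positive? }
    .sort_by { |r| [r.updated_at, r.id] }
    .first(limit)

  new_cursor = batch.empty? ? cursor : [batch.last.updated_at, batch.last.id]
  [batch, new_cursor]
end

t = Time.parse("2022-01-05 12:00")
rows = [
  Row.new(id: 5, updated_at: t),
  Row.new(id: 9, updated_at: t),
  Row.new(id: 1, updated_at: t + 60)
]

batch, cursor = next_batch(rows, [t, 5], limit: 2)
puts batch.map(&:id).inspect # [9, 1]: the row at the cursor itself is excluded
```

Because the comparison is strict (`>`), the row at the stored cursor is never replicated twice; resuming from the persisted cursor is therefore safe.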
### DELETE
Deleting data: for tracking record deletions, we can extend the existing loose foreign keys (LFK) feature to support databases other than PostgreSQL, with different cleanup strategies.

Currently, when a parent record is deleted, the child records are cleaned up asynchronously. The async processing ensures that the actual child records are completely removed. This might be a problem depending on the database engine; for example, ClickHouse also removes data asynchronously.
Example, current configuration:

```yaml
merge_requests:
  - table: ci_pipelines
    column: head_pipeline_id
    on_delete: async_nullify
```
Let's say we replicate merge requests to ClickHouse:

```yaml
merge_requests:
  - table: ci_pipelines
    column: head_pipeline_id
    on_delete: async_nullify
    database: main
  - table: merge_requests
    column: id
    on_delete: async_delete
    database: clickhouse
```
The async delete for the `clickhouse` database simply invokes a `DELETE` query in a fire-and-forget manner.
Note: not all databases/datastores will require "real" LFK deletion. I see a scenario where we would use the replication strategy to stream changes to an endpoint. In that case, the actual LFK behaviour would be some sort of event push to an endpoint.
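To illustrate, the extended LFK definition above could be consumed like this. The dispatch helper is a hypothetical sketch, not the existing LFK code:

```ruby
require "yaml"

# The extended LFK configuration from above, with a `database` key per
# cleanup definition.
CONFIG = YAML.safe_load(<<~YAML)
  merge_requests:
    - table: ci_pipelines
      column: head_pipeline_id
      on_delete: async_nullify
      database: main
    - table: merge_requests
      column: id
      on_delete: async_delete
      database: clickhouse
YAML

# Hypothetical dispatch: when a parent record is deleted, look up the
# cleanup strategy to run against each downstream database.
def cleanup_definitions_for(parent_table)
  CONFIG.fetch(parent_table, []).map do |definition|
    "#{definition['database']}: #{definition['on_delete']} on " \
      "#{definition['table']}.#{definition['column']}"
  end
end

puts cleanup_definitions_for("merge_requests")
```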
## System design
Note: this part focuses on row changes based on the `updated_at` column.
### Replication status table
This table is responsible for tracking the replication status of individual tables/entities per downstream database.
| downstream | table_name | last_updated_at | last_id |
|---|---|---|---|
| clickhouse | issues | 2022-10-05 12:21 | 23 |
| clickhouse | events | 2022-10-06 11:05 | 12345 |
| main_ci_sync | users | 2022-10-02 11:03 | 1 |
| paid_namespaces | namespaces | 2022-09-05 10:01 | 9979 |
For each table/entity that we want to replicate, we must create a record. The `last_*` columns hold the last processed cursor (`updated_at`, `id`) values.

The `downstream` column defines the downstream database where the data will be replicated. Each `downstream` entry has to implement a well-defined API for receiving a batch of rows.
The implementation defines:

- How to transform the loaded models from the upstream database.
- How to formulate the `INSERT` query.
  - Example 1: `UPSERT` `namespaces` rows into the `paid_namespaces` table (PG -> PG).
  - Example 2: `INSERT` `events` into a ClickHouse table (PG -> CH).
  - Example 3: Write `issues.title` to a CSV file (PG -> CSV).
  - Example 4: Add `issues` to Elasticsearch via the ES HTTP interface (PG -> ES).
Example (pseudo-code):

```ruby
def process
  # Batching logic provided by the system.
  table.within_time_slice(current_slice).each_batch do |relation|
    send_to_downstream(relation)
  end
end

# Custom implementation created by the owner of the "downstream".
# For example, Group::Optimize would implement a strategy that inserts
# data into ClickHouse.
def send_to_downstream(relation)
  ClickHouse.insert(relation.to_json) # downstream-specific insert/consumption logic
end
```
Within the batching block, there is an option to filter and transform the data. For example, we can filter out items that are not licensed.
Downstream conflict resolution, for example dealing with existing records (upsert, etc.), would be the responsibility of the implementor.
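As a concrete illustration, here is a minimal sketch of Example 3 (PG -> CSV). The `Downstream` base class and the method names are assumptions about what the well-defined API could look like, not the actual interface:

```ruby
require "csv"
require "stringio"

# Hypothetical downstream API: each downstream implements `transform`
# and `insert_batch`; the batching system calls `process` with a batch.
class Downstream
  def process(rows)
    insert_batch(rows.map { |row| transform(row) })
  end
end

# Example 3 from above: write issues.title to a CSV file (PG -> CSV).
class CsvDownstream < Downstream
  def initialize(io)
    @csv = CSV.new(io)
  end

  def transform(row)
    [row[:id], row[:title]]
  end

  def insert_batch(rows)
    rows.each { |r| @csv << r }
  end
end

out = StringIO.new
CsvDownstream.new(out).process([{ id: 1, title: "Fix login" }])
puts out.string # "1,Fix login\n"
```

An upserting PG -> PG downstream would implement the same two methods; only `insert_batch` would change.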
### Job coordinator
This is a periodic job that determines which entries should be replicated next by looking at the replication status table. The coordinator is also responsible for limiting the number of jobs; this is where we could implement parallel execution and overload protection (including checking replication lag).
When an entry (for example, the `clickhouse` + `events` table pair) is up next, the coordinator does the following:

- Verifies that the number of currently running jobs is less than the maximum number of parallel jobs the system configuration allows.
- Inserts a new record into the `replication_jobs` table to process the next time slice.
- If there is more "room" to run jobs, inserts another record to process the next time slice (if there is one).
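The slicing step above can be sketched as follows. The configuration values and method names are assumptions:

```ruby
require "time"

MAX_PARALLEL_JOBS = 2 # assumed configuration value
SLICE_SECONDS = 300   # assumed 5-minute time slices

# Hypothetical coordinator step: given the last scheduled timestamp and
# the number of currently running jobs, emit the next pending time
# slices without exceeding the parallel job limit.
def next_slices(last_scheduled_at, running_jobs, now)
  slices = []
  from = last_scheduled_at

  while running_jobs + slices.size < MAX_PARALLEL_JOBS && from + SLICE_SECONDS <= now
    to = from + SLICE_SECONDS
    slices << { from_updated_at: from, to_updated_at: to, status: :pending }
    from = to
  end

  slices
end

slices = next_slices(Time.parse("2022-10-05 12:21"), 1, Time.parse("2022-10-05 12:35"))
puts slices.size # 1: only one more job fits under the parallel limit
```

With one job already running, the coordinator schedules only one more slice; with zero running, it would fill both slots.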
The `replication_jobs` table:

| replication_status_id | from_updated_at | from_id | to_updated_at | to_id | status |
|---|---|---|---|---|---|
| 4 | 2022-10-05 12:21 | 43 | 2022-10-05 12:26 | 23 | pending |
| 4 | 2022-10-05 12:26 | 23 | 2022-10-05 12:31 | 542 | pending |
The `replication_jobs` table would use our sliding list partitioning strategy to clear out finished jobs.
### Job processor

The job processor is a periodic job which takes one item from the `replication_jobs` table and starts querying the upstream database from the last cursor (`updated_at`, `id` pair).

Based on the strategy, the job processor sends data in batches to the downstream database. The job processor is time-limited: after a certain runtime, the job stops execution and records the last processed cursor. Later, the next job processor can pick up the work and continue the processing.
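A sketch of the time-limited processing loop; the job interface (`next_batch`, `send_to_downstream`, `record_cursor`) is hypothetical:

```ruby
# Hypothetical time-limited processing loop: process batches until the
# runtime budget is exhausted, then the persisted cursor lets the next
# job processor resume from where this one stopped.
def process_job(job, budget_seconds: 60)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + budget_seconds

  loop do
    batch, cursor = job.next_batch
    break if batch.empty?

    job.send_to_downstream(batch)
    job.record_cursor(cursor)

    break if Process.clock_gettime(Process::CLOCK_MONOTONIC) >= deadline
  end
end

# Minimal fake job to demonstrate the control flow.
class FakeJob
  attr_reader :sent, :cursor

  def initialize(batches)
    @batches = batches.dup
    @sent = []
  end

  def next_batch
    @batches.shift || [[], nil]
  end

  def send_to_downstream(batch)
    @sent.concat(batch)
  end

  def record_cursor(cursor)
    @cursor = cursor
  end
end

job = FakeJob.new([[[1, 2], :cursor_a], [[3], :cursor_b]])
process_job(job, budget_seconds: 5)
puts job.sent.inspect   # [1, 2, 3]
puts job.cursor.inspect # :cursor_b
```

Note that the cursor is recorded after each batch, not only at the deadline, so a crashed job loses at most one batch of progress.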
## Overview
## Out-of-sync problem
There is a small chance that data becomes stale / out-of-sync. Examples:

- The downstream database accidentally deletes some data.
- Some data is inserted into the upstream database with a lower `updated_at` timestamp than the current cursor.
These cases can and will happen. We can build an extra process that periodically re-syncs the whole database. For example, we could extend the `replication_status` table with two more columns: `full_replication_updated_at` and `full_replication_id`.

These columns would track the progress of the "full" replication. Once the `updated_at` timestamp reaches the current timestamp, the replication status would be reset and the process starts from the beginning. This would be a rather slow process which we would run at low priority (less frequently).
Additionally, we could give admins the ability to schedule a new replication job for a given time range, say the last 3 hours.
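The reset logic for the full re-sync could be sketched as follows; the column handling is simplified and the epoch restart point is an assumption:

```ruby
require "time"

# Hypothetical bookkeeping for the periodic full re-sync: once the
# full-replication cursor catches up with the current time, reset it so
# the low-priority re-sync starts over from the beginning.
def advance_full_replication(status, now)
  if status[:full_replication_updated_at] >= now
    status.merge(full_replication_updated_at: Time.at(0), full_replication_id: 0)
  else
    status # not caught up yet: keep processing from the current cursor
  end
end

status = { full_replication_updated_at: Time.parse("2022-10-05 23:00"), full_replication_id: 10 }

puts advance_full_replication(status, Time.parse("2022-10-06 00:00"))[:full_replication_id] # 10: still catching up
puts advance_full_replication(status, Time.parse("2022-10-05 22:00"))[:full_replication_id] # 0: caught up, reset
```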