Proposal: Scalable data sync/replication strategy

This issue gives a high-level overview of a scalable PostgreSQL data replication strategy that can be implemented at the application layer.

Terminology

  • Upstream database: our PostgreSQL database (CI or Main). The source of truth.
  • Downstream database: database/datastore where we'd like to replicate some data from the upstream database.
    • Examples: another PG DB, ClickHouse, Redis, Elasticsearch, a simple CSV file
  • Cursor: (updated_at, id) pair. Uniquely identifies one database row.

Problem statement

We have several cases where moving and transforming existing data from our upstream database would improve the overall performance of the application. A few use cases:

  • Use a different database specialized for analytical queries such as ClickHouse.
  • Keep data up-to-date in another PostgreSQL database.
  • Replicate some data to a different database table for faster querying: self-built materialized views.

Note: the replication/sync is one-way, upstream -> downstream

Keeping the data up-to-date and consistent is a challenging task, so we have to accept the following trade-offs:

  • Data lag. Changes within the upstream database will not be available right away within the downstream database.
  • Eventual consistency. Inconsistencies could happen from time to time which will be corrected automatically (with some delay).

Requirements:

  • Works in all environments: .com, self-managed.
  • Depending on the data volume, the strategy can be scaled horizontally (Sidekiq + read-only replicas).
  • Generic implementation: different downstream databases can be supported.

Change detection logic

Changing data: INSERT, UPDATE

The detection uses the updated_at column on the given database table. This column is available on most database tables we have within GitLab. For single-record updates, Rails automatically bumps the updated_at column. When a bulk update happens, the updated_at column update is often skipped, which means that the strategy wouldn't be able to detect those changes. To fix this issue, we install a row-level trigger to ensure that the updated_at column is always updated (touched). Based on some local performance testing, adding a trigger does not affect the UPDATE performance considerably (±5%).

Edge cases:

  • updated_at column is not part of the table.
    • We add the updated_at column to the table.
  • updated_at column has some specific application behaviour: automatic bumping of updated_at is prohibited.
    • Add a separate column for tracking row updates.
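As a rough sketch of what such a trigger could look like, the helper below generates the PostgreSQL DDL for a given table and tracking column. The helper name, the function and trigger naming scheme, and the row_touched_at column in the usage note are assumptions made for illustration only, not part of the proposal.

```ruby
# Builds PostgreSQL DDL for a row-level "touch" trigger.
# Function and trigger names are hypothetical. Identifiers are interpolated
# unquoted, so only trusted table and column names should be passed in.
def touch_trigger_sql(table, column: 'updated_at')
  function_name = "touch_#{table}_#{column}"
  <<~SQL
    CREATE OR REPLACE FUNCTION #{function_name}() RETURNS trigger AS $$
    BEGIN
      -- NOW() is the transaction start time in PostgreSQL.
      NEW.#{column} := NOW();
      RETURN NEW;
    END;
    $$ LANGUAGE plpgsql;

    CREATE TRIGGER #{function_name}_trigger
    BEFORE UPDATE ON #{table}
    FOR EACH ROW EXECUTE FUNCTION #{function_name}();
  SQL
end
```

Calling touch_trigger_sql('events') covers the default case. For tables where bumping updated_at is prohibited, something like touch_trigger_sql('issues', column: 'row_touched_at') would target a separate tracking column instead.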

When processing a database table, we keep track of the last replicated (updated_at, id) combination, where id acts as the tie-breaker for rows sharing the same updated_at. This way we can replicate changes in batches.

Example keyset pagination query to load 20 rows from a specific point:

SELECT * FROM events WHERE (updated_at, id) > ('2022-01-05 12:00', 5) ORDER BY updated_at, id LIMIT 20;

This solution requires an index on the (updated_at, id) columns. The benefit of this approach is that we can use read-only replicas for loading the data, so we wouldn't overload the primary of the upstream database. Additionally, we can slice the data by time ranges, which makes parallel processing possible.
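To illustrate how the cursor advances from batch to batch, here is a minimal in-memory sketch of the same keyset logic. The Row struct and both helper names are made up for the example; a real implementation would run the SQL query against a replica rather than filter an array.

```ruby
# In-memory stand-in for the upstream table (hypothetical sample structure).
Row = Struct.new(:id, :updated_at)

# Returns up to `limit` rows strictly after `cursor`, in (updated_at, id) order.
# This mirrors: WHERE (updated_at, id) > (?, ?) ORDER BY updated_at, id LIMIT ?
def next_batch(rows, cursor, limit)
  rows
    .sort_by { |row| [row.updated_at, row.id] }
    .select { |row| ([row.updated_at, row.id] <=> cursor).positive? }
    .first(limit)
end

# Yields batch after batch and returns the final cursor once no rows are left.
def each_keyset_batch(rows, cursor:, limit:)
  loop do
    batch = next_batch(rows, cursor, limit)
    break if batch.empty?

    yield batch
    cursor = [batch.last.updated_at, batch.last.id]
  end
  cursor
end
```

Because the comparison is on the pair and not on updated_at alone, two rows with an identical updated_at are never skipped or processed twice when a batch boundary falls between them.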

Deleting data: DELETE

For tracking record deletions, we can extend the existing loose foreign keys (LFK) feature to support databases other than PostgreSQL, with different cleanup strategies.

Currently, when a parent record is deleted, the child records are cleaned up asynchronously. The async processing ensures that the actual child records are completely removed. This might be a problem depending on the database engine. For example, ClickHouse also removes data asynchronously.

Example, current configuration:

merge_requests:
  - table: ci_pipelines
    column: head_pipeline_id
    on_delete: async_nullify

Let's say we replicate merge requests to ClickHouse:

merge_requests:
  - table: ci_pipelines
    column: head_pipeline_id
    on_delete: async_nullify
    database: main
  - table: merge_requests
    column: id
    on_delete: async_delete
    database: clickhouse

The async delete for the clickhouse database simply issues a DELETE query in a fire-and-forget manner.
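A possible shape for the per-database dispatch is sketched below. It assumes that the top-level YAML key names the child table and that the table attribute names the parent table, and it uses ClickHouse's ALTER TABLE ... DELETE mutation as the delete statement. The RULES and BUILDERS constants and the cleanup_statements helper are hypothetical names, not existing LFK code.

```ruby
# One entry per rule from the configuration above. `child_table` is the
# top-level YAML key, `parent_table` is the `table` attribute (assumed reading).
RULES = [
  { child_table: 'merge_requests', parent_table: 'ci_pipelines',
    column: 'head_pipeline_id', on_delete: 'async_nullify', database: 'main' },
  { child_table: 'merge_requests', parent_table: 'merge_requests',
    column: 'id', on_delete: 'async_delete', database: 'clickhouse' }
].freeze

# Hypothetical statement builders keyed by [database, on_delete].
BUILDERS = {
  %w[main async_nullify] => lambda { |rule, ids|
    "UPDATE #{rule[:child_table]} SET #{rule[:column]} = NULL WHERE #{rule[:column]} IN (#{ids})"
  },
  %w[clickhouse async_delete] => lambda { |rule, ids|
    "ALTER TABLE #{rule[:child_table]} DELETE WHERE #{rule[:column]} IN (#{ids})"
  }
}.freeze

# Returns the cleanup statements to run after rows of `parent_table` were deleted.
def cleanup_statements(parent_table, deleted_ids)
  ids = deleted_ids.map { |id| Integer(id) }.join(', ') # integers only, no injection
  RULES.select { |rule| rule[:parent_table] == parent_table }.map do |rule|
    builder = BUILDERS.fetch([rule[:database], rule[:on_delete]])
    { database: rule[:database], sql: builder.call(rule, ids) }
  end
end
```

A downstream that is not a database could register a builder that produces an event payload instead of SQL, which keeps the dispatch itself unchanged.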

Note: not all databases/datastores will require "real" LFK deletion. I see a scenario where we would use the replication strategy to stream changes to an endpoint. In that case, the actual LFK behaviour would be some sort of event push to an endpoint.

System design

Note: this part focuses on row changes based on the updated_at column.

Replication status table

This table is responsible for tracking the replication status of individual tables/entities per downstream database.

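| downstream      | table_name | last_updated_at  | last_id |
|-----------------|------------|------------------|---------|
| clickhouse      | issues     | 2022-10-05 12:21 | 23      |
| clickhouse      | events     | 2022-10-06 11:05 | 12345   |
| main_ci_sync    | users      | 2022-10-02 11:03 | 1       |
| paid_namespaces | namespaces | 2022-09-05 10:01 | 9979    |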

For each table/entity that we want to replicate, we must create a record. The last_ columns hold the last processed cursor (updated_at, id) values.

The downstream column defines the downstream database where the data will be replicated. Each downstream entry has to implement a well-defined API for receiving a batch of rows.

The implementation defines:

  • How to transform the loaded models from the upstream database.
  • How to formulate the INSERT query.
    • Example 1: UPSERT namespaces rows into the paid_namespaces table (PG -> PG).
    • Example 2: INSERT events to a ClickHouse table (PG -> CH).
    • Example 3: Writing issues.title to a csv file (PG -> CSV).
    • Example 4: Add issues to Elasticsearch via the ES HTTP interface (PG -> ES).
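One way the downstream API could be shaped is a small base class with two hooks, one for transforming rows and one for writing a batch. The class names and the receive method are assumptions for illustration. The concrete class is a simplified stand-in for Example 3: it writes one JSON-encoded title per line to an IO object instead of producing a real CSV file.

```ruby
require 'json'

# Hypothetical base class: each downstream overrides `transform` and `insert_batch`.
class Downstream
  # Receives one batch of upstream rows (plain hashes here), transforms them,
  # forwards the result, and returns the number of items written.
  def receive(rows)
    payload = rows.filter_map { |row| transform(row) }
    insert_batch(payload) unless payload.empty?
    payload.size
  end

  # Default: pass rows through unchanged. Returning nil drops the row.
  def transform(row)
    row
  end

  def insert_batch(_payload)
    raise NotImplementedError, "#{self.class} must implement insert_batch"
  end
end

# Keeps only issues.title and writes one JSON-encoded title per line.
class TitleFileDownstream < Downstream
  def initialize(io)
    super()
    @io = io
  end

  def transform(row)
    title = row[:title]
    title.nil? ? nil : { title: title }
  end

  def insert_batch(payload)
    payload.each { |item| @io.puts(item[:title].to_json) }
  end
end
```

A ClickHouse or Elasticsearch downstream would follow the same pattern and only swap the body of insert_batch for its own client call.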

Example (pseudo-code):

def process
  table.within_time_slice(current_slice).each_batch do |relation| # batching logic provided by the system
    send_to_downstream(relation)
  end
end

# Custom implementation created by the owner of the "downstream"
# For example, Group::Optimize would implement a strategy that inserts data into ClickHouse
def send_to_downstream(relation)
  ClickHouse.insert(relation.to_json) # downstream-specific insert/consumption logic
end

Within the batching block, there is an option to filter and transform the data. For example, we can filter out items that are not licensed.

Downstream conflict resolution would be the responsibility of the implementor. For example, dealing with existing records (upsert, etc.).

Job coordinator

This is a periodic job that determines which entries should be replicated next by looking at the replication status table. The coordinator is also responsible for limiting the number of jobs. This is where we could implement parallel execution and overload protection (including checking replication lag).

When an entry (for example, clickhouse, events table) is up next, the coordinator will do the following:

  • Verify that the number of currently running jobs is less than the maximum number of parallel jobs the system configuration allows.
  • Insert a new record into the replication_jobs table to process the next time slice.
  • If there is more "room" to run jobs, insert another record to process the following time slice (if there is one).
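The steps above can be sketched as a small capacity check. The Job struct, the schedule_jobs helper, and the "running" status are hypothetical, and counting pending jobs against the limit is an assumption the proposal does not spell out.

```ruby
# Hypothetical in-memory stand-in for a replication_jobs row.
Job = Struct.new(:status_id, :from_cursor, :to_cursor, :status)

# Enqueues pending jobs for the next time slices while there is capacity.
# `slices` is an ordered list of [from_cursor, to_cursor] pairs still to process.
# Assumption: both pending and running jobs count against `max_parallel`.
def schedule_jobs(jobs, status_id:, slices:, max_parallel:)
  active = jobs.count { |job| %w[pending running].include?(job.status) }
  capacity = [max_parallel - active, 0].max

  slices.first(capacity).map do |from_cursor, to_cursor|
    Job.new(status_id, from_cursor, to_cursor, 'pending').tap { |job| jobs << job }
  end
end
```

Overload protection, such as skipping scheduling while replica lag is high, could be added as an early return before the capacity calculation.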

replication_jobs table:

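| replication_status_id | from_updated_at  | from_id | to_updated_at    | to_id | status  |
|-----------------------|------------------|---------|------------------|-------|---------|
| 4                     | 2022-10-05 12:21 | 43      | 2022-10-05 12:26 | 23    | pending |
| 4                     | 2022-10-05 12:26 | 23      | 2022-10-05 12:31 | 542   | pending |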

The replication_jobs table would use our sliding list partitioning strategy to clear out finished jobs.

Job processor

The job processor is a periodic job that takes one item from the replication_jobs table and starts querying the upstream database from the last cursor (updated_at, id pair).

Based on the strategy, the job processor will send data in batches to the downstream database. The job processor is time-limited: after a certain runtime, the job stops executing and records the last processed cursor. Later, the next job processor run can pick up the work and continue processing from that cursor.
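A minimal sketch of the time-limited loop follows. The run_job helper, its parameters, and the injectable clock are assumptions for illustration. fetch_batch stands in for the keyset query and send_batch for the downstream-specific insert.

```ruby
# Default clock: monotonic seconds, unaffected by wall-clock adjustments.
MONOTONIC_CLOCK = -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) }

# Processes batches until the time budget is used up or no rows are left.
# Returns the last processed cursor so a later run can resume from it.
# `fetch_batch` (hypothetical): cursor -> array of [updated_at, id, payload].
# `send_batch` (hypothetical): delivers one batch to the downstream.
def run_job(cursor:, max_runtime:, fetch_batch:, send_batch:, clock: MONOTONIC_CLOCK)
  deadline = clock.call + max_runtime

  while clock.call < deadline
    batch = fetch_batch.call(cursor)
    break if batch.empty?

    send_batch.call(batch)
    # Advance the cursor only after the downstream accepted the batch.
    cursor = batch.last.first(2)
  end

  cursor
end
```

The time check happens between batches, so a run can exceed the budget by at most the duration of one batch. Since the cursor only moves after a successful send, a crash mid-batch leads to that batch being re-sent, which is one reason the downstream should handle existing records (for example with an upsert).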

Overview

[Diagram: replicator]

Out-of-sync problem

There is a small chance that data becomes stale / out-of-sync. Examples:

  • The downstream database accidentally deletes some data.
  • Some data is inserted into the upstream database with a lower updated_at timestamp than the current cursor.

These cases can and will happen. We can build an extra process that periodically re-syncs the whole database. For example, we could extend the replication_status table with two more columns: full_replication_updated_at, full_replication_id.

These columns would track the progress of the "full" replication. Once full_replication_updated_at reaches the current time, the full replication cursor would be reset and the process would start again from the beginning. This would be a rather slow process that we would run with low priority (less frequently).

Additionally, we could let admins schedule a new replication job for a given time range, for example the last 3 hours.

Edited by Adam Hegyi