Partition the NATS subjects (low priority)

Problem: consumer data ingestion to ClickHouse (and other data stores) is single-threaded (per table).

This means that during a large snapshot run (5 parallel goroutines), data lag increases significantly because only one consumer goroutine processes the snapshot.

Idea: partition the replication events in NATS by primary key. This will allow safe parallel execution by the consumer.

Producer side:

  • Add a configuration to the table_mapping where we can define the number of partitions.
  • When we build the replication events, we know the primary key(s).
  • Hash the primary key(s) to assign each event to one of the partitions, e.g. events.users.partition1.
  • During snapshot, we need to merge data consistently: snapshot_events.users.partition1 -> events.users.partition1
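The subject routing described above could be sketched as a small helper; the name partitionSubject is illustrative, not an existing function in the codebase:

```go
package main

import "fmt"

// partitionSubject routes an event to a partitioned NATS subject based on the
// primary-key hash, e.g. events.users.partition1. The same helper works for
// the snapshot_events.* prefix, keeping snapshot and replication data aligned.
func partitionSubject(base string, hash uint64, partitionCount uint64) string {
	return fmt.Sprintf("%s.partition%d", base, hash%partitionCount)
}

func main() {
	// A row whose primary key hashes to 7 lands in partition 3 of 4,
	// both in the live stream and in the snapshot stream.
	fmt.Println(partitionSubject("events.users", 7, 4))
	fmt.Println(partitionSubject("snapshot_events.users", 7, 4))
}
```

Because the partition index depends only on the hash and the partition count, the snapshot subjects map one-to-one onto the replication subjects.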

Consumer side:

  • Add a configuration to the streams where we can define the number of partitions (must match the producer's setting).
  • When instantiating the consumers, we should create one for each partition (parallelization).
  • Concurrent execution can happen because we partitioned the data by primary key(s).

Implementation details

Producer

Extend the BufferedItem struct with a hash field (uint64). The hash value can be calculated the following way:

  • In the BuildReplicationEvent function, we know which columns are part of the primary key (we must also handle composite primary keys).
  • Collect the byte value for each PK column; it's available, unparsed, from the PostgreSQL logical replication stream.
  • Use xxhash for hashing the values, so we get a uint64 as a result.
  • If the primary key is composite, put the byte values into an array, sort them, and then produce a single uint64 hash value.
  • Store the hash on the BufferedItem struct.
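The hashing steps above could look roughly like this. The doc proposes xxhash; this sketch substitutes stdlib FNV-1a so it runs without an external dependency, but the structure is the same either way (and hashPrimaryKey is a hypothetical name):

```go
package main

import (
	"bytes"
	"fmt"
	"hash/fnv"
	"sort"
)

// hashPrimaryKey folds the raw byte values of the PK columns into one uint64.
// Stand-in for xxhash: FNV-1a from the stdlib, also yielding a uint64.
func hashPrimaryKey(pkValues [][]byte) uint64 {
	// Copy and sort the byte values so a composite key hashes identically
	// regardless of the order the columns arrive in.
	sorted := make([][]byte, len(pkValues))
	copy(sorted, pkValues)
	sort.Slice(sorted, func(i, j int) bool {
		return bytes.Compare(sorted[i], sorted[j]) < 0
	})

	h := fnv.New64a()
	for _, v := range sorted {
		h.Write(v)
	}
	return h.Sum64()
}

func main() {
	// Composite key (id, tenant): same hash in either column order.
	fmt.Println(hashPrimaryKey([][]byte{[]byte("42"), []byte("tenant-a")}))
	fmt.Println(hashPrimaryKey([][]byte{[]byte("tenant-a"), []byte("42")}))
}
```

Determinism is the only property that matters here: the same PK must always land in the same partition, on both the replication and the snapshot path.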

If we do this right, the snapshot process will produce the same hashes, since we parse the columns the same way.

When we submit data to NATS, we look up the partition options of the given subject. If partitions are defined, we group the events for that subject into N groups using the modulo operator: (hash % partition_count).

Each group will be serialized, compressed and submitted to NATS separately.
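The grouping step could be sketched as follows; the BufferedItem fields beyond Hash and the function name are illustrative:

```go
package main

import "fmt"

// BufferedItem carries a replication event plus the PK hash computed
// when the event was built.
type BufferedItem struct {
	Hash    uint64
	Payload []byte
}

// groupByPartition buckets events by hash % partitionCount. Each bucket is
// then serialized, compressed, and published to its own subject.
func groupByPartition(items []BufferedItem, partitionCount uint64) map[uint64][]BufferedItem {
	groups := make(map[uint64][]BufferedItem, partitionCount)
	for _, it := range items {
		p := it.Hash % partitionCount
		groups[p] = append(groups[p], it)
	}
	return groups
}

func main() {
	items := []BufferedItem{{Hash: 0}, {Hash: 3}, {Hash: 4}}
	for p, g := range groupByPartition(items, 3) {
		fmt.Printf("partition %d: %d events\n", p, len(g))
	}
}
```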

Edited by Adam Hegyi