Add table schema version in package
What does this MR do?
This MR adds a VersionHash field to the LogicalReplicationEvents protobuf message. The VersionHash is a fingerprint that represents the specific state of a database table's schema at the time an event package was generated.
Key Changes
-
Protobuf Update: Added
uint64 version_hashfield toLogicalReplicationEventsmessage -
Schema Version Calculation: Implemented hash calculation based on:
- Table name and schema
- Column names and their data types (sorted alphabetically)
- Uses XXH3 algorithm for hashing
-
Queue Interface Changes: Updated
PersistentQueue.Add()method signature to acceptRelationMessageWithVersioninstead of separate table/schema parameters, enabling version hash propagation -
Consumer Updates:
- Demo consumer now displays the version hash
- All queue implementations (NATS, Noop) updated to include version hash in events
-
Documentation: Added comprehensive section in README explaining:
- What
VersionHashis and why it's useful - How to use it for detecting schema drift
- How the hash is calculated
- SQL query to compute the hash for verification
- What
How do I test it?
You can test the behavior using the simulator:
cd simulator
docker compose up --build
See the logs for the demo consumer:
docker compose logs | grep siphon_consumer_1_demo
You should see the version hash printed for the users package:
siphon_consumer_1_demo | Application identifier: siphon-producer-db1
siphon_consumer_1_demo | PostgreSQL table: users
siphon_consumer_1_demo | PostgreSQL schema: public
siphon_consumer_1_demo | PostgreSQL table schema version hash: 17321062187436956279
Start psql and update some rows to trigger another package:
psql "host=localhost port=5433 user=db1 password=db1 dbname=siphon"
update users set updated_at=now();
You should see a new entry in the logs with the same version hash:
siphon_consumer_1_demo | Application identifier: siphon-producer-db1
siphon_consumer_1_demo | PostgreSQL table: users
siphon_consumer_1_demo | PostgreSQL schema: public
siphon_consumer_1_demo | PostgreSQL table schema version hash: 17321062187436956279
siphon_consumer_1_demo | Event count: 3
siphon_consumer_1_demo | First event: operation:OPERATION_UPDATE columns:{column_index:2 value:{int64_value:1}} columns:{column_index:3 value:{string_value:"User Name1"}} columns:{column_index:5 value:{string_value:"user1"}} columns:{value:{string_value:"about"}} columns:{column_index:1 value:{timestamp_value:{seconds:1765471970 nanos:303107}}} columns:{column_index:4 value:{timestamp_value:{seconds:1765472071 nanos:265503}}}
Make a change to the PostgreSQL DB schema:
ALTER TABLE users ADD COLUMN foo bigint;
-- update some rows
update users set updated_at=now();
The version hash in the output should change:
siphon_consumer_1_demo | Application identifier: siphon-producer-db1
siphon_consumer_1_demo | PostgreSQL table: users
siphon_consumer_1_demo | PostgreSQL schema: public
siphon_consumer_1_demo | PostgreSQL table schema version hash: 8784686533527315965