Siphon TOAST Handling Null Types / Arrays
See example report on gitlab-org/orbit/knowledge-graph$4930693
Example test case that reproduces the issue:
From 16e7b3eb0a41e36ac9520a388f5c2df5cac1cd07 Mon Sep 17 00:00:00 2001
From: Adam Hegyi <ahegyi@gitlab.com>
Date: Tue, 3 Feb 2026 16:18:48 +0100
Subject: [PATCH] Fix dedup mismatching primary key
---
pkg/consumers/clickhouse_consumer_test.go | 68 +++++++++++++++++++++++
pkg/consumers/testdata/clickhouse_init.sh | 11 ++++
2 files changed, 79 insertions(+)
diff --git a/pkg/consumers/clickhouse_consumer_test.go b/pkg/consumers/clickhouse_consumer_test.go
index a3f7fc5..1822f28 100644
--- a/pkg/consumers/clickhouse_consumer_test.go
+++ b/pkg/consumers/clickhouse_consumer_test.go
@@ -1369,6 +1369,74 @@ var _ = Describe("ClickHouse consumer", func() {
// new _siphon_replicated_at values should be higher
Expect((*replicatedAts)[0]).To(BeTemporally("<", (*freshReplicatedAts)[0]), "expected t1 to be older than t2")
})
+
+ Describe("using the siphon_notes table", func() {
+ It("deletes data from ClickHouse", Label("clickhouse"), func(ctx SpecContext) {
+ var columnNames = map[string]uint32{
+ "id": 0,
+ "project_id": 1,
+ "note": 2,
+ }
+
+ config.Streams = append(config.Streams, consumers.Stream{
+ Identifier: "notes",
+ Subject: "notes",
+ Target: "siphon_notes",
+ })
+
+ conn, err := consumers.NewClickHouseConnection(ctx, config)
+ Expect(err).NotTo(HaveOccurred())
+
+ query := `
+ INSERT INTO siphon_notes
+ (id, project_id, note)
+ VALUES
+ (1, 101, 'a note')
+ `
+ err = conn.Exec(context.Background(), query)
+ Expect(err).NotTo(HaveOccurred())
+
+ events := []*pb.ReplicationEvent{
+ {
+ Operation: pb.ReplicationEvent_OPERATION_DELETE,
+ Columns: []*pb.ReplicationEvent_Column{
+ {
+ ColumnIndex: columnNames["id"],
+ Value: &pb.Value{
+ Value: &pb.Value_Int64Value{Int64Value: 1},
+ },
+ },
+ },
+ },
+ }
+
+ pkg := pb.LogicalReplicationEvents{
+ Events: events,
+ Columns: getKeys(columnNames),
+ }
+
+ handler, err := consumers.NewClickHouseConsumer(ctx, config, logger)
+ Expect(err).NotTo(HaveOccurred())
+
+ err = handler.HandleEvents(ctx, &pkg, &config.Streams[1])
+ Expect(err).NotTo(HaveOccurred())
+
+ query = `SELECT count() FROM siphon_notes FINAL`
+
+ rows, err := conn.Query(context.Background(), query)
+ Expect(err).NotTo(HaveOccurred())
+ defer rows.Close()
+
+ var count uint64
+ for rows.Next() {
+ if err := rows.Scan(&count); err != nil {
+ Expect(err).NotTo(HaveOccurred())
+ }
+ }
+
+ Expect(count).To(BeEquivalentTo(0))
+ })
+ })
})
func getIDAndReplicatedAt(conn clickhouse.Conn) (*[]int64, *[]time.Time) {
diff --git a/pkg/consumers/testdata/clickhouse_init.sh b/pkg/consumers/testdata/clickhouse_init.sh
index d5410c2..79942a5 100644
--- a/pkg/consumers/testdata/clickhouse_init.sh
+++ b/pkg/consumers/testdata/clickhouse_init.sh
@@ -29,4 +29,15 @@ PRIMARY KEY id;
)
ENGINE=ReplacingMergeTree(_siphon_replicated_at, _siphon_deleted)
PRIMARY KEY id;
+
+ CREATE TABLE siphon_notes (
+ id Int64,
+ project_id Int64,
+ note String,
+ note_with_default String DEFAULT 'NOTE: ' || note,
+ _siphon_replicated_at DateTime64(6, 'UTC') DEFAULT NOW(),
+ _siphon_deleted Boolean DEFAULT false
+)
+ENGINE=ReplacingMergeTree(_siphon_replicated_at, _siphon_deleted)
+PRIMARY KEY (note_with_default, project_id, id);
" --multiquery
\ No newline at end of file
--
2.39.5