Siphon TOAST Handling Null Types / Arrays

See example report on gitlab-org/orbit/knowledge-graph$4930693

Example test case that reproduces the issue:

From 16e7b3eb0a41e36ac9520a388f5c2df5cac1cd07 Mon Sep 17 00:00:00 2001
From: Adam Hegyi <ahegyi@gitlab.com>
Date: Tue, 3 Feb 2026 16:18:48 +0100
Subject: [PATCH] Fix dedup mismatching primary key

---
 pkg/consumers/clickhouse_consumer_test.go | 68 +++++++++++++++++++++++
 pkg/consumers/testdata/clickhouse_init.sh | 11 ++++
 2 files changed, 79 insertions(+)

diff --git a/pkg/consumers/clickhouse_consumer_test.go b/pkg/consumers/clickhouse_consumer_test.go
index a3f7fc5..1822f28 100644
--- a/pkg/consumers/clickhouse_consumer_test.go
+++ b/pkg/consumers/clickhouse_consumer_test.go
@@ -1369,6 +1369,74 @@ var _ = Describe("ClickHouse consumer", func() {
 		// new _siphon_replicated_at values should be higher
 		Expect((*replicatedAts)[0]).To(BeTemporally("<", (*freshReplicatedAts)[0]), "expected t1 to be older than t2")
 	})
+
+	Describe("using the siphon_notes table", func() {
+		It("deletes data from ClickHouse", Label("clickhouse"), func(ctx SpecContext) {
+			var columnNames = map[string]uint32{
+				"id":         0,
+				"project_id": 1,
+				"note":       2,
+			}
+
+			config.Streams = append(config.Streams, consumers.Stream{
+				Identifier: "notes",
+				Subject:    "notes",
+				Target:     "siphon_notes",
+			})
+
+			conn, err := consumers.NewClickHouseConnection(ctx, config)
+			Expect(err).NotTo(HaveOccurred())
+
+			query := `
+			INSERT INTO siphon_notes
+			(id, project_id, note)
+			VALUES
+			(1, 101, 'a note')
+			`
+			err = conn.Exec(context.Background(), query)
+			Expect(err).NotTo(HaveOccurred())
+
+			events := []*pb.ReplicationEvent{
+				{
+					Operation: pb.ReplicationEvent_OPERATION_DELETE,
+					Columns: []*pb.ReplicationEvent_Column{
+						{
+							ColumnIndex: columnNames["id"],
+							Value: &pb.Value{
+								Value: &pb.Value_Int64Value{Int64Value: 1},
+							},
+						},
+					},
+				},
+			}
+
+			pkg := pb.LogicalReplicationEvents{
+				Events:  events,
+				Columns: getKeys(columnNames),
+			}
+
+			handler, err := consumers.NewClickHouseConsumer(ctx, config, logger)
+			Expect(err).NotTo(HaveOccurred())
+
+			err = handler.HandleEvents(ctx, &pkg, &config.Streams[1])
+			Expect(err).NotTo(HaveOccurred())
+
+			query = `SELECT count()	FROM siphon_notes FINAL`
+
+			rows, err := conn.Query(context.Background(), query)
+			Expect(err).NotTo(HaveOccurred())
+			defer rows.Close()
+
+			var count uint64
+			for rows.Next() {
+				if err := rows.Scan(&count); err != nil {
+					Expect(err).NotTo(HaveOccurred())
+				}
+			}
+
+			Expect(count).To(BeEquivalentTo(0))
+		})
+	})
 })
 
 func getIDAndReplicatedAt(conn clickhouse.Conn) (*[]int64, *[]time.Time) {
diff --git a/pkg/consumers/testdata/clickhouse_init.sh b/pkg/consumers/testdata/clickhouse_init.sh
index d5410c2..79942a5 100644
--- a/pkg/consumers/testdata/clickhouse_init.sh
+++ b/pkg/consumers/testdata/clickhouse_init.sh
@@ -29,4 +29,15 @@ PRIMARY KEY id;
 )
 ENGINE=ReplacingMergeTree(_siphon_replicated_at, _siphon_deleted)
 PRIMARY KEY id;
+
+  CREATE TABLE siphon_notes (
+  id Int64,
+  project_id Int64,
+  note String,
+  note_with_default String DEFAULT 'NOTE: ' || note,
+  _siphon_replicated_at DateTime64(6, 'UTC') DEFAULT NOW(),
+  _siphon_deleted Boolean DEFAULT false
+)
+ENGINE=ReplacingMergeTree(_siphon_replicated_at, _siphon_deleted)
+PRIMARY KEY (note_with_default, project_id, id);
 " --multiquery
\ No newline at end of file
-- 
2.39.5

Assignee Loading
Time tracking Loading