# Add UniqueTransactionId to ORDER BY in usage_billing_consumed ClickHouse table
## Problem

The current `usage_billing_consumed` table uses `ReplacingMergeTree(ConsumedAt)` with `ORDER BY EventId`. This means ClickHouse will merge rows with the same `EventId` together, keeping only the row with the latest `ConsumedAt` timestamp.

However, we're now writing `UniqueTransactionId` to this table (as seen in `Billing::Usage::Consumed#mark_consumed!`), and we need `EventId` and `UniqueTransactionId` to form a composite key to prevent unintended deduplication.
Current schema:

```sql
CREATE TABLE {db_name}.usage_billing_consumed
(
    EventId FixedString(16) CODEC(ZSTD(1)),
    ConsumedAt DateTime64(9) DEFAULT now64(9) CODEC(Delta(8), ZSTD(1))
)
ENGINE = ReplacingMergeTree(ConsumedAt)
PARTITION BY toYYYYMM(ConsumedAt)
ORDER BY EventId
SETTINGS index_granularity = 8192;
```
Issue: With the current `ORDER BY EventId`, if the same `EventId` is consumed with different `UniqueTransactionId` values, ClickHouse will merge them, potentially losing consumption records.
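The merge behaviour can be illustrated with a small in-memory simulation (plain Ruby, not real ClickHouse internals; only the keep-latest-version rule of `ReplacingMergeTree` is modelled):

```ruby
# Simulate ReplacingMergeTree(ConsumedAt) deduplication: within each
# ORDER BY key, background merges keep only the row with the latest
# value of the version column (ConsumedAt here).
def merge_parts(rows, key_columns)
  rows
    .group_by { |row| row.values_at(*key_columns) }
    .map { |_key, group| group.max_by { |row| row[:consumed_at] } }
end

rows = [
  { event_id: "evt-1", unique_transaction_id: "txn-a", consumed_at: 1 },
  { event_id: "evt-1", unique_transaction_id: "txn-b", consumed_at: 2 },
]

# ORDER BY EventId: the two consumption records collapse to one (data loss).
merge_parts(rows, [:event_id]).size                         # => 1

# ORDER BY (EventId, UniqueTransactionId): both records survive.
merge_parts(rows, %i[event_id unique_transaction_id]).size  # => 2
```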
## Solution

Update the table schema to include `UniqueTransactionId` in the `ORDER BY` clause, making `(EventId, UniqueTransactionId)` the composite key.
Since ClickHouse does not allow changing the `ORDER BY` clause of an existing table to include an existing column, we need to:

- Create a new table with the correct schema
- Copy data from the old table
- Perform an atomic swap
- Clean up the old table

Note: The `UniqueTransactionId` column already exists in the table (it's being written by the application code), so only the `ORDER BY` clause needs to change.
## Migration Steps
### 1. Stop consumption jobs (maintenance window)

Before performing the migration, temporarily pause consumption jobs to prevent writes during the table swap:

- Enable the `usage_billing_disable_consumption` flag
- Verify: `Billing::Usage::Feature.consumption_enabled?` returns `false`
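The flag gate above can be sketched as follows. This is a hypothetical illustration: only the flag name and the `consumption_enabled?` predicate come from this issue, and the module here (`UsageFeatureGate`) is a stand-in, not the real `Billing::Usage::Feature`:

```ruby
# Hypothetical sketch of the flag gate the maintenance window relies on.
# A real implementation would back the flag store with Flipper, Redis, etc.
module UsageFeatureGate
  @flags = {}

  class << self
    def enable(name)
      @flags[name] = true
    end

    def disable(name)
      @flags[name] = false
    end

    # Consumption runs only while the disable flag is off.
    def consumption_enabled?
      !@flags.fetch(:usage_billing_disable_consumption, false)
    end
  end
end

UsageFeatureGate.consumption_enabled?                        # => true
UsageFeatureGate.enable(:usage_billing_disable_consumption)
UsageFeatureGate.consumption_enabled?                        # => false
```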
### 2. Execute ClickHouse migration

```sql
-- 1. Create new table with composite ORDER BY
CREATE TABLE {db_name}.usage_billing_consumed_new
(
    EventId FixedString(16) CODEC(ZSTD(1)),
    ConsumedAt DateTime64(9) DEFAULT now64(9) CODEC(Delta(8), ZSTD(1)),
    UniqueTransactionId UUID CODEC(ZSTD(1))
)
ENGINE = ReplacingMergeTree(ConsumedAt)
PARTITION BY toYYYYMM(ConsumedAt)
ORDER BY (EventId, UniqueTransactionId)
SETTINGS index_granularity = 8192;

-- 2. Copy data from old table (fast for small tables; duration scales with data volume)
INSERT INTO {db_name}.usage_billing_consumed_new
SELECT EventId, ConsumedAt, UniqueTransactionId
FROM {db_name}.usage_billing_consumed;

-- 3. Verify counts match
SELECT count() FROM {db_name}.usage_billing_consumed;
SELECT count() FROM {db_name}.usage_billing_consumed_new;

-- 4. Atomic swap (a single multi-table RENAME is atomic; two separate
--    RENAMEs would leave a window with no usage_billing_consumed table)
RENAME TABLE {db_name}.usage_billing_consumed     TO {db_name}.usage_billing_consumed_old,
             {db_name}.usage_billing_consumed_new TO {db_name}.usage_billing_consumed;

-- 5. Verify the new table is working
SELECT count() FROM {db_name}.usage_billing_consumed;

-- 6. After verification (e.g., 24-48 hours), drop the old table
-- DROP TABLE {db_name}.usage_billing_consumed_old;
```
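The count check in step 3 can be scripted so the swap only proceeds when the copy is complete. A sketch; `run_query` stands in for whatever ClickHouse client wrapper the application uses (the wrapper itself is assumed, not taken from the codebase):

```ruby
# Abort the migration before the swap if the row counts diverge.
# `run_query` is any callable that executes a SQL string and returns
# a scalar count (e.g. a thin wrapper around your ClickHouse client).
def verify_copy!(run_query, db_name:)
  old_count = run_query.call(
    "SELECT count() FROM #{db_name}.usage_billing_consumed")
  new_count = run_query.call(
    "SELECT count() FROM #{db_name}.usage_billing_consumed_new")
  if old_count != new_count
    raise "Row count mismatch (old=#{old_count}, new=#{new_count}); do not swap"
  end
  new_count
end

# Stubbed example: both tables report the same count, so the check passes.
stub = ->(_sql) { 1_000 }
verify_copy!(stub, db_name: "billing")  # => 1000
```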
### 3. Update schema file

Update `db/clickhouse/007_usage_billing_consumed.sql`:

```sql
CREATE TABLE IF NOT EXISTS {db_name}.usage_billing_consumed
(
    EventId FixedString(16) CODEC(ZSTD(1)),
    ConsumedAt DateTime64(9) DEFAULT now64(9) CODEC(Delta(8), ZSTD(1)),
    UniqueTransactionId UUID CODEC(ZSTD(1))
)
ENGINE = ReplacingMergeTree(ConsumedAt)
PARTITION BY toYYYYMM(ConsumedAt)
ORDER BY (EventId, UniqueTransactionId)
SETTINGS index_granularity = 8192;
```
### 4. Resume consumption jobs

Disable the `usage_billing_disable_consumption` flag and confirm `Billing::Usage::Feature.consumption_enabled?` returns `true`.
## Testing

Pre-migration verification:

- Record current row count: `SELECT count() FROM usage_billing_consumed`
- Record sample data: `SELECT * FROM usage_billing_consumed LIMIT 10`

Post-migration verification:

- Verify row counts match
- Verify the `ORDER BY` clause: `SHOW CREATE TABLE usage_billing_consumed`
- Test consumption flow end-to-end
- Monitor for any errors in consumption jobs

Rollback plan: if issues arise, we can swap back to the old table:

```sql
RENAME TABLE {db_name}.usage_billing_consumed     TO {db_name}.usage_billing_consumed_failed,
             {db_name}.usage_billing_consumed_old TO {db_name}.usage_billing_consumed;
```
## Impact

- Downtime: Minimal (only during the RENAME operation, which is atomic)
- Data loss risk: Low (all data is copied and the old table is kept as a backup)
- Performance impact: Negligible (composite keys are well-supported in ClickHouse)
- Application changes: None required (the application already writes `UniqueTransactionId`)
## Related Code

- Table schema: `db/clickhouse/007_usage_billing_consumed.sql`
- Write logic: `app/models/billing/usage/consumed.rb` (`mark_consumed!` method)
- Consumption job: `app/jobs/billing/usage/consumption_job.rb`
- Schema loader: `lib/click_house/schema_loader.rb`
## Acceptance Criteria

- New table created with `ORDER BY (EventId, UniqueTransactionId)`
- Data successfully copied from old table to new table
- Atomic swap completed without errors
- Schema file updated in repository
- Consumption jobs resume successfully after migration
- No data loss verified (row counts match)
- Old table dropped after 24-48 hour verification period