Add siphon-backed finder for PipelineAnalytics behind feature flag
What does this MR do and why?
Adds ClickHouse::Finders::Ci::SiphonPipelinesFinder as an alternative data path for the PipelineAnalytics GraphQL field (Project + Group). The new finder reads from siphon_p_ci_pipelines (Siphon-replicated copy of p_ci_pipelines) using an argMax(_siphon_replicated_at) dedup subquery — same pattern as FinishedBuildsDeduplicatedFinder.
The path is selected at runtime by a single feature flag check in CollectPipelineAnalyticsServiceBase#clickhouse_model:
- pipeline_analytics_siphon disabled (default): existing ci_finished_pipelines_{hourly,daily} MV path. No behaviour change.
- pipeline_analytics_siphon enabled: new siphon finder.
Once the flag reaches 100% and is removed, the bespoke Ci::ClickHouse::DataIngestion::FinishedPipelinesSyncService sync pipeline becomes redundant.
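The single branch point described above can be sketched in plain Ruby. This is an illustration only, not the MR's implementation: `FakeFeature`, `MvPipelinesModel`, and `SiphonPipelinesFinder` are hypothetical stand-ins defined here so the snippet runs outside Rails, and the table names they carry are only labels.

```ruby
# Hypothetical stand-ins; the real classes live in the GitLab codebase.
MvPipelinesModel = Struct.new(:name)
SiphonPipelinesFinder = Struct.new(:name)

# Minimal fake of a feature-flag registry (assumption: flags default to disabled).
class FakeFeature
  def initialize
    @enabled = {}
  end

  def enable(flag)
    @enabled[flag] = true
  end

  def enabled?(flag)
    @enabled.fetch(flag, false)
  end
end

# Sketch of the selection: one flag check decides which data path is used.
def clickhouse_model(feature)
  if feature.enabled?(:pipeline_analytics_siphon)
    SiphonPipelinesFinder.new('siphon_p_ci_pipelines')
  else
    MvPipelinesModel.new('ci_finished_pipelines_daily')
  end
end

feature = FakeFeature.new
puts clickhouse_model(feature).name # flag off: MV path
feature.enable(:pipeline_analytics_siphon)
puts clickhouse_model(feature).name # flag on: siphon path
```

Keeping the check in one method means removing the flag later is a one-branch deletion rather than a change spread across callers.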
Notable differences from the MV path
- Dedup: argMax(_siphon_replicated_at) inner subquery + _siphon_deleted = false outer filter. No FINAL.
- Bucketing: filters on raw started_at instead of the hour-truncated started_at_bucket. This is a deliberate accuracy gain: boundary pipelines that started a few seconds before a window edge are now included.
- source: siphon stores source as Nullable(Int64) enum, so the finder translates via Ci::Pipeline.sources[symbol] before pushing the filter.
- traversal_path: siphon includes the leading organization_id segment (e.g. 1/9970/19/); the finder uses Namespace#traversal_path(with_organization: true).
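To make the dedup rule concrete, here is a small in-memory sketch of what the argMax subquery plus the outer _siphon_deleted filter compute. It does not touch ClickHouse; the `Row` struct and `latest_live_rows` helper are hypothetical names introduced for illustration, and the sketch assumes replication timestamps are unique per pipeline (with ties, argMax may pick any of the tied versions).

```ruby
# Hypothetical row shape mirroring a few siphon_p_ci_pipelines columns.
Row = Struct.new(:id, :partition_id, :status, :replicated_at, :deleted, keyword_init: true)

# Mimics GROUP BY id, partition_id with argMax(col, _siphon_replicated_at):
# keep the most recently replicated version of each pipeline, then drop
# pipelines whose latest version is a soft delete (the outer filter).
def latest_live_rows(rows)
  rows
    .group_by { |row| [row.id, row.partition_id] }
    .map { |_key, versions| versions.max_by(&:replicated_at) }
    .reject(&:deleted)
end

rows = [
  # Pipeline 1 was replicated twice; only the later version should survive.
  Row.new(id: 1, partition_id: 100, status: 'running',
          replicated_at: Time.utc(2020, 1, 1, 10, 0), deleted: false),
  Row.new(id: 1, partition_id: 100, status: 'success',
          replicated_at: Time.utc(2020, 1, 1, 10, 5), deleted: false),
  # Pipeline 2 was later soft-deleted, so it should disappear entirely.
  Row.new(id: 2, partition_id: 100, status: 'failed',
          replicated_at: Time.utc(2020, 1, 1, 11, 0), deleted: false),
  Row.new(id: 2, partition_id: 100, status: 'failed',
          replicated_at: Time.utc(2020, 1, 1, 11, 5), deleted: true)
]

latest_live_rows(rows).each { |row| puts "#{row.id} #{row.status}" }
```

The order matters: the deleted filter has to run after the latest version is chosen, otherwise an older live version of a deleted pipeline would still be counted.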
Test coverage
All four spec files that exercise the analytics services now run the data-driven shared examples twice — once with the flag disabled and once with it enabled — so neither code path can silently regress during the rollout:
- spec/services/ci/collect_aggregate_pipeline_analytics_service_spec.rb
- spec/services/ci/collect_time_series_pipeline_analytics_service_spec.rb
- spec/requests/api/graphql/project/project_pipeline_analytics_spec.rb
- spec/requests/api/graphql/group/group_pipeline_analytics_spec.rb
spec/lib/click_house/finders/ci/siphon_pipelines_finder_spec.rb covers the finder directly with both SQL-shape assertions and execution-based tests (single insert, soft-delete, argMax dedup with two versions, source-enum translation). Coverage: 100% lines, 100% branches.
ClickHouse Query Plans
1. Aggregate count, quantile (project, 30 days)
SELECT count() AS `Total pipeline runs`,
quantile(0.5) (`pipelines`.`duration`) AS `Median duration`,
countIf(status == 'success') AS `Success Count`,
countIf(status == 'failed') AS `Failed Count`
FROM (
SELECT
`siphon_p_ci_pipelines`.`id`,
`siphon_p_ci_pipelines`.`partition_id`,
argMax(`siphon_p_ci_pipelines`.`traversal_path`, `siphon_p_ci_pipelines`.`_siphon_replicated_at`) AS traversal_path,
argMax(`siphon_p_ci_pipelines`.`status`, `siphon_p_ci_pipelines`.`_siphon_replicated_at`) AS status,
argMax(`siphon_p_ci_pipelines`.`source`, `siphon_p_ci_pipelines`.`_siphon_replicated_at`) AS source,
argMax(`siphon_p_ci_pipelines`.`ref`, `siphon_p_ci_pipelines`.`_siphon_replicated_at`) AS ref,
argMax(`siphon_p_ci_pipelines`.`started_at`, `siphon_p_ci_pipelines`.`_siphon_replicated_at`) AS started_at,
argMax(`siphon_p_ci_pipelines`.`finished_at`, `siphon_p_ci_pipelines`.`_siphon_replicated_at`) AS finished_at,
argMax(`siphon_p_ci_pipelines`.`duration`, `siphon_p_ci_pipelines`.`_siphon_replicated_at`) AS duration,
argMax(`siphon_p_ci_pipelines`.`_siphon_deleted`, `siphon_p_ci_pipelines`.`_siphon_replicated_at`) AS _siphon_deleted
FROM `siphon_p_ci_pipelines`
WHERE `siphon_p_ci_pipelines`.`traversal_path` = '1/9970/15846663/'
GROUP BY id, partition_id
) pipelines
WHERE `pipelines`.`started_at` >= toDateTime64('2026-05-04 00:00:00', 6, 'UTC')
AND `pipelines`.`started_at` < toDateTime64('2026-06-03 00:00:00', 6, 'UTC')
AND `pipelines`.`_siphon_deleted` = 'false';
Elapsed: 0.837s
Read: 4,342,475 rows (269.88 MB)
-- Query Plan (EXPLAIN indexes = 1)
Expression ((Project names + Projection))
Aggregating
Expression (Before GROUP BY)
Filter ((WHERE + (Change column names to column identifiers + (Project names + Projection))))
Aggregating
Expression (Before GROUP BY)
Expression ((WHERE + Change column names to column identifiers))
ReadFromMergeTree (gitlab_clickhouse_main_production.siphon_p_ci_pipelines)
Indexes:
PrimaryKey
Keys:
traversal_path
Condition: (traversal_path in ['1/9970/15846663/', '1/9970/15846663/'])
Parts: 18/21
Granules: 2111/1133274
Search Algorithm: binary search
Ranges: 18
2. Time series count, quantile (project, 30 days)
SELECT
dateTrunc('day', `pipelines`.`started_at`, 'UTC') AS timestamp,
count() AS `Total pipeline runs`,
countIf(status == 'success') AS `Success Count`,
countIf(status == 'failed') AS `Failed Count`,
quantile(0.5) (`pipelines`.`duration`) AS p50,
quantile(0.95)(`pipelines`.`duration`) AS p95
FROM (
SELECT
`siphon_p_ci_pipelines`.`id`,
`siphon_p_ci_pipelines`.`partition_id`,
argMax(`siphon_p_ci_pipelines`.`traversal_path`, `siphon_p_ci_pipelines`.`_siphon_replicated_at`) AS traversal_path,
argMax(`siphon_p_ci_pipelines`.`status`, `siphon_p_ci_pipelines`.`_siphon_replicated_at`) AS status,
argMax(`siphon_p_ci_pipelines`.`source`, `siphon_p_ci_pipelines`.`_siphon_replicated_at`) AS source,
argMax(`siphon_p_ci_pipelines`.`ref`, `siphon_p_ci_pipelines`.`_siphon_replicated_at`) AS ref,
argMax(`siphon_p_ci_pipelines`.`started_at`, `siphon_p_ci_pipelines`.`_siphon_replicated_at`) AS started_at,
argMax(`siphon_p_ci_pipelines`.`finished_at`, `siphon_p_ci_pipelines`.`_siphon_replicated_at`) AS finished_at,
argMax(`siphon_p_ci_pipelines`.`duration`, `siphon_p_ci_pipelines`.`_siphon_replicated_at`) AS duration,
argMax(`siphon_p_ci_pipelines`.`_siphon_deleted`, `siphon_p_ci_pipelines`.`_siphon_replicated_at`) AS _siphon_deleted
FROM `siphon_p_ci_pipelines`
WHERE `siphon_p_ci_pipelines`.`traversal_path` = '1/9970/15846663/'
GROUP BY id, partition_id
) pipelines
WHERE `pipelines`.`started_at` >= toDateTime64('2026-05-04 00:00:00', 6, 'UTC')
AND `pipelines`.`started_at` < toDateTime64('2026-06-03 00:00:00', 6, 'UTC')
AND `pipelines`.`_siphon_deleted` = 'false'
GROUP BY timestamp
ORDER BY timestamp;
Elapsed: 2.291s
Read: 4,333,448 rows (269.12 MB)
-- Query Plan (EXPLAIN indexes = 1)
Expression (Project names)
Sorting (Sorting for ORDER BY)
Expression ((Before ORDER BY + Projection))
Aggregating
Expression (Before GROUP BY)
Filter ((WHERE + (Change column names to column identifiers + (Project names + Projection))))
Aggregating
Expression (Before GROUP BY)
Expression ((WHERE + Change column names to column identifiers))
ReadFromMergeTree (gitlab_clickhouse_main_production.siphon_p_ci_pipelines)
Indexes:
PrimaryKey
Keys:
traversal_path
Condition: (traversal_path in ['1/9970/15846663/', '1/9970/15846663/'])
Parts: 15/15
Granules: 2109/1133204
Search Algorithm: binary search
Ranges: 15
3. Aggregate count, quantiles with filters (project, 30 days)
SELECT count() AS `Total pipeline runs`,
quantile(0.5) (`pipelines`.`duration`) AS `Median duration`,
countIf(status == 'success') AS `Success Count`,
countIf(status == 'failed') AS `Failed Count`
FROM (
SELECT
`siphon_p_ci_pipelines`.`id`,
`siphon_p_ci_pipelines`.`partition_id`,
argMax(`siphon_p_ci_pipelines`.`traversal_path`, `siphon_p_ci_pipelines`.`_siphon_replicated_at`) AS traversal_path,
argMax(`siphon_p_ci_pipelines`.`status`, `siphon_p_ci_pipelines`.`_siphon_replicated_at`) AS status,
argMax(`siphon_p_ci_pipelines`.`source`, `siphon_p_ci_pipelines`.`_siphon_replicated_at`) AS source,
argMax(`siphon_p_ci_pipelines`.`ref`, `siphon_p_ci_pipelines`.`_siphon_replicated_at`) AS ref,
argMax(`siphon_p_ci_pipelines`.`started_at`, `siphon_p_ci_pipelines`.`_siphon_replicated_at`) AS started_at,
argMax(`siphon_p_ci_pipelines`.`finished_at`, `siphon_p_ci_pipelines`.`_siphon_replicated_at`) AS finished_at,
argMax(`siphon_p_ci_pipelines`.`duration`, `siphon_p_ci_pipelines`.`_siphon_replicated_at`) AS duration,
argMax(`siphon_p_ci_pipelines`.`_siphon_deleted`, `siphon_p_ci_pipelines`.`_siphon_replicated_at`) AS _siphon_deleted
FROM `siphon_p_ci_pipelines`
WHERE `siphon_p_ci_pipelines`.`traversal_path` = '1/9970/15846663/'
GROUP BY id, partition_id
) pipelines
WHERE `pipelines`.`started_at` >= toDateTime64('2026-05-04 00:00:00', 6, 'UTC')
AND `pipelines`.`started_at` < toDateTime64('2026-06-03 00:00:00', 6, 'UTC')
AND `pipelines`.`source` = 1 -- 1 = 'push' (Ci::Pipeline.sources)
AND `pipelines`.`ref` = 'master'
AND `pipelines`.`status` IN ('success', 'failed', 'canceled', 'skipped')
AND `pipelines`.`_siphon_deleted` = 'false';
Elapsed: 0.753s
Read: 4,337,884 rows (455.74 MB)
-- Query Plan (EXPLAIN indexes = 1)
Expression ((Project names + Projection))
Aggregating
Expression (Before GROUP BY)
Filter ((WHERE + (Change column names to column identifiers + (Project names + Projection))))
Aggregating
Expression (Before GROUP BY)
Expression ((WHERE + Change column names to column identifiers))
ReadFromMergeTree (gitlab_clickhouse_main_production.siphon_p_ci_pipelines)
Indexes:
PrimaryKey
Keys:
traversal_path
Condition: (traversal_path in ['1/9970/15846663/', '1/9970/15846663/'])
Parts: 15/19
Granules: 2110/1133224
Search Algorithm: binary search
Ranges: 15
References
- Feature issue: #598440
- Rollout issue: #601824
- Reference pattern: lib/click_house/finders/ci/finished_builds_deduplicated_finder.rb
Screenshots or screen recordings
N/A — no UI changes. The GraphQL response shape is unchanged.
How to set up and validate locally
- Apply the migrations and start the GDK so siphon_p_ci_pipelines exists locally (gdk update && gdk restart).
- Run the test suite to confirm both data paths pass:
  bin/rspec \
    spec/lib/click_house/finders/ci/siphon_pipelines_finder_spec.rb \
    spec/services/ci/collect_aggregate_pipeline_analytics_service_spec.rb \
    spec/services/ci/collect_time_series_pipeline_analytics_service_spec.rb \
    spec/requests/api/graphql/project/project_pipeline_analytics_spec.rb \
    spec/requests/api/graphql/group/group_pipeline_analytics_spec.rb
  Expected: 247 examples / 0 failures (each data-driven scenario runs once per flag state).
- End-to-end validation that seeds deterministic data into both the MV and siphon tables, then runs the service for both flag states and asserts the counts match. Paste the script into rails console:
  project = Project.first
  user = User.where(admin: true).first
  Gitlab::Auth::CurrentUserMode.bypass_session!(user.id)

  mv_path = project.project_namespace.traversal_path
  siphon_path = project.project_namespace.traversal_path(with_organization: true)

  window_start = Time.utc(2020, 1, 1)
  window_end = Time.utc(2020, 1, 8)
  base_id = 9_900_000

  fixtures = [
    [1, 'success', window_start + 1.day, 1800],
    [2, 'failed', window_start + 2.days, 600],
    [3, 'failed', window_start + 3.days, 3600],
    [4, 'canceled', window_start + 4.days, 60],
    [5, 'skipped', window_start + 5.days, 900]
  ]

  quote = ->(value) do
    case value
    when nil then 'NULL'
    when Integer, Float then value.to_s
    when TrueClass, FalseClass then value.to_s
    when Time, DateTime then "toDateTime64('#{value.utc.strftime('%Y-%m-%d %H:%M:%S')}', 6, 'UTC')"
    else "'#{value.to_s.gsub("'", "\\\\'")}'"
    end
  end

  insert_row = ->(table, row) do
    cols = row.keys.join(', ')
    vals = row.values.map { |v| quote.call(v) }.join(', ')
    ClickHouse::Client.execute("INSERT INTO #{table} (#{cols}) VALUES (#{vals})", :main)
  end

  # Idempotent re-run cleanup
  ClickHouse::Client.execute("ALTER TABLE ci_finished_pipelines DELETE WHERE id >= #{base_id} AND id < #{base_id + 100}", :main)
  ClickHouse::Client.execute("ALTER TABLE siphon_p_ci_pipelines DELETE WHERE id >= #{base_id} AND id < #{base_id + 100}", :main)

  fixtures.each do |id_offset, status, started_at, duration|
    finished_at = started_at + duration.seconds
    pipeline_id = base_id + id_offset

    insert_row.call('ci_finished_pipelines', {
      id: pipeline_id, path: mv_path,
      committed_at: started_at, created_at: started_at,
      started_at: started_at, finished_at: finished_at,
      duration: duration, status: status, source: 'push', ref: 'master'
    })

    insert_row.call('siphon_p_ci_pipelines', {
      id: pipeline_id, partition_id: 100, project_id: project.id,
      status: status, source: ::Ci::Pipeline.sources['push'], ref: 'master',
      duration: duration,
      committed_at: started_at, created_at: started_at,
      started_at: started_at, finished_at: finished_at,
      traversal_path: siphon_path,
      _siphon_replicated_at: Time.current, _siphon_deleted: false
    })
  end

  # Let ReplacingMergeTree absorb the inserts.
  ClickHouse::Client.execute('OPTIMIZE TABLE ci_finished_pipelines FINAL', :main)
  ClickHouse::Client.execute('OPTIMIZE TABLE siphon_p_ci_pipelines FINAL', :main)

  results = {}
  [false, true].each do |enabled|
    enabled ? Feature.enable(:pipeline_analytics_siphon, project) : Feature.disable(:pipeline_analytics_siphon, project)
    state = Feature.enabled?(:pipeline_analytics_siphon, project) ? 'ON' : 'OFF'

    resp = ::Ci::CollectAggregatePipelineAnalyticsService.new(
      current_user: user, container: project,
      from_time: window_start, to_time: window_end,
      status_groups: [:any, :success, :failed, :other]
    ).execute

    results[state] = resp.payload && resp.payload[:aggregate]
    puts "[siphon=#{state}] success=#{resp.success?} aggregate=#{results[state].inspect}"
  end
  Feature.disable(:pipeline_analytics_siphon, project)

  expected = { count: { any: 5, success: 1, failed: 2, other: 2 } }
  puts "expected: #{expected.inspect}"
  puts "match (siphon=OFF): #{results['OFF'] == expected}"
  puts "match (siphon=ON): #{results['ON'] == expected}"

  # Cleanup
  ClickHouse::Client.execute("ALTER TABLE ci_finished_pipelines DELETE WHERE id >= #{base_id} AND id < #{base_id + 100}", :main)
  ClickHouse::Client.execute("ALTER TABLE siphon_p_ci_pipelines DELETE WHERE id >= #{base_id} AND id < #{base_id + 100}", :main)
  Expected output (verified locally):
  [siphon=OFF] success=true aggregate={:count=>{:any=>5, :success=>1, :failed=>2, :other=>2}}
  [siphon=ON] success=true aggregate={:count=>{:any=>5, :success=>1, :failed=>2, :other=>2}}
  expected: {:count=>{:any=>5, :success=>1, :failed=>2, :other=>2}}
  match (siphon=OFF): true
  match (siphon=ON): true
MR acceptance checklist
Evaluate this MR against the MR acceptance checklist. It helps you analyze changes to reduce risks in quality, performance, reliability, security, and maintainability.