Add siphon-backed finder for PipelineAnalytics behind feature flag

What does this MR do and why?

Adds ClickHouse::Finders::Ci::SiphonPipelinesFinder as an alternative data path for the PipelineAnalytics GraphQL field (Project + Group). The new finder reads from siphon_p_ci_pipelines (Siphon-replicated copy of p_ci_pipelines) using an argMax(_siphon_replicated_at) dedup subquery — same pattern as FinishedBuildsDeduplicatedFinder.

The path is selected at runtime by a single feature flag check in CollectPipelineAnalyticsServiceBase#clickhouse_model:

  • pipeline_analytics_siphon disabled (default): existing ci_finished_pipelines_{hourly,daily} MV path. No behaviour change.
  • pipeline_analytics_siphon enabled: new siphon finder.
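
The runtime selection described above can be pictured with a minimal, self-contained sketch. The `Feature` module below is a hypothetical stand-in for GitLab's feature-flag API so the snippet runs outside Rails. `clickhouse_data_path` and its return labels are illustrative names, not the actual body of CollectPipelineAnalyticsServiceBase#clickhouse_model.

```ruby
# Hypothetical stand-in for GitLab's Feature API so the sketch runs on its own.
module Feature
  @enabled = {}

  def self.enable(flag)
    @enabled[flag] = true
  end

  def self.disable(flag)
    @enabled[flag] = false
  end

  def self.enabled?(flag)
    @enabled.fetch(flag, false) # flags default to disabled
  end
end

# Hypothetical selector mirroring the single flag check. The returned
# symbols are labels for the two data paths, not real class names.
def clickhouse_data_path
  if Feature.enabled?(:pipeline_analytics_siphon)
    :siphon_pipelines_finder
  else
    :finished_pipelines_mv
  end
end

puts clickhouse_data_path # flag is disabled by default, so this prints the MV label
```

Keeping the branch in one place means removing the flag later is a one-line change, assuming the real method is structured similarly.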

Once the flag reaches 100% and is removed, the bespoke Ci::ClickHouse::DataIngestion::FinishedPipelinesSyncService sync pipeline becomes redundant. ⚠️ That removal can happen only once Siphon is rolled out to, and stable on, self-managed and Dedicated.

Notable differences from the MV path

  • Dedup: argMax(_siphon_replicated_at) inner subquery + _siphon_deleted = false outer filter. No FINAL.
  • Bucketing: filters on raw started_at instead of the hour-truncated started_at_bucket. This is a deliberate accuracy gain — boundary pipelines that started a few seconds before a window edge are now included.
  • source: siphon stores source as Nullable(Int64) enum, so the finder translates via Ci::Pipeline.sources[symbol] before pushing the filter.
  • traversal_path: siphon includes the leading organization_id segment (e.g. 1/9970/19/); the finder uses Namespace#traversal_path(with_organization: true).
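
To make the dedup semantics in the first bullet concrete, here is a small in-memory sketch in plain Ruby. It is an illustration of the idea only, not the finder's code: the `Row` struct and `latest_live_rows` are hypothetical names, and the real work happens in ClickHouse via argMax and GROUP BY.

```ruby
# Hypothetical row shape; only the columns needed for the illustration.
Row = Struct.new(:id, :partition_id, :status, :replicated_at, :deleted, keyword_init: true)

# Emulates the inner argMax(..., _siphon_replicated_at) GROUP BY id, partition_id
# subquery, followed by the outer _siphon_deleted = false filter.
def latest_live_rows(rows)
  rows
    .group_by { |row| [row.id, row.partition_id] }
    .map { |_key, versions| versions.max_by(&:replicated_at) } # newest version wins
    .reject(&:deleted)                                         # drop soft-deleted rows
end

rows = [
  Row.new(id: 1, partition_id: 100, status: 'running', replicated_at: Time.utc(2026, 5, 4, 10, 0, 0), deleted: false),
  Row.new(id: 1, partition_id: 100, status: 'success', replicated_at: Time.utc(2026, 5, 4, 10, 5, 0), deleted: false),
  Row.new(id: 2, partition_id: 100, status: 'failed',  replicated_at: Time.utc(2026, 5, 4, 11, 0, 0), deleted: false),
  Row.new(id: 2, partition_id: 100, status: 'failed',  replicated_at: Time.utc(2026, 5, 4, 11, 30, 0), deleted: true)
]

puts latest_live_rows(rows).map { |row| [row.id, row.status] }.inspect
# prints [[1, "success"]]
```

The deleted filter has to run after the newest version is chosen. Filtering first would resurrect pipeline 2 from its older, non-deleted version, which is why `_siphon_deleted = false` sits in the outer query.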

Test coverage

All four spec files that exercise the analytics services now run the data-driven shared examples twice — once with the flag disabled and once with it enabled — so neither code path can silently regress during the rollout:

  • spec/services/ci/collect_aggregate_pipeline_analytics_service_spec.rb
  • spec/services/ci/collect_time_series_pipeline_analytics_service_spec.rb
  • spec/requests/api/graphql/project/project_pipeline_analytics_spec.rb
  • spec/requests/api/graphql/group/group_pipeline_analytics_spec.rb

spec/lib/click_house/finders/ci/siphon_pipelines_finder_spec.rb covers the finder directly with both SQL-shape assertions and execution-based tests (single insert, soft-delete, argMax dedup with two versions, source-enum translation). Coverage: 100% lines, 100% branches.

ClickHouse Query Plans

1. Aggregate count, quantile (project, 30 days)
SELECT
  count() AS `Total pipeline runs`,
  quantile(0.5) (`pipelines`.`duration`) AS `Median duration`,
  countIf(status == 'success') AS `Success Count`,
  countIf(status == 'failed') AS `Failed Count`
FROM (
  SELECT
    `siphon_p_ci_pipelines`.`id`,
    `siphon_p_ci_pipelines`.`partition_id`,
    argMax(`siphon_p_ci_pipelines`.`traversal_path`,  `siphon_p_ci_pipelines`.`_siphon_replicated_at`) AS traversal_path,
    argMax(`siphon_p_ci_pipelines`.`status`,          `siphon_p_ci_pipelines`.`_siphon_replicated_at`) AS status,
    argMax(`siphon_p_ci_pipelines`.`source`,          `siphon_p_ci_pipelines`.`_siphon_replicated_at`) AS source,
    argMax(`siphon_p_ci_pipelines`.`ref`,             `siphon_p_ci_pipelines`.`_siphon_replicated_at`) AS ref,
    argMax(`siphon_p_ci_pipelines`.`started_at`,      `siphon_p_ci_pipelines`.`_siphon_replicated_at`) AS started_at,
    argMax(`siphon_p_ci_pipelines`.`finished_at`,     `siphon_p_ci_pipelines`.`_siphon_replicated_at`) AS finished_at,
    argMax(`siphon_p_ci_pipelines`.`duration`,        `siphon_p_ci_pipelines`.`_siphon_replicated_at`) AS duration,
    argMax(`siphon_p_ci_pipelines`.`_siphon_deleted`, `siphon_p_ci_pipelines`.`_siphon_replicated_at`) AS _siphon_deleted
  FROM `siphon_p_ci_pipelines`
  WHERE `siphon_p_ci_pipelines`.`traversal_path` = '1/9970/15846663/'
  GROUP BY id, partition_id
) pipelines
WHERE `pipelines`.`started_at`     >= toDateTime64('2026-05-04 00:00:00', 6, 'UTC')
  AND `pipelines`.`started_at`     <  toDateTime64('2026-06-03 00:00:00', 6, 'UTC')
  AND `pipelines`.`_siphon_deleted` = 'false';
Elapsed: 0.837s
Read: 4,342,475 rows (269.88 MB)

-- Query Plan (EXPLAIN indexes = 1)
Expression ((Project names + Projection))
Aggregating
Expression (Before GROUP BY)
Filter ((WHERE + (Change column names to column identifiers + (Project names + Projection))))
Aggregating
Expression (Before GROUP BY)
Expression ((WHERE + Change column names to column identifiers))
ReadFromMergeTree (gitlab_clickhouse_main_production.siphon_p_ci_pipelines)
Indexes:
PrimaryKey
Keys:
traversal_path
Condition: (traversal_path in ['1/9970/15846663/', '1/9970/15846663/'])
Parts: 18/21
Granules: 2111/1133274
Search Algorithm: binary search
Ranges: 18

2. Time series count, quantile (project, 30 days)
SELECT
  dateTrunc('day', `pipelines`.`started_at`, 'UTC') AS timestamp,
  count() AS `Total pipeline runs`,
  countIf(status == 'success') AS `Success Count`,
  countIf(status == 'failed') AS `Failed Count`,
  quantile(0.5) (`pipelines`.`duration`) AS p50,
  quantile(0.95)(`pipelines`.`duration`) AS p95
FROM (
  SELECT
    `siphon_p_ci_pipelines`.`id`,
    `siphon_p_ci_pipelines`.`partition_id`,
    argMax(`siphon_p_ci_pipelines`.`traversal_path`,  `siphon_p_ci_pipelines`.`_siphon_replicated_at`) AS traversal_path,
    argMax(`siphon_p_ci_pipelines`.`status`,          `siphon_p_ci_pipelines`.`_siphon_replicated_at`) AS status,
    argMax(`siphon_p_ci_pipelines`.`source`,          `siphon_p_ci_pipelines`.`_siphon_replicated_at`) AS source,
    argMax(`siphon_p_ci_pipelines`.`ref`,             `siphon_p_ci_pipelines`.`_siphon_replicated_at`) AS ref,
    argMax(`siphon_p_ci_pipelines`.`started_at`,      `siphon_p_ci_pipelines`.`_siphon_replicated_at`) AS started_at,
    argMax(`siphon_p_ci_pipelines`.`finished_at`,     `siphon_p_ci_pipelines`.`_siphon_replicated_at`) AS finished_at,
    argMax(`siphon_p_ci_pipelines`.`duration`,        `siphon_p_ci_pipelines`.`_siphon_replicated_at`) AS duration,
    argMax(`siphon_p_ci_pipelines`.`_siphon_deleted`, `siphon_p_ci_pipelines`.`_siphon_replicated_at`) AS _siphon_deleted
  FROM `siphon_p_ci_pipelines`
  WHERE `siphon_p_ci_pipelines`.`traversal_path` = '1/9970/15846663/'
  GROUP BY id, partition_id
) pipelines
WHERE `pipelines`.`started_at`     >= toDateTime64('2026-05-04 00:00:00', 6, 'UTC')
  AND `pipelines`.`started_at`     <  toDateTime64('2026-06-03 00:00:00', 6, 'UTC')
  AND `pipelines`.`_siphon_deleted` = 'false'
GROUP BY timestamp
ORDER BY timestamp;
Elapsed: 2.291s
Read: 4,333,448 rows (269.12 MB)

-- Query Plan (EXPLAIN indexes = 1)
Expression (Project names)
Sorting (Sorting for ORDER BY)
Expression ((Before ORDER BY + Projection))
Aggregating
Expression (Before GROUP BY)
Filter ((WHERE + (Change column names to column identifiers + (Project names + Projection))))
Aggregating
Expression (Before GROUP BY)
Expression ((WHERE + Change column names to column identifiers))
ReadFromMergeTree (gitlab_clickhouse_main_production.siphon_p_ci_pipelines)
Indexes:
PrimaryKey
Keys:
traversal_path
Condition: (traversal_path in ['1/9970/15846663/', '1/9970/15846663/'])
Parts: 15/15
Granules: 2109/1133204
Search Algorithm: binary search
Ranges: 15

3. Aggregate count, quantile with filters (project, 30 days)
SELECT
  count() AS `Total pipeline runs`,
  quantile(0.5) (`pipelines`.`duration`) AS `Median duration`,
  countIf(status == 'success') AS `Success Count`,
  countIf(status == 'failed') AS `Failed Count`
FROM (
  SELECT
    `siphon_p_ci_pipelines`.`id`,
    `siphon_p_ci_pipelines`.`partition_id`,
    argMax(`siphon_p_ci_pipelines`.`traversal_path`,  `siphon_p_ci_pipelines`.`_siphon_replicated_at`) AS traversal_path,
    argMax(`siphon_p_ci_pipelines`.`status`,          `siphon_p_ci_pipelines`.`_siphon_replicated_at`) AS status,
    argMax(`siphon_p_ci_pipelines`.`source`,          `siphon_p_ci_pipelines`.`_siphon_replicated_at`) AS source,
    argMax(`siphon_p_ci_pipelines`.`ref`,             `siphon_p_ci_pipelines`.`_siphon_replicated_at`) AS ref,
    argMax(`siphon_p_ci_pipelines`.`started_at`,      `siphon_p_ci_pipelines`.`_siphon_replicated_at`) AS started_at,
    argMax(`siphon_p_ci_pipelines`.`finished_at`,     `siphon_p_ci_pipelines`.`_siphon_replicated_at`) AS finished_at,
    argMax(`siphon_p_ci_pipelines`.`duration`,        `siphon_p_ci_pipelines`.`_siphon_replicated_at`) AS duration,
    argMax(`siphon_p_ci_pipelines`.`_siphon_deleted`, `siphon_p_ci_pipelines`.`_siphon_replicated_at`) AS _siphon_deleted
  FROM `siphon_p_ci_pipelines`
  WHERE `siphon_p_ci_pipelines`.`traversal_path` = '1/9970/15846663/'
  GROUP BY id, partition_id
) pipelines
WHERE `pipelines`.`started_at`     >= toDateTime64('2026-05-04 00:00:00', 6, 'UTC')
  AND `pipelines`.`started_at`     <  toDateTime64('2026-06-03 00:00:00', 6, 'UTC')
  AND `pipelines`.`source`          = 1                                            -- 1 = 'push' (Ci::Pipeline.sources)
  AND `pipelines`.`ref`             = 'master'
  AND `pipelines`.`status`         IN ('success', 'failed', 'canceled', 'skipped')
  AND `pipelines`.`_siphon_deleted` = 'false';
Elapsed: 0.753s
Read: 4,337,884 rows (455.74 MB)

-- Query Plan (EXPLAIN indexes = 1)
Expression ((Project names + Projection))
Aggregating
Expression (Before GROUP BY)
Filter ((WHERE + (Change column names to column identifiers + (Project names + Projection))))
Aggregating
Expression (Before GROUP BY)
Expression ((WHERE + Change column names to column identifiers))
ReadFromMergeTree (gitlab_clickhouse_main_production.siphon_p_ci_pipelines)
Indexes:
PrimaryKey
Keys:
traversal_path
Condition: (traversal_path in ['1/9970/15846663/', '1/9970/15846663/'])
Parts: 15/19
Granules: 2110/1133224
Search Algorithm: binary search
Ranges: 15
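
As a rough reading aid for the three plans, the share of granules selected by the traversal_path primary-key condition can be worked out from the Granules lines. The sketch below only does that arithmetic; the granule counts are copied from the EXPLAIN outputs above, and `selected_percent` is a hypothetical helper name.

```ruby
# Granule counts copied from the three EXPLAIN outputs: [selected, total].
plans = {
  'aggregate'              => [2111, 1_133_274],
  'time series'            => [2109, 1_133_204],
  'aggregate with filters' => [2110, 1_133_224]
}

# Percentage of granules the primary-key condition leaves to be read.
def selected_percent(selected, total)
  (selected.to_f / total * 100).round(2)
end

plans.each do |name, (selected, total)|
  puts format('%-24s %.2f%% of granules selected', name, selected_percent(selected, total))
end
```

In all three cases the primary key narrows the scan to roughly 0.19% of the table's granules. The started_at, source, ref, and status filters are applied after the dedup subquery, so they do not reduce the rows read.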

Screenshots or screen recordings

N/A — no UI changes. The GraphQL response shape is unchanged.

How to set up and validate locally

  1. Apply the migrations and start the GDK so siphon_p_ci_pipelines exists locally (gdk update && gdk restart).

  2. Run the test suite to confirm both data paths pass:

    bin/rspec \
      spec/lib/click_house/finders/ci/siphon_pipelines_finder_spec.rb \
      spec/services/ci/collect_aggregate_pipeline_analytics_service_spec.rb \
      spec/services/ci/collect_time_series_pipeline_analytics_service_spec.rb \
      spec/requests/api/graphql/project/project_pipeline_analytics_spec.rb \
      spec/requests/api/graphql/group/group_pipeline_analytics_spec.rb

    Expected: 247 examples / 0 failures (each data-driven scenario runs once per flag state).

  3. Run an end-to-end validation that seeds deterministic data into both the MV and siphon tables, runs the service under both flag states, and asserts that the counts match. Paste the following script into a Rails console:

    project = Project.first
    user    = User.where(admin: true).first
    Gitlab::Auth::CurrentUserMode.bypass_session!(user.id)
    
    mv_path     = project.project_namespace.traversal_path
    siphon_path = project.project_namespace.traversal_path(with_organization: true)
    
    window_start = Time.utc(2020, 1, 1)
    window_end   = Time.utc(2020, 1, 8)
    base_id      = 9_900_000
    
    fixtures = [
      [1, 'success',  window_start + 1.day,  1800],
      [2, 'failed',   window_start + 2.days, 600],
      [3, 'failed',   window_start + 3.days, 3600],
      [4, 'canceled', window_start + 4.days, 60],
      [5, 'skipped',  window_start + 5.days, 900]
    ]
    
    quote = ->(value) do
      case value
      when nil                   then 'NULL'
      when Integer, Float        then value.to_s
      when TrueClass, FalseClass then value.to_s
      when Time, DateTime        then "toDateTime64('#{value.utc.strftime('%Y-%m-%d %H:%M:%S')}', 6, 'UTC')"
      else "'#{value.to_s.gsub("'", "\\\\'")}'"
      end
    end
    
    insert_row = ->(table, row) do
      cols = row.keys.join(', ')
      vals = row.values.map { |v| quote.call(v) }.join(', ')
      ClickHouse::Client.execute("INSERT INTO #{table} (#{cols}) VALUES (#{vals})", :main)
    end
    
    # Idempotent re-run cleanup
    ClickHouse::Client.execute("ALTER TABLE ci_finished_pipelines DELETE WHERE id >= #{base_id} AND id < #{base_id + 100}", :main)
    ClickHouse::Client.execute("ALTER TABLE siphon_p_ci_pipelines DELETE WHERE id >= #{base_id} AND id < #{base_id + 100}", :main)
    
    fixtures.each do |id_offset, status, started_at, duration|
      finished_at = started_at + duration.seconds
      pipeline_id = base_id + id_offset
    
      insert_row.call('ci_finished_pipelines', {
        id: pipeline_id, path: mv_path, committed_at: started_at, created_at: started_at,
        started_at: started_at, finished_at: finished_at, duration: duration,
        status: status, source: 'push', ref: 'master'
      })
    
      insert_row.call('siphon_p_ci_pipelines', {
        id: pipeline_id, partition_id: 100, project_id: project.id, status: status,
        source: ::Ci::Pipeline.sources['push'], ref: 'master', duration: duration,
        committed_at: started_at, created_at: started_at, started_at: started_at,
        finished_at: finished_at, traversal_path: siphon_path,
        _siphon_replicated_at: Time.current, _siphon_deleted: false
      })
    end
    
    # Let ReplacingMergeTree absorb the inserts.
    ClickHouse::Client.execute('OPTIMIZE TABLE ci_finished_pipelines FINAL', :main)
    ClickHouse::Client.execute('OPTIMIZE TABLE siphon_p_ci_pipelines FINAL', :main)
    
    results = {}
    [false, true].each do |enabled|
      enabled ? Feature.enable(:pipeline_analytics_siphon, project) : Feature.disable(:pipeline_analytics_siphon, project)
      state = Feature.enabled?(:pipeline_analytics_siphon, project) ? 'ON' : 'OFF'
      resp  = ::Ci::CollectAggregatePipelineAnalyticsService.new(
        current_user: user, container: project,
        from_time: window_start, to_time: window_end,
        status_groups: [:any, :success, :failed, :other]
      ).execute
      results[state] = resp.payload && resp.payload[:aggregate]
      puts "[siphon=#{state}] success=#{resp.success?} aggregate=#{results[state].inspect}"
    end
    
    Feature.disable(:pipeline_analytics_siphon, project)
    
    expected = { count: { any: 5, success: 1, failed: 2, other: 2 } }
    puts "expected:           #{expected.inspect}"
    puts "match (siphon=OFF): #{results['OFF'] == expected}"
    puts "match (siphon=ON):  #{results['ON']  == expected}"
    
    # Cleanup
    ClickHouse::Client.execute("ALTER TABLE ci_finished_pipelines DELETE WHERE id >= #{base_id} AND id < #{base_id + 100}", :main)
    ClickHouse::Client.execute("ALTER TABLE siphon_p_ci_pipelines DELETE WHERE id >= #{base_id} AND id < #{base_id + 100}", :main)

    Expected output (verified locally):

    [siphon=OFF] success=true aggregate={:count=>{:any=>5, :success=>1, :failed=>2, :other=>2}}
    [siphon=ON]  success=true aggregate={:count=>{:any=>5, :success=>1, :failed=>2, :other=>2}}
    expected:           {:count=>{:any=>5, :success=>1, :failed=>2, :other=>2}}
    match (siphon=OFF): true
    match (siphon=ON):  true

MR acceptance checklist

Evaluate this MR against the MR acceptance checklist. It helps you analyze changes to reduce risks in quality, performance, reliability, security, and maintainability.

Edited by Narendran
