Geo: Add calculator service and worker for job artifact verification summaries

What does this MR do and why?

Add a calculator service and worker for job artifact verification summaries

This MR is part 2 of a series to replace batch_count verification metrics with pre-calculated summaries:

  • MR1 (merged): Created geo_ci_job_artifact_verification_summaries table
  • MR2 (this): Calculator service and worker (FF-gated, no-op until data exists)
  • MR3 (planned): Functional index on ci_job_artifact_states, post-deploy migration to populate 100k summary rows, DB trigger to mark buckets dirty on changes
  • MR4 (planned): Replicator switch to read from summaries instead of batch_count
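
The series relies on a fixed bucketing scheme: each `ci_job_artifact_states` row maps to one of 100,000 summary buckets via `job_artifact_id % 100_000` (the expression used in Query 3 below). As a plain-Ruby illustration of that mapping:

```ruby
# Each artifact state row maps to one of 100,000 summary buckets.
# Illustrative only -- the real mapping lives in the SQL of Query 3.
BUCKET_COUNT = 100_000

def bucket_for(job_artifact_id)
  job_artifact_id % BUCKET_COUNT
end

bucket_for(42)        # => 42
bucket_for(100_000)   # => 0
bucket_for(1_234_567) # => 34567
```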

How to set up and validate locally

This MR adds a calculator service and worker behind a disabled feature flag. On deploy, nothing changes — the worker exits immediately when the FF is off.
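
The guard pattern can be sketched in plain Ruby (illustrative only, not the actual worker code; the real worker checks `Feature.enabled?(:geo_job_artifact_verification_summaries)`):

```ruby
# Sketch of the early-return guard. Class and method names are
# illustrative, not copied from the MR.
class CalculatorWorkerSketch
  def initialize(feature_enabled:)
    @feature_enabled = feature_enabled
  end

  def perform
    return :noop unless @feature_enabled # FF off => exit immediately

    :calculated # would invoke the calculator service here
  end
end

CalculatorWorkerSketch.new(feature_enabled: false).perform # => :noop
CalculatorWorkerSketch.new(feature_enabled: true).perform  # => :calculated
```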

To verify the worker and service work correctly:

  1. Ensure the summaries table exists (created in MR1):

    Geo::CiJobArtifactVerificationSummary.count
    # => 0
  2. Create some test data in Rails console:

    # Create a dirty summary bucket
    Geo::CiJobArtifactVerificationSummary.create!(
      bucket_number: 0,
      state: :dirty,
      state_changed_at: Time.current
    )
  3. Verify the calculator service processes it:

    result = Geo::CiJobArtifactVerificationSummaryCalculatorService.new.execute
    result[:buckets_calculated]
    # => 1
    
    summary = Geo::CiJobArtifactVerificationSummary.find_by!(bucket_number: 0)
    summary.state
    # => "clean"
    summary.total_count
    # => 0 (no matching artifact states in bucket 0)
  4. Verify the worker respects the feature flag:

    # FF is disabled by default — worker is a no-op
    Feature.enabled?(:geo_job_artifact_verification_summaries)
    # => false
    
    Geo::CiJobArtifactVerificationSummaryCalculatorWorker.new.perform
    # Returns immediately, nothing processed
    
    # Enable the FF
    Feature.enable(:geo_job_artifact_verification_summaries)
    
    # Create another dirty bucket
    Geo::CiJobArtifactVerificationSummary.create!(
      bucket_number: 1,
      state: :dirty,
      state_changed_at: Time.current
    )
    
    Geo::CiJobArtifactVerificationSummaryCalculatorWorker.new.perform
    Geo::CiJobArtifactVerificationSummary.find_by!(bucket_number: 1).state
    # => "clean"
  5. Run the specs:

    bundle exec rspec ee/spec/services/geo/ci_job_artifact_verification_summary_calculator_service_spec.rb ee/spec/workers/geo/ci_job_artifact_verification_summary_calculator_worker_spec.rb

Database Review

Database review notes:

  • Geo is not active on GitLab.com (only GitLab Dedicated and Self-Managed), so Geo tables have no meaningful production data in the postgres.ai snapshot.
  • geo_ci_job_artifact_verification_summaries was created in a prior MR that has not yet reached the postgres.ai snapshot. It was created via DDL and seeded with 100k rows (45k clean, 50k dirty, 5k calculating) to produce realistic query plans. exec ANALYZE was run after seeding.
  • ci_job_artifact_states was seeded with 10M rows (70% verified, 20% pending, 10% failed) to simulate production-scale data. FK constraint was temporarily dropped for seeding. exec ANALYZE was run after seeding.
  • All queries are behind the geo_job_artifact_verification_summaries feature flag (disabled by default). The summaries table won't be populated until a follow-up MR adds a post-deploy backfill migration.
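
As a quick sanity check, the seeded distributions above translate to these row counts (simple arithmetic, not taken from the MR):

```ruby
# Row counts implied by the seeding percentages described above.
summaries_total = 100_000
artifact_states_total = 10_000_000

summary_seed = { clean: 45_000, dirty: 50_000, calculating: 5_000 }
artifact_seed = {
  verified: artifact_states_total * 70 / 100, # 7_000_000
  pending:  artifact_states_total * 20 / 100, # 2_000_000
  failed:   artifact_states_total * 10 / 100  # 1_000_000
}

raise unless summary_seed.values.sum == summaries_total
raise unless artifact_seed.values.sum == artifact_states_total
```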
| Query | Link | Execution time |
| --- | --- | --- |
| Q1: bucket_numbers_needing_recalculation | 149417 | ~0.1ms |
| Q2: claim_buckets | 149945 | ~22ms |
| Q3: count_dirty_buckets (no index) | 149419 | 1.98s |
| Q3: count_dirty_buckets (with index) | 149423 | 10.6ms |
| Q4: update_summary_counts | 149952 | ~1.8ms |

Query 1: bucket_numbers_needing_recalculation

Finds dirty buckets and any calculating buckets stuck longer than 10 minutes (crash recovery). Uses the partial index on state IN (1, 2).

CiJobArtifactVerificationSummary
  .needs_recalculation(10.minutes)
  .state_changed_asc
  .limit(100)
  .pluck(:bucket_number)
SELECT "geo_ci_job_artifact_verification_summaries"."bucket_number"
FROM "geo_ci_job_artifact_verification_summaries"
WHERE ("geo_ci_job_artifact_verification_summaries"."state" = 1
  OR ("geo_ci_job_artifact_verification_summaries"."state" = 2
    AND "geo_ci_job_artifact_verification_summaries"."state_changed_at" < now() - interval '10 minutes'))
ORDER BY "geo_ci_job_artifact_verification_summaries"."state_changed_at" ASC
LIMIT 100;

Query plan: https://postgres.ai/console/gitlab/gitlab-production-ci/sessions/50334/commands/149417
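
The eligibility predicate ("dirty, or calculating and stuck for more than 10 minutes") can be mirrored in plain Ruby (illustrative, not the actual scope implementation):

```ruby
CALCULATING_TIMEOUT = 10 * 60 # seconds

# Mirrors the WHERE clause above: state 1 = dirty, state 2 = calculating.
def needs_recalculation?(state, state_changed_at, now: Time.now)
  state == :dirty ||
    (state == :calculating && state_changed_at < now - CALCULATING_TIMEOUT)
end

now = Time.now
needs_recalculation?(:dirty, now, now: now)                 # => true
needs_recalculation?(:calculating, now - 11 * 60, now: now) # => true (stuck)
needs_recalculation?(:calculating, now - 60, now: now)      # => false
needs_recalculation?(:clean, now, now: now)                 # => false
```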

Query 2: claim_buckets (mark_as_calculating)

Atomically selects dirty/stuck buckets and transitions them to calculating state in a single query. Replaces the previous separate select + update approach.

table = CiJobArtifactVerificationSummary.table_name

eligible_sql = CiJobArtifactVerificationSummary
  .needs_recalculation(CALCULATING_TIMEOUT)
  .state_changed_asc
  .limit(DIRTY_BUCKET_LIMIT)
  .select(:bucket_number)
  .to_sql

connection.exec_query(<<~SQL)
  UPDATE #{table} AS summaries
  SET state = 2, state_changed_at = now()
  FROM (#{eligible_sql}) AS eligible
  WHERE summaries.bucket_number = eligible.bucket_number
  RETURNING summaries.bucket_number
SQL

UPDATE geo_ci_job_artifact_verification_summaries AS summaries
SET state = 2, state_changed_at = now()
FROM (
  SELECT "geo_ci_job_artifact_verification_summaries"."bucket_number"
  FROM "geo_ci_job_artifact_verification_summaries"
  WHERE ("geo_ci_job_artifact_verification_summaries"."state" = 1
    OR ("geo_ci_job_artifact_verification_summaries"."state" = 2
      AND "geo_ci_job_artifact_verification_summaries"."state_changed_at" < now() - interval '10 minutes'))
  ORDER BY "geo_ci_job_artifact_verification_summaries"."state_changed_at" ASC
  LIMIT 100
) AS eligible
WHERE summaries.bucket_number = eligible.bucket_number
RETURNING summaries.bucket_number;

Query plan: https://console.postgres.ai/gitlab/gitlab-production-ci/sessions/50521/commands/149945

Query 3: count_dirty_buckets (critical — hits large table)

Grouped COUNT with conditional aggregation against ci_job_artifact_states.

JobArtifactState
  .where("job_artifact_id % ? IN (?)", 100_000, bucket_numbers)
  .select(
    Arel.sql("job_artifact_id % 100000 AS bucket_number"),
    Arel.sql("COUNT(*) AS total_count"),
    Arel.sql("COUNT(*) FILTER (WHERE verification_state = 2) AS verified_count"),
    Arel.sql("COUNT(*) FILTER (WHERE verification_state = 3) AS failed_count")
  )
  .group(Arel.sql("job_artifact_id % 100000"))
SELECT job_artifact_id % 100000 AS bucket_number,
  COUNT(*) AS total_count,
  COUNT(*) FILTER (WHERE verification_state = 2) AS verified_count,
  COUNT(*) FILTER (WHERE verification_state = 3) AS failed_count
FROM ci_job_artifact_states
WHERE job_artifact_id % 100000 IN (0,1,2,...,99)
GROUP BY job_artifact_id % 100000;
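
The conditional aggregation in Q3 (`COUNT(*) FILTER (WHERE ...)`) is equivalent to this in-memory reduction (made-up rows, not the seeded snapshot):

```ruby
# Each row: [job_artifact_id, verification_state]; 2 = verified, 3 = failed.
rows = [[100_000, 2], [200_000, 2], [300_000, 3], [100_001, 2], [200_001, 1]]

summaries = rows.group_by { |id, _| id % 100_000 }.map do |bucket, bucket_rows|
  {
    bucket_number: bucket,
    total_count: bucket_rows.size,
    verified_count: bucket_rows.count { |_, state| state == 2 },
    failed_count: bucket_rows.count { |_, state| state == 3 }
  }
end
# Bucket 0 holds three rows (two verified, one failed);
# bucket 1 holds two rows (one verified, none failed).
```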

Without functional index (current state): https://postgres.ai/console/gitlab/gitlab-production-ci/sessions/50334/commands/149419 (1.98s, Seq Scan on 10M rows)

With functional index ((job_artifact_id % 100000)) (planned for follow-up MR): https://postgres.ai/console/gitlab/gitlab-production-ci/sessions/50334/commands/149423 (10.6ms, Bitmap Index Scan, ~186x improvement)

Note: The functional index ((job_artifact_id % 100000)) on ci_job_artifact_states is planned for a follow-up MR. This MR is gated behind a disabled feature flag and the summaries table won't be populated until after the index is in place.
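
For context, the follow-up migration might look roughly like this. This is a hedged sketch only: the migration class name, index name, and milestone are assumptions, not taken from the MR; only the index expression comes from the query plans above.

```ruby
# Hypothetical post-deploy migration for the planned functional index.
class AddBucketNumberIndexToCiJobArtifactStates < Gitlab::Database::Migration[2.2]
  disable_ddl_transaction!

  INDEX_NAME = 'index_ci_job_artifact_states_on_bucket_number'

  def up
    add_concurrent_index :ci_job_artifact_states, '(job_artifact_id % 100000)',
      name: INDEX_NAME
  end

  def down
    remove_concurrent_index_by_name :ci_job_artifact_states, INDEX_NAME
  end
end
```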

Query 4: update_summary_counts

Writes recalculated counts for all claimed buckets in a single batched query. Only updates buckets still in calculating state to avoid overwriting buckets re-dirtied by the DB trigger during calculation.

values = @bucket_numbers.map do |bucket_number|
  counts = @counts_by_bucket.fetch(bucket_number, { total: 0, verified: 0, failed: 0 })
  "(#{bucket_number}, #{counts[:total]}, #{counts[:verified]}, #{counts[:failed]})"
end

connection.execute(<<~SQL)
  UPDATE #{table} AS summaries
  SET total_count = data.total, verified_count = data.verified,
      failed_count = data.failed, state = 0, state_changed_at = now(),
      last_calculated_at = now(), updated_at = now()
  FROM (VALUES #{values.join(', ')}) AS data(bucket_number, total, verified, failed)
  WHERE summaries.bucket_number = data.bucket_number
    AND summaries.state = 2
SQL
UPDATE geo_ci_job_artifact_verification_summaries AS summaries
SET total_count = data.total, verified_count = data.verified,
    failed_count = data.failed, state = 0, state_changed_at = now(),
    last_calculated_at = now(), updated_at = now()
FROM (VALUES (95000,1000,500,167),(95001,980,490,163),(95002,1020,510,170),...,
  (95099,1000,500,167)) AS data(bucket_number, total, verified, failed)
WHERE summaries.bucket_number = data.bucket_number
  AND summaries.state = 2;

Query plan: https://console.postgres.ai/gitlab/gitlab-production-ci/sessions/50521/commands/149952
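
The "only update buckets still calculating" guard in Q4 can be illustrated with a plain-Ruby simulation (made-up data):

```ruby
# Simulated summaries table: a :calculating bucket accepts the new counts;
# a bucket re-dirtied by the trigger mid-calculation is skipped.
summaries = {
  0 => { state: :calculating, total_count: 0 },
  1 => { state: :dirty, total_count: 0 } # re-dirtied during calculation
}
new_counts = { 0 => 120, 1 => 80 }

new_counts.each do |bucket, total|
  row = summaries[bucket]
  next unless row[:state] == :calculating # mirrors "AND summaries.state = 2"

  row[:total_count] = total
  row[:state] = :clean
end

summaries
# => bucket 0 becomes :clean with total_count 120;
#    bucket 1 stays :dirty with total_count 0
```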

MR acceptance checklist

Evaluate this MR against the MR acceptance checklist. It helps you analyze changes to reduce risks in quality, performance, reliability, security, and maintainability.

Related to #590853 (closed)

Edited by Scott Murray
