Integrate Ci::Pipelines::AutoCleanupService in DestroyOldPipelinesWorker

What does this MR do and why?

Integrates Ci::Pipelines::AutoCleanupService into DestroyOldPipelinesWorker to fix statement timeouts when deleting old pipelines.

This MR builds on !211726 (merged) by enabling the optimized pipeline deletion service in the worker behind the ci_optimized_old_pipelines_query feature flag. The integration:

  • Uses the new AutoCleanupService when the feature flag is enabled
  • Falls back to the legacy implementation when disabled
  • Re-enqueues the project when either skipped_pipelines_size or destroyed_pipelines_size exceeds 100
  • Logs both destroyed and skipped pipeline counts for monitoring
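
The re-enqueue rule in the list above can be sketched in plain Ruby. This is an illustration only: RE_ENQUEUE_THRESHOLD matches the name used in the setup comments below, but the re_enqueue? helper and its keyword arguments are hypothetical and are not taken from the worker's code.

```ruby
# Hypothetical sketch of the re-enqueue decision; not the worker's actual code.
RE_ENQUEUE_THRESHOLD = 100

# Returns true when either count exceeds the threshold, which suggests
# that more old pipelines are still waiting for this project.
def re_enqueue?(destroyed_pipelines_size:, skipped_pipelines_size:)
  destroyed_pipelines_size > RE_ENQUEUE_THRESHOLD ||
    skipped_pipelines_size > RE_ENQUEUE_THRESHOLD
end

puts re_enqueue?(destroyed_pipelines_size: 150, skipped_pipelines_size: 0) # true
puts re_enqueue?(destroyed_pipelines_size: 30, skipped_pipelines_size: 0)  # false
```

The comparison is strict, so a run that destroys exactly 100 pipelines would not trigger a re-enqueue under this reading of the rule.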

References

#572278 (closed)

Parent MR: !211726 (merged)

Screenshots or screen recordings

N/A

How to set up and validate locally

1. Setup - Create Project and Pipelines

  # Create a project with 90-day retention period
  timestamp = Time.now.to_i
  unique_path = "test-pipeline-delete-#{timestamp}"

  namespace = FactoryBot.create(:namespace, path: unique_path)

  project = FactoryBot.create(
    :project,
    name: unique_path,
    path: unique_path,
    namespace: namespace,
    ci_delete_pipelines_in_seconds: 90.days.to_i
  )

  # Create enough old pipelines across multiple statuses to exceed RE_ENQUEUE_THRESHOLD (100)
  # The service processes up to 50 pipelines per status group per run, so 3 groups × 50 = 150 in the first run
  ancient_success = FactoryBot.create_list(:ci_pipeline, 60,
    project: project,
    status: 'success',
    created_at: 1.year.ago,
    locked: :unlocked
  )

  ancient_failed = FactoryBot.create_list(:ci_pipeline, 60,
    project: project,
    status: 'failed',
    created_at: 1.year.ago,
    locked: :unlocked
  )

  ancient_canceled = FactoryBot.create_list(:ci_pipeline, 60,
    project: project,
    status: 'canceled',
    created_at: 1.year.ago,
    locked: :unlocked
  )

  # Create recent pipelines - should NOT be deleted
  recent_success = FactoryBot.create_list(:ci_pipeline, 10,
    project: project,
    status: 'success',
    created_at: 1.week.ago,
    locked: :unlocked
  )

2. Verify Pipeline Counts Before

  # Check total counts
  project.all_pipelines.count
  # => Should be 190

  # Check by age
  cutoff = 90.days.ago
  project.all_pipelines.where('created_at < ?', cutoff).count
  # => Should be 180 (ancient)

  project.all_pipelines.where('created_at >= ?', cutoff).count
  # => Should be 10 (recent)

3. Enqueue Project and Run Worker with Feature Flag Enabled

  # Enqueue the project for cleanup
  cleanup_queue = Ci::RetentionPolicies::ProjectsCleanupQueue.instance
  cleanup_queue.enqueue!(project)

  # Enable the feature flag
  Feature.enable(:ci_optimized_old_pipelines_query, project.root_namespace)

  # Run the worker
  worker = Ci::DestroyOldPipelinesWorker.new
  worker.perform_work

  # Check the result - processes 50 per status group (success, failed, canceled)
  project.reload.all_pipelines.count
  # => Should be 40 (deleted 50+50+50=150 pipelines)

4. Verify Re-enqueuing Behavior

  # Check if project was re-enqueued (should be because 150 > 100)
  cleanup_queue.size
  # => Should be 1 (re-enqueued because destroyed_count=150 > 100)

  # Run worker again if re-enqueued to continue cleanup
  worker.perform_work if cleanup_queue.size > 0

  project.reload.all_pipelines.count
  # => Should be 10 (deleted remaining 30 old pipelines)
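
The expected counts in steps 3 and 4 follow from simple batch arithmetic. The sketch below simulates it in plain Ruby under the assumptions stated in this description: up to 50 pipelines deleted per status group per run, and a re-enqueue when more than 100 are destroyed. The simulate_run helper and the BATCH_PER_STATUS constant are hypothetical names, and the code does not call any GitLab classes.

```ruby
# Plain-Ruby simulation of the expected counts; no GitLab code is involved.
BATCH_PER_STATUS = 50       # assumed per-status batch size, from the steps above
RE_ENQUEUE_THRESHOLD = 100  # threshold named in the setup comments

# Takes a hash of status => number of old pipelines and returns
# [total destroyed in this run, hash of what remains per status].
def simulate_run(old_by_status)
  destroyed = old_by_status.transform_values { |count| [count, BATCH_PER_STATUS].min }
  remaining = old_by_status.merge(destroyed) { |_status, count, gone| count - gone }
  [destroyed.values.sum, remaining]
end

old = { "success" => 60, "failed" => 60, "canceled" => 60 }
recent = 10 # recent pipelines are never eligible for deletion

first_destroyed, old = simulate_run(old)
puts "After first run: #{old.values.sum + recent} pipelines"   # 40
puts "Re-enqueued: #{first_destroyed > RE_ENQUEUE_THRESHOLD}"  # true

second_destroyed, old = simulate_run(old)
puts "After second run: #{old.values.sum + recent} pipelines"  # 10
puts "Re-enqueued: #{second_destroyed > RE_ENQUEUE_THRESHOLD}" # false
```

The second run destroys only 30 pipelines, which is below the threshold, so the queue should be empty afterwards.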

5. Verify Final State

  # After all runs, only recent pipelines should remain
  project.reload.all_pipelines.count
  # => Should be 10

  # Verify all remaining pipelines are recent
  project.all_pipelines.where('created_at < ?', 90.days.ago).count
  # => Should be 0

6. Check Cache

  # Read the cache to verify per-status tracking
  cache = Ci::RetentionPolicies::PipelineDeletionCutoffCache.new(project: project).read

  # Inspect cache values
  cache.keys
  # => ["success", "failed", "canceled", "skipped", "manual", "other"]

  cache.values.compact.map(&:to_date)
  # => Should show dates around 1 year ago (the most recently deleted pipelines)

7. Test Legacy Behavior (Feature Flag Disabled)

  # Reset the test
  project.destroy!
  namespace.destroy!

  # Recreate project and pipelines
  timestamp = Time.now.to_i
  unique_path = "test-pipeline-delete-#{timestamp}"

  namespace = FactoryBot.create(:namespace, path: unique_path)

  project = FactoryBot.create(
    :project,
    name: unique_path,
    path: unique_path,
    namespace: namespace,
    ci_delete_pipelines_in_seconds: 90.days.to_i
  )

  ancient_success = FactoryBot.create_list(:ci_pipeline, 120, project: project, status: 'success', created_at: 1.year.ago, locked: :unlocked)
  recent_success = FactoryBot.create_list(:ci_pipeline, 10, project: project, status: 'success', created_at: 1.week.ago, locked: :unlocked)

  # Disable the feature flag
  Feature.disable(:ci_optimized_old_pipelines_query, project.root_namespace)

  # Enqueue and run worker
  cleanup_queue.enqueue!(project)
  worker = Ci::DestroyOldPipelinesWorker.new
  worker.perform_work

  # Verify legacy behavior (deletes up to 250 pipelines at once)
  project.reload.all_pipelines.count
  # => Should be 10 (all 120 old pipelines deleted in one run)

8. Clean Up

  # Clean up test data when done
  project.destroy!
  namespace.destroy!
  Feature.remove(:ci_optimized_old_pipelines_query)

Complete Testing Snippet

Here's a complete snippet to copy and paste into a Rails console:

# === Setup ===
timestamp = Time.now.to_i
unique_path = "test-pipeline-delete-#{timestamp}"

namespace = FactoryBot.create(:namespace, path: unique_path)

project = FactoryBot.create(
  :project,
  name: unique_path,
  path: unique_path,
  namespace: namespace,
  ci_delete_pipelines_in_seconds: 90.days.to_i
)

# Create enough pipelines across multiple statuses to exceed RE_ENQUEUE_THRESHOLD (100)
# Service processes up to 50 per status group, so with 3 statuses we can process 150 total
ancient_success = FactoryBot.create_list(:ci_pipeline, 60, project: project, status: 'success', created_at: 1.year.ago, locked: :unlocked)
ancient_failed = FactoryBot.create_list(:ci_pipeline, 60, project: project, status: 'failed', created_at: 1.year.ago, locked: :unlocked)
ancient_canceled = FactoryBot.create_list(:ci_pipeline, 60, project: project, status: 'canceled', created_at: 1.year.ago, locked: :unlocked)

# Create recent pipelines - should NOT be deleted
recent = FactoryBot.create_list(:ci_pipeline, 10, project: project, status: 'success', created_at: 1.week.ago, locked: :unlocked)

puts "Before: #{project.all_pipelines.count} pipelines"
# => 190

# === Enable Feature Flag and Run Worker ===
Feature.enable(:ci_optimized_old_pipelines_query, project.root_namespace)

cleanup_queue = Ci::RetentionPolicies::ProjectsCleanupQueue.instance
cleanup_queue.enqueue!(project)

worker = Ci::DestroyOldPipelinesWorker.new
worker.perform_work

puts "After first run: #{project.reload.all_pipelines.count} pipelines"
# => Should be 40 (deleted 50 success + 50 failed + 50 canceled = 150 total, triggers re-enqueue)

puts "Re-enqueued: #{cleanup_queue.size > 0}"
# => true (because 150 > 100)

# Continue running if re-enqueued
while cleanup_queue.size > 0
  worker.perform_work
  puts "After run: #{project.reload.all_pipelines.count} pipelines"
end

puts "Final count: #{project.reload.all_pipelines.count} pipelines"
# => 10 (only recent remain)

# === Verify Cache ===
cache = Ci::RetentionPolicies::PipelineDeletionCutoffCache.new(project: project).read
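# Status groups with no deleted pipelines may have no timestamp, so drop nils first (as in step 6)
puts "Cache timestamps: #{cache.values.compact.map { |t| t.strftime('%Y-%m-%d') }.uniq}"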

# === Cleanup ===
project.destroy!
namespace.destroy!
Feature.remove(:ci_optimized_old_pipelines_query)

MR acceptance checklist

Evaluate this MR against the MR acceptance checklist. It helps you analyze changes to reduce risks in quality, performance, reliability, security, and maintainability.

Edited by Narendran
