Integrate Ci::Pipelines::AutoCleanupService in DestroyOldPipelinesWorker
What does this MR do and why?
Integrates Ci::Pipelines::AutoCleanupService into DestroyOldPipelinesWorker to fix statement timeouts when deleting old pipelines.
This MR builds on !211726 (merged) by enabling the optimized pipeline deletion service in the worker behind the ci_optimized_old_pipelines_query feature flag. The integration:
- Uses the new
AutoCleanupServicewhen the feature flag is enabled - Falls back to the legacy implementation when disabled
- Re-enqueue either
skipped_pipelines_sizeordestroyed_pipelines_size>100 - Logs both destroyed and skipped pipeline counts for monitoring
References
Parent MR: !211726 (merged)
Screenshots or screen recordings
N/A
How to set up and validate locally
1. Setup - Create Project and Pipelines
# Create a project with 90-day retention period
timestamp = Time.now.to_i
unique_path = "test-pipeline-delete-#{timestamp}"
namespace = FactoryBot.create(:namespace, path: unique_path)
project = FactoryBot.create(
:project,
name: unique_path,
path: unique_path,
namespace: namespace,
ci_delete_pipelines_in_seconds: 90.days.to_i
)
# Create enough old pipelines across multiple statuses to exceed RE_ENQUEUE_THRESHOLD (100)
# With 3 status groups × 50 pipelines each = 150 total processed
ancient_success = FactoryBot.create_list(:ci_pipeline, 60,
project: project,
status: 'success',
created_at: 1.year.ago,
locked: :unlocked
)
ancient_failed = FactoryBot.create_list(:ci_pipeline, 60,
project: project,
status: 'failed',
created_at: 1.year.ago,
locked: :unlocked
)
ancient_canceled = FactoryBot.create_list(:ci_pipeline, 60,
project: project,
status: 'canceled',
created_at: 1.year.ago,
locked: :unlocked
)
# Create recent pipelines - should NOT be deleted
recent_success = FactoryBot.create_list(:ci_pipeline, 10,
project: project,
status: 'success',
created_at: 1.week.ago,
locked: :unlocked
)
2. Verify Pipeline Counts Before
# Check total counts
project.all_pipelines.count
# => Should be 190
# Check by age
cutoff = 90.days.ago
project.all_pipelines.where('created_at < ?', cutoff).count
# => Should be 180 (ancient)
project.all_pipelines.where('created_at >= ?', cutoff).count
# => Should be 10 (recent)
3. Enqueue Project and Run Worker with Feature Flag Enabled
# Enqueue the project for cleanup
cleanup_queue = Ci::RetentionPolicies::ProjectsCleanupQueue.instance
cleanup_queue.enqueue!(project)
# Enable the feature flag
Feature.enable(:ci_optimized_old_pipelines_query, project.root_namespace)
# Run the worker
worker = Ci::DestroyOldPipelinesWorker.new
worker.perform_work
# Check the result - processes 50 per status group (success, failed, canceled)
project.reload.all_pipelines.count
# => Should be 40 (deleted 50+50+50=150 pipelines)
4. Verify Re-enqueuing Behavior
# Check if project was re-enqueued (should be because 150 > 100)
cleanup_queue.size
# => Should be 1 (re-enqueued because destroyed_count=150 > 100)
# Run worker again if re-enqueued to continue cleanup
worker.perform_work if cleanup_queue.size > 0
project.reload.all_pipelines.count
# => Should be 10 (deleted remaining 30 old pipelines)
5. Verify Final State
# After all runs, only recent pipelines should remain
project.reload.all_pipelines.count
# => Should be 10
# Verify all remaining pipelines are recent
project.all_pipelines.where('created_at < ?', 90.days.ago).count
# => Should be 0
6. Check Cache
# Read the cache to verify per-status tracking
cache = Ci::RetentionPolicies::PipelineDeletionCutoffCache.new(project: project).read
# Inspect cache values
cache.keys
# => ["success", "failed", "canceled", "skipped", "manual", "other"]
cache.values.compact.map(&:to_date)
# Should show dates around 1 year ago (the most recent deleted pipelines)
7. Test Legacy Behavior (Feature Flag Disabled)
# Reset the test
project.destroy!
namespace.destroy!
# Recreate project and pipelines
timestamp = Time.now.to_i
unique_path = "test-pipeline-delete-#{timestamp}"
namespace = FactoryBot.create(:namespace, path: unique_path)
project = FactoryBot.create(
:project,
name: unique_path,
path: unique_path,
namespace: namespace,
ci_delete_pipelines_in_seconds: 90.days.to_i
)
ancient_success = FactoryBot.create_list(:ci_pipeline, 120, project: project, status: 'success', created_at: 1.year.ago, locked: :unlocked)
recent_success = FactoryBot.create_list(:ci_pipeline, 10, project: project, status: 'success', created_at: 1.week.ago, locked: :unlocked)
# Disable the feature flag
Feature.disable(:ci_optimized_old_pipelines_query, project.root_namespace)
# Enqueue and run worker
cleanup_queue.enqueue!(project)
worker = Ci::DestroyOldPipelinesWorker.new
worker.perform_work
# Verify legacy behavior (deletes up to 250 pipelines at once)
project.reload.all_pipelines.count
# => Should be 10 (all 120 old pipelines deleted in one run)
8. Clean Up
# Clean up test data when done
project.destroy!
namespace.destroy!
Feature.remove(:ci_optimized_old_pipelines_query)
Complete Testing Snippet
Here's a simple snippet to copy-paste in rails console:
# === Setup ===
timestamp = Time.now.to_i
unique_path = "test-pipeline-delete-#{timestamp}"
namespace = FactoryBot.create(:namespace, path: unique_path)
project = FactoryBot.create(
:project,
name: unique_path,
path: unique_path,
namespace: namespace,
ci_delete_pipelines_in_seconds: 90.days.to_i
)
# Create enough pipelines across multiple statuses to exceed RE_ENQUEUE_THRESHOLD (100)
# Service processes up to 50 per status group, so with 3 statuses we can process 150 total
ancient_success = FactoryBot.create_list(:ci_pipeline, 60, project: project, status: 'success', created_at: 1.year.ago, locked: :unlocked)
ancient_failed = FactoryBot.create_list(:ci_pipeline, 60, project: project, status: 'failed', created_at: 1.year.ago, locked: :unlocked)
ancient_canceled = FactoryBot.create_list(:ci_pipeline, 60, project: project, status: 'canceled', created_at: 1.year.ago, locked: :unlocked)
# Create recent pipelines - should NOT be deleted
recent = FactoryBot.create_list(:ci_pipeline, 10, project: project, status: 'success', created_at: 1.week.ago, locked: :unlocked)
puts "Before: #{project.all_pipelines.count} pipelines"
# => 190
# === Enable Feature Flag and Run Worker ===
Feature.enable(:ci_optimized_old_pipelines_query, project.root_namespace)
cleanup_queue = Ci::RetentionPolicies::ProjectsCleanupQueue.instance
cleanup_queue.enqueue!(project)
worker = Ci::DestroyOldPipelinesWorker.new
worker.perform_work
puts "After first run: #{project.reload.all_pipelines.count} pipelines"
# => Should be 40 (deleted 50 success + 50 failed + 50 canceled = 150 total, triggers re-enqueue)
puts "Re-enqueued: #{cleanup_queue.size > 0}"
# => true (because 150 > 100)
# Continue running if re-enqueued
while cleanup_queue.size > 0
worker.perform_work
puts "After run: #{project.reload.all_pipelines.count} pipelines"
end
puts "Final count: #{project.reload.all_pipelines.count} pipelines"
# => 10 (only recent remain)
# === Verify Cache ===
cache = Ci::RetentionPolicies::PipelineDeletionCutoffCache.new(project: project).read
puts "Cache timestamps: #{cache.values.map { |t| t.strftime('%Y-%m-%d') }.uniq}"
# === Cleanup ===
project.destroy!
namespace.destroy!
Feature.remove(:ci_optimized_old_pipelines_query)
MR acceptance checklist
Evaluate this MR against the MR acceptance checklist. It helps you analyze changes to reduce risks in quality, performance, reliability, security, and maintainability.
Edited by Narendran