Set dynamic TTL for PipelineExecutionPolicies::RunScheduleWorker
With scheduled pipeline execution policies (PEP), it is possible to enforce the start of pipelines, based on a schedule, in each project that the policy applies to. More details in &14147.
When a schedule is due, we enqueue one Security::PipelineExecutionPolicies::RunScheduleWorker job per project, and each job starts a pipeline in its project. We rely on Sidekiq deduplication to make sure these jobs do not pile up into a backlog.
Currently, our Sidekiq jobs for pipeline schedules use a fixed TTL (time to live) for the deduplication key. This can lead to two problems:
- If the TTL is too short relative to the scheduling interval, the deduplication key can expire prematurely and we may process duplicate jobs.
- If the TTL is too long, a deduplication key that fails to be cleaned up can block legitimate pipeline executions until it expires.
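Both failure modes can be reproduced with a toy model of a deduplication key that has a fixed TTL. This is a minimal sketch, not GitLab's Sidekiq middleware: DedupStore, try_enqueue, release and the "schedule:1" key are hypothetical names, the key is assumed to be set at enqueue time and cleared when the job runs, and time is modelled as integer seconds so the behaviour is deterministic.

```ruby
# Hypothetical in-memory stand-in for a Redis deduplication key with a fixed TTL.
class DedupStore
  def initialize(ttl)
    @ttl = ttl
    @expires_at = {}
  end

  # Returns true (and sets the key) if no live key exists; false means "deduplicated".
  def try_enqueue(key, now)
    expiry = @expires_at[key]
    return false if expiry && now < expiry

    @expires_at[key] = now + @ttl
    true
  end

  # Assumed to be called when the job actually runs; clears the key.
  def release(key)
    @expires_at.delete(key)
  end
end

# TTL too short: the first job is still pending (e.g. delayed by 300s),
# but its key expired after 60s, so a second enqueue slips through.
short = DedupStore.new(60)
short.try_enqueue("schedule:1", 0)   # => true
short.try_enqueue("schedule:1", 120) # => true (duplicate job)

# TTL too long: the first job is lost and never calls release,
# so the next legitimate run one hour later is blocked.
long = DedupStore.new(86_400)
long.try_enqueue("schedule:1", 0)     # => true
long.try_enqueue("schedule:1", 3_600) # => false (blocked)
```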
We should look into implementing a dynamic TTL that aligns the deduplication window with each job's scheduling interval.
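One possible reading of "aligning the deduplication window with the scheduling interval" is to let the key live exactly until the schedule's next run. The following is a minimal sketch under that assumption: dynamic_ttl and the MIN_TTL floor are hypothetical, and next_run_at stands for whatever the schedule model exposes as its next execution time.

```ruby
# Hypothetical floor (seconds) so a run that is imminent still gets a usable key.
MIN_TTL = 60

# TTL = seconds remaining until the next scheduled run, never below MIN_TTL.
# Time - Time returns a Float, so round up to whole seconds.
def dynamic_ttl(now:, next_run_at:)
  remaining = (next_run_at - now).ceil
  [remaining, MIN_TTL].max
end

now = Time.utc(2024, 1, 1, 12, 0, 0)
dynamic_ttl(now: now, next_run_at: now + 3600) # => 3600
dynamic_ttl(now: now, next_run_at: now + 10)   # => 60 (floor applies)
```

With this TTL, a key that is never cleaned up expires at the next scheduled run instead of blocking it, while duplicates enqueued before that run are still suppressed, as long as a job's enqueue delay stays shorter than the schedule interval.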
Implementation plan
Instead of implementing a dynamic TTL in the Sidekiq middleware, we could pass an expiry time (execute_until) as an argument to each job and have the worker skip any job that runs after that time:
```diff
diff --git a/ee/app/workers/security/pipeline_execution_policies/run_schedule_worker.rb b/ee/app/workers/security/pipeline_execution_policies/run_schedule_worker.rb
index 4eda9649c5e8..995d7362370d 100644
--- a/ee/app/workers/security/pipeline_execution_policies/run_schedule_worker.rb
+++ b/ee/app/workers/security/pipeline_execution_policies/run_schedule_worker.rb
@@ -13,12 +13,13 @@ class RunScheduleWorker
       PIPELINE_SOURCE = :pipeline_execution_policy_schedule
       EVENT_KEY = 'scheduled_pipeline_execution_policy_failure'
-      def perform(schedule_id)
+      def perform(schedule_id, execute_until = nil)
         schedule = Security::PipelineExecutionProjectSchedule.find_by_id(schedule_id) || return
         return if Feature.disabled?(:scheduled_pipeline_execution_policies, schedule.project)
         return unless experiment_enabled?(schedule)
         return if schedule.snoozed?
+        return if execute_until.present? && Time.zone.now.to_i > execute_until
         result = execute(schedule)
diff --git a/ee/app/workers/security/pipeline_execution_policies/schedule_worker.rb b/ee/app/workers/security/pipeline_execution_policies/schedule_worker.rb
index 9b0a56492548..ea5a430a560d 100644
--- a/ee/app/workers/security/pipeline_execution_policies/schedule_worker.rb
+++ b/ee/app/workers/security/pipeline_execution_policies/schedule_worker.rb
@@ -58,7 +58,7 @@ def enqueue_within_time_window(schedule)
         delay = Random.rand(time_window)
         with_context(project: schedule.project_id) do
-          Security::PipelineExecutionPolicies::RunScheduleWorker.perform_in(delay, schedule.id)
+          Security::PipelineExecutionPolicies::RunScheduleWorker.perform_in(delay, schedule.id, (Time.zone.now + schedule.next_run_in).to_i)
         end
       end
```
The deadline is passed as a positional Integer (Unix timestamp) rather than as a Time keyword argument: Sidekiq serializes job arguments to JSON, so a Time would reach perform as a String, and a keyword argument would arrive as a positional Hash with String keys, which raises an ArgumentError.
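The deadline check and the argument round trip can be exercised outside Rails. This is a minimal sketch in plain Ruby using only the json standard library: serialize_args and deserialize_args are hypothetical stand-ins for Sidekiq's JSON encoding of job arguments, and expired? mirrors the guard added to RunScheduleWorker#perform.

```ruby
require "json"

# Hypothetical stand-ins for Sidekiq's JSON encoding of job arguments.
# Only JSON-native values (Integer, String, Array, Hash with String keys, ...)
# come back unchanged, which is why the deadline is an Integer Unix timestamp.
def serialize_args(*args)
  JSON.generate(args)
end

def deserialize_args(payload)
  JSON.parse(payload)
end

# Mirrors the guard: nil means "no deadline"; otherwise skip once it has passed.
def expired?(execute_until, now: Time.now)
  !execute_until.nil? && now.to_i > execute_until
end

now = Time.utc(2024, 1, 1, 12, 0, 0)
deadline = (now + 3600).to_i
schedule_id, execute_until = deserialize_args(serialize_args(42, deadline))
expired?(execute_until, now: now + 60)   # => false
expired?(execute_until, now: now + 7200) # => true

# A keyword-style argument comes back as a positional Hash whose key is the
# String "execute_until", not the Symbol :execute_until.
deserialize_args(serialize_args(42, { execute_until: deadline }))
```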