Set dynamic TTL for PipelineExecutionPolicies::RunScheduleWorker

With scheduled pipeline execution policies (PEP) it is possible to enforce the start pipelines in each project that the policy applies to based on a schedule. More details in &14147.

When it is time to execute a schedule, we enqueue a Security::PipelineExecutionPolicies::RunScheduleWorker for each project that starts a pipeline. We rely on sidekiq deduplication to make sure there is no backlog of jobs.

Currently, our Sidekiq jobs for pipeline schedules use a fixed TTL (Time To Live) for deduplication. This can lead to two potential issues:

  1. If the TTL is too short relative to the scheduling interval, we may process duplicate jobs when the deduplication key expires prematurely
  2. If the TTL is too long, we risk blocking legitimate pipeline executions if the deduplication key fails to be cleaned up

We should look into implementing a dynamic TTL that aligns the deduplication window with each job's scheduling interval.

Implementation plan

Instead of implementing a dynamic TTL in the sidekiq middleware, we could pass a time to live as attribute to the jobs:

diff --git a/ee/app/workers/security/pipeline_execution_policies/run_schedule_worker.rb b/ee/app/workers/security/pipeline_execution_policies/run_schedule_worker.rb
index 4eda9649c5e8..995d7362370d 100644
--- a/ee/app/workers/security/pipeline_execution_policies/run_schedule_worker.rb
+++ b/ee/app/workers/security/pipeline_execution_policies/run_schedule_worker.rb
@@ -13,12 +13,13 @@ class RunScheduleWorker
       PIPELINE_SOURCE = :pipeline_execution_policy_schedule
       EVENT_KEY = 'scheduled_pipeline_execution_policy_failure'
 
-      def perform(schedule_id)
+      def perform(schedule_id, execute_until: nil)
         schedule = Security::PipelineExecutionProjectSchedule.find_by_id(schedule_id) || return
 
         return if Feature.disabled?(:scheduled_pipeline_execution_policies, schedule.project)
         return unless experiment_enabled?(schedule)
         return if schedule.snoozed?
+        return if execute_until.present? && Time.now > execute_until
 
         result = execute(schedule)
 
diff --git a/ee/app/workers/security/pipeline_execution_policies/schedule_worker.rb b/ee/app/workers/security/pipeline_execution_policies/schedule_worker.rb
index 9b0a56492548..ea5a430a560d 100644
--- a/ee/app/workers/security/pipeline_execution_policies/schedule_worker.rb
+++ b/ee/app/workers/security/pipeline_execution_policies/schedule_worker.rb
@@ -58,7 +58,7 @@ def enqueue_within_time_window(schedule)
         delay = Random.rand(time_window)
 
         with_context(project: schedule.project_id) do
-          Security::PipelineExecutionPolicies::RunScheduleWorker.perform_in(delay, schedule.id)
+          Security::PipelineExecutionPolicies::RunScheduleWorker.perform_in(delay, schedule.id, execute_until: (Time.zone.now + schedule.next_run_in))
         end
       end
Edited by Andy Schoenen