From c8326438fff7837e9a937195dc171065eb5544a0 Mon Sep 17 00:00:00 2001
From: mc_rocha <mrocha@gitlab.com>
Date: Mon, 8 Jul 2024 10:27:13 -0400
Subject: [PATCH 1/6] Add dynamic concurrency limit for create pipeline worker

Changelog: changed
EE: true
---
 app/models/ci/build.rb                        |  6 +++
 config/initializers/1_settings.rb             |  3 ++
 .../create_pipeline_worker.rb                 |  2 +-
 .../pause_create_pipeline_worker.rb           | 40 +++++++++++++++++++
 .../scan_execution_pipeline_worker_pause.yml  |  8 ++++
 5 files changed, 58 insertions(+), 1 deletion(-)
 create mode 100644 ee/app/workers/security/scan_execution_policies/pause_create_pipeline_worker.rb
 create mode 100644 ee/config/feature_flags/gitlab_com_derisk/scan_execution_pipeline_worker_pause.yml

diff --git a/app/models/ci/build.rb b/app/models/ci/build.rb
index 5a767a56f39dc630..4290e4452f3128dc 100644
--- a/app/models/ci/build.rb
+++ b/app/models/ci/build.rb
@@ -220,6 +220,12 @@ class Build < Ci::Processable
       .or(with_job_artifacts.where(project_id: project_id, job_artifacts: { file_type: 'dotenv' })).distinct
     end
 
+    scope :with_pipeline_source_type, ->(pipeline_source_type) { joins(:pipeline).where(pipeline: { source: pipeline_source_type }) }
+    scope :created_after, ->(time) { where(arel_table[:created_at].gt(time)) }
+    scope :updated_after, ->(time) { where(arel_table[:updated_at].gt(time)) }
+    scope :order_by_created_at_asc, -> { order(created_at: :asc) }
+    scope :order_by_project_id_asc, -> { order(project_id: :asc) }
+
     add_authentication_token_field :token,
       encrypted: :required,
       format_with_prefix: :prefix_and_partition_for_token
diff --git a/config/initializers/1_settings.rb b/config/initializers/1_settings.rb
index b1dffc104f433204..909beb7f086869d3 100644
--- a/config/initializers/1_settings.rb
+++ b/config/initializers/1_settings.rb
@@ -930,6 +930,9 @@
   Settings.cron_jobs['gitlab_subscriptions_add_on_purchases_cleanup_worker'] ||= {}
   Settings.cron_jobs['gitlab_subscriptions_add_on_purchases_cleanup_worker']['cron'] ||= '0 1 * * *'
   Settings.cron_jobs['gitlab_subscriptions_add_on_purchases_cleanup_worker']['job_class'] = 'GitlabSubscriptions::AddOnPurchases::CleanupWorker'
+  Settings.cron_jobs['pause_create_pipeline_worker'] ||= {}
+  Settings.cron_jobs['pause_create_pipeline_worker']['cron'] ||= '*/1 * * * *'
+  Settings.cron_jobs['pause_create_pipeline_worker']['job_class'] ||= 'Security::ScanExecutionPolicies::PauseCreatePipelineWorker'
 
   Gitlab.com do
     Settings.cron_jobs['disable_legacy_open_source_license_for_inactive_projects'] ||= {}
diff --git a/ee/app/workers/security/scan_execution_policies/create_pipeline_worker.rb b/ee/app/workers/security/scan_execution_policies/create_pipeline_worker.rb
index 92107ea0b78f4bf4..f32eaddb68583dd0 100644
--- a/ee/app/workers/security/scan_execution_policies/create_pipeline_worker.rb
+++ b/ee/app/workers/security/scan_execution_policies/create_pipeline_worker.rb
@@ -10,7 +10,7 @@ class CreatePipelineWorker # rubocop:disable Scalability/IdempotentWorker -- The
       urgency :throttled
       data_consistency :delayed
 
-      concurrency_limit -> { Gitlab::CurrentSettings.security_policy_scheduled_scans_max_concurrency }
+      concurrency_limit -> { Feature.enabled?(:scan_execution_pipeline_worker_pause) ? 1 : nil }
 
       def perform(project_id, current_user_id, schedule_id, branch)
         project = Project.find_by_id(project_id)
diff --git a/ee/app/workers/security/scan_execution_policies/pause_create_pipeline_worker.rb b/ee/app/workers/security/scan_execution_policies/pause_create_pipeline_worker.rb
new file mode 100644
index 0000000000000000..ee5a76c4bd1c1a63
--- /dev/null
+++ b/ee/app/workers/security/scan_execution_policies/pause_create_pipeline_worker.rb
@@ -0,0 +1,40 @@
+# frozen_string_literal: true
+
+module Security
+  module ScanExecutionPolicies
+    class PauseCreatePipelineWorker
+      include ApplicationWorker
+      include CronjobQueue # rubocop: disable Scalability/CronWorkerContext -- Periodic processing is required
+
+      feature_category :security_policy_management
+      data_consistency :always # rubocop:disable SidekiqLoadBalancing/WorkerDataConsistency -- cron job
+      urgency :high
+      deduplicate :until_executed, if_deduplicated: :reschedule_once
+      idempotent!
+
+      def perform
+        if active_jobs >= max_scheduled_scans_concurrency
+          Feature.enable(:scan_execution_pipeline_worker_pause)
+        elsif Feature.enabled?(:scan_execution_pipeline_worker_pause) # rubocop:disable Gitlab/FeatureFlagWithoutActor -- flag must be global
+          Feature.disable(:scan_execution_pipeline_worker_pause)
+        end
+      end
+
+      private
+
+      def active_jobs
+        ::Ci::Build.with_pipeline_source_type('security_orchestration_policy')
+                   .with_status(*::Ci::HasStatus::ALIVE_STATUSES)
+                   .created_after(1.hour.ago)
+                   .updated_after(1.hour.ago)
+                   .order_by_created_at_asc
+                   .order_by_project_id_asc
+                   .count
+      end
+
+      def max_scheduled_scans_concurrency
+        Gitlab::CurrentSettings.security_policy_scheduled_scans_max_concurrency
+      end
+    end
+  end
+end
diff --git a/ee/config/feature_flags/gitlab_com_derisk/scan_execution_pipeline_worker_pause.yml b/ee/config/feature_flags/gitlab_com_derisk/scan_execution_pipeline_worker_pause.yml
new file mode 100644
index 0000000000000000..6ff52254fa15eeda
--- /dev/null
+++ b/ee/config/feature_flags/gitlab_com_derisk/scan_execution_pipeline_worker_pause.yml
@@ -0,0 +1,8 @@
+---
+name: scan_execution_pipeline_worker_pause
+introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/153158
+rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/463802
+milestone: '17.1'
+type: gitlab_com_derisk
+group: group::security policies
+default_enabled: false
-- 
GitLab


From f3761cc5a47a3864a4fd1beb3190224af9a91dd2 Mon Sep 17 00:00:00 2001
From: mc_rocha <mrocha@gitlab.com>
Date: Mon, 8 Jul 2024 10:29:41 -0400
Subject: [PATCH 2/6] Add dynamic concurrency limit for create pipeline worker

Changelog: changed
EE: true
---
 ee/app/workers/all_queues.yml | 9 +++++++++
 1 file changed, 9 insertions(+)

diff --git a/ee/app/workers/all_queues.yml b/ee/app/workers/all_queues.yml
index 0612959b5f808742..eba42f8d0d2a4f45 100644
--- a/ee/app/workers/all_queues.yml
+++ b/ee/app/workers/all_queues.yml
@@ -561,6 +561,15 @@
   :weight: 1
   :idempotent: false
   :tags: []
+- :name: cronjob:security_scan_execution_policies_pause_create_pipeline
+  :worker_name: Security::ScanExecutionPolicies::PauseCreatePipelineWorker
+  :feature_category: :security_policy_management
+  :has_external_dependencies: false
+  :urgency: :high
+  :resource_boundary: :unknown
+  :weight: 1
+  :idempotent: true
+  :tags: []
 - :name: cronjob:security_scans_purge
   :worker_name: Security::Scans::PurgeWorker
   :feature_category: :vulnerability_management
-- 
GitLab


From 2f3c441d96840d8ebf743825c8b45841b9f1f5b6 Mon Sep 17 00:00:00 2001
From: mc_rocha <mrocha@gitlab.com>
Date: Thu, 11 Jul 2024 17:17:34 -0400
Subject: [PATCH 3/6] Add dynamic concurrency limit for create pipeline worker

Changelog: changed
EE: true
---
 .../rule_schedule_service.rb                  |  5 ++-
 ee/app/workers/all_queues.yml                 |  9 ----
 .../security/policy_concurrency_control.rb    | 44 +++++++++++++++++++
 .../create_pipeline_worker.rb                 |  5 ++-
 .../pause_create_pipeline_worker.rb           | 40 -----------------
 .../create_pipeline_worker_spec.rb            |  2 +-
 6 files changed, 52 insertions(+), 53 deletions(-)
 create mode 100644 ee/app/workers/concerns/security/policy_concurrency_control.rb
 delete mode 100644 ee/app/workers/security/scan_execution_policies/pause_create_pipeline_worker.rb

diff --git a/ee/app/services/security/security_orchestration_policies/rule_schedule_service.rb b/ee/app/services/security/security_orchestration_policies/rule_schedule_service.rb
index 0708a4df956741af..17037603c2398f0d 100644
--- a/ee/app/services/security/security_orchestration_policies/rule_schedule_service.rb
+++ b/ee/app/services/security/security_orchestration_policies/rule_schedule_service.rb
@@ -72,11 +72,14 @@ def schedule_scan(actions, branches)
       end
 
       def schedule_scans_using_a_worker(branches, schedule)
+        number_of_actions = actions_for(schedule).count
+
         branches.map do |branch|
           ::Security::ScanExecutionPolicies::CreatePipelineWorker.perform_async(project.id,
             current_user.id,
             schedule.id,
-            branch)
+            branch,
+            number_of_actions)
         end
       end
     end
diff --git a/ee/app/workers/all_queues.yml b/ee/app/workers/all_queues.yml
index eba42f8d0d2a4f45..0612959b5f808742 100644
--- a/ee/app/workers/all_queues.yml
+++ b/ee/app/workers/all_queues.yml
@@ -561,15 +561,6 @@
   :weight: 1
   :idempotent: false
   :tags: []
-- :name: cronjob:security_scan_execution_policies_pause_create_pipeline
-  :worker_name: Security::ScanExecutionPolicies::PauseCreatePipelineWorker
-  :feature_category: :security_policy_management
-  :has_external_dependencies: false
-  :urgency: :high
-  :resource_boundary: :unknown
-  :weight: 1
-  :idempotent: true
-  :tags: []
 - :name: cronjob:security_scans_purge
   :worker_name: Security::Scans::PurgeWorker
   :feature_category: :vulnerability_management
diff --git a/ee/app/workers/concerns/security/policy_concurrency_control.rb b/ee/app/workers/concerns/security/policy_concurrency_control.rb
new file mode 100644
index 0000000000000000..a6abce1c159e92be
--- /dev/null
+++ b/ee/app/workers/concerns/security/policy_concurrency_control.rb
@@ -0,0 +1,44 @@
+# frozen_string_literal: true
+
+module Security
+  module PolicyConcurrencyControl
+    RESCHEDULE_INTERVAL = 5.seconds
+    CACHE_EXPIRES_IN = 1.second
+
+    def perform(*args)
+      schedule_builds_count = args[4]
+
+      if Security::PolicyConcurrencyControl.defer_job?(schedule_builds_count)
+        self.class.perform_in(RESCHEDULE_INTERVAL, *args)
+      else
+        super
+      end
+    end
+
+    class << self
+      def defer_job?(schedule_builds_count)
+        max_scheduled_scans_concurrency > 0 && reached_limit?(limit: max_scheduled_scans_concurrency,
+          schedule_builds_count: schedule_builds_count)
+      end
+
+      def max_scheduled_scans_concurrency
+        Gitlab::CurrentSettings.security_policy_scheduled_scans_max_concurrency
+      end
+
+      def cache_key
+        [:security, :policy_concurrency_control]
+      end
+
+      def reached_limit?(limit:, schedule_builds_count:)
+        Rails.cache.fetch(cache_key, expires_in: CACHE_EXPIRES_IN) do
+          ::Ci::Build.with_pipeline_source_type('security_orchestration_policy')
+                    .with_status(*::Ci::HasStatus::ALIVE_STATUSES)
+                    .created_after(1.hour.ago)
+                    .updated_after(1.hour.ago)
+                    .limit(limit)
+                    .count + schedule_builds_count > limit
+        end
+      end
+    end
+  end
+end
diff --git a/ee/app/workers/security/scan_execution_policies/create_pipeline_worker.rb b/ee/app/workers/security/scan_execution_policies/create_pipeline_worker.rb
index f32eaddb68583dd0..669b34ae05111c76 100644
--- a/ee/app/workers/security/scan_execution_policies/create_pipeline_worker.rb
+++ b/ee/app/workers/security/scan_execution_policies/create_pipeline_worker.rb
@@ -4,15 +4,16 @@ module Security
   module ScanExecutionPolicies
     class CreatePipelineWorker # rubocop:disable Scalability/IdempotentWorker -- The worker should not run multiple times to avoid creating multiple pipelines
       include ApplicationWorker
+      prepend Security::PolicyConcurrencyControl
 
       feature_category :security_policy_management
       deduplicate :until_executing
       urgency :throttled
       data_consistency :delayed
 
-      concurrency_limit -> { Feature.enabled?(:scan_execution_pipeline_worker_pause) ? 1 : nil }
+      concurrency_limit -> { Gitlab::CurrentSettings.security_policy_scheduled_scans_max_concurrency * 10 }
 
-      def perform(project_id, current_user_id, schedule_id, branch)
+      def perform(project_id, current_user_id, schedule_id, branch, _number_of_actions)
         project = Project.find_by_id(project_id)
         return unless project
 
diff --git a/ee/app/workers/security/scan_execution_policies/pause_create_pipeline_worker.rb b/ee/app/workers/security/scan_execution_policies/pause_create_pipeline_worker.rb
deleted file mode 100644
index ee5a76c4bd1c1a63..0000000000000000
--- a/ee/app/workers/security/scan_execution_policies/pause_create_pipeline_worker.rb
+++ /dev/null
@@ -1,40 +0,0 @@
-# frozen_string_literal: true
-
-module Security
-  module ScanExecutionPolicies
-    class PauseCreatePipelineWorker
-      include ApplicationWorker
-      include CronjobQueue # rubocop: disable Scalability/CronWorkerContext -- Periodic processing is required
-
-      feature_category :security_policy_management
-      data_consistency :always # rubocop:disable SidekiqLoadBalancing/WorkerDataConsistency -- cron job
-      urgency :high
-      deduplicate :until_executed, if_deduplicated: :reschedule_once
-      idempotent!
-
-      def perform
-        if active_jobs >= max_scheduled_scans_concurrency
-          Feature.enable(:scan_execution_pipeline_worker_pause)
-        elsif Feature.enabled?(:scan_execution_pipeline_worker_pause) # rubocop:disable Gitlab/FeatureFlagWithoutActor -- flag must be global
-          Feature.disable(:scan_execution_pipeline_worker_pause)
-        end
-      end
-
-      private
-
-      def active_jobs
-        ::Ci::Build.with_pipeline_source_type('security_orchestration_policy')
-                   .with_status(*::Ci::HasStatus::ALIVE_STATUSES)
-                   .created_after(1.hour.ago)
-                   .updated_after(1.hour.ago)
-                   .order_by_created_at_asc
-                   .order_by_project_id_asc
-                   .count
-      end
-
-      def max_scheduled_scans_concurrency
-        Gitlab::CurrentSettings.security_policy_scheduled_scans_max_concurrency
-      end
-    end
-  end
-end
diff --git a/ee/spec/workers/security/scan_execution_policies/create_pipeline_worker_spec.rb b/ee/spec/workers/security/scan_execution_policies/create_pipeline_worker_spec.rb
index 9e9e2d4995ce5430..f49ae36c03d56114 100644
--- a/ee/spec/workers/security/scan_execution_policies/create_pipeline_worker_spec.rb
+++ b/ee/spec/workers/security/scan_execution_policies/create_pipeline_worker_spec.rb
@@ -37,7 +37,7 @@
       end
     end
 
-    subject(:run_worker) { described_class.new.perform(project_id, current_user_id, schedule_id, branch) }
+    subject(:run_worker) { described_class.new.perform(project_id, current_user_id, schedule_id, branch, 1) }
 
     context 'when project is not found' do
       let(:project_id) { non_existing_record_id }
-- 
GitLab


From d495372d4d3a99686da6cc150b910c7fef9e3930 Mon Sep 17 00:00:00 2001
From: mc_rocha <mrocha@gitlab.com>
Date: Fri, 12 Jul 2024 11:34:07 -0400
Subject: [PATCH 4/6] Add dynamic concurrency limit for create pipeline worker

Changelog: changed
EE: true
---
 config/initializers/1_settings.rb             |  3 --
 .../rule_schedule_service.rb                  |  5 +--
 .../conditional_concurrency_limit_control.rb  | 25 +++++++++++
 .../security/policy_concurrency_control.rb    | 44 -------------------
 .../create_pipeline_worker.rb                 | 36 ++++++++++++++-
 ...xecution_pipeline_concurrency_control.yml} |  2 +-
 .../create_pipeline_worker_spec.rb            |  2 +-
 7 files changed, 62 insertions(+), 55 deletions(-)
 create mode 100644 ee/app/workers/concerns/gitlab/conditional_concurrency_limit_control.rb
 delete mode 100644 ee/app/workers/concerns/security/policy_concurrency_control.rb
 rename ee/config/feature_flags/gitlab_com_derisk/{scan_execution_pipeline_worker_pause.yml => scan_execution_pipeline_concurrency_control.yml} (83%)

diff --git a/config/initializers/1_settings.rb b/config/initializers/1_settings.rb
index 909beb7f086869d3..b1dffc104f433204 100644
--- a/config/initializers/1_settings.rb
+++ b/config/initializers/1_settings.rb
@@ -930,9 +930,6 @@
   Settings.cron_jobs['gitlab_subscriptions_add_on_purchases_cleanup_worker'] ||= {}
   Settings.cron_jobs['gitlab_subscriptions_add_on_purchases_cleanup_worker']['cron'] ||= '0 1 * * *'
   Settings.cron_jobs['gitlab_subscriptions_add_on_purchases_cleanup_worker']['job_class'] = 'GitlabSubscriptions::AddOnPurchases::CleanupWorker'
-  Settings.cron_jobs['pause_create_pipeline_worker'] ||= {}
-  Settings.cron_jobs['pause_create_pipeline_worker']['cron'] ||= '*/1 * * * *'
-  Settings.cron_jobs['pause_create_pipeline_worker']['job_class'] ||= 'Security::ScanExecutionPolicies::PauseCreatePipelineWorker'
 
   Gitlab.com do
     Settings.cron_jobs['disable_legacy_open_source_license_for_inactive_projects'] ||= {}
diff --git a/ee/app/services/security/security_orchestration_policies/rule_schedule_service.rb b/ee/app/services/security/security_orchestration_policies/rule_schedule_service.rb
index 17037603c2398f0d..0708a4df956741af 100644
--- a/ee/app/services/security/security_orchestration_policies/rule_schedule_service.rb
+++ b/ee/app/services/security/security_orchestration_policies/rule_schedule_service.rb
@@ -72,14 +72,11 @@ def schedule_scan(actions, branches)
       end
 
       def schedule_scans_using_a_worker(branches, schedule)
-        number_of_actions = actions_for(schedule).count
-
         branches.map do |branch|
           ::Security::ScanExecutionPolicies::CreatePipelineWorker.perform_async(project.id,
             current_user.id,
             schedule.id,
-            branch,
-            number_of_actions)
+            branch)
         end
       end
     end
diff --git a/ee/app/workers/concerns/gitlab/conditional_concurrency_limit_control.rb b/ee/app/workers/concerns/gitlab/conditional_concurrency_limit_control.rb
new file mode 100644
index 0000000000000000..32f0aacbc58c1cad
--- /dev/null
+++ b/ee/app/workers/concerns/gitlab/conditional_concurrency_limit_control.rb
@@ -0,0 +1,25 @@
+# frozen_string_literal: true
+
+module Gitlab
+  module ConditionalConcurrencyLimitControl
+    DEFAULT_RESCHEDULE_INTERVAL = 5.seconds
+
+    def perform(*args)
+      if defer_job?(*args)
+        self.class.perform_in(reschedule_interval, *args)
+      else
+        super
+      end
+    end
+
+    private
+
+    def defer_job?(*args)
+      raise NotImplementedError
+    end
+
+    def reschedule_interval
+      DEFAULT_RESCHEDULE_INTERVAL
+    end
+  end
+end
diff --git a/ee/app/workers/concerns/security/policy_concurrency_control.rb b/ee/app/workers/concerns/security/policy_concurrency_control.rb
deleted file mode 100644
index a6abce1c159e92be..0000000000000000
--- a/ee/app/workers/concerns/security/policy_concurrency_control.rb
+++ /dev/null
@@ -1,44 +0,0 @@
-# frozen_string_literal: true
-
-module Security
-  module PolicyConcurrencyControl
-    RESCHEDULE_INTERVAL = 5.seconds
-    CACHE_EXPIRES_IN = 1.second
-
-    def perform(*args)
-      schedule_builds_count = args[4]
-
-      if Security::PolicyConcurrencyControl.defer_job?(schedule_builds_count)
-        self.class.perform_in(RESCHEDULE_INTERVAL, *args)
-      else
-        super
-      end
-    end
-
-    class << self
-      def defer_job?(schedule_builds_count)
-        max_scheduled_scans_concurrency > 0 && reached_limit?(limit: max_scheduled_scans_concurrency,
-          schedule_builds_count: schedule_builds_count)
-      end
-
-      def max_scheduled_scans_concurrency
-        Gitlab::CurrentSettings.security_policy_scheduled_scans_max_concurrency
-      end
-
-      def cache_key
-        [:security, :policy_concurrency_control]
-      end
-
-      def reached_limit?(limit:, schedule_builds_count:)
-        Rails.cache.fetch(cache_key, expires_in: CACHE_EXPIRES_IN) do
-          ::Ci::Build.with_pipeline_source_type('security_orchestration_policy')
-                    .with_status(*::Ci::HasStatus::ALIVE_STATUSES)
-                    .created_after(1.hour.ago)
-                    .updated_after(1.hour.ago)
-                    .limit(limit)
-                    .count + schedule_builds_count > limit
-        end
-      end
-    end
-  end
-end
diff --git a/ee/app/workers/security/scan_execution_policies/create_pipeline_worker.rb b/ee/app/workers/security/scan_execution_policies/create_pipeline_worker.rb
index 669b34ae05111c76..7072d328ee3cf7a2 100644
--- a/ee/app/workers/security/scan_execution_policies/create_pipeline_worker.rb
+++ b/ee/app/workers/security/scan_execution_policies/create_pipeline_worker.rb
@@ -4,7 +4,7 @@ module Security
   module ScanExecutionPolicies
     class CreatePipelineWorker # rubocop:disable Scalability/IdempotentWorker -- The worker should not run multiple times to avoid creating multiple pipelines
       include ApplicationWorker
-      prepend Security::PolicyConcurrencyControl
+      include Gitlab::ConditionalConcurrencyLimitControl
 
       feature_category :security_policy_management
       deduplicate :until_executing
@@ -13,7 +13,7 @@ class CreatePipelineWorker # rubocop:disable Scalability/IdempotentWorker -- The
 
       concurrency_limit -> { Gitlab::CurrentSettings.security_policy_scheduled_scans_max_concurrency * 10 }
 
-      def perform(project_id, current_user_id, schedule_id, branch, _number_of_actions)
+      def perform(project_id, current_user_id, schedule_id, branch)
         project = Project.find_by_id(project_id)
         return unless project
 
@@ -36,6 +36,38 @@ def perform(project_id, current_user_id, schedule_id, branch, _number_of_actions
 
       private
 
+      def defer_job?(*args)
+        return false unless Feature.enabled?(:scan_execution_pipeline_concurrency_control)
+
+        schedule_id = args[3]
+        schedule = Security::OrchestrationPolicyRuleSchedule.find_by_id(schedule_id)
+        return false unless schedule
+
+        schedule_builds_count = actions_for(schedule)
+
+        max_scheduled_scans_concurrency > 0 && reached_limit?(limit: max_scheduled_scans_concurrency,
+          schedule_builds_count: schedule_builds_count)
+      end
+
+      def max_scheduled_scans_concurrency
+        Gitlab::CurrentSettings.security_policy_scheduled_scans_max_concurrency
+      end
+
+      def cache_key
+        [:security, :policy_concurrency_control]
+      end
+
+      def reached_limit?(limit:, schedule_builds_count:)
+        Rails.cache.fetch(cache_key, expires_in: CACHE_EXPIRES_IN) do
+          ::Ci::Build.with_pipeline_source_type('security_orchestration_policy')
+                     .with_status(*::Ci::HasStatus::ALIVE_STATUSES)
+                     .created_after(1.hour.ago)
+                     .updated_after(1.hour.ago)
+                     .limit(limit)
+                     .count + schedule_builds_count > limit
+        end
+      end
+
       def actions_for(schedule)
         policy = schedule.policy
         return [] if policy.blank?
diff --git a/ee/config/feature_flags/gitlab_com_derisk/scan_execution_pipeline_worker_pause.yml b/ee/config/feature_flags/gitlab_com_derisk/scan_execution_pipeline_concurrency_control.yml
similarity index 83%
rename from ee/config/feature_flags/gitlab_com_derisk/scan_execution_pipeline_worker_pause.yml
rename to ee/config/feature_flags/gitlab_com_derisk/scan_execution_pipeline_concurrency_control.yml
index 6ff52254fa15eeda..52339fda0cf235f1 100644
--- a/ee/config/feature_flags/gitlab_com_derisk/scan_execution_pipeline_worker_pause.yml
+++ b/ee/config/feature_flags/gitlab_com_derisk/scan_execution_pipeline_concurrency_control.yml
@@ -1,5 +1,5 @@
 ---
-name: scan_execution_pipeline_worker_pause
+name: scan_execution_pipeline_concurrency_control
 introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/153158
 rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/463802
 milestone: '17.1'
diff --git a/ee/spec/workers/security/scan_execution_policies/create_pipeline_worker_spec.rb b/ee/spec/workers/security/scan_execution_policies/create_pipeline_worker_spec.rb
index f49ae36c03d56114..9e9e2d4995ce5430 100644
--- a/ee/spec/workers/security/scan_execution_policies/create_pipeline_worker_spec.rb
+++ b/ee/spec/workers/security/scan_execution_policies/create_pipeline_worker_spec.rb
@@ -37,7 +37,7 @@
       end
     end
 
-    subject(:run_worker) { described_class.new.perform(project_id, current_user_id, schedule_id, branch, 1) }
+    subject(:run_worker) { described_class.new.perform(project_id, current_user_id, schedule_id, branch) }
 
     context 'when project is not found' do
       let(:project_id) { non_existing_record_id }
-- 
GitLab


From 83bc1330fd7a65e2cbb5f293df0558d412d89d83 Mon Sep 17 00:00:00 2001
From: mc_rocha <mrocha@gitlab.com>
Date: Sun, 14 Jul 2024 12:30:30 -0400
Subject: [PATCH 5/6] Add dynamic concurrency limit for create pipeline worker

Changelog: changed
EE: true
---
 app/models/ci/build.rb                        |  2 -
 .../conditional_concurrency_limit_control.rb  |  6 +-
 .../create_pipeline_worker.rb                 |  8 ++-
 ...execution_pipeline_concurrency_control.yml |  2 +-
 ...ditional_concurrency_limit_control_spec.rb | 58 +++++++++++++++++++
 .../create_pipeline_worker_spec.rb            | 38 ++++++++++++
 spec/models/ci/build_spec.rb                  | 34 +++++++++++
 7 files changed, 141 insertions(+), 7 deletions(-)
 create mode 100644 ee/spec/workers/concerns/gitlab/conditional_concurrency_limit_control_spec.rb

diff --git a/app/models/ci/build.rb b/app/models/ci/build.rb
index 4290e4452f3128dc..701d9dccc9f8bb1b 100644
--- a/app/models/ci/build.rb
+++ b/app/models/ci/build.rb
@@ -223,8 +223,6 @@ class Build < Ci::Processable
     scope :with_pipeline_source_type, ->(pipeline_source_type) { joins(:pipeline).where(pipeline: { source: pipeline_source_type }) }
     scope :created_after, ->(time) { where(arel_table[:created_at].gt(time)) }
     scope :updated_after, ->(time) { where(arel_table[:updated_at].gt(time)) }
-    scope :order_by_created_at_asc, -> { order(created_at: :asc) }
-    scope :order_by_project_id_asc, -> { order(project_id: :asc) }
 
     add_authentication_token_field :token,
       encrypted: :required,
diff --git a/ee/app/workers/concerns/gitlab/conditional_concurrency_limit_control.rb b/ee/app/workers/concerns/gitlab/conditional_concurrency_limit_control.rb
index 32f0aacbc58c1cad..b9ea581ac9f5af98 100644
--- a/ee/app/workers/concerns/gitlab/conditional_concurrency_limit_control.rb
+++ b/ee/app/workers/concerns/gitlab/conditional_concurrency_limit_control.rb
@@ -15,10 +15,14 @@ def perform(*args)
     private
 
     def defer_job?(*args)
-      raise NotImplementedError
+      return super if defined?(super)
+
+      false
     end
 
     def reschedule_interval
+      return super if defined?(super)
+
       DEFAULT_RESCHEDULE_INTERVAL
     end
   end
diff --git a/ee/app/workers/security/scan_execution_policies/create_pipeline_worker.rb b/ee/app/workers/security/scan_execution_policies/create_pipeline_worker.rb
index 7072d328ee3cf7a2..4fbe2dc5a82d62b5 100644
--- a/ee/app/workers/security/scan_execution_policies/create_pipeline_worker.rb
+++ b/ee/app/workers/security/scan_execution_policies/create_pipeline_worker.rb
@@ -4,7 +4,9 @@ module Security
   module ScanExecutionPolicies
     class CreatePipelineWorker # rubocop:disable Scalability/IdempotentWorker -- The worker should not run multiple times to avoid creating multiple pipelines
       include ApplicationWorker
-      include Gitlab::ConditionalConcurrencyLimitControl
+      prepend Gitlab::ConditionalConcurrencyLimitControl
+
+      CACHE_EXPIRES_IN = 1.second
 
       feature_category :security_policy_management
       deduplicate :until_executing
@@ -39,11 +41,11 @@ def perform(project_id, current_user_id, schedule_id, branch)
       def defer_job?(*args)
         return false unless Feature.enabled?(:scan_execution_pipeline_concurrency_control)
 
-        schedule_id = args[3]
+        schedule_id = args[2]
         schedule = Security::OrchestrationPolicyRuleSchedule.find_by_id(schedule_id)
         return false unless schedule
 
-        schedule_builds_count = actions_for(schedule)
+        schedule_builds_count = actions_for(schedule).count
 
         max_scheduled_scans_concurrency > 0 && reached_limit?(limit: max_scheduled_scans_concurrency,
           schedule_builds_count: schedule_builds_count)
diff --git a/ee/config/feature_flags/gitlab_com_derisk/scan_execution_pipeline_concurrency_control.yml b/ee/config/feature_flags/gitlab_com_derisk/scan_execution_pipeline_concurrency_control.yml
index 52339fda0cf235f1..15e320232871a24c 100644
--- a/ee/config/feature_flags/gitlab_com_derisk/scan_execution_pipeline_concurrency_control.yml
+++ b/ee/config/feature_flags/gitlab_com_derisk/scan_execution_pipeline_concurrency_control.yml
@@ -2,7 +2,7 @@
 name: scan_execution_pipeline_concurrency_control
 introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/153158
 rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/463802
-milestone: '17.1'
+milestone: '17.3'
 type: gitlab_com_derisk
 group: group::security policies
 default_enabled: false
diff --git a/ee/spec/workers/concerns/gitlab/conditional_concurrency_limit_control_spec.rb b/ee/spec/workers/concerns/gitlab/conditional_concurrency_limit_control_spec.rb
new file mode 100644
index 0000000000000000..d0fbae23f7ab9222
--- /dev/null
+++ b/ee/spec/workers/concerns/gitlab/conditional_concurrency_limit_control_spec.rb
@@ -0,0 +1,58 @@
+# frozen_string_literal: true
+
+require 'spec_helper'
+
+RSpec.describe ::Gitlab::ConditionalConcurrencyLimitControl, feature_category: :shared do
+  let(:defer_job) { false }
+
+  let(:worker) do
+    Class.new do
+      prepend ::Gitlab::ConditionalConcurrencyLimitControl
+      def perform(defer_job); end
+
+      def defer_job?(*args)
+        args.first
+      end
+    end
+  end
+
+  subject(:perform) { worker.new.perform(defer_job) }
+
+  context 'when defer_job? returns true' do
+    let(:defer_job) { true }
+
+    it 'reschedule the worker' do
+      expect(worker).to receive(:perform_in).with(
+        ::Gitlab::ConditionalConcurrencyLimitControl::DEFAULT_RESCHEDULE_INTERVAL, defer_job)
+
+      perform
+    end
+  end
+
+  context 'when defer_job? returns false' do
+    it 'does not reschedule the work' do
+      expect(worker).not_to receive(:perform_in).with(
+        ::Gitlab::ConditionalConcurrencyLimitControl::DEFAULT_RESCHEDULE_INTERVAL, defer_job)
+
+      perform
+    end
+  end
+
+  context 'when defer_job? is not defined' do
+    let(:worker) do
+      Class.new do
+        prepend ::Gitlab::ConditionalConcurrencyLimitControl
+        def perform; end
+      end
+    end
+
+    subject(:perform) { worker.new.perform }
+
+    it 'does not reschedule the work' do
+      expect(worker).not_to receive(:perform_in).with(
+        ::Gitlab::ConditionalConcurrencyLimitControl::DEFAULT_RESCHEDULE_INTERVAL)
+
+      perform
+    end
+  end
+end
diff --git a/ee/spec/workers/security/scan_execution_policies/create_pipeline_worker_spec.rb b/ee/spec/workers/security/scan_execution_policies/create_pipeline_worker_spec.rb
index 9e9e2d4995ce5430..e93d4a028852a658 100644
--- a/ee/spec/workers/security/scan_execution_policies/create_pipeline_worker_spec.rb
+++ b/ee/spec/workers/security/scan_execution_policies/create_pipeline_worker_spec.rb
@@ -61,6 +61,44 @@
         run_worker
       end
 
+      context 'when the number of running security policy scheduled scans exceeds the limit' do
+        before do
+          stub_application_setting(security_policy_scheduled_scans_max_concurrency: 1)
+
+          create_list(:ci_build, 2,
+            :running,
+            created_at: 1.minute.ago,
+            updated_at: 1.minute.ago,
+            pipeline: create(:ci_pipeline, source: :security_orchestration_policy))
+        end
+
+        context 'when feature flag `scan_execution_pipeline_concurrency_control` is disabled' do
+          before do
+            stub_feature_flags(scan_execution_pipeline_concurrency_control: false)
+          end
+
+          it 'delegates the pipeline creation to Security::SecurityOrchestrationPolicies::CreatePipelineService' do
+            expect(::Security::SecurityOrchestrationPolicies::CreatePipelineService).to(
+              receive(:new)
+                .with(project: project, current_user: current_user, params: params)
+                .and_call_original)
+
+            run_worker
+          end
+        end
+
+        context 'when feature flag `scan_execution_pipeline_concurrency_control` is enabled' do
+          it 'does not invokes CreatePipelineService' do
+            expect(::Security::SecurityOrchestrationPolicies::CreatePipelineService).not_to(
+              receive(:new)
+                .with(project: project, current_user: current_user, params: params)
+                .and_call_original)
+
+            run_worker
+          end
+        end
+      end
+
       context 'when create pipeline service returns errors' do
         before do
           allow_next_instance_of(::Security::SecurityOrchestrationPolicies::CreatePipelineService) do |service|
diff --git a/spec/models/ci/build_spec.rb b/spec/models/ci/build_spec.rb
index cccf39e26cf135a6..452a367b21ee6184 100644
--- a/spec/models/ci/build_spec.rb
+++ b/spec/models/ci/build_spec.rb
@@ -102,6 +102,40 @@
     end
   end
 
+  describe 'scopes' do
+    let_it_be(:old_project) { create(:project) }
+    let_it_be(:new_project) { create(:project) }
+    let_it_be(:old_build) { create(:ci_build, created_at: 1.week.ago, updated_at: 1.week.ago, project: old_project) }
+    let_it_be(:new_build) { create(:ci_build, created_at: 1.minute.ago, updated_at: 1.minute.ago, project: new_project) }
+
+    describe 'created_after' do
+      subject { described_class.created_after(1.day.ago) }
+
+      it 'returns the builds created after the given time' do
+        is_expected.to contain_exactly(new_build, build)
+      end
+    end
+
+    describe 'updated_after' do
+      subject { described_class.updated_after(1.day.ago) }
+
+      it 'returns the builds updated after the given time' do
+        is_expected.to contain_exactly(new_build, build)
+      end
+    end
+
+    describe 'with_pipeline_source_type' do
+      let_it_be(:pipeline) { create(:ci_pipeline, source: :security_orchestration_policy) }
+      let_it_be(:build) { create(:ci_build, pipeline: pipeline) }
+
+      subject { described_class.with_pipeline_source_type('security_orchestration_policy') }
+
+      it 'returns the builds updated after the given time' do
+        is_expected.to contain_exactly(build)
+      end
+    end
+  end
+
   describe 'callbacks' do
     context 'when running after_create callback' do
       it 'executes hooks' do
-- 
GitLab


From 60aa5853c6e249684a826dedc9933ba1d5275fc2 Mon Sep 17 00:00:00 2001
From: Marcos Rocha <mrocha@gitlab.com>
Date: Mon, 22 Jul 2024 21:34:40 +0000
Subject: [PATCH 6/6] Apply MR suggestion

---
 .../create_pipeline_worker.rb                 |  6 +++--
 ...ditional_concurrency_limit_control_spec.rb | 22 ++++++++++++-------
 .../create_pipeline_worker_spec.rb            |  8 +++++++
 spec/models/ci/build_spec.rb                  |  2 ++
 4 files changed, 28 insertions(+), 10 deletions(-)

diff --git a/ee/app/workers/security/scan_execution_policies/create_pipeline_worker.rb b/ee/app/workers/security/scan_execution_policies/create_pipeline_worker.rb
index 4fbe2dc5a82d62b5..d17fea0fc1f735a0 100644
--- a/ee/app/workers/security/scan_execution_policies/create_pipeline_worker.rb
+++ b/ee/app/workers/security/scan_execution_policies/create_pipeline_worker.rb
@@ -60,14 +60,16 @@ def cache_key
       end
 
       def reached_limit?(limit:, schedule_builds_count:)
-        Rails.cache.fetch(cache_key, expires_in: CACHE_EXPIRES_IN) do
+        active_builds = Rails.cache.fetch(cache_key, expires_in: CACHE_EXPIRES_IN) do
           ::Ci::Build.with_pipeline_source_type('security_orchestration_policy')
                      .with_status(*::Ci::HasStatus::ALIVE_STATUSES)
                      .created_after(1.hour.ago)
                      .updated_after(1.hour.ago)
                      .limit(limit)
-                     .count + schedule_builds_count > limit
+                     .count
         end
+
+        active_builds + schedule_builds_count > limit
       end
 
       def actions_for(schedule)
diff --git a/ee/spec/workers/concerns/gitlab/conditional_concurrency_limit_control_spec.rb b/ee/spec/workers/concerns/gitlab/conditional_concurrency_limit_control_spec.rb
index d0fbae23f7ab9222..450b757124b0be38 100644
--- a/ee/spec/workers/concerns/gitlab/conditional_concurrency_limit_control_spec.rb
+++ b/ee/spec/workers/concerns/gitlab/conditional_concurrency_limit_control_spec.rb
@@ -5,10 +5,12 @@
 RSpec.describe ::Gitlab::ConditionalConcurrencyLimitControl, feature_category: :shared do
   let(:defer_job) { false }
 
-  let(:worker) do
+  let(:worker_class) do
     Class.new do
       prepend ::Gitlab::ConditionalConcurrencyLimitControl
-      def perform(defer_job); end
+      def perform(_defer_job)
+        puts 'Perform Job'
+      end
 
       def defer_job?(*args)
         args.first
@@ -16,13 +18,13 @@ def defer_job?(*args)
     end
   end
 
-  subject(:perform) { worker.new.perform(defer_job) }
+  subject(:perform) { worker_class.new.perform(defer_job) }
 
   context 'when defer_job? returns true' do
     let(:defer_job) { true }
 
     it 'reschedule the worker' do
-      expect(worker).to receive(:perform_in).with(
+      expect(worker_class).to receive(:perform_in).with(
         ::Gitlab::ConditionalConcurrencyLimitControl::DEFAULT_RESCHEDULE_INTERVAL, defer_job)
 
       perform
@@ -31,25 +33,29 @@ def defer_job?(*args)
 
   context 'when defer_job? returns false' do
     it 'does not reschedule the work' do
-      expect(worker).not_to receive(:perform_in).with(
+      expect(worker_class).not_to receive(:perform_in).with(
         ::Gitlab::ConditionalConcurrencyLimitControl::DEFAULT_RESCHEDULE_INTERVAL, defer_job)
 
+      expect_next_instance_of(worker_class) do |instance|
+        expect(instance).to receive(:perform).with(defer_job)
+      end
+
       perform
     end
   end
 
   context 'when defer_job? is not defined' do
-    let(:worker) do
+    let(:worker_class) do
       Class.new do
         prepend ::Gitlab::ConditionalConcurrencyLimitControl
         def perform; end
       end
     end
 
-    subject(:perform) { worker.new.perform }
+    subject(:perform) { worker_class.new.perform }
 
     it 'does not reschedule the work' do
-      expect(worker).not_to receive(:perform_in).with(
+      expect(worker_class).not_to receive(:perform_in).with(
         ::Gitlab::ConditionalConcurrencyLimitControl::DEFAULT_RESCHEDULE_INTERVAL)
 
       perform
diff --git a/ee/spec/workers/security/scan_execution_policies/create_pipeline_worker_spec.rb b/ee/spec/workers/security/scan_execution_policies/create_pipeline_worker_spec.rb
index e93d4a028852a658..3da9f606f2c904a0 100644
--- a/ee/spec/workers/security/scan_execution_policies/create_pipeline_worker_spec.rb
+++ b/ee/spec/workers/security/scan_execution_policies/create_pipeline_worker_spec.rb
@@ -96,6 +96,14 @@
 
             run_worker
           end
+
+          it 'rescheduled the worker' do
+            expect(described_class).to receive(:perform_in)
+                                         .with(Gitlab::ConditionalConcurrencyLimitControl::DEFAULT_RESCHEDULE_INTERVAL,
+                                           project_id, current_user_id, schedule_id, branch)
+
+            run_worker
+          end
         end
       end
 
diff --git a/spec/models/ci/build_spec.rb b/spec/models/ci/build_spec.rb
index 452a367b21ee6184..3b57d11a06984bdf 100644
--- a/spec/models/ci/build_spec.rb
+++ b/spec/models/ci/build_spec.rb
@@ -127,6 +127,8 @@
     describe 'with_pipeline_source_type' do
       let_it_be(:pipeline) { create(:ci_pipeline, source: :security_orchestration_policy) }
       let_it_be(:build) { create(:ci_build, pipeline: pipeline) }
+      let_it_be(:push_pipeline) { create(:ci_pipeline, source: :push) }
+      let_it_be(:push_build) { create(:ci_build, pipeline: push_pipeline) }
 
       subject { described_class.with_pipeline_source_type('security_orchestration_policy') }
 
-- 
GitLab