From c8326438fff7837e9a937195dc171065eb5544a0 Mon Sep 17 00:00:00 2001 From: mc_rocha <mrocha@gitlab.com> Date: Mon, 8 Jul 2024 10:27:13 -0400 Subject: [PATCH 1/6] Add dynamic concurrency limit for create pipeline worker Changelog: changed EE: true --- app/models/ci/build.rb | 6 +++ config/initializers/1_settings.rb | 3 ++ .../create_pipeline_worker.rb | 2 +- .../pause_create_pipeline_worker.rb | 40 +++++++++++++++++++ .../scan_execution_pipeline_worker_pause.yml | 8 ++++ 5 files changed, 58 insertions(+), 1 deletion(-) create mode 100644 ee/app/workers/security/scan_execution_policies/pause_create_pipeline_worker.rb create mode 100644 ee/config/feature_flags/gitlab_com_derisk/scan_execution_pipeline_worker_pause.yml diff --git a/app/models/ci/build.rb b/app/models/ci/build.rb index 5a767a56f39dc630..4290e4452f3128dc 100644 --- a/app/models/ci/build.rb +++ b/app/models/ci/build.rb @@ -220,6 +220,12 @@ class Build < Ci::Processable .or(with_job_artifacts.where(project_id: project_id, job_artifacts: { file_type: 'dotenv' })).distinct end + scope :with_pipeline_source_type, ->(pipeline_source_type) { joins(:pipeline).where(pipeline: { source: pipeline_source_type }) } + scope :created_after, ->(time) { where(arel_table[:created_at].gt(time)) } + scope :updated_after, ->(time) { where(arel_table[:updated_at].gt(time)) } + scope :order_by_created_at_asc, -> { order(created_at: :asc) } + scope :order_by_project_id_asc, -> { order(project_id: :asc) } + add_authentication_token_field :token, encrypted: :required, format_with_prefix: :prefix_and_partition_for_token diff --git a/config/initializers/1_settings.rb b/config/initializers/1_settings.rb index b1dffc104f433204..909beb7f086869d3 100644 --- a/config/initializers/1_settings.rb +++ b/config/initializers/1_settings.rb @@ -930,6 +930,9 @@ Settings.cron_jobs['gitlab_subscriptions_add_on_purchases_cleanup_worker'] ||= {} Settings.cron_jobs['gitlab_subscriptions_add_on_purchases_cleanup_worker']['cron'] ||= '0 1 * * *' Settings.cron_jobs['gitlab_subscriptions_add_on_purchases_cleanup_worker']['job_class'] = 'GitlabSubscriptions::AddOnPurchases::CleanupWorker' + Settings.cron_jobs['pause_create_pipeline_worker'] ||= {} + Settings.cron_jobs['pause_create_pipeline_worker']['cron'] ||= '*/1 * * * *' + Settings.cron_jobs['pause_create_pipeline_worker']['job_class'] ||= 'Security::ScanExecutionPolicies::PauseCreatePipelineWorker' Gitlab.com do Settings.cron_jobs['disable_legacy_open_source_license_for_inactive_projects'] ||= {} diff --git a/ee/app/workers/security/scan_execution_policies/create_pipeline_worker.rb b/ee/app/workers/security/scan_execution_policies/create_pipeline_worker.rb index 92107ea0b78f4bf4..f32eaddb68583dd0 100644 --- a/ee/app/workers/security/scan_execution_policies/create_pipeline_worker.rb +++ b/ee/app/workers/security/scan_execution_policies/create_pipeline_worker.rb @@ -10,7 +10,7 @@ class CreatePipelineWorker # rubocop:disable Scalability/IdempotentWorker -- The urgency :throttled data_consistency :delayed - concurrency_limit -> { Gitlab::CurrentSettings.security_policy_scheduled_scans_max_concurrency } + concurrency_limit -> { Feature.enabled?(:scan_execution_pipeline_worker_pause) ? 1 : nil } def perform(project_id, current_user_id, schedule_id, branch) project = Project.find_by_id(project_id) diff --git a/ee/app/workers/security/scan_execution_policies/pause_create_pipeline_worker.rb b/ee/app/workers/security/scan_execution_policies/pause_create_pipeline_worker.rb new file mode 100644 index 0000000000000000..ee5a76c4bd1c1a63 --- /dev/null +++ b/ee/app/workers/security/scan_execution_policies/pause_create_pipeline_worker.rb @@ -0,0 +1,40 @@ +# frozen_string_literal: true + +module Security + module ScanExecutionPolicies + class PauseCreatePipelineWorker + include ApplicationWorker + include CronjobQueue # rubocop: disable Scalability/CronWorkerContext -- Periodic processing is required + + feature_category :security_policy_management + data_consistency :always # rubocop:disable SidekiqLoadBalancing/WorkerDataConsistency -- cron job + urgency :high + deduplicate :until_executed, if_deduplicated: :reschedule_once + idempotent! + + def perform + if active_jobs >= max_scheduled_scans_concurrency + Feature.enable(:scan_execution_pipeline_worker_pause) + elsif Feature.enabled?(:scan_execution_pipeline_worker_pause) # rubocop:disable Gitlab/FeatureFlagWithoutActor -- flag must be global + Feature.disable(:scan_execution_pipeline_worker_pause) + end + end + + private + + def active_jobs + ::Ci::Build.with_pipeline_source_type('security_orchestration_policy') + .with_status(*::Ci::HasStatus::ALIVE_STATUSES) + .created_after(1.hour.ago) + .updated_after(1.hour.ago) + .order_by_created_at_asc + .order_by_project_id_asc + .count + end + + def max_scheduled_scans_concurrency + Gitlab::CurrentSettings.security_policy_scheduled_scans_max_concurrency + end + end + end +end diff --git a/ee/config/feature_flags/gitlab_com_derisk/scan_execution_pipeline_worker_pause.yml b/ee/config/feature_flags/gitlab_com_derisk/scan_execution_pipeline_worker_pause.yml new file mode 100644 index 0000000000000000..6ff52254fa15eeda --- /dev/null +++ b/ee/config/feature_flags/gitlab_com_derisk/scan_execution_pipeline_worker_pause.yml @@ -0,0 +1,8 @@ +--- +name: scan_execution_pipeline_worker_pause +introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/153158 +rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/463802 +milestone: '17.1' +type: gitlab_com_derisk +group: group::security policies +default_enabled: false -- GitLab From f3761cc5a47a3864a4fd1beb3190224af9a91dd2 Mon Sep 17 00:00:00 2001 From: mc_rocha <mrocha@gitlab.com> Date: Mon, 8 Jul 2024 10:29:41 -0400 Subject: [PATCH 2/6] Add dynamic concurrency limit for create pipeline worker Changelog: changed EE: true --- ee/app/workers/all_queues.yml | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/ee/app/workers/all_queues.yml b/ee/app/workers/all_queues.yml index 0612959b5f808742..eba42f8d0d2a4f45 100644 --- a/ee/app/workers/all_queues.yml +++ b/ee/app/workers/all_queues.yml @@ -561,6 +561,15 @@ :weight: 1 :idempotent: false :tags: [] +- :name: cronjob:security_scan_execution_policies_pause_create_pipeline + :worker_name: Security::ScanExecutionPolicies::PauseCreatePipelineWorker + :feature_category: :security_policy_management + :has_external_dependencies: false + :urgency: :high + :resource_boundary: :unknown + :weight: 1 + :idempotent: true + :tags: [] - :name: cronjob:security_scans_purge :worker_name: Security::Scans::PurgeWorker :feature_category: :vulnerability_management -- GitLab From 2f3c441d96840d8ebf743825c8b45841b9f1f5b6 Mon Sep 17 00:00:00 2001 From: mc_rocha <mrocha@gitlab.com> Date: Thu, 11 Jul 2024 17:17:34 -0400 Subject: [PATCH 3/6] Add dynamic concurrency limit for create pipeline worker Changelog: changed EE: true --- .../rule_schedule_service.rb | 5 ++- ee/app/workers/all_queues.yml | 9 ---- .../security/policy_concurrency_control.rb | 44 +++++++++++++++++++ .../create_pipeline_worker.rb | 5 ++- .../pause_create_pipeline_worker.rb | 40 ----------------- .../create_pipeline_worker_spec.rb | 2 +- 6 files changed, 52 insertions(+), 53 deletions(-) create mode 100644 ee/app/workers/concerns/security/policy_concurrency_control.rb delete mode 100644 ee/app/workers/security/scan_execution_policies/pause_create_pipeline_worker.rb diff --git a/ee/app/services/security/security_orchestration_policies/rule_schedule_service.rb b/ee/app/services/security/security_orchestration_policies/rule_schedule_service.rb index 0708a4df956741af..17037603c2398f0d 100644 --- a/ee/app/services/security/security_orchestration_policies/rule_schedule_service.rb +++ b/ee/app/services/security/security_orchestration_policies/rule_schedule_service.rb @@ -72,11 +72,14 @@ def schedule_scan(actions, branches) end def schedule_scans_using_a_worker(branches, schedule) + number_of_actions = actions_for(schedule).count + branches.map do |branch| ::Security::ScanExecutionPolicies::CreatePipelineWorker.perform_async(project.id, current_user.id, schedule.id, - branch) + branch, + number_of_actions) end end end diff --git a/ee/app/workers/all_queues.yml b/ee/app/workers/all_queues.yml index eba42f8d0d2a4f45..0612959b5f808742 100644 --- a/ee/app/workers/all_queues.yml +++ b/ee/app/workers/all_queues.yml @@ -561,15 +561,6 @@ :weight: 1 :idempotent: false :tags: [] -- :name: cronjob:security_scan_execution_policies_pause_create_pipeline - :worker_name: Security::ScanExecutionPolicies::PauseCreatePipelineWorker - :feature_category: :security_policy_management - :has_external_dependencies: false - :urgency: :high - :resource_boundary: :unknown - :weight: 1 - :idempotent: true - :tags: [] - :name: cronjob:security_scans_purge :worker_name: Security::Scans::PurgeWorker :feature_category: :vulnerability_management diff --git a/ee/app/workers/concerns/security/policy_concurrency_control.rb b/ee/app/workers/concerns/security/policy_concurrency_control.rb new file mode 100644 index 0000000000000000..a6abce1c159e92be --- /dev/null +++ b/ee/app/workers/concerns/security/policy_concurrency_control.rb @@ -0,0 +1,44 @@ +# frozen_string_literal: true + +module Security + module PolicyConcurrencyControl + RESCHEDULE_INTERVAL = 5.seconds + CACHE_EXPIRES_IN = 1.second + + def perform(*args) + schedule_builds_count = args[4] + + if Security::PolicyConcurrencyControl.defer_job?(schedule_builds_count) + self.class.perform_in(RESCHEDULE_INTERVAL, *args) + else + super + end + end + + class << self + def defer_job?(schedule_builds_count) + max_scheduled_scans_concurrency > 0 && reached_limit?(limit: max_scheduled_scans_concurrency, + schedule_builds_count: schedule_builds_count) + end + + def max_scheduled_scans_concurrency + Gitlab::CurrentSettings.security_policy_scheduled_scans_max_concurrency + end + + def cache_key + [:security, :policy_concurrency_control] + end + + def reached_limit?(limit:, schedule_builds_count:) + Rails.cache.fetch(cache_key, expires_in: CACHE_EXPIRES_IN) do + ::Ci::Build.with_pipeline_source_type('security_orchestration_policy') + .with_status(*::Ci::HasStatus::ALIVE_STATUSES) + .created_after(1.hour.ago) + .updated_after(1.hour.ago) + .limit(limit) + .count + schedule_builds_count > limit + end + end + end + end +end diff --git a/ee/app/workers/security/scan_execution_policies/create_pipeline_worker.rb b/ee/app/workers/security/scan_execution_policies/create_pipeline_worker.rb index f32eaddb68583dd0..669b34ae05111c76 100644 --- a/ee/app/workers/security/scan_execution_policies/create_pipeline_worker.rb +++ b/ee/app/workers/security/scan_execution_policies/create_pipeline_worker.rb @@ -4,15 +4,16 @@ module Security module ScanExecutionPolicies class CreatePipelineWorker # rubocop:disable Scalability/IdempotentWorker -- The worker should not run multiple times to avoid creating multiple pipelines include ApplicationWorker + prepend Security::PolicyConcurrencyControl feature_category :security_policy_management deduplicate :until_executing urgency :throttled data_consistency :delayed - concurrency_limit -> { Feature.enabled?(:scan_execution_pipeline_worker_pause) ? 1 : nil } + concurrency_limit -> { Gitlab::CurrentSettings.security_policy_scheduled_scans_max_concurrency * 10 } - def perform(project_id, current_user_id, schedule_id, branch) + def perform(project_id, current_user_id, schedule_id, branch, _number_of_actions) project = Project.find_by_id(project_id) return unless project diff --git a/ee/app/workers/security/scan_execution_policies/pause_create_pipeline_worker.rb b/ee/app/workers/security/scan_execution_policies/pause_create_pipeline_worker.rb deleted file mode 100644 index ee5a76c4bd1c1a63..0000000000000000 --- a/ee/app/workers/security/scan_execution_policies/pause_create_pipeline_worker.rb +++ /dev/null @@ -1,40 +0,0 @@ -# frozen_string_literal: true - -module Security - module ScanExecutionPolicies - class PauseCreatePipelineWorker - include ApplicationWorker - include CronjobQueue # rubocop: disable Scalability/CronWorkerContext -- Periodic processing is required - - feature_category :security_policy_management - data_consistency :always # rubocop:disable SidekiqLoadBalancing/WorkerDataConsistency -- cron job - urgency :high - deduplicate :until_executed, if_deduplicated: :reschedule_once - idempotent! - - def perform - if active_jobs >= max_scheduled_scans_concurrency - Feature.enable(:scan_execution_pipeline_worker_pause) - elsif Feature.enabled?(:scan_execution_pipeline_worker_pause) # rubocop:disable Gitlab/FeatureFlagWithoutActor -- flag must be global - Feature.disable(:scan_execution_pipeline_worker_pause) - end - end - - private - - def active_jobs - ::Ci::Build.with_pipeline_source_type('security_orchestration_policy') - .with_status(*::Ci::HasStatus::ALIVE_STATUSES) - .created_after(1.hour.ago) - .updated_after(1.hour.ago) - .order_by_created_at_asc - .order_by_project_id_asc - .count - end - - def max_scheduled_scans_concurrency - Gitlab::CurrentSettings.security_policy_scheduled_scans_max_concurrency - end - end - end -end diff --git a/ee/spec/workers/security/scan_execution_policies/create_pipeline_worker_spec.rb b/ee/spec/workers/security/scan_execution_policies/create_pipeline_worker_spec.rb index 9e9e2d4995ce5430..f49ae36c03d56114 100644 --- a/ee/spec/workers/security/scan_execution_policies/create_pipeline_worker_spec.rb +++ b/ee/spec/workers/security/scan_execution_policies/create_pipeline_worker_spec.rb @@ -37,7 +37,7 @@ end end - subject(:run_worker) { described_class.new.perform(project_id, current_user_id, schedule_id, branch) } + subject(:run_worker) { described_class.new.perform(project_id, current_user_id, schedule_id, branch, 1) } context 'when project is not found' do let(:project_id) { non_existing_record_id } -- GitLab From d495372d4d3a99686da6cc150b910c7fef9e3930 Mon Sep 17 00:00:00 2001 From: mc_rocha <mrocha@gitlab.com> Date: Fri, 12 Jul 2024 11:34:07 -0400 Subject: [PATCH 4/6] Add dynamic concurrency limit for create pipeline worker Changelog: changed EE: true --- config/initializers/1_settings.rb | 3 -- .../rule_schedule_service.rb | 5 +-- .../conditional_concurrency_limit_control.rb | 25 +++++++++++ .../security/policy_concurrency_control.rb | 44 ------------------- .../create_pipeline_worker.rb | 36 ++++++++++++++- ...xecution_pipeline_concurrency_control.yml} | 2 +- .../create_pipeline_worker_spec.rb | 2 +- 7 files changed, 62 insertions(+), 55 deletions(-) create mode 100644 ee/app/workers/concerns/gitlab/conditional_concurrency_limit_control.rb delete mode 100644 ee/app/workers/concerns/security/policy_concurrency_control.rb rename ee/config/feature_flags/gitlab_com_derisk/{scan_execution_pipeline_worker_pause.yml => scan_execution_pipeline_concurrency_control.yml} (83%) diff --git a/config/initializers/1_settings.rb b/config/initializers/1_settings.rb index 909beb7f086869d3..b1dffc104f433204 100644 --- a/config/initializers/1_settings.rb +++ b/config/initializers/1_settings.rb @@ -930,9 +930,6 @@ Settings.cron_jobs['gitlab_subscriptions_add_on_purchases_cleanup_worker'] ||= {} Settings.cron_jobs['gitlab_subscriptions_add_on_purchases_cleanup_worker']['cron'] ||= '0 1 * * *' Settings.cron_jobs['gitlab_subscriptions_add_on_purchases_cleanup_worker']['job_class'] = 'GitlabSubscriptions::AddOnPurchases::CleanupWorker' - Settings.cron_jobs['pause_create_pipeline_worker'] ||= {} - Settings.cron_jobs['pause_create_pipeline_worker']['cron'] ||= '*/1 * * * *' - Settings.cron_jobs['pause_create_pipeline_worker']['job_class'] ||= 'Security::ScanExecutionPolicies::PauseCreatePipelineWorker' Gitlab.com do Settings.cron_jobs['disable_legacy_open_source_license_for_inactive_projects'] ||= {} diff --git a/ee/app/services/security/security_orchestration_policies/rule_schedule_service.rb b/ee/app/services/security/security_orchestration_policies/rule_schedule_service.rb index 17037603c2398f0d..0708a4df956741af 100644 --- a/ee/app/services/security/security_orchestration_policies/rule_schedule_service.rb +++ b/ee/app/services/security/security_orchestration_policies/rule_schedule_service.rb @@ -72,14 +72,11 @@ def schedule_scan(actions, branches) end def schedule_scans_using_a_worker(branches, schedule) - number_of_actions = actions_for(schedule).count - branches.map do |branch| ::Security::ScanExecutionPolicies::CreatePipelineWorker.perform_async(project.id, current_user.id, schedule.id, - branch, - number_of_actions) + branch) end end end diff --git a/ee/app/workers/concerns/gitlab/conditional_concurrency_limit_control.rb b/ee/app/workers/concerns/gitlab/conditional_concurrency_limit_control.rb new file mode 100644 index 0000000000000000..32f0aacbc58c1cad --- /dev/null +++ b/ee/app/workers/concerns/gitlab/conditional_concurrency_limit_control.rb @@ -0,0 +1,25 @@ +# frozen_string_literal: true + +module Gitlab + module ConditionalConcurrencyLimitControl + DEFAULT_RESCHEDULE_INTERVAL = 5.seconds + + def perform(*args) + if defer_job?(*args) + self.class.perform_in(reschedule_interval, *args) + else + super + end + end + + private + + def defer_job?(*args) + raise NotImplementedError + end + + def reschedule_interval + DEFAULT_RESCHEDULE_INTERVAL + end + end +end diff --git a/ee/app/workers/concerns/security/policy_concurrency_control.rb b/ee/app/workers/concerns/security/policy_concurrency_control.rb deleted file mode 100644 index a6abce1c159e92be..0000000000000000 --- a/ee/app/workers/concerns/security/policy_concurrency_control.rb +++ /dev/null @@ -1,44 +0,0 @@ -# frozen_string_literal: true - -module Security - module PolicyConcurrencyControl - RESCHEDULE_INTERVAL = 5.seconds - CACHE_EXPIRES_IN = 1.second - - def perform(*args) - schedule_builds_count = args[4] - - if Security::PolicyConcurrencyControl.defer_job?(schedule_builds_count) - self.class.perform_in(RESCHEDULE_INTERVAL, *args) - else - super - end - end - - class << self - def defer_job?(schedule_builds_count) - max_scheduled_scans_concurrency > 0 && reached_limit?(limit: max_scheduled_scans_concurrency, - schedule_builds_count: schedule_builds_count) - end - - def max_scheduled_scans_concurrency - Gitlab::CurrentSettings.security_policy_scheduled_scans_max_concurrency - end - - def cache_key - [:security, :policy_concurrency_control] - end - - def reached_limit?(limit:, schedule_builds_count:) - Rails.cache.fetch(cache_key, expires_in: CACHE_EXPIRES_IN) do - ::Ci::Build.with_pipeline_source_type('security_orchestration_policy') - .with_status(*::Ci::HasStatus::ALIVE_STATUSES) - .created_after(1.hour.ago) - .updated_after(1.hour.ago) - .limit(limit) - .count + schedule_builds_count > limit - end - end - end - end -end diff --git a/ee/app/workers/security/scan_execution_policies/create_pipeline_worker.rb b/ee/app/workers/security/scan_execution_policies/create_pipeline_worker.rb index 669b34ae05111c76..7072d328ee3cf7a2 100644 --- a/ee/app/workers/security/scan_execution_policies/create_pipeline_worker.rb +++ b/ee/app/workers/security/scan_execution_policies/create_pipeline_worker.rb @@ -4,7 +4,7 @@ module Security module ScanExecutionPolicies class CreatePipelineWorker # rubocop:disable Scalability/IdempotentWorker -- The worker should not run multiple times to avoid creating multiple pipelines include ApplicationWorker - prepend Security::PolicyConcurrencyControl + include Gitlab::ConditionalConcurrencyLimitControl feature_category :security_policy_management deduplicate :until_executing @@ -13,7 +13,7 @@ class CreatePipelineWorker # rubocop:disable Scalability/IdempotentWorker -- The concurrency_limit -> { Gitlab::CurrentSettings.security_policy_scheduled_scans_max_concurrency * 10 } - def perform(project_id, current_user_id, schedule_id, branch, _number_of_actions) + def perform(project_id, current_user_id, schedule_id, branch) project = Project.find_by_id(project_id) return unless project @@ -36,6 +36,38 @@ def perform(project_id, current_user_id, schedule_id, branch, _number_of_actions private + def defer_job?(*args) + return false unless Feature.enabled?(:scan_execution_pipeline_concurrency_control) + + schedule_id = args[3] + schedule = Security::OrchestrationPolicyRuleSchedule.find_by_id(schedule_id) + return false unless schedule + + schedule_builds_count = actions_for(schedule) + + max_scheduled_scans_concurrency > 0 && reached_limit?(limit: max_scheduled_scans_concurrency, + schedule_builds_count: schedule_builds_count) + end + + def max_scheduled_scans_concurrency + Gitlab::CurrentSettings.security_policy_scheduled_scans_max_concurrency + end + + def cache_key + [:security, :policy_concurrency_control] + end + + def reached_limit?(limit:, schedule_builds_count:) + Rails.cache.fetch(cache_key, expires_in: CACHE_EXPIRES_IN) do + ::Ci::Build.with_pipeline_source_type('security_orchestration_policy') + .with_status(*::Ci::HasStatus::ALIVE_STATUSES) + .created_after(1.hour.ago) + .updated_after(1.hour.ago) + .limit(limit) + .count + schedule_builds_count > limit + end + end + def actions_for(schedule) policy = schedule.policy return [] if policy.blank? diff --git a/ee/config/feature_flags/gitlab_com_derisk/scan_execution_pipeline_worker_pause.yml b/ee/config/feature_flags/gitlab_com_derisk/scan_execution_pipeline_concurrency_control.yml similarity index 83% rename from ee/config/feature_flags/gitlab_com_derisk/scan_execution_pipeline_worker_pause.yml rename to ee/config/feature_flags/gitlab_com_derisk/scan_execution_pipeline_concurrency_control.yml index 6ff52254fa15eeda..52339fda0cf235f1 100644 --- a/ee/config/feature_flags/gitlab_com_derisk/scan_execution_pipeline_worker_pause.yml +++ b/ee/config/feature_flags/gitlab_com_derisk/scan_execution_pipeline_concurrency_control.yml @@ -1,5 +1,5 @@ --- -name: scan_execution_pipeline_worker_pause +name: scan_execution_pipeline_concurrency_control introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/153158 rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/463802 milestone: '17.1' diff --git a/ee/spec/workers/security/scan_execution_policies/create_pipeline_worker_spec.rb b/ee/spec/workers/security/scan_execution_policies/create_pipeline_worker_spec.rb index f49ae36c03d56114..9e9e2d4995ce5430 100644 --- a/ee/spec/workers/security/scan_execution_policies/create_pipeline_worker_spec.rb +++ b/ee/spec/workers/security/scan_execution_policies/create_pipeline_worker_spec.rb @@ -37,7 +37,7 @@ end end - subject(:run_worker) { described_class.new.perform(project_id, current_user_id, schedule_id, branch, 1) } + subject(:run_worker) { described_class.new.perform(project_id, current_user_id, schedule_id, branch) } context 'when project is not found' do let(:project_id) { non_existing_record_id } -- GitLab From 83bc1330fd7a65e2cbb5f293df0558d412d89d83 Mon Sep 17 00:00:00 2001 From: mc_rocha <mrocha@gitlab.com> Date: Sun, 14 Jul 2024 12:30:30 -0400 Subject: [PATCH 5/6] Add dynamic concurrency limit for create pipeline worker Changelog: changed EE: true --- app/models/ci/build.rb | 2 - .../conditional_concurrency_limit_control.rb | 6 +- .../create_pipeline_worker.rb | 8 ++- ...execution_pipeline_concurrency_control.yml | 2 +- ...ditional_concurrency_limit_control_spec.rb | 58 +++++++++++++++++++ .../create_pipeline_worker_spec.rb | 38 ++++++++++++ spec/models/ci/build_spec.rb | 34 +++++++++++ 7 files changed, 141 insertions(+), 7 deletions(-) create mode 100644 ee/spec/workers/concerns/gitlab/conditional_concurrency_limit_control_spec.rb diff --git a/app/models/ci/build.rb b/app/models/ci/build.rb index 4290e4452f3128dc..701d9dccc9f8bb1b 100644 --- a/app/models/ci/build.rb +++ b/app/models/ci/build.rb @@ -223,8 +223,6 @@ class Build < Ci::Processable scope :with_pipeline_source_type, ->(pipeline_source_type) { joins(:pipeline).where(pipeline: { source: pipeline_source_type }) } scope :created_after, ->(time) { where(arel_table[:created_at].gt(time)) } scope :updated_after, ->(time) { where(arel_table[:updated_at].gt(time)) } - scope :order_by_created_at_asc, -> { order(created_at: :asc) } - scope :order_by_project_id_asc, -> { order(project_id: :asc) } add_authentication_token_field :token, encrypted: :required, diff --git a/ee/app/workers/concerns/gitlab/conditional_concurrency_limit_control.rb b/ee/app/workers/concerns/gitlab/conditional_concurrency_limit_control.rb index 32f0aacbc58c1cad..b9ea581ac9f5af98 100644 --- a/ee/app/workers/concerns/gitlab/conditional_concurrency_limit_control.rb +++ b/ee/app/workers/concerns/gitlab/conditional_concurrency_limit_control.rb @@ -15,10 +15,14 @@ def perform(*args) private def defer_job?(*args) - raise NotImplementedError + return super if defined?(super) + + false end def reschedule_interval + return super if defined?(super) + DEFAULT_RESCHEDULE_INTERVAL end end diff --git a/ee/app/workers/security/scan_execution_policies/create_pipeline_worker.rb b/ee/app/workers/security/scan_execution_policies/create_pipeline_worker.rb index 7072d328ee3cf7a2..4fbe2dc5a82d62b5 100644 --- a/ee/app/workers/security/scan_execution_policies/create_pipeline_worker.rb +++ b/ee/app/workers/security/scan_execution_policies/create_pipeline_worker.rb @@ -4,7 +4,9 @@ module Security module ScanExecutionPolicies class CreatePipelineWorker # rubocop:disable Scalability/IdempotentWorker -- The worker should not run multiple times to avoid creating multiple pipelines include ApplicationWorker - include Gitlab::ConditionalConcurrencyLimitControl + prepend Gitlab::ConditionalConcurrencyLimitControl + + CACHE_EXPIRES_IN = 1.second feature_category :security_policy_management deduplicate :until_executing @@ -39,11 +41,11 @@ def perform(project_id, current_user_id, schedule_id, branch) def defer_job?(*args) return false unless Feature.enabled?(:scan_execution_pipeline_concurrency_control) - schedule_id = args[3] + schedule_id = args[2] schedule = Security::OrchestrationPolicyRuleSchedule.find_by_id(schedule_id) return false unless schedule - schedule_builds_count = actions_for(schedule) + schedule_builds_count = actions_for(schedule).count max_scheduled_scans_concurrency > 0 && reached_limit?(limit: max_scheduled_scans_concurrency, schedule_builds_count: schedule_builds_count) diff --git a/ee/config/feature_flags/gitlab_com_derisk/scan_execution_pipeline_concurrency_control.yml b/ee/config/feature_flags/gitlab_com_derisk/scan_execution_pipeline_concurrency_control.yml index 52339fda0cf235f1..15e320232871a24c 100644 --- a/ee/config/feature_flags/gitlab_com_derisk/scan_execution_pipeline_concurrency_control.yml +++ b/ee/config/feature_flags/gitlab_com_derisk/scan_execution_pipeline_concurrency_control.yml @@ -2,7 +2,7 @@ name: scan_execution_pipeline_concurrency_control introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/153158 rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/463802 -milestone: '17.1' +milestone: '17.3' type: gitlab_com_derisk group: group::security policies default_enabled: false diff --git a/ee/spec/workers/concerns/gitlab/conditional_concurrency_limit_control_spec.rb b/ee/spec/workers/concerns/gitlab/conditional_concurrency_limit_control_spec.rb new file mode 100644 index 0000000000000000..d0fbae23f7ab9222 --- /dev/null +++ b/ee/spec/workers/concerns/gitlab/conditional_concurrency_limit_control_spec.rb @@ -0,0 +1,58 @@ +# frozen_string_literal: true + +require 'spec_helper' + +RSpec.describe ::Gitlab::ConditionalConcurrencyLimitControl, feature_category: :shared do + let(:defer_job) { false } + + let(:worker) do + Class.new do + prepend ::Gitlab::ConditionalConcurrencyLimitControl + def perform(defer_job); end + + def defer_job?(*args) + args.first + end + end + end + + subject(:perform) { worker.new.perform(defer_job) } + + context 'when defer_job? returns true' do + let(:defer_job) { true } + + it 'reschedule the worker' do + expect(worker).to receive(:perform_in).with( + ::Gitlab::ConditionalConcurrencyLimitControl::DEFAULT_RESCHEDULE_INTERVAL, defer_job) + + perform + end + end + + context 'when defer_job? returns false' do + it 'does not reschedule the work' do + expect(worker).not_to receive(:perform_in).with( + ::Gitlab::ConditionalConcurrencyLimitControl::DEFAULT_RESCHEDULE_INTERVAL, defer_job) + + perform + end + end + + context 'when defer_job? is not defined' do + let(:worker) do + Class.new do + prepend ::Gitlab::ConditionalConcurrencyLimitControl + def perform; end + end + end + + subject(:perform) { worker.new.perform } + + it 'does not reschedule the work' do + expect(worker).not_to receive(:perform_in).with( + ::Gitlab::ConditionalConcurrencyLimitControl::DEFAULT_RESCHEDULE_INTERVAL) + + perform + end + end +end diff --git a/ee/spec/workers/security/scan_execution_policies/create_pipeline_worker_spec.rb b/ee/spec/workers/security/scan_execution_policies/create_pipeline_worker_spec.rb index 9e9e2d4995ce5430..e93d4a028852a658 100644 --- a/ee/spec/workers/security/scan_execution_policies/create_pipeline_worker_spec.rb +++ b/ee/spec/workers/security/scan_execution_policies/create_pipeline_worker_spec.rb @@ -61,6 +61,44 @@ run_worker end + context 'when the number of running security policy scheduled scans exceeds the limit' do + before do + stub_application_setting(security_policy_scheduled_scans_max_concurrency: 1) + + create_list(:ci_build, 2, + :running, + created_at: 1.minute.ago, + updated_at: 1.minute.ago, + pipeline: create(:ci_pipeline, source: :security_orchestration_policy)) + end + + context 'when feature flag `scan_execution_pipeline_concurrency_control` is disabled' do + before do + stub_feature_flags(scan_execution_pipeline_concurrency_control: false) + end + + it 'delegates the pipeline creation to Security::SecurityOrchestrationPolicies::CreatePipelineService' do + expect(::Security::SecurityOrchestrationPolicies::CreatePipelineService).to( + receive(:new) + .with(project: project, current_user: current_user, params: params) + .and_call_original) + + run_worker + end + end + + context 'when feature flag `scan_execution_pipeline_concurrency_control` is enabled' do + it 'does not invokes CreatePipelineService' do + expect(::Security::SecurityOrchestrationPolicies::CreatePipelineService).not_to( + receive(:new) + .with(project: project, current_user: current_user, params: params) + .and_call_original) + + run_worker + end + end + end + context 'when create pipeline service returns errors' do before do allow_next_instance_of(::Security::SecurityOrchestrationPolicies::CreatePipelineService) do |service| diff --git a/spec/models/ci/build_spec.rb b/spec/models/ci/build_spec.rb index cccf39e26cf135a6..452a367b21ee6184 100644 --- a/spec/models/ci/build_spec.rb +++ b/spec/models/ci/build_spec.rb @@ -102,6 +102,40 @@ end end + describe 'scopes' do + let_it_be(:old_project) { create(:project) } + let_it_be(:new_project) { create(:project) } + let_it_be(:old_build) { create(:ci_build, created_at: 1.week.ago, updated_at: 1.week.ago, project: old_project) } + let_it_be(:new_build) { create(:ci_build, created_at: 1.minute.ago, updated_at: 1.minute.ago, project: new_project) } + + describe 'created_after' do + subject { described_class.created_after(1.day.ago) } + + it 'returns the builds created after the given time' do + is_expected.to contain_exactly(new_build, build) + end + end + + describe 'updated_after' do + subject { described_class.updated_after(1.day.ago) } + + it 'returns the builds updated after the given time' do + is_expected.to contain_exactly(new_build, build) + end + end + + describe 'with_pipeline_source_type' do + let_it_be(:pipeline) { create(:ci_pipeline, source: :security_orchestration_policy) } + let_it_be(:build) { create(:ci_build, pipeline: pipeline) } + + subject { described_class.with_pipeline_source_type('security_orchestration_policy') } + + it 'returns the builds updated after the given time' do + is_expected.to contain_exactly(build) + end + end + end + describe 'callbacks' do context 'when running after_create callback' do it 'executes hooks' do -- GitLab From 60aa5853c6e249684a826dedc9933ba1d5275fc2 Mon Sep 17 00:00:00 2001 From: Marcos Rocha <mrocha@gitlab.com> Date: Mon, 22 Jul 2024 21:34:40 +0000 Subject: [PATCH 6/6] Apply MR suggestion --- .../create_pipeline_worker.rb | 6 +++-- ...ditional_concurrency_limit_control_spec.rb | 22 ++++++++++++------- .../create_pipeline_worker_spec.rb | 8 +++++++ spec/models/ci/build_spec.rb | 2 ++ 4 files changed, 28 insertions(+), 10 deletions(-) diff --git a/ee/app/workers/security/scan_execution_policies/create_pipeline_worker.rb b/ee/app/workers/security/scan_execution_policies/create_pipeline_worker.rb index 4fbe2dc5a82d62b5..d17fea0fc1f735a0 100644 --- a/ee/app/workers/security/scan_execution_policies/create_pipeline_worker.rb +++ b/ee/app/workers/security/scan_execution_policies/create_pipeline_worker.rb @@ -60,14 +60,16 @@ def cache_key end def reached_limit?(limit:, schedule_builds_count:) - Rails.cache.fetch(cache_key, expires_in: CACHE_EXPIRES_IN) do + active_builds = Rails.cache.fetch(cache_key, expires_in: CACHE_EXPIRES_IN) do ::Ci::Build.with_pipeline_source_type('security_orchestration_policy') .with_status(*::Ci::HasStatus::ALIVE_STATUSES) .created_after(1.hour.ago) .updated_after(1.hour.ago) .limit(limit) - .count + schedule_builds_count > limit + .count end + + active_builds + schedule_builds_count > limit end def actions_for(schedule) diff --git a/ee/spec/workers/concerns/gitlab/conditional_concurrency_limit_control_spec.rb b/ee/spec/workers/concerns/gitlab/conditional_concurrency_limit_control_spec.rb index d0fbae23f7ab9222..450b757124b0be38 100644 --- a/ee/spec/workers/concerns/gitlab/conditional_concurrency_limit_control_spec.rb +++ b/ee/spec/workers/concerns/gitlab/conditional_concurrency_limit_control_spec.rb @@ -5,10 +5,12 @@ RSpec.describe ::Gitlab::ConditionalConcurrencyLimitControl, feature_category: :shared do let(:defer_job) { false } - let(:worker) do + let(:worker_class) do Class.new do prepend ::Gitlab::ConditionalConcurrencyLimitControl - def perform(defer_job); end + def perform(_defer_job) + puts 'Perform Job' + end def defer_job?(*args) args.first @@ -16,13 +18,13 @@ def defer_job?(*args) end end - subject(:perform) { worker.new.perform(defer_job) } + subject(:perform) { worker_class.new.perform(defer_job) } context 'when defer_job? returns true' do let(:defer_job) { true } it 'reschedule the worker' do - expect(worker).to receive(:perform_in).with( + expect(worker_class).to receive(:perform_in).with( ::Gitlab::ConditionalConcurrencyLimitControl::DEFAULT_RESCHEDULE_INTERVAL, defer_job) perform @@ -31,25 +33,29 @@ def defer_job?(*args) context 'when defer_job? returns false' do it 'does not reschedule the work' do - expect(worker).not_to receive(:perform_in).with( + expect(worker_class).not_to receive(:perform_in).with( ::Gitlab::ConditionalConcurrencyLimitControl::DEFAULT_RESCHEDULE_INTERVAL, defer_job) + expect_next_instance_of(worker_class) do |instance| + expect(instance).to receive(:perform).with(defer_job) + end + perform end end context 'when defer_job? is not defined' do - let(:worker) do + let(:worker_class) do Class.new do prepend ::Gitlab::ConditionalConcurrencyLimitControl def perform; end end end - subject(:perform) { worker.new.perform } + subject(:perform) { worker_class.new.perform } it 'does not reschedule the work' do - expect(worker).not_to receive(:perform_in).with( + expect(worker_class).not_to receive(:perform_in).with( ::Gitlab::ConditionalConcurrencyLimitControl::DEFAULT_RESCHEDULE_INTERVAL) perform diff --git a/ee/spec/workers/security/scan_execution_policies/create_pipeline_worker_spec.rb b/ee/spec/workers/security/scan_execution_policies/create_pipeline_worker_spec.rb index e93d4a028852a658..3da9f606f2c904a0 100644 --- a/ee/spec/workers/security/scan_execution_policies/create_pipeline_worker_spec.rb +++ b/ee/spec/workers/security/scan_execution_policies/create_pipeline_worker_spec.rb @@ -96,6 +96,14 @@ run_worker end + + it 'rescheduled the worker' do + expect(described_class).to receive(:perform_in) + .with(Gitlab::ConditionalConcurrencyLimitControl::DEFAULT_RESCHEDULE_INTERVAL, + project_id, current_user_id, schedule_id, branch) + + run_worker + end end end diff --git a/spec/models/ci/build_spec.rb b/spec/models/ci/build_spec.rb index 452a367b21ee6184..3b57d11a06984bdf 100644 --- a/spec/models/ci/build_spec.rb +++ b/spec/models/ci/build_spec.rb @@ -127,6 +127,8 @@ describe 'with_pipeline_source_type' do let_it_be(:pipeline) { create(:ci_pipeline, source: :security_orchestration_policy) } let_it_be(:build) { create(:ci_build, pipeline: pipeline) } + let_it_be(:push_pipeline) { create(:ci_pipeline, source: :push) } + let_it_be(:push_build) { create(:ci_build, pipeline: push_pipeline) } subject { described_class.with_pipeline_source_type('security_orchestration_policy') } -- GitLab