Split operations in processScanResultWorker into multiple workers to scale
We tried to split the operations performed in ProcessScanResultPolicyWorker
into multiple workers and make it loosely coupled or eventually consistent in #393871 (closed) as discussed in this proposal
But it was noticed it does not work for all cases as discussed in #393871 (comment 1349864969)
Latest update: I think moving
SyncOpenedMergeRequestsService
to a new worker would not work for all cases, as we delete and recreate approval rules,SyncOpenedMergeRequestsService
might introduce race condition where we might try to process already deleted records. So, I've kept it within the same worker but did a refactor in !117397 (merged)
Creating this issue to track the split up discussions and implementation plan as the other issue scope was limited to make the worker idempotent with deduplication strategy.
Implementation Plan
-
Create a new feature flag sync_mr_approval_rules_worker
-
Create a new worker that wraps Security::SecurityOrchestrationPolicies::SyncOpenedMergeRequestsService
-
Update Security::ProcessScanResultPolicyWorker
to call the new worker behind feature flag
diff --git a/ee/app/services/security/security_orchestration_policies/sync_opened_merge_requests_service.rb b/ee/app/services/security/security_orchestration_policies/sync_opened_merge_requests_service.rb
index f192c3b2c7db..da32db540cc5 100644
--- a/ee/app/services/security/security_orchestration_policies/sync_opened_merge_requests_service.rb
+++ b/ee/app/services/security/security_orchestration_policies/sync_opened_merge_requests_service.rb
@@ -13,7 +13,11 @@ def initialize(project:, policy_configuration:)
def execute
each_open_merge_request do |merge_request|
- merge_request.sync_project_approval_rules_for_policy_configuration(@policy_configuration.id)
+ begin
+ merge_request.sync_project_approval_rules_for_policy_configuration(@policy_configuration.id)
+ rescue ActiveRecord::ActiveRecordError
+ break
+ end
head_pipeline = merge_request.actual_head_pipeline
next unless head_pipeline
diff --git a/ee/app/workers/all_queues.yml b/ee/app/workers/all_queues.yml
index ab59942c5864..b36dccdac539 100644
--- a/ee/app/workers/all_queues.yml
+++ b/ee/app/workers/all_queues.yml
@@ -1650,6 +1650,15 @@
:weight: 1
:idempotent: true
:tags: []
+- :name: security_scan_result_policies_sync_mrs
+ :worker_name: Security::ScanResultPolicies::SyncMrsWorker
+ :feature_category: :security_policy_management
+ :has_external_dependencies: false
+ :urgency: :low
+ :resource_boundary: :unknown
+ :weight: 1
+ :idempotent: true
+ :tags: []
- :name: security_scans_purge_by_job_id
:worker_name: Security::Scans::PurgeByJobIdWorker
:feature_category: :vulnerability_management
diff --git a/ee/app/workers/security/process_scan_result_policy_worker.rb b/ee/app/workers/security/process_scan_result_policy_worker.rb
index c399d425232a..a7639d7d51b9 100644
--- a/ee/app/workers/security/process_scan_result_policy_worker.rb
+++ b/ee/app/workers/security/process_scan_result_policy_worker.rb
@@ -30,6 +30,11 @@ def perform(project_id, configuration_id)
end
end
+ if Feature.enabled?(:sync_mr_approval_rules_worker, @project)
+ Security::ScanResultPolicies::SyncMrsWorker.perform_async(project_id, configuration_id)
+ return
+ end
+
Security::SecurityOrchestrationPolicies::SyncOpenedMergeRequestsService
.new(project: project, policy_configuration: configuration)
.execute
diff --git a/ee/app/workers/security/scan_result_policies/sync_mrs_worker.rb b/ee/app/workers/security/scan_result_policies/sync_mrs_worker.rb
new file mode 100644
index 000000000000..e7964383090c
--- /dev/null
+++ b/ee/app/workers/security/scan_result_policies/sync_mrs_worker.rb
@@ -0,0 +1,26 @@
+# frozen_string_literal: true
+
+module Security
+ module ScanResultPolicies
+ class SyncMrsWorker
+ include ApplicationWorker
+
+ idempotent!
+ deduplicate :until_executed, if_deduplicated: :reschedule_once
+
+ data_consistency :always # rubocop:disable SidekiqLoadBalancing/WorkerDataConsistency
+ sidekiq_options retry: true
+ feature_category :security_policy_management
+
+ def perform(project_id, configuration_id)
+ project = Project.find_by_id(project_id)
+ configuration = Security::OrchestrationPolicyConfiguration.find_by_id(configuration_id)
+ return unless project && configuration
+
+ Security::SecurityOrchestrationPolicies::SyncOpenedMergeRequestsService
+ .new(project: project, policy_configuration: configuration)
+ .execute
+ end
+ end
+ end
+end