Skip to content

Split operations in processScanResultWorker into multiple workers to scale

We tried to split the operations performed in ProcessScanResultPolicyWorker into multiple workers and make it loosely coupled or eventually consistent in #393871 (closed) as discussed in this proposal

But it was noticed it does not work for all cases as discussed in #393871 (comment 1349864969)

Latest update: I think moving SyncOpenedMergeRequestsService to a new worker would not work for all cases, as we delete and recreate approval rules, SyncOpenedMergeRequestsService might introduce race condition where we might try to process already deleted records. So, I've kept it within the same worker but did a refactor in !117397 (merged)

Creating this issue to track the split up discussions and implementation plan as the other issue scope was limited to make the worker idempotent with deduplication strategy.

Implementation Plan

  • Create a new feature flag sync_mr_approval_rules_worker
  • Create a new worker that wraps Security::SecurityOrchestrationPolicies::SyncOpenedMergeRequestsService
  • Update Security::ProcessScanResultPolicyWorker to call the new worker behind feature flag
diff --git a/ee/app/services/security/security_orchestration_policies/sync_opened_merge_requests_service.rb b/ee/app/services/security/security_orchestration_policies/sync_opened_merge_requests_service.rb
index f192c3b2c7db..da32db540cc5 100644
--- a/ee/app/services/security/security_orchestration_policies/sync_opened_merge_requests_service.rb
+++ b/ee/app/services/security/security_orchestration_policies/sync_opened_merge_requests_service.rb
@@ -13,7 +13,11 @@ def initialize(project:, policy_configuration:)
 
       def execute
         each_open_merge_request do |merge_request|
-          merge_request.sync_project_approval_rules_for_policy_configuration(@policy_configuration.id)
+          begin
+            merge_request.sync_project_approval_rules_for_policy_configuration(@policy_configuration.id)
+          rescue ActiveRecord::ActiveRecordError
+            break
+          end
 
           head_pipeline = merge_request.actual_head_pipeline
           next unless head_pipeline
diff --git a/ee/app/workers/all_queues.yml b/ee/app/workers/all_queues.yml
index ab59942c5864..b36dccdac539 100644
--- a/ee/app/workers/all_queues.yml
+++ b/ee/app/workers/all_queues.yml
@@ -1650,6 +1650,15 @@
   :weight: 1
   :idempotent: true
   :tags: []
+- :name: security_scan_result_policies_sync_mrs
+  :worker_name: Security::ScanResultPolicies::SyncMrsWorker
+  :feature_category: :security_policy_management
+  :has_external_dependencies: false
+  :urgency: :low
+  :resource_boundary: :unknown
+  :weight: 1
+  :idempotent: true
+  :tags: []
 - :name: security_scans_purge_by_job_id
   :worker_name: Security::Scans::PurgeByJobIdWorker
   :feature_category: :vulnerability_management
diff --git a/ee/app/workers/security/process_scan_result_policy_worker.rb b/ee/app/workers/security/process_scan_result_policy_worker.rb
index c399d425232a..a7639d7d51b9 100644
--- a/ee/app/workers/security/process_scan_result_policy_worker.rb
+++ b/ee/app/workers/security/process_scan_result_policy_worker.rb
@@ -30,6 +30,11 @@ def perform(project_id, configuration_id)
         end
       end
 
+      if Feature.enabled?(:sync_mr_approval_rules_worker, @project)
+        Security::ScanResultPolicies::SyncMrsWorker.perform_async(project_id, configuration_id)
+        return
+      end
+
       Security::SecurityOrchestrationPolicies::SyncOpenedMergeRequestsService
         .new(project: project, policy_configuration: configuration)
         .execute
diff --git a/ee/app/workers/security/scan_result_policies/sync_mrs_worker.rb b/ee/app/workers/security/scan_result_policies/sync_mrs_worker.rb
new file mode 100644
index 000000000000..e7964383090c
--- /dev/null
+++ b/ee/app/workers/security/scan_result_policies/sync_mrs_worker.rb
@@ -0,0 +1,26 @@
+# frozen_string_literal: true
+
+module Security
+  module ScanResultPolicies
+    class SyncMrsWorker
+      include ApplicationWorker
+
+      idempotent!
+      deduplicate :until_executed, if_deduplicated: :reschedule_once
+
+      data_consistency :always # rubocop:disable SidekiqLoadBalancing/WorkerDataConsistency
+      sidekiq_options retry: true
+      feature_category :security_policy_management
+
+      def perform(project_id, configuration_id)
+        project = Project.find_by_id(project_id)
+        configuration = Security::OrchestrationPolicyConfiguration.find_by_id(configuration_id)
+        return unless project && configuration
+
+        Security::SecurityOrchestrationPolicies::SyncOpenedMergeRequestsService
+          .new(project: project, policy_configuration: configuration)
+          .execute
+      end
+    end
+  end
+end
Edited by Sashi Kumar Kumaresan