Skip to content

Make ProcessScanResultPolicyWorker idempotent

Why are we doing this work

There is a race condition in the ProcessScanResultPolicyWorker (#391634 (closed)). Making the worker idempotent allows us to get rid of the condition.

Implementation Plan

diff --git a/ee/app/workers/security/process_scan_result_policy_worker.rb
@@ -3,12 +3,6 @@
 module Security
   class ProcessScanResultPolicyWorker # rubocop:disable Scalability/IdempotentWorker
     include ApplicationWorker
-    include Gitlab::ExclusiveLeaseHelpers
-
-    LEASE_NAMESPACE = "process_scan_result_policy_worker"
-    LEASE_TTL = 5.minutes
-    LEASE_RETRY_BASE = 0.1
-    LEASE_RETRY_MULTIPLIER = 1.3

     data_consistency :always

+    deduplicate :until_executing, including_scheduled: true
+    idempotent!

@@ -16,42 +10,27 @@ class ProcessScanResultPolicyWorker # rubocop:disable Scalability/IdempotentWork

     feature_category :security_policy_management

-    def self.lease_key(project, configuration)
-      "#{LEASE_NAMESPACE}:#{project.id}:#{configuration.id}"
-    end
-
     def perform(project_id, configuration_id)
       project = Project.find_by_id(project_id)
       configuration = Security::OrchestrationPolicyConfiguration.find_by_id(configuration_id)

       return unless project && configuration

-      in_lock(self.class.lease_key(project, configuration), ttl: LEASE_TTL, sleep_sec: method(:lease_sleep_sec)) do
-        configuration.transaction do
-          configuration.delete_scan_result_policy_reads
-          configuration.delete_scan_finding_rules_for_project(project_id)
+      configuration.transaction do
+        configuration.delete_scan_result_policy_reads
+        configuration.delete_scan_finding_rules_for_project(project_id)

-          active_scan_result_policies = configuration.active_scan_result_policies
+        active_scan_result_policies = configuration.active_scan_result_policies

-          active_scan_result_policies.each_with_index do |policy, policy_index|
-            Security::SecurityOrchestrationPolicies::ProcessScanResultPolicyService
-              .new(project: project, policy_configuration: configuration, policy: policy, policy_index: policy_index)
-              .execute
-          end
-
-          Security::SecurityOrchestrationPolicies::SyncOpenedMergeRequestsService
-            .new(project: project)
+        active_scan_result_policies.each_with_index do |policy, policy_index|
+          Security::SecurityOrchestrationPolicies::ProcessScanResultPolicyService
+            .new(project: project, policy_configuration: configuration, policy: policy, policy_index: policy_index)
             .execute
         end
-
-        Security::SecurityOrchestrationPolicies::SyncOpenMergeRequestsHeadPipelineService
-          .new(project: project)
-          .execute
       end
-    end

-    def lease_sleep_sec(attempts)
-      LEASE_RETRY_BASE * (LEASE_RETRY_MULTIPLIER**attempts)
+      Security::SyncOpenedMergeRequestsServiceWorker.perform_async(project_id)
+      Security::SyncOpenMergeRequestsHeadPipelineWorker.perform_async(project_id)
     end
   end
 end
diff --git a/ee/app/workers/security/sync_open_merge_requests_head_pipeline_worker.rb
@@ -0,0 +1,21 @@
+# frozen_string_literal: true
+
+module Security
+  class SyncOpenMergeRequestsHeadPipelineWorker
+    include ApplicationWorker
+
+    data_consistency :always
+    feature_category :security_policy_management
+
+    deduplicate :until_executing, including_scheduled: true
+    idempotent!
+
+    def perform(project_id)
+      return unless project = Project.find_by_id(project_id)
+
+      Security::SecurityOrchestrationPolicies::SyncOpenMergeRequestsHeadPipelineService
+        .new(project: project)
+        .execute
+    end
+  end
+end
diff --git a/ee/app/workers/security/sync_opened_merge_requests_service_worker.rb
@@ -0,0 +1,21 @@
+# frozen_string_literal: true
+
+module Security
+  class SyncOpenedMergeRequestsServiceWorker
+    include ApplicationWorker
+
+    data_consistency :always
+    feature_category :security_policy_management
+
+    deduplicate :until_executing, including_scheduled: true
+    idempotent!
+
+    def perform(project_id)
+      return unless project = Project.find_by_id(project_id)
+
+      Security::SecurityOrchestrationPolicies::SyncOpenedMergeRequestsService
+        .new(project: project)
+        .execute
+    end
+  end
+end

Documentation

Per this thread, update documentation once issue is resolved.

Edited by Grant Hickman