SPEP Test Run: Create TestRunService and pipeline completion event subscription

Summary

Create a service that creates test runs and executes their pipelines, plus an event subscription that updates the test run state when a pipeline completes.

Background

The service handles validation, test run creation, and pipeline execution. The event subscriber listens for pipeline completion events and updates the test run state accordingly.

Parent MR: !218585 (closed)

Implementation details

Service Implementation

# ee/app/services/security/pipeline_execution_schedule_policies/test_run_service.rb
module Security
  module PipelineExecutionSchedulePolicies
    class TestRunService
      include Gitlab::Utils::StrongMemoize

      PIPELINE_SOURCE = :pipeline_execution_policy_schedule

      def initialize(policy:, project:, user:)
        @policy = policy
        @project = project
        @user = user
      end

      def execute
        return error('Feature not available') unless feature_enabled?
        return error('Policy must be a pipeline_execution_schedule_policy') unless valid_policy_type?
        return error('User must be a project owner') unless user_is_owner?
        return error('Project is not in policy scope') unless project_in_scope?

        test_run = create_test_run
        return error(test_run.errors.full_messages.join(', ')) unless test_run.persisted?

        execute_pipeline(test_run)
      end

      private

      attr_reader :policy, :project, :user

      def feature_enabled?
        Feature.enabled?(:spep_test_run, project)
      end

      def valid_policy_type?
        policy.type_pipeline_execution_schedule_policy?
      end

      def user_is_owner?
        project.owner == user || user.can?(:owner_access, project)
      end

      def project_in_scope?
        policy.scope_applicable?(project)
      end

      def create_test_run
        Security::PolicyScheduleTestRun.create(
          security_policy: policy,
          project: project,
          user: user
        )
      end

      def execute_pipeline(test_run)
        ensure_security_policy_bot

        ci_content = policy.content['content'].deep_stringify_keys.to_yaml
        branch = project.default_branch_or_main

        result = Ci::CreatePipelineService.new(
          project,
          project.security_policy_bot,
          ref: branch
        ).execute(PIPELINE_SOURCE, content: ci_content, ignore_skip_ci: true)

        if result.success?
          pipeline = result.payload
          test_run.update!(pipeline: pipeline, state: :running)
          success(test_run: test_run, pipeline: pipeline)
        else
          test_run.update!(error_message: result.message, state: :failed)
          error(result.message, test_run: test_run)
        end
      end

      def ensure_security_policy_bot
        return if project.security_policy_bot

        Security::Orchestration::CreateBotService
          .new(project, nil, skip_authorization: true)
          .execute
      end

      def success(payload = {})
        ServiceResponse.success(payload: payload)
      end

      def error(message, payload = {})
        ServiceResponse.error(message: message, payload: payload)
      end
    end
  end
end
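
The `ci_content` line in `execute_pipeline` stringifies the keys before calling `to_yaml`. A minimal sketch of why that step matters, using only Ruby's standard `yaml` library. The `deep_stringify` helper is a hypothetical stand-in for ActiveSupport's `deep_stringify_keys`, and the sample content is invented for illustration:

```ruby
require 'yaml'

# Hypothetical stand-in for ActiveSupport's Hash#deep_stringify_keys,
# so the sketch runs without Rails.
def deep_stringify(value)
  case value
  when Hash  then value.to_h { |k, v| [k.to_s, deep_stringify(v)] }
  when Array then value.map { |v| deep_stringify(v) }
  else value
  end
end

# Invented sample; in the service the hash comes from policy.content['content'].
sample_content = {
  stages: ['test'],
  smoke: { stage: 'test', script: ['echo ok'] }
}

# Symbol keys are serialized with a leading colon (":stages:"), which is
# not a valid top-level CI keyword.
symbol_yaml = sample_content.to_yaml

# String keys are serialized as plain YAML keys ("stages:").
string_yaml = deep_stringify(sample_content).to_yaml

puts symbol_yaml
puts string_yaml
```

If the stored policy content can contain symbol keys, skipping the stringify step would likely produce CI configuration that fails validation.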

Event Subscriber (Worker)

# ee/app/workers/security/pipeline_execution_schedule_policies/test_run_completion_worker.rb
module Security
  module PipelineExecutionSchedulePolicies
    class TestRunCompletionWorker
      include Gitlab::EventStore::Subscriber

      def handle_event(event)
        pipeline_id = event.data[:pipeline_id]
        status = event.data[:status]

        test_run = ::Security::PolicyScheduleTestRun.find_by(pipeline_id: pipeline_id)
        return unless test_run

        # Any status other than 'failed' (including canceled or skipped)
        # is recorded as complete.
        new_state = status == 'failed' ? :failed : :complete
        test_run.update!(state: new_state)

        # Send email notification
        Security::PipelineExecutionSchedulePolicies::TestRunMailer
          .completion_email(test_run).deliver_later
      end
    end
  end
end
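
The status-to-state mapping in `handle_event` can be checked in isolation. The sketch below copies the ternary into a hypothetical helper, `test_run_state_for`, and runs it against the finished pipeline statuses; the helper name is an assumption made for illustration and is not part of the proposal:

```ruby
# Hypothetical helper mirroring the ternary in handle_event:
# only 'failed' maps to :failed, everything else maps to :complete.
def test_run_state_for(pipeline_status)
  pipeline_status == 'failed' ? :failed : :complete
end

# Statuses a finished pipeline can report.
%w[success failed canceled skipped].each do |status|
  puts "#{status} -> #{test_run_state_for(status)}"
end
```

With this mapping, canceled and skipped pipelines are recorded as complete. If those should be treated differently, an explicit mapping per status would be needed.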

Event Store Subscription

Add to ee/lib/ee/gitlab/event_store.rb in register_security_policy_subscribers:

store.subscribe ::Security::PipelineExecutionSchedulePolicies::TestRunCompletionWorker,
  to: ::Ci::PipelineFinishedEvent,
  if: ->(event) { ::Security::PolicyScheduleTestRun.exists?(pipeline_id: event.data[:pipeline_id]) }
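
The effect of the `if:` condition can be illustrated with a small in-memory stand-in. `TinyStore` below is hypothetical and is not GitLab's event store API. It only shows that the handler runs for pipelines that have a test run and is skipped for all others:

```ruby
# Hypothetical in-memory stand-in for an event store with conditional
# subscriptions. Not GitLab's Gitlab::EventStore API.
class TinyStore
  def initialize
    @subscriptions = []
  end

  # Register a handler together with the condition that gates it.
  def subscribe(handler, condition)
    @subscriptions << [handler, condition]
  end

  # Deliver the event only to handlers whose condition returns true.
  def publish(event)
    @subscriptions.each do |handler, condition|
      handler.call(event) if condition.call(event)
    end
  end
end

# Stand-in for the pipeline IDs referenced by PolicyScheduleTestRun rows.
tracked_pipeline_ids = [42]
handled = []

store = TinyStore.new
store.subscribe(
  ->(event) { handled << event[:pipeline_id] },
  ->(event) { tracked_pipeline_ids.include?(event[:pipeline_id]) }
)

store.publish({ pipeline_id: 42, status: 'success' }) # has a test run
store.publish({ pipeline_id: 7, status: 'failed' })   # unrelated pipeline

p handled
```

Filtering at subscription time means no worker job is enqueued for the large majority of pipelines that have no associated test run, at the cost of one existence query per finished pipeline.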

Sidekiq Queue Configuration

Add to config/sidekiq_queues.yml:

- - security_pipeline_execution_schedule_policies_test_run_completion
  - 1

Mailer (Optional)

# ee/app/mailers/security/pipeline_execution_schedule_policies/test_run_mailer.rb
module Security
  module PipelineExecutionSchedulePolicies
    class TestRunMailer < ApplicationMailer
      helper EmailsHelper

      def completion_email(test_run)
        @test_run = test_run
        @project = test_run.project
        @user = test_run.user
        @pipeline = test_run.pipeline

        mail(to: @user.email, subject: test_run_completion_subject)
      end

      private

      def test_run_completion_subject
        status = @pipeline&.status || 'unknown'
        "Pipeline Execution Schedule Policy Test Run #{status.humanize}"
      end
    end
  end
end

Edited by Andy Schoenen