SPEP Test Run: Create TestRunService and pipeline completion event subscription
Summary
Create the service to create test runs and execute pipelines, plus an event subscription to update test run state when pipelines complete.
Background
The service handles validation, test run creation, and pipeline execution. The event subscriber listens for pipeline completion events and updates the test run state accordingly.
Parent MR: !218585 (closed)
Implementation details
Service Implementation
# ee/app/services/security/pipeline_execution_schedule_policies/test_run_service.rb
module Security
module PipelineExecutionSchedulePolicies
class TestRunService
include Gitlab::Utils::StrongMemoize
PIPELINE_SOURCE = :pipeline_execution_policy_schedule
def initialize(policy:, project:, user:)
@policy = policy
@project = project
@user = user
end
def execute
return error('Feature not available') unless feature_enabled?
return error('Policy must be a pipeline_execution_schedule_policy') unless valid_policy_type?
return error('User must be a project owner') unless user_is_owner?
return error('Project is not in policy scope') unless project_in_scope?
test_run = create_test_run
return error(test_run.errors.full_messages.join(', ')) unless test_run.persisted?
execute_pipeline(test_run)
end
private
attr_reader :policy, :project, :user
def feature_enabled?
Feature.enabled?(:spep_test_run, project)
end
def valid_policy_type?
policy.type_pipeline_execution_schedule_policy?
end
def user_is_owner?
project.owner == user || user.can?(:owner_access, project)
end
def project_in_scope?
policy.scope_applicable?(project)
end
def create_test_run
Security::PolicyScheduleTestRun.create(
security_policy: policy,
project: project,
user: user
)
end
def execute_pipeline(test_run)
ensure_security_policy_bot
ci_content = policy.content['content'].deep_stringify_keys.to_yaml
branch = project.default_branch_or_main
result = Ci::CreatePipelineService.new(
project,
project.security_policy_bot,
ref: branch
).execute(PIPELINE_SOURCE, content: ci_content, ignore_skip_ci: true)
if result.success?
pipeline = result.payload
test_run.update!(pipeline: pipeline, state: :running)
success(test_run: test_run, pipeline: pipeline)
else
test_run.update!(error_message: result.message, state: :failed)
error(result.message, test_run: test_run)
end
end
def ensure_security_policy_bot
return if project.security_policy_bot
Security::Orchestration::CreateBotService
.new(project, nil, skip_authorization: true)
.execute
end
def success(payload = {})
ServiceResponse.success(payload: payload)
end
def error(message, payload = {})
ServiceResponse.error(message: message, payload: payload)
end
end
end
end
Event Subscriber (Worker)
# ee/app/workers/security/pipeline_execution_schedule_policies/test_run_completion_worker.rb
module Security
module PipelineExecutionSchedulePolicies
class TestRunCompletionWorker
include Gitlab::EventStore::Subscriber
def handle_event(event)
pipeline_id = event.data[:pipeline_id]
status = event.data[:status]
test_run = ::Security::PolicyScheduleTestRun.find_by(pipeline_id: pipeline_id)
return unless test_run
new_state = status == 'failed' ? :failed : :complete
test_run.update!(state: new_state)
# Send email notification
Security::PipelineExecutionSchedulePolicies::TestRunMailer
.completion_email(test_run).deliver_later
end
end
end
end
Event Store Subscription
Add to ee/lib/ee/gitlab/event_store.rb in register_security_policy_subscribers:
store.subscribe ::Security::PipelineExecutionSchedulePolicies::TestRunCompletionWorker,
to: ::Ci::PipelineFinishedEvent,
if: ->(event) { ::Security::PolicyScheduleTestRun.exists?(pipeline_id: event.data[:pipeline_id]) }
Sidekiq Queue Configuration
Add to config/sidekiq_queues.yml:
- - security_pipeline_execution_schedule_policies_test_run_completion
- 1
Mailer (Optional)
# ee/app/mailers/security/pipeline_execution_schedule_policies/test_run_mailer.rb
module Security
module PipelineExecutionSchedulePolicies
class TestRunMailer < ApplicationMailer
helper EmailsHelper
def completion_email(test_run)
@test_run = test_run
@project = test_run.project
@user = test_run.user
@pipeline = test_run.pipeline
mail(to: @user.email, subject: test_run_completion_subject)
end
private
def test_run_completion_subject
status = @pipeline&.status || 'unknown'
"Pipeline Execution Schedule Policy Test Run #{status.humanize}"
end
end
end
end
Edited by Andy Schoenen