Verified Commit c96bdc59 authored by Michael Becker, committed by GitLab

Create worker to store security reports by project

Technical context
-----------------------

`UPSERT` queries **require acquiring locks on unique index
tuples**. This causes lock contention when multiple processes try to
`UPSERT` records with the same unique attributes: each process must
wait for the others to complete.
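
To illustrate the contention (the model, attributes, and index name
below are hypothetical, not code from this commit):

```ruby
# Hypothetical model and index name, for illustration only.
# `upsert_all` issues INSERT ... ON CONFLICT DO UPDATE under the hood.
SecurityFinding.upsert_all(
  [{ uuid: "2f5e0c95", project_id: 42, severity: "high" }],
  unique_by: :index_security_findings_on_uuid
)
# Postgres locks the conflicting unique-index tuple before deciding
# whether to insert or update, so a second process upserting the same
# `uuid` blocks until the first transaction commits or rolls back.
```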

Historical context
-----------------------

The `StoreSecurityReportsWorker` job has the lock-contention issue
described above. It was discovered in [this][0] production incident.

This Commit
-----------------------

The short-term solution resolves the lock contention by, in effect,
making the jobs run sequentially. However, working through the jobs
sequentially can take a long time ([somewhere][1] on the order of 3.25
hours to 20 hours).

In this change, we implement a medium-term solution that replaces the
problematic job with a similar job that can make use of our existing
[Sidekiq deduplication][2] tooling.

With this change, even if a single project suddenly has many pipelines
created for it, only one job will be scheduled and the rest will be
de-duplicated, as sketched below.
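
A minimal sketch of that behaviour (the worker name and body here are
illustrative, not the code added by this commit; see the diff below
for the real worker):

```ruby
# Illustrative worker, not the one added by this commit.
class ExampleIngestionWorker
  include ApplicationWorker

  idempotent!
  deduplicate :until_executing # duplicate jobs dropped until one starts executing

  def perform(project_id)
    # ... ingest the latest security reports for the project ...
  end
end

# Enqueuing the same arguments repeatedly while a matching job is still
# queued results in a single execution:
5.times { ExampleIngestionWorker.perform_async(42) }
```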

[0]:gitlab-com/gl-infra/production#17754
[1]:gitlab-com/gl-infra/production#17754 (comment 1831442417)
[2]:https://docs.gitlab.com/ee/development/sidekiq/idempotent_jobs.html#deduplication

Related to: #452005

EE: true
Changelog: fixed
@@ -82,8 +82,8 @@ Some of the scenarios where these `Security::Finding` records may be promoted to
If the pipeline ran on the default branch then the following steps, in addition to the steps in [Scan runs in a pipeline for a non-default branch](#scan-runs-in-a-pipeline-for-a-non-default-branch), are executed:
1. `Security::StoreScansService` gets called and schedules `StoreSecurityReportsWorker`.
1. `StoreSecurityReportsWorker` executes `Security::Ingestion::IngestReportsService`.
1. `Security::StoreScansService` gets called and schedules `StoreSecurityReportsByProjectWorker`.
1. `StoreSecurityReportsByProjectWorker` executes `Security::Ingestion::IngestReportsService`.
1. `Security::Ingestion::IngestReportsService` takes all reports from a given Pipeline and calls `Security::Ingestion::IngestReportService` and then calls `Security::Ingestion::MarkAsResolvedService`.
1. `Security::Ingestion::IngestReportService` calls `Security::Ingestion::IngestReportSliceService` which executes a number of tasks for a report slice.
......
@@ -58,7 +58,17 @@ def parse_report_file?(file_type)
  end

  def schedule_store_reports_worker
    StoreSecurityReportsWorker.perform_async(pipeline.id) if pipeline.default_branch?
    return unless pipeline.default_branch?

    if ::Feature.enabled?(:deduplicate_security_report_ingestion_jobs, project)
      Gitlab::Redis::SharedState.with do |redis|
        redis.set(Security::StoreSecurityReportsByProjectWorker.cache_key(project_id: project.id), pipeline.id)
      end

      Security::StoreSecurityReportsByProjectWorker.perform_async(project.id)
    else
      StoreSecurityReportsWorker.perform_async(pipeline.id)
    end
  end

  def schedule_scan_security_report_secrets_worker
......
@@ -948,6 +948,15 @@
  :weight: 2
  :idempotent: false
  :tags: []
- :name: security_scans:security_store_security_reports_by_project
  :worker_name: Security::StoreSecurityReportsByProjectWorker
  :feature_category: :vulnerability_management
  :has_external_dependencies: false
  :urgency: :low
  :resource_boundary: :cpu
  :weight: 2
  :idempotent: true
  :tags: []
- :name: security_scans:security_track_secure_scans
  :worker_name: Security::TrackSecureScansWorker
  :feature_category: :vulnerability_management
......
# frozen_string_literal: true

# Worker for storing security reports into the database.
#
module Security
  class StoreSecurityReportsByProjectWorker
    include ApplicationWorker
    include SecurityScansQueue

    # rubocop:disable SidekiqLoadBalancing/WorkerDataConsistency -- Refactor and do not want to change existing behavior
    data_consistency :always
    # rubocop:enable SidekiqLoadBalancing/WorkerDataConsistency

    sidekiq_options retry: 3

    feature_category :vulnerability_management
    worker_resource_boundary :cpu
    idempotent!
    deduplicate :until_executing

    def self.cache_key(project_id: nil)
      return unless project_id.present?

      "#{name}::latest_pipeline_with_security_reports::#{project_id}"
    end

    def perform(project_id)
      project = Project.find_by_id(project_id)
      return unless project&.can_store_security_reports?

      pipeline = latest_pipeline_with_security_reports(project.id)

      # Technically possible since this is an async job and pipelines
      # can be deleted between when this job was scheduled and run;
      # very unlikely
      return unless pipeline

      ::Security::Ingestion::IngestReportsService.execute(pipeline)
    end

    private

    def latest_pipeline_with_security_reports(project_id)
      self.class.cache_key(project_id: project_id)
        .then { |cache_key| Gitlab::Redis::SharedState.with { |redis| redis.get(cache_key) } }
        # not strictly necessary, but to prevent coercing nil to id 0
        .then { |pipeline_id_string| pipeline_id_string.blank? ? nil : pipeline_id_string.to_i }
        .then { |pipeline_id| Ci::Pipeline.find_by_id(pipeline_id) }
    end
  end
end
@@ -2,6 +2,7 @@
# Worker for storing security reports into the database.
#
# DEPRECATED: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/151541
class StoreSecurityReportsWorker # rubocop:disable Scalability/IdempotentWorker
include ApplicationWorker
include SecurityScansQueue
......
---
name: deduplicate_security_report_ingestion_jobs
feature_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/452005
introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/151541
rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/460476
milestone: '17.1'
group: group::threat insights
type: gitlab_com_derisk
default_enabled: false
@@ -33,6 +33,7 @@
subject(:store_group_of_artifacts) { service_object.execute }
before do
allow(Security::StoreSecurityReportsByProjectWorker).to receive(:perform_async)
allow(StoreSecurityReportsWorker).to receive(:perform_async)
allow(ScanSecurityReportSecretsWorker).to receive(:perform_async)
allow(Security::StoreGroupedScansService).to receive(:execute)
@@ -56,10 +57,10 @@
allow(pipeline).to receive(:default_branch?).and_return(true)
end
it 'does not schedule the `StoreSecurityReportsWorker`' do
it 'does not schedule the `StoreSecurityReportsByProjectWorker`' do
store_group_of_artifacts
expect(StoreSecurityReportsWorker).not_to have_received(:perform_async)
expect(Security::StoreSecurityReportsByProjectWorker).not_to have_received(:perform_async)
end
end
end
@@ -75,16 +76,42 @@
end
end
context 'for StoreSecurityReportsWorker' do
context 'for StoreSecurityReportsByProjectWorker' do
context 'when the pipeline is for the default branch' do
let(:project_id) { pipeline.project.id }
let(:cache_key) { Security::StoreSecurityReportsByProjectWorker.cache_key(project_id: project_id) }
before do
allow(pipeline).to receive(:default_branch?).and_return(true)
end
it 'schedules the `StoreSecurityReportsWorker`' do
it 'schedules the `StoreSecurityReportsByProjectWorker`' do
store_group_of_artifacts
expect(StoreSecurityReportsWorker).to have_received(:perform_async).with(pipeline.id)
expect(StoreSecurityReportsWorker).not_to have_received(:perform_async)
expect(Security::StoreSecurityReportsByProjectWorker).to have_received(:perform_async).with(
project_id
)
end
it 'sets the expected redis cache value', :clean_gitlab_redis_shared_state do
expect { store_group_of_artifacts }.to change {
Gitlab::Redis::SharedState.with { |redis| redis.get(cache_key) }
}.from(nil).to(pipeline.id.to_s)
end
context 'when deduplicate_security_report_ingestion_jobs FF is disabled' do
before do
stub_feature_flags(deduplicate_security_report_ingestion_jobs: false)
allow(StoreSecurityReportsWorker).to receive(:perform_async)
end
it 'schedules the `StoreSecurityReportsWorker`' do
store_group_of_artifacts
expect(StoreSecurityReportsWorker).to have_received(:perform_async).with(pipeline.id)
expect(Security::StoreSecurityReportsByProjectWorker).not_to have_received(:perform_async)
end
end
end
@@ -93,10 +120,25 @@
allow(pipeline).to receive(:default_branch?).and_return(false)
end
it 'does not schedule the `StoreSecurityReportsWorker`' do
it 'does not schedule the `StoreSecurityReportsByProjectWorker`' do
store_group_of_artifacts
expect(StoreSecurityReportsWorker).not_to have_received(:perform_async)
expect(Security::StoreSecurityReportsByProjectWorker).not_to have_received(:perform_async)
end
context 'when deduplicate_security_report_ingestion_jobs FF is disabled' do
before do
stub_feature_flags(deduplicate_security_report_ingestion_jobs: false)
allow(StoreSecurityReportsWorker).to receive(:perform_async)
end
it 'does not schedule the `StoreSecurityReportsWorker`' do
store_group_of_artifacts
expect(StoreSecurityReportsWorker).not_to have_received(:perform_async)
expect(Security::StoreSecurityReportsByProjectWorker).not_to have_received(:perform_async)
end
end
end
end
@@ -232,10 +274,11 @@
expect(ScanSecurityReportSecretsWorker).not_to have_received(:perform_async)
end
it 'does not schedule the `StoreSecurityReportsWorker`' do
it 'does not schedule the `StoreSecurityReportsByProjectWorker`' do
store_group_of_artifacts
expect(StoreSecurityReportsWorker).not_to have_received(:perform_async)
expect(Security::StoreSecurityReportsByProjectWorker).not_to have_received(:perform_async)
end
describe 'scheduling `SyncFindingsToApprovalRulesWorker`' do
......
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Security::StoreSecurityReportsByProjectWorker, feature_category: :vulnerability_management do
let_it_be(:group) { create(:group) }
let_it_be(:project) { create(:project, :pipeline_refs, namespace: group) }
let(:pipeline) do
create(
:ee_ci_pipeline,
:with_sast_report,
status: :success,
ref: project.default_branch,
project: project,
user: project.creator
)
end
describe '.cache_key' do
subject { described_class.cache_key(project_id: project.id) }
it { is_expected.to eq("#{described_class}::latest_pipeline_with_security_reports::#{project.id}") }
context 'when project_id is nil' do
it 'returns nil' do
expect(described_class.cache_key(project_id: nil)).to be_nil
end
end
context 'when project_id is not present' do
it 'returns nil' do
expect(described_class.cache_key(project_id: '')).to be_nil
end
end
end
describe '#perform' do
subject(:perform) { worker.perform(project_id) }
let(:worker) { described_class.new }
context 'when there is no project with the given ID' do
let(:project_id) { non_existing_record_id }
it 'does not raise an error' do
expect { perform }.not_to raise_error
end
end
context 'when there is no pipeline associated to the project' do
let(:project_id) { project.id }
before do
stub_licensed_features(sast: true)
end
it 'does not raise an error' do
expect { perform }.not_to raise_error
end
end
context "when the security reports feature is not available" do
where(report_type: ::EE::Enums::Ci::JobArtifact.security_report_file_types.map(&:to_sym))
with_them do
before do
stub_licensed_features(report_type => false)
end
it 'does not execute IngestReportsService' do
expect(::Security::Ingestion::IngestReportsService).not_to receive(:execute)
worker.perform(project.id)
end
end
end
context 'when at least one security report feature is enabled' do
where(report_type: ::EE::Enums::Ci::JobArtifact.security_report_file_types.map(&:to_sym))
with_them do
before do
stub_licensed_features(report_type => true)
update_cache(pipeline)
end
it 'executes IngestReportsService for given pipeline' do
expect(::Security::Ingestion::IngestReportsService).to receive(:execute).with(pipeline)
worker.perform(project.id)
end
end
end
context 'when running SAST analyzers that produce duplicate vulnerabilities' do
let(:pipeline2) do
create(
:ee_ci_pipeline,
:with_sast_report,
status: :success,
ref: 'master',
project: project,
user: project.creator,
pipeline_metadata: create(:ci_pipeline_metadata, project: project)
)
end
where(vulnerability_finding_signatures_enabled: [true, false])
with_them do
before do
stub_licensed_features(
sast: true,
vulnerability_finding_signatures: vulnerability_finding_signatures_enabled
)
update_cache(pipeline)
end
context 'and prefers original analyzer over semgrep when deduplicating' do
let(:artifact_bandit1) do
create(
:ci_build, :sast, :success,
user: project.creator, pipeline: pipeline, project: project
).then { |build| create(:ee_ci_job_artifact, :sast_bandit, job: build) }
end
let(:artifact_bandit2) do
create(
:ci_build, :sast, :success,
user: project.creator, pipeline: pipeline2, project: project
).then { |build| create(:ee_ci_job_artifact, :sast_bandit, job: build) }
end
let(:artifact_semgrep) do
create(
:ci_build, :sast, :success,
user: project.creator, pipeline: pipeline2, project: project
).then { |build| create(:ee_ci_job_artifact, :sast_semgrep_for_bandit, job: build) }
end
it 'does not duplicate vulnerabilities' do
# seeding a scan that should be ingested as a vulnerability
Security::StoreGroupedScansService.execute([artifact_bandit1])
expect(Security::Finding.count).to eq 1
expect(Security::Scan.count).to eq 1
# ingest the security finding/scan into a
# vulnerability/vulnerability_finding
expect { worker.perform(project.id) }.to change {
Vulnerabilities::Finding.count
}.from(0).to(1).and change { Vulnerability.count }.from(0).to(1)
# seeding a scan that is indicating the same vulnerability
# we just ingested
Security::StoreGroupedScansService.execute([artifact_bandit2, artifact_semgrep])
expect(Security::Finding.count).to eq 3
expect(Security::Scan.count).to eq 3
# simulate a new pipeline completing
update_cache(pipeline2)
# After running the worker again, we do not create
# additional vulnerabilities (since they would be duplicates)
expect { worker.perform(project.id) }.to change {
Vulnerabilities::Finding.count
}.by(0).and change { Vulnerability.count }.by(0)
end
end
context 'and prefers semgrep over original analyzer when deduplicating' do
let(:artifact_gosec1) do
create(
:ci_build, :sast, :success,
user: project.creator, pipeline: pipeline, project: project
).then { |build| create(:ee_ci_job_artifact, :sast_gosec, job: build) }
end
let(:artifact_gosec2) do
create(
:ci_build, :sast, :success,
user: project.creator, pipeline: pipeline2, project: project
).then { |build| create(:ee_ci_job_artifact, :sast_gosec, job: build) }
end
let(:artifact_semgrep) do
create(
:ci_build, :sast, :success,
user: project.creator, pipeline: pipeline2, project: project
).then { |build| create(:ee_ci_job_artifact, :sast_semgrep_for_gosec, job: build) }
end
it 'does not duplicate vulnerabilities' do
# seeding a scan that should be ingested as a vulnerability
Security::StoreGroupedScansService.execute([artifact_gosec1])
expect(Security::Finding.count).to eq 1
expect(Security::Scan.count).to eq 1
# ingest the security finding/scan into a
# vulnerability/vulnerability_finding
expect { worker.perform(project.id) }.to change {
Vulnerabilities::Finding.count
}.from(0).to(1).and change { Vulnerability.count }.from(0).to(1)
# seeding a scan that is indicating the same vulnerability
# we just ingested
Security::StoreGroupedScansService.execute([artifact_gosec2, artifact_semgrep])
expect(Security::Finding.count).to eq 3
expect(Security::Scan.count).to eq 3
# simulate a new pipeline completing
update_cache(pipeline2)
# After running the worker again, we do not create
# additional vulnerabilities (since they would be duplicates)
expect { worker.perform(project.id) }.to change {
Vulnerabilities::Finding.count
}.by(0).and change { Vulnerability.count }.by(0)
end
end
end
end
context 'when resolving dropped identifiers', :sidekiq_inline do
let(:artifact_semgrep1) { create(:ee_ci_job_artifact, :sast_semgrep_for_multiple_findings, job: semgrep1_build) }
let(:semgrep1_build) do
create(:ci_build, :sast, :success, user: project.creator, pipeline: pipeline, project: project)
end
let(:pipeline2) do
create(
:ee_ci_pipeline,
:with_sast_report,
status: :success,
ref: 'master',
project: project,
user: project.creator
)
end
let(:artifact_semgrep2) { create(:ee_ci_job_artifact, :sast_semgrep_for_gosec, job: semgrep2_build) }
let(:semgrep2_build) do
create(:ci_build, :sast, :success, user: project.creator, pipeline: pipeline2, project: project)
end
before do
stub_licensed_features(sast: true)
update_cache(pipeline)
end
it 'resolves vulnerabilities' do
expect do
Security::StoreGroupedScansService.execute([artifact_semgrep1])
end.to change { Security::Finding.count }.by(2)
.and change { Security::Scan.count }.by(1)
expect do
worker.perform(project.id)
end.to change { Vulnerabilities::Finding.count }.by(2)
.and change { Vulnerability.count }.by(2)
.and change { project.vulnerabilities.with_resolution(false).count }.by(2)
.and change { project.vulnerabilities.with_states(%w[detected]).count }.by(2)
expect do
Security::StoreGroupedScansService.execute([artifact_semgrep2])
end.to change { Security::Finding.count }.by(1)
.and change { Security::Scan.count }.by(1)
# simulate a new pipeline completing
update_cache(pipeline2)
expect do
worker.perform(project.id)
end.to change { Vulnerabilities::Finding.count }.by(0)
.and change { Vulnerability.count }.by(0)
.and change { project.vulnerabilities.with_resolution(true).count }.by(1)
.and change { project.vulnerabilities.with_states(%w[detected]).count }.by(-1)
.and change { project.vulnerabilities.with_states(%w[resolved]).count }.by(1)
end
end
context "when the same scanner runs multiple times in one pipeline" do
let(:artifact_sast1) do
create(
:ci_build, :sast, :success,
user: project.creator, pipeline: pipeline, project: project
).then { |build| create(:ee_ci_job_artifact, :semgrep_web_vulnerabilities, job: build) }
end
let(:artifact_sast2) do
create(
:ci_build, :sast, :success,
user: project.creator, pipeline: pipeline, project: project
).then { |build| create(:ee_ci_job_artifact, :semgrep_api_vulnerabilities, job: build) }
end
before do
stub_licensed_features(sast: true)
update_cache(pipeline)
end
it "does not mark any of the detected vulnerabilities as resolved",
:aggregate_failures do
Security::StoreGroupedScansService.execute([artifact_sast2])
expect(Security::Finding.count).to eq 1
expect(Security::Scan.count).to eq 1
Security::StoreGroupedScansService.execute([artifact_sast1])
expect(Security::Finding.count).to eq 2
expect(Security::Scan.count).to eq 2
expect { worker.perform(project.id) }.to change { Vulnerability.count }.by(2)
expect(project.vulnerabilities.map(&:resolved_on_default_branch)).not_to include(true)
end
end
end
def update_cache(pipeline)
cache_key = described_class.cache_key(project_id: pipeline.project_id)
Gitlab::Redis::SharedState.with { |redis| redis.set(cache_key, pipeline.id) }
end
end
@@ -2728,7 +2728,6 @@
- './ee/spec/services/security/security_orchestration_policies/validate_policy_service_spec.rb'
- './ee/spec/services/security/store_grouped_scans_service_spec.rb'
- './ee/spec/services/security/store_scan_service_spec.rb'
- './ee/spec/services/security/store_scans_service_spec.rb'
- './ee/spec/services/security/token_revocation_service_spec.rb'
- './ee/spec/services/security/track_scan_service_spec.rb'
- './ee/spec/services/security/update_training_service_spec.rb'
@@ -2950,7 +2949,6 @@
- './ee/spec/workers/security/create_orchestration_policy_worker_spec.rb'
- './ee/spec/workers/security/orchestration_policy_rule_schedule_namespace_worker_spec.rb'
- './ee/spec/workers/security/orchestration_policy_rule_schedule_worker_spec.rb'
- './ee/spec/workers/security/store_scans_worker_spec.rb'
- './ee/spec/workers/security/sync_scan_policies_worker_spec.rb'
- './ee/spec/workers/security/track_secure_scans_worker_spec.rb'
- './ee/spec/workers/set_user_status_based_on_user_cap_setting_worker_spec.rb'
......
@@ -449,6 +449,7 @@
'StageUpdateWorker' => 3,
'StatusPage::PublishWorker' => 5,
'StoreSecurityReportsWorker' => 3,
'Security::StoreSecurityReportsByProjectWorker' => 3,
'SyncSeatLinkRequestWorker' => 20,
'SyncSeatLinkWorker' => 12,
'SystemHookPushWorker' => 3,
......