# Use bounded parallelism when performing global advisory scans
## Problem to solve
In Skip advisory scans during initial sync (#421294 - closed), advisory scanning was updated to reduce the chance of scan-after-ingestion monopolizing all instance resources. This was done by skipping some advisories.

However, this might not be sufficient: we might still run into situations where an instance runs too many `AdvisoryScanWorker` jobs in parallel. This can result in database contention, because these jobs all try to upsert records in the database tables that track vulnerabilities.
This option is also not ideal because we rely on a pipeline run to trigger the scan of SBOM components, which may leave some advisory scans skipped. This is a problem for a couple of reasons:

- A mature project with few changes will only rarely have pipelines triggered, yet pipelines would be the mechanism we rely on for a rescan.
- The scan latency could be unacceptably high for the same reason.
## Proposals
### Limited capacity worker
To ensure all ingested advisories are scanned without a thundering herd problem, a scan queue backed by a limited-capacity worker is proposed. This creates an upper bound on the number of scans performed after ingestion.
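To illustrate the upper-bound idea, here is a minimal plain-Ruby sketch (hypothetical numbers; this is not GitLab's `LimitedCapacity::Worker` implementation): a fixed pool of threads drains a shared queue, so at most `MAX_RUNNING` "scans" run at any moment, however large the backlog is.

```ruby
# Plain-Ruby illustration of bounded parallelism: a fixed thread pool
# drains a shared queue, so concurrency never exceeds MAX_RUNNING.
MAX_RUNNING = 2

queue = Queue.new
20.times { |i| queue << i } # pretend these are advisory ids

running = 0
peak = 0
lock = Mutex.new

pool = Array.new(MAX_RUNNING) do
  Thread.new do
    loop do
      begin
        queue.pop(true) # non-blocking pop; raises ThreadError when empty
      rescue ThreadError
        break
      end
      lock.synchronize { running += 1; peak = [peak, running].max }
      sleep 0.001 # simulate the scan work
      lock.synchronize { running -= 1 }
    end
  end
end
pool.each(&:join)

puts peak # never exceeds MAX_RUNNING
```

The backlog is fully processed, just never more than `MAX_RUNNING` at a time, which is the property both proposals below aim for.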
Pros:
- Comprehensive - can scan the entire backlog without worrying about missing something.
- Monolith abstractions already exist for this: the limited-capacity worker concern and the `state_machines-activerecord` gem.
- Doesn't need elaborate schemes for denoting initial scans or where scans should start.
- No changes needed on the external service or protocol side.
- Allows ordering the advisories that trigger scans, e.g. advisories with higher severity come first.
- Makes it possible to remove the 14-day limit. The instance could process older advisories if it has enough resources to do so.
Cons:
- ~~Will scan all ingested advisories, so the resources needed to scan all of the ingested data will still be used; there just won't be a thundering herd problem.~~ This is no longer the case now that !130469 (merged) has been completed. We now only scan the advisories published in the last 14 days instead of all the advisories ingested. If needed, we can further restrict this to decrease the load.
- Using a limited capacity worker removes the `idempotent!` attribute and removes the retry ability. An unhandled exception means that any projects pending in the job will not see a new vulnerability.
### Concurrency limit (selected)

We can set a concurrency limit that has Sidekiq middleware regulate the maximum number of workers running at any given time.
Pros:
- It's simple to implement and only requires a one-line change.
- The `idempotent!` worker attribute is maintained.
Cons:
- If there is a sustained workload over the limit, the queue will grow until the limit is disabled or the workload drops below the limit. The risk of this happening is small, and will reduce even further with Increase the frequency of the package metadata ... (#441508).
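The queue-growth con can be sanity-checked with simple arithmetic (all numbers below are illustrative assumptions, not measured rates): the backlog grows whenever the sustained arrival rate exceeds the limit times the per-worker service rate.

```ruby
# Illustrative numbers only: backlog growth under a concurrency limit
# when sustained arrivals exceed throughput.
limit = 10                         # max concurrently running jobs
service_rate = 2.0                 # scans one worker completes per minute
throughput = limit * service_rate  # 20.0 scans/minute

arrivals_per_minute = 25           # sustained load above capacity
backlog = 0.0
10.times { backlog += arrivals_per_minute - throughput }

puts backlog # => 50.0 queued jobs after 10 minutes
```

As long as the arrival rate stays at or below throughput, the backlog stays bounded, which is why the risk is considered small here.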
## Implementation Plan
- Update `PackageMetadata::GlobalAdvisoryScanWorker`: add the `concurrency_limit` attribute (e.g. `concurrency_limit -> { 10 }`) so that it uses the application setting.
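A sketch of what this one-line change could look like, assuming the `concurrency_limit` worker attribute described above (the lambda hard-codes `10` as a placeholder; the final version should read the relevant application setting, whose name is not specified here):

```ruby
module PackageMetadata
  class GlobalAdvisoryScanWorker
    include ApplicationWorker

    idempotent! # preserved, unlike with the limited-capacity approach

    # Placeholder value; should resolve from the application setting.
    concurrency_limit -> { 10 }

    # existing perform logic unchanged
  end
end
```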
## Previous implementation plan
- Update the advisory model to add a queue.
  - Add a migration to add the following columns to `pm_advisories`:
    - `state` => `smallint`
    - `scan_started_at` => `datetime_with_timezone`
    - `scan_finished_at` => `datetime_with_timezone`
  - Add an index on the `state` and `scan_started_at` columns.
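A hypothetical migration sketch under standard GitLab conventions (the class name is an assumption; in practice the index would likely live in a separate migration using `add_concurrent_index`):

```ruby
# Hypothetical sketch; class name is an assumption.
class AddScanStateToPmAdvisories < Gitlab::Database::Migration[2.1]
  def change
    add_column :pm_advisories, :state, :smallint, null: false, default: 0
    add_column :pm_advisories, :scan_started_at, :datetime_with_timezone
    add_column :pm_advisories, :scan_finished_at, :datetime_with_timezone
  end
end
```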
- Create a `state_machine` (see the draft code below).
  - General examples exist in the gem and in other models in the codebase.
- Add a scope to the model to return the pending queue (used by the worker to determine how much work remains).
  - `PackageMetadata::Advisory.scan_queue` => `with_state(:scan_pending)`
- Add a scope to the model to return stalled scans.
  - `PackageMetadata::Advisory.stalled_queue` => `with_state(:scan_started).and(where("current_time - scan_started_at > interval '1' DAY"))`
  - 1 day is chosen as the stall threshold.
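The staleness condition can be modeled in plain Ruby (in-memory hashes standing in for `pm_advisories` rows): a scan counts as stalled when it entered `scan_started` more than one day ago and never finished.

```ruby
# Plain-Ruby model of the stalled-scan check: started over a day ago,
# still in scan_started, never finished.
STALL_THRESHOLD = 24 * 60 * 60 # 1 day in seconds

scans = [
  { id: 1, state: :scan_started, scan_started_at: Time.now - 2 * STALL_THRESHOLD },
  { id: 2, state: :scan_started, scan_started_at: Time.now - 60 },
  { id: 3, state: :scan_pending, scan_started_at: nil },
]

stalled = scans.select do |s|
  s[:state] == :scan_started && (Time.now - s[:scan_started_at]) > STALL_THRESHOLD
end

puts stalled.map { |s| s[:id] }.inspect # => [1]
```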
- Add a method to take an advisory off of the scan queue (in a transaction), e.g. `PackageMetadata::Advisory.take_from_scan_queue!`:

```ruby
module PackageMetadata
  class Advisory < ApplicationRecord
    def self.take_from_scan_queue!
      transaction do
        advisory = scan_queue.first
        advisory = stalled_queue.first if advisory.nil?
        advisory.schedule_scan unless advisory.nil?
        advisory
      end
    end
  end
end
```
- Change the advisory scan worker to a limited capacity worker.
  - `perform_work`
    - Takes one advisory from the queue using `advisory = PackageMetadata::Advisory.take_from_scan_queue!`.
    - Passes it to the scan service.
    - On success, updates the advisory: `advisory.scan_success`.
    - On failure, updates the advisory: `advisory.scan_failure`.
  - `remaining_work_count`
    - `PackageMetadata::Advisory.scan_queue.count + PackageMetadata::Advisory.stalled_queue.count`
  - `max_running_jobs`
    - Can be `1` in the beginning.
- Add a cron worker which triggers the above via `perform_with_capacity`.
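Putting the three methods together, a hedged sketch of the limited-capacity worker (the scan service name and the error handling are assumptions for illustration, not the final implementation):

```ruby
module PackageMetadata
  class AdvisoryScanWorker
    include ApplicationWorker
    include LimitedCapacity::Worker

    def perform_work
      advisory = PackageMetadata::Advisory.take_from_scan_queue!
      return if advisory.nil?

      advisory.scan                 # scan_pending => scan_started
      ScanService.execute(advisory) # hypothetical service name
      advisory.scan_success
    rescue StandardError
      advisory&.scan_failure
      raise
    end

    def remaining_work_count(*)
      PackageMetadata::Advisory.scan_queue.count +
        PackageMetadata::Advisory.stalled_queue.count
    end

    def max_running_jobs
      1 # can be raised once the approach is validated
    end
  end
end
```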
- Change the advisory ingestion service.
  - Remove event publishing: `publish!` should instead:
    - Add each advisory to the scan queue: `advisory.schedule_scan`.
    - Trigger the scan worker at the end of a batch of advisories via `perform_with_capacity`.
- Remove the event store logic for scanning advisories.
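The ingestion-side change could then be as small as the following fragment (variable and worker names are assumptions):

```ruby
# Inside the advisory ingestion service, after a batch is upserted;
# replaces the event-store publish! call.
advisories.each(&:schedule_scan)
PackageMetadata::AdvisoryScanWorker.perform_with_capacity
```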
## State machine draft code
```ruby
module PackageMetadata
  class Advisory < ApplicationRecord
    state_machine initial: :scan_pending do
      event :schedule_scan do
        transition scan_failed: :scan_pending
        transition scan_succeeded: :scan_pending
      end

      event :scan do
        transition scan_pending: :scan_started
      end

      before_transition scan_pending: :scan_started do |advisory, _|
        advisory.scan_started_at = Time.current
      end

      event :scan_failure do
        transition scan_started: :scan_failed
      end

      event :scan_success do
        transition scan_started: :scan_succeeded
      end

      before_transition scan_started: [:scan_failed, :scan_succeeded] do |advisory, _|
        advisory.scan_finished_at = Time.current
      end
    end
  end
end
```
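The transitions in the draft can be checked without the gem by modeling them as a plain transition table; this walks an advisory through a failed scan and a retry.

```ruby
# Pure-Ruby model of the draft's transitions (no gem required).
TRANSITIONS = {
  schedule_scan: { scan_failed: :scan_pending, scan_succeeded: :scan_pending },
  scan:          { scan_pending: :scan_started },
  scan_failure:  { scan_started: :scan_failed },
  scan_success:  { scan_started: :scan_succeeded },
}.freeze

def fire(state, event)
  TRANSITIONS.fetch(event).fetch(state) { raise "invalid: #{event} from #{state}" }
end

state = :scan_pending               # initial state
state = fire(state, :scan)          # => :scan_started
state = fire(state, :scan_failure)  # => :scan_failed
state = fire(state, :schedule_scan) # => :scan_pending (queued for retry)
puts state # => scan_pending
```

Any event fired from a state with no matching transition raises, mirroring how the state machine would reject it.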
## Verification
Monitor the CVS dashboard and confirm that the job queue does not continue to grow indefinitely.