Use bounded parallelism when performing global advisory scans

Problem to solve

In Skip advisory scans during initial sync (#421294 - closed) advisory scanning was updated to reduce the chance of scan-after-ingestion monopolizing all instance resources. This was done by skipping some advisories.

However, this might not be efficient, and we might run into situations where an instance runs too many AdvisoryScanWorker jobs in parallel. This can cause database contention, because these jobs all try to upsert records in the database tables that track vulnerabilities.

Also, this option is not ideal because, for any advisory scan that was skipped, we rely on a pipeline run to trigger the scan of SBOM components. This may be a problem for a couple of reasons:

  1. A mature project with few changes will only rarely have pipelines triggered - pipelines would be the mechanism we'd rely on for a rescan.
  2. The scan latency could be unacceptably high for the same reason as above.

Proposals

Limited capacity worker

To ensure all ingested advisories are scanned without a thundering herd problem, a scan queue using a limited capacity worker is proposed.

This allows the creation of an upper bound on the number of scans performed after ingestion.

Pros:

  • Comprehensive - can scan the entire backlog without worrying about missing something.
  • Monolith abstractions already exist for this: limited capacity worker and state_machines-activerecord.
  • Doesn't need elaborate schemes for denoting initial scans or where scans should start.
  • No changes on the external service nor the protocol side.
  • Allows ordering the advisories that trigger scans. Example: advisories with higher severity come first.
  • Makes it possible to remove the 14-day limit. The instance could process older advisories if it has enough resources to do so.

Cons:

  • Will scan all ingested advisories, so the resources needed to scan all of the ingested data will still be used; there just won't be a thundering herd problem.
    • This is no longer the case now that !130469 (merged) has been completed. We now only scan the advisories published in the last 14 days instead of all the advisories ingested. If needed, we can restrict this window further to decrease the load.
  • Using a limited capacity worker removes the idempotent! attribute and the ability to retry. An unhandled exception means that any projects still pending in the job will not see a new vulnerability.

Concurrency limit ⬅ (selected)

We can set a concurrency limit so that Sidekiq's middleware regulates the maximum number of these workers running at any given time.

Pros:

  • It's simple to implement and only requires a one-line change.
  • The idempotent worker attribute is maintained.

Cons:

  • If there is a sustained workload over the limit, the queue is going to grow until the limit is disabled or the workload drops under the limit. The risk of this happening is small, and will be reduced even further by Increase the frequency of the package metadata ... (#441508).
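
The queue-growth trade-off can be illustrated with a minimal sketch. This is a toy simulation in plain Ruby, not Sidekiq's middleware: the helper simulate_backlog, its parameters, and the assumption that every job takes exactly one tick are all hypothetical and only serve to show how the backlog behaves on either side of the limit.

```ruby
# Toy model of a concurrency-limited queue (hypothetical helper, not Sidekiq code).
# Each tick, `arrivals` new jobs are enqueued and at most `limit` jobs run to
# completion. Returns the backlog size after every tick.
def simulate_backlog(arrivals:, limit:, ticks:)
  backlog = 0
  Array.new(ticks) do
    backlog += arrivals
    backlog -= [backlog, limit].min
    backlog
  end
end

# Workload under the limit: the queue drains every tick.
p simulate_backlog(arrivals: 8, limit: 10, ticks: 4)   # => [0, 0, 0, 0]

# Sustained workload over the limit: the queue grows by (arrivals - limit) per tick.
p simulate_backlog(arrivals: 12, limit: 10, ticks: 4)  # => [2, 4, 6, 8]
```

In this model the backlog only grows while arrivals stay above the limit, which matches the expectation that the queue recovers once the workload drops.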

Implementation Plan

  • Update the PackageMetadata::GlobalAdvisoryScanWorker. Add the concurrency_limit attribute so that it uses the application setting.

    concurrency_limit -> { 10 }

Previous implementation plan

  1. Update the advisory model to add a queue.
    1. Add migration to add the following columns to pm_advisories.

      • state => smallint
      • scan_started_at => datetime_with_timezone
      • scan_finished_at => datetime_with_timezone
      • Add index on state and scan_started_at columns
    2. Create a state_machine (see below example).

      • General examples also exist in gem and in other models in the codebase.
    3. Add a scope to the model to return the pending queue (used to determine how much work is remaining by the worker).

      • PackageMetadata::Advisory.scan_queue
        • with_state(:scan_pending)
    4. Add a scope to the model to return stalled scans

      • PackageMetadata::Advisory.stalled_queue
        • with_state(:scan_started).and(where("current_time - scan_started_at > interval '1' DAY"))
          • 1 day is chosen a
    5. Add a method to take an advisory off of the scan queue (in transaction).

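      • e.g. method name: PackageMetadata::Advisory.take_from_scan_queue!
          module PackageMetadata
            class Advisory < ApplicationRecord
              # Class-level method, because scan_queue and stalled_queue are scopes.
              def self.take_from_scan_queue!
                transaction do
                  # Prefer pending advisories and fall back to stalled scans.
                  advisory = scan_queue.first || stalled_queue.first
                  # Mark the scan as started. In the draft state machine the
                  # scan event only applies to scan_pending advisories, so
                  # stalled ones would need an additional transition.
                  advisory&.scan
                  advisory
                end
              end
            end
          end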
  2. Change the advisory scan worker to a limited capacity worker.
    • perform_work
      • Takes one advisory from the queue using advisory = PackageMetadata::Advisory.take_from_scan_queue!
      • Passes it to the scan service.
      • On success, updates the advisory with advisory.scan_success
      • On failure, updates the advisory with advisory.scan_failure
    • remaining_work_count
      • PackageMetadata::Advisory.scan_queue.count + PackageMetadata::Advisory.stalled_queue.count
    • max_running_jobs
      • Can be 1 in the beginning.
    • Add a cron worker which triggers the above with perform_with_capacity.
  3. Change advisory ingestion service
    • Remove event publishing
    • publish! should instead
      1. Add each advisory to scan queue advisory.schedule_scan
      2. Trigger the scan worker at the end of a batch of advisories via a perform_with_capacity.
  4. Remove the event store logic for scanning advisories.
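
The worker flow in step 2 can be sketched as follows. This is a plain-Ruby stand-in, not the monolith's limited capacity worker: the method names perform_work, remaining_work_count and max_running_jobs come from the plan, while the AdvisoryScanSketch class, the in-memory list of advisories and the scanner callable are assumptions made purely for illustration.

```ruby
# Plain-Ruby stand-in for the proposed worker (hypothetical, for illustration).
Advisory = Struct.new(:id, :state)

class AdvisoryScanSketch
  def initialize(advisories, scanner)
    @advisories = advisories
    @scanner = scanner # any callable; raises to signal a failed scan
  end

  # Upper bound on parallel jobs; the plan suggests starting with 1.
  def max_running_jobs
    1
  end

  # Number of advisories still waiting to be scanned.
  def remaining_work_count
    @advisories.count { |a| a.state == :scan_pending }
  end

  # Takes one pending advisory, scans it, and records the outcome.
  def perform_work
    advisory = @advisories.find { |a| a.state == :scan_pending }
    return if advisory.nil?

    advisory.state = :scan_started
    @scanner.call(advisory)
    advisory.state = :scan_succeeded
  rescue StandardError
    # Record the failure explicitly, since there is no automatic retry.
    advisory.state = :scan_failed
  end
end

advisories = [Advisory.new(1, :scan_pending), Advisory.new(2, :scan_pending)]
scanner = ->(advisory) { raise "scan failed" if advisory.id == 2 }
worker = AdvisoryScanSketch.new(advisories, scanner)

worker.perform_work while worker.remaining_work_count > 0
p advisories.map(&:state) # => [:scan_succeeded, :scan_failed]
```

Rescuing inside perform_work reflects the con noted for this proposal: without retries, a failure has to be recorded on the advisory so that it can be rescheduled later.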

State machine draft code

module PackageMetadata
  class Advisory < ApplicationRecord
    state_machine :initial => :scan_pending do
      event :schedule_scan do
        transition :scan_failed => :scan_pending
        transition :scan_succeeded => :scan_pending
      end

      event :scan do
        transition :scan_pending => :scan_started
      end

      before_transition :scan_pending => :scan_started do |advisory, _|
        advisory.scan_started_at = Time.current
      end
      
      event :scan_failure do
        transition :scan_started => :scan_failed
      end

      event :scan_success do
        transition :scan_started => :scan_succeeded
      end

      before_transition :scan_started => [:scan_failed, :scan_succeeded] do |advisory, _|
        advisory.scan_finished_at = Time.current
      end
    end
  end
end
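
To check which events the draft allows from each state, the transitions can be mirrored in a small lookup table. This is a sketch in plain Ruby that does not use the state_machines gem; TRANSITIONS, next_state and walk are hypothetical names introduced only for this example.

```ruby
# Transition table mirroring the draft state machine (plain Ruby, no gem).
TRANSITIONS = {
  schedule_scan: { scan_failed: :scan_pending, scan_succeeded: :scan_pending },
  scan:          { scan_pending: :scan_started },
  scan_failure:  { scan_started: :scan_failed },
  scan_success:  { scan_started: :scan_succeeded }
}.freeze

# Returns the next state, or nil when the event is not allowed from `state`.
def next_state(state, event)
  TRANSITIONS.fetch(event)[state]
end

# Applies a sequence of events starting from the initial state and raises
# if any event is not allowed from the state reached so far.
def walk(events, state = :scan_pending)
  events.reduce(state) do |current, event|
    next_state(current, event) ||
      raise(ArgumentError, "#{event} not allowed from #{current}")
  end
end

# A failed scan that is rescheduled and then succeeds.
p walk(%i[scan scan_failure schedule_scan scan scan_success]) # => :scan_succeeded

# The draft defines no way to restart a scan that is already started.
p next_state(:scan_started, :scan) # => nil
```

The second call suggests that advisories returned by stalled_queue, which are still in scan_started, would need an extra transition before they can be picked up again.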

Verification

Monitor the CVS dashboard, and confirm that the job queue does not continue to grow indefinitely.

Edited Mar 01, 2024 by Oscar Tovar