
Implement component usage batch aggregator

Leaminn Ma requested to merge ci-component-usage-aggregator into master

What does this MR do and why?

Background:

In !148216 (merged), we enabled tracking component usages in a Postgres table (p_catalog_resource_component_usages). The next step is to implement a daily worker that aggregates the usage data in batches and saves the counts into the applicable table.

This MR:

This MR is a precursor to implementing the service and worker. It introduces the Aggregator and Cursor classes so that we can collect the usage counts in time-boxed batches. In the next MR, we will run the aggregator as a service every 4 minutes; on each run, it will resume counting from where it last left off.

Our first aggregator service will support a rolling 30-day window where the last_30_day_usage_count column for each catalog resource is updated with the newest usage count.

In the next MR, the plan is to create a service that executes logic similar to the snippet below (an illustrative block body is sketched after it):

def aggregate!
  # The cursor's target_id resets to 0 when the Redis cursor is re-initialized or when it advances past the max target_id.
  # To minimize redundant reprocessing, we add a limiting condition so that the cursor resumes aggregating from targets that haven't been updated yet.
  target_scope = Ci::Catalog::Resource.where('last_30_day_usage_count_updated_at < ?', Date.today.to_time)

  aggregator = Gitlab::Ci::Components::Usages::Aggregator.new(target_scope: target_scope, <other_args>)

  result = aggregator.each_batch do |usage_counts|
    # Code to update usage counts in the database
  end

  # Log result
end
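
For illustration only, the block body in that service could look roughly like the sketch below. It assumes that usage_counts is a hash mapping each catalog resource record to its distinct usage count (as the validation steps further down suggest); the use of update_columns here is an assumption, not the final implementation.

# Sketch only -- not the final implementation.
# Assumes usage_counts maps catalog resource records to their distinct usage counts.
result = aggregator.each_batch do |usage_counts|
  usage_counts.each do |catalog_resource, count|
    # Both columns are referenced elsewhere in this MR description.
    catalog_resource.update_columns(
      last_30_day_usage_count: count,
      last_30_day_usage_count_updated_at: Time.current
    )
  end
end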

The approach:

  • This MR follows the basic approach outlined in the snippet here, with additional considerations for edge cases and other scenarios.
  • For now, we only need to support a rolling 30-day window. However, in the future we will likely need to support other types, such as non-rolling monthly windows in #443681. For this reason, the new Aggregator and Cursor classes are designed to work with a variety of aggregation windows (a possible monthly configuration is sketched after this list).
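
As a rough illustration of that flexibility, a non-rolling monthly window could conceivably be configured with the same constructor arguments used in the validation steps below. This is only a sketch; the lease key and date boundaries are hypothetical and not part of this MR.

# Hypothetical monthly (non-rolling) window -- a sketch, not part of this MR.
aggregator = Gitlab::Ci::Components::Usages::Aggregator.new(
  target_scope: Ci::Catalog::Resource.all,
  group_by_column: :catalog_resource_id,
  usage_start_date: Date.today.last_month.beginning_of_month,
  usage_end_date: Date.today.last_month.end_of_month,
  lease_key: 'monthly_component_usage_aggregation' # hypothetical lease key
)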

NOTE: Since this MR doesn't actually execute a service, we will add the necessary database indexes and query plans in the next MR.

Resolves Step 1 of #452545.

MR acceptance checklist

Please evaluate this MR against the MR acceptance checklist. It helps you analyze changes to reduce risks in quality, performance, reliability, security, and maintainability.

How to set up and validate locally

To test the aggregator, we will seed test data and run it in the Rails console.

  1. Create 3 catalog resources:
bundle exec rake "gitlab:seed:ci_catalog_resources[<YOUR-GROUP-PATH>, 3, true]"
  2. Create a version for each catalog resource:
project_ids = Project.last(3).pluck(:id) # These should be the project IDs from the first step
user = User.first # This should be your root user

Project.where(id: project_ids).each do |project|
  Releases::CreateService.new(project, user, tag: '1.0.0', ref: 'main', legacy_catalog_publish: true).execute
end
  3. For each component, create 1-9 usages dated within the last 1-9 days (an optional sanity check follows this snippet):
project_ids = Project.last(3).pluck(:id)
components = Project.where(id: project_ids).map { |p| p.ci_components }.flatten

components.each_with_index do |component, i|
  (1..(i + 1)**2).each do |j|
    Ci::Catalog::Resources::Components::Usage.create!(component: component, catalog_resource: component.catalog_resource, project: component.project, used_by_project_id: j, used_date: Date.today - j.days)
  end
end
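
(Optional) To sanity-check the seeded data before running the aggregator, you can count the usage rows per catalog resource directly; this is plain ActiveRecord and not part of the MR itself:

# Optional sanity check: usage rows per catalog resource for the seeded projects.
Ci::Catalog::Resources::Components::Usage
  .where(project_id: project_ids)
  .group(:catalog_resource_id)
  .count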
  4. Run the aggregator. Observe that its output matches the result of a direct SQL query.
project_ids = Project.last(3).pluck(:id)
lease_key = 'my_test_lease_key'

# Start with a fresh cursor
Gitlab::Redis::SharedState.with do |redis|
  redis.del(lease_key)
end

aggregator = Gitlab::Ci::Components::Usages::Aggregator.new(
               target_scope: Ci::Catalog::Resource.where(project_id: project_ids),
               group_by_column: :catalog_resource_id,
               usage_start_date: Date.today - 30.days,
               usage_end_date: Date.today - 1.day,
               lease_key: lease_key
             )

batched_usage_counts = []

result = aggregator.each_batch do |usage_counts|
  batched_usage_counts << usage_counts
end

aggregator_usage_counts = batched_usage_counts.flatten.reduce(&:merge).transform_keys! { |k| k.id }
usage_window = result.cursor.usage_window

direct_query_usage_counts = Ci::Catalog::Resources::Components::Usage
                              .includes(:catalog_resource)
                              .select('catalog_resource_id, COUNT(DISTINCT used_by_project_id) AS usage_count')
                              .where(project_id: project_ids)
                              .where(used_date: usage_window.start_date..usage_window.end_date)
                              .group(:catalog_resource_id)
                              .each_with_object({}) { |r, hash| hash[r.catalog_resource_id] = r.usage_count }

aggregator_usage_counts
direct_query_usage_counts
aggregator_usage_counts == direct_query_usage_counts

Screenshot_2024-05-01_at_12.24.08_PM

  5. (Optional) Change the batch sizes in lib/gitlab/ci/components/usages/aggregator.rb and restart the Rails console. Re-run step 4.
          TARGET_BATCH_SIZE = 1
          DISTINCT_USAGE_BATCH_SIZE = 2

Related to #452545
