VSD - Implement the value stream dashboard count worker

Add a new worker that is scheduled every 5 minutes and limits its maximum runtime to 3 minutes using the Analytics::CycleAnalytics::RuntimeLimiter class.
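To illustrate the runtime-limit idea, here is a minimal sketch of a limiter. `SimpleRuntimeLimiter` and its injectable `clock:` argument are hypothetical stand-ins written for this example; they are not the API of Analytics::CycleAnalytics::RuntimeLimiter, which may differ.

```ruby
# Hypothetical stand-in for a runtime limiter (not the actual GitLab class).
class SimpleRuntimeLimiter
  # clock: is injectable so the limiter can be tested without sleeping.
  def initialize(max_runtime_seconds, clock: -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) })
    @max_runtime = max_runtime_seconds
    @clock = clock
    @started_at = clock.call
  end

  # Seconds elapsed since the limiter was created.
  def elapsed_time
    @clock.call - @started_at
  end

  # True once the configured runtime budget is used up.
  def over_time?
    elapsed_time >= @max_runtime
  end
end

limiter = SimpleRuntimeLimiter.new(3 * 60) # 3 minute budget
processed = 0
[1, 2, 3].each do |_item|
  break if limiter.over_time? # stop early, the next scheduled run continues

  processed += 1
end
```

A monotonic clock is used so that wall-clock adjustments do not affect the measured runtime.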

The worker invokes a service that does the actual record counting. The service returns a cursor and an array of counts, which are bulk inserted into the Analytics::ValueStreamDashboard::Count model.

As the first step, the worker fetches a cursor from Redis or the DB. The cursor allows processing to continue from where the previous worker run stopped.

Cursor logic

When the cursor is nil

This happens when no aggregation has been executed yet (new installation) or when processing for the current month has finished.

Find the first aggregation namespace record and check its last runtime:

aggregation = Analytics::ValueStreamDashboard::Aggregation.first
return if aggregation.nil? # feature is not used at all
return if aggregation.last_run_at && aggregation.last_run_at > 1.month.ago # we store the counts monthly; last_run_at is nil on a new installation

cursor = { top_level_namespace_id: aggregation.namespace_id, namespace_id: -1, metric: 1, last_count: 0, last_value: -1 } # initial cursor

When the cursor is not nil

cursor = fetch_cursor # to be implemented, maybe we can use redis?
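One possible shape for the cursor persistence, sketched under assumptions: `CursorStore`, its key name, and the in-memory Hash backend are hypothetical and only stand in for Redis or a DB column. The cursor is serialized as JSON, which is how it would typically be written to a string-valued store.

```ruby
require 'json'

# Hypothetical cursor store; the Hash backend stands in for Redis or the DB.
class CursorStore
  KEY = 'vsd_count_cursor' # hypothetical key name

  def initialize(backend = {})
    @backend = backend
  end

  # Serializes the cursor so it survives between worker runs.
  def store(cursor)
    @backend[KEY] = JSON.generate(cursor)
  end

  # Returns the cursor as a Hash with symbol keys, or nil when none is stored.
  def fetch
    raw = @backend[KEY]
    raw && JSON.parse(raw, symbolize_names: true)
  end

  # Called when the loop is done, so the next run starts from the beginning.
  def reset
    @backend.delete(KEY)
  end
end

original = { top_level_namespace_id: 1, namespace_id: 42, metric: 1, last_count: 1500, last_value: 9000 }
cursor_store = CursorStore.new
cursor_store.store(original)
restored = cursor_store.fetch
cursor_store.reset
```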

Iteration logic (pseudo code)

  1. Iterate over Analytics::ValueStreamDashboard::Aggregation.
  2. Iterate over the subgroups or project namespaces.
  3. Iterate over the defined metrics (enum).
  4. Invoke the batch counter.
  5. If time is up: store the cursor and return.
  6. If a count is finished: add it to the counts array.
  7. If loop is done: reset the cursor (nil).

counts = []
aggregation_scope = Analytics::ValueStreamDashboard::Aggregation.where('namespace_id >= ?', cursor[:top_level_namespace_id])
aggregation_scope.each_batch do |aggregations|
  aggregations.each do |aggregation|
    # TODO: check license, disable the aggregation if license is missing.
    metrics = Analytics::ValueStreamDashboard::Count.metrics.gte(cursor[:metric]) # list of metrics values from the enum
    metrics.each do |metric|
      # Project namespaces here; could be projects or groups, depending on the metric.
      # Uses >= so that a partially counted namespace is resumed rather than skipped.
      Namespace.where('traversal_ids[1] = ?', aggregation.namespace_id).where(type: 'Project').where('id >= ?', cursor[:namespace_id]).each_batch do |namespaces|
        namespaces.each do |namespace|
          last_count, last_value = build_scope_for_metric(metric, namespace).each_batch_count(last_count: cursor[:last_count], last_value: cursor[:last_value]) do
            runtime_limiter.over_time? # stop counting when the time is up
          end

          if runtime_limiter.over_time?
            store_cursor(top_level_namespace_id: aggregation.namespace_id, namespace_id: namespace.id, last_count: last_count, last_value: last_value, metric: metric)
            return
          else
            counts << { namespace_id: namespace.id, metric: metric, count: last_count }
          end
        end
      end
    end
  end
end

cursor = nil # loop is done: reset the cursor
return [cursor, counts]
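The resume behaviour of the loop above can be tried out in plain Ruby. This is a simplified sketch under stated assumptions: `RECORDS`, `batch_count`, and `run` are hypothetical names, the data is in memory, only one metric is counted, and a fixed batch budget per run stands in for the runtime limiter.

```ruby
# Hypothetical in-memory data: namespace id => sorted record ids.
RECORDS = {
  10 => (1..5).to_a,
  11 => (1..3).to_a,
  12 => (1..4).to_a
}.freeze
BATCH_SIZE = 2

# Counts ids greater than last_value in batches. After each batch the block
# is called; a truthy result interrupts the count.
# Returns [count_so_far, last_seen_id, finished].
def batch_count(ids, last_count:, last_value:)
  count = last_count
  loop do
    batch = ids.select { |id| id > last_value }.first(BATCH_SIZE)
    return [count, last_value, true] if batch.empty?

    count += batch.size
    last_value = batch.last
    return [count, last_value, false] if yield
  end
end

# One worker run: processes at most max_batches batches, then returns
# [cursor, counts]. A nil cursor means the whole loop is done.
def run(cursor, max_batches)
  cursor ||= { namespace_id: -1, last_count: 0, last_value: -1 }
  counts = []
  batches_done = 0

  RECORDS.keys.sort.select { |id| id >= cursor[:namespace_id] }.each do |namespace_id|
    last_count, last_value, finished = batch_count(
      RECORDS[namespace_id], last_count: cursor[:last_count], last_value: cursor[:last_value]
    ) do
      batches_done += 1
      batches_done >= max_batches # stands in for runtime_limiter.over_time?
    end

    unless finished
      return [{ namespace_id: namespace_id, last_count: last_count, last_value: last_value }, counts]
    end

    counts << { namespace_id: namespace_id, count: last_count }
    # Later namespaces are counted from scratch.
    cursor = { namespace_id: namespace_id, last_count: 0, last_value: -1 }
  end

  [nil, counts]
end

# Simulate repeated scheduled runs until the cursor is reset.
cursor = nil
all_counts = []
runs = 0
loop do
  cursor, counts = run(cursor, 3)
  all_counts.concat(counts)
  runs += 1
  break if cursor.nil?
end
```

Note that the partial count state (`last_count`, `last_value`) applies only to the namespace named in the cursor; every later namespace starts from zero.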

build_scope_for_metric method

This method is responsible for building the scope for the counter queries. Depending on the passed-in metric, it creates the appropriate AR scope.

Example: when metric is issue count

Issues are currently associated with Project, which means that the outer query should iterate over the Namespaces::ProjectNamespace rows for the given top-level Group.

Namespace.where('traversal_ids[1] = ?', namespace_id).where(type: 'Project').where('id >= ?', cursor[:namespace_id]).each_batch do |namespaces|
  Project.where(project_namespace_id: namespaces.select(:id)).each do |project|
    # build_scope_for_metric would do this:
    last_count, last_value = Issue.where(project_id: project.id).each_batch_count(column: :iid) # plus the rest of the arguments
  end
end

We could come up with some sort of metric -> scope mapping:

{
  issues: -> (parent) { Issue.where(project_id: parent.id) },
  groups: -> (parent) { Group.where(parent_id: parent.id) }
}
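As a rough illustration of how such a mapping could back build_scope_for_metric, here is a self-contained sketch. The Struct rows and arrays are hypothetical in-memory stand-ins for the ActiveRecord models, and the lambdas take a parent id rather than a record to keep the example small.

```ruby
# Hypothetical in-memory rows standing in for ActiveRecord models.
IssueRow = Struct.new(:id, :project_id)
GroupRow = Struct.new(:id, :parent_id)

ISSUES = [IssueRow.new(1, 100), IssueRow.new(2, 100), IssueRow.new(3, 200)].freeze
GROUPS = [GroupRow.new(7, 50), GroupRow.new(8, 50)].freeze

# metric -> scope builder; each lambda returns the rows to count for a parent.
METRIC_SCOPES = {
  issues: ->(parent_id) { ISSUES.select { |row| row.project_id == parent_id } },
  groups: ->(parent_id) { GROUPS.select { |row| row.parent_id == parent_id } }
}.freeze

# Looks up the scope builder for the metric and fails loudly on unknown metrics.
def build_scope_for_metric(metric, parent_id)
  builder = METRIC_SCOPES.fetch(metric) { raise ArgumentError, "unknown metric: #{metric}" }
  builder.call(parent_id)
end

issue_count = build_scope_for_metric(:issues, 100).size
group_count = build_scope_for_metric(:groups, 50).size
```

Keeping the mapping in one frozen hash means that adding a metric in a follow-up only requires a new entry.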

As the first step, implement the counting worker for `issues` only and do the rest as a follow-up.
Edited by Adam Hegyi