Skip to content

VSD no-op count worker setup

What does this MR do and why?

This MR sets up the count worker for Value Stream Dashboard feature. The worker will be timed around the end of month (-5 days):

  • Schedule the job every 5 minutes (not in this MR).
  • Return early if the feature is not available or we're not at the end of month.
  • Take 100 aggregation records (top level namespaces which enabled the feature) ordered by the last_run_at timestamp. Not executed aggregations (last_run_at: nil) take precedence.
  • Process the aggregations sequentially. (currently a no-op)
  • The next job after a few minutes will continue the processing where the previous has stopped.

I deliberately skipped the record counter logic implementation because it would make this MR significantly bigger and more difficult to review.

Batching query

Worst case scenario the query will read 2 x batch_size + 1 (201) rows (unlikely).

Migration

Up:

main: == 20230516080816 ReplaceVsdIndexWithNullsFirstOrder: migrating ===============
main: -- transaction_open?()
main:    -> 0.0000s
main: -- view_exists?(:postgres_partitions)
main:    -> 0.0530s
main: -- index_exists?(:value_stream_dashboard_aggregations, [:last_run_at, :namespace_id], {:where=>"enabled IS TRUE", :name=>"index_on_value_stream_dashboard_aggregations_last_run_at_and_id", :order=>{:last_run_at=>"ASC NULLS FIRST"}, :algorithm=>:concurrently})
main:    -> 0.0035s
main: -- execute("SET statement_timeout TO 0")
main:    -> 0.0004s
main: -- add_index(:value_stream_dashboard_aggregations, [:last_run_at, :namespace_id], {:where=>"enabled IS TRUE", :name=>"index_on_value_stream_dashboard_aggregations_last_run_at_and_id", :order=>{:last_run_at=>"ASC NULLS FIRST"}, :algorithm=>:concurrently})
main:    -> 0.0015s
main: -- execute("RESET statement_timeout")
main:    -> 0.0003s
main: -- transaction_open?()
main:    -> 0.0000s
main: -- view_exists?(:postgres_partitions)
main:    -> 0.0008s
main: -- indexes(:value_stream_dashboard_aggregations)
main:    -> 0.0025s
main: -- remove_index(:value_stream_dashboard_aggregations, {:algorithm=>:concurrently, :name=>"index_on_value_stream_dashboard_aggregations_last_run_at_id"})
main:    -> 0.0009s
main: == 20230516080816 ReplaceVsdIndexWithNullsFirstOrder: migrated (0.0857s) ======


ci: == 20230516080816 ReplaceVsdIndexWithNullsFirstOrder: migrating ===============
ci: -- transaction_open?()
ci:    -> 0.0000s
ci: -- view_exists?(:postgres_partitions)
ci:    -> 0.0008s
ci: -- index_exists?(:value_stream_dashboard_aggregations, [:last_run_at, :namespace_id], {:where=>"enabled IS TRUE", :name=>"index_on_value_stream_dashboard_aggregations_last_run_at_and_id", :order=>{:last_run_at=>"ASC NULLS FIRST"}, :algorithm=>:concurrently})
ci:    -> 0.0033s
ci: -- execute("SET statement_timeout TO 0")
ci:    -> 0.0005s
ci: -- add_index(:value_stream_dashboard_aggregations, [:last_run_at, :namespace_id], {:where=>"enabled IS TRUE", :name=>"index_on_value_stream_dashboard_aggregations_last_run_at_and_id", :order=>{:last_run_at=>"ASC NULLS FIRST"}, :algorithm=>:concurrently})
ci:    -> 0.0015s
ci: -- execute("RESET statement_timeout")
ci:    -> 0.0003s
ci: -- transaction_open?()
ci:    -> 0.0000s
ci: -- view_exists?(:postgres_partitions)
ci:    -> 0.0008s
ci: -- indexes(:value_stream_dashboard_aggregations)
ci:    -> 0.0026s
ci: -- remove_index(:value_stream_dashboard_aggregations, {:algorithm=>:concurrently, :name=>"index_on_value_stream_dashboard_aggregations_last_run_at_id"})
ci:    -> 0.0010s
ci: == 20230516080816 ReplaceVsdIndexWithNullsFirstOrder: migrated (0.0330s) ======

Down:

ci: == 20230516080816 ReplaceVsdIndexWithNullsFirstOrder: reverting ===============
ci: -- transaction_open?()
ci:    -> 0.0000s
ci: -- view_exists?(:postgres_partitions)
ci:    -> 0.0588s
ci: -- index_exists?(:value_stream_dashboard_aggregations, [:last_run_at, :namespace_id], {:where=>"enabled IS TRUE", :name=>"index_on_value_stream_dashboard_aggregations_last_run_at_id", :algorithm=>:concurrently})
ci:    -> 0.0049s
ci: -- execute("SET statement_timeout TO 0")
ci:    -> 0.0008s
ci: -- add_index(:value_stream_dashboard_aggregations, [:last_run_at, :namespace_id], {:where=>"enabled IS TRUE", :name=>"index_on_value_stream_dashboard_aggregations_last_run_at_id", :algorithm=>:concurrently})
ci:    -> 0.0025s
ci: -- execute("RESET statement_timeout")
ci:    -> 0.0008s
ci: -- transaction_open?()
ci:    -> 0.0000s
ci: -- view_exists?(:postgres_partitions)
ci:    -> 0.0008s
ci: -- indexes(:value_stream_dashboard_aggregations)
ci:    -> 0.0035s
ci: -- remove_index(:value_stream_dashboard_aggregations, {:algorithm=>:concurrently, :name=>"index_on_value_stream_dashboard_aggregations_last_run_at_and_id"})
ci:    -> 0.0011s
ci: == 20230516080816 ReplaceVsdIndexWithNullsFirstOrder: reverted (0.1170s) ======

main: == 20230516080816 ReplaceVsdIndexWithNullsFirstOrder: reverting ===============
main: -- transaction_open?()
main:    -> 0.0000s
main: -- view_exists?(:postgres_partitions)
main:    -> 0.0488s
main: -- index_exists?(:value_stream_dashboard_aggregations, [:last_run_at, :namespace_id], {:where=>"enabled IS TRUE", :name=>"index_on_value_stream_dashboard_aggregations_last_run_at_id", :algorithm=>:concurrently})
main:    -> 0.0028s
main: -- execute("SET statement_timeout TO 0")
main:    -> 0.0004s
main: -- add_index(:value_stream_dashboard_aggregations, [:last_run_at, :namespace_id], {:where=>"enabled IS TRUE", :name=>"index_on_value_stream_dashboard_aggregations_last_run_at_id", :algorithm=>:concurrently})
main:    -> 0.0016s
main: -- execute("RESET statement_timeout")
main:    -> 0.0003s
main: -- transaction_open?()
main:    -> 0.0000s
main: -- view_exists?(:postgres_partitions)
main:    -> 0.0008s
main: -- indexes(:value_stream_dashboard_aggregations)
main:    -> 0.0023s
main: -- remove_index(:value_stream_dashboard_aggregations, {:algorithm=>:concurrently, :name=>"index_on_value_stream_dashboard_aggregations_last_run_at_and_id"})
main:    -> 0.0010s
main: == 20230516080816 ReplaceVsdIndexWithNullsFirstOrder: reverted (0.0787s) ======

MR acceptance checklist

This checklist encourages us to confirm any changes have been analyzed to reduce risks in quality, performance, reliability, security, and maintainability.

Related to #402739 (closed)

Edited by Adam Hegyi

Merge request reports