
POC for ci analytics on ClickHouse

Closed Vladimir Shushlin requested to merge vshushlin/runner-ch-poc into master

What does this MR do and why?

Here's the explaining video: https://www.loom.com/share/3cf98980e8334f14a4740836989f4ff6?sid=d80c1c6d-c636-4236-aad9-b1d75785b910

This MR is a proof of concept of using ClickHouse to run analytics queries on the GitLab CI database.

It is focused on implementing "Wait time to pick up a job" from the runner dashboard: #390921

Such queries are slow in Postgres even with indexes and a specialized table that contains only a small portion of builds: !110137 (comment 1298955647)

This MR adds:

  1. a few tables in the ClickHouse schema
  2. a service to sync finished builds from Postgres to ClickHouse
  3. a few queries with comparable performance (see below)

Data ingestion strategy

The rough idea is:

  1. only sync finished builds (ClickHouse isn't optimized for updating data)
  2. run a background worker at a fixed interval: it collects new records and pushes them to ClickHouse in batches
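The ingestion loop above can be sketched in plain Ruby. This is a runnable simulation with hypothetical names (`Build`, `sync_batches`); the real service queries Postgres with `each_batch` and inserts into ClickHouse instead of working on an in-memory array:

```ruby
# Simulate cursor-based batch sync: keep a (finished_at, id) cursor,
# pull everything strictly after it in order, push fixed-size batches.
Build = Struct.new(:id, :finished_at, keyword_init: true)

def builds_after(builds, cursor)
  builds
    .select { |b| ([b.finished_at, b.id] <=> cursor) > 0 }
    .sort_by { |b| [b.finished_at, b.id] }
end

def sync_batches(builds, cursor, batch_size: 2)
  batches = []
  loop do
    batch = builds_after(builds, cursor).first(batch_size)
    break if batch.empty?

    batches << batch.map(&:id) # stand-in for an INSERT into ci_finished_builds
    last = batch.last
    cursor = [last.finished_at, last.id] # advance the sync cursor
  end
  batches
end
```

Ordering by the `(finished_at, id)` tuple is what makes the cursor safe when several builds share the same `finished_at`.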

Schema

There are 3 tables:

  1. ci_finished_builds - essentially the raw data; we can calculate metrics on it directly, but that may not be optimal
  2. ci_finished_builds_max_finished_at - stores the maximum value of the tuple (finished_at, id) for easy access during sync
  3. ci_finished_builds_by_runner_type - an aggregate materialized view storing pre-calculated percentiles
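For context, here is a hedged sketch of what the aggregating materialized view could look like (the MR defines the real one in the diff); column names are taken from the merge query below, and the ORDER BY follows the review suggestion:

```sql
-- Illustrative sketch only; the actual DDL lives in the MR diff.
CREATE MATERIALIZED VIEW ci_finished_builds_by_runner_type
ENGINE = AggregatingMergeTree()
ORDER BY (started_at_bucket, status, runner_type)
POPULATE AS
SELECT
    toStartOfInterval(started_at, INTERVAL 5 minute) AS started_at_bucket,
    status,
    runner_type,
    countState() AS count_builds,
    quantileState(0.9)(age('second', queued_at, started_at)) AS wait_time_90_percentile,
    quantileState(0.5)(age('second', queued_at, started_at)) AS wait_time_50_percentile,
    quantileState(0.25)(age('second', queued_at, started_at)) AS wait_time_25_percentile
FROM ci_finished_builds
GROUP BY started_at_bucket, status, runner_type;
```

The `-State` combinators store partial aggregation states, which is why the reading query has to use the matching `-Merge` combinators.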

Queries:

-- direct aggregate
SELECT   toStartOfInterval(started_at, interval 5 minute) AS started_at_bucket,
         count(*),
         quantile(0.9)(age('second', queued_at, started_at)),
         quantile(0.5)(age('second', queued_at, started_at)),
         quantile(0.25)(age('second', queued_at, started_at))
FROM     ci_finished_builds
WHERE    status = 'success'
AND      runner_type = 0
GROUP BY started_at_bucket
ORDER BY started_at_bucket DESC LIMIT 1000;

-- 1000 rows in set. Elapsed: 0.393 sec. Processed 16.54 million rows, 297.79 MB (42.05 million rows/s., 756.98 MB/s.)

-- aggregated on the materialized view
SELECT   started_at_bucket,
         countMerge(count_builds),
         quantileMerge(0.9)(wait_time_90_percentile),
         quantileMerge(0.50)(wait_time_50_percentile),
         quantileMerge(0.25)(wait_time_25_percentile)
FROM     ci_finished_builds_by_runner_type
WHERE    status = 'success'
AND      runner_type = 0
GROUP BY started_at_bucket
ORDER BY started_at_bucket DESC LIMIT 1000;

-- 1000 rows in set. Elapsed: 0.784 sec. Processed 7.79 thousand rows, 3.16 MB (9.95 thousand rows/s., 4.04 MB/s.)

Both queries return similar data, for example:

┌───started_at_bucket─┬─countMerge(count_builds)─┬─quantileMerge(0.9)(wait_time_90_percentile)─┬─quantileMerge(0.5)(wait_time_50_percentile)─┬─quantileMerge(0.25)(wait_time_25_percentile)─┐
│ 2023-06-18 12:50:00 │                      129 │                                     31867.2 │                                       21575 │                                        16678 │
│ 2023-06-18 12:45:00 │                      124 │                                     33349.8 │                                       22863 │                                        16601 │
│ 2023-06-18 12:40:00 │                      136 │                                     29959.5 │                                       22481 │                                        15853 │
│ 2023-06-18 12:35:00 │                      129 │                                     33766.8 │                                       20995 │                                        15818 │
│ 2023-06-18 12:30:00 │                      107 │                                     32567.4 │                                       20760 │                                        15981 │
│ 2023-06-18 12:25:00 │                      132 │                          32304.500000000004 │                                       21357 │                                      15688.5 │
│ 2023-06-18 12:20:00 │                      122 │                          32344.000000000004 │                                     20804.5 │                                     15828.25 │
│ 2023-06-18 12:15:00 │                      128 │                                     33010.6 │                                       21306 │                                        15656 │
│ 2023-06-18 12:10:00 │                      133 │                                     31407.2 │                                       21868 │                                        15779 │

As you can see, the second query reads a far smaller amount of data, yet it doesn't actually run faster (0.784 s vs. 0.393 s for the direct aggregate).

I have a few hypotheses for why the time doesn't improve much:

  1. every bucket only has about 100 rows, so it may be cheaper for ClickHouse to calculate the quantiles directly than to merge the more complex quantileState structures.
  2. ~1M rows is nowhere near enough data to test performance, and the time I'm getting is mostly just overhead from parsing the query etc.

Screenshots or screen recordings

How to set up and validate locally

MR acceptance checklist

This checklist encourages us to confirm any changes have been analyzed to reduce risks in quality, performance, reliability, security, and maintainability.

Edited by Vladimir Shushlin

Activity

  • Adam Hegyi
    Adam Hegyi @ahegyi started a thread on the diff
  • 1 # frozen_string_literal: true
    2
    3 module ClickHouse
    4 module DataIngestion
    5 class SyncCiFinishedBuildsService
    6 # TODO: use exclusive lease or something to avoid concurrent execution
    7 # TODO: limit number of records
    8 def execute
    9 builds_finished_since_last_sync
    10 .each_batch(column: :finished_at, order_hint: :id) do |builds_batch|
    11 insert_builds(builds_batch.includes(:runner, :runner_manager)) # rubocop: disable CodeReuse/ActiveRecord
  • Adam Hegyi
    Adam Hegyi @ahegyi started a thread on the diff
  • 16
    17 BUILD_FIELDS = [:id, :project_id, :pipeline_id, :status, :finished_at,
    18 :created_at, :started_at, :queued_at, :runner_id,
    19 :runner_manager_system_xid].freeze
    20 RUNNER_FIELDS = [:run_untagged, :type].freeze
    21 RUNNER_MANAGER_FIELDS = [:version, :revision, :platform, :architecture].freeze
    22
    23 def builds_finished_since_last_sync
    24 finished_at, id = last_record_finished_at_and_id
    25
    26 return Ci::Build unless finished_at
    27
    28 Ci::Build.finished_after(finished_at, id)
    29 end
    30
    31 def insert_builds(builds)
    • I'm hoping to get some tooling implemented that makes batch inserts memory efficient using a tmp csv file + compression: #414937 (comment 1461641827)

    • I wonder how inefficient that will be if I load, say, 10k records in a single batch? :thinking:

      Or use a similar approach to #414937 (comment 1461641827): load smaller batches and concatenate the values :thinking:

    • @vshushlin,

      Temporarily writing 10K rows on the disk should be "cheaper" than keeping 10K rows in memory until a batch is done (GC might actually free up memory while a batch is written to disk since there is no more reference to those strings). Uploading a csv with 10K rows can be done with a streaming approach, using the body_stream option for HTTPParty.

      Anyway, this step should be abstracted away in the client itself.
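      The tmp-CSV-with-rotation idea above can be sketched like this. All names are made up for illustration; the uploader here only counts rows, whereas the real client would stream the file to ClickHouse (e.g. via HTTParty's body_stream):

```ruby
require "csv"
require "tempfile"

# Buffer rows, flush them to a temporary CSV file every `max_rows` rows,
# and hand each finished file to the uploader block.
def write_in_rotated_csvs(rows, max_rows:, &uploader)
  buffer = []
  rows.each do |row|
    buffer << row
    if buffer.size >= max_rows
      flush_csv(buffer, &uploader)
      buffer = []
    end
  end
  flush_csv(buffer, &uploader) unless buffer.empty?
end

def flush_csv(rows)
  # Tempfile.create deletes the file once the block returns, so no rows
  # stay referenced in memory after a batch is written and uploaded.
  Tempfile.create(["ci_finished_builds", ".csv"]) do |file|
    rows.each { |row| file.write(CSV.generate_line(row)) }
    file.rewind
    yield file
  end
end
```

      Rotating per file bounds both memory use and the size of each ClickHouse insert.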

  • Adam Hegyi
    Adam Hegyi @ahegyi started a thread on the diff
  • 1 -- source table for CI analytics, almost useless on its own, but it's a basis for creating materialized views
    2 CREATE TABLE ci_finished_builds
    3 (
    4 id UInt64 DEFAULT 0,
    5 project_id UInt32 DEFAULT 0,
  • Adam Hegyi
    Adam Hegyi @ahegyi started a thread on the diff
  • 10 finished_at DateTime64(6, 'UTC') DEFAULT now(),
    11 created_at DateTime64(6, 'UTC') DEFAULT now(),
    12 started_at DateTime64(6, 'UTC') DEFAULT now(),
    13 queued_at DateTime64(6, 'UTC') DEFAULT now(),
    14 runner_id UInt32 DEFAULT 0,
    15 runner_manager_system_xid String DEFAULT '',
    16
    17 --- Runner fields
    18 runner_run_untagged Boolean DEFAULT FALSE,
    19 runner_type UInt8 DEFAULT 0,
    20 runner_manager_version LowCardinality(String) DEFAULT '',
    21 runner_manager_revision LowCardinality(String) DEFAULT '',
    22 runner_manager_platform LowCardinality(String) DEFAULT '',
    23 runner_manager_architecture LowCardinality(String) DEFAULT ''
    24 )
    25 ENGINE = MergeTree
    • For the events data, I opted for ReplacingMergeTree so we can eventually "eliminate" accidentally duplicated data. Implementing an at-most-once guarantee into the data syncing process is very difficult since CH doesn't have unique indexes.

      Example: a batch is accidentally inserted twice.

    • Good idea!
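      The ReplacingMergeTree suggestion above could look roughly like this (abbreviated column list; illustrative only, not the MR's actual DDL):

```sql
-- Rows sharing the same sorting key are collapsed during background
-- merges, so an accidentally re-inserted batch is eventually deduplicated.
CREATE TABLE ci_finished_builds
(
    id UInt64,
    finished_at DateTime64(6, 'UTC')
)
ENGINE = ReplacingMergeTree
ORDER BY (id);

-- Until merges run, reads may still see duplicates; FINAL forces
-- deduplication at query time, at a performance cost.
SELECT count() FROM ci_finished_builds FINAL;
```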

  • Adam Hegyi
    Adam Hegyi @ahegyi started a thread on the diff
  • 1 -- source table for CI analytics, almost useless on its own, but it's a basis for creating materialized views
    2 CREATE TABLE ci_finished_builds
    3 (
    4 id UInt64 DEFAULT 0,
    5 project_id UInt32 DEFAULT 0,
    6 pipeline_id UInt32 DEFAULT 0, -- this is called commit_id in the main app
    7 status LowCardinality(String) DEFAULT '',
    8
    9 --- Fields to calculate timings
    10 finished_at DateTime64(6, 'UTC') DEFAULT now(),
    11 created_at DateTime64(6, 'UTC') DEFAULT now(),
    12 started_at DateTime64(6, 'UTC') DEFAULT now(),
    13 queued_at DateTime64(6, 'UTC') DEFAULT now(),
    14 runner_id UInt32 DEFAULT 0,
    15 runner_manager_system_xid String DEFAULT '',
    16
  • Adam Hegyi
    Adam Hegyi @ahegyi started a thread on the diff
  • 1 -- This table is needed to quickly calculate max(finished_at, id) on ci_finished_builds for data ingestion
    2 CREATE MATERIALIZED VIEW ci_finished_builds_max_finished_at
    3 ENGINE = AggregatingMergeTree()
    4 ORDER BY (max_finished_at_id)
    5 POPULATE AS
    6 SELECT
    7 maxSimpleState(tuple(finished_at, id)) as max_finished_at_id
    • This one looks cool. For Contribution Analytics I was thinking about keeping track of the cursor (finished_at, id) in a separate table.

  • Adam Hegyi
    Adam Hegyi @ahegyi started a thread on the diff
  • 1 CREATE MATERIALIZED VIEW ci_finished_builds_by_runner_type
    • The general advice for defining MVs:

      • Create an empty table that mimics your MVs schema.
      • Create an MV pointing to the empty table.

      See an example in the description: #414610 (closed)

      AFAIK this will make it easier to alter the underlying DB structure (I haven't tested it yet).

  • Adam Hegyi
    Adam Hegyi @ahegyi started a thread on the diff
  • 1 CREATE MATERIALIZED VIEW ci_finished_builds_by_runner_type
    2 ENGINE = AggregatingMergeTree()
    3 ORDER BY (status, runner_type, started_at_bucket)
    • Changing the sorting might "fix" the performance issue from the video. Since you're looking at the last N buckets:

      Suggested change
      3 ORDER BY (status, runner_type, started_at_bucket)
      3 ORDER BY (started_at_bucket, status, runner_type)

      Before measuring the query performance, ensure that the table is optimized:

      • OPTIMIZE TABLE my_table final;
      • And that there are no BG merge operations running: SELECT * FROM system.mutations; (should return 0 rows)
  • Thanks, @vshushlin! I left a few comments.

  • 173 174 scope :with_live_trace, -> { where('EXISTS (?)', Ci::BuildTraceChunk.where("#{quoted_table_name}.id = #{Ci::BuildTraceChunk.quoted_table_name}.build_id").select(1)) }
    174 175 scope :with_stale_live_trace, -> { with_live_trace.finished_before(12.hours.ago) }
    175 176 scope :finished_before, -> (date) { finished.where('finished_at < ?', date) }
    177
    178 # TODO: add index to facilitate this query (state, finished_at, id)
    179 scope :finished_after, -> (finished_at, id = nil) { finished.where('finished_at > ?', finished_at).or(finished.where('finished_at = ? AND id > ?', finished_at, id)) }
    • Why do we need to iterate builds instead of sending data to ClickHouse immediately once a build is complete? If there is a need for aggregation, we can build an aggregation data structure somewhere, but do we actually know that we need to?

      Edited by Grzegorz Bizon
    • @grzesiek ClickHouse likes to receive the data in (large) batches.

      You can read more in https://clickhouse.com/docs/en/optimize/bulk-inserts

      It also has built-in "asynchronous inserts", but I'm not sure how reliable they are, given that CH batches the data in memory :shrug:

    • @vshushlin So, how often do we want to run SyncCiFinishedBuildsService? Please be aware that we may be processing around 0.5 million builds per hour these days. It might be quite expensive to build a large bulk insert payload for 0.5M entries, right? The number can grow quite quickly over the next few years.

      Edited by Grzegorz Bizon
    • @grzesiek

      The idea was to run it every 5 minutes, and @ahegyi suggested optimizing the payload building with a CSV file: #414937 (comment 1461641827)

      Now I wonder, though, whether a single thread will be able to keep up with the load :thinking:

      Actually, WDYT about using the existing partitioning to parallelize things? We could create one "loader" process per partition. (I'm not familiar with our partitioning at all)

      Edited by Vladimir Shushlin
    • @vshushlin,

      Now I wonder though, if it will be possible to keep up with the load in a single thread

      A single thread will go a long way: assuming 1 s to load a 1K batch of builds from PG, and allowing a max runtime of 3 minutes for PG (plus 2 minutes for CH), we could process 150K+ rows per job (1.8M rows/hour).

      Idea for parallelization: minute ranges

      • Worker 1 looks at builds between 00-30 minute range.
      • Worker 2 looks at builds between 30-60 minute range.

      This might complicate the batching logic and we might need to track the cursors separately.

      The CSV writing and uploading process should also be batched/rotated. For example, once we reach 100K rows, we close the CSV file and invoke the uploader for CH; after that we can open a new CSV file.
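      As a sanity check on the estimate above, the inputs (150K rows per job, a job every 5 minutes) come straight from this thread; the helper name is made up for the sketch:

```ruby
# Back-of-the-envelope throughput: rows per job times jobs per hour.
def estimated_rows_per_hour(rows_per_job: 150_000, interval_minutes: 5)
  jobs_per_hour = 60 / interval_minutes
  rows_per_job * jobs_per_hour
end

estimated_rows_per_hour # => 1_800_000, the "1.8m rows / hour" quoted above
```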

    • 1.8m rows / hour

      @ahegyi As we are currently processing 0.5M builds per hour, and soon it might be 1M, 1.8M does not seem to offer a lot of headroom :thinking:

    • @grzesiek, there are many options for tweaking the background jobs. Our "luck" is that we can safely offload the read queries to the replicas which means that we might run more queries per job. I wouldn't complicate the initial PoC with a parallel worker implementation (needs more thinking).

      The 1 second runtime per batch was an estimate based on PG.ai, which we know performs slower than PRD. Reading 1K rows from a random id takes 700 ms: https://console.postgres.ai/gitlab/gitlab-production-ci/sessions/20980/commands/68663

  • Nice PoC @vshushlin! I left one, tiny question :)

  • Grzegorz Bizon removed review request for @grzesiek

  • 18 :created_at, :started_at, :queued_at, :runner_id,
    19 :runner_manager_system_xid].freeze
    20 RUNNER_FIELDS = [:run_untagged, :type].freeze
    21 RUNNER_MANAGER_FIELDS = [:version, :revision, :platform, :architecture].freeze
    22
    23 def builds_finished_since_last_sync
    24 finished_at, id = last_record_finished_at_and_id
    25
    26 return Ci::Build unless finished_at
    27
    28 Ci::Build.finished_after(finished_at, id)
    29 end
    30
    31 def insert_builds(builds)
    32 query = <<~SQL
    33 INSERT INTO ci_finished_builds (#{column_names.join(',')}) VALUES #{values(builds)}
  • 1 -- source table for CI analytics, almost useless on its own, but it's a basis for creating materialized views
    2 CREATE TABLE ci_finished_builds
    3 (
    4 id UInt64 DEFAULT 0,
    5 project_id UInt32 DEFAULT 0,
    6 pipeline_id UInt32 DEFAULT 0, -- this is called commit_id in the main app
    7 status LowCardinality(String) DEFAULT '',
    8
    9 --- Fields to calculate timings
    10 finished_at DateTime64(6, 'UTC') DEFAULT now(),
  • Vladimir Shushlin added 2412 commits


  • Contributor

    :warning: @vshushlin Some end-to-end (E2E) tests have been selected based on the stage label on this MR.

    Please start the trigger-omnibus-and-follow-up-e2e job in the qa stage and ensure the tests in follow-up-e2e:package-and-test-ee pipeline are passing before this MR is merged. (The E2E test pipeline is computationally intensive and we cannot afford running it automatically for all pushes/rebases. Therefore, this job must be triggered manually after significant changes at least once.)

    If you would like to run all E2E tests, please apply the pipeline:run-all-e2e label and trigger a new pipeline. This will run all tests in e2e:package-and-test pipeline.

    The E2E test jobs are allowed to fail due to flakiness. For the list of known failures please refer to the latest pipeline triage issue.

    Once done, please apply the :white_check_mark: emoji on this comment.

    For any questions or help in reviewing the E2E test results, please reach out on the internal #quality Slack channel.

  • mentioned in issue #412709 (closed)

  • mentioned in issue #412714 (closed)

  • mentioned in issue #412720 (closed)

  • mentioned in issue #421199 (closed)

  • mentioned in issue #421200 (closed)

  • mentioned in issue #421202 (closed)
