Skip to content

Add pause control strategy for ClickHouse migrations

What does this MR do and why?

This MR does the following (note that each commit can be reviewed independently):

  • Pauses ClickHouse workers based on presence of ClickHouse::MigrationSupport::ExclusiveLock::MIGRATION_LEASE_KEY in Redis.
  • Implements a new ClickHouse::MigrationSupport::SidekiqMiddleware middleware that registers ClickHouse workers in a click_house:workers:active_workers Redis list with an appropriate TTL score while the worker is active.
  • Implements a ClickHouseWorker concern that enables ClickHouse::MigrationSupport::SidekiqMiddleware.
  • Ensures that Redis list is empty before starting migration, i.e. ZRANGEBYSCORE click_house:workers:active_workers {now} +inf returns empty.
  • Adds a documentation section explaining how to implement ClickHouse Sidekiq workers.

Closes #428274 (closed)

FF rollout issue: #433389 (closed)

Screenshots or screen recordings

Screenshots are required for UI changes, and strongly recommended for all other merge requests.

Scenario Screenshot Notes
Starting a migration while CH workers are active image
image
The dots represent sleep periods, and we can see the exclusive lock taken by the migration lock, another by the worker, and the CH worker registration (ZRANGE).
Running a CH job while a migration is underway image The new pause control strategy kicks in, since "gitlab:exclusive_lease:click_house:migrations" key is present in Redis.
A cron job getting resumed after a pause image

How to set up and validate locally

Numbered steps to set up and validate the change are strongly suggested.

NOTE: Useful commands for testing:

  • Rails CLI
    • Gitlab::SidekiqMiddleware::PauseControl::PauseControlService.has_jobs_in_waiting_queue?('click_house/ci_finished_builds_sync_worker')
  • Redis CLI:
    • KEYS gitlab:exclusive_lease:*: check if there are exclusive locks in place
    • ZRANGE click_house:workers:active_workers 0 +inf BYSCORE WITHSCORES: check which CH runners have been registered by ClickHouseWorker module
  1. (Optional): Apply the following patch to the local branch. It will help testing, since it will avoid consuming sync events from the PG database into the CH database, and will also make the migration command take at least 5 minutes:

    Patch to help test
    diff --git a/app/workers/ci/schedule_unlock_pipelines_in_queue_cron_worker.rb b/app/workers/ci/schedule_unlock_pipelines_in_queue_cron_worker.rb
    index 1a5933261200..c0d15dc8987e 100644
    --- a/app/workers/ci/schedule_unlock_pipelines_in_queue_cron_worker.rb
    +++ b/app/workers/ci/schedule_unlock_pipelines_in_queue_cron_worker.rb
    @@ -15,7 +15,7 @@ class ScheduleUnlockPipelinesInQueueCronWorker
         idempotent!
     
         def perform(...)
    -      Ci::UnlockPipelinesInQueueWorker.perform_with_capacity(...)
    +      # Ci::UnlockPipelinesInQueueWorker.perform_with_capacity(...)
         end
       end
     end
    diff --git a/app/workers/concerns/click_house_worker.rb b/app/workers/concerns/click_house_worker.rb
    index 483bdb712fd6..544dffeefea8 100644
    --- a/app/workers/concerns/click_house_worker.rb
    +++ b/app/workers/concerns/click_house_worker.rb
    @@ -30,6 +30,6 @@ def click_house_migration_lock(ttl)
         pause_control :click_house_migration
       end
     
    -  DEFAULT_CLICKHOUSE_WORKER_TTL = 30.minutes
    +  DEFAULT_CLICKHOUSE_WORKER_TTL = 3.minutes
       CLICKHOUSE_ACTIVE_WORKERS_KEY = 'click_house:workers:active_workers'
     end
    diff --git a/ee/app/services/click_house/data_ingestion/ci_finished_builds_sync_service.rb b/ee/app/services/click_house/data_ingestion/ci_finished_builds_sync_service.rb
    index 617c9f0cb981..b2b4b4cdc668 100644
    --- a/ee/app/services/click_house/data_ingestion/ci_finished_builds_sync_service.rb
    +++ b/ee/app/services/click_house/data_ingestion/ci_finished_builds_sync_service.rb
    @@ -21,6 +21,8 @@ def initialize(worker_index: 0, total_workers: 1)
           end
     
           def execute
    +        # binding.pry_shell
    +
             unless enabled?
               return ServiceResponse.error(
                 message: 'Feature ci_data_ingestion_to_click_house is disabled',
    @@ -78,15 +80,15 @@ def insert_new_finished_builds
               csv_builder.render do |tempfile|
                 next if csv_builder.rows_written == 0
     
    -            File.open(tempfile.path) do |f|
    -              ClickHouse::Client.insert_csv(INSERT_FINISHED_BUILDS_QUERY, f, :main)
    -            end
    +            # File.open(tempfile.path) do |f|
    +            #   ClickHouse::Client.insert_csv(INSERT_FINISHED_BUILDS_QUERY, f, :main)
    +            # end
               end
             end
     
             {
    -          records_inserted:
    -            Ci::FinishedBuildChSyncEvent.primary_key_in(@processed_record_ids).update_all(processed: true),
    +          records_inserted: 0,
    +            # Ci::FinishedBuildChSyncEvent.primary_key_in(@processed_record_ids).update_all(processed: true),
               reached_end_of_table: @reached_end_of_table
             }
           end
    diff --git a/ee/app/workers/click_house/ci_finished_builds_sync_cron_worker.rb b/ee/app/workers/click_house/ci_finished_builds_sync_cron_worker.rb
    index 46fcda0fc880..0bb176d8ae13 100644
    --- a/ee/app/workers/click_house/ci_finished_builds_sync_cron_worker.rb
    +++ b/ee/app/workers/click_house/ci_finished_builds_sync_cron_worker.rb
    @@ -16,6 +16,8 @@ def perform(*args)
           return unless job_version == 2
           return unless Feature.enabled?(:ci_data_ingestion_to_click_house)
     
    +      # binding.pry_shell
    +
           total_workers = args.first || 1
     
           total_workers.times do |worker_index|
    diff --git a/ee/app/workers/click_house/ci_finished_builds_sync_worker.rb b/ee/app/workers/click_house/ci_finished_builds_sync_worker.rb
    index 42bc4110d276..28d0a382e761 100644
    --- a/ee/app/workers/click_house/ci_finished_builds_sync_worker.rb
    +++ b/ee/app/workers/click_house/ci_finished_builds_sync_worker.rb
    @@ -12,6 +12,8 @@ class CiFinishedBuildsSyncWorker
         loggable_arguments 1, 2
     
         def perform(worker_index = 0, total_workers = 1)
    +      # binding.pry_shell
    +
           response = ::ClickHouse::DataIngestion::CiFinishedBuildsSyncService.new(
             worker_index: worker_index, total_workers: total_workers
           ).execute
    diff --git a/lib/click_house/migration_support/migrator.rb b/lib/click_house/migration_support/migrator.rb
    index ef100179a4ef..84525afedef1 100644
    --- a/lib/click_house/migration_support/migrator.rb
    +++ b/lib/click_house/migration_support/migrator.rb
    @@ -47,6 +47,9 @@ def migrate
             in_lock(LEASE_KEY, ttl: LOCK_DURATION, retries: 5, sleep_sec: RETRY_DELAY) do
               wait_until_workers_inactive(WORKERS_WAIT_DURATION.from_now)
     
    +          puts "Sleeping 5 minutes while holding lock..."
    +          sleep 5.minutes
    +
               migrate_without_lock
             end
           rescue Gitlab::ExclusiveLeaseHelpers::FailedToObtainLockError => e
    @@ -107,6 +110,7 @@ def wait_until_workers_inactive(worker_wait_ttl)
                 # Check if any workers are registered with a future expiry date
                 min = Time.current.to_i
                 workers_active = redis.zrange(key, min, '+inf', by_score: true, limit: [0, 1]).any?
    +            print '.' if workers_active
     
                 # Expire keys in the past
                 redis.zremrangebyscore(key, 0, min)
    @@ -119,6 +123,8 @@ def wait_until_workers_inactive(worker_wait_ttl)
             end
     
             raise ClickHouse::MigrationSupport::LockError, 'Timed out waiting for active workers' if workers_active
    +
    +        puts
           end
     
           # Used for running a specific migration.
  2. Enable FFs:

    Feature.enable(:wait_for_clickhouse_workers_during_migration)
    Feature.enable(:pause_clickhouse_workers_during_migration)
  3. Ensure you have a few thousands of finished builds. If needed, go to the shell in your GDK gitlab directory and run bundle exec rake "gitlab:seed:runner_fleet". This will seed your GDK with some runners and jobs required for testing this MR. To check if you have data to run with, you can use:

    SELECT processed, partition, COUNT(*)
      FROM p_ci_finished_build_ch_sync_events
      GROUP BY processed, partition
      ORDER BY partition, processed DESC;
  4. Ensure you have sync events for each of those builds, so that the ClickHouse::CiFinishedBuildsSyncCronWorker has something to do:

    Ci::Build.include EachBatch; Ci::Build.finished.where.not(finished_at: nil).order(finished_at: :asc).each_batch(of: 20000) { |batch| batch = batch.map { |build| Ci::FinishedBuildChSyncEvent.new(build_id: build.id, build_finished_at: build.finished_at) }; Ci::FinishedBuildChSyncEvent.transaction { batch.each(&:save); } }
  5. Either wait for the next time the click_house_ci_finished_builds_sync_worker cron job starts (every 3 minutes), or start it yourself:

    ::ClickHouse::CiFinishedBuildsSyncCronWorker.perform_async(1)
  6. Run migrations:

    bundle exec rake gitlab:clickhouse:migrate

The migrator should wait until the worker has finished its job, and new CH workers should not start during migrations (since ZRANGE click_house:workers:active_workers 0 +inf BYSCORE WITHSCORES returns a non-empty collection).

MR acceptance checklist

This checklist encourages us to confirm any changes have been analyzed to reduce risks in quality, performance, reliability, security, and maintainability.

Edited by Pedro Pombeiro

Merge request reports