Skip to content
Snippets Groups Projects

Migrate Sidekiq jobs based on routing rules

Merged Marco Gregorius requested to merge pdm-migrate-sidekiq-jobs-to-default into master
All threads resolved!
Files
2
@@ -3,44 +3,67 @@
@@ -3,44 +3,67 @@
class MigrateSidekiqJobs < Gitlab::Database::Migration[2.0]
class MigrateSidekiqJobs < Gitlab::Database::Migration[2.0]
# Migrate jobs that don't belong to current routing rules
# Migrate jobs that don't belong to current routing rules
# https://gitlab.com/gitlab-com/gl-infra/scalability/-/issues/1930
# https://gitlab.com/gitlab-com/gl-infra/scalability/-/issues/1930
 
class SidekiqMigrateJobs
 
LOG_FREQUENCY = 1_000
def up
attr_reader :logger
queues = scan_queues
migrate_jobs(queues)
end
def down
def initialize(logger: nil)
# no-op
@logger = logger
end
end
 
 
# Migrates jobs from queues that are outside the mappings
 
# mappings is a hash of WorkerClassName => target_queue_name
 
def migrate_queues(mappings)
 
routing_rules_queues = mappings.values.uniq
 
logger&.info("List of queues based on routing rules: #{routing_rules_queues}")
 
Sidekiq.redis do |conn| # rubocop:disable Cop/SidekiqRedisCall
 
conn.scan_each(match: "queue:*", type: 'list') do |key|
 
queue_from = key.split(':').last
 
next if routing_rules_queues.include? queue_from
 
 
logger&.info("Migrating #{queue_from} queue")
private
migrated = 0
 
while queue_length(queue_from) > 0
 
begin
 
if migrated >= 0 && migrated % LOG_FREQUENCY == 0
 
logger&.info("Migrating from #{queue_from}. Total: #{queue_length(queue_from)}. Migrated: #{migrated}.")
 
end
def scan_queues
job = conn.rpop "queue:#{queue_from}"
queues = []
job_hash = Sidekiq.load_json job
routing_rules_queues = Settings.sidekiq.routing_rules.map { |query, rule| rule }
next unless mappings.has_key?(job_hash['class'])
Sidekiq.redis do |conn| # rubocop:disable Cop/SidekiqRedisCall
conn.scan_each(match: "queue:*", type: 'list') do |key|
destination_queue = mappings[job_hash['class']]
queue = key.split(':').last
conn.lpush("queue:#{destination_queue}", job)
queues << queue unless routing_rules_queues.include? queue
migrated += 1
 
rescue JSON::ParserError
 
logger&.error("Unmarshal JSON payload from MigrateSidekiqJobs failed. Job: #{job}")
 
next
 
end
 
end
 
logger&.info("Finished migrating #{queue_from} queue")
 
end
end
end
end
end
queues.uniq
end
def migrate_jobs(queues_from)
private
queues_from.each do |queue_from|
while sidekiq_queue_length(queue_from) > 0
def queue_length(queue_name)
Sidekiq.redis do |conn| # rubocop:disable Cop/SidekiqRedisCall
Sidekiq.redis do |conn| # rubocop:disable Cop/SidekiqRedisCall
payload = conn.rpop "queue:#{queue_from}"
conn.llen("queue:#{queue_name}")
parsed = Gitlab::Json.parse payload
next if parsed['class'].nil?
worker_class = Object.const_get(parsed['class'], false)
conn.lpush("queue:#{worker_class.queue}", payload) unless worker_class.queue == queue_from
rescue JSON::ParserError
next
end
end
end
end
end
end
end
 
 
def up
 
mappings = Gitlab::SidekiqConfig.worker_queue_mappings
 
logger = ::Gitlab::BackgroundMigration::Logger.build
 
SidekiqMigrateJobs.new(logger: logger).migrate_queues(mappings)
 
end
 
 
def down
 
# no-op
 
end
end
end
Loading