Skip to content
Snippets Groups Projects
Commit 805b9ef2 authored by Marco (Gregorius)'s avatar Marco (Gregorius) :palm_tree:
Browse files

Update default initializer value for Sidekiq routing_rules

We want to update self-managed instances to route all Sidekiq jobs to
the 'default' queue. By updating the Rails initializer, we wouldn't need
to update all other installations such as Omnibus and Helm Chart.
https://gitlab.com/gitlab-com/gl-infra/scalability/-/issues/1491

Changelog: changed
parent 50994604
No related branches found
No related tags found
No related merge requests found
Showing
with 43 additions and 31 deletions
......@@ -790,7 +790,11 @@
#
Settings['sidekiq'] ||= Settingslogic.new({})
Settings['sidekiq']['log_format'] ||= 'default'
Settings['sidekiq']['routing_rules'] ||= []
# Route all jobs to 'default' queue. This setting is meant for self-managed instances use to keep things simple.
# See https://gitlab.com/gitlab-com/gl-infra/scalability/-/issues/1491
sidekiq_routing_rules = Settings['sidekiq']['routing_rules']
Settings['sidekiq']['routing_rules'] = [['*', 'default']] if sidekiq_routing_rules.nil? || sidekiq_routing_rules&.empty?
#
# GitLab Shell
......
......@@ -14,7 +14,9 @@
described_class.new.execute(project_ids: [project1.id, project2.id], namespace_ids: [3, 4])
end
jobs = Sidekiq::Queues[ElasticNamespaceIndexerWorker.queue]
jobs = Sidekiq::Queues[ElasticNamespaceIndexerWorker.queue].select do |job|
job["class"] == ElasticNamespaceIndexerWorker.name
end
expect(jobs.size).to eq(2)
expect(jobs[0]['args']).to eq([3, 'index'])
......
......@@ -15,6 +15,6 @@ def self.name
end
it 'sets the queue name of a worker' do
expect(worker.sidekiq_options['queue'].to_s).to eq('geo:dummy')
expect(worker.generated_queue_name).to eq('geo:dummy')
end
end
......@@ -15,12 +15,12 @@
end
it 'disables retrying of failed jobs' do
expect(subject.sidekiq_options_hash).to match(
expect(subject.sidekiq_options_hash).to include(
'retry' => false,
'version' => an_instance_of(Integer),
'queue' => 'geo:geo_repository_verification_primary_single',
'queue_namespace' => :geo
)
expect(subject.class.generated_queue_name).to eq('geo:geo_repository_verification_primary_single')
end
describe '#perform' do
......
......@@ -15,12 +15,12 @@
end
it 'disables retrying of failed jobs' do
expect(subject.sidekiq_options_hash).to match(
expect(subject.sidekiq_options_hash).to include(
'retry' => false,
'version' => an_instance_of(Integer),
'queue' => 'geo:geo_repository_verification_secondary_single',
'queue_namespace' => :geo
)
expect(subject.class.generated_queue_name).to eq('geo:geo_repository_verification_secondary_single')
end
describe '#perform' do
......
......@@ -15,9 +15,9 @@
it 'uses a Geo queue' do
expect(job.sidekiq_options_hash).to include(
'queue' => 'geo:geo_reverification_batch',
'queue_namespace' => :geo
)
expect(job.class.generated_queue_name).to eq('geo:geo_reverification_batch')
end
describe '#perform' do
......
......@@ -22,9 +22,9 @@
it 'uses a cronjob queue' do
expect(subject.sidekiq_options_hash).to include(
'queue' => 'cronjob:geo_secondary_registry_consistency',
'queue_namespace' => :cronjob
)
expect(subject.class.generated_queue_name).to include('cronjob:geo_secondary_registry_consistency')
end
describe '#perform' do
......
......@@ -13,9 +13,9 @@
it 'uses a cronjob queue' do
expect(subject.sidekiq_options_hash).to include(
'queue' => 'cronjob:geo_secondary_usage_data_cron',
'queue_namespace' => :cronjob
)
expect(subject.class.generated_queue_name).to eq('cronjob:geo_secondary_usage_data_cron')
end
it 'does not run for primary nodes' do
......
......@@ -17,8 +17,8 @@
it 'uses a cronjob queue' do
expect(subject.sidekiq_options_hash).to include(
'queue' => 'cronjob:geo_sync_timeout_cron',
'queue_namespace' => :cronjob
)
expect(subject.class.generated_queue_name).to eq('cronjob:geo_sync_timeout_cron')
end
end
......@@ -17,9 +17,9 @@
it 'uses a Geo queue' do
expect(job.sidekiq_options_hash).to include(
'queue' => 'geo:geo_verification_batch',
'queue_namespace' => :geo
)
expect(job.class.generated_queue_name).to eq('geo:geo_verification_batch')
end
describe '#perform' do
......
......@@ -16,8 +16,8 @@
it 'uses a cronjob queue' do
expect(subject.sidekiq_options_hash).to include(
'queue' => 'cronjob:geo_verification_cron',
'queue_namespace' => :cronjob
)
expect(subject.class.generated_queue_name).to include('cronjob:geo_verification_cron')
end
end
......@@ -18,9 +18,9 @@
it 'uses a geo queue' do
expect(subject.sidekiq_options_hash).to include(
'queue' => 'geo:geo_verification_state_backfill',
'queue_namespace' => :geo
)
expect(subject.class.generated_queue_name).to eq('geo:geo_verification_state_backfill')
end
describe '#perform' do
......
......@@ -9,9 +9,9 @@
it 'uses a Geo queue' do
expect(described_class.new.sidekiq_options_hash).to include(
'queue' => 'geo:geo_verification_timeout',
'queue_namespace' => :geo
)
expect(described_class.generated_queue_name).to eq('geo:geo_verification_timeout')
end
describe 'perform' do
......
......@@ -150,4 +150,10 @@
expect(Settings.encrypted('tmp/tests/test.enc').read).to be_empty
end
end
describe '.sidekiq.routing_rules' do
it "defaults to [['*', 'default']]" do
expect(described_class.sidekiq.routing_rules).to eq([['*', 'default']])
end
end
end
......@@ -41,10 +41,10 @@ def self.work
describe '#call' do
it 'removes the stored job from redis before execution' do
bare_job = { 'class' => 'TestDeduplicationWorker', 'args' => ['hello'] }
job_definition = Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob.new(bare_job.dup, 'test_deduplication')
job_definition = Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob.new(bare_job.dup, 'default')
expect(Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob)
.to receive(:new).with(a_hash_including(bare_job), 'test_deduplication')
.to receive(:new).with(a_hash_including(bare_job), 'default')
.and_return(job_definition).twice # once in client middleware
expect(job_definition).to receive(:delete!).ordered.and_call_original
......@@ -60,10 +60,10 @@ def self.work
it 'removes the stored job from redis after execution' do
bare_job = { 'class' => 'TestDeduplicationWorker', 'args' => ['hello'] }
job_definition = Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob.new(bare_job.dup, 'test_deduplication')
job_definition = Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob.new(bare_job.dup, 'default')
expect(Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob)
.to receive(:new).with(a_hash_including(bare_job), 'test_deduplication')
.to receive(:new).with(a_hash_including(bare_job), 'default')
.and_return(job_definition).twice # once in client middleware
expect(TestDeduplicationWorker).to receive(:work).ordered.and_call_original
......
......@@ -46,7 +46,7 @@ def clear_queues
expect(migrator.execute('PostReceive' => 'new_queue')).to eq(scanned: 3, migrated: 0)
expect(set_after.length).to eq(3)
expect(set_after.map(&:first)).to all(include('queue' => 'authorized_projects',
expect(set_after.map(&:first)).to all(include('queue' => 'default',
'class' => 'AuthorizedProjectsWorker'))
end
end
......@@ -62,7 +62,7 @@ def clear_queues
if item['class'] == 'AuthorizedProjectsWorker'
expect(item).to include('queue' => 'new_queue', 'args' => [i])
else
expect(item).to include('queue' => 'post_receive', 'args' => [i])
expect(item).to include('queue' => 'default', 'args' => [i])
end
expect(score).to be_within(schedule_jitter).of(i.succ.hours.from_now.to_i)
......@@ -116,7 +116,7 @@ def clear_queues
expect(migrator.execute('PostReceive' => 'new_queue')).to eq(scanned: 4, migrated: 0)
expect(set_after.length).to eq(3)
expect(set_after.map(&:first)).to all(include('queue' => 'authorized_projects'))
expect(set_after.map(&:first)).to all(include('queue' => 'default'))
end
end
......@@ -138,7 +138,7 @@ def clear_queues
expect(migrator.execute('PostReceive' => 'new_queue')).to eq(scanned: 4, migrated: 1)
expect(set_after.group_by { |job| job.first['queue'] }.transform_values(&:count))
.to eq('authorized_projects' => 6, 'new_queue' => 1)
.to eq('default' => 6, 'new_queue' => 1)
end
it 'iterates through the entire set of jobs' do
......
......@@ -4,15 +4,15 @@
RSpec.describe Gitlab::SidekiqQueue, :clean_gitlab_redis_queues do
around do |example|
Sidekiq::Queue.new('default').clear
Sidekiq::Queue.new('foobar').clear
Sidekiq::Testing.disable!(&example)
Sidekiq::Queue.new('default').clear
Sidekiq::Queue.new('foobar').clear
end
def add_job(args, user:, klass: 'AuthorizedProjectsWorker')
Sidekiq::Client.push(
'class' => klass,
'queue' => 'default',
'queue' => 'foobar',
'args' => args,
'meta.user' => user.username
)
......@@ -20,7 +20,7 @@ def add_job(args, user:, klass: 'AuthorizedProjectsWorker')
describe '#drop_jobs!' do
shared_examples 'queue processing' do
let(:sidekiq_queue) { described_class.new('default') }
let(:sidekiq_queue) { described_class.new('foobar') }
let_it_be(:sidekiq_queue_user) { create(:user) }
before do
......@@ -80,7 +80,7 @@ def add_job(args, user:, klass: 'AuthorizedProjectsWorker')
it 'raises NoMetadataError' do
add_job([1], user: create(:user))
expect { described_class.new('default').drop_jobs!({ username: 'sidekiq_queue_user' }, timeout: 1) }
expect { described_class.new('foobar').drop_jobs!({ username: 'sidekiq_queue_user' }, timeout: 1) }
.to raise_error(described_class::NoMetadataError)
end
end
......
......@@ -14,6 +14,6 @@ def self.name
end
end
it { expect(worker.queue).to eq('cluster_agent:example') }
it { expect(worker.generated_queue_name).to eq('cluster_agent:example') }
it { expect(worker.get_feature_category).to eq(:kubernetes_management) }
end
......@@ -15,7 +15,7 @@ def self.name
end
it 'sets a default pipelines queue automatically' do
expect(worker.sidekiq_options['queue'])
expect(worker.generated_queue_name)
.to eq 'gcp_cluster:dummy'
end
end
......@@ -41,7 +41,7 @@ def perform(identifier, *args)
end
it 'sets the queue name of a worker' do
expect(worker.sidekiq_options['queue'].to_s).to eq('cronjob:dummy')
expect(worker.generated_queue_name).to eq('cronjob:dummy')
end
it 'disables retrying of failed jobs' do
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment