Skip to content
Snippets Groups Projects
Commit 9aef08f3 authored by Adam Hegyi
Browse files

Merge branch 'distinct_each_batch_strategy' into 'master'

Add the LooseIndexScanBatchingStrategy strategy

See merge request !91041
parents 289544ba 56ee01c9
No related branches found
No related tags found
1 merge request: !91041 Add the LooseIndexScanBatchingStrategy strategy
Pipeline #586792199 passed
......@@ -314,6 +314,66 @@ background migration.
After the batched migration is completed, you can safely depend on the
data in `routes.namespace_id` being populated.
### Batching over non-distinct columns
The default batching strategy provides an efficient way to iterate over primary key columns.
However, if you need to iterate over columns where values are not unique, you must use a
different batching strategy.
The `LooseIndexScanBatchingStrategy` batching strategy uses a special version of [`EachBatch`](../iterating_tables_in_batches.md#loose-index-scan-with-distinct_each_batch)
to provide efficient and stable iteration over the distinct column values.
This example shows a batched background migration where the `issues.project_id` column is used as
the batching column.
Database post-migration:
```ruby
# Example post-deployment migration that queues the `BatchProjectsWithIssues`
# batched background migration, batching over the `issues.project_id` column
# (whose values are not unique) with the LooseIndexScanBatchingStrategy.
class ProjectsWithIssuesMigration < Gitlab::Database::Migration[2.0]
# Name of the background migration job class to run.
MIGRATION = 'BatchProjectsWithIssues'
# Passed to the queueing helper as `job_interval`.
INTERVAL = 2.minutes
# Passed to the queueing helper as `batch_size`.
BATCH_SIZE = 5000
# Passed to the queueing helper as `sub_batch_size`.
SUB_BATCH_SIZE = 500
restrict_gitlab_migration gitlab_schema: :gitlab_main
disable_ddl_transaction!
# Queues the batched background migration over issues.project_id.
def up
queue_batched_background_migration(
MIGRATION,
:issues,
:project_id,
job_interval: INTERVAL,
batch_size: BATCH_SIZE,
batch_class_name: 'LooseIndexScanBatchingStrategy', # Override the default batching strategy
sub_batch_size: SUB_BATCH_SIZE
)
end
# Deletes the batched background migration queued by `up`. The first three
# arguments mirror the ones given when queueing.
# NOTE(review): the trailing [] is presumably the (empty) job arguments list - confirm.
def down
delete_batched_background_migration(MIGRATION, :issues, :project_id, [])
end
end
```
Implementing the background migration class:
```ruby
module Gitlab
  module BackgroundMigration
    # Example batched background migration job that walks the distinct values
    # of its batch column instead of individual rows.
    class BatchProjectsWithIssues < Gitlab::BackgroundMigration::BatchedMigrationJob
      include Gitlab::Database::DynamicModelHelpers

      # Iterates over the distinct batch column values, one relation at a
      # time, recording each iteration under the :backfill_issues operation.
      def perform
        distinct_each_batch(operation_name: :backfill_issues) do |distinct_batch|
          # Each relation yields distinct values, so no de-duplication is needed here.
          project_ids = distinct_batch.pluck(batch_column)
          # do something with the distinct project_ids
        end
      end
    end
  end
end
```
## Testing
Writing tests is required for:
......
......@@ -44,7 +44,19 @@ def each_sub_batch(operation_name: :default, batching_arguments: {}, batching_sc
end
end
def parent_batch_relation(batching_scope)
# Yields relations over distinct values of the batch column, taken from the
# job's parent batch relation.
#
# operation_name     - Name under which each yielded block is instrumented
#                      in the batch metrics (default: :default).
# batching_arguments - Hash merged over the default batching options
#                      ({ column: batch_column, of: sub_batch_size }), so
#                      callers can override either of them.
#
# After every yielded relation the job sleeps for `pause_ms` milliseconds;
# negative values are treated as zero.
#
# Returns whatever the underlying `distinct_each_batch` call returns.
def distinct_each_batch(operation_name: :default, batching_arguments: {})
  default_options = { column: batch_column, of: sub_batch_size }
  batching_options = default_options.merge(batching_arguments)

  parent_batch_relation.distinct_each_batch(**batching_options) do |distinct_relation|
    batch_metrics.instrument_operation(operation_name) { yield distinct_relation }

    # pause_ms is expressed in milliseconds, sleep expects seconds.
    pause_in_ms = [pause_ms, 0].max
    sleep(pause_in_ms * 0.001)
  end
end
def parent_batch_relation(batching_scope = nil)
parent_relation = define_batchable_model(batch_table, connection: connection)
.where(batch_column => start_id..end_id)
......
# frozen_string_literal: true
module Gitlab
  module BackgroundMigration
    module BatchingStrategies
      # Batching strategy for columns whose values are not unique.
      # A typical use case is iterating over a foreign key column, for example issues.project_id.
      class LooseIndexScanBatchingStrategy < BaseStrategy
        include Gitlab::Database::DynamicModelHelpers

        # Determines the bounds of the next batch of distinct column values.
        #
        # table_name      - The table to batch over
        # column_name     - The column to batch over
        # batch_min_value - The lowest column value the next batch may start at
        # batch_size      - The number of distinct values in the next batch
        # job_arguments   - The migration job arguments (not used by this strategy)
        # job_class       - The migration job class (not used by this strategy)
        #
        # Returns the first row of the MIN/MAX query over the first distinct
        # batch, or nil when no batch is yielded.
        def next_batch(table_name, column_name, batch_min_value:, batch_size:, job_arguments:, job_class: nil)
          batchable_model = define_batchable_model(table_name, connection: connection)
          quoted_column_name = batchable_model.connection.quote_column_name(column_name)
          candidates = batchable_model.where("#{quoted_column_name} >= ?", batch_min_value)

          candidates.distinct_each_batch(of: batch_size, column: column_name) do |batch| # rubocop:disable Lint/UnreachableLoop
            # Only the first batch matters, so leave the method as soon as its bounds are known.
            return batch.pluck(Arel.sql("MIN(#{quoted_column_name}), MAX(#{quoted_column_name})")).first
          end

          # No batch was yielded: nothing left to process.
          nil
        end
      end
    end
  end
end
......@@ -92,5 +92,69 @@ def perform(*job_arguments)
end
end
end
# Exercises `distinct_each_batch` through an anonymous job subclass that copies
# every distinct `from_column` value of _test_table into _test_insert_table.
context 'when the subclass uses distinct each batch' do
# Job covering from_column values 1..100, with sub-batches of 2 distinct values.
let(:job_instance) do
job_class.new(start_id: 1,
end_id: 100,
batch_table: '_test_table',
batch_column: 'from_column',
sub_batch_size: 2,
pause_ms: 10,
connection: connection)
end
# Anonymous subclass whose #perform inserts each distinct value it is given.
let(:job_class) do
Class.new(described_class) do
def perform(*job_arguments)
distinct_each_batch(operation_name: :insert) do |sub_batch|
sub_batch.pluck(:from_column).each do |value|
connection.execute("INSERT INTO _test_insert_table VALUES (#{value})")
end
# The block's value is the sub-batch size; the 'stores the affected rows'
# example below expects these sizes under the :insert operation.
sub_batch.size
end
end
end
end
let(:test_table) { table(:_test_table) }
let(:test_insert_table) { table(:_test_insert_table) }
before do
# Stub the pause so the examples do not actually sleep.
allow(job_instance).to receive(:sleep)
connection.create_table :_test_table do |t|
t.timestamps_with_timezone null: false
t.integer :from_column, null: false
end
# The unique index means inserting the same value twice would fail.
connection.create_table :_test_insert_table, id: false do |t|
t.integer :to_column
t.index :to_column, unique: true
end
# Five rows but only three distinct from_column values (5, 10, 15).
test_table.create!(id: 1, from_column: 5)
test_table.create!(id: 2, from_column: 10)
test_table.create!(id: 3, from_column: 10)
test_table.create!(id: 4, from_column: 5)
test_table.create!(id: 5, from_column: 15)
end
after do
connection.drop_table(:_test_table)
connection.drop_table(:_test_insert_table)
end
it 'calls the operation for each distinct batch' do
expect { perform_job }.to change { test_insert_table.pluck(:to_column) }.from([]).to([5, 10, 15])
end
# Three distinct values with sub_batch_size 2 give sub-batches of 2 and 1.
it 'stores the affected rows' do
perform_job
expect(job_instance.batch_metrics.affected_rows[:insert]).to contain_exactly(2, 1)
end
end
end
end
# frozen_string_literal: true
require 'spec_helper'
# Specs for LooseIndexScanBatchingStrategy#next_batch, batching over the
# non-unique issues.project_id column.
RSpec.describe Gitlab::BackgroundMigration::BatchingStrategies::LooseIndexScanBatchingStrategy, '#next_batch' do
let(:batching_strategy) { described_class.new(connection: ActiveRecord::Base.connection) }
let(:namespaces) { table(:namespaces) }
let(:projects) { table(:projects) }
let(:issues) { table(:issues) }
# Fixtures: five namespaces, each owning one project.
let!(:namespace1) { namespaces.create!(name: 'ns1', path: 'ns1') }
let!(:namespace2) { namespaces.create!(name: 'ns2', path: 'ns2') }
let!(:namespace3) { namespaces.create!(name: 'ns3', path: 'ns3') }
let!(:namespace4) { namespaces.create!(name: 'ns4', path: 'ns4') }
let!(:namespace5) { namespaces.create!(name: 'ns5', path: 'ns5') }
let!(:project1) { projects.create!(name: 'p1', namespace_id: namespace1.id, project_namespace_id: namespace1.id) }
let!(:project2) { projects.create!(name: 'p2', namespace_id: namespace2.id, project_namespace_id: namespace2.id) }
let!(:project3) { projects.create!(name: 'p3', namespace_id: namespace3.id, project_namespace_id: namespace3.id) }
let!(:project4) { projects.create!(name: 'p4', namespace_id: namespace4.id, project_namespace_id: namespace4.id) }
let!(:project5) { projects.create!(name: 'p5', namespace_id: namespace5.id, project_namespace_id: namespace5.id) }
# Seven issues: project2 has three of them, every other project has one, so
# issues.project_id contains duplicate values.
let!(:issue1) { issues.create!(title: 'title', description: 'description', project_id: project2.id) }
let!(:issue2) { issues.create!(title: 'title', description: 'description', project_id: project1.id) }
let!(:issue3) { issues.create!(title: 'title', description: 'description', project_id: project2.id) }
let!(:issue4) { issues.create!(title: 'title', description: 'description', project_id: project3.id) }
let!(:issue5) { issues.create!(title: 'title', description: 'description', project_id: project2.id) }
let!(:issue6) { issues.create!(title: 'title', description: 'description', project_id: project4.id) }
let!(:issue7) { issues.create!(title: 'title', description: 'description', project_id: project5.id) }
it { expect(described_class).to be < Gitlab::BackgroundMigration::BatchingStrategies::BaseStrategy }
# The expectations below assume project ids increase in creation order
# (project1 < project2 < ... < project5).
context 'when starting on the first batch' do
# Two distinct project ids starting at project1: project1 and project2.
it 'returns the bounds of the next batch' do
batch_bounds = batching_strategy
.next_batch(:issues, :project_id, batch_min_value: project1.id, batch_size: 2, job_arguments: [])
expect(batch_bounds).to eq([project1.id, project2.id])
end
end
context 'when additional batches remain' do
# Three distinct project ids starting at project2: project2 through project4.
it 'returns the bounds of the next batch' do
batch_bounds = batching_strategy
.next_batch(:issues, :project_id, batch_min_value: project2.id, batch_size: 3, job_arguments: [])
expect(batch_bounds).to eq([project2.id, project4.id])
end
end
context 'when on the final batch' do
# Only two distinct project ids remain from project4 on, fewer than batch_size.
it 'returns the bounds of the next batch' do
batch_bounds = batching_strategy
.next_batch(:issues, :project_id, batch_min_value: project4.id, batch_size: 3, job_arguments: [])
expect(batch_bounds).to eq([project4.id, project5.id])
end
end
context 'when no additional batches remain' do
# No issue has a project_id above project5.id, so there is nothing to batch.
it 'returns nil' do
batch_bounds = batching_strategy
.next_batch(:issues, :project_id, batch_min_value: project5.id + 1, batch_size: 1, job_arguments: [])
expect(batch_bounds).to be_nil
end
end
end
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment