Commit 9f6de9b8 authored by Diogo Frazão 🌴

Merge branch 'batched-background-runner-wrong-connection' into 'master'

Test batched background migrations with the correct connection

Closes gitlab-org/database-team/gitlab-com-database-testing#79

See merge request !100050



Merged-by: Diogo Frazão <dfrazao@gitlab.com>
Approved-by: Pedro Pombeiro (OOO from Oct 10th-12th) <noreply@pedro.pombei.ro>
Approved-by: Matt Kasa <mkasa@gitlab.com>
Approved-by: Alexandru Croitor <acroitor@gitlab.com>
Approved-by: Diogo Frazão <dfrazao@gitlab.com>
Co-authored-by: Simon Tomlinson <stomlinson@gitlab.com>
parents 67ec7edf e93bab2e

    @@ -4,10 +4,11 @@ module Gitlab
    module Database
    module Migrations
    class BaseBackgroundRunner
    attr_reader :result_dir
    attr_reader :result_dir, :connection
    def initialize(result_dir:)
    def initialize(result_dir:, connection:)
    @result_dir = result_dir
    @connection = connection
    end
    def jobs_by_migration_name
    @@ -45,7 +46,7 @@ def run_jobs_for_migration(migration_name:, jobs:, run_until:)
    instrumentation.observe(version: nil,
    name: batch_names.next,
    connection: ActiveRecord::Migration.connection) do
    connection: connection) do
    run_job(j)
    end
    end
    ......
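The two hunks above make BaseBackgroundRunner take its database connection as a constructor argument and hand that same connection to the instrumentation observe call, rather than reading the global ActiveRecord::Migration.connection. A minimal plain-Ruby sketch of that injection pattern (FakeConnection and Runner are illustrative names only, not GitLab classes):

# Illustrative only: the runner observes work against the connection it was
# built with, so a CI-database run is instrumented on the CI connection.
class FakeConnection
  def initialize(name)
    @name = name
  end

  def to_s
    @name
  end
end

class Runner
  attr_reader :connection

  def initialize(connection:)
    @connection = connection # injected, not looked up from a global
  end

  def observe_job
    yield connection # instrumentation sees the injected connection
  end
end

Runner.new(connection: FakeConnection.new("ci")).observe_job { |conn| puts "observing on #{conn}" }
# => observing on ci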
    @@ -5,7 +5,7 @@ module Database
    module Migrations
    class TestBackgroundRunner < BaseBackgroundRunner
    def initialize(result_dir:)
    super(result_dir: result_dir)
    super(result_dir: result_dir, connection: ActiveRecord::Migration.connection)
    @job_coordinator = Gitlab::BackgroundMigration.coordinator_for_database(Gitlab::Database::MAIN_DATABASE_NAME)
    end
    ......
    @@ -5,65 +5,73 @@ module Database
    module Migrations
    class TestBatchedBackgroundRunner < BaseBackgroundRunner
    include Gitlab::Database::DynamicModelHelpers
    attr_reader :connection
    def initialize(result_dir:, connection:)
    super(result_dir: result_dir)
    super(result_dir: result_dir, connection: connection)
    @connection = connection
    end
    def jobs_by_migration_name
    Gitlab::Database::BackgroundMigration::BatchedMigration
    .executable
    .created_after(3.hours.ago) # Simple way to exclude migrations already running before migration testing
    .to_h do |migration|
    batching_strategy = migration.batch_class.new(connection: connection)
    smallest_batch_start = migration.next_min_value
    table_max_value = define_batchable_model(migration.table_name, connection: connection)
    .maximum(migration.column_name)
    largest_batch_start = table_max_value - migration.batch_size
    # variance is the portion of the batch range that we shrink between variance * 0 and variance * 1
    # to pick actual batches to sample.
    variance = largest_batch_start - smallest_batch_start
    batch_starts = uniform_fractions
    .lazy # frac varies from 0 to 1, values in smallest_batch_start..largest_batch_start
    .map { |frac| (variance * frac).to_i + smallest_batch_start }
    # Track previously run batches so that we stop sampling if a new batch would intersect an older one
    completed_batches = []
    jobs_to_sample = batch_starts
    # Stop sampling if a batch would intersect a previous batch
    .take_while { |start| completed_batches.none? { |batch| batch.cover?(start) } }
    .map do |batch_start|
    next_bounds = batching_strategy.next_batch(
    migration.table_name,
    migration.column_name,
    batch_min_value: batch_start,
    batch_size: migration.batch_size,
    job_arguments: migration.job_arguments
    )
    batch_min, batch_max = next_bounds
    job = migration.create_batched_job!(batch_min, batch_max)
    completed_batches << (batch_min..batch_max)
    Gitlab::Database::SharedModel.using_connection(connection) do
    Gitlab::Database::BackgroundMigration::BatchedMigration
    .executable
    .created_after(3.hours.ago) # Simple way to exclude migrations already running before migration testing
    .to_h do |migration|
    batching_strategy = migration.batch_class.new(connection: connection)
    smallest_batch_start = migration.next_min_value
    table_max_value = define_batchable_model(migration.table_name, connection: connection)
    .maximum(migration.column_name)
    largest_batch_start = table_max_value - migration.batch_size
    # variance is the portion of the batch range that we shrink between variance * 0 and variance * 1
    # to pick actual batches to sample.
    variance = largest_batch_start - smallest_batch_start
    batch_starts = uniform_fractions
    .lazy # frac varies from 0 to 1, values in smallest_batch_start..largest_batch_start
    .map { |frac| (variance * frac).to_i + smallest_batch_start }
    # Track previously run batches so that we stop sampling if a new batch would intersect an older one
    completed_batches = []
    jobs_to_sample = batch_starts
    # Stop sampling if a batch would intersect a previous batch
    .take_while { |start| completed_batches.none? { |batch| batch.cover?(start) } }
    .map do |batch_start|
    # The current block is lazily evaluated as part of the jobs_to_sample enumerable
    # so it executes after the enclosing using_connection block has already executed
    # Therefore we need to re-associate with the explicit connection again
    Gitlab::Database::SharedModel.using_connection(connection) do
    next_bounds = batching_strategy.next_batch(
    migration.table_name,
    migration.column_name,
    batch_min_value: batch_start,
    batch_size: migration.batch_size,
    job_arguments: migration.job_arguments
    )
    batch_min, batch_max = next_bounds
    job = migration.create_batched_job!(batch_min, batch_max)
    completed_batches << (batch_min..batch_max)
    job
    end
    end
    job
    [migration.job_class_name, jobs_to_sample]
    end
    [migration.job_class_name, jobs_to_sample]
    end
    end
    def run_job(job)
    Gitlab::Database::BackgroundMigration::BatchedMigrationWrapper.new(connection: connection).perform(job)
    Gitlab::Database::SharedModel.using_connection(connection) do
    Gitlab::Database::BackgroundMigration::BatchedMigrationWrapper.new(connection: connection).perform(job)
    end
    end
    def uniform_fractions
    ......
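The comment added inside the .map do |batch_start| block is the subtle part of this hunk: jobs_to_sample is a lazy enumerable, so its block bodies only run when the jobs are consumed, after the enclosing using_connection block has already returned, which is why the connection is re-associated inside the block. A standalone plain-Ruby illustration of that evaluation order (not GitLab code):

# Blocks chained onto a lazy enumerator run when the enumerator is consumed,
# not when the chain is built, so they see whatever state exists at that later
# point. This mirrors why the inner using_connection re-wrap above is needed.
current_connection = :main

jobs = (1..3).lazy.map { |i| [i, current_connection] } # nothing executes yet

current_connection = nil # the "outer block" has finished; its context is gone

p jobs.first(3)
# => [[1, nil], [2, nil], [3, nil]] -- the blocks ran after the context was torn down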
    @@ -3,6 +3,8 @@
    require 'spec_helper'
    RSpec.describe Gitlab::Database::Migrations::BaseBackgroundRunner, :freeze_time do
    let(:connection) { ApplicationRecord.connection }
    let(:result_dir) { Dir.mktmpdir }
    after do
    @@ -10,7 +12,7 @@
    end
    context 'subclassing' do
    subject { described_class.new(result_dir: result_dir) }
    subject { described_class.new(result_dir: result_dir, connection: connection) }
    it 'requires that jobs_by_migration_name be implemented' do
    expect { subject.jobs_by_migration_name }.to raise_error(NotImplementedError)
    ......
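The spec above still exercises the abstract contract, now constructing the runner with the required connection. A toy, runnable version of that contract in plain Ruby (ToyBaseRunner is a hypothetical stand-in, not the GitLab class):

# Hypothetical stand-in: accepts the new keyword arguments and leaves
# jobs_by_migration_name to subclasses, as the spec expects.
class ToyBaseRunner
  attr_reader :result_dir, :connection

  def initialize(result_dir:, connection:)
    @result_dir = result_dir
    @connection = connection
  end

  def jobs_by_migration_name
    raise NotImplementedError, "subclass responsibility"
  end
end

begin
  ToyBaseRunner.new(result_dir: "/tmp/results", connection: :fake_connection).jobs_by_migration_name
rescue NotImplementedError => e
  puts "raised as the spec expects: #{e.message}"
end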
    @@ -6,106 +6,156 @@
    include Gitlab::Database::MigrationHelpers
    include Database::MigrationTestingHelpers
    let(:result_dir) { Dir.mktmpdir }
    after do
    FileUtils.rm_rf(result_dir)
    def queue_migration(
    job_class_name,
    batch_table_name,
    batch_column_name,
    *job_arguments,
    job_interval:,
    batch_size: Gitlab::Database::Migrations::BatchedBackgroundMigrationHelpers::BATCH_SIZE,
    sub_batch_size: Gitlab::Database::Migrations::BatchedBackgroundMigrationHelpers::SUB_BATCH_SIZE
    )
    batch_max_value = define_batchable_model(batch_table_name, connection: connection).maximum(batch_column_name)
    Gitlab::Database::SharedModel.using_connection(connection) do
    Gitlab::Database::BackgroundMigration::BatchedMigration.create!(
    job_class_name: job_class_name,
    table_name: batch_table_name,
    column_name: batch_column_name,
    job_arguments: job_arguments,
    interval: job_interval,
    min_value: Gitlab::Database::Migrations::BatchedBackgroundMigrationHelpers::BATCH_MIN_VALUE,
    max_value: batch_max_value,
    batch_class_name: Gitlab::Database::Migrations::BatchedBackgroundMigrationHelpers::BATCH_CLASS_NAME,
    batch_size: batch_size,
    sub_batch_size: sub_batch_size,
    status_event: :execute,
    max_batch_size: nil,
    gitlab_schema: gitlab_schema
    )
    end
    end
    let(:migration) do
    ActiveRecord::Migration.new.extend(Gitlab::Database::Migrations::BatchedBackgroundMigrationHelpers)
    where(:case_name, :base_model, :gitlab_schema) do
    [
    ['main database', ApplicationRecord, :gitlab_main],
    ['ci database', Ci::ApplicationRecord, :gitlab_ci]
    ]
    end
    let(:connection) { ApplicationRecord.connection }
    with_them do
    let(:result_dir) { Dir.mktmpdir }
    let(:table_name) { "_test_column_copying" }
    after do
    FileUtils.rm_rf(result_dir)
    end
    before do
    connection.execute(<<~SQL)
    CREATE TABLE #{table_name} (
    id bigint primary key not null,
    data bigint default 0
    );
    let(:connection) { base_model.connection }
    insert into #{table_name} (id) select i from generate_series(1, 1000) g(i);
    SQL
    let(:table_name) { "_test_column_copying" }
    allow(migration).to receive(:transaction_open?).and_return(false)
    end
    before do
    connection.execute(<<~SQL)
    CREATE TABLE #{table_name} (
    id bigint primary key not null,
    data bigint default 0
    );
    context 'running a real background migration' do
    it 'runs sampled jobs from the batched background migration' do
    migration.queue_batched_background_migration('CopyColumnUsingBackgroundMigrationJob',
    table_name, :id,
    :id, :data,
    batch_size: 100,
    job_interval: 5.minutes) # job_interval is skipped when testing
    # Expect that running sampling for this migration processes some of the rows. Sampling doesn't run
    # over every row in the table, so this does not completely migrate the table.
    expect { described_class.new(result_dir: result_dir, connection: connection).run_jobs(for_duration: 1.minute) }
    .to change { define_batchable_model(table_name).where('id IS DISTINCT FROM data').count }
    .by_at_most(-1)
    insert into #{table_name} (id) select i from generate_series(1, 1000) g(i);
    SQL
    end
    end
    context 'with jobs to run' do
    let(:migration_name) { 'TestBackgroundMigration' }
    context 'running a real background migration' do
    before do
    queue_migration('CopyColumnUsingBackgroundMigrationJob',
    table_name, :id,
    :id, :data,
    batch_size: 100,
    job_interval: 5.minutes) # job_interval is skipped when testing
    end
    it 'samples jobs' do
    calls = []
    define_background_migration(migration_name) do |*args|
    calls << args
    subject(:sample_migration) do
    described_class.new(result_dir: result_dir, connection: connection).run_jobs(for_duration: 1.minute)
    end
    migration.queue_batched_background_migration(migration_name, table_name, :id,
    job_interval: 5.minutes,
    batch_size: 100)
    it 'runs sampled jobs from the batched background migration' do
    # Expect that running sampling for this migration processes some of the rows. Sampling doesn't run
    # over every row in the table, so this does not completely migrate the table.
    expect { subject }.to change {
    define_batchable_model(table_name, connection: connection)
    .where('id IS DISTINCT FROM data').count
    }.by_at_most(-1)
    end
    described_class.new(result_dir: result_dir, connection: connection).run_jobs(for_duration: 3.minutes)
    it 'uses the correct connection to instrument the background migration' do
    expect_next_instance_of(Gitlab::Database::Migrations::Instrumentation) do |instrumentation|
    expect(instrumentation).to receive(:observe).with(hash_including(connection: connection))
    .at_least(:once).and_call_original
    end
    expect(calls).not_to be_empty
    subject
    end
    end
    context 'with multiple jobs to run' do
    it 'runs all jobs created within the last 3 hours' do
    old_migration = define_background_migration(migration_name)
    migration.queue_batched_background_migration(migration_name, table_name, :id,
    job_interval: 5.minutes,
    batch_size: 100)
    travel 4.hours
    new_migration = define_background_migration('NewMigration') { travel 1.second }
    migration.queue_batched_background_migration('NewMigration', table_name, :id,
    job_interval: 5.minutes,
    batch_size: 10,
    sub_batch_size: 5)
    other_new_migration = define_background_migration('NewMigration2') { travel 2.seconds }
    migration.queue_batched_background_migration('NewMigration2', table_name, :id,
    job_interval: 5.minutes,
    batch_size: 10,
    sub_batch_size: 5)
    expect_migration_runs(new_migration => 3, other_new_migration => 2, old_migration => 0) do
    described_class.new(result_dir: result_dir, connection: connection).run_jobs(for_duration: 5.seconds)
    context 'with jobs to run' do
    let(:migration_name) { 'TestBackgroundMigration' }
    it 'samples jobs' do
    calls = []
    define_background_migration(migration_name) do |*args|
    calls << args
    end
    queue_migration(migration_name, table_name, :id,
    job_interval: 5.minutes,
    batch_size: 100)
    described_class.new(result_dir: result_dir, connection: connection).run_jobs(for_duration: 3.minutes)
    expect(calls).not_to be_empty
    end
    context 'with multiple jobs to run' do
    it 'runs all jobs created within the last 3 hours' do
    old_migration = define_background_migration(migration_name)
    queue_migration(migration_name, table_name, :id,
    job_interval: 5.minutes,
    batch_size: 100)
    travel 4.hours
    new_migration = define_background_migration('NewMigration') { travel 1.second }
    queue_migration('NewMigration', table_name, :id,
    job_interval: 5.minutes,
    batch_size: 10,
    sub_batch_size: 5)
    other_new_migration = define_background_migration('NewMigration2') { travel 2.seconds }
    queue_migration('NewMigration2', table_name, :id,
    job_interval: 5.minutes,
    batch_size: 10,
    sub_batch_size: 5)
    expect_migration_runs(new_migration => 3, other_new_migration => 2, old_migration => 0) do
    described_class.new(result_dir: result_dir, connection: connection).run_jobs(for_duration: 5.seconds)
    end
    end
    end
    end
    end
    context 'choosing uniform batches to run' do
    subject { described_class.new(result_dir: result_dir, connection: connection) }
    context 'choosing uniform batches to run' do
    subject { described_class.new(result_dir: result_dir, connection: connection) }
    describe '#uniform_fractions' do
    it 'generates evenly distributed sequences of fractions' do
    received = subject.uniform_fractions.take(9)
    expected = [0, 1, 1.0 / 2, 1.0 / 4, 3.0 / 4, 1.0 / 8, 3.0 / 8, 5.0 / 8, 7.0 / 8]
    describe '#uniform_fractions' do
    it 'generates evenly distributed sequences of fractions' do
    received = subject.uniform_fractions.take(9)
    expected = [0, 1, 1.0 / 2, 1.0 / 4, 3.0 / 4, 1.0 / 8, 3.0 / 8, 5.0 / 8, 7.0 / 8]
    # All the fraction numerators are small integers, and all denominators are powers of 2, so these
    # fit perfectly into floating point numbers with zero loss of precision
    expect(received).to eq(expected)
    # All the fraction numerators are small integers, and all denominators are powers of 2, so these
    # fit perfectly into floating point numbers with zero loss of precision
    expect(received).to eq(expected)
    end
    end
    end
    end
    ......
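The #uniform_fractions examples at the end pin down the sampling order: 0 and 1 first, then every odd numerator over successive powers of two, which spreads sampled batch starts evenly across the table's id range. The implementation itself is not part of this diff; one plausible way to generate that sequence (an assumption, shown only to make the expected array concrete):

# Hypothetical generator for the sequence the spec expects; the real
# #uniform_fractions implementation is not shown in this merge request.
def uniform_fractions
  Enumerator.new do |yielder|
    yielder << 0
    yielder << 1

    denominator = 2
    loop do
      # every odd numerator below the denominator, e.g. 1/4 and 3/4 for 4
      1.step(denominator - 1, 2) { |numerator| yielder << numerator.fdiv(denominator) }
      denominator *= 2
    end
  end
end

p uniform_fractions.take(9)
# => [0, 1, 0.5, 0.25, 0.75, 0.125, 0.375, 0.625, 0.875]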