Verified Commit 717bf5a6 authored by Nick Thomas's avatar Nick Thomas 💃

Migrate diffs immediately when an MR is closed or merged

It seems difficult to perform full table scans on merge_request_diffs,
so consider an approach that allows us to queue migrations to external
storage in an event-based manner instead.

This necessitates dropping the 7-days-old cutoff for merged and closed
MRs, but that doesn't seem like a huge problem to me.
parent 6f639831
Pipeline #53179976 failed with stages
in 57 minutes and 22 seconds
......@@ -12,10 +12,6 @@ class MergeRequestDiff < ActiveRecord::Base
# Don't display more than 100 commits at once
COMMITS_SAFE_SIZE = 100
# Applies to closed or merged MRs when determining whether to migrate their
# diffs to external storage
EXTERNAL_DIFF_CUTOFF = 7.days.freeze
belongs_to :merge_request
manual_inverse_association :merge_request, :merge_request_diff
......@@ -52,81 +48,6 @@ class MergeRequestDiff < ActiveRecord::Base
end
scope :recent, -> { order(id: :desc).limit(100) }
scope :files_in_database, -> { where(stored_externally: [false, nil]) }
# Diffs that are not the current version of their merge request.
#
# Inner-joins merge_requests on the owning MR and keeps only rows whose id
# differs from merge_requests.latest_merge_request_diff_id.
scope :not_latest_diffs, -> do
  mr_table = MergeRequest.arel_table
  diff_table = arel_table

  belongs_to_mr = mr_table[:id].eq(diff_table[:merge_request_id])
  superseded = diff_table[:id].not_eq(mr_table[:latest_merge_request_diff_id])

  joins(diff_table.join(mr_table).on(belongs_to_mr.and(superseded)).join_sources)
end
# Diffs belonging to merge requests in the `merged` state whose metrics
# record a merged_at timestamp at or before `before`.
#
# Joins merge_request_metrics (restricted to rows with a merged_at value)
# and merge_requests directly from merge_request_diffs.
scope :old_merged_diffs, -> (before) do
  mr_table = MergeRequest.arel_table
  metrics_table = MergeRequest::Metrics.arel_table
  diff_table = arel_table

  metrics_join = diff_table
    .join(metrics_table)
    .on(
      diff_table[:merge_request_id]
        .eq(metrics_table[:merge_request_id])
        .and(metrics_table[:merged_at].not_eq(nil))
    )

  mr_join = diff_table
    .join(mr_table)
    .on(diff_table[:merge_request_id].eq(mr_table[:id]))

  merged_before_cutoff = mr_table[:state].eq(:merged)
    .and(metrics_table[:merged_at].lteq(before))
    .and(metrics_table[:merged_at].not_eq(nil))

  joins(metrics_join.join_sources, mr_join.join_sources).where(merged_before_cutoff)
end
# Diffs belonging to merge requests in the `closed` state whose metrics
# record a latest_closed_at timestamp at or before `before`.
scope :old_closed_diffs, -> (before) do
  is_closed = MergeRequest.arel_table[:state].eq(:closed)
  closed_before_cutoff = MergeRequest::Metrics.arel_table[:latest_closed_at].lteq(before)

  joins(merge_request: :metrics).where(is_closed.and(closed_before_cutoff))
end
# Returns up to `limit` IDs of diffs that are still stored in the database
# and are eligible for migration to external storage.
#
# The result depends on the external diffs configuration:
# - disabled, or an unrecognised `when` value: an empty array
# - 'always': any diff still stored in the database
# - 'outdated': diffs of MRs merged or closed at or before the cutoff,
#   plus diffs that are not the latest version of their MR
#
# limit - maximum number of IDs to return
def self.ids_for_external_storage_migration(limit:)
  # No point doing any work unless the feature is enabled
  return [] unless Gitlab.config.external_diffs.enabled

  case Gitlab.config.external_diffs.when
  when 'always'
    files_in_database.limit(limit).pluck(:id)
  when 'outdated'
    # Outdated is too complex to be a single SQL query, so split into three
    before = EXTERNAL_DIFF_CUTOFF.ago

    relations = [
      files_in_database.old_merged_diffs(before),
      files_in_database.old_closed_diffs(before),
      files_in_database.not_latest_diffs
    ]

    ids = []

    relations.each do |relation|
      remaining = limit - ids.size
      break if remaining <= 0

      # The three scopes overlap (e.g. a superseded diff of an old merged
      # MR matches two of them), so exclude IDs that were already picked.
      # Otherwise the same diff is returned twice and uses up the limit.
      relation = relation.where.not(id: ids) unless ids.empty?

      ids.concat(relation.limit(remaining).pluck(:id))
    end

    ids
  else
    []
  end
end
mount_uploader :external_diff, ExternalDiffUploader
......@@ -457,24 +378,12 @@ class MergeRequestDiff < ActiveRecord::Base
when 'always'
true
when 'outdated'
outdated_by_merge? || outdated_by_closure? || old_version?
merge_request.merged? || merge_request.closed? || old_version?
else
false # Disable external diffs if misconfigured
end
end
# True when the merge request is merged and its recorded merged_at is
# older than EXTERNAL_DIFF_CUTOFF. False when there is no merge request,
# no metrics record, or no merged_at timestamp.
def outdated_by_merge?
  merged_at = merge_request&.metrics&.merged_at
  return false unless merged_at

  merge_request.merged? && merged_at < EXTERNAL_DIFF_CUTOFF.ago
end
# True when the merge request is closed and its recorded latest_closed_at
# is older than EXTERNAL_DIFF_CUTOFF. False when there is no merge request,
# no metrics record, or no latest_closed_at timestamp.
def outdated_by_closure?
  closed_at = merge_request&.metrics&.latest_closed_at
  return false unless closed_at

  merge_request.closed? && closed_at < EXTERNAL_DIFF_CUTOFF.ago
end
# We can't rely on `merge_request.latest_merge_request_diff_id` because that
# may have been changed in `save_git_content` without being reflected in
# the association's instance. This query is always subject to races, but
......
......@@ -17,6 +17,7 @@ module MergeRequests
execute_hooks(merge_request, 'close')
invalidate_cache_counts(merge_request, users: merge_request.assignees)
merge_request.update_project_counter_caches
migrate_diff_files_to_external_storage(merge_request)
end
merge_request
......@@ -32,5 +33,9 @@ module MergeRequests
merge_request_metrics_service(merge_request).close(close_event)
end
end
# Enqueues a MigrateExternalDiffsWorker job for the given merge request.
# Only the merge request ID is passed to the worker.
def migrate_diff_files_to_external_storage(merge_request)
MigrateExternalDiffsWorker.perform_async(merge_request.id)
end
end
end
......@@ -2,22 +2,16 @@
module MergeRequests
class MigrateExternalDiffsService < ::BaseService
MAX_JOBS = 1000.freeze
attr_reader :merge_request
attr_reader :diff
def self.enqueue!
ids = MergeRequestDiff.ids_for_external_storage_migration(limit: MAX_JOBS)
MigrateExternalDiffsWorker.bulk_perform_async(ids.map { |id| [id] })
end
def initialize(merge_request_diff)
@diff = merge_request_diff
def initialize(merge_request)
@merge_request = merge_request
end
def execute
diff.migrate_files_to_external_storage!
merge_request.merge_request_diffs.each do |diff|
diff.migrate_files_to_external_storage!
end
end
end
end
......@@ -18,6 +18,7 @@ module MergeRequests
invalidate_cache_counts(merge_request, users: merge_request.assignees)
merge_request.update_project_counter_caches
delete_non_latest_diffs(merge_request)
migrate_diff_files_to_external_storage(merge_request)
end
private
......@@ -38,6 +39,10 @@ module MergeRequests
DeleteNonLatestDiffsService.new(merge_request).execute
end
# Enqueues a MigrateExternalDiffsWorker job for the given merge request.
# Only the merge request ID is passed to the worker.
def migrate_diff_files_to_external_storage(merge_request)
MigrateExternalDiffsWorker.perform_async(merge_request.id)
end
# Delegates to EventCreateService#merge_mr with the merge request and the
# user on whose behalf the event is created.
def create_merge_event(merge_request, current_user)
EventCreateService.new.merge_mr(merge_request, current_user)
end
......
......@@ -61,6 +61,8 @@ module MergeRequests
merge_request.mark_as_unchecked
end
MigrateExternalDiffsWorker.perform_async(merge_request.id) if merge_request.open?
handle_milestone_change(merge_request)
added_labels = merge_request.labels - old_labels
......
......@@ -21,7 +21,6 @@
- cronjob:trending_projects
- cronjob:issue_due_scheduler
- cronjob:prune_web_hook_logs
- cronjob:schedule_migrate_external_diffs
- gcp_cluster:cluster_install_app
- gcp_cluster:cluster_patch_app
......
......@@ -3,10 +3,10 @@
class MigrateExternalDiffsWorker
include ApplicationWorker
def perform(merge_request_diff_id)
diff = MergeRequestDiff.find_by_id(merge_request_diff_id)
return unless diff
def perform(merge_request_id)
merge_request = MergeRequest.find_by_id(merge_request_id)
return unless merge_request
MergeRequests::MigrateExternalDiffsService.new(diff).execute
MergeRequests::MigrateExternalDiffsService.new(merge_request).execute
end
end
# frozen_string_literal: false
# Cron worker that asks MergeRequests::MigrateExternalDiffsService to
# enqueue migration jobs for eligible diffs.
#
# The work runs under an exclusive lease keyed on the underscored class
# name, with a 2 hour TTL and no retries. If the lease cannot be obtained,
# the run is skipped.
class ScheduleMigrateExternalDiffsWorker
  include ApplicationWorker
  include CronjobQueue
  include Gitlab::ExclusiveLeaseHelpers

  def perform
    lease_key = self.class.name.underscore

    begin
      in_lock(lease_key, ttl: 2.hours, retries: 0) do
        MergeRequests::MigrateExternalDiffsService.enqueue!
      end
    rescue Gitlab::ExclusiveLeaseHelpers::FailedToObtainLockError
      # The lease is already taken, so deliberately do nothing this time.
      nil
    end
  end
end
......@@ -301,10 +301,6 @@ production: &base
pages_domain_verification_cron_worker:
cron: "*/15 * * * *"
# Periodically migrate diffs from the database to external storage
schedule_migrate_external_diffs_worker:
cron: "15 * * * *"
registry:
# enabled: true
# host: registry.example.com
......
......@@ -345,10 +345,6 @@ Settings.cron_jobs['prune_web_hook_logs_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['prune_web_hook_logs_worker']['cron'] ||= '0 */1 * * *'
Settings.cron_jobs['prune_web_hook_logs_worker']['job_class'] = 'PruneWebHookLogsWorker'
Settings.cron_jobs['schedule_migrate_external_diffs_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['schedule_migrate_external_diffs_worker']['cron'] ||= '15 * * * *'
Settings.cron_jobs['schedule_migrate_external_diffs_worker']['job_class'] = 'ScheduleMigrateExternalDiffsWorker'
#
# Sidekiq
#
......
......@@ -89,4 +89,4 @@
- [project_daily_statistics, 1]
- [import_issues_csv, 2]
- [chat_notification, 2]
- [migrate_external_diffs, 1]
- [migrate_external_diffs, 1]
# frozen_string_literal: true
# Adds the partial indexes used when querying for merge request diffs to
# migrate: two on merge_request_metrics (latest_closed_at, merged_at) and
# one on merge_request_diffs (merge_request_id, id) limited to rows that are
# not stored externally.
class AddIndexesForMergeRequestDiffsQuery < ActiveRecord::Migration[5.0]
  include Gitlab::Database::MigrationHelpers

  DOWNTIME = false

  # Each entry is [table, column(s), index options].
  INDEX_SPECS = [
    [:merge_request_metrics, :latest_closed_at, { where: 'latest_closed_at IS NOT NULL' }],
    [:merge_request_metrics, :merged_at, { where: 'merged_at IS NOT NULL' }],
    [
      :merge_request_diffs,
      [:merge_request_id, :id],
      {
        name: 'index_merge_request_diffs_on_merge_request_id_and_id_partial',
        where: 'NOT stored_externally OR stored_externally IS NULL'
      }
    ]
  ].freeze

  disable_ddl_transaction!

  # Creates every index in INDEX_SPECS that does not exist yet.
  def up
    INDEX_SPECS.each do |table, columns, options|
      next if index_exists?(table, columns, options)

      add_concurrent_index(table, columns, options)
    end
  end

  # Removes the indexes in reverse order, skipping any that are absent.
  def down
    INDEX_SPECS.reverse_each do |table, columns, options|
      next unless index_exists?(table, columns, options)

      remove_concurrent_index(table, columns, options)
    end
  end
end
......@@ -1248,7 +1248,6 @@ ActiveRecord::Schema.define(version: 20190301182457) do
t.integer "external_diff_store"
t.boolean "stored_externally"
t.index ["merge_request_id", "id"], name: "index_merge_request_diffs_on_merge_request_id_and_id", using: :btree
t.index ["merge_request_id", "id"], name: "index_merge_request_diffs_on_merge_request_id_and_id_partial", where: "((NOT stored_externally) OR (stored_externally IS NULL))", using: :btree
end
create_table "merge_request_metrics", force: :cascade do |t|
......@@ -1264,10 +1263,8 @@ ActiveRecord::Schema.define(version: 20190301182457) do
t.integer "latest_closed_by_id"
t.datetime_with_timezone "latest_closed_at"
t.index ["first_deployed_to_production_at"], name: "index_merge_request_metrics_on_first_deployed_to_production_at", using: :btree
t.index ["latest_closed_at"], name: "index_merge_request_metrics_on_latest_closed_at", where: "(latest_closed_at IS NOT NULL)", using: :btree
t.index ["latest_closed_by_id"], name: "index_merge_request_metrics_on_latest_closed_by_id", using: :btree
t.index ["merge_request_id"], name: "index_merge_request_metrics", using: :btree
t.index ["merged_at"], name: "index_merge_request_metrics_on_merged_at", where: "(merged_at IS NOT NULL)", using: :btree
t.index ["merged_by_id"], name: "index_merge_request_metrics_on_merged_by_id", using: :btree
t.index ["pipeline_id"], name: "index_merge_request_metrics_on_pipeline_id", using: :btree
end
......
......@@ -183,8 +183,8 @@ than externally. They will be moved to external storage once any of these
conditions become true:
- A newer version of the merge request diff exists
- The merge request was merged more than seven days ago
- The merge request was closed more than seven days ago
- The merge request was merged
- The merge request was closed
These rules strike a balance between space and performance by only storing
frequently-accessed diffs in the database. Diffs that are less likely to be
......
......@@ -46,26 +46,10 @@ FactoryBot.define do
target_branch "improve/awesome"
end
# A merged merge request whose metrics record a merge one month ago.
# Applies the `merged` trait, then sets merged_at on freshly built metrics
# once the merge request has been built.
trait :merged_last_month do
merged
after(:build) do |merge_request|
merge_request.build_metrics.merged_at = 1.month.ago
end
end
trait :closed do
state :closed
end
# A closed merge request whose metrics record a closure one month ago.
# Applies the `closed` trait, then sets latest_closed_at on freshly built
# metrics once the merge request has been built.
trait :closed_last_month do
closed
after(:build) do |merge_request|
merge_request.build_metrics.latest_closed_at = 1.month.ago
end
end
trait :opened do
state :opened
end
......
......@@ -51,72 +51,6 @@ describe MergeRequestDiff do
end
end
# Covers MergeRequestDiff.ids_for_external_storage_migration under each
# external diffs configuration.
describe '.ids_for_external_storage_migration' do
# An open MR with two diff versions: `outdated` is superseded by `latest`.
set(:merge_request) { create(:merge_request) }
set(:outdated) { merge_request.merge_request_diff }
set(:latest) { merge_request.create_merge_request_diff }
# MRs closed / merged a month ago (see the *_last_month factory traits).
set(:closed_mr) { create(:merge_request, :closed_last_month) }
let(:closed) { closed_mr.merge_request_diff }
set(:merged_mr) { create(:merge_request, :merged_last_month) }
let(:merged) { merged_mr.merge_request_diff }
# MRs in the closed / merged state without a month-old metrics timestamp.
set(:recently_closed_mr) { create(:merge_request, :closed) }
let(:closed_recently) { recently_closed_mr.merge_request_diff }
set(:recently_merged_mr) { create(:merge_request, :merged) }
let(:merged_recently) { recently_merged_mr.merge_request_diff }
before do
# Point the MR at its newest diff so that `outdated` is not the latest one.
merge_request.update!(latest_merge_request_diff: latest)
end
subject { described_class.ids_for_external_storage_migration(limit: 1000) }
context 'external diffs are disabled' do
before do
stub_external_diffs_setting(enabled: false)
end
it { is_expected.to be_empty }
end
context 'external diffs are misconfigured' do
before do
# An unrecognised `when` value must yield no candidates.
stub_external_diffs_setting(enabled: true, when: 'every second tuesday')
end
it { is_expected.to be_empty }
end
context 'external diffs are enabled unconditionally' do
before do
# NOTE(review): `when` is not stubbed here; presumably it defaults to
# 'always' -- confirm against the settings defaults.
stub_external_diffs_setting(enabled: true)
end
it { is_expected.to contain_exactly(outdated.id, latest.id, closed.id, merged.id, closed_recently.id, merged_recently.id) }
end
context 'external diffs are enabled for outdated diffs' do
before do
stub_external_diffs_setting(enabled: true, when: 'outdated')
end
# The latest diff and the recently closed / merged diffs are excluded.
it 'returns records for outdated merge request versions' do
is_expected.to contain_exactly(outdated.id, closed.id, merged.id)
end
end
context 'with limit' do
# Six diffs exist; only three IDs may be returned.
it 'respects the limit' do
stub_external_diffs_setting(enabled: true)
expect(described_class.ids_for_external_storage_migration(limit: 3).count).to eq(3)
end
end
end
describe '#migrate_files_to_external_storage!' do
let(:diff) { create(:merge_request).merge_request_diff }
......@@ -338,18 +272,6 @@ describe MergeRequestDiff do
expect(diff).not_to be_stored_externally
end
it 'stores diffs for recently closed MRs in the database' do
mr = create(:merge_request, :closed)
expect(mr.merge_request_diff).not_to be_stored_externally
end
it 'stores diffs for recently merged MRs in the database' do
mr = create(:merge_request, :merged)
expect(mr.merge_request_diff).not_to be_stored_externally
end
it 'stores diffs for old MR versions in external storage' do
old_diff = diff
merge_request.create_merge_request_diff
......@@ -358,14 +280,14 @@ describe MergeRequestDiff do
expect(old_diff).to be_stored_externally
end
it 'stores diffs for old closed MRs in external storage' do
mr = create(:merge_request, :closed_last_month)
it 'stores diffs for closed MRs in external storage' do
mr = create(:merge_request, :closed)
expect(mr.merge_request_diff).to be_stored_externally
end
it 'stores diffs for old merged MRs in external storage' do
mr = create(:merge_request, :merged_last_month)
it 'stores diffs for merged MRs in external storage' do
mr = create(:merge_request, :merged)
expect(mr.merge_request_diff).to be_stored_externally
end
......
......@@ -6,38 +6,15 @@ describe MergeRequests::MigrateExternalDiffsService do
let(:merge_request) { create(:merge_request) }
let(:diff) { merge_request.merge_request_diff }
# Covers MergeRequests::MigrateExternalDiffsService.enqueue! by counting the
# MigrateExternalDiffsWorker jobs it queues.
describe '.enqueue!', :sidekiq do
around do |example|
# Fake mode records enqueued jobs so they can be counted via `.jobs`.
Sidekiq::Testing.fake! { example.run }
end
it 'enqueues nothing if external diffs are disabled' do
expect(diff).not_to be_stored_externally
expect { described_class.enqueue! }
.not_to change { MigrateExternalDiffsWorker.jobs.count }
end
it 'enqueues eligible in-database diffs if external diffs are enabled' do
expect(diff).not_to be_stored_externally
stub_external_diffs_setting(enabled: true)
# Exactly one job is expected, for the single diff created above.
expect { described_class.enqueue! }
.to change { MigrateExternalDiffsWorker.jobs.count }
.by(1)
end
end
describe '#execute' do
it 'migrates an in-database diff to the external store' do
expect(diff).not_to be_stored_externally
stub_external_diffs_setting(enabled: true)
described_class.new(diff).execute
expect(diff).to be_stored_externally
expect { described_class.new(merge_request).execute }
.to change { diff.reload.stored_externally? }
.to be_truthy
end
end
end
......@@ -4,22 +4,24 @@ require 'spec_helper'
describe MigrateExternalDiffsWorker do
let(:worker) { described_class.new }
let(:diff) { create(:merge_request).merge_request_diff }
let(:merge_request) { create(:merge_request) }
describe '#perform' do
it 'migrates the listed diff' do
it 'migrates diffs for the given merge request' do
expect_next_instance_of(MergeRequests::MigrateExternalDiffsService) do |instance|
expect(instance.diff).to eq(diff)
expect(instance.merge_request).to eq(merge_request)
expect(instance).to receive(:execute)
end
worker.perform(diff.id)
worker.perform(merge_request.id)
end
it 'does nothing if the diff is missing' do
diff.destroy
it 'does nothing if the merge request is missing' do
merge_request.destroy
worker.perform(diff.id)
expect(MergeRequests::MigrateExternalDiffsService).not_to receive(:new)
worker.perform(merge_request.id)
end
end
end
# frozen_string_literal: true
require 'spec_helper'
# Covers ScheduleMigrateExternalDiffsWorker#perform: it delegates to
# MergeRequests::MigrateExternalDiffsService.enqueue! and is skipped while
# the exclusive lease is held.
describe ScheduleMigrateExternalDiffsWorker do
include ExclusiveLeaseHelpers
let(:worker) { described_class.new }
describe '#perform' do
it 'triggers a scan for diffs to migrate' do
expect(MergeRequests::MigrateExternalDiffsService).to receive(:enqueue!)
worker.perform
end
it 'will not run if the lease is already taken' do
# The lease key is the underscored worker class name.
stub_exclusive_lease_taken('schedule_migrate_external_diffs_worker', timeout: 2.hours)
expect(MergeRequests::MigrateExternalDiffsService).not_to receive(:enqueue!)
worker.perform
end
end
end
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment