Commit 29f38440 authored by Sam Word, committed by Tiger Watson

Removed each_batch from PruneExpiredExportJobsService for scalability

In PruneExpiredExportJobsService, EachBatch won't scale to large
volumes of data because it batches by primary key while filtering on a
timestamp. This adds an index on project_export_jobs so the service can
use loop-based deletion and keyset pagination to prune old jobs and
uploads more efficiently.
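
To illustrate the difference, here is a rough SQL sketch (illustrative
only, not taken from the commit; batch bounds and the 7-day cutoff are
shown as literals). The old each_batch approach slices the table into
primary-key windows and re-applies the timestamp filter inside each
window, so sparse matches mean many near-empty scans; the new loop
deletes matching rows directly through a limited subquery the new
(updated_at, id) index can satisfy:

    -- old shape: one DELETE per id window; $lower/$upper are placeholder
    -- batch bounds found by walking the pkey index while re-checking the
    -- timestamp filter
    DELETE FROM project_export_jobs
    WHERE id >= $lower AND id < $upper
      AND updated_at < NOW() - INTERVAL '7 days';

    -- new shape: roughly what prunable.limit(BATCH_SIZE).delete_all emits,
    -- repeated until it deletes zero rows
    DELETE FROM project_export_jobs
    WHERE id IN (
      SELECT id FROM project_export_jobs
      WHERE updated_at < NOW() - INTERVAL '7 days'
      LIMIT 1000
    );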
parent 65cec4fe
1 merge request: !145702 Replace each_batch in PruneExpiredExportJobsService with scalable queries
@@ -18,6 +18,7 @@ class ProjectExportJob < ApplicationRecord
   }.freeze
 
   scope :prunable, -> { where("updated_at < ?", EXPIRES_IN.ago) }
+  scope :order_by_updated_at, -> { order(:updated_at, :id) }
 
   state_machine :status, initial: :queued do
     event :start do
......
@@ -3,36 +3,46 @@
 module Projects
   module ImportExport
     class PruneExpiredExportJobsService
+      BATCH_SIZE = 1000
+
       class << self
         def execute
-          prunable_jobs = ProjectExportJob.prunable
-
-          delete_uploads_for_expired_jobs(prunable_jobs)
-          delete_expired_jobs(prunable_jobs)
+          delete_uploads_for_expired_jobs
+          delete_expired_jobs
         end
 
         private
 
-        def delete_expired_jobs(prunable_jobs)
-          prunable_jobs.each_batch do |relation|
-            relation.delete_all
+        def delete_expired_jobs
+          loop do
+            deleted_count = ProjectExportJob.prunable.limit(BATCH_SIZE).delete_all
+            break if deleted_count == 0
           end
         end
 
-        def delete_uploads_for_expired_jobs(prunable_jobs)
-          prunable_uploads = get_uploads_for_prunable_jobs(prunable_jobs)
-          prunable_upload_keys = prunable_uploads.begin_fast_destroy
+        def delete_uploads_for_expired_jobs
+          prunable_scope = ProjectExportJob.prunable.select(:id, :updated_at)
+          iterator = Gitlab::Pagination::Keyset::Iterator.new(scope: prunable_scope.order_by_updated_at)
 
-          prunable_uploads.each_batch do |relation|
-            relation.delete_all
-          end
+          iterator.each_batch(of: BATCH_SIZE) do |prunable_job_batch_scope|
+            prunable_job_batch = prunable_job_batch_scope.to_a
 
-          Upload.finalize_fast_destroy(prunable_upload_keys)
+            loop do
+              prunable_uploads = uploads_for_expired_jobs(prunable_job_batch)
+              prunable_upload_keys = prunable_uploads.begin_fast_destroy
+
+              deleted_count = prunable_uploads.delete_all
+
+              break if deleted_count == 0
+
+              Upload.finalize_fast_destroy(prunable_upload_keys)
+            end
+          end
         end
 
-        def get_uploads_for_prunable_jobs(prunable_jobs)
+        def uploads_for_expired_jobs(prunable_jobs)
           prunable_export_uploads = Projects::ImportExport::RelationExportUpload
-            .for_project_export_jobs(prunable_jobs.select(:id))
+            .for_project_export_jobs(prunable_jobs.map(&:id))
 
           Upload.for_model_type_and_id(
             Projects::ImportExport::RelationExportUpload,
......
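
The keyset iterator above pages through prunable jobs in (updated_at, id)
order. As an illustrative sketch only (the iterator's generated SQL differs
in detail), each page seeks past the last row it has seen instead of using
OFFSET, so every batch is a bounded range scan on the new index:

    -- first page
    SELECT id, updated_at
    FROM project_export_jobs
    WHERE updated_at < NOW() - INTERVAL '7 days'
    ORDER BY updated_at, id
    LIMIT 1000;

    -- later pages: $last_updated_at/$last_id are placeholders carried over
    -- from the last row of the previous batch
    SELECT id, updated_at
    FROM project_export_jobs
    WHERE updated_at < NOW() - INTERVAL '7 days'
      AND (updated_at, id) > ($last_updated_at, $last_id)
    ORDER BY updated_at, id
    LIMIT 1000;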
# frozen_string_literal: true

class AddIndexOnUpdatedAtAndIdToProjectExportJobs < Gitlab::Database::Migration[2.2]
  disable_ddl_transaction!

  milestone '16.10'

  TABLE_NAME = :project_export_jobs
  INDEX_NAME = 'index_project_export_jobs_on_updated_at_and_id'

  def up
    add_concurrent_index TABLE_NAME, [:updated_at, :id], name: INDEX_NAME
  end

  def down
    remove_concurrent_index_by_name TABLE_NAME, INDEX_NAME
  end
end
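
For context: add_concurrent_index builds the index without blocking
writes. On PostgreSQL it is roughly equivalent to the DDL below, and
CREATE INDEX CONCURRENTLY cannot run inside a transaction block, which is
why the migration calls disable_ddl_transaction!:

    -- approximate DDL issued by add_concurrent_index
    CREATE INDEX CONCURRENTLY index_project_export_jobs_on_updated_at_and_id
      ON project_export_jobs USING btree (updated_at, id);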
05f5baf54474c621e51cdab4fe58ceea6a3bd12253e53d0ccb35f4a4f18461a7
\ No newline at end of file
@@ -26280,6 +26280,8 @@ CREATE INDEX index_project_export_jobs_on_project_id_and_status ON project_expor
 
 CREATE INDEX index_project_export_jobs_on_status ON project_export_jobs USING btree (status);
 
+CREATE INDEX index_project_export_jobs_on_updated_at_and_id ON project_export_jobs USING btree (updated_at, id);
+
 CREATE INDEX index_project_feature_usages_on_project_id ON project_feature_usages USING btree (project_id);
 
 CREATE UNIQUE INDEX index_project_features_on_project_id ON project_features USING btree (project_id);
@@ -13,4 +13,45 @@
     it { is_expected.to validate_presence_of(:jid) }
     it { is_expected.to validate_presence_of(:status) }
   end
+
+  describe 'scopes' do
+    let_it_be(:current_time) { Time.current }
+    let_it_be(:eight_days_ago) { current_time - 8.days }
+    let_it_be(:seven_days_ago) { current_time - 7.days }
+    let_it_be(:five_days_ago) { current_time - 5.days }
+
+    let_it_be(:recent_export_job) { create(:project_export_job, updated_at: five_days_ago) }
+    let_it_be(:week_old_export_job) { create(:project_export_job, updated_at: seven_days_ago) }
+    let_it_be(:prunable_export_job_1) { create(:project_export_job, updated_at: eight_days_ago) }
+    let_it_be(:prunable_export_job_2) { create(:project_export_job, updated_at: eight_days_ago) }
+
+    around do |example|
+      travel_to(current_time) { example.run }
+    end
+
+    describe '.prunable' do
+      it 'only includes records with updated_at older than 7 days ago' do
+        expect(described_class.prunable).to match_array([prunable_export_job_1, prunable_export_job_2])
+      end
+    end
+
+    describe '.order_by_updated_at' do
+      it 'sorts by updated_at' do
+        expect(described_class.order_by_updated_at).to eq(
+          [
+            prunable_export_job_1,
+            prunable_export_job_2,
+            week_old_export_job,
+            recent_export_job
+          ]
+        )
+      end
+
+      it 'uses id as a tiebreaker' do
+        export_jobs_with_same_updated_at = described_class.where(updated_at: eight_days_ago).order_by_updated_at
+
+        expect(export_jobs_with_same_updated_at[0].id).to be < export_jobs_with_same_updated_at[1].id
+      end
+    end
+  end
 end
@@ -3,67 +3,105 @@
 require 'spec_helper'
 
 RSpec.describe Projects::ImportExport::PruneExpiredExportJobsService, feature_category: :importers do
-  describe '#execute' do
-    context 'when pruning expired jobs' do
-      let_it_be(:old_job_1) { create(:project_export_job, updated_at: 37.months.ago) }
-      let_it_be(:old_job_2) { create(:project_export_job, updated_at: 12.months.ago) }
-      let_it_be(:old_job_3) { create(:project_export_job, updated_at: 8.days.ago) }
-      let_it_be(:fresh_job_1) { create(:project_export_job, updated_at: 1.day.ago) }
-      let_it_be(:fresh_job_2) { create(:project_export_job, updated_at: 2.days.ago) }
-      let_it_be(:fresh_job_3) { create(:project_export_job, updated_at: 6.days.ago) }
-
-      let_it_be(:old_relation_export_1) { create(:project_relation_export, project_export_job_id: old_job_1.id) }
-      let_it_be(:old_relation_export_2) { create(:project_relation_export, project_export_job_id: old_job_2.id) }
-      let_it_be(:old_relation_export_3) { create(:project_relation_export, project_export_job_id: old_job_3.id) }
-      let_it_be(:fresh_relation_export_1) { create(:project_relation_export, project_export_job_id: fresh_job_1.id) }
-
-      let_it_be(:old_upload_1) { create(:relation_export_upload, project_relation_export_id: old_relation_export_1.id) }
-      let_it_be(:old_upload_2) { create(:relation_export_upload, project_relation_export_id: old_relation_export_2.id) }
-      let_it_be(:old_upload_3) { create(:relation_export_upload, project_relation_export_id: old_relation_export_3.id) }
-      let_it_be(:fresh_upload_1) do
-        create(
-          :relation_export_upload,
-          project_relation_export_id: fresh_relation_export_1.id
-        )
-      end
-
-      it 'prunes ProjectExportJob records and associations older than 7 days' do
-        old_uploads = Upload.for_model_type_and_id(
-          Projects::ImportExport::RelationExportUpload,
-          [old_upload_1, old_upload_2, old_upload_3].map(&:id)
-        )
-        old_upload_file_paths = Uploads::Local.new.keys(old_uploads)
-        expect(DeleteStoredFilesWorker).to receive(:perform_async).with(Uploads::Local, old_upload_file_paths)
-
-        expect { described_class.execute }.to change { ProjectExportJob.count }.by(-3)
-
-        expect(ProjectExportJob.find_by(id: old_job_1.id)).to be_nil
-        expect(ProjectExportJob.find_by(id: old_job_2.id)).to be_nil
-        expect(ProjectExportJob.find_by(id: old_job_3.id)).to be_nil
-        expect(Projects::ImportExport::RelationExport.find_by(id: old_relation_export_1.id)).to be_nil
-        expect(Projects::ImportExport::RelationExport.find_by(id: old_relation_export_2.id)).to be_nil
-        expect(Projects::ImportExport::RelationExport.find_by(id: old_relation_export_3.id)).to be_nil
-        expect(Projects::ImportExport::RelationExportUpload.find_by(id: old_upload_1.id)).to be_nil
-        expect(Projects::ImportExport::RelationExportUpload.find_by(id: old_upload_2.id)).to be_nil
-        expect(Projects::ImportExport::RelationExportUpload.find_by(id: old_upload_3.id)).to be_nil
-        expect(old_uploads.reload).to be_empty
-      end
-
-      it 'does not delete associated records for jobs younger than 7 days' do
-        described_class.execute
-
-        expect(fresh_job_1.reload).to be_present
-        expect(fresh_job_2.reload).to be_present
-        expect(fresh_job_3.reload).to be_present
-        expect(fresh_relation_export_1.reload).to be_present
-        expect(fresh_upload_1.reload).to be_present
-        expect(
-          Upload.for_model_type_and_id(Projects::ImportExport::RelationExportUpload, fresh_upload_1.id)
-        ).to be_present
-      end
-    end
+  describe '#execute', :freeze_time do
+    let_it_be(:project) { create(:project) }
+
+    let!(:old_job_1) { create(:project_export_job, updated_at: 37.months.ago, project: project) }
+    let!(:old_job_2) { create(:project_export_job, updated_at: 12.months.ago, project: project) }
+    let!(:old_job_3) { create(:project_export_job, updated_at: 8.days.ago, project: project) }
+    let!(:fresh_job_1) { create(:project_export_job, updated_at: 1.day.ago, project: project) }
+    let!(:fresh_job_2) { create(:project_export_job, updated_at: 2.days.ago, project: project) }
+    let!(:fresh_job_3) { create(:project_export_job, updated_at: 6.days.ago, project: project) }
+
+    it 'prunes jobs and associations older than 7 days' do
+      expect { described_class.execute }.to change { ProjectExportJob.count }.by(-3)
+
+      expect(ProjectExportJob.find_by(id: old_job_1.id)).to be_nil
+      expect(ProjectExportJob.find_by(id: old_job_2.id)).to be_nil
+      expect(ProjectExportJob.find_by(id: old_job_3.id)).to be_nil
+
+      expect(fresh_job_1.reload).to be_present
+      expect(fresh_job_2.reload).to be_present
+      expect(fresh_job_3.reload).to be_present
+    end
+
+    it 'prunes ProjectExportJob records in batches' do
+      stub_const("#{described_class.name}::BATCH_SIZE", 2)
+      allow(described_class).to receive(:delete_uploads_for_expired_jobs).and_return(nil)
+
+      expect(ProjectExportJob).to receive(:prunable).and_call_original.exactly(3).times
+
+      described_class.execute
+    end
+
+    context 'with associated RelationExport records' do
+      let!(:old_relation_export_1) { create(:project_relation_export, project_export_job_id: old_job_1.id) }
+      let!(:old_relation_export_2) { create(:project_relation_export, project_export_job_id: old_job_2.id) }
+      let!(:old_relation_export_3) { create(:project_relation_export, project_export_job_id: old_job_3.id) }
+      let!(:fresh_relation_export_1) { create(:project_relation_export, project_export_job_id: fresh_job_1.id) }
+
+      it 'prunes expired RelationExport records' do
+        expect { described_class.execute }.to change { Projects::ImportExport::RelationExport.count }.by(-3)
+
+        expect(Projects::ImportExport::RelationExport.find_by(id: old_relation_export_1.id)).to be_nil
+        expect(Projects::ImportExport::RelationExport.find_by(id: old_relation_export_2.id)).to be_nil
+        expect(Projects::ImportExport::RelationExport.find_by(id: old_relation_export_3.id)).to be_nil
+
+        expect(fresh_relation_export_1.reload).to be_present
+      end
+
+      context 'and RelationExportUploads' do
+        let!(:old_upload_1) { create(:relation_export_upload, project_relation_export_id: old_relation_export_1.id) }
+        let!(:old_upload_2) { create(:relation_export_upload, project_relation_export_id: old_relation_export_2.id) }
+        let!(:old_upload_3) { create(:relation_export_upload, project_relation_export_id: old_relation_export_3.id) }
+        let!(:fresh_upload_1) do
+          create(
+            :relation_export_upload,
+            project_relation_export_id: fresh_relation_export_1.id
+          )
+        end
+
+        let(:old_uploads) do
+          Upload.for_model_type_and_id(
+            Projects::ImportExport::RelationExportUpload,
+            [old_upload_1, old_upload_2, old_upload_3].map(&:id)
+          )
+        end
+
+        it 'prunes expired RelationExportUpload records' do
+          expect { described_class.execute }.to change { Projects::ImportExport::RelationExportUpload.count }.by(-3)
+
+          expect(Projects::ImportExport::RelationExportUpload.find_by(id: old_upload_1.id)).to be_nil
+          expect(Projects::ImportExport::RelationExportUpload.find_by(id: old_upload_2.id)).to be_nil
+          expect(Projects::ImportExport::RelationExportUpload.find_by(id: old_upload_3.id)).to be_nil
+        end
+
+        it 'deletes associated Upload records' do
+          described_class.execute
+
+          expect(old_uploads).to be_empty
+          expect(fresh_upload_1.reload).to be_present
+          expect(
+            Upload.for_model_type_and_id(Projects::ImportExport::RelationExportUpload, fresh_upload_1.id)
+          ).to be_present
+        end
+
+        it 'deletes stored upload files' do
+          old_upload_file_paths = Uploads::Local.new.keys(old_uploads)
+          expect(DeleteStoredFilesWorker).to receive(:perform_async).with(Uploads::Local, old_upload_file_paths)
+
+          described_class.execute
+        end
+
+        it 'deletes expired uploads in batches' do
+          stub_const("#{described_class.name}::BATCH_SIZE", 2)
+
+          expect(Upload).to receive(:finalize_fast_destroy).and_call_original.twice
+
+          described_class.execute
+        end
+      end
+    end
   end
 end
......