Commit 905a3823 authored by David Fernandez

Add the packages execute cleanup policy service

Update the related services to support this change

Changelog: added
parent 675be82c
1 merge request: !90395 Add the packages execute cleanup service
Showing with 396 additions and 11 deletions
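
For orientation, a minimal sketch of how the new service is meant to be invoked (hypothetical usage: `project.packages_cleanup_policy` is assumed to return the project's Packages::Cleanup::Policy record; the response shape follows the service code below):

policy = project.packages_cleanup_policy
response = Packages::Cleanup::ExecutePolicyService.new(policy).execute

response.success? # => true
response.payload
# => {
#      timeout: false,
#      counts: {
#        marked_package_files_total_count: 12, # example values only
#        unique_package_id_and_file_name_total_count: 3
#      }
#    }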
@@ -27,6 +27,10 @@ def set_next_run_at
         # fixed cadence of 12 hours
         self.next_run_at = Time.zone.now + 12.hours
       end
+
+      def keep_n_duplicated_package_files_disabled?
+        keep_n_duplicated_package_files == 'all'
+      end
     end
   end
 end
# frozen_string_literal: true

module Packages
  module Cleanup
    class ExecutePolicyService
      include Gitlab::Utils::StrongMemoize

      MAX_EXECUTION_TIME = 250.seconds
      DUPLICATED_FILES_BATCH_SIZE = 10_000
      MARK_PACKAGE_FILES_FOR_DESTRUCTION_SERVICE_BATCH_SIZE = 200

      def initialize(policy)
        @policy = policy
        @counts = {
          marked_package_files_total_count: 0,
          unique_package_id_and_file_name_total_count: 0
        }
      end

      def execute
        cleanup_duplicated_files
      end

      private

      def cleanup_duplicated_files
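        # Nothing to do when the policy keeps all duplicated files ('all' disables this cleanup).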
        return if @policy.keep_n_duplicated_package_files_disabled?

        result = installable_package_files.each_batch(of: DUPLICATED_FILES_BATCH_SIZE) do |package_files|
          break :timeout if cleanup_duplicated_files_on(package_files) == :timeout
        end

        response_success(timeout: result == :timeout)
      end

      def cleanup_duplicated_files_on(package_files)
        unique_package_id_and_file_name_from(package_files).each do |package_id, file_name|
          result = remove_duplicated_files_for(package_id: package_id, file_name: file_name)
          @counts[:marked_package_files_total_count] += result.payload[:marked_package_files_count]
          @counts[:unique_package_id_and_file_name_total_count] += 1

          break :timeout unless result.success?
        end
      end

      def unique_package_id_and_file_name_from(package_files)
        # This is a highly custom query for this service, that's why it's not in the model.
        # rubocop: disable CodeReuse/ActiveRecord
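        # Roughly the following SQL (a sketch, assuming keep_n_duplicated_package_files = 1):
        #   SELECT package_id, file_name
        #   FROM packages_package_files
        #   WHERE <installable scope conditions>
        #   GROUP BY package_id, file_name
        #   HAVING COUNT(*) > 1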
        package_files.group(:package_id, :file_name)
                     .having("COUNT(*) > #{@policy.keep_n_duplicated_package_files}")
                     .pluck(:package_id, :file_name)
        # rubocop: enable CodeReuse/ActiveRecord
      end

      def remove_duplicated_files_for(package_id:, file_name:)
        base = ::Packages::PackageFile.for_package_ids(package_id)
                                      .installable
                                      .with_file_name(file_name)
        ids_to_keep = base.recent
                          .limit(@policy.keep_n_duplicated_package_files)
                          .pluck_primary_key

        duplicated_package_files = base.id_not_in(ids_to_keep)
        ::Packages::MarkPackageFilesForDestructionService
          .new(duplicated_package_files)
          .execute(
            batch_deadline: batch_deadline,
            batch_size: MARK_PACKAGE_FILES_FOR_DESTRUCTION_SERVICE_BATCH_SIZE
          )
      end

      def project
        @policy.project
      end

      def installable_package_files
        ::Packages::PackageFile.installable
                               .for_package_ids(
                                 ::Packages::Package.installable
                                                    .for_projects(project.id)
                               )
      end
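
      # Memoized so that every batched call shares a single deadline,
      # MAX_EXECUTION_TIME after the first use.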
      def batch_deadline
        strong_memoize(:batch_deadline) do
          MAX_EXECUTION_TIME.from_now
        end
      end

      def response_success(timeout:)
        ServiceResponse.success(
          message: "Packages cleanup policy executed for project #{project.id}",
          payload: {
            timeout: timeout,
            counts: @counts
          }
        )
      end
    end
  end
end
@@ -9,18 +9,41 @@ def initialize(package_files)
       @package_files = package_files
     end
 
-    def execute
-      @package_files.each_batch(of: BATCH_SIZE) do |batched_package_files|
-        batched_package_files.update_all(status: :pending_destruction)
+    def execute(batch_deadline: nil, batch_size: BATCH_SIZE)
+      timeout = false
+      updates_count = 0
+      min_batch_size = [batch_size, BATCH_SIZE].min
+
+      @package_files.each_batch(of: min_batch_size) do |batched_package_files|
+        if batch_deadline && Time.zone.now > batch_deadline
+          timeout = true
+          break
+        end
+
+        updates_count += batched_package_files.update_all(status: :pending_destruction)
       end
 
-      service_response_success('Package files are now pending destruction')
+      payload = { marked_package_files_count: updates_count }
+      return response_error(payload) if timeout
+
+      response_success(payload)
     end
 
     private
 
-    def service_response_success(message)
-      ServiceResponse.success(message: message)
+    def response_success(payload)
+      ServiceResponse.success(
+        message: 'Package files are now pending destruction',
+        payload: payload
+      )
+    end
+
+    def response_error(payload)
+      ServiceResponse.error(
+        message: 'Timeout while marking package files as pending destruction',
+        payload: payload
+      )
     end
   end
 end
# frozen_string_literal: true

class AddIndexOnInstallablePackageFiles < Gitlab::Database::Migration[2.0]
  disable_ddl_transaction!

  INDEX_NAME = 'idx_pkgs_installable_package_files_on_package_id_id_file_name'
  # See https://gitlab.com/gitlab-org/gitlab/-/blob/e3ed2c1f65df2e137fc714485d7d42264a137968/app/models/packages/package_file.rb#L16
  DEFAULT_STATUS = 0

  def up
    add_concurrent_index :packages_package_files,
      [:package_id, :id, :file_name],
      where: "(status = #{DEFAULT_STATUS})",
      name: INDEX_NAME
  end

  def down
    remove_concurrent_index_by_name :packages_package_files, INDEX_NAME
  end
end
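
This partial index mirrors the installable scope (status = 0) and covers (package_id, id, file_name), the access pattern of the grouped duplicate query and batched iteration in the new service.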
# frozen_string_literal: true

class ReplacePackagesIndexOnProjectIdAndStatus < Gitlab::Database::Migration[2.0]
  disable_ddl_transaction!

  NEW_INDEX_NAME = 'index_packages_packages_on_project_id_and_status_and_id'
  OLD_INDEX_NAME = 'index_packages_packages_on_project_id_and_status'

  def up
    add_concurrent_index :packages_packages,
      [:project_id, :status, :id],
      name: NEW_INDEX_NAME

    remove_concurrent_index_by_name :packages_packages, OLD_INDEX_NAME
  end

  def down
    add_concurrent_index :packages_packages,
      [:project_id, :status],
      name: OLD_INDEX_NAME

    remove_concurrent_index_by_name :packages_packages, NEW_INDEX_NAME
  end
end
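
The id column appears to be appended so the index can also serve id-ordered lookups of a project's installable packages, such as the ::Packages::Package.installable.for_projects(...) subquery in the new service; the narrower (project_id, status) index then becomes redundant and is dropped.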
668404076e9cfc91817b8ae3ec995a69ec0db283153bbe497a81eb83c2188ceb
\ No newline at end of file
547fc0071177395133497cbcec9a9d9ed058fe74f632f5e84d9a6416047503f2
\ No newline at end of file
@@ -27116,6 +27116,8 @@ CREATE INDEX idx_pkgs_debian_project_distribution_keys_on_distribution_id ON pac
 
 CREATE UNIQUE INDEX idx_pkgs_dep_links_on_pkg_id_dependency_id_dependency_type ON packages_dependency_links USING btree (package_id, dependency_id, dependency_type);
 
+CREATE INDEX idx_pkgs_installable_package_files_on_package_id_id_file_name ON packages_package_files USING btree (package_id, id, file_name) WHERE (status = 0);
+
 CREATE INDEX idx_proj_feat_usg_on_jira_dvcs_cloud_last_sync_at_and_proj_id ON project_feature_usages USING btree (jira_dvcs_cloud_last_sync_at, project_id) WHERE (jira_dvcs_cloud_last_sync_at IS NOT NULL);
 
 CREATE INDEX idx_proj_feat_usg_on_jira_dvcs_server_last_sync_at_and_proj_id ON project_feature_usages USING btree (jira_dvcs_server_last_sync_at, project_id) WHERE (jira_dvcs_server_last_sync_at IS NOT NULL);
@@ -29096,7 +29098,7 @@ CREATE INDEX index_packages_packages_on_project_id_and_created_at ON packages_pa
 
 CREATE INDEX index_packages_packages_on_project_id_and_package_type ON packages_packages USING btree (project_id, package_type);
 
-CREATE INDEX index_packages_packages_on_project_id_and_status ON packages_packages USING btree (project_id, status);
+CREATE INDEX index_packages_packages_on_project_id_and_status_and_id ON packages_packages USING btree (project_id, status, id);
 
 CREATE INDEX index_packages_packages_on_project_id_and_version ON packages_packages USING btree (project_id, version);
 
@@ -25,4 +25,16 @@
     it { is_expected.to contain_exactly(active_policy) }
   end
 
+  describe '#keep_n_duplicated_package_files_disabled?' do
+    subject { policy.keep_n_duplicated_package_files_disabled? }
+
+    %w[all 1].each do |value|
+      context "with value set to #{value}" do
+        let(:policy) { build(:packages_cleanup_policy, keep_n_duplicated_package_files: value) }
+
+        it { is_expected.to eq(value == 'all') }
+      end
+    end
+  end
 end
# frozen_string_literal: true

require 'spec_helper'

RSpec.describe Packages::Cleanup::ExecutePolicyService do
  let_it_be(:project) { create(:project) }
  let_it_be_with_reload(:policy) { create(:packages_cleanup_policy, project: project) }

  let(:service) { described_class.new(policy) }

  describe '#execute' do
    subject(:execute) { service.execute }

    context 'with the keep_n_duplicated_files parameter' do
      let_it_be(:package1) { create(:package, project: project) }
      let_it_be(:package2) { create(:package, project: project) }
      let_it_be(:package3) { create(:package, project: project) }
      let_it_be(:package4) { create(:package, :pending_destruction, project: project) }

      let_it_be(:package_file1_1) { create(:package_file, package: package1, file_name: 'file_name1') }
      let_it_be(:package_file1_2) { create(:package_file, package: package1, file_name: 'file_name1') }
      let_it_be(:package_file1_3) { create(:package_file, package: package1, file_name: 'file_name1') }
      let_it_be(:package_file1_4) { create(:package_file, package: package1, file_name: 'file_name2') }
      let_it_be(:package_file1_5) { create(:package_file, package: package1, file_name: 'file_name2') }
      let_it_be(:package_file1_6) { create(:package_file, package: package1, file_name: 'file_name2') }
      let_it_be(:package_file1_7) do
        create(:package_file, :pending_destruction, package: package1, file_name: 'file_name2')
      end

      let_it_be(:package_file2_1) { create(:package_file, package: package2, file_name: 'file_name1') }
      let_it_be(:package_file2_2) { create(:package_file, package: package2, file_name: 'file_name1') }
      let_it_be(:package_file2_3) { create(:package_file, package: package2, file_name: 'file_name1') }
      let_it_be(:package_file2_4) { create(:package_file, package: package2, file_name: 'file_name1') }

      let_it_be(:package_file3_1) { create(:package_file, package: package3, file_name: 'file_name_test') }

      let_it_be(:package_file4_1) { create(:package_file, package: package4, file_name: 'file_name1') }
      let_it_be(:package_file4_2) { create(:package_file, package: package4, file_name: 'file_name1') }

      let(:package_files_1) { package1.package_files.installable }
      let(:package_files_2) { package2.package_files.installable }
      let(:package_files_3) { package3.package_files.installable }

      context 'set to less than the total number of duplicated files' do
        before do
          # for each package file duplicate, we keep only the most recent one
          policy.update!(keep_n_duplicated_package_files: '1')
        end

        shared_examples 'keeping the most recent package files' do
          let(:response_payload) do
            {
              counts: {
                marked_package_files_total_count: 7,
                unique_package_id_and_file_name_total_count: 3
              },
              timeout: false
            }
          end

          it 'only keeps the most recent package files' do
            expect { execute }.to change { ::Packages::PackageFile.installable.count }.by(-7)

            expect(package_files_1).to contain_exactly(package_file1_3, package_file1_6)
            expect(package_files_2).to contain_exactly(package_file2_4)
            expect(package_files_3).to contain_exactly(package_file3_1)

            expect(execute).to be_success
            expect(execute.message).to eq("Packages cleanup policy executed for project #{project.id}")
            expect(execute.payload).to eq(response_payload)
          end
        end

        it_behaves_like 'keeping the most recent package files'

        context 'when the service needs to loop' do
          before do
            stub_const("#{described_class.name}::DUPLICATED_FILES_BATCH_SIZE", 2)
          end

          it_behaves_like 'keeping the most recent package files' do
            before do
              expect(::Packages::MarkPackageFilesForDestructionService)
                .to receive(:new).exactly(3).times.and_call_original
            end
          end

          context 'when a timeout is hit' do
            let(:response_payload) do
              {
                counts: {
                  marked_package_files_total_count: 4,
                  unique_package_id_and_file_name_total_count: 3
                },
                timeout: true
              }
            end

            let(:service_timeout_response) do
              ServiceResponse.error(
                message: 'Timeout while marking package files as pending destruction',
                payload: { marked_package_files_count: 0 }
              )
            end

            before do
              mock_service_timeout(on_iteration: 3)
            end

            it 'keeps part of the most recent package files' do
              expect { execute }
                .to change { ::Packages::PackageFile.installable.count }.by(-4)
                .and not_change { package_files_2.count } # untouched because of the timeout
                .and not_change { package_files_3.count } # untouched because of the timeout

              expect(package_files_1).to contain_exactly(package_file1_3, package_file1_6)

              expect(execute).to be_success
              expect(execute.message).to eq("Packages cleanup policy executed for project #{project.id}")
              expect(execute.payload).to eq(response_payload)
            end

            def mock_service_timeout(on_iteration:)
              execute_call_count = 1
              expect_next_instances_of(::Packages::MarkPackageFilesForDestructionService, 3) do |service|
                expect(service).to receive(:execute).and_wrap_original do |m, *args|
                  # timeout if we are on the right iteration
                  if execute_call_count == on_iteration
                    service_timeout_response
                  else
                    execute_call_count += 1
                    m.call(*args)
                  end
                end
              end
            end
          end
        end
      end

      context 'set to more than the total number of duplicated files' do
        before do
          # using the biggest value for keep_n_duplicated_package_files
          policy.update!(keep_n_duplicated_package_files: '50')
        end

        it 'keeps all package files' do
          expect { execute }.not_to change { ::Packages::PackageFile.installable.count }
        end
      end

      context 'set to all' do
        before do
          policy.update!(keep_n_duplicated_package_files: 'all')
        end

        it 'skips the policy' do
          expect(::Packages::MarkPackageFilesForDestructionService).not_to receive(:new)
          expect { execute }.not_to change { ::Packages::PackageFile.installable.count }
        end
      end
    end
  end
end
@@ -6,9 +6,11 @@
   let(:service) { described_class.new(package_files) }
 
   describe '#execute', :aggregate_failures do
-    subject { service.execute }
+    let(:batch_deadline) { nil }
+
+    subject { service.execute(batch_deadline: batch_deadline) }
 
-    shared_examples 'executing successfully' do
+    shared_examples 'executing successfully' do |marked_package_files_count: 0|
       it 'marks package files for destruction' do
         expect { subject }
           .to change { ::Packages::PackageFile.pending_destruction.count }.by(package_files.size)
@@ -17,6 +19,7 @@
       it 'executes successfully' do
         expect(subject).to be_success
         expect(subject.message).to eq('Package files are now pending destruction')
+        expect(subject.payload).to eq(marked_package_files_count: marked_package_files_count)
       end
     end
@@ -30,13 +33,49 @@
     context 'with one package file' do
       let_it_be(:package_file) { create(:package_file) }
       let_it_be(:package_files) { ::Packages::PackageFile.id_in(package_file.id) }
 
-      it_behaves_like 'executing successfully'
+      it_behaves_like 'executing successfully', marked_package_files_count: 1
     end
 
     context 'with many package files' do
      let_it_be(:package_files) { ::Packages::PackageFile.id_in(create_list(:package_file, 3).map(&:id)) }
 
-      it_behaves_like 'executing successfully'
+      it_behaves_like 'executing successfully', marked_package_files_count: 3
+
+      context 'with a batch deadline' do
+        let_it_be(:batch_deadline) { 250.seconds.from_now }
+
+        context 'when the deadline is not hit' do
+          before do
+            expect(Time.zone).to receive(:now).and_return(batch_deadline - 10.seconds)
+          end
+
+          it_behaves_like 'executing successfully', marked_package_files_count: 3
+        end
+
+        context 'when the deadline is hit' do
+          it 'does not execute the batch loop' do
+            expect(Time.zone).to receive(:now).and_return(batch_deadline + 10.seconds)
+            expect { subject }.to not_change { ::Packages::PackageFile.pending_destruction.count }
+            expect(subject).to be_error
+            expect(subject.message).to eq('Timeout while marking package files as pending destruction')
+            expect(subject.payload).to eq(marked_package_files_count: 0)
+          end
+        end
+      end
+
+      context 'when a batch size is defined' do
+        let_it_be(:batch_deadline) { 250.seconds.from_now }
+        let(:batch_size) { 2 }
+
+        subject { service.execute(batch_deadline: batch_deadline, batch_size: batch_size) }
+
+        before do
+          expect(Time.zone).to receive(:now).twice.and_call_original
+        end
+
+        it_behaves_like 'executing successfully', marked_package_files_count: 3
+      end
     end
 
     context 'with an error during the update' do
......