Skip to content
Snippets Groups Projects

Phase 2 enqueuer

Merged Steve Abrams requested to merge 349744-phase2-enqueuer into master
All threads resolved!
Compare and
12 files
+ 549
17
Compare changes
  • Side-by-side
  • Inline
Files
12
@@ -10,14 +10,17 @@ class ContainerRepository < ApplicationRecord
REQUIRING_CLEANUP_STATUSES = %i[cleanup_unscheduled cleanup_scheduled].freeze
IDLE_MIGRATION_STATES = %w[default pre_import_done import_done import_aborted import_skipped].freeze
ACTIVE_MIGRATION_STATES = %w[pre_importing importing].freeze
ABORTABLE_MIGRATION_STATES = %w[default pre_importing importing].freeze
MIGRATION_STATES = (IDLE_MIGRATION_STATES + ACTIVE_MIGRATION_STATES).freeze
TooManyImportsError = Class.new(StandardError)
belongs_to :project
validates :name, length: { minimum: 0, allow_nil: false }
validates :name, uniqueness: { scope: :project_id }
validates :migration_state, presence: true, inclusion: { in: MIGRATION_STATES }
validates :migration_aborted_in_state, inclusion: { in: ACTIVE_MIGRATION_STATES }, allow_nil: true
validates :migration_aborted_in_state, inclusion: { in: ABORTABLE_MIGRATION_STATES }, allow_nil: true
validates :migration_retries_count, presence: true,
numericality: { greater_than_or_equal_to: 0 },
@@ -30,6 +33,7 @@ class ContainerRepository < ApplicationRecord
delegate :client, :gitlab_api_client, to: :registry
scope :ordered, -> { order(:name) }
scope :unordered, -> { order('') }
scope :with_api_entity_associations, -> { preload(project: [:route, { namespace: :route }]) }
scope :for_group_and_its_subgroups, ->(group) do
project_scope = Project
@@ -44,8 +48,27 @@ class ContainerRepository < ApplicationRecord
scope :waiting_for_cleanup, -> { where(expiration_policy_cleanup_status: WAITING_CLEANUP_STATUSES) }
scope :expiration_policy_started_at_nil_or_before, ->(timestamp) { where('expiration_policy_started_at < ? OR expiration_policy_started_at IS NULL', timestamp) }
scope :with_stale_ongoing_cleanup, ->(threshold) { cleanup_ongoing.where('expiration_policy_started_at < ?', threshold) }
state_machine :migration_state, initial: :default do
scope :import_in_process, -> { where(migration_state: %w[pre_importing pre_import_done importing]) }
scope :aborted_imports, -> { where(migration_state: :import_aborted).unordered }
scope :recently_imported, -> { where(migration_state: :import_done).order(:migration_import_done_at) }
scope :ready_for_import, -> do
joins(:project).where(
migration_state: [:default],
created_at: ...ContainerRegistry::Migration.created_before
).with_target_import_tier
.where(
"NOT EXISTS (
SELECT 1
FROM feature_gates
WHERE feature_gates.feature_key = 'container_registry_phase_2_deny_list'
AND feature_gates.key = 'actors'
AND feature_gates.value = concat('Group:', projects.namespace_id)
)"
).unordered
end
state_machine :migration_state, initial: :default, use_transactions: false do
state :pre_importing do
validates :migration_pre_import_started_at, presence: true
validates :migration_pre_import_done_at, presence: false
@@ -96,11 +119,11 @@ class ContainerRepository < ApplicationRecord
end
event :abort_import do
transition %i[pre_importing importing] => :import_aborted
transition ABORTABLE_MIGRATION_STATES.map(&:to_sym) => :import_aborted
end
event :skip_import do
transition %i[default pre_importing importing] => :import_skipped
transition ABORTABLE_MIGRATION_STATES.map(&:to_sym) => :import_skipped
end
event :retry_pre_import do
@@ -117,7 +140,9 @@ class ContainerRepository < ApplicationRecord
end
after_transition any => :pre_importing do |container_repository|
container_repository.abort_import unless container_repository.migration_pre_import == :ok
container_repository.try_import do
container_repository.migration_pre_import
end
end
before_transition pre_importing: :pre_import_done do |container_repository|
@@ -130,7 +155,9 @@ class ContainerRepository < ApplicationRecord
end
after_transition any => :importing do |container_repository|
container_repository.abort_import unless container_repository.migration_import == :ok
container_repository.try_import do
container_repository.migration_import
end
end
before_transition importing: :import_done do |container_repository|
@@ -151,11 +178,6 @@ class ContainerRepository < ApplicationRecord
before_transition any => :import_skipped do |container_repository|
container_repository.migration_skipped_at = Time.zone.now
end
before_transition any => %i[import_done import_aborted] do
# EnqueuerJob.enqueue perform_async or perform_in depending on the speed FF
# To be implemented in https://gitlab.com/gitlab-org/gitlab/-/issues/349744
end
end
def self.exists_by_path?(path)
@@ -181,6 +203,11 @@ def self.with_unfinished_cleanup
with_enabled_policy.cleanup_unfinished
end
def self.with_target_import_tier
# overridden in ee
all
end
def skip_import(reason:)
self.migration_skipped_reason = reason
@@ -205,6 +232,52 @@ def retry_import
super
end
def abort_import
# workers cannot be called from within the transaction so we call
# it separately here instead of using callbacks
result = super
::ContainerRegistry::Migration::EnqueuerWorker.perform_async
result
end
def finish_import
# workers cannot be called from within the transaction so we call
# it separately here instead of using callbacks
result = super
::ContainerRegistry::Migration::EnqueuerWorker.perform_async
result
end
def try_import
raise ArgumentError, 'block not given' unless block_given?
try_count = 0
begin
try_count += 1
case yield
when :ok
true
when :too_many_imports
raise TooManyImportsError
else
abort_import
false
end
rescue TooManyImportsError
if try_count <= ::ContainerRegistry::Migration.start_max_retries
sleep 0.1 * try_count
retry
else
abort_import
false
end
end
end
# rubocop: disable CodeReuse/ServiceClass
def registry
@registry ||= begin
Loading