Skip to content
Snippets Groups Projects
Commit 3a4e253a authored by Rodrigo Tomonari's avatar Rodrigo Tomonari :two: Committed by George Koltsov
Browse files

Introduce parallelised BitBucket Server Importer

Update the BitBucket Server importer to import projects using multiple
workers, similar to the GitHub importer

Changelog: changed
parent ecfc0e83
No related branches found
No related tags found
1 merge request!120931Introduce parallelised BitBucket Server Importer
Showing
with 712 additions and 22 deletions
......@@ -2316,6 +2316,87 @@
:weight: 1
:idempotent: false
:tags: []
- :name: bitbucket_server_import_advance_stage
:worker_name: Gitlab::BitbucketServerImport::AdvanceStageWorker
:feature_category: :importers
:has_external_dependencies: false
:urgency: :low
:resource_boundary: :unknown
:weight: 1
:idempotent: false
:tags: []
- :name: bitbucket_server_import_import_lfs_object
:worker_name: Gitlab::BitbucketServerImport::ImportLfsObjectWorker
:feature_category: :importers
:has_external_dependencies: true
:urgency: :low
:resource_boundary: :unknown
:weight: 1
:idempotent: false
:tags: []
- :name: bitbucket_server_import_import_pull_request
:worker_name: Gitlab::BitbucketServerImport::ImportPullRequestWorker
:feature_category: :importers
:has_external_dependencies: true
:urgency: :low
:resource_boundary: :unknown
:weight: 1
:idempotent: true
:tags: []
- :name: bitbucket_server_import_import_pull_request_notes
:worker_name: Gitlab::BitbucketServerImport::ImportPullRequestNotesWorker
:feature_category: :importers
:has_external_dependencies: true
:urgency: :low
:resource_boundary: :unknown
:weight: 1
:idempotent: true
:tags: []
- :name: bitbucket_server_import_stage_finish_import
:worker_name: Gitlab::BitbucketServerImport::Stage::FinishImportWorker
:feature_category: :importers
:has_external_dependencies: true
:urgency: :low
:resource_boundary: :unknown
:weight: 1
:idempotent: false
:tags: []
- :name: bitbucket_server_import_stage_import_lfs_objects
:worker_name: Gitlab::BitbucketServerImport::Stage::ImportLfsObjectsWorker
:feature_category: :importers
:has_external_dependencies: true
:urgency: :low
:resource_boundary: :unknown
:weight: 1
:idempotent: false
:tags: []
- :name: bitbucket_server_import_stage_import_notes
:worker_name: Gitlab::BitbucketServerImport::Stage::ImportNotesWorker
:feature_category: :importers
:has_external_dependencies: true
:urgency: :low
:resource_boundary: :unknown
:weight: 1
:idempotent: false
:tags: []
- :name: bitbucket_server_import_stage_import_pull_requests
:worker_name: Gitlab::BitbucketServerImport::Stage::ImportPullRequestsWorker
:feature_category: :importers
:has_external_dependencies: true
:urgency: :low
:resource_boundary: :unknown
:weight: 1
:idempotent: false
:tags: []
- :name: bitbucket_server_import_stage_import_repository
:worker_name: Gitlab::BitbucketServerImport::Stage::ImportRepositoryWorker
:feature_category: :importers
:has_external_dependencies: true
:urgency: :low
:resource_boundary: :unknown
:weight: 1
:idempotent: false
:tags: []
- :name: bulk_import
:worker_name: BulkImportWorker
:feature_category: :importers
......
# frozen_string_literal: true

module Gitlab
  module BitbucketServerImport
    # ObjectImporter defines the base behaviour for every Sidekiq worker that
    # imports a single resource such as a note or pull request.
    #
    # Including workers must implement #importer_class; the importer is
    # instantiated as importer_class.new(project, hash) and sent #execute.
    module ObjectImporter
      extend ActiveSupport::Concern

      included do
        include ApplicationWorker

        data_consistency :always

        feature_category :importers

        worker_has_external_dependencies!

        sidekiq_retries_exhausted do |msg|
          args = msg['args']
          jid = msg['jid']

          # If a job is being exhausted we still want to notify the
          # Gitlab::Import::AdvanceStageWorker to prevent the entire import from getting stuck
          #
          # args.length == 3 matches #perform(project_id, hash, notify_key):
          # the last argument is the JobWaiter key.
          if args.length == 3 && (key = args.last) && key.is_a?(String)
            JobWaiter.notify(key, jid)
          end
        end
      end

      # project_id - ID of the Project to import the data into.
      # hash       - Hash with the attributes of the object to import.
      # notify_key - JobWaiter key to signal when this job finishes.
      def perform(project_id, hash, notify_key)
        project = Project.find_by_id(project_id)

        return unless project

        if project.import_state&.canceled?
          info(project.id, message: 'project import canceled')

          return
        end

        import(project, hash)
      ensure
        # Always notify the waiter (even on early return or error) so the
        # stage advancement never stalls waiting on this job.
        notify_waiter(notify_key)
      end

      private

      # project - An instance of `Project` to import the data into.
      # hash - A Hash containing the details of the object to import.
      def import(project, hash)
        info(project.id, message: 'importer started')

        importer_class.new(project, hash).execute

        info(project.id, message: 'importer finished')
      rescue ActiveRecord::RecordInvalid => e
        # We do not raise exception to prevent job retry
        track_exception(project, e)
      rescue StandardError => e
        # Any other error is tracked and re-raised so Sidekiq can retry.
        track_and_raise_exception(project, e)
      end

      def notify_waiter(key)
        JobWaiter.notify(key, jid)
      end

      # Returns the class to use for importing the object.
      def importer_class
        raise NotImplementedError
      end

      def info(project_id, extra = {})
        Logger.info(log_attributes(project_id, extra))
      end

      # Merges common logging context (project + importer) into `extra`.
      def log_attributes(project_id, extra = {})
        extra.merge(
          project_id: project_id,
          importer: importer_class.name
        )
      end

      def track_exception(project, exception, fail_import: false)
        Gitlab::Import::ImportFailureService.track(
          project_id: project.id,
          error_source: importer_class.name,
          exception: exception,
          fail_import: fail_import
        )
      end

      def track_and_raise_exception(project, exception, fail_import: false)
        track_exception(project, exception, fail_import: fail_import)

        raise(exception)
      end
    end
  end
end
# frozen_string_literal: true

module Gitlab
  module BitbucketServerImport
    # StageMethods provides the shared Sidekiq plumbing for the stage workers
    # of the BitBucket Server import (repository, pull requests, notes, ...).
    # Including workers must implement a private #import(project) method.
    module StageMethods
      extend ActiveSupport::Concern

      included do
        include ApplicationWorker

        worker_has_external_dependencies!

        feature_category :importers

        data_consistency :always

        sidekiq_options dead: false, retry: 3

        sidekiq_retries_exhausted do |msg, e|
          # Once retries are exhausted the whole import is marked as failed.
          Gitlab::Import::ImportFailureService.track(
            project_id: msg['args'][0],
            exception: e,
            fail_import: true
          )
        end
      end

      # project_id - The ID of the GitLab project to import the data into.
      def perform(project_id)
        info(project_id, message: 'starting stage')

        return unless (project = find_project(project_id))

        import(project)

        info(project_id, message: 'stage finished')
      rescue StandardError => e
        # fail_import is only true for stages that override #abort_on_failure
        # (e.g. the repository stage); the error is re-raised for Sidekiq retry.
        Gitlab::Import::ImportFailureService.track(
          project_id: project_id,
          exception: e,
          error_source: self.class.name,
          fail_import: abort_on_failure
        )

        raise(e)
      end

      # Returns the Project only while its import is still :started.
      def find_project(id)
        # If the project has been marked as failed we want to bail out
        # automatically.
        # rubocop: disable CodeReuse/ActiveRecord
        Project.joins_import_state.where(import_state: { status: :started }).find_by_id(id)
        # rubocop: enable CodeReuse/ActiveRecord
      end

      # Whether a failure in this stage should fail the entire import.
      # Overridden (to true) by stages that later stages depend on.
      def abort_on_failure
        false
      end

      private

      def info(project_id, extra = {})
        Logger.info(log_attributes(project_id, extra))
      end

      # Merges common logging context (project + stage class) into `extra`.
      def log_attributes(project_id, extra = {})
        extra.merge(
          project_id: project_id,
          import_stage: self.class.name
        )
      end
    end
  end
end
# frozen_string_literal: true

module Gitlab
  module BitbucketServerImport
    # AdvanceStageWorker is a worker used by the BitBucket Server Importer to wait for a
    # number of jobs to complete, without blocking a thread. Once all jobs have
    # been completed this worker will advance the import process to the next
    # stage.
    class AdvanceStageWorker # rubocop:disable Scalability/IdempotentWorker
      include ApplicationWorker
      include ::Gitlab::Import::AdvanceStage

      data_consistency :delayed

      sidekiq_options dead: false, retry: 3

      feature_category :importers

      loggable_arguments 1, 2

      # The known importer stages and their corresponding Sidekiq workers.
      # Stage workers enqueue this worker with one of these keys as the
      # next-stage name (pull requests -> notes -> lfs_objects -> finish).
      STAGES = {
        notes: Stage::ImportNotesWorker,
        lfs_objects: Stage::ImportLfsObjectsWorker,
        finish: Stage::FinishImportWorker
      }.freeze

      # Looks up the import state (jid) for the project, scoped to imports
      # that are still in the :started state.
      def find_import_state(project_id)
        ProjectImportState.jid_by(project_id: project_id, status: :started)
      end

      private

      # Maps a stage name (Symbol or String) to its worker class; raises
      # KeyError for unknown stage names.
      def next_stage_worker(next_stage)
        STAGES.fetch(next_stage.to_sym)
      end
    end
  end
end
# frozen_string_literal: true

module Gitlab
  module BitbucketServerImport
    # Imports a single LFS object. Jobs are enqueued by
    # Importers::LfsObjectsImporter during the lfs_objects stage.
    class ImportLfsObjectWorker # rubocop:disable Scalability/IdempotentWorker
      include ObjectImporter

      # Importer invoked by ObjectImporter#import for each payload.
      def importer_class
        Importers::LfsObjectImporter
      end
    end
  end
end
# frozen_string_literal: true

module Gitlab
  module BitbucketServerImport
    # Imports the notes of a single pull request. Jobs are enqueued by
    # Importers::NotesImporter during the notes stage.
    class ImportPullRequestNotesWorker
      include ObjectImporter

      idempotent!

      # Importer invoked by ObjectImporter#import for each payload.
      def importer_class
        Importers::PullRequestNotesImporter
      end
    end
  end
end
# frozen_string_literal: true

module Gitlab
  module BitbucketServerImport
    # Imports a single pull request as a GitLab merge request via
    # Importers::PullRequestImporter.
    class ImportPullRequestWorker
      include ObjectImporter

      idempotent!

      # Importer invoked by ObjectImporter#import for each payload.
      def importer_class
        Importers::PullRequestImporter
      end
    end
  end
end
# frozen_string_literal: true

module Gitlab
  module BitbucketServerImport
    module Stage
      # Final stage of the import: runs the project's after-import hook and
      # records the finished-import metric.
      class FinishImportWorker # rubocop:disable Scalability/IdempotentWorker
        include StageMethods

        private

        # project - An instance of Project.
        def import(project)
          project.after_import

          metrics = Gitlab::Import::Metrics.new(:bitbucket_server_importer, project)
          metrics.track_finished_import
        end
      end
    end
  end
end
# frozen_string_literal: true

module Gitlab
  module BitbucketServerImport
    module Stage
      # Stage that fans out one job per LFS object and then hands control to
      # AdvanceStageWorker, which waits for them before the :finish stage.
      class ImportLfsObjectsWorker # rubocop:disable Scalability/IdempotentWorker
        include StageMethods

        private

        # project - An instance of Project.
        def import(project)
          lfs_waiter = importer_class.new(project).execute

          project.import_state.refresh_jid_expiration

          AdvanceStageWorker.perform_async(project.id, { lfs_waiter.key => lfs_waiter.jobs_remaining }, :finish)
        end

        def importer_class
          Importers::LfsObjectsImporter
        end
      end
    end
  end
end
# frozen_string_literal: true

module Gitlab
  module BitbucketServerImport
    module Stage
      # Stage that fans out one job per merge request's notes and advances to
      # the :lfs_objects stage once they are all done.
      class ImportNotesWorker # rubocop:disable Scalability/IdempotentWorker
        include StageMethods

        private

        # project - An instance of Project.
        def import(project)
          notes_waiter = importer_class.new(project).execute

          project.import_state.refresh_jid_expiration

          AdvanceStageWorker.perform_async(project.id, { notes_waiter.key => notes_waiter.jobs_remaining }, :lfs_objects)
        end

        def importer_class
          Importers::NotesImporter
        end
      end
    end
  end
end
# frozen_string_literal: true

module Gitlab
  module BitbucketServerImport
    module Stage
      # Stage that fans out one job per pull request and advances to the
      # :notes stage once they are all done.
      class ImportPullRequestsWorker # rubocop:disable Scalability/IdempotentWorker
        include StageMethods

        private

        # project - An instance of Project.
        def import(project)
          pr_waiter = importer_class.new(project).execute

          project.import_state.refresh_jid_expiration

          AdvanceStageWorker.perform_async(project.id, { pr_waiter.key => pr_waiter.jobs_remaining }, :notes)
        end

        def importer_class
          Importers::PullRequestsImporter
        end
      end
    end
  end
end
# frozen_string_literal: true

module Gitlab
  module BitbucketServerImport
    module Stage
      # First stage of the import: imports the repository itself and then
      # kicks off the pull requests stage.
      class ImportRepositoryWorker # rubocop:disable Scalability/IdempotentWorker
        include StageMethods

        private

        # project - An instance of Project.
        def import(project)
          importer_class.new(project).execute

          ImportPullRequestsWorker.perform_async(project.id)
        end

        def importer_class
          Importers::RepositoryImporter
        end

        # A failed repository import leaves nothing for later stages to build
        # on, so the entire import is aborted.
        def abort_on_failure
          true
        end
      end
    end
  end
end
---
name: bitbucket_server_parallel_importer
introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/120931
rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/411796
milestone: '16.1'
type: development
group: group::import and integrate
default_enabled: false
......@@ -77,6 +77,24 @@
- 1
- - batched_background_migrations
- 1
- - bitbucket_server_import_advance_stage
- 1
- - bitbucket_server_import_import_lfs_object
- 1
- - bitbucket_server_import_import_pull_request
- 1
- - bitbucket_server_import_import_pull_request_notes
- 1
- - bitbucket_server_import_stage_finish_import
- 1
- - bitbucket_server_import_stage_import_lfs_objects
- 1
- - bitbucket_server_import_stage_import_notes
- 1
- - bitbucket_server_import_stage_import_pull_requests
- 1
- - bitbucket_server_import_stage_import_repository
- 1
- - bulk_import
- 1
- - bulk_imports_entity
......
......@@ -68,6 +68,23 @@ def target_branch_sha
raw.dig('toRef', 'latestCommit')
end
# Serializes the pull request into a Hash of its public attributes, suitable
# for passing through Sidekiq job arguments.
def to_hash
  attributes = %i[
    iid author author_email author_username description created_at
    updated_at state title source_branch_name target_branch_name
    target_branch_sha
  ]

  attributes.to_h { |attribute| [attribute, send(attribute)] }
end
private
def created_date
......
......@@ -3,9 +3,10 @@
module Gitlab
module BitbucketServerImport
class Importer
include Loggable
attr_reader :recover_missing_commits
attr_reader :project, :project_key, :repository_slug, :client, :errors, :users, :already_imported_cache_key
attr_accessor :logger
BATCH_SIZE = 100
# The base cache key to use for tracking already imported objects.
......@@ -38,7 +39,6 @@ def initialize(project, recover_missing_commits: false)
@errors = []
@users = {}
@temp_branches = []
@logger = Gitlab::Import::Logger.build
@already_imported_cache_key = ALREADY_IMPORTED_CACHE_KEY %
{ project: project.id, collection: collection_method }
end
......@@ -427,26 +427,6 @@ def pull_request_comment_attributes(comment)
}
end
def log_debug(details)
logger.debug(log_base_data.merge(details))
end
def log_info(details)
logger.info(log_base_data.merge(details))
end
def log_warn(details)
logger.warn(log_base_data.merge(details))
end
def log_base_data
{
class: self.class.name,
project_id: project.id,
project_path: project.full_path
}
end
def metrics
@metrics ||= Gitlab::Import::Metrics.new(:bitbucket_server_importer, @project)
end
......
# frozen_string_literal: true

module Gitlab
  module BitbucketServerImport
    module Importers
      # Downloads a single LFS object for a project via the LfsDownloadService.
      class LfsObjectImporter
        include Loggable

        # project - Project the LFS object belongs to.
        # lfs_attributes - Hash of attributes used to build an LfsDownloadObject.
        def initialize(project, lfs_attributes)
          @project = project
          @lfs_download_object = LfsDownloadObject.new(**lfs_attributes.symbolize_keys)
        end

        def execute
          oid = lfs_download_object.oid

          log_info(import_stage: 'import_lfs_object', message: 'starting', oid: oid)

          Projects::LfsPointers::LfsDownloadService.new(project, lfs_download_object).execute

          log_info(import_stage: 'import_lfs_object', message: 'finished', oid: oid)
        end

        private

        attr_reader :project, :lfs_download_object
      end
    end
  end
end
# frozen_string_literal: true

module Gitlab
  module BitbucketServerImport
    module Importers
      # Enumerates the project's LFS objects and fans out one
      # ImportLfsObjectWorker job per object. Returns a JobWaiter the stage
      # worker hands to AdvanceStageWorker.
      class LfsObjectsImporter
        include ParallelScheduling

        def execute
          log_info(import_stage: 'import_lfs_objects', message: 'starting')

          download_service = Projects::LfsPointers::LfsObjectDownloadListService.new(project)

          begin
            # Skip the whole fan-out when LFS is disabled for the project;
            # the JobWaiter is still returned so the stage can advance.
            queue_workers(download_service) if project&.lfs_enabled?
          rescue StandardError => e
            track_import_failure!(project, exception: e)
          end

          log_info(import_stage: 'import_lfs_objects', message: 'finished')

          job_waiter
        end

        def sidekiq_worker_class
          ImportLfsObjectWorker
        end

        def collection_method
          :lfs_objects
        end

        # LFS objects are deduplicated in the processed-cache by their OID.
        def id_for_already_processed_cache(lfs_download_object)
          lfs_download_object.oid
        end

        private

        def queue_workers(download_service)
          download_service.each_list_item do |lfs_download_object|
            # Needs to come before `already_processed?` as `jobs_remaining` resets to zero when the job restarts and
            # jobs_remaining needs to be the total amount of enqueued jobs
            job_waiter.jobs_remaining += 1

            next if already_processed?(lfs_download_object)

            # NOTE(review): calculate_job_delay comes from ParallelScheduling;
            # presumably it staggers enqueueing by queue position — confirm there.
            job_delay = calculate_job_delay(job_waiter.jobs_remaining)

            sidekiq_worker_class.perform_in(job_delay, project.id, lfs_download_object.as_json, job_waiter.key)

            mark_as_processed(lfs_download_object)
          end
        end
      end
    end
  end
end
# frozen_string_literal: true

module Gitlab
  module BitbucketServerImport
    module Importers
      # Fans out one ImportPullRequestNotesWorker job per already-imported
      # merge request. Returns a JobWaiter the stage worker hands to
      # AdvanceStageWorker.
      class NotesImporter
        include ParallelScheduling

        def execute
          project.merge_requests.find_each do |merge_request|
            # Needs to come before `already_processed?` as `jobs_remaining` resets to zero when the job restarts and
            # jobs_remaining needs to be the total amount of enqueued jobs
            job_waiter.jobs_remaining += 1

            next if already_processed?(merge_request)

            # NOTE(review): calculate_job_delay comes from ParallelScheduling;
            # presumably it staggers enqueueing by queue position — confirm there.
            job_delay = calculate_job_delay(job_waiter.jobs_remaining)

            # Only the IID is passed; the worker re-fetches the notes itself.
            sidekiq_worker_class.perform_in(job_delay, project.id, { iid: merge_request.iid }, job_waiter.key)

            mark_as_processed(merge_request)
          end

          job_waiter
        end

        private

        attr_reader :project

        def sidekiq_worker_class
          ImportPullRequestNotesWorker
        end

        # Merge requests are deduplicated in the processed-cache by their IID.
        def id_for_already_processed_cache(merge_request)
          merge_request.iid
        end

        def collection_method
          :notes
        end
      end
    end
  end
end
# frozen_string_literal: true

module Gitlab
  module BitbucketServerImport
    module Importers
      # Creates a single GitLab merge request from a BitBucket Server pull
      # request attributes hash.
      class PullRequestImporter
        include Loggable

        # project - Project to create the merge request in.
        # hash - Hash of pull request attributes.
        def initialize(project, hash)
          @project = project
          @formatter = Gitlab::ImportFormatter.new
          @user_finder = UserFinder.new(project)

          # Object should behave as a object so we can remove object.is_a?(Hash) check
          # This will be fixed in https://gitlab.com/gitlab-org/gitlab/-/issues/412328
          @object = hash.with_indifferent_access
        end

        def execute
          log_info(import_stage: 'import_pull_request', message: 'starting', iid: object[:iid])

          Gitlab::Import::MergeRequestCreator.new(project).execute(build_attributes)

          log_info(import_stage: 'import_pull_request', message: 'finished', iid: object[:iid])
        end

        private

        attr_reader :object, :project, :formatter, :user_finder

        # Maps the pull request hash onto MergeRequest attributes.
        def build_attributes
          {
            iid: object[:iid],
            title: object[:title],
            description: description,
            source_project_id: project.id,
            source_branch: Gitlab::Git.ref_name(object[:source_branch_name]),
            source_branch_sha: object[:source_branch_sha],
            target_project_id: project.id,
            target_branch: Gitlab::Git.ref_name(object[:target_branch_name]),
            target_branch_sha: object[:target_branch_sha],
            state_id: MergeRequest.available_states[object[:state]],
            author_id: user_finder.author_id(object),
            created_at: object[:created_at],
            updated_at: object[:updated_at]
          }
        end

        # Prefixes the description with an author attribution when the author
        # could not be mapped to a GitLab user.
        def description
          parts = [author_line]
          parts << object[:description] if object[:description]
          parts.join
        end

        def author_line
          user_finder.uid(object) ? '' : formatter.author_line(object[:author])
        end
      end
    end
  end
end
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment