Skip to content
Snippets Groups Projects
Commit a7dbf3aa authored by Quang-Minh Nguyen's avatar Quang-Minh Nguyen
Browse files

Propagate feature flag actors in all Gitaly RPC services

In gitaly#4459, we introduced
Repository actor for Gitaly. Due to its complication, we control the
new actor propagation strictly. All the new changes are behind
`actors_aware_gitaly_calls` flag. We also applied to Gitaly commit
service only.

This commit applies the actor propagation module to all Gitaly RPC
services. They are still controlled by the aforementioned feature flag
in case we face a performance degrade.

Issue: gitaly#4613


Changelog: other
Signed-off-by: Quang-Minh Nguyen's avatarQuang-Minh Nguyen <qmnguyen@gitlab.com>
parent fb1b6f78
No related branches found
No related tags found
1 merge request!103513Propagate feature flag actors in all Gitaly RPC services
Showing with 237 additions and 106 deletions
......@@ -4,9 +4,12 @@ module Gitlab
module GitalyClient
class BlobService
include Gitlab::EncodingHelper
include WithFeatureFlagActors
def initialize(repository)
@gitaly_repo = repository.gitaly_repository
self.repository_actor = repository
end
def get_blob(oid:, limit:)
......@@ -15,7 +18,7 @@ def get_blob(oid:, limit:)
oid: oid,
limit: limit
)
response = GitalyClient.call(@gitaly_repo.storage_name, :blob_service, :get_blob, request, timeout: GitalyClient.fast_timeout)
response = gitaly_client_call(@gitaly_repo.storage_name, :blob_service, :get_blob, request, timeout: GitalyClient.fast_timeout)
consume_blob_response(response)
end
......@@ -35,7 +38,7 @@ def list_blobs(revisions, limit: 0, bytes_limit: 0, with_paths: false, dynamic_t
GitalyClient.medium_timeout
end
response = GitalyClient.call(@gitaly_repo.storage_name, :blob_service, :list_blobs, request, timeout: timeout)
response = gitaly_client_call(@gitaly_repo.storage_name, :blob_service, :list_blobs, request, timeout: timeout)
GitalyClient::BlobsStitcher.new(GitalyClient::ListBlobsAdapter.new(response))
end
......@@ -47,7 +50,7 @@ def batch_lfs_pointers(blob_ids)
blob_ids: blob_ids
)
response = GitalyClient.call(@gitaly_repo.storage_name, :blob_service, :get_lfs_pointers, request, timeout: GitalyClient.medium_timeout)
response = gitaly_client_call(@gitaly_repo.storage_name, :blob_service, :get_lfs_pointers, request, timeout: GitalyClient.medium_timeout)
map_lfs_pointers(response)
end
......@@ -64,7 +67,7 @@ def get_blobs(revision_paths, limit = -1)
limit: limit
)
response = GitalyClient.call(
response = gitaly_client_call(
@gitaly_repo.storage_name,
:blob_service,
:get_blobs,
......@@ -87,7 +90,7 @@ def get_blob_types(revision_paths, limit = -1)
limit: limit
)
response = GitalyClient.call(
response = gitaly_client_call(
@gitaly_repo.storage_name,
:blob_service,
:get_blobs,
......@@ -107,7 +110,7 @@ def get_new_lfs_pointers(revisions, limit, not_in, dynamic_timeout = nil)
GitalyClient.medium_timeout
end
response = GitalyClient.call(
response = gitaly_client_call(
@gitaly_repo.storage_name,
:blob_service,
rpc,
......@@ -123,7 +126,7 @@ def get_all_lfs_pointers
revisions: [encode_binary("--all")]
)
response = GitalyClient.call(@gitaly_repo.storage_name, :blob_service, :list_lfs_pointers, request, timeout: GitalyClient.medium_timeout)
response = gitaly_client_call(@gitaly_repo.storage_name, :blob_service, :list_lfs_pointers, request, timeout: GitalyClient.medium_timeout)
map_lfs_pointers(response)
end
......
......@@ -3,6 +3,8 @@
module Gitlab
module GitalyClient
class CleanupService
include WithFeatureFlagActors
attr_reader :repository, :gitaly_repo, :storage
# 'repository' is a Gitlab::Git::Repository
......@@ -10,10 +12,12 @@ def initialize(repository)
@repository = repository
@gitaly_repo = repository.gitaly_repository
@storage = repository.storage
self.repository_actor = repository
end
def apply_bfg_object_map_stream(io, &blk)
response = GitalyClient.call(
response = gitaly_client_call(
storage,
:cleanup_service,
:apply_bfg_object_map_stream,
......
......@@ -4,6 +4,7 @@ module Gitlab
module GitalyClient
class ConflictsService
include Gitlab::EncodingHelper
include WithFeatureFlagActors
MAX_MSG_SIZE = 128.kilobytes.freeze
......@@ -12,6 +13,8 @@ def initialize(repository, our_commit_oid, their_commit_oid)
@repository = repository
@our_commit_oid = our_commit_oid
@their_commit_oid = their_commit_oid
self.repository_actor = repository
end
def list_conflict_files(allow_tree_conflicts: false)
......@@ -21,7 +24,7 @@ def list_conflict_files(allow_tree_conflicts: false)
their_commit_oid: @their_commit_oid,
allow_tree_conflicts: allow_tree_conflicts
)
response = GitalyClient.call(@repository.storage, :conflicts_service, :list_conflict_files, request, timeout: GitalyClient.long_timeout)
response = gitaly_client_call(@repository.storage, :conflicts_service, :list_conflict_files, request, timeout: GitalyClient.long_timeout)
GitalyClient::ConflictFilesStitcher.new(response, @gitaly_repo)
end
......@@ -50,7 +53,7 @@ def resolve_conflicts(target_repository, resolution, source_branch, target_branc
end
end
response = GitalyClient.call(@repository.storage, :conflicts_service, :resolve_conflicts, req_enum, remote_storage: target_repository.storage, timeout: GitalyClient.long_timeout)
response = gitaly_client_call(@repository.storage, :conflicts_service, :resolve_conflicts, req_enum, remote_storage: target_repository.storage, timeout: GitalyClient.long_timeout)
if response.resolution_error.present?
raise Gitlab::Git::Conflict::Resolver::ResolutionError, response.resolution_error
......
......@@ -3,6 +3,8 @@
module Gitlab
module GitalyClient
class ObjectPoolService
include WithFeatureFlagActors
attr_reader :object_pool, :storage
def initialize(object_pool)
......@@ -15,8 +17,10 @@ def create(repository)
object_pool: object_pool,
origin: repository.gitaly_repository)
GitalyClient.call(storage, :object_pool_service, :create_object_pool,
request, timeout: GitalyClient.medium_timeout)
GitalyClient.with_feature_flag_actors(**gitaly_feature_flag_actors(repository)) do
GitalyClient.call(storage, :object_pool_service, :create_object_pool,
request, timeout: GitalyClient.medium_timeout)
end
end
def delete
......@@ -32,8 +36,10 @@ def link_repository(repository)
repository: repository.gitaly_repository
)
GitalyClient.call(storage, :object_pool_service, :link_repository_to_object_pool,
request, timeout: GitalyClient.fast_timeout)
GitalyClient.with_feature_flag_actors(**gitaly_feature_flag_actors(repository)) do
GitalyClient.call(storage, :object_pool_service, :link_repository_to_object_pool,
request, timeout: GitalyClient.fast_timeout)
end
end
def fetch(repository)
......@@ -42,8 +48,10 @@ def fetch(repository)
origin: repository.gitaly_repository
)
GitalyClient.call(storage, :object_pool_service, :fetch_into_object_pool,
request, timeout: GitalyClient.long_timeout)
GitalyClient.with_feature_flag_actors(**gitaly_feature_flag_actors(repository)) do
GitalyClient.call(storage, :object_pool_service, :fetch_into_object_pool,
request, timeout: GitalyClient.long_timeout)
end
end
end
end
......
......@@ -4,12 +4,15 @@ module Gitlab
module GitalyClient
class OperationService
include Gitlab::EncodingHelper
include WithFeatureFlagActors
MAX_MSG_SIZE = 128.kilobytes.freeze
def initialize(repository)
@gitaly_repo = repository.gitaly_repository
@repository = repository
self.repository_actor = repository
end
def rm_tag(tag_name, user)
......@@ -19,7 +22,7 @@ def rm_tag(tag_name, user)
user: Gitlab::Git::User.from_gitlab(user).to_gitaly
)
response = GitalyClient.call(@repository.storage, :operation_service, :user_delete_tag, request, timeout: GitalyClient.long_timeout)
response = gitaly_client_call(@repository.storage, :operation_service, :user_delete_tag, request, timeout: GitalyClient.long_timeout)
if pre_receive_error = response.pre_receive_error.presence
raise Gitlab::Git::PreReceiveError, pre_receive_error
......@@ -36,7 +39,7 @@ def add_tag(tag_name, user, target, message)
timestamp: Google::Protobuf::Timestamp.new(seconds: Time.now.utc.to_i)
)
response = GitalyClient.call(@repository.storage, :operation_service, :user_create_tag, request, timeout: GitalyClient.long_timeout)
response = gitaly_client_call(@repository.storage, :operation_service, :user_create_tag, request, timeout: GitalyClient.long_timeout)
if pre_receive_error = response.pre_receive_error.presence
raise Gitlab::Git::PreReceiveError, pre_receive_error
elsif response.exists
......@@ -73,7 +76,7 @@ def user_create_branch(branch_name, user, start_point)
user: Gitlab::Git::User.from_gitlab(user).to_gitaly,
start_point: encode_binary(start_point)
)
response = GitalyClient.call(@repository.storage, :operation_service,
response = gitaly_client_call(@repository.storage, :operation_service,
:user_create_branch, request, timeout: GitalyClient.long_timeout)
if response.pre_receive_error.present?
......@@ -110,7 +113,7 @@ def user_update_branch(branch_name, user, newrev, oldrev)
oldrev: encode_binary(oldrev)
)
response = GitalyClient.call(@repository.storage, :operation_service,
response = gitaly_client_call(@repository.storage, :operation_service,
:user_update_branch, request, timeout: GitalyClient.long_timeout)
if pre_receive_error = response.pre_receive_error.presence
......@@ -125,7 +128,7 @@ def user_delete_branch(branch_name, user)
user: Gitlab::Git::User.from_gitlab(user).to_gitaly
)
response = GitalyClient.call(@repository.storage, :operation_service,
response = gitaly_client_call(@repository.storage, :operation_service,
:user_delete_branch, request, timeout: GitalyClient.long_timeout)
if pre_receive_error = response.pre_receive_error.presence
......@@ -156,7 +159,7 @@ def user_merge_to_ref(user, source_sha:, branch:, target_ref:, message:, first_p
timestamp: Google::Protobuf::Timestamp.new(seconds: Time.now.utc.to_i)
)
response = GitalyClient.call(@repository.storage, :operation_service,
response = gitaly_client_call(@repository.storage, :operation_service,
:user_merge_to_ref, request, timeout: GitalyClient.long_timeout)
response.commit_id
......@@ -164,7 +167,7 @@ def user_merge_to_ref(user, source_sha:, branch:, target_ref:, message:, first_p
def user_merge_branch(user, source_sha, target_branch, message)
request_enum = QueueEnumerator.new
response_enum = GitalyClient.call(
response_enum = gitaly_client_call(
@repository.storage,
:operation_service,
:user_merge_branch,
......@@ -225,7 +228,7 @@ def user_ff_branch(user, source_sha, target_branch)
branch: encode_binary(target_branch)
)
response = GitalyClient.call(
response = gitaly_client_call(
@repository.storage,
:operation_service,
:user_ff_branch,
......@@ -268,7 +271,7 @@ def rebase(user, rebase_id, branch:, branch_sha:, remote_repository:, remote_bra
request_enum = QueueEnumerator.new
rebase_sha = nil
response_enum = GitalyClient.call(
response_enum = gitaly_client_call(
@repository.storage,
:operation_service,
:user_rebase_confirmable,
......@@ -334,7 +337,7 @@ def user_squash(user, start_sha, end_sha, author, message, time = Time.now.utc)
timestamp: Google::Protobuf::Timestamp.new(seconds: time.to_i)
)
response = GitalyClient.call(
response = gitaly_client_call(
@repository.storage,
:operation_service,
:user_squash,
......@@ -376,7 +379,7 @@ def user_update_submodule(user:, submodule:, commit_sha:, branch:, message:)
timestamp: Google::Protobuf::Timestamp.new(seconds: Time.now.utc.to_i)
)
response = GitalyClient.call(
response = gitaly_client_call(
@repository.storage,
:operation_service,
:user_update_submodule,
......@@ -422,7 +425,7 @@ def user_commit_files(
end
end
response = GitalyClient.call(
response = gitaly_client_call(
@repository.storage, :operation_service, :user_commit_files, req_enum,
timeout: GitalyClient.long_timeout, remote_storage: start_repository&.storage)
......@@ -457,7 +460,7 @@ def user_commit_patches(user, branch_name, patches)
end
end
response = GitalyClient.call(@repository.storage, :operation_service,
response = gitaly_client_call(@repository.storage, :operation_service,
:user_apply_patch, chunks, timeout: GitalyClient.long_timeout)
Gitlab::Git::OperationService::BranchUpdate.from_gitaly(response.branch_update)
......@@ -493,7 +496,7 @@ def call_cherry_pick_or_revert(rpc, user:, commit:, branch_name:, message:, star
dry_run: dry_run
)
response = GitalyClient.call(
response = gitaly_client_call(
@repository.storage,
:operation_service,
:"user_#{rpc}",
......
......@@ -3,16 +3,20 @@
module Gitlab
module GitalyClient
class PraefectInfoService
include WithFeatureFlagActors
def initialize(repository)
@repository = repository
@gitaly_repo = repository.gitaly_repository
@storage = repository.storage
self.repository_actor = repository
end
def replicas
request = Gitaly::RepositoryReplicasRequest.new(repository: @gitaly_repo)
GitalyClient.call(@storage, :praefect_info_service, :repository_replicas, request, timeout: GitalyClient.fast_timeout)
gitaly_client_call(@storage, :praefect_info_service, :repository_replicas, request, timeout: GitalyClient.fast_timeout)
end
end
end
......
......@@ -4,6 +4,7 @@ module Gitlab
module GitalyClient
class RefService
include Gitlab::EncodingHelper
include WithFeatureFlagActors
TAGS_SORT_KEY = {
'name' => Gitaly::FindAllTagsRequest::SortBy::Key::REFNAME,
......@@ -21,17 +22,19 @@ def initialize(repository)
@repository = repository
@gitaly_repo = repository.gitaly_repository
@storage = repository.storage
self.repository_actor = repository
end
def branches
request = Gitaly::FindAllBranchesRequest.new(repository: @gitaly_repo)
response = GitalyClient.call(@storage, :ref_service, :find_all_branches, request, timeout: GitalyClient.fast_timeout)
response = gitaly_client_call(@storage, :ref_service, :find_all_branches, request, timeout: GitalyClient.fast_timeout)
consume_find_all_branches_response(response)
end
def remote_branches(remote_name)
request = Gitaly::FindAllRemoteBranchesRequest.new(repository: @gitaly_repo, remote_name: remote_name)
response = GitalyClient.call(@storage, :ref_service, :find_all_remote_branches, request, timeout: GitalyClient.medium_timeout)
response = gitaly_client_call(@storage, :ref_service, :find_all_remote_branches, request, timeout: GitalyClient.medium_timeout)
consume_find_all_remote_branches_response(remote_name, response)
end
......@@ -41,25 +44,25 @@ def merged_branches(branch_names = [])
merged_only: true,
merged_branches: branch_names.map { |s| encode_binary(s) }
)
response = GitalyClient.call(@storage, :ref_service, :find_all_branches, request, timeout: GitalyClient.fast_timeout)
response = gitaly_client_call(@storage, :ref_service, :find_all_branches, request, timeout: GitalyClient.fast_timeout)
consume_find_all_branches_response(response)
end
def default_branch_name
request = Gitaly::FindDefaultBranchNameRequest.new(repository: @gitaly_repo)
response = GitalyClient.call(@storage, :ref_service, :find_default_branch_name, request, timeout: GitalyClient.fast_timeout)
response = gitaly_client_call(@storage, :ref_service, :find_default_branch_name, request, timeout: GitalyClient.fast_timeout)
Gitlab::Git.branch_name(response.name)
end
def branch_names
request = Gitaly::FindAllBranchNamesRequest.new(repository: @gitaly_repo)
response = GitalyClient.call(@storage, :ref_service, :find_all_branch_names, request, timeout: GitalyClient.fast_timeout)
response = gitaly_client_call(@storage, :ref_service, :find_all_branch_names, request, timeout: GitalyClient.fast_timeout)
consume_refs_response(response) { |name| Gitlab::Git.branch_name(name) }
end
def tag_names
request = Gitaly::FindAllTagNamesRequest.new(repository: @gitaly_repo)
response = GitalyClient.call(@storage, :ref_service, :find_all_tag_names, request, timeout: GitalyClient.fast_timeout)
response = gitaly_client_call(@storage, :ref_service, :find_all_tag_names, request, timeout: GitalyClient.fast_timeout)
consume_refs_response(response) { |name| Gitlab::Git.tag_name(name) }
end
......@@ -74,7 +77,7 @@ def count_branch_names
def local_branches(sort_by: nil, pagination_params: nil)
request = Gitaly::FindLocalBranchesRequest.new(repository: @gitaly_repo, pagination_params: pagination_params)
request.sort_by = sort_local_branches_by_param(sort_by) if sort_by
response = GitalyClient.call(@storage, :ref_service, :find_local_branches, request, timeout: GitalyClient.fast_timeout)
response = gitaly_client_call(@storage, :ref_service, :find_local_branches, request, timeout: GitalyClient.fast_timeout)
consume_find_local_branches_response(response)
end
......@@ -82,13 +85,13 @@ def tags(sort_by: nil, pagination_params: nil)
request = Gitaly::FindAllTagsRequest.new(repository: @gitaly_repo, pagination_params: pagination_params)
request.sort_by = sort_tags_by_param(sort_by) if sort_by
response = GitalyClient.call(@storage, :ref_service, :find_all_tags, request, timeout: GitalyClient.medium_timeout)
response = gitaly_client_call(@storage, :ref_service, :find_all_tags, request, timeout: GitalyClient.medium_timeout)
consume_tags_response(response)
end
def ref_exists?(ref_name)
request = Gitaly::RefExistsRequest.new(repository: @gitaly_repo, ref: encode_binary(ref_name))
response = GitalyClient.call(@storage, :ref_service, :ref_exists, request, timeout: GitalyClient.fast_timeout)
response = gitaly_client_call(@storage, :ref_service, :ref_exists, request, timeout: GitalyClient.fast_timeout)
response.value
rescue GRPC::InvalidArgument => e
raise ArgumentError, e.message
......@@ -100,7 +103,7 @@ def find_branch(branch_name)
name: encode_binary(branch_name)
)
response = GitalyClient.call(@repository.storage, :ref_service, :find_branch, request, timeout: GitalyClient.medium_timeout)
response = gitaly_client_call(@repository.storage, :ref_service, :find_branch, request, timeout: GitalyClient.medium_timeout)
branch = response.branch
return unless branch
......@@ -116,7 +119,7 @@ def find_tag(tag_name)
tag_name: encode_binary(tag_name)
)
response = GitalyClient.call(@repository.storage, :ref_service, :find_tag, request, timeout: GitalyClient.medium_timeout)
response = gitaly_client_call(@repository.storage, :ref_service, :find_tag, request, timeout: GitalyClient.medium_timeout)
tag = response.tag
return unless tag
......@@ -140,7 +143,7 @@ def delete_refs(refs: [], except_with_prefixes: [])
except_with_prefix: except_with_prefixes.map { |r| encode_binary(r) }
)
response = GitalyClient.call(@repository.storage, :ref_service, :delete_refs, request, timeout: GitalyClient.medium_timeout)
response = gitaly_client_call(@repository.storage, :ref_service, :delete_refs, request, timeout: GitalyClient.medium_timeout)
raise Gitlab::Git::Repository::GitError, response.git_error if response.git_error.present?
rescue GRPC::BadStatus => e
......@@ -164,7 +167,7 @@ def tag_names_contains_sha(sha, limit: 0)
limit: limit
)
response = GitalyClient.call(@storage, :ref_service, :list_tag_names_containing_commit, request, timeout: GitalyClient.medium_timeout)
response = gitaly_client_call(@storage, :ref_service, :list_tag_names_containing_commit, request, timeout: GitalyClient.medium_timeout)
consume_ref_contains_sha_response(response, :tag_names)
end
......@@ -176,7 +179,7 @@ def branch_names_contains_sha(sha, limit: 0)
limit: limit
)
response = GitalyClient.call(@storage, :ref_service, :list_branch_names_containing_commit, request, timeout: GitalyClient.medium_timeout)
response = gitaly_client_call(@storage, :ref_service, :list_branch_names_containing_commit, request, timeout: GitalyClient.medium_timeout)
consume_ref_contains_sha_response(response, :branch_names)
end
......@@ -185,7 +188,7 @@ def get_tag_messages(tag_ids)
messages = Hash.new { |h, k| h[k] = +''.b }
current_tag_id = nil
response = GitalyClient.call(@storage, :ref_service, :get_tag_messages, request, timeout: GitalyClient.fast_timeout)
response = gitaly_client_call(@storage, :ref_service, :get_tag_messages, request, timeout: GitalyClient.fast_timeout)
response.each do |rpc_message|
current_tag_id = rpc_message.tag_id if rpc_message.tag_id.present?
......@@ -197,7 +200,7 @@ def get_tag_messages(tag_ids)
def get_tag_signatures(tag_ids)
request = Gitaly::GetTagSignaturesRequest.new(repository: @gitaly_repo, tag_revisions: tag_ids)
response = GitalyClient.call(@repository.storage, :ref_service, :get_tag_signatures, request, timeout: GitalyClient.fast_timeout)
response = gitaly_client_call(@repository.storage, :ref_service, :get_tag_signatures, request, timeout: GitalyClient.fast_timeout)
signatures = Hash.new { |h, k| h[k] = [+''.b, +''.b] }
current_tag_id = nil
......@@ -222,20 +225,20 @@ def list_refs(patterns = [Gitlab::Git::BRANCH_REF_PREFIX])
patterns: patterns
)
response = GitalyClient.call(@storage, :ref_service, :list_refs, request, timeout: GitalyClient.fast_timeout)
response = gitaly_client_call(@storage, :ref_service, :list_refs, request, timeout: GitalyClient.fast_timeout)
consume_list_refs_response(response)
end
def pack_refs
request = Gitaly::PackRefsRequest.new(repository: @gitaly_repo)
GitalyClient.call(@storage, :ref_service, :pack_refs, request, timeout: GitalyClient.long_timeout)
gitaly_client_call(@storage, :ref_service, :pack_refs, request, timeout: GitalyClient.long_timeout)
end
def find_refs_by_oid(oid:, limit:, ref_patterns: nil)
request = Gitaly::FindRefsByOIDRequest.new(repository: @gitaly_repo, sort_field: :refname, oid: oid, limit: limit, ref_patterns: ref_patterns)
response = GitalyClient.call(@storage, :ref_service, :find_refs_by_oid, request, timeout: GitalyClient.medium_timeout)
response = gitaly_client_call(@storage, :ref_service, :find_refs_by_oid, request, timeout: GitalyClient.medium_timeout)
response&.refs&.to_a
end
......
......@@ -4,6 +4,7 @@ module Gitlab
module GitalyClient
class RemoteService
include Gitlab::EncodingHelper
include WithFeatureFlagActors
MAX_MSG_SIZE = 128.kilobytes.freeze
......@@ -24,6 +25,8 @@ def initialize(repository)
@repository = repository
@gitaly_repo = repository.gitaly_repository
@storage = repository.storage
self.repository_actor = repository
end
def find_remote_root_ref(remote_url, authorization)
......@@ -31,7 +34,7 @@ def find_remote_root_ref(remote_url, authorization)
remote_url: remote_url,
http_authorization_header: authorization)
response = GitalyClient.call(@storage, :remote_service,
response = gitaly_client_call(@storage, :remote_service,
:find_remote_root_ref, request, timeout: GitalyClient.medium_timeout)
encode_utf8(response.ref)
......
......@@ -4,6 +4,7 @@ module Gitlab
module GitalyClient
class RepositoryService
include Gitlab::EncodingHelper
include WithFeatureFlagActors
MAX_MSG_SIZE = 128.kilobytes
......@@ -11,57 +12,59 @@ def initialize(repository)
@repository = repository
@gitaly_repo = repository.gitaly_repository
@storage = repository.storage
self.repository_actor = repository
end
def exists?
request = Gitaly::RepositoryExistsRequest.new(repository: @gitaly_repo)
response = GitalyClient.call(@storage, :repository_service, :repository_exists, request, timeout: GitalyClient.fast_timeout)
response = gitaly_client_call(@storage, :repository_service, :repository_exists, request, timeout: GitalyClient.fast_timeout)
response.exists
end
def optimize_repository
request = Gitaly::OptimizeRepositoryRequest.new(repository: @gitaly_repo)
GitalyClient.call(@storage, :repository_service, :optimize_repository, request, timeout: GitalyClient.long_timeout)
gitaly_client_call(@storage, :repository_service, :optimize_repository, request, timeout: GitalyClient.long_timeout)
end
def prune_unreachable_objects
request = Gitaly::PruneUnreachableObjectsRequest.new(repository: @gitaly_repo)
GitalyClient.call(@storage, :repository_service, :prune_unreachable_objects, request, timeout: GitalyClient.long_timeout)
gitaly_client_call(@storage, :repository_service, :prune_unreachable_objects, request, timeout: GitalyClient.long_timeout)
end
def garbage_collect(create_bitmap, prune:)
request = Gitaly::GarbageCollectRequest.new(repository: @gitaly_repo, create_bitmap: create_bitmap, prune: prune)
GitalyClient.call(@storage, :repository_service, :garbage_collect, request, timeout: GitalyClient.long_timeout)
gitaly_client_call(@storage, :repository_service, :garbage_collect, request, timeout: GitalyClient.long_timeout)
end
def repack_full(create_bitmap)
request = Gitaly::RepackFullRequest.new(repository: @gitaly_repo, create_bitmap: create_bitmap)
GitalyClient.call(@storage, :repository_service, :repack_full, request, timeout: GitalyClient.long_timeout)
gitaly_client_call(@storage, :repository_service, :repack_full, request, timeout: GitalyClient.long_timeout)
end
def repack_incremental
request = Gitaly::RepackIncrementalRequest.new(repository: @gitaly_repo)
GitalyClient.call(@storage, :repository_service, :repack_incremental, request, timeout: GitalyClient.long_timeout)
gitaly_client_call(@storage, :repository_service, :repack_incremental, request, timeout: GitalyClient.long_timeout)
end
def repository_size
request = Gitaly::RepositorySizeRequest.new(repository: @gitaly_repo)
response = GitalyClient.call(@storage, :repository_service, :repository_size, request, timeout: GitalyClient.long_timeout)
response = gitaly_client_call(@storage, :repository_service, :repository_size, request, timeout: GitalyClient.long_timeout)
response.size
end
def get_object_directory_size
request = Gitaly::GetObjectDirectorySizeRequest.new(repository: @gitaly_repo)
response = GitalyClient.call(@storage, :repository_service, :get_object_directory_size, request, timeout: GitalyClient.medium_timeout)
response = gitaly_client_call(@storage, :repository_service, :get_object_directory_size, request, timeout: GitalyClient.medium_timeout)
response.size
end
def apply_gitattributes(revision)
request = Gitaly::ApplyGitattributesRequest.new(repository: @gitaly_repo, revision: encode_binary(revision))
GitalyClient.call(@storage, :repository_service, :apply_gitattributes, request, timeout: GitalyClient.fast_timeout)
gitaly_client_call(@storage, :repository_service, :apply_gitattributes, request, timeout: GitalyClient.fast_timeout)
rescue GRPC::InvalidArgument => ex
raise Gitlab::Git::Repository::InvalidRef, ex
end
......@@ -69,7 +72,7 @@ def apply_gitattributes(revision)
def info_attributes
request = Gitaly::GetInfoAttributesRequest.new(repository: @gitaly_repo)
response = GitalyClient.call(@storage, :repository_service, :get_info_attributes, request, timeout: GitalyClient.fast_timeout)
response = gitaly_client_call(@storage, :repository_service, :get_info_attributes, request, timeout: GitalyClient.fast_timeout)
response.each_with_object([]) do |message, attributes|
attributes << message.attributes
end.join
......@@ -103,18 +106,18 @@ def fetch_remote(url, refmap:, ssh_auth:, forced:, no_tags:, timeout:, prune: tr
end
end
GitalyClient.call(@storage, :repository_service, :fetch_remote, request, timeout: GitalyClient.long_timeout)
gitaly_client_call(@storage, :repository_service, :fetch_remote, request, timeout: GitalyClient.long_timeout)
end
# rubocop: enable Metrics/ParameterLists
def create_repository(default_branch = nil)
request = Gitaly::CreateRepositoryRequest.new(repository: @gitaly_repo, default_branch: default_branch)
GitalyClient.call(@storage, :repository_service, :create_repository, request, timeout: GitalyClient.fast_timeout)
gitaly_client_call(@storage, :repository_service, :create_repository, request, timeout: GitalyClient.fast_timeout)
end
def has_local_branches?
request = Gitaly::HasLocalBranchesRequest.new(repository: @gitaly_repo)
response = GitalyClient.call(@storage, :repository_service, :has_local_branches, request, timeout: GitalyClient.fast_timeout)
response = gitaly_client_call(@storage, :repository_service, :has_local_branches, request, timeout: GitalyClient.fast_timeout)
response.value
end
......@@ -125,7 +128,7 @@ def find_merge_base(*revisions)
revisions: revisions.map { |r| encode_binary(r) }
)
response = GitalyClient.call(@storage, :repository_service, :find_merge_base, request, timeout: GitalyClient.fast_timeout)
response = gitaly_client_call(@storage, :repository_service, :find_merge_base, request, timeout: GitalyClient.fast_timeout)
response.base.presence
end
......@@ -135,7 +138,7 @@ def fork_repository(source_repository)
source_repository: source_repository.gitaly_repository
)
GitalyClient.call(
gitaly_client_call(
@storage,
:repository_service,
:create_fork,
......@@ -153,7 +156,7 @@ def import_repository(source, http_authorization_header: '', mirror: false)
mirror: mirror
)
GitalyClient.call(
gitaly_client_call(
@storage,
:repository_service,
:create_repository_from_url,
......@@ -170,7 +173,7 @@ def fetch_source_branch(source_repository, source_branch, local_ref)
target_ref: local_ref.b
)
response = GitalyClient.call(
response = gitaly_client_call(
@storage,
:repository_service,
:fetch_source_branch,
......@@ -184,7 +187,7 @@ def fetch_source_branch(source_repository, source_branch, local_ref)
def fsck
request = Gitaly::FsckRequest.new(repository: @gitaly_repo)
response = GitalyClient.call(@storage, :repository_service, :fsck, request, timeout: GitalyClient.long_timeout)
response = gitaly_client_call(@storage, :repository_service, :fsck, request, timeout: GitalyClient.long_timeout)
if response.error.empty?
["", 0]
......@@ -236,7 +239,7 @@ def create_from_snapshot(http_url, http_auth)
http_auth: http_auth
)
GitalyClient.call(
gitaly_client_call(
@storage,
:repository_service,
:create_repository_from_snapshot,
......@@ -253,11 +256,11 @@ def write_ref(ref_path, ref, old_ref)
)
request.old_revision = old_ref.b unless old_ref.nil?
GitalyClient.call(@storage, :repository_service, :write_ref, request, timeout: GitalyClient.fast_timeout)
gitaly_client_call(@storage, :repository_service, :write_ref, request, timeout: GitalyClient.fast_timeout)
end
def set_full_path(path)
GitalyClient.call(
gitaly_client_call(
@storage,
:repository_service,
:set_full_path,
......@@ -272,7 +275,7 @@ def set_full_path(path)
end
def full_path
response = GitalyClient.call(
response = gitaly_client_call(
@storage,
:repository_service,
:full_path,
......@@ -286,12 +289,12 @@ def full_path
def find_license
request = Gitaly::FindLicenseRequest.new(repository: @gitaly_repo)
GitalyClient.call(@storage, :repository_service, :find_license, request, timeout: GitalyClient.medium_timeout)
gitaly_client_call(@storage, :repository_service, :find_license, request, timeout: GitalyClient.medium_timeout)
end
def calculate_checksum
request = Gitaly::CalculateChecksumRequest.new(repository: @gitaly_repo)
response = GitalyClient.call(@storage, :repository_service, :calculate_checksum, request, timeout: GitalyClient.fast_timeout)
response = gitaly_client_call(@storage, :repository_service, :calculate_checksum, request, timeout: GitalyClient.fast_timeout)
response.checksum.presence
rescue GRPC::DataLoss => e
raise Gitlab::Git::Repository::InvalidRepository, e
......@@ -300,23 +303,23 @@ def calculate_checksum
def raw_changes_between(from, to)
request = Gitaly::GetRawChangesRequest.new(repository: @gitaly_repo, from_revision: from, to_revision: to)
GitalyClient.call(@storage, :repository_service, :get_raw_changes, request, timeout: GitalyClient.fast_timeout)
gitaly_client_call(@storage, :repository_service, :get_raw_changes, request, timeout: GitalyClient.fast_timeout)
end
def search_files_by_name(ref, query, limit: 0, offset: 0)
request = Gitaly::SearchFilesByNameRequest.new(repository: @gitaly_repo, ref: ref, query: query, limit: limit, offset: offset)
GitalyClient.call(@storage, :repository_service, :search_files_by_name, request, timeout: GitalyClient.fast_timeout).flat_map(&:files)
gitaly_client_call(@storage, :repository_service, :search_files_by_name, request, timeout: GitalyClient.fast_timeout).flat_map(&:files)
end
def search_files_by_content(ref, query, options = {})
request = Gitaly::SearchFilesByContentRequest.new(repository: @gitaly_repo, ref: ref, query: query)
response = GitalyClient.call(@storage, :repository_service, :search_files_by_content, request, timeout: GitalyClient.default_timeout)
response = gitaly_client_call(@storage, :repository_service, :search_files_by_content, request, timeout: GitalyClient.default_timeout)
search_results_from_response(response, options)
end
def search_files_by_regexp(ref, filter, limit: 0, offset: 0)
request = Gitaly::SearchFilesByNameRequest.new(repository: @gitaly_repo, ref: ref, query: '.', filter: filter, limit: limit, offset: offset)
GitalyClient.call(@storage, :repository_service, :search_files_by_name, request, timeout: GitalyClient.fast_timeout).flat_map(&:files)
gitaly_client_call(@storage, :repository_service, :search_files_by_name, request, timeout: GitalyClient.fast_timeout).flat_map(&:files)
end
def disconnect_alternates
......@@ -324,19 +327,19 @@ def disconnect_alternates
repository: @gitaly_repo
)
GitalyClient.call(@storage, :object_pool_service, :disconnect_git_alternates, request, timeout: GitalyClient.long_timeout)
gitaly_client_call(@storage, :object_pool_service, :disconnect_git_alternates, request, timeout: GitalyClient.long_timeout)
end
def rename(relative_path)
request = Gitaly::RenameRepositoryRequest.new(repository: @gitaly_repo, relative_path: relative_path)
GitalyClient.call(@storage, :repository_service, :rename_repository, request, timeout: GitalyClient.fast_timeout)
gitaly_client_call(@storage, :repository_service, :rename_repository, request, timeout: GitalyClient.fast_timeout)
end
def remove
request = Gitaly::RemoveRepositoryRequest.new(repository: @gitaly_repo)
GitalyClient.call(@storage, :repository_service, :remove_repository, request, timeout: GitalyClient.long_timeout)
gitaly_client_call(@storage, :repository_service, :remove_repository, request, timeout: GitalyClient.long_timeout)
end
def replicate(source_repository)
......@@ -345,7 +348,7 @@ def replicate(source_repository)
source: source_repository.gitaly_repository
)
GitalyClient.call(
gitaly_client_call(
@storage,
:repository_service,
:replicate_repository,
......@@ -383,7 +386,7 @@ def search_results_from_response(gitaly_response, options = {})
def gitaly_fetch_stream_to_file(save_path, rpc_name, request_class, timeout)
request = request_class.new(repository: @gitaly_repo)
response = GitalyClient.call(
response = gitaly_client_call(
@storage,
:repository_service,
rpc_name,
......@@ -416,7 +419,7 @@ def gitaly_repo_stream_request(file_path, rpc_name, request_class, timeout)
end
end
GitalyClient.call(
gitaly_client_call(
@storage,
:repository_service,
rpc_name,
......
......@@ -13,6 +13,41 @@ module WithFeatureFlagActors
attr_accessor :repository_actor
# gitaly_client_call performs Gitaly calls including collected feature flag actors. The actors are retrieved
# from repository actor and memoized. The service must set `self.repository_actor = a_repository` beforehand.
def gitaly_client_call(*args, **kargs)
return GitalyClient.call(*args, **kargs) unless actors_aware_gitaly_calls?
unless repository_actor
Gitlab::ErrorTracking.track_and_raise_for_dev_exception(
Feature::InvalidFeatureFlagError.new("gitaly_client_call called without setting repository_actor")
)
end
GitalyClient.with_feature_flag_actors(
repository: repository_actor,
user: user_actor,
project: project_actor,
group: group_actor
) do
GitalyClient.call(*args, **kargs)
end
end
# gitaly_feature_flag_actors returns a hash of actors implied from input repository. If actors_aware_gitaly_calls
# flag is not on, this method returns an empty hash.
def gitaly_feature_flag_actors(repository)
return {} unless actors_aware_gitaly_calls?
container = find_repository_container(repository)
{
repository: repository,
user: Feature::Gitaly.user_actor,
project: Feature::Gitaly.project_actor(container),
group: Feature::Gitaly.group_actor(container)
}
end
# Use actor here means the user who originally perform the action. It is collected from ApplicationContext. As
# this information is widely propagated in all entry points, User actor should be available everywhere, even in
# background jobs.
......@@ -35,35 +70,32 @@ def group_actor
end
end
def gitaly_client_call(*args, **kargs)
if Feature.enabled?(:actors_aware_gitaly_calls)
# The order of actors here is significant. Percentage-based actor selection may not work as expected if this
# order changes.
GitalyClient.with_feature_flag_actors(
repository: repository_actor,
user: user_actor,
project: project_actor,
group: group_actor
) do
GitalyClient.call(*args, **kargs)
end
else
GitalyClient.call(*args, **kargs)
end
end
private
def repository_container
strong_memoize(:repository_container) do
next if repository_actor&.gl_repository.blank?
find_repository_container(repository_actor)
end
end
def find_repository_container(repository)
return if repository&.gl_repository.blank?
if repository_actor.container.nil?
identifier = Gitlab::GlRepository::Identifier.parse(repository_actor.gl_repository)
if repository.container.nil?
begin
identifier = Gitlab::GlRepository::Identifier.parse(repository.gl_repository)
identifier.container
else
repository_actor.container
rescue Gitlab::GlRepository::Identifier::InvalidIdentifier
nil
end
else
repository.container
end
end
def actors_aware_gitaly_calls?
Feature.enabled?(:actors_aware_gitaly_calls)
end
end
end
end
......
......@@ -190,6 +190,23 @@
)
expect(result).to be(call_result)
end
context 'when call without repository_actor' do
before do
allow(service).to receive(:repository_actor).and_return(nil)
allow(Gitlab::ErrorTracking).to receive(:track_and_raise_for_dev_exception).and_call_original
end
it 'calls error tracking track_and_raise_for_dev_exception' do
expect do
service.gitaly_client_call(call_arg_1, call_arg_2, karg: call_arg_3)
end.to raise_error /gitaly_client_call called without setting repository_actor/
expect(Gitlab::ErrorTracking).to have_received(:track_and_raise_for_dev_exception).with(
be_a(Feature::InvalidFeatureFlagError)
)
end
end
end
context 'when actors_aware_gitaly_calls not enabled' do
......@@ -206,5 +223,53 @@
expect(result).to be(call_result)
end
end
describe '#gitaly_feature_flag_actors' do
let_it_be(:project) { create(:project) }
let(:repository_actor) { project.repository }
context 'when actors_aware_gitaly_calls flag is enabled' do
let(:user_actor) { instance_double(::User) }
let(:project_actor) { instance_double(Project) }
let(:group_actor) { instance_double(Group) }
before do
stub_feature_flags(actors_aware_gitaly_calls: true)
allow(Feature::Gitaly).to receive(:user_actor).and_return(user_actor)
allow(Feature::Gitaly).to receive(:project_actor).with(project).and_return(project_actor)
allow(Feature::Gitaly).to receive(:group_actor).with(project).and_return(group_actor)
end
it 'returns a hash with collected feature flag actors' do
result = service.gitaly_feature_flag_actors(repository_actor)
expect(result).to eql(
repository: repository_actor,
user: user_actor,
project: project_actor,
group: group_actor
)
expect(Feature::Gitaly).to have_received(:user_actor).with(no_args)
expect(Feature::Gitaly).to have_received(:project_actor).with(project)
expect(Feature::Gitaly).to have_received(:group_actor).with(project)
end
end
context 'when actors_aware_gitaly_calls not enabled' do
before do
stub_feature_flags(actors_aware_gitaly_calls: false)
end
it 'returns an empty hash' do
expect(Feature::Gitaly).not_to receive(:user_actor)
expect(Feature::Gitaly).not_to receive(:project_actor)
expect(Feature::Gitaly).not_to receive(:group_actor)
result = service.gitaly_feature_flag_actors(repository_actor)
expect(result).to eql({})
end
end
end
end
end
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment