Commit b3ffcdc8 authored by Jürg Billeter's avatar Jürg Billeter
Browse files

Merge branch 'juerg/cas-batch' into 'master'

_artifactcache/casserver.py: Implement BatchReadBlobs

Closes #632

See merge request !785
parents e0bb71b2 1a7fb3cb
Loading
Loading
Loading
Loading
Loading
+49 −0
Original line number Diff line number Diff line
@@ -38,6 +38,10 @@ from .._context import Context
from .cascache import CASCache


# The default limit for gRPC messages is 4 MiB
_MAX_BATCH_TOTAL_SIZE_BYTES = 4 * 1024 * 1024


# Trying to push an artifact that is too large
class ArtifactTooLargeException(Exception):
    pass
@@ -67,6 +71,9 @@ def create_server(repo, *, enable_push):
    remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server(
        _ContentAddressableStorageServicer(artifactcache), server)

    remote_execution_pb2_grpc.add_CapabilitiesServicer_to_server(
        _CapabilitiesServicer(), server)

    buildstream_pb2_grpc.add_ReferenceStorageServicer_to_server(
        _ReferenceStorageServicer(artifactcache, enable_push=enable_push), server)

@@ -229,6 +236,48 @@ class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddres
                d.size_bytes = digest.size_bytes
        return response

    def BatchReadBlobs(self, request, context):
        response = remote_execution_pb2.BatchReadBlobsResponse()
        batch_size = 0

        for digest in request.digests:
            batch_size += digest.size_bytes
            if batch_size > _MAX_BATCH_TOTAL_SIZE_BYTES:
                context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
                return response

            blob_response = response.responses.add()
            blob_response.digest.hash = digest.hash
            blob_response.digest.size_bytes = digest.size_bytes
            try:
                with open(self.cas.objpath(digest), 'rb') as f:
                    if os.fstat(f.fileno()).st_size != digest.size_bytes:
                        blob_response.status.code = grpc.StatusCode.NOT_FOUND
                        continue

                    blob_response.data = f.read(digest.size_bytes)
            except FileNotFoundError:
                blob_response.status.code = grpc.StatusCode.NOT_FOUND

        return response


class _CapabilitiesServicer(remote_execution_pb2_grpc.CapabilitiesServicer):
    def GetCapabilities(self, request, context):
        response = remote_execution_pb2.ServerCapabilities()

        cache_capabilities = response.cache_capabilities
        cache_capabilities.digest_function.append(remote_execution_pb2.SHA256)
        cache_capabilities.action_cache_update_capabilities.update_enabled = False
        cache_capabilities.max_batch_total_size_bytes = _MAX_BATCH_TOTAL_SIZE_BYTES
        cache_capabilities.symlink_absolute_path_strategy = remote_execution_pb2.CacheCapabilities.ALLOWED

        response.deprecated_api_version.major = 2
        response.low_api_version.major = 2
        response.high_api_version.major = 2

        return response


class _ReferenceStorageServicer(buildstream_pb2_grpc.ReferenceStorageServicer):
    def __init__(self, cas, *, enable_push):