Commit da0dade7 authored by Martin Blanchard's avatar Martin Blanchard
Browse files

storage/remote.py: Port to new CAS downloader helper

#79
parent 63832b70
Loading
Loading
Loading
Loading
+6 −24
Original line number Original line Diff line number Diff line
@@ -23,14 +23,10 @@ Forwwards storage requests to a remote storage.
import io
import io
import logging
import logging


import grpc
from buildgrid.client.cas import download, upload

from buildgrid.client.cas import upload
from buildgrid._protos.google.bytestream import bytestream_pb2_grpc
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
from buildgrid._protos.google.rpc import code_pb2
from buildgrid._protos.google.rpc import code_pb2
from buildgrid._protos.google.rpc import status_pb2
from buildgrid._protos.google.rpc import status_pb2
from buildgrid.utils import gen_fetch_blob
from buildgrid.settings import HASH
from buildgrid.settings import HASH


from .storage_abc import StorageABC
from .storage_abc import StorageABC
@@ -44,7 +40,6 @@ class RemoteStorage(StorageABC):
        self.instance_name = instance_name
        self.instance_name = instance_name
        self.channel = channel
        self.channel = channel


        self._stub_bs = bytestream_pb2_grpc.ByteStreamStub(channel)
        self._stub_cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(channel)
        self._stub_cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(channel)


    def has_blob(self, digest):
    def has_blob(self, digest):
@@ -53,24 +48,11 @@ class RemoteStorage(StorageABC):
        return False
        return False


    def get_blob(self, digest):
    def get_blob(self, digest):
        try:
        with download(self.channel, instance=self.instance_name) as cas:
            fetched_data = io.BytesIO()
            blob = cas.get_blob(digest)
            length = 0
            if blob is not None:

                return io.BytesIO(cas.get_blob(digest))
            for data in gen_fetch_blob(self._stub_bs, digest, self.instance_name):
                length += fetched_data.write(data)

            assert digest.size_bytes == length
            fetched_data.seek(0)
            return fetched_data

        except grpc.RpcError as e:
            if e.code() == grpc.StatusCode.NOT_FOUND:
                pass
            else:
            else:
                self.logger.error(e.details())
                raise

                return None
                return None


    def begin_write(self, digest):
    def begin_write(self, digest):