Commit 5163c28a authored by finn

Adding `remote` implementation of storage_abc

parent dd2e8d60
# Copyright (C) 2018 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
RemoteStorage
==================
Forwwards storage requests to a remote storage.
"""
import io
import logging

import grpc

from buildgrid.utils import gen_fetch_blob, gen_write_request_blob
from buildgrid._protos.google.bytestream import bytestream_pb2_grpc
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc

from .storage_abc import StorageABC


class RemoteStorage(StorageABC):

    def __init__(self, channel, instance_name=""):
        self.logger = logging.getLogger(__name__)
        self._instance_name = instance_name
        self._stub_bs = bytestream_pb2_grpc.ByteStreamStub(channel)
        self._stub_cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(channel)

    def has_blob(self, digest):
        try:
            if self.get_blob(digest):
                return True
        except grpc.RpcError as e:
            # A missing blob is an expected outcome, not an error.
            if e.code() != grpc.StatusCode.NOT_FOUND:
                raise
        return False

    def get_blob(self, digest):
        fetched_data = io.BytesIO()
        length = 0
        for data in gen_fetch_blob(self._stub_bs, digest, self._instance_name):
            length += fetched_data.write(data)

        if length:
            assert digest.size_bytes == length
            fetched_data.seek(0)
            return fetched_data

        return None

    def begin_write(self, digest):
        # commit_write() streams exactly digest.size_bytes from the session,
        # so an empty in-memory buffer is all that is needed here; there is
        # no reason to pre-seed it with the serialized digest.
        return io.BytesIO()

    def commit_write(self, digest, write_session):
        write_session.seek(0)
        # ByteStream.Write is a client-streaming RPC: the stub takes the whole
        # request iterator in one call, not one WriteRequest per call.
        self._stub_bs.Write(gen_write_request_blob(write_session, digest, self._instance_name))

    def missing_blobs(self, blobs):
        request = remote_execution_pb2.FindMissingBlobsRequest(instance_name=self._instance_name)
        for blob in blobs:
            request_digest = request.blob_digests.add()
            request_digest.hash = blob.hash
            request_digest.size_bytes = blob.size_bytes

        response = self._stub_cas.FindMissingBlobs(request)
        return list(response.missing_blob_digests)

    def bulk_update_blobs(self, blobs):
        request = remote_execution_pb2.BatchUpdateBlobsRequest(instance_name=self._instance_name)
        for digest, data in blobs:
            reqs = request.requests.add()
            reqs.digest.CopyFrom(digest)
            reqs.data = data

        response = self._stub_cas.BatchUpdateBlobs(request)
        responses = response.responses

        # Check that everything was sent back, even if the order changed.
        # Note: list.sort() returns None, so sorted() must be used here.
        assert sorted([x.digest for x in request.requests], key=lambda d: d.hash) == \
            sorted([x.digest for x in responses], key=lambda d: d.hash)

        return [x.status for x in responses]
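
A minimal usage sketch (illustrative, not part of this commit), assuming a CAS
server reachable at localhost:50051 with instance name "main"; the endpoint,
instance name, and payload below are all placeholders:

import grpc

from buildgrid.settings import HASH
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2

channel = grpc.insecure_channel('localhost:50051')  # placeholder endpoint
storage = RemoteStorage(channel, instance_name='main')

# Upload a blob, then read it back through the same storage object.
data = b'example blob'  # placeholder payload
digest = remote_execution_pb2.Digest(hash=HASH(data).hexdigest(),
                                     size_bytes=len(data))
storage.bulk_update_blobs([(digest, data)])
assert storage.has_blob(digest)
assert storage.get_blob(digest).read() == data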
buildgrid/utils.py
@@ -14,6 +14,7 @@
import os
import uuid
from buildgrid.settings import HASH
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
@@ -27,10 +28,37 @@ def gen_fetch_blob(stub, digest, instance_name=""):
    # ByteStream read resource names follow '{instance_name}/blobs/{hash}/{size}'.
    resource_name = os.path.join(instance_name, 'blobs', digest.hash, str(digest.size_bytes))
    request = bytestream_pb2.ReadRequest(resource_name=resource_name,
                                         read_offset=0)

    for response in stub.Read(request):
        yield response.data


def gen_write_request_blob(digest_bytes, digest, instance_name=""):
    """Generates a stream of ByteStream write requests for a blob.

    Reads digest.size_bytes from the file-like object digest_bytes and
    yields WriteRequest messages in chunks of at most 64 KiB.
    """
    resource_name = os.path.join(instance_name, 'uploads', str(uuid.uuid4()),
                                 'blobs', digest.hash, str(digest.size_bytes))

    offset = 0
    finished = False
    remaining = digest.size_bytes

    while not finished:
        chunk_size = min(remaining, 64 * 1024)
        remaining -= chunk_size
        finished = remaining <= 0

        request = bytestream_pb2.WriteRequest()
        request.resource_name = resource_name
        request.write_offset = offset
        request.data = digest_bytes.read(chunk_size)
        request.finish_write = finished

        yield request

        offset += chunk_size
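
A short sketch of driving the generator directly (illustrative; commit_write
above does the same through RemoteStorage), assuming a ByteStream server at
localhost:50051; the endpoint and payload are placeholders:

import io

import grpc

from buildgrid.settings import HASH
from buildgrid._protos.google.bytestream import bytestream_pb2_grpc
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2

data = b'example blob'  # placeholder payload
digest = remote_execution_pb2.Digest(hash=HASH(data).hexdigest(),
                                     size_bytes=len(data))

stub = bytestream_pb2_grpc.ByteStreamStub(grpc.insecure_channel('localhost:50051'))
# Client-streaming call: hand the whole generator to the stub in one go.
response = stub.Write(gen_write_request_blob(io.BytesIO(data), digest))
assert response.committed_size == digest.size_bytes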


def write_fetch_directory(directory, stub, digest, instance_name=""):
    """Given a directory digest, fetches files and writes them to a directory.
    """
......