Commit b251c674 authored by finn

Separated out CAS instance from service

parent 5163c28a
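
In outline, this commit splits the protocol-facing gRPC servicers from the storage-facing request logic: each servicer now holds a mapping of instance names to instance objects and dispatches every request by its instance name, as the two files below show. A minimal sketch of the dispatch pattern (the dictionary contents are illustrative, and `storage` stands for any CAS storage backend):

# Illustrative only: servicers are built from {instance_name: instance} maps,
# with "" as the unnamed default instance.
instances = {"": ContentAddressableStorageInstance(storage)}
service = ContentAddressableStorageService(instances)
# Each RPC then resolves request.instance_name to an instance and delegates.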
# Copyright (C) 2018 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Storage Instances
=========
Instances of CAS and ByteStream
"""
from buildgrid._protos.google.bytestream import bytestream_pb2
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2 as re_pb2
from .._exceptions import InvalidArgumentError, NotFoundError, OutOfRangeError
from ...settings import HASH


class ContentAddressableStorageInstance:

    def __init__(self, storage):
        self._storage = storage

    def find_missing_blobs(self, blob_digests):
        storage = self._storage
        return re_pb2.FindMissingBlobsResponse(
            missing_blob_digests=storage.missing_blobs(blob_digests))

    def batch_update_blobs(self, requests):
        storage = self._storage
        store = []
        for request_proto in requests:
            store.append((request_proto.digest, request_proto.data))

        response = re_pb2.BatchUpdateBlobsResponse()
        statuses = storage.bulk_update_blobs(store)

        for (digest, _), status in zip(store, statuses):
            response_proto = response.responses.add()
            response_proto.digest.CopyFrom(digest)
            response_proto.status.CopyFrom(status)

        return response


class ByteStreamInstance:

    BLOCK_SIZE = 1 * 1024 * 1024  # 1 MB block size

    def __init__(self, storage):
        self._storage = storage

    def read(self, path, read_offset, read_limit):
        storage = self._storage

        if len(path) == 3:
            path = [""] + path

        # Parse/verify resource name.
        # Read resource names look like "[instance/]blobs/abc123hash/99".
        digest = re_pb2.Digest(hash=path[2], size_bytes=int(path[3]))

        # Check the given read offset and limit.
        if read_offset < 0 or read_offset > digest.size_bytes:
            raise OutOfRangeError("Read offset out of range")
        elif read_limit == 0:
            bytes_remaining = digest.size_bytes - read_offset
        elif read_limit > 0:
            bytes_remaining = read_limit
        else:
            raise InvalidArgumentError("Negative read_limit is invalid")

        # Read the blob from storage and send its contents to the client.
        result = storage.get_blob(digest)
        if result is None:
            raise NotFoundError("Blob not found")
        elif result.seekable():
            result.seek(read_offset)
        else:
            # Non-seekable streams: consume and discard the first read_offset bytes.
            result.read(read_offset)

        while bytes_remaining > 0:
            yield bytestream_pb2.ReadResponse(
                data=result.read(min(self.BLOCK_SIZE, bytes_remaining)))
            bytes_remaining -= self.BLOCK_SIZE

    def write(self, requests):
        storage = self._storage

        first_request = next(requests)
        path = first_request.resource_name.split("/")
        if path[0] == "uploads":
            path = [""] + path

        if len(path) < 6 or path[1] != "uploads" or path[3] != "blobs" or not path[5].isdigit():
            raise InvalidArgumentError("Invalid resource name")

        digest = re_pb2.Digest(hash=path[4], size_bytes=int(path[5]))

        # Start the write session and write the first request's data.
        write_session = storage.begin_write(digest)
        write_session.write(first_request.data)
        hash_ = HASH(first_request.data)
        bytes_written = len(first_request.data)
        finished = first_request.finish_write

        # Handle subsequent write requests.
        for request in requests:
            if finished:
                raise InvalidArgumentError("Write request sent after write finished")
            elif request.write_offset != bytes_written:
                raise InvalidArgumentError("Invalid write offset")
            elif request.resource_name and request.resource_name != first_request.resource_name:
                raise InvalidArgumentError("Resource name changed mid-write")

            finished = request.finish_write
            bytes_written += len(request.data)
            if bytes_written > digest.size_bytes:
                raise InvalidArgumentError("Wrote too much data to blob")

            write_session.write(request.data)
            hash_.update(request.data)

        # Check that the data matches the provided digest.
        if bytes_written != digest.size_bytes or not finished:
            raise NotImplementedError("Cannot close stream before finishing write")
        elif hash_.hexdigest() != digest.hash:
            raise InvalidArgumentError("Data does not match hash")

        storage.commit_write(digest, write_session)
        return bytestream_pb2.WriteResponse(committed_size=bytes_written)
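
Before the diff to the servicer module, a brief hypothetical driving of the two instance classes above, reusing this module's imports (`re_pb2`, `bytestream_pb2`, `HASH`); `InMemoryStorage` is a stand-in for any backend providing the storage methods used above (`missing_blobs`, `bulk_update_blobs`, `get_blob`, `begin_write`, `commit_write`):

import uuid

# CAS instance: a blob is reported missing until it is uploaded.
storage = InMemoryStorage()  # stand-in backend, not part of this commit
cas = ContentAddressableStorageInstance(storage)

data = b"hello"
digest = re_pb2.Digest(hash=HASH(data).hexdigest(), size_bytes=len(data))
assert digest in cas.find_missing_blobs([digest]).missing_blob_digests


def chunk_write_requests(instance_name, digest, data, chunk_size=1024 * 1024):
    # Hypothetical helper: yields WriteRequests with contiguous offsets and
    # finish_write set on the last chunk, as write() requires; assumes
    # non-empty data.
    prefix = instance_name + "/" if instance_name else ""
    resource_name = "{}uploads/{}/blobs/{}/{}".format(
        prefix, uuid.uuid4(), digest.hash, digest.size_bytes)
    for offset in range(0, len(data), chunk_size):
        chunk = data[offset:offset + chunk_size]
        yield bytestream_pb2.WriteRequest(
            resource_name=resource_name if offset == 0 else "",
            write_offset=offset,
            data=chunk,
            finish_write=offset + len(chunk) == len(data))


# ByteStream instance: chunked write, then read back and verify.
bytestream = ByteStreamInstance(storage)
response = bytestream.write(chunk_write_requests("", digest, data))
assert response.committed_size == digest.size_bytes

path = "blobs/{}/{}".format(digest.hash, digest.size_bytes).split("/")
blob = b"".join(r.data for r in bytestream.read(path, read_offset=0, read_limit=0))
assert blob == data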
@@ -21,131 +21,128 @@ Implements the Content Addressable Storage API and ByteStream API.
"""
from itertools import tee
import logging
import grpc
from buildgrid._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2 as re_pb2
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2_grpc as re_pb2_grpc
from ...settings import HASH
from .._exceptions import InvalidArgumentError, NotFoundError, OutOfRangeError
 
 
 class ContentAddressableStorageService(re_pb2_grpc.ContentAddressableStorageServicer):
 
-    def __init__(self, storage):
-        self._storage = storage
+    def __init__(self, instances):
+        self.logger = logging.getLogger(__name__)
+        self._instances = instances
 
     def FindMissingBlobs(self, request, context):
-        # Only one instance for now.
-        storage = self._storage
-        return re_pb2.FindMissingBlobsResponse(
-            missing_blob_digests=storage.missing_blobs(request.blob_digests))
+        try:
+            instance = self._get_instance(request.instance_name)
+            return instance.find_missing_blobs(request.blob_digests)
+        except InvalidArgumentError as e:
+            self.logger.error(e)
+            context.set_details(str(e))
+            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
+            return re_pb2.FindMissingBlobsResponse()
 
     def BatchUpdateBlobs(self, request, context):
-        # Only one instance for now.
-        storage = self._storage
-        requests = []
-        for request_proto in request.requests:
-            requests.append((request_proto.digest, request_proto.data))
-        response = re_pb2.BatchUpdateBlobsResponse()
-        for (digest, _), status in zip(requests, storage.bulk_update_blobs(requests)):
-            response_proto = response.responses.add()
-            response_proto.digest.CopyFrom(digest)
-            response_proto.status.CopyFrom(status)
-        return response
+        try:
+            instance = self._get_instance(request.instance_name)
+            return instance.batch_update_blobs(request.requests)
+        except InvalidArgumentError as e:
+            self.logger.error(e)
+            context.set_details(str(e))
+            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
+            return re_pb2.BatchUpdateBlobsResponse()
+
+    def _get_instance(self, instance_name):
+        try:
+            return self._instances[instance_name]
+        except KeyError:
+            raise InvalidArgumentError("Invalid instance name: {}".format(instance_name))
 
 
-class ByteStreamService(bytestream_pb2_grpc.ByteStreamServicer):
-
-    BLOCK_SIZE = 1 * 1024 * 1024  # 1 MB block size
+class ByteStreamService(bytestream_pb2_grpc.ByteStreamServicer):
 
-    def __init__(self, storage):
-        self._storage = storage
+    def __init__(self, instances):
+        self.logger = logging.getLogger(__name__)
+        self._instances = instances
     def Read(self, request, context):
-        # Only one instance for now.
-        storage = self._storage
-        # Parse/verify resource name.
-        # Read resource names look like "[instance/]blobs/abc123hash/99".
-        path = request.resource_name.split("/")
-        if len(path) == 3:
-            path = [""] + path
-        if len(path) != 4 or path[1] != "blobs" or not path[3].isdigit():
-            context.abort(grpc.StatusCode.NOT_FOUND, "Invalid resource name")
-        # instance_name = path[0]
-        digest = re_pb2.Digest(hash=path[2], size_bytes=int(path[3]))
-        # Check the given read offset and limit.
-        if request.read_offset < 0 or request.read_offset > digest.size_bytes:
-            context.abort(grpc.StatusCode.OUT_OF_RANGE, "Read offset out of range")
-        elif request.read_limit == 0:
-            bytes_remaining = digest.size_bytes - request.read_offset
-        elif request.read_limit > 0:
-            bytes_remaining = request.read_limit
-        else:
-            context.abort(grpc.StatusCode.INVALID_ARGUMENT, "Negative read_limit is invalid")
-        # Read the blob from storage and send its contents to the client.
-        result = storage.get_blob(digest)
-        if result is None:
-            context.abort(grpc.StatusCode.NOT_FOUND, "Blob not found")
-        elif result.seekable():
-            result.seek(request.read_offset)
-        else:
-            result.read(request.read_offset)
-        while bytes_remaining > 0:
-            yield bytestream_pb2.ReadResponse(
-                data=result.read(min(self.BLOCK_SIZE, bytes_remaining)))
-            bytes_remaining -= self.BLOCK_SIZE
-
-    def Write(self, request_iterator, context):
-        # Only one instance for now.
-        storage = self._storage
-        requests = iter(request_iterator)
-        first_request = next(requests)
-        if first_request.write_offset != 0:
-            context.abort(grpc.StatusCode.UNIMPLEMENTED, "Nonzero write offset is unsupported")
-        # Parse/verify resource name.
-        # Write resource names look like "[instance/]uploads/SOME-GUID/blobs/abc123hash/99".
-        path = first_request.resource_name.split("/")
-        if path[0] == "uploads":
-            path = [""] + path
-        if len(path) < 6 or path[1] != "uploads" or path[3] != "blobs" or not path[5].isdigit():
-            context.abort(grpc.StatusCode.INVALID_ARGUMENT, "Invalid resource name")
-        # instance_name = path[0]
-        digest = re_pb2.Digest(hash=path[4], size_bytes=int(path[5]))
-        # Start the write session and write the first request's data.
-        write_session = storage.begin_write(digest)
-        write_session.write(first_request.data)
-        hash_ = HASH(first_request.data)
-        bytes_written = len(first_request.data)
-        done = first_request.finish_write
-        # Handle subsequent write requests.
-        for request in requests:
-            if done:
-                context.abort(grpc.StatusCode.INVALID_ARGUMENT,
-                              "Write request sent after write finished")
-            elif request.write_offset != bytes_written:
-                context.abort(grpc.StatusCode.INVALID_ARGUMENT, "Invalid write offset")
-            elif request.resource_name and request.resource_name != first_request.resource_name:
-                context.abort(grpc.StatusCode.INVALID_ARGUMENT, "Resource name changed mid-write")
-            done = request.finish_write
-            bytes_written += len(request.data)
-            if bytes_written > digest.size_bytes:
-                context.abort(grpc.StatusCode.INVALID_ARGUMENT, "Wrote too much data to blob")
-            write_session.write(request.data)
-            hash_.update(request.data)
-        # Check that the data matches the provided digest.
-        if bytes_written != digest.size_bytes or not done:
-            context.abort(grpc.StatusCode.UNIMPLEMENTED,
-                          "Cannot close stream before finishing write")
-        elif hash_.hexdigest() != digest.hash:
-            context.abort(grpc.StatusCode.INVALID_ARGUMENT, "Data does not match hash")
-        storage.commit_write(digest, write_session)
-        return bytestream_pb2.WriteResponse(committed_size=bytes_written)
+        try:
+            path = request.resource_name.split("/")
+            instance_name = path[0]
+            if instance_name == "blobs":
+                # TODO: Make default if no instance_name
+                instance_name = ""
+            instance = self._get_instance(instance_name)
+            yield from instance.read(path,
+                                     request.read_offset,
+                                     request.read_limit)
+        except InvalidArgumentError as e:
+            self.logger.error(e)
+            context.set_details(str(e))
+            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
+            yield bytestream_pb2.ReadResponse()
+        except NotFoundError as e:
+            self.logger.error(e)
+            context.set_details(str(e))
+            context.set_code(grpc.StatusCode.NOT_FOUND)
+            yield bytestream_pb2.ReadResponse()
+        except OutOfRangeError as e:
+            self.logger.error(e)
+            context.set_details(str(e))
+            context.set_code(grpc.StatusCode.OUT_OF_RANGE)
+            yield bytestream_pb2.ReadResponse()
+
+    def Write(self, requests, context):
+        try:
+            requests, request_probe = tee(requests, 2)
+            first_request = next(request_probe)
+            path = first_request.resource_name.split("/")
+            instance_name = path[0]
+            if instance_name == "uploads":
+                # TODO: Make default if no instance_name
+                instance_name = ""
+            instance = self._get_instance(instance_name)
+            return instance.write(requests)
+        except NotImplementedError as e:
+            self.logger.error(e)
+            context.set_details(str(e))
+            context.set_code(grpc.StatusCode.UNIMPLEMENTED)
+        except InvalidArgumentError as e:
+            self.logger.error(e)
+            context.set_details(str(e))
+            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
+        except NotFoundError as e:
+            self.logger.error(e)
+            context.set_details(str(e))
+            context.set_code(grpc.StatusCode.NOT_FOUND)
+        return bytestream_pb2.WriteResponse()
+
+    def _get_instance(self, instance_name):
+        try:
+            return self._instances[instance_name]
+        except KeyError:
+            raise InvalidArgumentError("Invalid instance name: {}".format(instance_name))