Skip to content
Snippets Groups Projects
Commit 19ec2ca2 authored by Martin Blanchard's avatar Martin Blanchard
Browse files

cas/service.py: Better parse resource names

parent a172b8e6
No related branches found
No related tags found
Loading
......@@ -24,7 +24,7 @@ import logging
from buildgrid._exceptions import InvalidArgumentError, NotFoundError, OutOfRangeError
from buildgrid._protos.google.bytestream import bytestream_pb2
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2 as re_pb2
from buildgrid.settings import HASH
from buildgrid.settings import HASH, HASH_LENGTH
class ContentAddressableStorageInstance:
......@@ -71,15 +71,12 @@ class ByteStreamInstance:
def register_instance_with_server(self, instance_name, server):
server.add_bytestream_instance(self, instance_name)
def read(self, path, read_offset, read_limit):
storage = self._storage
if path[0] == "blobs":
path = [""] + path
def read(self, digest_hash, digest_size, read_offset, read_limit):
if len(digest_hash) != HASH_LENGTH or not digest_size.isdigit():
raise InvalidArgumentError("Invalid digest [{}/{}]"
.format(digest_hash, digest_size))
# Parse/verify resource name.
# Read resource names look like "[instance/]blobs/abc123hash/99".
digest = re_pb2.Digest(hash=path[2], size_bytes=int(path[3]))
digest = re_pb2.Digest(hash=digest_hash, size_bytes=int(digest_size))
# Check the given read offset and limit.
if read_offset < 0 or read_offset > digest.size_bytes:
......@@ -95,7 +92,7 @@ class ByteStreamInstance:
raise InvalidArgumentError("Negative read_limit is invalid")
# Read the blob from storage and send its contents to the client.
result = storage.get_blob(digest)
result = self._storage.get_blob(digest)
if result is None:
raise NotFoundError("Blob not found")
......@@ -110,51 +107,35 @@ class ByteStreamInstance:
data=result.read(min(self.BLOCK_SIZE, bytes_remaining)))
bytes_remaining -= self.BLOCK_SIZE
def write(self, requests):
storage = self._storage
def write(self, digest_hash, digest_size, first_block, other_blocks):
if len(digest_hash) != HASH_LENGTH or not digest_size.isdigit():
raise InvalidArgumentError("Invalid digest [{}/{}]"
.format(digest_hash, digest_size))
first_request = next(requests)
path = first_request.resource_name.split("/")
digest = re_pb2.Digest(hash=digest_hash, size_bytes=int(digest_size))
if path[0] == "uploads":
path = [""] + path
digest = re_pb2.Digest(hash=path[4], size_bytes=int(path[5]))
write_session = storage.begin_write(digest)
write_session = self._storage.begin_write(digest)
# Start the write session and write the first request's data.
write_session.write(first_request.data)
hash_ = HASH(first_request.data)
bytes_written = len(first_request.data)
finished = first_request.finish_write
# Handle subsequent write requests.
while not finished:
for request in requests:
if finished:
raise InvalidArgumentError("Write request sent after write finished")
elif request.write_offset != bytes_written:
raise InvalidArgumentError("Invalid write offset")
write_session.write(first_block)
elif request.resource_name and request.resource_name != first_request.resource_name:
raise InvalidArgumentError("Resource name changed mid-write")
computed_hash = HASH(first_block)
bytes_written = len(first_block)
finished = request.finish_write
bytes_written += len(request.data)
if bytes_written > digest.size_bytes:
raise InvalidArgumentError("Wrote too much data to blob")
# Handle subsequent write requests.
for next_block in other_blocks:
write_session.write(next_block)
write_session.write(request.data)
hash_.update(request.data)
computed_hash.update(next_block)
bytes_written += len(next_block)
# Check that the data matches the provided digest.
if bytes_written != digest.size_bytes or not finished:
if bytes_written != digest.size_bytes:
raise NotImplementedError("Cannot close stream before finishing write")
elif hash_.hexdigest() != digest.hash:
elif computed_hash.hexdigest() != digest.hash:
raise InvalidArgumentError("Data does not match hash")
storage.commit_write(digest, write_session)
self._storage.commit_write(digest, write_session)
return bytestream_pb2.WriteResponse(committed_size=bytes_written)
......@@ -21,7 +21,6 @@ Implements the Content Addressable Storage API and ByteStream API.
"""
from itertools import tee
import logging
import grpc
......@@ -115,27 +114,30 @@ class ByteStreamService(bytestream_pb2_grpc.ByteStreamServicer):
def Read(self, request, context):
self.__logger.debug("Read request from [%s]", context.peer())
names = request.resource_name.split('/')
try:
path = request.resource_name.split("/")
instance_name = path[0]
instance_name = ''
# Format: "{instance_name}/blobs/{hash}/{size}":
if len(names) < 3 or names[-3] != 'blobs':
raise InvalidArgumentError("Invalid resource name: [{}]"
.format(request.resource_name))
# TODO: Decide on default instance name
if path[0] == "blobs":
if len(path) < 3 or not path[2].isdigit():
raise InvalidArgumentError("Invalid resource name: [{}]".format(request.resource_name))
instance_name = ""
elif names[0] != 'blobs':
index = names.index('blobs')
instance_name = '/'.join(names[:index])
names = names[index:]
elif path[1] == "blobs":
if len(path) < 4 or not path[3].isdigit():
raise InvalidArgumentError("Invalid resource name: [{}]".format(request.resource_name))
if len(names) < 3:
raise InvalidArgumentError("Invalid resource name: [{}]"
.format(request.resource_name))
else:
raise InvalidArgumentError("Invalid resource name: [{}]".format(request.resource_name))
hash_, size_bytes = names[1], names[2]
instance = self._get_instance(instance_name)
yield from instance.read(path,
request.read_offset,
request.read_limit)
yield from instance.read(hash_, size_bytes,
request.read_offset, request.read_limit)
except InvalidArgumentError as e:
self.__logger.error(e)
......@@ -158,31 +160,31 @@ class ByteStreamService(bytestream_pb2_grpc.ByteStreamServicer):
def Write(self, requests, context):
self.__logger.debug("Write request from [%s]", context.peer())
try:
requests, request_probe = tee(requests, 2)
first_request = next(request_probe)
path = first_request.resource_name.split("/")
request = next(requests)
names = request.resource_name.split('/')
instance_name = path[0]
try:
instance_name = ''
# Format: "{instance_name}/uploads/{uuid}/blobs/{hash}/{size}/{anything}":
if len(names) < 5 or 'uploads' not in names or 'blobs' not in names:
raise InvalidArgumentError("Invalid resource name: [{}]"
.format(request.resource_name))
# TODO: Sort out no instance name
if path[0] == "uploads":
if len(path) < 5 or path[2] != "blobs" or not path[4].isdigit():
raise InvalidArgumentError("Invalid resource name: [{}]".format(first_request.resource_name))
instance_name = ""
elif names[0] != 'uploads':
index = names.index('uploads')
instance_name = '/'.join(names[:index])
names = names[index:]
elif path[1] == "uploads":
if len(path) < 6 or path[3] != "blobs" or not path[5].isdigit():
raise InvalidArgumentError("Invalid resource name: [{}]".format(first_request.resource_name))
if len(names) < 5:
raise InvalidArgumentError("Invalid resource name: [{}]"
.format(request.resource_name))
else:
raise InvalidArgumentError("Invalid resource name: [{}]".format(first_request.resource_name))
_, hash_, size_bytes = names[1], names[3], names[4]
instance = self._get_instance(instance_name)
response = instance.write(requests)
return response
return instance.write(hash_, size_bytes, request.data,
[request.data for request in requests])
except NotImplementedError as e:
self.__logger.error(e)
......
......@@ -137,7 +137,7 @@ def test_bytestream_write(mocked, instance, extra_data):
bytestream_pb2.WriteRequest(data=b'def', write_offset=3, finish_write=True)
]
response = servicer.Write(requests, context)
response = servicer.Write(iter(requests), context)
assert response.committed_size == 6
assert len(storage.data) == 1
assert (hash_, 6) in storage.data
......@@ -159,7 +159,7 @@ def test_bytestream_write_rejects_wrong_hash(mocked):
bytestream_pb2.WriteRequest(resource_name=resource_name, data=data, finish_write=True)
]
servicer.Write(requests, context)
servicer.Write(iter(requests), context)
context.set_code.assert_called_once_with(grpc.StatusCode.INVALID_ARGUMENT)
assert len(storage.data) is 0
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment