Commit 9983fc3a authored by Jürg Billeter

Merge branch 'juerg/cas-1.2' into 'bst-1.2'

CAS: Fix resource_name format for blobs

See merge request !711
parents d02e36b8 d9a97c33
Pipeline #28519489 passed in 23 seconds
NEWS:
+=================
+buildstream 1.1.7
+=================
+
+ o Fix CAS resource_name format
+
+   Artifact servers need to be updated.
+
 =================
 buildstream 1.1.6
 =================
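The change brings BuildStream's blob resource names in line with the ByteStream API used by the remote execution protocol: downloads address a blob as blobs/{hash}/{size}, while uploads prepend a client-chosen UUID as uploads/{uuid}/blobs/{hash}/{size}. A minimal sketch of the two formats (the helper names are illustrative, not from the patch):

import uuid

def download_resource_name(digest_hash, size_bytes):
    # Conforming ByteStream Read name: blobs/{hash}/{size}
    return '/'.join(['blobs', digest_hash, str(size_bytes)])

def upload_resource_name(digest_hash, size_bytes):
    # Conforming ByteStream Write name: uploads/{uuid}/blobs/{hash}/{size}
    return '/'.join(['uploads', str(uuid.uuid4()), 'blobs',
                     digest_hash, str(size_bytes)])

# sha256 of empty input, as an example digest
empty = 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855'
print(download_resource_name(empty, 0))
print(upload_resource_name(empty, 0))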
buildstream/_artifactcache/cascache.py:
@@ -24,6 +24,7 @@ import os
 import signal
 import stat
 import tempfile
+import uuid

 import errno
 from urllib.parse import urlparse
@@ -315,8 +316,11 @@ class CASCache(ArtifactCache):
             # Upload any blobs missing on the server
             skipped_remote = False
             for digest in missing_blobs.values():
-                def request_stream():
-                    resource_name = os.path.join(digest.hash, str(digest.size_bytes))
+                uuid_ = uuid.uuid4()
+                resource_name = '/'.join(['uploads', str(uuid_), 'blobs',
+                                          digest.hash, str(digest.size_bytes)])
+
+                def request_stream(resname):
                     with open(self.objpath(digest), 'rb') as f:
                         assert os.fstat(f.fileno()).st_size == digest.size_bytes
                         offset = 0
@@ -330,12 +334,12 @@ class CASCache(ArtifactCache):
                             request.write_offset = offset
                             # max. 64 kB chunks
                             request.data = f.read(chunk_size)
-                            request.resource_name = resource_name  # pylint: disable=cell-var-from-loop
+                            request.resource_name = resname
                             request.finish_write = remaining <= 0
                             yield request
                             offset += chunk_size
                             finished = request.finish_write

-                response = remote.bytestream.Write(request_stream())
+                response = remote.bytestream.Write(request_stream(resource_name))

                 request = buildstream_pb2.UpdateReferenceRequest()
                 request.keys.append(ref)
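The streaming generator itself is unchanged apart from taking the resource name as a parameter, which also removes the need for the pylint cell-var-from-loop suppression. For reference, a self-contained sketch of the chunked upload stream; it assumes BuildStream's vendored bytestream_pb2 module, and write_requests is a hypothetical name:

from buildstream._protos.google.bytestream import bytestream_pb2

def write_requests(resource_name, path, size_bytes):
    # Stream the file at `path` as 64 kB WriteRequest chunks. The resource
    # name is repeated on every request; if set on later requests it must
    # match the name sent on the first one.
    with open(path, 'rb') as f:
        offset = 0
        finished = False
        remaining = size_bytes
        while not finished:
            chunk_size = min(remaining, 64 * 1024)
            remaining -= chunk_size
            request = bytestream_pb2.WriteRequest()
            request.write_offset = offset
            request.data = f.read(chunk_size)
            request.resource_name = resource_name
            request.finish_write = remaining <= 0
            yield request
            offset += chunk_size
            finished = request.finish_write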
@@ -772,7 +776,7 @@ class CASCache(ArtifactCache):
             yield from self._required_blobs(dirnode.digest)

     def _fetch_blob(self, remote, digest, out):
-        resource_name = os.path.join(digest.hash, str(digest.size_bytes))
+        resource_name = '/'.join(['blobs', digest.hash, str(digest.size_bytes)])
         request = bytestream_pb2.ReadRequest()
         request.resource_name = resource_name
         request.read_offset = 0
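On the download side only the 'blobs' prefix is new. A minimal sketch of a conforming read against a ByteStream server, again assuming the vendored proto modules (fetch_blob here is a stand-in, not the method above):

from buildstream._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc

def fetch_blob(channel, digest_hash, size_bytes, out):
    resource_name = '/'.join(['blobs', digest_hash, str(size_bytes)])
    request = bytestream_pb2.ReadRequest()
    request.resource_name = resource_name
    request.read_offset = 0
    stub = bytestream_pb2_grpc.ByteStreamStub(channel)
    for response in stub.Read(request):  # the server streams the blob in chunks
        out.write(response.data)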
buildstream/_artifactcache/casserver.py:
@@ -23,6 +23,7 @@ import os
 import signal
 import sys
 import tempfile
+import uuid

 import click
 import grpc
@@ -130,12 +131,21 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
     def Read(self, request, context):
         resource_name = request.resource_name
-        client_digest = _digest_from_resource_name(resource_name)
-        assert request.read_offset <= client_digest.size_bytes
+        client_digest = _digest_from_download_resource_name(resource_name)
+        if client_digest is None:
+            context.set_code(grpc.StatusCode.NOT_FOUND)
+            return
+
+        if request.read_offset > client_digest.size_bytes:
+            context.set_code(grpc.StatusCode.OUT_OF_RANGE)
+            return

         try:
             with open(self.cas.objpath(client_digest), 'rb') as f:
-                assert os.fstat(f.fileno()).st_size == client_digest.size_bytes
+                if os.fstat(f.fileno()).st_size != client_digest.size_bytes:
+                    context.set_code(grpc.StatusCode.NOT_FOUND)
+                    return
+
                 if request.read_offset > 0:
                     f.seek(request.read_offset)
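With the asserts gone, a misbehaving client now gets a proper gRPC status instead of crashing the request handler. A hedged sketch of how the new codes surface on the client side (the server address is made up):

import grpc
from buildstream._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc

channel = grpc.insecure_channel('artifact-server.example.com:11001')  # hypothetical
stub = bytestream_pb2_grpc.ByteStreamStub(channel)
request = bytestream_pb2.ReadRequest(resource_name='not/a/valid/name')
try:
    for response in stub.Read(request):
        pass
except grpc.RpcError as e:
    # NOT_FOUND for an unparseable name or missing blob,
    # OUT_OF_RANGE for a read_offset past the end of the blob.
    print(e.code())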
@@ -163,12 +173,18 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
         resource_name = None
         with tempfile.NamedTemporaryFile(dir=self.cas.tmpdir) as out:
             for request in request_iterator:
-                assert not finished
-                assert request.write_offset == offset
+                if finished or request.write_offset != offset:
+                    context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
+                    return response
+
                 if resource_name is None:
                     # First request
                     resource_name = request.resource_name
-                    client_digest = _digest_from_resource_name(resource_name)
+                    client_digest = _digest_from_upload_resource_name(resource_name)
+                    if client_digest is None:
+                        context.set_code(grpc.StatusCode.NOT_FOUND)
+                        return response
+
                     try:
                         _clean_up_cache(self.cas, client_digest.size_bytes)
                     except ArtifactTooLargeException as e:
@@ -177,14 +193,20 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
                         return response
                 elif request.resource_name:
                     # If it is set on subsequent calls, it **must** match the value of the first request.
-                    assert request.resource_name == resource_name
+                    if request.resource_name != resource_name:
+                        context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
+                        return response

                 out.write(request.data)
                 offset += len(request.data)

                 if request.finish_write:
-                    assert client_digest.size_bytes == offset
+                    if client_digest.size_bytes != offset:
+                        context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
+                        return response
                     out.flush()
                     digest = self.cas.add_object(path=out.name)
-                    assert digest.hash == client_digest.hash
+                    if digest.hash != client_digest.hash:
+                        context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
+                        return response
                     finished = True

         assert finished
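The final check compares the digest computed from the received bytes (via cas.add_object) against the digest the client named in the resource name; only a match completes the write. A minimal sketch of the equivalent verification, assuming SHA-256 as the CAS hash function:

import hashlib

def verify_blob(path, client_hash, client_size):
    # True iff the file at `path` matches the client-announced digest.
    h = hashlib.sha256()
    size = 0
    with open(path, 'rb') as f:
        for chunk in iter(lambda: f.read(64 * 1024), b''):
            h.update(chunk)
            size += len(chunk)
    return size == client_size and h.hexdigest() == client_hash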
@@ -247,13 +269,48 @@ class _ReferenceStorageServicer(buildstream_pb2_grpc.ReferenceStorageServicer):
         return response


-def _digest_from_resource_name(resource_name):
+def _digest_from_download_resource_name(resource_name):
     parts = resource_name.split('/')
-    assert len(parts) == 2
-    digest = remote_execution_pb2.Digest()
-    digest.hash = parts[0]
-    digest.size_bytes = int(parts[1])
-    return digest
+
+    # Accept requests from non-conforming BuildStream 1.1.x clients
+    if len(parts) == 2:
+        parts.insert(0, 'blobs')
+
+    if len(parts) != 3 or parts[0] != 'blobs':
+        return None
+
+    try:
+        digest = remote_execution_pb2.Digest()
+        digest.hash = parts[1]
+        digest.size_bytes = int(parts[2])
+        return digest
+    except ValueError:
+        return None
+
+
+def _digest_from_upload_resource_name(resource_name):
+    parts = resource_name.split('/')
+
+    # Accept requests from non-conforming BuildStream 1.1.x clients
+    if len(parts) == 2:
+        parts.insert(0, 'uploads')
+        parts.insert(1, str(uuid.uuid4()))
+        parts.insert(2, 'blobs')
+
+    if len(parts) < 5 or parts[0] != 'uploads' or parts[2] != 'blobs':
+        return None
+
+    try:
+        uuid_ = uuid.UUID(hex=parts[1])
+        if uuid_.version != 4:
+            return None
+
+        digest = remote_execution_pb2.Digest()
+        digest.hash = parts[3]
+        digest.size_bytes = int(parts[4])
+        return digest
+    except ValueError:
+        return None


 def _has_object(cas, digest):
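For reference, a quick sketch of what the two parsers accept once this lands: conforming names parse to a Digest, legacy two-part names from 1.1.x clients are upgraded in place, and anything else yields None (the example values are made up; the functions are the module-level helpers above):

import uuid

h = 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855'

print(_digest_from_download_resource_name('blobs/%s/0' % h))    # -> Digest
print(_digest_from_download_resource_name('%s/0' % h))          # legacy 1.1.x form, still accepted
print(_digest_from_download_resource_name('bogus/name'))        # -> None

up = 'uploads/%s/blobs/%s/0' % (uuid.uuid4(), h)
print(_digest_from_upload_resource_name(up))                    # -> Digest
print(_digest_from_upload_resource_name('uploads/not-a-uuid/blobs/%s/0' % h))  # -> None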