Commit bb75626c authored by bst-marge-bot

Merge branch 'raoul/965-AaaP-service' into 'master'

Artifact as a Proto: protos and service

Closes #965

See merge request !1259
parents 19bbc4c4 cdbd98e1
Pipeline #55802718 failed in 1200 minutes and 1 second
@@ -28,12 +28,14 @@ import errno
import threading
import grpc
from google.protobuf.message import DecodeError
import click
from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
from .._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
from .._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc
from .._protos.google.rpc import code_pb2
from .._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc, \
artifact_pb2, artifact_pb2_grpc
from .._exceptions import CASError
@@ -62,6 +64,7 @@ def create_server(repo, *, enable_push,
max_head_size=int(10e9),
min_head_size=int(2e9)):
cas = CASCache(os.path.abspath(repo))
artifactdir = os.path.join(os.path.abspath(repo), 'artifacts', 'refs')
# Use max_workers default from Python 3.5+
max_workers = (os.cpu_count() or 1) * 5
@@ -81,6 +84,9 @@ def create_server(repo, *, enable_push,
buildstream_pb2_grpc.add_ReferenceStorageServicer_to_server(
_ReferenceStorageServicer(cas, enable_push=enable_push), server)
artifact_pb2_grpc.add_ArtifactServiceServicer_to_server(
_ArtifactServicer(cas, artifactdir), server)
return server
@@ -405,6 +411,81 @@ class _ReferenceStorageServicer(buildstream_pb2_grpc.ReferenceStorageServicer):
return response
class _ArtifactServicer(artifact_pb2_grpc.ArtifactServiceServicer):
def __init__(self, cas, artifactdir):
super().__init__()
self.cas = cas
self.artifactdir = artifactdir
os.makedirs(artifactdir, exist_ok=True)
def GetArtifact(self, request, context):
artifact_path = os.path.join(self.artifactdir, request.cache_key)
if not os.path.exists(artifact_path):
context.abort(grpc.StatusCode.NOT_FOUND, "Artifact proto not found")
artifact = artifact_pb2.Artifact()
with open(artifact_path, 'rb') as f:
artifact.ParseFromString(f.read())
files_digest = artifact.files
# Now update mtimes of files present.
try:
self.cas.update_tree_mtime(files_digest)
except FileNotFoundError:
os.unlink(artifact_path)
context.abort(grpc.StatusCode.NOT_FOUND,
"Artifact files incomplete")
except DecodeError:
context.abort(grpc.StatusCode.NOT_FOUND,
"Artifact files not valid")
return artifact
def UpdateArtifact(self, request, context):
artifact = request.artifact
# Check that the files specified are in the CAS
self._check_directory("files", artifact.files, context)
# Unset protocol buffer messages don't evaluate to False, but they do
# stringify to empty strings, hence the str() checks below
if str(artifact.buildtree):
self._check_directory("buildtree", artifact.buildtree, context)
if str(artifact.public_data):
self._check_file("public data", artifact.public_data, context)
for log_file in artifact.logs:
self._check_file("log digest", log_file.digest, context)
# Add the artifact proto to the cas
artifact_path = os.path.join(self.artifactdir, request.cache_key)
os.makedirs(os.path.dirname(artifact_path), exist_ok=True)
with open(artifact_path, 'wb') as f:
f.write(artifact.SerializeToString())
return artifact
def _check_directory(self, name, digest, context):
try:
directory = remote_execution_pb2.Directory()
with open(self.cas.objpath(digest), 'rb') as f:
directory.ParseFromString(f.read())
except FileNotFoundError:
context.abort(grpc.StatusCode.FAILED_PRECONDITION,
"Artifact {} specified but no files found".format(name))
except DecodeError:
context.abort(grpc.StatusCode.FAILED_PRECONDITION,
"Artifact {} specified but directory not found".format(name))
def _check_file(self, name, digest, context):
if not os.path.exists(self.cas.objpath(digest)):
context.abort(grpc.StatusCode.FAILED_PRECONDITION,
"Artifact {} specified but not found".format(name))
def _digest_from_download_resource_name(resource_name):
parts = resource_name.split('/')
......
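For reference, the servicer above stores each serialized Artifact proto as a flat file under <repo>/artifacts/refs/<cache_key>. A minimal sketch of reading one back from disk, assuming an illustrative repository path and cache key (neither is part of this change):

import os

from buildstream._protos.buildstream.v2 import artifact_pb2

# Illustrative values only; the real path comes from the repo passed to
# create_server() and the key from the client's cache key computation.
repo = "/srv/artifactshare"
cache_key = "myproject/hello/0123456789abcdef"

artifact_path = os.path.join(repo, "artifacts", "refs", cache_key)
artifact = artifact_pb2.Artifact()
with open(artifact_path, "rb") as f:
    artifact.ParseFromString(f.read())
print(artifact.strong_key, artifact.files.hash)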
// Copyright 2019 Bloomberg Finance LP
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Authors
// Raoul Hidalgo Charman <raoul.hidalgo.charman@gmail.com>
syntax = "proto3";
package buildstream.v2;
import "build/bazel/remote/execution/v2/remote_execution.proto";
import "google/api/annotations.proto";
service ArtifactService {
// Retrieves an Artifact message
//
// Errors:
// * `NOT_FOUND`: Artifact not found on server
rpc GetArtifact(GetArtifactRequest) returns (Artifact) {}
// Sets an Artifact message
//
// Errors:
// * `FAILED_PRECONDITION`: Files specified in upload aren't present in CAS
rpc UpdateArtifact(UpdateArtifactRequest) returns (Artifact) {}
}
message Artifact {
// This version number must always be present and can be used to
// further indicate presence or absence of parts of the proto at a
// later date. It only needs incrementing if what is *mandatory*
// changes.
int32 version = 1;
// Core metadata
bool build_success = 2;
string build_error = 3; // optional
string build_error_details = 4;
string strong_key = 5;
string weak_key = 6;
bool was_workspaced = 7;
// digest of a Directory
build.bazel.remote.execution.v2.Digest files = 8;
// Information about the build dependencies
message Dependency {
string element_name = 1;
string cache_key = 2;
bool was_workspaced = 3;
};
repeated Dependency build_deps = 9;
// The public data is a yaml file which is stored into the CAS
// digest of a file
build.bazel.remote.execution.v2.Digest public_data = 10;
// The logs are stored in the CAS
message LogFile {
string name = 1;
// digest of a file
build.bazel.remote.execution.v2.Digest digest = 2;
};
repeated LogFile logs = 11; // Zero or more log files here
// digest of a directory
build.bazel.remote.execution.v2.Digest buildtree = 12; // optional
}
message GetArtifactRequest {
string instance_name = 1;
string cache_key = 2;
}
message UpdateArtifactRequest {
string instance_name = 1;
string cache_key = 2;
Artifact artifact = 3;
}
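To make the round trip concrete, here is a minimal client-side sketch of the two RPCs defined above, assuming an artifact server reachable at localhost:11001; the address, cache key and digest values are illustrative, and the Directory referenced by files must already be present in the server's CAS for UpdateArtifact to succeed:

import grpc

from buildstream._protos.buildstream.v2.artifact_pb2 import (
    Artifact, GetArtifactRequest, UpdateArtifactRequest)
from buildstream._protos.buildstream.v2.artifact_pb2_grpc import ArtifactServiceStub

channel = grpc.insecure_channel("localhost:11001")  # illustrative address
stub = ArtifactServiceStub(channel)

artifact = Artifact()
artifact.version = 0
artifact.build_success = True
artifact.strong_key = "0123456789abcdef"
# Assumed to be the digest of a Directory previously uploaded to the CAS;
# otherwise the server aborts UpdateArtifact with FAILED_PRECONDITION.
artifact.files.hash = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
artifact.files.size_bytes = 0

request = UpdateArtifactRequest()
request.cache_key = "myproject/hello/0123456789abcdef"
request.artifact.CopyFrom(artifact)
stub.UpdateArtifact(request)

# Raises grpc.RpcError with NOT_FOUND if no proto exists for that key.
fetched = stub.GetArtifact(GetArtifactRequest(cache_key="myproject/hello/0123456789abcdef"))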
# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
import grpc
from buildstream._protos.buildstream.v2 import artifact_pb2 as buildstream_dot_v2_dot_artifact__pb2
class ArtifactServiceStub(object):
# missing associated documentation comment in .proto file
pass
def __init__(self, channel):
"""Constructor.
Args:
channel: A grpc.Channel.
"""
self.GetArtifact = channel.unary_unary(
'/buildstream.v2.ArtifactService/GetArtifact',
request_serializer=buildstream_dot_v2_dot_artifact__pb2.GetArtifactRequest.SerializeToString,
response_deserializer=buildstream_dot_v2_dot_artifact__pb2.Artifact.FromString,
)
self.UpdateArtifact = channel.unary_unary(
'/buildstream.v2.ArtifactService/UpdateArtifact',
request_serializer=buildstream_dot_v2_dot_artifact__pb2.UpdateArtifactRequest.SerializeToString,
response_deserializer=buildstream_dot_v2_dot_artifact__pb2.Artifact.FromString,
)
class ArtifactServiceServicer(object):
# missing associated documentation comment in .proto file
pass
def GetArtifact(self, request, context):
"""Retrieves an Artifact message
Errors:
* `NOT_FOUND`: Artifact not found on server
"""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def UpdateArtifact(self, request, context):
"""Sets an Artifact message
Errors:
"""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def add_ArtifactServiceServicer_to_server(servicer, server):
rpc_method_handlers = {
'GetArtifact': grpc.unary_unary_rpc_method_handler(
servicer.GetArtifact,
request_deserializer=buildstream_dot_v2_dot_artifact__pb2.GetArtifactRequest.FromString,
response_serializer=buildstream_dot_v2_dot_artifact__pb2.Artifact.SerializeToString,
),
'UpdateArtifact': grpc.unary_unary_rpc_method_handler(
servicer.UpdateArtifact,
request_deserializer=buildstream_dot_v2_dot_artifact__pb2.UpdateArtifactRequest.FromString,
response_serializer=buildstream_dot_v2_dot_artifact__pb2.Artifact.SerializeToString,
),
}
generic_handler = grpc.method_handlers_generic_handler(
'buildstream.v2.ArtifactService', rpc_method_handlers)
server.add_generic_rpc_handlers((generic_handler,))
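For illustration of how the generated registration helper is wired up, here is a minimal sketch using a toy in-memory servicer rather than the real _ArtifactServicer from the diff above; the executor size and port are arbitrary:

from concurrent import futures

import grpc

from buildstream._protos.buildstream.v2 import artifact_pb2, artifact_pb2_grpc


class InMemoryArtifactServicer(artifact_pb2_grpc.ArtifactServiceServicer):
    # Toy servicer that keeps serialized Artifact protos in a dict,
    # purely to demonstrate the registration call below.
    def __init__(self):
        super().__init__()
        self._store = {}

    def UpdateArtifact(self, request, context):
        self._store[request.cache_key] = request.artifact.SerializeToString()
        return request.artifact

    def GetArtifact(self, request, context):
        if request.cache_key not in self._store:
            context.abort(grpc.StatusCode.NOT_FOUND, "Artifact proto not found")
        artifact = artifact_pb2.Artifact()
        artifact.ParseFromString(self._store[request.cache_key])
        return artifact


server = grpc.server(futures.ThreadPoolExecutor(max_workers=4))
artifact_pb2_grpc.add_ArtifactServiceServicer_to_server(InMemoryArtifactServicer(), server)
port = server.add_insecure_port("localhost:0")  # pick any free port
server.start()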
#
# Copyright (C) 2019 Bloomberg Finance LP
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library. If not, see <http://www.gnu.org/licenses/>.
#
# Authors: Raoul Hidalgo Charman <raoul.hidalgocharman@codethink.co.uk>
#
import os
import pytest
from urllib.parse import urlparse
import grpc
from buildstream._protos.buildstream.v2.artifact_pb2 \
import Artifact, GetArtifactRequest, UpdateArtifactRequest
from buildstream._protos.buildstream.v2.artifact_pb2_grpc import ArtifactServiceStub
from buildstream._protos.build.bazel.remote.execution.v2 \
import remote_execution_pb2 as re_pb2
from buildstream._protos.build.bazel.remote.execution.v2 \
import remote_execution_pb2_grpc as re_pb2_grpc
from buildstream import utils
from tests.testutils.artifactshare import create_artifact_share
def test_artifact_get_not_found(tmpdir):
sharedir = os.path.join(str(tmpdir), "share")
with create_artifact_share(sharedir) as share:
# set up artifact service stub
url = urlparse(share.repo)
channel = grpc.insecure_channel("{}:{}".format(url.hostname, url.port))
artifact_stub = ArtifactServiceStub(channel)
# Run GetArtifact and check it throws a not found error
request = GetArtifactRequest()
request.cache_key = "@artifact/something/not_there"
try:
artifact_stub.GetArtifact(request)
except grpc.RpcError as e:
assert e.code() == grpc.StatusCode.NOT_FOUND
assert e.details() == "Artifact proto not found"
else:
assert False
# Successfully getting the artifact
@pytest.mark.parametrize("files", ["present", "absent", "invalid"])
def test_update_artifact(tmpdir, files):
sharedir = os.path.join(str(tmpdir), "share")
with create_artifact_share(sharedir) as share:
url = urlparse(share.repo)
channel = grpc.insecure_channel("{}:{}".format(url.hostname, url.port))
artifact_stub = ArtifactServiceStub(channel)
# initialise an artifact
artifact = Artifact()
artifact.version = 0
artifact.build_success = True
artifact.strong_key = "abcdefghijklmnop"
artifact.files.hash = "hashymchashash"
artifact.files.size_bytes = 10
# put files object
if files == "present":
directory = re_pb2.Directory()
digest = share.cas.add_object(buffer=directory.SerializeToString())
elif files == "invalid":
digest = share.cas.add_object(buffer="abcdefghijklmnop".encode("utf-8"))
elif files == "absent":
digest = utils._message_digest("abcdefghijklmnop".encode("utf-8"))
artifact.files.CopyFrom(digest)
# Put it in the artifact share with an UpdateArtifactRequest
request = UpdateArtifactRequest()
request.artifact.CopyFrom(artifact)
request.cache_key = "a-cache-key"
# should return the same artifact back
if files == "present":
response = artifact_stub.UpdateArtifact(request)
assert response == artifact
else:
try:
artifact_stub.UpdateArtifact(request)
except grpc.RpcError as e:
assert e.code() == grpc.StatusCode.FAILED_PRECONDITION
if files == "absent":
assert e.details() == "Artifact files specified but no files found"
elif files == "invalid":
assert e.details() == "Artifact files specified but directory not found"
return
# If we uploaded the artifact check GetArtifact
request = GetArtifactRequest()
request.cache_key = "a-cache-key"
response = artifact_stub.GetArtifact(request)
assert response == artifact