Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found
Select Git revision
  • coverity
  • master
  • sminskyprimu/blake3
  • sotk/consolidate-leases-and-jobs/p2-write-only-leases
  • sotk/consolidate-leases-and-jobs/p3-drop-leases-table
  • sotk/logstream-testing
  • zchen723/skip-scheduler-metrics
  • 0.0.10
  • 0.0.11
  • 0.0.12
  • 0.0.13
  • 0.0.14
  • 0.0.16
  • 0.0.17
  • 0.0.19
  • 0.0.2
  • 0.0.20
  • 0.0.21
  • 0.0.23
  • 0.0.25
  • 0.0.26
  • 0.0.27
  • 0.0.28
  • 0.0.29
  • 0.0.3
  • 0.0.30
  • 0.0.31
  • 0.0.32
  • 0.0.33
  • 0.0.34
  • 0.0.35
  • 0.0.36
  • 0.0.37
  • 0.0.38
  • 0.0.39
  • 0.0.4
  • 0.0.40
  • 0.0.41
  • 0.0.42
  • 0.0.43
  • 0.0.44
  • 0.0.45
  • 0.0.46
  • 0.0.47
  • 0.0.48
  • 0.0.49
  • 0.0.5
  • 0.0.50
  • 0.0.51
  • 0.0.52
  • 0.0.53
  • 0.0.54
  • 0.0.55
  • 0.0.56
  • 0.0.57
  • 0.0.58
  • 0.0.59
  • 0.0.6
  • 0.0.60
  • 0.0.61
  • 0.0.62
  • 0.0.63
  • 0.0.64
  • 0.0.65
  • 0.0.66
  • 0.0.67
  • 0.0.68
  • 0.0.69
  • 0.0.7
  • 0.0.70
  • 0.0.71
  • 0.0.72
  • 0.0.73
  • 0.0.74
  • 0.0.75
  • 0.0.76
  • 0.0.78
  • 0.0.79
  • 0.0.8
  • 0.0.80
  • 0.0.81
  • 0.0.82
  • 0.0.83
  • 0.0.84
  • 0.0.85
  • 0.0.86
  • 0.0.87
  • 0.0.88
  • 0.0.89
  • 0.0.9
  • 0.0.90
  • 0.0.91
  • 0.0.92
  • 0.0.93
  • 0.0.94
  • 0.0.95
  • 0.0.96
  • 0.0.97
  • 0.0.98
  • 0.1.0
  • 0.1.1
  • 0.1.10
  • 0.1.11
  • 0.1.12
  • 0.1.13
  • 0.1.14
  • 0.1.15
107 results

Target

Select target project
  • edbaunton/buildgrid
  • BuildGrid/buildgrid
  • bloomberg/buildgrid
  • devcurmudgeon/buildgrid
  • mhadjimichael/buildgrid
  • jmacarthur/buildgrid
  • rkothur/buildgrid
  • valentindavid/buildgrid
  • jjardon/buildgrid
  • RichKen/buildgrid
  • jbonney/buildgrid
  • onsha_alexander/buildgrid
  • santigl/buildgrid
  • mostynb/buildgrid
  • hoffbrinkle/buildgrid
  • Malinskiy/buildgrid
  • coldtom/buildgrid
  • azeemb_a/buildgrid
  • pointswaves/buildgrid
  • BenjaminSchubert/buildgrid
  • michaellee8/buildgrid
  • anil-anil/buildgrid
  • seanborg/buildgrid
  • jdelong12/buildgrid
  • jclay/buildgrid
  • bweston92/buildgrid
  • zchen723/buildgrid
  • cpratt34/buildgrid
  • armbiant/apache-buildgrid
  • armbiant/android-buildgrid
  • itsme300/buildgrid
  • sbairoliya/buildgrid
32 results
Select Git revision
Show changes
Commits on Source (10)
Showing with 1107 additions and 134 deletions
......@@ -20,7 +20,6 @@ Server command
Create a BuildGrid server.
"""
import asyncio
import sys
import click
......@@ -51,18 +50,14 @@ def start(context, config):
click.echo("ERROR: Could not parse config: {}.\n".format(str(e)), err=True)
sys.exit(-1)
loop = asyncio.get_event_loop()
try:
server.start()
loop.run_forever()
except KeyboardInterrupt:
pass
finally:
click.echo("Stopping server")
server.stop()
loop.close()
def _create_server_from_config(config):
......
......@@ -16,9 +16,13 @@
from enum import Enum
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
from buildgrid._protos.buildgrid.v2 import monitoring_pb2
from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2
# RWAPI enumerations
# From google/devtools/remoteworkers/v1test2/bots.proto:
class BotStatus(Enum):
# Initially unknown state.
BOT_STATUS_UNSPECIFIED = bots_pb2.BotStatus.Value('BOT_STATUS_UNSPECIFIED')
......@@ -45,6 +49,9 @@ class LeaseState(Enum):
CANCELLED = bots_pb2.LeaseState.Value('CANCELLED')
# REAPI enumerations
# From build/bazel/remote/execution/v2/remote_execution.proto:
class OperationStage(Enum):
# Initially unknown stage.
UNKNOWN = remote_execution_pb2.ExecuteOperationMetadata.Stage.Value('UNKNOWN')
......@@ -56,3 +63,41 @@ class OperationStage(Enum):
EXECUTING = remote_execution_pb2.ExecuteOperationMetadata.Stage.Value('EXECUTING')
# Finished execution.
COMPLETED = remote_execution_pb2.ExecuteOperationMetadata.Stage.Value('COMPLETED')
# Internal enumerations
# From buildgrid.v2/monitoring.proto:
class LogRecordLevel(Enum):
# Initially unknown level.
NOTSET = monitoring_pb2.LogRecord.Level.Value('NOTSET')
# Debug message severity level.
DEBUG = monitoring_pb2.LogRecord.Level.Value('DEBUG')
# Information message severity level.
INFO = monitoring_pb2.LogRecord.Level.Value('INFO')
# Warning message severity level.
WARNING = monitoring_pb2.LogRecord.Level.Value('WARNING')
# Error message severity level.
ERROR = monitoring_pb2.LogRecord.Level.Value('ERROR')
# Critical message severity level.
CRITICAL = monitoring_pb2.LogRecord.Level.Value('CRITICAL')
class MetricRecordDomain(Enum):
# Initially unknown domain.
UNKNOWN = monitoring_pb2.MetricRecord.Domain.Value('UNKNOWN')
# A server state related metric.
STATE = monitoring_pb2.MetricRecord.Domain.Value('STATE')
# A build execution related metric.
BUILD = monitoring_pb2.MetricRecord.Domain.Value('BUILD')
class MetricRecordType(Enum):
# Initially unknown type.
NONE = monitoring_pb2.MetricRecord.Type.Value('NONE')
# A metric for counting.
COUNTER = monitoring_pb2.MetricRecord.Type.Value('COUNTER')
# A metric for mesuring a duration.
TIMER = monitoring_pb2.MetricRecord.Type.Value('TIMER')
# A metric in arbitrary value.
GAUGE = monitoring_pb2.MetricRecord.Type.Value('GAUGE')
// Copyright (C) 2018 Bloomberg LP
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// <http://www.apache.org/licenses/LICENSE-2.0>
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
syntax = "proto3";
package buildgrid.v2;
import "google/api/annotations.proto";
import "google/protobuf/duration.proto";
import "google/protobuf/timestamp.proto";
message BusMessage {
// The position of this message in the bus stream.
int64 sequence_number = 1;
// The carried message.
oneof record {
LogRecord log_record = 2;
MetricRecord metric_record = 3;
}
}
message LogRecord {
// When the record has been created.
google.protobuf.Timestamp creation_timestamp = 1;
enum Level {
NOTSET = 0;
// Debug message severity level.
DEBUG = 1;
// Information message severity level.
INFO = 2;
// Warning message severity level.
WARNING = 3;
// Error message severity level.
ERROR = 4;
// Critical message severity level.
CRITICAL = 5;
}
// The domain name for the record.
string domain = 2;
// The severity level of the record.
Level level = 3;
// The human-readable record's message.
string message = 4;
// An optional list of additional metadata.
map<string, string> metadata = 5;
}
message MetricRecord {
// When the metric has been created.
google.protobuf.Timestamp creation_timestamp = 1;
enum Domain {
UNKNOWN = 0;
// A server state related metric.
STATE = 1;
// A build execution related metric.
BUILD = 2;
}
// The domain for the record.
Domain domain = 2;
enum Type {
NONE = 0;
// A metric for counting.
COUNTER = 1;
// A metric for mesuring a duration.
TIMER = 2;
// A metric in arbitrary value.
GAUGE = 3;
}
// The type of metric, see Type.
Type type = 3;
// The name identifying the metric.
string name = 4;
// The carried value, depending on the metric's type.
oneof data {
// Set for Type.COUNTER metrics.
int32 count = 5;
// Set for Type.TIMER metrics.
google.protobuf.Duration duration = 6;
// Set for Type.GAUGE metrics.
int32 value = 7;
}
// An optional list of additional metadata.
map<string, string> metadata = 8;
}
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: buildgrid/v2/monitoring.proto
import sys
_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1'))
from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
from google.protobuf import reflection as _reflection
from google.protobuf import symbol_database as _symbol_database
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
from buildgrid._protos.google.api import annotations_pb2 as google_dot_api_dot_annotations__pb2
from google.protobuf import duration_pb2 as google_dot_protobuf_dot_duration__pb2
from google.protobuf import timestamp_pb2 as google_dot_protobuf_dot_timestamp__pb2
DESCRIPTOR = _descriptor.FileDescriptor(
name='buildgrid/v2/monitoring.proto',
package='buildgrid.v2',
syntax='proto3',
serialized_options=None,
serialized_pb=_b('\n\x1d\x62uildgrid/v2/monitoring.proto\x12\x0c\x62uildgrid.v2\x1a\x1cgoogle/api/annotations.proto\x1a\x1egoogle/protobuf/duration.proto\x1a\x1fgoogle/protobuf/timestamp.proto\"\x93\x01\n\nBusMessage\x12\x17\n\x0fsequence_number\x18\x01 \x01(\x03\x12-\n\nlog_record\x18\x02 \x01(\x0b\x32\x17.buildgrid.v2.LogRecordH\x00\x12\x33\n\rmetric_record\x18\x03 \x01(\x0b\x32\x1a.buildgrid.v2.MetricRecordH\x00\x42\x08\n\x06record\"\xcc\x02\n\tLogRecord\x12\x36\n\x12\x63reation_timestamp\x18\x01 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x0e\n\x06\x64omain\x18\x02 \x01(\t\x12,\n\x05level\x18\x03 \x01(\x0e\x32\x1d.buildgrid.v2.LogRecord.Level\x12\x0f\n\x07message\x18\x04 \x01(\t\x12\x37\n\x08metadata\x18\x05 \x03(\x0b\x32%.buildgrid.v2.LogRecord.MetadataEntry\x1a/\n\rMetadataEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"N\n\x05Level\x12\n\n\x06NOTSET\x10\x00\x12\t\n\x05\x44\x45\x42UG\x10\x01\x12\x08\n\x04INFO\x10\x02\x12\x0b\n\x07WARNING\x10\x03\x12\t\n\x05\x45RROR\x10\x04\x12\x0c\n\x08\x43RITICAL\x10\x05\"\xde\x03\n\x0cMetricRecord\x12\x36\n\x12\x63reation_timestamp\x18\x01 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x31\n\x06\x64omain\x18\x02 \x01(\x0e\x32!.buildgrid.v2.MetricRecord.Domain\x12-\n\x04type\x18\x03 \x01(\x0e\x32\x1f.buildgrid.v2.MetricRecord.Type\x12\x0c\n\x04name\x18\x04 \x01(\t\x12\x0f\n\x05\x63ount\x18\x05 \x01(\x05H\x00\x12-\n\x08\x64uration\x18\x06 \x01(\x0b\x32\x19.google.protobuf.DurationH\x00\x12\x0f\n\x05value\x18\x07 \x01(\x05H\x00\x12:\n\x08metadata\x18\x08 \x03(\x0b\x32(.buildgrid.v2.MetricRecord.MetadataEntry\x1a/\n\rMetadataEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"+\n\x06\x44omain\x12\x0b\n\x07UNKNOWN\x10\x00\x12\t\n\x05STATE\x10\x01\x12\t\n\x05\x42UILD\x10\x02\"3\n\x04Type\x12\x08\n\x04NONE\x10\x00\x12\x0b\n\x07\x43OUNTER\x10\x01\x12\t\n\x05TIMER\x10\x02\x12\t\n\x05GAUGE\x10\x03\x42\x06\n\x04\x64\x61tab\x06proto3')
,
dependencies=[google_dot_api_dot_annotations__pb2.DESCRIPTOR,google_dot_protobuf_dot_duration__pb2.DESCRIPTOR,google_dot_protobuf_dot_timestamp__pb2.DESCRIPTOR,])
_LOGRECORD_LEVEL = _descriptor.EnumDescriptor(
name='Level',
full_name='buildgrid.v2.LogRecord.Level',
filename=None,
file=DESCRIPTOR,
values=[
_descriptor.EnumValueDescriptor(
name='NOTSET', index=0, number=0,
serialized_options=None,
type=None),
_descriptor.EnumValueDescriptor(
name='DEBUG', index=1, number=1,
serialized_options=None,
type=None),
_descriptor.EnumValueDescriptor(
name='INFO', index=2, number=2,
serialized_options=None,
type=None),
_descriptor.EnumValueDescriptor(
name='WARNING', index=3, number=3,
serialized_options=None,
type=None),
_descriptor.EnumValueDescriptor(
name='ERROR', index=4, number=4,
serialized_options=None,
type=None),
_descriptor.EnumValueDescriptor(
name='CRITICAL', index=5, number=5,
serialized_options=None,
type=None),
],
containing_type=None,
serialized_options=None,
serialized_start=547,
serialized_end=625,
)
_sym_db.RegisterEnumDescriptor(_LOGRECORD_LEVEL)
_METRICRECORD_DOMAIN = _descriptor.EnumDescriptor(
name='Domain',
full_name='buildgrid.v2.MetricRecord.Domain',
filename=None,
file=DESCRIPTOR,
values=[
_descriptor.EnumValueDescriptor(
name='UNKNOWN', index=0, number=0,
serialized_options=None,
type=None),
_descriptor.EnumValueDescriptor(
name='STATE', index=1, number=1,
serialized_options=None,
type=None),
_descriptor.EnumValueDescriptor(
name='BUILD', index=2, number=2,
serialized_options=None,
type=None),
],
containing_type=None,
serialized_options=None,
serialized_start=1002,
serialized_end=1045,
)
_sym_db.RegisterEnumDescriptor(_METRICRECORD_DOMAIN)
_METRICRECORD_TYPE = _descriptor.EnumDescriptor(
name='Type',
full_name='buildgrid.v2.MetricRecord.Type',
filename=None,
file=DESCRIPTOR,
values=[
_descriptor.EnumValueDescriptor(
name='NONE', index=0, number=0,
serialized_options=None,
type=None),
_descriptor.EnumValueDescriptor(
name='COUNTER', index=1, number=1,
serialized_options=None,
type=None),
_descriptor.EnumValueDescriptor(
name='TIMER', index=2, number=2,
serialized_options=None,
type=None),
_descriptor.EnumValueDescriptor(
name='GAUGE', index=3, number=3,
serialized_options=None,
type=None),
],
containing_type=None,
serialized_options=None,
serialized_start=1047,
serialized_end=1098,
)
_sym_db.RegisterEnumDescriptor(_METRICRECORD_TYPE)
_BUSMESSAGE = _descriptor.Descriptor(
name='BusMessage',
full_name='buildgrid.v2.BusMessage',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='sequence_number', full_name='buildgrid.v2.BusMessage.sequence_number', index=0,
number=1, type=3, cpp_type=2, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='log_record', full_name='buildgrid.v2.BusMessage.log_record', index=1,
number=2, type=11, cpp_type=10, label=1,
has_default_value=False, default_value=None,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='metric_record', full_name='buildgrid.v2.BusMessage.metric_record', index=2,
number=3, type=11, cpp_type=10, label=1,
has_default_value=False, default_value=None,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
],
extensions=[
],
nested_types=[],
enum_types=[
],
serialized_options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
_descriptor.OneofDescriptor(
name='record', full_name='buildgrid.v2.BusMessage.record',
index=0, containing_type=None, fields=[]),
],
serialized_start=143,
serialized_end=290,
)
_LOGRECORD_METADATAENTRY = _descriptor.Descriptor(
name='MetadataEntry',
full_name='buildgrid.v2.LogRecord.MetadataEntry',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='key', full_name='buildgrid.v2.LogRecord.MetadataEntry.key', index=0,
number=1, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='value', full_name='buildgrid.v2.LogRecord.MetadataEntry.value', index=1,
number=2, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
],
extensions=[
],
nested_types=[],
enum_types=[
],
serialized_options=_b('8\001'),
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
serialized_start=498,
serialized_end=545,
)
_LOGRECORD = _descriptor.Descriptor(
name='LogRecord',
full_name='buildgrid.v2.LogRecord',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='creation_timestamp', full_name='buildgrid.v2.LogRecord.creation_timestamp', index=0,
number=1, type=11, cpp_type=10, label=1,
has_default_value=False, default_value=None,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='domain', full_name='buildgrid.v2.LogRecord.domain', index=1,
number=2, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='level', full_name='buildgrid.v2.LogRecord.level', index=2,
number=3, type=14, cpp_type=8, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='message', full_name='buildgrid.v2.LogRecord.message', index=3,
number=4, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='metadata', full_name='buildgrid.v2.LogRecord.metadata', index=4,
number=5, type=11, cpp_type=10, label=3,
has_default_value=False, default_value=[],
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
],
extensions=[
],
nested_types=[_LOGRECORD_METADATAENTRY, ],
enum_types=[
_LOGRECORD_LEVEL,
],
serialized_options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
serialized_start=293,
serialized_end=625,
)
_METRICRECORD_METADATAENTRY = _descriptor.Descriptor(
name='MetadataEntry',
full_name='buildgrid.v2.MetricRecord.MetadataEntry',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='key', full_name='buildgrid.v2.MetricRecord.MetadataEntry.key', index=0,
number=1, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='value', full_name='buildgrid.v2.MetricRecord.MetadataEntry.value', index=1,
number=2, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
],
extensions=[
],
nested_types=[],
enum_types=[
],
serialized_options=_b('8\001'),
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
serialized_start=498,
serialized_end=545,
)
_METRICRECORD = _descriptor.Descriptor(
name='MetricRecord',
full_name='buildgrid.v2.MetricRecord',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='creation_timestamp', full_name='buildgrid.v2.MetricRecord.creation_timestamp', index=0,
number=1, type=11, cpp_type=10, label=1,
has_default_value=False, default_value=None,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='domain', full_name='buildgrid.v2.MetricRecord.domain', index=1,
number=2, type=14, cpp_type=8, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='type', full_name='buildgrid.v2.MetricRecord.type', index=2,
number=3, type=14, cpp_type=8, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='name', full_name='buildgrid.v2.MetricRecord.name', index=3,
number=4, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='count', full_name='buildgrid.v2.MetricRecord.count', index=4,
number=5, type=5, cpp_type=1, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='duration', full_name='buildgrid.v2.MetricRecord.duration', index=5,
number=6, type=11, cpp_type=10, label=1,
has_default_value=False, default_value=None,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='value', full_name='buildgrid.v2.MetricRecord.value', index=6,
number=7, type=5, cpp_type=1, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='metadata', full_name='buildgrid.v2.MetricRecord.metadata', index=7,
number=8, type=11, cpp_type=10, label=3,
has_default_value=False, default_value=[],
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
],
extensions=[
],
nested_types=[_METRICRECORD_METADATAENTRY, ],
enum_types=[
_METRICRECORD_DOMAIN,
_METRICRECORD_TYPE,
],
serialized_options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
_descriptor.OneofDescriptor(
name='data', full_name='buildgrid.v2.MetricRecord.data',
index=0, containing_type=None, fields=[]),
],
serialized_start=628,
serialized_end=1106,
)
_BUSMESSAGE.fields_by_name['log_record'].message_type = _LOGRECORD
_BUSMESSAGE.fields_by_name['metric_record'].message_type = _METRICRECORD
_BUSMESSAGE.oneofs_by_name['record'].fields.append(
_BUSMESSAGE.fields_by_name['log_record'])
_BUSMESSAGE.fields_by_name['log_record'].containing_oneof = _BUSMESSAGE.oneofs_by_name['record']
_BUSMESSAGE.oneofs_by_name['record'].fields.append(
_BUSMESSAGE.fields_by_name['metric_record'])
_BUSMESSAGE.fields_by_name['metric_record'].containing_oneof = _BUSMESSAGE.oneofs_by_name['record']
_LOGRECORD_METADATAENTRY.containing_type = _LOGRECORD
_LOGRECORD.fields_by_name['creation_timestamp'].message_type = google_dot_protobuf_dot_timestamp__pb2._TIMESTAMP
_LOGRECORD.fields_by_name['level'].enum_type = _LOGRECORD_LEVEL
_LOGRECORD.fields_by_name['metadata'].message_type = _LOGRECORD_METADATAENTRY
_LOGRECORD_LEVEL.containing_type = _LOGRECORD
_METRICRECORD_METADATAENTRY.containing_type = _METRICRECORD
_METRICRECORD.fields_by_name['creation_timestamp'].message_type = google_dot_protobuf_dot_timestamp__pb2._TIMESTAMP
_METRICRECORD.fields_by_name['domain'].enum_type = _METRICRECORD_DOMAIN
_METRICRECORD.fields_by_name['type'].enum_type = _METRICRECORD_TYPE
_METRICRECORD.fields_by_name['duration'].message_type = google_dot_protobuf_dot_duration__pb2._DURATION
_METRICRECORD.fields_by_name['metadata'].message_type = _METRICRECORD_METADATAENTRY
_METRICRECORD_DOMAIN.containing_type = _METRICRECORD
_METRICRECORD_TYPE.containing_type = _METRICRECORD
_METRICRECORD.oneofs_by_name['data'].fields.append(
_METRICRECORD.fields_by_name['count'])
_METRICRECORD.fields_by_name['count'].containing_oneof = _METRICRECORD.oneofs_by_name['data']
_METRICRECORD.oneofs_by_name['data'].fields.append(
_METRICRECORD.fields_by_name['duration'])
_METRICRECORD.fields_by_name['duration'].containing_oneof = _METRICRECORD.oneofs_by_name['data']
_METRICRECORD.oneofs_by_name['data'].fields.append(
_METRICRECORD.fields_by_name['value'])
_METRICRECORD.fields_by_name['value'].containing_oneof = _METRICRECORD.oneofs_by_name['data']
DESCRIPTOR.message_types_by_name['BusMessage'] = _BUSMESSAGE
DESCRIPTOR.message_types_by_name['LogRecord'] = _LOGRECORD
DESCRIPTOR.message_types_by_name['MetricRecord'] = _METRICRECORD
_sym_db.RegisterFileDescriptor(DESCRIPTOR)
BusMessage = _reflection.GeneratedProtocolMessageType('BusMessage', (_message.Message,), dict(
DESCRIPTOR = _BUSMESSAGE,
__module__ = 'buildgrid.v2.monitoring_pb2'
# @@protoc_insertion_point(class_scope:buildgrid.v2.BusMessage)
))
_sym_db.RegisterMessage(BusMessage)
LogRecord = _reflection.GeneratedProtocolMessageType('LogRecord', (_message.Message,), dict(
MetadataEntry = _reflection.GeneratedProtocolMessageType('MetadataEntry', (_message.Message,), dict(
DESCRIPTOR = _LOGRECORD_METADATAENTRY,
__module__ = 'buildgrid.v2.monitoring_pb2'
# @@protoc_insertion_point(class_scope:buildgrid.v2.LogRecord.MetadataEntry)
))
,
DESCRIPTOR = _LOGRECORD,
__module__ = 'buildgrid.v2.monitoring_pb2'
# @@protoc_insertion_point(class_scope:buildgrid.v2.LogRecord)
))
_sym_db.RegisterMessage(LogRecord)
_sym_db.RegisterMessage(LogRecord.MetadataEntry)
MetricRecord = _reflection.GeneratedProtocolMessageType('MetricRecord', (_message.Message,), dict(
MetadataEntry = _reflection.GeneratedProtocolMessageType('MetadataEntry', (_message.Message,), dict(
DESCRIPTOR = _METRICRECORD_METADATAENTRY,
__module__ = 'buildgrid.v2.monitoring_pb2'
# @@protoc_insertion_point(class_scope:buildgrid.v2.MetricRecord.MetadataEntry)
))
,
DESCRIPTOR = _METRICRECORD,
__module__ = 'buildgrid.v2.monitoring_pb2'
# @@protoc_insertion_point(class_scope:buildgrid.v2.MetricRecord)
))
_sym_db.RegisterMessage(MetricRecord)
_sym_db.RegisterMessage(MetricRecord.MetadataEntry)
_LOGRECORD_METADATAENTRY._options = None
_METRICRECORD_METADATAENTRY._options = None
# @@protoc_insertion_point(module_scope)
# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
import grpc
# Copyright (C) 2018 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
import ctypes
from enum import Enum
import sys
from google.protobuf import json_format
from buildgrid._protos.buildgrid.v2 import monitoring_pb2
class MonitoringOutputType(Enum):
# Standard output stream.
STDOUT = 'stdout'
# On-disk file.
FILE = 'file'
# UNIX domain socket.
SOCKET = 'socket'
class MonitoringOutputFormat(Enum):
# Protobuf binary format.
BINARY = 'binary'
# JSON format.
JSON = 'json'
class MonitoringBus:
def __init__(self, event_loop,
endpoint_type=MonitoringOutputType.SOCKET, endpoint_location=None,
serialisation_format=MonitoringOutputFormat.BINARY):
self.__event_loop = event_loop
self.__streaming_task = None
self.__message_queue = asyncio.Queue(loop=self.__event_loop)
self.__sequence_number = 1
self.__output_location = None
self.__async_output = False
self.__json_output = False
if endpoint_type == MonitoringOutputType.FILE:
self.__output_location = endpoint_location
elif endpoint_type == MonitoringOutputType.SOCKET:
self.__output_location = endpoint_location
self.__async_output = True
if serialisation_format == MonitoringOutputFormat.JSON:
self.__json_output = True
# --- Public API ---
def start(self):
"""Starts the monitoring bus worker task."""
if self.__streaming_task is not None:
return
self.__streaming_task = asyncio.ensure_future(
self._streaming_worker(), loop=self.__event_loop)
def stop(self):
"""Cancels the monitoring bus worker task."""
if self.__streaming_task is None:
return
self.__streaming_task.cancel()
async def send_record(self, record):
"""Publishes a record onto the bus asynchronously.
Args:
record (Message): The
"""
await self.__message_queue.put(record)
def send_record_nowait(self, record):
"""Publishes a record onto the bus.
Args:
record (Message): The
"""
self.__message_queue.put_nowait(record)
# --- Private API ---
async def _streaming_worker(self):
"""Handles bus messages streaming work."""
async def __streaming_worker(end_points):
record = await self.__message_queue.get()
message = monitoring_pb2.BusMessage()
message.sequence_number = self.__sequence_number
if record.DESCRIPTOR is monitoring_pb2.LogRecord.DESCRIPTOR:
message.log_record.CopyFrom(record)
elif record.DESCRIPTOR is monitoring_pb2.MetricRecord.DESCRIPTOR:
message.metric_record.CopyFrom(record)
else:
return False
if self.__json_output:
blob_message = json_format.MessageToJson(message).encode()
for end_point in end_points:
end_point.write(blob_message)
else:
blob_size = ctypes.c_uint32(message.ByteSize())
blob_message = message.SerializeToString()
for end_point in end_points:
end_point.write(bytes(blob_size))
end_point.write(blob_message)
return True
output_writers, output_file = [], None
async def __client_connected_callback(reader, writer):
output_writers.append(writer)
try:
if self.__async_output and self.__output_location:
await asyncio.start_unix_server(
__client_connected_callback, path=self.__output_location,
loop=self.__event_loop)
while True:
if await __streaming_worker(output_writers):
self.__sequence_number += 1
for writer in output_writers:
await writer.drain()
elif self.__output_location:
output_file = open(self.__output_location, mode='wb')
output_writers.append(output_file)
while True:
if await __streaming_worker(iter(output_file)):
self.__sequence_number += 1
else:
output_writers.append(sys.stdout.buffer)
while True:
if await __streaming_worker(output_writers):
self.__sequence_number += 1
except asyncio.CancelledError:
if output_file is not None:
output_file.close()
elif output_writers:
for writer in output_writers:
writer.close()
await writer.wait_closed()
......@@ -24,7 +24,7 @@ import logging
from buildgrid._exceptions import InvalidArgumentError, NotFoundError, OutOfRangeError
from buildgrid._protos.google.bytestream import bytestream_pb2
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2 as re_pb2
from buildgrid.settings import HASH
from buildgrid.settings import HASH, HASH_LENGTH
class ContentAddressableStorageInstance:
......@@ -71,15 +71,12 @@ class ByteStreamInstance:
def register_instance_with_server(self, instance_name, server):
server.add_bytestream_instance(self, instance_name)
def read(self, path, read_offset, read_limit):
storage = self._storage
if path[0] == "blobs":
path = [""] + path
def read(self, digest_hash, digest_size, read_offset, read_limit):
if len(digest_hash) != HASH_LENGTH or not digest_size.isdigit():
raise InvalidArgumentError("Invalid digest [{}/{}]"
.format(digest_hash, digest_size))
# Parse/verify resource name.
# Read resource names look like "[instance/]blobs/abc123hash/99".
digest = re_pb2.Digest(hash=path[2], size_bytes=int(path[3]))
digest = re_pb2.Digest(hash=digest_hash, size_bytes=int(digest_size))
# Check the given read offset and limit.
if read_offset < 0 or read_offset > digest.size_bytes:
......@@ -95,7 +92,7 @@ class ByteStreamInstance:
raise InvalidArgumentError("Negative read_limit is invalid")
# Read the blob from storage and send its contents to the client.
result = storage.get_blob(digest)
result = self._storage.get_blob(digest)
if result is None:
raise NotFoundError("Blob not found")
......@@ -110,51 +107,35 @@ class ByteStreamInstance:
data=result.read(min(self.BLOCK_SIZE, bytes_remaining)))
bytes_remaining -= self.BLOCK_SIZE
def write(self, requests):
storage = self._storage
def write(self, digest_hash, digest_size, first_block, other_blocks):
if len(digest_hash) != HASH_LENGTH or not digest_size.isdigit():
raise InvalidArgumentError("Invalid digest [{}/{}]"
.format(digest_hash, digest_size))
first_request = next(requests)
path = first_request.resource_name.split("/")
digest = re_pb2.Digest(hash=digest_hash, size_bytes=int(digest_size))
if path[0] == "uploads":
path = [""] + path
digest = re_pb2.Digest(hash=path[4], size_bytes=int(path[5]))
write_session = storage.begin_write(digest)
write_session = self._storage.begin_write(digest)
# Start the write session and write the first request's data.
write_session.write(first_request.data)
hash_ = HASH(first_request.data)
bytes_written = len(first_request.data)
finished = first_request.finish_write
# Handle subsequent write requests.
while not finished:
for request in requests:
if finished:
raise InvalidArgumentError("Write request sent after write finished")
elif request.write_offset != bytes_written:
raise InvalidArgumentError("Invalid write offset")
write_session.write(first_block)
elif request.resource_name and request.resource_name != first_request.resource_name:
raise InvalidArgumentError("Resource name changed mid-write")
computed_hash = HASH(first_block)
bytes_written = len(first_block)
finished = request.finish_write
bytes_written += len(request.data)
if bytes_written > digest.size_bytes:
raise InvalidArgumentError("Wrote too much data to blob")
# Handle subsequent write requests.
for next_block in other_blocks:
write_session.write(next_block)
write_session.write(request.data)
hash_.update(request.data)
computed_hash.update(next_block)
bytes_written += len(next_block)
# Check that the data matches the provided digest.
if bytes_written != digest.size_bytes or not finished:
if bytes_written != digest.size_bytes:
raise NotImplementedError("Cannot close stream before finishing write")
elif hash_.hexdigest() != digest.hash:
elif computed_hash.hexdigest() != digest.hash:
raise InvalidArgumentError("Data does not match hash")
storage.commit_write(digest, write_session)
self._storage.commit_write(digest, write_session)
return bytestream_pb2.WriteResponse(committed_size=bytes_written)
......@@ -21,7 +21,6 @@ Implements the Content Addressable Storage API and ByteStream API.
"""
from itertools import tee
import logging
import grpc
......@@ -115,27 +114,30 @@ class ByteStreamService(bytestream_pb2_grpc.ByteStreamServicer):
def Read(self, request, context):
self.__logger.debug("Read request from [%s]", context.peer())
names = request.resource_name.split('/')
try:
path = request.resource_name.split("/")
instance_name = path[0]
instance_name = ''
# Format: "{instance_name}/blobs/{hash}/{size}":
if len(names) < 3 or names[-3] != 'blobs':
raise InvalidArgumentError("Invalid resource name: [{}]"
.format(request.resource_name))
# TODO: Decide on default instance name
if path[0] == "blobs":
if len(path) < 3 or not path[2].isdigit():
raise InvalidArgumentError("Invalid resource name: [{}]".format(request.resource_name))
instance_name = ""
elif names[0] != 'blobs':
index = names.index('blobs')
instance_name = '/'.join(names[:index])
names = names[index:]
elif path[1] == "blobs":
if len(path) < 4 or not path[3].isdigit():
raise InvalidArgumentError("Invalid resource name: [{}]".format(request.resource_name))
if len(names) < 3:
raise InvalidArgumentError("Invalid resource name: [{}]"
.format(request.resource_name))
else:
raise InvalidArgumentError("Invalid resource name: [{}]".format(request.resource_name))
hash_, size_bytes = names[1], names[2]
instance = self._get_instance(instance_name)
yield from instance.read(path,
request.read_offset,
request.read_limit)
yield from instance.read(hash_, size_bytes,
request.read_offset, request.read_limit)
except InvalidArgumentError as e:
self.__logger.error(e)
......@@ -158,31 +160,31 @@ class ByteStreamService(bytestream_pb2_grpc.ByteStreamServicer):
def Write(self, requests, context):
self.__logger.debug("Write request from [%s]", context.peer())
try:
requests, request_probe = tee(requests, 2)
first_request = next(request_probe)
path = first_request.resource_name.split("/")
request = next(requests)
names = request.resource_name.split('/')
instance_name = path[0]
try:
instance_name = ''
# Format: "{instance_name}/uploads/{uuid}/blobs/{hash}/{size}/{anything}":
if len(names) < 5 or 'uploads' not in names or 'blobs' not in names:
raise InvalidArgumentError("Invalid resource name: [{}]"
.format(request.resource_name))
# TODO: Sort out no instance name
if path[0] == "uploads":
if len(path) < 5 or path[2] != "blobs" or not path[4].isdigit():
raise InvalidArgumentError("Invalid resource name: [{}]".format(first_request.resource_name))
instance_name = ""
elif names[0] != 'uploads':
index = names.index('uploads')
instance_name = '/'.join(names[:index])
names = names[index:]
elif path[1] == "uploads":
if len(path) < 6 or path[3] != "blobs" or not path[5].isdigit():
raise InvalidArgumentError("Invalid resource name: [{}]".format(first_request.resource_name))
if len(names) < 5:
raise InvalidArgumentError("Invalid resource name: [{}]"
.format(request.resource_name))
else:
raise InvalidArgumentError("Invalid resource name: [{}]".format(first_request.resource_name))
_, hash_, size_bytes = names[1], names[3], names[4]
instance = self._get_instance(instance_name)
response = instance.write(requests)
return response
return instance.write(hash_, size_bytes, request.data,
[request.data for request in requests])
except NotImplementedError as e:
self.__logger.error(e)
......
......@@ -13,18 +13,21 @@
# limitations under the License.
import asyncio
from concurrent import futures
import logging
import os
import signal
import grpc
from .cas.service import ByteStreamService, ContentAddressableStorageService
from .actioncache.service import ActionCacheService
from .execution.service import ExecutionService
from .operations.service import OperationsService
from .bots.service import BotsService
from .referencestorage.service import ReferenceStorageService
from buildgrid.server.actioncache.service import ActionCacheService
from buildgrid.server.bots.service import BotsService
from buildgrid.server.cas.service import ByteStreamService, ContentAddressableStorageService
from buildgrid.server.execution.service import ExecutionService
from buildgrid.server._monitoring import MonitoringBus, MonitoringOutputType, MonitoringOutputFormat
from buildgrid.server.operations.service import OperationsService
from buildgrid.server.referencestorage.service import ReferenceStorageService
class BuildGridServer:
......@@ -34,7 +37,7 @@ class BuildGridServer:
requisite services.
"""
def __init__(self, max_workers=None):
def __init__(self, max_workers=None, monitor=False):
"""Initializes a new :class:`BuildGridServer` instance.
Args:
......@@ -46,9 +49,11 @@ class BuildGridServer:
# Use max_workers default from Python 3.5+
max_workers = (os.cpu_count() or 1) * 5
server = grpc.server(futures.ThreadPoolExecutor(max_workers))
self.__grpc_executor = futures.ThreadPoolExecutor(max_workers)
self.__grpc_server = grpc.server(self.__grpc_executor)
self._server = server
self.__main_loop = asyncio.get_event_loop()
self.__monitoring_bus = None
self._execution_service = None
self._bots_service = None
......@@ -58,15 +63,34 @@ class BuildGridServer:
self._cas_service = None
self._bytestream_service = None
self._is_instrumented = monitor
if self._is_instrumented:
self.__monitoring_bus = MonitoringBus(
self.__main_loop, endpoint_type=MonitoringOutputType.STDOUT,
serialisation_format=MonitoringOutputFormat.JSON)
# --- Public API ---
def start(self):
"""Starts the server.
"""
self._server.start()
"""Starts the BuildGrid server."""
self.__grpc_server.start()
def stop(self, grace=0):
"""Stops the server.
"""
self._server.stop(grace)
if self._is_instrumented:
self.__monitoring_bus.start()
self.__main_loop.add_signal_handler(signal.SIGTERM, self.stop)
self.__main_loop.run_forever()
def stop(self):
"""Stops the BuildGrid server."""
if self._is_instrumented:
self.__monitoring_bus.stop()
self.__main_loop.stop()
self.__grpc_server.stop(None)
def add_port(self, address, credentials):
"""Adds a port to the server.
......@@ -77,14 +101,19 @@ class BuildGridServer:
Args:
address (str): The address with port number.
credentials (:obj:`grpc.ChannelCredentials`): Credentials object.
Returns:
int: Number of the bound port.
"""
if credentials is not None:
self.__logger.info("Adding secure connection on: [%s]", address)
self._server.add_secure_port(address, credentials)
port_number = self.__grpc_server.add_secure_port(address, credentials)
else:
self.__logger.info("Adding insecure connection on [%s]", address)
self._server.add_insecure_port(address)
port_number = self.__grpc_server.add_insecure_port(address)
return port_number
def add_execution_instance(self, instance, instance_name):
"""Adds an :obj:`ExecutionInstance` to the service.
......@@ -96,7 +125,7 @@ class BuildGridServer:
instance_name (str): Instance name.
"""
if self._execution_service is None:
self._execution_service = ExecutionService(self._server)
self._execution_service = ExecutionService(self.__grpc_server)
self._execution_service.add_instance(instance_name, instance)
......@@ -110,7 +139,7 @@ class BuildGridServer:
instance_name (str): Instance name.
"""
if self._bots_service is None:
self._bots_service = BotsService(self._server)
self._bots_service = BotsService(self.__grpc_server)
self._bots_service.add_instance(instance_name, instance)
......@@ -124,7 +153,7 @@ class BuildGridServer:
instance_name (str): Instance name.
"""
if self._operations_service is None:
self._operations_service = OperationsService(self._server)
self._operations_service = OperationsService(self.__grpc_server)
self._operations_service.add_instance(instance_name, instance)
......@@ -138,7 +167,7 @@ class BuildGridServer:
instance_name (str): Instance name.
"""
if self._reference_storage_service is None:
self._reference_storage_service = ReferenceStorageService(self._server)
self._reference_storage_service = ReferenceStorageService(self.__grpc_server)
self._reference_storage_service.add_instance(instance_name, instance)
......@@ -152,7 +181,7 @@ class BuildGridServer:
instance_name (str): Instance name.
"""
if self._action_cache_service is None:
self._action_cache_service = ActionCacheService(self._server)
self._action_cache_service = ActionCacheService(self.__grpc_server)
self._action_cache_service.add_instance(instance_name, instance)
......@@ -166,7 +195,7 @@ class BuildGridServer:
instance_name (str): Instance name.
"""
if self._cas_service is None:
self._cas_service = ContentAddressableStorageService(self._server)
self._cas_service = ContentAddressableStorageService(self.__grpc_server)
self._cas_service.add_instance(instance_name, instance)
......@@ -180,6 +209,12 @@ class BuildGridServer:
instance_name (str): Instance name.
"""
if self._bytestream_service is None:
self._bytestream_service = ByteStreamService(self._server)
self._bytestream_service = ByteStreamService(self.__grpc_server)
self._bytestream_service.add_instance(instance_name, instance)
# --- Public API: Monitoring ---
@property
def is_instrumented(self):
return self._is_instrumented
......@@ -137,7 +137,7 @@ def test_bytestream_write(mocked, instance, extra_data):
bytestream_pb2.WriteRequest(data=b'def', write_offset=3, finish_write=True)
]
response = servicer.Write(requests, context)
response = servicer.Write(iter(requests), context)
assert response.committed_size == 6
assert len(storage.data) == 1
assert (hash_, 6) in storage.data
......@@ -159,7 +159,7 @@ def test_bytestream_write_rejects_wrong_hash(mocked):
bytestream_pb2.WriteRequest(resource_name=resource_name, data=data, finish_write=True)
]
servicer.Write(requests, context)
servicer.Write(iter(requests), context)
context.set_code.assert_called_once_with(grpc.StatusCode.INVALID_ARGUMENT)
assert not storage.data
......
......@@ -13,19 +13,24 @@
# limitations under the License.
from buildgrid._app.settings import parser
from buildgrid._app.commands.cmd_server import _create_server_from_config
from buildgrid.server.cas.service import ByteStreamService, ContentAddressableStorageService
from buildgrid.server.actioncache.service import ActionCacheService
from buildgrid.server.execution.service import ExecutionService
from buildgrid.server.operations.service import OperationsService
from buildgrid.server.bots.service import BotsService
from buildgrid.server.referencestorage.service import ReferenceStorageService
import grpc
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2_grpc
from buildgrid._protos.buildstream.v2 import buildstream_pb2
from buildgrid._protos.buildstream.v2 import buildstream_pb2_grpc
from buildgrid._protos.google.bytestream import bytestream_pb2
from buildgrid._protos.google.bytestream import bytestream_pb2_grpc
from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2
from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2_grpc
from buildgrid._protos.google.longrunning import operations_pb2
from buildgrid._protos.google.longrunning import operations_pb2_grpc
from .utils.utils import run_in_subprocess
from .utils.server import serve
config = """
CONFIGURATION = """
server:
- !channel
port: 50051
......@@ -72,24 +77,102 @@ instances:
def test_create_server():
# Actual test function, to be run in a subprocess:
def __test_create_server(queue, config_data):
settings = parser.get_parser().safe_load(config)
server = _create_server_from_config(settings)
def __test_create_server(queue, remote):
# Open a channel to the remote server:
channel = grpc.insecure_channel(remote)
server.start()
server.stop()
try:
stub = remote_execution_pb2_grpc.ExecutionStub(channel)
request = remote_execution_pb2.ExecuteRequest(instance_name='main')
response = next(stub.Execute(request))
assert response.DESCRIPTOR is operations_pb2.Operation.DESCRIPTOR
except grpc.RpcError as e:
if e.code() == grpc.StatusCode.UNIMPLEMENTED:
queue.put(False)
except AssertionError:
queue.put(False)
try:
stub = remote_execution_pb2_grpc.ActionCacheStub(channel)
request = remote_execution_pb2.GetActionResultRequest(instance_name='main')
response = stub.GetActionResult(request)
assert response.DESCRIPTOR is remote_execution_pb2.ActionResult.DESCRIPTOR
except grpc.RpcError as e:
if e.code() == grpc.StatusCode.UNIMPLEMENTED:
queue.put(False)
except AssertionError:
queue.put(False)
try:
stub = remote_execution_pb2_grpc.ContentAddressableStorageStub(channel)
request = remote_execution_pb2.BatchUpdateBlobsRequest(instance_name='main')
response = stub.BatchUpdateBlobs(request)
assert response.DESCRIPTOR is remote_execution_pb2.BatchUpdateBlobsResponse.DESCRIPTOR
except grpc.RpcError as e:
if e.code() == grpc.StatusCode.UNIMPLEMENTED:
queue.put(False)
except AssertionError:
queue.put(False)
try:
stub = buildstream_pb2_grpc.ReferenceStorageStub(channel)
request = buildstream_pb2.GetReferenceRequest(instance_name='main')
response = stub.GetReference(request)
assert response.DESCRIPTOR is buildstream_pb2.GetReferenceResponse.DESCRIPTOR
except grpc.RpcError as e:
if e.code() == grpc.StatusCode.UNIMPLEMENTED:
queue.put(False)
except AssertionError:
queue.put(False)
try:
assert isinstance(server._execution_service, ExecutionService)
assert isinstance(server._operations_service, OperationsService)
assert isinstance(server._bots_service, BotsService)
assert isinstance(server._reference_storage_service, ReferenceStorageService)
assert isinstance(server._action_cache_service, ActionCacheService)
assert isinstance(server._cas_service, ContentAddressableStorageService)
assert isinstance(server._bytestream_service, ByteStreamService)
stub = bytestream_pb2_grpc.ByteStreamStub(channel)
request = bytestream_pb2.ReadRequest()
response = stub.Read(request)
assert next(response).DESCRIPTOR is bytestream_pb2.ReadResponse.DESCRIPTOR
except grpc.RpcError as e:
if e.code() == grpc.StatusCode.UNIMPLEMENTED:
queue.put(False)
except AssertionError:
queue.put(False)
else:
queue.put(True)
assert run_in_subprocess(__test_create_server, config)
try:
stub = operations_pb2_grpc.OperationsStub(channel)
request = operations_pb2.ListOperationsRequest(name='main')
response = stub.ListOperations(request)
assert response.DESCRIPTOR is operations_pb2.ListOperationsResponse.DESCRIPTOR
except grpc.RpcError as e:
if e.code() == grpc.StatusCode.UNIMPLEMENTED:
queue.put(False)
except AssertionError:
queue.put(False)
try:
stub = bots_pb2_grpc.BotsStub(channel)
request = bots_pb2.CreateBotSessionRequest()
response = stub.CreateBotSession(request)
assert response.DESCRIPTOR is bots_pb2.BotSession.DESCRIPTOR
except grpc.RpcError as e:
if e.code() == grpc.StatusCode.UNIMPLEMENTED:
queue.put(False)
except AssertionError:
queue.put(False)
queue.put(True)
with serve(CONFIGURATION) as server:
assert run_in_subprocess(__test_create_server, server.remote)
# Copyright (C) 2018 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from contextlib import contextmanager
import multiprocessing
import signal
import pytest_cov
from buildgrid._app.settings import parser
from buildgrid.server.instance import BuildGridServer
@contextmanager
def serve(configuration):
server = Server(configuration)
try:
yield server
finally:
server.quit()
class Server:
def __init__(self, configuration):
self.configuration = configuration
self.__queue = multiprocessing.Queue()
self.__process = multiprocessing.Process(
target=Server.serve,
args=(self.__queue, self.configuration))
self.__process.start()
self.port = self.__queue.get()
self.remote = 'localhost:{}'.format(self.port)
@classmethod
def serve(cls, queue, configuration):
pytest_cov.embed.cleanup_on_sigterm()
server = BuildGridServer()
def __signal_handler(signum, frame):
server.stop()
signal.signal(signal.SIGINT, signal.SIG_IGN)
signal.signal(signal.SIGTERM, __signal_handler)
instances = parser.get_parser().safe_load(configuration)['instances']
for instance in instances:
instance_name = instance['name']
services = instance['services']
for service in services:
service.register_instance_with_server(instance_name, server)
port = server.add_port('localhost:0', None)
queue.put(port)
server.start()
def quit(self):
if self.__process:
self.__process.terminate()
self.__process.join()