Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • edbaunton/buildgrid
  • BuildGrid/buildgrid
  • bloomberg/buildgrid
  • devcurmudgeon/buildgrid
  • mhadjimichael/buildgrid
  • jmacarthur/buildgrid
  • rkothur/buildgrid
  • valentindavid/buildgrid
  • jjardon/buildgrid
  • RichKen/buildgrid
  • jbonney/buildgrid
  • onsha_alexander/buildgrid
  • santigl/buildgrid
  • mostynb/buildgrid
  • hoffbrinkle/buildgrid
  • Malinskiy/buildgrid
  • coldtom/buildgrid
  • azeemb_a/buildgrid
  • pointswaves/buildgrid
  • BenjaminSchubert/buildgrid
  • michaellee8/buildgrid
  • anil-anil/buildgrid
  • seanborg/buildgrid
  • jdelong12/buildgrid
  • jclay/buildgrid
  • bweston92/buildgrid
  • zchen723/buildgrid
  • cpratt34/buildgrid
  • armbiant/apache-buildgrid
  • armbiant/android-buildgrid
  • itsme300/buildgrid
  • sbairoliya/buildgrid
32 results
Show changes
Commits on Source (3)
......@@ -30,7 +30,8 @@ from buildgrid.client.authentication import setup_channel
from buildgrid.client.cas import download, upload
from buildgrid._exceptions import InvalidArgumentError
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
from buildgrid.utils import create_digest
from buildgrid.settings import REQUEST_METADATA_TOOL_NAME, REQUEST_METADATA_TOOL_VERSION
from buildgrid.utils import create_digest, request_metadata_header_entry
from ..cli import pass_context
......@@ -48,8 +49,17 @@ from ..cli import pass_context
help="Public server certificate for TLS (PEM-encoded).")
@click.option('--instance-name', type=click.STRING, default=None, show_default=True,
help="Targeted farm instance name.")
@click.option('-t', '--tool-name', type=str, default=REQUEST_METADATA_TOOL_NAME,
help='Tool name.')
@click.option('-n', '--tool-version', type=str, default=REQUEST_METADATA_TOOL_VERSION,
help='Tool version.')
@click.option('-a', '--action-id', type=str, help='Action ID.')
@click.option('-i', '--invocation-id', type=str, help='Tool invocation ID.')
@click.option('-c', '--correlation-id', type=str, help='Correlated invocation ID.')
@pass_context
def cli(context, remote, instance_name, auth_token, client_key, client_cert, server_cert):
def cli(context, remote, instance_name, auth_token, client_key, client_cert,
server_cert, tool_name, tool_version, action_id, invocation_id,
correlation_id):
"""Entry point for the bgd-execute CLI command group."""
try:
context.channel, _ = setup_channel(remote, auth_token=auth_token,
......@@ -62,6 +72,14 @@ def cli(context, remote, instance_name, auth_token, client_key, client_cert, ser
context.instance_name = instance_name
request_metadata = request_metadata_header_entry(tool_name=tool_name,
tool_version=tool_version,
action_id=action_id,
tool_invocation_id=invocation_id,
correlated_invocations_id=correlation_id)
context.request_metadata = request_metadata
@cli.command('request-dummy', short_help="Send a dummy action.")
@click.option('--number', type=click.INT, default=1, show_default=True,
......@@ -85,12 +103,12 @@ def request_dummy(context, number, wait_for_completion):
action_digest=action_digest,
skip_cache_lookup=True)
print(context.request_metadata)
responses = []
for _ in range(0, number):
responses.append(stub.Execute(request))
responses.append(stub.Execute(request, metadata=context.request_metadata))
for response in responses:
if wait_for_completion:
result = None
for stream in response:
......@@ -121,6 +139,7 @@ def run_command(context, input_root, commands, output_file, output_directory,
stub = remote_execution_pb2_grpc.ExecutionStub(context.channel)
output_executables = []
with upload(context.channel, instance=context.instance_name) as uploader:
command = remote_execution_pb2.Command()
......@@ -157,7 +176,8 @@ def run_command(context, input_root, commands, output_file, output_directory,
request = remote_execution_pb2.ExecuteRequest(instance_name=context.instance_name,
action_digest=action_digest,
skip_cache_lookup=True)
response = stub.Execute(request)
response = stub.Execute(request, metadata=context.request_metadata)
stream = None
for stream in response:
......
......@@ -27,9 +27,11 @@ from functools import partial
import grpc
from buildgrid._exceptions import FailedPreconditionError, InvalidArgumentError, CancelledError
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2_grpc
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
from buildgrid._protos.google.longrunning import operations_pb2
from buildgrid.server.peer import Peer
from buildgrid.server._authentication import AuthContext, authorize
from buildgrid.settings import REQUEST_METADATA_HEADER_NAME
class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
......@@ -94,7 +96,10 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
instance_name = request.instance_name
message_queue = queue.Queue()
peer = context.peer()
peer_id = context.peer()
request_metadata = self._context_extract_request_metadata(context)
peer = Peer(peer_id=peer_id, request_metadata=request_metadata)
try:
instance = self._get_instance(instance_name)
......@@ -102,8 +107,8 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
job_name = instance.execute(request.action_digest,
request.skip_cache_lookup)
operation_name = instance.register_job_peer(job_name,
peer, message_queue)
operation_name = instance.register_job_peer(job_name, peer,
message_queue)
context.add_callback(partial(self._rpc_termination_callback,
peer, instance_name, operation_name))
......@@ -161,8 +166,7 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
try:
instance = self._get_instance(instance_name)
instance.register_operation_peer(operation_name,
peer, message_queue)
instance.register_operation_peer(operation_name, peer, message_queue)
context.add_callback(partial(self._rpc_termination_callback,
peer, instance_name, operation_name))
......@@ -231,3 +235,21 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
except KeyError:
raise InvalidArgumentError("Instance doesn't exist on server: [{}]".format(name))
@classmethod
def _context_extract_request_metadata(cls, context):
"""Given a context object, extract the RequestMetadata header
values if they are present. If they were not provided,
returns None.
"""
invocation_metadata = context.invocation_metadata()
request_metadata_entry = next((entry for entry in invocation_metadata
if entry.key == REQUEST_METADATA_HEADER_NAME),
None)
if not request_metadata_entry:
return None
request_metadata = remote_execution_pb2.RequestMetadata()
request_metadata.ParseFromString(request_metadata_entry.value)
return request_metadata
......@@ -12,9 +12,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from datetime import datetime
import logging
from threading import Lock
import uuid
from google.protobuf import duration_pb2, timestamp_pb2
......@@ -62,6 +62,9 @@ class Job:
self._platform_requirements = platform_requirements \
if platform_requirements else dict()
self.__peers_lock = Lock()
self.__peers = set()
self._done = False
def __lt__(self, other):
......@@ -175,7 +178,7 @@ class Job:
"""Subscribes to a new job's :class:`Operation` stage changes.
Args:
peer (str): a unique string identifying the client.
peer (Peer): an object that represents the client.
message_queue (queue.Queue): the event queue to register.
Returns:
......@@ -194,10 +197,13 @@ class Job:
self._name, new_operation.name)
self.__operations_by_name[new_operation.name] = new_operation
self.__operations_by_peer[peer] = new_operation
self.__operations_message_queues[peer] = message_queue
self.__operations_by_peer[peer.uid] = new_operation
self.__operations_message_queues[peer.uid] = message_queue
self._send_operations_updates(peers=[peer.uid])
self._send_operations_updates(peers=[peer])
with self.__peers_lock:
self.__peers.add(peer)
return new_operation.name
......@@ -206,7 +212,7 @@ class Job:
Args:
operation_name (str): an existing operation's name to subscribe to.
peer (str): a unique string identifying the client.
peer (Peer): an object that represents the client.
message_queue (queue.Queue): the event queue to register.
Returns:
......@@ -222,18 +228,20 @@ class Job:
raise NotFoundError("Operation name does not exist: [{}]"
.format(operation_name))
self.__operations_by_peer[peer] = operation
self.__operations_message_queues[peer] = message_queue
self.__operations_by_peer[peer.uid] = operation
self.__operations_message_queues[peer.uid] = message_queue
self._send_operations_updates(peers=[peer])
self._send_operations_updates(peers=[peer.uid])
with self.__peers_lock:
self.__peers.add(peer)
def unregister_operation_peer(self, operation_name, peer):
"""Unsubscribes to the job's :class:`Operation` stage change.
Args:
operation_name (str): an existing operation's name to unsubscribe from.
peer (str): a unique string identifying the client.
peer (Peer): an object that represents the client.
Raises:
NotFoundError: If no operation with `operation_name` exists.
"""
......@@ -244,10 +252,10 @@ class Job:
raise NotFoundError("Operation name does not exist: [{}]"
.format(operation_name))
if peer in self.__operations_message_queues:
del self.__operations_message_queues[peer]
if peer.uid in self.__operations_message_queues:
del self.__operations_message_queues[peer.uid]
del self.__operations_by_peer[peer]
del self.__operations_by_peer[peer.uid]
# Drop the operation if nobody is watching it anymore:
if operation not in self.__operations_by_peer.values():
......@@ -258,6 +266,9 @@ class Job:
self.__logger.debug("Operation deleted for job [%s]: [%s]",
self._name, operation.name)
with self.__peers_lock:
self.__peers.remove(peer)
def list_operations(self):
"""Lists the :class:`Operation` related to a job.
......
# Copyright (C) 2019 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
class Peer:
"""Represents a client during a session."""
def __init__(self, peer_id, token=None, request_metadata=None):
self._id = peer_id # This uniquely identifies a client
self._token = token
self.__request_metadata = request_metadata
def __eq__(self, other):
if isinstance(other, Peer):
return self.uid == other.uid
return False
def __hash__(self):
return hash(self.uid) # This string is unique for each peer
@property
def uid(self):
return self._id
@property
def token(self):
return self._token
# -- `RequestMetadata` optional values (attached to the Execute() call) --
@property
def request_metadata(self):
return self.__request_metadata
@property
def tool_name(self):
if self.__request_metadata and self.__request_metadata.tool_details:
return self.__request_metadata.tool_details.tool_name
return None
def tool_version(self):
if self.__request_metadata and self.__request_metadata.tool_details:
return self.__request_metadata.tool_details.tool_version
return None
@property
def action_id(self):
if self.__request_metadata:
return self.__request_metadata.action_id
return None
@property
def tool_invocation_id(self):
if self.__request_metadata:
return self.__request_metadata.tool_invocation_id
return None
@property
def correlated_invocations_id(self):
if self.__request_metadata:
return self.__request_metadata.correlated_invocations_id
return None
......@@ -54,9 +54,8 @@ class Scheduler:
self.__queue = []
self._is_instrumented = monitor
if self._is_instrumented:
self._is_instrumented = False
if monitor:
self.activate_monitoring()
# --- Public API ---
......@@ -87,7 +86,7 @@ class Scheduler:
Args:
job_name (str): name of the job to subscribe to.
peer (str): a unique string identifying the client.
peer (Peer): object that represents the client
message_queue (queue.Queue): the event queue to register.
Returns:
......@@ -114,7 +113,7 @@ class Scheduler:
Args:
operation_name (str): name of the operation to subscribe to.
peer (str): a unique string identifying the client.
peer (Peer): an object that represents the client.
message_queue (queue.Queue): the event queue to register.
Returns:
......@@ -137,7 +136,7 @@ class Scheduler:
Args:
operation_name (str): name of the operation to unsubscribe from.
peer (str): a unique string identifying the client.
peer (Peer): object that represents the client
Raises:
NotFoundError: If no operation with `operation_name` exists.
......
......@@ -14,6 +14,7 @@
import hashlib
from _version import __version__
# Hash function used for computing digests:
......@@ -43,3 +44,13 @@ BROWSER_URL_FORMAT = '%(type)s/%(instance)s/%(hash)s/%(sizebytes)s/'
# type - Type of CAS object, eg. 'action_result', 'command'...
# hash - Object's digest hash.
# sizebytes - Object's digest size in bytes.
# Name of the header key to attach optional `RequestMetadata`values.
# (This is defined in the REAPI specification.)
REQUEST_METADATA_HEADER_NAME = 'requestmetadata-bin'
# 'RequestMetadata' header values. These values will be used when
# attaching optional metadata to a gRPC request's header:
REQUEST_METADATA_TOOL_NAME = 'buildgrid'
REQUEST_METADATA_TOOL_VERSION = __version__
......@@ -18,7 +18,7 @@ from operator import attrgetter
import os
import socket
from buildgrid.settings import HASH, HASH_LENGTH, BROWSER_URL_FORMAT
from buildgrid.settings import HASH, HASH_LENGTH, BROWSER_URL_FORMAT, REQUEST_METADATA_HEADER_NAME
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
......@@ -284,3 +284,25 @@ def output_directory_maker(directory_path, working_path, tree_digest):
output_directory.path = os.path.relpath(directory_path, start=working_path)
return output_directory
def request_metadata_header_entry(tool_name=None, tool_version=None,
action_id=None, tool_invocation_id=None,
correlated_invocations_id=None):
"""Creates a serialized RequestMetadata entry to attach to a gRPC
call header. Arguments should be of type str or None.
"""
request_metadata = remote_execution_pb2.RequestMetadata()
if action_id:
request_metadata.action_id = action_id
if tool_invocation_id:
request_metadata.tool_invocation_id = tool_invocation_id
if correlated_invocations_id:
request_metadata.correlated_invocations_id = correlated_invocations_id
if tool_name:
request_metadata.tool_details.tool_name = tool_name
if tool_version:
request_metadata.tool_details.tool_version = tool_version
return ((REQUEST_METADATA_HEADER_NAME,
request_metadata.SerializeToString()),)