Commit 7fcf4515 authored by Santiago Gil

Add support for RequestMetadata

parent d0a8c0bc
Pipeline #48708992 passed
@@ -30,6 +30,8 @@ from buildgrid.client.authentication import setup_channel
from buildgrid.client.cas import download, upload
from buildgrid._exceptions import InvalidArgumentError
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
from buildgrid.settings import REQUEST_METADATA_HEADER_NAME, \
REQUEST_METADATA_TOOL_NAME, REQUEST_METADATA_TOOL_VERSION
from buildgrid.utils import create_digest
from ..cli import pass_context
@@ -66,10 +68,12 @@ def cli(context, remote, instance_name, auth_token, client_key, client_cert, ser
@cli.command('request-dummy', short_help="Send a dummy action.")
@click.option('--number', type=click.INT, default=1, show_default=True,
help="Number of request to send.")
@click.option('--request-metadata', is_flag=True,
help="Attach RequestMetadata to the request header.")
@click.option('--wait-for-completion', is_flag=True,
help="Stream updates until jobs are completed.")
@pass_context
def request_dummy(context, number, wait_for_completion):
def request_dummy(context, number, request_metadata, wait_for_completion):
click.echo("Sending execution request...")
command = remote_execution_pb2.Command()
@@ -85,12 +89,21 @@ def request_dummy(context, number, wait_for_completion):
action_digest=action_digest,
skip_cache_lookup=True)
# If enabled, we attach some `RequestMetadata` information to the request:
execute_arguments = {}
if request_metadata:
metadata = request_metadata_header_entry(tool_name=REQUEST_METADATA_TOOL_NAME,
tool_version=REQUEST_METADATA_TOOL_VERSION,
action_id='2',
tool_invocation_id='3',
correlated_invocations_id='4')
execute_arguments['metadata'] = metadata
responses = []
for _ in range(0, number):
responses.append(stub.Execute(request))
responses.append(stub.Execute(request, **execute_arguments))
for response in responses:
if wait_for_completion:
result = None
for stream in response:
@@ -113,14 +126,22 @@ def request_dummy(context, number, wait_for_completion):
help="Output directory for the output files.")
@click.option('-p', '--platform-property', nargs=2, type=(click.STRING, click.STRING), multiple=True,
help="List of key-value pairs of required platform properties.")
@click.option('-t', '--tool-details', nargs=2, type=str,
default=(REQUEST_METADATA_TOOL_NAME, REQUEST_METADATA_TOOL_VERSION),
help="Tool name and version.")
@click.option('-a', '--action-id', type=str, help='Action ID.')
@click.option('-i', '--invocation-id', type=str, help='Tool invocation ID.')
@click.option('-c', '--correlation-id', type=str, help='Correlated invocation ID.')
@click.argument('input-root', nargs=1, type=click.Path(), required=True)
@click.argument('commands', nargs=-1, type=click.STRING, required=True)
@pass_context
def run_command(context, input_root, commands, output_file, output_directory,
platform_property):
platform_property, tool_details, action_id, invocation_id,
correlation_id):
stub = remote_execution_pb2_grpc.ExecutionStub(context.channel)
output_executables = []
with upload(context.channel, instance=context.instance_name) as uploader:
command = remote_execution_pb2.Command()
@@ -157,7 +178,14 @@ def run_command(context, input_root, commands, output_file, output_directory,
request = remote_execution_pb2.ExecuteRequest(instance_name=context.instance_name,
action_digest=action_digest,
skip_cache_lookup=True)
response = stub.Execute(request)
metadata = request_metadata_header_entry(tool_name=tool_details[0],
tool_version=tool_details[1],
action_id=action_id,
tool_invocation_id=invocation_id,
correlated_invocations_id=correlation_id)
response = stub.Execute(request, metadata=metadata)
stream = None
for stream in response:
@@ -180,3 +208,25 @@ def run_command(context, input_root, commands, output_file, output_directory,
if output_file_response.path in output_executables:
st = os.stat(path)
os.chmod(path, st.st_mode | stat.S_IXUSR)
def request_metadata_header_entry(tool_name=None, tool_version=None,
action_id=None, tool_invocation_id=None,
correlated_invocations_id=None):
"""Creates a serialized RequestMetadata entry to attach to a gRPC
call header. Arguments should be of type str or None.
"""
request_metadata = remote_execution_pb2.RequestMetadata()
if action_id:
request_metadata.action_id = action_id
if tool_invocation_id:
request_metadata.tool_invocation_id = tool_invocation_id
if correlated_invocations_id:
request_metadata.correlated_invocations_id = correlated_invocations_id
if tool_name:
request_metadata.tool_details.tool_name = tool_name
if tool_version:
request_metadata.tool_details.tool_version = tool_version
return ((REQUEST_METADATA_HEADER_NAME,
request_metadata.SerializeToString()),)
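The helper above returns a standard gRPC metadata entry: a one-element tuple pairing the `requestmetadata-bin` key with the binary-serialized `RequestMetadata` message. A minimal round-trip sketch, assuming only the imports already present at the top of this module (the literal tool name, version and IDs are placeholders):

# Build a header entry and verify that parsing it back recovers the same values:
entry = request_metadata_header_entry(tool_name='BuildGrid', tool_version='0.1',
                                       action_id='2', tool_invocation_id='3',
                                       correlated_invocations_id='4')
header_key, header_value = entry[0]
assert header_key == REQUEST_METADATA_HEADER_NAME  # 'requestmetadata-bin'

parsed = remote_execution_pb2.RequestMetadata()
parsed.ParseFromString(header_value)  # '-bin' keys carry raw bytes
assert parsed.tool_details.tool_name == 'BuildGrid'
assert parsed.action_id == '2'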
@@ -62,7 +62,7 @@ class ExecutionInstance:
return get_hash_type()
def execute(self, action_digest, skip_cache_lookup):
""" Sends a job for execution.
"""Sends a job for execution.
Queues an action and creates an Operation instance to be associated with
this action.
"""
......
@@ -27,9 +27,11 @@ from functools import partial
import grpc
from buildgrid._exceptions import FailedPreconditionError, InvalidArgumentError, CancelledError
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2_grpc
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
from buildgrid._protos.google.longrunning import operations_pb2
from buildgrid.server.peer import Peer
from buildgrid.server._authentication import AuthContext, authorize
from buildgrid.settings import REQUEST_METADATA_HEADER_NAME
class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
@@ -94,7 +96,10 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
instance_name = request.instance_name
message_queue = queue.Queue()
peer = context.peer()
peer_id = context.peer()
request_metadata = self._context_extract_request_metadata(context)
peer = Peer(peer_id=peer_id, request_metadata=request_metadata)
try:
instance = self._get_instance(instance_name)
@@ -102,8 +107,8 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
job_name = instance.execute(request.action_digest,
request.skip_cache_lookup)
operation_name = instance.register_job_peer(job_name,
peer, message_queue)
operation_name = instance.register_job_peer(job_name, peer,
message_queue)
context.add_callback(partial(self._rpc_termination_callback,
peer, instance_name, operation_name))
@@ -161,8 +166,7 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
try:
instance = self._get_instance(instance_name)
instance.register_operation_peer(operation_name,
peer, message_queue)
instance.register_operation_peer(operation_name, peer, message_queue)
context.add_callback(partial(self._rpc_termination_callback,
peer, instance_name, operation_name))
@@ -231,3 +235,21 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
except KeyError:
raise InvalidArgumentError("Instance doesn't exist on server: [{}]".format(name))
@classmethod
def _context_extract_request_metadata(cls, context):
"""Given a context object, extract the RequestMetadata header
values if they are present. If they were not provided,
returns None.
"""
invocation_metadata = context.invocation_metadata()
request_metadata_entry = next((entry for entry in invocation_metadata
if entry.key == REQUEST_METADATA_HEADER_NAME),
None)
if not request_metadata_entry:
return None
request_metadata = remote_execution_pb2.RequestMetadata()
request_metadata.ParseFromString(request_metadata_entry.value)
return request_metadata
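To illustrate what this classmethod expects, the sketch below drives it with a hypothetical stand-in for the gRPC servicer context; only the shape of the invocation-metadata entries (objects exposing `key` and `value`, the key ending in '-bin' with a bytes value) matches what gRPC actually delivers:

from collections import namedtuple

from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
from buildgrid.settings import REQUEST_METADATA_HEADER_NAME
# Module path assumed for illustration; adjust if the service lives elsewhere:
from buildgrid.server.execution.service import ExecutionService

_Entry = namedtuple('_Entry', ['key', 'value'])

class _FakeContext:
    """Hypothetical stand-in for a gRPC servicer context (illustration only)."""

    def __init__(self, metadata):
        self._metadata = metadata

    def invocation_metadata(self):
        return self._metadata

request_metadata = remote_execution_pb2.RequestMetadata()
request_metadata.tool_details.tool_name = 'BuildGrid'

context = _FakeContext((_Entry(REQUEST_METADATA_HEADER_NAME,
                               request_metadata.SerializeToString()),))
extracted = ExecutionService._context_extract_request_metadata(context)
assert extracted.tool_details.tool_name == 'BuildGrid'

# Without the header, extraction falls back to None:
assert ExecutionService._context_extract_request_metadata(_FakeContext(())) is None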
@@ -12,9 +12,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from datetime import datetime
import logging
from threading import Lock
import uuid
from google.protobuf import duration_pb2, timestamp_pb2
@@ -62,6 +62,9 @@ class Job:
self._platform_requirements = platform_requirements \
if platform_requirements else dict()
self.__peers_lock = Lock()
self.__peers = set()
self._done = False
def __lt__(self, other):
@@ -175,7 +178,7 @@
"""Subscribes to a new job's :class:`Operation` stage changes.
Args:
peer (str): a unique string identifying the client.
peer (Peer): an object that represents the client.
message_queue (queue.Queue): the event queue to register.
Returns:
@@ -194,10 +197,13 @@
self._name, new_operation.name)
self.__operations_by_name[new_operation.name] = new_operation
self.__operations_by_peer[peer] = new_operation
self.__operations_message_queues[peer] = message_queue
self.__operations_by_peer[peer.peer_id] = new_operation
self.__operations_message_queues[peer.peer_id] = message_queue
self._send_operations_updates(peers=[peer.peer_id])
self._send_operations_updates(peers=[peer])
with self.__peers_lock:
self.__peers.add(peer)
return new_operation.name
@@ -206,7 +212,7 @@
Args:
operation_name (str): an existing operation's name to subscribe to.
peer (str): a unique string identifying the client.
peer (Peer): an object that represents the client.
message_queue (queue.Queue): the event queue to register.
Returns:
@@ -222,18 +228,20 @@
raise NotFoundError("Operation name does not exist: [{}]"
.format(operation_name))
self.__operations_by_peer[peer] = operation
self.__operations_message_queues[peer] = message_queue
self.__operations_by_peer[peer.peer_id] = operation
self.__operations_message_queues[peer.peer_id] = message_queue
self._send_operations_updates(peers=[peer])
self._send_operations_updates(peers=[peer.peer_id])
with self.__peers_lock:
self.__peers.add(peer)
def unregister_operation_peer(self, operation_name, peer):
"""Unsubscribes to the job's :class:`Operation` stage change.
Args:
operation_name (str): an existing operation's name to unsubscribe from.
peer (str): a unique string identifying the client.
peer (Peer): an object that represents the client.
Raises:
NotFoundError: If no operation with `operation_name` exists.
"""
@@ -244,10 +252,10 @@
raise NotFoundError("Operation name does not exist: [{}]"
.format(operation_name))
if peer in self.__operations_message_queues:
del self.__operations_message_queues[peer]
if peer.peer_id in self.__operations_message_queues:
del self.__operations_message_queues[peer.peer_id]
del self.__operations_by_peer[peer]
del self.__operations_by_peer[peer.peer_id]
# Drop the operation if nobody is watching it anymore:
if operation not in self.__operations_by_peer.values():
@@ -258,6 +266,9 @@
self.__logger.debug("Operation deleted for job [%s]: [%s]",
self._name, operation.name)
with self.__peers_lock:
self.__peers.remove(peer)
def list_operations(self):
"""Lists the :class:`Operation` related to a job.
......
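The peer bookkeeping introduced above follows one pattern throughout `Job`: message queues and operations stay keyed by `peer_id`, while the `Peer` objects themselves live in a lock-protected set. A reduced sketch of that pattern (the `_PeerRegistry` helper below is hypothetical, not part of the change):

from threading import Lock

class _PeerRegistry:
    """Hypothetical helper illustrating the peer bookkeeping used in Job."""

    def __init__(self):
        self.__peers_lock = Lock()
        self.__peers = set()        # Peer objects, deduplicated by peer_id
        self.__message_queues = {}  # peer_id -> queue.Queue

    def register(self, peer, message_queue):
        self.__message_queues[peer.peer_id] = message_queue
        with self.__peers_lock:
            self.__peers.add(peer)

    def unregister(self, peer):
        self.__message_queues.pop(peer.peer_id, None)
        with self.__peers_lock:
            self.__peers.discard(peer)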
# Copyright (C) 2019 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
class Peer:
"""Represents a client during a session."""
def __init__(self, peer_id, token=None, request_metadata=None):
self._id = peer_id # This uniquely identifies a client
self._token = token
self.__request_metadata = request_metadata
def __eq__(self, other):
if isinstance(other, Peer):
return self.peer_id == other.peer_id
return False
def __hash__(self):
return hash(self.peer_id) # This string is unique for each peer
@property
def peer_id(self):
return self._id
@property
def token(self):
return self._token
# -- `RequestMetadata` optional values (attached to the Execute() call) --
@property
def request_metadata(self):
return self.__request_metadata
@property
def tool_name(self):
if self.__request_metadata and self.__request_metadata.tool_details:
return self.__request_metadata.tool_details.tool_name
return None
@property
def tool_version(self):
if self.__request_metadata and self.__request_metadata.tool_details:
return self.__request_metadata.tool_details.tool_version
return None
@property
def action_id(self):
if self.__request_metadata:
return self.__request_metadata.action_id
return None
@property
def tool_invocation_id(self):
if self.__request_metadata:
return self.__request_metadata.tool_invocation_id
return None
@property
def correlated_invocations_id(self):
if self.__request_metadata:
return self.__request_metadata.correlated_invocations_id
return None
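A brief usage sketch of the new `Peer` class (the peer ID and metadata values are made up):

from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
from buildgrid.server.peer import Peer

request_metadata = remote_execution_pb2.RequestMetadata()
request_metadata.tool_details.tool_name = 'BuildGrid'
request_metadata.tool_invocation_id = 'invocation-1'

peer = Peer(peer_id='ipv4:127.0.0.1:12345', request_metadata=request_metadata)
assert peer.tool_name == 'BuildGrid'
assert peer.tool_invocation_id == 'invocation-1'

# Equality and hashing go through `peer_id`, so the same client registered twice
# collapses to a single entry in a job's peer set:
assert peer == Peer(peer_id='ipv4:127.0.0.1:12345')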
@@ -54,9 +54,8 @@ class Scheduler:
self.__queue = []
self._is_instrumented = monitor
if self._is_instrumented:
self._is_instrumented = False
if monitor:
self.activate_monitoring()
# --- Public API ---
@@ -87,7 +86,7 @@
Args:
job_name (str): name of the job to subscribe to.
peer (str): a unique string identifying the client.
peer (Peer): an object that represents the client.
message_queue (queue.Queue): the event queue to register.
Returns:
@@ -114,7 +113,7 @@
Args:
operation_name (str): name of the operation to subscribe to.
peer (str): a unique string identifying the client.
peer (Peer): an object that represents the client.
message_queue (queue.Queue): the event queue to register.
Returns:
@@ -137,7 +136,7 @@
Args:
operation_name (str): name of the operation to unsubscribe from.
peer (str): a unique string identifying the client.
peer (Peer): an object that represents the client.
Raises:
NotFoundError: If no operation with `operation_name` exists.
......
@@ -43,3 +43,13 @@ BROWSER_URL_FORMAT = '%(type)s/%(instance)s/%(hash)s/%(sizebytes)s/'
# type - Type of CAS object, eg. 'action_result', 'command'...
# hash - Object's digest hash.
# sizebytes - Object's digest size in bytes.
# Name of the header key to attach optional `RequestMetadata` values.
# (This is defined in the REAPI specification.)
REQUEST_METADATA_HEADER_NAME = 'requestmetadata-bin'
# 'RequestMetadata' header values. These values will be used when
# attaching optional metadata to a gRPC request's header:
REQUEST_METADATA_TOOL_NAME = 'BuildGrid'
REQUEST_METADATA_TOOL_VERSION = '0.1'
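For context, these constants are all a client needs to attach the header by hand with a plain gRPC stub. A minimal sketch (the channel target and instance name are placeholders, and the required `action_digest` field of `ExecuteRequest` is elided for brevity):

import grpc

from buildgrid._protos.build.bazel.remote.execution.v2 import (remote_execution_pb2,
                                                               remote_execution_pb2_grpc)
from buildgrid.settings import (REQUEST_METADATA_HEADER_NAME,
                                REQUEST_METADATA_TOOL_NAME,
                                REQUEST_METADATA_TOOL_VERSION)

request_metadata = remote_execution_pb2.RequestMetadata()
request_metadata.tool_details.tool_name = REQUEST_METADATA_TOOL_NAME
request_metadata.tool_details.tool_version = REQUEST_METADATA_TOOL_VERSION

channel = grpc.insecure_channel('localhost:50051')  # placeholder target
stub = remote_execution_pb2_grpc.ExecutionStub(channel)
request = remote_execution_pb2.ExecuteRequest(instance_name='main',  # placeholder instance
                                              skip_cache_lookup=True)
# '-bin' suffixed keys must carry bytes; SerializeToString() provides them.
operation_stream = stub.Execute(request, metadata=((REQUEST_METADATA_HEADER_NAME,
                                                    request_metadata.SerializeToString()),))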