Skip to content
Snippets Groups Projects
Commit b1c0ef09 authored by Martin Blanchard's avatar Martin Blanchard
Browse files

Keep track of peers watching jobs

#75
parent 939d3763
No related branches found
No related tags found
Loading
......@@ -21,7 +21,7 @@ An instance of the Remote Execution Service.
import logging
from buildgrid._exceptions import FailedPreconditionError, InvalidArgumentError
from buildgrid._exceptions import FailedPreconditionError, InvalidArgumentError, NotFoundError
from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Action
from ..job import Job
......@@ -46,7 +46,7 @@ class ExecutionInstance:
def hash_type(self):
return get_hash_type()
def execute(self, action_digest, skip_cache_lookup, message_queue=None):
def execute(self, action_digest, skip_cache_lookup, peer=None, message_queue=None):
""" Sends a job for execution.
Queues an action and creates an Operation instance to be associated with
this action.
......@@ -58,26 +58,26 @@ class ExecutionInstance:
raise FailedPreconditionError("Could not get action from storage.")
job = Job(action, action_digest)
if message_queue is not None:
job.register_client(message_queue)
if peer is not None and message_queue is not None:
job.register_operation_client(peer, message_queue)
self._scheduler.queue_job(job, skip_cache_lookup)
return job.operation
def register_message_client(self, name, queue):
def register_operation_client(self, job_name, peer, message_queue):
try:
self._scheduler.register_client(name, queue)
self._scheduler.register_operation_client(job_name, peer, message_queue)
except KeyError:
raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
except NotFoundError:
raise InvalidArgumentError("Operation name does not exist: [{}]".format(job_name))
def unregister_message_client(self, name, queue):
def unregister_operation_client(self, job_name, peer):
try:
self._scheduler.unregister_client(name, queue)
self._scheduler.unregister_operation_client(job_name, peer)
except KeyError:
raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
except NotFoundError:
raise InvalidArgumentError("Operation name does not exist: [{}]".format(job_name))
def stream_operation_updates(self, message_queue, operation_name):
job = message_queue.get()
......
......@@ -96,12 +96,14 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
try:
instance = self._get_instance(instance_name)
operation = instance.execute(request.action_digest,
request.skip_cache_lookup,
message_queue)
peer=peer,
message_queue=message_queue)
context.add_callback(partial(self._rpc_termination_callback,
peer, instance_name, operation.name, message_queue))
peer, instance_name, operation.name))
if self._is_instrumented:
if peer not in self.__peers:
......@@ -157,9 +159,11 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
try:
instance = self._get_instance(instance_name)
instance.register_message_client(operation_name, message_queue)
instance.register_operation_client(operation_name,
peer, message_queue)
context.add_callback(partial(self._rpc_termination_callback,
peer, instance_name, operation_name, message_queue))
peer, instance_name, operation_name))
if self._is_instrumented:
if peer not in self.__peers:
......@@ -208,10 +212,10 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
# --- Private API ---
def _rpc_termination_callback(self, peer, instance_name, job_name, message_queue):
def _rpc_termination_callback(self, peer, instance_name, job_name):
instance = self._get_instance(instance_name)
instance.unregister_message_client(job_name, message_queue)
instance.unregister_operation_client(job_name, peer)
if self._is_instrumented:
if self.__peers[peer] > 1:
......
......@@ -46,6 +46,7 @@ class Job:
self.__worker_start_timestamp = timestamp_pb2.Timestamp()
self.__worker_completed_timestamp = timestamp_pb2.Timestamp()
self.__operation_message_queues = {}
self.__operation_cancelled = False
self.__lease_cancelled = False
......@@ -54,7 +55,6 @@ class Job:
self._action.CopyFrom(action)
self._do_not_cache = self._action.do_not_cache
self._operation_update_queues = []
self._operation.name = self._name
self._operation.done = False
self._n_tries = 0
......@@ -132,26 +132,30 @@ class Job:
@property
def n_clients(self):
return len(self._operation_update_queues)
return len(self.__operation_message_queues)
def register_client(self, queue):
"""Subscribes to the job's :class:`Operation` stage change events.
def register_operation_client(self, peer, message_queue):
"""Subscribes to the job's :class:`Operation` stage changes.
Queues this :object:`Job` instance.
Args:
queue (queue.Queue): the event queue to register.
peer (str): a unique string identifying the client.
message_queue (queue.Queue): the event queue to register.
"""
self._operation_update_queues.append(queue)
queue.put(self)
if peer not in self.__operation_message_queues:
self.__operation_message_queues[peer] = message_queue
def unregister_client(self, queue):
"""Unsubscribes to the job's :class:`Operation` stage change events.
message_queue.put(self)
def unregister_operation_client(self, peer):
"""Unsubscribes to the job's :class:`Operation` stage change.
Args:
queue (queue.Queue): the event queue to unregister.
peer (str): a unique string identifying the client.
"""
self._operation_update_queues.remove(queue)
if peer not in self.__operation_message_queues:
del self.__operation_message_queues[peer]
def set_cached_result(self, action_result):
"""Allows specifying an action result form the action cache for the job.
......@@ -262,8 +266,8 @@ class Job:
self._operation.metadata.Pack(self.__operation_metadata)
for queue in self._operation_update_queues:
queue.put(self)
for message_queue in self.__operation_message_queues.values():
message_queue.put(self)
def check_operation_status(self):
"""Reports errors on unexpected job's :class:Operation state.
......
......@@ -75,26 +75,3 @@ class OperationsInstance:
except KeyError:
raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
def register_message_client(self, name, queue):
try:
self._scheduler.register_client(name, queue)
except KeyError:
raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
def unregister_message_client(self, name, queue):
try:
self._scheduler.unregister_client(name, queue)
except KeyError:
raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
def stream_operation_updates(self, message_queue, operation_name):
job = message_queue.get()
while not job.operation.done:
yield job.operation
job = message_queue.get()
job.check_operation_status()
yield job.operation
......@@ -52,18 +52,43 @@ class Scheduler:
# --- Public API ---
def register_client(self, job_name, queue):
job = self.jobs[job_name]
def register_operation_client(self, job_name, peer, message_queue):
"""Subscribes to one of the job's :class:`Operation` stage changes.
job.register_client(queue)
Args:
job_name (str): name of the job subscribe to.
peer (str): a unique string identifying the client.
message_queue (queue.Queue): the event queue to register.
def unregister_client(self, job_name, queue):
job = self.jobs[job_name]
Raises:
NotFoundError: If no job with `job_name` exists.
"""
try:
job = self.jobs[job_name]
except KeyError:
raise NotFoundError('No job named {} found.'.format(job_name))
job.register_operation_client(peer, message_queue)
def unregister_operation_client(self, job_name, peer):
"""Unsubscribes to one of the job's :class:`Operation` stage change.
Args:
job_name (str): name of the job to unsubscribe from.
peer (str): a unique string identifying the client.
Raises:
NotFoundError: If no job with `job_name` exists.
"""
try:
job = self.jobs[job_name]
except KeyError:
raise NotFoundError('No job named {} found.'.format(job_name))
job.unregister_client(queue)
job.unregister_operation_client(peer)
if not job.n_clients and job.operation.done:
del self.jobs[job_name]
if job.n_clients == 0 and job.operation.done:
del self.jobs[job.name]
if self._is_instrumented:
self.__operations_by_stage[OperationStage.CACHE_CHECK].discard(job_name)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment