Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • edbaunton/buildgrid
  • BuildGrid/buildgrid
  • bloomberg/buildgrid
  • devcurmudgeon/buildgrid
  • mhadjimichael/buildgrid
  • jmacarthur/buildgrid
  • rkothur/buildgrid
  • valentindavid/buildgrid
  • jjardon/buildgrid
  • RichKen/buildgrid
  • jbonney/buildgrid
  • onsha_alexander/buildgrid
  • santigl/buildgrid
  • mostynb/buildgrid
  • hoffbrinkle/buildgrid
  • Malinskiy/buildgrid
  • coldtom/buildgrid
  • azeemb_a/buildgrid
  • pointswaves/buildgrid
  • BenjaminSchubert/buildgrid
  • michaellee8/buildgrid
  • anil-anil/buildgrid
  • seanborg/buildgrid
  • jdelong12/buildgrid
  • jclay/buildgrid
  • bweston92/buildgrid
  • zchen723/buildgrid
  • cpratt34/buildgrid
  • armbiant/apache-buildgrid
  • armbiant/android-buildgrid
  • itsme300/buildgrid
  • sbairoliya/buildgrid
32 results
Show changes
Commits on Source (10)
......@@ -391,7 +391,7 @@ class Downloader:
except grpc.RpcError as e:
status_code = e.code()
if status_code == grpc.StatusCode.UNIMPLEMENTED:
_CallCache.mark_unimplemented(self.channel, 'BatchUpdateBlobs')
_CallCache.mark_unimplemented(self.channel, 'GetTree')
elif status_code == grpc.StatusCode.NOT_FOUND:
raise NotFoundError("Requested directory does not exist on the remote.")
......
......@@ -126,7 +126,7 @@ class BotsInterface:
# Job does not exist, remove from bot.
return None
self._scheduler.update_job_lease(lease)
self._scheduler.update_job_lease_state(lease.id, lease)
if lease_state == LeaseState.COMPLETED:
return None
......@@ -164,7 +164,7 @@ class BotsInterface:
self.__logger.error("Assigned lease id=[%s],"
" not found on bot with name=[%s] and id=[%s]."
" Retrying job", lease_id, bot_session.name, bot_session.bot_id)
self._scheduler.retry_job(lease_id)
self._scheduler.retry_job_lease(lease_id)
def _close_bot_session(self, name):
""" Before removing the session, close any leases and
......@@ -177,7 +177,7 @@ class BotsInterface:
self.__logger.debug("Attempting to close [%s] with name: [%s]", bot_id, name)
for lease_id in self._assigned_leases[name]:
self._scheduler.retry_job(lease_id)
self._scheduler.retry_job_lease(lease_id)
self._assigned_leases.pop(name)
self.__logger.debug("Closing bot session: [%s]", name)
......
......@@ -21,11 +21,9 @@ An instance of the Remote Execution Service.
import logging
from buildgrid._exceptions import FailedPreconditionError, InvalidArgumentError
from buildgrid._exceptions import FailedPreconditionError, InvalidArgumentError, NotFoundError
from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Action
from ..job import Job
from ...utils import get_hash_type
from buildgrid.utils import get_hash_type
class ExecutionInstance:
......@@ -46,44 +44,46 @@ class ExecutionInstance:
def hash_type(self):
return get_hash_type()
def execute(self, action_digest, skip_cache_lookup, message_queue=None):
def execute(self, action_digest, skip_cache_lookup):
""" Sends a job for execution.
Queues an action and creates an Operation instance to be associated with
this action.
"""
action = self._storage.get_message(action_digest, Action)
if not action:
raise FailedPreconditionError("Could not get action from storage.")
job = Job(action, action_digest)
if message_queue is not None:
job.register_client(message_queue)
return self._scheduler.queue_job_operation(action, action_digest,
skip_cache_lookup=skip_cache_lookup)
self._scheduler.queue_job(job, skip_cache_lookup)
def register_operation_peer(self, operation_name, peer, message_queue):
try:
return self._scheduler.register_job_operation_peer(operation_name,
peer, message_queue)
return job.operation
except NotFoundError:
raise InvalidArgumentError("Operation name does not exist: [{}]"
.format(operation_name))
def register_message_client(self, name, queue):
def unregister_operation_peer(self, operation_name, peer):
try:
self._scheduler.register_client(name, queue)
self._scheduler.unregister_job_operation_peer(operation_name, peer)
except KeyError:
raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
except NotFoundError:
raise InvalidArgumentError("Operation name does not exist: [{}]"
.format(operation_name))
def unregister_message_client(self, name, queue):
try:
self._scheduler.unregister_client(name, queue)
def stream_operation_updates(self, message_queue):
error, operation = message_queue.get()
if error is not None:
raise error
except KeyError:
raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
while not operation.done:
yield operation
def stream_operation_updates(self, message_queue, operation_name):
job = message_queue.get()
while not job.operation.done:
yield job.operation
job = message_queue.get()
job.check_operation_status()
error, operation = message_queue.get()
if error is not None:
raise error
yield job.operation
yield operation
......@@ -98,12 +98,15 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
try:
instance = self._get_instance(instance_name)
operation = instance.execute(request.action_digest,
request.skip_cache_lookup,
message_queue)
job_name = instance.execute(request.action_digest,
request.skip_cache_lookup)
operation_name = instance.register_operation_peer(job_name,
peer, message_queue)
context.add_callback(partial(self._rpc_termination_callback,
peer, instance_name, operation.name, message_queue))
peer, instance_name, operation_name))
if self._is_instrumented:
if peer not in self.__peers:
......@@ -112,16 +115,13 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
else:
self.__peers[peer] += 1
instanced_op_name = "{}/{}".format(instance_name, operation.name)
operation_full_name = "{}/{}".format(instance_name, operation_name)
self.__logger.info("Operation name: [%s]", instanced_op_name)
self.__logger.info("Operation name: [%s]", operation_full_name)
for operation in instance.stream_operation_updates(message_queue,
operation.name):
op = operations_pb2.Operation()
op.CopyFrom(operation)
op.name = instanced_op_name
yield op
for operation in instance.stream_operation_updates(message_queue):
operation.name = operation_full_name
yield operation
except InvalidArgumentError as e:
self.__logger.error(e)
......@@ -160,9 +160,11 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
try:
instance = self._get_instance(instance_name)
instance.register_message_client(operation_name, message_queue)
operation_name = instance.register_operation_peer(operation_name,
peer, message_queue)
context.add_callback(partial(self._rpc_termination_callback,
peer, instance_name, operation_name, message_queue))
peer, instance_name, operation_name))
if self._is_instrumented:
if peer not in self.__peers:
......@@ -171,12 +173,11 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
else:
self.__peers[peer] += 1
for operation in instance.stream_operation_updates(message_queue,
operation_name):
op = operations_pb2.Operation()
op.CopyFrom(operation)
op.name = request.name
yield op
operation_full_name = "{}/{}".format(instance_name, operation_name)
for operation in instance.stream_operation_updates(message_queue):
operation.name = operation_full_name
yield operation
except InvalidArgumentError as e:
self.__logger.error(e)
......@@ -211,10 +212,10 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
# --- Private API ---
def _rpc_termination_callback(self, peer, instance_name, job_name, message_queue):
def _rpc_termination_callback(self, peer, instance_name, operation_name):
instance = self._get_instance(instance_name)
instance.unregister_message_client(job_name, message_queue)
instance.unregister_operation_peer(operation_name, peer)
if self._is_instrumented:
if self.__peers[peer] > 1:
......
......@@ -20,7 +20,7 @@ import uuid
from google.protobuf import duration_pb2, timestamp_pb2
from buildgrid._enums import LeaseState, OperationStage
from buildgrid._exceptions import CancelledError
from buildgrid._exceptions import CancelledError, NotFoundError
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2
from buildgrid._protos.google.longrunning import operations_pb2
......@@ -29,35 +29,70 @@ from buildgrid._protos.google.rpc import code_pb2
class Job:
def __init__(self, action, action_digest):
def __init__(self, action, action_digest, priority=0):
self.__logger = logging.getLogger(__name__)
self._name = str(uuid.uuid4())
self._priority = priority
self._action = remote_execution_pb2.Action()
self._operation = operations_pb2.Operation()
self._lease = None
self.__execute_response = None
self.__operation_metadata = remote_execution_pb2.ExecuteOperationMetadata()
self.__operations_by_name = {} # Name to Operation 1:1 mapping
self.__operations_by_peer = {} # Peer to Operation 1:1 mapping
self.__queued_timestamp = timestamp_pb2.Timestamp()
self.__queued_time_duration = duration_pb2.Duration()
self.__worker_start_timestamp = timestamp_pb2.Timestamp()
self.__worker_completed_timestamp = timestamp_pb2.Timestamp()
self.__operation_cancelled = False
self.__operations_message_queues = {}
self.__operations_cancelled = set()
self.__lease_cancelled = False
self.__job_cancelled = False
self.__operation_metadata.action_digest.CopyFrom(action_digest)
self.__operation_metadata.stage = OperationStage.UNKNOWN.value
self._action.CopyFrom(action)
self._do_not_cache = self._action.do_not_cache
self._operation_update_queues = []
self._operation.name = self._name
self._operation.done = False
self._n_tries = 0
self._done = False
def __lt__(self, other):
try:
return self.priority < other.priority
except AttributeError:
return NotImplemented
def __le__(self, other):
try:
return self.priority <= other.priority
except AttributeError:
return NotImplemented
def __eq__(self, other):
if isinstance(other, Job):
return self.name == other.name
return False
def __ne__(self, other):
return not self.__eq__(other)
def __gt__(self, other):
try:
return self.priority > other.priority
except AttributeError:
return NotImplemented
def __ge__(self, other):
try:
return self.priority >= other.priority
except AttributeError:
return NotImplemented
# --- Public API ---
@property
......@@ -65,17 +100,27 @@ class Job:
return self._name
@property
def do_not_cache(self):
return self._do_not_cache
def priority(self):
return self._priority
@property
def done(self):
return self._done
# --- Public API: REAPI ---
@property
def action(self):
return self._action
def do_not_cache(self):
return self._do_not_cache
@property
def action_digest(self):
return self.__operation_metadata.action_digest
@property
def operation_stage(self):
return OperationStage(self.__operation_metadata.stage)
@property
def action_result(self):
if self.__execute_response is not None:
......@@ -84,19 +129,177 @@ class Job:
return None
@property
def holds_cached_action_result(self):
def holds_cached_result(self):
if self.__execute_response is not None:
return self.__execute_response.cached_result
else:
return False
@property
def operation(self):
return self._operation
def set_cached_result(self, action_result):
"""Allows specifying an action result form the action cache for the job.
Note:
This won't trigger any :class:`Operation` stage transition.
Args:
action_result (ActionResult): The result from cache.
"""
self.__execute_response = remote_execution_pb2.ExecuteResponse()
self.__execute_response.result.CopyFrom(action_result)
self.__execute_response.cached_result = True
@property
def operation_stage(self):
return OperationStage(self.__operation_metadata.state)
def n_peers(self):
return len(self.__operations_message_queues)
def register_operation_peer(self, peer, message_queue):
"""Subscribes to the job's :class:`Operation` stage changes.
Args:
peer (str): a unique string identifying the client.
message_queue (queue.Queue): the event queue to register.
Returns:
str: The name of the subscribed :class:`Operation`.
"""
if peer in self.__operations_by_peer:
operation = self.__operations_by_peer[peer]
else:
operation = self.create_operation_for_peer(peer)
self.__operations_message_queues[peer] = message_queue
self._send_operations_updates(peers=[peer])
return operation.name
def unregister_operation_peer(self, peer):
"""Unsubscribes to the job's :class:`Operation` stage change.
Args:
peer (str): a unique string identifying the client.
"""
if peer in self.__operations_message_queues:
del self.__operations_message_queues[peer]
# Drop the operation if nobody is watching it anymore:
if peer in self.__operations_by_peer:
operation = self.__operations_by_peer.pop(peer)
if operation not in self.__operations_by_peer.values():
del self.__operations_by_name[operation.name]
self.__operations_cancelled.discard(operation.name)
def create_operation_for_peer(self, peer):
"""Generates a new :class:`Operation` for `peer`.
Args:
peer (str): a unique string identifying the client.
"""
if peer in self.__operations_by_peer:
return self.__operations_by_peer[peer]
new_operation = operations_pb2.Operation()
# Copy state from first existing and non cancelled operation:
for operation in self.__operations_by_name.values():
if operation.name not in self.__operations_cancelled:
new_operation.CopyFrom(operation)
break
new_operation.name = str(uuid.uuid4())
self.__operations_by_name[new_operation.name] = new_operation
self.__operations_by_peer[peer] = new_operation
return new_operation
def cancel_operation_for_peer(self, peer):
"""Triggers a job's :class:`Operation` cancellation.
This may cancel any job's :class:`Lease` that may have been issued.
Args:
peer (str): a unique string identifying the client.
"""
operations_to_cancel, peers_to_notify = set(), set()
# If the peer is watching the job, only cancel its operation, if not,
# cancel the entire job (including all operations and lease):
if peer in self.__operations_by_peer:
operations_to_cancel.add(self.__operations_by_peer[peer].name)
peers_to_notify.add(peer)
else:
operations_to_cancel.update(self.__operations_by_name.keys())
peers_to_notify.update(self.__operations_by_peer.keys())
operations_to_cancel = operations_to_cancel - self.__operations_cancelled
if not operations_to_cancel:
return
self.__operations_cancelled.update(operations_to_cancel)
ongoing_operations = set(self.__operations_by_name.keys())
# Job is cancelled if all the operation are:
self.__job_cancelled = ongoing_operations.issubset(self.__operations_cancelled)
if self.__job_cancelled and self._lease is not None:
self.cancel_lease()
self._send_operations_updates(peers=peers_to_notify, notify_cancelled=True)
def list_operations(self):
"""Lists the :class:`Operation` related to a job.
Returns:
list: A list of :class:`Operation` names.
"""
return list(self.__operations_by_name.keys())
def get_operation(self, operation_name):
"""Returns a copy of the the job's :class:`Operation`.
Args:
operation_name (str): the operation's name.
Raises:
NotFoundError: If no operation with `operation_name` exists.
"""
try:
operation = self.__operations_by_name[operation_name]
except KeyError:
raise NotFoundError("Operation name does not exist: [{}]"
.format(operation_name))
return self._copy_operation(operation)
def update_operation_stage(self, stage):
"""Operates a stage transition for the job's :class:`Operation`.
Args:
stage (OperationStage): the operation stage to transition to.
"""
if stage.value == self.__operation_metadata.stage:
return
self.__operation_metadata.stage = stage.value
if self.__operation_metadata.stage == OperationStage.QUEUED.value:
if self.__queued_timestamp.ByteSize() == 0:
self.__queued_timestamp.GetCurrentTime()
self._n_tries += 1
elif self.__operation_metadata.stage == OperationStage.EXECUTING.value:
queue_in, queue_out = self.__queued_timestamp.ToDatetime(), datetime.now()
self.__queued_time_duration.FromTimedelta(queue_out - queue_in)
elif self.__operation_metadata.stage == OperationStage.COMPLETED.value:
self._done = True
self._send_operations_updates()
# --- Public API: RWAPI ---
@property
def lease(self):
......@@ -117,45 +320,15 @@ class Job:
def n_tries(self):
return self._n_tries
@property
def n_clients(self):
return len(self._operation_update_queues)
def register_client(self, queue):
"""Subscribes to the job's :class:`Operation` stage change events.
Queues this :object:`Job` instance.
Args:
queue (queue.Queue): the event queue to register.
"""
self._operation_update_queues.append(queue)
queue.put(self)
def unregister_client(self, queue):
"""Unsubscribes to the job's :class:`Operation` stage change events.
Args:
queue (queue.Queue): the event queue to unregister.
"""
self._operation_update_queues.remove(queue)
def set_cached_result(self, action_result):
"""Allows specifying an action result form the action cache for the job.
"""
self.__execute_response = remote_execution_pb2.ExecuteResponse()
self.__execute_response.result.CopyFrom(action_result)
self.__execute_response.cached_result = True
def create_lease(self):
"""Emits a new :class:`Lease` for the job.
Only one :class:`Lease` can be emitted for a given job. This method
should only be used once, any furhter calls are ignored.
should only be used once, any further calls are ignored.
"""
if self.__operation_cancelled:
return None
elif self._lease is not None:
if self._lease is not None:
return self._lease
elif self.__job_cancelled:
return None
self._lease = bots_pb2.Lease()
......@@ -166,14 +339,14 @@ class Job:
return self._lease
def update_lease_state(self, state, status=None, result=None):
"""Operates a state transition for the job's current :class:Lease.
"""Operates a state transition for the job's current :class:`Lease`.
Args:
state (LeaseState): the lease state to transition to.
status (google.rpc.Status): the lease execution status, only
required if `state` is `COMPLETED`.
result (google.protobuf.Any): the lease execution result, only
required if `state` is `COMPLETED`.
status (google.rpc.Status, optional): the lease execution status,
only required if `state` is `COMPLETED`.
result (google.protobuf.Any, optional): the lease execution result,
only required if `state` is `COMPLETED`.
"""
if state.value == self._lease.state:
return
......@@ -214,79 +387,96 @@ class Job:
self.__execute_response.status.CopyFrom(status)
def cancel_lease(self):
"""Triggers a job's :class:Lease cancellation.
"""Triggers a job's :class:`Lease` cancellation.
This will not cancel the job's :class:Operation.
Note:
This will not cancel the job's :class:`Operation`.
"""
self.__lease_cancelled = True
if self._lease is not None:
self.update_lease_state(LeaseState.CANCELLED)
def delete_lease(self):
"""Discard the job's :class:Lease."""
"""Discard the job's :class:`Lease`.
Note:
This will not cancel the job's :class:`Operation`.
"""
self.__worker_start_timestamp.Clear()
self.__worker_completed_timestamp.Clear()
self._lease = None
def update_operation_stage(self, stage):
"""Operates a stage transition for the job's :class:Operation.
# --- Public API: Monitoring ---
Args:
stage (OperationStage): the operation stage to transition to.
"""
if stage.value == self.__operation_metadata.stage:
return
def query_queue_time(self):
return self.__queued_time_duration.ToTimedelta()
self.__operation_metadata.stage = stage.value
def query_n_retries(self):
return self._n_tries - 1 if self._n_tries > 0 else 0
if self.__operation_metadata.stage == OperationStage.QUEUED.value:
if self.__queued_timestamp.ByteSize() == 0:
self.__queued_timestamp.GetCurrentTime()
self._n_tries += 1
# --- Private API ---
elif self.__operation_metadata.stage == OperationStage.EXECUTING.value:
queue_in, queue_out = self.__queued_timestamp.ToDatetime(), datetime.now()
self.__queued_time_duration.FromTimedelta(queue_out - queue_in)
def _copy_operation(self, operation):
"""Simply duplicates a given :class:`Lease` object."""
new_operation = operations_pb2.Operation()
new_operation.CopyFrom(operation)
elif self.__operation_metadata.stage == OperationStage.COMPLETED.value:
if self.__execute_response is not None:
self._operation.response.Pack(self.__execute_response)
self._operation.done = True
return new_operation
self._operation.metadata.Pack(self.__operation_metadata)
def _update_operation(self, operation, operation_metadata, execute_response=None, done=False):
"""Forges a :class:`Operation` message given input data."""
operation.metadata.Pack(operation_metadata)
for queue in self._operation_update_queues:
queue.put(self)
if execute_response is not None:
operation.response.Pack(execute_response)
def check_operation_status(self):
"""Reports errors on unexpected job's :class:Operation state.
operation.done = done
Raises:
CancelledError: if the job's :class:Operation was cancelled.
"""
if self.__operation_cancelled:
raise CancelledError(self.__execute_response.status.message)
def _update_cancelled_operation(self, operation, operation_metadata, execute_response=None):
"""Forges a cancelled :class:`Operation` message given input data."""
cancelled_operation_metadata = remote_execution_pb2.ExecuteOperationMetadata()
cancelled_operation_metadata.CopyFrom(operation_metadata)
cancelled_operation_metadata.stage = OperationStage.COMPLETED.value
def cancel_operation(self):
"""Triggers a job's :class:Operation cancellation.
operation.metadata.Pack(cancelled_operation_metadata)
This will also cancel any job's :class:Lease that may have been issued.
"""
self.__operation_cancelled = True
if self._lease is not None:
self.cancel_lease()
cancelled_execute_response = remote_execution_pb2.ExecuteResponse()
if execute_response is not None:
cancelled_execute_response.CopyFrom(self.__execute_response)
cancelled_execute_response.status.code = code_pb2.CANCELLED
cancelled_execute_response.status.message = "Operation cancelled by client."
self.__execute_response = remote_execution_pb2.ExecuteResponse()
self.__execute_response.status.code = code_pb2.CANCELLED
self.__execute_response.status.message = "Operation cancelled by client."
operation.response.Pack(cancelled_execute_response)
self.update_operation_stage(OperationStage.COMPLETED)
operation.done = True
# --- Public API: Monitoring ---
def _send_operations_updates(self, peers=None, notify_cancelled=False):
"""Sends :class:`Operation` stage change messages to watchers."""
for operation in self.__operations_by_name.values():
if operation.name in self.__operations_cancelled:
self._update_cancelled_operation(operation, self.__operation_metadata,
execute_response=self.__execute_response)
def query_queue_time(self):
return self.__queued_time_duration.ToTimedelta()
else:
self._update_operation(operation, self.__operation_metadata,
execute_response=self.__execute_response,
done=self._done)
def query_n_retries(self):
return self._n_tries - 1 if self._n_tries > 0 else 0
for peer, message_queue in self.__operations_message_queues.items():
if peer not in self.__operations_by_peer:
continue
elif peers and peer not in peers:
continue
operation = self.__operations_by_peer[peer]
# Messages are pairs of (Exception, Operation,):
if not notify_cancelled and operation.name in self.__operations_cancelled:
continue
elif operation.name not in self.__operations_cancelled:
message = (None, self._copy_operation(operation),)
else:
message = (CancelledError("Operation has been cancelled"),
self._copy_operation(operation),)
message_queue.put(message)
......@@ -21,7 +21,7 @@ An instance of the LongRunningOperations Service.
import logging
from buildgrid._exceptions import InvalidArgumentError
from buildgrid._exceptions import InvalidArgumentError, NotFoundError
from buildgrid._protos.google.longrunning import operations_pb2
......@@ -39,62 +39,43 @@ class OperationsInstance:
def register_instance_with_server(self, instance_name, server):
server.add_operations_instance(self, instance_name)
def get_operation(self, name):
job = self._scheduler.jobs.get(name)
def get_operation(self, job_name):
try:
operation = self._scheduler.get_job_operation(job_name)
if job is None:
raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
except NotFoundError:
raise InvalidArgumentError("Operation name does not exist: [{}]".format(job_name))
else:
return job.operation
return operation
def list_operations(self, list_filter, page_size, page_token):
# TODO: Pages
# Spec says number of pages and length of a page are optional
response = operations_pb2.ListOperationsResponse()
operation_names = [operation_name for job_name in
self._scheduler.list_current_jobs() for operation_name in
self._scheduler.list_job_operations(job_name)]
operations = []
for job in self._scheduler.list_jobs():
op = operations_pb2.Operation()
op.CopyFrom(job.operation)
operations.append(op)
for operation_name in operation_names:
operation = self._scheduler.get_job_operation(operation_name)
operations.append(operation)
response.operations.extend(operations)
return response
def delete_operation(self, name):
try:
self._scheduler.jobs.pop(name)
except KeyError:
raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
def cancel_operation(self, name):
def delete_operation(self, job_name):
try:
self._scheduler.cancel_job_operation(name)
self._scheduler.delete_job_operation(job_name)
except KeyError:
raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
except NotFoundError:
raise InvalidArgumentError("Operation name does not exist: [{}]".format(job_name))
def register_message_client(self, name, queue):
def cancel_operation(self, job_name):
try:
self._scheduler.register_client(name, queue)
except KeyError:
raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
def unregister_message_client(self, name, queue):
try:
self._scheduler.unregister_client(name, queue)
except KeyError:
raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
def stream_operation_updates(self, message_queue, operation_name):
job = message_queue.get()
while not job.operation.done:
yield job.operation
job = message_queue.get()
job.check_operation_status()
self._scheduler.cancel_job_operation(job_name)
yield job.operation
except NotFoundError:
raise InvalidArgumentError("Operation name does not exist: [{}]".format(job_name))
......@@ -19,12 +19,13 @@ Scheduler
Schedules jobs.
"""
from collections import deque
import bisect
from datetime import timedelta
import logging
from buildgrid._enums import LeaseState, OperationStage
from buildgrid._exceptions import NotFoundError
from buildgrid.server.job import Job
class Scheduler:
......@@ -42,8 +43,12 @@ class Scheduler:
self.__retries_count = 0
self._action_cache = action_cache
self.jobs = {}
self.queue = deque()
self.__jobs_by_action = {} # Action to Job 1:1 mapping
self.__jobs_by_operation = {} # Operation to Job 1:1 mapping
self.__jobs_by_name = {} # Name to Job 1:1 mapping
self.__queue = []
self._is_instrumented = monitor
......@@ -52,61 +57,189 @@ class Scheduler:
# --- Public API ---
def register_client(self, job_name, queue):
job = self.jobs[job_name]
def list_current_jobs(self):
"""Returns a list of the :class:`Job` names currently managed."""
return self.__jobs_by_name.keys()
def list_job_operations(self, job_name):
"""Returns a list of :class:`Operation` names for a :class:`Job`."""
if job_name in self.__jobs_by_name:
return self.__jobs_by_name[job_name].list_operations()
else:
return []
job.register_client(queue)
# --- Public API: REAPI ---
def unregister_client(self, job_name, queue):
job = self.jobs[job_name]
def register_job_operation_peer(self, operation_name, peer, message_queue):
"""Subscribes to one of the job's :class:`Operation` stage changes.
job.unregister_client(queue)
Args:
operation_name (str): name of the operation to subscribe to.
peer (str): a unique string identifying the client.
message_queue (queue.Queue): the event queue to register.
Returns:
str: The name of the subscribed :class:`Operation`.
Raises:
NotFoundError: If no operation with `operation_name` exists.
"""
if operation_name in self.__jobs_by_operation:
job = self.__jobs_by_operation[operation_name]
if not job.n_clients and job.operation.done and not job.lease:
elif operation_name in self.__jobs_by_name:
job = self.__jobs_by_name[operation_name]
else:
raise NotFoundError("Operation name does not exist: [{}]"
.format(operation_name))
operation_name = job.register_operation_peer(peer, message_queue)
self.__jobs_by_operation[operation_name] = job
return operation_name
def unregister_job_operation_peer(self, operation_name, peer):
"""Unsubscribes to one of the job's :class:`Operation` stage change.
Args:
operation_name (str): name of the operation to unsubscribe from.
peer (str): a unique string identifying the client.
Raises:
NotFoundError: If no operation with `operation_name` exists.
"""
if operation_name in self.__jobs_by_operation:
job = self.__jobs_by_operation[operation_name]
else:
raise NotFoundError("Operation name does not exist: [{}]"
.format(operation_name))
if operation_name in self.__jobs_by_operation:
del self.__jobs_by_operation[operation_name]
job.unregister_operation_peer(peer)
if not job.n_peers and job.done and not job.lease:
self._delete_job(job.name)
def queue_job(self, job, skip_cache_lookup=False):
self.jobs[job.name] = job
def queue_job_operation(self, action, action_digest, priority=0, skip_cache_lookup=False):
"""Inserts a newly created job into the execution queue.
Warning:
Priority is handle like a POSIX ``nice`` values: a higher value
means a low priority, 0 being default priority.
Args:
action (Action): the given action to queue for execution.
action_digest (Digest): the digest of the given action.
priority (int): the execution job's priority.
skip_cache_lookup (bool): whether or not to look for pre-computed
result for the given action.
Returns:
str: the newly created operation's name.
"""
if action_digest.hash in self.__jobs_by_action:
job = self.__jobs_by_action[action_digest.hash]
# Reschedule if priority is now greater:
if priority < job.priority:
job.priority = priority
if job.operation_stage == OperationStage.QUEUED:
self._queue_job(job.name)
return job.name
job = Job(action, action_digest, priority=priority)
self.__jobs_by_action[job.action_digest.hash] = job
self.__jobs_by_name[job.name] = job
operation_stage = None
if self._action_cache is not None and not skip_cache_lookup:
try:
action_result = self._action_cache.get_action_result(job.action_digest)
except NotFoundError:
operation_stage = OperationStage.QUEUED
self.queue.append(job)
self._queue_job(job.name)
else:
job.set_cached_result(action_result)
operation_stage = OperationStage.COMPLETED
job.set_cached_result(action_result)
if self._is_instrumented:
self.__retries_count += 1
else:
operation_stage = OperationStage.QUEUED
self.queue.append(job)
self._queue_job(job.name)
self._update_job_operation_stage(job.name, operation_stage)
def retry_job(self, job_name):
job = self.jobs[job_name]
return job.name
operation_stage = None
if job.n_tries >= self.MAX_N_TRIES:
# TODO: Decide what to do with these jobs
operation_stage = OperationStage.COMPLETED
# TODO: Mark these jobs as done
def get_job_operation(self, operation_name):
"""Retrieves a job's :class:`Operation` by name.
else:
operation_stage = OperationStage.QUEUED
job.update_lease_state(LeaseState.PENDING)
self.queue.append(job)
Args:
operation_name (str): name of the operation to query.
self._update_job_operation_stage(job_name, operation_stage)
Raises:
NotFoundError: If no operation with `operation_name` exists.
"""
try:
job = self.__jobs_by_operation[operation_name]
def list_jobs(self):
return self.jobs.values()
except KeyError:
raise NotFoundError("Operation name does not exist: [{}]"
.format(operation_name))
return job.get_operation(operation_name)
def cancel_job_operation(self, operation_name):
""""Cancels a job's :class:`Operation` by name.
Args:
operation_name (str): name of the operation to cancel.
Raises:
NotFoundError: If no operation with `operation_name` exists.
"""
try:
job = self.__jobs_by_operation[operation_name]
except KeyError:
raise NotFoundError("Operation name does not exist: [{}]"
.format(operation_name))
job.cancel_operation_for_peer(operation_name)
def delete_job_operation(self, operation_name):
""""Removes a job.
Args:
operation_name (str): name of the operation to delete.
Raises:
NotFoundError: If no operation with `operation_name` exists.
"""
try:
job = self.__jobs_by_operation[operation_name]
except KeyError:
raise NotFoundError("Operation name does not exist: [{}]"
.format(operation_name))
if not job.n_peers and job.done and not job.lease:
self._delete_job(job.name)
# --- Public API: RWAPI ---
def request_job_leases(self, worker_capabilities):
"""Generates a list of the highest priority leases to be run.
......@@ -115,11 +248,14 @@ class Scheduler:
worker_capabilities (dict): a set of key-value pairs decribing the
worker properties, configuration and state at the time of the
request.
Warning: Worker capabilities handling is not implemented at the moment!
"""
if not self.queue:
if not self.__queue:
return []
job = self.queue.popleft()
# TODO: Try to match worker_capabilities with jobs properties.
job = self.__queue.pop()
lease = job.lease
......@@ -132,18 +268,25 @@ class Scheduler:
return None
def update_job_lease(self, lease):
def update_job_lease_state(self, job_name, lease):
"""Requests a state transition for a job's current :class:Lease.
Note:
This may trigger a job's :class:`Operation` stage transition.
Args:
job_name (str): name of the job to query.
lease_state (LeaseState): the lease state to transition to.
lease_status (google.rpc.Status): the lease execution status, only
required if `lease_state` is `COMPLETED`.
lease_result (google.protobuf.Any): the lease execution result, only
required if `lease_state` is `COMPLETED`.
job_name (str): name of the job to update lease state from.
lease (Lease): the lease holding the new state.
Raises:
NotFoundError: If no job with `job_name` exists.
"""
job = self.jobs[lease.id]
try:
job = self.__jobs_by_name[job_name]
except KeyError:
raise NotFoundError("Job name does not exist: [{}]".format(job_name))
lease_state = LeaseState(lease.state)
operation_stage = None
......@@ -179,38 +322,93 @@ class Scheduler:
self.__leases_by_state[LeaseState.ACTIVE].discard(lease.id)
self.__leases_by_state[LeaseState.COMPLETED].add(lease.id)
self._update_job_operation_stage(lease.id, operation_stage)
self._update_job_operation_stage(job_name, operation_stage)
def retry_job_lease(self, job_name):
"""Re-queues a job on lease execution failure.
Note:
This may trigger a job's :class:`Operation` stage transition.
Args:
job_name (str): name of the job to retry the lease from.
Raises:
NotFoundError: If no job with `job_name` exists.
"""
try:
job = self.__jobs_by_name[job_name]
except KeyError:
raise NotFoundError("Job name does not exist: [{}]".format(job_name))
operation_stage = None
if job.n_tries >= self.MAX_N_TRIES:
# TODO: Decide what to do with these jobs
operation_stage = OperationStage.COMPLETED
# TODO: Mark these jobs as done
else:
operation_stage = OperationStage.QUEUED
self._queue_job(job.name)
job.update_lease_state(LeaseState.PENDING)
self._update_job_operation_stage(job_name, operation_stage)
def get_job_lease(self, job_name):
"""Returns the lease associated to job, if any have been emitted yet."""
return self.jobs[job_name].lease
"""Returns the lease associated to job, if any have been emitted yet.
def get_job_lease_cancelled(self, job_name):
"""Returns true if the lease is cancelled"""
return self.jobs[job_name].lease_cancelled
Args:
job_name (str): name of the job to query the lease from.
Raises:
NotFoundError: If no job with `job_name` exists.
"""
try:
job = self.__jobs_by_name[job_name]
except KeyError:
raise NotFoundError("Job name does not exist: [{}]".format(job_name))
return job.lease
def delete_job_lease(self, job_name):
"""Discards the lease associated to a job."""
job = self.jobs[job_name]
"""Discards the lease associated with a job.
self.jobs[job.name].delete_lease()
Args:
job_name (str): name of the job to delete the lease from.
if not job.n_clients and job.operation.done:
self._delete_job(job.name)
Raises:
NotFoundError: If no job with `job_name` exists.
"""
try:
job = self.__jobs_by_name[job_name]
except KeyError:
raise NotFoundError("Job name does not exist: [{}]".format(job_name))
def get_job_operation(self, job_name):
"""Returns the operation associated to job."""
return self.jobs[job_name].operation
job.delete_lease()
def cancel_job_operation(self, job_name):
""""Cancels the underlying operation of a given job.
if not job.n_peers and job.operation.done:
self._delete_job(job.name)
This will also cancel any job's lease that may have been issued.
def get_job_lease_cancelled(self, job_name):
"""Returns true if the lease is cancelled.
Args:
job_name (str): name of the job holding the operation to cancel.
job_name (str): name of the job to query the lease state from.
Raises:
NotFoundError: If no job with `job_name` exists.
"""
self.jobs[job_name].cancel_operation()
try:
job = self.__jobs_by_name[job_name]
except KeyError:
raise NotFoundError("Job name does not exist: [{}]".format(job_name))
return job.lease_cancelled
# --- Public API: Monitoring ---
......@@ -260,11 +458,11 @@ class Scheduler:
self.__build_metadata_queues.append(message_queue)
def query_n_jobs(self):
return len(self.jobs)
return len(self.__jobs_by_name)
def query_n_operations(self):
# For now n_operations == n_jobs:
return len(self.jobs)
return len(self.__jobs_by_operation)
def query_n_operations_by_stage(self, operation_stage):
try:
......@@ -275,7 +473,7 @@ class Scheduler:
return 0
def query_n_leases(self):
return len(self.jobs)
return len(self.__jobs_by_name)
def query_n_leases_by_state(self, lease_state):
try:
......@@ -295,19 +493,35 @@ class Scheduler:
# --- Private API ---
def _queue_job(self, job_name):
"""Schedules or reschedules a job."""
job = self.__jobs_by_name[job_name]
if job.operation_stage == OperationStage.QUEUED:
self.__queue.sort()
else:
bisect.insort(self.__queue, job)
def _delete_job(self, job_name):
"""Drops an entry from the internal list of jobs."""
del self.jobs[job_name]
job = self.__jobs_by_name[job_name]
if job.operation_stage == OperationStage.QUEUED:
self.__queue.remove(job)
del self.__jobs_by_action[job.action_digest.hash]
del self.__jobs_by_name[job.name]
if self._is_instrumented:
self.__operations_by_stage[OperationStage.CACHE_CHECK].discard(job_name)
self.__operations_by_stage[OperationStage.QUEUED].discard(job_name)
self.__operations_by_stage[OperationStage.EXECUTING].discard(job_name)
self.__operations_by_stage[OperationStage.COMPLETED].discard(job_name)
self.__operations_by_stage[OperationStage.CACHE_CHECK].discard(job.name)
self.__operations_by_stage[OperationStage.QUEUED].discard(job.name)
self.__operations_by_stage[OperationStage.EXECUTING].discard(job.name)
self.__operations_by_stage[OperationStage.COMPLETED].discard(job.name)
self.__leases_by_state[LeaseState.PENDING].discard(job_name)
self.__leases_by_state[LeaseState.ACTIVE].discard(job_name)
self.__leases_by_state[LeaseState.COMPLETED].discard(job_name)
self.__leases_by_state[LeaseState.PENDING].discard(job.name)
self.__leases_by_state[LeaseState.ACTIVE].discard(job.name)
self.__leases_by_state[LeaseState.COMPLETED].discard(job.name)
def _update_job_operation_stage(self, job_name, operation_stage):
"""Requests a stage transition for the job's :class:Operations.
......@@ -316,7 +530,7 @@ class Scheduler:
job_name (str): name of the job to query.
operation_stage (OperationStage): the stage to transition to.
"""
job = self.jobs[job_name]
job = self.__jobs_by_name[job_name]
if operation_stage == OperationStage.CACHE_CHECK:
job.update_operation_stage(OperationStage.CACHE_CHECK)
......@@ -365,7 +579,7 @@ class Scheduler:
self.__queue_time_average = average_order, average_time
if not job.holds_cached_action_result:
if not job.holds_cached_result:
execution_metadata = job.action_result.execution_metadata
context_metadata = {'job-is': job.name}
......
......@@ -182,3 +182,11 @@ texinfo_documents = [
author, 'BuildGrid', 'One line description of project.',
'Miscellaneous'),
]
# -- Options for the autodoc extension ----------------------------------------
# This value selects if automatically documented members are sorted
# alphabetical (value 'alphabetical'), by member type (value 'groupwise') or
# by source order (value 'bysource'). The default is alphabetical.
autodoc_member_order = 'bysource'
......@@ -25,7 +25,6 @@ import pytest
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2
from buildgrid.server import job
from buildgrid.server.controller import ExecutionController
from buildgrid.server.job import LeaseState
from buildgrid.server.bots import service
......@@ -159,7 +158,8 @@ def test_post_bot_event_temp(context, instance):
def _inject_work(scheduler, action=None, action_digest=None):
if not action:
action = remote_execution_pb2.Action()
if not action_digest:
action_digest = remote_execution_pb2.Digest()
j = job.Job(action, action_digest)
scheduler.queue_job(j, True)
scheduler.queue_job_operation(action, action_digest, skip_cache_lookup=True)
......@@ -20,11 +20,11 @@
import uuid
from unittest import mock
from google.protobuf import any_pb2
import grpc
from grpc._server import _Context
import pytest
from buildgrid._enums import OperationStage
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
from buildgrid._protos.google.longrunning import operations_pb2
......@@ -82,7 +82,7 @@ def test_execute(skip_cache_lookup, instance, context):
assert isinstance(result, operations_pb2.Operation)
metadata = remote_execution_pb2.ExecuteOperationMetadata()
result.metadata.Unpack(metadata)
assert metadata.stage == job.OperationStage.QUEUED.value
assert metadata.stage == OperationStage.QUEUED.value
operation_uuid = result.name.split('/')[-1]
assert uuid.UUID(operation_uuid, version=4)
assert result.done is False
......@@ -106,18 +106,14 @@ def test_no_action_digest_in_storage(instance, context):
def test_wait_execution(instance, controller, context):
j = job.Job(action, action_digest)
j._operation.done = True
job_name = controller.execution_instance._scheduler.queue_job_operation(action,
action_digest,
skip_cache_lookup=True)
request = remote_execution_pb2.WaitExecutionRequest(name=j.name)
controller.execution_instance._scheduler._update_job_operation_stage(job_name,
OperationStage.COMPLETED)
controller.execution_instance._scheduler.jobs[j.name] = j
action_result_any = any_pb2.Any()
action_result = remote_execution_pb2.ActionResult()
action_result_any.Pack(action_result)
j.update_operation_stage(job.OperationStage.COMPLETED)
request = remote_execution_pb2.WaitExecutionRequest(name=job_name)
response = instance.WaitExecution(request, context)
......@@ -127,7 +123,6 @@ def test_wait_execution(instance, controller, context):
metadata = remote_execution_pb2.ExecuteOperationMetadata()
result.metadata.Unpack(metadata)
assert metadata.stage == job.OperationStage.COMPLETED.value
assert uuid.UUID(result.name, version=4)
assert result.done is True
......
......@@ -17,6 +17,7 @@
# pylint: disable=redefined-outer-name
import queue
from unittest import mock
from google.protobuf import any_pb2
......@@ -86,8 +87,13 @@ def blank_instance(controller):
# Queue an execution, get operation corresponding to that request
def test_get_operation(instance, controller, execute_request, context):
response_execute = controller.execution_instance.execute(execute_request.action_digest,
execute_request.skip_cache_lookup)
job_name = controller.execution_instance.execute(execute_request.action_digest,
execute_request.skip_cache_lookup)
message_queue = queue.Queue()
operation_name = controller.execution_instance.register_operation_peer(job_name,
context.peer(),
message_queue)
request = operations_pb2.GetOperationRequest()
......@@ -95,25 +101,28 @@ def test_get_operation(instance, controller, execute_request, context):
# we're manually creating the instance here, it doesn't get a name.
# Therefore we need to manually add the instance name to the operation
# name in the GetOperation request.
request.name = "{}/{}".format(instance_name, response_execute.name)
request.name = "{}/{}".format(instance_name, operation_name)
response = instance.GetOperation(request, context)
assert response.name == "{}/{}".format(instance_name, response_execute.name)
assert response.done == response_execute.done
assert response.name == "{}/{}".format(instance_name, operation_name)
# Queue an execution, get operation corresponding to that request
def test_get_operation_blank(blank_instance, controller, execute_request, context):
response_execute = controller.execution_instance.execute(execute_request.action_digest,
execute_request.skip_cache_lookup)
job_name = controller.execution_instance.execute(execute_request.action_digest,
execute_request.skip_cache_lookup)
message_queue = queue.Queue()
operation_name = controller.execution_instance.register_operation_peer(job_name,
context.peer(),
message_queue)
request = operations_pb2.GetOperationRequest()
request.name = response_execute.name
request.name = operation_name
response = blank_instance.GetOperation(request, context)
assert response.name == response_execute.name
assert response.done == response_execute.done
assert response.name == operation_name
def test_get_operation_fail(instance, context):
......@@ -133,25 +142,35 @@ def test_get_operation_instance_fail(instance, context):
def test_list_operations(instance, controller, execute_request, context):
response_execute = controller.execution_instance.execute(execute_request.action_digest,
execute_request.skip_cache_lookup)
job_name = controller.execution_instance.execute(execute_request.action_digest,
execute_request.skip_cache_lookup)
message_queue = queue.Queue()
operation_name = controller.execution_instance.register_operation_peer(job_name,
context.peer(),
message_queue)
request = operations_pb2.ListOperationsRequest(name=instance_name)
response = instance.ListOperations(request, context)
names = response.operations[0].name.split('/')
assert names[0] == instance_name
assert names[1] == response_execute.name
assert names[1] == operation_name
def test_list_operations_blank(blank_instance, controller, execute_request, context):
response_execute = controller.execution_instance.execute(execute_request.action_digest,
execute_request.skip_cache_lookup)
job_name = controller.execution_instance.execute(execute_request.action_digest,
execute_request.skip_cache_lookup)
message_queue = queue.Queue()
operation_name = controller.execution_instance.register_operation_peer(job_name,
context.peer(),
message_queue)
request = operations_pb2.ListOperationsRequest(name='')
response = blank_instance.ListOperations(request, context)
assert response.operations[0].name.split('/')[-1] == response_execute.name
assert response.operations[0].name.split('/')[-1] == operation_name
def test_list_operations_instance_fail(instance, controller, execute_request, context):
......@@ -174,14 +193,19 @@ def test_list_operations_empty(instance, context):
# Send execution off, delete, try to find operation should fail
def test_delete_operation(instance, controller, execute_request, context):
response_execute = controller.execution_instance.execute(execute_request.action_digest,
execute_request.skip_cache_lookup)
job_name = controller.execution_instance.execute(execute_request.action_digest,
execute_request.skip_cache_lookup)
message_queue = queue.Queue()
operation_name = controller.execution_instance.register_operation_peer(job_name,
context.peer(),
message_queue)
request = operations_pb2.DeleteOperationRequest()
request.name = response_execute.name
request.name = operation_name
instance.DeleteOperation(request, context)
request_name = "{}/{}".format(instance_name, response_execute.name)
request_name = "{}/{}".format(instance_name, operation_name)
with pytest.raises(InvalidArgumentError):
controller.operations_instance.get_operation(request_name)
......@@ -189,17 +213,11 @@ def test_delete_operation(instance, controller, execute_request, context):
# Send execution off, delete, try to find operation should fail
def test_delete_operation_blank(blank_instance, controller, execute_request, context):
response_execute = controller.execution_instance.execute(execute_request.action_digest,
execute_request.skip_cache_lookup)
request = operations_pb2.DeleteOperationRequest()
request.name = response_execute.name
request.name = "runner"
blank_instance.DeleteOperation(request, context)
request_name = response_execute.name
with pytest.raises(InvalidArgumentError):
controller.operations_instance.get_operation(request_name)
context.set_code.assert_called_once_with(grpc.StatusCode.INVALID_ARGUMENT)
def test_delete_operation_fail(instance, context):
......@@ -211,11 +229,16 @@ def test_delete_operation_fail(instance, context):
def test_cancel_operation(instance, controller, execute_request, context):
response_execute = controller.execution_instance.execute(execute_request.action_digest,
execute_request.skip_cache_lookup)
job_name = controller.execution_instance.execute(execute_request.action_digest,
execute_request.skip_cache_lookup)
message_queue = queue.Queue()
operation_name = controller.execution_instance.register_operation_peer(job_name,
context.peer(),
message_queue)
request = operations_pb2.CancelOperationRequest()
request.name = "{}/{}".format(instance_name, response_execute.name)
request.name = "{}/{}".format(instance_name, operation_name)
instance.CancelOperation(request, context)
......@@ -238,7 +261,7 @@ def test_cancel_operation_blank(blank_instance, context):
context.set_code.assert_called_once_with(grpc.StatusCode.INVALID_ARGUMENT)
def test_cancel_operation_instance_fail(instance, context):
def test_cancel_operation__fail(instance, context):
request = operations_pb2.CancelOperationRequest()
instance.CancelOperation(request, context)
......