Skip to content
Snippets Groups Projects
Commit e706ce2f authored by Martin Blanchard's avatar Martin Blanchard
Browse files

Track operations by peer

parent e5f4a597
No related branches found
No related tags found
Loading
Pipeline #39743800 canceled
......@@ -44,7 +44,7 @@ class ExecutionInstance:
def hash_type(self):
return get_hash_type()
def execute(self, action_digest, skip_cache_lookup):
def execute(self, action_digest, skip_cache_lookup, peer):
""" Sends a job for execution.
Queues an action and creates an Operation instance to be associated with
this action.
......@@ -54,7 +54,7 @@ class ExecutionInstance:
if not action:
raise FailedPreconditionError("Could not get action from storage.")
return self._scheduler.queue_job(action, action_digest, skip_cache_lookup)
return self._scheduler.queue_job(peer, action, action_digest, skip_cache_lookup)
def register_operation_client(self, operation_name, peer, message_queue):
try:
......
......@@ -35,7 +35,7 @@ class Job:
self._name = str(uuid.uuid4())
self._priority = priority
self._action = remote_execution_pb2.Action()
self._operation = operations_pb2.Operation()
self._operations = []
self._lease = None
self.__execute_response = None
......@@ -46,17 +46,18 @@ class Job:
self.__worker_start_timestamp = timestamp_pb2.Timestamp()
self.__worker_completed_timestamp = timestamp_pb2.Timestamp()
self.__operation_message_queues = {}
self.__operation_cancelled = False
self.__operations_by_name = {}
self.__operations_by_peer = {}
self.__operations_message_queues = {}
self.__operations_cancelled = []
self.__lease_cancelled = False
self.__job_cancelled = False
self.__operation_metadata.action_digest.CopyFrom(action_digest)
self.__operation_metadata.stage = OperationStage.UNKNOWN.value
self._action.CopyFrom(action)
self._do_not_cache = self._action.do_not_cache
self._operation.name = self._name
self._operation.done = False
self._n_tries = 0
def __eq__(self, other):
......@@ -111,6 +112,9 @@ class Job:
def operation_stage(self):
return OperationStage(self.__operation_metadata.state)
def operations(self):
return self._operations
@property
def lease(self):
return self._lease
......@@ -132,7 +136,9 @@ class Job:
@property
def n_clients(self):
return len(self.__operation_message_queues)
return len(self.__operations_message_queues)
# --- Public API: REAPI-side ---
def register_operation_client(self, peer, message_queue):
"""Subscribes to the job's :class:`Operation` stage changes.
......@@ -146,12 +152,25 @@ class Job:
Return:
str: the subscribed operation's name.
"""
if peer not in self.__operation_message_queues:
self.__operation_message_queues[peer] = message_queue
if peer not in self.__operations_message_queues:
self.__operations_message_queues[peer] = message_queue
elif self.__operations_message_queues[peer] != message_queue:
self.__operations_message_queues[peer] = message_queue
if peer not in self.__operations_by_peer:
operation = operations_pb2.Operation()
operation.name = str(uuid.uuid4())
operation.done = False
self.__operations_by_name[operation.name] = operation
self.__operations_by_peer[peer] = operation
self._operations.append(operation)
message = (None, self._copy_operation(self._operation),)
send_operation_update = True
message_queue.put(message)
if send_operation_update:
message = (None, self._copy_operation(self._operation),)
message_queue.put(message)
return self._operation.name
......@@ -161,8 +180,8 @@ class Job:
Args:
peer (str): a unique string identifying the client.
"""
if peer not in self.__operation_message_queues:
del self.__operation_message_queues[peer]
if peer not in self.__operations_message_queues:
del self.__operations_message_queues[peer]
def set_cached_result(self, action_result):
"""Allows specifying an action result form the action cache for the job.
......@@ -171,6 +190,50 @@ class Job:
self.__execute_response.result.CopyFrom(action_result)
self.__execute_response.cached_result = True
def update_operation_stage(self, stage):
"""Operates a stage transition for the job's :class:Operation.
Args:
stage (OperationStage): the operation stage to transition to.
"""
if stage.value == self.__operation_metadata.stage:
return
self.__operation_metadata.stage = stage.value
if self.__operation_metadata.stage == OperationStage.QUEUED.value:
if self.__queued_timestamp.ByteSize() == 0:
self.__queued_timestamp.GetCurrentTime()
self._n_tries += 1
self._update_operations()
def cancel_operation(self):
"""Triggers a job's :class:Operation cancellation.
This will also cancel any job's :class:Lease that may have been issued.
"""
self.__operation_cancelled = True
if self._lease is not None:
self.cancel_lease()
self.__execute_response = remote_execution_pb2.ExecuteResponse()
self.__execute_response.status.code = code_pb2.CANCELLED
self.__execute_response.status.message = "Operation cancelled by client."
self.update_operation_stage(OperationStage.COMPLETED)
def check_operation_status(self):
"""Reports errors on unexpected job's :class:Operation state.
Raises:
CancelledError: if the job's :class:Operation was cancelled.
"""
if self.__operation_cancelled:
raise CancelledError(self.__execute_response.status.message)
# --- Public API: RWAPI-side ---
def create_lease(self):
"""Emits a new :class:`Lease` for the job.
......@@ -246,32 +309,30 @@ class Job:
if self._lease is not None:
self.update_lease_state(LeaseState.CANCELLED)
def update_operation_stage(self, stage):
"""Operates a stage transition for the job's :class:Operation.
# --- Private API ---
Args:
stage (OperationStage): the operation stage to transition to.
"""
if stage.value == self.__operation_metadata.stage:
return
def _copy_operation(self, operation):
new_operation = operations_pb2.Operation()
self.__operation_metadata.stage = stage.value
new_operation.CopyFrom(operation)
if self.__operation_metadata.stage == OperationStage.QUEUED.value:
if self.__queued_timestamp.ByteSize() == 0:
self.__queued_timestamp.GetCurrentTime()
self._n_tries += 1
return new_operation
def _update_operations(self, restricted_peers_to_notify=None):
for operation in self._operations:
if operation.name not in self.__operations_by_name:
continue
elif self.__operation_metadata.stage == OperationStage.EXECUTING.value:
queue_in, queue_out = self.__queued_timestamp.ToDatetime(), datetime.now()
self.__queued_time_duration.FromTimedelta(queue_out - queue_in)
elif self.__operation_metadata.stage == OperationStage.EXECUTING.value:
queue_in, queue_out = self.__queued_timestamp.ToDatetime(), datetime.now()
self.__queued_time_duration.FromTimedelta(queue_out - queue_in)
elif self.__operation_metadata.stage == OperationStage.COMPLETED.value:
if self.__execute_response is not None:
self._operation.response.Pack(self.__execute_response)
self._operation.done = True
elif self.__operation_metadata.stage == OperationStage.COMPLETED.value:
if self.__execute_response is not None:
self._operation.response.Pack(self.__execute_response)
self._operation.done = True
self._operation.metadata.Pack(self.__operation_metadata)
operation.metadata.Pack(self.__operation_metadata)
if not self.__operation_cancelled:
message = (None, self._copy_operation(self._operation),)
......
......@@ -110,10 +110,12 @@ class Scheduler:
self.__leases_by_state[LeaseState.ACTIVE].discard(job_name)
self.__leases_by_state[LeaseState.COMPLETED].discard(job_name)
def queue_job(self, action, action_digest, priority=0, skip_cache_lookup=False):
def queue_job(self, peer, action, action_digest, priority=0, skip_cache_lookup=False):
"""Inserts a newly created job into the execution queue.
Args:
peer (str): a unique string identifying the client.
action (Action): the given action to queue for execution.
action_digest (Digest): the digest of the given action.
priority (int): the execution job's priority.
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment