Skip to content
Snippets Groups Projects
Commit 6b6d8039 authored by Martin Blanchard's avatar Martin Blanchard
Browse files

job.py: Hold the original Action object

parent 1480d996
No related branches found
No related tags found
No related merge requests found
Pipeline #33419849 passed
......@@ -48,7 +48,10 @@ class ExecutionInstance:
if not action:
raise FailedPreconditionError("Could not get action from storage.")
job = Job(action_digest, action.do_not_cache, message_queue)
job = Job(action, action_digest)
if message_queue is not None:
job.register_client(message_queue)
self.logger.info("Operation name: [{}]".format(job.name))
self._scheduler.queue_job(job, skip_cache_lookup)
......
......@@ -50,10 +50,11 @@ class LeaseState(Enum):
class Job:
def __init__(self, action_digest, do_not_cache=False, message_queue=None):
def __init__(self, action, action_digest):
self.logger = logging.getLogger(__name__)
self._name = str(uuid.uuid4())
self._action = remote_execution_pb2.Action()
self._operation = operations_pb2.Operation()
self._lease = None
......@@ -63,15 +64,13 @@ class Job:
self.__operation_metadata.action_digest.CopyFrom(action_digest)
self.__operation_metadata.stage = OperationStage.UNKNOWN.value
self._do_not_cache = do_not_cache
self._action.CopyFrom(action)
self._do_not_cache = self._action.do_not_cache
self._operation_update_queues = []
self._operation.name = self._name
self._operation.done = False
self._n_tries = 0
if message_queue is not None:
self.register_client(message_queue)
@property
def name(self):
return self._name
......@@ -80,6 +79,10 @@ class Job:
def do_not_cache(self):
return self._do_not_cache
@property
def action(self):
return self._action
@property
def action_digest(self):
return self.__operation_metadata.action_digest
......@@ -180,6 +183,10 @@ class Job:
elif self._lease.state == LeaseState.COMPLETED.value:
action_result = remote_execution_pb2.ActionResult()
# TODO: Make a distinction between build and bot failures!
if status.code != 0:
self._do_not_cache = True
if result is not None:
assert result.Is(action_result.DESCRIPTOR)
result.Unpack(action_result)
......
......@@ -116,9 +116,8 @@ class Scheduler:
job.update_lease_state(lease_state,
status=lease_status, result=lease_result)
if not job.do_not_cache and self._action_cache is not None:
if not job.lease.status.code:
self._action_cache.update_action_result(job.action_digest, job.action_result)
if self._action_cache is not None and not job.do_not_cache:
self._action_cache.update_action_result(job.action_digest, job.action_result)
job.update_operation_stage(OperationStage.COMPLETED)
......
......@@ -137,7 +137,7 @@ def test_update_leases_with_work(bot_session, context, instance):
bot_session=bot_session)
action_digest = remote_execution_pb2.Digest(hash='gaff')
_inject_work(instance._instances[""]._scheduler, action_digest)
_inject_work(instance._instances[""]._scheduler, action_digest=action_digest)
response = instance.CreateBotSession(request, context)
......@@ -159,7 +159,7 @@ def test_update_leases_work_complete(bot_session, context, instance):
# Inject work
action_digest = remote_execution_pb2.Digest(hash='gaff')
_inject_work(instance._instances[""]._scheduler, action_digest)
_inject_work(instance._instances[""]._scheduler, action_digest=action_digest)
request = bots_pb2.UpdateBotSessionRequest(name=response.name,
bot_session=response)
......@@ -188,7 +188,7 @@ def test_work_rejected_by_bot(bot_session, context, instance):
bot_session=bot_session)
# Inject work
action_digest = remote_execution_pb2.Digest(hash='gaff')
_inject_work(instance._instances[""]._scheduler, action_digest)
_inject_work(instance._instances[""]._scheduler, action_digest=action_digest)
# Simulated the severed binding between client and server
response = copy.deepcopy(instance.CreateBotSession(request, context))
......@@ -210,7 +210,7 @@ def test_work_out_of_sync_from_pending(state, bot_session, context, instance):
bot_session=bot_session)
# Inject work
action_digest = remote_execution_pb2.Digest(hash='gaff')
_inject_work(instance._instances[""]._scheduler, action_digest)
_inject_work(instance._instances[""]._scheduler, action_digest=action_digest)
# Simulated the severed binding between client and server
response = copy.deepcopy(instance.CreateBotSession(request, context))
......@@ -231,7 +231,7 @@ def test_work_out_of_sync_from_active(state, bot_session, context, instance):
bot_session=bot_session)
# Inject work
action_digest = remote_execution_pb2.Digest(hash='gaff')
_inject_work(instance._instances[""]._scheduler, action_digest)
_inject_work(instance._instances[""]._scheduler, action_digest=action_digest)
# Simulated the severed binding between client and server
response = copy.deepcopy(instance.CreateBotSession(request, context))
......@@ -258,7 +258,7 @@ def test_work_active_to_active(bot_session, context, instance):
bot_session=bot_session)
# Inject work
action_digest = remote_execution_pb2.Digest(hash='gaff')
_inject_work(instance._instances[""]._scheduler, action_digest)
_inject_work(instance._instances[""]._scheduler, action_digest=action_digest)
# Simulated the severed binding between client and server
response = copy.deepcopy(instance.CreateBotSession(request, context))
......@@ -280,8 +280,10 @@ def test_post_bot_event_temp(context, instance):
context.set_code.assert_called_once_with(grpc.StatusCode.UNIMPLEMENTED)
def _inject_work(scheduler, action_digest=None):
def _inject_work(scheduler, action=None, action_digest=None):
if not action:
action = remote_execution_pb2.Action()
if not action_digest:
action_digest = remote_execution_pb2.Digest()
j = job.Job(action_digest, False)
j = job.Job(action, action_digest)
scheduler.queue_job(j, True)
......@@ -105,7 +105,7 @@ def test_no_action_digest_in_storage(instance, context):
def test_wait_execution(instance, controller, context):
j = job.Job(action_digest, None)
j = job.Job(action, action_digest)
j._operation.done = True
request = remote_execution_pb2.WaitExecutionRequest(name="{}/{}".format('', j.name))
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment