Skip to content
Snippets Groups Projects
Commit 0483af91 authored by finn's avatar finn
Browse files

Bot work is now sent on an executor to stop them blocking the session.

Also some small fixes to returned work from bots to execution.
parent 7b170f95
No related branches found
No related tags found
No related merge requests found
......@@ -104,14 +104,15 @@ class BotSession:
self._update_lease_from_server(lease)
def update_bot_session(self):
self.logger.debug("Updating bot session: {}".format(self._bot_id))
session = self._interface.update_bot_session(self.get_pb2())
for lease in session.leases:
self._update_lease_from_server(lease)
for k, v in self._leases.items():
for k, v in list(self._leases.items()):
if v.state == LeaseState.COMPLETED.value:
del self._leases[k]
for lease in session.leases:
self._update_lease_from_server(lease)
def get_pb2(self):
leases = list(self._leases.values())
if not leases:
......@@ -134,12 +135,16 @@ class BotSession:
# TODO: Compare with previous state of lease
if lease.state == LeaseState.PENDING.value:
lease.state = LeaseState.ACTIVE.value
asyncio.ensure_future(self.create_work(lease))
self._leases[lease.id] = lease
self.update_bot_session()
asyncio.ensure_future(self.create_work(lease))
async def create_work(self, lease):
self.logger.debug("Work created: {}".format(lease.id))
lease = await self._work(self._context, lease)
loop = asyncio.get_event_loop()
lease = await loop.run_in_executor(None, self._work, self._context, lease)
self.logger.debug("Work complete: {}".format(lease.id))
self.lease_completed(lease)
......
......@@ -21,26 +21,25 @@ from enum import Enum
from google.protobuf import any_pb2
from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import ExecuteOperationMetadata
from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import ExecuteResponse
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2
from buildgrid._protos.google.longrunning import operations_pb2
class ExecuteStage(Enum):
UNKNOWN = ExecuteOperationMetadata.Stage.Value('UNKNOWN')
UNKNOWN = remote_execution_pb2.ExecuteOperationMetadata.Stage.Value('UNKNOWN')
# Checking the result against the cache.
CACHE_CHECK = ExecuteOperationMetadata.Stage.Value('CACHE_CHECK')
CACHE_CHECK = remote_execution_pb2.ExecuteOperationMetadata.Stage.Value('CACHE_CHECK')
# Currently idle, awaiting a free machine to execute.
QUEUED = ExecuteOperationMetadata.Stage.Value('QUEUED')
QUEUED = remote_execution_pb2.ExecuteOperationMetadata.Stage.Value('QUEUED')
# Currently being executed by a worker.
EXECUTING = ExecuteOperationMetadata.Stage.Value('EXECUTING')
EXECUTING = remote_execution_pb2.ExecuteOperationMetadata.Stage.Value('EXECUTING')
# Finished execution.
COMPLETED = ExecuteOperationMetadata.Stage.Value('COMPLETED')
COMPLETED = remote_execution_pb2.ExecuteOperationMetadata.Stage.Value('COMPLETED')
class BotStatus(Enum):
......@@ -80,13 +79,13 @@ class Job:
def __init__(self, action_digest, do_not_cache=False, message_queue=None):
self.lease = None
self.logger = logging.getLogger(__name__)
self.n_tries = 0
self.result = None
self.result_cached = False
self._action_digest = action_digest
self._do_not_cache = do_not_cache
self._execute_stage = ExecuteStage.UNKNOWN
self._n_tries = 0
self._name = str(uuid.uuid4())
self._operation = operations_pb2.Operation(name=self._name)
self._operation_update_queues = []
......@@ -122,15 +121,16 @@ class Job:
self._operation.metadata.CopyFrom(self._pack_any(self.get_operation_meta()))
if self.result is not None:
self._operation.done = True
response = ExecuteResponse()
self.result.Unpack(response.result)
response.cached_result = self.result_cached
action_result = remote_execution_pb2.ActionResult()
self.result.Unpack(action_result)
response = remote_execution_pb2.ExecuteResponse(result=action_result,
cached_result=self.result_cached)
self._operation.response.CopyFrom(self._pack_any(response))
return self._operation
def get_operation_meta(self):
meta = ExecuteOperationMetadata()
meta = remote_execution_pb2.ExecuteOperationMetadata()
meta.stage = self._execute_stage.value
meta.action_digest.CopyFrom(self._action_digest)
......
......@@ -25,7 +25,6 @@ from collections import deque
from google.protobuf import any_pb2
from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import ActionResult
from buildgrid._protos.google.longrunning import operations_pb2
from .job import ExecuteStage, LeaseState
......@@ -83,9 +82,7 @@ class Scheduler:
job.update_execute_stage(ExecuteStage.COMPLETED)
self.jobs[name] = job
if not job.do_not_cache and self.action_cache is not None:
action_result = ActionResult()
result.Unpack(action_result)
self.action_cache.put_action_result(job.action_digest, action_result)
self.action_cache.put_action_result(job.action_digest, result)
def get_operations(self):
response = operations_pb2.ListOperationsResponse()
......@@ -94,7 +91,7 @@ class Scheduler:
return response
def update_job_lease_state(self, name, state):
job = self.jobs.get(name)
job = self.jobs[name]
job.lease.state = state
self.jobs[name] = job
......
......@@ -100,12 +100,14 @@ def test_list_operations_with_result(instance, execute_request, context):
action_result = remote_execution_pb2.ActionResult()
output_file = remote_execution_pb2.OutputFile(path='unicorn')
action_result.output_files.extend([output_file])
instance._instance._scheduler.jobs[response_execute.name].result = _pack_any(action_result)
instance._instance._scheduler.job_complete(response_execute.name, _pack_any(action_result))
request = operations_pb2.ListOperationsRequest()
response = instance.ListOperations(request, context)
assert response.operations[0].name == response_execute.name
execute_response = remote_execution_pb2.ExecuteResponse()
response.operations[0].response.Unpack(execute_response)
assert execute_response.result == action_result
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment