Skip to content
Snippets Groups Projects
Commit 4c708a2d authored by finn's avatar finn
Browse files

Added messaging queues for operation updates

ExecutionService now registers a message queue to receive operation updates.
Once all messages have been read and the job is completed,
the job is removed.
parent dfaadddf
No related branches found
No related tags found
Loading
Pipeline #27104276 passed
...@@ -34,12 +34,12 @@ class ExecutionInstance(): ...@@ -34,12 +34,12 @@ class ExecutionInstance():
self.logger = logging.getLogger(__name__) self.logger = logging.getLogger(__name__)
self._scheduler = scheduler self._scheduler = scheduler
def execute(self, action_digest, skip_cache_lookup): def execute(self, action_digest, skip_cache_lookup, message_queue=None):
""" Sends a job for execution. """ Sends a job for execution.
Queues an action and creates an Operation instance to be associated with Queues an action and creates an Operation instance to be associated with
this action. this action.
""" """
job = Job(action_digest) job = Job(action_digest, message_queue)
self.logger.info("Operation name: {}".format(job.name)) self.logger.info("Operation name: {}".format(job.name))
if not skip_cache_lookup: if not skip_cache_lookup:
...@@ -70,3 +70,15 @@ class ExecutionInstance(): ...@@ -70,3 +70,15 @@ class ExecutionInstance():
def cancel_operation(self, name): def cancel_operation(self, name):
# TODO: Cancel leases # TODO: Cancel leases
raise NotImplementedError("Cancelled operations not supported") raise NotImplementedError("Cancelled operations not supported")
def register_message_client(self, name, queue):
    """ Subscribes a message queue to updates for a named operation.

    Delegates to the scheduler's ``register_client``.

    Args:
        name: Name of the operation to subscribe to.
        queue: Queue object passed through to the scheduler.

    Raises:
        InvalidArgumentError: If the scheduler raises ``KeyError`` for
            ``name``. The original ``KeyError`` is kept as ``__cause__``.
    """
    try:
        self._scheduler.register_client(name, queue)
    except KeyError as exc:
        # Chain explicitly so the traceback reports the failed lookup as the
        # cause instead of "another exception occurred during handling".
        message = "Operation name does not exist: {}".format(name)
        raise InvalidArgumentError(message) from exc
def unregister_message_client(self, name, queue):
    """ Unsubscribes a message queue from updates for a named operation.

    Delegates to the scheduler's ``unregister_client``.

    Args:
        name: Name of the operation to unsubscribe from.
        queue: Queue object passed through to the scheduler.

    Raises:
        InvalidArgumentError: If the scheduler raises ``KeyError`` for
            ``name``. The original ``KeyError`` is kept as ``__cause__``.
    """
    try:
        self._scheduler.unregister_client(name, queue)
    except KeyError as exc:
        # Chain explicitly so the traceback reports the failed lookup as the
        # cause instead of "another exception occurred during handling".
        message = "Operation name does not exist: {}".format(name)
        raise InvalidArgumentError(message) from exc
...@@ -22,10 +22,9 @@ ExecutionService ...@@ -22,10 +22,9 @@ ExecutionService
Serves remote execution requests. Serves remote execution requests.
""" """
import copy
import grpc import grpc
import logging import logging
import time import queue
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
from buildgrid._protos.google.longrunning import operations_pb2_grpc, operations_pb2 from buildgrid._protos.google.longrunning import operations_pb2_grpc, operations_pb2
...@@ -35,17 +34,23 @@ from ._exceptions import InvalidArgumentError ...@@ -35,17 +34,23 @@ from ._exceptions import InvalidArgumentError
class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer): class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
def __init__(self, instance): def __init__(self, instance):
self._instance = instance
self.logger = logging.getLogger(__name__) self.logger = logging.getLogger(__name__)
self._instance = instance
def Execute(self, request, context): def Execute(self, request, context):
# Ignore request.instance_name for now # Ignore request.instance_name for now
# Have only one instance # Have only one instance
try: try:
message_queue = queue.Queue()
operation = self._instance.execute(request.action_digest, operation = self._instance.execute(request.action_digest,
request.skip_cache_lookup) request.skip_cache_lookup,
message_queue)
yield from self._stream_operation_updates(operation.name) remove_client = lambda : self._remove_client(operation.name, message_queue)
context.add_callback(remove_client)
yield from self._stream_operation_updates(message_queue,
operation.name)
except InvalidArgumentError as e: except InvalidArgumentError as e:
self.logger.error(e) self.logger.error(e)
...@@ -59,19 +64,28 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer): ...@@ -59,19 +64,28 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
def WaitExecution(self, request, context): def WaitExecution(self, request, context):
try: try:
yield from self._stream_operation_updates(request.name) message_queue = queue.Queue()
operation_name = request.name
self._instance.register_message_client(operation_name, message_queue)
remove_client = lambda : self._remove_client(operation_name, message_queue)
context.add_callback(remove_client)
yield from self._stream_operation_updates(message_queue,
operation_name)
except InvalidArgumentError as e: except InvalidArgumentError as e:
self.logger.error(e) self.logger.error(e)
context.set_details(str(e)) context.set_details(str(e))
context.set_code(grpc.StatusCode.INVALID_ARGUMENT) context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
def _stream_operation_updates(self, name): def _remove_client(self, operation_name, message_queue):
stream_previous = None self._instance.unregister_message_client(operation_name, message_queue)
while True:
stream = self._instance.get_operation(name) def _stream_operation_updates(self, message_queue, operation_name):
if stream != stream_previous: operation = message_queue.get()
yield stream while not operation.done:
if stream.done == True: break yield operation
stream_previous = copy.deepcopy(stream) operation = message_queue.get()
time.sleep(1) yield operation
...@@ -51,9 +51,8 @@ class LeaseState(Enum): ...@@ -51,9 +51,8 @@ class LeaseState(Enum):
class Job(): class Job():
def __init__(self, action): def __init__(self, action_digest, message_queue=None):
self.action = action self.action_digest = action_digest
self.bot_status = BotStatus.BOT_STATUS_UNSPECIFIED
self.execute_stage = ExecuteStage.UNKNOWN self.execute_stage = ExecuteStage.UNKNOWN
self.lease = None self.lease = None
self.logger = logging.getLogger(__name__) self.logger = logging.getLogger(__name__)
...@@ -62,10 +61,24 @@ class Job(): ...@@ -62,10 +61,24 @@ class Job():
self._n_tries = 0 self._n_tries = 0
self._operation = operations_pb2.Operation(name = self.name) self._operation = operations_pb2.Operation(name = self.name)
self._operation_update_queues = []
if message_queue is not None:
self.register_client(message_queue)
def check_job_finished(self):
    """ Reports whether the job is done and nobody is listening any more.

    Returns ``False`` while at least one update queue is still registered;
    otherwise returns the ``done`` flag of the job's operation.
    """
    still_has_clients = bool(self._operation_update_queues)
    if still_has_clients:
        return False
    return self._operation.done
def register_client(self, queue):
    """ Adds ``queue`` to the end of the job's list of update queues. """
    subscribers = self._operation_update_queues
    subscribers.append(queue)
def unregister_client(self, queue):
    """ Removes ``queue`` from the job's list of update queues.

    The removal uses ``list.remove``, so a queue that is not currently
    registered raises ``ValueError``.
    """
    subscribers = self._operation_update_queues
    subscribers.remove(queue)
def get_operation(self): def get_operation(self):
self._operation.metadata.CopyFrom(self._pack_any(self.get_operation_meta())) self._operation.metadata.CopyFrom(self._pack_any(self.get_operation_meta()))
if self.result is not None: if self.result is not None:
self._operation.done = True self._operation.done = True
response = ExecuteResponse() response = ExecuteResponse()
...@@ -81,10 +94,10 @@ class Job(): ...@@ -81,10 +94,10 @@ class Job():
return meta return meta
def create_lease(self): def create_lease(self):
action = self._pack_any(self.action) action_digest = self._pack_any(self.action_digest)
lease = bots_pb2.Lease(id = self.name, lease = bots_pb2.Lease(id = self.name,
payload = action, payload = action_digest,
state = LeaseState.PENDING.value) state = LeaseState.PENDING.value)
self.lease = lease self.lease = lease
return lease return lease
...@@ -92,6 +105,11 @@ class Job(): ...@@ -92,6 +105,11 @@ class Job():
def get_operations(self): def get_operations(self):
return operations_pb2.ListOperationsResponse(operations = [self.get_operation()]) return operations_pb2.ListOperationsResponse(operations = [self.get_operation()])
def update_execute_stage(self, stage):
    """ Records a new execution stage and notifies every registered queue.

    Args:
        stage: The new stage; stored on ``self.execute_stage``.

    Each registered queue receives its own deep copy of the operation
    returned by ``get_operation()``. Queued updates are therefore snapshots:
    a later stage change cannot alter a message a client has not read yet.
    """
    # Local import keeps this block self-contained.
    import copy

    self.execute_stage = stage
    if not self._operation_update_queues:
        return

    # NOTE(review): get_operation() appears to hand back the job's single
    # mutable Operation, so it must be copied before being shared.
    operation = self.get_operation()
    for update_queue in self._operation_update_queues:
        update_queue.put(copy.deepcopy(operation))
def _pack_any(self, pack): def _pack_any(self, pack):
any = any_pb2.Any() any = any_pb2.Any()
any.Pack(pack) any.Pack(pack)
......
...@@ -35,8 +35,17 @@ class Scheduler(): ...@@ -35,8 +35,17 @@ class Scheduler():
self.jobs = {} self.jobs = {}
self.queue = deque() self.queue = deque()
def register_client(self, name, queue):
    """ Attaches ``queue`` to the job stored under ``name``.

    Looking up an unknown ``name`` in ``self.jobs`` raises ``KeyError``.
    """
    job = self.jobs[name]
    job.register_client(queue)
def unregister_client(self, name, queue):
    """ Detaches ``queue`` from the job stored under ``name``.

    After detaching, the job is dropped from ``self.jobs`` when its
    ``check_job_finished()`` returns a true value. Looking up an unknown
    ``name`` raises ``KeyError``.
    """
    job = self.jobs[name]
    job.unregister_client(queue)

    if not job.check_job_finished():
        # Job is still needed; keep tracking it.
        return
    del self.jobs[name]
def append_job(self, job): def append_job(self, job):
job.execute_stage = ExecuteStage.QUEUED job.update_execute_stage(ExecuteStage.QUEUED)
self.jobs[job.name] = job self.jobs[job.name] = job
self.queue.append(job) self.queue.append(job)
...@@ -45,9 +54,9 @@ class Scheduler(): ...@@ -45,9 +54,9 @@ class Scheduler():
if job.n_tries >= self.MAX_N_TRIES: if job.n_tries >= self.MAX_N_TRIES:
# TODO: Decide what to do with these jobs # TODO: Decide what to do with these jobs
job.execute_stage = ExecuteStage.COMPLETED job.update_execute_stage(ExecuteStage.COMPLETED)
else: else:
job.execute_stage = ExecuteStage.QUEUED job.update_execute_stage(ExecuteStage.QUEUED)
job.n_tries += 1 job.n_tries += 1
self.queue.appendleft(job) self.queue.appendleft(job)
...@@ -56,15 +65,14 @@ class Scheduler(): ...@@ -56,15 +65,14 @@ class Scheduler():
def create_job(self): def create_job(self):
if len(self.queue) > 0: if len(self.queue) > 0:
job = self.queue.popleft() job = self.queue.popleft()
job.execute_stage = ExecuteStage.EXECUTING job.update_execute_stage(ExecuteStage.EXECUTING)
self.jobs[job.name] = job self.jobs[job.name] = job
return job return job
return None
def job_complete(self, name, result): def job_complete(self, name, result):
job = self.jobs[name] job = self.jobs[name]
job.execute_stage = ExecuteStage.COMPLETED
job.result = result job.result = result
job.update_execute_stage(ExecuteStage.COMPLETED)
self.jobs[name] = job self.jobs[name] = job
def get_operations(self): def get_operations(self):
...@@ -122,3 +130,7 @@ class Scheduler(): ...@@ -122,3 +130,7 @@ class Scheduler():
if state == LeaseState.PENDING.value or \ if state == LeaseState.PENDING.value or \
state == LeaseState.ACTIVE.value: state == LeaseState.ACTIVE.value:
self.retry_job(name) self.retry_job(name)
def _update_execute_stage(self, job, stage):
    """ Forwards ``stage`` to ``job.update_execute_stage`` and returns ``job``. """
    job.update_execute_stage(stage)
    updated_job = job
    return updated_job
...@@ -69,17 +69,22 @@ def test_execute(skip_cache_lookup, instance, context): ...@@ -69,17 +69,22 @@ def test_execute(skip_cache_lookup, instance, context):
assert result.done is False assert result.done is False
def test_wait_execution(instance, context): def test_wait_execution(instance, context):
# TODO: Figure out why next(response) hangs on the .get()
# method when running in pytest.
action_digest = remote_execution_pb2.Digest() action_digest = remote_execution_pb2.Digest()
action_digest.hash = 'zhora' action_digest.hash = 'zhora'
execution_request = remote_execution_pb2.ExecuteRequest(instance_name = '', j = job.Job(action_digest, None)
action_digest = action_digest, j._operation.done = True
skip_cache_lookup = True)
execution_response = next(instance.Execute(execution_request, context))
request = remote_execution_pb2.WaitExecutionRequest(name=j.name)
request = remote_execution_pb2.WaitExecutionRequest(name=execution_response.name) instance._instance._scheduler.jobs[j.name] = j
response = next(instance.WaitExecution(request, context)) action_result_any = any_pb2.Any()
action_result = remote_execution_pb2.ActionResult()
action_result_any.Pack(action_result)
assert response == execution_response instance._instance._scheduler._update_execute_stage(j, job.ExecuteStage.COMPLETED)
response = instance.WaitExecution(request, context)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment