Skip to content
Snippets Groups Projects
Commit 57f04b60 authored by Finn's avatar Finn
Browse files

Message queue now queues the job.

Check for operation status when streaming operations.
Allows client to catch a cancelled job.
parent a76ce574
No related branches found
No related tags found
No related merge requests found
......@@ -72,8 +72,10 @@ class ExecutionInstance:
raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
def stream_operation_updates(self, message_queue, operation_name):
operation = message_queue.get()
while not operation.done:
yield operation
operation = message_queue.get()
yield operation
job = message_queue.get()
while not job.operation.done:
yield job.operation
job = message_queue.get()
job.check_operation_status()
yield job.operation
......@@ -109,11 +109,13 @@ class Job:
def register_client(self, queue):
"""Subscribes to the job's :class:`Operation` stage change events.
Queues this :object:`Job` instance.
Args:
queue (queue.Queue): the event queue to register.
"""
self._operation_update_queues.append(queue)
queue.put(self._operation)
queue.put(self)
def unregister_client(self, queue):
"""Unsubscribes to the job's :class:`Operation` stage change events.
......@@ -229,7 +231,16 @@ class Job:
self._operation.metadata.Pack(self.__operation_metadata)
for queue in self._operation_update_queues:
queue.put(self._operation)
queue.put(self)
def check_operation_status(self):
"""Reports errors on unexpected job's :class:Operation state.
Raises:
CancelledError: if the job's :class:Operation was cancelled.
"""
if self.__operation_cancelled:
raise CancelledError(self.__execute_response.status.message)
def cancel_operation(self):
"""Triggers a job's :class:Operation cancellation.
......
......@@ -87,8 +87,10 @@ class OperationsInstance:
raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
def stream_operation_updates(self, message_queue, operation_name):
operation = message_queue.get()
while not operation.done:
yield operation
operation = message_queue.get()
yield operation
job = message_queue.get()
while not job.operation.done:
yield job.operation
job = message_queue.get()
job.check_operation_status()
yield job.operation
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment