Skip to content
Snippets Groups Projects
Commit 0b50c52e authored by Tristan Van Berkom's avatar Tristan Van Berkom
Browse files

_scheduler/jobs/job.py: Allow subclasses to message the frontend

parent 60586f80
No related branches found
No related tags found
Loading
......@@ -58,10 +58,10 @@ class JobStatus():
# Used to distinguish between status messages and return values
class Envelope():
class _Envelope():
def __init__(self, message_type, message):
self._message_type = message_type
self._message = message
self.message_type = message_type
self.message = message
# Process class that doesn't call waitpid on its own.
......@@ -275,10 +275,37 @@ class Job():
def set_task_id(self, task_id):
self._task_id = task_id
# send_message()
#
# To be called from inside Job.child_process() implementations
# to send messages to the main process during processing.
#
# These messages will be processed by the class's Job.handle_message()
# implementation.
#
def send_message(self, message_type, message):
self._queue.put(_Envelope(message_type, message))
#######################################################
# Abstract Methods #
#######################################################
# handle_message()
#
# Handle a custom message. This will be called in the main process in
# response to any messages sent to the main proces using the
# Job.send_message() API from inside a Job.child_process() implementation
#
# Args:
# message_type (str): A string to identify the message type
# message (any): A simple serializable object
#
# Returns:
# (bool): Should return a truthy value if message_type is handled.
#
def handle_message(self, message_type, message):
return False
# parent_complete()
#
# This will be executed after the job finishes, and is expected to
......@@ -416,7 +443,7 @@ class Job():
elapsed=elapsed, detail=e.detail,
logfile=filename, sandbox=e.sandbox)
self._queue.put(Envelope('child_data', self.child_process_data()))
self._queue.put(_Envelope('child_data', self.child_process_data()))
# Report the exception to the parent (for internal testing purposes)
self._child_send_error(e)
......@@ -442,7 +469,7 @@ class Job():
else:
# No exception occurred in the action
self._queue.put(Envelope('child_data', self.child_process_data()))
self._queue.put(_Envelope('child_data', self.child_process_data()))
self._child_send_result(result)
elapsed = datetime.datetime.now() - starttime
......@@ -469,7 +496,7 @@ class Job():
domain = e.domain
reason = e.reason
envelope = Envelope('error', {
envelope = _Envelope('error', {
'domain': domain,
'reason': reason
})
......@@ -487,7 +514,7 @@ class Job():
#
def _child_send_result(self, result):
if result is not None:
envelope = Envelope('result', result)
envelope = _Envelope('result', result)
self._queue.put(envelope)
# _child_shutdown()
......@@ -524,7 +551,7 @@ class Job():
if message.message_type == MessageType.LOG:
return
self._queue.put(Envelope('message', message))
self._queue.put(_Envelope('message', message))
# _parent_shutdown()
#
......@@ -588,24 +615,28 @@ class Job():
if not self._listening:
return
if envelope._message_type == 'message':
if envelope.message_type == 'message':
# Propagate received messages from children
# back through the context.
self._scheduler.context.message(envelope._message)
elif envelope._message_type == 'error':
self._scheduler.context.message(envelope.message)
elif envelope.message_type == 'error':
# For regression tests only, save the last error domain / reason
# reported from a child task in the main process, this global state
# is currently managed in _exceptions.py
set_last_task_error(envelope._message['domain'],
envelope._message['reason'])
elif envelope._message_type == 'result':
set_last_task_error(envelope.message['domain'],
envelope.message['reason'])
elif envelope.message_type == 'result':
assert self._result is None
self._result = envelope._message
elif envelope._message_type == 'child_data':
self._result = envelope.message
elif envelope.message_type == 'child_data':
# If we retry a job, we assign a new value to this
self.child_data = envelope._message
else:
raise Exception()
self.child_data = envelope.message
# Try Job subclass specific messages now
elif not self.handle_message(envelope.message_type,
envelope.message):
assert 0, "Unhandled message type '{}': {}" \
.format(envelope.message_type, envelope.message)
# _parent_process_queue()
#
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment