Resolve "Replace ProcessPoolExecutor as it does not handle abrupt pool process termination"
Replace ProcessPoolExecutor with a custom-made worker queue processor.
The new worker queue starts a new multiprocessing.Process for each new worker and keeps track of its execution status using the coroutine _process_worker().
Reception of the SIGCHLD signal is used as a trigger to check the termination status of worker child processes, so that asyncio.sleep()-based polling is avoided.
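
A minimal sketch of SIGCHLD-driven tracking, not the code in this merge request. The shared asyncio.Event, the _job() body, the signature of _process_worker(), and the use of the "fork" start method are all assumptions made for illustration (Unix only).

```python
import asyncio
import multiprocessing
import signal
import time

# Assumption: the "fork" start method, so the sketch runs from a plain script.
_mp = multiprocessing.get_context("fork")


def _job(seconds: float) -> None:
    """Hypothetical worker body standing in for the real work."""
    time.sleep(seconds)


async def _process_worker(seconds: float, child_exited: asyncio.Event) -> int:
    """Start one process and return its exit code once it has terminated."""
    process = _mp.Process(target=_job, args=(seconds,))
    process.start()
    while process.is_alive():
        # Suspend until some child exits; there is no timed polling here.
        await child_exited.wait()
        # Clear before re-checking so a later SIGCHLD is never lost.
        child_exited.clear()
    process.join()
    return process.exitcode


async def main() -> list:
    loop = asyncio.get_running_loop()
    child_exited = asyncio.Event()
    # SIGCHLD only sets the event; each worker then checks its own process.
    loop.add_signal_handler(signal.SIGCHLD, child_exited.set)
    try:
        return await asyncio.gather(
            _process_worker(0.1, child_exited),
            _process_worker(0.2, child_exited),
        )
    finally:
        loop.remove_signal_handler(signal.SIGCHLD)


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Because SIGCHLD deliveries can be coalesced, every waiting coroutine is woken and re-checks its own process rather than assuming which child exited.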
Upon cancellation of the worker queue, currently running worker processes are sent a SIGTERM signal, followed by a SIGKILL signal one second later if they are still alive.
The new worker queue enables custom logic to be added to the function _worker_can_run_now() to determine when there are enough available resources to start a new worker process. The current implementation keeps the semantics of the previous worker queue, where a process is allowed to start as long as the current number of workers being processed is lower than Config.MAX_WORKERS. This function can inspect the properties of currently running workers as well as the properties of the worker being considered for startup, allowing decisions to be based on, for example, the number of currently running workers of a specific type of job.
Closes #151