Skip to content

Resolve "Replace ProcessPoolExecutor as it does not handle abrupt pool process termination"

Replace ProcessPoolExecutor with a custom-made worker queue processor.

The new worker queue starts a new multiprocessing.Process for each new worker and keeps track of its execution status using coroutine _process_worker().

Reception of signal SIGCHLD is used as trigger to check termination status of worker child processes, so that asyncio.sleep()-based polling is avoided.

Upon cancellation of the worker queue, currently running worker processes will be sent a SIGTERM signal followed by a SIGKILL signal one second later if still alive.

The new worker queue enables custom logic to be added to function _worker_can_run_now() to determine when there is enough available resources to start a new worker process. Current implementation keeps the semantics of the previous worker queue, where a process is allowed to start as long as the current number of workers being processed is lower than Config.MAX_WORKERS. This function would be able to inspect the properties of currently running workers as well as the properties of the worker being considered for startup, allowing decision to be made on the number of currently running workers of a specific type of job, for example.

Closes #151

Edited by Knut Aksel Røysland

Merge request reports