Skip to content
Snippets Groups Projects
Commit 443114f7 authored by Jim MacArthur's avatar Jim MacArthur
Browse files

tests: Avoid hangs due to exceptions in subprocesses

parent abef70fe
No related branches found
No related tags found
No related merge requests found
Pipeline #37712152 passed
......@@ -25,6 +25,17 @@ def message_handler(message, context):
pass
# Since parent processes wait for queue events, we need
# to put something on it if the called process raises an
# exception.
def _queue_wrapper(target, queue, *args):
try:
target(*args, queue=queue)
except Exception as e:
queue.put(str(e))
raise
def tree_maker(cas, tree, directory):
if tree.root.ByteSize() == 0:
tree.root.CopyFrom(directory)
......@@ -97,9 +108,9 @@ def test_pull(cli, tmpdir, datafiles):
queue = multiprocessing.Queue()
# Use subprocess to avoid creation of gRPC threads in main BuildStream process
# See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
process = multiprocessing.Process(target=_test_pull,
args=(user_config_file, project_dir, artifact_dir,
'target.bst', element_key, queue))
process = multiprocessing.Process(target=_queue_wrapper,
args=(_test_pull, queue, user_config_file, project_dir,
artifact_dir, 'target.bst', element_key))
try:
# Keep SIGINT blocked in the child process
......@@ -205,9 +216,9 @@ def test_pull_tree(cli, tmpdir, datafiles):
queue = multiprocessing.Queue()
# Use subprocess to avoid creation of gRPC threads in main BuildStream process
# See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
process = multiprocessing.Process(target=_test_push_tree,
args=(user_config_file, project_dir, artifact_dir,
artifact_digest, queue))
process = multiprocessing.Process(target=_queue_wrapper,
args=(_test_push_tree, queue, user_config_file, project_dir,
artifact_dir, artifact_digest))
try:
# Keep SIGINT blocked in the child process
......@@ -233,9 +244,9 @@ def test_pull_tree(cli, tmpdir, datafiles):
queue = multiprocessing.Queue()
# Use subprocess to avoid creation of gRPC threads in main BuildStream process
process = multiprocessing.Process(target=_test_pull_tree,
args=(user_config_file, project_dir, artifact_dir,
tree_digest, queue))
process = multiprocessing.Process(target=_queue_wrapper,
args=(_test_pull_tree, queue, user_config_file, project_dir,
artifact_dir, tree_digest))
try:
# Keep SIGINT blocked in the child process
......
......@@ -26,6 +26,17 @@ def message_handler(message, context):
pass
# Since parent processes wait for queue events, we need
# to put something on it if the called process raises an
# exception.
def _queue_wrapper(target, queue, *args):
try:
target(*args, queue=queue)
except Exception as e:
queue.put(str(e))
raise
@pytest.mark.datafiles(DATA_DIR)
def test_push(cli, tmpdir, datafiles):
project_dir = str(datafiles)
......@@ -76,9 +87,9 @@ def test_push(cli, tmpdir, datafiles):
queue = multiprocessing.Queue()
# Use subprocess to avoid creation of gRPC threads in main BuildStream process
# See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
process = multiprocessing.Process(target=_test_push,
args=(user_config_file, project_dir, artifact_dir,
'target.bst', element_key, queue))
process = multiprocessing.Process(target=_queue_wrapper,
args=(_test_push, queue, user_config_file, project_dir,
artifact_dir, 'target.bst', element_key))
try:
# Keep SIGINT blocked in the child process
......@@ -185,9 +196,9 @@ def test_push_directory(cli, tmpdir, datafiles):
queue = multiprocessing.Queue()
# Use subprocess to avoid creation of gRPC threads in main BuildStream process
# See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
process = multiprocessing.Process(target=_test_push_directory,
args=(user_config_file, project_dir, artifact_dir,
artifact_digest, queue))
process = multiprocessing.Process(target=_queue_wrapper,
args=(_test_push_directory, queue, user_config_file,
project_dir, artifact_dir, artifact_digest))
try:
# Keep SIGINT blocked in the child process
......@@ -260,8 +271,9 @@ def test_push_message(cli, tmpdir, datafiles):
queue = multiprocessing.Queue()
# Use subprocess to avoid creation of gRPC threads in main BuildStream process
# See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
process = multiprocessing.Process(target=_test_push_message,
args=(user_config_file, project_dir, artifact_dir, queue))
process = multiprocessing.Process(target=_queue_wrapper,
args=(_test_push_message, queue, user_config_file,
project_dir, artifact_dir))
try:
# Keep SIGINT blocked in the child process
......
......@@ -67,19 +67,24 @@ class ArtifactShare():
def run(self, q):
pytest_cov.embed.cleanup_on_sigterm()
# Optionally mock statvfs
if self.total_space:
if self.free_space is None:
self.free_space = self.total_space
os.statvfs = self._mock_statvfs
try:
# Optionally mock statvfs
if self.total_space:
if self.free_space is None:
self.free_space = self.total_space
os.statvfs = self._mock_statvfs
server = create_server(self.repodir, enable_push=True)
port = server.add_insecure_port('localhost:0')
server = create_server(self.repodir, enable_push=True)
port = server.add_insecure_port('localhost:0')
server.start()
server.start()
# Send port to parent
q.put(port)
# Send port to parent
q.put(port)
except Exception as e:
q.put(None)
raise
# Sleep until termination by signal
signal.pause()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment