Skip to content
Snippets Groups Projects
Commit 6ad31e59 authored by Martin Blanchard's avatar Martin Blanchard
Browse files

tests/utils/cas.py: New CAS server helper

#77
parent 37f2bd24
No related branches found
No related tags found
No related merge requests found
......@@ -89,6 +89,7 @@ tests_require = [
'coverage == 4.4.0',
'moto',
'pep8',
'psutil',
'pytest == 3.6.4',
'pytest-cov >= 2.6.0',
'pytest-pep8',
......
# Copyright (C) 2018 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from concurrent import futures
from contextlib import contextmanager
import multiprocessing
import os
import signal
import tempfile
import grpc
import psutil
import pytest_cov
from buildgrid.server.cas.service import ByteStreamService
from buildgrid.server.cas.service import ContentAddressableStorageService
from buildgrid.server.cas.instance import ByteStreamInstance
from buildgrid.server.cas.instance import ContentAddressableStorageInstance
from buildgrid.server.cas.storage.disk import DiskStorage
@contextmanager
def serve_cas(instances):
    """Yield a running CAS test server, guaranteeing shutdown on exit.

    Args:
        instances: iterable of instance names to register on the server.
    """
    cas_server = Server(instances)
    try:
        yield cas_server
    finally:
        # Always tear the subprocess and its storage down, even on error.
        cas_server.quit()
def kill_process_tree(pid):
    """Forcibly kill the process *pid* together with all its descendants.

    Children are killed before the root process. AccessDenied errors are
    ignored, since they can happen with some setuid bwrap processes.
    """
    def _try_kill(process):
        # Ignore this error, it can happen with
        # some setuid bwrap processes.
        try:
            process.kill()
        except psutil.AccessDenied:
            pass

    root = psutil.Process(pid)
    descendants = root.children(recursive=True)

    # Bloody Murder
    for descendant in descendants:
        _try_kill(descendant)
    _try_kill(root)
def run_in_subprocess(function, *arguments):
    """Run *function* in a child process and return the value it reports.

    *function* is invoked with a multiprocessing queue as its first
    argument followed by *arguments*, and is expected to put its result
    onto that queue. A subprocess is used to avoid creation of gRPC
    threads in the main process; see
    https://github.com/grpc/grpc/blob/master/doc/fork_support.md

    On KeyboardInterrupt the whole child process tree is killed and the
    interrupt is re-raised.
    """
    result_queue = multiprocessing.Queue()
    worker = multiprocessing.Process(target=function,
                                     args=(result_queue, *arguments))

    try:
        worker.start()
        result = result_queue.get()
        worker.join()
    except KeyboardInterrupt:
        kill_process_tree(worker.pid)
        raise

    return result
class Server:
    """In-process CAS server helper for tests.

    Spawns a gRPC ByteStream/CAS server in a subprocess (keeping gRPC
    threads out of the main test process), backed by on-disk storage in
    a temporary directory, and exposes helpers to inspect stored blobs
    from the parent process.
    """

    def __init__(self, instances):
        # Instance names to register on both CAS services.
        self.instances = instances

        # Temporary on-disk storage; the path is shared with the server
        # subprocess, which opens its own DiskStorage over the same dir.
        self.__storage_path = tempfile.TemporaryDirectory()
        self.__storage = DiskStorage(self.__storage_path.name)

        # The subprocess reports its listening port back on this queue.
        self.__queue = multiprocessing.Queue()
        self.__process = multiprocessing.Process(
            target=Server.serve,
            args=(self.__queue, self.instances, self.__storage_path.name))
        self.__process.start()

        # Blocks until the server subprocess has published its port.
        self.port = self.__queue.get()
        self.remote = 'localhost:{}'.format(self.port)

    @classmethod
    def serve(cls, queue, instances, storage_path):
        """Subprocess entry point: serve CAS requests until signalled.

        Puts the bound port number onto *queue* once the server is
        listening, then sleeps until a signal arrives (SIGTERM is sent
        by quit()).
        """
        # Flush coverage data when the parent terminates us via SIGTERM.
        pytest_cov.embed.cleanup_on_sigterm()

        # Use max_workers default from Python 3.5+
        max_workers = (os.cpu_count() or 1) * 5
        server = grpc.server(futures.ThreadPoolExecutor(max_workers))
        port = server.add_insecure_port('localhost:0')

        storage = DiskStorage(storage_path)

        bs_service = ByteStreamService(server)
        cas_service = ContentAddressableStorageService(server)
        # Register every requested instance name on both services.
        for name in instances:
            bs_service.add_instance(name, ByteStreamInstance(storage))
            cas_service.add_instance(name, ContentAddressableStorageInstance(storage))

        server.start()
        queue.put(port)

        # Keep the subprocess alive until the parent sends a signal.
        signal.pause()

    def has(self, digest):
        """Return True if a blob with *digest* exists in storage."""
        return self.__storage.has_blob(digest)

    def compare_blobs(self, digest, blob):
        """Return True if the stored blob for *digest* equals *blob*."""
        if not self.__storage.has_blob(digest):
            return False

        stored_blob = self.__storage.get_blob(digest)
        stored_blob = stored_blob.read()

        return blob == stored_blob

    def compare_messages(self, digest, message):
        """Return True if the stored blob equals *message* serialized."""
        if not self.__storage.has_blob(digest):
            return False

        message_blob = message.SerializeToString()

        stored_blob = self.__storage.get_blob(digest)
        stored_blob = stored_blob.read()

        return message_blob == stored_blob

    def compare_files(self, digest, file_path):
        """Return True if the stored blob equals the file at *file_path*."""
        if not self.__storage.has_blob(digest):
            return False

        with open(file_path, 'rb') as file_bytes:
            file_blob = file_bytes.read()

        stored_blob = self.__storage.get_blob(digest)
        stored_blob = stored_blob.read()

        return file_blob == stored_blob

    def quit(self):
        """Terminate the server subprocess and delete its storage dir."""
        if self.__process:
            self.__process.terminate()
            self.__process.join()

        self.__storage_path.cleanup()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment