Commit f83019d6 authored by Valentin David

Clean up cache in cas server more aggressively

When there is less than 2GB left, it cleans up until 10GB are available.
These values are configurable.
parent ba3c6400
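In outline, the commit replaces the old fixed 2 GB buffer with a low/high watermark pair: cleanup triggers once free space falls below min_head_size, and keeps evicting least-recently-pushed objects until max_head_size bytes are available. A minimal sketch of that policy, with illustrative names rather than the committed code:

import os

def head_room_deficit(casdir, min_head_size=int(2e9), max_head_size=int(10e9)):
    # Free bytes on the filesystem backing the CAS repository.
    stats = os.statvfs(casdir)
    free = stats.f_bavail * stats.f_bsize
    if free >= min_head_size:
        return 0  # enough head room, no cleanup needed
    # Below the low watermark: evict until the high watermark is reached,
    # i.e. free roughly (max_head_size - free) bytes.
    return max_head_size - free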
@@ -57,7 +57,9 @@ class ArtifactTooLargeException(Exception):
# repo (str): Path to CAS repository
# enable_push (bool): Whether to allow blob uploads and artifact updates
#
def create_server(repo, *, enable_push):
def create_server(repo, *, enable_push,
max_head_size=int(10e9),
min_head_size=int(2e9)):
context = Context()
context.artifactdir = os.path.abspath(repo)
@@ -67,11 +69,13 @@ def create_server(repo, *, enable_push):
max_workers = (os.cpu_count() or 1) * 5
server = grpc.server(futures.ThreadPoolExecutor(max_workers))
cache_cleaner = _CacheCleaner(artifactcache, max_head_size, min_head_size)
bytestream_pb2_grpc.add_ByteStreamServicer_to_server(
_ByteStreamServicer(artifactcache, enable_push=enable_push), server)
_ByteStreamServicer(artifactcache, cache_cleaner, enable_push=enable_push), server)
remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server(
_ContentAddressableStorageServicer(artifactcache, enable_push=enable_push), server)
_ContentAddressableStorageServicer(artifactcache, cache_cleaner, enable_push=enable_push), server)
remote_execution_pb2_grpc.add_CapabilitiesServicer_to_server(
_CapabilitiesServicer(), server)
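The new keyword arguments can be exercised directly when embedding the server. A hypothetical snippet, mirroring what ArtifactShare.run() does further down (the repo path and port handling are placeholders):

server = create_server('/srv/cas-repo',
                       max_head_size=int(10e9),  # stop cleaning once 10 GB are free
                       min_head_size=int(2e9),   # start cleaning below 2 GB free
                       enable_push=True)
port = server.add_insecure_port('localhost:0')
server.start()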
@@ -89,9 +93,19 @@ def create_server(repo, *, enable_push):
@click.option('--client-certs', help="Public client certificates for TLS (PEM-encoded)")
@click.option('--enable-push', default=False, is_flag=True,
help="Allow clients to upload blobs and update artifact cache")
@click.option('--head-room-min', type=click.INT,
help="Disk head room minimum in bytes",
default=2e9)
@click.option('--head-room-max', type=click.INT,
help="Disk head room maximum in bytes",
default=10e9)
@click.argument('repo')
def server_main(repo, port, server_key, server_cert, client_certs, enable_push):
server = create_server(repo, enable_push=enable_push)
def server_main(repo, port, server_key, server_cert, client_certs, enable_push,
head_room_min, head_room_max):
server = create_server(repo,
max_head_size=head_room_max,
min_head_size=head_room_min,
enable_push=enable_push)
use_tls = bool(server_key)
@@ -133,10 +147,11 @@ def server_main(repo, port, server_key, server_cert, client_certs, enable_push):
class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
def __init__(self, cas, *, enable_push):
def __init__(self, cas, cache_cleaner, *, enable_push):
super().__init__()
self.cas = cas
self.enable_push = enable_push
self.cache_cleaner = cache_cleaner
def Read(self, request, context):
resource_name = request.resource_name
@@ -198,7 +213,7 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
if client_digest.size_bytes == 0:
break
try:
_clean_up_cache(self.cas, client_digest.size_bytes)
self.cache_cleaner.clean_up(client_digest.size_bytes)
except ArtifactTooLargeException as e:
context.set_code(grpc.StatusCode.RESOURCE_EXHAUSTED)
context.set_details(str(e))
@@ -242,10 +257,11 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddressableStorageServicer):
def __init__(self, cas, *, enable_push):
def __init__(self, cas, cache_cleaner, *, enable_push):
super().__init__()
self.cas = cas
self.enable_push = enable_push
self.cache_cleaner = cache_cleaner
def FindMissingBlobs(self, request, context):
response = remote_execution_pb2.FindMissingBlobsResponse()
@@ -314,7 +330,7 @@ class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddressableStorageServicer):
continue
try:
_clean_up_cache(self.cas, digest.size_bytes)
self.cache_cleaner.clean_up(digest.size_bytes)
with tempfile.NamedTemporaryFile(dir=self.cas.tmpdir) as out:
out.write(blob_request.data)
@@ -435,63 +451,70 @@ def _digest_from_upload_resource_name(resource_name):
return None
# _clean_up_cache()
#
# Keep removing Least Recently Pushed (LRP) artifacts in a cache until there
# is enough space for the incoming artifact
#
# Args:
# cas: CASCache object
# object_size: The size of the object being received in bytes
#
# Returns:
# int: The total bytes removed on the filesystem
#
def _clean_up_cache(cas, object_size):
# Determine the available disk space, in bytes, of the file system
# which mounts the repo
stats = os.statvfs(cas.casdir)
buffer_ = int(2e9) # Add a 2 GB buffer
free_disk_space = (stats.f_bavail * stats.f_bsize) - buffer_
total_disk_space = (stats.f_blocks * stats.f_bsize) - buffer_
if object_size > total_disk_space:
raise ArtifactTooLargeException("Artifact of size: {} is too large for "
"the filesystem which mounts the remote "
"cache".format(object_size))
if object_size <= free_disk_space:
# No need to clean up
return 0
# obtain a list of LRP artifacts
LRP_objects = cas.list_objects()
removed_size = 0 # in bytes
last_mtime = 0
while object_size - removed_size > free_disk_space:
try:
last_mtime, to_remove = LRP_objects.pop(0) # The first element in the list is the LRP object
except IndexError:
# This exception is caught if there are no more artifacts in the list
# LRP_objects. This means the artifact is too large for the filesystem
# so we abort the process
raise ArtifactTooLargeException("Artifact of size {} is too large for "
class _CacheCleaner:
def __init__(self, cas, max_head_size, min_head_size=int(2e9)):
self.__cas = cas
self.__max_head_size = max_head_size
self.__min_head_size = min_head_size
# _clean_up_cache()
#
# Keep removing Least Recently Pushed (LRP) artifacts in a cache until there
# is enough space for the incoming artifact
#
# Args:
# object_size: The size of the object being received in bytes
#
# Returns:
# int: The total bytes removed on the filesystem
#
def clean_up(self, object_size):
stats = os.statvfs(self.__cas.casdir)
free_disk_space = (stats.f_bavail * stats.f_bsize) - self.__min_head_size
total_disk_space = (stats.f_blocks * stats.f_bsize) - self.__min_head_size
if object_size > total_disk_space:
raise ArtifactTooLargeException("Artifact of size: {} is too large for "
"the filesystem which mounts the remote "
"cache".format(object_size))
try:
size = os.stat(to_remove).st_size
os.unlink(to_remove)
removed_size += size
except FileNotFoundError:
pass
if object_size <= free_disk_space:
# No need to clean up
return 0
cas.clean_up_refs_until(last_mtime)
stats = os.statvfs(self.__cas.casdir)
target_disk_space = (stats.f_bavail * stats.f_bsize) - self.__max_head_size
if removed_size > 0:
logging.info("Successfully removed {} bytes from the cache".format(removed_size))
else:
logging.info("No artifacts were removed from the cache.")
# obtain a list of LRP artifacts
LRP_objects = self.__cas.list_objects()
removed_size = 0 # in bytes
last_mtime = 0
while object_size - removed_size > target_disk_space:
try:
last_mtime, to_remove = LRP_objects.pop(0) # The first element in the list is the LRP artifact
except IndexError:
# This exception is caught if there are no more artifacts in the list
# LRP_objects. This means the artifact is too large for the filesystem
# so we abort the process
raise ArtifactTooLargeException("Artifact of size {} is too large for "
"the filesystem which mounts the remote "
"cache".format(object_size))
try:
size = os.stat(to_remove).st_size
os.unlink(to_remove)
removed_size += size
except FileNotFoundError:
pass
self.__cas.clean_up_refs_until(last_mtime)
if removed_size > 0:
logging.info("Successfully removed {} bytes from the cache".format(removed_size))
else:
logging.info("No artifacts were removed from the cache.")
return removed_size
return removed_size
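To make the eviction target concrete, here is a worked example with assumed figures (not taken from the commit):

# Assumed figures, for illustration only.
available       = int(1.5e9)   # bytes currently free on the filesystem
min_head_size   = int(2e9)
max_head_size   = int(10e9)
object_size     = int(50e6)    # incoming 50 MB blob

free_disk_space   = available - min_head_size   # -0.5e9: below the low watermark
target_disk_space = available - max_head_size   # -8.5e9
# The loop evicts until object_size - removed_size <= target_disk_space,
# so removed_size reaches 50e6 + 8.5e9, i.e. about 8.55 GB is freed.
# After the 50 MB object is written, roughly 10 GB of head room remains,
# matching the behaviour described in the commit message.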
@@ -208,6 +208,8 @@ def test_artifact_expires(cli, datafiles, tmpdir):
# Create an artifact share (remote artifact cache) in the tmpdir/artifactshare
# Mock a file system with 12 MB of free disk space above the minimum head room
with create_artifact_share(os.path.join(str(tmpdir), 'artifactshare'),
min_head_size=int(2e9),
max_head_size=int(2e9),
total_space=int(10e9), free_space=(int(12e6) + int(2e9))) as share:
# Configure bst to push to the cache
@@ -291,6 +293,8 @@ def test_recently_pulled_artifact_does_not_expire(cli, datafiles, tmpdir):
# Create an artifact share (remote cache) in tmpdir/artifactshare
# Mock a file system with 12 MB of free disk space above the minimum head room
with create_artifact_share(os.path.join(str(tmpdir), 'artifactshare'),
min_head_size=int(2e9),
max_head_size=int(2e9),
total_space=int(10e9), free_space=(int(12e6) + int(2e9))) as share:
# Configure bst to push to the cache
......
@@ -29,7 +29,11 @@ from buildstream._exceptions import ArtifactError
#
class ArtifactShare():
def __init__(self, directory, *, total_space=None, free_space=None):
def __init__(self, directory, *,
total_space=None,
free_space=None,
min_head_size=int(2e9),
max_head_size=int(10e9)):
# The working directory for the artifact share (in case it
# needs to do something outside of its backend's storage folder).
@@ -53,6 +57,9 @@ class ArtifactShare():
self.total_space = total_space
self.free_space = free_space
self.max_head_size = max_head_size
self.min_head_size = min_head_size
q = Queue()
self.process = Process(target=self.run, args=(q,))
@@ -76,7 +83,10 @@ class ArtifactShare():
self.free_space = self.total_space
os.statvfs = self._mock_statvfs
server = create_server(self.repodir, enable_push=True)
server = create_server(self.repodir,
max_head_size=self.max_head_size,
min_head_size=self.min_head_size,
enable_push=True)
port = server.add_insecure_port('localhost:0')
server.start()
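The os.statvfs monkey-patch above is what lets the tests fake disk capacity; _mock_statvfs itself is not shown in this diff, but the idea can be sketched as follows (field names follow os.statvfs_result; the block size and the accounting are assumptions, not the real test helper):

from collections import namedtuple

# Stand-in for os.statvfs_result with only the fields the cache cleaner reads.
FakeStatvfs = namedtuple('FakeStatvfs', 'f_blocks f_bfree f_bavail f_bsize')

def _mock_statvfs(self, path):
    bsize = 4096  # assumed block size
    return FakeStatvfs(f_blocks=self.total_space // bsize,
                       f_bfree=self.free_space // bsize,
                       f_bavail=self.free_space // bsize,
                       f_bsize=bsize)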
@@ -158,8 +168,11 @@ class ArtifactShare():
# Create an ArtifactShare for use in a test case
#
@contextmanager
def create_artifact_share(directory, *, total_space=None, free_space=None):
share = ArtifactShare(directory, total_space=total_space, free_space=free_space)
def create_artifact_share(directory, *, total_space=None, free_space=None,
min_head_size=int(2e9),
max_head_size=int(10e9)):
share = ArtifactShare(directory, total_space=total_space, free_space=free_space,
min_head_size=min_head_size, max_head_size=max_head_size)
try:
yield share
finally:
......