Loading buildstream/_artifactcache/artifactcache.py +109 −1 Original line number Diff line number Diff line Loading @@ -21,7 +21,8 @@ import os import string from collections import Mapping, namedtuple from .._exceptions import ImplError, LoadError, LoadErrorReason from ..element import _KeyStrength from .._exceptions import ArtifactError, ImplError, LoadError, LoadErrorReason from .._message import Message, MessageType from .. import utils from .. import _yaml Loading Loading @@ -77,6 +78,7 @@ ArtifactCacheSpec.__new__.__defaults__ = (None, None, None) class ArtifactCache(): def __init__(self, context): self.context = context self.required_artifacts = set() self.extractdir = os.path.join(context.artifactdir, 'extract') self.max_size = context.cache_quota self.estimated_size = None Loading Loading @@ -183,6 +185,75 @@ class ArtifactCache(): (str(provenance))) return cache_specs # append_required_artifacts(): # # Append to the list of elements whose artifacts are required for # the current run. Artifacts whose elements are in this list will # be locked by the artifact cache and not touched for the duration # of the current pipeline. # # Args: # elements (iterable): A set of elements to mark as required # def append_required_artifacts(self, elements): # We lock both strong and weak keys - deleting one but not the # other won't save space in most cases anyway, but would be a # user inconvenience. for element in elements: strong_key = element._get_cache_key(strength=_KeyStrength.STRONG) weak_key = element._get_cache_key(strength=_KeyStrength.WEAK) for key in (strong_key, weak_key): if key and key not in self.required_artifacts: self.required_artifacts.add(key) # We also update the usage times of any artifacts # we will be using, which helps preventing a # buildstream process that runs in parallel with # this one from removing artifacts in-use. try: self.update_atime(key) except ArtifactError: pass # clean(): # # Clean the artifact cache as much as possible. # def clean(self): artifacts = self.list_artifacts() while self.calculate_cache_size() >= self.context.cache_quota - self.context.cache_lower_threshold: try: to_remove = artifacts.pop(0) except IndexError: # If too many artifacts are required, and we therefore # can't remove them, we have to abort the build. # # FIXME: Asking the user what to do may be neater default_conf = os.path.join(os.environ['XDG_CONFIG_HOME'], 'buildstream.conf') detail = ("There is not enough space to build the given element.\n" "Please increase the cache-quota in {}." .format(self.context.config_origin or default_conf)) if self.calculate_cache_size() > self.context.cache_quota: raise ArtifactError("Cache too full. Aborting.", detail=detail, reason="cache-too-full") else: break key = to_remove.rpartition('/')[2] if key not in self.required_artifacts: size = self.remove(to_remove) if size: self.cache_size -= size # This should be O(1) if implemented correctly return self.calculate_cache_size() # get_approximate_cache_size() # # A cheap method that aims to serve as an upper limit on the Loading Loading @@ -216,6 +287,17 @@ class ArtifactCache(): # Abstract methods for subclasses to implement # ################################################ # update_atime() # # Update the atime of an artifact. # # Args: # key (str): The key of the artifact. # def update_atime(self, key): raise ImplError("Cache '{kind}' does not implement contains()" .format(kind=type(self).__name__)) # initialize_remotes(): # # This will contact each remote cache. Loading @@ -241,6 +323,32 @@ class ArtifactCache(): raise ImplError("Cache '{kind}' does not implement contains()" .format(kind=type(self).__name__)) # list_artifacts(): # # List artifacts in this cache in LRU order. # # Returns: # ([str]) - A list of artifact names as generated by # `ArtifactCache.get_artifact_fullname` in LRU order # def list_artifacts(self): raise ImplError("Cache '{kind}' does not implement list_artifacts()" .format(kind=type(self).__name__)) # remove(): # # Removes the artifact for the specified ref from the local # artifact cache. # # Args: # ref (artifact_name): The name of the artifact to remove (as # generated by # `ArtifactCache.get_artifact_fullname`) # def remove(self, artifact_name): raise ImplError("Cache '{kind}' does not implement remove()" .format(kind=type(self).__name__)) # extract(): # # Extract cached artifact for the specified Element if it hasn't Loading buildstream/_artifactcache/cascache.py +6 −0 Original line number Diff line number Diff line Loading @@ -450,6 +450,12 @@ class CASCache(ArtifactCache): except FileNotFoundError as e: raise ArtifactError("Attempt to access unavailable artifact: {}".format(e)) from e def update_atime(self, ref): try: os.utime(self._refpath(ref)) except FileNotFoundError as e: raise ArtifactError("Attempt to access unavailable artifact: {}".format(e)) from e def calculate_cache_size(self): if self.cache_size is None: self.cache_size = utils._get_dir_size(self.casdir) Loading buildstream/_scheduler/jobs/__init__.py +1 −0 Original line number Diff line number Diff line from .elementjob import ElementJob from .cachesizejob import CacheSizeJob from .cleanupjob import CleanupJob buildstream/_scheduler/jobs/cleanupjob.py 0 → 100644 +63 −0 Original line number Diff line number Diff line # Copyright (C) 2018 Codethink Limited # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public # License as published by the Free Software Foundation; either # version 2 of the License, or (at your option) any later version. # # This library is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public # License along with this library. If not, see <http://www.gnu.org/licenses/>. # # Author: # Tristan Daniël Maat <tristan.maat@codethink.co.uk> # import os from contextlib import contextmanager from .job import Job from ..._platform import Platform from ..._message import Message class CleanupJob(Job): def __init__(self, *args, complete_cb, **kwargs): super().__init__(*args, **kwargs) self._complete_cb = complete_cb self._cache = Platform._instance.artifactcache def child_process(self): return self._cache.clean() def parent_complete(self, success, result): self._cache._set_cache_size(result) if self._complete_cb: self._complete_cb() @contextmanager def child_logging_enabled(self, logfile): self._logfile = logfile.format(pid=os.getpid()) yield self._logfile self._logfile = None def message(self, message_type, message, **kwargs): args = dict(kwargs) args['scheduler'] = True self._scheduler.context.message(Message(None, message_type, message, **args)) def child_log(self, message): message.action_name = self.action_name with open(self._logfile, 'a+') as log: message_text = self.decorate_message(message, '[cleanup]') log.write('{}\n'.format(message_text)) log.flush() return message def child_process_data(self): return {} buildstream/_scheduler/scheduler.py +14 −2 Original line number Diff line number Diff line Loading @@ -28,7 +28,7 @@ from contextlib import contextmanager # Local imports from .resources import Resources, ResourceType from .jobs import CacheSizeJob from .jobs import CacheSizeJob, CleanupJob # A decent return code for Scheduler.run() Loading Loading @@ -313,13 +313,25 @@ class Scheduler(): self.schedule_jobs(ready) self._sched() def _run_cleanup(self, cache_size): if cache_size and cache_size < self.context.cache_quota: return logpath = os.path.join(self.context.logdir, 'cleanup.{pid}.log') job = CleanupJob(self, 'cleanup', logpath, resources=[ResourceType.CACHE, ResourceType.PROCESS], exclusive_resources=[ResourceType.CACHE], complete_cb=None) self.schedule_jobs([job]) def _check_cache_size_real(self): logpath = os.path.join(self.context.logdir, 'cache_size.{pid}.log') job = CacheSizeJob(self, 'cache_size', logpath, resources=[ResourceType.CACHE, ResourceType.PROCESS], exclusive_resources=[ResourceType.CACHE], complete_cb=None) complete_cb=self._run_cleanup) self.schedule_jobs([job]) # _suspend_jobs() Loading Loading
buildstream/_artifactcache/artifactcache.py +109 −1 Original line number Diff line number Diff line Loading @@ -21,7 +21,8 @@ import os import string from collections import Mapping, namedtuple from .._exceptions import ImplError, LoadError, LoadErrorReason from ..element import _KeyStrength from .._exceptions import ArtifactError, ImplError, LoadError, LoadErrorReason from .._message import Message, MessageType from .. import utils from .. import _yaml Loading Loading @@ -77,6 +78,7 @@ ArtifactCacheSpec.__new__.__defaults__ = (None, None, None) class ArtifactCache(): def __init__(self, context): self.context = context self.required_artifacts = set() self.extractdir = os.path.join(context.artifactdir, 'extract') self.max_size = context.cache_quota self.estimated_size = None Loading Loading @@ -183,6 +185,75 @@ class ArtifactCache(): (str(provenance))) return cache_specs # append_required_artifacts(): # # Append to the list of elements whose artifacts are required for # the current run. Artifacts whose elements are in this list will # be locked by the artifact cache and not touched for the duration # of the current pipeline. # # Args: # elements (iterable): A set of elements to mark as required # def append_required_artifacts(self, elements): # We lock both strong and weak keys - deleting one but not the # other won't save space in most cases anyway, but would be a # user inconvenience. for element in elements: strong_key = element._get_cache_key(strength=_KeyStrength.STRONG) weak_key = element._get_cache_key(strength=_KeyStrength.WEAK) for key in (strong_key, weak_key): if key and key not in self.required_artifacts: self.required_artifacts.add(key) # We also update the usage times of any artifacts # we will be using, which helps preventing a # buildstream process that runs in parallel with # this one from removing artifacts in-use. try: self.update_atime(key) except ArtifactError: pass # clean(): # # Clean the artifact cache as much as possible. # def clean(self): artifacts = self.list_artifacts() while self.calculate_cache_size() >= self.context.cache_quota - self.context.cache_lower_threshold: try: to_remove = artifacts.pop(0) except IndexError: # If too many artifacts are required, and we therefore # can't remove them, we have to abort the build. # # FIXME: Asking the user what to do may be neater default_conf = os.path.join(os.environ['XDG_CONFIG_HOME'], 'buildstream.conf') detail = ("There is not enough space to build the given element.\n" "Please increase the cache-quota in {}." .format(self.context.config_origin or default_conf)) if self.calculate_cache_size() > self.context.cache_quota: raise ArtifactError("Cache too full. Aborting.", detail=detail, reason="cache-too-full") else: break key = to_remove.rpartition('/')[2] if key not in self.required_artifacts: size = self.remove(to_remove) if size: self.cache_size -= size # This should be O(1) if implemented correctly return self.calculate_cache_size() # get_approximate_cache_size() # # A cheap method that aims to serve as an upper limit on the Loading Loading @@ -216,6 +287,17 @@ class ArtifactCache(): # Abstract methods for subclasses to implement # ################################################ # update_atime() # # Update the atime of an artifact. # # Args: # key (str): The key of the artifact. # def update_atime(self, key): raise ImplError("Cache '{kind}' does not implement contains()" .format(kind=type(self).__name__)) # initialize_remotes(): # # This will contact each remote cache. Loading @@ -241,6 +323,32 @@ class ArtifactCache(): raise ImplError("Cache '{kind}' does not implement contains()" .format(kind=type(self).__name__)) # list_artifacts(): # # List artifacts in this cache in LRU order. # # Returns: # ([str]) - A list of artifact names as generated by # `ArtifactCache.get_artifact_fullname` in LRU order # def list_artifacts(self): raise ImplError("Cache '{kind}' does not implement list_artifacts()" .format(kind=type(self).__name__)) # remove(): # # Removes the artifact for the specified ref from the local # artifact cache. # # Args: # ref (artifact_name): The name of the artifact to remove (as # generated by # `ArtifactCache.get_artifact_fullname`) # def remove(self, artifact_name): raise ImplError("Cache '{kind}' does not implement remove()" .format(kind=type(self).__name__)) # extract(): # # Extract cached artifact for the specified Element if it hasn't Loading
buildstream/_artifactcache/cascache.py +6 −0 Original line number Diff line number Diff line Loading @@ -450,6 +450,12 @@ class CASCache(ArtifactCache): except FileNotFoundError as e: raise ArtifactError("Attempt to access unavailable artifact: {}".format(e)) from e def update_atime(self, ref): try: os.utime(self._refpath(ref)) except FileNotFoundError as e: raise ArtifactError("Attempt to access unavailable artifact: {}".format(e)) from e def calculate_cache_size(self): if self.cache_size is None: self.cache_size = utils._get_dir_size(self.casdir) Loading
buildstream/_scheduler/jobs/__init__.py +1 −0 Original line number Diff line number Diff line from .elementjob import ElementJob from .cachesizejob import CacheSizeJob from .cleanupjob import CleanupJob
buildstream/_scheduler/jobs/cleanupjob.py 0 → 100644 +63 −0 Original line number Diff line number Diff line # Copyright (C) 2018 Codethink Limited # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public # License as published by the Free Software Foundation; either # version 2 of the License, or (at your option) any later version. # # This library is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public # License along with this library. If not, see <http://www.gnu.org/licenses/>. # # Author: # Tristan Daniël Maat <tristan.maat@codethink.co.uk> # import os from contextlib import contextmanager from .job import Job from ..._platform import Platform from ..._message import Message class CleanupJob(Job): def __init__(self, *args, complete_cb, **kwargs): super().__init__(*args, **kwargs) self._complete_cb = complete_cb self._cache = Platform._instance.artifactcache def child_process(self): return self._cache.clean() def parent_complete(self, success, result): self._cache._set_cache_size(result) if self._complete_cb: self._complete_cb() @contextmanager def child_logging_enabled(self, logfile): self._logfile = logfile.format(pid=os.getpid()) yield self._logfile self._logfile = None def message(self, message_type, message, **kwargs): args = dict(kwargs) args['scheduler'] = True self._scheduler.context.message(Message(None, message_type, message, **args)) def child_log(self, message): message.action_name = self.action_name with open(self._logfile, 'a+') as log: message_text = self.decorate_message(message, '[cleanup]') log.write('{}\n'.format(message_text)) log.flush() return message def child_process_data(self): return {}
buildstream/_scheduler/scheduler.py +14 −2 Original line number Diff line number Diff line Loading @@ -28,7 +28,7 @@ from contextlib import contextmanager # Local imports from .resources import Resources, ResourceType from .jobs import CacheSizeJob from .jobs import CacheSizeJob, CleanupJob # A decent return code for Scheduler.run() Loading Loading @@ -313,13 +313,25 @@ class Scheduler(): self.schedule_jobs(ready) self._sched() def _run_cleanup(self, cache_size): if cache_size and cache_size < self.context.cache_quota: return logpath = os.path.join(self.context.logdir, 'cleanup.{pid}.log') job = CleanupJob(self, 'cleanup', logpath, resources=[ResourceType.CACHE, ResourceType.PROCESS], exclusive_resources=[ResourceType.CACHE], complete_cb=None) self.schedule_jobs([job]) def _check_cache_size_real(self): logpath = os.path.join(self.context.logdir, 'cache_size.{pid}.log') job = CacheSizeJob(self, 'cache_size', logpath, resources=[ResourceType.CACHE, ResourceType.PROCESS], exclusive_resources=[ResourceType.CACHE], complete_cb=None) complete_cb=self._run_cleanup) self.schedule_jobs([job]) # _suspend_jobs() Loading