Commit ce01f87e authored by Tristan Van Berkom's avatar Tristan Van Berkom
Browse files

_scheduler/scheduler.py: Run cache size exclusively at startup

When running any session that has Queues which require Resource.CACHE,
check if our loaded estimated size exceeds the quota, and if so;
lock the Resource.CACHE resource exclusively right away and run
an exclusive initial cache size job.

This ensures we cleanup first before doing anything which might
add to the cache at startup time, if deemed needed.

This is a partial fix for issue #737
parent 3e36e363
Loading
Loading
Loading
Loading
+59 −2
Original line number Diff line number Diff line
@@ -151,6 +151,9 @@ class Scheduler():
        # Handle unix signals while running
        self._connect_signals()

        # Check if we need to start with some cache maintenance
        self._check_cache_management()

        # Run the queues
        self._sched()
        self.loop.run_forever()
@@ -272,6 +275,31 @@ class Scheduler():
    #                  Local Private Methods              #
    #######################################################

    # _check_cache_management()
    #
    # Run an initial check if we need to lock the cache
    # resource and check the size and possibly launch
    # a cleanup.
    #
    # Sessions which do not add to the cache are not affected.
    #
    def _check_cache_management(self):

        # Only trigger the check for a scheduler run which has
        # queues which require the CACHE resource.
        if not any(q for q in self.queues
                   if ResourceType.CACHE in q.resources):
            return

        # If the estimated size outgrows the quota, queue a job to
        # actually check the real cache size initially, this one
        # should have exclusive access to the cache to ensure nothing
        # starts while we are checking the cache.
        #
        artifacts = self.context.artifactcache
        if artifacts.has_quota_exceeded():
            self._sched_cache_size_job(exclusive=True)

    # _spawn_job()
    #
    # Spanws a job
@@ -292,6 +320,11 @@ class Scheduler():
        self._cache_size_running = None
        self.resources.release([ResourceType.CACHE, ResourceType.PROCESS])

        # Unregister the exclusive interest if there was any
        self.resources.unregister_exclusive_interest(
            [ResourceType.CACHE], 'cache-size'
        )

        # Schedule a cleanup job if we've hit the threshold
        if status != JobStatus.OK:
            return
@@ -344,11 +377,35 @@ class Scheduler():
    # Runs a cache size job if one is scheduled to run now and
    # sufficient recources are available.
    #
    def _sched_cache_size_job(self):
    # Args:
    #    exclusive (bool): Run a cache size job immediately and
    #                      hold the ResourceType.CACHE resource
    #                      exclusively (used at startup).
    #
    def _sched_cache_size_job(self, *, exclusive=False):

        # The exclusive argument is not intended (or safe) for arbitrary use.
        if exclusive:
            assert not self._cache_size_scheduled
            assert not self._cache_size_running
            assert not self._active_jobs
            self._cache_size_scheduled = True

        if self._cache_size_scheduled and not self._cache_size_running:

            if self.resources.reserve([ResourceType.CACHE, ResourceType.PROCESS]):
            # Handle the exclusive launch
            exclusive_resources = set()
            if exclusive:
                exclusive_resources.add(ResourceType.CACHE)
                self.resources.register_exclusive_interest(
                    exclusive_resources, 'cache-size'
                )

            # Reserve the resources (with the possible exclusive cache resource)
            if self.resources.reserve([ResourceType.CACHE, ResourceType.PROCESS],
                                      exclusive_resources):

                # Update state and launch
                self._cache_size_scheduled = False
                self._cache_size_running = \
                    CacheSizeJob(self, _ACTION_NAME_CACHE_SIZE,