Commit 6d9fe61f authored by Benjamin Schubert

WIP

parent 5754ac64
Pipeline #53200358 failed in 797 minutes and 25 seconds
@@ -31,7 +31,9 @@ from ._message import Message, MessageType
from ._profile import Topics, profile_start, profile_end
from . import Scope, Consistency
from ._project import ProjectRefStorage
import sys
def debug(*args):
print("➤➤➤", *args, file=sys.stderr)
# PipelineSelection()
#
@@ -134,6 +136,7 @@ class Pipeline():
def resolve_elements(self, targets):
with self._context.timed_activity("Resolving cached state", silent_nested=True):
for element in self.dependencies(targets, Scope.ALL):
debug("Element: ", str(element))
# Preflight
element._preflight()
@@ -220,6 +223,7 @@ class Pipeline():
elements = list(self.dependencies(targets, scope))
debug("GET_SELECTION", mode, [str(e) for e in elements])
return elements
# except_elements():
...
@@ -66,22 +66,30 @@ class BuildQueue(Queue):
return super().enqueue(to_queue)
def push(self, element):
if element._cached_success():
self.skip(element)
assert element._buildable()
super().push(element)
def process(self, element):
return element._assemble()
def status(self, element):
if not element._is_required():
# Artifact is not currently required but it may be requested later.
# Keep it in the queue.
return QueueStatus.WAIT
# def status(self, element):
# if not element._is_required():
# # Artifact is not currently required but it may be requested later.
# # Keep it in the queue.
# return QueueStatus.WAIT
if element._cached_success():
return QueueStatus.SKIP
# if element._cached_success():
# return QueueStatus.SKIP
if not element._buildable():
return QueueStatus.WAIT
# if not element._buildable():
# return QueueStatus.WAIT
return QueueStatus.READY
# return QueueStatus.READY
def _check_cache_size(self, job, element, artifact_size):
...
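For context, here is a minimal sketch of the model the BuildQueue hunk above moves toward: the skip/ready decision is made once, when an element is pushed, instead of being re-polled via `status()` on every scheduler tick. All names below are hypothetical stand-ins, not BuildStream's API.

```python
class Element:
    def __init__(self, name, cached=False):
        self.name = name
        self.cached = cached


class BuildQueue:
    def __init__(self):
        self.ready = []
        self.skipped = []

    def push(self, element):
        # Decide skip/ready once at push time, rather than on every poll
        if element.cached:
            self.skipped.append(element)
            return
        self.ready.append(element)


q = BuildQueue()
q.push(Element("base.bst", cached=True))  # recorded as skipped immediately
q.push(Element("app.bst"))                # queued for a build job
print([e.name for e in q.skipped], [e.name for e in q.ready])
```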
@@ -35,8 +35,8 @@ class FetchQueue(Queue):
complete_name = "Fetched"
resources = [ResourceType.DOWNLOAD]
def __init__(self, scheduler, skip_cached=False, fetch_original=False):
super().__init__(scheduler)
def __init__(self, scheduler, next_queue=None, skip_cached=False, fetch_original=False):
super().__init__(scheduler, next_queue)
self._skip_cached = skip_cached
self._fetch_original = fetch_original
@@ -44,27 +44,36 @@ class FetchQueue(Queue):
def process(self, element):
element._fetch(fetch_original=self._fetch_original)
def status(self, element):
if not element._is_required():
# Artifact is not currently required but it may be requested later.
# Keep it in the queue.
return QueueStatus.WAIT
def push(self, element):
assert element._is_required()
assert element._can_query_cache()
# Optionally skip elements that are already in the artifact cache
if self._skip_cached:
if not element._can_query_cache():
return QueueStatus.WAIT
if element._cached() or not element._should_fetch(self._fetch_original):
self.skip(element)
if element._cached():
return QueueStatus.SKIP
super().push(element)
# This will automatically skip elements which
# have no sources.
# def status(self, element):
# if not element._is_required():
# # Artifact is not currently required but it may be requested later.
# # Keep it in the queue.
# return QueueStatus.WAIT
if not element._should_fetch(self._fetch_original):
return QueueStatus.SKIP
# # Optionally skip elements that are already in the artifact cache
# if self._skip_cached:
# if not element._can_query_cache():
# return QueueStatus.WAIT
return QueueStatus.READY
# if element._cached():
# return QueueStatus.SKIP
# # This will automatically skip elements which
# # have no sources.
# if not element._should_fetch(self._fetch_original):
# return QueueStatus.SKIP
# return QueueStatus.READY
def done(self, _, element, result, status):
...
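A small sketch (again with hypothetical names) of the skip-forwarding behaviour the FetchQueue hunk relies on: an element that needs no fetch is not dropped, it is handed straight to the next queue in the chain, as the base Queue diff below implements with `next_queue`.

```python
class Queue:
    def __init__(self, name, next_queue=None):
        self.name = name
        self.next_queue = next_queue
        self.pending = []
        self.skipped = []

    def push(self, element):
        self.pending.append(element)

    def skip(self, element):
        # Skipping records the element but still propagates it downstream
        self.skipped.append(element)
        if self.next_queue is not None:
            self.next_queue.push(element)


build = Queue("build")
fetch = Queue("fetch", next_queue=build)
fetch.skip("cached.bst")   # nothing to fetch...
print(build.pending)       # ...but it still arrives at the build queue
```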
@@ -31,7 +31,11 @@ from ..resources import ResourceType
# BuildStream toplevel imports
from ..._exceptions import BstError, set_last_task_error
from ..._message import Message, MessageType
from ...types import _UniquePriorityQueue
import sys
def debug(*args):
print("➤➤➤", *args, file=sys.stderr)
# Queue status for a given element
#
@@ -59,7 +63,7 @@ class Queue():
complete_name = None
resources = [] # Resources this queue's jobs want
def __init__(self, scheduler):
def __init__(self, scheduler, next_queue=None):
#
# Public members
@@ -73,10 +77,11 @@
#
self._scheduler = scheduler
self._resources = scheduler.resources # Shared resource pool
self._wait_queue = deque() # Ready / Waiting elements
self._done_queue = deque() # Processed / Skipped elements
self._queue = _UniquePriorityQueue() # Ready / Waiting elements
self._max_retries = 0
self._next_queue = next_queue
# Assert the subclass has setup class data
assert self.action_name is not None
assert self.complete_name is not None
@@ -101,7 +106,7 @@
#
#
def process(self, element):
pass
raise NotImplementedError()
# status()
#
@@ -114,7 +119,7 @@
# (QueueStatus): The element status
#
def status(self, element):
return QueueStatus.READY
raise NotImplementedError()
# done()
#
@@ -140,86 +145,34 @@ class Queue():
# Args:
# elts (list): A list of Elements
#
def enqueue(self, elts):
if not elts:
return
# Place skipped elements on the done queue right away.
#
# The remaining ready and waiting elements must remain in the
# same queue, and ready status must be determined at the moment
# at which the scheduler asks for the next job.
#
skip = [elt for elt in elts if self.status(elt) == QueueStatus.SKIP]
wait = [elt for elt in elts if elt not in skip]
self.skipped_elements.extend(skip) # Public record of skipped elements
self._done_queue.extend(skip) # Elements to be processed
self._wait_queue.extend(wait) # Elements eligible to be dequeued
# dequeue()
#
# A generator which dequeues the elements that
# are ready to exit the queue.
#
# Yields:
# (Element): Elements being dequeued
#
def dequeue(self):
while self._done_queue:
yield self._done_queue.popleft()
# dequeue_ready()
#
# Reports whether any elements can be promoted to other queues
#
# Returns:
# (bool): Whether there are elements ready
#
def dequeue_ready(self):
return any(self._done_queue)
def push(self, element):
debug("Adding element", element, "to", self.action_name)
self._queue.push(element._pipeline_id, element)
# harvest_jobs()
#
# Process elements in the queue, moving elements which were enqueued
# into the dequeue pool and creating as many jobs as resources
# can be reserved for.
#
# Returns:
# ([Job]): A list of jobs which can be run now
#
def harvest_jobs(self):
unready = []
ready = []
def skip(self, element):
self.skipped_elements.append(element)
if self._next_queue:
self._next_queue.push(element)
while self._wait_queue:
if not self._resources.reserve(self.resources, peek=True):
break
def pop(self):
debug("Popping", self.action_name, [(str(x[1]), x[1]._unique_id) for x in self._queue._heap])
element = self._wait_queue.popleft()
status = self.status(element)
if not self._resources.reserve(self.resources) or not self._queue:
raise IndexError()
if status == QueueStatus.WAIT:
unready.append(element)
elif status == QueueStatus.SKIP:
self._done_queue.append(element)
self.skipped_elements.append(element)
else:
reserved = self._resources.reserve(self.resources)
assert reserved
ready.append(element)
self._wait_queue.extendleft(unready)
return [
ElementJob(self._scheduler, self.action_name,
self._element_log_path(element),
element=element, queue=self,
action_cb=self.process,
complete_cb=self._job_done,
max_retries=self._max_retries)
for element in ready
]
else:
element = self._queue.pop()
return ElementJob(
self._scheduler,
self.action_name,
self._element_log_path(element),
element=element,
queue=self,
action_cb=self.process,
complete_cb=self._job_done,
max_retries=self._max_retries,
)
#####################################################
# Private Methods #
@@ -301,8 +254,8 @@ class Queue():
detail=traceback.format_exc())
self.failed_elements.append(element)
else:
# All elements get placed on the done queue for later processing.
self._done_queue.append(element)
if self._next_queue:
self._next_queue.push(element)
# These lists are for bookkeeping purposes for the UI and logging.
if status == JobStatus.SKIPPED or job.get_terminated():
...
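The base Queue above now stores pending elements in a `_UniquePriorityQueue` and gains a `pop()` that raises IndexError when nothing can be scheduled. Its exact semantics are internal to BuildStream; the following is a rough, heapq-based stand-in assuming elements are ordered by a pipeline-wide id (dependencies get lower ids, so they come out first) and that each element may sit in the queue only once.

```python
import heapq


class PriorityQueue:
    def __init__(self):
        self._heap = []
        self._ids = set()  # keeps each element unique in the queue

    def push(self, priority, item):
        if priority not in self._ids:
            self._ids.add(priority)
            heapq.heappush(self._heap, (priority, item))

    def pop(self):
        # heappop raises IndexError when the heap is empty, which the
        # scheduler uses as its "this queue is done for now" signal
        priority, item = heapq.heappop(self._heap)
        self._ids.remove(priority)
        return item


pq = PriorityQueue()
pq.push(2, "app.bst")
pq.push(0, "base.bst")
pq.push(1, "lib.bst")
print(pq.pop(), pq.pop(), pq.pop())  # base.bst lib.bst app.bst
```

In the real `pop()` above, IndexError is also raised when no job slot can be reserved from the shared resource pool, not only when the queue is empty.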
@@ -38,15 +38,12 @@ class TrackQueue(Queue):
def process(self, element):
return element._track()
def status(self, element):
# We can skip elements entirely if they have no sources.
if not list(element.sources()):
# But we still have to mark them as tracked
def push(self, element):
if not any(element.sources()):
element._tracking_done()
return QueueStatus.SKIP
self.skip(element)
return QueueStatus.READY
super().push(element)
def done(self, _, element, result, status):
...
@@ -32,6 +32,11 @@ from .jobs import JobStatus, CacheSizeJob, CleanupJob
from .._profile import Topics, profile_start, profile_end
import sys
def debug(*args):
print("➤➤➤", *args, file=sys.stderr)
# A decent return code for Scheduler.run()
class SchedStatus():
SUCCESS = 0
@@ -429,42 +434,14 @@ class Scheduler():
# and process anything that is ready.
#
def _sched_queue_jobs(self):
ready = []
process_queues = True
while self._queue_jobs and process_queues:
# Pull elements forward through queues
elements = []
for queue in self.queues:
queue.enqueue(elements)
elements = list(queue.dequeue())
# Kick off whatever jobs can be processed at this time
#
# We start by queuing from the last queue first, because
# we want to give priority to queues later in the
# scheduling process in the case that multiple queues
# share the same token type.
#
# This avoids starvation situations where we don't move on
# to fetch tasks for elements which failed to pull, and
# thus need all the pulls to complete before ever starting
# a build.
ready.extend(chain.from_iterable(
q.harvest_jobs() for q in reversed(self.queues)
))
# harvest_jobs() may have decided to skip some jobs, making
# them eligible for promotion to the next queue as a side effect.
#
# If that happens, do another round.
process_queues = any(q.dequeue_ready() for q in self.queues)
# Spawn the jobs
#
for job in ready:
self._spawn_job(job)
debug("Scheduling queues")
for queue in self.queues:
try:
while True:
job = queue.pop()
self._spawn_job(job)
except IndexError:
pass
# _sched()
#
...
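A toy version of the new `_sched_queue_jobs()` loop above: each queue is drained until `pop()` raises IndexError, which here signals "empty, or out of job slots". The Queue below is a stand-in, with a bare slot counter in place of BuildStream's shared resource pool. Note that this iterates queues in forward order, dropping the old harvest-in-reverse trick that the removed comment used to guard against starvation.

```python
class Queue:
    def __init__(self, elements, slots):
        self.elements = list(elements)
        self.slots = slots  # stand-in for the shared resource pool

    def pop(self):
        if not self.elements or self.slots == 0:
            raise IndexError()
        self.slots -= 1
        return self.elements.pop(0)


def sched_queue_jobs(queues):
    spawned = []
    for queue in queues:
        try:
            while True:
                spawned.append(queue.pop())
        except IndexError:
            pass  # queue is empty or has no free job slots this tick
    return spawned


queues = [Queue(["fetch:base.bst"], slots=1), Queue(["build:app.bst"], slots=0)]
print(sched_queue_jobs(queues))  # only the fetch job fits this tick
```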
@@ -41,6 +41,9 @@ from .types import _KeyStrength
from . import utils, _yaml, _site
from . import Scope, Consistency
import sys
def debug(*args):
print("➤➤➤", *args, file=sys.stderr)
# Stream()
#
@@ -152,6 +155,7 @@ class Stream():
isolate=False,
command=None,
usebuildtree=None):
raise NotImplementedError()
# Assert we have everything we need built, unless the directory is specified
# in which case we just blindly trust the directory, using the element
@@ -258,8 +262,10 @@
if self._artifacts.has_fetch_remotes():
self._add_queue(PullQueue(self._scheduler))
self._add_queue(FetchQueue(self._scheduler, skip_cached=True))
self._add_queue(BuildQueue(self._scheduler))
build_queue = BuildQueue(self._scheduler)
self._add_queue(FetchQueue(self._scheduler, next_queue=build_queue, skip_cached=True))
self._add_queue(build_queue)
if self._artifacts.has_push_remotes():
self._add_queue(PushQueue(self._scheduler))
@@ -359,7 +365,6 @@
selection=PipelineSelection.NONE,
ignore_junction_targets=False,
remote=None):
use_config = True
if remote:
use_config = False
@@ -400,6 +405,7 @@
selection=PipelineSelection.NONE,
ignore_junction_targets=False,
remote=None):
raise NotImplementedError()
use_config = True
if remote:
@@ -1022,6 +1028,8 @@ class Stream():
rewritable=rewritable,
fetch_subprojects=fetch_subprojects)
debug("Elements order", [str(e) for e in elements])
# Obtain the ArtifactElement objects
artifacts = [self._project.create_artifact_element(ref) for ref in target_artifacts]
@@ -1093,6 +1101,8 @@
selected,
except_elements)
debug("SELECTED", [str(e) for e in selected])
# Set the "required" artifacts that should not be removed
# while this pipeline is active
#
@@ -1107,6 +1117,10 @@
itertools.chain.from_iterable(
[element.sources() for element in required_elements()]))
counter = itertools.count()
for element in selected:
element._pipeline_id = next(counter)
debug("Setting id to", element._pipeline_id, "for", element)
if selection == PipelineSelection.PLAN and dynamic_plan:
# We use a dynamic build plan, only request artifacts of top-level targets,
# others are requested dynamically as needed.
@@ -1153,7 +1167,10 @@
def _enqueue_plan(self, plan, *, queue=None):
queue = queue or self._first_non_track_queue
queue.enqueue(plan)
debug("Plan is: ", [str(e) for e in plan])
for element in plan:
queue.push(element)
self.session_elements += plan
# _run()
@@ -1188,10 +1205,11 @@
# fetch_original (Bool): Whether to fetch original unstaged
#
def _fetch(self, elements, *, track_elements=None, fetch_original=False):
if track_elements is None:
track_elements = []
debug("FETCH", [str(e) for e in elements], [str(e) for e in track_elements])
# Subtract the track elements from the fetch elements; they will be added separately
fetch_plan = self._pipeline.subtract_elements(elements, track_elements)
...
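A condensed sketch of how the Stream hunks above wire things together: queues are chained explicitly (the FetchQueue feeds the BuildQueue via `next_queue`), and every selected element gets a monotonically increasing `_pipeline_id`, which the priority queues then use as their ordering. The classes here are stand-ins, not the real Stream/Queue types.

```python
import itertools


class FakeQueue:
    def __init__(self, name, next_queue=None):
        self.name = name
        self.next_queue = next_queue
        self.pushed = []

    def push(self, element):
        self.pushed.append(element)


build_queue = FakeQueue("build")
fetch_queue = FakeQueue("fetch", next_queue=build_queue)

selected = ["base.bst", "lib.bst", "app.bst"]  # assumed dependency order
counter = itertools.count()
pipeline_ids = {element: next(counter) for element in selected}

for element in selected:
    fetch_queue.push(element)

print(pipeline_ids)        # {'base.bst': 0, 'lib.bst': 1, 'app.bst': 2}
print(fetch_queue.pushed)  # elements enter the pipeline at the fetch queue
```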
@@ -963,9 +963,36 @@ class Element(Plugin):
if meta in cls.__instantiated_elements:
return cls.__instantiated_elements[meta]
build_deps = [
Element._new_from_meta(meta_dep)
for meta_dep in meta.build_dependencies
]
runtime_deps = [
Element._new_from_meta(meta_dep)
for meta_dep in meta.dependencies
]
# # Instantiate dependencies
# for meta_dep in meta.dependencies:
# dependency = Element._new_from_meta(meta_dep)
# element.__runtime_dependencies.append(dependency)
# dependency.__reverse_dependencies.add(element)
# for meta_dep in meta.build_dependencies:
# dependency = Element._new_from_meta(meta_dep)
# element.__build_dependencies.append(dependency)
# dependency.__reverse_dependencies.add(element)
element = meta.project.create_element(meta, first_pass=meta.first_pass)
cls.__instantiated_elements[meta] = element
element.__runtime_dependencies = runtime_deps
element.__build_dependencies = build_deps
for dep in chain(build_deps, runtime_deps):
dep.__reverse_dependencies.add(element)
# Instantiate sources and generate their keys
for meta_source in meta.sources:
meta_source.first_pass = meta.kind == "junction"
@@ -979,17 +1006,6 @@
if redundant_ref is not None:
cls.__redundant_source_refs.append((source, redundant_ref))
# Instantiate dependencies
for meta_dep in meta.dependencies:
dependency = Element._new_from_meta(meta_dep)
element.__runtime_dependencies.append(dependency)
dependency.__reverse_dependencies.add(element)
for meta_dep in meta.build_dependencies:
dependency = Element._new_from_meta(meta_dep)
element.__build_dependencies.append(dependency)
dependency.__reverse_dependencies.add(element)
return element
# _clear_meta_elements_cache()
...
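A stripped-down sketch of the reordering in the Element hunk above: dependencies are now instantiated before the element itself is created and then attached, rather than being created afterwards, so an element's whole dependency subtree exists by the time the element does. `Meta` is a stand-in for BuildStream's MetaElement, and only runtime dependencies are modelled; the graph is assumed acyclic, as element graphs are.

```python
class Meta:
    def __init__(self, name, deps=()):
        self.name = name
        self.deps = deps


class Element:
    _instantiated = {}  # memoize: one Element per Meta

    def __init__(self, name):
        self.name = name
        self.runtime_deps = []
        self.reverse_deps = set()

    @classmethod
    def new_from_meta(cls, meta):
        if meta in cls._instantiated:
            return cls._instantiated[meta]

        # Recurse first: dependencies are fully constructed before
        # this element is created and wired up
        deps = [cls.new_from_meta(d) for d in meta.deps]

        element = cls(meta.name)
        cls._instantiated[meta] = element
        element.runtime_deps = deps
        for dep in deps:
            dep.reverse_deps.add(element)
        return element


base = Meta("base.bst")
app = Meta("app.bst", deps=(base,))
print([d.name for d in Element.new_from_meta(app).runtime_deps])  # ['base.bst']
```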