Skip to content
Snippets Groups Projects
Commit 09c33403 authored by Tristan Maat
Browse files

_pipeline.py: Merge the track queue into the scheduler run

parent 40df453f
No related branches found
No related tags found
Loading
......@@ -206,8 +206,8 @@ def cli(context, **kwargs):
def build(app, elements, all, track, track_save, track_except):
"""Build elements in a pipeline"""
app.initialize(elements, except_=track_except, rewritable=track_save,
inconsistent=bool(track), fetch_remote_refs=True)
app.initialize(elements, except_=track_except, rewritable=track_save)
app.pipeline.initialize(fetch_remote_refs=True, inconsistent=track)
app.print_heading()
try:
app.pipeline.build(app.scheduler, all, track, track_save)
......@@ -249,8 +249,8 @@ def fetch(app, elements, deps, track, except_):
plan: Only dependencies required for the build plan
all: All dependencies
"""
app.initialize(elements, except_=except_,
rewritable=track, inconsistent=track)
app.initialize(elements, except_=except_, rewritable=track)
app.pipeline.initialize(inconsistent=elements)
try:
dependencies = app.pipeline.deps_elements(deps)
app.print_heading(deps=dependencies)
......@@ -289,8 +289,8 @@ def track(app, elements, deps, except_):
none: No dependencies, just the element itself
all: All dependencies
"""
app.initialize(elements, except_=except_,
rewritable=True, inconsistent=True)
app.initialize(elements, except_=except_, rewritable=True)
app.pipeline.initialize(inconsistent=elements)
try:
dependencies = app.pipeline.deps_elements(deps)
app.print_heading(deps=dependencies)
......@@ -322,7 +322,8 @@ def pull(app, elements, deps):
none: No dependencies, just the element itself
all: All dependencies
"""
app.initialize(elements, fetch_remote_refs=True)
app.initialize(elements)
app.pipeline.initialize(fetch_remote_refs=True)
try:
to_pull = app.pipeline.deps_elements(deps)
app.pipeline.pull(app.scheduler, to_pull)
......@@ -352,7 +353,8 @@ def push(app, elements, deps):
none: No dependencies, just the element itself
all: All dependencies
"""
app.initialize(elements, fetch_remote_refs=True)
app.initialize(elements)
app.pipeline.initialize(fetch_remote_refs=True)
try:
to_push = app.pipeline.deps_elements(deps)
app.pipeline.push(app.scheduler, to_push)
......@@ -431,7 +433,8 @@ def show(app, elements, deps, except_, order, format, downloadable):
bst show target.bst --format \\
$'---------- %{name} ----------\\n%{vars}'
"""
app.initialize(elements, except_=except_, fetch_remote_refs=downloadable)
app.initialize(elements, except_=except_)
app.pipeline.initialize(fetch_remote_refs=downloadable)
try:
dependencies = app.pipeline.deps_elements(deps)
except PipelineError as e:
......@@ -484,6 +487,7 @@ def shell(app, element, sysroot, build, command):
scope = Scope.RUN
app.initialize((element,))
app.pipeline.initialize()
# Assert we have everything we need built.
missing_deps = []
......@@ -526,6 +530,7 @@ def checkout(app, element, directory, force, integrate):
"""Checkout a built artifact to the specified directory
"""
app.initialize((element,))
app.pipeline.initialize()
try:
app.pipeline.checkout(directory, force, integrate)
click.echo("")
......@@ -557,7 +562,8 @@ def checkout(app, element, directory, force, integrate):
def source_bundle(app, target, force, directory,
track, compression, except_):
"""Produce a source bundle to be manually executed"""
app.initialize((target,), rewritable=track, inconsistent=track)
app.initialize((target,), rewritable=track)
app.pipeline.initialize(inconsistent=[target])
try:
dependencies = app.pipeline.deps_elements('all')
app.print_heading(dependencies)
......@@ -598,7 +604,8 @@ def workspace():
def workspace_open(app, no_checkout, force, source, track, element, directory):
"""Open a workspace for manual source modification"""
app.initialize((element,), rewritable=track, inconsistent=track)
app.initialize((element,), rewritable=track)
app.pipeline.initialize(inconsistent=[element])
try:
app.pipeline.open_workspace(app.scheduler, directory, source, no_checkout, track, force)
click.echo("")
......@@ -623,6 +630,7 @@ def workspace_close(app, source, remove_dir, element):
"""Close a workspace"""
app.initialize((element,))
app.pipeline.initialize()
if app.interactive and remove_dir:
if not click.confirm('This will remove all your changes, are you sure?'):
click.echo('Aborting')
......@@ -653,6 +661,7 @@ def workspace_close(app, source, remove_dir, element):
def workspace_reset(app, source, track, no_checkout, element):
"""Reset a workspace to its original state"""
app.initialize((element,))
app.pipeline.initialize()
if app.interactive:
if not click.confirm('This will remove all your changes, are you sure?'):
click.echo('Aborting')
......@@ -846,9 +855,7 @@ class App():
try:
self.pipeline = Pipeline(self.context, self.project, elements, except_,
inconsistent=inconsistent,
rewritable=rewritable,
fetch_remote_refs=fetch_remote_refs,
load_ticker=self.load_ticker,
resolve_ticker=self.resolve_ticker,
remote_ticker=self.remote_ticker,
......
......@@ -53,7 +53,7 @@ class Planner():
# Here we want to traverse the same element more than once when
# it is reachable from multiple places, with the interest of finding
# the deepest occurrence of every element
def plan_element(self, element, depth):
def plan_element(self, element, depth, ignore_cache):
if element in self.visiting_elements:
# circular dependency, already being processed
return
......@@ -65,19 +65,19 @@ class Planner():
self.visiting_elements.add(element)
for dep in element.dependencies(Scope.RUN, recurse=False):
self.plan_element(dep, depth)
self.plan_element(dep, depth, ignore_cache)
# Don't try to plan builds of elements that are cached already
if not element._cached() and not element._remotely_cached():
if ignore_cache or not element._cached() and not element._remotely_cached():
for dep in element.dependencies(Scope.BUILD, recurse=False):
self.plan_element(dep, depth + 1)
self.plan_element(dep, depth + 1, ignore_cache)
self.depth_map[element] = depth
self.visiting_elements.remove(element)
def plan(self, roots):
def plan(self, roots, ignore_cache=False):
for root in roots:
self.plan_element(root, 0)
self.plan_element(root, 0, ignore_cache)
depth_sorted = sorted(self.depth_map.items(), key=itemgetter(1), reverse=True)
return [item[0] for item in depth_sorted if not item[0]._cached()]
......@@ -114,13 +114,11 @@ class Planner():
class Pipeline():
def __init__(self, context, project, targets, except_,
inconsistent=False,
rewritable=False,
fetch_remote_refs=False,
load_ticker=None,
resolve_ticker=None,
remote_ticker=None,
cache_ticker=None):
cache_ticker=None,
load_ticker=None,
resolve_ticker=None):
self.context = context
self.project = project
self.session_elements = 0
......@@ -128,6 +126,9 @@ class Pipeline():
self.unused_workspaces = []
self._resolved_elements = {}
self.remote_ticker = remote_ticker
self.cache_ticker = cache_ticker
loader = Loader(self.project.element_path, targets + except_,
self.project._options)
meta_elements = loader.load(rewritable, load_ticker)
......@@ -154,14 +155,26 @@ class Pipeline():
if resolve_ticker:
resolve_ticker(None)
# Preflight directly after resolving elements, before ever interrogating
# caches or anything.
for plugin in self.dependencies(Scope.ALL, include_sources=True):
plugin.preflight()
def initialize(self, fetch_remote_refs=False, inconsistent=None):
# Preflight directly, before ever interrogating caches or
# anything.
self.preflight()
self.total_elements = len(list(self.dependencies(Scope.ALL)))
for element_name, source, workspace in project._list_workspaces():
self.initialize_workspaces()
if fetch_remote_refs and self.artifacts.can_fetch():
self.fetch_remote_refs()
self.resolve_cache_keys(inconsistent)
def preflight(self):
    """Run preflight checks over the whole pipeline.

    Walks every element and source plugin reachable from the targets
    and invokes its preflight() hook, so that configuration problems
    surface before any caches are interrogated.
    """
    plugins = self.dependencies(Scope.ALL, include_sources=True)
    for plugin in plugins:
        plugin.preflight()
def initialize_workspaces(self):
for element_name, source, workspace in self.project._list_workspaces():
for target in self.targets:
element = target.search(Scope.ALL, element_name)
......@@ -171,20 +184,24 @@ class Pipeline():
self.project._set_workspace(element, source, workspace)
if fetch_remote_refs and self.artifacts.can_fetch():
try:
if remote_ticker:
remote_ticker(self.artifacts.artifact_pull)
self.artifacts.fetch_remote_refs()
except ArtifactError:
self.message(MessageType.WARN, "Failed to fetch remote refs")
self.artifacts.set_offline()
def fetch_remote_refs(self):
    """Fetch the available artifact refs from the remote artifact cache.

    If a remote ticker callback was supplied it is notified (with the
    pull URL) before fetching begins.  On ArtifactError the failure is
    reported as a warning and the artifact cache is switched to offline
    mode instead of aborting the session.
    """
    artifacts = self.artifacts
    try:
        ticker = self.remote_ticker
        if ticker:
            ticker(artifacts.artifact_pull)
        artifacts.fetch_remote_refs()
    except ArtifactError:
        self.message(MessageType.WARN, "Failed to fetch remote refs")
        artifacts.set_offline()
def resolve_cache_keys(self, inconsistent):
if inconsistent:
inconsistent = self.get_elements_to_track(inconsistent)
for element in self.dependencies(Scope.ALL):
if cache_ticker:
cache_ticker(element.name)
if self.cache_ticker:
self.cache_ticker(element.name)
if inconsistent:
if inconsistent and element.name in inconsistent:
# Load the pipeline in an explicitly inconsistent state, use
# this for pipelines with tracking queues enabled.
element._force_inconsistent()
......@@ -193,8 +210,8 @@ class Pipeline():
# for the first time.
element._cached()
if cache_ticker:
cache_ticker(None)
if self.cache_ticker:
self.cache_ticker(None)
# Generator function to iterate over elements and optionally
# also iterate over sources.
......@@ -217,10 +234,10 @@ class Pipeline():
#
# Consequently it also means that cache keys can be resolved.
#
def assert_consistent(self, toplevel, recalculate=False):
def assert_consistent(self, toplevel):
inconsistent = []
for element in toplevel:
if element._consistency(recalculate=recalculate) == Consistency.INCONSISTENT:
if element._consistency() == Consistency.INCONSISTENT:
inconsistent.append(element)
if inconsistent:
......@@ -239,7 +256,7 @@ class Pipeline():
build_plan = Planner().plan(self.targets)
if except_:
self.remove_elements(build_plan)
build_plan = self.remove_elements(build_plan)
for element in build_plan:
yield element
......@@ -320,10 +337,10 @@ class Pipeline():
# If no error is encountered while tracking, then the project files
# are rewritten inline.
#
def track(self, scheduler, dependencies, save=True):
def track(self, scheduler, dependencies):
dependencies = list(dependencies)
track = TrackQueue(save=save)
track = TrackQueue(save=True)
track.enqueue(dependencies)
self.session_elements = len(dependencies)
......@@ -395,6 +412,15 @@ class Pipeline():
"Fetched {} elements".format(fetched),
elapsed=elapsed)
def get_elements_to_track(self, track_targets):
    """Compute the list of elements to enqueue for tracking.

    Args:
        track_targets: iterable of element names whose sources should
            be tracked.

    Returns:
        The planned track elements, in build-plan order, with any
        excepted elements removed.
    """
    # Hoist the membership test into a set once, instead of scanning
    # the (possibly long) track_targets sequence for every element.
    wanted = set(track_targets)
    target_elements = [e for e in self.dependencies(Scope.ALL)
                       if e.name in wanted]
    # ignore_cache: tracking must visit elements even when they are
    # already cached locally or remotely.
    track_elements = Planner().plan(target_elements, ignore_cache=True)
    return self.remove_elements(track_elements)
# build()
#
# Builds (assembles) elements in the pipeline.
......@@ -416,30 +442,32 @@ class Pipeline():
# Set up a separate pipeline to track individual elements
# first.
visited = {}
track_plan = []
track_elements = [element for element in self.dependencies(Scope.ALL)
if element.name in track_first]
for element in track_elements:
track_plan.extend(element.dependencies(Scope.ALL, visited=visited))
if track_plan:
track_plan = self.remove_elements(track_plan)
self.track(scheduler, track_plan, save=save)
track_plan = set()
if track_first:
track_plan = set(self.get_elements_to_track(track_first))
if build_all:
plan = list(self.dependencies(Scope.ALL))
plan = self.dependencies(Scope.ALL)
else:
plan = list(self.plan(except_=False))
plan = self.plan(except_=False)
# We want to start the build queue with any elements that are
# not being tracked first
plan = [e for e in plan if e not in track_plan]
# Assert that we have a consistent pipeline now
self.assert_consistent(plan, recalculate=True)
# Assert that we have a consistent pipeline now (elements in
# track_plan will be made consistent)
self.assert_consistent(plan)
fetch = FetchQueue(skip_cached=True)
build = BuildQueue()
track = None
pull = None
push = None
queues = []
if track_plan:
track = TrackQueue(save=save)
queues.append(track)
if self.artifacts.can_fetch():
pull = PullQueue()
queues.append(pull)
......@@ -448,9 +476,14 @@ class Pipeline():
if self.can_push_remote_artifact_cache():
push = PushQueue()
queues.append(push)
queues[0].enqueue(plan)
self.session_elements = len(plan)
if track:
queues[0].enqueue(track_plan)
queues[1].enqueue(plan)
else:
queues[0].enqueue(plan)
self.session_elements = len(track_plan) + len(plan)
self.message(MessageType.START, "Starting build")
elapsed, status = scheduler.run(queues)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment