From 09c334030ef527a1c1a7bf957a15f04fc00bbd26 Mon Sep 17 00:00:00 2001
From: Tristan Maat <tristan.maat@codethink.co.uk>
Date: Tue, 21 Nov 2017 15:25:45 +0000
Subject: [PATCH] _pipeline.py: Merge the track queue into the scheduler run

---
 buildstream/_frontend/main.py |  33 +++++----
 buildstream/_pipeline.py      | 133 +++++++++++++++++++++-------------
 2 files changed, 103 insertions(+), 63 deletions(-)

diff --git a/buildstream/_frontend/main.py b/buildstream/_frontend/main.py
index 3cf43c9b57..d4aec5fc35 100644
--- a/buildstream/_frontend/main.py
+++ b/buildstream/_frontend/main.py
@@ -206,8 +206,8 @@ def cli(context, **kwargs):
 def build(app, elements, all, track, track_save, track_except):
     """Build elements in a pipeline"""
 
-    app.initialize(elements, except_=track_except, rewritable=track_save,
-                   inconsistent=bool(track), fetch_remote_refs=True)
+    app.initialize(elements, except_=track_except, rewritable=track_save)
+    app.pipeline.initialize(fetch_remote_refs=True, inconsistent=track)
     app.print_heading()
     try:
         app.pipeline.build(app.scheduler, all, track, track_save)
@@ -249,8 +249,8 @@ def fetch(app, elements, deps, track, except_):
         plan:  Only dependencies required for the build plan
         all:   All dependencies
     """
-    app.initialize(elements, except_=except_,
-                   rewritable=track, inconsistent=track)
+    app.initialize(elements, except_=except_, rewritable=track)
+    app.pipeline.initialize(inconsistent=elements)
     try:
         dependencies = app.pipeline.deps_elements(deps)
         app.print_heading(deps=dependencies)
@@ -289,8 +289,8 @@ def track(app, elements, deps, except_):
         none:  No dependencies, just the element itself
         all:   All dependencies
     """
-    app.initialize(elements, except_=except_,
-                   rewritable=True, inconsistent=True)
+    app.initialize(elements, except_=except_, rewritable=True)
+    app.pipeline.initialize(inconsistent=elements)
     try:
         dependencies = app.pipeline.deps_elements(deps)
         app.print_heading(deps=dependencies)
@@ -322,7 +322,8 @@ def pull(app, elements, deps):
         none:  No dependencies, just the element itself
         all:   All dependencies
     """
-    app.initialize(elements, fetch_remote_refs=True)
+    app.initialize(elements)
+    app.pipeline.initialize(fetch_remote_refs=True)
     try:
         to_pull = app.pipeline.deps_elements(deps)
         app.pipeline.pull(app.scheduler, to_pull)
@@ -352,7 +353,8 @@ def push(app, elements, deps):
         none:  No dependencies, just the element itself
         all:   All dependencies
     """
-    app.initialize(elements, fetch_remote_refs=True)
+    app.initialize(elements)
+    app.pipeline.initialize(fetch_remote_refs=True)
     try:
         to_push = app.pipeline.deps_elements(deps)
         app.pipeline.push(app.scheduler, to_push)
@@ -431,7 +433,8 @@ def show(app, elements, deps, except_, order, format, downloadable):
         bst show target.bst --format \\
             $'---------- %{name} ----------\\n%{vars}'
     """
-    app.initialize(elements, except_=except_, fetch_remote_refs=downloadable)
+    app.initialize(elements, except_=except_)
+    app.pipeline.initialize(fetch_remote_refs=downloadable)
     try:
         dependencies = app.pipeline.deps_elements(deps)
     except PipelineError as e:
@@ -484,6 +487,7 @@ def shell(app, element, sysroot, build, command):
         scope = Scope.RUN
 
     app.initialize((element,))
+    app.pipeline.initialize()
 
     # Assert we have everything we need built.
     missing_deps = []
@@ -526,6 +530,7 @@ def checkout(app, element, directory, force, integrate):
     """Checkout a built artifact to the specified directory
     """
     app.initialize((element,))
+    app.pipeline.initialize()
     try:
         app.pipeline.checkout(directory, force, integrate)
         click.echo("")
@@ -557,7 +562,8 @@ def checkout(app, element, directory, force, integrate):
 def source_bundle(app, target, force, directory,
                   track, compression, except_):
     """Produce a source bundle to be manually executed"""
-    app.initialize((target,), rewritable=track, inconsistent=track)
+    app.initialize((target,), rewritable=track)
+    app.pipeline.initialize(inconsistent=[target])
     try:
         dependencies = app.pipeline.deps_elements('all')
         app.print_heading(dependencies)
@@ -598,7 +604,8 @@ def workspace():
 def workspace_open(app, no_checkout, force, source, track, element, directory):
     """Open a workspace for manual source modification"""
 
-    app.initialize((element,), rewritable=track, inconsistent=track)
+    app.initialize((element,), rewritable=track)
+    app.pipeline.initialize(inconsistent=[element])
     try:
         app.pipeline.open_workspace(app.scheduler, directory, source, no_checkout, track, force)
         click.echo("")
@@ -623,6 +630,7 @@ def workspace_close(app, source, remove_dir, element):
     """Close a workspace"""
 
     app.initialize((element,))
+    app.pipeline.initialize()
     if app.interactive and remove_dir:
         if not click.confirm('This will remove all your changes, are you sure?'):
             click.echo('Aborting')
@@ -653,6 +661,7 @@ def workspace_close(app, source, remove_dir, element):
 def workspace_reset(app, source, track, no_checkout, element):
     """Reset a workspace to its original state"""
     app.initialize((element,))
+    app.pipeline.initialize()
     if app.interactive:
         if not click.confirm('This will remove all your changes, are you sure?'):
             click.echo('Aborting')
@@ -846,9 +855,7 @@ class App():
 
         try:
             self.pipeline = Pipeline(self.context, self.project, elements, except_,
-                                     inconsistent=inconsistent,
                                      rewritable=rewritable,
-                                     fetch_remote_refs=fetch_remote_refs,
                                      load_ticker=self.load_ticker,
                                      resolve_ticker=self.resolve_ticker,
                                      remote_ticker=self.remote_ticker,
diff --git a/buildstream/_pipeline.py b/buildstream/_pipeline.py
index 63c74de5da..5a6acb2519 100644
--- a/buildstream/_pipeline.py
+++ b/buildstream/_pipeline.py
@@ -53,7 +53,7 @@ class Planner():
     # Here we want to traverse the same element more than once when
     # it is reachable from multiple places, with the interest of finding
     # the deepest occurance of every element
-    def plan_element(self, element, depth):
+    def plan_element(self, element, depth, ignore_cache):
         if element in self.visiting_elements:
             # circular dependency, already being processed
             return
@@ -65,19 +65,19 @@ class Planner():
 
         self.visiting_elements.add(element)
         for dep in element.dependencies(Scope.RUN, recurse=False):
-            self.plan_element(dep, depth)
+            self.plan_element(dep, depth, ignore_cache)
 
         # Dont try to plan builds of elements that are cached already
-        if not element._cached() and not element._remotely_cached():
+        if ignore_cache or not element._cached() and not element._remotely_cached():
             for dep in element.dependencies(Scope.BUILD, recurse=False):
-                self.plan_element(dep, depth + 1)
+                self.plan_element(dep, depth + 1, ignore_cache)
 
         self.depth_map[element] = depth
         self.visiting_elements.remove(element)
 
-    def plan(self, roots):
+    def plan(self, roots, ignore_cache=False):
         for root in roots:
-            self.plan_element(root, 0)
+            self.plan_element(root, 0, ignore_cache)
 
         depth_sorted = sorted(self.depth_map.items(), key=itemgetter(1), reverse=True)
         return [item[0] for item in depth_sorted if not item[0]._cached()]
@@ -114,13 +114,11 @@ class Planner():
 class Pipeline():
 
     def __init__(self, context, project, targets, except_,
-                 inconsistent=False,
                  rewritable=False,
-                 fetch_remote_refs=False,
-                 load_ticker=None,
-                 resolve_ticker=None,
                  remote_ticker=None,
-                 cache_ticker=None):
+                 cache_ticker=None,
+                 load_ticker=None,
+                 resolve_ticker=None):
         self.context = context
         self.project = project
         self.session_elements = 0
@@ -128,6 +126,9 @@ class Pipeline():
         self.unused_workspaces = []
         self._resolved_elements = {}
 
+        self.remote_ticker = remote_ticker
+        self.cache_ticker = cache_ticker
+
         loader = Loader(self.project.element_path, targets + except_,
                         self.project._options)
         meta_elements = loader.load(rewritable, load_ticker)
@@ -154,14 +155,26 @@ class Pipeline():
         if resolve_ticker:
             resolve_ticker(None)
 
-        # Preflight directly after resolving elements, before ever interrogating
-        # caches or anything.
-        for plugin in self.dependencies(Scope.ALL, include_sources=True):
-            plugin.preflight()
+    def initialize(self, fetch_remote_refs=False, inconsistent=None):
+        # Preflight directly, before ever interrogating caches or
+        # anything.
+        self.preflight()
 
         self.total_elements = len(list(self.dependencies(Scope.ALL)))
 
-        for element_name, source, workspace in project._list_workspaces():
+        self.initialize_workspaces()
+
+        if fetch_remote_refs and self.artifacts.can_fetch():
+            self.fetch_remote_refs()
+
+        self.resolve_cache_keys(inconsistent)
+
+    def preflight(self):
+        for plugin in self.dependencies(Scope.ALL, include_sources=True):
+            plugin.preflight()
+
+    def initialize_workspaces(self):
+        for element_name, source, workspace in self.project._list_workspaces():
             for target in self.targets:
                 element = target.search(Scope.ALL, element_name)
 
@@ -171,20 +184,24 @@ class Pipeline():
 
                 self.project._set_workspace(element, source, workspace)
 
-        if fetch_remote_refs and self.artifacts.can_fetch():
-            try:
-                if remote_ticker:
-                    remote_ticker(self.artifacts.artifact_pull)
-                self.artifacts.fetch_remote_refs()
-            except ArtifactError:
-                self.message(MessageType.WARN, "Failed to fetch remote refs")
-                self.artifacts.set_offline()
+    def fetch_remote_refs(self):
+        try:
+            if self.remote_ticker:
+                self.remote_ticker(self.artifacts.artifact_pull)
+            self.artifacts.fetch_remote_refs()
+        except ArtifactError:
+            self.message(MessageType.WARN, "Failed to fetch remote refs")
+            self.artifacts.set_offline()
+
+    def resolve_cache_keys(self, inconsistent):
+        if inconsistent:
+            inconsistent = self.get_elements_to_track(inconsistent)
 
         for element in self.dependencies(Scope.ALL):
-            if cache_ticker:
-                cache_ticker(element.name)
+            if self.cache_ticker:
+                self.cache_ticker(element.name)
 
-            if inconsistent:
+            if inconsistent and element.name in inconsistent:
                 # Load the pipeline in an explicitly inconsistent state, use
                 # this for pipelines with tracking queues enabled.
                 element._force_inconsistent()
@@ -193,8 +210,8 @@ class Pipeline():
                 # for the first time.
                 element._cached()
 
-        if cache_ticker:
-            cache_ticker(None)
+        if self.cache_ticker:
+            self.cache_ticker(None)
 
     # Generator function to iterate over elements and optionally
     # also iterate over sources.
@@ -217,10 +234,10 @@ class Pipeline():
     #
     # Consequently it also means that cache keys can be resolved.
     #
-    def assert_consistent(self, toplevel, recalculate=False):
+    def assert_consistent(self, toplevel):
         inconsistent = []
         for element in toplevel:
-            if element._consistency(recalculate=recalculate) == Consistency.INCONSISTENT:
+            if element._consistency() == Consistency.INCONSISTENT:
                 inconsistent.append(element)
 
         if inconsistent:
@@ -239,7 +256,7 @@ class Pipeline():
         build_plan = Planner().plan(self.targets)
 
         if except_:
-            self.remove_elements(build_plan)
+            build_plan = self.remove_elements(build_plan)
 
         for element in build_plan:
             yield element
@@ -320,10 +337,10 @@ class Pipeline():
     # If no error is encountered while tracking, then the project files
     # are rewritten inline.
     #
-    def track(self, scheduler, dependencies, save=True):
+    def track(self, scheduler, dependencies):
 
         dependencies = list(dependencies)
-        track = TrackQueue(save=save)
+        track = TrackQueue(save=True)
         track.enqueue(dependencies)
         self.session_elements = len(dependencies)
 
@@ -395,6 +412,15 @@ class Pipeline():
                          "Fetched {} elements".format(fetched),
                          elapsed=elapsed)
 
+    def get_elements_to_track(self, track_targets):
+        planner = Planner()
+
+        target_elements = [e for e in self.dependencies(Scope.ALL)
+                           if e.name in track_targets]
+        track_elements = planner.plan(target_elements, ignore_cache=True)
+
+        return self.remove_elements(track_elements)
+
     # build()
     #
     # Builds (assembles) elements in the pipeline.
@@ -416,30 +442,32 @@ class Pipeline():
 
         # Set up a separate pipeline to track individual elements
         # first.
-        visited = {}
-        track_plan = []
-        track_elements = [element for element in self.dependencies(Scope.ALL)
-                          if element.name in track_first]
-        for element in track_elements:
-            track_plan.extend(element.dependencies(Scope.ALL, visited=visited))
-
-        if track_plan:
-            track_plan = self.remove_elements(track_plan)
-            self.track(scheduler, track_plan, save=save)
+        track_plan = set()
+        if track_first:
+            track_plan = set(self.get_elements_to_track(track_first))
 
         if build_all:
-            plan = list(self.dependencies(Scope.ALL))
+            plan = self.dependencies(Scope.ALL)
         else:
-            plan = list(self.plan(except_=False))
+            plan = self.plan(except_=False)
+
+        # We want to start the build queue with any elements that are
+        # not being tracked first
+        plan = [e for e in plan if e not in track_plan]
 
-        # Assert that we have a consistent pipeline now
-        self.assert_consistent(plan, recalculate=True)
+        # Assert that we have a consistent pipeline now (elements in
+        # track_plan will be made consistent)
+        self.assert_consistent(plan)
 
         fetch = FetchQueue(skip_cached=True)
         build = BuildQueue()
+        track = None
         pull = None
         push = None
         queues = []
+        if track_plan:
+            track = TrackQueue(save=save)
+            queues.append(track)
         if self.artifacts.can_fetch():
             pull = PullQueue()
             queues.append(pull)
@@ -448,9 +476,14 @@ class Pipeline():
         if self.can_push_remote_artifact_cache():
             push = PushQueue()
             queues.append(push)
-        queues[0].enqueue(plan)
 
-        self.session_elements = len(plan)
+        if track:
+            queues[0].enqueue(track_plan)
+            queues[1].enqueue(plan)
+        else:
+            queues[0].enqueue(plan)
+
+        self.session_elements = len(track_plan) + len(plan)
 
         self.message(MessageType.START, "Starting build")
         elapsed, status = scheduler.run(queues)
-- 
GitLab