
Compare revisions

Changes are shown as if the source revision was being merged into the target revision.

Commits on Source (14)
Showing with 709 additions and 7 deletions
......@@ -128,6 +128,7 @@ class Project():
self._shell_host_files = [] # A list of HostMount objects
self.artifact_cache_specs = None
self.remote_execution_url = None
self._sandbox = None
self._splits = None
......@@ -471,7 +472,7 @@ class Project():
'aliases', 'name',
'artifacts', 'options',
'fail-on-overlap', 'shell', 'fatal-warnings',
'ref-storage', 'sandbox', 'mirrors'
'ref-storage', 'sandbox', 'mirrors', 'remote-execution'
])
#
......@@ -482,6 +483,11 @@ class Project():
# Load artifacts pull/push configuration for this project
self.artifact_cache_specs = ArtifactCache.specs_from_config_node(config, self.directory)
# Load remote-execution configuration for this project
remote_execution = _yaml.node_get(config, Mapping, 'remote-execution')
_yaml.node_validate(remote_execution, ['url'])
self.remote_execution_url = _yaml.node_get(remote_execution, str, 'url')
# Load sandbox environment variables
self.base_environment = _yaml.node_get(config, Mapping, 'environment')
self.base_env_nocache = _yaml.node_get(config, list, 'environment-nocache')
......
......@@ -204,3 +204,6 @@ shell:
# Command to run when `bst shell` does not provide a command
#
command: [ 'sh', '-i' ]
remote-execution:
  url: ""
\ No newline at end of file
......@@ -95,6 +95,7 @@ from . import _site
from ._platform import Platform
from .plugin import CoreWarnings
from .sandbox._config import SandboxConfig
from .sandbox._sandboxremote import SandboxRemote
from .storage.directory import Directory
from .storage._filebaseddirectory import FileBasedDirectory
......@@ -250,6 +251,12 @@ class Element(Plugin):
# Extract Sandbox config
self.__sandbox_config = self.__extract_sandbox_config(meta)
# Extract remote execution URL
if not self.__is_junction:
self.__remote_execution_url = project.remote_execution_url
else:
self.__remote_execution_url = None
def __lt__(self, other):
return self.name < other.name
......@@ -1570,6 +1577,8 @@ class Element(Plugin):
finally:
if collect is not None:
try:
# Sandbox will probably have replaced its virtual directory, so get it again
sandbox_vroot = sandbox.get_virtual_directory()
collectvdir = sandbox_vroot.descend(collect.lstrip(os.sep).split(os.sep))
except VirtualDirectoryError:
# No collect directory existed
......@@ -2146,7 +2155,32 @@ class Element(Plugin):
project = self._get_project()
platform = Platform.get_platform()
if directory is not None and os.path.exists(directory):
if self.__remote_execution_url and self.BST_VIRTUAL_DIRECTORY:
if not self.__artifacts.has_push_remotes(element=self):
# Give an early warning if remote execution will not work
raise ElementError("Artifact {} is configured to use remote execution but has no push remotes. "
.format(self.name) +
"The remote artifact server(s) may not be correctly configured or contactable.")
self.info("Using a remote sandbox for artifact {}".format(self.name))
sandbox = SandboxRemote(context, project,
directory,
stdout=stdout,
stderr=stderr,
config=config,
server_url=self.__remote_execution_url,
allow_real_directory=False)
yield sandbox
elif directory is not None and os.path.exists(directory):
if self.__remote_execution_url:
self.warn("Artifact {} is configured to use remote execution but element plugin does not support it."
.format(self.name), detail="Element plugin '{kind}' does not support virtual directories."
.format(kind=self.get_kind()), warning_token="remote-failure")
self.info("Falling back to local sandbox for artifact {}".format(self.name))
sandbox = platform.create_sandbox(context, project,
directory,
stdout=stdout,
......
......@@ -57,7 +57,8 @@ from buildstream import BuildElement
# Element implementation for the 'autotools' kind.
class AutotoolsElement(BuildElement):
pass
# Supports virtual directories (required for remote execution)
BST_VIRTUAL_DIRECTORY = True
# Plugin entry point
......
......@@ -56,7 +56,8 @@ from buildstream import BuildElement
# Element implementation for the 'cmake' kind.
class CMakeElement(BuildElement):
pass
# Supports virtual directories (required for remote execution)
BST_VIRTUAL_DIRECTORY = True
# Plugin entry point
......
......@@ -38,7 +38,8 @@ from buildstream import BuildElement
# Element implementation for the 'make' kind.
class MakeElement(BuildElement):
pass
# Supports virtual directories (required for remote execution)
BST_VIRTUAL_DIRECTORY = True
# Plugin entry point
......
......@@ -53,7 +53,8 @@ from buildstream import BuildElement
# Element implementation for the 'meson' kind.
class MesonElement(BuildElement):
pass
# Supports virtual directories (required for remote execution)
BST_VIRTUAL_DIRECTORY = True
# Plugin entry point
......
......@@ -33,7 +33,8 @@ from buildstream import BuildElement
# Element implementation for the 'qmake' kind.
class QMakeElement(BuildElement):
pass
# Supports virtual directories (required for remote execution)
BST_VIRTUAL_DIRECTORY = True
# Plugin entry point
......
......@@ -20,3 +20,4 @@
from .sandbox import Sandbox, SandboxFlags
from ._sandboxchroot import SandboxChroot
from ._sandboxbwrap import SandboxBwrap
from ._sandboxremote import SandboxRemote
#!/usr/bin/env python3
#
# Copyright (C) 2018 Bloomberg LP
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library. If not, see <http://www.gnu.org/licenses/>.
#
# Authors:
# Jim MacArthur <jim.macarthur@codethink.co.uk>
import os
import re
from urllib.parse import urlparse
import grpc
from . import Sandbox
from ..storage._filebaseddirectory import FileBasedDirectory
from ..storage._casbaseddirectory import CasBasedDirectory
from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
from .._artifactcache.cascache import CASCache
class SandboxError(Exception):
pass
# SandboxRemote()
#
# This isn't really a sandbox; it's a stub that sends all the sources and build
# commands to a remote server and retrieves the results from it.
#
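# In outline, run() pushes the source tree to the remote CAS, uploads Command
# and Action messages describing the build, calls Execute on the remote
# execution service, and finally pulls the resulting output tree back into the
# local CAS (see run_remote_command() and process_job_output() below).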
class SandboxRemote(Sandbox):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.cascache = None
url = urlparse(kwargs['server_url'])
if not url.scheme or not url.hostname or not url.port:
raise SandboxError("Configured remote URL '{}' does not match the expected layout. "
.format(self.server_url) +
"It should be of the form <protocol>://<domain name>:<port>.")
elif url.scheme != 'http':
raise SandboxError("Configured remote '{}' uses an unsupported protocol. "
"Only plain HTTP is currenlty supported (no HTTPS).")
self.server_url = '{}:{}'.format(url.hostname, url.port)
def _get_cascache(self):
if self.cascache is None:
self.cascache = CASCache(self._get_context())
self.cascache.setup_remotes(use_config=True)
return self.cascache
def run_remote_command(self, command, input_root_digest, working_directory, environment):
# Sends an execution request to the remote execution server.
#
# This function blocks until it gets a response from the server.
#
environment_variables = [remote_execution_pb2.Command.
EnvironmentVariable(name=k, value=v)
for (k, v) in environment.items()]
# Create and send the Command object.
remote_command = remote_execution_pb2.Command(arguments=command,
working_directory=working_directory,
environment_variables=environment_variables,
output_files=[],
output_directories=[self._output_directory],
platform=None)
cascache = self._get_cascache()
# Upload the Command message to the remote CAS server
command_digest = cascache.push_message(self._get_project(), remote_command)
if not command_digest or not cascache.verify_digest_pushed(self._get_project(), command_digest):
# Command push failed
return None
# Create and send the action.
action = remote_execution_pb2.Action(command_digest=command_digest,
input_root_digest=input_root_digest,
timeout=None,
do_not_cache=False)
# Upload the Action message to the remote CAS server
action_digest = cascache.push_message(self._get_project(), action)
if not action_digest or not cascache.verify_digest_pushed(self._get_project(), action_digest):
# Action push failed
return None
# Next, try to create a communication channel to the BuildGrid server.
channel = grpc.insecure_channel(self.server_url)
stub = remote_execution_pb2_grpc.ExecutionStub(channel)
request = remote_execution_pb2.ExecuteRequest(action_digest=action_digest,
skip_cache_lookup=False)
try:
operation_iterator = stub.Execute(request)
except grpc.RpcError:
return None
operation = None
with self._get_context().timed_activity("Waiting for the remote build to complete"):
# It is advantageous to check operation_iterator.code() is grpc.StatusCode.OK here,
# which will check the server is actually contactable. However, calling it when the
# server is available seems to cause .code() to hang forever.
for operation in operation_iterator:
if operation.done:
break
return operation
def process_job_output(self, output_directories, output_files):
# Reads the remote execution server response to an execution request.
#
# output_directories is an array of OutputDirectory objects.
# output_files is an array of OutputFile objects.
#
# We only specify one output_directory, so it's an error
# for there to be any output files or more than one directory at the moment.
#
if output_files:
raise SandboxError("Output files were returned when we didn't request any.")
elif not output_directories:
error_text = "No output directory was returned from the build server."
raise SandboxError(error_text)
elif len(output_directories) > 1:
error_text = "More than one output directory was returned from the build server: {}."
raise SandboxError(error_text.format(output_directories))
tree_digest = output_directories[0].tree_digest
if tree_digest is None or not tree_digest.hash:
raise SandboxError("Output directory structure had no digest attached.")
cascache = self._get_cascache()
# Now do a pull to ensure we have the necessary parts.
dir_digest = cascache.pull_tree(self._get_project(), tree_digest)
if dir_digest is None or not dir_digest.hash or not dir_digest.size_bytes:
raise SandboxError("Output directory structure pulling from remote failed.")
path_components = os.path.split(self._output_directory)
# Now what we have is a digest for the output. Once we return, the calling process will
# attempt to descend into our directory and find that directory, so we need to overwrite
# that.
if not path_components:
# The artifact wants the whole directory; we could just return the returned hash in its
# place, but we don't have a means to do that yet.
raise SandboxError("Unimplemented: Output directory is empty or equal to the sandbox root.")
# At the moment, we will get the whole directory back in the first directory argument and we need
# to replace the sandbox's virtual directory with that. Creating a new virtual directory object
# from another hash will be interesting, though...
new_dir = CasBasedDirectory(self._get_context(), ref=dir_digest)
self._set_virtual_directory(new_dir)
def run(self, command, flags, *, cwd=None, env=None):
# Upload sources
upload_vdir = self.get_virtual_directory()
if isinstance(upload_vdir, FileBasedDirectory):
# Make a new temporary directory to put source in
upload_vdir = CasBasedDirectory(self._get_context(), ref=None)
upload_vdir.import_files(self.get_virtual_directory()._get_underlying_directory())
upload_vdir.recalculate_hash()
cascache = self._get_cascache()
# Now, push that key (without necessarily needing a ref) to the remote.
vdir_digest = cascache.push_directory(self._get_project(), upload_vdir)
if not vdir_digest or not cascache.verify_digest_pushed(self._get_project(), vdir_digest):
raise SandboxError("Failed to verify that source has been pushed to the remote artifact cache.")
# Set up environment and working directory
if cwd is None:
cwd = self._get_work_directory()
if cwd is None:
cwd = '/'
if env is None:
env = self._get_environment()
# We want command args as a list of strings
if isinstance(command, str):
command = [command]
# Now transmit the command to execute
operation = self.run_remote_command(command, upload_vdir.ref, cwd, env)
if operation is None:
# Failure of remote execution, usually due to an error in BuildStream
# NB This error could be raised in __run_remote_command
raise SandboxError("No response returned from server")
assert(not operation.HasField('error') and operation.HasField('response'))
execution_response = remote_execution_pb2.ExecuteResponse()
# The response is expected to be an ExecutionResponse message
assert(operation.response.Is(execution_response.DESCRIPTOR))
operation.response.Unpack(execution_response)
if execution_response.status.code != 0:
# A normal error during the build: the remote execution system
# has worked correctly but the command failed.
# execution_response.error also contains 'message' (str) and
# 'details' (iterator of Any) which we ignore at the moment.
return execution_response.status.code
action_result = execution_response.result
self.process_job_output(action_result.output_directories, action_result.output_files)
return 0
......@@ -204,6 +204,24 @@ with an artifact share.
You can also specify a list of caches here; earlier entries in the list
will have higher priority than later ones.
Remote execution
~~~~~~~~~~~~~~~~
BuildStream supports remote execution using the Google Remote Execution API
(REAPI). A description of how remote execution works is beyond the scope
of this document, but you can specify a remote server complying with the REAPI
using the `remote-execution` option:
.. code:: yaml

  remote-execution:
    # A url defining a remote execution server
    url: http://buildserver.example.com:50051
The url should contain a hostname and port separated by ':'. Only plain HTTP is
currently supported (no HTTPS).
The Remote Execution API can be found at https://github.com/bazelbuild/remote-apis.
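Note that an element is only dispatched to the remote sandbox if its plugin declares
support for virtual directories; in this change the autotools, cmake, make, meson and
qmake elements set `BST_VIRTUAL_DIRECTORY = True` for that purpose. The following is a
minimal, illustrative sketch of a custom plugin opting in (the class and function names
below are hypothetical, not part of BuildStream):

.. code:: python

  from buildstream import BuildElement

  # Hypothetical element kind; only the BST_VIRTUAL_DIRECTORY flag is relevant here.
  class MyToolElement(BuildElement):
      # Supports virtual directories (required for remote execution)
      BST_VIRTUAL_DIRECTORY = True

  # Plugin entry point
  def setup():
      return MyToolElement

Elements whose plugins do not support virtual directories fall back to the local
sandbox and emit a 'remote-failure' warning.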
.. _project_essentials_mirrors:
......
kind: compose
depends:
- filename: import-bin.bst
type: build
- filename: import-dev.bst
type: build
config:
# Don't try running the sandbox, we don't have a
# runtime to run anything in this context.
integrate: False
kind: import
sources:
- kind: local
path: files/bin-files
kind: import
sources:
- kind: local
path: files/dev-files
kind: stack
description: |
Main stack target for the bst build test
depends:
- import-bin.bst
- import-dev.bst
- compose-all.bst
#!/bin/bash
echo "Hello !"
#ifndef __PONY_H__
#define __PONY_H__
#define PONY_BEGIN "Once upon a time, there was a pony."
#define PONY_END "And they lived happily ever after, the end."
#define MAKE_PONY(story) \
PONY_BEGIN \
story \
PONY_END
#endif /* __PONY_H__ */
# Project config for frontend build test
name: test
element-path: elements
import hashlib
import os
import pytest
from buildstream._artifactcache.artifactcache import ArtifactCacheSpec
from buildstream._artifactcache.cascache import CASCache
from buildstream._context import Context
from buildstream._project import Project
from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
from tests.testutils import cli, create_artifact_share
# Project directory
DATA_DIR = os.path.join(
os.path.dirname(os.path.realpath(__file__)),
"project",
)
# Handle messages from the pipeline
def message_handler(message, context):
pass
def tree_maker(cas, tree, directory):
if tree.root.ByteSize() == 0:
tree.root.CopyFrom(directory)
for directory_node in directory.directories:
child_directory = tree.children.add()
with open(cas.objpath(directory_node.digest), 'rb') as f:
child_directory.ParseFromString(f.read())
tree_maker(cas, tree, child_directory)
@pytest.mark.datafiles(DATA_DIR)
def test_pull(cli, tmpdir, datafiles):
project_dir = str(datafiles)
# Set up an artifact cache.
with create_artifact_share(os.path.join(str(tmpdir), 'artifactshare')) as share:
# Configure artifact share
cli.configure({
'scheduler': {
'pushers': 1
},
'artifacts': {
'url': share.repo,
'push': True,
}
})
# First build the project with the artifact cache configured
result = cli.run(project=project_dir, args=['build', 'target.bst'])
result.assert_success()
# Assert that we are now cached locally
assert cli.get_element_state(project_dir, 'target.bst') == 'cached'
# Assert that we shared/pushed the cached artifact
element_key = cli.get_element_key(project_dir, 'target.bst')
assert share.has_artifact('test', 'target.bst', element_key)
# Delete the artifact locally
cli.remove_artifact_from_cache(project_dir, 'target.bst')
# Assert that we are not cached locally anymore
assert cli.get_element_state(project_dir, 'target.bst') != 'cached'
# Fake minimal context
context = Context()
context.set_message_handler(message_handler)
context.sched_pushers = 1
context.artifactdir = os.path.join(str(tmpdir), 'cache', 'artifacts')
context.artifact_cache_specs = [ArtifactCacheSpec(url=share.repo,
push=True)]
# Load the project and CAS cache
project = Project(project_dir, context)
project.ensure_fully_loaded()
cas = CASCache(context)
# Assert that the element's artifact is **not** cached
element = project.load_elements(['target.bst'], cas)[0]
element_key = cli.get_element_key(project_dir, 'target.bst')
assert not cas.contains(element, element_key)
# Manually setup the CAS remote
cas.setup_remotes(use_config=True)
cas.initialize_remotes()
assert cas.has_push_remotes()
# Pull the artifact
pulled = cas.pull(element, element_key)
assert pulled is True
assert cas.contains(element, element_key)
# Finally, close the opened gRPC channels properly!
for remote in cas._remotes[project]:
if remote.channel:
remote.channel.close()
@pytest.mark.datafiles(DATA_DIR)
def test_pull_tree(cli, tmpdir, datafiles):
project_dir = str(datafiles)
# Set up an artifact cache.
with create_artifact_share(os.path.join(str(tmpdir), 'artifactshare')) as share:
# Configure artifact share
cli.configure({
'scheduler': {
'pushers': 1
},
'artifacts': {
'url': share.repo,
'push': True,
}
})
# First build the project with the artifact cache configured
result = cli.run(project=project_dir, args=['build', 'target.bst'])
result.assert_success()
# Assert that we are now cached locally
assert cli.get_element_state(project_dir, 'target.bst') == 'cached'
# Assert that we shared/pushed the cached artifact
element_key = cli.get_element_key(project_dir, 'target.bst')
assert share.has_artifact('test', 'target.bst', element_key)
# Fake minimal context
context = Context()
context.set_message_handler(message_handler)
context.sched_pushers = 1
context.artifactdir = os.path.join(str(tmpdir), 'cache', 'artifacts')
context.artifact_cache_specs = [ArtifactCacheSpec(url=share.repo,
push=True)]
# Load the project and CAS cache
project = Project(project_dir, context)
project.ensure_fully_loaded()
cas = CASCache(context)
# Assert that the element's artifact is cached
element = project.load_elements(['target.bst'], cas)[0]
element_key = cli.get_element_key(project_dir, 'target.bst')
assert cas.contains(element, element_key)
# Manually setup the CAS remote
cas.setup_remotes(use_config=True)
cas.initialize_remotes()
assert cas.has_push_remotes(element=element)
# Retrieve the Directory object from the cached artifact
artifact_ref = cas.get_artifact_fullname(element, element_key)
artifact_digest = cas.resolve_ref(artifact_ref)
directory = remote_execution_pb2.Directory()
with open(cas.objpath(artifact_digest), 'rb') as f:
directory.ParseFromString(f.read())
# Build the Tree object while we are still cached
tree = remote_execution_pb2.Tree()
tree_maker(cas, tree, directory)
# Push the Tree as a regular message
tree_digest = cas.push_message(project, tree)
# Now delete the artifact locally
cli.remove_artifact_from_cache(project_dir, 'target.bst')
# Assert that we are not cached locally anymore
assert cli.get_element_state(project_dir, 'target.bst') != 'cached'
# Pull the artifact using the Tree object
directory_digest = cas.pull_tree(project, tree_digest)
assert directory_digest == artifact_digest
# Ensure the entire Tree structure has been pulled
assert os.path.exists(cas.objpath(directory_digest))
for child_directory in tree.children:
child_blob = child_directory.SerializeToString()
child_digest = remote_execution_pb2.Digest()
child_digest.hash = hashlib.sha256(child_blob).hexdigest()
child_digest.size_bytes = len(child_blob)
assert os.path.exists(cas.objpath(child_digest))
# Finally, close the opened gRPC channels properly!
for remote in cas._remotes[project]:
if remote.channel:
remote.channel.close()
import os
import pytest
from pluginbase import PluginBase
from buildstream._artifactcache.artifactcache import ArtifactCacheSpec
from buildstream._artifactcache.cascache import CASCache
from buildstream._context import Context
from buildstream._project import Project
from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
from buildstream.storage._casbaseddirectory import CasBasedDirectory
from tests.testutils import cli, create_artifact_share
# Project directory
DATA_DIR = os.path.join(
os.path.dirname(os.path.realpath(__file__)),
"project",
)
# Handle messages from the pipeline
def message_handler(message, context):
pass
@pytest.mark.datafiles(DATA_DIR)
def test_push(cli, tmpdir, datafiles):
project_dir = str(datafiles)
# First build the project without the artifact cache configured
result = cli.run(project=project_dir, args=['build', 'target.bst'])
result.assert_success()
# Assert that we are now cached locally
assert cli.get_element_state(project_dir, 'target.bst') == 'cached'
# Set up an artifact cache.
with create_artifact_share(os.path.join(str(tmpdir), 'artifactshare')) as share:
# Fake minimal context
context = Context()
context.set_message_handler(message_handler)
context.sched_pushers = 1
context.artifactdir = os.path.join(str(tmpdir), 'cache', 'artifacts')
context.artifact_cache_specs = [ArtifactCacheSpec(url=share.repo,
push=True)]
# Load the project and CAS cache
project = Project(project_dir, context)
project.ensure_fully_loaded()
cas = CASCache(context)
# Assert that the element's artifact is cached
element = project.load_elements(['target.bst'], cas)[0]
element_key = cli.get_element_key(project_dir, 'target.bst')
assert cas.contains(element, element_key)
# Manually setup the CAS remote
cas.setup_remotes(use_config=True)
cas.initialize_remotes()
assert cas.has_push_remotes(element=element)
# Push the element's artifact
pushed = cas.push(element, [element_key])
assert pushed is True
assert share.has_artifact('test', 'target.bst', element_key)
# Finally, close the opened gRPC channels properly!
for remote in cas._remotes[project]:
if remote.channel:
remote.channel.close()
@pytest.mark.datafiles(DATA_DIR)
def test_push_directory(cli, tmpdir, datafiles):
project_dir = str(datafiles)
# First build the project without the artifact cache configured
result = cli.run(project=project_dir, args=['build', 'target.bst'])
result.assert_success()
# Assert that we are now cached locally
assert cli.get_element_state(project_dir, 'target.bst') == 'cached'
# Set up an artifact cache.
with create_artifact_share(os.path.join(str(tmpdir), 'artifactshare')) as share:
# Fake minimal context
context = Context()
context.set_message_handler(message_handler)
context.sched_pushers = 1
context.artifactdir = os.path.join(str(tmpdir), 'cache', 'artifacts')
context.artifact_cache_specs = [ArtifactCacheSpec(url=share.repo,
push=True)]
# Load the project and CAS cache
project = Project(project_dir, context)
project.ensure_fully_loaded()
cas = CASCache(context)
# Assert that the element's artifact is cached
element = project.load_elements(['target.bst'], cas)[0]
element_key = cli.get_element_key(project_dir, 'target.bst')
assert cas.contains(element, element_key)
# Manually setup the CAS remote
cas.setup_remotes(use_config=True)
cas.initialize_remotes()
assert cas.has_push_remotes(element=element)
# Recreate the CasBasedDirectory object from the cached artifact
artifact_ref = cas.get_artifact_fullname(element, element_key)
artifact_digest = cas.resolve_ref(artifact_ref)
directory = CasBasedDirectory(context, ref=artifact_digest)
# Push the CasBasedDirectory object
directory_digest = cas.push_directory(project, directory)
assert directory_digest == artifact_digest
assert share.has_object(directory_digest)
# Finally, close the opened gRPC channels properly!
for remote in cas._remotes[project]:
if remote.channel:
remote.channel.close()
@pytest.mark.datafiles(DATA_DIR)
def test_push_message(cli, tmpdir, datafiles):
project_dir = str(datafiles)
# Set up an artifact cache.
with create_artifact_share(os.path.join(str(tmpdir), 'artifactshare')) as share:
# Fake minimal context
context = Context()
context.set_message_handler(message_handler)
context.sched_pushers = 1
context.artifactdir = os.path.join(str(tmpdir), 'cache', 'artifacts')
context.artifact_cache_specs = [ArtifactCacheSpec(url=share.repo,
push=True)]
# Load the project and CAS cache
project = Project(project_dir, context)
project.ensure_fully_loaded()
cas = CASCache(context)
# Manually setup the CAS remote
cas.setup_remotes(use_config=True)
cas.initialize_remotes()
assert cas.has_push_remotes()
# Create an example message object
command = remote_execution_pb2.Command(arguments=['/usr/bin/gcc', '--help'],
working_directory='/buildstream-build',
output_directories=['/buildstream-install'])
# Push the message object
digest = cas.push_message(project, command)
assert digest
assert share.has_object(digest)
# Finally, close the opened gRPC channels properly!
for remote in cas._remotes[project]:
if remote.channel:
remote.channel.close()