Commit 1f32483f authored by Martin Blanchard

tests/artifactcache: Add pull unit-tests

parent e63dd931
Pipeline #29585811 failed
import hashlib
import os

import pytest

from buildstream._artifactcache.artifactcache import ArtifactCacheSpec
from buildstream._artifactcache.cascache import CASCache
from buildstream._context import Context
from buildstream._project import Project
from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution_pb2

from tests.testutils import cli, create_artifact_share


# Project directory
DATA_DIR = os.path.join(
    os.path.dirname(os.path.realpath(__file__)),
    "project",
)


# Handle messages from the pipeline
def message_handler(message, context):
    pass
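

# Build up a Tree message from a Directory: set the tree root once, then
# recursively append every child Directory read back from the local CAS.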
def tree_maker(cas, tree, directory):
    if tree.root.ByteSize() == 0:
        tree.root.CopyFrom(directory)

    for directory_node in directory.directories:
        child_directory = tree.children.add()

        with open(cas.objpath(directory_node.digest), 'rb') as f:
            child_directory.ParseFromString(f.read())

        tree_maker(cas, tree, child_directory)


@pytest.mark.datafiles(DATA_DIR)
def test_pull(cli, tmpdir, datafiles):
    project_dir = str(datafiles)

    # Set up an artifact cache.
    with create_artifact_share(os.path.join(str(tmpdir), 'artifactshare')) as share:
        # Configure artifact share
        cli.configure({
            'scheduler': {
                'pushers': 1
            },
            'artifacts': {
                'url': share.repo,
                'push': True,
            }
        })

        # First build the project with the artifact cache configured
        result = cli.run(project=project_dir, args=['build', 'target.bst'])
        result.assert_success()

        # Assert that we are now cached locally
        assert cli.get_element_state(project_dir, 'target.bst') == 'cached'

        # Assert that we shared/pushed the cached artifact
        element_key = cli.get_element_key(project_dir, 'target.bst')
        assert share.has_artifact('test', 'target.bst', element_key)

        # Delete the artifact locally
        cli.remove_artifact_from_cache(project_dir, 'target.bst')

        # Assert that we are not cached locally anymore
        assert cli.get_element_state(project_dir, 'target.bst') != 'cached'

        # Fake minimal context
        context = Context()
        context.set_message_handler(message_handler)
        context.sched_pushers = 1
        context.artifactdir = os.path.join(str(tmpdir), 'cache', 'artifacts')
        context.artifact_cache_specs = [ArtifactCacheSpec(url=share.repo,
                                                          push=True)]

        # Load the project and CAS cache
        project = Project(project_dir, context)
        project.ensure_fully_loaded()
        cas = CASCache(context)

        # Assert that the element's artifact is **not** cached
        element = project.load_elements(['target.bst'], cas)[0]
        element_key = cli.get_element_key(project_dir, 'target.bst')
        assert not cas.contains(element, element_key)

        # Manually setup the CAS remote
        cas.setup_remotes(use_config=True)
        cas.initialize_remotes()
        assert cas.has_push_remotes()

        # Pull the artifact
        pulled = cas.pull(element, element_key)
        assert pulled is True
        assert cas.contains(element, element_key)

        # Finally, close the opened gRPC channels properly!
        for remote in cas._remotes[project]:
            if remote.channel:
                remote.channel.close()


@pytest.mark.datafiles(DATA_DIR)
def test_pull_tree(cli, tmpdir, datafiles):
    project_dir = str(datafiles)

    # Set up an artifact cache.
    with create_artifact_share(os.path.join(str(tmpdir), 'artifactshare')) as share:
        # Configure artifact share
        cli.configure({
            'scheduler': {
                'pushers': 1
            },
            'artifacts': {
                'url': share.repo,
                'push': True,
            }
        })

        # First build the project with the artifact cache configured
        result = cli.run(project=project_dir, args=['build', 'target.bst'])
        result.assert_success()

        # Assert that we are now cached locally
        assert cli.get_element_state(project_dir, 'target.bst') == 'cached'

        # Assert that we shared/pushed the cached artifact
        element_key = cli.get_element_key(project_dir, 'target.bst')
        assert share.has_artifact('test', 'target.bst', element_key)

        # Fake minimal context
        context = Context()
        context.set_message_handler(message_handler)
        context.sched_pushers = 1
        context.artifactdir = os.path.join(str(tmpdir), 'cache', 'artifacts')
        context.artifact_cache_specs = [ArtifactCacheSpec(url=share.repo,
                                                          push=True)]

        # Load the project and CAS cache
        project = Project(project_dir, context)
        project.ensure_fully_loaded()
        cas = CASCache(context)

        # Assert that the element's artifact is cached
        element = project.load_elements(['target.bst'], cas)[0]
        element_key = cli.get_element_key(project_dir, 'target.bst')
        assert cas.contains(element, element_key)

        # Manually setup the CAS remote
        cas.setup_remotes(use_config=True)
        cas.initialize_remotes()
        assert cas.has_push_remotes(element=element)

        # Retrieve the Directory object from the cached artifact
        artifact_ref = cas.get_artifact_fullname(element, element_key)
        artifact_digest = cas.resolve_ref(artifact_ref)

        directory = remote_execution_pb2.Directory()

        with open(cas.objpath(artifact_digest), 'rb') as f:
            directory.ParseFromString(f.read())

        # Build the Tree object while we are still cached
        tree = remote_execution_pb2.Tree()
        tree_maker(cas, tree, directory)

        # Push the Tree as a regular message
        tree_digest = cas.push_message(project, tree)

        # Now delete the artifact locally
        cli.remove_artifact_from_cache(project_dir, 'target.bst')

        # Assert that we are not cached locally anymore
        assert cli.get_element_state(project_dir, 'target.bst') != 'cached'

        # Pull the artifact using the Tree object
        directory_digest = cas.pull_tree(project, tree_digest)
        assert directory_digest == artifact_digest

        # Ensure the entire Tree structure has been pulled
        assert os.path.exists(cas.objpath(directory_digest))
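        # Recompute each child Directory's digest from its serialized blob and
        # check that the corresponding object now exists in the local CAS.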
        for child_directory in tree.children:
            child_blob = child_directory.SerializeToString()
            child_digest = remote_execution_pb2.Digest()
            child_digest.hash = hashlib.sha256(child_blob).hexdigest()
            child_digest.size_bytes = len(child_blob)

            assert os.path.exists(cas.objpath(child_digest))

        # Finally, close the opened gRPC channels properly!
        for remote in cas._remotes[project]:
            if remote.channel:
                remote.channel.close()