diff --git a/buildstream/_artifactcache/artifactcache.py b/buildstream/_artifactcache/artifactcache.py index 2508e0b88e68e6ca4adb00501601a3fd21379606..a7208e8f287cff9ad445f9709ee9d6d51f433fb2 100644 --- a/buildstream/_artifactcache/artifactcache.py +++ b/buildstream/_artifactcache/artifactcache.py @@ -28,6 +28,7 @@ from ..exceptions import _ArtifactError from ..element import _KeyStrength from .._ostree import OSTreeError +from .pushreceive import check_push_connection from .pushreceive import push as push_artifact from .pushreceive import PushException @@ -67,6 +68,15 @@ class ArtifactCache(): self.__offline = False + def preflight(self): + if self.can_push() and not self.context.artifact_push.startswith("/"): + try: + check_push_connection(self.context.artifact_push, + self.context.artifact_push_port) + except PushException as e: + raise _ArtifactError("BuildStream will be unable to push artifacts " + "to the shared cache: {}".format(e)) + # contains(): # # Check whether the artifact for the specified Element is already available diff --git a/buildstream/_artifactcache/pushreceive.py b/buildstream/_artifactcache/pushreceive.py index 0706b5e41f07bb20c781ab7fd2402af9290b0a38..f47076c7672b9faf006cc8cf010f5a3f4ed04dee 100644 --- a/buildstream/_artifactcache/pushreceive.py +++ b/buildstream/_artifactcache/pushreceive.py @@ -29,6 +29,7 @@ import tempfile import shutil import tarfile import signal +import tempfile from enum import Enum from urllib.parse import urlparse @@ -135,6 +136,16 @@ class PushMessageWriter(object): self.file.write(msg) self.file.flush() + def send_hello(self): + # The 'hello' message is used to check connectivity, and is actually + # an empty info request in order to keep the receiver code simple. + args = { + 'mode': GLib.Variant('i', 0), + 'refs': GLib.Variant('a{ss}', {}) + } + command = PushCommand(PushCommandType.info, args) + self.write(command) + def send_info(self, repo, refs): cmdtype = PushCommandType.info mode = repo.get_mode() @@ -292,6 +303,48 @@ class PushMessageReader(object): return args +def parse_remote_location(remotepath, remote_port): + """Parse remote artifact cache URL that's been specified in our config.""" + remote_host = remote_user = remote_repo = None + + url = urlparse(remotepath) + if url.netloc: + if url.scheme != 'ssh': + raise PushException('Only URL scheme ssh is allowed, ' + 'not "%s"' % url.scheme) + remote_host = url.hostname + remote_user = url.username + remote_repo = url.path + remote_port = url.port + else: + # Scp/git style remote (user@hostname:path) + parts = remotepath.split('@', 1) + if len(parts) > 1: + remote_user = parts[0] + remainder = parts[1] + else: + remote_user = None + remainder = parts[0] + parts = remainder.split(':', 1) + if len(parts) != 2: + raise PushException('Remote repository "%s" does not ' + 'contain a hostname and path separated ' + 'by ":"' % remotepath) + remote_host, remote_repo = parts + + return remote_host, remote_user, remote_repo, remote_port + + +def ssh_commandline(remote_host, remote_user=None, remote_port=22): + ssh_cmd = ['ssh'] + if remote_user: + ssh_cmd += ['-l', remote_user] + if remote_port != 22: + ssh_cmd += ['-p', str(remote_port)] + ssh_cmd += [remote_host] + return ssh_cmd + + class OSTreePusher(object): def __init__(self, repopath, remotepath, remote_port, branches=[], verbose=False, debug=False, output=None): @@ -301,11 +354,8 @@ class OSTreePusher(object): self.debug = debug self.output = output - self.remote_host = None - self.remote_user = None - self.remote_repo = None - self.remote_port = remote_port - self._set_remote_args() + self.remote_host, self.remote_user, self.remote_repo, self.remote_port = \ + parse_remote_location(remotepath, remote_port) if self.repopath is None: self.repo = OSTree.Repo.new_default() @@ -323,18 +373,14 @@ class OSTreePusher(object): self.refs[branch] = rev # Start ssh - ssh_cmd = ['ssh'] - if self.remote_user: - ssh_cmd += ['-l', self.remote_user] - if self.remote_port != 22: - ssh_cmd += ['-p', str(self.remote_port)] + ssh_cmd = ssh_commandline(self.remote_host, self.remote_user, self.remote_port) - ssh_cmd += [self.remote_host, 'bst-artifact-receive'] + ssh_cmd += ['bst-artifact-receive'] if self.verbose: ssh_cmd += ['--verbose'] if self.debug: ssh_cmd += ['--debug'] - ssh_cmd += [self.remotepath] + ssh_cmd += [self.remote_repo] logging.info('Executing {}'.format(' '.join(ssh_cmd))) self.ssh = subprocess.Popen(ssh_cmd, stdin=subprocess.PIPE, @@ -345,32 +391,6 @@ class OSTreePusher(object): self.writer = PushMessageWriter(self.ssh.stdin) self.reader = PushMessageReader(self.ssh.stdout) - def _set_remote_args(self): - url = urlparse(self.remotepath) - if url.netloc: - if url.scheme != 'ssh': - raise PushException('Only URL scheme ssh is allowed, ' - 'not "%s"' % url.scheme) - self.remote_host = url.hostname - self.remote_user = url.username - self.remote_repo = url.path - self.remote_port = url.port - else: - # Scp/git style remote (user@hostname:path) - parts = self.remotepath.split('@', 1) - if len(parts) > 1: - self.remote_user = parts[0] - remainder = parts[1] - else: - self.remote_user = None - remainder = parts[0] - parts = remainder.split(':', 1) - if len(parts) != 2: - raise PushException('Remote repository "%s" does not ' - 'contain a hostname and path separated ' - 'by ":"' % self.remotepath) - self.remote_host, self.remotepath = parts - def needed_commits(self, remote, local, needed): parent = local if remote == '0' * 64: @@ -579,6 +599,41 @@ class OSTreeReceiver(object): return 0 +# check_push_connection() +# +# Test that we can connect to the remote bst-artifact-receive program. +# We don't want to make the user wait until the first artifact has been built +# to discover that they actually cannot push. +# +# Args: +# remote: The ssh remote url to push to +# remote_port: The ssh port at the remote url +# +# Raises: +# PushException if there was an issue connecting to the remote. +def check_push_connection(remote, remote_port): + remote_host, remote_user, remote_repo, remote_port = parse_remote_location(remote, remote_port) + ssh_cmd = ssh_commandline(remote_host, remote_user, remote_port) + + # We need a short timeout here because if 'remote' isn't reachable at + # all, the process will hang until the connection times out. + ssh_cmd += ['-oConnectTimeout=3'] + + ssh_cmd += ['bst-artifact-receive', remote_repo] + + ssh = subprocess.Popen(ssh_cmd, stdin=subprocess.PIPE, + stdout=subprocess.PIPE, stderr=subprocess.PIPE) + + writer = PushMessageWriter(ssh.stdin) + writer.send_hello() + writer.send_done() + + ssh.wait() + if ssh.returncode != 0: + error = ssh.stderr.read().decode('unicode-escape') + raise PushException(error) + + # push() # # Run the pusher in process, with logging going to the output file diff --git a/buildstream/_pipeline.py b/buildstream/_pipeline.py index 960581a5c0ac3437958b9c0733461ea19ead436e..64ce4d6793685e6a157c35f6a25f3e0cde5496fe 100644 --- a/buildstream/_pipeline.py +++ b/buildstream/_pipeline.py @@ -19,6 +19,7 @@ # Tristan Van Berkom <tristan.vanberkom@codethink.co.uk> # Jürg Billeter <juerg.billeter@codethink.co.uk> +import datetime import os import stat import shlex @@ -282,6 +283,21 @@ class Pipeline(): return element + # Internal: If a remote artifact cache is configured for pushing, check that it + # actually works. + def assert_remote_artifact_cache(self): + if self.artifacts.can_push(): + starttime = datetime.datetime.now() + self.message(self.target, MessageType.START, "Checking connectivity to remote artifact cache") + try: + self.artifacts.preflight() + except _ArtifactError as e: + self.message(self.target, MessageType.FAIL, str(e), + elapsed=datetime.datetime.now() - starttime) + raise PipelineError() + self.message(self.target, MessageType.SUCCESS, "Connectivity OK", + elapsed=datetime.datetime.now() - starttime) + ############################################################# # Commands # ############################################################# @@ -391,6 +407,8 @@ class Pipeline(): detail="\n".join([el + "-" + str(src) for el, src, _ in self.unused_workspaces])) + self.assert_remote_artifact_cache() + if build_all or track_first: plan = list(self.dependencies(Scope.ALL)) else: