Commit 5f094da3 authored by Qinusty's avatar Qinusty Committed by Qinusty
Browse files

Overhaul internal messaging API

_context.py: Added helpers for the majority of MessageTypes,
start, failure and success have been avoided to avoid temptation to use
these throughout the code base. These should be called primarily from
timed_activity(). report_unhandled_exceptions() has been added as a
context manager to reduce the amount of places BUG messages are
generated from.

plugin.py: Added skipped() to the API.

job.py, elementjob.py: Add helper functions to populate task_id kwarg. _fail() exists
purely to be overridden by ElementJob as it requires the scheduler kwarg
to be set to True.

Other: Switched to use _context.py message helpers throughout the code
base.
parent 565d3aab
Loading
Loading
Loading
Loading
+0 −1
Original line number Diff line number Diff line
@@ -23,7 +23,6 @@ from collections import Mapping, namedtuple

from ..element_enums import _KeyStrength
from .._exceptions import ArtifactError, ImplError, LoadError, LoadErrorReason
from .._message import Message, MessageType
from .. import utils
from .. import _yaml

+2 −5
Original line number Diff line number Diff line
@@ -34,7 +34,6 @@ from .._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
from .._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc

from .._message import MessageType, Message
from .. import _signals, utils
from .._exceptions import ArtifactError

@@ -352,12 +351,10 @@ class CASCache(ArtifactCache):
                    raise ArtifactError("Failed to push artifact {}: {}".format(refs, e), temporary=True) from e

            if skipped_remote:
                self.context.message(Message(
                    None,
                    MessageType.SKIPPED,
                self.context.skipped(
                    "Remote ({}) already has {} cached".format(
                        remote.spec.url, element._get_brief_display_key())
                ))
                )
        return pushed

    ################################################
+64 −30
Original line number Diff line number Diff line
@@ -19,6 +19,7 @@

import os
import datetime
import traceback
from collections import deque, Mapping
from contextlib import contextmanager
from . import utils
@@ -318,7 +319,7 @@ class Context():
    # the context.
    #
    # The message handler should have the same signature as
    # the message() method
    # the _send_message() method
    def set_message_handler(self, handler):
        self._message_handler = handler

@@ -333,9 +334,9 @@ class Context():
                return True
        return False

    # message():
    # _send_message():
    #
    # Proxies a message back to the caller, this is the central
    # Proxies a message back through the message handler, this is the central
    # point through which all messages pass.
    #
    # Args:
@@ -363,6 +364,8 @@ class Context():
    # final message.
    #
    def _message(self, text, *, plugin=None, msg_type=None, **kwargs):
        assert msg_type is not None

        if isinstance(plugin, Plugin):
            plugin_id = plugin._get_unique_id()
        else:
@@ -370,35 +373,55 @@ class Context():

        self._send_message(Message(plugin_id, msg_type, str(text), **kwargs))

    def start(self, text, *, plugin=None, **kwargs):
        self._message(text, plugin=plugin, msg_type=MessageType.START, **kwargs)

    def success(self, text, *, plugin=None, **kwargs):
        self._message(text, plugin=plugin, msg_type=MessageType.SUCCESS, **kwargs)

    def failure(self, text, *, plugin=None, **kwargs):
        self._message(text, plugin=plugin, msg_type=MessageType.FAIL, **kwargs)

    def debug(self, text, *, plugin=None, **kwargs):
        self._message(text, plugin=plugin, msg_type=MessageType.DEBUG, **kwargs)
    # skipped():
    #
    # Produce and send a skipped message through the context.
    #
    def skipped(self, text, **kwargs):
        self._message(text, msg_type=MessageType.SKIPPED, **kwargs)

    def status(self, text, *, plugin=None, **kwargs):
        self._message(text, plugin=plugin, msg_type=MessageType.STATUS, **kwargs)
    # debug():
    #
    # Produce and send a debug message through the context.
    #
    def debug(self, text, **kwargs):
        if self.log_debug:
            self._message(text, msg_type=MessageType.DEBUG, **kwargs)

    def info(self, text, *, plugin=None, **kwargs):
        self._message(text, plugin=plugin, msg_type=MessageType.INFO, **kwargs)
    # status():
    #
    # Produce and send a status message through the context.
    #
    def status(self, text, **kwargs):
        self._message(text, msg_type=MessageType.STATUS, **kwargs)

    def warn(self, text, *, plugin=None, **kwargs):
        self._message(text, plugin=plugin, msg_type=MessageType.WARN, **kwargs)
    # info():
    #
    # Produce and send a info message through the context.
    #
    def info(self, text, **kwargs):
        self._message(text, msg_type=MessageType.INFO, **kwargs)

    def error(self, text, *, plugin=None, **kwargs):
        self._message(text, plugin=plugin, msg_type=MessageType.ERROR, **kwargs)
    # warn():
    #
    # Produce and send a warning message through the context.
    #
    def warn(self, text, **kwargs):
        self._message(text, msg_type=MessageType.WARN, **kwargs)

    def bug(self, text, *, plugin=None, **kwargs):
        self._message(text, plugin=plugin, msg_type=MessageType.BUG, **kwargs)
    # error():
    #
    # Produce and send a error message through the context.
    #
    def error(self, text, **kwargs):
        self._message(text, msg_type=MessageType.ERROR, **kwargs)

    def log(self, text, *, plugin=None, **kwargs):
        self._message(text, plugin=plugin, msg_type=MessageType.LOG, **kwargs)
    # log():
    #
    # Produce and send a log message through the context.
    #
    def log(self, text, **kwargs):
        self._message(text, msg_type=MessageType.LOG, **kwargs)

    # silence()
    #
@@ -415,6 +438,14 @@ class Context():
        finally:
            self._pop_message_depth()

    @contextmanager
    def report_unhandled_exceptions(self, brief="An unhandled exception occured", *, unique_id=None, **kwargs):
        try:
            yield
        except Exception:  # pylint: disable=broad-except
            self._message(brief, plugin=unique_id, detail=traceback.format_exc(),
                          msg_type=MessageType.BUG, **kwargs)

    # timed_activity()
    #
    # Context manager for performing timed activities and logging those
@@ -444,7 +475,8 @@ class Context():
        with _signals.suspendable(stop_time, resume_time):
            try:
                # Push activity depth for status messages
                self.start(activity_name, detail=detail, plugin=unique_id)
                self._message(activity_name, detail=detail, plugin=unique_id,
                              msg_type=MessageType.START)
                self._push_message_depth(silent_nested)
                yield

@@ -453,13 +485,15 @@ class Context():
                # expects an error when there is an error.
                elapsed = datetime.datetime.now() - starttime
                self._pop_message_depth()
                self.failure(activity_name, detail=detail, elapsed=elapsed, plugin=unique_id)
                self._message(activity_name, detail=detail, elapsed=elapsed, plugin=unique_id,
                              msg_type=MessageType.FAIL)
                raise

            elapsed = datetime.datetime.now() - starttime
            self._pop_message_depth()
            self.success(activity_name, detail=detail,
                         elapsed=elapsed, plugin=unique_id)
            self._message(activity_name, detail=detail,
                          elapsed=elapsed, plugin=unique_id,
                          msg_type=MessageType.SUCCESS)

    # recorded_messages()
    #
+5 −3
Original line number Diff line number Diff line
@@ -255,7 +255,7 @@ class App():

        # Mark the beginning of the session
        if session_name:
            self.context.start(session_name)
            self.context._message(session_name, msg_type=MessageType.START)

        # Run the body of the session here, once everything is loaded
        try:
@@ -269,7 +269,7 @@ class App():
                if isinstance(e, StreamError) and e.terminated:  # pylint: disable=no-member
                    self.context.warn(session_name + ' Terminated', elapsed=elapsed)
                else:
                    self.context.failure(session_name, elapsed=elapsed)
                    self.context._message(session_name, elapsed=elapsed, msg_type=MessageType.FAIL)

                    # Notify session failure
                    self._notify("{} failed".format(session_name), "{}".format(e))
@@ -287,7 +287,7 @@ class App():
        else:
            # No exceptions occurred, print session time and summary
            if session_name:
                self.context.success(session_name, elapsed=self.stream.elapsed_time)
                self.context._message(session_name, elapsed=self.stream.elapsed_time, msg_type=MessageType.SUCCESS)
                if self._started:
                    self._print_summary()

@@ -439,6 +439,8 @@ class App():

        # Print the regular BUG message
        formatted = "".join(traceback.format_exception(etype, value, tb))
        self.context._message(str(value), detail=formatted, msg_type=MessageType.BUG)

        # If the scheduler has started, try to terminate all jobs gracefully,
        # otherwise exit immediately.
        if self.stream.running:
+0 −1
Original line number Diff line number Diff line
@@ -24,7 +24,6 @@ import itertools
from operator import itemgetter

from ._exceptions import PipelineError
from ._message import Message, MessageType
from ._profile import Topics, profile_start, profile_end
from . import Scope, Consistency
from ._project import ProjectRefStorage
Loading