Commit d835c37f authored by Tristan Van Berkom's avatar Tristan Van Berkom

Refactor: Use new logging mechanism from Context

  o element.py / plugin.py: Removed supporting logging code, and derive
                            the log handle from Context.

  o _scheduler/scheduler.py, _scheduler/queues/queue.py: Adapt to new Job initializer API for the logfile

  o _scheduler/jobs/job.py: Run job activities within the new context manager
                            which turns on logging for a job. Also removed a lot
			    of custom logging abstract methods which are unneeded.

  o _scheduler/jobs: Job implementations need not implement so much custom logging.
parent a950a985
Pipeline #25998268 canceled with stages
in 4 minutes and 6 seconds
......@@ -16,12 +16,8 @@
# Author:
# Tristan Daniël Maat <tristan.maat@codethink.co.uk>
#
import os
from contextlib import contextmanager
from .job import Job
from ..._platform import Platform
from ..._message import Message, MessageType
class CacheSizeJob(Job):
......@@ -38,45 +34,5 @@ class CacheSizeJob(Job):
if self._complete_cb:
self._complete_cb(result)
# child_logging_enabled()
#
# Context manager which turns on logging for the duration of
# this job's child-side activity.
#
# Args:
#    logfile (str): A template string for the logfile path,
#                   containing a "{pid}" placeholder.
#
# Yields:
#    (str): The logfile path with {pid} replaced by this process' pid
#
@contextmanager
def child_logging_enabled(self, logfile):
    self._logfile = logfile.format(pid=os.getpid())
    try:
        yield self._logfile
    finally:
        # Reset even if the managed body raises, so a stale logfile
        # path is never left behind on the job.
        self._logfile = None
def message(self, message_type, message, **kwargs):
    # Forward a message to the frontend through the scheduler's
    # context, tagging it as originating from the scheduler.
    forwarded = dict(kwargs, scheduler=True)
    self._scheduler.context.message(
        Message(None, message_type, message, **forwarded))
# child_log()
#
# Append *message* to this job's logfile with a "[HH:MM:SS]" prefix;
# SUCCESS/FAIL messages get the real elapsed time, all other message
# types get a placeholder. Returns the message unchanged.
def child_log(self, message):
with open(self._logfile, 'a+') as log:
INDENT = "    "
EMPTYTIME = "--:--:--"
template = "[{timecode: <8}] {type: <7} {name: <15}: {message}"
detail = ''
if message.detail is not None:
# Optional detail block is appended after a blank line, indented
template += "\n\n{detail}"
detail = message.detail.rstrip('\n')
detail = INDENT + INDENT.join(detail.splitlines(True))
timecode = EMPTYTIME
# Only SUCCESS/FAIL messages carry a meaningful elapsed time
if message.message_type in (MessageType.SUCCESS, MessageType.FAIL):
hours, remainder = divmod(int(message.elapsed.total_seconds()), 60**2)
minutes, seconds = divmod(remainder, 60)
timecode = "{0:02d}:{1:02d}:{2:02d}".format(hours, minutes, seconds)
message_text = template.format(timecode=timecode,
type=message.message_type.upper(),
name='cache_size',
message=message.message,
detail=detail)
log.write('{}\n'.format(message_text))
log.flush()
return message
def child_process_data(self):
    # Cache-size jobs have no extra data to hand back to the parent.
    return dict()
......@@ -16,12 +16,8 @@
# Author:
# Tristan Daniël Maat <tristan.maat@codethink.co.uk>
#
import os
from contextlib import contextmanager
from .job import Job
from ..._platform import Platform
from ..._message import Message
class CleanupJob(Job):
......@@ -38,26 +34,5 @@ class CleanupJob(Job):
if self._complete_cb:
self._complete_cb()
# child_logging_enabled()
#
# Context manager which turns on logging for the duration of
# this job's child-side activity.
#
# Args:
#    logfile (str): A template string for the logfile path,
#                   containing a "{pid}" placeholder.
#
# Yields:
#    (str): The logfile path with {pid} replaced by this process' pid
#
@contextmanager
def child_logging_enabled(self, logfile):
    self._logfile = logfile.format(pid=os.getpid())
    try:
        yield self._logfile
    finally:
        # Reset even if the managed body raises, so a stale logfile
        # path is never left behind on the job.
        self._logfile = None
def message(self, message_type, message, **kwargs):
    # Forward a message to the frontend through the scheduler's
    # context, tagging it as originating from the scheduler.
    forwarded = dict(kwargs, scheduler=True)
    self._scheduler.context.message(
        Message(None, message_type, message, **forwarded))
def child_log(self, message):
    # Tag the message with this job's action name, then append the
    # decorated text to the logfile and hand the message back.
    message.action_name = self.action_name
    with open(self._logfile, 'a+') as log:
        text = self.decorate_message(message, '[cleanup]')
        log.write(text + '\n')
        log.flush()
    return message
def child_process_data(self):
    # Cleanup jobs have no extra data to hand back to the parent.
    return dict()
......@@ -16,14 +16,9 @@
# Author:
# Tristan Daniël Maat <tristan.maat@codethink.co.uk>
#
import os
from contextlib import contextmanager
from ruamel import yaml
from ..._message import Message, MessageType
from ...plugin import _plugin_lookup
from ... import _signals
from .job import Job
......@@ -77,44 +72,30 @@ class ElementJob(Job):
self._action_cb = action_cb # The action callable function
self._complete_cb = complete_cb # The complete callable function
# Set the task wide ID for logging purposes
self.set_task_id(element._get_unique_id())
# element
#
# The Element instance this job is acting on.
#
@property
def element(self):
return self._element
def child_process(self):
    # Dump the element's build environment at the top of the log.
    #
    # This should probably be omitted for non-build tasks but it's
    # harmless here.
    environment = self._element.get_environment()
    dumped = yaml.round_trip_dump(environment, default_flow_style=False, allow_unicode=True)
    self.message(MessageType.LOG,
                 "Build environment for element {}".format(self._element.name),
                 detail=dumped)

    # Run the action callback on the element and return its result
    return self._action_cb(self._element)
# parent_complete()
#
# Invoked in the parent process once the job has completed,
# forwarding completion to the registered callback.
#
# NOTE(review): the 'result' argument is ignored here and
# self._result is passed to the callback instead — confirm this
# is intentional.
def parent_complete(self, success, result):
self._complete_cb(self, self._element, success, self._result)
# child_logging_enabled()
#
# Context manager which turns on logging for this element job,
# directing the element's log handle at the job's logfile and
# printing the start message.
#
# Args:
#    logfile (str): A template string for the logfile path,
#                   containing a "{pid}" placeholder.
#
# Yields:
#    (str): The logfile path with {pid} replaced by this process' pid
#
@contextmanager
def child_logging_enabled(self, logfile):
    self._logfile = logfile.format(pid=os.getpid())
    with open(self._logfile, 'a') as log:

        # Write one last line to the log and flush it to disk
        def flush_log():

            # If the process currently had something happening in the I/O stack
            # then trying to reenter the I/O stack will fire a runtime error.
            #
            # So just try to flush as well as we can at SIGTERM time
            try:
                # FIXME: Better logging
                log.write('\n\nAction {} for element {} forcefully terminated\n'
                          .format(self.action_name, self._element.name))
                log.flush()
            except RuntimeError:
                os.fsync(log.fileno())

        self._element._set_log_handle(log)
        try:
            with _signals.terminator(flush_log):
                self._print_start_message(self._element, self._logfile)
                yield self._logfile
        finally:
            # Always detach the log handle and reset the logfile path,
            # even if the job body raised, to avoid stale state.
            self._element._set_log_handle(None)
            self._logfile = None
def message(self, message_type, message, **kwargs):
args = dict(kwargs)
args['scheduler'] = True
......@@ -124,34 +105,6 @@ class ElementJob(Job):
message,
**args))
def _print_start_message(self, element, logfile):
    # Announce the start of the action, then dump the element's
    # build environment at the top of its log file.
    self.message(MessageType.START, self.action_name, logfile=logfile)

    # This should probably be omitted for non-build tasks but it's
    # harmless here.
    environment = element.get_environment()
    dumped = yaml.round_trip_dump(environment, default_flow_style=False, allow_unicode=True)
    self.message(MessageType.LOG,
                 "Build environment for element {}".format(element.name),
                 detail=dumped, logfile=logfile)
def child_log(self, message):
    # Tag the message with this task's identity on the way out the door
    message.action_name = self.action_name
    message.task_id = self._element._get_unique_id()

    # Log through the plugin which owns the task at large, not
    # whichever plugin happens to be acting on its behalf.
    plugin = _plugin_lookup(message.task_id)
    with plugin._output_file() as output:
        text = self.decorate_message(message, '[{}]'.format(plugin.name))
        output.write(text + '\n')
        output.flush()

    return message
def child_process_data(self):
data = {}
......
......@@ -27,13 +27,12 @@ import datetime
import traceback
import asyncio
import multiprocessing
from contextlib import contextmanager
import psutil
# BuildStream toplevel imports
from ..._exceptions import ImplError, BstError, set_last_task_error
from ..._message import MessageType, unconditional_messages
from ..._message import Message, MessageType, unconditional_messages
from ... import _signals, utils
......@@ -113,6 +112,7 @@ class Job():
self._result = None # Return value of child action in the parent
self._tries = 0 # Try count, for retryable jobs
self._logfile = logfile
self._task_id = None
# spawn()
#
......@@ -247,6 +247,24 @@ class Job():
os.kill(self._process.pid, signal.SIGCONT)
self._suspended = False
# set_task_id()
#
# Called by Job subclasses to associate a plugin ID with the task
# at large (if any element is related to the task).
#
# The task ID keeps frontend messages coherent when multiple
# plugins log within a single task (e.g. integration commands run
# while building an element should appear in the frontend under the
# element being built, not under the element supplying the commands).
#
# Args:
#    (int): The plugin identifier for this task
#
def set_task_id(self, task_id):
    self._task_id = task_id
#######################################################
# Abstract Methods #
#######################################################
......@@ -277,28 +295,10 @@ class Job():
raise ImplError("Job '{kind}' does not implement child_process()"
.format(kind=type(self).__name__))
# child_logging_enabled()
#
# Abstract method: start the log for this job. Implementations are
# given a template string for the logfile path which contains a
# "{pid}" placeholder that must be substituted with the current
# process' PID (i.e. `logfile.format(pid=os.getpid())`).
#
# Args:
#    logfile (str): A template string pointing at the logfile to
#                   use — replace {pid} first.
#
# Yields:
#    (str) The path to the logfile with {pid} replaced.
#
@contextmanager
def child_logging_enabled(self, logfile):
    template = "Job '{kind}' does not implement child_logging_enabled()"
    raise ImplError(template.format(kind=type(self).__name__))
# message():
#
# Sends a message to the frontend
# Logs a message, this will be logged in the task's logfile and
# conditionally also be sent to the frontend.
#
# Args:
# message_type (MessageType): The type of message to send
......@@ -306,8 +306,9 @@ class Job():
# kwargs: Remaining Message() constructor arguments
#
def message(self, message_type, message, **kwargs):
    # Log a message in the task's logfile and conditionally forward
    # it to the frontend, tagged as a scheduler message.
    forwarded = dict(kwargs, scheduler=True)
    self._scheduler.context.message(
        Message(None, message_type, message, **forwarded))
# child_process_data()
#
......@@ -323,24 +324,6 @@ class Job():
def child_process_data(self):
    # Base implementation: nothing to hand back to the parent process.
    return dict()
# child_log()
#
# Abstract method: log a message returned by the frontend's main
# message handler and return it to the main process.
#
# Implementations are also expected to add process-specific
# information to the message (notably, action_name and task_id).
#
# Arguments:
#    message (str): The message to log
#
# Returns:
#    message (Message): A message object
#
def child_log(self, message):
    template = "Job '{kind}' does not implement child_log()"
    raise ImplError(template.format(kind=type(self).__name__))
#######################################################
# Local Private Methods #
#######################################################
......@@ -353,42 +336,6 @@ class Job():
#
#######################################################
# decorate_message()
#
# Format a message so that it can be used nicely for logging
# purposes. This will prepend a time code and add other
# information to help determine what happened.
#
# Args:
#    message (Message) - The message to create a text from.
#    name (str) - A name for the executing context.
#
# Returns:
#    (str) The text to log.
#
def decorate_message(self, message, name):
INDENT = "    "
EMPTYTIME = "--:--:--"
template = "[{timecode: <8}] {type: <7} {name: <15}: {message}"
detail = ''
if message.detail is not None:
# Optional detail block is appended after a blank line, indented
template += "\n\n{detail}"
detail = message.detail.rstrip('\n')
detail = INDENT + INDENT.join(detail.splitlines(True))
timecode = EMPTYTIME
# Only SUCCESS/FAIL messages carry a meaningful elapsed time;
# everything else gets the "--:--:--" placeholder.
if message.message_type in (MessageType.SUCCESS, MessageType.FAIL):
hours, remainder = divmod(int(message.elapsed.total_seconds()), 60**2)
minutes, seconds = divmod(remainder, 60)
timecode = "{0:02d}:{1:02d}:{2:02d}".format(hours, minutes, seconds)
return template.format(timecode=timecode,
type=message.message_type.upper(),
name=name,
message=message.message,
detail=detail)
# _child_action()
#
# Perform the action in the child process, this calls the action_cb.
......@@ -398,8 +345,6 @@ class Job():
#
def _child_action(self, queue):
logfile = self._logfile
# This avoids some SIGTSTP signals from grandchildren
# getting propagated up to the master process
os.setsid()
......@@ -434,7 +379,9 @@ class Job():
# Time, log and and run the action function
#
with _signals.suspendable(stop_time, resume_time), \
self.child_logging_enabled(logfile) as filename:
self._scheduler.context.recorded_messages(self._logfile) as filename:
self.message(MessageType.START, self.action_name, logfile=filename)
try:
# Try the task action
......@@ -545,8 +492,8 @@ class Job():
#
def _child_message_handler(self, message, context):
# Log first
message = self.child_log(message)
message.action_name = self.action_name
message.task_id = self._task_id
if message.message_type == MessageType.FAIL and self._tries <= self._max_retries:
# Job will be retried, display failures as warnings in the frontend
......
......@@ -353,13 +353,8 @@ class Queue():
def _element_log_path(self, element):
project = element._get_project()
context = element._get_context()
key = element._get_display_key()[1]
action = self.action_name.lower()
logfile = "{key}-{action}.{{pid}}.log".format(key=key, action=action)
directory = os.path.join(context.logdir, project.name, element.normal_name)
logfile = "{key}-{action}".format(key=key, action=action)
os.makedirs(directory, exist_ok=True)
return os.path.join(directory, logfile)
return os.path.join(project.name, element.normal_name, logfile)
......@@ -317,8 +317,7 @@ class Scheduler():
if cache_size and cache_size < self.context.cache_quota:
return
logpath = os.path.join(self.context.logdir, 'cleanup.{pid}.log')
job = CleanupJob(self, 'cleanup', logpath,
job = CleanupJob(self, 'cleanup', 'cleanup',
resources=[ResourceType.CACHE,
ResourceType.PROCESS],
exclusive_resources=[ResourceType.CACHE],
......@@ -326,8 +325,7 @@ class Scheduler():
self.schedule_jobs([job])
def _check_cache_size_real(self):
logpath = os.path.join(self.context.logdir, 'cache_size.{pid}.log')
job = CacheSizeJob(self, 'cache_size', logpath,
job = CacheSizeJob(self, 'cache_size', 'cache_size',
resources=[ResourceType.CACHE,
ResourceType.PROCESS],
exclusive_resources=[ResourceType.CACHE],
......
......@@ -219,7 +219,6 @@ class Element(Plugin):
self.__tracking_scheduled = False # Sources are scheduled to be tracked
self.__tracking_done = False # Sources have been tracked
self.__pull_done = False # Whether pull was attempted
self.__log_path = None # Path to dedicated log file or None
self.__splits = None # Resolved regex objects for computing split domains
self.__whitelist_regex = None # Resolved regex object to check if file is allowed to overlap
self.__staged_sources_directory = None # Location where Element.stage_sources() was called
......@@ -1501,8 +1500,9 @@ class Element(Plugin):
utils.link_files(collectdir, filesdir)
# Copy build log
if self.__log_path:
shutil.copyfile(self.__log_path, os.path.join(logsdir, 'build.log'))
log_filename = context.get_log_filename()
if log_filename:
shutil.copyfile(log_filename, os.path.join(logsdir, 'build.log'))
# Store public data
_yaml.dump(_yaml.node_sanitize(self.__dynamic_public), os.path.join(metadir, 'public.yaml'))
......@@ -1837,47 +1837,6 @@ class Element(Plugin):
# _subst_string()
#
# Substitute this element's variables into *value* and return the
# resulting string.
def _subst_string(self, value):
return self.__variables.subst(value)
# Run some element methods with logging directed to
# a dedicated log file, here we yield the filename
# we decided on for logging
#
@contextmanager
def _logging_enabled(self, action_name):
    self.__log_path = self.__logfile(action_name)
    with open(self.__log_path, 'a') as logfile:

        # Write one last line to the log and flush it to disk
        def flush_log():

            # If the process currently had something happening in the I/O stack
            # then trying to reenter the I/O stack will fire a runtime error.
            #
            # So just try to flush as well as we can at SIGTERM time
            try:
                logfile.write('\n\nAction {} for element {} forcefully terminated\n'
                              .format(action_name, self.name))
                logfile.flush()
            except RuntimeError:
                os.fsync(logfile.fileno())

        self._set_log_handle(logfile)
        try:
            with _signals.terminator(flush_log):
                yield self.__log_path
        finally:
            # Always detach the log handle and reset the log path, even
            # if the managed body raised, to avoid stale state.
            self._set_log_handle(None)
            self.__log_path = None
# Override plugin _set_log_handle(), set it for our sources and dependencies too
#
# A log handle is set once in the context of a child task which will have only
# one log, so it's not harmful to modify the state of dependencies
#
# Args:
#    logfile: An open log file handle, or None to unset it
#    recurse (bool): Whether to also propagate the handle to all
#                    dependencies of this element
#
def _set_log_handle(self, logfile, recurse=True):
super()._set_log_handle(logfile)
for source in self.sources():
source._set_log_handle(logfile)
if recurse:
# Pass recurse=False to dependencies: Scope.ALL already covers
# the whole transitive closure, so no further recursion needed.
for dep in self.dependencies(Scope.ALL):
dep._set_log_handle(logfile, False)
# Returns the element whose sources this element is ultimately derived from.
#
# This is intended for being used to redirect commands that operate on an
......@@ -2015,43 +1974,6 @@ class Element(Plugin):
if workspace:
workspace.prepared = True
# __logfile()
#
# Compose the log file path for this action & pid.
#
# Args:
#    action_name (str): The action name
#    pid (int): Optional pid, current pid is assumed if not provided.
#
# Returns:
#    (string): The log file full path
#
# When there is a cache key, the log file format is:
#
#     '{logdir}/{project}/{element}/{cachekey}-{action}.{pid}.log'
#
# Otherwise, it is:
#
#     '{logdir}/{project}/{element}/{:0<64}-{action}.{pid}.log'
#
# This matches the order in which things are stored in the artifact cache
#
def __logfile(self, action_name, pid=None):
    project = self._get_project()
    context = self._get_context()
    key = self.__get_brief_display_key()
    pid = os.getpid() if pid is None else pid

    filename = "{key}-{action}.{pid}.log".format(
        key=key, action=action_name.lower(), pid=pid)
    directory = os.path.join(context.logdir, project.name, self.normal_name)
    os.makedirs(directory, exist_ok=True)
    return os.path.join(directory, filename)
# __assert_cached()
#
# Raises an error if the artifact is not cached.
......
......@@ -162,7 +162,6 @@ class Plugin():
self.__provenance = provenance # The Provenance information
self.__type_tag = type_tag # The type of plugin (element or source)
self.__unique_id = _plugin_register(self) # Unique ID
self.__log = None # The log handle when running a task
# Infer the kind identifier
modulename = type(self).__module__
......@@ -474,6 +473,7 @@ class Plugin():
self.call(... command which takes time ...)
"""
with self.__context.timed_activity(activity_name,
unique_id=self.__unique_id,
detail=detail,
silent_nested=silent_nested):
yield
......@@ -589,27 +589,18 @@ class Plugin():
# _get_provenance()
#
# Accessor for the Provenance information describing where this
# plugin was declared.
def _get_provenance(self):
return self.__provenance
# Accessor for logging handle
#
# NOTE(review): the 'log' parameter is unused and appears to be a
# copy/paste from the mutator below — confirm callers pass no argument
# before removing it.
def _get_log_handle(self, log):
return self.__log
# Mutator for logging handle
#
# Args:
#    log: An open log file handle used while running a task, or None
#         to unset it
def _set_log_handle(self, log):
self.__log = log
# Context manager for getting the open file handle to this
# plugin's log. Used in the child context to add stuff to
# a log.
#
@contextmanager
def _output_file(self):
if not self.__log:
log = self.__context.get_log_handle()
if log is None:
with open(os.devnull, "w") as output:
yield output
else:
yield self.__log
yield log
# _preflight():
# Calls preflight() for the plugin, and allows generic preflight
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment