Commit 944d49d2 authored by segfault's avatar segfault

Implement systemd status monitoring

The only D-Bus implementation which supports communicating with buses in
containers is systemd's sd-bus, which unfortunately doesn't have Python
bindings (yet). So this commit uses the `busctl` tool to monitor the
systemd services in the containers, and parses its output. The parser is
very limited and probably error-prone. We might want to replace it
once there are Python bindings for sd-bus.

There is currently no way to recover if a service stops unexpectedly,
except by restarting onionkit.
parent b57c810d
......@@ -38,3 +38,7 @@ class OptionNotFoundError(Exception):
class FileNotEmptyError(Exception):
pass
class MonitorError(Exception):
pass
......@@ -23,6 +23,7 @@ from onionkit.options.allow_lan import AllowLAN
from onionkit.exceptions import ServiceNotInstalledError
from onionkit.exceptions import ServiceAlreadyStartedError
from onionkit.exceptions import MonitorError
from onionkit.systemd import SystemdManager
from onionkit.tor import TorManager
......@@ -109,12 +110,20 @@ class OnionService(DBusObject, metaclass=abc.ABCMeta):
self._address = self.private_key.derive_onion_address() if self.IsInstalled else str()
self.tor = TorManager(self.update_transaction_progress)
self.systemd = SystemdManager(self.Name)
self.systemd = SystemdManager(self.Name, self.systemd_service, self.update_status)
self.container = ContainerManager(self.Name, self.state_dir, self.update_transaction_status,
self.update_transaction_progress)
self.set_initial_status()
try:
if self.container.is_running():
self.systemd.start_monitoring()
except MonitorError as e:
# Don't abort initialization if monitoring fails, but set an error status
self.Status = Status.ERROR
logger.exception(e)
if self.IsInstalled:
self.initialize_options()
......@@ -198,8 +207,10 @@ class OnionService(DBusObject, metaclass=abc.ABCMeta):
self.TransactionStatus = _("Bind-mounting config files")
self.mount()
self.systemd.start_monitoring()
if not self.IsRunning:
self.systemd.restart(self.systemd_service)
self.systemd.restart()
self.on_systemd_service_started()
......@@ -211,7 +222,7 @@ class OnionService(DBusObject, metaclass=abc.ABCMeta):
except Exception:
logger.warning("Handling error in service.Start")
if self.IsRunning:
self.systemd.stop(self.systemd_service)
self.systemd.stop()
if self.IsPublished:
self.tor.stop_hidden_service(self.Address)
self.Status = Status.ERROR
......@@ -222,8 +233,10 @@ class OnionService(DBusObject, metaclass=abc.ABCMeta):
logger.info("Stopping service %r", self.Name)
self.Status = Status.STOPPING
if self.IsRunning:
self.systemd.stop(self.systemd_service)
self.systemd.stop_monitoring()
if self.container.is_running():
self.container.stop()
if self.IsPublished:
self.tor.stop_hidden_service(self.Address)
......@@ -258,14 +271,22 @@ class OnionService(DBusObject, metaclass=abc.ABCMeta):
@Status.setter
def Status(self, value: str):
if self._status == value:
return
logger.info("Service %r new Status: %r", self.Name, value)
changed_properties = dict()
if self._status != value:
self._status = changed_properties["Status"] = value
# Unset the transaction status, because the transaction must be finished if we have a new status
if self._transaction_status:
self._transaction_status = changed_properties["TransactionStatus"] = ""
if changed_properties:
self.emit_signal("org.freedesktop.DBus.Properties", "PropertiesChanged", changed_properties, "a{ss}")
# Don't accidentally overwrite error status
if self._status == Status.ERROR and value != Status.UNINSTALLING:
logger.warning("Not setting new status %r, because the current status is ERROR")
return
self._status = changed_properties["Status"] = value
self.emit_signal("org.freedesktop.DBus.Properties", "PropertiesChanged", changed_properties, "a{ss}")
@property
def TransactionStatus(self) -> str:
......@@ -307,7 +328,7 @@ class OnionService(DBusObject, metaclass=abc.ABCMeta):
@property
def IsRunning(self) -> bool:
return self.container.is_running() and self.systemd.is_running(self.systemd_service)
return self.container.is_running() and self.systemd.is_running()
@property
def IsPublished(self) -> bool:
......@@ -430,6 +451,9 @@ class OnionService(DBusObject, metaclass=abc.ABCMeta):
# ----- Not exported functions ----- #
def update_status(self, status):
self.Status = status
def update_transaction_status(self, status):
self.TransactionStatus = status
......@@ -466,7 +490,7 @@ class OnionService(DBusObject, metaclass=abc.ABCMeta):
def restart_systemd_service(self):
self.Status = Status.RESTARTING
self.systemd.restart(self.systemd_service)
self.systemd.restart()
self.Status = Status.RUNNING
def initialize_options(self):
......
from logging import getLogger
import sh
import subprocess
from threading import Thread
from onionkit.exceptions import MonitorError
from onionkit.service_status import Status
logger = getLogger()
STOP_WATCHER_TIMEOUT = 1
class SystemdManager(object):
"""Helper to perform various actions on systemd services via DBus calls to systemd.
Reference for systemd DBus API: https://www.freedesktop.org/wiki/Software/systemd/dbus/"""
def __init__(self, service_name):
self.name = "onionkit-" + service_name
def __init__(self, name: str, service: str, status_callback: callable):
self.container = "onionkit-" + name
self.service = service
self.status_callback = status_callback
# Name for identifying the service, only used in log messages
self.id = '%s/%s' % (self.container, self.service)
self.monitor = None # type: subprocess.Popen
self.parser = None # type: Thread
def restart(self, service_name: str):
sh.systemctl("-M", self.name, "restart", service_name)
def restart(self):
sh.systemctl("-M", self.container, "restart", self.service)
def stop(self, service_name: str):
sh.systemctl("-M", self.name, "stop", service_name)
def stop(self):
sh.systemctl("-M", self.container, "stop", self.service)
def is_running(self, service_name: str) -> bool:
def is_running(self) -> bool:
try:
sh.machinectl("show", self.name)
sh.machinectl("show", self.container)
except sh.ErrorReturnCode_1:
# Container is not running
return False
out = sh.systemctl("-M", self.name, "show", "-p", "ActiveState", "-p", "SubState", service_name)
lines = out.split()
active_state = lines[0].split('=')[-1]
sub_state = lines[1].split('=')[-1]
logger.debug("Service %r status: ActiveState: %r, SubState: %r", service_name, active_state, sub_state)
return sub_state == "active" and sub_state == "running"
active_state, sub_state = self._get_state()
return active_state == "active" and sub_state == "running"
def daemon_reload(self):
logger.debug("Reloading systemd manager configuration")
sh.systemctl("-M", self.name, "daemon-reload")
sh.systemctl("-M", self.container, "daemon-reload")
def start_monitoring(self):
    """Start watching the service's D-Bus property changes.

    Spawns `busctl monitor` against the container's bus, filtered to
    `PropertiesChanged` signals, and starts a daemon thread running
    `_parse_monitor_output` to consume its output.

    If a previous monitor process is still alive, it is stopped first.

    Raises:
        MonitorError: if the `busctl` process cannot be started, or if a
            still-running previous monitor cannot be stopped.
    """
    logger.debug("Start monitoring %s", self.id)

    # poll() returns None while the process is still alive, so that is the
    # "already running" case; a non-None value means it has already exited.
    if self.monitor and self.monitor.poll() is None:
        logger.warning("Monitor %s is already running. Trying to stop it first.", self.id)
        self.stop_monitoring()

    expression = "type='signal',interface='org.freedesktop.DBus.Properties',member='PropertiesChanged'"
    command = ["busctl", "-M", self.container, "--match", expression, "monitor", self.service]
    logger.debug("Starting watcher with command '%s'", ' '.join(command))
    try:
        # Text mode with bufsize=1 gives line-buffered pipes, so the parser
        # thread can consume the output line by line.
        self.monitor = subprocess.Popen(command,
                                        stdout=subprocess.PIPE,
                                        stderr=subprocess.PIPE,
                                        universal_newlines=True,
                                        bufsize=1)
    except OSError as e:
        # E.g. busctl is not installed. Report it as a monitoring failure.
        raise MonitorError("Failed to start monitor %s: %s" % (self.id, e)) from e

    self.parser = Thread(target=self._parse_monitor_output, daemon=True)
    self.parser.start()
def stop_monitoring(self):
    """Kill the running monitor process, if there is one.

    Logs a warning and returns without raising if no monitor was started
    or if the monitor process has already exited.

    Raises:
        MonitorError: if the process does not exit within
            STOP_WATCHER_TIMEOUT seconds after being killed.
    """
    logger.debug("Stopping monitor %s", self.id)
    monitor = self.monitor
    if not monitor:
        logger.warning("Cannot stop monitor %s: Monitor was not started", self.id)
        return

    exit_code = monitor.poll()
    if exit_code is not None:
        logger.warning("Cannot stop monitor %s: Monitor died with exit code %s",
                       self.id, exit_code)
        return

    # Remember which process we killed on purpose *before* killing it, so
    # the parser thread does not mistake the resulting non-zero (negative)
    # exit code for a crash and report an ERROR status.
    self._stopped_monitor = monitor
    monitor.kill()
    try:
        monitor.wait(STOP_WATCHER_TIMEOUT)
    except subprocess.TimeoutExpired as e:
        raise MonitorError("Failed to stop monitor %s (timeout: %s)" % (self.id, STOP_WATCHER_TIMEOUT)) from e

def _parse_monitor_output(self):
    """Consume the monitor's stdout until the monitor process exits.

    Runs in the parser thread. Each message (starting with a '‣' line)
    is read completely; if it mentions the "ActiveState" or "SubState"
    property, the service state is re-checked via `_update_status`.

    If the monitor exits with a non-zero exit code and was not stopped
    through `stop_monitoring`, the error is logged together with the
    monitor's stderr and `status_callback` is called with Status.ERROR.
    """
    # Bind the process this thread belongs to: self.monitor may be replaced
    # by a later start_monitoring() call while this thread is still alive.
    monitor = self.monitor
    while True:
        exit_code = monitor.poll()
        if exit_code is not None:
            stopped_on_purpose = getattr(self, "_stopped_monitor", None) is monitor
            if exit_code == 0 or stopped_on_purpose:
                # The monitor exited cleanly or because we stopped it, so we exit too
                return
            # The monitor exited with an error exit code
            logger.error("Monitor %s died with exit code %s. stderr: %s",
                         self.id, exit_code, monitor.stderr.read())
            self.status_callback(Status.ERROR)
            return

        # This blocks until there is a new line on stdout
        line = monitor.stdout.readline()
        logger.debug("Monitor %s output: %r", self.id, line)

        if not line:
            # An empty string means EOF: the process closed its stdout.
            # Wait for it to exit, then let the exit-code check above decide
            # how to handle it, instead of spinning on readline().
            monitor.wait()
            continue

        if line.startswith('‣'):
            message = self._read_message(line)
            if 'STRING "ActiveState"' in message or 'STRING "SubState"' in message:
                self._update_status()
        else:
            logger.warning("Unexpected output from monitor %s: %r", self.id, line)
def _read_message(self, first_line: str) -> str:
    """Read one complete message from the monitor's stdout.

    Args:
        first_line: the already consumed first line of the message.

    Returns:
        The message text: `first_line` plus every following line up to,
        but not including, the terminating blank line. The blank line
        itself is consumed from the stream. Reading also stops at end of
        file, in which case the lines read so far are returned.
    """
    lines = []
    line = first_line
    # readline() returns '' at EOF and '\n' for the blank line that ends a
    # message. Both must end the loop, otherwise EOF would spin forever.
    while line and line != '\n':
        lines.append(line)
        line = self.monitor.stdout.readline()
        logger.debug("Monitor %s output: %r", self.id, line)
    return ''.join(lines)
def _update_status(self):
    """Report an ERROR status if the monitored service is no longer up.

    The service counts as stopped when its ActiveState is "inactive" or
    "failed", or when its SubState is "exited". In that case the output of
    `systemctl status` is logged for debugging and `status_callback` is
    called with Status.ERROR. Otherwise nothing happens.
    """
    active, sub = self._get_state()
    still_up = active not in ("inactive", "failed") and sub != "exited"
    if still_up:
        return

    # Collect the systemctl status output purely as debugging aid.
    # `systemctl status` currently returns exit code 3 if the unit is stopped,
    # but we shouldn't rely on that, so other error exit codes are ignored too
    # and simply leave the logged output as None.
    try:
        status_output = sh.systemctl("--no-pager", "-M", self.container, "status", self.service, _ok_code=[0, 3])
    except sh.ErrorReturnCode:
        status_output = None

    logger.error("Service '%s' stopped. systemctl status: %s", self.service, status_output)
    self.status_callback(Status.ERROR)
def _get_state(self) -> (str, str):
    """Query the service's ActiveState and SubState via `systemctl show`.

    Returns:
        A tuple `(active_state, sub_state)` of the two property values.

    Raises:
        MonitorError: if the output does not contain both properties.
    """
    output = sh.systemctl("--no-pager", "-M", self.container, "show", "-p", "ActiveState,SubState", self.service)
    logger.debug("Output of `systemctl --no-pager -M %s show -p ActiveState,SubState %s`: %s",
                 self.container, self.service, output)

    # Parse "Key=Value" pairs by name instead of relying on the order of
    # the output lines.
    properties = dict()
    for entry in str(output).split():
        key, separator, value = entry.partition('=')
        if separator:
            properties[key] = value

    try:
        active_state = properties["ActiveState"]
        sub_state = properties["SubState"]
    except KeyError as e:
        raise MonitorError("Could not determine state of service %r: property %s missing in output %r"
                           % (self.service, e, str(output))) from e

    logger.debug("Service %r state: ActiveState: %r, SubState: %r", self.service, active_state, sub_state)
    return active_state, sub_state
......@@ -20,6 +20,7 @@ import os
import sh
CONTAINER_ADDITIONAL_PACKAGES = ["dbus"]
CONTAINER_DEBIAN_RELEASE = "buster"
here = Path(__file__).parents[0]
data_dir = Path("/usr/share/onionkit")
......@@ -103,7 +104,7 @@ if not os.path.exists("/etc/amnesia"):
print("WARNING: Not installing base container rootfs: directory '%s' already exists" % base_rootfs)
else:
with TemporaryDirectory() as tmpdir:
sh.debootstrap("--include=%s" % ",".join(CONTAINER_ADDITIONAL_PACKAGES), "stretch", tmpdir,
sh.debootstrap("--include=%s" % ",".join(CONTAINER_ADDITIONAL_PACKAGES), CONTAINER_DEBIAN_RELEASE, tmpdir,
_out=sys.stdout.buffer, _err=sys.stderr.buffer)
base_rootfs.mkdir(mode=0o700, parents=True)
sh.mv(glob.glob(os.path.join(tmpdir, "*")), base_rootfs)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment