Commit 8fc059bc authored by Leon Riesebos's avatar Leon Riesebos
Browse files

added core logger tool

parent 4e9b5896
Pipeline #292079449 passed with stage
in 2 minutes and 41 seconds
#!/usr/bin/env python3
UART logger for ARTIQ core devices.
import argparse
import logging
import typing
import datetime
import time
import os
import os.path
import serial # type: ignore
import dax_comtools.util.args
logger = logging.getLogger(__name__)
"""The module logger."""
def _get_timestamp(fmt: str = '%Y-%m-%d_%H:%M:%S') -> str:
"""Get a timestamp based on the given format."""
class CoreLogger:
"""UART logger for an ARTIQ core device."""
DEFAULT_BAUD_RATE: typing.ClassVar[int] = 115200
"""Default baud rate."""
DEFAULT_OUTPUT_DIR: typing.ClassVar[str] = './'
"""Default output directory."""
DEFAULT_RECONNECT_DELAY: typing.ClassVar[float] = 5.0
"""Default delay before reconnecting."""
def __init__(self, *,
port: str,
baud_rate: int = DEFAULT_BAUD_RATE,
output_dir: str = DEFAULT_OUTPUT_DIR):
"""Initialize the core logger class.
:param port: The port to connect to
:param baud_rate: Baud rate
:param output_dir: Output directory for the log files
assert isinstance(port, str)
assert isinstance(baud_rate, int)
assert isinstance(output_dir, str)
# Validate baud rate
if baud_rate not in serial.SerialBase.BAUDRATES:
raise ValueError('Invalid baud rate')
# Expand path and validate existence
self._output_dir = os.path.expanduser(output_dir)
if not os.path.isdir(self._output_dir):
raise NotADirectoryError(f'"{self._output_dir}" is not a valid directory')
# Create serial port'Opening serial connection to "{port}"')
self._serial = serial.Serial(
port=port, baudrate=baud_rate,
bytesize=serial.EIGHTBITS, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE
logger.debug(f'Serial: {self._serial}')
def start(self) -> None:
"""Start logging."""
# Generate file name
file_name = os.path.join(self._output_dir, f'{_get_timestamp()}.log')'Log file: {file_name}')
with open(file_name, mode='w') as file, self._serial:
while True:
# Get, print, and write the line
line = self._serial.readline().decode(encoding='utf8', errors='ignore')
line = f'[{_get_timestamp()}] {line}'
def run(cls, *, port: str, baud_rate: int, output_dir: str, reconnect_delay: float) -> None:
"""Run this core logger class in an infinite loop that reconnects if the connection is lost.
:param port: The port to connect to
:param baud_rate: Baud rate
:param output_dir: Output directory for the log files
:param reconnect_delay: Delay before reconnecting in seconds
assert isinstance(reconnect_delay, float)
assert reconnect_delay > 0.0
while True:
# Create the core logger
core_logger = cls(port=port, baud_rate=baud_rate, output_dir=output_dir)
except serial.SerialException as e:
logger.warning(f'Connection failed: {e}')
# Start logging
except serial.SerialException as e:
logger.warning(f'Connection lost: {e}')
# Insert delay'Waiting to reconnect ({reconnect_delay} s)...')
except ValueError:
logger.exception('Value error, logger terminated')
except OSError:
logger.exception('IO error, logger terminated')
except KeyboardInterrupt:'Core logger stopped by user')
def _get_args() -> argparse.Namespace:
# Create an argument parser
parser = argparse.ArgumentParser(description="UART logger for ARTIQ core devices")
group = parser.add_argument_group('Core device')
group.add_argument('port', type=str, help='Serial port to connect to')
group.add_argument('--baud-rate', type=int, default=CoreLogger.DEFAULT_BAUD_RATE,
help=f'Baud rate (default: {CoreLogger.DEFAULT_BAUD_RATE})')
group = parser.add_argument_group('Output')
group.add_argument('--output-dir', type=str, default=CoreLogger.DEFAULT_OUTPUT_DIR,
help=f'Output directory (default: {CoreLogger.DEFAULT_OUTPUT_DIR})')
def positive_float(arg: typing.Any) -> float:
f = float(arg)
except ValueError:
raise argparse.ArgumentTypeError(f"invalid float value: '{arg}'")
if not f > 0.0:
raise argparse.ArgumentTypeError(f'float value must be positive: {f}')
return f
group = parser.add_argument_group('Connection')
group.add_argument('--reconnect-delay', type=positive_float, default=CoreLogger.DEFAULT_RECONNECT_DELAY,
help=f'Delay before reconnecting in seconds (default: {CoreLogger.DEFAULT_RECONNECT_DELAY})')
# Add standard arguments
return parser.parse_args()
def main() -> None:
# Get the arguments
args = _get_args()
# Configure logger
# Run the core logger, baud_rate=args.baud_rate,
output_dir=args.output_dir, reconnect_delay=args.reconnect_delay)
if __name__ == '__main__':
......@@ -34,6 +34,7 @@ console_scripts =
# tool scripts
dct_owm_to_influx =
dct_kjl_sparc =
dct_core_logger =
dct_sensorpush_to_influx =
dct_sensorpush_generate_dashboard =
dct_ml_data_logger =
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment