Skip to content
Snippets Groups Projects

Draft: Use pydantic to represent configs. Rework constructors

Closed Cal Pratt requested to merge cpratt34/yaml-loader into master
Files
61
@@ -23,7 +23,7 @@ Create a BCS cleanup daemon
import sys
from datetime import timedelta
from os import PathLike
from typing import Any, Dict
from typing import Dict, Optional
import click
@@ -33,7 +33,9 @@ from buildgrid.server.cas.storage.storage_abc import StorageABC
from buildgrid.server.monitoring import MonitoringOutputFormat, MonitoringOutputType
from ..cli import Context, pass_context, setup_logging
from ..settings import parser
from ..settings.loader import load_settings
from ..settings.parser import construct_server_instance
from ..settings.schemas import ServerSettings
def parse_size(string: str) -> int:
@@ -141,8 +143,8 @@ def start(
sys.exit(-1)
batch_size_val = min(batch_size_val, high_watermark_val - low_watermark_val)
with open(config, encoding="utf-8") as f:
settings = parser.get_parser().safe_load(f)
with open(config, "r") as stream:
settings = load_settings(stream)
cleanup = None
try:
@@ -174,7 +176,7 @@ def start(
def _create_cleanup_from_config(
configuration: Dict[str, Any],
settings: ServerSettings,
dry_run: bool,
high_watermark: int,
low_watermark: int,
@@ -183,80 +185,46 @@ def _create_cleanup_from_config(
only_if_unused_for: timedelta,
) -> CASCleanUp:
"""Parses configuration and setup a fresh server instance."""
kargs: Dict[str, Any] = {
"dry_run": dry_run,
"high_watermark": high_watermark,
"low_watermark": low_watermark,
"sleep_interval": sleep_interval,
"batch_size": batch_size,
"only_if_unused_for": only_if_unused_for,
}
try:
instances = configuration["instances"]
except KeyError as e:
click.echo(f"Error: Section missing from configuration: {e}.", err=True)
sys.exit(-1)
try:
instance_storages = {}
instance_indexes = {}
for instance in instances:
instance_name = instance["name"]
scheduler_list = instance.get("schedulers", [])
for scheduler in scheduler_list:
try:
scheduler.watcher_keep_running = False
scheduler.pruner_keep_running = False
except AttributeError:
pass
storage_list = instance["storages"]
tmp_store = None
tmp_index = None
for storage in storage_list:
if isinstance(storage, IndexABC):
tmp_index = storage
elif isinstance(storage, StorageABC):
tmp_store = storage
if tmp_store and tmp_index:
instance_storages[instance_name] = tmp_store
instance_indexes[instance_name] = tmp_index
else:
click.echo(f"Warning: Skipping instance {instance_name}.", err=False)
kargs["storages"] = instance_storages
kargs["indexes"] = instance_indexes
except KeyError as e:
click.echo(f"Error: Storage/Index missing from configuration: {e}.", err=True)
sys.exit(-1)
if "monitoring" in configuration:
monitoring = configuration["monitoring"]
try:
if "enabled" in monitoring:
kargs["monitor"] = monitoring["enabled"]
if "endpoint-type" in monitoring:
kargs["mon_endpoint_type"] = MonitoringOutputType(monitoring["endpoint-type"])
if "endpoint-location" in monitoring:
kargs["mon_endpoint_location"] = monitoring["endpoint-location"]
if "serialization-format" in monitoring:
kargs["mon_serialisation_format"] = MonitoringOutputFormat(monitoring["serialization-format"])
if "metric-prefix" in monitoring:
# Ensure there's only one period at the end of the prefix
kargs["mon_metric_prefix"] = monitoring["metric-prefix"].strip().rstrip(".") + "."
except (ValueError, OSError) as e:
click.echo(f"Error: Configuration, {e}.", err=True)
sys.exit(-1)
return CASCleanUp(**kargs)
instance_storages: Dict[str, StorageABC] = {}
instance_indexes: Dict[str, IndexABC] = {}
for instance_settings in settings.instances:
instance = construct_server_instance(instance_settings)
instance_name = instance.name
for scheduler in instance.schedulers:
scheduler.watcher_keep_running = False
scheduler.pruner_keep_running = False
tmp_store: Optional[StorageABC] = None
tmp_index: Optional[IndexABC] = None
for storage in instance.storages:
if isinstance(storage, IndexABC):
tmp_index = storage
elif isinstance(storage, StorageABC):
tmp_store = storage
if tmp_store and tmp_index:
instance_storages[instance_name] = tmp_store
instance_indexes[instance_name] = tmp_index
else:
click.echo(f"Warning: Skipping instance {instance_name}.", err=False)
return CASCleanUp(
dry_run=dry_run,
high_watermark=high_watermark,
low_watermark=low_watermark,
sleep_interval=sleep_interval,
batch_size=batch_size,
only_if_unused_for=only_if_unused_for,
storages=instance_storages,
indexes=instance_indexes,
monitor=settings.monitoring.enabled,
mon_endpoint_type=MonitoringOutputType(settings.monitoring.endpoint_type),
mon_endpoint_location=settings.monitoring.endpoint_location,
mon_serialisation_format=MonitoringOutputFormat(settings.monitoring.serialization_format),
mon_metric_prefix=(
settings.monitoring.metric_prefix.strip().rstrip(".") + "." if settings.monitoring.metric_prefix else ""
),
)
Loading