Commit bc9605d9 authored by Hugh Brown

Add polaris batch command

Signed-off-by: Hugh Brown (Saint Aardvark the Carpeted) <[email protected]>
parent 5e4a137a
"""Module for running polaris batch commands
"""
import datetime
import json
import logging
import subprocess
import time
from polaris.common.config import PolarisConfig
# Module-level logger, named after this module; shared by every function below.
LOGGER = logging.getLogger(__name__)
def log_batch_operation(config, command, retcode):
    """Record the outcome of one batch step in the module log.

    :param config: polaris configuration for satellite; its ``name``
                   attribute prefixes the log line
    :param command: command that was run
    :param retcode: exit code from that command
    """
    satellite = config.name
    LOGGER.info("%s Command: %s Retcode: %d", satellite, command, retcode)
def find_last_fetch_date(config):
    """Find the date fetch was last run successfully.

    The date is taken from the newest ``time`` entry among the frames
    stored in the satellite's normalized frame file.

    Note: we assume here that if fetch was run successfully, then we
    have all the data we need up to that point.  That is, we
    explicitly are ignoring the possibility that:

    - "polaris fetch -e 2019-12-01" was run...
    - ...it exited with exit code 0...
    - ...but for some reason, we don't have all the data from that
      day.

    :param config: polaris configuration for satellite
    :return: time of the newest frame as a timetuple, or None when the
             normalized frame file does not exist or holds no frames
    :raises json.JSONDecodeError: if the file exists but is not valid JSON
    """
    time_format = "%Y-%m-%d %H:%M:%S"
    normalized_frame_file = config.normalized_file_path
    LOGGER.debug('Trying to find last fetch date in %s', normalized_frame_file)

    # TODO: copy-pasta of code in data_fetch_decoder.py.  Refactor.
    try:
        with open(normalized_frame_file) as f_handle:
            decoded_frame_list = json.load(f_handle)
    except FileNotFoundError:
        # No normalized file yet: fetch has never produced data.
        return None
    except json.JSONDecodeError:
        LOGGER.error("Cannot load %s - is it a valid JSON document?",
                     normalized_frame_file)
        # Bare raise keeps the original exception (message, position and
        # traceback); JSONDecodeError cannot be built without arguments.
        raise

    # Parse each timestamp once and let max() pick the newest.
    frame_times = [
        datetime.datetime.strptime(frame['time'], time_format)
        for frame in decoded_frame_list['frames']
    ]
    if not frame_times:
        # A file without frames gives no last-fetch date to report.
        return None

    return max(frame_times).timetuple()
def build_date_arg(last_fetch_date=None):
    """Build the start/end date options passed to fetch.

    The window starts at ``last_fetch_date`` (or at the Unix epoch when
    no previous fetch is known) and ends at the current UTC date.

    :param last_fetch_date: Date of last successful fetch, as a
                            timetuple; None means no previous fetch.
    :return: string of the form
             ``--start_date YYYY-MM-DD --end_date YYYY-MM-DD``
    """
    day_format = '%Y-%m-%d'  # Standard format for time arguments

    window_start = last_fetch_date
    if window_start is None:
        LOGGER.info('No previous fetch run for this sat, fetching everything')
        window_start = time.gmtime(0)  # Beginning of time

    window_end = time.gmtime()

    start_text = time.strftime(day_format, window_start)
    end_text = time.strftime(day_format, window_end)
    return "--start_date " + start_text + " --end_date " + end_text
def build_fetch_args(config):
    """Assemble the argument string for ``polaris fetch`` in batch mode.

    The result holds, in order: the cache directory option, the date
    window options, the satellite name and the normalized file path.

    :param config: polaris configuration for satellite
    :return: arguments for the fetch command as a single string
    """
    cache_part = '--cache_dir {}'.format(config.cache_dir)

    # The date window starts where the last successful fetch ended.
    previous_fetch = find_last_fetch_date(config)
    date_part = build_date_arg(last_fetch_date=previous_fetch)

    frames_path = config.normalized_file_path
    template = '{cache} {dates} {satellite} {frames}'
    return template.format(cache=cache_part,
                           dates=date_part,
                           satellite=config.name,
                           frames=frames_path)
def build_learn_args(config):
    """Assemble the argument string for ``polaris learn`` in batch mode.

    :param config: polaris configuration for satellite
    :return: the output graph file option followed by the normalized
             file path, as a single string
    """
    frames_path = config.normalized_file_path
    graph_path = config.output_graph_file
    template = '--output_graph_file {graph} {frames}'
    return template.format(graph=graph_path, frames=frames_path)
def build_viz_args(config):
    """Assemble the argument string for ``polaris viz`` in batch mode.

    :param config: polaris configuration for satellite
    :return: the graph file option pointing at the configured output
             graph file
    """
    template = '--graph_file {graph}'
    return template.format(graph=config.output_graph_file)
def maybe_run(cmd=None, config=None, dry_run=False):
    """Run one polaris command for a satellite, if batch is set up for it.

    The command line is assembled from the matching argument builder,
    logged at debug level, and (unless in dry run mode) executed as a
    subprocess whose exit code is then logged.

    :param cmd: command to run ('fetch', 'learn' or 'viz')
    :param config: polaris configuration for satellite
    :param dry_run: bool for dry run mode; when True the command is
                    built and logged but not executed
    """
    # The configuration decides whether this command is part of the
    # batch run; bail out before doing anything else if it is not.
    if config.should_batch_run(cmd) is False:
        return

    LOGGER.info('Running polaris %s for %s', cmd, config.name)

    # One argument builder per supported command.
    builders = {
        'fetch': build_fetch_args,
        'learn': build_learn_args,
        'viz': build_viz_args,
    }
    build_args = builders[cmd]
    command_line = 'polaris {} {}'.format(cmd, build_args(config))
    LOGGER.debug(command_line)

    if dry_run is True:
        return

    # The command line is split on whitespace into an argument list.
    completed = subprocess.run(command_line.split())
    log_batch_operation(config, command_line, completed.returncode)
def batch(config_file, dry_run):
    """Run polaris fetch, learn and viz non-interactively.

    Each command is attempted in that order; whether it actually runs
    is decided per command from the configuration file.

    :param config_file: path to config file for batch
    :param dry_run: Bool for dry run mode
    """
    config = PolarisConfig(file=config_file)

    for step in ('fetch', 'learn', 'viz'):
        maybe_run(cmd=step, config=config, dry_run=dry_run)
......@@ -107,3 +107,9 @@ class PolarisConfig():
"""Batch settings
"""
return self._data['satellite']['batch']
def should_batch_run(self, cmd):
    """Look up whether batch mode is configured to run ``cmd``.

    :param cmd: name of the polaris command to look up
    :return: the value stored for ``cmd`` in the batch settings
             (expected to be True or False); a missing entry is not
             handled here
    """
    settings = self.batch_settings
    return settings[cmd]
......@@ -6,6 +6,7 @@ import logging
import click
from polaris import __version__
from polaris.batch.batch import batch
from polaris.fetch.data_fetch_decoder import data_fetch_decode_normalize
from polaris.learn.analysis import cross_correlate, feature_extraction
from polaris.viz.server import launch_webserver
......@@ -134,8 +135,29 @@ def cli_viz(graph_file):
launch_webserver(graph_file)
# Command line entry point for batch mode.  The docstring below is left
# as written because click shows a command's docstring as its help text.
@click.command('batch', short_help='Run polaris commands in batch mode')
@click.option('--config_file',
              type=click.Path(resolve_path=True),
              default='polaris_config.json',
              required=False,
              is_flag=False,
              help='Config file for polaris batch.')
@click.option('--dry-run/--no-dry-run',
              default=False,
              required=False,
              help='Show what would be run in batch mode')
def cli_batch(config_file, dry_run):
    """ Run polaris from batch: runs polaris commands non-interactively
    :param config_file: path to configuration file
    :param dry_run: Bool for dry run mode
    """
    # All of the work happens in polaris.batch.batch.batch.
    batch(config_file=config_file, dry_run=dry_run)
# click doesn't automagically add the commands to the group
# (and thus to the help output); you have to do it manually.
# Each subcommand is registered on the ``cli`` group explicitly below;
# a new subcommand needs its own add_command() line here.
cli.add_command(cli_fetch)
cli.add_command(cli_learn)
cli.add_command(cli_viz)
cli.add_command(cli_batch)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment