Commit 80331b54 authored by Hugh Brown's avatar Hugh Brown

Merge branch 'feature/add-batch-command' into 'master'

Add polaris batch command, plus some changes to config file to support it

See merge request crespum/polaris!83
parents 587ab52a 1bd824ed
Pipeline #129362847 passed with stages
in 7 minutes and 16 seconds
......@@ -19,9 +19,11 @@ If you want to **know more**:
contrib/ - code that is not directly dependent on Polaris, but is used in the project
docs/ - Some documentation on the project (though more is in the wiki)
polaris/ - Project source code
common/ - Modules common to all of Polaris
fetch/ - Module to fetch and prepare data for the analysis
viz/ - Module to visualize the analysis results
learn/ - Module to perform the data analysis
batch/ - Module to perform batch operations
polaris.py - Polaris entry point
tests/ - Project unit tests
......@@ -59,10 +61,10 @@ Options:
--help Show this message and exit.
Commands:
fetch Download data set(s)
learn Analyze data
viz Display results
batch Run polaris commands in batch mode
fetch Download data set(s)
learn Analyze data
viz Display results
# To fetch and decode data from the SatNOGS network, run:
$ (.venv) polaris fetch -s 2019-08-10 -e 2019-10-5 LightSail-2 /tmp/
......@@ -91,11 +93,27 @@ $ (.venv) polaris learn -g /tmp/new_graph.json /tmp/normalized_frames.json
$ (.venv) polaris viz /tmp/new_graph.json
# Then visit http://localhost:8080 in your browser
```
## Batch operations
Batch operations allow automation of repeated steps. For example:
- running `polaris fetch` so that it fetches the latest data for a particular satellite, then running `polaris learn` to update the model
- running `polaris fetch`, `polaris learn` and `polaris viz` as part of an integration test
The `polaris batch` command is controlled by a JSON configuration file; an example can be found at `polaris/common/polaris_config.json.EXAMPLE`.
```bash
$ (.venv) polaris batch --config_file polaris/common/polaris_config.json.EXAMPLE
```
## MLflow
Installing Polaris will install MLflow as a dependency. At this time Polaris is using MLflow during the cross check dependencies process and the database is stored in the current working directory under the mlruns folder.
Installing Polaris will install MLflow as a dependency. At this time Polaris is using MLflow during the cross check dependencies process, and the database is stored in the current working directory under the mlruns folder.
To view the logs in MLflow, run this command in the directory that holds the `mlruns` folder (by default, this is the project root directory):
To view the logs into MLflow, you have to run that command line from where the mlruns folder is located :
```bash
$ mlflow ui
```
......
"""Module for running polaris batch commands
"""
import datetime
import json
import logging
import subprocess
import sys
import time
from polaris.common.config import InvalidConfigurationFile, PolarisConfig
LOGGER = logging.getLogger(__name__)


def log_batch_operation(config, command, retcode):
    """Log the outcome of a single batch-run command.

    :param config: polaris configuration for satellite (provides .name)
    :param command: command that was run
    :param retcode: exit code from that command
    """
    LOGGER.info("%s Command: %s Retcode: %d", config.name, command, retcode)
def find_last_fetch_date(config):
    """Find the date fetch was last run successfully.

    Note: we assume here that if fetch was run successfully, then we
    have all the data we need up to that point. That is, we
    explicitly are ignoring the possibility that:
    - "polaris fetch -e 2019-12-01" was run...
    - ...it exited with exit code 0...
    - ...but for some reason, we don't have all the data from that
      day.

    :param config: polaris configuration for satellite
    :return: time of last fetch date as timetuple, or None if there is
        no usable previous fetch output
    """
    normalized_frame_file = config.normalized_file_path
    LOGGER.debug('Trying to find last fetch date in %s', normalized_frame_file)
    # Copy-pasta of code in data_fetch_decoder.py. Refactor.
    try:
        with open(normalized_frame_file) as f_handle:
            try:
                decoded_frame_list = json.load(f_handle)
            except json.JSONDecodeError:
                # Bug fix: original used a bare '%' placeholder (no
                # substitution) and re-raised the exception class without
                # the arguments its constructor requires; a bare `raise`
                # propagates the original exception instead.
                LOGGER.error("Cannot load %s - is it a valid JSON document?",
                             normalized_frame_file)
                raise
            # The newest frame timestamp marks the last successful fetch.
            dates = [
                datetime.datetime.strptime(frame['time'], "%Y-%m-%d %H:%M:%S")
                for frame in decoded_frame_list['frames']
            ]
            if not dates:
                # Robustness: an empty frame list means there is nothing
                # to resume from; treat it like a missing file.
                return None
            return max(dates).timetuple()
    except FileNotFoundError:
        # No previous fetch output; caller treats None as "fetch everything".
        return None
def build_date_arg(last_fetch_date=None):
    """Build the --start_date/--end_date argument string for fetch.

    :param last_fetch_date: time.struct_time of the last successful
        fetch, or None to fetch everything since the epoch
    """
    if last_fetch_date is None:
        LOGGER.info('No previous fetch run for this sat, fetching everything')
        last_fetch_date = time.gmtime(0)  # Beginning of time
    # Render both endpoints in the YYYY-MM-DD form that fetch expects.
    start_str = time.strftime('%Y-%m-%d', last_fetch_date)
    end_str = time.strftime('%Y-%m-%d', time.gmtime())
    return "--start_date {} --end_date {}".format(start_str, end_str)
def build_fetch_args(config):
    """Build arguments for fetch command when invoked from batch.

    :param config: polaris configuration for satellite
    """
    last_fetch = find_last_fetch_date(config)
    # Assemble: cache dir, date window, satellite name, output file.
    parts = [
        '--cache_dir {}'.format(config.cache_dir),
        build_date_arg(last_fetch_date=last_fetch),
        config.name,
        config.normalized_file_path,
    ]
    return ' '.join(parts)
def build_learn_args(config):
    """Build arguments for learn command when invoked from batch.

    :param config: polaris configuration for satellite
    """
    return '--output_graph_file {} {}'.format(config.output_graph_file,
                                              config.normalized_file_path)
def build_viz_args(config):
    """Build arguments for viz command when invoked from batch.

    :param config: polaris configuration for satellite
    """
    return '--graph_file {}'.format(config.output_graph_file)
def maybe_run(cmd=None, config=None, dry_run=False):
    """Run polaris command for a particular satellite, if configured to.

    :param cmd: command to run ('fetch', 'learn' or 'viz')
    :param config: polaris configuration for satellite
    :param dry_run: bool for dry run mode (log the command, don't run it)
    """
    # First, check the configuration to see if we're meant to run this
    # command.
    if config.should_batch_run(cmd) is False:
        return
    LOGGER.info('Running polaris %s for %s', cmd, config.name)
    # Dispatch table mapping each subcommand to its argument builder.
    arg_builder = {
        'fetch': build_fetch_args,
        'learn': build_learn_args,
        'viz': build_viz_args,
    }
    args = arg_builder[cmd](config)
    full_cmd = 'polaris {} {}'.format(cmd, args)
    LOGGER.debug(full_cmd)
    if dry_run is True:
        return
    process_info = subprocess.run(full_cmd.split())
    log_batch_operation(config, full_cmd, process_info.returncode)
    try:
        process_info.check_returncode()
    except subprocess.CalledProcessError:
        # Bug fix: original passed no argument for the %s placeholder,
        # which logs a formatting error instead of the failed command.
        LOGGER.warning("%s failed", full_cmd)
        if config.batch_stop_at_first_failure is True:
            LOGGER.critical("Batch configured to exit on failure")
            sys.exit(1)
def batch(config_file, dry_run):
    """Run polaris fetch and learn non-interactively, based on configuration file.

    :param config_file: path to config file for batch
    :param dry_run: Bool for dry run mode
    """
    try:
        config = PolarisConfig(file=config_file)
    except FileNotFoundError:
        LOGGER.critical("Cannot find or open config file %s", config_file)
        sys.exit(1)
    except InvalidConfigurationFile:
        LOGGER.critical("Configuration file %s is invalid", config_file)
        sys.exit(1)
    # Run each stage in pipeline order; maybe_run consults the config
    # to decide whether a given stage is enabled.
    for cmd in ('fetch', 'learn', 'viz'):
        maybe_run(cmd=cmd, config=config, dry_run=dry_run)
......@@ -6,6 +6,11 @@ import json
from mergedeep import merge
class InvalidConfigurationFile(Exception):
    """Raised when the config file is invalid (e.g. not parseable as JSON).
    """
# Disabling check for public methods; the python_json_config class has
# all the methods we need, and we're explicitly deferring to it.
class PolarisConfig():
......@@ -38,7 +43,10 @@ class PolarisConfig():
defaults = defaults or self._DEFAULT_SETTINGS
with open(file) as f_handle:
# data from file overrides the defaults
self._data = merge({}, defaults, json.load(f_handle))
try:
self._data = merge({}, defaults, json.load(f_handle))
except json.decoder.JSONDecodeError:
raise InvalidConfigurationFile
@property
def root_dir(self):
......@@ -107,3 +115,25 @@ class PolarisConfig():
"""Batch settings
"""
return self._data['satellite']['batch']
@batch_settings.setter
def batch_settings(self, new_settings):
    """Update batch settings.

    @param new_settings: dictionary of all batch settings
        (e.g. {'fetch': True, 'learn': False, 'viz': True})
    """
    # Replaces the whole batch section wholesale; no merging with the
    # existing settings is performed.
    self._data['satellite']['batch'] = new_settings
def should_batch_run(self, cmd):
    """Return True if the configuration for batch says we should run this
    command; else, return False.

    :param cmd: subcommand name used as a key into the batch settings
    :raises KeyError: if cmd has no entry in the batch settings
    """
    return self.batch_settings[cmd]
@property
def batch_stop_at_first_failure(self):
    """Setting for whether batch should stop if a subcommand fails.

    Hardcoded to True for now, but we can change that later if needed
    (e.g. read it from the configuration file).
    """
    return True
......@@ -4,6 +4,8 @@ Helpers to write data file as input for different targets.
"""
import json
import os
from pathlib import Path
import numpy as np
......@@ -63,5 +65,15 @@ def heatmap_to_graph(heatmap,
mdict[source][target]
})
# Create parent directory if not already present
create_parent_directory(output_graph_file)
with open(output_graph_file, "w") as graph_file:
json.dump(graph_dict, graph_file, indent=constants.JSON_INDENT)
def create_parent_directory(file_name):
    """Create parent directory for file if needed.

    :param file_name: path whose parent directory should exist afterwards
    """
    # mkdir with parents/exist_ok builds the whole chain and is a no-op
    # when the directory is already present.
    parent = os.path.dirname(file_name)
    Path(parent).mkdir(parents=True, exist_ok=True)
......@@ -6,6 +6,7 @@ import logging
import click
from polaris import __version__
from polaris.batch.batch import batch
from polaris.fetch.data_fetch_decoder import data_fetch_decode_normalize
from polaris.learn.analysis import cross_correlate, feature_extraction
from polaris.viz.server import launch_webserver
......@@ -134,8 +135,29 @@ def cli_viz(graph_file):
launch_webserver(graph_file)
@click.command('batch', short_help='Run polaris commands in batch mode')
@click.option('--config_file',
              is_flag=False,
              required=False,
              default='polaris_config.json',
              type=click.Path(resolve_path=True),
              help='Config file for polaris batch.')
@click.option('--dry-run/--no-dry-run',
              required=False,
              default=False,
              help='Show what would be run in batch mode')
def cli_batch(config_file, dry_run):
    """ Run polaris from batch: runs polaris commands non-interactively

    :param config_file: path to configuration file
    :param dry_run: Bool for dry run mode
    """
    # Thin CLI wrapper: all batch logic lives in polaris.batch.batch.
    batch(config_file, dry_run)
# click doesn't automagically add the commands to the group
# (and thus to the help output); you have to do it manually.
# Registration order here determines the order shown in `polaris --help`.
cli.add_command(cli_fetch)
cli.add_command(cli_learn)
cli.add_command(cli_viz)
cli.add_command(cli_batch)
"""
Module for testing batch.py script.
"""
import subprocess
import time
from datetime import date, timedelta
import polaris.batch.batch as batch
from polaris.common.config import PolarisConfig
# Reference timestamps (as time.struct_time) and their YYYY-MM-DD string
# forms, shared by the build_date_arg tests below.
TODAY_TIMESTRUCT = time.gmtime()
TODAY_STRING = time.strftime('%Y-%m-%d', TODAY_TIMESTRUCT)
YESTERDAY_TIMESTRUCT = (date.today() - timedelta(days=1)).timetuple()
YESTERDAY_STRING = time.strftime('%Y-%m-%d', YESTERDAY_TIMESTRUCT)
# Epoch start, matching build_date_arg's "fetch everything" default.
BEGINNING_OF_TIME_TIMESTRUCT = time.gmtime(0)
BEGINNING_OF_TIME_STRING = time.strftime('%Y-%m-%d',
                                         BEGINNING_OF_TIME_TIMESTRUCT)
def test_build_date_arg_no_last_fetch():
    """Test build_date_arg function, with no last fetch
    """
    # None means "no previous fetch": expect epoch start through today.
    date_arg = batch.build_date_arg(None)
    expected = '--start_date 1970-01-01 --end_date ' + TODAY_STRING
    assert date_arg == expected
def test_build_date_arg_last_fetch_yesterday():
    """Test build_date_arg function, with last fetch date of yesterday
    """
    # A known last-fetch date should become the start of the window.
    date_arg = batch.build_date_arg(YESTERDAY_TIMESTRUCT)
    expected = '--start_date {} --end_date {}'.format(YESTERDAY_STRING,
                                                      TODAY_STRING)
    assert date_arg == expected
def test_maybe_run(polaris_config, tmp_path, mocker):
    """Test maybe_run function
    """
    # Disable the no-member test; this does not play well with the
    # mocked subprocess.run()
    # pylint: disable=no-member
    # Write the fixture config to disk so PolarisConfig can load it.
    fullpath = tmp_path / 'maybe_run_config.json'
    with open(fullpath.as_posix(), 'w') as f_handle:
        f_handle.write(polaris_config)
    config_from_file = PolarisConfig(file=fullpath)
    # Enable fetch and viz but not learn, so we can observe which
    # subcommands actually invoke subprocess.run().
    my_batch_settings = {'fetch': True, 'learn': False, 'viz': True}
    config_from_file.batch_settings = my_batch_settings
    mocker.patch('subprocess.run')
    # Given the configuration above, we expect that fetch will run;
    # learn will not; and viz will run. To test that, we call each
    # one, and check call_count() after each to make sure it's what we
    # expect.
    batch.maybe_run(cmd='fetch', config=config_from_file)
    assert subprocess.run.call_count == 1
    # Should not be called, so call count should not change...
    batch.maybe_run(cmd='learn', config=config_from_file)
    assert subprocess.run.call_count == 1
    # This *should* be called, so the call count should be 2 here.
    batch.maybe_run(cmd='viz', config=config_from_file)
    assert subprocess.run.call_count == 2
......@@ -69,3 +69,23 @@ def test_polaris_configuration_normalized_file_path(polaris_config, tmp_path):
config_from_file = PolarisConfig(file=fullpath)
expected_path = '/tmp/polaris/LightSail-2/cache/normalized_frames.json'
assert config_from_file.normalized_file_path == expected_path
def test_should_batch_run(polaris_config, tmp_path):
    """Test should_batch_run(), including updates
    """
    # Materialize the fixture config so PolarisConfig can load it.
    config_path = tmp_path / 'simple_config.json'
    with open(config_path.as_posix(), 'w') as f_handle:
        f_handle.write(polaris_config)
    config_from_file = PolarisConfig(file=config_path)
    # Supply our own batch settings, and test those
    config_from_file.batch_settings = {
        'fetch': True,
        'learn': False,
        'viz': True
    }
    for cmd, expected in (('fetch', True), ('learn', False), ('viz', True)):
        assert config_from_file.should_batch_run(cmd) is expected
......@@ -9,6 +9,7 @@ astroid = 2.2.5
pylint = 2.3.1
pytest = 4.4.2
pytest-cov = 2.7.1
pytest-mock = 2.0.0
[testenv:flake8]
deps =
......@@ -58,6 +59,7 @@ commands =
deps =
pytest=={[depversions]pytest}
pytest-cov=={[depversions]pytest-cov}
pytest-mock=={[depversions]pytest-mock}
commands = pytest -v --cov=polaris --cov=contrib tests
[testenv:pylint]
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment