Commit aaa3fa64 authored by Dan Baston's avatar Dan Baston

Merge branch 'forecast-cron' into 'master'

Support early downloading and preprocessing of forecasts

See merge request !20
parents c454335b 168244d1
Pipeline #72274467 passed with stages
in 19 minutes and 57 seconds
......@@ -26,6 +26,7 @@ import datetime
import os
from urllib.request import urlopen
def parse_args(args):
parser = argparse.ArgumentParser('Download a CFSv2 forecast GRIB file')
......@@ -43,6 +44,7 @@ def parse_args(args):
return parsed
def download(url, output_dir):
fname = url.rsplit('/', 1)[-1]
......@@ -69,17 +71,23 @@ def main(raw_args):
gribfile = "flxf.01.{TIMESTAMP}.{TARGET}.avrg.grib.grb2".format(TIMESTAMP=args.timestamp,
TARGET=args.target)
start_of_rolling_archive = datetime.datetime.now() - datetime.timedelta(days=6) # should have 7 days but doesn't always
start_of_rolling_archive = datetime.datetime.now() - datetime.timedelta(days=8) # should have 7 days but could have more or less
timestamp_datetime = datetime.datetime(year, month, day, hour)
if timestamp_datetime > datetime.datetime.utcnow():
print("Can't download forecast with timestamp in the future.", file=sys.stderr)
sys.exit(1)
url_patterns = []
if timestamp_datetime > start_of_rolling_archive:
print("Using rolling archive URL")
url_patterns = (
print("Attempting rolling archive URL")
url_patterns += (
'https://nomads.ncep.noaa.gov/pub/data/nccf/com/cfs/prod/cfs/cfs.{YEAR:04d}{MONTH:02d}{DAY:02d}/{HOUR:02d}/monthly_grib_01/{GRIBFILE}',
)
else:
print("Using long-term archive URL")
url_patterns = (
print("Attempting long-term archive URL")
url_patterns += (
'ftp://nomads.ncdc.noaa.gov/modeldata/cfsv2_forecast_mm_9mon/{YEAR:04d}/{YEAR:04d}{MONTH:02d}/{YEAR:04d}{MONTH:02d}{DAY:02d}/{TIMESTAMP}/{GRIBFILE}',
'https://nomads.ncdc.noaa.gov/modeldata/cfsv2_forecast_mm_9mon/{YEAR:04d}/{YEAR:04d}{MONTH:02d}/{YEAR:04d}{MONTH:02d}{DAY:02d}/{TIMESTAMP}/{GRIBFILE}',
)
......
# Copyright (c) 2018 ISciences, LLC.
# Copyright (c) 2018-2019 ISciences, LLC.
# All rights reserved.
#
# WSIM is licensed under the Apache License, Version 2.0 (the "License").
......@@ -13,10 +13,11 @@
# This configuration file is provided as an example of an automated operational WSIM workflow.
import datetime
import os
import re
from typing import List
from typing import List, Optional
from wsim_workflow import commands
from wsim_workflow import dates
......@@ -428,15 +429,22 @@ class CFSConfig(ConfigBase):
def result_fit_years(self):
return range(1950, 2010) # 1950-2009
def forecast_ensemble_members(self, yearmon):
def forecast_ensemble_members(self, yearmon, *, lag_hours: Optional[int] = None):
# Build an ensemble of 28 forecasts by taking the four
# forecasts issued on each of the last 7 days of the month.
last_day = dates.get_last_day_of_month(yearmon)
return ['{}{:02d}{:02d}'.format(yearmon, day, hour)
year, month = dates.parse_yearmon(yearmon)
members = [datetime.datetime(year, month, day, hour)
for day in range(last_day - 6, last_day + 1)
for hour in (0, 6, 12, 18)]
if lag_hours is not None:
members = [m for m in members if datetime.datetime.utcnow() - m > datetime.timedelta(hours=lag_hours)]
return [m.strftime('%Y%m%d%H') for m in members]
def forecast_targets(self, yearmon):
return dates.get_next_yearmons(yearmon, 9)
......
......@@ -75,6 +75,9 @@ def parse_args(args):
parser.add_argument('--stop',
help='End date in YYYYMM format',
required=False)
parser.add_argument('--forecast-lag-hours',
type=int,
help="Only attempt to download forecasts issued within the specified number of hours")
parsed = parser.parse_args(args)
......@@ -101,7 +104,8 @@ def main(raw_args):
no_spinup=args.nospinup,
forecasts=args.forecasts,
run_electric_power=not args.noelectric,
run_agriculture=not args.noagriculture)
run_agriculture=not args.noagriculture,
forecast_lag_hours=args.forecast_lag_hours)
duplicate_targets = workflow.find_duplicate_targets(steps)
if duplicate_targets:
......
......@@ -110,10 +110,12 @@ class ConfigBase(metaclass=abc.ABCMeta):
"""
return []
def forecast_ensemble_members(self, yearmon: str) -> List[str]:
def forecast_ensemble_members(self, yearmon: str, *, lag_hours: Optional[int] = None) -> List[str]:
"""
Provides a list of forecast ensemble members for a given YYYYMM, or
an empty list if the configuration does not contain forecasts.
If `lag_hours` is provided, only return ensemble members generated
within more than `lag_hours` from present time.
"""
return []
......
# Copyright (c) 2018 ISciences, LLC.
# Copyright (c) 2018-2019 ISciences, LLC.
# All rights reserved.
#
# WSIM is licensed under the Apache License, Version 2.0 (the "License").
......@@ -11,7 +11,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Dict, List
import datetime
from typing import Dict, List, Optional
from .actions import \
composite_anomalies, \
......@@ -94,25 +96,42 @@ def monthly_observed(config: Config, yearmon: str, meta_steps: Dict[str, Step])
return steps
def monthly_forecast(config: Config, yearmon: str, meta_steps: Dict[str, Step]) -> List[Step]:
def monthly_forecast(config: Config,
yearmon: str,
meta_steps: Dict[str, Step],
*, forecast_lag_hours: Optional[int] = None) -> List[Step]:
steps = []
if forecast_lag_hours is not None:
available = len(config.forecast_ensemble_members(yearmon, lag_hours=forecast_lag_hours))
total = len(config.forecast_ensemble_members(yearmon))
if total - available > 0:
print('Omitting prep steps for {} forecasts generated after {}'.format(
total-available,
(datetime.datetime.utcnow() - datetime.timedelta(hours=forecast_lag_hours)).strftime('%Y%m%d%H')))
for target in config.forecast_targets(yearmon):
lead_months = get_lead_months(yearmon, target)
print('Generating steps for', yearmon, 'forecast target', target)
for member in config.forecast_ensemble_members(yearmon):
for member in config.forecast_ensemble_members(yearmon, lag_hours=forecast_lag_hours):
if config.should_run_lsm(yearmon):
# Prepare the dataset for use (convert from GRIB to netCDF, etc.)
steps += config.forecast_data().prep_steps(yearmon=yearmon, target=target, member=member)
steps += meta_steps['prepare_forecasts'].require(
config.forecast_data().prep_steps(yearmon=yearmon, target=target, member=member))
# Bias-correct the forecast
steps += correct_forecast(config.forecast_data(),
member=member, target=target, lead_months=lead_months)
steps += meta_steps['prepare_forecasts'].require(
correct_forecast(config.forecast_data(), member=member, target=target, lead_months=lead_months))
# Assemble forcing inputs for forecast
steps += create_forcing_file(config.workspace(), config.forecast_data(),
yearmon=yearmon, target=target, member=member)
steps += meta_steps['prepare_forecasts'].require(
create_forcing_file(config.workspace(), config.forecast_data(),
yearmon=yearmon, target=target, member=member))
for member in config.forecast_ensemble_members(yearmon):
if config.should_run_lsm(yearmon):
# Run LSM with forecast data
steps += run_lsm(config.workspace(), config.static_data(),
yearmon=yearmon, target=target, member=member, lead_months=lead_months)
......
#!/usr/bin/env python3
# Copyright (c) 2018 ISciences, LLC.
# Copyright (c) 2018-2019 ISciences, LLC.
# All rights reserved.
#
# WSIM is licensed under the Apache License, Version 2.0 (the "License").
......@@ -15,7 +15,7 @@
import os
from typing import List
from typing import List, Optional
from . import agriculture
from . import dates
......@@ -42,15 +42,16 @@ def find_duplicate_targets(steps: List[Step]) -> List[Step]:
def get_meta_steps():
return {name: Step.create_meta(name) for name in (
'all_fits',
'all_composites',
'all_monthly_composites',
'agriculture_assessment',
'all_adjusted_composites',
'all_adjusted_monthly_composites',
'all_composites',
'all_fits',
'all_monthly_composites',
'electric_power_assessment',
'forcing_summaries',
'prepare_forecasts',
'results_summaries',
'electric_power_assessment',
'agriculture_assessment'
)}
......@@ -59,6 +60,7 @@ def generate_steps(config: ConfigBase, *,
stop: str,
no_spinup: bool,
forecasts: str,
forecast_lag_hours: Optional[int] = None,
run_electric_power: bool,
run_agriculture: bool) -> List[Step]:
steps = []
......@@ -84,7 +86,7 @@ def generate_steps(config: ConfigBase, *,
steps += agriculture.monthly_observed(config, yearmon, meta_steps)
if forecasts == 'all' or (forecasts == 'latest' and i == 0):
steps += monthly.monthly_forecast(config, yearmon, meta_steps)
steps += monthly.monthly_forecast(config, yearmon, meta_steps, forecast_lag_hours=forecast_lag_hours)
if run_electric_power:
steps += electric_power.monthly_forecast(config, yearmon, meta_steps)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment