Commit 6650b777 authored by Conor Anderson's avatar Conor Anderson

Add the plumbing code

parent bfc299f4
......@@ -2,5 +2,6 @@
.Rproj.user
deploy.sh
local_settings.R
plumber_settings.R
packrat/lib*/
packrat/src/
packrat/lib*
packrat/src
.dockerignore
.git
.gitignore
.Rhistory
.Rproj.user
# API container for the conjuntool plumber backend, built on the stock
# plumber base image.
FROM trestletech/plumber
MAINTAINER Conor Anderson <[email protected]>
COPY app/plumber.R /app/plumber.R
# curl is used below to fetch the package-installer script;
# libnetcdf-dev is needed to compile the ncdf4 R package from source.
RUN apt-get update &&\
apt-get install -y --no-install-recommends curl libnetcdf-dev &&\
apt-get clean && rm -rf /tmp/* /var/lib/apt/lists/*
# Derive the list of required R packages from the library() calls in
# plumber.R, then install them via the downloaded helper script.
RUN cd /app &&\
sed -rn 's/library\((.*)\)/\1/p' plumber.R | sort | uniq > needed_packages &&\
curl -O https://gitlab.com/ConorIA/conr_ca/raw/master/misc/install_pkgs.R &&\
Rscript -e "source('install_pkgs.R')" &&\
rm -rf /tmp/* needed_packages install_pkgs.R
# The base image's entrypoint runs plumber; CMD supplies the API script.
CMD ["/app/plumber.R"]
library(plumber)
library(dplyr)
library(ncdf4.helpers)
library(stringr)
library(ncdf4)
library(zoo)
library(lubridate)
library(raster)
library(storr)
library(purrr)
library(tibble)
library(jsonlite)
source("plumber_settings.R")
## FIXME: This is the nuclear option.
# If the stored cache version is missing or differs from the current
# cache_ver, wipe the entire metadata cache and stamp it with the
# current version so subsequent requests rebuild their entries.
if (!st_meta$exists("cache_ver") || !identical(st_meta$get("cache_ver"), cache_ver)) {
  message("Metadata cache is invalid. Deleting.")
  st_meta$clear()
  st_meta$set("cache_ver", cache_ver)
}
#* Require HTTP Basic authentication on every request.
#* @filter checkAuth
function(req, res){
  # A request with no Authorization header at all would previously hand
  # NULL to sub()/base64_dec() and blow up with a 500; treat it as a
  # plain unauthenticated request instead.
  header <- req$HTTP_AUTHORIZATION
  if (is.null(header) || length(header) == 0) {
    res$status <- 401 # Unauthorized
    return(list(error="Authentication required"))
  }
  auth <- sub("Basic ", "", header)
  # Decode "user:password" and compare against the configured credential
  # lists (users/passwords come from plumber_settings.R, paired by index).
  if (!(rawToChar(jsonlite::base64_dec(auth)) %in% paste(users, passwords, sep = ":"))) {
    res$status <- 401 # Unauthorized
    return(list(error="Authentication required"))
  } else {
    # Credentials accepted: pass the request on to the endpoint.
    plumber::forward()
  }
}
# List the NetCDF files available for a variable, parsed into their CMIP5
# filename components (one row per file). When `lim` is TRUE, only files
# whose record ends at 200601-210012 are returned.
get_choices <- function(var, lim) {
  verified_dir <- file.path(file_dir, var, "verified")
  filenames <- if (lim) {
    grep("200601-210012.nc$", dir(verified_dir), value = TRUE)
  } else {
    dir(verified_dir)
  }
  choices <- as_tibble(t(sapply(filenames, ncdf4.helpers::get.split.filename.cmip5)))
  names(choices) <- c("Variable", "Period", "Model", "Scenario", "Ensemble", "Range", "Start", "End")
  # The Model gsub is applied twice on purpose: overlapping digit-dash-digit
  # runs (e.g. "1-2-3") need a second pass to convert every dash to a dot.
  # The Scenario sub prettifies e.g. "rcp45" into "RCP4.5".
  choices %>%
    mutate(Model = gsub("([0-9])-([0-9])", paste0("\\1", ".", "\\2"), gsub("([0-9])-([0-9])", paste0("\\1", ".", "\\2"), Model)),
           Scenario = sub("rcp([0-9])([0-9])", paste0("RCP", "\\1", ".", "\\2"), Scenario),
           Filenames = filenames)
}
#* Return the grid-cell boundary coordinates of every model that provides
#* a given variable, as a serialized tibble.
#* @param var the variable for which to query
#* @serializer contentType list(type="application/octet-stream")
#* @post /bounds
function(var) {
  # Building the table requires opening one NetCDF file per model, so the
  # result is cached in the metadata store under a per-variable key.
  if (st_meta$exists(paste0("lookup_table_bounds_", var))) {
    bounds_tab <- st_meta$get(paste0("lookup_table_bounds_", var))
  } else {
    choices <- get_choices(var, FALSE)
    ## As far as I can tell, the model bounds never change, regardless of scenario, experiment etc. So we just need one file for each
    # Note, we could confirm this if it weren't for https://github.com/tidyverse/dplyr/issues/3088
    choices <- choices %>% group_by(Model) %>% summarize(Filename = head(Filenames, 1))
    # Read the lat/lon cell-boundary arrays from one file. Some models name
    # the variables "lon_bounds"/"lat_bounds" instead of "*_bnds", hence the
    # try() fallbacks.
    get_bounds <- function(file_dir, var, x) {
      nc_nc <- nc_open(file.path(file_dir, var, "verified", x))
      # BUGFIX: the handle was previously never closed, leaking one open
      # NetCDF file per model. on.exit() releases it even if a read fails.
      on.exit(nc_close(nc_nc), add = TRUE)
      lon_bnds <- try(ncvar_get(nc_nc, "lon_bnds"), silent = TRUE)
      if (inherits(lon_bnds, "try-error")) {
        lon_bnds <- ncvar_get(nc_nc, "lon_bounds")
      }
      lat_bnds <- try(ncvar_get(nc_nc, "lat_bnds"), silent = TRUE)
      if (inherits(lat_bnds, "try-error")) {
        lat_bnds <- ncvar_get(nc_nc, "lat_bounds")
      }
      c(Lat_Bot = list(lat_bnds[1,]), Lat_Top = list(lat_bnds[2,]),
        Lon_Lef = list(lon_bnds[1,]), Lon_Rig = list(lon_bnds[2,]))
    }
    # One list-column row per model; each cell holds the vector of bounds.
    bounds <- choices$Filename %>%
      map(~get_bounds(file_dir, var, .)) %>%
      map_df(~tibble(Lat_Bot = .[1], Lat_Top = .[2], Lon_Lef = .[3], Lon_Rig = .[4]))
    bounds_tab <- bind_cols(choices %>% dplyr::select(-`Filename`), bounds)
    st_meta$set(key = paste0("lookup_table_bounds_", var), value = bounds_tab)
  }
  # The client is expected to unserialize() the raw payload.
  serialize(bounds_tab, NULL)
}
#* Return the table of available model files for a variable
#* @param var The variable for which to list files
#* @param lim Whether to limit results to files covering 200601-210012
#* @serializer contentType list(type="application/octet-stream")
#* @post /choices
function(var, lim) {
  # Raw serialized tibble; the client is expected to unserialize() it.
  serialize(get_choices(var, lim), NULL)
}
#* Return the monthly time series of one grid cell of one model file
#* @param key Composite cache key: "<filename>_<lat_cell>_<lon_cell>"
#* @serializer contentType list(type="application/octet-stream")
#* @post /timeseries
function(key) {
  time_tmp <- NULL
  if (st_point$exists(key)) {
    message("Hit the cache for ", key)
    time_tmp <- st_point$get(key)
    ## Invalidate old cache entries.
    if (is.null(attr(time_tmp, "cache_ver")) || attr(time_tmp, "cache_ver") != cache_ver) {
      message("Cached version is invalid. Deleting.")
      st_point$del(key)
      # Reset so the fresh-read branch below runs.
      time_tmp <- NULL
    }
  }
  if (is.null(time_tmp)) {
    message("Missed the cache for ", key)
    # The key begins with the NetCDF filename; strip the trailing cell part.
    filename <- str_extract(key, ".*\\.nc4?")
    # Get the time
    components <- get.split.filename.cmip5(filename)
    nc_nc <- try(nc_open(file.path(file_dir, components[['var']], "verified", filename)))
    # NOTE(review): this retries the identical nc_open() once, unguarded —
    # presumably to paper over transient open failures; confirm intent.
    if (inherits(nc_nc, "try-error")) nc_nc <- nc_open(file.path(file_dir, components[['var']], "verified", filename))
    nc_time <- try(nc.get.time.series(nc_nc, v = components[['var']], time.dim.name = "time"), silent = TRUE)
    if (inherits(nc_time, "try-error")) {
      # Fallback: decode the raw time axis by hand from its units attribute.
      # Assumes units look like "days since YYYY-MM-DD ..." — the third
      # whitespace-separated word is taken as the origin date.
      nc_time <- ncvar_get(nc_nc, "time")
      nc_time_units <- ncatt_get(nc_nc, "time", attname="units")$value
      nc_time_origin <- strsplit(nc_time_units, " ")
      nc_time_origin <- unlist(nc_time_origin)[3]
      nc_time <- as.yearmon(ymd(nc_time_origin) + nc_time)
      # Sanity check: the decoded series must start and end exactly on the
      # YYYYMM period encoded in the filename.
      if (paste0(format(nc_time[1], format = "%Y"), format(nc_time[1], format = "%m")) != unname(components['tstart']) || paste0(format(nc_time[length(nc_time)], format = "%Y"), format(nc_time[length(nc_time)], format = "%m")) != unname(components['tend'])) {
        stop(paste("Error processing time for", filename))
      }
    } else {
      nc_time <- as.yearmon(format(nc_time, format = "%Y-%m-%d hh:mm:ss"))
    }
    # Everything after "<filename>_" in the key is "<lat_cell>_<lon_cell>".
    cells <- unlist(strsplit(sub(paste0(filename, "_"), "", key), "_"))
    lon_cell <- cells[2]
    lat_cell <- cells[1]
    # Now load only that grid data
    nc_var <- nc.get.var.subset.by.axes(nc_nc, components[['var']], axis.indices = list(X = lon_cell, Y = lat_cell))
    # Close the nc connection
    nc_close(nc_nc); rm(nc_nc)
    # Note, does this still apply?
    if (dim(nc_var)[1] > 1 || dim(nc_var)[2] > 1) {
      warning("We got two latitud or longitude cells. We'll average across them.")
      # Collapse the spatial dimensions, keeping the time dimension.
      nc_var <- apply(nc_var, 3, mean)
    } else {
      nc_var <- nc_var[1, 1, ]
    }
    # Assemble the tidy result. Model/Scenario are prettified the same way
    # as in get_choices() (dashes restored to dots, "rcp45" -> "RCP4.5").
    time_tmp <- tibble(Var = components[['var']],
                       Model = gsub("([0-9])-([0-9])", paste0("\\1", ".", "\\2"), gsub("([0-9])-([0-9])", paste0("\\1", ".", "\\2"), components[['model']])),
                       Scenario = sub("rcp([0-9])([0-9])", paste0("RCP", "\\1", ".", "\\2"), components[['emissions']]),
                       Ensemble = components[['run']],
                       Time = nc_time,
                       Year = format(as.yearmon(Time), format = "%Y"),
                       Month = format(as.yearmon(Time), format = "%m"),
                       Value = nc_var)
    rm(nc_var, nc_time); gc()
    # Stamp the entry so future requests can detect stale cache formats.
    attr(time_tmp, "cache_ver") <- cache_ver
    st_point$set(key, time_tmp)
    message("Cached data")
  }
  serialize(time_tmp, NULL)
}
#* Return a raster of the field averaged over a span of years for one model
#* @param key Cache key beginning with the filename and ending with the
#*   first and last year of the averaging window, underscore-separated
#* @param var Accepted for interface compatibility; the variable name is
#*   derived from the key itself
#* @serializer contentType list(type="application/octet-stream")
#* @post /mapseries
function(key, var) {
  if (st_avg$exists(key)) {
    if (debug_flag) message("Hit the cache.")
    map_data <- st_avg$get(key)
  } else {
    if (debug_flag) message("Missed the cache.")
    filename <- str_extract(key, ".*\\.nc4?")
    unlisted <- unlist(str_split(key, "_"))
    nc_nc <- nc_open(file.path(file_dir, unlisted[1], "verified", filename), readunlim = FALSE)
    nc_time <- try(nc.get.time.series(nc_nc, v = unlisted[1], time.dim.name = "time"))
    if (inherits(nc_time, "try-error")) {
      # BUGFIX: this branch used to do `warning(paste(fs[f], "failed")); break`,
      # but `fs` and `f` are undefined here (copy-paste from a loop) and
      # `break` is an error outside a loop. Close the handle and fail the
      # request with a meaningful message instead.
      nc_close(nc_nc)
      stop(paste(key, "failed"))
    }
    nc_time <- as.yearmon(format(nc_time, format = "%Y-%m-%d hh:mm:ss"))
    # Get the time that we are interested in: the last two key components
    # are the first and last year of the averaging window.
    index_start <- min(which(format(nc_time, format = "%Y") == unlisted[length(unlisted)-1]))
    index_end <- max(which(format(nc_time, format = "%Y") == unlisted[length(unlisted)]))
    nc_lat <- ncvar_get(nc_nc, "lat")
    nc_lon <- ncvar_get(nc_nc, "lon")
    # Some models name the boundary variables "lon_bounds"/"lat_bounds"
    # instead of "*_bnds", hence the try() fallbacks.
    lon_bnds <- try(ncvar_get(nc_nc, "lon_bnds"), silent = TRUE)
    if (inherits(lon_bnds, "try-error")) {
      lon_bnds <- ncvar_get(nc_nc, "lon_bounds")
    }
    lat_bnds <- try(ncvar_get(nc_nc, "lat_bnds"), silent = TRUE)
    if (inherits(lat_bnds, "try-error")) {
      lat_bnds <- ncvar_get(nc_nc, "lat_bounds")
    }
    # Read only the time slab we need.
    nc_var <- nc.get.var.subset.by.axes(nc_nc, unlisted[1], axis.indices = list(T = index_start:index_end))
    # Close the nc connection
    nc_close(nc_nc)
    rm(nc_nc)
    # Average over time (the third dimension) for every lon/lat cell.
    map_data_mat <- apply(nc_var, c(1,2), mean)
    map_data <- raster(t(map_data_mat),
                       xmn = mean(lon_bnds[,1]), xmx = mean(lon_bnds[,ncol(lon_bnds)]),
                       ymn = mean(lat_bnds[,1]), ymx = mean(lat_bnds[,ncol(lat_bnds)]),
                       crs="+proj=longlat +datum=WGS84")
    # This step fails if the model outputs the corner instead of the centre point.
    map_data <- try(rotate(map_data))
    if(inherits(map_data, "try-error")) {
      warning(paste(key,
                    "failed mapping using bounds!"))
      # Fall back to reconstructing the extent from the cell centres.
      map_data <- raster(t(map_data_mat), xmn = (nc_lon[2] - nc_lon[1])/2, xmx = max(nc_lon) + (nc_lon[2] - nc_lon[1])/2, ymn = min(nc_lat), ymx = max(nc_lat), crs="+proj=longlat +datum=WGS84")
      map_data <- rotate(map_data)
    }
    map_data <- flip(map_data, 'y')
    st_avg$set(key, map_data)
    if (debug_flag) message("Cached data.")
  }
  serialize(map_data, NULL)
}
#* Fetch an arbitrary object from the metadata cache
#* @serializer contentType list(type="application/octet-stream")
#* @param key The cache key to look up
#* @post /cacheget
function(key){
  # The literal string "NULL" signals a miss to the client.
  result <- if (st_meta$exists(key)) st_meta$get(key) else "NULL"
  serialize(result, NULL)
}
#* Store a JSON request body in the metadata cache as a tibble
#* @param key The cache key to save under
#* @post /cacheset
function(req, key){
  message("The key is ", key)
  # The POST body is JSON; parse it and coerce to a tibble before caching.
  parsed <- as_tibble(fromJSON(req$postBody))
  st_meta$set(key = key, value = parsed)
}
# Compose stack: the R plumber API behind an haproxy load balancer,
# fronted by nginx (see nginx.conf for the public routing).
version: '2'
services:
  # The plumber API container. Settings and the API script are bind-mounted
  # read-only so they can be edited without rebuilding the image.
  cjtplumb:
    image: conjuntool_plumber
    restart: always
    volumes:
      - /data/climate_data:/data/climate_data
      - /home/conor/git/R/conjuntool/plumber/plumber_settings.R:/plumber_settings.R:ro
      - /home/conor/git/R/conjuntool/plumber/app/plumber.R:/app/plumber.R:ro
  # Public-facing reverse proxy.
  nginx:
    image: nginx:1.9
    volumes:
      - /home/conor/git/R/conjuntool/plumber/nginx.conf:/etc/nginx/nginx.conf:ro
    restart: always
    depends_on:
      - cjtplumb
  # Load balancer between nginx and the (possibly scaled) API containers;
  # watches the Docker socket to discover linked backends.
  lb:
    image: 'dockercloud/haproxy:1.2.1'
    links:
      - cjtplumb
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
events {
  worker_connections 4096; ## Default: 1024
}
http {
  default_type application/octet-stream;
  sendfile on;
  tcp_nopush on;
  server_names_hash_bucket_size 128; # this seems to be required for some vhosts
  server {
    listen 80 default_server;
    listen [::]:80 default_server ipv6only=on;
    root /usr/share/nginx/html;
    index index.html index.htm;
    server_name api.conr.ca;
    # Forward API traffic to the "lb" (haproxy) service defined in
    # docker-compose; the trailing slash strips the /conjuntool/ prefix.
    location /conjuntool/ {
      proxy_pass http://lb:80/;
      proxy_set_header Host $host;
    }
    # Never serve .htaccess-style files.
    location ~ /\.ht {
      deny all;
    }
  }
}
# Debug options
debug_flag <- FALSE

# File storage location
file_dir <- "/data/climate_data"

# Cache details: three storr RDS stores rooted under the data directory,
# one each for map averages, point time series, and metadata tables.
cache_root <- file.path(file_dir, ".cache")
st_avg <- storr::storr_rds(file.path(cache_root, "avg-cache"), default_namespace = "avg")
st_point <- storr::storr_rds(file.path(cache_root, "point-cache"), default_namespace = "point")
st_meta <- storr::storr_rds(file.path(cache_root, "meta-cache"), default_namespace = "meta")
# Bump this string to invalidate every cached entry.
cache_ver <- "2018-12-06"

# Users and Passwords (paired by index; consumed by the checkAuth filter)
users <- c()
passwords <- c()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment