Commit f872fb2c authored by Leonardo Silvestri's avatar Leonardo Silvestri

remote errors now throw; improve examples coherence and doc; add 'lapply'

parent 5800aef4
......@@ -84,3 +84,16 @@ RUnit_comm_etexprsublist <- function() {
RUnit_comm_zts <- function() {
all.equal(z1, con1 ? ++z1)
}
## communication errors:
RUnit_comm_remote_error_synchronous <- function() {
## note the assignment to 'a'; if not, like in the next test, the
## call in asynchronous and no exception is raised:
tryCatch(a <- (con1 ? "a" < 1),
.Last.error == "remote: invalid type for binary operator (string < double)")
}
RUnit_comm_remote_error_asynchronous <- function() {
## note the assignment to 'a'; if not, like in the next test, the
## call in asynchronous and no exception is raised:
tryCatch({ con1 ? "a" < 1; 2017 }, "hello") == 2017
}
## Copyright (C) 2017 Leonardo Silvestri
##
## This file is part of ztsdb.
##
## ztsdb is free software: you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation, either version 3 of the License, or
## (at your option) any later version.
##
## ztsdb is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
## GNU General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with ztsdb. If not, see <http://www.gnu.org/licenses/>.
RUnit_lapply_array <- function() {
all.equal(lapply(1:5, function(x) x*2), list(2,4,6,8,10))
}
RUnit_lapply_array_builtin <- function() {
all.equal(lapply(1:5, sin), list(sin(1),sin(2),sin(3),sin(4),sin(5)))
}
RUnit_lapply_list <- function() {
l <- list(1,2,3,4,5)
all.equal(lapply(l, function(x) x*2), list(2,4,6,8,10))
}
RUnit_lapply_list_builtin <- function() {
l <- list(1,2,3,4,5)
all.equal(lapply(l, sin), list(sin(1),sin(2),sin(3),sin(4),sin(5)))
}
RUnit_lapply_incorrect_type <- function() {
tryCatch(lapply(function(x) x, sin), .Last.error=="function cannot be double subsetted")
}
......@@ -5,19 +5,23 @@ ztsdb.
# Running the examples
To run an example, start a ztsdb instance on port 123123 in the
To run an example, start a ztsdb instance on port 12300 in the
directory of the test
ztsdb -p 123123
ztsdb -p 12300
One can use an already running ztsdb instance, but one then might have
to change the `path` and the port number in the script file.
One can use an already running ztsdb instance, but one has to change
the `path` and the port number in the script file.
When there is only one file for a particular example then this file
should be sourced in the ztsdb instance. If there are multiple files,
the file that needs to be sourced by the ztsdb instance is postfixed
All files that need to be sourced by the ztsdb instance are postfixed
by _ztsdb_. Similarly a query file (runnable either in an R session or
by another ztsdb instance) is postfixed by _query_. Sourcing is done
with the following command:
source("file_to_source.R")
When running from an R session, make sure the 'rztsdb' package is
loaded:
library(rztsdb)
......@@ -24,7 +24,7 @@
## For more information, please refer to <http://unlicense.org/>
## create a connection 'c1' towards a ztsdb instance
c1 <- connection("127.0.0.1", 123123)
c1 <- connection("127.0.0.1", 12300)
## querying the futures data can then be made like this:
id <- "CD"
......
......@@ -3,15 +3,15 @@
## order to populate the data
## create a connection 'c1' towards a ztsdb instance
c1 <- connection("127.0.0.1", 123123)
c1 <- connection("127.0.0.1", 12300)
ids <- list(AAPL="AAPL", MSFT="MSFT", XOM="XOM")
adjusted_list <- lapply(ids, function(x) c1 ? get_adjusted_price(++x))
## The result will be a list of three 'xts' containing adjusted
## data:
## If run from R, the result will be a list of three 'xts' containing
## adjusted data:
##
## str(adjusted_list)
## List of 3
......@@ -42,3 +42,5 @@ adjusted_list <- lapply(ids, function(x) c1 ? get_adjusted_price(++x))
## xts Attributes:
## List of 1
## ..$ descr: chr ""
## If run from ztsdb, the result will be a list of three 'zts'.
......@@ -7,13 +7,24 @@ we create a list containing a time series:
idx <- as.time(NULL)
a <<- list(b=zts(idx, data))
In a terminal, run 'append', making sure to use the port on which the
ztsdb instance is listening (in this example we use 10000):
ztsdb instance is listening (in this example we use 19300):
./append 127.0.0.1 10000 10 a,b 3 100
./append 127.0.0.1 19300 10 a,b 3 100
This will run append generating 10 messages per second until 100
This will run 'append', generating 10 messages per second until 100
messages have been sent, updating element 'b' of list 'a' which has 3
columns.
The append command line usage is:
usage: append <ip> <port> <rate> <varname[,name1,name2,...]> <ncols> [max-msgs]
Where 'ip' and 'port' are the IP address and port of the remote ztsdb
instance, rate is the number of messages per second to generate,
'varname' is the name of the variable on the remote instance that
contains the time-series. If the time-series is nested in a list,
'name1', 'name2' are used optionally to indicate recursively the list
entry names. 'ncols' is the number of columns of the time-series to
update. Optionally, the number 'max-msgs' can be specified to stop
appending after reaching that number.
......@@ -77,6 +77,8 @@ SET(SOURCE_FILES
interp.hpp
load_builtin.cpp
load_builtin.hpp
load_function.cpp
load_function.hpp
logging.hpp
misc.cpp
misc.hpp
......
......@@ -7,13 +7,14 @@ include Makefile.rules
SRCS = main_parser/lexer.cpp main_parser/parser.cpp \
config_parser/cfglexer.cpp config_parser/cfgparser.cpp \
parser_ctx.cpp anf.cpp array.cpp interp.cpp ast.cpp \
load_builtin.cpp valuevar.cpp valuevar_ic.cpp dname.cpp \
net_handler.cpp msg_handler.cpp interp_ctx.cpp ztsdb.cpp \
encode.cpp misc.cpp zts.cpp display.cpp base_funcs.cpp \
base_funcs_math.cpp base_funcs_array.cpp \
base_funcs_array_idx.cpp base_funcs_set.cpp base_funcs_ic.cpp \
base_funcs_roll.cpp conversion_funcs.cpp csv.cpp string.cpp \
base_types.cpp timezone/ztime.cpp timezone/zone.cpp \
load_builtin.cpp load_function.cpp valuevar.cpp \
valuevar_ic.cpp dname.cpp net_handler.cpp msg_handler.cpp \
interp_ctx.cpp ztsdb.cpp encode.cpp misc.cpp zts.cpp \
display.cpp base_funcs.cpp base_funcs_math.cpp \
base_funcs_array.cpp base_funcs_array_idx.cpp \
base_funcs_set.cpp base_funcs_ic.cpp base_funcs_roll.cpp \
conversion_funcs.cpp csv.cpp string.cpp base_types.cpp \
timezone/ztime.cpp timezone/zone.cpp \
timezone/ztime_vector.cpp timezone/localtime.cpp \
unop_binop_funcs.cpp config_ctx.cpp config.cpp \
interp_error.cpp zcpp.cpp period.cpp
......
......@@ -109,6 +109,7 @@ static Array<T> makeVector(arr::Array<T>& r,
case val::vt_clos:
case val::vt_connection:
case val::vt_timer:
case val::vt_zts:
r.concat(val::getVal(e), val::getName(e));
break;
case val::vt_null: // like in R, NULLs are just ignored in 'c'
......@@ -167,6 +168,9 @@ val::Value funcs::c(vector<val::VBuiltinG::arg_t>& v, zcore::InterpCtx& ic) {
std::cout << "c..." << std::endl;
#endif
auto vt = getCType(v.cbegin(), v.cend()); // examines all elts to determine type of vector
if (vt == val::vt_zts) {
vt = val::vt_list;
}
switch (vt) {
case val::vt_double:
......
......@@ -170,12 +170,14 @@ size_t zcore::InterpCtx::readRspData(Global::reqid_t reqid,
#ifdef DEBUG
cout << "| rsp->second.name: " << rsp->second.name << endl;
#endif
++stats.nbInRSP;
if (rsp->second.future.use_count() > 1 && rsp->second.future->getvalptr()) {
*rsp->second.future->getvalptr() = std::move(rsp->second.valstack[0].val);
return interpret(state, rsp->second.future->getvalptr());
}
else {
return interpret(state, nullptr);
}
++stats.nbInRSP;
return interpret(state);
}
return 1; // indicates we are not finished
......@@ -544,12 +546,21 @@ int zcore::InterpCtxLocal::processReqData(Global::conn_id_t peerid,
}
int zcore::InterpCtxLocal::interpret(InterpState& state)
int zcore::InterpCtxLocal::interpret(InterpState& state, const val::Value* retval)
{
s = &state;
sigint = 0;
while (s->k->next && !sigint) {
try {
if (retval && retval->which() == val::vt_error) {
const auto& ve = get<val::VError>(*retval);
retval = nullptr;
throw interp::RemoteErrorException(ve.what);
}
else {
retval = nullptr;
}
s->k = interp::step(s->k, s->fstack, *this);
}
catch (const interp::FutureException& e) {
......@@ -588,7 +599,7 @@ int zcore::InterpCtxLocal::interpret(InterpState& state)
// interpreter (for example it might have happened on the
// lower layers during a req/rsp, etc.); in this case we
// don't have a precise location, so we use the location of
// the control of the step which is better than nothing:
// the control of the step (which is better than nothing):
if (s->k->control->loc.begin.line) {
std::stringstream ss;
std::cerr << s->k->control->loc << ": ";
......@@ -609,6 +620,7 @@ int zcore::InterpCtxLocal::interpret(InterpState& state)
for (auto &elt : s->fstack) { elt->clear(); }
states.erase(state.reqid);
s = nullptr;
ir.enableKeyboardPoll();
return 0;
}
}
......@@ -619,6 +631,12 @@ int zcore::InterpCtxLocal::interpret(InterpState& state)
if (lv.which() == val::vt_future) {
return 1;
}
if (lv.which() == val::vt_error) {
auto ve = get<val::VError>(lv);
std::cerr << "Error: " << std::flush;
std::cout << val::display(lv) << std::endl;
r->add(".Last.error", ve);
}
else {
std::cout << val::display(lv) << std::endl;
}
......@@ -650,7 +668,7 @@ void zcore::InterpCtxLocal::sendGcStateMessage(const string& ip,
/// pass the iterator to InterpState for easier deletion! LLL
int zcore::InterpCtxRemote::interpret(InterpState& state)
int zcore::InterpCtxRemote::interpret(InterpState& state, const val::Value* retval)
{
s = &state;
sigint = 0;
......@@ -708,7 +726,7 @@ int zcore::InterpCtxRemote::interpret(InterpState& state)
auto n = ir.sendRsp(state.peerid,
state.reqid,
state.sourceid,
val::VError{"Remote error: "s + locInfo + e.what()});
val::VError{"remote: "s + locInfo + e.what()});
++stats.nbOutRSP;
stats.bytesOutRSP += n;
states.erase(state.reqid);
......@@ -738,7 +756,7 @@ int zcore::InterpCtxRemote::interpret(InterpState& state)
}
if (val::any_of(rsp, [](const val::Value& v) { return !zcore::isTransmissible(v); })) {
auto n = ir.sendRsp(state.peerid, state.reqid, state.sourceid,
val::VError{"Remote error: untransmissible type ("s
val::VError{"remote: untransmissible type ("s
+ std::to_string(rsp.which()) + ")"});
++stats.nbOutRSP;
stats.bytesOutRSP += n;
......@@ -766,7 +784,7 @@ void zcore::InterpCtxRemote::sendGcStateMessage(const string& ip,
auto n = ir.sendRsp(state.peerid,
state.reqid,
state.sourceid,
val::VError{"Remote error: response time out from "s + ip + ":"
val::VError{"remote: response time out from "s + ip + ":"
+ std::to_string(port) + " [" + std::to_string(peerid) + ']'});
++stats.nbOutRSP;
stats.bytesOutRSP += n;
......@@ -787,7 +805,7 @@ void zcore::InterpCtxTimer::removeTimer(val::SpTimer& tmr) {
/// pass the iterator to InterpState for easier deletion! LLL
int zcore::InterpCtxTimer::interpret(InterpState& state)
int zcore::InterpCtxTimer::interpret(InterpState& state, const val::Value* retval)
{
s = &state;
sigint = 0;
......
......@@ -170,7 +170,7 @@ namespace zcore {
val::SpFuture& future);
void setStop();
virtual int interpret(InterpState& state) = 0;
virtual int interpret(InterpState& state, const val::Value* retval=nullptr) = 0;
/// When deleting a state, sends or displays an error message.
virtual void sendGcStateMessage(const string& ip,
......@@ -212,7 +212,7 @@ namespace zcore {
InterpCtxLocal(MsgHandlerBase& ir_p, interp::shpfrm& global) :
InterpCtx(ir_p, global) { }
virtual int interpret(InterpState& state);
virtual int interpret(InterpState& state, const val::Value* retval=nullptr);
virtual void sendGcStateMessage(const string& ip,
int port,
Global::conn_id_t peerid,
......@@ -227,7 +227,7 @@ namespace zcore {
InterpCtxRemote(MsgHandlerBase& ir_p, interp::shpfrm& global) :
InterpCtx(ir_p, global) { }
virtual int interpret(InterpState& state);
virtual int interpret(InterpState& state, const val::Value* retval=nullptr);
virtual void sendGcStateMessage(const string& ip,
int port,
Global::conn_id_t peerid,
......@@ -239,7 +239,7 @@ namespace zcore {
InterpCtx(ir_p, global), timer_wptr(timer_p) { timer_p->start(); }
virtual void removeTimer(val::SpTimer& tmr);
virtual int interpret(InterpState& state);
virtual int interpret(InterpState& state, const val::Value* retval=nullptr);
virtual void sendGcStateMessage(const string& ip,
int port,
Global::conn_id_t peerid,
......
......@@ -30,6 +30,10 @@ namespace interp {
FutureException(const std::string& what_p) : std::out_of_range(what_p) { }
};
struct RemoteErrorException : public std::out_of_range {
RemoteErrorException(const std::string& what_p) : std::out_of_range(what_p) { }
};
struct EvalException : public std::out_of_range {
EvalException(const std::string& what_p, const yy::location& loc_p) :
std::out_of_range(what_p), loc(loc_p) { }
......
// (C) 2017 Leonardo Silvestri
//
// This file is part of ztsdb.
//
// ztsdb is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// ztsdb is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with ztsdb. If not, see <http://www.gnu.org/licenses/>.
#include "load_function.hpp"
#include "parser_ctx.hpp"
#include "anf.hpp"
// #define DEBUG
void core::loadFunctions(interp::BaseFrame* r) {
ParserCtx pctx;
auto res = pctx.parse(std::make_shared<std::string>
("function(X, FUN) {"
" l <- list() \n"
" for (i in X) { l <- c(l, FUN(i)) } \n"
" l }"));
if (res == 0) {
anf::convertToANF(pctx.prog.get());
const auto f = static_cast<const Function*>(pctx.prog.get()->begin->e);
r->add("lapply"s, val::Value(std::make_shared<val::VClos>(f)));
}
}
// (C) 2017 Leonardo Silvestri
//
// This file is part of ztsdb.
//
// ztsdb is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// ztsdb is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with ztsdb. If not, see <http://www.gnu.org/licenses/>.
#ifndef LOAD_FUNCTION
#define LOAD_FUNCTION
#include "env.hpp"
namespace core {
void loadFunctions(interp::BaseFrame* r);
}
#endif
......@@ -46,7 +46,7 @@ extern zlog::Logger lg;
static void sigint_handler(int signum)
{
std::cout << "signum: " << signum << std::endl;
// std::cout << "signum: " << signum << std::endl;
zcore::InterpCtx::sigint = 1;
zcore::MsgHandler::waitingOnResp = 0;
}
......
......@@ -338,7 +338,7 @@ namespace val {
/// difficult to know the exact Value location of a future in
/// advance, a future is created in two steps. The memory position
/// is created here and then the Value location will be updated
/// with 'updateValuePtr'.
/// with 'setvalptr'.
VFuture() : val(nullptr) { }
void setvalptr(val::Value& val_p, const std::shared_ptr<interp::BaseFrame>& frame_p);
......@@ -421,7 +421,8 @@ namespace val {
string operator()(const std::shared_ptr<VClos>&) const { return "function"; }
string operator()(const VConn&) const { return "connection"; }
string operator()(const VTimer&) const { return "timer"; }
string operator()(const VBuiltinG&) const { return "builtin"; }
string operator()(const SpBuiltin&) const { return "builtin"; }
string operator()(const SpFuture&) const { return "future"; }
string operator()(const SpVAD&) const { return "double"; }
string operator()(const SpVAS&) const { return "character"; }
string operator()(const SpVAB&) const { return "logical"; }
......
......@@ -25,6 +25,7 @@
#include <sys/eventfd.h>
#include "timezone/zone.hpp"
#include "load_builtin.hpp"
#include "load_function.hpp"
#include "cmdline.h" // generated by gengetopt
#include "anf.hpp"
#include "parser_ctx.hpp"
......@@ -203,6 +204,7 @@ int main(int argc, char *argv[]) {
// load predefined functions in global env:
core::loadBuiltinFunctions(base.get());
core::loadFunctions(base.get());
defineConstants(base.get());
// set the signal mask before running the comm thread:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment