Commit 86cdf8f2 by Leonardo Silvestri

nested (list) append

1 parent 5c39b66f
include ../../src/Makefile.header
COREDIR =../../src
COREDIR =../..
CPPFLAGS += -I$(COREDIR)
%.o : $(COREDIR)/%.cpp
......
# Usage
On a ztsdb instance, create an empty time-series; in the example below
we create a list containing a time series:
data <- matrix(0, 0, 3, dimnames=list(NULL, c("a","b","c")))
idx <- as.time(NULL)
a <<- list(b=zts(idx, data))
In a terminal, run 'append', making sure to use the port on which the
ztsdb instance is listening (in this example we use 10000):
./append 127.0.0.1 10000 10 a,b 3 100
This will run append generating 10 messages per second until 100
messages have been sent, updating element 'b' of list 'a' which has 3
columns.
......@@ -25,6 +25,7 @@
#include <sys/epoll.h>
#include <string>
#include <iostream>
#include <sstream>
#include <limits>
#include <ztsdb/zcpp.hpp>
#include <ztsdb/cow_ptr.hpp>
......@@ -33,7 +34,7 @@
static void loop_append(const std::string ip,
int port,
size_t rate,
const std::string varname,
const std::vector<std::string>& names,
size_t ncols,
size_t max_msg)
{
......@@ -60,11 +61,19 @@ static void loop_append(const std::string ip,
throw std::system_error(std::error_code(errno, std::system_category()), "timerfd_create");
}
itimerspec tmr;
bzero(&tmr, sizeof(tmr));
tmr.it_value.tv_sec = 0;
tmr.it_value.tv_nsec = 1;
tmr.it_interval.tv_sec = 0;
tmr.it_interval.tv_nsec = 1e9/rate;
bzero(&tmr, sizeof(tmr));
if (rate == 1) {
tmr.it_value.tv_sec = 1;
tmr.it_value.tv_nsec = 0;
tmr.it_interval.tv_sec = 1;
tmr.it_interval.tv_nsec = 0;
}
else {
tmr.it_value.tv_sec = 0;
tmr.it_value.tv_nsec = 1e9/rate;
tmr.it_interval.tv_sec = 0;
tmr.it_interval.tv_nsec = 1e9/rate;
}
if (timerfd_settime(msg_timerfd, 0, &tmr, NULL) == -1) {
throw std::system_error(std::error_code(errno, std::system_category()), "timerfd_settime");
}
......@@ -127,7 +136,7 @@ static void loop_append(const std::string ip,
}
// create and send the append message:
auto msg = arr::make_append_msg(varname,
auto msg = arr::make_append_msg(names,
arr::Vector<Global::dtime>{now},
data);
nmsgs++;
......@@ -168,12 +177,12 @@ static void loop_append(const std::string ip,
// 5. number of columns to append
// 6. maximum number of messages to send
int main(int argc, char* argv[]) {
enum { IP=1, PORT, RATE, VARNAME, NCOLS, MAX_MSG };
enum { IP=1, PORT, RATE, VARNAMES, NCOLS, MAX_MSG };
// grab a message rate (# per second)
if (argc < 6 || argc > 7) {
std::cerr << "usage: " << argv[0]
<< " <ip> <port> <rate> <varname> <ncols> [max-msgs]" << std::endl;
<< " <ip> <port> <rate> <varname[,name1,name2,...]> <ncols> [max-msgs]" << std::endl;
return -1;
}
......@@ -183,7 +192,13 @@ int main(int argc, char* argv[]) {
size_t max_msg = argc == 6 ? std::numeric_limits<size_t>::max() :
std::stoll(argv[MAX_MSG]); // -1 means run forever
loop_append(argv[IP], port, rate, argv[VARNAME], ncols, max_msg);
std::istringstream varnames_ss(argv[VARNAMES]);
std::string token;
std::vector<std::string> names;
while (std::getline(varnames_ss, token, ',')) {
names.push_back(token);
}
loop_append(argv[IP], port, rate, names, ncols, max_msg);
return 0;
}
include ../../src/Makefile.header
COREDIR =../../src
COREDIR =../..
CPPFLAGS += -I$(COREDIR)
%.o : $(COREDIR)/%.cpp
......
......@@ -62,7 +62,9 @@ static void simple_append(const std::string ip,
}
// create the append message:
auto msg = arr::make_append_msg(varname, std::vector<Global::dtime>{now}, data);
auto msg = arr::make_append_msg(std::vector<std::string>{varname},
std::vector<Global::dtime>{now},
data);
// send it:
ssize_t wres = write(fd, msg.first.get(), msg.second);
......
include ../../src/Makefile.header
LDFLAGS += -L/home/lsilvest/repos/ztsdb/shlib_client -lztsdb_client
COREDIR =../../src
LDFLAGS += -L../../shlib_client -lztsdb_client
COREDIR =../..
CPPFLAGS += -I$(COREDIR)
......
......@@ -31,7 +31,7 @@
#include "ztsdb/zc.h"
static void simple_append(const char* ip, int port, const char* varname, size_t ncols)
static void simple_append(const char* ip, int port, const char** names, size_t nameslen, size_t ncols)
{
// open TCP connection:
struct sockaddr_in addr;
......@@ -81,7 +81,11 @@ static void simple_append(const char* ip, int port, const char* varname, size_t
// create the append message:
char* buf;
size_t buflen;
int res = make_append_msg(varname, &timestamp, 1, data, ncols, &buf, &buflen);
int res = make_append_msg(names, nameslen, &timestamp, 1, data, ncols, &buf, &buflen);
if (res < 0) {
fprintf(stderr, "res error: %d\n", res);
exit(res);
}
free(data);
// send it:
......@@ -103,7 +107,7 @@ static void simple_append(const char* ip, int port, const char* varname, size_t
// 3. name of variable to append to (assumed to be a zts)
// 4. number of columns to append
int main(int argc, char* argv[]) {
enum { IP=1, PORT, VARNAME, NCOLS };
enum { IP=1, PORT, VARNAMES, NCOLS };
// grab a message rate (# per second)
if (argc != 5) {
......@@ -115,7 +119,22 @@ int main(int argc, char* argv[]) {
char* endptr;
size_t ncols = strtoull(argv[NCOLS], &endptr, 10);
simple_append(argv[IP], port, argv[VARNAME], ncols);
const unsigned MAX_DEPTH = 100;
const unsigned MAX_LEN = 255;
const char* names[MAX_LEN+1];
unsigned n = 0;
char *p;
for (p = strtok(argv[VARNAMES], ","); p != NULL; p = strtok(NULL, ",")) {
names[n] = strndup(p, MAX_LEN);
++n;
}
simple_append(argv[IP], port, names, n, ncols);
unsigned i;
for (i=0; i<n; ++i) {
free((void*)names[i]);
}
return 0;
}
......@@ -185,9 +185,7 @@ namespace arr {
/// Construct an array from file. Note that the data must have
/// been tested to determine typename 'T'. 'mapped' indicates if
/// the array is to remain mmapped to the file or if it should
/// simply be read in memory (not implemented yet LLL).
/// been tested to determine typename 'T'.
Array(std::unique_ptr<AllocFactory>&& allocf_p)
: allocf(std::move(allocf_p))
{
......
......@@ -182,21 +182,53 @@ size_t zcore::InterpCtx::readRspData(Global::reqid_t reqid,
}
static ssize_t readHeader(const char* buf,
size_t len,
size_t& off,
val::Value& val,
std::shared_ptr<interp::BaseFrame>& r) {
// we need to check throughout here that we are not going futher than slen!!! LLL
// find the string name:
auto slen = ntoh64(*(reinterpret_cast<const uint64_t*>(buf)));
auto ns = slen >> 32;
slen &= 0xffffffff;
off += sizeof(uint64_t);
// the first name must be a variable name retrievable from the global environment:
auto sz = (buf + off++)[0];
const string s(buf + off, buf + off + sz);
val = r->global->find(s);
off += sz + 1;
// subsequent names must be list elements:
for (size_t i=1; i<ns; ++i) {
if (val.which() != val::vt_list) {
lg.log(zlog::SV_DEBUG, "invalid append: incorrect type");
return -1;
}
auto& l = get<val::SpVList>(val);
auto sz = (buf + off++)[0];
const string s(buf + off, buf + off + sz);
val = (*l)[s];
off += sz + 1;
}
off = slen;
return 0;
}
ssize_t zcore::InterpCtx::readAppendData(const char* buf, size_t len) {
#ifdef DEBUG
cout << "InterpCtx::readAppendData():" << endl;
// cout << printBuf(buf, len) << endl;
#endif
// find the string name:
size_t off = 0;
auto slen = ntoh64(*(reinterpret_cast<const uint64_t*>(buf)));
off += sizeof(uint64_t);
const string s(buf + off, buf + off + slen);
static const unsigned STRALIGN = 8; // find this constant somewhere globally !!! LLL
off += getAlignedLength(s.size(), STRALIGN);
try {
auto val = r->global->find(s);
size_t off = 0;
val::Value val;
auto res = readHeader(buf, len, off, val, r);
if (res < 0) return res;
switch(val.which()) {
case val::vt_zts:
get<val::SpZts>(val).get()->append(buf + off, len - off, off); // get() to avoid the copy
......@@ -219,6 +251,7 @@ ssize_t zcore::InterpCtx::readAppendData(const char* buf, size_t len) {
// can't do strings efficiently...
// we should specialize encoding functions to prevent strings being encoded
default:
// don't want to log, but instead increase a stat!!! LLL
lg.log(zlog::SV_DEBUG, "invalid append: incorrect type");
return -1;
}
......@@ -239,16 +272,13 @@ ssize_t zcore::InterpCtx::readAppendVectorData(const char* buf, size_t len) {
cout << "InterpCtx::readAppendVectorData():" << endl;
// cout << printBuf(buf, len) << endl;
#endif
// find the string name:
size_t off = 0;
auto slen = ntoh64(*(reinterpret_cast<const uint64_t*>(buf)));
off += sizeof(uint64_t);
const string s(buf + off, buf + off + slen);
static const unsigned STRALIGN = 8;
off += getAlignedLength(s.size(), STRALIGN);
try {
auto val = r->global->find(s);
size_t off = 0;
val::Value val;
auto res = readHeader(buf, len, off, val, r);
if (res < 0) return res;
switch(val.which()) {
case val::vt_zts:
get<val::SpZts>(val).get()->appendVector(buf + off, len - off); // get() to avoid the copy
......
......@@ -112,6 +112,10 @@ std::string val::VFuture::to_string() const {
val::VList::VList() : a(arr::rsv, {0}) { }
val::VList::VList(const Array<Value>& a_p) : a(a_p) { }
val::VList::VList(const VList& l) : a(l.a) { }
val::Value val::VList::operator[](const std::string& s) {
auto idx = a.getNames(0)[s];
return (*a.v[0].get())[idx];
}
// to reduce boilerplate switch code:
......
......@@ -379,6 +379,7 @@ namespace val {
inline Value operator[](arr::idx_type i) const {
return a[i];
}
Value operator[](const std::string& s);
Array<Value> a;
};
......
......@@ -217,7 +217,7 @@ namespace arr {
};
// redefine these so as not to take into account ordering:
// we redefine these so as not to take into account ordering:
template <>
inline void setv(Vector<val::Value>& v, size_t i, const val::Value& t) {
if (i >= v.size()) throw std::range_error("subscript out of bounds");
......
......@@ -24,7 +24,7 @@
extern "C" {
#endif
int make_append_msg(const char* name,
int make_append_msg(const char* name[], size_t nameslen,
const int64_t* time, size_t tlen,
const double* data, size_t len,
char** buf, size_t* buflen);
......
......@@ -22,14 +22,21 @@
#include "net_handler.hpp"
size_t arr::getHeaderLength(const std::string& name) {
const unsigned alen = getAlignedLength(name.size(), Global::STRALIGN);
static const auto MAXLEN = 255;
size_t arr::getHeaderLength(const std::vector<std::string>& names) {
// note + 2: 1 byte for size and one byte for terminating 0
auto combined_length = accumulate(names.begin(), names.end(), 0.0,
[](size_t s, const std::string& e) { return s + e.size() + 2; });
const unsigned alen = getAlignedLength(combined_length, Global::STRALIGN);
return net::INIT_OFFSET + sizeof(Global::MsgType) + sizeof(size_t) + alen;
}
void arr::writeHeader(Global::buflen_pair& buf, Global::MsgType msgtype, const std::string& name) {
size_t offset = 0;
static void writeHeaderHeader(Global::buflen_pair& buf,
Global::MsgType msgtype,
size_t& offset) {
auto magicnb = hton64(Global::MAGICNB);
memcpy(buf.first.get() + offset, &magicnb, sizeof(Global::MAGICNB));
offset += sizeof(Global::MAGICNB);
......@@ -39,46 +46,114 @@ void arr::writeHeader(Global::buflen_pair& buf, Global::MsgType msgtype, const s
auto msgtype64 = hton64(static_cast<uint64_t>(msgtype));
memcpy(buf.first.get() + offset, &msgtype64, sizeof(msgtype));
offset += sizeof(msgtype);
}
const size_t namesz = hton64(name.size());
memcpy(buf.first.get() + offset, &namesz, sizeof(namesz));
offset += sizeof(namesz);
memcpy(buf.first.get() + offset, name.c_str(), name.size());
offset += name.size();
size_t paddingsz = getAlignedLength(name.size(), Global::STRALIGN) - name.size();
static void writeLengthAndPadding(size_t nameslen,
Global::buflen_pair& buf,
size_t& offset,
uint64_t namesz_loc) {
size_t paddingsz = getAlignedLength(offset - namesz_loc, Global::STRALIGN) - (offset - namesz_loc);
memset(buf.first.get() + offset, 0, paddingsz);
// now write the number of strings and the total length:
const uint64_t namessz = (offset - namesz_loc + paddingsz) | (nameslen << 32);
const uint64_t namessz_hton = hton64(namessz);
memcpy(buf.first.get() + namesz_loc, &namessz_hton, sizeof(namessz_hton));
offset += sizeof(namessz);
}
int arr::writeHeader(Global::buflen_pair& buf,
Global::MsgType msgtype,
const std::vector<std::string>& names) {
size_t offset = 0;
writeHeaderHeader(buf, msgtype, offset);
const uint64_t namesz_loc = offset; // remember where to write length information
offset += sizeof(uint64_t);
for (const auto& n : names) {
if (n.size() > MAXLEN) {
return -1;
}
buf.first.get()[offset++] = n.size();
memcpy(buf.first.get() + offset, n.c_str(), n.size());
offset += n.size();
buf.first.get()[offset++] = 0;
}
writeLengthAndPadding(names.size(), buf, offset, namesz_loc);
return 0;
}
// C linkage functions -------------------
int make_append_msg(const char* name,
const int64_t* idx, size_t ilen,
const double* data, size_t len,
static size_t getHeaderLength(const char** names, size_t nameslen) {
// note + 2: 1 byte for size and one byte for terminating 0
size_t combined_length = 0;
for (size_t i=0; i<nameslen; ++i) {
combined_length += strnlen(names[i], MAXLEN) + 2;
}
const unsigned alen = getAlignedLength(combined_length, Global::STRALIGN);
return net::INIT_OFFSET + sizeof(Global::MsgType) + sizeof(size_t) + alen;
}
static int writeHeader(Global::buflen_pair& buf,
Global::MsgType msgtype,
const char** names,
size_t nameslen) {
size_t offset = 0;
writeHeaderHeader(buf, msgtype, offset);
const uint64_t namesz_loc = offset; // remember where to write length information
offset += sizeof(uint64_t);
for (size_t i = 0; i<nameslen; ++i) {
auto slen = strnlen(names[i], MAXLEN+1);
if (slen == MAXLEN+1) {
return -1;
}
buf.first.get()[offset++] = slen;
memcpy(buf.first.get() + offset, names[i], slen);
offset += slen;
buf.first.get()[offset++] = 0;
}
writeLengthAndPadding(nameslen, buf, offset, namesz_loc);
return 0;
}
int make_append_msg(const char* names[], size_t nameslen,
const int64_t* idx, size_t idxlen,
const double* data, size_t datalen,
char** buf, size_t* buflen)
{
if (ilen == 0 || len % ilen) {
if (idxlen == 0 || datalen % idxlen) {
cout << idxlen << " " << datalen << endl;
return -1;
}
for (size_t i=1; i<ilen; ++i) {
for (size_t i=1; i<idxlen; ++i) {
if (idx[i-1] >= idx[i]) {
return -2;
}
}
const auto headersz = arr::getHeaderLength(name);
const auto datasz = len*sizeof(double);
const auto headersz = getHeaderLength(names, nameslen);
const auto datasz = datalen*sizeof(double);
const auto rawvecsz = sizeof(arr::RawVector<double>);
const auto idxdatasz = ilen*sizeof(Global::dtime);
const auto idxdatasz = idxlen*sizeof(Global::dtime);
const auto totalsz = headersz + rawvecsz + idxdatasz + rawvecsz + datasz;
auto buf_p = std::make_pair(std::make_unique<char[]>(totalsz), totalsz);
arr::writeHeader(buf_p, Global::MsgType::APPEND_VECTOR, name);
if (writeHeader(buf_p, Global::MsgType::APPEND_VECTOR, names, nameslen) < 0) {
return -3;
}
const arr::RawVector<Global::dtime> idx_rv{arr::TypeNumber<Global::dtime>::n, ilen, 1};
const arr::RawVector<Global::dtime> idx_rv{arr::TypeNumber<Global::dtime>::n, idxlen, 1};
memcpy(buf_p.first.get() + headersz, &idx_rv, rawvecsz);
memcpy(buf_p.first.get() + headersz + rawvecsz, idx, idxdatasz);
const arr::RawVector<double> v_rv{arr::TypeNumber<double>::n, len, 1};
const arr::RawVector<double> v_rv{arr::TypeNumber<double>::n, datalen, 1};
memcpy(buf_p.first.get() + headersz + rawvecsz + idxdatasz, &v_rv, rawvecsz);
memcpy(buf_p.first.get() + headersz + rawvecsz + idxdatasz + rawvecsz, data, datasz);
......@@ -91,7 +166,7 @@ int make_append_msg(const char* name,
// C++ functions with declaration using only C++ stdlib -------------------
Global::buflen_pair arr::make_append_msg(const std::string& name,
Global::buflen_pair arr::make_append_msg(const std::vector<std::string>& names,
const arr::Vector<Global::dtime>& idx,
const arr::Vector<double>& v)
{
......@@ -105,13 +180,13 @@ Global::buflen_pair arr::make_append_msg(const std::string& name,
throw std::out_of_range("make_append_msg: idx must be sorted");
}
const auto headersz = getHeaderLength(name);
const auto headersz = getHeaderLength(names);
const auto datasz = v.size()*sizeof(double);
const auto rawvecsz = sizeof(RawVector<double>);
const auto idxdatasz = idx.size()*sizeof(Global::dtime);
const auto totalsz = headersz + rawvecsz + idxdatasz + rawvecsz + datasz;
auto buf = std::make_pair(std::make_unique<char[]>(totalsz), totalsz);
writeHeader(buf, Global::MsgType::APPEND_VECTOR, name);
writeHeader(buf, Global::MsgType::APPEND_VECTOR, names);
memcpy(buf.first.get() + headersz, idx.getRawVectorPtr(), rawvecsz);
memcpy(buf.first.get() + headersz + rawvecsz, &idx.front(), idxdatasz);
memcpy(buf.first.get() + headersz + rawvecsz + idxdatasz, v.getRawVectorPtr(), rawvecsz);
......@@ -120,7 +195,7 @@ Global::buflen_pair arr::make_append_msg(const std::string& name,
}
Global::buflen_pair arr::make_append_msg(const std::string& name,
Global::buflen_pair arr::make_append_msg(const std::vector<std::string>& names,
const std::vector<Global::dtime>& idx,
const std::vector<double>& v)
{
......@@ -136,13 +211,13 @@ Global::buflen_pair arr::make_append_msg(const std::string& name,
}
}
const auto headersz = getHeaderLength(name);
const auto headersz = getHeaderLength(names);
const auto datasz = v.size()*sizeof(double);
const auto rawvecsz = sizeof(RawVector<double>);
const auto idxdatasz = idx.size()*sizeof(Global::dtime);
const auto totalsz = headersz + rawvecsz + idxdatasz + rawvecsz + datasz;
auto buf = std::make_pair(std::make_unique<char[]>(totalsz), totalsz);
writeHeader(buf, Global::MsgType::APPEND_VECTOR, name);
writeHeader(buf, Global::MsgType::APPEND_VECTOR, names);
const RawVector<Global::dtime> idx_rv{TypeNumber<Global::dtime>::n, idx.size(), 1};
memcpy(buf.first.get() + headersz, &idx_rv, rawvecsz);
......
......@@ -35,47 +35,47 @@
namespace arr {
size_t getHeaderLength(const std::string& name);
void writeHeader(Global::buflen_pair& buf,
Global::MsgType msgtype,
const std::string& name);
size_t getHeaderLength(const std::vector<std::string>& name);
int writeHeader(Global::buflen_pair& buf,
Global::MsgType msgtype,
const std::vector<std::string>& names);
template<typename T>
Global::buflen_pair make_append_msg(const std::string& name, const arr::Array<T>& a)
Global::buflen_pair make_append_msg(const std::vector<std::string>& names, const arr::Array<T>& a)
{
if (a.size() == 0) {
throw std::out_of_range("make_append_msg: no data");
}
const auto headersz = getHeaderLength(name);
const auto headersz = getHeaderLength(names);
const auto totalsz = headersz + a.getBufferSize();
auto buf = std::make_pair(std::make_unique<char[]>(totalsz), totalsz);
a.to_buffer(buf.first.get() + headersz);
writeHeader(buf, Global::MsgType::APPEND, name);
writeHeader(buf, Global::MsgType::APPEND, names);
return buf;
}
template<typename T>
Global::buflen_pair make_append_msg(const std::string& name, const Vector<T>& v)
Global::buflen_pair make_append_msg(const std::vector<std::string>& names, const Vector<T>& v)
{
if (v.size() == 0) {
throw std::out_of_range("make_append_msg: no data");
}
const auto headersz = getHeaderLength(name);
const auto headersz = getHeaderLength(names);
const auto totalsz = headersz + v.getBufferSize();
auto buf = std::make_pair(std::make_unique<char[]>(totalsz), totalsz);
writeHeader(buf, Global::MsgType::APPEND_VECTOR, name);
v.to_buffer(buf.first.get() + headersz);
writeHeader(buf, Global::MsgType::APPEND_VECTOR, names);
return buf;
}
Global::buflen_pair make_append_msg(const std::string& name,
Global::buflen_pair make_append_msg(const std::vector<std::string>& names,
const Vector<Global::dtime>& idx,
const Vector<double>& v);
// specialize the above with string and zstring so that it fails... LLL
Global::buflen_pair make_append_msg(const string& name, const arr::zts& z);
Global::buflen_pair make_append_msg(const std::vector<string>& name, const arr::zts& z);
}
......
......@@ -25,7 +25,7 @@
namespace arr {
Global::buflen_pair make_append_msg(const std::string& name,
Global::buflen_pair make_append_msg(const std::vector<std::string>& name,
const std::vector<Global::dtime>& idx,
const std::vector<double>& v);
......
......@@ -19,7 +19,7 @@
#include "zcpp_zts.hpp"
Global::buflen_pair arr::make_append_msg(const string& name, const arr::zts& z) {
Global::buflen_pair arr::make_append_msg(const std::vector<std::string>& name, const arr::zts& z) {
auto buf = z.to_buffer(getHeaderLength(name));
writeHeader(buf, Global::MsgType::APPEND, name);
return buf;
......
......@@ -35,7 +35,7 @@
namespace arr {
Global::buflen_pair make_append_msg(const string& name, const arr::zts& z);
Global::buflen_pair make_append_msg(const std::vector<std::string>& name, const arr::zts& z);
}
......
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!