simple_append.c 4.19 KB
Newer Older
1
// -*- compile-command: "gcc -g simple_append.c -o simple_append -lztsdb_client -lboost_system" -*-
2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30

// Copyright (C) 2017 Leonardo Silvestri
//
// This file is part of ztsdb.
//
// ztsdb is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// ztsdb is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with ztsdb.  If not, see <http://www.gnu.org/licenses/>.


#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <stdio.h>
#include <strings.h>
#include <string.h>
#include <errno.h>
#include <stdlib.h>
#include <unistd.h>
#include <time.h>
31
#include "ztsdb/zc.h"
32 33


34
static void simple_append(const char* ip, int port, const char** names, size_t nameslen, size_t ncols)
35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64
{
  // open TCP connection:
  struct sockaddr_in addr;
  bzero(&addr, sizeof(addr));
  addr.sin_family = AF_INET;
  addr.sin_addr.s_addr=inet_addr(ip);
  addr.sin_port=htons(port);
  int fd = socket(AF_INET, SOCK_STREAM, 0);
  if (fd == -1) {
    fprintf(stderr, "'socket' error: %s\n", strerror(errno));
  }
  int cres = connect(fd, (struct sockaddr *)&addr, sizeof(struct sockaddr_in));
  if (cres == -1) {
    close(fd);
    fprintf(stderr, "'connect' error: %s\n", strerror(errno));
  }

  // create and populate a data vector:
  double* data = malloc(sizeof(double) * ncols);
  size_t j;
  for (j=0; j<ncols; ++j) {
    data[j] = j;   // set at position j the value j
  }


  // get a time: ztsdb requires increasing values of time in a time
  // series; using a monotonic clock guarantees that, but
  // unfortunately, it's not required to be a time since Epoch (in
  // Linux, it's a value since boot time); here we build a monotonic
  // timestamp from the realtime. The clock is a bit off, but it
65
  // serves its purpose and this is a valid method for multiple
66
  // appends to a time-series from a single thread; if some kind of
67 68
  // synchronization must be achived between multiple
  // threads/processes, one can get the machine uptime in order to
69 70 71 72 73 74 75 76 77 78 79 80 81 82 83
  // build a monotonic timestamp on a given machine:
  struct timespec monotonic;
  clock_gettime(CLOCK_MONOTONIC_RAW, &monotonic);
  int64_t timestamp_mono = monotonic.tv_sec * 1000000000L + monotonic.tv_nsec;
  struct timespec realtime;
  clock_gettime(CLOCK_REALTIME, &realtime);
  int64_t timestamp_real = realtime.tv_sec * 1000000000L + realtime.tv_nsec;
  int64_t timestamp_diff = timestamp_real - timestamp_mono;

  // then this process could repeatedly build timestamps like this:
  int64_t timestamp = timestamp_mono + timestamp_diff;
  
  // create the append message:
  char* buf;
  size_t buflen;
84 85 86 87 88
  int res = make_append_msg(names, nameslen, &timestamp, 1, data, ncols, &buf, &buflen);
  if (res < 0) {
    fprintf(stderr, "res error: %d\n", res);
    exit(res);
  }
89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109
  free(data);
  
  // send it:
  ssize_t wres = write(fd, buf, buflen);
  free(buf);
  if (wres < 0) {
    fprintf(stderr, "'write' error: %s\n", strerror(errno));
  }

  if (close(fd) < 0) {
    fprintf(stderr, "'close' error: %s\n", strerror(errno));
  }
}


// params are
// 1. IP
// 2. port
// 3. name of variable to append to (assumed to be a zts)
// 4. number of columns to append
int main(int argc, char* argv[]) {
110
  enum { IP=1, PORT, VARNAMES, NCOLS };
111 112 113 114 115 116 117 118 119 120 121

  // grab a message rate (# per second)
  if (argc != 5) {
    fprintf(stderr, "usage: %s <ip> <port> <varname> <ncols>\n", argv[0]);
    return -1;
  }
  
  int port = atoi(argv[PORT]);
  char* endptr;
  size_t ncols = strtoull(argv[NCOLS], &endptr, 10);

122 123 124 125 126 127 128 129 130 131 132
  const unsigned MAX_DEPTH = 100;
  const unsigned MAX_LEN   = 255;
  const char* names[MAX_LEN+1];
  unsigned n = 0;
  char *p;
  for (p = strtok(argv[VARNAMES], ","); p != NULL; p = strtok(NULL, ",")) {
    names[n] = strndup(p, MAX_LEN);
    ++n;
  }
  
  simple_append(argv[IP], port, names, n, ncols);
133

134 135 136 137 138
  unsigned i;
  for (i=0; i<n; ++i) {
    free((void*)names[i]);
  }
  
139 140
  return 0;
}