Commit 96a1eb1d authored by Alexander Færøy's avatar Alexander Færøy
Browse files

Add new subsystem: Stats Reporter.

This subsystem allow Tor users to have a running instance of tor emit
stats events to either graphite or statsd.
parent 54e25ab1
Pipeline #18279064 failed with stage
in 52 minutes and 3 seconds
Stats Reporter
==============
FIXME(ahf): Remove this document.
## TODO
- StatsReporter connections should reconnect when they disconnect from their
service.
- We should support UDP and UNIX sockets for StatsReporter services.
- Add a `stats_entry_t` instance which supports wrapping a function `size_t
fun(void)` to support a pull-model for counters until things are able to use
the `stats_entry` API.
- Fix layering violations between `src/common/stats_reporter.c` and its
dependencies on things in `src/or/`.
......@@ -650,7 +650,7 @@ GENERAL OPTIONS
+
The currently recognized domains are: general, crypto, net, config, fs,
protocol, mm, http, app, control, circ, rend, bug, dir, dirserv, or, edge,
acct, hist, and handshake. Domain names are case-insensitive. +
acct, hist, handshake, and stats_reporter. Domain names are case-insensitive. +
+
For example, "`Log [handshake]debug [~net,~mm]info notice stdout`" sends
to stdout: all handshake messages of any severity, all info-and-higher
......
......@@ -96,6 +96,8 @@ LIBOR_A_SRC = \
src/common/util_format.c \
src/common/util_process.c \
src/common/sandbox.c \
src/common/stats_entry.c \
src/common/stat_store.c \
src/common/storagedir.c \
src/common/workqueue.c \
$(libor_extra_source) \
......@@ -179,6 +181,8 @@ COMMONHEADERS = \
src/common/procmon.h \
src/common/pubsub.h \
src/common/sandbox.h \
src/common/stats_entry.h \
src/common/stat_store.h \
src/common/storagedir.h \
src/common/testsupport.h \
src/common/timers.h \
......
......@@ -1263,7 +1263,7 @@ static const char *domain_list[] = {
"GENERAL", "CRYPTO", "NET", "CONFIG", "FS", "PROTOCOL", "MM",
"HTTP", "APP", "CONTROL", "CIRC", "REND", "BUG", "DIR", "DIRSERV",
"OR", "EDGE", "ACCT", "HIST", "HANDSHAKE", "HEARTBEAT", "CHANNEL",
"SCHED", "GUARD", "CONSDIFF", "DOS", NULL
"SCHED", "GUARD", "CONSDIFF", "DOS", "STATS_REPORTER", NULL
};
/** Return a bitmask for the log domain for which <b>domain</b> is the name,
......
/* Copyright (c) 2018, The Tor Project, Inc. */
/* See LICENSE for licensing information */
/**
* \file stat_store.c
* \brief XXX: Write a brief introduction to this module.
**/
#define STAT_STORE_PRIVATE
#include "stat_store.h"
#include "util.h"
#include "stats_entry.h"
#define PREFIX(s) ("tor." s)
static stats_entry_t **entries;
void
stat_store_init(void)
{
const size_t size = sizeof(stats_entry_t *) * STAT_LAST;
if (entries != NULL)
return;
entries = raw_malloc(size);
memset(entries, 0, size);
entries[STAT_MALLOC] = stats_entry_new_counter();
entries[STAT_CONNECTION_WRITTEN] = stats_entry_new_counter();
entries[STAT_CONNECTION_READ] = stats_entry_new_counter();
entries[STAT_MAIN_LOOP_SUCCESS] = stats_entry_new_counter();
}
void
stat_store_update(stat_key_t key, size_t value)
{
tor_assert(key < STAT_LAST);
if (entries == NULL)
stat_store_init();
stats_entry_t *entry = entries[key];
tor_assert(entry != NULL);
stats_entry_update(entry, value);
}
size_t
stat_store_get_value(stat_key_t key)
{
tor_assert(key < STAT_LAST);
if (entries == NULL)
stat_store_init();
stats_entry_t *entry = entries[key];
tor_assert(entry != NULL);
return stats_entry_get_value(entry);
}
void
stat_store_reset(stat_key_t key)
{
tor_assert(key < STAT_LAST);
if (entries == NULL)
stat_store_init();
stats_entry_t *entry = entries[key];
tor_assert(entry != NULL);
stats_entry_reset(entry);
}
void
stat_store_reset_all(void)
{
for (int i = 0; i < STAT_LAST; ++i)
stat_store_reset(i);
}
const char *
stat_store_key_string(stat_key_t key)
{
switch (key) {
case STAT_MALLOC:
return PREFIX("internal.memory.allocations");
case STAT_CONNECTION_WRITTEN:
return PREFIX("network.connection.written");
case STAT_CONNECTION_READ:
return PREFIX("network.connection.read");
case STAT_MAIN_LOOP_SUCCESS:
return PREFIX("internal.mainloop.successful");
/* Fall through. */
case STAT_LAST:
default:
goto err;
}
// LCOV_EXCL_START
err:
tor_assert(0);
return "";
// LCOV_EXCL_STOP
}
/* Copyright (c) 2018, The Tor Project, Inc. */
/* See LICENSE for licensing information */
/**
* \file stat_store.h
* \brief Header file for stat_store.c.
**/
#ifndef TOR_STAT_STORE_H
#define TOR_STAT_STORE_H
#include "torint.h"
#include "testsupport.h"
typedef enum {
/** Number of allocated bytes. */
STAT_MALLOC,
/** Number of bytes written on connections. */
STAT_CONNECTION_WRITTEN,
/** Number of bytes read on connections. */
STAT_CONNECTION_READ,
/** Main loop stats. */
STAT_MAIN_LOOP_SUCCESS,
/** Last element, make sure this is always the last element of stat_key_t. */
STAT_LAST
} stat_key_t;
void stat_store_init(void);
void stat_store_update(stat_key_t key, size_t value);
size_t stat_store_get_value(stat_key_t key);
void stat_store_reset(stat_key_t key);
void stat_store_reset_all(void);
const char *stat_store_key_string(stat_key_t key);
#endif /* TOR_STAT_STORE_H */
/* Copyright (c) 2018, The Tor Project, Inc. */
/* See LICENSE for licensing information */
/**
* \file stats_entry.c
* \brief XXX: Write a brief introduction to this module.
**/
#define STATS_ENTRY_PRIVATE
#include "stats_entry.h"
#include "util.h"
typedef struct stats_counter_t {
size_t value;
} stats_counter_t;
typedef struct stats_entry_t {
stats_type_t type;
union {
stats_counter_t counter;
} u;
} stats_entry_t;
static stats_entry_t *
stats_entry_new(stats_type_t type)
{
stats_entry_t *entry = NULL;
entry = raw_malloc(sizeof(stats_entry_t));
memset(entry, 0, sizeof(stats_entry_t));
entry->type = type;
return entry;
}
stats_entry_t *
stats_entry_new_counter(void)
{
stats_entry_t *counter;
counter = stats_entry_new(STATS_TYPE_COUNTER);
return counter;
}
void
stats_entry_free_(stats_entry_t *entry)
{
if (! entry)
return;
tor_free(entry);
}
stats_type_t
stats_entry_get_type(const stats_entry_t *entry)
{
tor_assert(entry);
return entry->type;
}
size_t
stats_entry_get_value(const stats_entry_t *entry)
{
tor_assert(entry);
switch (entry->type) {
case STATS_TYPE_COUNTER:
return entry->u.counter.value;
}
// LCOV_EXCL_START
tor_assert(0);
return 0;
// LCOV_EXCL_STOP
}
void
stats_entry_update(stats_entry_t *entry, size_t value)
{
tor_assert(entry);
switch (entry->type) {
case STATS_TYPE_COUNTER:
entry->u.counter.value += value;
break;
}
}
void
stats_entry_reset(stats_entry_t *entry)
{
tor_assert(entry);
switch (entry->type) {
case STATS_TYPE_COUNTER:
entry->u.counter.value = 0;
break;
}
}
/* Copyright (c) 2018, The Tor Project, Inc. */
/* See LICENSE for licensing information */
/**
* \file stats_entry.h
* \brief Header file for stats_entry.c.
**/
#ifndef TOR_STATS_ENTRY_H
#define TOR_STATS_ENTRY_H
#include "torint.h"
typedef enum {
STATS_TYPE_COUNTER
} stats_type_t;
typedef struct stats_entry_t stats_entry_t;
stats_entry_t *stats_entry_new_counter(void);
void stats_entry_free_(stats_entry_t *entry);
#define stats_entry_free(entry) FREE_AND_NULL(stats_entry_t, stats_entry_free_, (entry))
stats_type_t stats_entry_get_type(const stats_entry_t *entry);
size_t stats_entry_get_value(const stats_entry_t *entry);
void stats_entry_update(stats_entry_t *entry, size_t value);
void stats_entry_reset(stats_entry_t *entry);
#ifdef STATS_ENTRY_PRIVATE
#endif /* STATS_ENTRY_PRIVATE */
#endif /* TOR_STATS_ENTRY_H */
......@@ -105,8 +105,10 @@
#define LD_CONSDIFF (1u<<24)
/** Denial of Service mitigation. */
#define LD_DOS (1u<<25)
/** Stats reporter. */
#define LD_STATS_REPORTER (1u<<26)
/** Number of logging domains in the code. */
#define N_LOGGING_DOMAINS 26
#define N_LOGGING_DOMAINS 27
/** This log message is not safe to send to a callback-based logger
* immediately. Used as a flag, not a log domain. */
......
......@@ -24,6 +24,7 @@
#include "backtrace.h"
#include "util_process.h"
#include "util_format.h"
#include "stat_store.h"
#ifdef _WIN32
#include <io.h>
......@@ -150,6 +151,8 @@ tor_malloc_(size_t size DMALLOC_PARAMS)
result = raw_malloc(size);
#endif
stat_store_update(STAT_MALLOC, size);
if (PREDICT_UNLIKELY(result == NULL)) {
/* LCOV_EXCL_START */
log_err(LD_MM,"Out of memory on malloc(). Dying.");
......
......@@ -102,6 +102,7 @@
#include "routerset.h"
#include "scheduler.h"
#include "statefile.h"
#include "stats_reporter.h"
#include "transports.h"
#include "ext_orport.h"
#ifdef _WIN32
......@@ -383,6 +384,9 @@ static config_var_t option_vars_[] = {
V(HardwareAccel, BOOL, "0"),
V(HeartbeatPeriod, INTERVAL, "6 hours"),
V(MainloopStats, BOOL, "0"),
V(StatsReporter, LINELIST, NULL),
V(StatsReporterGranularity, INTERVAL, "30 seconds"),
V(StatsReporterPrefix, STRING, NULL),
V(AccelName, STRING, NULL),
V(AccelDir, FILENAME, NULL),
V(HashedControlPassword, LINELIST, NULL),
......@@ -1956,6 +1960,18 @@ options_act(const or_options_t *old_options)
return -1;
}
if (! options->DisableNetwork) {
for (cl = options->StatsReporter; cl; cl = cl->next) {
if (parse_stats_reporter_line(cl->value, 0) < 0) {
// LCOV_EXCL_START
log_warn(LD_BUG, "Previously validated StatsReporter line "
"could not be added.");
return -1;
// LCOV_EXCL_STOP
}
}
}
mark_transport_list();
pt_prepare_proxy_list_for_config_read();
if (!options->DisableNetwork) {
......@@ -4279,6 +4295,26 @@ options_validate(or_options_t *old_options, or_options_t *options,
smartlist_free(options_sl);
}
if (options->StatsReporterPrefix) {
if (! stats_reporter_is_valid_prefix(options->StatsReporterPrefix)) {
tor_asprintf(msg,
"StatsReporterPrefix '%s' must contain only the characters [a-zA-Z0-9.].",
options->StatsReporterPrefix);
return -1;
}
}
for (cl = options->StatsReporter; cl; cl = cl->next) {
if (parse_stats_reporter_line(cl->value, 1) < 0)
REJECT("Invalid stats reporter line. See logs for details.");
}
if (options->StatsReporterGranularity < MIN_STATS_REPORTER_GRANULARITY) {
log_warn(LD_CONFIG, "StatsReporterGranularity option is too short; "
"raising to %d seconds.", MIN_STATS_REPORTER_GRANULARITY);
options->StatsReporterGranularity = MIN_STATS_REPORTER_GRANULARITY;
}
if (options->ConstrainedSockets) {
/* If the user wants to constrain socket buffer use, make sure the desired
* limit is between MIN|MAX_TCPSOCK_BUFFER in k increments. */
......@@ -5985,6 +6021,71 @@ parse_bridge_line(const char *line)
return bridge_line;
}
/** Read the contents of a StatsReporter line from <b>line</b>. Return 0 if the
* line is well-formed, and -1 if it isn't.
**/
STATIC int
parse_stats_reporter_line(const char *line, int validate_only)
{
smartlist_t *items;
size_t items_count;
int r;
char *addr_port;
tor_addr_t addr;
uint16_t port;
char *protocol;
/* Split the line up. */
items = smartlist_new();
smartlist_split_string(items, line, NULL,
SPLIT_SKIP_SPACE|SPLIT_IGNORE_BLANK, -1);
items_count = smartlist_len(items);
if (items_count < 2) {
log_warn(LD_CONFIG, "Too few arguments on StatsReporter line.");
goto err;
}
/* Validate IP and port tuple. */
addr_port = smartlist_get(items, 0);
if (tor_addr_port_lookup(addr_port, &addr, &port) < 0) {
log_warn(LD_CONFIG, "Unable to parse StatsReporter address '%s'.",
addr_port);
goto err;
}
if (! port) {
log_warn(LD_CONFIG, "StatsReporter address '%s' is missing a port.",
addr_port);
goto err;
}
/* Validate reporter protocol. */
protocol = smartlist_get(items, 1);
if (! stats_reporter_is_valid_protocol(protocol)) {
log_warn(LD_CONFIG, "Unknown StatsReporter protocol '%s", protocol);
goto err;
}
/* Add the connection to the stats reporting subsystem. */
if (! validate_only)
stats_reporter_add_from_config(&addr, port, protocol);
r = 0;
goto done;
err:
r = -1;
done:
SMARTLIST_FOREACH(items, char*, s, tor_free(s));
smartlist_free(items);
return r;
}
/** Read the contents of a ClientTransportPlugin or ServerTransportPlugin
* line from <b>line</b>, depending on the value of <b>server</b>. Return 0
* if the line is well-formed, and -1 if it isn't.
......
......@@ -22,6 +22,10 @@
* expose more information than we're comfortable with. */
#define MIN_HEARTBEAT_PERIOD (30*60)
/** Lowest allowable value for StatsReporterGranularity.
* The value is defined in seconds. */
#define MIN_STATS_REPORTER_GRANULARITY (5)
MOCK_DECL(const char*, get_dirportfrontpage, (void));
MOCK_DECL(const or_options_t *, get_options, (void));
MOCK_DECL(or_options_t *, get_options_mutable, (void));
......@@ -239,6 +243,7 @@ STATIC int options_validate(or_options_t *old_options,
STATIC int parse_transport_line(const or_options_t *options,
const char *line, int validate_only,
int server);
STATIC int parse_stats_reporter_line(const char *line, int validate_only);
STATIC int consider_adding_dir_servers(const or_options_t *options,
const or_options_t *old_options);
STATIC void add_default_trusted_dir_authorities(dirinfo_type_t type);
......
......@@ -24,6 +24,7 @@
* <ul><li>entry_connection_t, also implemented in connection_edge.c
* </ul>
* <li>control_connection_t, implemented in control.c
* <li>stats_reporter_connection_t, implemented in FIXME(ahf).</li>
* </ul>
*
* The base type implemented in this module is responsible for basic
......@@ -101,6 +102,8 @@
#include "transports.h"
#include "routerparse.h"
#include "sandbox.h"
#include "stats_reporter.h"
#include "stat_store.h"
#ifdef HAVE_PWD_H
#include <pwd.h>
......@@ -189,6 +192,7 @@ conn_type_to_string(int type)
case CONN_TYPE_EXT_OR: return "Extended OR";
case CONN_TYPE_EXT_OR_LISTENER: return "Extended OR listener";
case CONN_TYPE_AP_HTTP_CONNECT_LISTENER: return "HTTP tunnel listener";
case CONN_TYPE_STATS_REPORTER: return "Stats reporter";
default:
log_warn(LD_BUG, "unknown connection type %d", type);
tor_snprintf(buf, sizeof(buf), "unknown [%d]", type);
......@@ -372,6 +376,18 @@ listener_connection_new(int type, int socket_family)
return listener_conn;
}
/** Allocate and return a new stats_reporter_connection_t, initialized as by
* connection_init(). */
stats_reporter_connection_t *
stats_reporter_connection_new(int socket_family)
{
stats_reporter_connection_t *stats_reporter_conn =
tor_malloc_zero(sizeof(stats_reporter_connection_t));
connection_init(time(NULL), TO_CONN(stats_reporter_conn),
CONN_TYPE_STATS_REPORTER, socket_family);
return stats_reporter_conn;
}
/** Allocate, initialize, and return a new connection_t subtype of <b>type</b>
* to make or receive connections of address family <b>socket_family</b>. The
* type should be one of the CONN_TYPE_* constants. */
......@@ -438,6 +454,9 @@ connection_init(time_t now, connection_t *conn, int type, int socket_family)
case CONN_TYPE_CONTROL:
conn->magic = CONTROL_CONNECTION_MAGIC;
break;
case CONN_TYPE_STATS_REPORTER:
conn->magic = STATS_REPORTER_CONNECTION_MAGIC;
break;
CASE_ANY_LISTENER_TYPE:
conn->magic = LISTENER_CONNECTION_MAGIC;
break;
......@@ -531,6 +550,11 @@ connection_free_minimal(connection_t *conn)
mem = TO_CONTROL_CONN(conn);
memlen = sizeof(control_connection_t);
break;
case CONN_TYPE_STATS_REPORTER:
tor_assert(conn->magic == STATS_REPORTER_CONNECTION_MAGIC);
mem = TO_STATS_REPORTER_CONN(conn);
memlen = sizeof(stats_reporter_connection_t);
break;
CASE_ANY_LISTENER_TYPE:
tor_assert(conn->magic == LISTENER_CONNECTION_MAGIC);
mem = TO_LISTENER_CONN(conn);
......@@ -1779,6 +1803,7 @@ connection_connect_sockaddr,(connection_t *conn,
const int proto = (sa->sa_family == AF_INET6 ||
sa->sa_family == AF_INET) ? IPPROTO_TCP : 0;
s = tor_open_socket_nonblocking(protocol_family, SOCK_STREAM, proto);
if (! SOCKET_OK(s)) {
/*
......@@ -4040,6 +4065,9 @@ connection_handle_write_impl(connection_t *conn, int force)
conn->n_written_conn_bw = UINT32_MAX;
}
stat_store_update(STAT_CONNECTION_READ, n_read);
stat_store_update(STAT_CONNECTION_WRITTEN, n_written);
connection_buckets_decrement(conn, approx_time(), n_read, n_written);
if (result > 0) {
......@@ -4632,6 +4660,9 @@ connection_process_inbuf(connection_t *conn, int package_partial)
return connection_dir_process_inbuf(TO_DIR_CONN(conn));
case CONN_TYPE_CONTROL:
return connection_control_process_inbuf(TO_CONTROL_CONN