Commit a548ab89 authored by Alan Szlosek's avatar Alan Szlosek

use hashes for timeseries counters, start phasing out throttle in favor of digest

parent 750af719
Pipeline #8095096 passed with stage
in 1 minute and 17 seconds
......@@ -85,11 +85,9 @@ Run `npm test` to run unit tests. A local Redis server is required for the throt
IN PROGRESS
====
I'm currently working on a Timeseries module. I envision it being used like so:
1. Metrics from StatsD are stored in Redis using Timeseries module
2. Timeseries module is configured to calculate the average of those metrics (or other statistics) at a given interval
3. Callback function checks the average and triggers an email if it reaches a certain threshold
* Ditch Throttle for a Digest module. If dev asked for five minute digests, they would trigger at 0:00, 0:05, etc
* Rework Timeseries module to use Redis scripts (in lua) to calculcate statistics
* Once that's done, create an example that alerts when error rate reaches a threshold. Error rate is calculated using request count and error count.
TODO
......
......@@ -13,11 +13,13 @@ module.exports = {
syslog: require('./lib/syslog'),
/*
Throttle: {
// The API for these is inconsistent
InMemory: require('./lib/throttle/memory')
//InRedis: require('./lib/throttle/redis')
},
*/
watchFile: require('./lib/input/file-watcher'),
......
......@@ -2,27 +2,37 @@ var stats = require('statistics');
var log = require('./log')('timeseries', false);
/*
NOTES
METRIC TYPES
For all metric types, the key prefix includes the second the values are being added
Count
- Implemented using hashes
- Incremented using hIncrBy
- Need to implement a cleanup method, since we can't rely on redis to expire hash keys at diff rates
It's possible we can leverage zset and lists in redis for count values.
Perhaps push values onto a list, and increment a score for the second timestamp for each item pushed.
Can we then fetch lexicographically from the zset, get the scores, calculate the list offsets and use lrange?
Might result in our lrange being off by a bit if addCount operations happen between our zset and list fetches.
Time
- Implemented using lists
Gauge
- not yet implemented
Set
- not yet implemented
FEATURES
* All functions should return number of values that were present in Redis
* All functions should return the number of seconds that we have data for. You don't want to alert on a "metric above average" if there's only 1 data point.
* All get*() functions should return the number of seconds that we have data for. You don't want to alert on a "metric above average" if there's only 1 data point.
*/
// This might need to be TimeseriesSum or something
var Timeseries = function(redisClient, key, cutoff) {
var Timeseries = function(redisClient, ttl, namespace) {
this.redisClient = redisClient;
this.key = 'timeseries:' + key;
this.cutoff = cutoff;
this.namespace = namespace || 'ht:'; // default to Harbinger Timeseries as namespace
this.ttl = ttl;
};
module.exports = Timeseries;
......@@ -36,159 +46,92 @@ type: count, timer
*/
Timeseries.prototype.getValues = function(startSeconds, endSeconds, type, callback) {
// NEW METHODS
// TODO: need to set a scheduler to clean up count buckets somehow
// Casual scan over keys for type and use hrem
Timeseries.prototype.count = function(secondsBucket, count, callback) {
var self = this;
// earliestSeconds will be repeatedly updated with the min timestamp value we encounter from Redis.
// Default it to endSeconds in case there is no data
var earliestSeconds = endSeconds; //Number.MAX_SAFE_INTEGER;
var keys = [];
for (var i = startSeconds; i <= endSeconds; i++) {
keys.push( i );
//keys.push( this.key + ':' + type + ':' + i);
}
var key = this._getCountKey();
log('hincrby', key, secondsBucket, count);
self.redisClient.hincrby(
key,
secondsBucket,
count,
callback
);
};
var out = [];
var next = function() {
var key;
if (keys.length == 0) {
// Done
// Calculate the average now
callback(null, out, endSeconds - earliestSeconds);
return;
}
seconds = keys.shift();
key = self.key + ':' + type + ':' + seconds;
// Fetch from redis
log('fetching: ', key);
self.redisClient.lrange([key, 0, -1], function(err, values) {
Timeseries.prototype.time = function(secondsBucket, value, callback) {
var self = this;
var key = this._getTimeKey(secondsBucket);
log('lpush', key, value);
this.redisClient.lpush(
key,
value,
function(err) {
if (err) {
callback(err);
return;
}
if (values.length > 0) {
earliestSeconds = Math.min(earliestSeconds, seconds);
}
log(values);
for (var i = 0; i < values.length; i++) {
out.push( parseFloat(values[i]) );
}
next();
});
};
self.redisClient.expire(key, self.ttl);
callback(err, value);
}
);
};
// Should perhaps cache the output of this so frequent pulls for this exact interval don't have to recalculate
next();
Timeseries.prototype._getCountKey = function() {
return this.namespace + ':count';
};
Timeseries.prototype._getTimeKey = function(secondsBucket) {
return this.namespace + ':time:' + secondsBucket;
};
Timeseries.prototype.getSum = function(startSeconds, endSeconds, type, callback) {
// Return an array of counts for each seconds bucket from start to end
// Returns 0 for seconds buckets that have no data
Timeseries.prototype.getCounts = function(startSeconds, endSeconds, callback) {
var self = this;
// earliestSeconds will be repeatedly updated with the min timestamp value we encounter from Redis.
// Default it to endSeconds in case there is no data
var earliestSeconds = endSeconds;
var keys = [];
var args = [
this._getCountKey()
];
for (var i = startSeconds; i <= endSeconds; i++) {
keys.push( i );
args.push( i );
}
var sum = 0.00;
var itemCount = 0;
var next = function() {
var seconds;
var key;
if (keys.length == 0) {
// Done
// Calculate the average now
callback(null, sum, itemCount, endSeconds - earliestSeconds);
self.redisClient.hmget(args, function(err, value) {
if (err) {
callback(err);
return;
}
seconds = keys.shift();
key = self.key + ':' + type + ':' + seconds;
// Fetch from redis
log('fetching: ', key);
self.redisClient.lrange([key, 0, -1], function(err, values) {
if (err) {
callback(err);
return;
}
log(values);
if (values.length > 0) {
earliestSeconds = Math.min(earliestSeconds, seconds);
}
itemCount += values.length;
for (var i = 0; i < values.length; i++) {
sum += parseFloat(values[i]);
}
next();
value = value.map(function(item) {
var i = parseInt(item);
return isNaN(i) ? 0 : i;
});
};
// Should perhaps cache the output of this so frequent pulls for this exact interval don't have to recalculate
next();
};
Timeseries.prototype.getCountAverage = function(startSeconds, endSeconds, callback) {
// Should we cache this result?
this.getSum(
startSeconds,
endSeconds,
'count',
function(err, sum, items, secondsSpanned) {
if (err) {
callback(err);
return;
}
callback(null, sum / items, secondsSpanned);
}
);
// Replace nulls with 0?
callback(null, value);
});
};
/*
METRIC TYPES
For all metric types, the key prefix includes the second the values are being added
Count
- Believe we can use incrBy
Time
- Believe we need to use a list for these, so we can accurately calculate avg and other stats
Use this to get sum, mean/average, standard deviation, etc
TODO: Use Lua script to calculate and cache these stats within Redis
*/
// NEW METHODS
Timeseries.prototype.count = function(secondsBucket, count, callback) {
var self = this;
var key = this.key + ':count:' + secondsBucket;
log('incrBy', key, count);
self.redisClient.incrby(
key,
count,
function(err) {
self.redisClient.expire(key, self.cutoff);
Timeseries.prototype.getCountStatistics = function(startSeconds, endSeconds, callback) {
this.getCounts(startSeconds, endSeconds, function(err, counts) {
if (err) {
callback(err);
return;
}
);
};
Timeseries.prototype.time = function(secondsBucket, value) {
var self = this;
var key = this.key + ':time:' + secondsBucket;
log('lpush', key, value);
this.redisClient.lpush(
key,
value,
function(err) {
self.redisClient.expire(key, self.cutoff);
}
);
callback(null, counts.reduce(stats) );
});
};
// Return an array of counts for each seconds bucket from start to end
Timeseries.prototype.getCounts = function(startSeconds, endSeconds, callback) {
// Return an array of times for each seconds bucket from start to end
Timeseries.prototype.getTimes = function(startSeconds, endSeconds, callback) {
var self = this;
var keys = [];
for (var i = startSeconds; i <= endSeconds; i++) {
......@@ -206,20 +149,18 @@ Timeseries.prototype.getCounts = function(startSeconds, endSeconds, callback) {
return;
}
seconds = keys.shift();
key = self.key + ':count:' + seconds;
key = self._getTimeKey(seconds);
// Fetch from redis
log('fetching: ', key);
// TODO: should probably use mget instead
self.redisClient.get(key, function(err, value) {
self.redisClient.lrange([key, 0, -1], function(err, values) {
if (err) {
callback(err);
return;
}
if (value) {
out.push( parseInt(value) );
} else {
out.push(0);
log(values);
for (var i = 0; i < values.length; i++) {
out.push( parseInt(values[i]) );
}
next();
});
......@@ -229,21 +170,3 @@ Timeseries.prototype.getCounts = function(startSeconds, endSeconds, callback) {
next();
};
/*
Use this to get sum, mean/average, standard deviation, etc
*/
Timeseries.prototype.getCountStatistics = function(startSeconds, endSeconds, callback) {
this.getCounts(startSeconds, endSeconds, function(err, counts) {
if (err) {
callback(err);
return;
}
callback(null, counts.reduce(stats) );
});
};
/*
*/
......@@ -86,7 +86,6 @@ module.exports = {
lb.data("is Harbinger\n");
},
/*
syslog: {
setUp: function(callback) {
callback();
......@@ -95,6 +94,7 @@ module.exports = {
callback();
},
/*
singleLines: function(test) {
var dt = new Date();
var dateTime = moment(dt).format('MMM D H:mm:ss');
......@@ -124,10 +124,11 @@ module.exports = {
test.done();
}
*/
},
*/
/*
throttle: {
setUp: function(callback) {
callback();
......@@ -143,7 +144,7 @@ module.exports = {
var message = {
name: "Test"
};
var timeout = 1;
var timeout = 2;
var key = 'test:throttle-memory:' + timeout;
var callback = function(msg, count) {
......@@ -179,7 +180,6 @@ module.exports = {
);
},
/*
redis: function(test) {
var redisClient = redis.createClient(6379, 'redis');
redisClient.on("error", function (err) {
......@@ -233,11 +233,9 @@ module.exports = {
(timeout * 1000) + 1000
);
}
*/
},
/*
statsd: {
setUp: function(callback) {
this.server = new harbinger.Input.StatsD();
......@@ -296,10 +294,10 @@ module.exports = {
count: {
single: function(test) {
var self = this;
var ts = new Harbinger.Timeseries(this.redisClient, 'testing', 60);
var ts = new Harbinger.Timeseries(this.redisClient, 60, 'testing');
var secondsBucket = moment().unix() - 100;
test.expect(1);
test.expect(2);
ts.count(secondsBucket, 3, function(err) {
if (err) {
......@@ -307,12 +305,14 @@ module.exports = {
return;
}
self.redisClient.get('timeseries:testing:count:' + secondsBucket, function(err, result) {
ts.getCounts(secondsBucket, secondsBucket, function(err, result) {
if (err) {
console.log(err);
test.done();
return;
}
test.equals(3, result);
test.equals(1, result.length);
test.equals(3, result[0]);
test.done();
});
});
......@@ -327,8 +327,9 @@ module.exports = {
},
multiple: function(test) {
var self = this;
var ts = new Harbinger.Timeseries(this.redisClient, 'testing2', 60);
var ts = new Harbinger.Timeseries(this.redisClient, 60, 'testing2');
var secondsBucket = moment().unix() - 200;
test.expect(2);
......@@ -367,7 +368,7 @@ module.exports = {
},
statistics: function(test) {
var self = this;
var ts = new Harbinger.Timeseries(this.redisClient, 'testing2', 60);
var ts = new Harbinger.Timeseries(this.redisClient, 60, 'testing2');
var secondsBucket = moment().unix() - 300;
test.expect(4);
......@@ -407,6 +408,47 @@ module.exports = {
},
times: function (test) {
var self = this;
var ts = new Harbinger.Timeseries(this.redisClient, 60, 'testing-times');
var secondsBucket = moment().unix() - 200;
test.expect(2);
series(
Harbinger.Timeseries.prototype.time,
ts,
[
[secondsBucket, 200],
[secondsBucket + 2, 900]
],
function(err, out) {
if (err) {
console.log('Err', err);
test.done();
return;
}
test.ok(true);
ts.getTimes(
secondsBucket,
secondsBucket + 2,
function(err, out) {
if (err) {
console.log('Err2', err);
test.done();
return;
}
test.equals( out.join(','), '200,900');
test.done();
}
);
}
);
}
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment