Commit a53f7c98 authored by Alan Szlosek's avatar Alan Szlosek

split timeseries code by data type

parent 33e19b4f
Pipeline #11702156 passed with stage
in 51 seconds
......@@ -4,7 +4,10 @@ module.exports = {
Since: require('./lib/since'),
Digest: require('./lib/digest'),
Timeseries: require('./lib/timeseries'),
Timeseries: {
Counter: require('./lib/timeseries/counter'),
Timer: require('./lib/timeseries/timer')
},
Input: {
StatsD: require('./lib/input/statsd'),
......
......@@ -3,7 +3,9 @@ var log = require('./log')('timeseries', false);
/*
20170824 - Dreaming up new interface and semantics. StatsD count and timer terminology can be confusing.
20170824 - Dreaming up new interface and semantics. StatsD count and timer terminology can be confusing, I feel mainly because it hides the details behind new terms that devs don't take the time to learn.
Timeseries is intended to store and run calculations on timeseries data, similar to what Graphite or InFluxDb can do.
......@@ -62,13 +64,18 @@ type: count, timer
*/
Timeseries.prototype._getKeyPrefix = function(prefix) {
return this.namespace + (prefix || '');
};
// NEW METHODS
// TODO: need to set a scheduler to clean up count buckets somehow
// Casual scan over keys for type and use hrem
Timeseries.prototype.count = function(unixTimestamp, count, callback) {
Timeseries.prototype.increment = function(unixTimestamp, count, callback) {
var self = this;
var key = this._getCountKey();
var key = this._getKeyPrefix();
log('hincrby', key, unixTimestamp, count);
self.redisClient.hincrby(
key,
......@@ -77,40 +84,14 @@ Timeseries.prototype.count = function(unixTimestamp, count, callback) {
callback
);
};
Timeseries.prototype.time = function(unixTimestamp, value, callback) {
var self = this;
var key = this._getTimeKey(unixTimestamp);
log('lpush', key, value);
this.redisClient.lpush(
key,
value,
function(err) {
if (err) {
callback(err);
return;
}
self.redisClient.expire(key, self.ttl);
callback(err, value);
}
);
};
Timeseries.prototype._getCountKey = function() {
return this.namespace + ':count';
};
Timeseries.prototype._getTimeKey = function(unixTimestamp) {
return this.namespace + ':time:' + unixTimestamp;
};
Timeseries.prototype.count = Timeseries.prototype.increment;
// Return an array of counts for each seconds bucket from start to end
// Returns 0 for seconds buckets that have no data
Timeseries.prototype.getCounts = function(startSeconds, endSeconds, callback) {
Timeseries.prototype.getValues = function(startSeconds, endSeconds, callback) {
var self = this;
var args = [
this._getCountKey()
this._getKeyPrefix()
];
for (var i = startSeconds; i <= endSeconds; i++) {
args.push( i );
......@@ -146,6 +127,33 @@ Timeseries.prototype.getCountStatistics = function(startSeconds, endSeconds, cal
};
Timeseries.prototype.push = function(unixTimestamp, value, callback) {
var self = this;
var key = this._getKeyPrefix(unixTimestamp);
log('lpush', key, value);
this.redisClient.lpush(
key,
value,
function(err) {
if (err) {
callback(err);
return;
}
self.redisClient.expire(key, self.ttl);
callback(err, value);
}
);
};
Timeseries.prototype.time = Timeseries.prototype.push;
// Return an array of times for each seconds bucket from start to end
Timeseries.prototype.getTimes = function(startSeconds, endSeconds, callback) {
var self = this;
......
var stats = require('statistics');
var log = require('../log')('timeseries', false);
/*
20170910 - still don't like the "counts and timer" terminology. Maybe Counter and List. because that's waht it is: a value to increment, and a list of values. Both are integers. Don't neessarily have to be, but that's what they are currently.
Counter must be an integer
Timer/Duration/
Timeseries is intended to store and run calculations on timeseries data, similar to what Graphite or InFluxDb can do.
Data types:
Counter
Key/value pair where the value is incremented or decremented. Think "website page hit counter"
Duration/Time
Key->[ [timestamp, value], ...] mapping. Useful if you want to track how long an operation takes. Maybe fetchUser() takes 100ms today, but only took 10ms a month ago. Durations help you see that.
*/
/*
METRIC TYPES
For all metric types, the key prefix includes the unix timestamp for which values are being added
Count
- Implemented using hashes
- Incremented using hIncrBy
- Need to implement a cleanup method, since we can't rely on redis to expire hash keys at diff rates
Time
- Implemented using lists
Gauge
- not yet implemented
Set
- not yet implemented
FEATURES
* All functions should return number of values that were present in Redis
* All get*() functions should return the number of seconds that we have data for. You don't want to alert on a "metric above average" if there's only 1 data point.
*/
var Timeseries = function(redisClient, ttl, namespace) {
this.redisClient = redisClient;
this.namespace = namespace || 'ht:'; // default to Harbinger Timeseries as namespace
this.ttl = ttl;
};
module.exports = Timeseries;
// Use a zset to sum values into time buckets
/*
Still unsure how to handle multiple types yet: counters, timers, etc
But this method could be used by getAverage() to get the sum and number of items
type: count, timer
*/
Timeseries.prototype._getKeyPrefix = function(prefix) {
return this.namespace + ':counter:' + (prefix ? prefix + ':' : '');
};
// NEW METHODS
// TODO: need to set a scheduler to clean up count buckets somehow
// Casual scan over keys for type and use hrem
Timeseries.prototype.add = function(unixTimestamp, count, callback) {
var self = this;
var key = this._getKeyPrefix();
log('hincrby', key, unixTimestamp, count);
self.redisClient.hincrby(
key,
unixTimestamp,
count,
callback
);
};
// Return an array of counts for each seconds bucket from start to end
// Returns 0 for seconds buckets that have no data
Timeseries.prototype.getValues = function(startSeconds, endSeconds, callback) {
var self = this;
var args = [
this._getKeyPrefix()
];
for (var i = startSeconds; i <= endSeconds; i++) {
args.push( i );
}
// From the hash, get values for all the unixTimestamps we care about
self.redisClient.hmget(args, function(err, value) {
if (err) {
callback(err);
return;
}
value = value.map(function(item) {
var i = parseInt(item);
return isNaN(i) ? 0 : i;
});
// Replace nulls with 0?
callback(null, value);
});
};
/*
Use this to get sum, mean/average, standard deviation, etc
TODO: Use Lua script to calculate and cache these stats within Redis
*/
Timeseries.prototype.getStatistics = function(startSeconds, endSeconds, callback) {
this.getValues(startSeconds, endSeconds, function(err, counts) {
if (err) {
callback(err);
return;
}
callback(null, counts.reduce(stats) );
});
};
var stats = require('statistics');
var log = require('../log')('timeseries', false);
/*
20170824 - Dreaming up new interface and semantics. StatsD count and timer terminology can be confusing, I feel mainly because it hides the details behind new terms that devs don't take the time to learn.
Timeseries is intended to store and run calculations on timeseries data, similar to what Graphite or InFluxDb can do.
Data types:
Count/Increment
Key/value pair where the value is incremented or decremented. Think "website page hit counter"
Duration/Time
Key->[ [timestamp, value], ...] mapping. Useful if you want to track how long an operation takes. Maybe fetchUser() takes 100ms today, but only took 10ms a month ago. Durations help you see that.
*/
/*
METRIC TYPES
For all metric types, the key prefix includes the unix timestamp for which values are being added
Count
- Implemented using hashes
- Incremented using hIncrBy
- Need to implement a cleanup method, since we can't rely on redis to expire hash keys at diff rates
Time
- Implemented using lists
Gauge
- not yet implemented
Set
- not yet implemented
FEATURES
* All functions should return number of values that were present in Redis
* All get*() functions should return the number of seconds that we have data for. You don't want to alert on a "metric above average" if there's only 1 data point.
*/
var Timeseries = function(redisClient, ttl, namespace) {
this.redisClient = redisClient;
this.namespace = namespace || 'ht:'; // default to Harbinger Timeseries as namespace
this.ttl = ttl;
};
module.exports = Timeseries;
// Use a zset to sum values into time buckets
/*
Still unsure how to handle multiple types yet: counters, timers, etc
But this method could be used by getAverage() to get the sum and number of items
type: count, timer
*/
Timeseries.prototype._getKeyPrefix = function(prefix) {
return this.namespace + ':timer:' + (prefix ? prefix + ':' : '');
};
Timeseries.prototype.add = function(unixTimestamp, value, callback) {
var self = this;
var key = this._getKeyPrefix(unixTimestamp);
log('lpush', key, value);
this.redisClient.lpush(
key,
value,
function(err) {
if (err) {
callback(err);
return;
}
self.redisClient.expire(key, self.ttl);
callback(err, value);
}
);
};
Timeseries.prototype.time = Timeseries.prototype.push;
// Return an array of times for each seconds bucket from start to end
Timeseries.prototype.getValues = function(startSeconds, endSeconds, callback) {
var self = this;
var keys = [];
for (var i = startSeconds; i <= endSeconds; i++) {
keys.push( i );
}
var out = [];
var next = function() {
var key;
if (keys.length == 0) {
// Done
// Calculate the average now
callback(null, out);
return;
}
seconds = keys.shift();
key = self._getKeyPrefix(seconds);
// Fetch from redis
log('fetching: ', key);
self.redisClient.lrange([key, 0, -1], function(err, values) {
if (err) {
callback(err);
return;
}
log(values);
for (var i = 0; i < values.length; i++) {
out.push( parseInt(values[i]) );
}
next();
});
};
// Should perhaps cache the output of this so frequent pulls for this exact interval don't have to recalculate
next();
};
Timeseries.prototype.getStatistics = function(startSeconds, endSeconds, callback) {
this.getValues(startSeconds, endSeconds, function(err, times) {
if (err) {
callback(err);
return;
}
callback(null, times.reduce(stats) );
});
};
\ No newline at end of file
......@@ -8,6 +8,5 @@
"devDependencies": {
"tap": "^10.7.2"
},
"scripts": {
}
"scripts": {}
}
......@@ -14,6 +14,7 @@ redisClient.on("end", function(err) {
*/
// call func args.length times with each args index array as args
var series = function(func, thisarg, args, callback) {
var out = [];
var trap = function(err) {
......@@ -55,52 +56,27 @@ var done = function(test, redisClient) {
};
tap.test('timeseries.count.single', function(test) {
var self = this;
var redisClient = redis.createClient(6379, 'redis');
var ts = new Harbinger.Timeseries(redisClient, 60, 'testing');
var secondsBucket = moment().unix() - 100;
test.plan(2);
ts.count(secondsBucket, 3, function(err) {
if (err) {
done(test, redisClient);
return;
}
ts.getCounts(secondsBucket, secondsBucket, function(err, result) {
if (err) {
console.log(err);
done(test, redisClient);
return;
}
test.equals(1, result.length);
test.equals(3, result[0]);
done(test, redisClient);
});
});
});
// Calculate Error Rate every second, alert if rate goes above N
/*
To calculate the error rate, we have to read from multiple keys. Can use Redis mget command to do so
*/
tap.test('timeseries.getCounts', function(test) {
tap.test('timeseries.counts.add', function(test) {
var self = this;
var redisClient = redis.createClient(6379, 'redis');
var ts = new Harbinger.Timeseries(redisClient, 60, 'testing2');
var ts = new Harbinger.Timeseries.Counter(redisClient, 60, 'testing2');
var secondsBucket = moment().unix() - 200;
test.plan(2);
series(
Harbinger.Timeseries.prototype.count,
Harbinger.Timeseries.Counter.prototype.add,
ts,
[
[secondsBucket, 3],
[secondsBucket + 2, 4]
[secondsBucket, 1],
[secondsBucket, 2],
[secondsBucket + 2, 1],
[secondsBucket + 2, 2]
],
function(err, out) {
if (err) {
......@@ -110,7 +86,7 @@ tap.test('timeseries.getCounts', function(test) {
}
test.ok(true);
ts.getCounts(
ts.getValues(
secondsBucket,
secondsBucket + 2,
function(err, out) {
......@@ -119,7 +95,7 @@ tap.test('timeseries.getCounts', function(test) {
done(test, redisClient);
return;
}
test.equals( out.join(','), '3,0,4');
test.equals( out.join(','), '3,0,3');
done(test, redisClient);
}
);
......@@ -129,16 +105,16 @@ tap.test('timeseries.getCounts', function(test) {
});
tap.test('timeseries.getCountStatistics', function(test) {
tap.test('timeseries.counts.statistics', function(test) {
var self = this;
var redisClient = redis.createClient(6379, 'redis');
var ts = new Harbinger.Timeseries(redisClient, 60, 'testing2');
var ts = new Harbinger.Timeseries.Counter(redisClient, 60, 'testing2');
var secondsBucket = moment().unix() - 300;
test.plan(4);
series(
Harbinger.Timeseries.prototype.count,
Harbinger.Timeseries.Counter.prototype.add,
ts,
[
[secondsBucket, 3],
......@@ -151,7 +127,7 @@ tap.test('timeseries.getCountStatistics', function(test) {
}
test.ok(true);
ts.getCountStatistics(
ts.getStatistics(
secondsBucket,
secondsBucket + 2,
function(err, stats) {
......@@ -171,16 +147,16 @@ tap.test('timeseries.getCountStatistics', function(test) {
});
tap.test('timeseries.getTimes', function(test) {
tap.test('timeseries.timers.add', function(test) {
var self = this;
var redisClient = redis.createClient(6379, 'redis');
var ts = new Harbinger.Timeseries(redisClient, 60, 'testing-times');
var ts = new Harbinger.Timeseries.Timer(redisClient, 60, 'testing-times');
var secondsBucket = moment().unix() - 200;
test.plan(2);
series(
Harbinger.Timeseries.prototype.time,
Harbinger.Timeseries.Timer.prototype.add,
ts,
[
[secondsBucket, 200],
......@@ -194,7 +170,7 @@ tap.test('timeseries.getTimes', function(test) {
}
test.ok(true);
ts.getTimes(
ts.getValues(
secondsBucket,
secondsBucket + 2,
function(err, out) {
......@@ -212,3 +188,46 @@ tap.test('timeseries.getTimes', function(test) {
);
});
tap.test('timeseries.timers.statistics', function(test) {
var self = this;
var redisClient = redis.createClient(6379, 'redis');
var ts = new Harbinger.Timeseries.Timer(redisClient, 60, 'testing2');
var secondsBucket = moment().unix() - 300;
test.plan(4);
series(
Harbinger.Timeseries.Timer.prototype.add,
ts,
[
[secondsBucket, 1],
[secondsBucket, 2],
[secondsBucket + 2, 3],
[secondsBucket + 2, 4]
],
function(err, out) {
if (err) {
done(test, redisClient);
return;
}
test.ok(true);
ts.getStatistics(
secondsBucket,
secondsBucket + 2,
function(err, stats) {
if (err) {
done(test, redisClient);
return;
}
test.equals(2.5, stats.mean);
test.equals(4, stats.count);
test.equals(10, stats.sum);
done(test, redisClient);
}
);
}
);
});
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment