Commit 4ede4c22 authored by Alan Szlosek's avatar Alan Szlosek

clean up examples, use digest, test digest cache limit

parent 9cba2609
Pipeline #13561727 failed with stage
in 1 minute and 33 seconds
/*
1. Our sample metric is called "sample"
2. Every second, calculate the average of the metric values over the past 10 seconds
3. Alert when the average is over 50
4. Populate the metric with random integers every 500ms
*/
var Harbinger = require('../index');
var log = require('../lib/log')('example-timeseries', true);
var moment = require('moment');
var redis = require('redis');
function getRandomInteger(min, max) {
return Math.floor(Math.random() * (max - min)) + min;
};
var redisClient = redis.createClient(6379, 'localhost');
// Instantiate a timeseries dataset called "sample" that preserves 60 seconds worth of data
var metric = new Harbinger.Timeseries(redisClient, 'sample', 60);
var averageInterval;
var countInterval;
var done = function() {
clearInterval(averageInterval);
clearInterval(countInterval);
redisClient.quit();
};
// Calculate the average of the metric every second, and print the average if it reaches our threshold
var averageInterval = setInterval(
function() {
// Exclude the current second to avoid partial calculations
var startSeconds = moment().subtract(11, 'seconds').unix();
var endSeconds = startSeconds + 10;
metric.getCountAverage(
startSeconds,
endSeconds,
function(err, avg, secondsSpanned) {
if (err) {
log(err);
done();
return;
}
log('Dataset spans ' + secondsSpanned + ' seconds. Avg: ' + avg);
if (secondsSpanned == 10 && avg > 51) {
log('Average above 51', avg);
done();
}
}
);
},
1000
);
// Fill the metric with values every 500ms
var countInterval = setInterval(
function() {
var rand = getRandomInteger(1, 100);
metric.count(moment().unix(), rand);
},
500
);
/*
NOTE: This code is old and confusing ... please see example-statsd.js for a clearer example
Example using Redis rollups, and support for sending alerts via email
Requirements: npm install nodemail
*/
var net = require('net');
var SyslogParser = require('./lib/syslog-parser');
var IfNotSeen = require('./lib/after');
var StatsD = require('./input/statsd');
var LineBuffer = require('./lib/line-buffer');
var Since = require('./lib/since');
var watchFile = require('./input/file-watcher');
var createMessage = require('./lib/message');
var redis = require('redis');
var nodemailer = require('nodemailer');
var transporter = nodemailer.createTransport('smtps://user:pass@smtp.example.com');
var debug = false;
var redisClient = redis.createClient(6379, 'localhost');
redisClient.on("error", function (err) {
console.log("Error " + err);
});
var Throttle = require('./lib/throttle-redis')(redisClient);
/*
Performance tuning notes
Allocate more memory to receive buffers. On older laptop 10240000 is enough to receive 10k udp messages a second
20480000 is enough to receive 30k per second
sysctl -w net.core.rmem_default=40960000
sysctl -w net.core.rmem_max=40960000
Configuration notes:
It's possible to make nginx log to journald via
error_log syslog:server=unix:/dev/log;
access_log syslog:server=unix:/dev/log;
That way I don't have to find another daemon like remote_syslog2
*/
var Times = {
day: 60 * 60 * 24,
hour: 60 * 60,
minute: 60,
second: 1
};
var Duration = Times;
var backends = {
console: function(message, extra) {
console.log(message.payload.time, message.payload.msg, 'x' + extra);
},
hipchat: function(message) {
},
email: function(msg) {
var message = {
text: msg.payload.msg,
from: "Alan Szlosek <alan@greaterscope.com>",
to: "alan.szlosek@gmail.com",
subject: "DevOps Alert",
};
// send the message and get a callback with an error or details of the message that was sent
transporter.sendMail(
message,
function(err, message) {
console.log(err || message);
}
);
}
};
var AlertIfSince = new Since(redisClient);
// Emit message to console every N seconds
var rollupToConsole = function(message, seconds) {
Rollup(
message,
seconds,
backends.console
);
};
var rollupToEmail = function(message, seconds) {
Rollup(
message,
seconds,
function(message) {
backends.console(message);
backends.email(message);
}
);
};
var transformAndDeliver = function(message) {
// TRANSFORMS AND DELIVERY DESTINATION
if (message.source == 'statsd') {
if (message.payload.metric.match(/error\.count$/)) {
Throttle(message.id + ':1s', message, 1 * Times.second, backends.console);
}
} else if (message.source == "syslog" ) {
// If syslog severity is 0 through 3, should alert. https://en.wikipedia.org/wiki/Syslog
// These message seem severe, but we may need to tune
if (0 <= message.payload.severity && message.payload.severity < 2) {
Throttle(message.id + ':1s', message, 1 * Times.second, backends.console);
}
if (message.payload.msg.match(/(PHP Fatal error)|(Uncaught )/i)) {
// This will have
Throttle(message.id + ':1s', message, 1 * Times.second, backends.console);
/*
} else if (message.payload.msg.match(/session (opened|closed)/)) {
Throttle(message.id + ':1s', message, 1 * Times.second, backends.console);
*/
} else if (message.payload.msg.match(/howdy/)) {
AlertIfSince.countForSeconds(message.id + ':5over10', message, 5, 10, function(message) {
// Throttle this alert
Throttle(message.id + ':5over10throttle60s', message, 60, backends.console);
});
} else if (message.payload.msg.match(/server reached pm\.max_children setting/)) {
Throttle(message.id + ':1s', message, 1 * Times.second, backends.console);
}
}
};
// SYSLOG INGESTERS
// OPTION 1: LISTEN FOR SYSLOG MESSAGES ON PORT 514
/*
var syslogServer = new net.Server();
var syslogPort = 10000;
syslogServer.on('connection', function(socket) {
var lineBuffer = new LineBuffer(function(line) {
// would be nice to have an option to pull "package/program[PID]" from tag
payload = SyslogParser.parse(line);
id = payload.msg;
message = createMessage(payload, 'syslog', socket.remoteAddress, socket.localAddress, socket.localPort);
transformAndDeliver(message);
});
socket
.on('data', function(data) {
lineBuffer.data(data);
});
});
syslog.listen(syslogPort, function() {
console.log('Listening for syslog messages on TCP 10000');
});
*/
// OPTION 2: LISTEN ON A UNIX DOMAIN SOCKET
//syslogServer.listen('/var/log/syslog');
// OPTION 3: TAIL LOG FILES
var files = [
'/var/log/auth.log',
'/var/log/syslog',
'/home/alan/coding/projects/devops-harbinger/test.log'
];
var watchSyslog = function(filename) {
// Tail /var/log/auth.log for input
var lb = new LineBuffer(function(line) {
if (debug) {
console.log('LineBuffer', line);
}
// would be nice to have an option to pull "package/program[PID]" from tag
payload = SyslogParser.parse(line);
id = payload.msg;
message = createMessage(payload, 'syslog'); //socket.remoteAddress, socket.localAddress, socket.localPort);
transformAndDeliver(message);
});
var watcher = watchFile(filename, function(err, data) {
if (err) {
console('watch-file error', err);
return;
}
lb.data(data);
});
};
for (var i = 0; i < files.length; i++) {
watchSyslog(files[i]);
}
// ACT AS A STATSD COLLECTOR, ACCEPT DATA FROM STATSD
var statsdPort = 8125;
StatsD(statsdPort, '127.0.0.1', function(error, metric, value, type, sampleRate, remoteAddress) {
var payload;
var message;
if (metric.match(/(app)\./)) {
payload = {
metric: metric,
value: value
};
message = createMessage(payload, 'statsd', remoteAddress, '0.0.0.0', statsdPort);
transformAndDeliver(message);
} else if (metric.match(/^(node-event-pipeline|namespace)\./)) {
payload = {
metric: metric,
value: value,
type: type
};
message = createMessage(payload, 'statsd', remoteAddress, '0.0.0.0', statsdPort);
transformAndDeliver(message);
}
});
......@@ -7,20 +7,20 @@ var slackToken = '';
var debug = true;
var backends = {
console: function(message, extra) {
console.log(message.payload.time, extra + 'x', message);
},
slack: function(message) {
//slack.chat.postMessage(slackToken, slackChannel, message.payload.message);
// ISO-8601 format is: YYYY-MM-DDTHH:mm:ss.sssZ
var digestPattern = /T\d\d:[012345]0:00.\d\d\dZ$/;
var everyTenMinutes = new Harbinger.Digest(
// evaluate every minute
false,
// pattern matches times every 10 minutes
digestPattern,
// only deliver 10 latest messages to callback
10,
// callback prints messages to console
function(counts, cache) {
console.log('Ten latest messages', cache);
}
};
);
var files = [
......@@ -36,8 +36,7 @@ var watchSyslog = function(filename) {
if (payload.message.search(/iptables_INPUT_denied/) != -1) {
var message = Harbinger.createMessage(payload, 'syslog', filename); //socket.remoteAddress, socket.localAddress, socket.localPort);
Harbinger.Throttle.InMemory('iptables input denied:slack', message, 10, backends.slack);
Harbinger.Throttle.InMemory('iptables input denied:console', message, 10, backends.console);
everyTenMinutes.add('iptables input denied', message);
}
});
var watcher = Harbinger.watchFile(filename, function(err, data) {
......
......@@ -22,9 +22,9 @@ tap.test('digest.second', function(test) {
digest.add(123, {message: 'Test'});
digest.add(123, {message: 'Another'});
});
tap.test('digest.five-second', function(test) {
test.plan(3);
......@@ -59,5 +59,26 @@ tap.test('digest.five-second', function(test) {
digest.add(messageId, messages[i]);
}
}
});
tap.test('digest.cache-size', function(test) {
test.plan(1);
var start = moment().unix();
var callback = function(counts, cache) {
var end = moment().unix();
// Make sure we've seen two instances of message id 123
test.equal(counts[123], 10);
digest.stop();
test.end();
};
var digest = new Harbinger.Digest(true, null, 10, callback);
for (var i = 0; i < 100; i++) {
digest.add(123, {message: 'Test ' + i});
}
});
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment