Commit 39bdd211 authored by Alan Szlosek's avatar Alan Szlosek

re-organization

parent b75ae8a4
......@@ -76,7 +76,11 @@ function parse(msg) {
};
module.exports = parse;
/*
module.exports = {
severity: Severity,
parse: parse
};
*/
/*
TODO:
* check message.payload.time ... doesn't exist for statsd messages
*/
// Load our Input modules
var StatsD = require('./input/statsd');
// Load helper modules
var createMessage = require('./lib/message');
var Duration = require('./lib/durations');
var Throttle = require('./lib/throttle-memory');
var Timeseries = require('./lib/timeseries');
var nodemailer = require('nodemailer');
var transporter = nodemailer.createTransport('smtps://user:pass@smtp.example.com');
var output = {
console: function(message, extra) {
// Format messages on the console however you see fit
console.log(message.payload.time, message.payload.msg, 'x' + extra);
},
emailStatsdErrors: function(msg) {
var message = {
text: 'Metric: ' + msg.payload.metric + "\nValue: " + msg.payload.value,
from: "DevOps Harbinger <devops@example.com>",
to: "devops.engineer@example.com",
subject: "DevOps Errors",
};
// send the message and get a callback with an error or details of the message that was sent
transporter.sendMail(
message,
function(err, message) {
console.log(err || message);
}
);
}
};
var Bucket = function(name, seconds) {
return name + seconds;
};
// ACT AS A STATSD COLLECTOR, ACCEPT DATA FROM STATSD
var statsdPort = 8125;
StatsD(statsdPort, '127.0.0.1', function(error, metric, value, type, sampleRate, remoteAddress) {
var payload;
var source = 'statsd';
var message;
payload = {
metric: metric,
value: value,
type: type
};
// params: payload, source, remoteAddress, localAddress, localPort) {
message = createMessage(payload, source, remoteAddress, '0.0.0.0', statsdPort);
// You may want to split this delivery decision-making to a separate function when
// using more than one input
// Deliver error.count metrics via email at most every minute
if (message.payload.metric.match(/errors\.count$/)) {
// We're throttling these errors to 1 minute,
// Give it a custom label so it doesn't clash with other throttles
Throttle('errors.count:1m', message, Duration.minute, output.emailStatsdErrors);
// The above is just a simple throttling example. In reality you'd likely only alert
// If the error count remains above a certain number over a time period
// Create new timeseries tracker, but only keep an hour's worth of data
var ts = new Timeseries(redisClient, 'errors.count', 3600);
ts.addCount(messsage.payload.value);
// So now how do we run calculations based on this tracked data?
}
});
/*
1. Our sample metric is called "sample"
2. Every second, calculate the average of the metric values over the past 10 seconds
3. Alert when the average is over 50
4. Populate the metric with random integers every 500ms
*/
var Timeseries = require('./lib/timeseries');
var log = require('./lib/log')('example-timeseries', true);
var moment = require('moment');
var redis = require('redis');
function getRandomInteger(min, max) {
return Math.floor(Math.random() * (max - min)) + min;
};
var redisClient = redis.createClient(6379, 'localhost');
// Instantiate a timeseries dataset called "sample" that preserves 60 seconds worth of data
var metric = new Timeseries(redisClient, 'sample', 60);
var averageInterval;
var countInterval;
var done = function() {
clearInterval(averageInterval);
clearInterval(countInterval);
redisClient.quit();
};
// Calculate the average of the metric every second, and print the average if it reaches our threshold
var averageInterval = setInterval(
function() {
// Exclude the current second to avoid partial calculations
var startSeconds = moment().subtract(11, 'seconds').unix();
var endSeconds = startSeconds + 10;
metric.getCountAverage(
startSeconds,
endSeconds,
function(err, avg, secondsSpanned) {
if (err) {
log(err);
done();
return;
}
log('Dataset spans ' + secondsSpanned + ' seconds');
if (secondsSpanned == 10 && avg > 50) {
log('Average above 50', avg);
done();
}
}
);
},
1000
);
// Fill the metric with values every 500ms
var countInterval = setInterval(
function() {
var rand = getRandomInteger(1, 100);
metric.addCount(moment().unix(), rand);
},
500
);
/*
NOTE: This code is old and confusing ... please see example-statsd.js for a clearer example
Example using Redis rollups, and support for sending alerts via email
Requirements: npm install nodemail
*/
var net = require('net');
var SyslogParser = require('./lib/syslog-parser');
var IfNotSeen = require('./lib/after');
var StatsD = require('./input/statsd');
var LineBuffer = require('./lib/line-buffer');
var Since = require('./lib/since');
var watchFile = require('./input/file-watcher');
var createMessage = require('./lib/message');
var redis = require('redis');
var nodemailer = require('nodemailer');
var transporter = nodemailer.createTransport('smtps://user:pass@smtp.example.com');
var debug = false;
var redisClient = redis.createClient(6379, 'localhost');
redisClient.on("error", function (err) {
console.log("Error " + err);
});
var Throttle = require('./lib/throttle-redis')(redisClient);
/*
Performance tuning notes
Allocate more memory to receive buffers. On older laptop 10240000 is enough to receive 10k udp messages a second
20480000 is enough to receive 30k per second
sysctl -w net.core.rmem_default=40960000
sysctl -w net.core.rmem_max=40960000
Configuration notes:
It's possible to make nginx log to journald via
error_log syslog:server=unix:/dev/log;
access_log syslog:server=unix:/dev/log;
That way I don't have to find another daemon like remote_syslog2
*/
var Times = {
day: 60 * 60 * 24,
hour: 60 * 60,
minute: 60,
second: 1
};
var Duration = Times;
var backends = {
console: function(message, extra) {
console.log(message.payload.time, message.payload.msg, 'x' + extra);
},
hipchat: function(message) {
},
email: function(msg) {
var message = {
text: msg.payload.msg,
from: "Alan Szlosek <alan@greaterscope.com>",
to: "alan.szlosek@gmail.com",
subject: "DevOps Alert",
};
// send the message and get a callback with an error or details of the message that was sent
transporter.sendMail(
message,
function(err, message) {
console.log(err || message);
}
);
}
};
var AlertIfSince = new Since(redisClient);
// Emit message to console every N seconds
var rollupToConsole = function(message, seconds) {
Rollup(
message,
seconds,
backends.console
);
};
var rollupToEmail = function(message, seconds) {
Rollup(
message,
seconds,
function(message) {
backends.console(message);
backends.email(message);
}
);
};
var transformAndDeliver = function(message) {
// TRANSFORMS AND DELIVERY DESTINATION
if (message.source == 'statsd') {
if (message.payload.metric.match(/error\.count$/)) {
Throttle(message.id + ':1s', message, 1 * Times.second, backends.console);
}
} else if (message.source == "syslog" ) {
// If syslog severity is 0 through 3, should alert. https://en.wikipedia.org/wiki/Syslog
// These message seem severe, but we may need to tune
if (0 <= message.payload.severity && message.payload.severity < 2) {
Throttle(message.id + ':1s', message, 1 * Times.second, backends.console);
}
if (message.payload.msg.match(/(PHP Fatal error)|(Uncaught )/i)) {
// This will have
Throttle(message.id + ':1s', message, 1 * Times.second, backends.console);
/*
} else if (message.payload.msg.match(/session (opened|closed)/)) {
Throttle(message.id + ':1s', message, 1 * Times.second, backends.console);
*/
} else if (message.payload.msg.match(/howdy/)) {
AlertIfSince.countForSeconds(message.id + ':5over10', message, 5, 10, function(message) {
// Throttle this alert
Throttle(message.id + ':5over10throttle60s', message, 60, backends.console);
});
} else if (message.payload.msg.match(/server reached pm\.max_children setting/)) {
Throttle(message.id + ':1s', message, 1 * Times.second, backends.console);
}
}
};
// SYSLOG INGESTERS
// OPTION 1: LISTEN FOR SYSLOG MESSAGES ON PORT 514
/*
var syslogServer = new net.Server();
var syslogPort = 10000;
syslogServer.on('connection', function(socket) {
var lineBuffer = new LineBuffer(function(line) {
// would be nice to have an option to pull "package/program[PID]" from tag
payload = SyslogParser.parse(line);
id = payload.msg;
message = createMessage(payload, 'syslog', socket.remoteAddress, socket.localAddress, socket.localPort);
transformAndDeliver(message);
});
socket
.on('data', function(data) {
lineBuffer.data(data);
});
});
syslog.listen(syslogPort, function() {
console.log('Listening for syslog messages on TCP 10000');
});
*/
// OPTION 2: LISTEN ON A UNIX DOMAIN SOCKET
//syslogServer.listen('/var/log/syslog');
// OPTION 3: TAIL LOG FILES
var files = [
'/var/log/auth.log',
'/var/log/syslog',
'/home/alan/coding/projects/devops-harbinger/test.log'
];
var watchSyslog = function(filename) {
// Tail /var/log/auth.log for input
var lb = new LineBuffer(function(line) {
if (debug) {
console.log('LineBuffer', line);
}
// would be nice to have an option to pull "package/program[PID]" from tag
payload = SyslogParser.parse(line);
id = payload.msg;
message = createMessage(payload, 'syslog'); //socket.remoteAddress, socket.localAddress, socket.localPort);
transformAndDeliver(message);
});
var watcher = watchFile(filename, function(err, data) {
if (err) {
console('watch-file error', err);
return;
}
lb.data(data);
});
};
for (var i = 0; i < files.length; i++) {
watchSyslog(files[i]);
}
// ACT AS A STATSD COLLECTOR, ACCEPT DATA FROM STATSD
var statsdPort = 8125;
StatsD(statsdPort, '127.0.0.1', function(error, metric, value, type, sampleRate, remoteAddress) {
var payload;
var message;
if (metric.match(/(app)\./)) {
payload = {
metric: metric,
value: value
};
message = createMessage(payload, 'statsd', remoteAddress, '0.0.0.0', statsdPort);
transformAndDeliver(message);
} else if (metric.match(/^(node-event-pipeline|namespace)\./)) {
payload = {
metric: metric,
value: value,
type: type
};
message = createMessage(payload, 'statsd', remoteAddress, '0.0.0.0', statsdPort);
transformAndDeliver(message);
}
});
var syslog = require('../src/lib/syslog-parser');
var throttleInMemory = require('../src/lib/throttle-memory');
var ThrottleInRedis = require('../src/lib/throttle-redis');
var harbinger = require('../');
var moment = require('moment');
var redis = require('redis');
var statsd = require('../src/input/statsd');
var udp = require('dgram').createSocket('udp4');
udp.unref();
......@@ -39,7 +36,7 @@ module.exports = {
for (var i = 0; i < lines.length; i++) {
var out = syslog.parse( lines[i] );
var out = harbinger.Parse.syslog( lines[i] );
// Eh, these fields
out.time = '' + out.time;
test.deepEqual(out, objects[i]);
......@@ -83,7 +80,7 @@ module.exports = {
test.expect(2);
for (var i = 0; i < 10; i++) {
throttleInMemory(
harbinger.Throttle.InMemory(
'test:throttle-memory:' + timeout,
message,
timeout,
......@@ -94,7 +91,7 @@ module.exports = {
setTimeout(
function() {
for (var i = 0; i < 10; i++) {
throttleInMemory(
harbinger.Throttle.InMemory(
'test:throttle-memory:' + timeout,
message,
timeout,
......@@ -110,8 +107,12 @@ module.exports = {
var redisClient = redis.createClient(6379, 'localhost');
redisClient.on("error", function (err) {
console.log("Error " + err);
test.done();
});
redisClient.on("end", function(err) {
//console.log("Redis connection closed");
});
var throttleInRedis = ThrottleInRedis(redisClient);
var throttleInRedis = harbinger.Throttle.InRedis(redisClient);
var message = {
name: "Test"
};
......@@ -160,7 +161,7 @@ module.exports = {
statsd: {
setUp: function(callback) {
this.server = new statsd();
this.server = new harbinger.Input.StatsD();
callback();
},
tearDown: function(callback) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment