Commit d0bd0042 authored by Alan Szlosek's avatar Alan Szlosek

LineBuffer and FileWatcher are unnecessary

parent 2b455822
Pipeline #15947385 passed with stage
......@@ -19,7 +19,6 @@ all_tests:
script:
- npm install
- node test/message.js
- node test/linebuffer.js
- node test/syslog.js
- node test/digest.js
- node test/timeseries.js
......
......@@ -14,7 +14,6 @@ Example scenario:
Comes with the following batteries:
* File watcher (so you can tail log files)
* Syslog message parser
* StatsD message parser
* Ability to create a digest of messages, delivered when you want
......
var net = require('net');
var fst = require('fs-tail-stream');
var split = require('split');
var Harbinger = require('../index');
......@@ -27,7 +29,8 @@ var files = [
'/var/log/messages'
];
var watchSyslog = function(filename) {
var lb = new Harbinger.LineBuffer(function(line) {
var rs = fst.createReadStream(filename, { encoding: 'utf8', tail: true });
rs.pipe(split()).on('data', function(line) {
// would be nice to have an option to pull "package/program[PID]" from tag
var payload = Harbinger.syslog.parse(line);
......@@ -37,13 +40,6 @@ var watchSyslog = function(filename) {
everyTenMinutes.add('iptables input denied', message);
}
});
var watcher = Harbinger.watchFile(filename, function(err, data) {
if (err) {
console('watch-file error', err);
return;
}
lb.data(data);
});
};
for (var i = 0; i < files.length; i++) {
watchSyslog(files[i]);
......
......@@ -9,11 +9,13 @@ module.exports = {
StatsD: require('./lib/input/statsd'),
},
LineBuffer: require('./lib/line-buffer'),
// DEPRECATED: use split module
//LineBuffer: require('./lib/line-buffer'),
syslog: require('./lib/syslog'),
watchFile: require('./lib/input/file-watcher'),
// DEPRECATED: use fs-tail-stream
//watchFile: require('./lib/input/file-watcher'),
createMessage: function(payload, source, sourceFile, remoteAddress, localAddress, localPort) {
return {
......
// Adapted from https://github.com/lucagrulla/node-tail
var fs = require('fs');
module.exports = function(filename, callback) {
var watchHandle = null;
var position = 0;
var queue = [];
var read = function() {
var offset = queue.shift()
var end = queue.shift();
var buffer = '';
var stream = fs.createReadStream(filename, {start: offset, end: end, encoding:"utf-8"});
stream.on('error', function(error) {
callback(error);
});
stream.on('end', function() {
callback(null, buffer);
if (queue.length > 0) {
read();
}
});
stream.on('data', function(data) {
buffer += data;
});
};
// Get file size (last position)
var watchCallback = function(event, filename2) {
var stats;
//console.log('FileWatcher', event, filename);
switch (event) {
case 'rename':
// stop watching
watch();
break;
case 'change':
stats = fs.statSync(filename);
if (stats.size < position) {
position = stats.size;
}
if (stats.size > position) {
queue.push(position, stats.size);
position = stats.size;
read();
}
break;
}
};
var watch = function() {
var stats = fs.statSync(filename);
if (watchHandle) {
watchHandle.close();
}
//queue = [];
position = stats.size;
watchHandle = fs.watch(filename, {persistent:true}, watchCallback);
};
watch();
};
/*
Due to the way streams work, empty strings pushed from reader are ignored,
thus, empty lines are ignored by LineBuffer
*/
const { Transform } = require('stream');
class LineBuffer extends Transform {
constructor(options) {
options = options || {};
// Should set this to max message length, perhaps
options.highWaterMark = 16384;
options.decodeStrings = false; //highWaterMark = 12;
super(options);
this.lineBuffer = [];
this.separator = /\r\n|\r|\n/;
/*
- lineBuffer contains an array of strings
- we split input data on newline and push into this array
- the last element is assumed to be a partial line (event if it's an empty string),
so we need to append the first line of new input onto it
*/
}
_transform(chunk, encoding, callback) {
if (Buffer.isBuffer(chunk)) {
chunk = chunk.toString();
}
var lines = chunk.split(this.separator);
var len = this.lineBuffer.length;
if (len > 0) {
this.lineBuffer[ len-1 ] += lines.shift();
}
this.lineBuffer = this.lineBuffer.concat(lines);
while (this.lineBuffer.length > 1) {
if (!this.push( this.lineBuffer.shift() )) {
break;
}
}
callback();
}
_flush(callback) {
while (this.lineBuffer.length > 0) {
this.push( this.lineBuffer.shift() );
}
// Pass along "end of input"
this.push(null);
setTimeout(callback, 1);
}
}
module.exports = LineBuffer;
var fs = require('fs');
var tap = require('tap');
var Harbinger = require('../index');
tap.test('linebuffer', function(test) {
var lines = [
'hello',
'my name',
'is Harbinger',
'see me',
'warn'
];
var lb = new Harbinger.LineBuffer();
lb.on('data', function(line) {
var expectedLine = lines.shift();
test.equals(line.toString(), expectedLine);
if (lines.length == 0) {
test.end();
}
});
//test.plan(5);
lb.write("hel");
lb.write("lo\nmy");
// LineBuffer based on streams will ignore empty lines
lb.write(" name\n\n");
lb.write("is Harbinger\n\n");
setTimeout(function() {
lb.end("see me\nwarn");
}, 10);
});
tap.test('linebuffer.pipe', function(test) {
var lines = [
'do you know',
'who i am?',
'i am',
'harbinger'
];
var lb = new Harbinger.LineBuffer();
lb.on('data', function(line) {
var expectedLine = lines.shift();
test.equals(line.toString(), expectedLine);
if (lines.length == 0) {
test.end();
}
});
var rr = fs.createReadStream('test/in.log');
rr.pipe(lb);
});
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment