Commit bf13902a authored by Alan Szlosek's avatar Alan Szlosek

stream-based line buffer

parent ca691b7c
Pipeline #15441578 failed with stage
in 57 seconds
var LineBuffer = function(lineCallback, separator) {
this.callback = lineCallback;
this.separator = separator || /\r\n|\r|\n/;
/*
Due to the way streams work, empty strings pushed from reader are ignored,
thus, empty lines are ignored by LineBuffer
*/
const { Transform } = require('stream');
class LineBuffer extends Transform {
constructor(options) {
options = options || {};
// Should set this to max message length, perhaps
options.highWaterMark = 16384;
options.decodeStrings = false; //highWaterMark = 12;
super(options);
this.lineBuffer = [];
this.separator = /\r\n|\r|\n/;
/*
- lineBuffer contains an array of strings
- we split input data on newline and push into this array
- the last element is assumed to be a partial line (event if it's an empty string),
so we need to append the first line of new input onto it
*/
}
_transform(chunk, encoding, callback) {
if (Buffer.isBuffer(chunk)) {
chunk = chunk.toString();
}
var lines = chunk.split(this.separator);
var len = this.lineBuffer.length;
if (len > 0) {
this.lineBuffer[ len-1 ] += lines.shift();
}
this.lineBuffer = this.lineBuffer.concat(lines);
this.clear();
};
LineBuffer.prototype.clear = function() {
// the last element is assumed to be a partial line (event if it's an empty string),
// so we need to append the first line of new input onto it
this.lineBuffer = [''];
};
LineBuffer.prototype.data = function(data) {
var last = this.lineBuffer.length > 1 ? this.lineBuffer.length - 1: 0;
var lines = data.toString().split(this.separator);
// Append first line of new input to last element of lineBuffer
this.lineBuffer[last] += lines.shift();
last = lines.length;
for (var i = 0; i < last; i++) {
this.lineBuffer.push(lines[i]);
}
// Leave the last partial line in the line buffer
while (this.lineBuffer.length > 1) {
var line = this.lineBuffer.shift();
// Whether or not to send empty lines should be an option
this.callback(line);
}
};
if (!this.push( this.lineBuffer.shift() )) {
break;
}
}
callback();
}
_flush() {
while (this.lineBuffer.length > 0) {
this.push( this.lineBuffer.shift() );
}
}
}
module.exports = LineBuffer;
do you know
who i am?
i am
harbinger
var fs = require('fs');
var tap = require('tap');
var Harbinger = require('../index');
......@@ -5,22 +6,50 @@ tap.test('linebuffer', function(test) {
var lines = [
'hello',
'my name',
'',
'is Harbinger'
'is Harbinger',
'see me',
'warn'
];
var lb = new Harbinger.LineBuffer(function(line) {
var lb = new Harbinger.LineBuffer();
lb.on('data', function(line) {
var expectedLine = lines.shift();
test.equals(expectedLine, line);
test.equals(line.toString(), expectedLine);
if (lines.length == 0) {
test.end();
}
});
test.plan(4);
lb.data("hel");
lb.data("lo\nmy");
// Currently LineBuffer is configured to send empty lines
lb.data(" name\n\n");
lb.data("is Harbinger\n");
//test.plan(5);
lb.write("hel");
lb.write("lo\nmy");
// LineBuffer based on streams will ignore empty lines
lb.write(" name\n\n");
lb.write("is Harbinger\n\n");
setTimeout(function() {
lb.end("see me\nwarn");
}, 10);
});
tap.test('linebuffer.pipe', function(test) {
var lines = [
'do you know',
'who i am?',
'i am',
'harbinger'
];
var lb = new Harbinger.LineBuffer();
lb.on('data', function(line) {
var expectedLine = lines.shift();
test.equals(line.toString(), expectedLine);
if (lines.length == 0) {
test.end();
}
});
var rr = fs.createReadStream('in.log');
rr.pipe(lb);
});
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment