node-memcached

memcached

lib/memcached.js

var EventEmitter = require('events').EventEmitter
  , Stream = require('net').Stream
  , Buffer = require('buffer').Buffer;

var HashRing = require('hashring')
  , Connection = require('./connection')
  , Utils = require('./utils')
  , Manager = Connection.Manager
  , IssueLog = Connection.IssueLog;

// The constructor
function Client(args, options){
if (!(this && this.hasOwnProperty && (this instanceof Client))) return new Client(args, options);

var servers = []
  , weights = {}
  , key;

// Parse down the connection arguments
switch (Object.prototype.toString.call(args)){
  case '[object String]':
    servers.push(args);
    break;
  case '[object Object]':
    weights = args;
    servers = Object.keys(args);
    break;
  case '[object Array]':
  default:
    servers = args;
    break;
}

if (!servers.length) throw new Error('No servers were supplied in the arguments');

// merge with global and user config
Utils.merge(this, Client.config);
Utils.merge(this, options);
EventEmitter.call(this);

this.servers = servers;
this.HashRing = new HashRing(args, this.algorithm);
this.connections = {};
this.issues = [];
};

// Allows users to configure the memcached globally or per memcached client
Client.config = {
  maxKeySize: 250         // max key size allowed by Memcached
, maxExpiration: 2592000  // max expiration duration allowed by Memcached
, maxValue: 1048576       // max length of value allowed by Memcached

, algorithm: 'crc32'      // hashing algorithm that is used for key mapping

, poolSize: 10            // maximal parallel connections
, reconnect: 18000000     // if dead, attempt reconnect each xx ms
, timeout: 5000           // after x ms the server should send a timeout if we can't connect
, retries: 5              // amount of retries before server is dead
, retry: 30000            // timeout between retries, all calls will be marked as cache miss
, remove: false           // remove server if dead; if false, we will attempt to reconnect
, redundancy: false       // allows you to re-distribute the keys over an x amount of servers
, keyCompression: true    // compress keys if they are too large (md5)
, debug: false            // output the commands and responses
};
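For reference, a minimal usage sketch of the constructor and these defaults, assuming the module is installed and required as memcached; the server argument can be a connection string, an array of servers, or an object of servers mapped to weights, and any of the options above can be overridden per client:

var Memcached = require('memcached');

// a single server string, an array of servers, or an object of server: weight pairs
var memcached = new Memcached(['10.0.0.1:11211', '10.0.0.2:11211'], {
  poolSize: 25    // allow more parallel connections than the default of 10
, timeout: 3000   // give up on a connection attempt after 3 seconds
, retries: 2      // mark a server dead after 2 failed retries
});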

// There are some functions we don't want users to touch so we scope them
(function(nMemcached){
const LINEBREAK = '\r\n'
  , NOREPLY = ' noreply'
  , FLUSH = 1E3
  , BUFFER = 1E2
  , CONTINUE = 1E1
  , FLAG_JSON = 1<<1
  , FLAG_BINARY = 2<<1;

var memcached = nMemcached.prototype = new EventEmitter
  , private = {}
  , undefined;

// Creates or generates a new connection for the given server, the callback will receive the connection
// if the operation was successful
memcached.connect = function connect(server, callback){
// server is dead, bail out
if (server in this.issues && this.issues[server].failed) return callback(false, false);

// fetch from connection pool
if (server in this.connections) return this.connections[server].allocate(callback);

// No connection factory created yet, so we must build one
var serverTokens = /(.*):(\d+){1,}$/.exec(server).reverse()
  , memcached = this;

serverTokens.pop();

this.connections[server] = new Manager(server, this.poolSize, function(callback){
  var S = new Stream
    , Manager = this;

  // config the Stream
  S.setTimeout(memcached.timeout);
  S.setNoDelay(true);
  S.metaData = [];
  S.responseBuffer = "";
  S.bufferArray = [];
  S.server = server;
  S.tokens = serverTokens;

  // Add the event listeners
  Utils.fuse(S, {
    connect: function streamConnect(){ callback(false, this) }
  , close: function streamClose(){ Manager.remove(this) }
  , error: function streamError(err){ memcached.connectionIssue(err, S, callback) }
  , data: Utils.curry(memcached, private.buffer, S)
  , timeout: function streamTimeout(){ Manager.remove(this) }
  , end: S.end
  });

  // connect the net.Stream [port, hostname]
  S.connect.apply(S, serverTokens);
  return S;
});

// now that we have setup our connection factory we can allocate a new connection
this.connections[server].allocate(callback);

};

// Creates a multi stream, so it's easier to query against
// multiple memcached servers.
memcached.multi = function memcachedMulti(keys, callback){
var map = {}
  , memcached = this
  , servers
  , i;

// gets all servers based on the supplied keys,
// or just gives all servers if we don't have keys
if (keys){
  keys.forEach(function fetchMultipleServers(key){
    var server = memcached.HashRing.getNode(key);
    if (map[server]){
      map[server].push(key);
    } else {
      map[server] = [key];
    }
  });
  // store the servers
  servers = Object.keys(map);
} else {
  servers = this.servers;
}

i = servers.length;
while(i--){
  callback.call(this, servers[i], map[servers[i]], i, servers.length);
}

};
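A hypothetical illustration of how .multi groups keys per server, assuming a memcached client instance as constructed above; it is mostly used internally, and each callback invocation receives one server together with the keys that hash to it:

memcached.multi(['foo', 'bar', 'baz'], function(server, keys, index, total){
  // each invocation receives one server and the keys that hash to it,
  // e.g. "10.0.0.1:11211 [ 'foo', 'baz' ]"
  console.log(server, keys);
});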

// Executes the command on the net.Stream, if no server is supplied it will use the query.key to get
// the server from the HashRing
memcached.command = function memcachedCommand(queryCompiler, server){

// generate a regular query
var query = queryCompiler()
  , redundancy = this.redundancy && this.redundancy < this.servers.length
  , queryRedundancy = query.redundancyEnabled
  , memcached = this;

// validate the arguments
if (query.validate && !Utils.validateArg(query, this)) return;

// fetch servers
server = server ? server
  : redundancy && queryRedundancy
    ? (redundancy = this.HashRing.createRange(query.key, (this.redundancy + 1), true)).shift()
    : this.HashRing.getNode(query.key);

// check if the server is still alive
if (server in this.issues && this.issues[server].failed) return query.callback && query.callback(false, false);

Client.config.debug && console.log(query.command);

this.connect(server, function allocateMemcachedConnection(error, S){
  // check for issues
  if (!S) return query.callback && query.callback(false, false);
  if (error) return query.callback && query.callback(error);
  if (S.readyState !== 'open') return query.callback && query.callback('Connection readyState is set to ' + S.readyState);

  // used for request timing
  query.start = Date.now();
  S.metaData.push(query);
  S.write(query.command + LINEBREAK);
});

// if we have redundancy enabled and the query is used for redundancy, then we are going to loop over
// the servers, check if we can reach them, and connect to the correct net connection.
// because all redundancy queries are executed with "no reply" we do not need to store the callback
// as there will be no value to parse.
if (redundancy && queryRedundancy){
  queryRedundancy = queryCompiler(queryRedundancy);
  redundancy.forEach(function(server){
    if (server in memcached.issues && memcached.issues[server].failed) return;

    memcached.connect(server, function allocateMemcachedConnection(error, S){
      if (!S || error || S.readyState !== 'open') return;
      S.write(queryRedundancy.command + LINEBREAK);
    });
  })
}

};

// Logs all connection issues and hands them off, marking all requests as cache misses.
memcached.connectionIssue = function connectionIssue(error, S, callback){
// end connection and mark callback as cache miss
if (S && S.end) S.end();
if (callback) callback(false, false);

var issues
  , server = S.server
  , memcached = this;

// check for existing issue logs, or create a new log
if (server in this.issues){
  issues = this.issues[server];
} else {
  issues = this.issues[server] = new IssueLog({
    server: server
  , tokens: S.tokens
  , reconnect: this.reconnect
  , retries: this.retries
  , retry: this.retry
  , remove: this.remove
  });

  // proxy the events
  Utils.fuse(issues, {
    issue: function(details){ memcached.emit('issue', details) }
  , failure: function(details){ memcached.emit('failure', details) }
  , reconnecting: function(details){ memcached.emit('reconnecting', details) }
  , reconnected: function(details){ memcached.emit('reconnect', details) }
  , remove: function(details){
      // emit event and remove servers
      memcached.emit('remove', details);
      memcached.connections[server].end();

      if (this.failOverServers && this.failOverServers.length){
        memcached.HashRing.replaceServer(server, this.failOverServers.shift());
      } else {
        memcached.HashRing.removeServer(server);
      }
    }
  });
}

// log the issue
issues.log(error);

};

// Kills all active connections
memcached.end = function endMemcached(){
var memcached = this;

Object.keys(this.connections).forEach(function closeConnection(key){
  memcached.connections[key].free(0);
});
};

// These do not need to be publicly available as it's one of the most important
// parts of the whole client, the parser commands:
private.parsers = {
  // handle error responses
  'NOT_FOUND': function(tokens, dataSet, err){ return [CONTINUE, false] }
, 'NOT_STORED': function(tokens, dataSet, err){ return [CONTINUE, false] }
, 'ERROR': function(tokens, dataSet, err){ err.push('Received an ERROR response'); return [FLUSH, false] }
, 'CLIENT_ERROR': function(tokens, dataSet, err){ err.push(tokens.splice(1).join(' ')); return [CONTINUE, false] }
, 'SERVER_ERROR': function(tokens, dataSet, err, queue, S, memcached){ memcached.connectionIssue(tokens.splice(1).join(' '), S); return [CONTINUE, false] }

  // keyword based responses
, 'STORED': function(tokens, dataSet){ return [CONTINUE, true] }
, 'DELETED': function(tokens, dataSet){ return [CONTINUE, true] }
, 'OK': function(tokens, dataSet){ return [CONTINUE, true] }
, 'EXISTS': function(tokens, dataSet){ return [CONTINUE, false] }
, 'END': function(tokens, dataSet, err, queue){ if (!queue.length) queue.push(false); return [FLUSH, true] }

  // value parsing:
, 'VALUE': function(tokens, dataSet, err, queue){
  var key = tokens[1]
    , flag = +tokens[2]
    , expire = tokens[3]
    , cas = tokens[4]
    , multi = this.metaData[0] && this.metaData[0].multi || cas ? {} : false
    , tmp;

  switch (flag){
    case FLAG_JSON:
      dataSet = JSON.parse(dataSet);
      break;
    case FLAG_BINARY:
      tmp = new Buffer(dataSet.length);
      tmp.write(dataSet, 0, 'binary');
      dataSet = tmp;
      break;
    }

  // Add to queue as multiple get key key key key key returns multiple values
  if (!multi){
    queue.push(dataSet);
  } else {
    multi[key] = dataSet;
    if (cas) multi.cas = cas;
    queue.push(multi);
  }

  return [BUFFER, false] 
}

, 'INCRDECR': function(tokens){ return [CONTINUE, +tokens[1]] }
, 'STAT': function(tokens, dataSet, err, queue){
    queue.push([tokens[1], /^\d+$/.test(tokens[2]) ? +tokens[2] : tokens[2]]);
    return [BUFFER, true]
  }
, 'VERSION': function(tokens, dataSet){
  var versionTokens = /(\d+)(?:\.)(\d+)(?:\.)(\d+)$/.exec(tokens.pop());

  return [CONTINUE, {
    server: this.server
  , version: versionTokens[0]
  , major: versionTokens[1] || 0
  , minor: versionTokens[2] || 0
  , bugfix: versionTokens[3] || 0
  }];
}

, 'ITEM': function(tokens, dataSet, err, queue){
    queue.push({
      key: tokens[1]
    , b: +tokens[2].substr(1)
    , s: +tokens[4]
    });
    return [BUFFER, false]
  }
};

// Parses down result sets
private.resultParsers = {
  // combines the stats array into an object
  'stats': function(resultSet){
  var response = {};

  // add references to the retrieved server
  response.server = this.server;

  // Fill the object 
  resultSet.forEach(function(statSet){
    response[statSet[0]] = statSet[1];
  });

  return response;
}

// the settings use the same parse format as the regular stats
, 'stats settings': function(){ return private.resultParsers.stats.apply(this, arguments) }

// Group slabs by slab id
, 'stats slabs': function(resultSet){
  var response = {};

  // add references to the retrieved server
  response.server = this.server;

  // Fill the object 
  resultSet.forEach(function(statSet){
    var identifier = statSet[0].split(':');

    if (!response[identifier[0]]) response[identifier[0]] = {};
    response[identifier[0]][identifier[1]] = statSet[1];

  });

  return response;
}

, 'stats items': function(resultSet){
  var response = {};

  // add references to the retrieved server
  response.server = this.server;

  // Fill the object 
  resultSet.forEach(function(statSet){
    var identifier = statSet[0].split(':');

    if (!response[identifier[1]]) response[identifier[1]] = {};
    response[identifier[1]][identifier[2]] = statSet[1];

  });

  return response;
}

};

// Generates a RegExp that can be used to check if a chunk is a memcached response identifier
private.allCommands = new RegExp('^(?:' + Object.keys(private.parsers).join('|') + '|\\d' + ')');
private.bufferedCommands = new RegExp('^(?:' + Object.keys(private.parsers).join('|') + ')');

// When working with large responses, node chunks them into pieces, so we might receive half a
// response. So we are going to buffer up the data and use our buffered buffer to query against.
// Also, when you execute a lot of .writes to the same stream, node will combine the responses into
// one response stream with no indication of where it cut the data. The cut can land inside a value
// response, or even right in the middle of a line-break, so we need to make sure the last piece in
// the buffer is a LINEBREAK, because that is the one thing the Memcached protocol guarantees: all
// responses end with one.
private.buffer = function BufferBuffer(S, BufferStream){
S.responseBuffer += BufferStream;

Client.config.debug && console.log(S.responseBuffer);

// only transform the data once we are sure, 100% sure, that we have a valid response ending
if (S.responseBuffer.substr(S.responseBuffer.length - 2) === LINEBREAK){
  var chunks = S.responseBuffer.split(LINEBREAK);

  S.responseBuffer = ""; // clear!
  this.rawDataReceived(S, S.bufferArray = S.bufferArray.concat(chunks));
} 

};
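The same buffering rule in isolation, as a hypothetical standalone sketch (the parse function below is a stand-in for rawDataReceived): chunks are accumulated until the buffer ends in a line-break, and only then split and handed to the parser.

var pending = '';

function onData(chunk){
  pending += chunk;

  // a memcached response is only complete once it ends in \r\n
  if (pending.substr(pending.length - 2) === '\r\n'){
    var lines = pending.split('\r\n');
    pending = '';
    parse(lines); // hand the complete lines over to the response parser
  }
}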

// The actual parser function that scans over the responseBuffer in search of Memcached response
// identifiers. Once we have found one, we will send it to the dedicated parsers that will transform
// the data into a human readable format, deciding if we should queue it up or send it to a callback fn.
memcached.rawDataReceived = function rawDataReceived(S){
var queue = []
  , token
  , tokenSet
  , dataSet = ''
  , resultSet
  , metaData
  , err = []
  , tmp;

while(S.bufferArray.length && private.allCommands.test(S.bufferArray[0])){

  token = S.bufferArray.shift();
  tokenSet = token.split(' ');

  // special case for digit-only tokens; these are responses from INCR and DECR
  if (/\d+/.test(tokenSet[0])) tokenSet.unshift('INCRDECR');

  // special case for value, it's required that it has a second response!
  // add the token back, and wait for the next response, we might be handling a big 
  // ass response here.
  if (tokenSet[0] == 'VALUE' && S.bufferArray.indexOf('END') == -1){
    return S.bufferArray.unshift(token);
  }

  // check for dedicated parser
  if (private.parsers[tokenSet[0]]){

    // fetch the response content
    while(S.bufferArray.length){
      if (private.bufferedCommands.test(S.bufferArray[0])) break;

      dataSet += S.bufferArray.shift();
    };

    resultSet = private.parsers[tokenSet[0]].call(S, tokenSet, dataSet || token, err, queue, this);

    // check how we need to handle the resultSet response
    switch(resultSet.shift()){
      case BUFFER:
        break;

      case FLUSH:
        metaData = S.metaData.shift();
        resultSet = queue;

        // if we have a callback, call it
        if (metaData && metaData.callback){
          metaData.execution = Date.now() - metaData.start;
          metaData.callback.call(
            metaData, err.length ? err : err[0],

            // see if optional parsing needs to be applied to make the result set more readable
            private.resultParsers[metaData.type] ? private.resultParsers[metaData.type].call(S, resultSet, err) :
            !Array.isArray(queue) || queue.length > 1 ? queue : queue[0] 
         );
        }

        queue.length = err.length = 0;
        break;

      case CONTINUE:
      default:
        metaData = S.metaData.shift();

        if (metaData && metaData.callback){
          metaData.execution = Date.now() - metaData.start;
          metaData.callback.call(metaData, err.length > 1 ? err : err[0], resultSet[0]);
        }

        err.length = 0;
        break;
    }
  } else {
    // handle unknown responses
    metaData = S.metaData.shift();
    if (metaData && metaData.callback){
      metaData.execution = Date.now() - metaData.start;
      metaData.callback.call(metaData, 'Unknown response from the memcached server: "' + token + '"', false);
    }
  }

  // cleanup
  dataSet = ''
  tokenSet = metaData = undefined;

  // check if we need to remove an empty item from the array, as splitting on \r\n might cause an empty
  // item at the end.
  if (S.bufferArray[0] === '') S.bufferArray.shift();
};

};

// Small wrapper function that only executes errors when we have a callback
private.errorResponse = function errorResponse(error, callback){
if (typeof callback == 'function') callback(error, false);

return false;

};

// This is where the actual Memcached API layer begins:
memcached.get = function get(key, callback){
if (Array.isArray(key)) return this.getMulti.apply(this, arguments);

this.command(function getCommand(noreply){ return {
  key: key
, callback: callback
, validate: [['key', String], ['callback', Function]]
, type: 'get'
, command: 'get ' + key
}});

};
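A small usage sketch for get, assuming a memcached client instance; the callback receives an error (or false) and the stored value, with false signalling a cache miss:

memcached.get('foo', function(err, result){
  if (err) return console.error(err);
  console.log(result); // the stored value, or false on a cache miss
});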

// the difference between get and gets is that gets also returns a cas value
// and gets doesn't support multi-gets at this moment.
memcached.gets = function get(key, callback){
this.command(function getCommand(noreply){ return {
  key: key
, callback: callback
, validate: [['key', String], ['callback', Function]]
, type: 'gets'
, command: 'gets ' + key
}});
};
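A sketch of gets, based on the VALUE parser above: because the response carries a cas token, the result is an object keyed by the cache key with an added cas property:

memcached.gets('foo', function(err, result){
  if (err) return console.error(err);
  console.log(result.foo, result.cas); // value plus the cas token for a later cas() call
});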

// Handles a get with multiple keys
memcached.getMulti = function getMulti(keys, callback){
var memcached = this
  , responses = {}
  , errors = []
  , calls

  // handle multiple responses and cache them until we receive all.
  , handle = function(err, results){
      if (err) errors.push(err);

      // add all responses to the array
      (Array.isArray(results) ? results : [results]).forEach(function(value){ Utils.merge(responses, value) });

      if (!--calls) callback(errors.length ? errors : false, responses);
    };

this.multi(keys, function(server, key, index, totals){
  if (!calls) calls = totals;

  memcached.command(function getMultiCommand(noreply){ return {
      callback: handle
    , multi:true
    , type: 'get'
    , command: 'get ' + key.join(' ')
    }},
    server
 );
});

};
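Usage sketch for getMulti: the per-server responses are merged into a single object keyed by cache key, and errors (if any) are collected into an array:

memcached.getMulti(['foo', 'bar', 'baz'], function(err, results){
  if (err) return console.error(err);
  console.log(results.foo, results.bar, results.baz);
});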

// As nearly all commands use the same syntax we are going to proxy them all to this
// function to ease maintenance. This is possible because most set commands will use the same
// syntax for the Memcached server. Some commands do not require a lifetime and a flag, but the
// memcached server is smart enough to ignore those.
private.setters = function setters(type, validate, key, value, lifetime, callback, cas){
var flag = 0
  , memcached = this
  , valuetype = typeof value
  , length;

if (Buffer.isBuffer(value)){
  flag = FLAG_BINARY;
  value = value.toString('binary');
} else if (valuetype !== 'string' && valuetype !== 'number'){
  flag = FLAG_JSON;
  value = JSON.stringify(value);
} else {
  value = value.toString();
}

length = Buffer.byteLength(value);
if (length > memcached.maxValue) return private.errorResponse('The length of the value is greater than ' + memcached.maxValue, callback);

memcached.command(function settersCommand(noreply){ return {
  key: key
, callback: callback
, lifetime: lifetime
, value: value
, cas: cas
, validate: validate
, type: type
, redundancyEnabled: true
, command: [type, key, flag, lifetime, length].join(' ') +
       (cas ? ' ' + cas : '') + 
       (noreply ? NOREPLY : '') + 
       LINEBREAK + value
}});

};

// Curry the function so we can tell the type to our private set function
memcached.set = Utils.curry(false, private.setters, 'set', [['key', String], ['lifetime', Number], ['value', String], ['callback', Function]]);
memcached.replace = Utils.curry(false, private.setters, 'replace', [['key', String], ['lifetime', Number], ['value', String], ['callback', Function]]);
memcached.add = Utils.curry(false, private.setters, 'add', [['key', String], ['lifetime', Number], ['value', String], ['callback', Function]]);
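Usage sketch for the curried setters; set, add and replace all share the signature key, value, lifetime (in seconds), callback:

memcached.set('foo', 'bar', 10, function(err, result){
  if (err) return console.error(err);
  console.log(result); // true when the server replies STORED
});

// objects are JSON encoded and Buffers are stored as binary automatically
memcached.set('profile', { name: 'node' }, 60, function(err, result){ /* ... */ });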

memcached.cas = function checkandset(key, value, cas, lifetime, callback){
  private.setters.call(this, 'cas', [['key', String], ['lifetime', Number], ['value', String], ['callback', Function]], key, value, lifetime, callback, cas);
};
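A sketch of the check-and-set flow: fetch the current cas token with gets, then pass it to cas so the write only succeeds if the value has not changed in the meantime:

memcached.gets('foo', function(err, result){
  if (err) return console.error(err);

  memcached.cas('foo', 'updated value', result.cas, 10, function(err, stored){
    // stored is true on STORED, false when the cas token no longer matches (EXISTS)
    console.log(stored);
  });
});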

memcached.append = function append(key, value, callback){
  private.setters.call(this, 'append', [['key', String], ['lifetime', Number], ['value', String], ['callback', Function]], key, value, 0, callback);
};

memcached.prepend = function prepend(key, value, callback){
  private.setters.call(this, 'prepend', [['key', String], ['lifetime', Number], ['value', String], ['callback', Function]], key, value, 0, callback);
};

// Small handler for incr and decr's
private.incrdecr = function incrdecr(type, key, value, callback){
this.command(function incredecrCommand(noreply){ return {
  key: key
, callback: callback
, value: value
, validate: [['key', String], ['value', Number], ['callback', Function]]
, type: type
, redundancyEnabled: true
, command: [type, key, value].join(' ') + (noreply ? NOREPLY : '')
}});
};

// Curry the function so we can tell the type to our private incrdecr
memcached.increment = memcached.incr = Utils.curry(false, private.incrdecr, 'incr');
memcached.decrement = memcached.decr = Utils.curry(false, private.incrdecr, 'decr');
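Usage sketch for the curried increment / decrement commands; the callback receives the new numeric value, or false when the key does not exist:

memcached.incr('pageviews', 1, function(err, result){
  if (err) return console.error(err);
  console.log(result); // the value after incrementing, or false on NOT_FOUND
});

memcached.decr('pageviews', 1, function(err, result){ /* ... */ });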

// Deletes the keys from the servers
memcached.del = function del(key, callback){
this.command(function deleteCommand(noreply){ return {
  key: key
, callback: callback
, validate: [['key', String], ['callback', Function]]
, type: 'delete'
, redundancyEnabled: true
, command: 'delete ' + key + (noreply ? NOREPLY : '')
}});
};

memcached['delete'] = memcached.del;
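And the matching delete call:

memcached.del('foo', function(err, result){
  if (err) return console.error(err);
  console.log(result); // true when the server replies DELETED
});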

// Small wrapper that handles single keyword commands such as FLUSH ALL, VERSION and STAT
private.singles = function singles(type, callback){
var memcached = this
  , responses = []
  , errors = []
  , calls

  // handle multiple servers
  , handle = function(err, results){
    if (err) errors.push(err);
    if (results) responses = responses.concat(results);

    // multi calls should ALWAYS return an array!
    if (!--calls) callback(errors, responses);
  };

this.multi(false, function(server, keys, index, totals){
  if (!calls) calls = totals;

  memcached.command(function singlesCommand(noreply){ return {
      callback: handle
    , type: type
    , command: type
    }},
    server
 );
});

};

// Curry the function so we can tell the type to our private singles
memcached.version = Utils.curry(false, private.singles, 'version');
memcached.flush = Utils.curry(false, private.singles, 'flush_all');
memcached.stats = Utils.curry(false, private.singles, 'stats');
memcached.settings = Utils.curry(false, private.singles, 'stats settings');
memcached.slabs = Utils.curry(false, private.singles, 'stats slabs');
memcached.items = Utils.curry(false, private.singles, 'stats items');

// You need to use the items dump to get the correct server and slab settings,
// see simple_cachedump.js for an example
memcached.cachedump = function cachedump(server, slabid, number, callback){
this.command(function cachedumpCommand(noreply){ return {
  callback: callback
, number: number
, slabid: slabid
, validate: [['number', Number], ['slabid', Number], ['callback', Function]]
, type: 'stats cachedump'
, command: 'stats cachedump ' + slabid + ' ' + number
}}, server);
};
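Usage sketch for the single-keyword commands and cachedump; because they fan out over every server through .multi, the results arrive as an array with one entry per server. The slab id used below is a placeholder:

memcached.version(function(err, versions){
  // one { server, version, major, minor, bugfix } object per server
  console.log(versions);
});

memcached.stats(function(err, stats){
  // one stats object per server, with a server property added by the result parser
  console.log(stats);
});

// walk the items listing to find a server and slab id, then dump that slab
memcached.items(function(err, items){
  if (err || !items.length) return;

  var server = items[0].server
    , slabid = 1   // placeholder: take a real slab id from the items output
    , number = 10; // amount of keys to dump

  memcached.cachedump(server, slabid, number, function(err, dump){
    console.log(dump); // { key, b, s } entries as produced by the ITEM parser
  });
});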

})(Client);

module.exports = Client;

utils

lib/utils.js

var CreateHash = require('crypto').createHash;

exports.validateArg = function validateArg(args, config){
var toString = Object.prototype.toString
  , err
  , callback = args.callback; // surface validation errors through the query callback when there is one

args.validate.forEach(function(tokens){
var key = tokens[0]
  , value = args[key];

switch(tokens[1]){
  case Number:
    if (toString.call(value) !== '[object Number]') err = 'Argument "' + key + '" is not a valid Number.';
    break;

  case Boolean:
    if (toString.call(value) !== '[object Boolean]') err = 'Argument "' + key + '" is not a valid Boolean.';
    break;

  case Array:
    if (toString.call(value) !== '[object Array]') err = 'Argument "' + key + '" is not a valid Array.';
    break;

  case Object:
    if (toString.call(value) !== '[object Object]') err = 'Argument "' + key + '" is not a valid Object.';
    break;

  case Function:
    if (toString.call(value) !== '[object Function]') err = 'Argument "' + key + '" is not a valid Function.';
    break;

  case String: 
    if (toString.call(value) !== '[object String]') err = 'Argument "' + key + '" is not a valid String.';

    if (!err && key == 'key' && value.length > config.maxKeySize){
      if (config.keyCompression){
        args[key] = CreateHash('md5').update(value).digest('hex');

        // also make sure you update the command
        args.command = args.command.replace(new RegExp('^(' + value + ')'), args[key]);
      } else {
        err = 'Argument "' + key + '" is longer than the maximum allowed length of ' + config.maxKeySize;
      }
    }

    break;

  default:
    if (toString.call(value) == '[object global]' && !tokens[2]) err = 'Argument "' + key + '" is not defined.';
}

});

if (err){
  if (callback) callback(err, false);
  return false;
}

return true;
};

// curries a function
exports.curry = function curry(context, func){
var copy = Array.prototype.slice
  , args = copy.call(arguments, 2);

return function(){
  return func.apply(context || this, args.concat(copy.call(arguments)));
};
};
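A quick illustration of curry (the add function is purely for the example): the first argument is the context, the second the function, and any further arguments are prepended to the eventual call.

var Utils = require('./utils');

function add(a, b){ return a + b; }

var addFive = Utils.curry(false, add, 5);
addFive(2); // returns 7 -- the client uses this to pre-bind the command type for set, incr, etc.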

// a small util to attach an object of listeners to an eventEmitter
exports.fuse = function fuse(target, handlers){
for(var i in handlers){
  if (handlers.hasOwnProperty(i)){
    target.on(i, handlers[i]);
  }
}
};

// merges an object's properties / values with another object
exports.merge = function merge(target, obj){
for(var i in obj){
  target[i] = obj[i];
}

return target;
};

// a small items iterator
exports.Iterator = function iterator(collection, callback){
var arr = Array.isArray(collection)
  , keys = !arr ? Object.keys(collection) : false
  , index = 0
  , maximum = arr ? collection.length : keys.length
  , self = this;

// returns the next item
this.next = function(){
  var obj = arr ? collection[index] : { key: keys[index], value: collection[keys[index]] };
  callback(obj, index++, collection, self);
};

// check if we have more items
this.hasNext = function(){
  return index < maximum;
};
};
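A usage sketch for the Iterator; the callback drives the iteration by calling next() again while hasNext() is true:

var Utils = require('./utils');

var it = new Utils.Iterator(['a', 'b', 'c'], function(item, index, collection, iterator){
  console.log(index, item);
  if (iterator.hasNext()) iterator.next();
});

it.next(); // kicks off the iteration: 0 'a', 1 'b', 2 'c'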

connection

lib/connection.js

var EventEmitter = require('events').EventEmitter
  , Spawn = require('child_process').spawn
  , Utils = require('./utils');

exports.Manager = ConnectionManager;  // connection pooling
exports.IssueLog = IssueLog;          // connection issue handling
exports.Available = ping;             // connection availability

function ping(host, callback){
var pong = Spawn('ping', [host]);

pong.stdout.on('data', function(data){
  callback(false, data.toString().split('\n')[0].substr(14));
  pong.kill();
});

pong.stderr.on('data', function(data){
  callback(data.toString().split('\n')[0].substr(14), false);
  pong.kill();
});
};

function IssueLog(args){
this.config = args;
this.messages = [];
this.failed = false;

this.totalRetries = 0;
this.totalReconnectsAttempted = 0;
this.totalReconnectsSuccess = 0;

Utils.merge(this, args);
EventEmitter.call(this);
};

var issues = IssueLog.prototype = new EventEmitter;

issues.log = function(message){
var issue = this;

this.failed = true;
this.messages.push(message || 'No message specified');

if (this.retries){
  setTimeout(Utils.curry(issue, issue.attemptRetry), this.retry);
  return this.emit('issue', this.details);
}

if (this.remove) return this.emit('remove', this.details);

setTimeout(Utils.curry(issue, issue.attemptReconnect), this.reconnect);
};

Object.defineProperty(issues, 'details', {
get: function(){
var res = {};

res.server = this.server;
res.tokens = this.tokens;
res.messages = this.messages;

if (this.retries){
  res.retries = this.retries;
  res.totalRetries = this.totalRetries
} else {
  res.totalReconnectsAttempted = this.totalReconnectsAttempted;
  res.totalReconnectsSuccess = this.totalReconnectsSuccess;
  res.totalReconnectsFailed = this.totalReconnectsAttempted - this.totalReconnectsSuccess;
  res.totalDownTime = (res.totalReconnectsFailed * this.reconnect) + (this.totalRetries * this.retry);
}

return res;

} });

issues.attemptRetry = function(){
this.totalRetries++;
this.retries--;
this.failed = false;
};

issues.attemptReconnect = function(){
var issue = this;
this.totalReconnectsAttempted++;
this.emit('reconnecting', this.details);

// Ping the server
ping(this.tokens[1], function(err){
// still no access to the server
if (err){
  issue.messages.push(err || 'No message specified');
  return setTimeout(Utils.curry(issue, issue.attemptReconnect), issue.reconnect);
}

issue.emit('reconnected', issue.details);

issue.totalReconnectsSuccess++;
issue.messages.length = 0;
issue.failed = false;

// we connected again, so we are going through the whole cycle again
Utils.merge(issue, JSON.parse(JSON.stringify(issue.config)));

});
};

function ConnectionManager(name, limit, constructor){
this.name = name;
this.total = limit;
this.factory = constructor;
this.connections = [];
};

var Manager = ConnectionManager.prototype;

Manager.allocate = function(callback){
var total
  , i = total = this.connections.length
  , Manager = this;

// check for an available connection
while(i--){
  if (this.isAvailable(this.connections[i])){
    return callback(false, this.connections[i]);
  }
}

// create a new connection
if (total < this.total){
  return this.connections.push(this.factory.apply(this, arguments));
}

// wait until the next event loop tick, to try again
process.nextTick(function(){ Manager.allocate(callback) });
};

Manager.isAvailable = function(connection){
var readyState = connection.readyState;

return (readyState == 'open' || readyState == 'writeOnly')
  && !(connection.writeQueue && connection.writeQueue.length);
};

Manager.remove = function(connection){
var position = this.connections.indexOf(connection);

if (position !== -1) this.connections.splice(position, 1);

if (connection.readyState && connection.readyState !== 'closed' && connection.end) connection.end(); };

Manager.free = function(keep){
var save = 0
  , connection;

while(this.connections.length){
  connection = this.connections.shift();
  if (save < keep && this.isAvailable(this.connections[0])){
    save++;
    continue;
  }

  this.remove(connection);
}
};
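Finally, a hypothetical sketch of the pool on its own, mirroring how memcached.connect uses it: the factory both returns the new connection and hands it to the waiting callback, and later allocations reuse it for as long as it reports an open readyState. The plain object below stands in for the real net.Stream.

var Manager = require('./connection').Manager;

var pool = new Manager('10.0.0.1:11211', 10, function(callback){
  // a stand-in connection object; the real factory builds a net.Stream here
  var connection = { readyState: 'open', writeQueue: [] };
  callback(false, connection);
  return connection;
});

pool.allocate(function(err, connection){
  // use the connection, e.g. connection.write('get foo\r\n') on a real stream
});

pool.free(0); // close everything down again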