...
 
Commits (7)
doc:
dox --title "node-memcached" lib/* > doc/index.html
ALL_TESTS = $(shell find tests -name '*.test.js')
REPORTER = spec
UI = bdd
test:
expresso -I lib $(TESTFLAGS) tests/*.test.js
@./node_modules/.bin/mocha \
--require should \
--reporter $(REPORTER) \
--ui $(UI) \
--growl \
$(ALL_TESTS)
doc:
dox --title "node-memcached" lib/* > doc/index.html
.PHONY: test doc
\ No newline at end of file
.PHONY: test doc
"use strict";
var EventEmitter = require('events').EventEmitter
, Spawn = require('child_process').spawn
, spawn = require('child_process').spawn
, Utils = require('./utils');
exports.Manager = ConnectionManager; // connection pooling
exports.IssueLog = IssueLog; // connection issue handling
exports.Available = ping; // connection availablity
function ping(host, callback){
var pong = Spawn('ping', [host]);
pong.stdout.on('data', function(data) {
function ping (host, callback) {
var pong = spawn('ping', [host]);
pong.stdout.on('data', function stdoutdata (data) {
callback(false, data.toString().split('\n')[0].substr(14));
pong.kill();
});
pong.stderr.on('data', function(data) {
pong.stderr.on('data', function stderrdata (data) {
callback(data.toString().split('\n')[0].substr(14), false);
pong.kill();
});
};
}
function IssueLog(args){
function IssueLog (args) {
this.config = args;
this.messages = [];
this.failed = false;
this.totalRetries = 0;
this.totalReconnectsAttempted = 0;
this.totalReconnectsSuccess = 0;
Utils.merge(this, args);
EventEmitter.call(this);
};
}
var issues = IssueLog.prototype = new EventEmitter;
issues.log = function(message){
issues.log = function log (message) {
var issue = this;
this.failed = true;
this.messages.push(message || 'No message specified');
if (this.retries){
setTimeout(Utils.curry(issue, issue.attemptRetry), this.retry);
if (this.retries) {
setTimeout(issue.attemptRetry.bind(issue), this.retry);
return this.emit('issue', this.details);
}
if (this.remove) return this.emit('remove', this.details)
setTimeout(Utils.curry(issue, issue.attemptReconnect), this.reconnect);
if (this.remove) return this.emit('remove', this.details);
setTimeout(issue.attemptReconnect.bind(issue), this.reconnect);
};
Object.defineProperty(issues, 'details', {
get: function(){
get: function getDetails () {
var res = {};
res.server = this.serverAddress;
res.tokens = this.tokens;
res.messages = this.messages;
if (this.retries){
if (this.retries) {
res.retries = this.retries;
res.totalRetries = this.totalRetries
res.totalRetries = this.totalRetries;
} else {
res.totalReconnectsAttempted = this.totalReconnectsAttempted;
res.totalReconnectsSuccess = this.totalReconnectsSuccess;
res.totalReconnectsFailed = this.totalReconnectsAttempted - this.totalReconnectsSuccess;
res.totalDownTime = (res.totalReconnectsFailed * this.reconnect) + (this.totalRetries * this.retry);
}
return res;
}
});
issues.attemptRetry = function(){
issues.attemptRetry = function attemptRetry () {
this.totalRetries++;
this.retries--;
this.failed = false;
};
issues.attemptReconnect = function(){
issues.attemptReconnect = function attemptReconnect () {
var issue = this;
this.totalReconnectsAttempted++;
this.emit('reconnecting', this.details);
// Ping the server
ping(this.tokens[1], function(err){
ping(this.tokens[1], function pingpong (err) {
// still no access to the server
if (err){
this.messages.push(message || 'No message specified');
return setTimeout(Utils.curry(issue, issue.attemptReconnect), issue.reconnect);
if (err) {
this.messages.push(err.message || 'No message specified');
return setTimeout(issue.attemptReconnect.bind(issue), issue.reconnect);
}
issue.emit('reconnected', issue.details);
issue.totalReconnectsSuccess++;
issue.messages.length = 0;
issue.failed = false;
// we connected again, so we are going through the whole cycle again
Utils.merge(issue, JSON.parse(JSON.stringify(issue.config)));
});
};
function ConnectionManager(name, limit, constructor){
function ConnectionManager (name, limit, constructor) {
this.name = name;
this.total = limit;
this.factory = constructor;
this.connections = [];
};
}
var Manager = ConnectionManager.prototype;
Manager.allocate = function(callback){
var total
, i = total = this.connections.length
Manager.allocate = function allocate (callback) {
var total, i
, Manager = this;
i = total = this.connections.length;
// check for available
while(i--){
if (this.isAvailable(this.connections[i])){
while (i--){
if (this.isAvailable(this.connections[i])) {
return callback(false, this.connections[i]);
}
}
// create new
if (total < this.total){
if (total < this.total) {
return this.connections.push(this.factory.apply(this, arguments));
}
// wait untill the next event loop tick, to try again
process.nextTick(function(){Manager.allocate(callback)});
process.nextTick(function next () {
Manager.allocate(callback);
});
};
Manager.isAvailable = function(connection){
Manager.isAvailable = function isAvailable (connection) {
var readyState = connection.readyState;
return (readyState == 'open' || readyState == 'writeOnly') && !(connection._writeQueue && connection._writeQueue.length);
return (readyState === 'open' || readyState === 'writeOnly')
&& !(connection._writeQueue && connection._writeQueue.length);
};
Manager.remove = function(connection){
Manager.remove = function remove (connection) {
var position = this.connections.indexOf(connection);
if (position !== -1) this.connections.splice(position, 1);
if (connection.readyState && connection.readyState !== 'closed' && connection.end) connection.end();
if (connection.readyState && connection.readyState !== 'closed' && connection.end) {
connection.end();
}
};
Manager.free = function(keep){
Manager.free = function freemymemories (keep) {
var save = 0
, connection;
while(this.connections.length){
while (this.connections.length) {
connection = this.connections.shift();
if(save < keep && this.isAvailable(this.connection[0])){
save++
if (save < keep && this.isAvailable(this.connection[0])) {
save++;
continue;
}
this.remove(connection);
}
};
......@@ -18,6 +18,12 @@ var HashRing = require('hashring')
, Manager = Connection.Manager
, IssueLog = Connection.IssueLog;
/**
* Variable lookups
*/
var curry = Utils.curry;
/**
* Constructs a new memcached client
*
......@@ -30,21 +36,22 @@ var HashRing = require('hashring')
function Client (args, options) {
var servers = []
, weights = {}
, regular = 'localhost:11211'
, key;
// Parse down the connection arguments
switch (Object.prototype.toString.call(args)) {
case '[object String]':
servers.push(args);
break;
case '[object Object]':
weights = args;
servers = Object.keys(args);
break;
case '[object Array]':
servers = args.length ? args : [regular];
break;
default:
servers = args;
servers.push(args || regular);
break;
}
......@@ -153,7 +160,7 @@ Client.config = {
, error: function streamError (err) {
memcached.connectionIssue(err, S, callback);
}
, data: Utils.curry(memcached, privates.buffer, S)
, data: curry(memcached, privates.buffer, S)
, timeout: function streamTimeout () {
Manager.remove(this);
}
......@@ -466,7 +473,7 @@ Client.config = {
// Fill the object
resultSet.forEach(function each (statSet) {
response[statSet[0]] = statSet[1];
if (statSet) response[statSet[0]] = statSet[1];
});
return response;
......@@ -486,10 +493,12 @@ Client.config = {
// Fill the object
resultSet.forEach(function each (statSet) {
var identifier = statSet[0].split(':');
if (statSet) {
var identifier = statSet[0].split(':');
if (!response[identifier[0]]) response[identifier[0]] = {};
response[identifier[0]][identifier[1]] = statSet[1];
if (!response[identifier[0]]) response[identifier[0]] = {};
response[identifier[0]][identifier[1]] = statSet[1];
}
});
return response;
......@@ -503,10 +512,12 @@ Client.config = {
// Fill the object
resultSet.forEach(function each (statSet) {
var identifier = statSet[0].split(':');
if (statSet) {
var identifier = statSet[0].split(':');
if (!response[identifier[1]]) response[identifier[1]] = {};
response[identifier[1]][identifier[2]] = statSet[1];
if (!response[identifier[1]]) response[identifier[1]] = {};
response[identifier[1]][identifier[2]] = statSet[1];
}
});
return response;
......@@ -683,13 +694,12 @@ Client.config = {
memcached.getMulti = function getMulti(keys, callback) {
var memcached = this
, responses = {}
, errors = null
, errors = []
, calls;
// handle multiple responses and cache them untill we receive all.
function handle (err, results) {
if (err) {
errors = errors || [];
errors.push(err);
}
......@@ -722,7 +732,6 @@ Client.config = {
// enough to ignore those.
privates.setters = function setters (type, validate, key, value, lifetime, callback, cas) {
var flag = 0
, memcached = this
, valuetype = typeof value
, length;
......@@ -737,11 +746,11 @@ Client.config = {
}
length = Buffer.byteLength(value);
if (length > memcached.maxValue) {
return privates.errorResponse('The length of the value is greater than ' + memcached.maxValue, callback);
if (length > this.maxValue) {
return privates.errorResponse('The length of the value is greater than ' + this.maxValue, callback);
}
memcached.command(function settersCommand (noreply) {
this.command(function settersCommand (noreply) {
return {
key: key
, callback: callback
......@@ -750,7 +759,7 @@ Client.config = {
, cas: cas
, validate: validate
, type: type
, redundancyEnabled: true
, redundancyEnabled: false
, command: [type, key, flag, lifetime, length].join(' ') +
(cas ? ' ' + cas : '') +
(noreply ? NOREPLY : '') +
......@@ -760,8 +769,7 @@ Client.config = {
};
// Curry the function and so we can tell the type our private set function
memcached.set = Utils.curry(memcached
, privates.setters
memcached.set = curry(undefined, privates.setters
, 'set'
, [
['key', String]
......@@ -771,8 +779,7 @@ Client.config = {
]
);
memcached.replace = Utils.curry(memcached
, privates.setters
memcached.replace = curry(undefined, privates.setters
, 'replace'
, [
['key', String]
......@@ -782,8 +789,7 @@ Client.config = {
]
);
memcached.add = Utils.curry(memcached
, privates.setters
memcached.add = curry(undefined, privates.setters
, 'add'
, [
['key', String]
......@@ -863,8 +869,8 @@ Client.config = {
};
// Curry the function and so we can tell the type our private incrdecr
memcached.increment = memcached.incr = Utils.curry(false, privates.incrdecr, 'incr');
memcached.decrement = memcached.decr = Utils.curry(false, privates.incrdecr, 'decr');
memcached.increment = memcached.incr = curry(undefined, privates.incrdecr, 'incr');
memcached.decrement = memcached.decr = curry(undefined, privates.incrdecr, 'decr');
// Deletes the keys from the servers
memcached.del = function del (key, callback){
......@@ -918,12 +924,12 @@ Client.config = {
};
// Curry the function and so we can tell the type our private singles
memcached.version = Utils.curry(false, privates.singles, 'version');
memcached.flush = Utils.curry(false, privates.singles, 'flush_all');
memcached.stats = Utils.curry(false, privates.singles, 'stats');
memcached.settings = Utils.curry(false, privates.singles, 'stats settings');
memcached.slabs = Utils.curry(false, privates.singles, 'stats slabs');
memcached.items = Utils.curry(false, privates.singles, 'stats items');
memcached.version = curry(undefined, privates.singles, 'version');
memcached.flush = curry(undefined, privates.singles, 'flush_all');
memcached.stats = curry(undefined, privates.singles, 'stats');
memcached.settings = curry(undefined, privates.singles, 'stats settings');
memcached.slabs = curry(undefined, privates.singles, 'stats slabs');
memcached.items = curry(undefined, privates.singles, 'stats items');
// aliases
memcached.flushAll = memcached.flush;
......
......@@ -61,16 +61,6 @@ exports.validateArg = function validateArg(args, config){
return true;
};
// currys a function
exports.curry = function curry (context, func) {
var copy = Array.prototype.slice
, args = copy.call(arguments, 2);
return function () {
return func.apply(context || this, args.concat(copy.call(arguments)));
};
};
// a small util to use an object for eventEmitter
exports.fuse = function fuse (target, handlers) {
for(var i in handlers)
......@@ -88,6 +78,16 @@ exports.merge = function merge (target, obj) {
return target;
};
// curry/bind functions
exports.curry = function curry (context, fn) {
var copy = Array.prototype.slice
, args = copy.call(arguments, 2);
return function bowlofcurry () {
return fn.apply(context || this, args.concat(copy.call(arguments)));
};
};
// a small items iterator
exports.Iterator = function iterator (collection, callback) {
var arr = Array.isArray(collection)
......
{
"name": "memcached"
, "version": "0.0.8"
, "version": "0.0.9"
, "author": "Arnout Kazemier"
, "description": "A fully featured Memcached API client, supporting both single and clustered Memcached servers through consistent hashing and failover/failure. Memcached is rewrite of nMemcached, which will be deprecated in the near future."
, "main": "index"
......@@ -26,14 +26,18 @@
, "url": "http://www.3rd-Eden.com"
}]
, "license": {
"type": "MIT"
, "url": "http://github.com/3rd-Eden/node-memcached/blob/master/LICENSE"
"type": "MIT"
, "url": "http://github.com/3rd-Eden/node-memcached/blob/master/LICENSE"
}
, "repository": {
"type": "git"
, "url" : "http://github.com/3rd-Eden/node-memcached.git"
"type": "git"
, "url" : "http://github.com/3rd-Eden/node-memcached.git"
}
, "dependencies": {
"hashring": "*"
}
, "devDependencies": {
"mocha": "*"
, "should": "*"
}
}