Commit a5ea70b5 authored by Andrew Bernstein's avatar Andrew Bernstein

Adding support for validating and hashing keys that are too long when performing getMulti

parent 01b68784
......@@ -3,7 +3,8 @@
/**
* Node's native modules
*/
var Stream = require('net').Stream
var EventEmitter = require('events').EventEmitter
, Stream = require('net').Stream
, Socket = require('net').Socket;
/**
......@@ -57,6 +58,7 @@ function Client (args, options) {
// merge with global and user config
Utils.merge(this, Client.config);
Utils.merge(this, options);
EventEmitter.call(this);
this.servers = servers;
this.HashRing = new HashRing(args, this.algorithm);
......@@ -66,7 +68,7 @@ function Client (args, options) {
// Allows users to configure the memcached globally or per memcached client
Client.config = {
maxKeySize: 250 // max key size allowed by Memcached
maxKeySize: 251 // max key size allowed by Memcached
, maxExpiration: 2592000 // max expiration duration allowed by Memcached
, maxValue: 1048576 // max length of value allowed by Memcached
, activeQueries: 0
......@@ -74,18 +76,10 @@ Client.config = {
, algorithm: 'crc32' // hashing algorithm that is used for key mapping
, poolSize: 10 // maximal parallel connections
, retries: 5 // Connection pool retries to pull connection from pool
, factor: 3 // Connection pool retry exponential backoff factor
, minTimeout: 1000 // Connection pool retry min delay before retrying
, maxTimeout: 60000 // Connection pool retry max delay before retrying
, randomize: false // Connection pool retry timeout randomization
, reconnect: 18000000 // if dead, attempt reconnect each xx ms
, timeout: 5000 // after x ms the server should send a timeout if we can't connect
, failures: 5 // Number of times a server can have an issue before marked dead
, failuresTimeout: 300000 // Time after which `failures` will be reset to original value, since last failure
, retry: 30000 // When a server has an error, wait this amount of time before retrying
, idle: 5000 // Remove connection from pool when no I/O after `idle` ms
, retries: 5 // amount of retries before server is dead
, retry: 30000 // timeout between retries, all call will be marked as cache miss
, remove: false // remove server if dead if false, we will attempt to reconnect
, redundancy: false // allows you do re-distribute the keys over a x amount of servers
, keyCompression: true // compress keys if they are to large (md5)
......@@ -104,9 +98,7 @@ Client.config = {
, FLAG_BINARY = 1<<2
, FLAG_NUMERIC = 1<<3;
nMemcached.prototype.__proto__ = require('events').EventEmitter.prototype;
var memcached = nMemcached.prototype
var memcached = nMemcached.prototype = new EventEmitter
, privates = {}
, undefined;
......@@ -141,25 +133,12 @@ Client.config = {
manager = new Jackpot(this.poolSize);
manager.retries = memcached.retries;
manager.factor = memcached.factor;
manager.minTimeout = memcached.minTimeout;
manager.maxTimeout = memcached.maxTimeout;
manager.randomize = memcached.randomize;
manager.setMaxListeners(0);
manager.factory(function factory() {
var S = Array.isArray(serverTokens)
? new Stream
: new Socket
, Manager = this
, idleTimeout = function() {
Manager.remove(this);
}
, streamError = function(e) {
memcached.connectionIssue(e.toString(), S);
Manager.remove(this);
};
, Manager = this;
// config the Stream
S.streamID = sid++;
......@@ -179,14 +158,8 @@ Client.config = {
Manager.remove(this);
}
, data: curry(memcached, privates.buffer, S)
, connect: function streamConnect() {
// Jackpot handles any pre-connect timeouts by calling back
// with the error object.
this.setTimeout(this.memcached.idle, idleTimeout);
// Jackpot handles any pre-connect errors, but does not handle errors
// once a connection has been made, nor does Jackpot handle releasing
// connections if an error occurs post-connect
this.on('error', streamError);
, timeout: function streamTimeout() {
Manager.remove(this);
}
, end: S.end
});
......@@ -289,10 +262,8 @@ Client.config = {
}
}
// check if any server exists or and if the server is still alive
// a server may not exist if the manager was never able to connect
// to any server.
if (!server || (server in this.issues && this.issues[server].failed)) {
// check if the server is still alive
if (server in this.issues && this.issues[server].failed) {
return query.callback && memcached.makeCallback(query.callback,new Error('Server not available'));
}
......@@ -303,28 +274,12 @@ Client.config = {
});
}
// S not set if unable to connect to server
if (!S) {
var S = {
serverAddress: server,
tokens: server.split(':').reverse()
}
var message = error || 'Unable to connect to server';
memcached.connectionIssue(message, S);
return query.callback && memcached.makeCallback(query.callback,new Error(message));
}
// Other errors besides inability to connect to server
if (error) {
memcached.connectionIssue(error.toString(), S);
return query.callback && memcached.makeCallback(query.callback,error);
}
// check for issues
if (error) return query.callback && memcached.makeCallback(query.callback,error);
if (!S) return query.callback && memcached.makeCallback(query.callback,new Error('Connect did not give a server'));
if (S.readyState !== 'open') {
var message = 'Connection readyState is set to ' + S.readyState;
memcached.connectionIssue(message, S);
return query.callback && memcached.makeCallback(query.callback,new Error(message));
return query.callback && memcached.makeCallback(query.callback,new Error('Connection readyState is set to ' + S.readySate));
}
// used for request timing
......@@ -371,8 +326,7 @@ Client.config = {
server: server
, tokens: S.tokens
, reconnect: this.reconnect
, failures: this.failures
, failuresTimeout: this.failuresTimeout
, retries: this.retries
, retry: this.retry
, remove: this.remove
});
......@@ -400,7 +354,6 @@ Client.config = {
memcached.HashRing.replaceServer(server, this.failOverServers.shift());
} else {
memcached.HashRing.removeServer(server);
memcached.emit('failure', details);
}
}
});
......@@ -448,7 +401,7 @@ Client.config = {
}
, 'SERVER_ERROR': function servererror(tokens, dataSet, err, queue, S, memcached) {
(memcached || this.memcached).connectionIssue(tokens.splice(1).join(' '), this);
(memcached || this.memcached).connectionIssue(tokens.splice(1).join(' '), S);
return [CONTINUE, false];
}
......@@ -867,6 +820,8 @@ Client.config = {
, multi:true
, type: 'get'
, command: 'get ' + key.join(' ')
, key: keys
, validate: [['key', Array], ['callback', Function]]
};
}, server);
});
......
......@@ -29,7 +29,17 @@ exports.validateArg = function validateArg (args, config) {
if (toString.call(value) !== '[object Array]') {
err = 'Argument "' + key + '" is not a valid Array.';
}
if(!err && key === 'key') {
for(var vKey in value) {
var vValue = value[vKey];
var result = validateKeySize(config, vKey, vValue);
if(result.err) {
err = result.err;
} else {
args.command = args.command.replace(vValue, result['value']);
}
}
}
break;
case Object:
......@@ -52,17 +62,11 @@ exports.validateArg = function validateArg (args, config) {
}
if (!err && key === 'key') {
if (value.length > config.maxKeySize) {
if (config.keyCompression){
args[key] = createHash('md5').update(value).digest('hex');
// also make sure you update the command
args.command = args.command.replace(value, args[key]);
} else {
err = 'Argument "' + key + '" is longer than the maximum allowed length of ' + config.maxKeySize;
}
} else if (/[\s\n\r]/.test(value)) {
err = 'The key should not contain any whitespace or new lines';
var result = validateKeySize(config, key, value);
if(result.err) {
err = result.err;
} else {
args.command = args.command.replace(value, result['value']);
}
}
break;
......@@ -82,6 +86,20 @@ exports.validateArg = function validateArg (args, config) {
return true;
};
var validateKeySize = function validateKeySize(config, key, value) {
if (value.length > config.maxKeySize) {
if (config.keyCompression){
return { err: false, value: createHash('md5').update(value).digest('hex') };
} else {
return { err: 'Argument "' + key + '" is longer than the maximum allowed length of ' + config.maxKeySize };
}
} else if (/[\s\n\r]/.test(value)) {
return { err: 'The key should not contain any whitespace or new lines' };
} else {
return { err: false, value: value };
}
};
// a small util to use an object for eventEmitter
exports.fuse = function fuse (target, handlers) {
for(var i in handlers)
......@@ -137,4 +155,4 @@ exports.escapeValue = function(value) {
//Unescapes escaped values by removing backslashes before line breaks
exports.unescapeValue = function(value) {
return value.replace(/\\(\r|\n)/g, '$1');
};
};
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment