Commit c8e5f4e5 authored by Ian Ward's avatar Ian Ward

Merge remote-tracking branch 'kaero/connection-issues' into connection-issues

Conflicts:
	lib/memcached.js
	test/memcached-connections.test.js
parents 8e3741e4 e4fc1921
......@@ -84,6 +84,7 @@ Client.config = {
, timeout: 5000 // after x ms the server should send a timeout if we can't connect
, failures: 5 // Number of times a server can have an issue before marked dead
, retry: 30000 // When a server has an error, wait this amount of time before retrying
, idle: 5000 // Remove connection from pool when no I/O after `idle` ms
, remove: false // remove server if dead if false, we will attempt to reconnect
, redundancy: false // allows you do re-distribute the keys over a x amount of servers
, keyCompression: true // compress keys if they are to large (md5)
......@@ -144,11 +145,24 @@ Client.config = {
manager.maxTimeout = memcached.maxTimeout;
manager.randomize = memcached.randomize;
manager.setMaxListeners(0);
manager.factory(function factory() {
var S = Array.isArray(serverTokens)
? new Stream
: new Socket
, Manager = this;
, Manager = this
, idleTimeout = function() {
Manager.remove(this);
}
, connectTimeout = function() {
memcached.connectionIssue('Stream connect timeout', S);
Manager.remove(this);
}
, streamError = function() {
memcached.connectionIssue('Stream error', S);
Manager.remove(this);
};
// config the Stream
S.streamID = sid++;
......@@ -168,12 +182,13 @@ Client.config = {
Manager.remove(this);
}
, data: curry(memcached, privates.buffer, S)
, timeout: function streamTimeout() {
Manager.remove(this);
}
, error: function streamError() {
// callback called when retries is exhausted
memcached.connectionIssue('Stream error', S);
, timeout: connectTimeout
, error: streamError
, connect: function streamConnect() {
// Remove connection timeout listener
this.setTimeout(0, connectTimeout);
// Close idle connections
this.setTimeout(this.memcached.idle, idleTimeout);
}
, end: S.end
});
......@@ -370,6 +385,7 @@ Client.config = {
memcached.HashRing.replaceServer(server, this.failOverServers.shift());
} else {
memcached.HashRing.removeServer(server);
memcached.emit('failure', details);
}
}
});
......
......@@ -8,6 +8,7 @@
var assert = require('assert')
, fs = require('fs')
, net = require('net')
, common = require('./common')
, Memcached = require('../');
......@@ -177,4 +178,53 @@ describe('Memcached connections', function () {
},10); // Make sure `retry`, which is immediate, has passed
});
});
it('should fire `failure` event when server removed and has no fallbacks', function(done) {
var connectionAttempOk = false
, mockSocket = null
, mock = net.createServer(function(socket) {
mockSocket = socket;
connectionAttempOk = true;
})
, memcached = new Memcached(['127.0.0.1:11219'], {
timeout: 3000,
idle: 1000,
retries: 0,
remove: true
})
, emittedErrors = [];
[
'issue',
'remove',
'reconnecting',
'reconnected',
'failure'
].forEach(function(event) {
memcached.on(event, function() {
if (emittedErrors[event]) {
++emittedErrors[event];
} else {
emittedErrors[event] = 1;
}
});
});
mock.listen(11219, function() {
memcached.get('y', function(err) {
var events = Object.keys(emittedErrors);
// memcached instance must emit `remove` and `failure` events
assert.strictEqual(events.length, 2);
assert.ok(~events.indexOf('remove'));
assert.ok(~events.indexOf('failure'));
assert.ok(connectionAttempOk);
assert.strictEqual(err.message, 'Connection timeout');
memcached.end();
mockSocket.destroy();
mock.close();
done();
});
});
});
});
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment