Commit 88c6cfa2 authored by Nathan Harris's avatar Nathan Harris

53 -- Use `CircularBuffer` for `RedisCommandHandler` queue

parent 5672c555
Pipeline #67497566 passed with stages
in 4 minutes and 39 seconds
......@@ -32,7 +32,7 @@ public struct RedisCommandContext {
/// A `ChannelDuplexHandler` that works with `RedisCommandContext`s to send commands and forward responses.
open class RedisCommandHandler {
/// Queue of promises waiting to receive a response value from a sent command.
private var commandResponseQueue: [EventLoopPromise<RESPValue>]
private var commandResponseQueue: CircularBuffer<EventLoopPromise<RESPValue>>
private var logger: Logger
deinit {
......@@ -40,8 +40,8 @@ open class RedisCommandHandler {
logger.warning("Command handler deinit when queue is not empty. Current size: \(commandResponseQueue.count)")
}
public init(logger: Logger = Logger(label: "RedisNIO.CommandHandler")) {
self.commandResponseQueue = []
public init(logger: Logger = Logger(label: "RedisNIO.CommandHandler"), initialQueueCapacity: Int = 5) {
self.commandResponseQueue = CircularBuffer(initialCapacity: initialQueueCapacity)
self.logger = logger
self.logger[metadataKey: "CommandHandler"] = "\(UUID())"
}
......@@ -58,12 +58,16 @@ extension RedisCommandHandler: ChannelInboundHandler {
///
/// See `ChannelInboundHandler.errorCaught(context:error:)`
public func errorCaught(context: ChannelHandlerContext, error: Error) {
guard let leadPromise = commandResponseQueue.last else {
return assertionFailure("Received unexpected error while idle: \(error.localizedDescription)")
}
leadPromise.fail(error)
let queue = self.commandResponseQueue
assert(queue.count > 0, "Received unexpected error while idle: \(error.localizedDescription)")
self.commandResponseQueue.removeAll()
queue.forEach { $0.fail(error) }
logger.critical("Error in channel pipeline.", metadata: ["error": .string(error.localizedDescription)])
context.fireErrorCaught(error)
RedisMetrics.commandFailureCount.increment()
}
/// Invoked by NIO when a read has been fired from earlier in the response chain. This forwards the unwrapped
......@@ -71,17 +75,14 @@ extension RedisCommandHandler: ChannelInboundHandler {
///
/// See `ChannelInboundHandler.channelRead(context:data:)`
public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
let value = unwrapInboundIn(data)
let value = self.unwrapInboundIn(data)
guard let leadPromise = commandResponseQueue.last else {
guard let leadPromise = self.commandResponseQueue.popFirst() else {
assertionFailure("Read triggered with an empty promise queue! Ignoring: \(value)")
logger.critical("Read triggered with no promise waiting in the queue!")
return
}
let popped = commandResponseQueue.popLast()
assert(popped != nil)
switch value {
case .error(let e):
leadPromise.fail(e)
......@@ -108,8 +109,11 @@ extension RedisCommandHandler: ChannelOutboundHandler {
///
/// See `ChannelOutboundHandler.write(context:data:promise:)`
public func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
let commandContext = unwrapOutboundIn(data)
commandResponseQueue.insert(commandContext.responsePromise, at: 0)
context.write(wrapOutboundOut(commandContext.command), promise: promise)
let commandContext = self.unwrapOutboundIn(data)
self.commandResponseQueue.append(commandContext.responsePromise)
context.write(
self.wrapOutboundOut(commandContext.command),
promise: promise
)
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment