Commit 5fb0bfca authored by Nathan Harris's avatar Nathan Harris

Rename `RedisCommandContext` to `RedisCommand`, improve `RedisCommandHandler`...

Rename `RedisCommandContext` to `RedisCommand`, improve `RedisCommandHandler` usage semantics, and cleanup documentation.

Motivation:

During proposal review, it was noted that `RedisCommandContext` was a bit misleading, and the hidden reference semantics worrisome.

In addition, several of parts of the documentation around `RedisCommandHandler` were weak or also misleading.

Modifications:

- Rename `RedisCommandContext` to just `RedisCommand`
- Update documentation to be more explicit about the module who owns the types being referenced
- Update documentation to call out explicit usage semantics and behavior
- Change `RedisCommandHandler` to close the socket connection on error thrown
- Rename the "base" Redis Channel Handlers to be more explicitly named

Result:

Users should have clearer documentation on what happens when using `RedisCommandHandler` and `RedisCommand` without hidden semantics.

This contributes to issue #47.
parent 15a3c038
...@@ -16,11 +16,15 @@ import struct Foundation.UUID ...@@ -16,11 +16,15 @@ import struct Foundation.UUID
import Logging import Logging
import NIO import NIO
/// A context for `RedisCommandHandler` to operate within. /// The `NIO.ChannelOutboundHandler.OutboundIn` type for `RedisCommandHandler`.
public struct RedisCommandContext { ///
/// A full command keyword and arguments stored as a single `RESPValue`. /// This holds the command and its arguments stored as a single `RESPValue` to be sent to Redis,
/// and an `NIO.EventLoopPromise` to be fulfilled when a response has been received.
/// - Important: This struct has _reference semantics_ due to the retention of the `NIO.EventLoopPromise`.
public struct RedisCommand {
/// A command keyword and its arguments stored as a single `RESPValue.array`.
public let command: RESPValue public let command: RESPValue
/// A promise expected to be fulfilled with the `RESPValue` response to the command from Redis. /// A promise to be fulfilled with the sent command's response from Redis.
public let responsePromise: EventLoopPromise<RESPValue> public let responsePromise: EventLoopPromise<RESPValue>
public init(command: RESPValue, promise: EventLoopPromise<RESPValue>) { public init(command: RESPValue, promise: EventLoopPromise<RESPValue>) {
...@@ -29,18 +33,28 @@ public struct RedisCommandContext { ...@@ -29,18 +33,28 @@ public struct RedisCommandContext {
} }
} }
/// A `ChannelDuplexHandler` that works with `RedisCommandContext`s to send commands and forward responses. /// An object that operates in a First In, First Out (FIFO) request-response cycle.
///
/// `RedisCommandHandler` is a `NIO.ChannelDuplexHandler` that sends `RedisCommand` instances to Redis,
/// and fulfills the command's `NIO.EventLoopPromise` as soon as a `RESPValue` response has been received from Redis.
public final class RedisCommandHandler { public final class RedisCommandHandler {
/// Queue of promises waiting to receive a response value from a sent command. /// FIFO queue of promises waiting to receive a response value from a sent command.
private var commandResponseQueue: CircularBuffer<EventLoopPromise<RESPValue>> private var commandResponseQueue: CircularBuffer<EventLoopPromise<RESPValue>>
private var logger: Logger private var logger: Logger
deinit { deinit {
guard commandResponseQueue.count > 0 else { return } guard self.commandResponseQueue.count > 0 else { return }
logger.warning("Command handler deinit when queue is not empty. Current size: \(commandResponseQueue.count)") self.logger[metadataKey: "Queue Size"] = "\(self.commandResponseQueue.count)"
self.logger.warning("Command handler deinit when queue is not empty")
} }
public init(logger: Logger = Logger(label: "RedisNIO.CommandHandler"), initialQueueCapacity: Int = 5) { /// - Parameters:
/// - initialQueueCapacity: The initial queue size to start with. The default is `3`. `RedisCommandHandler` stores all
/// `RedisCommand.responsePromise` objects into a buffer, and unless you intend to execute several concurrent commands against Redis,
/// and don't want the buffer to resize, you shouldn't need to set this parameter.
/// - logger: The `Logging.Logger` instance to use.
/// The logger will have a `Foundation.UUID` value attached as metadata to uniquely identify this instance.
public init(initialQueueCapacity: Int = 3, logger: Logger = Logger(label: "RedisNIO.CommandHandler")) {
self.commandResponseQueue = CircularBuffer(initialCapacity: initialQueueCapacity) self.commandResponseQueue = CircularBuffer(initialCapacity: initialQueueCapacity)
self.logger = logger self.logger = logger
self.logger[metadataKey: "CommandHandler"] = "\(UUID())" self.logger[metadataKey: "CommandHandler"] = "\(UUID())"
...@@ -50,13 +64,16 @@ public final class RedisCommandHandler { ...@@ -50,13 +64,16 @@ public final class RedisCommandHandler {
// MARK: ChannelInboundHandler // MARK: ChannelInboundHandler
extension RedisCommandHandler: ChannelInboundHandler { extension RedisCommandHandler: ChannelInboundHandler {
/// See `ChannelInboundHandler.InboundIn` /// See `NIO.ChannelInboundHandler.InboundIn`
public typealias InboundIn = RESPValue public typealias InboundIn = RESPValue
/// Invoked by NIO when an error has been thrown. The command response promise at the front of the queue will be /// Invoked by SwiftNIO when an error has been thrown. The command queue will be drained, with each promise in the queue being failed with the error thrown.
/// failed with the error.
/// ///
/// See `ChannelInboundHandler.errorCaught(context:error:)` /// See `NIO.ChannelInboundHandler.errorCaught(context:error:)`
/// - Important: This will also close the socket connection to Redis.
/// - Note:`RedisMetrics.commandFailureCount` is **not** incremented from this error.
///
/// A `Logging.LogLevel.critical` message will be written with the caught error.
public func errorCaught(context: ChannelHandlerContext, error: Error) { public func errorCaught(context: ChannelHandlerContext, error: Error) {
let queue = self.commandResponseQueue let queue = self.commandResponseQueue
...@@ -65,21 +82,22 @@ extension RedisCommandHandler: ChannelInboundHandler { ...@@ -65,21 +82,22 @@ extension RedisCommandHandler: ChannelInboundHandler {
self.commandResponseQueue.removeAll() self.commandResponseQueue.removeAll()
queue.forEach { $0.fail(error) } queue.forEach { $0.fail(error) }
logger.critical("Error in channel pipeline.", metadata: ["error": .string(error.localizedDescription)]) self.logger.critical("Error in channel pipeline.", metadata: ["error": .string(error.localizedDescription)])
context.fireErrorCaught(error) context.close(promise: nil)
} }
/// Invoked by NIO when a read has been fired from earlier in the response chain. This forwards the unwrapped /// Invoked by SwiftNIO when a read has been fired from earlier in the response chain.
/// `RESPValue` to the promise awaiting a response at the front of the queue. /// This forwards the decoded `RESPValue` response message to the promise waiting to be fulfilled at the front of the command queue.
/// - Note: `RedisMetrics.commandFailureCount` and `RedisMetrics.commandSuccessCount` are incremented from this method.
/// ///
/// See `ChannelInboundHandler.channelRead(context:data:)` /// See `NIO.ChannelInboundHandler.channelRead(context:data:)`
public func channelRead(context: ChannelHandlerContext, data: NIOAny) { public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
let value = self.unwrapInboundIn(data) let value = self.unwrapInboundIn(data)
guard let leadPromise = self.commandResponseQueue.popFirst() else { guard let leadPromise = self.commandResponseQueue.popFirst() else {
assertionFailure("Read triggered with an empty promise queue! Ignoring: \(value)") assertionFailure("Read triggered with an empty promise queue! Ignoring: \(value)")
logger.critical("Read triggered with no promise waiting in the queue!") self.logger.critical("Read triggered with no promise waiting in the queue!")
return return
} }
...@@ -98,16 +116,16 @@ extension RedisCommandHandler: ChannelInboundHandler { ...@@ -98,16 +116,16 @@ extension RedisCommandHandler: ChannelInboundHandler {
// MARK: ChannelOutboundHandler // MARK: ChannelOutboundHandler
extension RedisCommandHandler: ChannelOutboundHandler { extension RedisCommandHandler: ChannelOutboundHandler {
/// See `ChannelOutboundHandler.OutboundIn` /// See `NIO.ChannelOutboundHandler.OutboundIn`
public typealias OutboundIn = RedisCommandContext public typealias OutboundIn = RedisCommand
/// See `ChannelOutboundHandler.OutboundOut` /// See `NIO.ChannelOutboundHandler.OutboundOut`
public typealias OutboundOut = RESPValue public typealias OutboundOut = RESPValue
/// Invoked by NIO when a `write` has been requested on the `Channel`. /// Invoked by SwiftNIO when a `write` has been requested on the `Channel`.
/// This unwraps a `RedisCommandContext`, retaining a callback to forward a response to later, and forwards /// This unwraps a `RedisCommand`, storing the `NIO.EventLoopPromise` in a command queue,
/// the underlying command data further into the pipeline. /// to fulfill later with the response to the command that is about to be sent through the `NIO.Channel`.
/// ///
/// See `ChannelOutboundHandler.write(context:data:promise:)` /// See `NIO.ChannelOutboundHandler.write(context:data:promise:)`
public func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) { public func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
let commandContext = self.unwrapOutboundIn(data) let commandContext = self.unwrapOutboundIn(data)
self.commandResponseQueue.append(commandContext.responsePromise) self.commandResponseQueue.append(commandContext.responsePromise)
......
...@@ -44,9 +44,9 @@ extension Channel { ...@@ -44,9 +44,9 @@ extension Channel {
/// - Returns: An `EventLoopFuture` that resolves after all handlers have been added to the pipeline. /// - Returns: An `EventLoopFuture` that resolves after all handlers have been added to the pipeline.
public func addBaseRedisHandlers() -> EventLoopFuture<Void> { public func addBaseRedisHandlers() -> EventLoopFuture<Void> {
let handlers: [(ChannelHandler, name: String)] = [ let handlers: [(ChannelHandler, name: String)] = [
(MessageToByteHandler(RedisMessageEncoder()), "RedisNIO.Outgoing"), (MessageToByteHandler(RedisMessageEncoder()), "RedisNIO.OutgoingHandler"),
(ByteToMessageHandler(RedisByteDecoder()), "RedisNIO.Incoming"), (ByteToMessageHandler(RedisByteDecoder()), "RedisNIO.IncomingHandler"),
(RedisCommandHandler(), "RedisNIO.CommandQueue") (RedisCommandHandler(), "RedisNIO.CommandHandler")
] ]
return .andAllSucceed( return .andAllSucceed(
handlers.map { self.pipeline.addHandler($0, name: $1) }, handlers.map { self.pipeline.addHandler($0, name: $1) },
......
...@@ -166,7 +166,7 @@ public final class RedisConnection: RedisClient { ...@@ -166,7 +166,7 @@ public final class RedisConnection: RedisClient {
let args = arguments.map { $0.convertedToRESPValue() } let args = arguments.map { $0.convertedToRESPValue() }
let promise = channel.eventLoop.makePromise(of: RESPValue.self) let promise = channel.eventLoop.makePromise(of: RESPValue.self)
let context = RedisCommandContext( let context = RedisCommand(
command: .array([RESPValue(bulk: command)] + args), command: .array([RESPValue(bulk: command)] + args),
promise: promise promise: promise
) )
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment