RedisCommandHandler.swift 6.21 KB
Newer Older
1 2
//===----------------------------------------------------------------------===//
//
3
// This source file is part of the RediStack open source project
4
//
5
// Copyright (c) 2019 RediStack project authors
6 7 8
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
9
// See CONTRIBUTORS.txt for the list of RediStack project authors
10 11 12 13 14
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

15
import struct Foundation.UUID
16
import Logging
17 18
import NIO

19 20
/// The `NIO.ChannelOutboundHandler.OutboundIn` type for `RedisCommandHandler`.
///
21
/// This holds the full command message to be sent to Redis, and an `NIO.EventLoopPromise` to be fulfilled when a response has been received.
22 23
/// - Important: This struct has _reference semantics_ due to the retention of the `NIO.EventLoopPromise`.
public struct RedisCommand {
24 25 26
    /// A message waiting to be sent to Redis. A full message contains a command keyword and its arguments stored as a single `RESPValue.array`.
    public let message: RESPValue
    /// A promise to be fulfilled with the sent message's response from Redis.
27
    public let responsePromise: EventLoopPromise<RESPValue>
28

29 30
    public init(message: RESPValue, responsePromise promise: EventLoopPromise<RESPValue>) {
        self.message = message
31
        self.responsePromise = promise
32
    }
33 34
}

35 36 37 38
/// An object that operates in a First In, First Out (FIFO) request-response cycle.
///
/// `RedisCommandHandler` is a `NIO.ChannelDuplexHandler` that sends `RedisCommand` instances to Redis,
/// and fulfills the command's `NIO.EventLoopPromise` as soon as a `RESPValue` response has been received from Redis.
39
public final class RedisCommandHandler {
40
    /// FIFO queue of promises waiting to receive a response value from a sent command.
41
    private var commandResponseQueue: CircularBuffer<EventLoopPromise<RESPValue>>
42
    private var logger: Logger
43

44
    deinit {
45 46 47
        guard self.commandResponseQueue.count > 0 else { return }
        self.logger[metadataKey: "Queue Size"] = "\(self.commandResponseQueue.count)"
        self.logger.warning("Command handler deinit when queue is not empty")
48 49
    }

50 51 52 53 54 55
    /// - Parameters:
    ///     - initialQueueCapacity: The initial queue size to start with. The default is `3`. `RedisCommandHandler` stores all
    ///         `RedisCommand.responsePromise` objects into a buffer, and unless you intend to execute several concurrent commands against Redis,
    ///         and don't want the buffer to resize, you shouldn't need to set this parameter.
    ///     - logger: The `Logging.Logger` instance to use.
    ///         The logger will have a `Foundation.UUID` value attached as metadata to uniquely identify this instance.
56
    public init(initialQueueCapacity: Int = 3, logger: Logger = Logger(label: "RediStack.CommandHandler")) {
57
        self.commandResponseQueue = CircularBuffer(initialCapacity: initialQueueCapacity)
58 59
        self.logger = logger
        self.logger[metadataKey: "CommandHandler"] = "\(UUID())"
60 61 62 63 64 65
    }
}

// MARK: ChannelInboundHandler

extension RedisCommandHandler: ChannelInboundHandler {
66
    /// See `NIO.ChannelInboundHandler.InboundIn`
67 68
    public typealias InboundIn = RESPValue

69
    /// Invoked by SwiftNIO when an error has been thrown. The command queue will be drained, with each promise in the queue being failed with the error thrown.
70
    ///
71 72 73 74 75
    /// See `NIO.ChannelInboundHandler.errorCaught(context:error:)`
    /// - Important: This will also close the socket connection to Redis.
    /// - Note:`RedisMetrics.commandFailureCount` is **not** incremented from this error.
    ///
    /// A `Logging.LogLevel.critical` message will be written with the caught error.
tanner0101's avatar
tanner0101 committed
76
    public func errorCaught(context: ChannelHandlerContext, error: Error) {
77 78 79 80 81 82 83
        let queue = self.commandResponseQueue
        
        assert(queue.count > 0, "Received unexpected error while idle: \(error.localizedDescription)")
        
        self.commandResponseQueue.removeAll()
        queue.forEach { $0.fail(error) }
        
84
        self.logger.critical("Error in channel pipeline.", metadata: ["error": "\(error.localizedDescription)"])
85 86

        context.close(promise: nil)
87 88
    }

89 90 91
    /// Invoked by SwiftNIO when a read has been fired from earlier in the response chain.
    /// This forwards the decoded `RESPValue` response message to the promise waiting to be fulfilled at the front of the command queue.
    /// - Note: `RedisMetrics.commandFailureCount` and `RedisMetrics.commandSuccessCount` are incremented from this method.
92
    ///
93
    /// See `NIO.ChannelInboundHandler.channelRead(context:data:)`
tanner0101's avatar
tanner0101 committed
94
    public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
95
        let value = self.unwrapInboundIn(data)
96

97
        guard let leadPromise = self.commandResponseQueue.popFirst() else {
98
            assertionFailure("Read triggered with an empty promise queue! Ignoring: \(value)")
99
            self.logger.critical("Read triggered with no promise waiting in the queue!")
100
            return
101 102 103
        }

        switch value {
104 105 106 107 108 109 110
        case .error(let e):
            leadPromise.fail(e)
            RedisMetrics.commandFailureCount.increment()

        default:
            leadPromise.succeed(value)
            RedisMetrics.commandSuccessCount.increment()
111 112 113 114 115 116 117
        }
    }
}

// MARK: ChannelOutboundHandler

extension RedisCommandHandler: ChannelOutboundHandler {
118 119 120
    /// See `NIO.ChannelOutboundHandler.OutboundIn`
    public typealias OutboundIn = RedisCommand
    /// See `NIO.ChannelOutboundHandler.OutboundOut`
121 122
    public typealias OutboundOut = RESPValue

123 124 125
    /// Invoked by SwiftNIO when a `write` has been requested on the `Channel`.
    /// This unwraps a `RedisCommand`, storing the `NIO.EventLoopPromise` in a command queue,
    /// to fulfill later with the response to the command that is about to be sent through the `NIO.Channel`.
126
    ///
127
    /// See `NIO.ChannelOutboundHandler.write(context:data:promise:)`
tanner0101's avatar
tanner0101 committed
128
    public func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
129 130 131
        let commandContext = self.unwrapOutboundIn(data)
        self.commandResponseQueue.append(commandContext.responsePromise)
        context.write(
132
            self.wrapOutboundOut(commandContext.message),
133 134
            promise: promise
        )
135 136
    }
}