Verified Commit e7b597fc authored by Nathan Harris's avatar Nathan Harris
Browse files

Add support for PubSub

Motivation:

One of the great features of Redis is being able to subscribe and receive messages published to specific channels
as a way of acting as a message queue for processing jobs.

PubSub requires a specific understanding of the connection model that can only be implemented directly in this library.

Modifications:

- Add: `RedisPubSubHandler` to sit in front of `RedisCommandHandler` to manage subscription callbacks and Redis registration
- Add: `publish` and the `pubsub` commands
- Add: `addPubSubHandler` extension to `NIO.Channel`
- Add: Type-safe String wrapper of `RedisChannelName` for PubSub methods
- Add: `pubsubSubscriptionNotFound` error case
- Add: `isSubscribed` property to `RedisConnection`
- Add: `availableConnectionCount` and `leasedConnectionCount` properties to `RedisConnectionPool`
- Add: Metrics for PubSub
- Add: `makeNewPool` factory method to `RedisConnectionPoolIntegrationTestCase`
- Change: `RedisClient` to require methods for PubSub management, as they are intrinsicly tied to the client's connection model
- Change: Parsing of `PING` response for handling special case in PubSub mode
- Rename: `ActiveConnectionGauge` to `RedisMetrics.IncrementalGauge`

Result:

Developers will now be able to use Redis in PubSub mode with both connections and pools.

This resolves #6
parent 45f665b9
Pipeline #198066000 passed with stage
in 0 seconds
This diff is collapsed.
......@@ -2,7 +2,7 @@
//
// This source file is part of the RediStack open source project
//
// Copyright (c) 2019 RediStack project authors
// Copyright (c) 2019-2020 RediStack project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
......@@ -36,7 +36,16 @@ extension RedisClient {
? [.init(bulk: message!)] // safe because we did a nil pre-check
: []
return send(command: "PING", with: args)
.tryConverting()
.flatMapThrowing {
// because PING is a special command allowed during pub/sub, we do manual conversion
// this is because the response format is different in pub/sub ([pong,<message>])
guard let response = $0.string ?? $0.array?[1].string else {
throw RedisClientError.assertionFailure(message: "ping message not found")
}
// if no message was sent in the ping in pubsub, then the response will be an empty string
// so we mimic a normal PONG response as if we weren't in pubsub
return response.isEmpty ? "PONG" : response
}
}
/// Select the Redis logical database having the specified zero-based numeric index.
......
//===----------------------------------------------------------------------===//
//
// This source file is part of the RediStack open source project
//
// Copyright (c) 2020 RediStack project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of RediStack project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
import NIO
// MARK: Publish
extension RedisClient {
/// Publishes the provided message to a specific Redis channel.
///
/// See [PUBLISH](https://redis.io/commands/publish)
/// - Parameters:
/// - message: The "message" value to publish on the channel.
/// - channel: The name of the channel to publish the message to.
/// - Returns: The number of subscribed clients that received the message.
@inlinable
@discardableResult
public func publish<Message: RESPValueConvertible>(
_ message: Message,
to channel: RedisChannelName
) -> EventLoopFuture<Int> {
let args: [RESPValue] = [
.init(from: channel),
message.convertedToRESPValue()
]
return self.send(command: "PUBLISH", with: args)
.tryConverting()
}
}
// MARK: PubSub Sub-commands
extension RedisClient {
/// Resolves a list of all the channels that have at least 1 (non-pattern) subscriber.
///
/// See [PUBSUB CHANNELS](https://redis.io/commands/pubsub#pubsub-channels-pattern)
/// - Note: If no `match` pattern is provided, all active channels will be returned.
/// - Parameter match: An optional pattern of channel names to filter for.
/// - Returns: A list of all active channel names.
public func activeChannels(matching match: String? = nil) -> EventLoopFuture<[RedisChannelName]> {
var args: [RESPValue] = [.init(bulk: "CHANNELS")]
if let m = match { args.append(.init(bulk: m)) }
return self.send(command: "PUBSUB", with: args)
.tryConverting()
}
/// Resolves the total count of active subscriptions to channels that were made using patterns.
///
/// See [PUBSUB NUMPAT](https://redis.io/commands/pubsub#codepubsub-numpatcode)
/// - Returns: The total count of subscriptions made through patterns.
public func patternSubscriberCount() -> EventLoopFuture<Int> {
let args: [RESPValue] = [.init(bulk: "NUMPAT")]
return self.send(command: "PUBSUB", with: args)
.tryConverting()
}
/// Resolves a count of (non-pattern) subscribers for each given channel.
///
/// See [PUBSUB NUMSUB](https://redis.io/commands/pubsub#codepubsub-numsub-channel-1--channel-ncode)
/// - Parameter channels: A list of channel names to collect the subscriber counts for.
/// - Returns: A mapping of channel names and their (non-pattern) subscriber count.
public func subscriberCount(forChannels channels: [RedisChannelName]) -> EventLoopFuture<[RedisChannelName: Int]> {
guard channels.count > 0 else { return self.eventLoop.makeSucceededFuture([:]) }
var args: [RESPValue] = [.init(bulk: "NUMSUB")]
args.append(convertingContentsOf: channels)
return self.send(command: "PUBSUB", with: args)
.tryConverting(to: [RESPValue].self)
.flatMapThrowing { response in
assert(response.count == channels.count * 2, "Unexpected response size!")
// Redis guarantees that the response format is [channel1Name, channel1Count, channel2Name, ...]
// with the order of channels matching the order sent in the request
return try channels
.enumerated()
.reduce(into: [:]) { (result, next) in
assert(next.element.rawValue == response[next.offset].string, "Unexpected value in current index!")
guard let count = response[next.offset + 1].int else {
throw RedisClientError.assertionFailure(
message: "Unexpected value at position \(next.offset + 1) in \(response)"
)
}
result[next.element] = count
}
}
}
}
......@@ -40,7 +40,7 @@ internal final class ConnectionPool {
private let connectionFactory: (EventLoop) -> EventLoopFuture<RedisConnection>
/// A stack of connections that are active and suitable for use by clients.
private var availableConnections: ArraySlice<RedisConnection>
private(set) var availableConnections: ArraySlice<RedisConnection>
/// A buffer of users waiting for connections to be handed over.
private var connectionWaiters: CircularBuffer<Waiter>
......@@ -66,7 +66,7 @@ internal final class ConnectionPool {
private var pendingConnectionCount: Int
/// The number of connections that have been handed out to users and are in active use.
private var leasedConnectionCount: Int
private(set) var leasedConnectionCount: Int
/// Whether this connection pool is "leaky".
///
......
......@@ -2,7 +2,7 @@
//
// This source file is part of the RediStack open source project
//
// Copyright (c) 2019 RediStack project authors
// Copyright (c) 2019-2020 RediStack project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
......@@ -14,6 +14,8 @@
import NIO
// MARK: Convenience extensions
extension TimeAmount {
/// The seconds representation of the TimeAmount.
@usableFromInline
......@@ -22,13 +24,37 @@ extension TimeAmount {
}
}
// MARK: Setting up a Redis connection
// MARK: Pipeline manipulation
extension Channel {
/// Adds the baseline `ChannelHandlers` needed to support sending and receiving messages in Redis Serialization Protocol (RESP) format to the pipeline.
/// Adds the baseline channel handlers needed to support sending and receiving messages in Redis Serialization Protocol (RESP) format to the pipeline.
///
/// For implementation details, see `RedisMessageEncoder`, `RedisByteDecoder`, and `RedisCommandHandler`.
/// - Returns: An `EventLoopFuture` that resolves after all handlers have been added to the pipeline.
///
/// # Pipeline chart
/// RedisClient.send
/// |
/// v
/// +-------------------------------------------------------------------+
/// | ChannelPipeline | |
/// | TAIL | |
/// | +---------------------------------------------------------+ |
/// | | RedisCommandHandler | |
/// | +---------------------------------------------------------+ |
/// | ^ | |
/// | | v |
/// | +---------------------+ +----------------------+ |
/// | | RedisByteDecoder | | RedisMessageEncoder | |
/// | +---------------------+ +----------------------+ |
/// | | | |
/// | | HEAD | |
/// +-------------------------------------------------------------------+
/// ^ |
/// | v
/// +-----------------+ +------------------+
/// | [ Socket.read ] | | [ Socket.write ] |
/// +-----------------+ +------------------+
/// - Returns: A `NIO.EventLoopFuture` that resolves after all handlers have been added to the pipeline.
public func addBaseRedisHandlers() -> EventLoopFuture<Void> {
let handlers: [(ChannelHandler, name: String)] = [
(MessageToByteHandler(RedisMessageEncoder()), "RediStack.OutgoingHandler"),
......@@ -40,14 +66,69 @@ extension Channel {
on: self.eventLoop
)
}
/// Adds the channel handler that is responsible for handling everything related to Redis PubSub.
/// - Important: The connection that manages this channel is responsible for removing the `RedisPubSubHandler`.
///
/// # Discussion
/// PubSub responsibilities include managing subscription callbacks as well as parsing and dispatching messages received from Redis.
///
/// For implementation details, see `RedisPubSubHandler`.
///
/// The handler will be inserted in the `NIO.ChannelPipeline` just before the `RedisCommandHandler` instance.
///
/// # Pipeline chart
/// RedisClient.send
/// |
/// v
/// +-------------------------------------------------------------------+
/// | ChannelPipeline | |
/// | TAIL | |
/// | +---------------------------------------------------------+ |
/// | | RedisCommandHandler | |
/// | +---------------------------------------------------------+ |
/// | ^ | |
/// | | v |
/// | +---------------------------------------------------------+ |
/// | | (might forward) RedisPubSubHandler (forwards) |----|<-----------+
/// | +---------------------------------------------------------+ | |
/// | ^ | | +
/// | | v | RedisClient.subscribe/unsubscribe
/// | +---------------------+ +----------------------+ |
/// | | RedisByteDecoder | | RedisMessageEncoder | |
/// | +---------------------+ +----------------------+ |
/// | | | |
/// | | HEAD | |
/// +-------------------------------------------------------------------+
/// ^ |
/// | v
/// +-----------------+ +------------------+
/// | [ Socket.read ] | | [ Socket.write ] |
/// +-----------------+ +------------------+
/// - Returns: A `NIO.EventLoopFuture` that resolves the instance of the PubSubHandler that was added to the pipeline.
public func addPubSubHandler() -> EventLoopFuture<RedisPubSubHandler> {
return self.pipeline
.handler(type: RedisCommandHandler.self)
.flatMap {
let pubsubHandler = RedisPubSubHandler()
return self.pipeline
.addHandler(pubsubHandler, name: "RediStack.PubSubHandler", position: .before($0))
.map { pubsubHandler }
}
}
}
// MARK: Setting up a Redis connection
extension ClientBootstrap {
/// Makes a new `ClientBootstrap` instance with a baseline Redis `Channel` pipeline
/// for sending and receiving messages in Redis Serialization Protocol (RESP) format.
///
/// For implementation details, see `RedisMessageEncoder`, `RedisByteDecoder`, and `RedisCommandHandler`.
///
/// See also `Channel.addBaseRedisHandlers()`.
/// - Parameter group: The `EventLoopGroup` to create the `ClientBootstrap` with.
/// - Returns: A `ClientBootstrap` with the base configuration of a `Channel` pipeline for RESP messages.
/// - Returns: A TCP connection with the base configuration of a `Channel` pipeline for RESP messages.
public static func makeRedisTCPClient(group: EventLoopGroup) -> ClientBootstrap {
return ClientBootstrap(group: group)
.channelOption(
......
//===----------------------------------------------------------------------===//
//
// This source file is part of the RediStack open source project
//
// Copyright (c) 2020 RediStack project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of RediStack project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
/// A representation of a Redis Pub/Sub channel.
///
/// `RedisChannelName` is a thin wrapper around `String`, to provide stronger type-safety at compile time.
///
/// It conforms to `ExpressibleByStringLiteral` and `ExpressibleByStringInterpolation`, so creating an instance is simple:
/// ```swift
/// let channel: RedisChannelName = "channel1" // or "\(channelNameVariable)"
/// ```
public struct RedisChannelName:
RESPValueConvertible,
RawRepresentable,
ExpressibleByStringLiteral,
ExpressibleByStringInterpolation,
CustomStringConvertible, CustomDebugStringConvertible,
Comparable, Hashable, Codable
{
public let rawValue: String
/// Initializes a type-safe representation of a Redis Pub/Sub channel name.
/// - Parameter name: The name of the Redis Pub/Sub channel.
public init(_ name: String) {
self.rawValue = name
}
public var description: String { self.rawValue }
public var debugDescription: String { "\(Self.self): \(self.rawValue)" }
public init?(fromRESP value: RESPValue) {
guard let string = value.string else { return nil }
self.rawValue = string
}
public init?(rawValue: String) { self.rawValue = rawValue }
public init(stringLiteral value: String) { self.rawValue = value }
public init(from decoder: Decoder) throws {
let container = try decoder.singleValueContainer()
self.rawValue = try container.decode(String.self)
}
public static func <(lhs: RedisChannelName, rhs: RedisChannelName) -> Bool {
return lhs.rawValue < rhs.rawValue
}
public func convertedToRESPValue() -> RESPValue {
return .init(bulk: self.rawValue)
}
public func encode(to encoder: Encoder) throws {
var container = encoder.singleValueContainer()
try container.encode(self.rawValue)
}
}
......@@ -24,7 +24,7 @@ import NIO
/// let result = client.send(command: "GET", arguments: ["my_key"])
/// // result == EventLoopFuture<RESPValue>
///
/// See [https://redis.io/commands](https://redis.io/commands)
/// For the full list of available commands, see [https://redis.io/commands](https://redis.io/commands)
public protocol RedisClient {
/// The `NIO.EventLoop` that this client operates on.
var eventLoop: EventLoop { get }
......@@ -40,8 +40,82 @@ public protocol RedisClient {
/// - Parameter logger: The `Logging.Logger` instance to use for command logs.
/// - Returns: A RedisClient with the temporary override for command logging.
func logging(to logger: Logger) -> RedisClient
/// Subscribes the client to the specified Redis channels, invoking the provided message receiver each time a message is published.
///
/// See [SUBSCRIBE](https://redis.io/commands/subscribe)
/// - Important: This will establish the client in a "PubSub mode" where only a specific list of commands are allowed to be executed.
///
/// Commands issued with this client outside of that list will resolve with failures.
///
/// See the [PubSub specification](https://redis.io/topics/pubsub)
/// - Parameters:
/// - channels: The names of channels to subscribe to.
/// - receiver: A closure which will be invoked each time a channel with a name in `channels` publishes a message.
/// - subscribeHandler: An optional closure to be invoked when the subscription becomes active.
/// - unsubscribeHandler: An optional closure to be invoked when the subscription becomes inactive.
/// - Returns: A notification `NIO.EventLoopFuture` that resolves once the subscription has been registered with Redis.
func subscribe(
to channels: [RedisChannelName],
messageReceiver receiver: @escaping RedisSubscriptionMessageReceiver,
onSubscribe subscribeHandler: RedisSubscriptionChangeHandler?,
onUnsubscribe unsubscribeHandler: RedisSubscriptionChangeHandler?
) -> EventLoopFuture<Void>
/// Subscribes the client to the specified Redis channel name patterns, invoking the provided message receiver each time a message is published to
/// a matching channel.
///
///- Note: If the client is also subscribed to a channel directly by name which also matches a pattern, both subscription message receivers will be invoked.
///
/// See [PSUBSCRIBE](https://redis.io/commands/psubscribe)
/// - Important: This will establish the client in a "PubSub mode" where only a specific list of commands are allowed to be executed.
///
/// Commands issues with this client outside of that list will resolve with failures.
///
/// See the [PubSub specification](https://redis.io/topics/pubsub)
/// - Parameters:
/// - patterns: A list of glob patterns used for matching against PubSub channel names to subscribe to.
/// - receiver: A closure which will be invoked each time a channel with a name matching the specified pattern(s) publishes a message.
/// - subscribeHandler: An optional closure to be invoked when the subscription becomes active.
/// - unsubscribeHandler: An optional closure to be invoked when the subscription becomes inactive.
/// - Returns: A notification `NIO.EventLoopFuture` that resolves once the subscription has been registered with Redis.
func psubscribe(
to patterns: [String],
messageReceiver receiver: @escaping RedisSubscriptionMessageReceiver,
onSubscribe subscribeHandler: RedisSubscriptionChangeHandler?,
onUnsubscribe unsubscribeHandler: RedisSubscriptionChangeHandler?
) -> EventLoopFuture<Void>
/// Unsubscribes the client from a specific Redis channel from receiving any future published messages.
///
/// See [UNSUBSCRIBE](https://redis.io/commands/unsubscribe)
/// - Note: If the channel was not subscribed to with `subscribe(to:messageReceiver:onSubscribe:onUnsubscribe:)`,
/// then this method has no effect.
/// - Important: If no more subscriptions (pattern or channel) are active on the client, the client will be taken out of its "PubSub mode".
///
/// It will then be allowed to use any command like normal.
///
/// See the [PubSub specification](https://redis.io/topics/pubsub)
/// - Parameter channels: A list of channel names to be unsubscribed from.
/// - Returns: A notification `NIO.EventLoopFuture` that resolves once the subscription(s) have been removed from Redis.
func unsubscribe(from channels: [RedisChannelName]) -> EventLoopFuture<Void>
/// Unsubscribes the client from a pattern of Redis channel names from receiving any future published messages.
///
/// See [PUNSUBSCRIBE](https://redis.io/commands/punsubscribe)
/// - Note: This method does not unsubscribe subscriptions made with `subscribe(to:messageReceiver:onSubscribe:onUnsubscribe:)`.
/// - Important: If no more subscriptions (pattern or channel) are active on the client, the client will be taken out of its "PubSub mode".
///
/// It will then be allowed to use any command like normal.
///
/// See the [PubSub specification](https://redis.io/topics/pubsub)
/// - Parameter patterns: A list of glob patterns to be unsubscribed from.
/// - Returns: A notification `NIO.EventLoopFuture` that resolves once the subscription(s) have been removed from Redis.
func punsubscribe(from patterns: [String]) -> EventLoopFuture<Void>
}
// MARK: Extension Methods
extension RedisClient {
/// Sends the desired command without arguments.
/// - Parameter command: The command keyword to execute.
......@@ -49,6 +123,122 @@ extension RedisClient {
public func send(command: String) -> EventLoopFuture<RESPValue> {
return self.send(command: command, with: [])
}
/// Unsubscribes the client from all active Redis channel name subscriptions.
/// - Returns: A `NIO.EventLoopFuture` that resolves when the subscriptions have been removed.
public func unsubscribe() -> EventLoopFuture<Void> {
return self.unsubscribe(from: [])
}
/// Unsubscribes the client from all active Redis channel name patterns subscriptions.
/// - Returns: A `NIO.EventLoopFuture` that resolves when the subscriptions have been removed.
public func punsubscribe() -> EventLoopFuture<Void> {
return self.punsubscribe(from: [])
}
}
// MARK: Overloads
extension RedisClient {
public func unsubscribe(from channels: RedisChannelName...) -> EventLoopFuture<Void> {
return self.unsubscribe(from: channels)
}
public func punsubscribe(from patterns: String...) -> EventLoopFuture<Void> {
return self.punsubscribe(from: patterns)
}
#if swift(>=5.3)
public func subscribe(
to channels: [RedisChannelName],
messageReceiver receiver: @escaping RedisSubscriptionMessageReceiver,
onSubscribe subscribeHandler: RedisSubscriptionChangeHandler? = nil,
onUnsubscribe unsubscribeHandler: RedisSubscriptionChangeHandler? = nil
) -> EventLoopFuture<Void> {
return self.subscribe(to: channels, messageReceiver: receiver, onSubscribe: subscribeHandler, onUnsubscribe: unsubscribeHandler)
}
public func subscribe(
to channels: RedisChannelName...,
messageReceiver receiver: @escaping RedisSubscriptionMessageReceiver,
onSubscribe subscribeHandler: RedisSubscriptionChangeHandler? = nil,
onUnsubscribe unsubscribeHandler: RedisSubscriptionChangeHandler? = nil
) -> EventLoopFuture<Void> {
return self.subscribe(to: channels, messageReceiver: receiver, onSubscribe: subscribeHandler, onUnsubscribe: unsubscribeHandler)
}
public func psubscribe(
to patterns: [String],
messageReceiver receiver: @escaping RedisSubscriptionMessageReceiver,
onSubscribe subscribeHandler: RedisSubscriptionChangeHandler? = nil,
onUnsubscribe unsubscribeHandler: RedisSubscriptionChangeHandler? = nil
) -> EventLoopFuture<Void> {
return self.psubscribe(to: patterns, messageReceiver: receiver, onSubscribe: subscribeHandler, onUnsubscribe: unsubscribeHandler)
}
public func psubscribe(
to patterns: String...,
messageReceiver receiver: @escaping RedisSubscriptionMessageReceiver,
onSubscribe subscribeHandler: RedisSubscriptionChangeHandler? = nil,
onUnsubscribe unsubscribeHandler: RedisSubscriptionChangeHandler? = nil
) -> EventLoopFuture<Void> {
return self.psubscribe(to: patterns, messageReceiver: receiver, onSubscribe: subscribeHandler, onUnsubscribe: unsubscribeHandler)
}
#else
public func subscribe(
to channels: RedisChannelName...,
messageReceiver receiver: @escaping RedisSubscriptionMessageReceiver,
onSubscribe subscribeHandler: RedisSubscriptionChangeHandler?,
onUnsubscribe unsubscribeHandler: RedisSubscriptionChangeHandler?
) -> EventLoopFuture<Void> {
return self.subscribe(to: channels, messageReceiver: receiver, onSubscribe: subscribeHandler, onUnsubscribe: unsubscribeHandler)
}
public func subscribe(
to channels: RedisChannelName...,
onSubscribe subscribeHandler: RedisSubscriptionChangeHandler? = nil,
onUnsubscribe unsubscribeHandler: RedisSubscriptionChangeHandler? = nil,
messageReceiver receiver: @escaping RedisSubscriptionMessageReceiver
) -> EventLoopFuture<Void> {
return self.subscribe(to: channels, messageReceiver: receiver, onSubscribe: subscribeHandler, onUnsubscribe: unsubscribeHandler)
}
public func subscribe(
to channels: [RedisChannelName],
onSubscribe subscribeHandler: RedisSubscriptionChangeHandler? = nil,
onUnsubscribe unsubscribeHandler: RedisSubscriptionChangeHandler? = nil,
messageReceiver receiver: @escaping RedisSubscriptionMessageReceiver
) -> EventLoopFuture<Void> {
return self.subscribe(to: channels, messageReceiver: receiver, onSubscribe: subscribeHandler, onUnsubscribe: unsubscribeHandler)
}
public func psubscribe(
to patterns: String...,
messageReceiver receiver: @escaping RedisSubscriptionMessageReceiver,
onSubscribe subscribeHandler: RedisSubscriptionChangeHandler?,
onUnsubscribe unsubscribeHandler: RedisSubscriptionChangeHandler?
) -> EventLoopFuture<Void> {
return self.psubscribe(to: patterns, messageReceiver: receiver, onSubscribe: subscribeHandler, onUnsubscribe: unsubscribeHandler)
}
public func psubscribe(
to patterns: [String],
onSubscribe subscribeHandler: RedisSubscriptionChangeHandler? = nil,
onUnsubscribe unsubscribeHandler: RedisSubscriptionChangeHandler? = nil,
messageReceiver receiver: @escaping RedisSubscriptionMessageReceiver
) -> EventLoopFuture<Void> {
return self.psubscribe(to: patterns, messageReceiver: receiver, onSubscribe: subscribeHandler, onUnsubscribe: unsubscribeHandler)
}
public func psubscribe(
to patterns: String...,
onSubscribe subscribeHandler: RedisSubscriptionChangeHandler? = nil,
onUnsubscribe unsubscribeHandler: RedisSubscriptionChangeHandler? = nil,
messageReceiver receiver: @escaping RedisSubscriptionMessageReceiver
) -> EventLoopFuture<Void> {
return self.psubscribe(to: patterns, messageReceiver: receiver, onSubscribe: subscribeHandler, onUnsubscribe: unsubscribeHandler)
}
#endif
}
// MARK: Errors
......
......@@ -104,8 +104,14 @@ public final class RedisConnection: RedisClient, RedisClientWithUserContext {
public var isConnected: Bool {
// `Channel.isActive` is set to false before the `closeFuture` resolves in cases where the channel might be
// closed, or closing, before our state has been updated
return self.channel.isActive && self.state == .open
return self.channel.isActive && self.state.isConnected
}
/// Is the connection currently subscribed for PubSub?
///
/// Only a narrow list of commands are allowed when in "PubSub mode".
///
/// See [PUBSUB](https://redis.io/topics/pubsub).
public var isSubscribed: Bool { self.state.isSubscribed }
/// Controls the behavior of when sending commands over this connection. The default is `true.
///
/// When set to `false`, the commands will be placed into a buffer, and the host machine will determine when to drain the buffer.
......@@ -156,7 +162,7 @@ public final class RedisConnection: RedisClient, RedisClientWithUserContext {
self.channel.closeFuture.whenSuccess {
// if our state is still open, that means we didn't cause the closeFuture to resolve.
// update state, metrics, and logging
guard self.state == .open else { return }
guard self.state.isConnected else { return }
self.state = .closed
self.logger.error("connection was closed unexpectedly")
......@@ -168,8 +174,20 @@ public final class RedisConnection: RedisClient, RedisClientWithUserContext {
internal enum ConnectionState {
case open
case pubsub(RedisPubSubHandler)
case shuttingDown
case closed
var isConnected: Bool {
switch self {
case .open, .pubsub: return true
default: return false
}
}
var isSubscribed: Bool {
guard case .pubsub = self else { return false }