Commit 238ebb74 authored by Nathan Harris's avatar Nathan Harris

Merge branch 'connection-refactor' into '47-proposal-feedback'

Refactor `RedisConnection`

See merge request Mordil/swift-redis-nio-client!71
parents b807af58 ea7c755d
Pipeline #69999945 passed with stages
in 4 minutes and 28 seconds
......@@ -24,7 +24,7 @@ To install **RedisNIO**, just add the package as a dependency in your [**Package
```swift
dependencies: [
.package(url: "https://github.com/Mordil/swift-redis-nio-client.git", from: "1.0.0-alpha.1")
.package(url: "https://github.com/Mordil/swift-redis-nio-client.git", from: "1.0.0-alpha.4")
]
```
......@@ -32,18 +32,20 @@ and run the following command: `swift package resolve`
## :zap: Getting Started
**RedisNIO** is ready to use right after installation.
**RedisNIO** is quick to use - all you need is an [`EventLoop`](https://apple.github.io/swift-nio/docs/current/NIO/Protocols/EventLoop.html) from **SwiftNIO**.
```swift
import NIO
import RedisNIO
let connection = Redis.makeConnection(
let eventLoop: EventLoop = ...
let connection = RedisConnection.connect(
to: try .init(ipAddress: "127.0.0.1", port: RedisConnection.defaultPort),
password: "my_pass"
on: eventLoop
).wait()
let result = try connection.set("my_key", to: "some value")
.flatMap { return connection.get("my_key" }
.flatMap { return connection.get("my_key") }
.wait()
print(result) // Optional("some value")
......
//===----------------------------------------------------------------------===//
//
// This source file is part of the RedisNIO open source project
//
// Copyright (c) 2019 RedisNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of RedisNIO project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
import struct Logging.Logger
import NIO
/// Top-level namespace for the `RedisNIO` package.
///
/// To avoid a cluttered global namespace, named definitions that do not start with a `Redis` prefix
/// are scoped within this namespace.
public enum Redis { }
// MARK: Connection Factory
extension Redis {
/// Makes a new connection to a Redis instance.
///
/// As soon as the connection has been opened on the host, an "AUTH" command will be sent to
/// Redis to authorize use of additional commands on this new connection.
///
/// See [https://redis.io/commands/auth](https://redis.io/commands/auth)
///
/// Example:
///
/// let elg = MultiThreadedEventLoopGroup(numberOfThreads: 3)
/// let connection = Redis.makeConnection(
/// to: .init(ipAddress: "127.0.0.1", port: RedisConnection.defaultPort),
/// using: elg,
/// password: "my_pass"
/// )
///
/// - Parameters:
/// - socket: The `SocketAddress` information of the Redis instance to connect to.
/// - group: The `EventLoopGroup` to build the connection on. Default is a single threaded `EventLoopGroup`.
/// - password: The optional password to authorize the client with.
/// - logger: The `Logger` instance to log with.
/// - Returns: A `RedisConnection` instance representing this new connection.
public static func makeConnection(
to socket: SocketAddress,
using group: EventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1),
password: String? = nil,
logger: Logger = Logger(label: "RedisNIO.RedisConnection")
) -> EventLoopFuture<RedisConnection> {
let client = ClientBootstrap.makeRedisTCPClient(group: group)
return client.connect(to: socket)
.map { return RedisConnection(channel: $0, logger: logger) }
.flatMap { client in
guard let pw = password else {
return group.next().makeSucceededFuture(client)
}
let args = [RESPValue(bulk: pw)]
return client.send(command: "AUTH", with: args)
.map { _ in return client }
}
}
}
......@@ -12,12 +12,7 @@
//
//===----------------------------------------------------------------------===//
import struct Foundation.UUID
import struct Dispatch.DispatchTime
import Logging
import Metrics
import NIO
import NIOConcurrencyHelpers
/// An object capable of sending commands and receiving responses.
///
......@@ -46,144 +41,3 @@ extension RedisClient {
return self.send(command: command, with: [])
}
}
private let loggingKeyID = "RedisConnection"
/// A `RedisClient` implementation that represents an individual connection
/// to a Redis database instance.
///
/// `RedisConnection` comes with logging by default.
///
/// See `RedisClient`
public final class RedisConnection: RedisClient {
public static let defaultPort = 6379
private enum ConnectionState {
case open
case closed
}
/// See `RedisClient.eventLoop`
public var eventLoop: EventLoop { return channel.eventLoop }
/// Is the client still connected to Redis?
public var isConnected: Bool { return state != .closed }
/// Controls the timing behavior of sending commands over this connection. The default is `true`.
///
/// When set to `false`, the host will "queue" commands and determine when to send all at once,
/// while `true` will force each command to be sent as soon as they are "queued".
/// - Note: Setting this to `true` will trigger all "queued" commands to be sent.
public var sendCommandsImmediately: Bool {
get { return autoflush.load() }
set(newValue) {
if newValue { channel.flush() }
autoflush.store(newValue)
}
}
let channel: Channel
private var logger: Logger
private let autoflush = Atomic<Bool>(value: true)
private let _stateLock = Lock()
private var _state: ConnectionState
private var state: ConnectionState {
get { return _stateLock.withLock { self._state } }
set(newValue) { _stateLock.withLockVoid { self._state = newValue } }
}
deinit {
if isConnected {
assertionFailure("close() was not called before deinit!")
logger.warning("RedisConnection did not properly shutdown before deinit!")
}
}
/// Creates a new connection on the provided `Channel`.
/// - Important: Call `close()` before deinitializing to properly cleanup resources.
/// - Note: This connection will take ownership of the channel.
/// - Parameters:
/// - channel: The `Channel` to read and write from.
/// - logger: The `Logger` instance to use for all logging purposes.
public init(channel: Channel, logger: Logger = Logger(label: "RedisNIO.RedisConnection")) {
self.channel = channel
self.logger = logger
self.logger[metadataKey: loggingKeyID] = "\(UUID())"
self.logger.debug("Connection created.")
self._state = .open
RedisMetrics.activeConnectionCount += 1
RedisMetrics.totalConnectionCount.increment()
}
/// Sends a `QUIT` command, then closes the `Channel` this instance was initialized with.
///
/// See [https://redis.io/commands/quit](https://redis.io/commands/quit)
/// - Returns: An `EventLoopFuture` that resolves when the connection has been closed.
@discardableResult
public func close() -> EventLoopFuture<Void> {
guard isConnected else {
logger.notice("Connection received more than one close() request.")
return channel.eventLoop.makeSucceededFuture(())
}
let result = send(command: "QUIT")
.flatMap { _ in
let promise = self.channel.eventLoop.makePromise(of: Void.self)
self.channel.close(promise: promise)
return promise.futureResult
}
.map {
self.logger.debug("Connection closed.")
RedisMetrics.activeConnectionCount -= 1
}
.recover {
self.logger.error("Encountered error during close(): \($0)")
self.state = .open
}
// setting it to closed now prevents multiple close() chains, but doesn't stop the QUIT command
// if the connection wasn't closed, it's reset in the callback chain
state = .closed
return result
}
/// Sends commands to the Redis instance this connection is tied to.
///
/// See `RedisClient.send(command:with:)`
///
/// - Note: The timing of when commands are actually sent to Redis are controlled by
/// the `sendCommandsImmediately` property.
public func send(
command: String,
with arguments: [RESPValue]
) -> EventLoopFuture<RESPValue> {
guard isConnected else {
logger.error("\(RedisNIOError.connectionClosed.localizedDescription)")
return channel.eventLoop.makeFailedFuture(RedisNIOError.connectionClosed)
}
var commandParts: [RESPValue] = [.init(bulk: command)]
commandParts.append(contentsOf: arguments)
let promise = channel.eventLoop.makePromise(of: RESPValue.self)
let context = RedisCommand(
command: .array(commandParts),
promise: promise
)
let startTime = DispatchTime.now().uptimeNanoseconds
promise.futureResult.whenComplete { result in
let duration = DispatchTime.now().uptimeNanoseconds - startTime
RedisMetrics.commandRoundTripTime.recordNanoseconds(duration)
guard case let .failure(error) = result else { return }
self.logger.error("\(error.localizedDescription)")
}
logger.debug("Sending command \"\(command)\" with \(arguments)")
guard sendCommandsImmediately else {
return channel.write(context).flatMap { promise.futureResult }
}
return channel.writeAndFlush(context).flatMap { promise.futureResult }
}
}
This diff is collapsed.
......@@ -16,20 +16,30 @@ import Foundation
import NIO
import RedisNIO
extension Redis {
/// Creates a `RedisConnection` using `REDIS_URL` and `REDIS_PW` environment variables if available.
extension RedisConnection {
/// Creates a connection intended for tests using `REDIS_URL` and `REDIS_PW` environment variables if available.
///
/// The default URL is `127.0.0.1` while the default port is `RedisConnection.defaultPort`.
///
/// If `REDIS_PW` is not defined, no authentication will happen on the connection.
public static func makeConnection() throws -> EventLoopFuture<RedisConnection> {
/// - Parameters:
/// - eventLoop: The event loop that the connection should execute on.
/// - port: The port to connect on.
/// - Returns: A `NIO.EventLoopFuture` that resolves with the new connection.
public static func connect(
on eventLoop: EventLoop,
port: Int = RedisConnection.defaultPort
) -> EventLoopFuture<RedisConnection> {
let env = ProcessInfo.processInfo.environment
return Redis.makeConnection(
to: try .makeAddressResolvingHost(
env["REDIS_URL"] ?? "127.0.0.1",
port: RedisConnection.defaultPort
),
password: env["REDIS_PW"]
)
let host = env["REDIS_URL"] ?? "127.0.0.1"
let address: SocketAddress
do {
address = try SocketAddress.makeAddressResolvingHost(host, port: port)
} catch {
return eventLoop.makeFailedFuture(error)
}
return RedisConnection.connect(to: address, on: eventLoop, password: env["REDIS_PW"])
}
}
//===----------------------------------------------------------------------===//
//
// This source file is part of the RedisNIO open source project
//
// Copyright (c) 2019 RedisNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of RedisNIO project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
import NIO
import RedisNIO
import XCTest
/// A helper `XCTestCase` subclass that does the standard work of creating a connection to use in test cases.
///
/// See `RedisConnection.connect(to:port:)` to understand how connections are made.
open class RedisIntegrationTestCase: XCTestCase {
public var connection: RedisConnection!
private let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 2)
deinit {
do {
try self.eventLoopGroup.syncShutdownGracefully()
} catch {
print("Failed to gracefully shutdown ELG: \(error)")
}
}
/// Creates a `RedisNIO.RedisConnection` for the next test case, calling `fatalError` if it was not successful.
///
/// See `XCTest.XCTestCase.setUp()`
open override func setUp() {
do {
connection = try self.makeNewConnection()
} catch {
fatalError("Failed to make a RedisConnection: \(error)")
}
}
/// Sends a "FLUSHALL" command to Redis to clear it of any data from the previous test, then closes the connection.
///
/// If any steps fail, a `fatalError` is thrown.
///
/// See `XCTest.XCTestCase.tearDown()`
open override func tearDown() {
do {
if self.connection.isConnected {
_ = try self.connection.send(command: "FLUSHALL")
.flatMap { _ in self.connection.close() }
.wait()
}
self.connection = nil
} catch {
fatalError("Failed to properly cleanup connection: \(error)")
}
}
/// Creates a new connection for use in tests.
///
/// See `RedisConnection.connect(to:port:)`
/// - Returns: The new `RedisNIO.RedisConnection`.
public func makeNewConnection() throws -> RedisConnection {
return try RedisConnection.connect(on: eventLoopGroup.next()).wait()
}
}
......@@ -16,31 +16,13 @@
import RedisNIOTestUtils
import XCTest
final class BasicCommandsTests: XCTestCase {
private var connection: RedisConnection!
override func setUp() {
do {
connection = try Redis.makeConnection().wait()
} catch {
XCTFail("Failed to create RedisConnection! \(error)")
}
}
override func tearDown() {
_ = try? connection.send(command: "FLUSHALL").wait()
try? connection.close().wait()
connection = nil
}
final class BasicCommandsTests: RedisIntegrationTestCase {
func test_select() {
XCTAssertNoThrow(try connection.select(database: 3).wait())
}
func test_delete() {
do {
func test_delete() throws {
let keys = [ #function + "1", #function + "2", #function + "3" ]
try connection.close().wait()
try connection.set(keys[0], to: "value").wait()
try connection.set(keys[1], to: "value").wait()
try connection.set(keys[2], to: "value").wait()
......@@ -53,10 +35,6 @@ final class BasicCommandsTests: XCTestCase {
let third = try connection.delete([keys[1], keys[2]]).wait()
XCTAssertEqual(third, 2)
}
catch {
print("failed")
}
}
func test_expire() throws {
......
......@@ -13,25 +13,10 @@
//===----------------------------------------------------------------------===//
@testable import RedisNIO
import RedisNIOTestUtils
import XCTest
final class HashCommandsTests: XCTestCase {
private var connection: RedisConnection!
override func setUp() {
do {
connection = try Redis.makeConnection().wait()
} catch {
XCTFail("Failed to create RedisConnection! \(error)")
}
}
override func tearDown() {
_ = try? connection.send(command: "FLUSHALL").wait()
try? connection.close().wait()
connection = nil
}
final class HashCommandsTests: RedisIntegrationTestCase {
func test_hset() throws {
var result = try connection.hset("test", to: "\(#line)", in: #function).wait()
XCTAssertTrue(result)
......
......@@ -13,25 +13,10 @@
//===----------------------------------------------------------------------===//
@testable import RedisNIO
import RedisNIOTestUtils
import XCTest
final class ListCommandsTests: XCTestCase {
private var connection: RedisConnection!
override func setUp() {
do {
connection = try Redis.makeConnection().wait()
} catch {
XCTFail("Failed to create RedisConnection! \(error)")
}
}
override func tearDown() {
_ = try? connection.send(command: "FLUSHALL").wait()
try? connection.close().wait()
connection = nil
}
final class ListCommandsTests: RedisIntegrationTestCase {
func test_llen() throws {
var length = try connection.llen(of: #function).wait()
XCTAssertEqual(length, 0)
......@@ -108,14 +93,16 @@ final class ListCommandsTests: XCTestCase {
let element = try connection.brpoplpush(from: "first", to: "second").wait() ?? .null
XCTAssertEqual(Int(fromRESP: element), 10)
let blockingConnection = try Redis.makeConnection().wait()
let blockingConnection = try self.makeNewConnection()
let expectation = XCTestExpectation(description: "brpoplpush should never return")
_ = blockingConnection.bzpopmin(from: #function)
.always { _ in expectation.fulfill() }
.always { _ in
expectation.fulfill()
blockingConnection.close()
}
let result = XCTWaiter.wait(for: [expectation], timeout: 1)
XCTAssertEqual(result, .timedOut)
try blockingConnection.channel.close().wait()
}
func test_linsert() throws {
......@@ -156,14 +143,16 @@ final class ListCommandsTests: XCTestCase {
let pop2 = try connection.blpop(from: ["fake", "first"]).wait()
XCTAssertEqual(pop2?.0, "first")
let blockingConnection = try Redis.makeConnection().wait()
let blockingConnection = try self.makeNewConnection()
let expectation = XCTestExpectation(description: "blpop should never return")
_ = blockingConnection.bzpopmin(from: #function)
.always { _ in expectation.fulfill() }
.always { _ in
expectation.fulfill()
blockingConnection.close()
}
let result = XCTWaiter.wait(for: [expectation], timeout: 1)
XCTAssertEqual(result, .timedOut)
try blockingConnection.channel.close().wait()
}
func test_lpush() throws {
......@@ -213,14 +202,16 @@ final class ListCommandsTests: XCTestCase {
let pop2 = try connection.brpop(from: ["fake", "first"]).wait()
XCTAssertEqual(pop2?.0, "first")
let blockingConnection = try Redis.makeConnection().wait()
let blockingConnection = try self.makeNewConnection()
let expectation = XCTestExpectation(description: "brpop should never return")
_ = blockingConnection.bzpopmin(from: #function)
.always { _ in expectation.fulfill() }
.always { _ in
expectation.fulfill()
blockingConnection.close()
}
let result = XCTWaiter.wait(for: [expectation], timeout: 1)
XCTAssertEqual(result, .timedOut)
try blockingConnection.channel.close().wait()
}
func test_rpush() throws {
......
......@@ -13,25 +13,10 @@
//===----------------------------------------------------------------------===//
@testable import RedisNIO
import RedisNIOTestUtils
import XCTest
final class SetCommandsTests: XCTestCase {
private var connection: RedisConnection!
override func setUp() {
do {
connection = try Redis.makeConnection().wait()
} catch {
XCTFail("Failed to create RedisConnection! \(error)")
}
}
override func tearDown() {
_ = try? connection.send(command: "FLUSHALL").wait()
try? connection.close().wait()
connection = nil
}
final class SetCommandsTests: RedisIntegrationTestCase {
func test_sadd() throws {
var insertCount = try connection.sadd([1, 2, 3], to: #function).wait()
XCTAssertEqual(insertCount, 3)
......
......@@ -14,19 +14,17 @@
import NIO
@testable import RedisNIO
import RedisNIOTestUtils
import XCTest
final class SortedSetCommandsTests: XCTestCase {
final class SortedSetCommandsTests: RedisIntegrationTestCase {
private static let testKey = "SortedSetCommandsTests"
private var connection: RedisConnection!
private var key: String { return SortedSetCommandsTests.testKey }
override func setUp() {
super.setUp()
do {
connection = try Redis.makeConnection().wait()
var dataset: [(Int, Double)] = []
for index in 1...10 {
dataset.append((index, Double(index)))
......@@ -38,12 +36,6 @@ final class SortedSetCommandsTests: XCTestCase {
}
}
override func tearDown() {
_ = try? connection.send(command: "FLUSHALL").wait()
try? connection.close().wait()
connection = nil
}
func test_zadd() throws {
_ = try connection.send(command: "FLUSHALL").wait()
......@@ -168,14 +160,16 @@ final class SortedSetCommandsTests: XCTestCase {
XCTAssertEqual(min2?.0, key)
XCTAssertEqual(min2?.1, 2)
let blockingConnection = try Redis.makeConnection().wait()
let blockingConnection = try self.makeNewConnection()
let expectation = XCTestExpectation(description: "bzpopmin should never return")
_ = blockingConnection.bzpopmin(from: #function)
.always { _ in expectation.fulfill() }
.always { _ in
expectation.fulfill()
blockingConnection.close()
}
let result = XCTWaiter.wait(for: [expectation], timeout: 1)
XCTAssertEqual(result, .timedOut)
try blockingConnection.channel.close().wait()
}
func test_zpopmax() throws {
......@@ -200,14 +194,16 @@ final class SortedSetCommandsTests: XCTestCase {
XCTAssertEqual(max2?.0, key)
XCTAssertEqual(max2?.1, 9)
let blockingConnection = try Redis.makeConnection().wait()
let blockingConnection = try self.makeNewConnection()
let expectation = XCTestExpectation(description: "bzpopmax should never return")
_ = blockingConnection.bzpopmax(from: #function)
.always { _ in expectation.fulfill() }
.always { _ in
expectation.fulfill()
blockingConnection.close()
}
let result = XCTWaiter.wait(for: [expectation], timeout: 1)
XCTAssertEqual(result, .timedOut)
try blockingConnection.channel.close().wait()
}
func test_zincrby() throws {
......
......@@ -13,27 +13,12 @@
//===----------------------------------------------------------------------===//
@testable import RedisNIO
import RedisNIOTestUtils
import XCTest
final class StringCommandsTests: XCTestCase {
final class StringCommandsTests: RedisIntegrationTestCase {
private static let testKey = "SortedSetCommandsTests"
private var connection: RedisConnection!
override func setUp() {
do {
connection = try Redis.makeConnection().wait()
} catch {
XCTFail("Failed to create RedisConnection! \(error)")
}
}
override func tearDown() {
_ = try? connection.send(command: "FLUSHALL").wait()
try? connection.close().wait()
connection = nil
}
func test_get() throws {
try connection.set(#function, to: "value").wait()
let result = try connection.get(#function).wait()
......
//===----------------------------------------------------------------------===//
//
// This source file is part of the RedisNIO open source project
//
// Copyright (c) 2019 RedisNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of RedisNIO project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
@testable import RedisNIO
import RedisNIOTestUtils
import XCTest
final class RedisConnectionTests: RedisIntegrationTestCase {
static let expectedLogsMessage = "The following log(s) in this test are expected."
func test_unexpectedChannelClose() throws {
print(RedisConnectionTests.expectedLogsMessage)
XCTAssertTrue(self.connection.isConnected)
try self.connection.channel.close().wait()
XCTAssertFalse(self.connection.isConnected)
}
func test_callingCloseMultipleTimes() throws {