Commit 73922325 authored by Nathan Harris's avatar Nathan Harris

Add Blocking Sorted Set Pop Commands

Motivation:

To be a comprehensive library, all commands should be implemented, even if they are highly discouraged. Sorted Set's collection of commands were missing `bzpopmin` and `bzpopmax`.

Modifications:

`bzpopmin` and `bzpopmax` are supported with defaults and overloads for an easier API.
`RedisClient.channel` is now `internal` to have access during testing for bypassing normal guards for closing connections.

Result:

Users now have access to `bzpopmin` and `bzpopmax` commands.
parent d0da3e76
......@@ -292,6 +292,149 @@ extension RedisClient {
}
}
// MARK: Blocking Pop
extension RedisClient {
/// Removes the element from a sorted set with the lowest score, blocking until an element is
/// available.
///
/// - Important:
/// This will block the connection from completing further commands until an element
/// is available to pop from the set.
///
/// It is **highly** recommended to set a reasonable `timeout`
/// or to use the non-blocking `zpopmin` method where possible.
///
/// See [https://redis.io/commands/bzpopmin](https://redis.io/commands/bzpopmin)
/// - Parameters:
/// - key: The key identifying the sorted set in Redis.
/// - timeout: The time (in seconds) to wait. `0` means indefinitely.
/// - Returns:
/// The element and its associated score that was popped from the sorted set,
/// or `nil` if the timeout was reached.
@inlinable
public func bzpopmin(
from key: String,
timeout: Int = 0
) -> EventLoopFuture<(Double, RESPValue)?> {
return bzpopmin(from: [key], timeout: timeout)
.map {
guard let response = $0 else { return nil }
return (response.1, response.2)
}
}
/// Removes the element from a sorted set with the lowest score, blocking until an element is
/// available.
///
/// - Important:
/// This will block the connection from completing further commands until an element
/// is available to pop from the group of sets.
///
/// It is **highly** recommended to set a reasonable `timeout`
/// or to use the non-blocking `zpopmin` method where possible.
///
/// See [https://redis.io/commands/bzpopmin](https://redis.io/commands/bzpopmin)
/// - Parameters:
/// - keys: A list of sorted set keys in Redis.
/// - timeout: The time (in seconds) to wait. `0` means indefinitely.
/// - Returns:
/// If timeout was reached, `nil`.
///
/// Otherwise, the key of the sorted set the element was removed from, the element itself,
/// and its associated score is returned.
@inlinable
public func bzpopmin(
from keys: [String],
timeout: Int = 0
) -> EventLoopFuture<(String, Double, RESPValue)?> {
return self._bzpop(command: "BZPOPMIN", keys, timeout)
}
/// Removes the element from a sorted set with the highest score, blocking until an element is
/// available.
///
/// - Important:
/// This will block the connection from completing further commands until an element
/// is available to pop from the set.
///
/// It is **highly** recommended to set a reasonable `timeout`
/// or to use the non-blocking `zpopmax` method where possible.
///
/// See [https://redis.io/commands/bzpopmax](https://redis.io/commands/bzpopmax)
/// - Parameters:
/// - key: The key identifying the sorted set in Redis.
/// - timeout: The time (in seconds) to wait. `0` means indefinitely.
/// - Returns:
/// The element and its associated score that was popped from the sorted set,
/// or `nil` if the timeout was reached.
@inlinable
public func bzpopmax(
from key: String,
timeout: Int = 0
) -> EventLoopFuture<(Double, RESPValue)?> {
return self.bzpopmax(from: [key], timeout: timeout)
.map {
guard let response = $0 else { return nil }
return (response.1, response.2)
}
}
/// Removes the element from a sorted set with the highest score, blocking until an element is
/// available.
///
/// - Important:
/// This will block the connection from completing further commands until an element
/// is available to pop from the group of sets.
///
/// It is **highly** recommended to set a reasonable `timeout`
/// or to use the non-blocking `zpopmax` method where possible.
///
/// See [https://redis.io/commands/bzpopmax](https://redis.io/commands/bzpopmax)
/// - Parameters:
/// - keys: A list of sorted set keys in Redis.
/// - timeout: The time (in seconds) to wait. `0` means indefinitely.
/// - Returns:
/// If timeout was reached, `nil`.
///
/// Otherwise, the key of the sorted set the element was removed from, the element itself,
/// and its associated score is returned.
@inlinable
public func bzpopmax(
from keys: [String],
timeout: Int = 0
) -> EventLoopFuture<(String, Double, RESPValue)?> {
return self._bzpop(command: "BZPOPMAX", keys, timeout)
}
@usableFromInline
func _bzpop(
command: String,
_ keys: [String],
_ timeout: Int
) -> EventLoopFuture<(String, Double, RESPValue)?> {
let args = keys as [RESPValueConvertible] + [timeout]
return send(command: command, with: args)
// per the Redis docs,
// we will receive either a nil response,
// or an array with 3 elements in the form [Set Key, Element Score, Element Value]
.flatMapThrowing {
guard !$0.isNull else { return nil }
guard let response = [RESPValue]($0) else {
throw NIORedisError.responseConversion(to: [RESPValue].self)
}
assert(response.count == 3, "Unexpected response size returned!")
guard
let key = response[0].string,
let score = Double(response[1])
else {
throw NIORedisError.assertionFailure(message: "Unexpected structure in response: \(response)")
}
return (key, score, response[2])
}
}
}
// MARK: Increment
extension RedisClient {
......
......@@ -76,7 +76,7 @@ public final class RedisConnection: RedisClient {
}
}
private let channel: Channel
let channel: Channel
private var logger: Logger
private let autoflush = Atomic<Bool>(value: true)
......
......@@ -155,6 +155,26 @@ final class SortedSetCommandsTests: XCTestCase {
XCTAssertEqual(results[1].1, 10)
}
func test_bzpopmin() throws {
let nilMin = try connection.bzpopmin(from: #function, timeout: 1).wait()
XCTAssertNil(nilMin)
let min1 = try connection.bzpopmin(from: key).wait()
XCTAssertEqual(min1?.0, 1)
let min2 = try connection.bzpopmin(from: [#function, key]).wait()
XCTAssertEqual(min2?.0, key)
XCTAssertEqual(min2?.1, 2)
let blockingConnection = try Redis.makeConnection().wait()
let expectation = XCTestExpectation(description: "bzpopmin should never return")
_ = blockingConnection.bzpopmin(from: #function)
.always { _ in expectation.fulfill() }
let result = XCTWaiter.wait(for: [expectation], timeout: 1)
XCTAssertEqual(result, .timedOut)
try blockingConnection.channel.close().wait()
}
func test_zpopmax() throws {
let min = try connection.zpopmax(from: key).wait()
XCTAssertEqual(min?.1, 10)
......@@ -167,6 +187,26 @@ final class SortedSetCommandsTests: XCTestCase {
XCTAssertEqual(results[1].1, 1)
}
func test_bzpopmax() throws {
let nilMax = try connection.bzpopmax(from: #function, timeout: 1).wait()
XCTAssertNil(nilMax)
let max1 = try connection.bzpopmax(from: key).wait()
XCTAssertEqual(max1?.0, 10)
let max2 = try connection.bzpopmax(from: [#function, key]).wait()
XCTAssertEqual(max2?.0, key)
XCTAssertEqual(max2?.1, 9)
let blockingConnection = try Redis.makeConnection().wait()
let expectation = XCTestExpectation(description: "bzpopmax should never return")
_ = blockingConnection.bzpopmax(from: #function)
.always { _ in expectation.fulfill() }
let result = XCTWaiter.wait(for: [expectation], timeout: 1)
XCTAssertEqual(result, .timedOut)
try blockingConnection.channel.close().wait()
}
func test_zincrby() throws {
var score = try connection.zincrby(3_00_1398.328923, element: 1, in: key).wait()
XCTAssertEqual(score, 3_001_399.328923)
......@@ -352,7 +392,9 @@ final class SortedSetCommandsTests: XCTestCase {
("test_zcount", test_zcount),
("test_zlexcount", test_zlexcount),
("test_zpopmin", test_zpopmin),
("test_bzpopmin", test_bzpopmin),
("test_zpopmax", test_zpopmax),
("test_bzpopmax", test_bzpopmax),
("test_zincrby", test_zincrby),
("test_zunionstore", test_zunionstore),
("test_zinterstore", test_zinterstore),
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment