Skip to content
Snippets Groups Projects
Commit fdb41c4d authored by Sean McGivern's avatar Sean McGivern :red_circle:
Browse files

Merge branch 'schin1-validator-pipeline-multi' into 'master'

Handle pipelined/multi cmds in cluster validator

See merge request !103584



Merged-by: default avatarSean McGivern <sean@gitlab.com>
Approved-by: default avatarSean McGivern <sean@gitlab.com>
Co-authored-by: Sylvester Chin's avatarschin1 <schin@gitlab.com>
parents 4d941fc2 7ff2a5aa
No related branches found
No related tags found
1 merge request!103584Handle pipelined/multi cmds in cluster validator
Pipeline #696544654 passed
Showing
with 330 additions and 139 deletions
......@@ -83,24 +83,26 @@ def self.set(user, request)
is_impersonated: request.session[:impersonator_id].present?
)
redis.pipelined do |pipeline|
pipeline.setex(
key_name(user.id, session_private_id),
expiry,
active_user_session.dump
)
# Deprecated legacy format - temporary to support mixed deployments
pipeline.setex(
key_name_v1(user.id, session_private_id),
expiry,
Marshal.dump(active_user_session)
)
pipeline.sadd?(
lookup_key_name(user.id),
session_private_id
)
Gitlab::Instrumentation::RedisClusterValidator.allow_cross_slot_commands do
redis.pipelined do |pipeline|
pipeline.setex(
key_name(user.id, session_private_id),
expiry,
active_user_session.dump
)
# Deprecated legacy format - temporary to support mixed deployments
pipeline.setex(
key_name_v1(user.id, session_private_id),
expiry,
Marshal.dump(active_user_session)
)
pipeline.sadd?(
lookup_key_name(user.id),
session_private_id
)
end
end
end
end
......
......@@ -63,16 +63,18 @@ def join(user)
user_key = user_sessions_key(user.id)
with_redis do |redis|
redis.pipelined do |pipeline|
pipeline.sadd?(user_key, id_i)
pipeline.expire(user_key, USER_LIFETIME.to_i)
Gitlab::Instrumentation::RedisClusterValidator.allow_cross_slot_commands do
redis.pipelined do |pipeline|
pipeline.sadd?(user_key, id_i)
pipeline.expire(user_key, USER_LIFETIME.to_i)
pipeline.zadd(users_key, timestamp.to_f, user.id)
pipeline.zadd(users_key, timestamp.to_f, user.id)
# We also mark for expiry when a session key is created (first user joins),
# because some users might never actively leave a session and the key could
# therefore become stale, w/o us noticing.
reset_session_expiry(pipeline)
# We also mark for expiry when a session key is created (first user joins),
# because some users might never actively leave a session and the key could
# therefore become stale, w/o us noticing.
reset_session_expiry(pipeline)
end
end
end
......@@ -83,26 +85,33 @@ def leave(user)
user_key = user_sessions_key(user.id)
with_redis do |redis|
redis.pipelined do |pipeline|
pipeline.srem?(user_key, id_i)
pipeline.zrem(users_key, user.id)
Gitlab::Instrumentation::RedisClusterValidator.allow_cross_slot_commands do
redis.pipelined do |pipeline|
pipeline.srem?(user_key, id_i)
pipeline.zrem(users_key, user.id)
end
end
# cleanup orphan sessions and users
#
# this needs to be a second pipeline due to the delete operations being
# dependent on the result of the cardinality checks
user_sessions_count, session_users_count = redis.pipelined do |pipeline|
pipeline.scard(user_key)
pipeline.zcard(users_key)
end
user_sessions_count, session_users_count =
Gitlab::Instrumentation::RedisClusterValidator.allow_cross_slot_commands do
redis.pipelined do |pipeline|
pipeline.scard(user_key)
pipeline.zcard(users_key)
end
end
redis.pipelined do |pipeline|
pipeline.del(user_key) unless user_sessions_count > 0
Gitlab::Instrumentation::RedisClusterValidator.allow_cross_slot_commands do
redis.pipelined do |pipeline|
pipeline.del(user_key) unless user_sessions_count > 0
unless session_users_count > 0
pipeline.del(users_key)
@id = nil
unless session_users_count > 0
pipeline.del(users_key)
@id = nil
end
end
end
end
......
......@@ -161,13 +161,15 @@ def self.values_from_set(raw_key)
# timeout - The time after which the cache key should expire.
def self.write_multiple(mapping, key_prefix: nil, timeout: TIMEOUT)
with_redis do |redis|
redis.pipelined do |multi|
mapping.each do |raw_key, value|
key = cache_key_for("#{key_prefix}#{raw_key}")
Gitlab::Instrumentation::RedisClusterValidator.allow_cross_slot_commands do
redis.pipelined do |multi|
mapping.each do |raw_key, value|
key = cache_key_for("#{key_prefix}#{raw_key}")
validate_redis_value!(value)
validate_redis_value!(value)
multi.set(key, value, ex: timeout)
multi.set(key, value, ex: timeout)
end
end
end
end
......
......@@ -15,11 +15,13 @@ class << self
# mapping - Write multiple cache values at once
def write_multiple(mapping)
with_redis do |redis|
redis.multi do |multi|
mapping.each do |raw_key, value|
key = cache_key_for(raw_key)
Gitlab::Instrumentation::RedisClusterValidator.allow_cross_slot_commands do
redis.multi do |multi|
mapping.each do |raw_key, value|
key = cache_key_for(raw_key)
multi.set(key, gzip_compress(value.to_json), ex: EXPIRATION)
multi.set(key, gzip_compress(value.to_json), ex: EXPIRATION)
end
end
end
end
......
......@@ -15,10 +15,12 @@ def get(key)
def touch(*keys, only_if_missing: false)
etags = keys.map { generate_etag }
Gitlab::Redis::SharedState.with do |redis|
redis.pipelined do |pipeline|
keys.each_with_index do |key, i|
pipeline.set(redis_shared_state_key(key), etags[i], ex: EXPIRY_TIME, nx: only_if_missing)
Gitlab::Instrumentation::RedisClusterValidator.allow_cross_slot_commands do
Gitlab::Redis::SharedState.with do |redis|
redis.pipelined do |pipeline|
keys.each_with_index do |key, i|
pipeline.set(redis_shared_state_key(key), etags[i], ex: EXPIRY_TIME, nx: only_if_missing)
end
end
end
end
......
......@@ -66,8 +66,8 @@ def query_time
query_time.round(::Gitlab::InstrumentationHelper::DURATION_PRECISION)
end
def redis_cluster_validate!(command)
::Gitlab::Instrumentation::RedisClusterValidator.validate!(command) if @redis_cluster_validation
def redis_cluster_validate!(commands)
::Gitlab::Instrumentation::RedisClusterValidator.validate!(commands) if @redis_cluster_validation
end
def enable_redis_cluster_validation
......
......@@ -10,57 +10,189 @@ module RedisClusterValidator
#
# Gitlab::Redis::Cache
# .with { |redis| redis.call('COMMAND') }
# .select { |command| command[3] != command[4] }
# .map { |command| [command[0].upcase, { first: command[3], last: command[4], step: command[5] }] }
# .select { |cmd| cmd[3] != 0 }
# .map { |cmd| [
# cmd[0].upcase,
# { first: cmd[3], last: cmd[4], step: cmd[5], single_key: cmd[3] == cmd[4] }
# ]
# }
# .sort_by(&:first)
# .to_h
#
MULTI_KEY_COMMANDS = {
"BITOP" => { first: 2, last: -1, step: 1 },
"BLPOP" => { first: 1, last: -2, step: 1 },
"BRPOP" => { first: 1, last: -2, step: 1 },
"BRPOPLPUSH" => { first: 1, last: 2, step: 1 },
"BZPOPMAX" => { first: 1, last: -2, step: 1 },
"BZPOPMIN" => { first: 1, last: -2, step: 1 },
"DEL" => { first: 1, last: -1, step: 1 },
"EXISTS" => { first: 1, last: -1, step: 1 },
"MGET" => { first: 1, last: -1, step: 1 },
"MSET" => { first: 1, last: -1, step: 2 },
"MSETNX" => { first: 1, last: -1, step: 2 },
"PFCOUNT" => { first: 1, last: -1, step: 1 },
"PFMERGE" => { first: 1, last: -1, step: 1 },
"RENAME" => { first: 1, last: 2, step: 1 },
"RENAMENX" => { first: 1, last: 2, step: 1 },
"RPOPLPUSH" => { first: 1, last: 2, step: 1 },
"SDIFF" => { first: 1, last: -1, step: 1 },
"SDIFFSTORE" => { first: 1, last: -1, step: 1 },
"SINTER" => { first: 1, last: -1, step: 1 },
"SINTERSTORE" => { first: 1, last: -1, step: 1 },
"SMOVE" => { first: 1, last: 2, step: 1 },
"SUNION" => { first: 1, last: -1, step: 1 },
"SUNIONSTORE" => { first: 1, last: -1, step: 1 },
"UNLINK" => { first: 1, last: -1, step: 1 },
"WATCH" => { first: 1, last: -1, step: 1 }
REDIS_COMMANDS = {
"APPEND" => { first: 1, last: 1, step: 1, single_key: true },
"BITCOUNT" => { first: 1, last: 1, step: 1, single_key: true },
"BITFIELD" => { first: 1, last: 1, step: 1, single_key: true },
"BITFIELD_RO" => { first: 1, last: 1, step: 1, single_key: true },
"BITOP" => { first: 2, last: -1, step: 1, single_key: false },
"BITPOS" => { first: 1, last: 1, step: 1, single_key: true },
"BLMOVE" => { first: 1, last: 2, step: 1, single_key: false },
"BLPOP" => { first: 1, last: -2, step: 1, single_key: false },
"BRPOP" => { first: 1, last: -2, step: 1, single_key: false },
"BRPOPLPUSH" => { first: 1, last: 2, step: 1, single_key: false },
"BZPOPMAX" => { first: 1, last: -2, step: 1, single_key: false },
"BZPOPMIN" => { first: 1, last: -2, step: 1, single_key: false },
"COPY" => { first: 1, last: 2, step: 1, single_key: false },
"DECR" => { first: 1, last: 1, step: 1, single_key: true },
"DECRBY" => { first: 1, last: 1, step: 1, single_key: true },
"DEL" => { first: 1, last: -1, step: 1, single_key: false },
"DUMP" => { first: 1, last: 1, step: 1, single_key: true },
"EXISTS" => { first: 1, last: -1, step: 1, single_key: false },
"EXPIRE" => { first: 1, last: 1, step: 1, single_key: true },
"EXPIREAT" => { first: 1, last: 1, step: 1, single_key: true },
"GEOADD" => { first: 1, last: 1, step: 1, single_key: true },
"GEODIST" => { first: 1, last: 1, step: 1, single_key: true },
"GEOHASH" => { first: 1, last: 1, step: 1, single_key: true },
"GEOPOS" => { first: 1, last: 1, step: 1, single_key: true },
"GEORADIUS" => { first: 1, last: 1, step: 1, single_key: true },
"GEORADIUSBYMEMBER" => { first: 1, last: 1, step: 1, single_key: true },
"GEORADIUSBYMEMBER_RO" => { first: 1, last: 1, step: 1, single_key: true },
"GEORADIUS_RO" => { first: 1, last: 1, step: 1, single_key: true },
"GEOSEARCH" => { first: 1, last: 1, step: 1, single_key: true },
"GEOSEARCHSTORE" => { first: 1, last: 2, step: 1, single_key: false },
"GET" => { first: 1, last: 1, step: 1, single_key: true },
"GETBIT" => { first: 1, last: 1, step: 1, single_key: true },
"GETDEL" => { first: 1, last: 1, step: 1, single_key: true },
"GETEX" => { first: 1, last: 1, step: 1, single_key: true },
"GETRANGE" => { first: 1, last: 1, step: 1, single_key: true },
"GETSET" => { first: 1, last: 1, step: 1, single_key: true },
"HDEL" => { first: 1, last: 1, step: 1, single_key: true },
"HEXISTS" => { first: 1, last: 1, step: 1, single_key: true },
"HGET" => { first: 1, last: 1, step: 1, single_key: true },
"HGETALL" => { first: 1, last: 1, step: 1, single_key: true },
"HINCRBY" => { first: 1, last: 1, step: 1, single_key: true },
"HINCRBYFLOAT" => { first: 1, last: 1, step: 1, single_key: true },
"HKEYS" => { first: 1, last: 1, step: 1, single_key: true },
"HLEN" => { first: 1, last: 1, step: 1, single_key: true },
"HMGET" => { first: 1, last: 1, step: 1, single_key: true },
"HMSET" => { first: 1, last: 1, step: 1, single_key: true },
"HRANDFIELD" => { first: 1, last: 1, step: 1, single_key: true },
"HSCAN" => { first: 1, last: 1, step: 1, single_key: true },
"HSET" => { first: 1, last: 1, step: 1, single_key: true },
"HSETNX" => { first: 1, last: 1, step: 1, single_key: true },
"HSTRLEN" => { first: 1, last: 1, step: 1, single_key: true },
"HVALS" => { first: 1, last: 1, step: 1, single_key: true },
"INCR" => { first: 1, last: 1, step: 1, single_key: true },
"INCRBY" => { first: 1, last: 1, step: 1, single_key: true },
"INCRBYFLOAT" => { first: 1, last: 1, step: 1, single_key: true },
"LINDEX" => { first: 1, last: 1, step: 1, single_key: true },
"LINSERT" => { first: 1, last: 1, step: 1, single_key: true },
"LLEN" => { first: 1, last: 1, step: 1, single_key: true },
"LMOVE" => { first: 1, last: 2, step: 1, single_key: false },
"LPOP" => { first: 1, last: 1, step: 1, single_key: true },
"LPOS" => { first: 1, last: 1, step: 1, single_key: true },
"LPUSH" => { first: 1, last: 1, step: 1, single_key: true },
"LPUSHX" => { first: 1, last: 1, step: 1, single_key: true },
"LRANGE" => { first: 1, last: 1, step: 1, single_key: true },
"LREM" => { first: 1, last: 1, step: 1, single_key: true },
"LSET" => { first: 1, last: 1, step: 1, single_key: true },
"LTRIM" => { first: 1, last: 1, step: 1, single_key: true },
"MGET" => { first: 1, last: -1, step: 1, single_key: false },
"MIGRATE" => { first: 3, last: 3, step: 1, single_key: true },
"MOVE" => { first: 1, last: 1, step: 1, single_key: true },
"MSET" => { first: 1, last: -1, step: 2, single_key: false },
"MSETNX" => { first: 1, last: -1, step: 2, single_key: false },
"OBJECT" => { first: 2, last: 2, step: 1, single_key: true },
"PERSIST" => { first: 1, last: 1, step: 1, single_key: true },
"PEXPIRE" => { first: 1, last: 1, step: 1, single_key: true },
"PEXPIREAT" => { first: 1, last: 1, step: 1, single_key: true },
"PFADD" => { first: 1, last: 1, step: 1, single_key: true },
"PFCOUNT" => { first: 1, last: -1, step: 1, single_key: false },
"PFDEBUG" => { first: 2, last: 2, step: 1, single_key: true },
"PFMERGE" => { first: 1, last: -1, step: 1, single_key: false },
"PSETEX" => { first: 1, last: 1, step: 1, single_key: true },
"PTTL" => { first: 1, last: 1, step: 1, single_key: true },
"RENAME" => { first: 1, last: 2, step: 1, single_key: false },
"RENAMENX" => { first: 1, last: 2, step: 1, single_key: false },
"RESTORE" => { first: 1, last: 1, step: 1, single_key: true },
"RESTORE-ASKING" => { first: 1, last: 1, step: 1, single_key: true },
"RPOP" => { first: 1, last: 1, step: 1, single_key: true },
"RPOPLPUSH" => { first: 1, last: 2, step: 1, single_key: false },
"RPUSH" => { first: 1, last: 1, step: 1, single_key: true },
"RPUSHX" => { first: 1, last: 1, step: 1, single_key: true },
"SADD" => { first: 1, last: 1, step: 1, single_key: true },
"SCARD" => { first: 1, last: 1, step: 1, single_key: true },
"SDIFF" => { first: 1, last: -1, step: 1, single_key: false },
"SDIFFSTORE" => { first: 1, last: -1, step: 1, single_key: false },
"SET" => { first: 1, last: 1, step: 1, single_key: true },
"SETBIT" => { first: 1, last: 1, step: 1, single_key: true },
"SETEX" => { first: 1, last: 1, step: 1, single_key: true },
"SETNX" => { first: 1, last: 1, step: 1, single_key: true },
"SETRANGE" => { first: 1, last: 1, step: 1, single_key: true },
"SINTER" => { first: 1, last: -1, step: 1, single_key: false },
"SINTERSTORE" => { first: 1, last: -1, step: 1, single_key: false },
"SISMEMBER" => { first: 1, last: 1, step: 1, single_key: true },
"SMEMBERS" => { first: 1, last: 1, step: 1, single_key: true },
"SMISMEMBER" => { first: 1, last: 1, step: 1, single_key: true },
"SMOVE" => { first: 1, last: 2, step: 1, single_key: false },
"SORT" => { first: 1, last: 1, step: 1, single_key: true },
"SPOP" => { first: 1, last: 1, step: 1, single_key: true },
"SRANDMEMBER" => { first: 1, last: 1, step: 1, single_key: true },
"SREM" => { first: 1, last: 1, step: 1, single_key: true },
"SSCAN" => { first: 1, last: 1, step: 1, single_key: true },
"STRLEN" => { first: 1, last: 1, step: 1, single_key: true },
"SUBSTR" => { first: 1, last: 1, step: 1, single_key: true },
"SUNION" => { first: 1, last: -1, step: 1, single_key: false },
"SUNIONSTORE" => { first: 1, last: -1, step: 1, single_key: false },
"TOUCH" => { first: 1, last: -1, step: 1, single_key: false },
"TTL" => { first: 1, last: 1, step: 1, single_key: true },
"TYPE" => { first: 1, last: 1, step: 1, single_key: true },
"UNLINK" => { first: 1, last: -1, step: 1, single_key: false },
"WATCH" => { first: 1, last: -1, step: 1, single_key: false },
"XACK" => { first: 1, last: 1, step: 1, single_key: true },
"XADD" => { first: 1, last: 1, step: 1, single_key: true },
"XAUTOCLAIM" => { first: 1, last: 1, step: 1, single_key: true },
"XCLAIM" => { first: 1, last: 1, step: 1, single_key: true },
"XDEL" => { first: 1, last: 1, step: 1, single_key: true },
"XGROUP" => { first: 2, last: 2, step: 1, single_key: true },
"XINFO" => { first: 2, last: 2, step: 1, single_key: true },
"XLEN" => { first: 1, last: 1, step: 1, single_key: true },
"XPENDING" => { first: 1, last: 1, step: 1, single_key: true },
"XRANGE" => { first: 1, last: 1, step: 1, single_key: true },
"XREVRANGE" => { first: 1, last: 1, step: 1, single_key: true },
"XSETID" => { first: 1, last: 1, step: 1, single_key: true },
"XTRIM" => { first: 1, last: 1, step: 1, single_key: true },
"ZADD" => { first: 1, last: 1, step: 1, single_key: true },
"ZCARD" => { first: 1, last: 1, step: 1, single_key: true },
"ZCOUNT" => { first: 1, last: 1, step: 1, single_key: true },
"ZDIFFSTORE" => { first: 1, last: 1, step: 1, single_key: true },
"ZINCRBY" => { first: 1, last: 1, step: 1, single_key: true },
"ZINTERSTORE" => { first: 1, last: 1, step: 1, single_key: true },
"ZLEXCOUNT" => { first: 1, last: 1, step: 1, single_key: true },
"ZMSCORE" => { first: 1, last: 1, step: 1, single_key: true },
"ZPOPMAX" => { first: 1, last: 1, step: 1, single_key: true },
"ZPOPMIN" => { first: 1, last: 1, step: 1, single_key: true },
"ZRANDMEMBER" => { first: 1, last: 1, step: 1, single_key: true },
"ZRANGE" => { first: 1, last: 1, step: 1, single_key: true },
"ZRANGEBYLEX" => { first: 1, last: 1, step: 1, single_key: true },
"ZRANGEBYSCORE" => { first: 1, last: 1, step: 1, single_key: true },
"ZRANGESTORE" => { first: 1, last: 2, step: 1, single_key: false },
"ZRANK" => { first: 1, last: 1, step: 1, single_key: true },
"ZREM" => { first: 1, last: 1, step: 1, single_key: true },
"ZREMRANGEBYLEX" => { first: 1, last: 1, step: 1, single_key: true },
"ZREMRANGEBYRANK" => { first: 1, last: 1, step: 1, single_key: true },
"ZREMRANGEBYSCORE" => { first: 1, last: 1, step: 1, single_key: true },
"ZREVRANGE" => { first: 1, last: 1, step: 1, single_key: true },
"ZREVRANGEBYLEX" => { first: 1, last: 1, step: 1, single_key: true },
"ZREVRANGEBYSCORE" => { first: 1, last: 1, step: 1, single_key: true },
"ZREVRANK" => { first: 1, last: 1, step: 1, single_key: true },
"ZSCAN" => { first: 1, last: 1, step: 1, single_key: true },
"ZSCORE" => { first: 1, last: 1, step: 1, single_key: true },
"ZUNIONSTORE" => { first: 1, last: 1, step: 1, single_key: true }
}.freeze
CrossSlotError = Class.new(StandardError)
class << self
def validate!(command)
def validate!(commands)
return unless Rails.env.development? || Rails.env.test?
return if allow_cross_slot_commands?
return if commands.empty?
command_name = command.first.to_s.upcase
argument_positions = MULTI_KEY_COMMANDS[command_name]
return unless argument_positions
arguments = command.flatten[argument_positions[:first]..argument_positions[:last]]
key_slots = arguments.each_slice(argument_positions[:step]).map do |args|
key_slot(args.first)
end
# early exit for single-command (non-pipelined) if it is a single-key-command
command_name = commands.size > 1 ? "PIPELINE/MULTI" : commands.first.first.to_s.upcase
return if commands.size == 1 && REDIS_COMMANDS.dig(command_name, :single_key)
key_slots = commands.map { |command| key_slots(command) }.flatten
if key_slots.uniq.many? # rubocop: disable CodeReuse/ActiveRecord
raise CrossSlotError, "Redis command #{command_name} arguments hash to different slots. See https://docs.gitlab.com/ee/development/redis.html#multi-key-commands"
end
......@@ -78,6 +210,17 @@ def allow_cross_slot_commands
private
def key_slots(command)
argument_positions = REDIS_COMMANDS[command.first.to_s.upcase]
return [] unless argument_positions
arguments = command.flatten[argument_positions[:first]..argument_positions[:last]]
arguments.each_slice(argument_positions[:step]).map do |args|
key_slot(args.first)
end
end
def allow_cross_slot_commands?
Thread.current[:allow_cross_slot_commands].to_i > 0
end
......
......@@ -33,8 +33,7 @@ def read
def instrument_call(commands)
start = Gitlab::Metrics::System.monotonic_time # must come first so that 'start' is always defined
instrumentation_class.instance_count_request(commands.size)
commands.each { |c| instrumentation_class.redis_cluster_validate!(c) }
instrumentation_class.redis_cluster_validate!(commands)
yield
rescue ::Redis::BaseError => ex
......
......@@ -99,11 +99,13 @@ def remove_current_project_id_cache
def refresh_keys_expiration
with_redis do |redis|
redis.multi do |multi|
multi.expire(issue_ids_key, REDIS_EXPIRY_TIME)
multi.expire(current_index_key, REDIS_EXPIRY_TIME)
multi.expire(current_project_key, REDIS_EXPIRY_TIME)
multi.expire(CONCURRENT_RUNNING_REBALANCES_KEY, REDIS_EXPIRY_TIME)
Gitlab::Instrumentation::RedisClusterValidator.allow_cross_slot_commands do
redis.multi do |multi|
multi.expire(issue_ids_key, REDIS_EXPIRY_TIME)
multi.expire(current_index_key, REDIS_EXPIRY_TIME)
multi.expire(current_project_key, REDIS_EXPIRY_TIME)
multi.expire(CONCURRENT_RUNNING_REBALANCES_KEY, REDIS_EXPIRY_TIME)
end
end
end
end
......@@ -112,12 +114,14 @@ def cleanup_cache
value = "#{rebalanced_container_type}/#{rebalanced_container_id}"
with_redis do |redis|
redis.multi do |multi|
multi.del(issue_ids_key)
multi.del(current_index_key)
multi.del(current_project_key)
multi.srem?(CONCURRENT_RUNNING_REBALANCES_KEY, value)
multi.set(self.class.recently_finished_key(rebalanced_container_type, rebalanced_container_id), true, ex: 1.hour)
Gitlab::Instrumentation::RedisClusterValidator.allow_cross_slot_commands do
redis.multi do |multi|
multi.del(issue_ids_key)
multi.del(current_index_key)
multi.del(current_project_key)
multi.srem?(CONCURRENT_RUNNING_REBALANCES_KEY, value)
multi.set(self.class.recently_finished_key(rebalanced_container_type, rebalanced_container_id), true, ex: 1.hour)
end
end
end
end
......
......@@ -14,9 +14,11 @@ def initialize(user, fallback: {})
def save(repositories, group_id)
Gitlab::Redis::SharedState.with do |redis|
redis.multi do |multi|
multi.set(key_for('repositories'), Gitlab::Json.dump(repositories), ex: EXPIRY_TIME)
multi.set(key_for('group_id'), group_id, ex: EXPIRY_TIME)
Gitlab::Instrumentation::RedisClusterValidator.allow_cross_slot_commands do
redis.multi do |multi|
multi.set(key_for('repositories'), Gitlab::Json.dump(repositories), ex: EXPIRY_TIME)
multi.set(key_for('group_id'), group_id, ex: EXPIRY_TIME)
end
end
end
end
......
......@@ -10,9 +10,11 @@ def self.bulk_read(subjects)
results = {}
Gitlab::Redis::Cache.with do |r|
r.pipelined do |pipeline|
subjects.each do |subject|
results[subject.cache_key] = new(subject).read(pipeline)
Gitlab::Instrumentation::RedisClusterValidator.allow_cross_slot_commands do
r.pipelined do |pipeline|
subjects.each do |subject|
results[subject.cache_key] = new(subject).read(pipeline)
end
end
end
end
......
......@@ -22,7 +22,7 @@
it do
stub_rails_env(env)
args = [:mget, 'foo', 'bar']
args = [[:mget, 'foo', 'bar']]
if should_raise
expect { described_class.validate!(args) }
......@@ -58,7 +58,7 @@
with_them do
it do
args = [command] + arguments
args = [[command] + arguments]
if should_raise
expect { described_class.validate!(args) }
......@@ -68,13 +68,32 @@
end
end
end
where(:arguments, :should_raise) do
[[:get, "foo"], [:get, "bar"]] | true
[[:get, "foo"], [:mget, "foo", "bar"]] | true # mix of single-key and multi-key cmds
[[:get, "{foo}:name"], [:get, "{foo}:profile"]] | false
[[:del, "foo"], [:del, "bar"]] | true
[] | false # pipeline or transaction opened and closed without ops
end
with_them do
it do
if should_raise
expect { described_class.validate!(arguments) }
.to raise_error(described_class::CrossSlotError)
else
expect { described_class.validate!(arguments) }.not_to raise_error
end
end
end
end
describe '.allow_cross_slot_commands' do
it 'does not raise for invalid arguments' do
expect do
described_class.allow_cross_slot_commands do
described_class.validate!([:mget, 'foo', 'bar'])
described_class.validate!([[:mget, 'foo', 'bar']])
end
end.not_to raise_error
end
......@@ -83,10 +102,10 @@
expect do
described_class.allow_cross_slot_commands do
described_class.allow_cross_slot_commands do
described_class.validate!([:mget, 'foo', 'bar'])
described_class.validate!([[:mget, 'foo', 'bar']])
end
described_class.validate!([:mget, 'foo', 'bar'])
described_class.validate!([[:mget, 'foo', 'bar']])
end
end.not_to raise_error
end
......
......@@ -57,8 +57,8 @@
Gitlab::Redis::SharedState.with do |redis|
redis.pipelined do |pipeline|
pipeline.call(:get, 'foobar')
pipeline.call(:get, 'foobarbaz')
pipeline.call(:get, '{foobar}buz')
pipeline.call(:get, '{foobar}baz')
end
end
end
......@@ -103,11 +103,22 @@
Gitlab::Redis::SharedState.with do |redis|
redis.pipelined do |pipeline|
pipeline.call(:get, 'foobar')
pipeline.call(:get, 'foobarbaz')
pipeline.call(:get, '{foobar}:buz')
pipeline.call(:get, '{foobar}baz')
end
end
end
it 'raises error when keys are not from the same slot' do
expect do
Gitlab::Redis::SharedState.with do |redis|
redis.pipelined do |pipeline|
pipeline.call(:get, 'foo')
pipeline.call(:get, 'bar')
end
end
end.to raise_error(instance_of(Gitlab::Instrumentation::RedisClusterValidator::CrossSlotError))
end
end
end
......
......@@ -127,17 +127,15 @@ def self.name
end
before(:all) do
primary_store.multi do |multi|
multi.set(key1, value1)
multi.set(key2, value2)
multi.sadd(skey, [value1, value2])
end
primary_store.set(key1, value1)
primary_store.set(key2, value2)
primary_store.sadd?(skey, value1)
primary_store.sadd?(skey, value2)
secondary_store.multi do |multi|
multi.set(key1, value1)
multi.set(key2, value2)
multi.sadd(skey, [value1, value2])
end
secondary_store.set(key1, value1)
secondary_store.set(key2, value2)
secondary_store.sadd?(skey, value1)
secondary_store.sadd?(skey, value2)
end
RSpec.shared_examples_for 'reads correct value' do
......@@ -349,15 +347,11 @@ def self.name
primary_store.flushdb
secondary_store.flushdb
primary_store.multi do |multi|
multi.set(key2, value1)
multi.sadd?(skey, value1)
end
primary_store.set(key2, value1)
primary_store.sadd?(skey, value1)
secondary_store.multi do |multi|
multi.set(key2, value1)
multi.sadd?(skey, value1)
end
secondary_store.set(key2, value1)
secondary_store.sadd?(skey, value1)
end
with_them do
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment