Skip to content
Snippets Groups Projects
Verified Commit d1f153fe authored by Mikhail Mazurskiy's avatar Mikhail Mazurskiy
Browse files

Workhorse: remove the global Redis client

parent 1e6be117
No related branches found
No related tags found
1 merge request!137842Workhorse: remove the global Redis client
......@@ -21,11 +21,11 @@ type KeyWatcher struct {
subscribers map[string][]chan string
shutdown chan struct{}
reconnectBackoff backoff.Backoff
redisConn *redis.Client
redisConn *redis.Client // can be nil
conn *redis.PubSub
}
func NewKeyWatcher() *KeyWatcher {
func NewKeyWatcher(redisConn *redis.Client) *KeyWatcher {
return &KeyWatcher{
shutdown: make(chan struct{}),
reconnectBackoff: backoff.Backoff{
......@@ -34,6 +34,7 @@ func NewKeyWatcher() *KeyWatcher {
Factor: 2,
Jitter: true,
},
redisConn: redisConn,
}
}
......@@ -125,16 +126,13 @@ func (kw *KeyWatcher) receivePubSubStream(ctx context.Context, pubsub *redis.Pub
}
}
func (kw *KeyWatcher) Process(client *redis.Client) {
func (kw *KeyWatcher) Process() {
log.Info("keywatcher: starting process loop")
ctx := context.Background() // lint:allow context.Background
kw.mu.Lock()
kw.redisConn = client
kw.mu.Unlock()
for {
pubsub := client.Subscribe(ctx, []string{}...)
pubsub := kw.redisConn.Subscribe(ctx, []string{}...)
if err := pubsub.Ping(ctx); err != nil {
log.WithError(fmt.Errorf("keywatcher: %v", err)).Error()
time.Sleep(kw.reconnectBackoff.Duration())
......
......@@ -7,6 +7,8 @@ import (
"testing"
"time"
"github.com/redis/go-redis/v9"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/config"
......@@ -18,24 +20,28 @@ const (
runnerKey = "runner:build_queue:10"
)
func initRdb() {
buf, _ := os.ReadFile("../../config.toml")
cfg, _ := config.LoadConfig(string(buf))
Configure(cfg.Redis)
func initRdb(t *testing.T) *redis.Client {
buf, err := os.ReadFile("../../config.toml")
require.NoError(t, err)
cfg, err := config.LoadConfig(string(buf))
require.NoError(t, err)
rdb, err := Configure(cfg.Redis)
require.NoError(t, err)
t.Cleanup(func() {
assert.NoError(t, rdb.Close())
})
return rdb
}
func (kw *KeyWatcher) countSubscribers(key string) int {
func countSubscribers(kw *KeyWatcher, key string) int {
kw.mu.Lock()
defer kw.mu.Unlock()
return len(kw.subscribers[key])
}
// Forces a run of the `Process` loop against a mock PubSubConn.
func (kw *KeyWatcher) processMessages(t *testing.T, numWatchers int, value string, ready chan<- struct{}, wg *sync.WaitGroup) {
kw.mu.Lock()
kw.redisConn = rdb
func processMessages(t *testing.T, kw *KeyWatcher, numWatchers int, value string, ready chan<- struct{}, wg *sync.WaitGroup) {
psc := kw.redisConn.Subscribe(ctx, []string{}...)
kw.mu.Unlock()
errC := make(chan error)
go func() { errC <- kw.receivePubSubStream(ctx, psc) }()
......@@ -48,7 +54,7 @@ func (kw *KeyWatcher) processMessages(t *testing.T, numWatchers int, value strin
close(ready)
require.Eventually(t, func() bool {
return kw.countSubscribers(runnerKey) == numWatchers
return countSubscribers(kw, runnerKey) == numWatchers
}, time.Second, time.Millisecond)
// send message after listeners are ready
......@@ -74,7 +80,7 @@ type keyChangeTestCase struct {
}
func TestKeyChangesInstantReturn(t *testing.T) {
initRdb()
rdb := initRdb(t)
testCases := []keyChangeTestCase{
// WatchKeyStatusAlreadyChanged
......@@ -118,13 +124,10 @@ func TestKeyChangesInstantReturn(t *testing.T) {
rdb.Set(ctx, runnerKey, tc.returnValue, 0)
}
defer func() {
rdb.FlushDB(ctx)
}()
defer rdb.FlushDB(ctx)
kw := NewKeyWatcher()
kw := NewKeyWatcher(rdb)
defer kw.Shutdown()
kw.redisConn = rdb
kw.conn = kw.redisConn.Subscribe(ctx, []string{}...)
val, err := kw.WatchKey(ctx, runnerKey, tc.watchValue, tc.timeout)
......@@ -136,7 +139,7 @@ func TestKeyChangesInstantReturn(t *testing.T) {
}
func TestKeyChangesWhenWatching(t *testing.T) {
initRdb()
rdb := initRdb(t)
testCases := []keyChangeTestCase{
// WatchKeyStatusSeenChange
......@@ -170,11 +173,9 @@ func TestKeyChangesWhenWatching(t *testing.T) {
rdb.Set(ctx, runnerKey, tc.returnValue, 0)
}
kw := NewKeyWatcher()
kw := NewKeyWatcher(rdb)
defer kw.Shutdown()
defer func() {
rdb.FlushDB(ctx)
}()
defer rdb.FlushDB(ctx)
wg := &sync.WaitGroup{}
wg.Add(1)
......@@ -189,13 +190,13 @@ func TestKeyChangesWhenWatching(t *testing.T) {
require.Equal(t, tc.expectedStatus, val, "Expected value")
}()
kw.processMessages(t, 1, tc.processedValue, ready, wg)
processMessages(t, kw, 1, tc.processedValue, ready, wg)
})
}
}
func TestKeyChangesParallel(t *testing.T) {
initRdb()
rdb := initRdb(t)
testCases := []keyChangeTestCase{
{
......@@ -222,15 +223,13 @@ func TestKeyChangesParallel(t *testing.T) {
rdb.Set(ctx, runnerKey, tc.returnValue, 0)
}
defer func() {
rdb.FlushDB(ctx)
}()
defer rdb.FlushDB(ctx)
wg := &sync.WaitGroup{}
wg.Add(runTimes)
ready := make(chan struct{})
kw := NewKeyWatcher()
kw := NewKeyWatcher(rdb)
defer kw.Shutdown()
for i := 0; i < runTimes; i++ {
......@@ -244,16 +243,15 @@ func TestKeyChangesParallel(t *testing.T) {
}()
}
kw.processMessages(t, runTimes, tc.processedValue, ready, wg)
processMessages(t, kw, runTimes, tc.processedValue, ready, wg)
})
}
}
func TestShutdown(t *testing.T) {
initRdb()
rdb := initRdb(t)
kw := NewKeyWatcher()
kw.redisConn = rdb
kw := NewKeyWatcher(rdb)
kw.conn = kw.redisConn.Subscribe(ctx, []string{}...)
defer kw.Shutdown()
......@@ -272,14 +270,14 @@ func TestShutdown(t *testing.T) {
go func() {
defer wg.Done()
require.Eventually(t, func() bool { return kw.countSubscribers(runnerKey) == 1 }, 10*time.Second, time.Millisecond)
require.Eventually(t, func() bool { return countSubscribers(kw, runnerKey) == 1 }, 10*time.Second, time.Millisecond)
kw.Shutdown()
}()
wg.Wait()
require.Eventually(t, func() bool { return kw.countSubscribers(runnerKey) == 0 }, 10*time.Second, time.Millisecond)
require.Eventually(t, func() bool { return countSubscribers(kw, runnerKey) == 0 }, 10*time.Second, time.Millisecond)
// Adding a key after the shutdown should result in an immediate response
var val WatchKeyStatus
......
......@@ -16,7 +16,6 @@ import (
)
var (
rdb *redis.Client
// found in https://github.com/redis/go-redis/blob/c7399b6a17d7d3e2a57654528af91349f2468529/sentinel.go#L626
errSentinelMasterAddr error = errors.New("redis: all sentinels specified in configuration are unreachable")
......@@ -120,16 +119,13 @@ func (s sentinelInstrumentationHook) ProcessPipelineHook(next redis.ProcessPipel
}
}
func GetRedisClient() *redis.Client {
return rdb
}
// Configure redis-connection
func Configure(cfg *config.RedisConfig) error {
func Configure(cfg *config.RedisConfig) (*redis.Client, error) {
if cfg == nil {
return nil
return nil, nil
}
var rdb *redis.Client
var err error
if len(cfg.Sentinel) > 0 {
......@@ -138,7 +134,7 @@ func Configure(cfg *config.RedisConfig) error {
rdb, err = configureRedis(cfg)
}
return err
return rdb, err
}
func configureRedis(cfg *config.RedisConfig) (*redis.Client, error) {
......
......@@ -29,8 +29,8 @@ func mockRedisServer(t *testing.T, connectReceived *atomic.Value) string {
}
func TestConfigureNoConfig(t *testing.T) {
rdb = nil
Configure(nil)
rdb, err := Configure(nil)
require.NoError(t, err)
require.Nil(t, rdb, "rdb client should be nil")
}
......@@ -54,15 +54,15 @@ func TestConfigureValidConfigX(t *testing.T) {
parsedURL := helper.URLMustParse(tc.scheme + "://" + a)
cfg := &config.RedisConfig{URL: config.TomlURL{URL: *parsedURL}}
Configure(cfg)
rdb, err := Configure(cfg)
require.NoError(t, err)
defer rdb.Close()
require.NotNil(t, GetRedisClient().Conn(), "Pool should not be nil")
require.NotNil(t, rdb.Conn(), "Pool should not be nil")
// goredis initialise connections lazily
rdb.Ping(context.Background())
require.True(t, connectReceived.Load().(bool))
rdb = nil
})
}
}
......@@ -93,15 +93,15 @@ func TestConnectToSentinel(t *testing.T) {
}
cfg := &config.RedisConfig{Sentinel: sentinelUrls}
Configure(cfg)
rdb, err := Configure(cfg)
require.NoError(t, err)
defer rdb.Close()
require.NotNil(t, GetRedisClient().Conn(), "Pool should not be nil")
require.NotNil(t, rdb.Conn(), "Pool should not be nil")
// goredis initialise connections lazily
rdb.Ping(context.Background())
require.True(t, connectReceived.Load().(bool))
rdb = nil
})
}
}
......
......@@ -225,13 +225,14 @@ func run(boot bootConfig, cfg config.Config) error {
log.Info("Using redis/go-redis")
redisKeyWatcher := redis.NewKeyWatcher()
if err := redis.Configure(cfg.Redis); err != nil {
rdb, err := redis.Configure(cfg.Redis)
if err != nil {
log.WithError(err).Error("unable to configure redis client")
}
redisKeyWatcher := redis.NewKeyWatcher(rdb)
if rdb := redis.GetRedisClient(); rdb != nil {
go redisKeyWatcher.Process(rdb)
if rdb != nil {
go redisKeyWatcher.Process()
}
watchKeyFn := redisKeyWatcher.WatchKey
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment