Commit cd94be7f authored by Pavlo Strokov

Merge branch 'ps-replication-parallel-processing' into 'master'

replication: Process replication events for storages in parallel

See merge request !3894
parents 01501de9 064d5be9
Pipeline #383271284 passed
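In short, this MR changes the replication manager so that, within each virtual storage, storages are no longer replicated to strictly one after another: every storage is put on a channel together with its own backoff, and a configurable number of workers (parallel_storage_processing_workers, default 1) pull storages off that channel and process their queued replication events concurrently. The self-contained Go sketch below condenses that pattern under simplifying assumptions; processVirtualStorage and handle are illustrative stand-ins for ReplMgr.processBacklog and ReplMgr.handleNode, not part of the patch, and the backoff here is a plain uncapped doubling.

package main

import (
    "context"
    "sync"
    "time"
)

// storageProcessing pairs a storage with its own backoff state,
// mirroring the StorageProcessing struct introduced in the diff.
type storageProcessing struct {
    name    string
    backoff func() time.Duration
    reset   func()
}

// processVirtualStorage is a hypothetical, stripped-down version of the new
// ReplMgr.processBacklog: handle stands in for ReplMgr.handleNode and returns
// the number of replication events it processed for the given storage.
func processVirtualStorage(ctx context.Context, storages []string, workers int, handle func(storage string) int) {
    queue := make(chan storageProcessing, len(storages))
    for _, name := range storages {
        d := time.Second // simplified backoff: doubling, uncapped
        queue <- storageProcessing{
            name:    name,
            backoff: func() time.Duration { defer func() { d *= 2 }(); return d },
            reset:   func() { d = time.Second },
        }
    }

    var wg sync.WaitGroup
    defer wg.Wait() // block until all workers have returned, like ProcessBacklog
    for i := 0; i < workers; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for {
                var sp storageProcessing
                select {
                case <-ctx.Done():
                    return
                case sp = <-queue:
                }
                if handle(sp.name) == 0 {
                    // Nothing to do: hand the storage back only after its backoff expires.
                    go func() {
                        select {
                        case <-time.After(sp.backoff()):
                            queue <- sp
                        case <-ctx.Done():
                        }
                    }()
                    continue
                }
                sp.reset()
                queue <- sp
            }
        }()
    }
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
    defer cancel()

    // Pretend only "backup" ever has replication events queued.
    handle := func(storage string) int {
        if storage == "backup" {
            return 1
        }
        return 0
    }
    processVirtualStorage(ctx, []string{"primary", "backup"}, 2, handle)
}

Sizing the channel buffer to the number of storages means each storage is always either queued, held by a worker, or waiting out its backoff, so the re-queue send can never block — the same invariant the patch relies on.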
@@ -356,7 +356,7 @@ func run(cfgs []starter.Config, conf config.Config) error {
repl = praefect.NewReplMgr(
logger,
conf.VirtualStorageNames(),
conf.StorageNames(),
queue,
rs,
healthChecker,
@@ -364,6 +364,7 @@ func run(cfgs []starter.Config, conf config.Config) error {
praefect.WithDelayMetric(delayMetric),
praefect.WithLatencyMetric(latencyMetric),
praefect.WithDequeueBatchSize(conf.Replication.BatchSize),
praefect.WithParallelStorageProcessingWorkers(conf.Replication.ParallelStorageProcessingWorkers),
)
srvFactory = praefect.NewServerFactory(
conf,
@@ -426,7 +427,7 @@ func run(cfgs []starter.Config, conf config.Config) error {
return fmt.Errorf("unable to start the bootstrap: %v", err)
}
go repl.ProcessBacklog(ctx, praefect.ExpBackoffFunc(1*time.Second, 5*time.Second))
go repl.ProcessBacklog(ctx, praefect.ExpBackoffFactory{Start: time.Second, Max: 5 * time.Second})
logger.Info("background started: processing of the replication events")
repl.ProcessStale(ctx, 30*time.Second, time.Minute)
logger.Info("background started: processing of the stale replication events")
......
@@ -100,11 +100,14 @@ type Replication struct {
// BatchSize controls how many replication jobs to dequeue and lock
// in a single call to the database.
BatchSize uint `toml:"batch_size"`
// ParallelStorageProcessingWorkers is the number of workers used to process replication
// events per virtual storage (i.e. how many storages are processed in parallel).
ParallelStorageProcessingWorkers uint `toml:"parallel_storage_processing_workers"`
}
// DefaultReplicationConfig returns the default values for replication configuration.
func DefaultReplicationConfig() Replication {
return Replication{BatchSize: 10}
return Replication{BatchSize: 10, ParallelStorageProcessingWorkers: 1}
}
// Config is a container for everything found in the TOML config file
......
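For context, the default of ParallelStorageProcessingWorkers: 1 keeps the previous behaviour (the storages of a virtual storage are processed one at a time); raising parallel_storage_processing_workers spreads the work over several storages at once, and NewReplMgr later caps the value at the number of storages of the smallest virtual storage. A small, hypothetical illustration of both the default and that cap — the Replication type and effectiveWorkers helper are local stand-ins, not the real config or praefect packages:

package main

import "fmt"

// Replication mirrors the fields added in the diff; it is a stand-in for
// praefect's config.Replication, not the real type.
type Replication struct {
    BatchSize                        uint
    ParallelStorageProcessingWorkers uint
}

// defaultReplicationConfig mirrors DefaultReplicationConfig from the diff:
// batch size 10 and a single worker, which preserves the old sequential behaviour.
func defaultReplicationConfig() Replication {
    return Replication{BatchSize: 10, ParallelStorageProcessingWorkers: 1}
}

// effectiveWorkers mirrors the clamp applied in NewReplMgr: the worker count
// is capped at the number of storages of the smallest virtual storage.
func effectiveWorkers(configured uint, storagesByVirtualStorage map[string][]string) uint {
    workers := configured
    for _, storages := range storagesByVirtualStorage {
        if uint(len(storages)) < workers {
            workers = uint(len(storages))
        }
    }
    return workers
}

func main() {
    cfg := defaultReplicationConfig()
    cfg.ParallelStorageProcessingWorkers = 4 // e.g. parallel_storage_processing_workers = 4 in the TOML config

    storages := map[string][]string{"default": {"gitaly-1", "gitaly-2", "gitaly-3"}}
    fmt.Println(effectiveWorkers(cfg.ParallelStorageProcessingWorkers, storages)) // 3
}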
@@ -317,7 +317,7 @@ func TestConfigParsing(t *testing.T) {
SchedulingInterval: config.Duration(time.Minute),
HistogramBuckets: []float64{1, 2, 3, 4, 5},
},
Replication: Replication{BatchSize: 1},
Replication: Replication{BatchSize: 1, ParallelStorageProcessingWorkers: 2},
Failover: Failover{
Enabled: true,
ElectionStrategy: ElectionStrategyPerRepository,
@@ -344,7 +344,7 @@ func TestConfigParsing(t *testing.T) {
HistogramBuckets: []float64{1, 2, 3, 4, 5},
},
Prometheus: prometheus.DefaultConfig(),
Replication: Replication{BatchSize: 1},
Replication: Replication{BatchSize: 1, ParallelStorageProcessingWorkers: 2},
Failover: Failover{
Enabled: false,
ElectionStrategy: "local",
......
[replication]
batch_size = 1
parallel_storage_processing_workers = 2
[reconciliation]
scheduling_interval = 0
......
@@ -8,6 +8,7 @@ graceful_stop_timeout = "30s"
[replication]
batch_size = 1
parallel_storage_processing_workers = 2
[reconciliation]
scheduling_interval = "1m"
......
@@ -54,7 +54,9 @@ func testConfig(backends int) config.Config {
return cfg
}
func noopBackoffFunc() (backoff, backoffReset) {
type noopBackoffFactory struct{}
func (noopBackoffFactory) Create() (Backoff, BackoffReset) {
return func() time.Duration {
return 0
}, func() {}
@@ -174,7 +176,7 @@ func runPraefectServer(t testing.TB, ctx context.Context, conf config.Config, op
// TODO: run a replmgr for EVERY virtual storage
replmgr := NewReplMgr(
opt.withLogger,
conf.VirtualStorageNames(),
conf.StorageNames(),
opt.withQueue,
opt.withRepoStore,
opt.withNodeMgr,
@@ -287,7 +289,7 @@ func startProcessBacklog(ctx context.Context, replMgr ReplMgr) <-chan struct{} {
done := make(chan struct{})
go func() {
defer close(done)
replMgr.ProcessBacklog(ctx, noopBackoffFunc)
replMgr.ProcessBacklog(ctx, noopBackoffFactory{})
}()
return done
}
@@ -407,17 +407,18 @@ func (dr defaultReplicator) RepackFull(ctx context.Context, event datastore.Repl
// ReplMgr is a replication manager for handling replication jobs
type ReplMgr struct {
log *logrus.Entry
queue datastore.ReplicationEventQueue
hc HealthChecker
nodes NodeSet
virtualStorages []string // replicas this replicator is responsible for
replicator Replicator // does the actual replication logic
replInFlightMetric *prometheus.GaugeVec
replLatencyMetric prommetrics.HistogramVec
replDelayMetric prommetrics.HistogramVec
replJobTimeout time.Duration
dequeueBatchSize uint
log *logrus.Entry
queue datastore.ReplicationEventQueue
hc HealthChecker
nodes NodeSet
storageNamesByVirtualStorage map[string][]string // replicas this replicator is responsible for
replicator Replicator // does the actual replication logic
replInFlightMetric *prometheus.GaugeVec
replLatencyMetric prommetrics.HistogramVec
replDelayMetric prommetrics.HistogramVec
replJobTimeout time.Duration
dequeueBatchSize uint
parallelStorageProcessingWorkers uint
}
// ReplMgrOpt allows a replicator to be configured with additional options
@@ -444,31 +445,50 @@ func WithDequeueBatchSize(size uint) func(*ReplMgr) {
}
}
// WithParallelStorageProcessingWorkers configures the number of workers used to process replication
// events per virtual storage.
func WithParallelStorageProcessingWorkers(n uint) func(*ReplMgr) {
return func(m *ReplMgr) {
m.parallelStorageProcessingWorkers = n
}
}
// NewReplMgr initializes a replication manager with the provided dependencies
// and options
func NewReplMgr(log *logrus.Entry, virtualStorages []string, queue datastore.ReplicationEventQueue, rs datastore.RepositoryStore, hc HealthChecker, nodes NodeSet, opts ...ReplMgrOpt) ReplMgr {
func NewReplMgr(log *logrus.Entry, storageNames map[string][]string, queue datastore.ReplicationEventQueue, rs datastore.RepositoryStore, hc HealthChecker, nodes NodeSet, opts ...ReplMgrOpt) ReplMgr {
r := ReplMgr{
log: log.WithField("component", "replication_manager"),
queue: queue,
replicator: defaultReplicator{rs: rs, log: log.WithField("component", "replicator")},
virtualStorages: virtualStorages,
hc: hc,
nodes: nodes,
log: log.WithField("component", "replication_manager"),
queue: queue,
replicator: defaultReplicator{rs: rs, log: log.WithField("component", "replicator")},
storageNamesByVirtualStorage: storageNames,
hc: hc,
nodes: nodes,
replInFlightMetric: prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "gitaly_praefect_replication_jobs",
Help: "Number of replication jobs in flight.",
}, []string{"virtual_storage", "gitaly_storage", "change_type"},
),
replLatencyMetric: prometheus.NewHistogramVec(prometheus.HistogramOpts{}, []string{"type"}),
replDelayMetric: prometheus.NewHistogramVec(prometheus.HistogramOpts{}, []string{"type"}),
dequeueBatchSize: config.DefaultReplicationConfig().BatchSize,
replLatencyMetric: prometheus.NewHistogramVec(prometheus.HistogramOpts{}, []string{"type"}),
replDelayMetric: prometheus.NewHistogramVec(prometheus.HistogramOpts{}, []string{"type"}),
dequeueBatchSize: config.DefaultReplicationConfig().BatchSize,
parallelStorageProcessingWorkers: 1,
}
for _, opt := range opts {
opt(&r)
}
for virtual, sn := range storageNames {
if len(sn) < int(r.parallelStorageProcessingWorkers) {
r.log.Infof("parallel processing workers decreased from %d "+
"configured with config to %d according to minumal amount of "+
"storages in the virtual storage %q",
r.parallelStorageProcessingWorkers, len(storageNames), virtual,
)
r.parallelStorageProcessingWorkers = uint(len(storageNames))
}
}
return r
}
@@ -486,34 +506,36 @@ const (
logWithVirtualStorage = "virtual_storage"
)
type (
backoff func() time.Duration
backoffReset func()
)
// BackoffFunc is a function that in turn provides a pair of functions, backoff and backoffReset
type BackoffFunc func() (backoff, backoffReset)
// ExpBackoffFactory creates exponentially growing durations.
type ExpBackoffFactory struct {
Start, Max time.Duration
}
// ExpBackoffFunc generates a backoffFunc based off of start and max time durations
func ExpBackoffFunc(start time.Duration, max time.Duration) BackoffFunc {
return func() (backoff, backoffReset) {
const factor = 2
duration := start
// Create returns a backoff function based on Start and Max time durations.
func (b ExpBackoffFactory) Create() (Backoff, BackoffReset) {
const factor = 2
duration := b.Start
return func() time.Duration {
defer func() {
duration *= time.Duration(factor)
if (duration) >= max {
duration = max
}
}()
return duration
}, func() {
duration = start
}
}
return func() time.Duration {
defer func() {
duration *= time.Duration(factor)
if (duration) >= b.Max {
duration = b.Max
}
}()
return duration
}, func() {
duration = b.Start
}
}
type (
// Backoff returns the next backoff duration.
Backoff func() time.Duration
// BackoffReset resets the backoff provider.
BackoffReset func()
)
func getCorrelationID(params datastore.Params) string {
correlationID := ""
if val, found := params[metadatahandler.CorrelationIDKey]; found {
@@ -522,21 +544,26 @@ func getCorrelationID(params datastore.Params) string {
return correlationID
}
// BackoffFactory creates a backoff function and a reset function for it.
type BackoffFactory interface {
// Create returns a new backoff provider and a reset function for it.
Create() (Backoff, BackoffReset)
}
// ProcessBacklog starts processing of queued jobs.
// It keeps processing jobs until ctx is done. ProcessBacklog
// blocks until all backlog processing goroutines have returned.
func (r ReplMgr) ProcessBacklog(ctx context.Context, b BackoffFunc) {
func (r ReplMgr) ProcessBacklog(ctx context.Context, b BackoffFactory) {
var wg sync.WaitGroup
defer wg.Wait()
for _, virtualStorage := range r.virtualStorages {
for virtualStorage := range r.storageNamesByVirtualStorage {
wg.Add(1)
go func(virtualStorage string) {
defer wg.Done()
r.processBacklog(ctx, b, virtualStorage)
}(virtualStorage)
}
wg.Wait()
}
// ProcessStale starts a background process to acknowledge stale replication jobs.
@@ -564,47 +591,83 @@ func (r ReplMgr) ProcessStale(ctx context.Context, checkPeriod, staleAfter time.
return done
}
func (r ReplMgr) processBacklog(ctx context.Context, b BackoffFunc, virtualStorage string) {
logger := r.log.WithField(logWithVirtualStorage, virtualStorage)
backoff, reset := b()
func (r ReplMgr) processBacklog(ctx context.Context, b BackoffFactory, virtualStorage string) {
var wg sync.WaitGroup
defer wg.Wait()
logger := r.log.WithField(logWithVirtualStorage, virtualStorage)
logger.Info("processing started")
// We want the processing loop to shut down gracefully without interrupting
// in-flight operations, so we suppress cancellation on the provided context.
appCtx := ctx
ctx = helper.SuppressCancellation(ctx)
for {
select {
case <-appCtx.Done():
logger.WithError(appCtx.Err()).Info("processing stopped")
return // processing must be stopped
default:
// proceed with processing
}
var totalEvents int
for _, storage := range r.hc.HealthyNodes()[virtualStorage] {
target, ok := r.nodes[virtualStorage][storage]
if !ok {
logger.WithField("storage", storage).Error("no connection to target storage")
continue
}
storageNames := r.storageNamesByVirtualStorage[virtualStorage]
type StorageProcessing struct {
StorageName string
Backoff
BackoffReset
}
storagesQueue := make(chan StorageProcessing, len(storageNames))
for _, storageName := range storageNames {
backoff, reset := b.Create()
storagesQueue <- StorageProcessing{StorageName: storageName, Backoff: backoff, BackoffReset: reset}
}
totalEvents += r.handleNode(ctx, virtualStorage, target)
}
for i := uint(0); i < r.parallelStorageProcessingWorkers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
if totalEvents == 0 {
select {
case <-time.After(backoff()):
continue
case <-appCtx.Done():
logger.WithError(appCtx.Err()).Info("processing stopped")
return
}
}
for {
var storageProcessing StorageProcessing
select {
case <-appCtx.Done():
logger.WithError(appCtx.Err()).Info("processing stopped")
return
case storageProcessing = <-storagesQueue:
}
healthyStorages := r.hc.HealthyNodes()[virtualStorage]
healthy := false
for _, healthyStorageName := range healthyStorages {
if healthyStorageName != storageProcessing.StorageName {
continue
}
healthy = true
break
}
reset()
var processedEvents int
if healthy {
target, ok := r.nodes[virtualStorage][storageProcessing.StorageName]
if !ok {
logger.WithField("storage", storageProcessing.StorageName).Error("no connection to target storage")
} else {
processedEvents = r.handleNode(ctx, virtualStorage, target)
}
}
if processedEvents == 0 {
// If the storage is not healthy or there are no events to
// process, we don't put it back onto the queue immediately but
// wait for the backoff period first.
go func() {
select {
case <-time.After(storageProcessing.Backoff()):
storagesQueue <- storageProcessing
case <-appCtx.Done():
logger.WithError(appCtx.Err()).Info("processing stopped")
return
}
}()
} else {
storageProcessing.BackoffReset()
storagesQueue <- storageProcessing
}
}
}()
}
}
......
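The backoff change above swaps the ExpBackoffFunc closure for an ExpBackoffFactory value, so each storage worker can create its own independent backoff/reset pair via Create. A minimal standalone sketch of the same doubling-and-capping behaviour — the type is re-declared locally so the example compiles on its own, and plain function types are used instead of the Backoff/BackoffReset aliases; only the shape mirrors the patch:

package main

import (
    "fmt"
    "time"
)

// ExpBackoffFactory is a local re-declaration for the example; Create mirrors
// the implementation in the diff above.
type ExpBackoffFactory struct {
    Start, Max time.Duration
}

func (b ExpBackoffFactory) Create() (func() time.Duration, func()) {
    const factor = 2
    duration := b.Start
    backoff := func() time.Duration {
        defer func() {
            duration *= time.Duration(factor)
            if duration >= b.Max {
                duration = b.Max
            }
        }()
        return duration
    }
    reset := func() { duration = b.Start }
    return backoff, reset
}

func main() {
    backoff, reset := ExpBackoffFactory{Start: time.Second, Max: 5 * time.Second}.Create()
    for i := 0; i < 5; i++ {
        fmt.Println(backoff()) // 1s, 2s, 4s, 5s, 5s
    }
    reset()
    fmt.Println(backoff()) // 1s again after reset
}

With Start: time.Second and Max: 5 * time.Second (the values wired up in the first hunk of this MR), the produced sequence is 1s, 2s, 4s, 5s, 5s, …, and reset starts it over — the same progression TestBackoffFactory below asserts at microsecond scale.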
@@ -169,36 +169,42 @@ func TestReplMgr_ProcessBacklog(t *testing.T) {
replMgr := NewReplMgr(
loggerEntry,
conf.VirtualStorageNames(),
conf.StorageNames(),
queue,
datastore.MockRepositoryStore{},
nodeMgr,
NodeSetFromNodeManager(nodeMgr),
WithLatencyMetric(&mockReplicationLatencyHistogramVec),
WithDelayMetric(&mockReplicationDelayHistogramVec),
WithParallelStorageProcessingWorkers(100),
)
replMgr.ProcessBacklog(ctx, ExpBackoffFunc(time.Hour, 0))
replMgr.ProcessBacklog(ctx, noopBackoffFactory{})
logEntries := loggerHook.AllEntries()
require.True(t, len(logEntries) > 3, "expected at least 4 log entries to be present")
require.True(t, len(logEntries) > 4, "expected at least 5 log entries to be present")
require.Equal(t,
[]interface{}{`parallel processing workers decreased from 100 configured in the config to 2, the number of storages in virtual storage "virtual"`},
[]interface{}{logEntries[0].Message},
)
require.Equal(t,
[]interface{}{"processing started", "virtual"},
[]interface{}{logEntries[0].Message, logEntries[0].Data["virtual_storage"]},
[]interface{}{logEntries[1].Message, logEntries[1].Data["virtual_storage"]},
)
require.Equal(t,
[]interface{}{"replication job processing started", "virtual", "correlation-id"},
[]interface{}{logEntries[1].Message, logEntries[1].Data["virtual_storage"], logEntries[1].Data[logWithCorrID]},
[]interface{}{logEntries[2].Message, logEntries[2].Data["virtual_storage"], logEntries[2].Data[logWithCorrID]},
)
dequeuedEvent := logEntries[1].Data["event"].(datastore.ReplicationEvent)
dequeuedEvent := logEntries[2].Data["event"].(datastore.ReplicationEvent)
require.Equal(t, datastore.JobStateInProgress, dequeuedEvent.State)
require.Equal(t, []string{"backup", "primary"}, []string{dequeuedEvent.Job.TargetNodeStorage, dequeuedEvent.Job.SourceNodeStorage})
require.Equal(t,
[]interface{}{"replication job processing finished", "virtual", datastore.JobStateCompleted, "correlation-id"},
[]interface{}{logEntries[2].Message, logEntries[2].Data["virtual_storage"], logEntries[2].Data["new_state"], logEntries[2].Data[logWithCorrID]},
[]interface{}{logEntries[3].Message, logEntries[3].Data["virtual_storage"], logEntries[3].Data["new_state"], logEntries[3].Data[logWithCorrID]},
)
replicatedPath := filepath.Join(backupCfg.Storages[0].Path, testRepo.GetRelativePath())
@@ -339,7 +345,7 @@ func TestReplicator_PropagateReplicationJob(t *testing.T) {
protoregistry.GitalyProtoPreregistered,
)
replmgr := NewReplMgr(logEntry, conf.VirtualStorageNames(), queue, rs, nodeMgr, NodeSetFromNodeManager(nodeMgr))
replmgr := NewReplMgr(logEntry, conf.StorageNames(), queue, rs, nodeMgr, NodeSetFromNodeManager(nodeMgr))
prf := NewGRPCServer(conf, logEntry, protoregistry.GitalyProtoPreregistered, coordinator.StreamDirector, nodeMgr, txMgr, queue, rs, nil, nil, nil)
@@ -707,7 +713,7 @@ func TestProcessBacklog_FailedJobs(t *testing.T) {
replMgr := NewReplMgr(
logEntry,
conf.VirtualStorageNames(),
conf.StorageNames(),
queueInterceptor,
datastore.MockRepositoryStore{},
nodeMgr,
@@ -857,7 +863,7 @@ func TestProcessBacklog_Success(t *testing.T) {
replMgr := NewReplMgr(
logEntry,
conf.VirtualStorageNames(),
conf.StorageNames(),
queueInterceptor,
datastore.MockRepositoryStore{},
nodeMgr,
@@ -896,23 +902,22 @@ func TestReplMgrProcessBacklog_OnlyHealthyNodes(t *testing.T) {
ctx, cancel := testhelper.Context()
first := true
var mtx sync.Mutex
expStorages := map[string]bool{conf.VirtualStorages[0].Nodes[0].Storage: true, conf.VirtualStorages[0].Nodes[2].Storage: true}
queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(glsql.NewDB(t)))
queueInterceptor.OnDequeue(func(_ context.Context, virtualStorageName string, storageName string, _ int, _ datastore.ReplicationEventQueue) ([]datastore.ReplicationEvent, error) {
select {
case <-ctx.Done():
return nil, nil
default:
if first {
first = false
require.Equal(t, conf.VirtualStorages[0].Name, virtualStorageName)
require.Equal(t, conf.VirtualStorages[0].Nodes[0].Storage, storageName)
return nil, nil
}
mtx.Lock()
defer mtx.Unlock()
assert.Equal(t, conf.VirtualStorages[0].Name, virtualStorageName)
assert.Equal(t, conf.VirtualStorages[0].Nodes[2].Storage, storageName)
cancel()
assert.True(t, expStorages[storageName], storageName)
delete(expStorages, storageName)
if len(expStorages) == 0 {
cancel()
}
return nil, nil
}
})
@@ -924,7 +929,7 @@ func TestReplMgrProcessBacklog_OnlyHealthyNodes(t *testing.T) {
replMgr := NewReplMgr(
testhelper.DiscardTestEntry(t),
conf.VirtualStorageNames(),
conf.StorageNames(),
queueInterceptor,
nil,
StaticHealthChecker{virtualStorage: {node1.Storage, node3.Storage}},
@@ -995,7 +1000,7 @@ func TestProcessBacklog_ReplicatesToReadOnlyPrimary(t *testing.T) {
replMgr := NewReplMgr(
testhelper.DiscardTestEntry(t),
conf.VirtualStorageNames(),
conf.StorageNames(),
queue,
datastore.MockRepositoryStore{},
StaticHealthChecker{virtualStorage: {primaryStorage, secondaryStorage}},
@@ -1024,7 +1029,7 @@ func TestProcessBacklog_ReplicatesToReadOnlyPrimary(t *testing.T) {
<-replMgrDone
}
func TestBackoff(t *testing.T) {
func TestBackoffFactory(t *testing.T) {
start := 1 * time.Microsecond
max := 6 * time.Microsecond
expectedBackoffs := []time.Duration{
@@ -1035,7 +1040,7 @@ func TestBackoff(t *testing.T) {
6 * time.Microsecond,
6 * time.Microsecond,
}
b, reset := ExpBackoffFunc(start, max)()
b, reset := ExpBackoffFactory{Start: start, Max: max}.Create()
for _, expectedBackoff := range expectedBackoffs {
require.Equal(t, expectedBackoff, b())
}
......