Commit e927a39c authored by Pawel Rozlach's avatar Pawel Rozlach
Browse files

fix(driver/s3-v2): make retry delayer stateless

The s3_v2 driver shared a single cenkalti/backoff.ExponentialBackOff across
every request and retry for the life of the client. That object is stateful
and not safe for concurrent use, which caused two bugs:

- A data race on the backoff's internal currentInterval, reachable from
  concurrent S3 requests through BackoffDelay (flagged by go test -race).
- The interval only ever grew and stuck at the 60s cap, since it was never
  reset, so a fresh request's first retry could wait ~60s instead of
  starting near 500ms.

Replace it with a stateless delayForAttempt that computes the delay in closed
form from the attempt number the SDK passes in: an exponential curve
(InitialInterval * Multiplier^(attempt-1), capped at MaxInterval) jittered by
±RandomizationFactor, using the same common.Default* parameters as before so
retry timing is unchanged. Being stateless, a single delayer is safe to share
across concurrent requests and every request restarts the curve at attempt 1.
The driver still depends on cenkalti/backoff elsewhere; only this file stops
using it directly.

This backports artifact-registry!635.

Related to gitlab-org/ops/artifact-registry#204



Co-Authored-By: default avatarClaude Opus 4.8 (1M context) <noreply@anthropic.com>
parent ab584d14
Loading
Loading
Loading
Loading
+40 −19
Original line number Diff line number Diff line
@@ -3,11 +3,12 @@ package v2
import (
	"context"
	"fmt"
	"math"
	"math/rand/v2"
	"time"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/aws/retry"
	"github.com/cenkalti/backoff/v4"
	"github.com/docker/distribution/registry/storage/driver/s3-aws/common"
	"github.com/docker/distribution/registry/storage/internal/metrics"
	log "github.com/sirupsen/logrus"
@@ -16,29 +17,17 @@ import (

var _ retry.BackoffDelayer = (*customDelayer)(nil)

type customDelayer struct {
	backoff backoff.BackOff
}
type customDelayer struct{}

// newCustomDelayer replaces the default delayer provided by aws-sdk-go-v2 with
// a more flexible `cenkalti/backoff/v4` one which also makes s3_v2 driver
// behave in a similar way to its predecessor `s3` driver.
// a more flexible one which also makes s3_v2 driver behave in a similar way to
// its predecessor `s3` driver.
func newCustomDelayer() retry.BackoffDelayer {
	b := backoff.NewExponentialBackOff()
	b.InitialInterval = common.DefaultInitialInterval
	b.RandomizationFactor = common.DefaultRandomizationFactor
	b.Multiplier = common.DefaultMultiplier
	b.MaxInterval = common.DefaultMaxInterval
	b.MaxElapsedTime = common.DefaultMaxElapsedTime

	// NOTE(prozlach): We do not specify the max attempts for the backoff here,
	// as it is handled by aws-sdk-go-v2 higher in the stack.

	return &customDelayer{backoff: b}
	return &customDelayer{}
}

func (cd *customDelayer) BackoffDelay(attempt int, sourceErr error) (time.Duration, error) {
	delay := cd.backoff.NextBackOff()
func (*customDelayer) BackoffDelay(attempt int, sourceErr error) (time.Duration, error) {
	delay := delayForAttempt(attempt)
	log.WithFields(
		log.Fields{
			"attempt": attempt,
@@ -50,6 +39,38 @@ func (cd *customDelayer) BackoffDelay(attempt int, sourceErr error) (time.Durati
	return delay, nil
}

// delayForAttempt returns the randomized exponential backoff for a retry
// attempt. aws-sdk-go-v2 numbers attempts from 1 (the delay before the first
// retry), so attempt 1 yields ~InitialInterval, attempt 2 ~InitialInterval*
// Multiplier, and so on, each capped at MaxInterval and jittered by
// ±RandomizationFactor. It is stateless: a single delayer is safe to share
// across concurrent requests, and every request restarts the curve at attempt 1
// instead of inheriting an interval another request has grown.
//
// NOTE(prozlach): This replaces an earlier design that shared a single, stateful
// backoff.ExponentialBackOff across every request. That object mutates an
// internal currentInterval on each call and is not safe for concurrent use,
// which both raced under concurrent requests and never reset (so the interval
// only grew and stuck at the 60s cap). The parameters are the same defaults the
// driver took from cenkalti/backoff, so retry timing is unchanged; the cap is
// reproduced exactly because cenkalti only ever clamps to MaxInterval once the
// raw exponential value would already exceed it.
func delayForAttempt(attempt int) time.Duration {
	exponent := max(attempt-1, 0)

	// Exponential growth, capped at the maximum interval. A huge attempt makes
	// math.Pow overflow to +Inf, which the comparison clamps to MaxInterval.
	interval := float64(common.DefaultInitialInterval) * math.Pow(common.DefaultMultiplier, float64(exponent))
	if interval > float64(common.DefaultMaxInterval) {
		interval = float64(common.DefaultMaxInterval)
	}

	// Jitter into [interval*(1-rf), interval*(1+rf)).
	delta := common.DefaultRandomizationFactor * interval
	//nolint:gosec // jitter for retry backoff; cryptographic randomness is unnecessary
	return time.Duration(interval - delta + rand.Float64()*(2*delta))
}

var _ aws.Retryer = (*customRetryer)(nil)

type customRetryer struct {
+119 −0
Original line number Diff line number Diff line
package v2

import (
	"sync"
	"testing"
	"time"

	"github.com/docker/distribution/registry/storage/driver/s3-aws/common"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
)

// backoffWindow returns the [lower, upper) bounds delayForAttempt can return for
// a given (un-jittered) interval: [interval*(1-rf), interval*(1+rf)).
func backoffWindow(interval time.Duration) (lower, upper time.Duration) {
	lower = time.Duration(float64(interval) * (1 - common.DefaultRandomizationFactor))
	upper = time.Duration(float64(interval) * (1 + common.DefaultRandomizationFactor))

	return lower, upper
}

// TestDelayForAttemptBounds checks that the delay for a given attempt falls
// within the expected jitter window: attempt 1 (and the non-positive guard)
// start from InitialInterval, the interval grows by the multiplier, and a high
// attempt saturates at MaxInterval.
func TestDelayForAttemptBounds(t *testing.T) {
	t.Parallel()

	tests := []struct {
		name     string
		attempt  int
		interval time.Duration
	}{
		{
			name:     "non-positive attempt clamps to the InitialInterval window",
			attempt:  0,
			interval: common.DefaultInitialInterval,
		},
		{
			name:     "first retry starts in the InitialInterval window",
			attempt:  1,
			interval: common.DefaultInitialInterval,
		},
		{
			name:     "second retry grows by the multiplier",
			attempt:  2,
			interval: time.Duration(float64(common.DefaultInitialInterval) * common.DefaultMultiplier),
		},
		{
			name:     "high attempt saturates at the MaxInterval window",
			attempt:  1000,
			interval: common.DefaultMaxInterval,
		},
	}

	for _, tc := range tests {
		t.Run(tc.name, func(t *testing.T) {
			t.Parallel()

			lower, upper := backoffWindow(tc.interval)

			delay := delayForAttempt(tc.attempt)

			assert.Positive(t, delay)
			assert.GreaterOrEqual(t, delay, lower)
			assert.Less(t, delay, upper)
		})
	}
}

// TestDelayForAttemptDoesNotCreep hammers attempt 1 many times. Every result
// must stay inside the attempt-1 window: a shared, stateful backoff would
// ratchet the interval upward call after call, which is the bug this change
// fixes.
func TestDelayForAttemptDoesNotCreep(t *testing.T) {
	t.Parallel()

	lower, upper := backoffWindow(common.DefaultInitialInterval)

	minDelay, maxDelay := upper, time.Duration(0)

	for range 1000 {
		delay := delayForAttempt(1)
		require.GreaterOrEqual(t, delay, lower)
		require.Less(t, delay, upper)

		minDelay = min(minDelay, delay)
		maxDelay = max(maxDelay, delay)
	}

	// The jitter window is symmetric around the interval, so the per-sample
	// bound checks above would still pass if the jitter collapsed to one half
	// (a dropped factor or a sign error). Across 1000 draws the observed minimum
	// must fall in the lower half and the maximum in the upper half, which pins
	// the jitter to the full window.
	assert.Less(t, minDelay, common.DefaultInitialInterval, "minimum sample should fall in the lower half of the window")
	assert.Greater(t, maxDelay, common.DefaultInitialInterval, "maximum sample should fall in the upper half of the window")
}

// TestDelayForAttemptConcurrentUseIsRaceFree exercises the delay computation
// from many goroutines. Run with -race, it guards against reintroducing shared
// mutable state in the backoff curve.
func TestDelayForAttemptConcurrentUseIsRaceFree(t *testing.T) {
	t.Parallel()

	var wg sync.WaitGroup

	for i := range 50 {
		attempt := i%5 + 1

		wg.Go(func() {
			for range 100 {
				_ = delayForAttempt(attempt)
			}
		})
	}

	wg.Wait()
}