Skip to content
Snippets Groups Projects
Commit 8fdefe68 authored by Eric Ju's avatar Eric Ju
Browse files

offloading: Add a mock bucket for simulation delay response

We have retry and cancel support in the offloading sink. In order
test those, we have to mock a bucket that can simulate delay and error.

In this commit, we add a mock bucket. The bucket is based on fileblob
bucket but intercept its response with cuostizable delay and error.
With such simulation, we can write test cases to examine the
offloading sink's behavior under long delay and frequent error.
parent 05e178eb
No related branches found
No related tags found
Loading
package offloading
import (
"context"
"errors"
"io"
"sync"
"time"
"gocloud.dev/blob"
)
var errSimulationCanceled = errors.New("canceled")
// simulation defines the data used for intercepting or simulating method behavior.
// If Delay is set, the method will introduce a delay of the specified duration before returning.
// If Err is set, the method will return the specified error instead of the normal result.
type simulation struct {
Delay time.Duration
Err error
}
// simulationBucket is a blob.Bucket with simulation setup.
type simulationBucket struct {
// simulationMap maps an object key to a list of simulations, defining how each key should behave.
simulationMap map[string][]simulation
// simulationSequence is a flattened version of simulationMap.
// It is useful for functions that need to iterate over all simulations in the map, such as list operations.
simulationSequence []simulation
currentSimulationIndex int
retryStat map[string]int
mu sync.Mutex
*blob.Bucket
}
func newSimulationBucket(bucket *blob.Bucket, s map[string][]simulation) (Bucket, error) {
seq := make([]simulation, 0)
for _, s := range s {
seq = append(seq, s...)
}
m := &simulationBucket{
simulationMap: s,
Bucket: bucket,
retryStat: make(map[string]int),
mu: sync.Mutex{},
currentSimulationIndex: 0,
simulationSequence: seq,
}
return m, nil
}
func (r *simulationBucket) Download(ctx context.Context, key string, writer io.Writer, opts *blob.ReaderOptions) error {
return r.simulate(ctx, key, func() error {
return r.Bucket.Download(ctx, key, writer, opts)
})
}
func (r *simulationBucket) Upload(ctx context.Context, key string, reader io.Reader, opts *blob.WriterOptions) error {
return r.simulate(ctx, key, func() error {
return r.Bucket.Upload(ctx, key, reader, opts)
})
}
func (r *simulationBucket) Delete(ctx context.Context, key string) error {
return r.simulate(ctx, key, func() error {
return r.Bucket.Delete(ctx, key)
})
}
// interceptedList essentially wraps the Bucket's List function.
// The difference is that it returns a listIteratorWrapper, which can inject simulation data into the blob.ListIterator.
func (r *simulationBucket) interceptedList(opts *blob.ListOptions) *listIteratorWrapper {
defer func() {
r.currentSimulationIndex++
}()
it := r.Bucket.List(opts)
var currentSimulation *simulation
if r.currentSimulationIndex >= len(r.simulationSequence) {
currentSimulation = nil
} else {
currentSimulation = &r.simulationSequence[r.currentSimulationIndex]
}
return &listIteratorWrapper{
ListIterator: it,
simulation: currentSimulation,
}
}
// listIteratorWrapper wraps a blob.ListIterator and allows simulation data to be injected.
// When Next is called, it prioritizes returning the simulation data if available.
type listIteratorWrapper struct {
simulation *simulation
*blob.ListIterator
}
func (r *listIteratorWrapper) Next(ctx context.Context) (*blob.ListObject, error) {
if r.simulation == nil {
return r.ListIterator.Next(ctx)
}
timer := time.NewTimer(r.simulation.Delay)
select {
case <-ctx.Done():
return nil, errSimulationCanceled
case <-timer.C:
if r.simulation.Err != nil {
return nil, r.simulation.Err
}
return r.ListIterator.Next(ctx)
}
}
func (r *simulationBucket) simulate(ctx context.Context, key string, callback func() error) error {
simulationData, found := r.simulationMap[key]
if !found {
return callback()
}
r.mu.Lock()
retryIndex, found := r.retryStat[key]
if found {
r.retryStat[key] = r.retryStat[key] + 1
} else {
r.retryStat[key] = 1
}
thisSimulation := simulationData[retryIndex]
timer := time.NewTimer(thisSimulation.Delay)
r.mu.Unlock()
select {
case <-ctx.Done():
return errSimulationCanceled
case <-timer.C:
if thisSimulation.Err != nil {
return thisSimulation.Err
}
return callback()
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment