package raftmgr
import (
"bytes"
"context"
"fmt"
"gitlab.com/gitlab-org/gitaly/v16/internal/archive"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
"gitlab.com/gitlab-org/gitaly/v16/internal/log"
"gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
"go.etcd.io/etcd/raft/v3/raftpb"
"google.golang.org/protobuf/proto"
)
// Transport defines the interface for sending Raft protocol messages.
type Transport interface {
// Send dispatches a batch of Raft messages. It receives a context, the list of messages to send, and a
// function that returns the WAL directory path of a particular log entry. It returns an error if sending
// fails. The implementation must respect the input context's cancellation.
Send(ctx context.Context, walDirForLSN func(storage.LSN) string, messages []raftpb.Message) error
// GetRecordedMessages retrieves all recorded messages if recording is enabled.
// This is typically used in a testing environment to verify message transmission.
GetRecordedMessages() []raftpb.Message
}
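// An illustrative sketch (not part of the original change) of how a caller might drive
// this interface; the stateDir variable and the WAL directory layout below are
// assumptions made purely for the example:
//
//	transport := NewNoopTransport(logger, true)
//	walDirForLSN := func(lsn storage.LSN) string {
//		return filepath.Join(stateDir, "wal", fmt.Sprintf("%d", lsn))
//	}
//	if err := transport.Send(ctx, walDirForLSN, messages); err != nil {
//		return fmt.Errorf("sending raft messages: %w", err)
//	}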
// NoopTransport is a transport implementation that logs messages and optionally records them.
// It is useful in testing environments where messages are not actually delivered but still need to be observed.
type NoopTransport struct {
logger log.Logger // Logger for outputting message information
recordTransport bool // Flag indicating whether message recording is enabled
recordedMessages []*raftpb.Message // Slice to store recorded messages
}
// NewNoopTransport constructs a new NoopTransport instance.
// The logger is used for logging message information, and the recordTransport flag
// determines whether messages should be recorded.
func NewNoopTransport(logger log.Logger, recordTransport bool) Transport {
return &NoopTransport{
logger: logger,
recordTransport: recordTransport,
}
}
// Send logs each message being sent and records it if recording is enabled.
func (t *NoopTransport) Send(ctx context.Context, pathForLSN func(storage.LSN) string, messages []raftpb.Message) error {
for i := range messages {
for j := range messages[i].Entries {
if messages[i].Entries[j].Type != raftpb.EntryNormal {
continue
}
var msg gitalypb.RaftEntry
if err := proto.Unmarshal(messages[i].Entries[j].Data, &msg); err != nil {
return fmt.Errorf("unmarshalling entry type: %w", err)
}
// This is a very naive implementation. NoopTransport is only used for testing
// purposes. All external messages are swallowed and stored in a recorder. It packages
// the whole log entry directory as a tarball using an existing backup utility. The
// resulting binary data is stored inside a subfield of the message for examination
// purposes. A real implementation of Transport will likely use an optimized method
// (such as a sidechannel) to deliver the data and does not necessarily hold the data
// in memory.
if len(msg.GetData().GetPacked()) == 0 {
lsn := storage.LSN(messages[i].Entries[j].Index)
path := pathForLSN(lsn)
if err := t.packLogData(ctx, lsn, &msg, path); err != nil {
return fmt.Errorf("packing log data: %w", err)
}
}
data, err := proto.Marshal(&msg)
if err != nil {
return fmt.Errorf("marshaling Raft entry: %w", err)
}
messages[i].Entries[j].Data = data
}
t.logger.WithFields(log.Fields{
"raft.type": messages[i].Type,
"raft.to": messages[i].To,
"raft.from": messages[i].From,
"raft.term": messages[i].Term,
"raft.num_entries": len(messages[i].Entries),
}).Info("sending message")
// Record messages if recording is enabled.
if t.recordTransport {
t.recordedMessages = append(t.recordedMessages, &messages[i])
}
}
return nil
}
func (t *NoopTransport) packLogData(ctx context.Context, lsn storage.LSN, message *gitalypb.RaftEntry, logEntryPath string) error {
var logData bytes.Buffer
if err := archive.WriteTarball(ctx, t.logger.WithFields(log.Fields{
"raft.component": "WAL archiver",
"raft.log_entry_lsn": lsn,
"raft.log_entry_path": logEntryPath,
}), &logData, logEntryPath, "."); err != nil {
return fmt.Errorf("archiving WAL log entry")
}
message.Data = &gitalypb.RaftEntry_LogData{
LocalPath: message.GetData().GetLocalPath(),
Packed: logData.Bytes(),
}
return nil
}
// GetRecordedMessages returns the list of recorded messages.
func (t *NoopTransport) GetRecordedMessages() []raftpb.Message {
messages := make([]raftpb.Message, 0, len(t.recordedMessages))
for _, m := range t.recordedMessages {
messages = append(messages, *m)
}
return messages
}
package raftmgr
import (
"bytes"
"fmt"
"io/fs"
"os"
"path/filepath"
"testing"
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/v16/internal/archive"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
"gitlab.com/gitlab-org/gitaly/v16/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
"go.etcd.io/etcd/raft/v3/raftpb"
"google.golang.org/protobuf/proto"
)
func TestNoopTransport_Send(t *testing.T) {
t.Parallel()
mustMarshalProto := func(msg proto.Message) []byte {
data, err := proto.Marshal(msg)
if err != nil {
panic(fmt.Sprintf("failed to marshal proto: %v", err))
}
return data
}
tests := []struct {
name string
setupFunc func(tempDir string) ([]raftpb.Message, testhelper.DirectoryState)
}{
{
name: "No messages",
setupFunc: func(tempDir string) ([]raftpb.Message, testhelper.DirectoryState) {
return []raftpb.Message{}, nil
},
},
{
name: "Empty Entries",
setupFunc: func(tempDir string) ([]raftpb.Message, testhelper.DirectoryState) {
return []raftpb.Message{
{
Type: raftpb.MsgApp,
From: 2,
To: 1,
Term: 1,
Entries: []raftpb.Entry{}, // Empty Entries
},
}, nil
},
},
{
name: "Messages with already packed data",
setupFunc: func(tempDir string) ([]raftpb.Message, testhelper.DirectoryState) {
initialMessage := gitalypb.RaftEntry{
Id: 1,
Data: &gitalypb.RaftEntry_LogData{Packed: []byte("already packed data")},
}
messages := []raftpb.Message{
{
Type: raftpb.MsgApp,
From: 2,
To: 1,
Term: 1,
Index: 1,
Entries: []raftpb.Entry{{Index: uint64(1), Type: raftpb.EntryNormal, Data: mustMarshalProto(&initialMessage)}},
},
}
return messages, nil
},
},
{
name: "Messages with referenced data",
setupFunc: func(tempDir string) ([]raftpb.Message, testhelper.DirectoryState) {
// Simulate a log entry dir with files
fileContents := testhelper.DirectoryState{
".": {Mode: archive.TarFileMode | archive.ExecuteMode | fs.ModeDir},
"1": {Mode: archive.TarFileMode, Content: []byte("file1 content")},
"2": {Mode: archive.TarFileMode, Content: []byte("file2 content")},
"3": {Mode: archive.TarFileMode, Content: []byte("file3 content")},
}
for name, file := range fileContents {
if file.Content != nil {
content := file.Content.([]byte)
require.NoError(t, os.WriteFile(filepath.Join(tempDir, name), content, 0o644))
}
}
initialMessage := gitalypb.RaftEntry{
Id: 1,
Data: &gitalypb.RaftEntry_LogData{LocalPath: []byte(tempDir)},
}
messages := []raftpb.Message{
{
Type: raftpb.MsgApp,
From: 2,
To: 1,
Term: 1,
Index: 1,
Entries: []raftpb.Entry{
{Index: uint64(1), Type: raftpb.EntryNormal, Data: mustMarshalProto(&initialMessage)},
},
},
}
return messages, fileContents
},
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
// Create a temporary directory
tempDir := testhelper.TempDir(t)
// Execute setup function to prepare messages and any necessary file contents
messages, expectedContents := tc.setupFunc(tempDir)
// Setup logger and transport
logger := testhelper.SharedLogger(t)
transport := NewNoopTransport(logger, true)
// Execute the Send operation
require.NoError(t, transport.Send(testhelper.Context(t), func(storage.LSN) string { return tempDir }, messages))
// Fetch recorded messages for verification
recordedMessages := transport.GetRecordedMessages()
require.Len(t, recordedMessages, len(messages))
// Messages must be sent in order.
for i := range messages {
require.Equal(t, messages[i].Type, recordedMessages[i].Type)
require.Equal(t, messages[i].From, recordedMessages[i].From)
require.Equal(t, messages[i].To, recordedMessages[i].To)
require.Equal(t, messages[i].Term, recordedMessages[i].Term)
require.Equal(t, messages[i].Index, recordedMessages[i].Index)
if len(messages[i].Entries) == 0 {
require.Empty(t, recordedMessages[i].Entries)
} else {
var resultMessage gitalypb.RaftEntry
require.NoError(t, proto.Unmarshal(recordedMessages[i].Entries[0].Data, &resultMessage))
require.True(t, len(resultMessage.GetData().GetPacked()) > 0, "packed data must have packed type")
tarballData := resultMessage.GetData().GetPacked()
require.NotEmpty(t, tarballData)
// Optionally verify packed data if expected
if expectedContents != nil {
// Verify tarball content matches expectations
reader := bytes.NewReader(tarballData)
testhelper.RequireTarState(t, reader, expectedContents)
}
}
}
})
}
}
@@ -45,6 +45,8 @@ var NonTransactionalRPCs = map[string]struct{}{
gitalypb.ServerService_ServerInfo_FullMethodName: {},
gitalypb.ServerService_ReadinessCheck_FullMethodName: {},
gitalypb.ServerService_ServerSignature_FullMethodName: {},
// This RPC does not need to be transactional as it acts as a forwarder.
gitalypb.RaftService_SendMessage_FullMethodName: {},
}
// repositoryCreatingRPCs are all of the RPCs that may create a repository.
package offloading
import (
"math/rand"
"time"
"gitlab.com/gitlab-org/gitaly/v16/internal/backoff"
)
var (
// defaultOverallTimeout is the default overall timeout for the sink.
defaultOverallTimeout = 1 * time.Minute
// defaultMaxRetry is the default maximum number of retry attempts for the sink.
defaultMaxRetry = uint(2)
// defaultRetryTimeout is the default per-attempt retry timeout for the sink.
defaultRetryTimeout = 15 * time.Second
// defaultBackoffStrategy is the default backoff strategy for the sink.
defaultBackoffStrategy = backoff.NewDefaultExponential(rand.New(rand.NewSource(time.Now().UnixNano())))
)
type sinkCfg struct {
overallTimeout time.Duration
maxRetry uint
noRetry bool
retryTimeout time.Duration
backoffStrategy backoff.Strategy
}
// SinkOption modifies a sink configuration.
type SinkOption func(*sinkCfg)
// WithOverallTimout sets the overallTimeout.
// The overallTimeout specifies the maximum allowed duration for the entire operation,
// including all retries. This ensures the operation completes or fails within a bounded time.
func WithOverallTimout(timeout time.Duration) SinkOption {
return func(s *sinkCfg) {
s.overallTimeout = timeout
}
}
// WithMaxRetry sets the maximum number of retry attempts. Use WithNoRetry if you do not want any retries.
// If the operation supports retries, it will be retried up to maxRetry times, each attempt bounded by
// retryTimeout. Once the overallTimeout is exceeded, the operation is terminated even if the maxRetry
// attempts have not been exhausted.
func WithMaxRetry(max uint) SinkOption {
return func(s *sinkCfg) {
s.maxRetry = max
}
}
// WithNoRetry disables retries for the sink.
func WithNoRetry() SinkOption {
return func(s *sinkCfg) {
s.maxRetry = 0
s.noRetry = true
}
}
// WithRetryTimeout sets the retryTimeout.
// The retryTimeout specifies the maximum time allowed for each individual retry attempt.
// This is useful for handling transient failures in a single operation while respecting
// the overall timeout.
func WithRetryTimeout(duration time.Duration) SinkOption {
return func(s *sinkCfg) {
s.retryTimeout = duration
}
}
// WithBackoffStrategy sets the backoff strategy for retrying failed operations.
func WithBackoffStrategy(strategy backoff.Strategy) SinkOption {
return func(s *sinkCfg) {
s.backoffStrategy = strategy
}
}
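// Illustrative example (not part of the original change) of how these options might be
// combined when constructing a Sink; the bucket variable is assumed to be an
// already-opened Bucket implementation:
//
//	sink, err := NewSink(bucket,
//		WithOverallTimout(30*time.Second),
//		WithMaxRetry(3),
//		WithRetryTimeout(5*time.Second),
//	)
//	if err != nil {
//		return fmt.Errorf("creating offloading sink: %w", err)
//	}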
package offloading
import (
"testing"
"time"
"github.com/stretchr/testify/require"
)
func TestOffloadingSinkOptions(t *testing.T) {
t.Parallel()
for _, tc := range []struct {
desc string
options []SinkOption
expectedCfg sinkCfg
}{
{
desc: "with default values",
options: []SinkOption{
WithOverallTimout(defaultOverallTimeout),
WithMaxRetry(defaultMaxRetry),
WithRetryTimeout(defaultRetryTimeout),
WithBackoffStrategy(defaultBackoffStrategy),
},
expectedCfg: sinkCfg{
overallTimeout: defaultOverallTimeout,
maxRetry: defaultMaxRetry,
retryTimeout: defaultRetryTimeout,
backoffStrategy: defaultBackoffStrategy,
},
},
{
desc: "with no backoffStrategy",
options: []SinkOption{
WithNoRetry(),
},
expectedCfg: sinkCfg{
maxRetry: 0,
noRetry: true,
},
},
{
desc: "with customized values",
options: []SinkOption{
WithOverallTimout(20 * time.Second),
WithMaxRetry(100),
WithRetryTimeout(2 * time.Second),
WithBackoffStrategy(&backoffStrategyInTest),
},
expectedCfg: sinkCfg{
overallTimeout: 20 * time.Second,
maxRetry: 100,
retryTimeout: 2 * time.Second,
backoffStrategy: &backoffStrategyInTest,
},
},
} {
t.Run(tc.desc, func(t *testing.T) {
cfg := sinkCfg{}
for _, apply := range tc.options {
apply(&cfg)
}
require.Equal(t, tc.expectedCfg, cfg)
})
}
}
package offloading
import (
"context"
"errors"
"io"
"sync"
"time"
"gocloud.dev/blob"
)
var errSimulationCanceled = errors.New("canceled")
// simulation defines the data used for intercepting or simulating method behavior.
// If Delay is set, the method will introduce a delay of the specified duration before returning.
// If Err is set, the method will return the specified error instead of the normal result.
type simulation struct {
Delay time.Duration
Err error
}
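// As an illustration (the object key and error below are assumptions for the example),
// the following map makes the first Download of "prefix/pack.idx" fail after a short
// delay and lets the retried attempt go through:
//
//	simulations := map[string][]simulation{
//		"prefix/pack.idx": {
//			{Delay: 100 * time.Millisecond, Err: errors.New("transient failure")},
//			{Delay: 0, Err: nil},
//		},
//	}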
// simulationBucket is a blob.Bucket with simulation setup.
type simulationBucket struct {
// simulationMap maps an object key to a list of simulations, defining how each key should behave.
simulationMap map[string][]simulation
// simulationSequence is a flattened version of simulationMap.
// It is useful for functions that need to iterate over all simulations in the map, such as list operations.
simulationSequence []simulation
currentSimulationIndex int
retryStat map[string]int
mu sync.Mutex
*blob.Bucket
}
func newSimulationBucket(bucket *blob.Bucket, s map[string][]simulation) (Bucket, error) {
seq := make([]simulation, 0)
for _, sims := range s {
seq = append(seq, sims...)
}
m := &simulationBucket{
simulationMap: s,
Bucket: bucket,
retryStat: make(map[string]int),
mu: sync.Mutex{},
currentSimulationIndex: 0,
simulationSequence: seq,
}
return m, nil
}
func (r *simulationBucket) Download(ctx context.Context, key string, writer io.Writer, opts *blob.ReaderOptions) error {
return r.simulate(ctx, key, func() error {
return r.Bucket.Download(ctx, key, writer, opts)
})
}
func (r *simulationBucket) Upload(ctx context.Context, key string, reader io.Reader, opts *blob.WriterOptions) error {
return r.simulate(ctx, key, func() error {
return r.Bucket.Upload(ctx, key, reader, opts)
})
}
func (r *simulationBucket) Delete(ctx context.Context, key string) error {
return r.simulate(ctx, key, func() error {
return r.Bucket.Delete(ctx, key)
})
}
// interceptedList essentially wraps the Bucket's List function.
// The difference is that it returns a listIteratorWrapper, which can inject simulation data into the blob.ListIterator.
func (r *simulationBucket) interceptedList(opts *blob.ListOptions) *listIteratorWrapper {
defer func() {
r.currentSimulationIndex++
}()
it := r.Bucket.List(opts)
var currentSimulation *simulation
if r.currentSimulationIndex >= len(r.simulationSequence) {
currentSimulation = nil
} else {
currentSimulation = &r.simulationSequence[r.currentSimulationIndex]
}
return &listIteratorWrapper{
ListIterator: it,
simulation: currentSimulation,
}
}
// listIteratorWrapper wraps a blob.ListIterator and allows simulation data to be injected.
// When Next is called, it first applies the configured simulation (delay and/or error), if any, before delegating to the wrapped iterator.
type listIteratorWrapper struct {
simulation *simulation
*blob.ListIterator
}
func (r *listIteratorWrapper) Next(ctx context.Context) (*blob.ListObject, error) {
if r.simulation == nil {
return r.ListIterator.Next(ctx)
}
timer := time.NewTimer(r.simulation.Delay)
select {
case <-ctx.Done():
return nil, errSimulationCanceled
case <-timer.C:
if r.simulation.Err != nil {
return nil, r.simulation.Err
}
return r.ListIterator.Next(ctx)
}
}
func (r *simulationBucket) simulate(ctx context.Context, key string, callback func() error) error {
simulationData, found := r.simulationMap[key]
if !found {
return callback()
}
r.mu.Lock()
retryIndex, found := r.retryStat[key]
if found {
r.retryStat[key] = r.retryStat[key] + 1
} else {
r.retryStat[key] = 1
}
thisSimulation := simulationData[retryIndex]
timer := time.NewTimer(thisSimulation.Delay)
r.mu.Unlock()
select {
case <-ctx.Done():
return errSimulationCanceled
case <-timer.C:
if thisSimulation.Err != nil {
return thisSimulation.Err
}
return callback()
}
}
package offloading
import (
"context"
"errors"
"fmt"
"io"
"os"
"path/filepath"
"time"
"gitlab.com/gitlab-org/gitaly/v16/internal/backoff"
"gocloud.dev/blob"
"golang.org/x/sync/errgroup"
_ "gocloud.dev/blob/azureblob" // register Azure driver
_ "gocloud.dev/blob/fileblob" // register file driver
_ "gocloud.dev/blob/gcsblob" // register Google Cloud driver
_ "gocloud.dev/blob/memblob" // register in-memory driver
_ "gocloud.dev/blob/s3blob"
)
var (
// deletionGoroutineLimit is the maximum number of goroutines deleting objects in parallel.
deletionGoroutineLimit = 5
errMissingBucket = errors.New("missing bucket")
)
// Bucket is an interface to abstract the behavior of the gocloud.dev/blob Bucket type.
// This abstraction is especially useful when adding a customized bucket to intercept traffic
// or to modify the functionality for specific use cases.
type Bucket interface {
Download(ctx context.Context, key string, w io.Writer, opts *blob.ReaderOptions) error
Upload(ctx context.Context, key string, r io.Reader, opts *blob.WriterOptions) error
List(opts *blob.ListOptions) *blob.ListIterator
Delete(ctx context.Context, key string) (err error)
Attributes(ctx context.Context, key string) (*blob.Attributes, error)
Close() error
}
// Iterator is an interface to abstract the behavior of the gocloud.dev/blob ListIterator type.
// This abstraction is especially useful when adding a customized bucket to intercept traffic
// or to modify the functionality for specific use cases.
type Iterator interface {
Next(ctx context.Context) (*blob.ListObject, error)
}
// Sink is a wrapper around the storage bucket, providing an interface for
// operations on offloaded objects.
type Sink struct {
overallTimeout time.Duration
bucket Bucket
backoffStrategy backoff.Strategy
maxRetry uint
noRetry bool
retryTimeout time.Duration
}
// NewSink creates a Sink from the given options. If some options are not specified,
// the function will use the default values for them.
func NewSink(bucket Bucket, options ...SinkOption) (*Sink, error) {
if bucket == nil {
return nil, errMissingBucket
}
var cfg sinkCfg
for _, apply := range options {
apply(&cfg)
}
sink := &Sink{
overallTimeout: cfg.overallTimeout,
bucket: bucket,
backoffStrategy: cfg.backoffStrategy,
maxRetry: cfg.maxRetry,
noRetry: cfg.noRetry,
retryTimeout: cfg.retryTimeout,
}
// fills in default values for missing options.
sink.setDefaults()
return sink, nil
}
// Upload uploads a file located at fullFilePath to the bucket under the specified prefix.
// The fullFilePath includes the file name, e.g. /tmp/foo.txt.
func (r *Sink) Upload(ctx context.Context, fullFilePath string, prefix string) (returnErr error) {
ctx, cancel := context.WithTimeout(ctx, r.overallTimeout)
defer cancel()
file, err := os.Open(fullFilePath)
if err != nil {
return fmt.Errorf("open file: %w", err)
}
defer func() {
if err := file.Close(); err != nil {
returnErr = errors.Join(returnErr, fmt.Errorf("close file: %w", err))
}
}()
objectKey := fmt.Sprintf("%s/%s", prefix, filepath.Base(fullFilePath))
if err := r.withRetry(ctx, func(operationCtx context.Context) error {
return r.bucket.Upload(operationCtx, objectKey, file, &blob.WriterOptions{
// 'no-store' - we don't want the offloaded blobs to be cached as the content could change,
// so we always want fresh and up-to-date data
// https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Cache-Control#cacheability
// 'no-transform' - disallows intermediates to modify data
// https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Cache-Control#other
CacheControl: "no-store, no-transform",
ContentType: "application/octet-stream",
})
}); err != nil {
return fmt.Errorf("upload object %q: %w", objectKey, err)
}
return nil
}
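// An illustrative call (the local path and prefix are assumptions for the example):
//
//	if err := sink.Upload(ctx, "/tmp/pack-1234.pack", "offloaded/my-repo"); err != nil {
//		return fmt.Errorf("offloading pack: %w", err)
//	}
//
// This stores the file under the object key "offloaded/my-repo/pack-1234.pack".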
// Download retrieves a file from the bucket and saves it to the specified location on the local file system.
// The objectKey is the key of the object in the bucket, which includes the prefix and
// object name (e.g., "prefix/my_object.idx"); fullFilePath is the full path on the local file system where the
// object will be saved, including the file name (e.g., "/tmp/foo.txt").
func (r *Sink) Download(ctx context.Context, objectKey string, fullFilePath string) (returnErr error) {
ctx, cancel := context.WithTimeout(ctx, r.overallTimeout)
defer cancel()
file, err := os.Create(fullFilePath)
if err != nil {
return fmt.Errorf("create file: %w", err)
}
defer func() {
err := file.Close()
if returnErr == nil {
// Downloading succeeded, check if we can close the file.
if err != nil {
returnErr = errors.Join(returnErr, fmt.Errorf("close file: %w", err))
}
} else {
// Downloading failed, delete the partially written file.
if err := os.Remove(fullFilePath); err != nil {
returnErr = errors.Join(returnErr,
fmt.Errorf("remove file when downloading failed: %w", err))
}
}
}()
if err := r.withRetry(ctx, func(operationCtx context.Context) error {
return r.bucket.Download(operationCtx, objectKey, file, nil)
}); err != nil {
return fmt.Errorf("download object: %w", err)
}
return nil
}
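// An illustrative call mirroring the Upload example above (the object key and target
// path are assumptions for the example):
//
//	if err := sink.Download(ctx, "offloaded/my-repo/pack-1234.pack", "/tmp/pack-1234.pack"); err != nil {
//		return fmt.Errorf("restoring pack: %w", err)
//	}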
// List lists all objects in the bucket that have the specified prefix.
func (r *Sink) List(ctx context.Context, prefix string) (res []string, err error) {
return listHandler(ctx, r, prefix, r.bucket.List)
}
type listFunc[T Iterator] func(opts *blob.ListOptions) T
// listHandler is responsible for loading the listFunc and executing it to perform the list operation.
//
// Generics are used here because the List signature in the bucket interface uses ListIterator,
// which is a concrete type rather than an interface. If we need to intercept or modify the ListIterator's
// behavior, such as adding delays or intentionally returning errors, generics provide the necessary flexibility.
func listHandler[T Iterator](ctx context.Context, r *Sink, prefix string, listFunc listFunc[T]) (res []string, err error) {
prefix = filepath.Clean(prefix)
// listExecutor is where we call listFunc to perform the list operation.
// It is wrapped in the retry loop below.
listExecutor := func(operationCtx context.Context) (res []string, err error) {
var listErr error
var attrs *blob.ListObject
it := listFunc(&blob.ListOptions{
Prefix: prefix + "/",
Delimiter: "/",
})
objects := make([]string, 0)
for {
attrs, listErr = it.Next(operationCtx)
if listErr != nil {
if errors.Is(listErr, io.EOF) {
return objects, nil
}
return []string{}, fmt.Errorf("list object: %w", listErr)
}
// exclude the prefix "folder" itself
if attrs != nil && attrs.Key != prefix+"/" {
objects = append(objects, filepath.Base(attrs.Key))
}
}
}
ctx, cancel := context.WithTimeout(ctx, r.overallTimeout)
defer cancel()
if err := r.withRetry(ctx, func(operationCtx context.Context) error {
res, err = listExecutor(operationCtx)
return err
}); err != nil {
return []string{}, fmt.Errorf("list object: %w", err)
}
return res, nil
}
// DeleteObjects attempts to delete the specified objects within the given prefix.
// The result is a map of objectKey to error. Successfully deleted objects will not appear in the map.
func (r *Sink) DeleteObjects(ctx context.Context, prefix string, objectNames []string) map[string]error {
res := make(map[string]error)
if len(objectNames) == 0 {
return res
}
ctx, cancel := context.WithTimeout(ctx, r.overallTimeout)
defer cancel()
type deleteResult struct {
object string
err error
}
resCh := make(chan deleteResult)
group := errgroup.Group{}
group.SetLimit(deletionGoroutineLimit)
for _, object := range objectNames {
group.Go(func() error {
obj := fmt.Sprintf("%s/%s", prefix, filepath.Base(object))
if err := r.withRetry(ctx, func(operationCtx context.Context) error {
return r.bucket.Delete(operationCtx, obj)
}); err != nil {
resCh <- deleteResult{object: obj, err: err}
}
return nil
})
}
go func() {
// Ignore the error here: errors from the operations are reported via resCh, and the
// functions passed to group.Go() never return an error.
_ = group.Wait()
close(resCh)
}()
for delRes := range resCh {
res[delRes.object] = delRes.err
}
return res
}
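// An illustrative call (object names are assumptions for the example); only the objects
// that failed to be deleted appear in the returned map:
//
//	failures := sink.DeleteObjects(ctx, "offloaded/my-repo", []string{"pack-1234.pack", "pack-1234.idx"})
//	for key, delErr := range failures {
//		err = errors.Join(err, fmt.Errorf("deleting offloaded object %q: %w", key, delErr))
//	}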
// setDefaults fills in default values for missing options.
func (r *Sink) setDefaults() {
// Retries are wanted but the retry count is not configured.
if !r.noRetry && r.maxRetry == 0 {
r.maxRetry = defaultMaxRetry
}
if r.overallTimeout == 0 {
r.overallTimeout = defaultOverallTimeout
}
if r.backoffStrategy == nil {
r.backoffStrategy = defaultBackoffStrategy
}
if r.retryTimeout == 0 {
r.retryTimeout = defaultRetryTimeout
}
}
// withRetry retries the given operation until it succeeds or the maximum number of retries is reached.
func (r *Sink) withRetry(ctx context.Context, op func(context.Context) error) error {
var err error
for retry := uint(0); retry <= r.maxRetry; {
err = func() error {
operationCtx, operationCancel := context.WithTimeout(ctx, r.retryTimeout)
defer operationCancel()
return op(operationCtx)
}()
if err == nil || r.noRetry {
break
}
timer := time.NewTimer(r.backoffStrategy.Backoff(retry))
select {
case <-ctx.Done():
timer.Stop()
err = fmt.Errorf("backoffStrategy operation %w", err)
return err
case <-timer.C:
// The backoff timer expired, issue another attempt.
retry++
}
}
return err
}
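// A rough timeline with the defaults (overallTimeout 1m, maxRetry 2, retryTimeout 15s),
// assuming every attempt fails; the backoff pauses are illustrative, their real length
// depends on the configured strategy:
//
//	t=0s    attempt 1 (bounded by its own 15s per-attempt context)
//	t=15s   backoff pause
//	t~16s   attempt 2 (15s bound)
//	t~31s   backoff pause
//	t~33s   attempt 3 (15s bound)
//	t~48s   loop ends after maxRetry retries, or earlier if the 1m parent context expires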
package offloading
import (
"testing"
"gitlab.com/gitlab-org/gitaly/v16/internal/testhelper"
)
func TestMain(m *testing.M) {
testhelper.Run(m)
}
@@ -123,6 +123,12 @@ func configure() (_ func(), returnedErr error) {
}
}
if refFormat := os.Getenv("GITALY_TEST_REF_FORMAT"); len(refFormat) > 0 {
if err := os.Setenv("GIT_DEFAULT_REF_FORMAT", refFormat); err != nil {
return nil, fmt.Errorf("setting default ref format: %w", err)
}
}
// We need to make sure that we're gitconfig-clean: Git should not pick up
// gitconfig files from anywhere but the repository itself in case they're configured to
// ignore them. We set that configuration by default in our tests to have a known-good
@@ -294,7 +294,7 @@ type gitalyServerDeps struct {
signingKey string
transactionRegistry *storagemgr.TransactionRegistry
procReceiveRegistry *hook.ProcReceiveRegistry
bundleGenerationMgr *bundleuri.GenerationManager
bundleURIManager *bundleuri.GenerationManager
localRepoFactory localrepo.Factory
}
@@ -434,30 +434,30 @@ func (gsd *gitalyServerDeps) createDependencies(tb testing.TB, ctx context.Conte
gsd.localRepoFactory = localrepo.NewFactory(gsd.logger, gsd.locator, gsd.gitCmdFactory, gsd.catfileCache)
return &service.Dependencies{
Logger: gsd.logger,
Cfg: cfg,
ClientPool: gsd.conns,
StorageLocator: gsd.locator,
TransactionManager: gsd.txMgr,
GitalyHookManager: gsd.hookMgr,
GitCmdFactory: gsd.gitCmdFactory,
BackchannelRegistry: gsd.backchannelReg,
GitlabClient: gsd.gitlabClient,
CatfileCache: gsd.catfileCache,
DiskCache: gsd.diskCache,
PackObjectsCache: gsd.packObjectsCache,
PackObjectsLimiter: gsd.packObjectsLimiter,
LimitHandler: gsd.limitHandler,
RepositoryCounter: gsd.repositoryCounter,
UpdaterWithHooks: gsd.updaterWithHooks,
HousekeepingManager: gsd.housekeepingManager,
TransactionRegistry: gsd.transactionRegistry,
Node: node,
BackupSink: gsd.backupSink,
BackupLocator: gsd.backupLocator,
ProcReceiveRegistry: gsd.procReceiveRegistry,
BundleGenerationManager: gsd.bundleGenerationMgr,
LocalRepositoryFactory: gsd.localRepoFactory,
Logger: gsd.logger,
Cfg: cfg,
ClientPool: gsd.conns,
StorageLocator: gsd.locator,
TransactionManager: gsd.txMgr,
GitalyHookManager: gsd.hookMgr,
GitCmdFactory: gsd.gitCmdFactory,
BackchannelRegistry: gsd.backchannelReg,
GitlabClient: gsd.gitlabClient,
CatfileCache: gsd.catfileCache,
DiskCache: gsd.diskCache,
PackObjectsCache: gsd.packObjectsCache,
PackObjectsLimiter: gsd.packObjectsLimiter,
LimitHandler: gsd.limitHandler,
RepositoryCounter: gsd.repositoryCounter,
UpdaterWithHooks: gsd.updaterWithHooks,
HousekeepingManager: gsd.housekeepingManager,
TransactionRegistry: gsd.transactionRegistry,
Node: node,
BackupSink: gsd.backupSink,
BackupLocator: gsd.backupLocator,
ProcReceiveRegistry: gsd.procReceiveRegistry,
BundleURIManager: gsd.bundleURIManager,
LocalRepositoryFactory: gsd.localRepoFactory,
}
}
@@ -588,10 +588,10 @@ func WithBackupLocator(backupLocator backup.Locator) GitalyServerOpt {
}
}
// WithBundleGenerationManager sets the bundleuri.Sink that will be used for Gitaly services
func WithBundleGenerationManager(mgr *bundleuri.GenerationManager) GitalyServerOpt {
// WithBundleURIManager sets the bundleuri.GenerationManager that will be used for Gitaly services
func WithBundleURIManager(mgr *bundleuri.GenerationManager) GitalyServerOpt {
return func(deps gitalyServerDeps) gitalyServerDeps {
deps.bundleGenerationMgr = mgr
deps.bundleURIManager = mgr
return deps
}
}
@@ -23,6 +23,9 @@ var (
"GO-2025-3367": {
GitLabIssueURL: "https://gitlab.com/gitlab-org/gitaly/-/issues/6575",
},
"GO-2025-3408": {
GitLabIssueURL: "https://gitlab.com/gitlab-org/gitaly/-/issues/6613",
},
}
outputPrologue = `