Skip to content
Snippets Groups Projects
Commit 0b543e75 authored by Patrick Steinhardt's avatar Patrick Steinhardt
Browse files

Revert "Merge branch 'sh-revert-3853' into 'master'"

This reverts commit f99dae3b, reversing
changes made to 15c4a53f.
parent bdd6fb3b
No related branches found
No related tags found
No related merge requests found
Showing
with 1078 additions and 342 deletions
......@@ -234,7 +234,6 @@ func New(ctx context.Context, cmd *exec.Cmd, stdin io.Reader, stdout, stderr io.
syscall.Kill(-process.Pid, syscall.SIGTERM)
}
command.Wait()
wg.Done()
}()
logPid = cmd.Process.Pid
......@@ -277,6 +276,15 @@ func (c *Command) wait() {
inFlightCommandGauge.Dec()
c.logProcessComplete()
// This is a bit out-of-place here given that the `wg.Add()` call is in `New()`.
// But in `New()`, we have to resort to waiting on the context being finished until we
// would be able to decrement the number of in-flight commands. Given that in some
// cases we detach processes from their initial context such that they run in the
// background, this would cause us to take longer than necessary to decrease the
// wait group counter again. So we instead do it here to accelerate the process,
// even though it's less idiomatic.
wg.Done()
}
// ExitStatus will return the exit-code from an error returned by Wait().
......
......@@ -7,7 +7,6 @@ import (
"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"gitlab.com/gitlab-org/gitaly/v14/internal/git"
"gitlab.com/gitlab-org/labkit/correlation"
)
const (
......@@ -29,17 +28,69 @@ type Batch interface {
}
type batch struct {
sync.Mutex
*batchCheckProcess
*batchProcess
cancel func()
closed bool
*objectInfoReader
*objectReader
closedMutex sync.Mutex
closed bool
}
// newBatch creates an object reader and an object info reader for the given repository and bundles
// them into a batch. A "catfile.Batch" tracing span is started and gets finished once the context
// is done.
func newBatch(
	ctx context.Context,
	repo git.RepositoryExecutor,
	counter *prometheus.CounterVec,
) (_ *batch, returnedErr error) {
	span, ctx := opentracing.StartSpanFromContext(ctx, "catfile.Batch")
	go func() {
		// The span stays open for as long as the context is alive.
		<-ctx.Done()
		span.Finish()
	}()

	contentsReader, err := newObjectReader(ctx, repo, counter)
	if err != nil {
		return nil, err
	}

	infoReader, err := newObjectInfoReader(ctx, repo, counter)
	if err != nil {
		// Creation of the ObjectInfoReader failed, so the already-created ObjectReader
		// must be closed as we do not want to leak its process.
		contentsReader.Close()
		return nil, err
	}

	return &batch{objectReader: contentsReader, objectInfoReader: infoReader}, nil
}
// Close closes both the objectReader and the objectInfoReader and marks the batch as closed.
// Calling it more than once is a no-op. This is only used for cached Batches.
func (c *batch) Close() {
	c.closedMutex.Lock()
	defer c.closedMutex.Unlock()

	if !c.closed {
		c.closed = true
		c.objectReader.Close()
		c.objectInfoReader.Close()
	}
}
// isClosed reports whether Close has been called on the batch. The flag is read while holding
// closedMutex.
func (c *batch) isClosed() bool {
	c.closedMutex.Lock()
	closed := c.closed
	c.closedMutex.Unlock()

	return closed
}
// Info returns an ObjectInfo if spec exists. If the revision does not exist
// the error is of type NotFoundError.
func (c *batch) Info(ctx context.Context, revision git.Revision) (*ObjectInfo, error) {
return c.batchCheckProcess.info(revision)
return c.objectInfoReader.info(ctx, revision)
}
// Tree returns a raw tree object. It is an error if the revision does not
......@@ -47,7 +98,7 @@ func (c *batch) Info(ctx context.Context, revision git.Revision) (*ObjectInfo, e
// the object type. Caller must consume the Reader before making another call
// on C.
func (c *batch) Tree(ctx context.Context, revision git.Revision) (*Object, error) {
return c.batchProcess.reader(revision, "tree")
return c.objectReader.reader(ctx, revision, "tree")
}
// Commit returns a raw commit object. It is an error if the revision does not
......@@ -55,7 +106,7 @@ func (c *batch) Tree(ctx context.Context, revision git.Revision) (*Object, error
// check the object type. Caller must consume the Reader before making another
// call on C.
func (c *batch) Commit(ctx context.Context, revision git.Revision) (*Object, error) {
return c.batchProcess.reader(revision, "commit")
return c.objectReader.reader(ctx, revision, "commit")
}
// Blob returns a reader for the requested blob. The entire blob must be
......@@ -64,158 +115,11 @@ func (c *batch) Commit(ctx context.Context, revision git.Revision) (*Object, err
// It is an error if the revision does not point to a blob. To prevent this,
// use Info to resolve the revision and check the object type.
func (c *batch) Blob(ctx context.Context, revision git.Revision) (*Object, error) {
return c.batchProcess.reader(revision, "blob")
return c.objectReader.reader(ctx, revision, "blob")
}
// Tag returns a raw tag object. Caller must consume the Reader before
// making another call on C.
func (c *batch) Tag(ctx context.Context, revision git.Revision) (*Object, error) {
return c.batchProcess.reader(revision, "tag")
}
// Close marks the batch as closed and, the first time it is called, invokes the cancel function
// if one is set. This is only used for cached Batches.
func (c *batch) Close() {
	c.Lock()
	defer c.Unlock()

	alreadyClosed := c.closed
	c.closed = true
	if alreadyClosed || c.cancel == nil {
		return
	}

	// Both c.batchProcess and c.batchCheckProcess have goroutines that listen on ctx.Done().
	// When the context is cancelled, those goroutines close both writers.
	c.cancel()
}
// isClosed reports whether Close has been called on the batch. The flag is read while holding
// the batch's mutex.
func (c *batch) isClosed() bool {
	c.Lock()
	closed := c.closed
	c.Unlock()

	return closed
}
// simulatedBatchSpawnError is the error returned when spawn errors are injected for testing
// purposes.
type simulatedBatchSpawnError struct{}

// Error implements the error interface.
func (simulatedBatchSpawnError) Error() string { return "simulated spawn error" }
// newBatch spawns the batch process and the batch-check process for the given repository. The
// returned context is the decorrelated, cancellable context both processes were created with. It
// is cancelled right away if either of the processes fails to spawn.
func (bc *BatchCache) newBatch(ctx context.Context, repo git.RepositoryExecutor) (*batch, context.Context, error) {
	var err error

	// Batch processes are long-lived and reused across RPCs, so both the correlation ID and
	// the tracing span of the RPC are dropped from the context.
	ctx = opentracing.ContextWithSpan(correlation.ContextWithCorrelation(ctx, ""), nil)

	span, ctx := opentracing.StartSpanFromContext(ctx, "catfile.Batch")
	ctx, cancel := context.WithCancel(ctx)
	defer func() {
		// Only tear the context down when we are about to return an error.
		if err != nil {
			cancel()
		}
	}()

	go func() {
		<-ctx.Done()
		span.Finish()
	}()

	contentsProcess, err := bc.newBatchProcess(ctx, repo)
	if err != nil {
		return nil, ctx, err
	}

	infoProcess, err := bc.newBatchCheckProcess(ctx, repo)
	if err != nil {
		return nil, ctx, err
	}

	return &batch{batchProcess: contentsProcess, batchCheckProcess: infoProcess}, ctx, nil
}
// newInstrumentedBatch wraps the given Batch such that every lookup is counted in
// catfileLookupCounter and traced. The passed context is stored as the batch-level context.
func newInstrumentedBatch(ctx context.Context, c Batch, catfileLookupCounter *prometheus.CounterVec) Batch {
	instrumented := instrumentedBatch{
		batchCtx:             ctx,
		catfileLookupCounter: catfileLookupCounter,
		Batch:                c,
	}

	return &instrumented
}
// instrumentedBatch wraps a Batch and adds metrics and tracing to each lookup.
//
// We maintain two contexts here: the RPC-level one, and the batch-level one.
//
// The batchCtx tracks the lifetime of the long-running batch process, and is
// de-correlated from the RPC, as it is shared between many RPCs.
//
// We perform double accounting and re-correlation to get stats and traces per
// batch process.
type instrumentedBatch struct {
	// Batch is the wrapped batch which all lookups are forwarded to.
	Batch
	// catfileLookupCounter is incremented once per lookup, labelled with the kind of lookup.
	catfileLookupCounter *prometheus.CounterVec
	// batchCtx is the batch-level context that the second, per-batch span is started from.
	batchCtx context.Context
}
// Info counts and traces the lookup and then forwards it to the wrapped Batch.
func (ib *instrumentedBatch) Info(ctx context.Context, revision git.Revision) (*ObjectInfo, error) {
	spanCtx, finishSpans := ib.startSpan(ctx, "Batch.Info", revision)
	defer finishSpans()

	ib.catfileLookupCounter.WithLabelValues("info").Inc()

	return ib.Batch.Info(spanCtx, revision)
}
// Tree counts and traces the lookup and then forwards it to the wrapped Batch.
func (ib *instrumentedBatch) Tree(ctx context.Context, revision git.Revision) (*Object, error) {
	spanCtx, finishSpans := ib.startSpan(ctx, "Batch.Tree", revision)
	defer finishSpans()

	ib.catfileLookupCounter.WithLabelValues("tree").Inc()

	return ib.Batch.Tree(spanCtx, revision)
}
// Commit counts and traces the lookup and then forwards it to the wrapped Batch.
func (ib *instrumentedBatch) Commit(ctx context.Context, revision git.Revision) (*Object, error) {
	spanCtx, finishSpans := ib.startSpan(ctx, "Batch.Commit", revision)
	defer finishSpans()

	ib.catfileLookupCounter.WithLabelValues("commit").Inc()

	return ib.Batch.Commit(spanCtx, revision)
}
// Blob counts and traces the lookup and then forwards it to the wrapped Batch.
func (ib *instrumentedBatch) Blob(ctx context.Context, revision git.Revision) (*Object, error) {
	spanCtx, finishSpans := ib.startSpan(ctx, "Batch.Blob", revision)
	defer finishSpans()

	ib.catfileLookupCounter.WithLabelValues("blob").Inc()

	return ib.Batch.Blob(spanCtx, revision)
}
// Tag counts and traces the lookup and then forwards it to the wrapped Batch.
func (ib *instrumentedBatch) Tag(ctx context.Context, revision git.Revision) (*Object, error) {
	spanCtx, finishSpans := ib.startSpan(ctx, "Batch.Tag", revision)
	defer finishSpans()

	ib.catfileLookupCounter.WithLabelValues("tag").Inc()

	return ib.Batch.Tag(spanCtx, revision)
}
// revisionTag builds the tracing tag that carries the looked-up revision.
func (ib *instrumentedBatch) revisionTag(revision git.Revision) opentracing.Tag {
	tag := opentracing.Tag{Key: "revision"}
	tag.Value = revision

	return tag
}
// correlationIDTag builds the tracing tag that carries the correlation ID extracted from the
// given context.
func (ib *instrumentedBatch) correlationIDTag(ctx context.Context) opentracing.Tag {
	correlationID := correlation.ExtractFromContext(ctx)

	return opentracing.Tag{Key: "correlation_id", Value: correlationID}
}
func (ib *instrumentedBatch) startSpan(ctx context.Context, methodName string, revision git.Revision) (context.Context, func()) {
span, ctx := opentracing.StartSpanFromContext(ctx, methodName, ib.revisionTag(revision))
span2, _ := opentracing.StartSpanFromContext(ib.batchCtx, methodName, ib.revisionTag(revision), ib.correlationIDTag(ctx))
return ctx, func() {
span.Finish()
span2.Finish()
}
return c.objectReader.reader(ctx, revision, "tag")
}
......@@ -6,11 +6,13 @@ import (
"sync"
"time"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"gitlab.com/gitlab-org/gitaly/v14/internal/git"
"gitlab.com/gitlab-org/gitaly/v14/internal/git/repository"
"gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config"
"gitlab.com/gitlab-org/gitaly/v14/internal/metadata"
"gitlab.com/gitlab-org/labkit/correlation"
)
const (
......@@ -32,14 +34,18 @@ type Cache interface {
Evict()
}
func newCacheKey(sessionID string, repo repository.GitRepo) key {
func newCacheKey(sessionID string, repo repository.GitRepo) (key, bool) {
if sessionID == "" {
return key{}, false
}
return key{
sessionID: sessionID,
repoStorage: repo.GetStorageName(),
repoRelPath: repo.GetRelativePath(),
repoObjDir: repo.GetGitObjectDirectory(),
repoAltDir: strings.Join(repo.GetGitAlternateObjectDirectories(), ","),
}
}, true
}
type key struct {
......@@ -65,9 +71,6 @@ type BatchCache struct {
maxLen int
// ttl is the fixed ttl for cache entries
ttl time.Duration
// injectSpawnErrors is used for testing purposes only. If set to true, then spawned batch
// processes will simulate spawn errors.
injectSpawnErrors bool
// monitorTicker is the ticker used for the monitoring Goroutine.
monitorTicker *time.Ticker
monitorDone chan interface{}
......@@ -80,6 +83,11 @@ type BatchCache struct {
entriesMutex sync.Mutex
entries []*entry
// cachedProcessDone is a condition that gets signalled whenever a process is being
// considered to be returned to the cache. This field is optional and must only be used in
// tests.
cachedProcessDone *sync.Cond
}
// NewCache creates a new catfile process cache.
......@@ -87,15 +95,6 @@ func NewCache(cfg config.Cfg) *BatchCache {
return newCache(defaultBatchfileTTL, cfg.Git.CatfileCacheSize, defaultEvictionInterval)
}
// Stop stops the monitoring Goroutine and evicts all cached processes. This must only be called
// once.
func (bc *BatchCache) Stop() {
	// Turn off the ticker first so that no further ticks are sent.
	bc.monitorTicker.Stop()
	// Handshake via monitorDone: send a value, then block until the channel yields again.
	// NOTE(review): presumably the monitoring Goroutine is the peer on this channel and
	// answers once it has exited -- confirm against monitor().
	bc.monitorDone <- struct{}{}
	<-bc.monitorDone
	// Finally evict whatever is still in the cache.
	bc.Evict()
}
func newCache(ttl time.Duration, maxLen int, refreshInterval time.Duration) *BatchCache {
if maxLen <= 0 {
maxLen = defaultMaxLen
......@@ -170,47 +169,91 @@ func (bc *BatchCache) monitor() {
}
}
// Stop stops the monitoring Goroutine and evicts all cached processes. This must only be called
// once.
func (bc *BatchCache) Stop() {
	// Turn off the ticker first so that no further ticks are sent.
	bc.monitorTicker.Stop()
	// Handshake via monitorDone: send a value, then block until the channel yields again.
	// NOTE(review): presumably the monitoring Goroutine is the peer on this channel and
	// answers once it has exited -- confirm against monitor().
	bc.monitorDone <- struct{}{}
	<-bc.monitorDone
	// Finally evict whatever is still in the cache.
	bc.Evict()
}
// BatchProcess creates a new Batch process for the given repository.
func (bc *BatchCache) BatchProcess(ctx context.Context, repo git.RepositoryExecutor) (Batch, error) {
if ctx.Done() == nil {
func (bc *BatchCache) BatchProcess(ctx context.Context, repo git.RepositoryExecutor) (_ Batch, returnedErr error) {
requestDone := ctx.Done()
if requestDone == nil {
panic("empty ctx.Done() in catfile.Batch.New()")
}
sessionID := metadata.GetValue(ctx, SessionIDField)
if sessionID == "" {
c, ctx, err := bc.newBatch(ctx, repo)
if err != nil {
return nil, err
cacheKey, isCacheable := newCacheKey(metadata.GetValue(ctx, SessionIDField), repo)
if isCacheable {
// We only try to look up cached batch processes in case it is cacheable, which
// requires a session ID. This is mostly done such that git-cat-file(1) processes
// from one user cannot interfere with those from another user. The main intent is to
// disallow trivial denial of service attacks against other users in case it is
// possible to poison the cache with broken git-cat-file(1) processes.
if c, ok := bc.checkout(cacheKey); ok {
go bc.returnWhenDone(requestDone, cacheKey, c)
return c, nil
}
return newInstrumentedBatch(ctx, c, bc.catfileLookupCounter), err
}
cacheKey := newCacheKey(sessionID, repo)
requestDone := ctx.Done()
if c, ok := bc.checkout(cacheKey); ok {
go bc.returnWhenDone(requestDone, cacheKey, c)
return newInstrumentedBatch(ctx, c, bc.catfileLookupCounter), nil
// We have not found any cached process, so we need to create a new one. In this
// case, we need to detach the process from the current context such that it does
// not get killed when the current context is done. Note that while we explicitly
// `Close()` processes in case this function fails, we must have a cancellable
// context or otherwise our `command` package will panic.
var cancel func()
ctx, cancel = context.WithCancel(context.Background())
defer func() {
if returnedErr != nil {
cancel()
}
}()
// We have to decorrelate the process from the current context given that it
// may potentially be reused across different RPC calls.
ctx = correlation.ContextWithCorrelation(ctx, "")
ctx = opentracing.ContextWithSpan(ctx, nil)
}
// if we are using caching, create a fresh context for the new batch
// and initialize the new batch with a bc key and cancel function
cacheCtx, cacheCancel := context.WithCancel(context.Background())
c, ctx, err := bc.newBatch(cacheCtx, repo)
c, err := newBatch(ctx, repo, bc.catfileLookupCounter)
if err != nil {
cacheCancel()
return nil, err
}
defer func() {
// If we somehow fail after creating a new Batch process, then we want to kill
// spawned processes right away.
if returnedErr != nil {
c.Close()
}
}()
bc.totalCatfileProcesses.Inc()
bc.currentCatfileProcesses.Inc()
go func() {
<-ctx.Done()
bc.currentCatfileProcesses.Dec()
}()
if isCacheable {
// If the process is cacheable, then we want to put the process into the cache when
// the current outer context is done.
go bc.returnWhenDone(requestDone, cacheKey, c)
}
c.cancel = cacheCancel
go bc.returnWhenDone(requestDone, cacheKey, c)
return newInstrumentedBatch(ctx, c, bc.catfileLookupCounter), nil
return c, nil
}
func (bc *BatchCache) returnWhenDone(done <-chan struct{}, cacheKey key, c *batch) {
<-done
if bc.cachedProcessDone != nil {
defer func() {
bc.cachedProcessDone.Broadcast()
}()
}
if c == nil || c.isClosed() {
return
}
......
package catfile
import (
"fmt"
"context"
"errors"
"io"
"os"
"sync"
"testing"
"time"
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/v14/internal/git/repository"
"gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg"
"gitlab.com/gitlab-org/labkit/correlation"
"google.golang.org/grpc/metadata"
)
func TestCacheAdd(t *testing.T) {
func TestCache_add(t *testing.T) {
const maxLen = 3
bc := newCache(time.Hour, maxLen, defaultEvictionInterval)
key0 := testKey(0)
value0 := testValue()
cfg, repo, _ := testcfg.BuildWithRepo(t)
key0 := mustCreateKey(t, "0", repo)
value0 := mustCreateBatch(t, cfg, repo)
bc.add(key0, value0)
requireCacheValid(t, bc)
key1 := testKey(1)
bc.add(key1, testValue())
key1 := mustCreateKey(t, "1", repo)
bc.add(key1, mustCreateBatch(t, cfg, repo))
requireCacheValid(t, bc)
key2 := testKey(2)
bc.add(key2, testValue())
key2 := mustCreateKey(t, "2", repo)
bc.add(key2, mustCreateBatch(t, cfg, repo))
requireCacheValid(t, bc)
// Because maxLen is 3, and key0 is oldest, we expect that adding key3
// will kick out key0.
key3 := testKey(3)
bc.add(key3, testValue())
key3 := mustCreateKey(t, "3", repo)
bc.add(key3, mustCreateBatch(t, cfg, repo))
requireCacheValid(t, bc)
require.Equal(t, maxLen, bc.len(), "length should be maxLen")
require.True(t, value0.isClosed(), "value0 should be closed")
require.Equal(t, []key{key1, key2, key3}, keys(bc))
require.Equal(t, []key{key1, key2, key3}, keys(t, bc))
}
func TestCacheAddTwice(t *testing.T) {
func TestCache_addTwice(t *testing.T) {
bc := newCache(time.Hour, 10, defaultEvictionInterval)
key0 := testKey(0)
value0 := testValue()
cfg, repo, _ := testcfg.BuildWithRepo(t)
key0 := mustCreateKey(t, "0", repo)
value0 := mustCreateBatch(t, cfg, repo)
bc.add(key0, value0)
requireCacheValid(t, bc)
key1 := testKey(1)
bc.add(key1, testValue())
key1 := mustCreateKey(t, "1", repo)
value1 := mustCreateBatch(t, cfg, repo)
bc.add(key1, value1)
requireCacheValid(t, bc)
require.Equal(t, key0, bc.head().key, "key0 should be oldest key")
value2 := testValue()
value2 := mustCreateBatch(t, cfg, repo)
bc.add(key0, value2)
requireCacheValid(t, bc)
require.Equal(t, key1, bc.head().key, "key1 should be oldest key")
require.Equal(t, value2, bc.head().value)
require.Equal(t, value1, bc.head().value)
require.True(t, value0.isClosed(), "value0 should be closed")
}
func TestCacheCheckout(t *testing.T) {
func TestCache_checkout(t *testing.T) {
bc := newCache(time.Hour, 10, defaultEvictionInterval)
key0 := testKey(0)
value0 := testValue()
cfg, repo, _ := testcfg.BuildWithRepo(t)
key0 := mustCreateKey(t, "0", repo)
value0 := mustCreateBatch(t, cfg, repo)
bc.add(key0, value0)
v, ok := bc.checkout(key{sessionID: "foo"})
......@@ -85,31 +102,33 @@ func TestCacheCheckout(t *testing.T) {
require.Nil(t, v, "value from second checkout")
}
func TestCacheEnforceTTL(t *testing.T) {
func TestCache_enforceTTL(t *testing.T) {
ttl := time.Hour
bc := newCache(ttl, 10, defaultEvictionInterval)
cfg, repo, _ := testcfg.BuildWithRepo(t)
sleep := func() { time.Sleep(2 * time.Millisecond) }
key0 := testKey(0)
value0 := testValue()
key0 := mustCreateKey(t, "0", repo)
value0 := mustCreateBatch(t, cfg, repo)
bc.add(key0, value0)
sleep()
key1 := testKey(1)
value1 := testValue()
key1 := mustCreateKey(t, "1", repo)
value1 := mustCreateBatch(t, cfg, repo)
bc.add(key1, value1)
sleep()
cutoff := time.Now().Add(ttl)
sleep()
key2 := testKey(2)
bc.add(key2, testValue())
key2 := mustCreateKey(t, "2", repo)
bc.add(key2, mustCreateBatch(t, cfg, repo))
sleep()
key3 := testKey(3)
bc.add(key3, testValue())
key3 := mustCreateKey(t, "3", repo)
bc.add(key3, mustCreateBatch(t, cfg, repo))
sleep()
requireCacheValid(t, bc)
......@@ -123,40 +142,198 @@ func TestCacheEnforceTTL(t *testing.T) {
require.True(t, v.isClosed(), "value %d %v should be closed", i, v)
}
require.Equal(t, []key{key2, key3}, keys(bc), "remaining keys after EnforceTTL")
require.Equal(t, []key{key2, key3}, keys(t, bc), "remaining keys after EnforceTTL")
bc.enforceTTL(cutoff)
requireCacheValid(t, bc)
require.Equal(t, []key{key2, key3}, keys(bc), "remaining keys after second EnforceTTL")
require.Equal(t, []key{key2, key3}, keys(t, bc), "remaining keys after second EnforceTTL")
}
func TestAutoExpiry(t *testing.T) {
func TestCache_autoExpiry(t *testing.T) {
ttl := 5 * time.Millisecond
refresh := 1 * time.Millisecond
bc := newCache(ttl, 10, refresh)
key0 := testKey(0)
value0 := testValue()
cfg, repo, _ := testcfg.BuildWithRepo(t)
key0 := mustCreateKey(t, "0", repo)
value0 := mustCreateBatch(t, cfg, repo)
bc.add(key0, value0)
requireCacheValid(t, bc)
require.Contains(t, keys(bc), key0, "key should still be in map")
require.Contains(t, keys(t, bc), key0, "key should still be in map")
require.False(t, value0.isClosed(), "value should not have been closed")
// Wait for the monitor goroutine to do its thing
for i := 0; i < 100; i++ {
if len(keys(bc)) == 0 {
if len(keys(t, bc)) == 0 {
break
}
time.Sleep(refresh)
}
require.Empty(t, keys(bc), "key should no longer be in map")
require.Empty(t, keys(t, bc), "key should no longer be in map")
require.True(t, value0.isClosed(), "value should be closed after eviction")
}
func TestCache_BatchProcess(t *testing.T) {
cfg, repo, _ := testcfg.BuildWithRepo(t)
repoExecutor := newRepoExecutor(t, cfg, repo)
cache := newCache(time.Hour, 10, time.Hour)
defer cache.Evict()
cache.cachedProcessDone = sync.NewCond(&sync.Mutex{})
t.Run("uncancellable", func(t *testing.T) {
ctx := context.Background()
require.PanicsWithValue(t, "empty ctx.Done() in catfile.Batch.New()", func() {
_, _ = cache.BatchProcess(ctx, repoExecutor)
})
})
t.Run("uncacheable", func(t *testing.T) {
ctx, cancel := testhelper.Context()
defer cancel()
ctx = correlation.ContextWithCorrelation(ctx, "1")
// The context doesn't carry a session ID and is thus uncacheable.
// The process should never get returned to the cache and must be
// killed on context cancellation.
batchProcess, err := cache.BatchProcess(ctx, repoExecutor)
require.NoError(t, err)
batch, ok := batchProcess.(*batch)
require.True(t, ok, "expected batch")
correlation := correlation.ExtractFromContext(batch.objectReader.creationCtx)
require.Equal(t, "1", correlation)
cancel()
// We're cheating a bit here to avoid creating a racy test by reaching into the
// batch processes and trying to read from their stdout. If the cancel did kill the
// process as expected, then the stdout should be closed and we'll get an EOF.
for _, reader := range []io.Reader{batch.objectInfoReader.cmd, batch.objectReader.cmd} {
output, err := io.ReadAll(reader)
if err != nil {
require.True(t, errors.Is(err, os.ErrClosed))
} else {
require.NoError(t, err)
}
require.Empty(t, output)
}
// This is another bug: while we do not have any resource leaks because processes
// got killed as expected, the batch itself is not considered to have been closed.
require.False(t, batch.isClosed())
require.Empty(t, keys(t, cache))
})
t.Run("cacheable", func(t *testing.T) {
defer cache.Evict()
ctx, cancel := testhelper.Context()
defer cancel()
ctx = correlation.ContextWithCorrelation(ctx, "1")
ctx = testhelper.MergeIncomingMetadata(ctx,
metadata.Pairs(SessionIDField, "1"),
)
batchProcess, err := cache.BatchProcess(ctx, repoExecutor)
require.NoError(t, err)
batch, ok := batchProcess.(*batch)
require.True(t, ok, "expected instrumented batch")
// The correlation ID must be empty given that this will be a cached long-running
// process that can be reused across multiple RPC calls.
correlation := correlation.ExtractFromContext(batch.objectReader.creationCtx)
require.Empty(t, correlation)
// Cancel the context such that the process will be considered for return to the
// cache and wait for the cache to collect it.
cache.cachedProcessDone.L.Lock()
cancel()
defer cache.cachedProcessDone.L.Unlock()
cache.cachedProcessDone.Wait()
keys := keys(t, cache)
require.Equal(t, []key{{
sessionID: "1",
repoStorage: repo.GetStorageName(),
repoRelPath: repo.GetRelativePath(),
}}, keys)
// Assert that we can still read from the cached process.
_, err = batchProcess.Info(ctx, "refs/heads/master")
require.NoError(t, err)
})
t.Run("dirty process does not get cached", func(t *testing.T) {
defer cache.Evict()
ctx, cancel := testhelper.Context()
defer cancel()
ctx = testhelper.MergeIncomingMetadata(ctx,
metadata.Pairs(SessionIDField, "1"),
)
batchProcess, err := cache.BatchProcess(ctx, repoExecutor)
require.NoError(t, err)
// While we request object data, we do not consume it at all. The reader is thus
// dirty and cannot be reused and shouldn't be returned to the cache.
_, err = batchProcess.Commit(ctx, "refs/heads/master")
require.NoError(t, err)
// Cancel the context such that the process will be considered for return to the
// cache and wait for the cache to collect it.
cache.cachedProcessDone.L.Lock()
cancel()
defer cache.cachedProcessDone.L.Unlock()
cache.cachedProcessDone.Wait()
require.Empty(t, keys(t, cache))
// The process should be killed now.
_, err = batchProcess.Info(ctx, "refs/heads/master")
require.True(t, errors.Is(err, os.ErrClosed))
})
t.Run("closed process does not get cached", func(t *testing.T) {
defer cache.Evict()
ctx, cancel := testhelper.Context()
defer cancel()
ctx = testhelper.MergeIncomingMetadata(ctx,
metadata.Pairs(SessionIDField, "1"),
)
batchProcess, err := cache.BatchProcess(ctx, repoExecutor)
require.NoError(t, err)
batch, ok := batchProcess.(*batch)
require.True(t, ok, "expected batch")
// Closed processes naturally cannot be reused anymore and thus shouldn't ever get
// cached.
batch.Close()
// Cancel the context such that the process will be considered for return to the
// cache and wait for the cache to collect it.
cache.cachedProcessDone.L.Lock()
cancel()
defer cache.cachedProcessDone.L.Unlock()
cache.cachedProcessDone.Wait()
require.Empty(t, keys(t, cache))
})
}
func requireCacheValid(t *testing.T, bc *BatchCache) {
bc.entriesMutex.Lock()
defer bc.entriesMutex.Unlock()
......@@ -167,11 +344,30 @@ func requireCacheValid(t *testing.T, bc *BatchCache) {
}
}
func testValue() *batch { return &batch{} }
// mustCreateBatch creates a new batch for the given repository and fails the test if that is not
// possible. The context the batch is created with gets cancelled on test cleanup.
func mustCreateBatch(t *testing.T, cfg config.Cfg, repo repository.GitRepo) *batch {
	t.Helper()

	ctx, cancel := testhelper.Context()
	t.Cleanup(cancel)

	executor := newRepoExecutor(t, cfg, repo)
	created, err := newBatch(ctx, executor, nil)
	require.NoError(t, err)

	return created
}
// mustCreateKey builds the cache key for the given session ID and repository and fails the test
// if the key is reported as not cacheable.
func mustCreateKey(t *testing.T, sessionID string, repo repository.GitRepo) key {
	t.Helper()

	cacheKey, ok := newCacheKey(sessionID, repo)
	require.True(t, ok)

	return cacheKey
}
func testKey(i int) key { return key{sessionID: fmt.Sprintf("key-%d", i)} }
func keys(t *testing.T, bc *BatchCache) []key {
t.Helper()
func keys(bc *BatchCache) []key {
bc.entriesMutex.Lock()
defer bc.entriesMutex.Unlock()
......
package catfile
import (
"bufio"
"context"
"fmt"
"io"
"sync"
"github.com/opentracing/opentracing-go"
"gitlab.com/gitlab-org/gitaly/v14/internal/git"
)
// batchCheckProcess encapsulates a 'git cat-file --batch-check' process
type batchCheckProcess struct {
	// r is a buffered reader over the output of the spawned command.
	r *bufio.Reader
	// w is the write end of the pipe whose read end is passed to the command as stdin.
	w io.WriteCloser
	// Mutex is held for the duration of an info request.
	sync.Mutex
}
// newBatchCheckProcess spawns a `git cat-file --batch-check` command in the given repository and
// returns a batchCheckProcess wired up to it. A goroutine closes the stdin writer and finishes
// the "catfile.BatchCheckProcess" span once the context is done. If bc.injectSpawnErrors is set,
// a simulatedBatchSpawnError is returned even though the command has been spawned.
func (bc *BatchCache) newBatchCheckProcess(ctx context.Context, repo git.RepositoryExecutor) (*batchCheckProcess, error) {
	process := &batchCheckProcess{}

	// Whatever is written to process.w is handed to the command as stdin via the pipe.
	var stdinReader io.Reader
	stdinReader, process.w = io.Pipe()

	span, ctx := opentracing.StartSpanFromContext(ctx, "catfile.BatchCheckProcess")

	batchCmd, err := repo.Exec(ctx,
		git.SubCmd{
			Name: "cat-file",
			Flags: []git.Option{
				git.Flag{Name: "--batch-check"},
			},
		},
		git.WithStdin(stdinReader),
	)
	if err != nil {
		return nil, err
	}

	// Responses are read through a buffered reader wrapped around the command.
	process.r = bufio.NewReader(batchCmd)

	go func() {
		<-ctx.Done()
		// This is crucial to prevent leaking file descriptors.
		process.w.Close()
		span.Finish()
	}()

	if bc.injectSpawnErrors {
		// Testing only: intentionally leak process
		return nil, &simulatedBatchSpawnError{}
	}

	return process, nil
}
// info writes the revision followed by a newline to the process and parses the object info from
// its output. The process is locked for the duration of the request.
func (bc *batchCheckProcess) info(revision git.Revision) (*ObjectInfo, error) {
	bc.Lock()
	defer bc.Unlock()

	_, err := fmt.Fprintln(bc.w, revision.String())
	if err != nil {
		return nil, err
	}

	return ParseObjectInfo(bc.r)
}
......@@ -3,6 +3,7 @@ package catfile
import (
"bytes"
"context"
"fmt"
"io"
"os"
"os/exec"
......@@ -13,7 +14,6 @@ import (
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/v14/internal/command"
"gitlab.com/gitlab-org/gitaly/v14/internal/git"
"gitlab.com/gitlab-org/gitaly/v14/internal/git/repository"
"gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config"
"gitlab.com/gitlab-org/gitaly/v14/internal/helper/text"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
......@@ -22,30 +22,11 @@ import (
"google.golang.org/grpc/metadata"
)
// repoExecutor executes Git commands in the embedded repository by means of a Git command
// factory.
type repoExecutor struct {
	// GitRepo is the repository that commands are executed in.
	repository.GitRepo
	// gitCmdFactory is used to create the commands.
	gitCmdFactory git.CommandFactory
}
// Exec creates the given command for the embedded repository via the command factory and returns
// it without waiting for it.
func (e *repoExecutor) Exec(ctx context.Context, cmd git.Cmd, opts ...git.CmdOpt) (*command.Command, error) {
	return e.gitCmdFactory.New(ctx, e.GitRepo, cmd, opts...)
}
// ExecAndWait creates the given command via Exec and returns the result of waiting for it. If
// the command cannot be created, the creation error is returned instead.
func (e *repoExecutor) ExecAndWait(ctx context.Context, cmd git.Cmd, opts ...git.CmdOpt) error {
	spawned, err := e.Exec(ctx, cmd, opts...)
	if err != nil {
		return err
	}

	return spawned.Wait()
}
func setupBatch(t *testing.T, ctx context.Context) (config.Cfg, Batch, *gitalypb.Repository) {
t.Helper()
cfg, repo, _ := testcfg.BuildWithRepo(t)
repoExecutor := &repoExecutor{
GitRepo: repo, gitCmdFactory: git.NewExecCommandFactory(cfg),
}
repoExecutor := newRepoExecutor(t, cfg, repo)
cache := newCache(1*time.Hour, 1000, defaultEvictionInterval)
batch, err := cache.BatchProcess(ctx, repoExecutor)
......@@ -346,6 +327,14 @@ func TestRepeatedCalls(t *testing.T) {
require.Equal(t, string(treeBytes), string(tree2))
}
// failingTestRepoExecutor wraps a repository executor and overrides Exec such that it always
// fails.
type failingTestRepoExecutor struct {
	git.RepositoryExecutor
}
// Exec ignores all of its arguments and always returns a "simulated error".
func (e failingTestRepoExecutor) Exec(context.Context, git.Cmd, ...git.CmdOpt) (*command.Command, error) {
	return nil, fmt.Errorf("simulated error")
}
func TestSpawnFailure(t *testing.T) {
cfg, testRepo, _ := testcfg.BuildWithRepo(t)
testRepoExecutor := &repoExecutor{
......@@ -391,10 +380,12 @@ func TestSpawnFailure(t *testing.T) {
ctx2, cancel2 := testhelper.Context()
defer cancel2()
cache.injectSpawnErrors = true
_, err = catfileWithFreshSessionID(ctx2, cache, testRepoExecutor)
failingTestRepoExecutor := failingTestRepoExecutor{
RepositoryExecutor: testRepoExecutor,
}
_, err = catfileWithFreshSessionID(ctx2, cache, failingTestRepoExecutor)
require.Error(t, err, "expect simulated error")
require.IsType(t, &simulatedBatchSpawnError{}, err)
require.EqualError(t, err, "simulated error")
require.True(
t,
......
package catfile
import (
"io"
)
// Object represents data returned by `git cat-file --batch`.
type Object struct {
	// ObjectInfo holds the main information about the object.
	ObjectInfo
	// Reader provides the raw data of the object. The data differs for each type of object
	// (tag, commit, tree, log, etc.).
	io.Reader
}
......@@ -2,10 +2,15 @@ package catfile
import (
"bufio"
"context"
"fmt"
"strconv"
"strings"
"sync"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"gitlab.com/gitlab-org/gitaly/v14/internal/command"
"gitlab.com/gitlab-org/gitaly/v14/internal/git"
)
......@@ -63,3 +68,76 @@ func ParseObjectInfo(stdout *bufio.Reader) (*ObjectInfo, error) {
Size: objectSize,
}, nil
}
// objectInfoReader is a reader for Git object information. This reader is implemented via a
// long-lived `git cat-file --batch-check` process such that we do not have to spawn a separate
// process per object info we're about to read.
type objectInfoReader struct {
	// cmd is the spawned command. Requested revisions are written to it.
	cmd *command.Command
	// stdout is a buffered reader wrapped around cmd from which object info is parsed.
	stdout *bufio.Reader
	// Mutex is held while a revision is written and its object info is read back.
	sync.Mutex

	// creationCtx is the context in which this reader has been created. This context may
	// potentially be decorrelated from the "real" RPC context in case the reader is going to be
	// cached.
	creationCtx context.Context

	// counter counts info lookups. It is optional and may be nil.
	counter *prometheus.CounterVec
}
// newObjectInfoReader spawns a `git cat-file --batch-check` process in the given repository and
// returns an objectInfoReader for it. A "catfile.ObjectInfoReader" span is started for the reader.
// Once the context is done, the reader is closed and the span is finished. If the process cannot
// be spawned, the span is finished immediately and the spawn error is returned.
//
// The counter is optional; when it is nil, lookups are not counted.
func newObjectInfoReader(
	ctx context.Context,
	repo git.RepositoryExecutor,
	counter *prometheus.CounterVec,
) (*objectInfoReader, error) {
	span, ctx := opentracing.StartSpanFromContext(ctx, "catfile.ObjectInfoReader")

	batchCmd, err := repo.Exec(ctx,
		git.SubCmd{
			Name: "cat-file",
			Flags: []git.Option{
				git.Flag{Name: "--batch-check"},
			},
		},
		git.WithStdin(command.SetupStdin),
	)
	if err != nil {
		// The goroutine which finishes the span is only started further down, so on
		// this path the span has to be finished here or it would never be completed.
		span.Finish()
		return nil, err
	}

	objectInfoReader := &objectInfoReader{
		cmd:         batchCmd,
		stdout:      bufio.NewReader(batchCmd),
		creationCtx: ctx,
		counter:     counter,
	}

	go func() {
		<-ctx.Done()
		// This is crucial to prevent leaking file descriptors.
		objectInfoReader.Close()
		span.Finish()
	}()

	return objectInfoReader, nil
}
// Close waits on the underlying command and deliberately discards the returned error.
// NOTE(review): presumably waiting on the command also releases its resources -- confirm
// against command.Command.Wait().
func (o *objectInfoReader) Close() {
	_ = o.cmd.Wait()
}
// info requests the object info for the given revision. The lookup is traced as "Batch.Info"
// and, if a counter is set, counted with the "info" label. The reader is locked while the
// revision is written to the process and the answer is parsed from its output.
func (o *objectInfoReader) info(ctx context.Context, revision git.Revision) (*ObjectInfo, error) {
	// The span is started right away; only the returned finish function is deferred.
	defer startSpan(o.creationCtx, ctx, "Batch.Info", revision)()

	if counter := o.counter; counter != nil {
		counter.WithLabelValues("info").Inc()
	}

	o.Lock()
	defer o.Unlock()

	_, err := fmt.Fprintln(o.cmd, revision.String())
	if err != nil {
		return nil, err
	}

	return ParseObjectInfo(o.stdout)
}
......@@ -2,10 +2,18 @@ package catfile
import (
"bufio"
"fmt"
"strings"
"testing"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/v14/internal/git"
"gitlab.com/gitlab-org/gitaly/v14/internal/git/gittest"
"gitlab.com/gitlab-org/gitaly/v14/internal/helper/text"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg"
)
func TestParseObjectInfoSuccess(t *testing.T) {
......@@ -66,3 +74,91 @@ func TestParseObjectInfoErrors(t *testing.T) {
})
}
}
// TestObjectInfoReader exercises objectInfoReader.info() against a test repository. The expected
// object info for a set of revisions is computed up front by invoking Git directly; each
// subtest then spawns its own reader, checks the result for one revision and verifies that the
// "info" counter is incremented once per request.
func TestObjectInfoReader(t *testing.T) {
	ctx, cancel := testhelper.Context()
	defer cancel()

	cfg, repoProto, repoPath := testcfg.BuildWithRepo(t)

	revisions := []string{
		"refs/heads/master",
		"refs/heads/master^{tree}",
		"refs/heads/master:README",
		"refs/tags/v1.1.1",
	}

	// Resolve ID, type and size of every revision via plain Git commands so that the
	// expectations do not depend on the code under test.
	oiByRevision := make(map[string]*ObjectInfo)
	for _, revision := range revisions {
		oidHex := text.ChompBytes(gittest.Exec(t, cfg, "-C", repoPath, "rev-parse", revision))

		objectID, err := git.NewObjectIDFromHex(oidHex)
		require.NoError(t, err)

		objectType := text.ChompBytes(gittest.Exec(t, cfg, "-C", repoPath, "cat-file", "-t", revision))
		contents := gittest.Exec(t, cfg, "-C", repoPath, "cat-file", objectType, revision)

		expected := &ObjectInfo{Oid: objectID}
		expected.Type = objectType
		expected.Size = int64(len(contents))
		oiByRevision[revision] = expected
	}

	testCases := []struct {
		desc         string
		revision     git.Revision
		expectedErr  error
		expectedInfo *ObjectInfo
	}{
		{
			desc:         "commit by ref",
			revision:     "refs/heads/master",
			expectedInfo: oiByRevision["refs/heads/master"],
		},
		{
			desc:         "commit by ID",
			revision:     oiByRevision["refs/heads/master"].Oid.Revision(),
			expectedInfo: oiByRevision["refs/heads/master"],
		},
		{
			desc:         "tree",
			revision:     oiByRevision["refs/heads/master^{tree}"].Oid.Revision(),
			expectedInfo: oiByRevision["refs/heads/master^{tree}"],
		},
		{
			desc:         "blob",
			revision:     oiByRevision["refs/heads/master:README"].Oid.Revision(),
			expectedInfo: oiByRevision["refs/heads/master:README"],
		},
		{
			desc:         "tag",
			revision:     oiByRevision["refs/tags/v1.1.1"].Oid.Revision(),
			expectedInfo: oiByRevision["refs/tags/v1.1.1"],
		},
		{
			desc:        "nonexistent ref",
			revision:    "refs/heads/does-not-exist",
			expectedErr: NotFoundError{fmt.Errorf("object not found")},
		},
	}

	for _, tc := range testCases {
		t.Run(tc.desc, func(t *testing.T) {
			counter := prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"type"})

			// infoRequests reports how many "info" requests the counter has seen so far.
			infoRequests := func() float64 {
				return testutil.ToFloat64(counter.WithLabelValues("info"))
			}

			reader, err := newObjectInfoReader(ctx, newRepoExecutor(t, cfg, repoProto), counter)
			require.NoError(t, err)
			require.Equal(t, float64(0), infoRequests())

			info, err := reader.info(ctx, tc.revision)
			require.Equal(t, tc.expectedErr, err)
			require.Equal(t, tc.expectedInfo, info)
			require.Equal(t, float64(1), infoRequests())

			// Verify that we do another request no matter whether the previous call
			// succeeded or failed.
			_, err = reader.info(ctx, "refs/heads/master")
			require.NoError(t, err)
			require.Equal(t, float64(2), infoRequests())
		})
	}
}
......@@ -8,13 +8,25 @@ import (
"sync"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"gitlab.com/gitlab-org/gitaly/v14/internal/command"
"gitlab.com/gitlab-org/gitaly/v14/internal/git"
)
// batch encapsulates a 'git cat-file --batch' process
type batchProcess struct {
r *bufio.Reader
w io.WriteCloser
// Object represents a single object returned by `git cat-file --batch`: its metadata together
// with a reader for its contents.
type Object struct {
// ObjectInfo holds the object's metadata as parsed from the header line that precedes the
// object data.
ObjectInfo
// Reader provides the raw contents of the object. The format of the contents depends on the
// object's type (commit, tree, blob or tag).
io.Reader
}
// objectReader is a reader for Git objects. Reading is implemented via a long-lived `git cat-file
// --batch` process such that we do not have to spawn a new process for each object we are about to
// read.
type objectReader struct {
cmd *command.Command
stdout *bufio.Reader
// n is a state machine that tracks how much data we still have to read
// from r. Legal states are: n==0, this means we can do a new request on
......@@ -28,16 +40,20 @@ type batchProcess struct {
// instead of doing unsafe memory writes (to n) and failing in some
// unpredictable way.
sync.Mutex
}
func (bc *BatchCache) newBatchProcess(ctx context.Context, repo git.RepositoryExecutor) (*batchProcess, error) {
bc.totalCatfileProcesses.Inc()
b := &batchProcess{}
var stdinReader io.Reader
stdinReader, b.w = io.Pipe()
// creationCtx is the context in which this reader has been created. This context may
// potentially be decorrelated from the "real" RPC context in case the reader is going to be
// cached.
creationCtx context.Context
counter *prometheus.CounterVec
}
span, ctx := opentracing.StartSpanFromContext(ctx, "catfile.BatchProcess")
func newObjectReader(
ctx context.Context,
repo git.RepositoryExecutor,
counter *prometheus.CounterVec,
) (*objectReader, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "catfile.ObjectReader")
batchCmd, err := repo.Exec(ctx,
git.SubCmd{
......@@ -46,102 +62,113 @@ func (bc *BatchCache) newBatchProcess(ctx context.Context, repo git.RepositoryEx
git.Flag{Name: "--batch"},
},
},
git.WithStdin(stdinReader),
git.WithStdin(command.SetupStdin),
)
if err != nil {
return nil, err
}
b.r = bufio.NewReader(batchCmd)
bc.currentCatfileProcesses.Inc()
objectReader := &objectReader{
cmd: batchCmd,
stdout: bufio.NewReader(batchCmd),
creationCtx: ctx,
counter: counter,
}
go func() {
<-ctx.Done()
// This Close() is crucial to prevent leaking file descriptors.
b.w.Close()
bc.currentCatfileProcesses.Dec()
objectReader.Close()
span.Finish()
}()
if bc.injectSpawnErrors {
// Testing only: intentionally leak process
return nil, &simulatedBatchSpawnError{}
}
return objectReader, nil
}
return b, nil
// Close waits for the underlying `git cat-file --batch` command to finish.
//
// The error returned by Wait() is deliberately discarded: Close has no return value through
// which it could be reported.
// NOTE(review): presumably Wait() is also what releases the command's pipes and thus prevents
// leaking file descriptors; confirm in command.Command.
func (o *objectReader) Close() {
_ = o.cmd.Wait()
}
func (b *batchProcess) reader(revision git.Revision, expectedType string) (*Object, error) {
b.Lock()
defer b.Unlock()
func (o *objectReader) reader(
ctx context.Context,
revision git.Revision,
expectedType string,
) (*Object, error) {
finish := startSpan(o.creationCtx, ctx, fmt.Sprintf("Batch.Object(%s)", expectedType), revision)
defer finish()
if o.counter != nil {
o.counter.WithLabelValues(expectedType).Inc()
}
o.Lock()
defer o.Unlock()
if b.n == 1 {
if o.n == 1 {
// Consume linefeed
if _, err := b.r.ReadByte(); err != nil {
if _, err := o.stdout.ReadByte(); err != nil {
return nil, err
}
b.n--
o.n--
}
if b.n != 0 {
return nil, fmt.Errorf("cannot create new Object: batch contains %d unread bytes", b.n)
if o.n != 0 {
return nil, fmt.Errorf("cannot create new Object: batch contains %d unread bytes", o.n)
}
if _, err := fmt.Fprintln(b.w, revision.String()); err != nil {
if _, err := fmt.Fprintln(o.cmd, revision.String()); err != nil {
return nil, err
}
oi, err := ParseObjectInfo(b.r)
oi, err := ParseObjectInfo(o.stdout)
if err != nil {
return nil, err
}
b.n = oi.Size + 1
o.n = oi.Size + 1
if oi.Type != expectedType {
// This is a programmer error and it should never happen. But if it does,
// we need to leave the cat-file process in a good state
if _, err := io.CopyN(io.Discard, b.r, b.n); err != nil {
if _, err := io.CopyN(io.Discard, o.stdout, o.n); err != nil {
return nil, err
}
b.n = 0
o.n = 0
return nil, NotFoundError{error: fmt.Errorf("expected %s to be a %s, got %s", oi.Oid, expectedType, oi.Type)}
}
return &Object{
ObjectInfo: *oi,
Reader: &batchReader{
batchProcess: b,
r: io.LimitReader(b.r, oi.Size),
Reader: &objectDataReader{
objectReader: o,
r: io.LimitReader(o.stdout, oi.Size),
},
}, nil
}
func (b *batchProcess) consume(nBytes int) {
b.n -= int64(nBytes)
if b.n < 1 {
func (o *objectReader) consume(nBytes int) {
o.n -= int64(nBytes)
if o.n < 1 {
panic("too many bytes read from batch")
}
}
func (b *batchProcess) hasUnreadData() bool {
b.Lock()
defer b.Unlock()
func (o *objectReader) hasUnreadData() bool {
o.Lock()
defer o.Unlock()
return b.n > 1
return o.n > 1
}
type batchReader struct {
*batchProcess
type objectDataReader struct {
*objectReader
r io.Reader
}
func (br *batchReader) Read(p []byte) (int, error) {
br.batchProcess.Lock()
defer br.batchProcess.Unlock()
func (o *objectDataReader) Read(p []byte) (int, error) {
o.objectReader.Lock()
defer o.objectReader.Unlock()
n, err := br.r.Read(p)
br.batchProcess.consume(n)
n, err := o.r.Read(p)
o.objectReader.consume(n)
return n, err
}
package catfile
import (
"fmt"
"io"
"testing"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/v14/internal/git"
"gitlab.com/gitlab-org/gitaly/v14/internal/git/gittest"
"gitlab.com/gitlab-org/gitaly/v14/internal/helper/text"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg"
)
// TestObjectReader_reader exercises objectReader.reader() against a test repository. Every
// subtest spawns its own reader so that state left behind by one scenario (such as unread object
// data) cannot influence another.
func TestObjectReader_reader(t *testing.T) {
	ctx, cancel := testhelper.Context()
	defer cancel()

	cfg, repoProto, repoPath := testcfg.BuildWithRepo(t)

	revParseOutput := gittest.Exec(t, cfg, "-C", repoPath, "rev-parse", "refs/heads/master")
	commitID, err := git.NewObjectIDFromHex(text.ChompBytes(revParseOutput))
	require.NoError(t, err)

	commitContents := gittest.Exec(t, cfg, "-C", repoPath, "cat-file", "-p", "refs/heads/master")

	// setupReader spawns a fresh object reader which uses the given counter (may be nil).
	setupReader := func(t *testing.T, counter *prometheus.CounterVec) *objectReader {
		reader, err := newObjectReader(ctx, newRepoExecutor(t, cfg, repoProto), counter)
		require.NoError(t, err)
		return reader
	}

	// readCommit requests the given revision as a commit and returns its complete contents.
	readCommit := func(t *testing.T, reader *objectReader, revision git.Revision) []byte {
		object, err := reader.reader(ctx, revision, "commit")
		require.NoError(t, err)

		data, err := io.ReadAll(object)
		require.NoError(t, err)
		return data
	}

	// unreadBytesError is the error message expected when the previous object still has the
	// given number of unread bytes.
	unreadBytesError := func(unread int) string {
		return fmt.Sprintf("cannot create new Object: batch contains %d unread bytes", unread)
	}

	t.Run("read existing object by ref", func(t *testing.T) {
		reader := setupReader(t, nil)

		data := readCommit(t, reader, "refs/heads/master")
		require.Equal(t, commitContents, data)
	})

	t.Run("read existing object by object ID", func(t *testing.T) {
		reader := setupReader(t, nil)

		data := readCommit(t, reader, commitID.Revision())
		require.Contains(t, string(data), "Merge branch 'cherry-pick-ce369011' into 'master'\n")
	})

	t.Run("read commit with wrong type", func(t *testing.T) {
		reader := setupReader(t, nil)

		_, err := reader.reader(ctx, commitID.Revision(), "tag")
		require.EqualError(t, err, fmt.Sprintf("expected %s to be a tag, got commit", commitID))

		// Verify that we're still able to read a commit after the previous read has failed.
		data := readCommit(t, reader, commitID.Revision())
		require.Equal(t, commitContents, data)
	})

	t.Run("read missing ref", func(t *testing.T) {
		reader := setupReader(t, nil)

		_, err := reader.reader(ctx, "refs/heads/does-not-exist", "commit")
		require.EqualError(t, err, "object not found")

		// Verify that we're still able to read a commit after the previous read has failed.
		data := readCommit(t, reader, commitID.Revision())
		require.Equal(t, commitContents, data)
	})

	t.Run("read fails when not consuming previous object", func(t *testing.T) {
		reader := setupReader(t, nil)

		_, err := reader.reader(ctx, commitID.Revision(), "commit")
		require.NoError(t, err)

		// We haven't yet consumed the previous object, so this must now fail.
		_, err = reader.reader(ctx, commitID.Revision(), "commit")
		require.EqualError(t, err, unreadBytesError(len(commitContents)+1))
	})

	t.Run("read fails when partially consuming previous object", func(t *testing.T) {
		reader := setupReader(t, nil)

		object, err := reader.reader(ctx, commitID.Revision(), "commit")
		require.NoError(t, err)

		_, err = io.CopyN(io.Discard, object, 100)
		require.NoError(t, err)

		// We haven't yet consumed the previous object, so this must now fail.
		_, err = reader.reader(ctx, commitID.Revision(), "commit")
		require.EqualError(t, err, unreadBytesError(len(commitContents)-100+1))
	})

	t.Run("read increments Prometheus counter", func(t *testing.T) {
		counter := prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"type"})
		reader := setupReader(t, counter)

		revisionsByType := map[string]git.Revision{
			"commit": "refs/heads/master",
			"tree":   "refs/heads/master^{tree}",
			"blob":   "refs/heads/master:README",
			"tag":    "refs/tags/v1.1.1",
		}

		for objectType, revision := range revisionsByType {
			require.Equal(t, float64(0), testutil.ToFloat64(counter.WithLabelValues(objectType)))

			object, err := reader.reader(ctx, revision, objectType)
			require.NoError(t, err)

			require.Equal(t, float64(1), testutil.ToFloat64(counter.WithLabelValues(objectType)))

			// Drain the object so that the reader accepts the next request.
			_, err = io.Copy(io.Discard, object)
			require.NoError(t, err)
		}
	})
}
package catfile
import (
"context"
"os"
"testing"
"gitlab.com/gitlab-org/gitaly/v14/internal/command"
"gitlab.com/gitlab-org/gitaly/v14/internal/git"
"gitlab.com/gitlab-org/gitaly/v14/internal/git/repository"
"gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
)
......@@ -17,3 +22,27 @@ func testMain(m *testing.M) int {
defer cleanup()
return m.Run()
}
// repoExecutor is a test helper that is handed out as a git.RepositoryExecutor by
// newRepoExecutor. It runs Git commands in the embedded repository.
type repoExecutor struct {
// GitRepo is the repository that commands are executed in.
repository.GitRepo
// gitCmdFactory is used by Exec to create the Git commands.
gitCmdFactory git.CommandFactory
}
// newRepoExecutor returns a git.RepositoryExecutor for the given repository whose commands are
// created by an exec command factory built from cfg. The testing handle is currently unused.
func newRepoExecutor(t *testing.T, cfg config.Cfg, repo repository.GitRepo) git.RepositoryExecutor {
	executor := &repoExecutor{GitRepo: repo}
	executor.gitCmdFactory = git.NewExecCommandFactory(cfg)
	return executor
}
// Exec creates the given Git command in the executor's repository by delegating to the command
// factory, passing the options through unchanged. It returns whatever the factory returns.
func (e *repoExecutor) Exec(ctx context.Context, cmd git.Cmd, opts ...git.CmdOpt) (*command.Command, error) {
return e.gitCmdFactory.New(ctx, e.GitRepo, cmd, opts...)
}
// ExecAndWait creates the given Git command via Exec and waits for it to finish. It returns the
// error from creating the command, or otherwise the result of waiting on it.
func (e *repoExecutor) ExecAndWait(ctx context.Context, cmd git.Cmd, opts ...git.CmdOpt) error {
	// Deliberately not named "command" so that the imported package of that name stays
	// accessible inside this function.
	spawned, err := e.Exec(ctx, cmd, opts...)
	if err == nil {
		return spawned.Wait()
	}

	return err
}
package catfile
import (
"context"
"github.com/opentracing/opentracing-go"
"gitlab.com/gitlab-org/gitaly/v14/internal/git"
"gitlab.com/gitlab-org/labkit/correlation"
)
// revisionTag returns the tracing tag with key "revision" whose value is the given revision.
func revisionTag(revision git.Revision) opentracing.Tag {
	var tag opentracing.Tag
	tag.Key = "revision"
	tag.Value = revision
	return tag
}
// correlationIDTag returns the tracing tag with key "correlation_id" whose value is the
// correlation ID extracted from the given context.
func correlationIDTag(ctx context.Context) opentracing.Tag {
	correlationID := correlation.ExtractFromContext(ctx)
	return opentracing.Tag{Key: "correlation_id", Value: correlationID}
}
// startSpan starts two tracing spans named methodName, both tagged with the given revision: one
// derived from innerCtx and one derived from outerCtx. The outer span is additionally tagged
// with the correlation ID extracted from the context derived from innerCtx.
// NOTE(review): presumably this lets a request's trace be linked to the context in which a
// (possibly cached) process was created -- confirm.
//
// The returned function finishes the inner span first and then the outer one.
func startSpan(innerCtx context.Context, outerCtx context.Context, methodName string, revision git.Revision) func() {
	innerSpan, tracedInnerCtx := opentracing.StartSpanFromContext(innerCtx, methodName, revisionTag(revision))

	outerSpan, _ := opentracing.StartSpanFromContext(
		outerCtx,
		methodName,
		revisionTag(revision),
		correlationIDTag(tracedInnerCtx),
	)

	finishSpans := func() {
		innerSpan.Finish()
		outerSpan.Finish()
	}
	return finishSpans
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment