Skip to content
Snippets Groups Projects
Commit f99dae3b authored by Patrick Steinhardt
Browse files

Merge branch 'sh-revert-3853' into 'master'

Revert catfile cache refactor

See merge request !3902
parents 15c4a53f 5d47b529
No related branches found
No related tags found
1 merge request: !3902 Revert catfile cache refactor
Pipeline #374952320 passed
Showing
with 622 additions and 597 deletions
......@@ -234,6 +234,7 @@ func New(ctx context.Context, cmd *exec.Cmd, stdin io.Reader, stdout, stderr io.
syscall.Kill(-process.Pid, syscall.SIGTERM)
}
command.Wait()
wg.Done()
}()
logPid = cmd.Process.Pid
......@@ -276,15 +277,6 @@ func (c *Command) wait() {
inFlightCommandGauge.Dec()
c.logProcessComplete()
// This is a bit out-of-place here given that the `wg.Add()` call is in `New()`.
// But in `New()`, we have to resort waiting on the context being finished until we
// would be able to decrement the number of in-flight commands. Given that in some
// cases we detach processes from their initial context such that they run in the
// background, this would cause us to take longer than necessary to decrease the
// wait group counter again. So we instead do it here to accelerate the process,
// even though it's less idiomatic.
wg.Done()
}
// ExitStatus will return the exit-code from an error returned by Wait().
......
......@@ -7,6 +7,7 @@ import (
"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"gitlab.com/gitlab-org/gitaly/v14/internal/git"
"gitlab.com/gitlab-org/labkit/correlation"
)
const (
......@@ -28,69 +29,17 @@ type Batch interface {
}
type batch struct {
*objectInfoReader
*objectReader
closedMutex sync.Mutex
closed bool
}
func newBatch(
ctx context.Context,
repo git.RepositoryExecutor,
counter *prometheus.CounterVec,
) (_ *batch, returnedErr error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "catfile.Batch")
go func() {
<-ctx.Done()
span.Finish()
}()
objectReader, err := newObjectReader(ctx, repo, counter)
if err != nil {
return nil, err
}
defer func() {
// If creation of the ObjectInfoReader fails, then we do not want to leak the
// ObjectReader process.
if returnedErr != nil {
objectReader.Close()
}
}()
objectInfoReader, err := newObjectInfoReader(ctx, repo, counter)
if err != nil {
return nil, err
}
return &batch{objectReader: objectReader, objectInfoReader: objectInfoReader}, nil
}
// Close closes the writers for objectInfoReader and objectReader. This is only used for cached
// Batches
func (c *batch) Close() {
c.closedMutex.Lock()
defer c.closedMutex.Unlock()
if c.closed {
return
}
c.closed = true
c.objectReader.Close()
c.objectInfoReader.Close()
}
func (c *batch) isClosed() bool {
c.closedMutex.Lock()
defer c.closedMutex.Unlock()
return c.closed
sync.Mutex
*batchCheckProcess
*batchProcess
cancel func()
closed bool
}
// Info returns an ObjectInfo if spec exists. If the revision does not exist
// the error is of type NotFoundError.
func (c *batch) Info(ctx context.Context, revision git.Revision) (*ObjectInfo, error) {
return c.objectInfoReader.info(ctx, revision)
return c.batchCheckProcess.info(revision)
}
// Tree returns a raw tree object. It is an error if the revision does not
......@@ -98,7 +47,7 @@ func (c *batch) Info(ctx context.Context, revision git.Revision) (*ObjectInfo, e
// the object type. Caller must consume the Reader before making another call
// on C.
func (c *batch) Tree(ctx context.Context, revision git.Revision) (*Object, error) {
return c.objectReader.reader(ctx, revision, "tree")
return c.batchProcess.reader(revision, "tree")
}
// Commit returns a raw commit object. It is an error if the revision does not
......@@ -106,7 +55,7 @@ func (c *batch) Tree(ctx context.Context, revision git.Revision) (*Object, error
// check the object type. Caller must consume the Reader before making another
// call on C.
func (c *batch) Commit(ctx context.Context, revision git.Revision) (*Object, error) {
return c.objectReader.reader(ctx, revision, "commit")
return c.batchProcess.reader(revision, "commit")
}
// Blob returns a reader for the requested blob. The entire blob must be
......@@ -115,11 +64,158 @@ func (c *batch) Commit(ctx context.Context, revision git.Revision) (*Object, err
// It is an error if the revision does not point to a blob. To prevent this,
// use Info to resolve the revision and check the object type.
func (c *batch) Blob(ctx context.Context, revision git.Revision) (*Object, error) {
return c.objectReader.reader(ctx, revision, "blob")
return c.batchProcess.reader(revision, "blob")
}
// Tag returns a raw tag object. Caller must consume the Reader before
// making another call on C.
func (c *batch) Tag(ctx context.Context, revision git.Revision) (*Object, error) {
return c.objectReader.reader(ctx, revision, "tag")
return c.batchProcess.reader(revision, "tag")
}
// Close marks the batch as closed and, if a cancel function has been attached, invokes it.
// Calling Close more than once is harmless: only the first call has any effect. This is only
// used for cached Batches.
func (c *batch) Close() {
	c.Lock()
	defer c.Unlock()

	if !c.closed {
		c.closed = true

		// Both c.batchProcess and c.batchCheckProcess run a goroutine that waits on
		// ctx.Done(). Cancelling the context wakes those goroutines up, and they in turn
		// close the writers of both processes.
		if cancel := c.cancel; cancel != nil {
			cancel()
		}
	}
}
// isClosed reports whether Close has already been called on this batch. The flag is read while
// holding the batch's mutex.
func (c *batch) isClosed() bool {
	c.Lock()
	closed := c.closed
	c.Unlock()

	return closed
}
// simulatedBatchSpawnError is the error handed back by the process constructors when
// BatchCache.injectSpawnErrors is set. It exists for testing purposes only.
type simulatedBatchSpawnError struct{}

// Error implements the error interface.
func (simulatedBatchSpawnError) Error() string {
	return "simulated spawn error"
}
// newBatch spawns the `--batch` and `--batch-check` processes for repo and bundles them into a
// batch. It returns the batch together with the derived context that both processes were spawned
// with. If spawning either process fails, that derived context is cancelled before returning and
// the error is passed through unchanged.
func (bc *BatchCache) newBatch(ctx context.Context, repo git.RepositoryExecutor) (_ *batch, _ context.Context, returnedErr error) {
	// Batch processes are long-lived and reused across RPCs, so neither the correlation ID nor
	// the tracing span of the RPC that happens to create them is carried over.
	ctx = opentracing.ContextWithSpan(correlation.ContextWithCorrelation(ctx, ""), nil)

	span, ctx := opentracing.StartSpanFromContext(ctx, "catfile.Batch")

	ctx, cancel := context.WithCancel(ctx)
	defer func() {
		// Only tear the context down when we bail out with an error.
		if returnedErr != nil {
			cancel()
		}
	}()

	// The span covers the whole lifetime of the batch's context.
	go func() {
		<-ctx.Done()
		span.Finish()
	}()

	objectProcess, err := bc.newBatchProcess(ctx, repo)
	if err != nil {
		return nil, ctx, err
	}

	infoProcess, err := bc.newBatchCheckProcess(ctx, repo)
	if err != nil {
		return nil, ctx, err
	}

	return &batch{batchProcess: objectProcess, batchCheckProcess: infoProcess}, ctx, nil
}
// newInstrumentedBatch wraps c in an instrumentedBatch. ctx is stored as the batch-level context
// and catfileLookupCounter as the counter that is incremented on every lookup.
func newInstrumentedBatch(ctx context.Context, c Batch, catfileLookupCounter *prometheus.CounterVec) Batch {
	wrapped := new(instrumentedBatch)
	wrapped.Batch = c
	wrapped.batchCtx = ctx
	wrapped.catfileLookupCounter = catfileLookupCounter

	return wrapped
}
// instrumentedBatch wraps a Batch and adds tracing spans and a lookup counter to every call.
//
// We maintain two contexts here: an RPC-level one, and a batch-level one.
//
// The batchCtx tracks the lifetime of the long-running batch process, and is
// de-correlated from the RPC, as it is shared between many RPCs.
//
// We perform double accounting and re-correlation to get stats and traces per
// batch process.
type instrumentedBatch struct {
// Batch is the wrapped implementation that every call is delegated to.
Batch
// catfileLookupCounter is incremented once per call, labelled with the kind of lookup.
catfileLookupCounter *prometheus.CounterVec
// batchCtx is the batch-level context; startSpan uses it as parent for the second span.
batchCtx context.Context
}
// Info opens the tracing spans for "Batch.Info", bumps the lookup counter with the "info" label
// and then delegates to the wrapped Batch.
func (ib *instrumentedBatch) Info(ctx context.Context, revision git.Revision) (*ObjectInfo, error) {
	spanCtx, done := ib.startSpan(ctx, "Batch.Info", revision)
	defer done()

	ib.catfileLookupCounter.WithLabelValues("info").Inc()

	return ib.Batch.Info(spanCtx, revision)
}
// Tree opens the tracing spans for "Batch.Tree", bumps the lookup counter with the "tree" label
// and then delegates to the wrapped Batch.
func (ib *instrumentedBatch) Tree(ctx context.Context, revision git.Revision) (*Object, error) {
	spanCtx, done := ib.startSpan(ctx, "Batch.Tree", revision)
	defer done()

	ib.catfileLookupCounter.WithLabelValues("tree").Inc()

	return ib.Batch.Tree(spanCtx, revision)
}
// Commit opens the tracing spans for "Batch.Commit", bumps the lookup counter with the "commit"
// label and then delegates to the wrapped Batch.
func (ib *instrumentedBatch) Commit(ctx context.Context, revision git.Revision) (*Object, error) {
	spanCtx, done := ib.startSpan(ctx, "Batch.Commit", revision)
	defer done()

	ib.catfileLookupCounter.WithLabelValues("commit").Inc()

	return ib.Batch.Commit(spanCtx, revision)
}
// Blob opens the tracing spans for "Batch.Blob", bumps the lookup counter with the "blob" label
// and then delegates to the wrapped Batch.
func (ib *instrumentedBatch) Blob(ctx context.Context, revision git.Revision) (*Object, error) {
	spanCtx, done := ib.startSpan(ctx, "Batch.Blob", revision)
	defer done()

	ib.catfileLookupCounter.WithLabelValues("blob").Inc()

	return ib.Batch.Blob(spanCtx, revision)
}
// Tag opens the tracing spans for "Batch.Tag", bumps the lookup counter with the "tag" label and
// then delegates to the wrapped Batch.
func (ib *instrumentedBatch) Tag(ctx context.Context, revision git.Revision) (*Object, error) {
	spanCtx, done := ib.startSpan(ctx, "Batch.Tag", revision)
	defer done()

	ib.catfileLookupCounter.WithLabelValues("tag").Inc()

	return ib.Batch.Tag(spanCtx, revision)
}
// revisionTag builds the span tag that records the requested revision under the "revision" key.
func (ib *instrumentedBatch) revisionTag(revision git.Revision) opentracing.Tag {
	var tag opentracing.Tag
	tag.Key = "revision"
	tag.Value = revision

	return tag
}
// correlationIDTag builds the span tag that records the correlation ID extracted from ctx under
// the "correlation_id" key.
func (ib *instrumentedBatch) correlationIDTag(ctx context.Context) opentracing.Tag {
	correlationID := correlation.ExtractFromContext(ctx)

	return opentracing.Tag{Key: "correlation_id", Value: correlationID}
}
// startSpan starts two spans named methodName: one derived from the passed-in ctx and one derived
// from the batch-level context. The latter is additionally tagged with the correlation ID taken
// from ctx. It returns the context carrying the first span and a function that finishes both.
func (ib *instrumentedBatch) startSpan(ctx context.Context, methodName string, revision git.Revision) (context.Context, func()) {
	rpcSpan, ctx := opentracing.StartSpanFromContext(ctx, methodName, ib.revisionTag(revision))
	batchSpan, _ := opentracing.StartSpanFromContext(ib.batchCtx, methodName, ib.revisionTag(revision), ib.correlationIDTag(ctx))

	finishBoth := func() {
		rpcSpan.Finish()
		batchSpan.Finish()
	}

	return ctx, finishBoth
}
......@@ -6,13 +6,11 @@ import (
"sync"
"time"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"gitlab.com/gitlab-org/gitaly/v14/internal/git"
"gitlab.com/gitlab-org/gitaly/v14/internal/git/repository"
"gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config"
"gitlab.com/gitlab-org/gitaly/v14/internal/metadata"
"gitlab.com/gitlab-org/labkit/correlation"
)
const (
......@@ -34,18 +32,14 @@ type Cache interface {
Evict()
}
func newCacheKey(sessionID string, repo repository.GitRepo) (key, bool) {
if sessionID == "" {
return key{}, false
}
func newCacheKey(sessionID string, repo repository.GitRepo) key {
return key{
sessionID: sessionID,
repoStorage: repo.GetStorageName(),
repoRelPath: repo.GetRelativePath(),
repoObjDir: repo.GetGitObjectDirectory(),
repoAltDir: strings.Join(repo.GetGitAlternateObjectDirectories(), ","),
}, true
}
}
type key struct {
......@@ -71,6 +65,9 @@ type BatchCache struct {
maxLen int
// ttl is the fixed ttl for cache entries
ttl time.Duration
// injectSpawnErrors is used for testing purposes only. If set to true, then spawned batch
// processes will simulate spawn errors.
injectSpawnErrors bool
// monitorTicker is the ticker used for the monitoring Goroutine.
monitorTicker *time.Ticker
monitorDone chan interface{}
......@@ -83,11 +80,6 @@ type BatchCache struct {
entriesMutex sync.Mutex
entries []*entry
// cachedProcessDone is a condition that gets signalled whenever a process is being
// considered to be returned to the cache. This field is optional and must only be used in
// tests.
cachedProcessDone *sync.Cond
}
// NewCache creates a new catfile process cache.
......@@ -95,6 +87,15 @@ func NewCache(cfg config.Cfg) *BatchCache {
return newCache(defaultBatchfileTTL, cfg.Git.CatfileCacheSize, defaultEvictionInterval)
}
// Stop stops the monitoring Goroutine and evicts all cached processes. This must only be called
// once.
func (bc *BatchCache) Stop() {
// Turn the ticker off first so that it delivers no further ticks.
bc.monitorTicker.Stop()
// Handshake on monitorDone: send a value, then block until something can be received from
// the same channel again.
// NOTE(review): presumably the monitoring Goroutine consumes the value and answers (or closes
// the channel) once it has exited; its body is not visible here, so confirm.
bc.monitorDone <- struct{}{}
<-bc.monitorDone
// With monitoring stopped, drop whatever the cache still holds.
bc.Evict()
}
func newCache(ttl time.Duration, maxLen int, refreshInterval time.Duration) *BatchCache {
if maxLen <= 0 {
maxLen = defaultMaxLen
......@@ -169,91 +170,47 @@ func (bc *BatchCache) monitor() {
}
}
// Stop stops the monitoring Goroutine and evicts all cached processes. This must only be called
// once.
func (bc *BatchCache) Stop() {
bc.monitorTicker.Stop()
bc.monitorDone <- struct{}{}
<-bc.monitorDone
bc.Evict()
}
// BatchProcess creates a new Batch process for the given repository.
func (bc *BatchCache) BatchProcess(ctx context.Context, repo git.RepositoryExecutor) (_ Batch, returnedErr error) {
requestDone := ctx.Done()
if requestDone == nil {
func (bc *BatchCache) BatchProcess(ctx context.Context, repo git.RepositoryExecutor) (Batch, error) {
if ctx.Done() == nil {
panic("empty ctx.Done() in catfile.Batch.New()")
}
cacheKey, isCacheable := newCacheKey(metadata.GetValue(ctx, SessionIDField), repo)
if isCacheable {
// We only try to look up cached batch processes in case it is cacheable, which
// requires a session ID. This is mostly done such that git-cat-file(1) processes
// from one user cannot interfer with those from another user. The main intent is to
// disallow trivial denial of service attacks against other users in case it is
// possible to poison the cache with broken git-cat-file(1) processes.
if c, ok := bc.checkout(cacheKey); ok {
go bc.returnWhenDone(requestDone, cacheKey, c)
return c, nil
sessionID := metadata.GetValue(ctx, SessionIDField)
if sessionID == "" {
c, ctx, err := bc.newBatch(ctx, repo)
if err != nil {
return nil, err
}
return newInstrumentedBatch(ctx, c, bc.catfileLookupCounter), err
}
// We have not found any cached process, so we need to create a new one. In this
// case, we need to detach the process from the current context such that it does
// not get killed when the current context is done. Note that while we explicitly
// `Close()` processes in case this function fails, we must have a cancellable
// context or otherwise our `command` package will panic.
var cancel func()
ctx, cancel = context.WithCancel(context.Background())
defer func() {
if returnedErr != nil {
cancel()
}
}()
// We have to decorrelate the process from the current context given that it
// may potentially be reused across different RPC calls.
ctx = correlation.ContextWithCorrelation(ctx, "")
ctx = opentracing.ContextWithSpan(ctx, nil)
cacheKey := newCacheKey(sessionID, repo)
requestDone := ctx.Done()
if c, ok := bc.checkout(cacheKey); ok {
go bc.returnWhenDone(requestDone, cacheKey, c)
return newInstrumentedBatch(ctx, c, bc.catfileLookupCounter), nil
}
c, err := newBatch(ctx, repo, bc.catfileLookupCounter)
// if we are using caching, create a fresh context for the new batch
// and initialize the new batch with a bc key and cancel function
cacheCtx, cacheCancel := context.WithCancel(context.Background())
c, ctx, err := bc.newBatch(cacheCtx, repo)
if err != nil {
cacheCancel()
return nil, err
}
defer func() {
// If we somehow fail after creating a new Batch process, then we want to kill
// spawned processes right away.
if returnedErr != nil {
c.Close()
}
}()
bc.totalCatfileProcesses.Inc()
bc.currentCatfileProcesses.Inc()
go func() {
<-ctx.Done()
bc.currentCatfileProcesses.Dec()
}()
if isCacheable {
// If the process is cacheable, then we want to put the process into the cache when
// the current outer context is done.
go bc.returnWhenDone(requestDone, cacheKey, c)
}
return c, nil
c.cancel = cacheCancel
go bc.returnWhenDone(requestDone, cacheKey, c)
return newInstrumentedBatch(ctx, c, bc.catfileLookupCounter), nil
}
func (bc *BatchCache) returnWhenDone(done <-chan struct{}, cacheKey key, c *batch) {
<-done
if bc.cachedProcessDone != nil {
defer func() {
bc.cachedProcessDone.Broadcast()
}()
}
if c == nil || c.isClosed() {
return
}
......
package catfile
import (
"context"
"errors"
"io"
"os"
"sync"
"fmt"
"testing"
"time"
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/v14/internal/git/repository"
"gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg"
"gitlab.com/gitlab-org/labkit/correlation"
"google.golang.org/grpc/metadata"
)
func TestCache_add(t *testing.T) {
func TestCacheAdd(t *testing.T) {
const maxLen = 3
bc := newCache(time.Hour, maxLen, defaultEvictionInterval)
cfg, repo, _ := testcfg.BuildWithRepo(t)
key0 := mustCreateKey(t, "0", repo)
value0 := mustCreateBatch(t, cfg, repo)
key0 := testKey(0)
value0 := testValue()
bc.add(key0, value0)
requireCacheValid(t, bc)
key1 := mustCreateKey(t, "1", repo)
bc.add(key1, mustCreateBatch(t, cfg, repo))
key1 := testKey(1)
bc.add(key1, testValue())
requireCacheValid(t, bc)
key2 := mustCreateKey(t, "2", repo)
bc.add(key2, mustCreateBatch(t, cfg, repo))
key2 := testKey(2)
bc.add(key2, testValue())
requireCacheValid(t, bc)
// Because maxLen is 3, and key0 is oldest, we expect that adding key3
// will kick out key0.
key3 := mustCreateKey(t, "3", repo)
bc.add(key3, mustCreateBatch(t, cfg, repo))
key3 := testKey(3)
bc.add(key3, testValue())
requireCacheValid(t, bc)
require.Equal(t, maxLen, bc.len(), "length should be maxLen")
require.True(t, value0.isClosed(), "value0 should be closed")
require.Equal(t, []key{key1, key2, key3}, keys(t, bc))
require.Equal(t, []key{key1, key2, key3}, keys(bc))
}
func TestCache_addTwice(t *testing.T) {
func TestCacheAddTwice(t *testing.T) {
bc := newCache(time.Hour, 10, defaultEvictionInterval)
cfg, repo, _ := testcfg.BuildWithRepo(t)
key0 := mustCreateKey(t, "0", repo)
value0 := mustCreateBatch(t, cfg, repo)
key0 := testKey(0)
value0 := testValue()
bc.add(key0, value0)
requireCacheValid(t, bc)
key1 := mustCreateKey(t, "1", repo)
value1 := mustCreateBatch(t, cfg, repo)
bc.add(key1, value1)
key1 := testKey(1)
bc.add(key1, testValue())
requireCacheValid(t, bc)
require.Equal(t, key0, bc.head().key, "key0 should be oldest key")
value2 := mustCreateBatch(t, cfg, repo)
value2 := testValue()
bc.add(key0, value2)
requireCacheValid(t, bc)
require.Equal(t, key1, bc.head().key, "key1 should be oldest key")
require.Equal(t, value1, bc.head().value)
require.Equal(t, value2, bc.head().value)
require.True(t, value0.isClosed(), "value0 should be closed")
}
func TestCache_checkout(t *testing.T) {
func TestCacheCheckout(t *testing.T) {
bc := newCache(time.Hour, 10, defaultEvictionInterval)
cfg, repo, _ := testcfg.BuildWithRepo(t)
key0 := mustCreateKey(t, "0", repo)
value0 := mustCreateBatch(t, cfg, repo)
key0 := testKey(0)
value0 := testValue()
bc.add(key0, value0)
v, ok := bc.checkout(key{sessionID: "foo"})
......@@ -102,33 +85,31 @@ func TestCache_checkout(t *testing.T) {
require.Nil(t, v, "value from second checkout")
}
func TestCache_enforceTTL(t *testing.T) {
func TestCacheEnforceTTL(t *testing.T) {
ttl := time.Hour
bc := newCache(ttl, 10, defaultEvictionInterval)
cfg, repo, _ := testcfg.BuildWithRepo(t)
sleep := func() { time.Sleep(2 * time.Millisecond) }
key0 := mustCreateKey(t, "0", repo)
value0 := mustCreateBatch(t, cfg, repo)
key0 := testKey(0)
value0 := testValue()
bc.add(key0, value0)
sleep()
key1 := mustCreateKey(t, "1", repo)
value1 := mustCreateBatch(t, cfg, repo)
key1 := testKey(1)
value1 := testValue()
bc.add(key1, value1)
sleep()
cutoff := time.Now().Add(ttl)
sleep()
key2 := mustCreateKey(t, "2", repo)
bc.add(key2, mustCreateBatch(t, cfg, repo))
key2 := testKey(2)
bc.add(key2, testValue())
sleep()
key3 := mustCreateKey(t, "3", repo)
bc.add(key3, mustCreateBatch(t, cfg, repo))
key3 := testKey(3)
bc.add(key3, testValue())
sleep()
requireCacheValid(t, bc)
......@@ -142,198 +123,40 @@ func TestCache_enforceTTL(t *testing.T) {
require.True(t, v.isClosed(), "value %d %v should be closed", i, v)
}
require.Equal(t, []key{key2, key3}, keys(t, bc), "remaining keys after EnforceTTL")
require.Equal(t, []key{key2, key3}, keys(bc), "remaining keys after EnforceTTL")
bc.enforceTTL(cutoff)
requireCacheValid(t, bc)
require.Equal(t, []key{key2, key3}, keys(t, bc), "remaining keys after second EnforceTTL")
require.Equal(t, []key{key2, key3}, keys(bc), "remaining keys after second EnforceTTL")
}
func TestCache_autoExpiry(t *testing.T) {
func TestAutoExpiry(t *testing.T) {
ttl := 5 * time.Millisecond
refresh := 1 * time.Millisecond
bc := newCache(ttl, 10, refresh)
cfg, repo, _ := testcfg.BuildWithRepo(t)
key0 := mustCreateKey(t, "0", repo)
value0 := mustCreateBatch(t, cfg, repo)
key0 := testKey(0)
value0 := testValue()
bc.add(key0, value0)
requireCacheValid(t, bc)
require.Contains(t, keys(t, bc), key0, "key should still be in map")
require.Contains(t, keys(bc), key0, "key should still be in map")
require.False(t, value0.isClosed(), "value should not have been closed")
// Wait for the monitor goroutine to do its thing
for i := 0; i < 100; i++ {
if len(keys(t, bc)) == 0 {
if len(keys(bc)) == 0 {
break
}
time.Sleep(refresh)
}
require.Empty(t, keys(t, bc), "key should no longer be in map")
require.Empty(t, keys(bc), "key should no longer be in map")
require.True(t, value0.isClosed(), "value should be closed after eviction")
}
func TestCache_BatchProcess(t *testing.T) {
cfg, repo, _ := testcfg.BuildWithRepo(t)
repoExecutor := newRepoExecutor(t, cfg, repo)
cache := newCache(time.Hour, 10, time.Hour)
defer cache.Evict()
cache.cachedProcessDone = sync.NewCond(&sync.Mutex{})
t.Run("uncancellable", func(t *testing.T) {
ctx := context.Background()
require.PanicsWithValue(t, "empty ctx.Done() in catfile.Batch.New()", func() {
_, _ = cache.BatchProcess(ctx, repoExecutor)
})
})
t.Run("uncacheable", func(t *testing.T) {
ctx, cancel := testhelper.Context()
defer cancel()
ctx = correlation.ContextWithCorrelation(ctx, "1")
// The context doesn't carry a session ID and is thus uncacheable.
// The process should never get returned to the cache and must be
// killed on context cancellation.
batchProcess, err := cache.BatchProcess(ctx, repoExecutor)
require.NoError(t, err)
batch, ok := batchProcess.(*batch)
require.True(t, ok, "expected batch")
correlation := correlation.ExtractFromContext(batch.objectReader.creationCtx)
require.Equal(t, "1", correlation)
cancel()
// We're cheating a bit here to avoid creating a racy test by reaching into the
// batch processes and trying to read from their stdout. If the cancel did kill the
// process as expected, then the stdout should be closed and we'll get an EOF.
for _, reader := range []io.Reader{batch.objectInfoReader.cmd, batch.objectReader.cmd} {
output, err := io.ReadAll(reader)
if err != nil {
require.True(t, errors.Is(err, os.ErrClosed))
} else {
require.NoError(t, err)
}
require.Empty(t, output)
}
// This is another bug: while we do not have any resource leaks because processes
// got killed as expected, the batch itself is not considered to have been closed.
require.False(t, batch.isClosed())
require.Empty(t, keys(t, cache))
})
t.Run("cacheable", func(t *testing.T) {
defer cache.Evict()
ctx, cancel := testhelper.Context()
defer cancel()
ctx = correlation.ContextWithCorrelation(ctx, "1")
ctx = testhelper.MergeIncomingMetadata(ctx,
metadata.Pairs(SessionIDField, "1"),
)
batchProcess, err := cache.BatchProcess(ctx, repoExecutor)
require.NoError(t, err)
batch, ok := batchProcess.(*batch)
require.True(t, ok, "expected instrumented batch")
// The correlation ID must be empty given that this will be a cached long-running
// processes that can be reused across multpile RPC calls.
correlation := correlation.ExtractFromContext(batch.objectReader.creationCtx)
require.Empty(t, correlation)
// Cancel the context such that the process will be considered for return to the
// cache and wait for the cache to collect it.
cache.cachedProcessDone.L.Lock()
cancel()
defer cache.cachedProcessDone.L.Unlock()
cache.cachedProcessDone.Wait()
keys := keys(t, cache)
require.Equal(t, []key{{
sessionID: "1",
repoStorage: repo.GetStorageName(),
repoRelPath: repo.GetRelativePath(),
}}, keys)
// Assert that we can still read from the cached process.
_, err = batchProcess.Info(ctx, "refs/heads/master")
require.NoError(t, err)
})
t.Run("dirty process does not get cached", func(t *testing.T) {
defer cache.Evict()
ctx, cancel := testhelper.Context()
defer cancel()
ctx = testhelper.MergeIncomingMetadata(ctx,
metadata.Pairs(SessionIDField, "1"),
)
batchProcess, err := cache.BatchProcess(ctx, repoExecutor)
require.NoError(t, err)
// While we request object data, we do not consume it at all. The reader is thus
// dirty and cannot be reused and shouldn't be returned to the cache.
_, err = batchProcess.Commit(ctx, "refs/heads/master")
require.NoError(t, err)
// Cancel the context such that the process will be considered for return to the
// cache and wait for the cache to collect it.
cache.cachedProcessDone.L.Lock()
cancel()
defer cache.cachedProcessDone.L.Unlock()
cache.cachedProcessDone.Wait()
require.Empty(t, keys(t, cache))
// The process should be killed now.
_, err = batchProcess.Info(ctx, "refs/heads/master")
require.True(t, errors.Is(err, os.ErrClosed))
})
t.Run("closed process does not get cached", func(t *testing.T) {
defer cache.Evict()
ctx, cancel := testhelper.Context()
defer cancel()
ctx = testhelper.MergeIncomingMetadata(ctx,
metadata.Pairs(SessionIDField, "1"),
)
batchProcess, err := cache.BatchProcess(ctx, repoExecutor)
require.NoError(t, err)
batch, ok := batchProcess.(*batch)
require.True(t, ok, "expected batch")
// Closed processes naturally cannot be reused anymore and thus shouldn't ever get
// cached.
batch.Close()
// Cancel the context such that the process will be considered for return to the
// cache and wait for the cache to collect it.
cache.cachedProcessDone.L.Lock()
cancel()
defer cache.cachedProcessDone.L.Unlock()
cache.cachedProcessDone.Wait()
require.Empty(t, keys(t, cache))
})
}
func requireCacheValid(t *testing.T, bc *BatchCache) {
bc.entriesMutex.Lock()
defer bc.entriesMutex.Unlock()
......@@ -344,30 +167,11 @@ func requireCacheValid(t *testing.T, bc *BatchCache) {
}
}
func mustCreateBatch(t *testing.T, cfg config.Cfg, repo repository.GitRepo) *batch {
t.Helper()
ctx, cancel := testhelper.Context()
t.Cleanup(cancel)
batch, err := newBatch(ctx, newRepoExecutor(t, cfg, repo), nil)
require.NoError(t, err)
return batch
}
func mustCreateKey(t *testing.T, sessionID string, repo repository.GitRepo) key {
t.Helper()
key, cacheable := newCacheKey(sessionID, repo)
require.True(t, cacheable)
return key
}
// testValue returns a fresh zero-value batch that is not backed by any process.
func testValue() *batch {
	return new(batch)
}
func keys(t *testing.T, bc *BatchCache) []key {
t.Helper()
// testKey returns a cache key whose session ID is "key-<i>"; all other fields are left empty.
func testKey(i int) key {
	var k key
	k.sessionID = fmt.Sprintf("key-%d", i)

	return k
}
func keys(bc *BatchCache) []key {
bc.entriesMutex.Lock()
defer bc.entriesMutex.Unlock()
......
package catfile
import (
"bufio"
"context"
"fmt"
"io"
"sync"
"github.com/opentracing/opentracing-go"
"gitlab.com/gitlab-org/gitaly/v14/internal/git"
)
// batchCheckProcess encapsulates a 'git cat-file --batch-check' process
type batchCheckProcess struct {
// r is a buffered reader wrapped around the spawned command; replies are parsed from it.
r *bufio.Reader
// w is the write end of the pipe whose read end is handed to the command as stdin.
w io.WriteCloser
// The embedded mutex is held for the duration of each info call.
sync.Mutex
}
// newBatchCheckProcess spawns a `git cat-file --batch-check` command for repo and wraps it in a
// batchCheckProcess. The command's stdin is the read end of a pipe whose write end is kept in the
// returned process. Once ctx is done, a background goroutine closes that write end and finishes
// the tracing span.
//
// If the command cannot be spawned, the pipe writer is closed and the span is finished right
// away, and the error from repo.Exec is returned unchanged.
func (bc *BatchCache) newBatchCheckProcess(ctx context.Context, repo git.RepositoryExecutor) (*batchCheckProcess, error) {
	process := &batchCheckProcess{}

	var stdinReader io.Reader
	stdinReader, process.w = io.Pipe()

	span, ctx := opentracing.StartSpanFromContext(ctx, "catfile.BatchCheckProcess")

	batchCmd, err := repo.Exec(ctx,
		git.SubCmd{
			Name: "cat-file",
			Flags: []git.Option{
				git.Flag{Name: "--batch-check"},
			},
		},
		git.WithStdin(stdinReader),
	)
	if err != nil {
		// The cleanup goroutine below is never started on this path, so release the pipe
		// and finish the span here; otherwise the span would never be reported.
		process.w.Close()
		span.Finish()
		return nil, err
	}

	process.r = bufio.NewReader(batchCmd)

	go func() {
		<-ctx.Done()
		// This is crucial to prevent leaking file descriptors.
		process.w.Close()
		span.Finish()
	}()

	if bc.injectSpawnErrors {
		// Testing only: intentionally leak process
		return nil, &simulatedBatchSpawnError{}
	}

	return process, nil
}
// info writes revision, followed by a newline, to the process's stdin and parses the reply from
// its output. The process mutex is held for the whole round trip.
func (bc *batchCheckProcess) info(revision git.Revision) (*ObjectInfo, error) {
	bc.Lock()
	defer bc.Unlock()

	_, err := fmt.Fprintln(bc.w, revision.String())
	if err != nil {
		return nil, err
	}

	return ParseObjectInfo(bc.r)
}
......@@ -8,25 +8,13 @@ import (
"sync"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"gitlab.com/gitlab-org/gitaly/v14/internal/command"
"gitlab.com/gitlab-org/gitaly/v14/internal/git"
)
// Object represents data returned by `git cat-file --batch`
type Object struct {
// ObjectInfo represents main information about object
ObjectInfo
// Reader provides raw data about object. It differs for each type of object(tag, commit, tree, log, etc.)
io.Reader
}
// objectReader is a reader for Git objects. Reading is implemented via a long-lived `git cat-file
// --batch` process such that we do not have to spawn a new process for each object we are about to
// read.
type objectReader struct {
cmd *command.Command
stdout *bufio.Reader
// batch encapsulates a 'git cat-file --batch' process
type batchProcess struct {
r *bufio.Reader
w io.WriteCloser
// n is a state machine that tracks how much data we still have to read
// from r. Legal states are: n==0, this means we can do a new request on
......@@ -40,20 +28,16 @@ type objectReader struct {
// instead of doing unsafe memory writes (to n) and failing in some
// unpredictable way.
sync.Mutex
// creationCtx is the context in which this reader has been created. This context may
// potentially be decorrelated from the "real" RPC context in case the reader is going to be
// cached.
creationCtx context.Context
counter *prometheus.CounterVec
}
func newObjectReader(
ctx context.Context,
repo git.RepositoryExecutor,
counter *prometheus.CounterVec,
) (*objectReader, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "catfile.ObjectReader")
func (bc *BatchCache) newBatchProcess(ctx context.Context, repo git.RepositoryExecutor) (*batchProcess, error) {
bc.totalCatfileProcesses.Inc()
b := &batchProcess{}
var stdinReader io.Reader
stdinReader, b.w = io.Pipe()
span, ctx := opentracing.StartSpanFromContext(ctx, "catfile.BatchProcess")
batchCmd, err := repo.Exec(ctx,
git.SubCmd{
......@@ -62,113 +46,102 @@ func newObjectReader(
git.Flag{Name: "--batch"},
},
},
git.WithStdin(command.SetupStdin),
git.WithStdin(stdinReader),
)
if err != nil {
return nil, err
}
objectReader := &objectReader{
cmd: batchCmd,
stdout: bufio.NewReader(batchCmd),
creationCtx: ctx,
counter: counter,
}
b.r = bufio.NewReader(batchCmd)
bc.currentCatfileProcesses.Inc()
go func() {
<-ctx.Done()
objectReader.Close()
// This Close() is crucial to prevent leaking file descriptors.
b.w.Close()
bc.currentCatfileProcesses.Dec()
span.Finish()
}()
return objectReader, nil
}
if bc.injectSpawnErrors {
// Testing only: intentionally leak process
return nil, &simulatedBatchSpawnError{}
}
func (o *objectReader) Close() {
_ = o.cmd.Wait()
return b, nil
}
func (o *objectReader) reader(
ctx context.Context,
revision git.Revision,
expectedType string,
) (*Object, error) {
finish := startSpan(o.creationCtx, ctx, fmt.Sprintf("Batch.Object(%s)", expectedType), revision)
defer finish()
if o.counter != nil {
o.counter.WithLabelValues(expectedType).Inc()
}
o.Lock()
defer o.Unlock()
func (b *batchProcess) reader(revision git.Revision, expectedType string) (*Object, error) {
b.Lock()
defer b.Unlock()
if o.n == 1 {
if b.n == 1 {
// Consume linefeed
if _, err := o.stdout.ReadByte(); err != nil {
if _, err := b.r.ReadByte(); err != nil {
return nil, err
}
o.n--
b.n--
}
if o.n != 0 {
return nil, fmt.Errorf("cannot create new Object: batch contains %d unread bytes", o.n)
if b.n != 0 {
return nil, fmt.Errorf("cannot create new Object: batch contains %d unread bytes", b.n)
}
if _, err := fmt.Fprintln(o.cmd, revision.String()); err != nil {
if _, err := fmt.Fprintln(b.w, revision.String()); err != nil {
return nil, err
}
oi, err := ParseObjectInfo(o.stdout)
oi, err := ParseObjectInfo(b.r)
if err != nil {
return nil, err
}
o.n = oi.Size + 1
b.n = oi.Size + 1
if oi.Type != expectedType {
// This is a programmer error and it should never happen. But if it does,
// we need to leave the cat-file process in a good state
if _, err := io.CopyN(io.Discard, o.stdout, o.n); err != nil {
if _, err := io.CopyN(io.Discard, b.r, b.n); err != nil {
return nil, err
}
o.n = 0
b.n = 0
return nil, NotFoundError{error: fmt.Errorf("expected %s to be a %s, got %s", oi.Oid, expectedType, oi.Type)}
}
return &Object{
ObjectInfo: *oi,
Reader: &objectDataReader{
objectReader: o,
r: io.LimitReader(o.stdout, oi.Size),
Reader: &batchReader{
batchProcess: b,
r: io.LimitReader(b.r, oi.Size),
},
}, nil
}
func (o *objectReader) consume(nBytes int) {
o.n -= int64(nBytes)
if o.n < 1 {
func (b *batchProcess) consume(nBytes int) {
b.n -= int64(nBytes)
if b.n < 1 {
panic("too many bytes read from batch")
}
}
func (o *objectReader) hasUnreadData() bool {
o.Lock()
defer o.Unlock()
func (b *batchProcess) hasUnreadData() bool {
b.Lock()
defer b.Unlock()
return o.n > 1
return b.n > 1
}
type objectDataReader struct {
*objectReader
type batchReader struct {
*batchProcess
r io.Reader
}
// Read implements io.Reader. It holds the owning process's lock for the duration of the call,
// reads from the wrapped reader r into p, and passes the number of bytes read to consume so that
// the process's unread-byte counter is kept up to date. The byte count and error from the
// underlying read are returned unchanged.
func (o *objectDataReader) Read(p []byte) (int, error) {
o.objectReader.Lock()
defer o.objectReader.Unlock()
func (br *batchReader) Read(p []byte) (int, error) {
br.batchProcess.Lock()
defer br.batchProcess.Unlock()
n, err := o.r.Read(p)
o.objectReader.consume(n)
n, err := br.r.Read(p)
br.batchProcess.consume(n)
return n, err
}
......@@ -3,7 +3,6 @@ package catfile
import (
"bytes"
"context"
"fmt"
"io"
"os"
"os/exec"
......@@ -14,6 +13,7 @@ import (
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/v14/internal/command"
"gitlab.com/gitlab-org/gitaly/v14/internal/git"
"gitlab.com/gitlab-org/gitaly/v14/internal/git/repository"
"gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config"
"gitlab.com/gitlab-org/gitaly/v14/internal/helper/text"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
......@@ -22,11 +22,30 @@ import (
"google.golang.org/grpc/metadata"
)
// repoExecutor is a test helper that pairs a repository with the command factory used to spawn Git
// commands for it.
type repoExecutor struct {
// GitRepo is the repository that is passed to the command factory on every Exec call.
repository.GitRepo
// gitCmdFactory creates the commands returned by Exec.
gitCmdFactory git.CommandFactory
}
// Exec asks the configured command factory to create cmd for the wrapped repository and returns
// the resulting command together with any error from the factory.
func (e *repoExecutor) Exec(ctx context.Context, cmd git.Cmd, opts ...git.CmdOpt) (*command.Command, error) {
	factory := e.gitCmdFactory
	return factory.New(ctx, e.GitRepo, cmd, opts...)
}
// ExecAndWait creates cmd via Exec and then waits for it. It returns the error from Exec if the
// command could not be created, and otherwise whatever Wait returns.
func (e *repoExecutor) ExecAndWait(ctx context.Context, cmd git.Cmd, opts ...git.CmdOpt) error {
	proc, err := e.Exec(ctx, cmd, opts...)
	if err == nil {
		err = proc.Wait()
	}
	return err
}
func setupBatch(t *testing.T, ctx context.Context) (config.Cfg, Batch, *gitalypb.Repository) {
t.Helper()
cfg, repo, _ := testcfg.BuildWithRepo(t)
repoExecutor := newRepoExecutor(t, cfg, repo)
repoExecutor := &repoExecutor{
GitRepo: repo, gitCmdFactory: git.NewExecCommandFactory(cfg),
}
cache := newCache(1*time.Hour, 1000, defaultEvictionInterval)
batch, err := cache.BatchProcess(ctx, repoExecutor)
......@@ -327,14 +346,6 @@ func TestRepeatedCalls(t *testing.T) {
require.Equal(t, string(treeBytes), string(tree2))
}
// failingTestRepoExecutor wraps a git.RepositoryExecutor for tests. Its Exec method is overridden
// to always fail; all other methods are those of the embedded executor.
type failingTestRepoExecutor struct {
git.RepositoryExecutor
}
// Exec ignores all of its arguments and always returns a nil command together with the error
// "simulated error".
func (failingTestRepoExecutor) Exec(context.Context, git.Cmd, ...git.CmdOpt) (*command.Command, error) {
	err := fmt.Errorf("simulated error")
	return nil, err
}
func TestSpawnFailure(t *testing.T) {
cfg, testRepo, _ := testcfg.BuildWithRepo(t)
testRepoExecutor := &repoExecutor{
......@@ -380,12 +391,10 @@ func TestSpawnFailure(t *testing.T) {
ctx2, cancel2 := testhelper.Context()
defer cancel2()
failingTestRepoExecutor := failingTestRepoExecutor{
RepositoryExecutor: testRepoExecutor,
}
_, err = catfileWithFreshSessionID(ctx2, cache, failingTestRepoExecutor)
cache.injectSpawnErrors = true
_, err = catfileWithFreshSessionID(ctx2, cache, testRepoExecutor)
require.Error(t, err, "expect simulated error")
require.EqualError(t, err, "simulated error")
require.IsType(t, &simulatedBatchSpawnError{}, err)
require.True(
t,
......
package catfile
import (
"io"
)
// Object represents data returned by `git cat-file --batch`.
type Object struct {
// ObjectInfo holds the object's metadata (its ID, type and size).
ObjectInfo
// Reader provides the raw contents of the object. What the bytes look like depends on the
// type of the object (tag, commit, tree, blob, etc.).
io.Reader
}
package catfile
import (
"fmt"
"io"
"testing"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/v14/internal/git"
"gitlab.com/gitlab-org/gitaly/v14/internal/git/gittest"
"gitlab.com/gitlab-org/gitaly/v14/internal/helper/text"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg"
)
// TestObjectReader_reader exercises objectReader.reader against the test repository. It covers
// lookups by reference and by object ID, type mismatches, missing references, the refusal to
// serve a new object while a previous one has unread data, and the Prometheus counter.
func TestObjectReader_reader(t *testing.T) {
	ctx, cancel := testhelper.Context()
	defer cancel()

	cfg, repoProto, repoPath := testcfg.BuildWithRepo(t)

	masterHex := text.ChompBytes(gittest.Exec(t, cfg, "-C", repoPath, "rev-parse", "refs/heads/master"))
	commitID, err := git.NewObjectIDFromHex(masterHex)
	require.NoError(t, err)

	commitContents := gittest.Exec(t, cfg, "-C", repoPath, "cat-file", "-p", "refs/heads/master")

	// drain reads r to the end, failing the test on any read error.
	drain := func(t *testing.T, r io.Reader) []byte {
		t.Helper()
		contents, readErr := io.ReadAll(r)
		require.NoError(t, readErr)
		return contents
	}

	t.Run("read existing object by ref", func(t *testing.T) {
		objReader, err := newObjectReader(ctx, newRepoExecutor(t, cfg, repoProto), nil)
		require.NoError(t, err)

		obj, err := objReader.reader(ctx, "refs/heads/master", "commit")
		require.NoError(t, err)

		require.Equal(t, commitContents, drain(t, obj))
	})

	t.Run("read existing object by object ID", func(t *testing.T) {
		objReader, err := newObjectReader(ctx, newRepoExecutor(t, cfg, repoProto), nil)
		require.NoError(t, err)

		obj, err := objReader.reader(ctx, commitID.Revision(), "commit")
		require.NoError(t, err)

		require.Contains(t, string(drain(t, obj)), "Merge branch 'cherry-pick-ce369011' into 'master'\n")
	})

	t.Run("read commit with wrong type", func(t *testing.T) {
		objReader, err := newObjectReader(ctx, newRepoExecutor(t, cfg, repoProto), nil)
		require.NoError(t, err)

		_, err = objReader.reader(ctx, commitID.Revision(), "tag")
		require.EqualError(t, err, fmt.Sprintf("expected %s to be a tag, got commit", commitID))

		// The failed read must leave the process usable: a subsequent read of the commit
		// has to succeed.
		obj, err := objReader.reader(ctx, commitID.Revision(), "commit")
		require.NoError(t, err)

		require.Equal(t, commitContents, drain(t, obj))
	})

	t.Run("read missing ref", func(t *testing.T) {
		objReader, err := newObjectReader(ctx, newRepoExecutor(t, cfg, repoProto), nil)
		require.NoError(t, err)

		_, err = objReader.reader(ctx, "refs/heads/does-not-exist", "commit")
		require.EqualError(t, err, "object not found")

		// The failed read must leave the process usable: a subsequent read of the commit
		// has to succeed.
		obj, err := objReader.reader(ctx, commitID.Revision(), "commit")
		require.NoError(t, err)

		require.Equal(t, commitContents, drain(t, obj))
	})

	t.Run("read fails when not consuming previous object", func(t *testing.T) {
		objReader, err := newObjectReader(ctx, newRepoExecutor(t, cfg, repoProto), nil)
		require.NoError(t, err)

		_, err = objReader.reader(ctx, commitID.Revision(), "commit")
		require.NoError(t, err)

		// Nothing of the first object has been read, so the second request must be refused.
		_, err = objReader.reader(ctx, commitID.Revision(), "commit")
		expectedMsg := fmt.Sprintf("cannot create new Object: batch contains %d unread bytes", len(commitContents)+1)
		require.EqualError(t, err, expectedMsg)
	})

	t.Run("read fails when partially consuming previous object", func(t *testing.T) {
		objReader, err := newObjectReader(ctx, newRepoExecutor(t, cfg, repoProto), nil)
		require.NoError(t, err)

		obj, err := objReader.reader(ctx, commitID.Revision(), "commit")
		require.NoError(t, err)

		_, err = io.CopyN(io.Discard, obj, 100)
		require.NoError(t, err)

		// Only 100 bytes of the first object have been read, so the second request must be
		// refused.
		_, err = objReader.reader(ctx, commitID.Revision(), "commit")
		expectedMsg := fmt.Sprintf("cannot create new Object: batch contains %d unread bytes", len(commitContents)-100+1)
		require.EqualError(t, err, expectedMsg)
	})

	t.Run("read increments Prometheus counter", func(t *testing.T) {
		counter := prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"type"})

		objReader, err := newObjectReader(ctx, newRepoExecutor(t, cfg, repoProto), counter)
		require.NoError(t, err)

		// countFor returns the current counter value for the given object type label.
		countFor := func(objectType string) float64 {
			return testutil.ToFloat64(counter.WithLabelValues(objectType))
		}

		revisionsByType := map[string]git.Revision{
			"commit": "refs/heads/master",
			"tree":   "refs/heads/master^{tree}",
			"blob":   "refs/heads/master:README",
			"tag":    "refs/tags/v1.1.1",
		}

		for objectType, revision := range revisionsByType {
			require.Equal(t, float64(0), countFor(objectType))

			obj, err := objReader.reader(ctx, revision, objectType)
			require.NoError(t, err)

			require.Equal(t, float64(1), countFor(objectType))

			// Drain the object so that the next iteration can request another one.
			_, err = io.Copy(io.Discard, obj)
			require.NoError(t, err)
		}
	})
}
......@@ -2,15 +2,10 @@ package catfile
import (
"bufio"
"context"
"fmt"
"strconv"
"strings"
"sync"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"gitlab.com/gitlab-org/gitaly/v14/internal/command"
"gitlab.com/gitlab-org/gitaly/v14/internal/git"
)
......@@ -68,76 +63,3 @@ func ParseObjectInfo(stdout *bufio.Reader) (*ObjectInfo, error) {
Size: objectSize,
}, nil
}
// objectInfoReader is a reader for Git object information. This reader is implemented via a
// long-lived `git cat-file --batch-check` process such that we do not have to spawn a separate
// process per object info we're about to read.
type objectInfoReader struct {
// cmd is the spawned cat-file command. Revisions to look up are written to it, and Close
// waits on it.
cmd *command.Command
// stdout is a buffered reader over cmd from which the object information is parsed.
stdout *bufio.Reader
// The embedded mutex is held by info from writing a revision until its reply has been parsed.
sync.Mutex
// creationCtx is the context in which this reader has been created. This context may
// potentially be decorrelated from the "real" RPC context in case the reader is going to be
// cached.
creationCtx context.Context
// counter, if non-nil, is incremented with the label "info" on every call to info.
counter *prometheus.CounterVec
}
// newObjectInfoReader spawns a `git cat-file --batch-check` command via repo and returns a reader
// which uses that command to look up object information.
//
// A tracing span named "catfile.ObjectInfoReader" is started for the lifetime of the reader. The
// context derived from that span is used to spawn the command and is stored as the reader's
// creation context. Once that context is done, a background goroutine closes the reader and
// finishes the span.
//
// counter may be nil, in which case no metrics are recorded by the reader.
//
// If the command cannot be spawned, the error from repo.Exec is returned and no reader is created.
func newObjectInfoReader(
	ctx context.Context,
	repo git.RepositoryExecutor,
	counter *prometheus.CounterVec,
) (*objectInfoReader, error) {
	span, ctx := opentracing.StartSpanFromContext(ctx, "catfile.ObjectInfoReader")

	batchCmd, err := repo.Exec(ctx,
		git.SubCmd{
			Name: "cat-file",
			Flags: []git.Option{
				git.Flag{Name: "--batch-check"},
			},
		},
		git.WithStdin(command.SetupStdin),
	)
	if err != nil {
		// The goroutine below is the only other place that finishes the span, and it is not
		// started on this path. Finish the span here so that it is not left open forever.
		span.Finish()
		return nil, err
	}

	reader := &objectInfoReader{
		cmd:         batchCmd,
		stdout:      bufio.NewReader(batchCmd),
		creationCtx: ctx,
		counter:     counter,
	}

	go func() {
		<-ctx.Done()
		// This is crucial to prevent leaking file descriptors.
		reader.Close()
		span.Finish()
	}()

	return reader, nil
}
// Close waits for the underlying cat-file command. The error returned by Wait is explicitly
// discarded, so Close itself never reports a failure.
// NOTE(review): presumably Wait also releases the command's pipes, which is why the constructor
// calls Close to avoid leaking file descriptors -- confirm against command.Command.
func (o *objectInfoReader) Close() {
_ = o.cmd.Wait()
}
// info looks up the object information for revision. It writes the revision followed by a newline
// to the cat-file command and parses the reply from the command's output, holding the reader's
// lock for the whole exchange.
//
// Tracing spans named "Batch.Info" are started for both the reader's creation context and ctx and
// are finished when info returns. If a counter is configured, its "info" label is incremented
// before the request is sent, regardless of whether the request succeeds.
func (o *objectInfoReader) info(ctx context.Context, revision git.Revision) (*ObjectInfo, error) {
	// startSpan runs right away; only the finish function it returns is deferred.
	defer startSpan(o.creationCtx, ctx, "Batch.Info", revision)()

	if counter := o.counter; counter != nil {
		counter.WithLabelValues("info").Inc()
	}

	o.Lock()
	defer o.Unlock()

	_, err := fmt.Fprintln(o.cmd, revision.String())
	if err != nil {
		return nil, err
	}

	return ParseObjectInfo(o.stdout)
}
......@@ -2,18 +2,10 @@ package catfile
import (
"bufio"
"fmt"
"strings"
"testing"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/v14/internal/git"
"gitlab.com/gitlab-org/gitaly/v14/internal/git/gittest"
"gitlab.com/gitlab-org/gitaly/v14/internal/helper/text"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg"
)
func TestParseObjectInfoSuccess(t *testing.T) {
......@@ -74,91 +66,3 @@ func TestParseObjectInfoErrors(t *testing.T) {
})
}
}
// TestObjectInfoReader verifies that objectInfoReader.info returns the expected object information
// for commits, trees, blobs and tags, that it reports a NotFoundError for a missing reference, and
// that the "info" counter is incremented on every call regardless of the outcome.
func TestObjectInfoReader(t *testing.T) {
	ctx, cancel := testhelper.Context()
	defer cancel()

	cfg, repoProto, repoPath := testcfg.BuildWithRepo(t)

	revisions := []string{
		"refs/heads/master",
		"refs/heads/master^{tree}",
		"refs/heads/master:README",
		"refs/tags/v1.1.1",
	}

	// Compute the expected object information for every revision by asking Git directly.
	oiByRevision := make(map[string]*ObjectInfo)
	for _, revision := range revisions {
		revParseOutput := gittest.Exec(t, cfg, "-C", repoPath, "rev-parse", revision)
		objectID, err := git.NewObjectIDFromHex(text.ChompBytes(revParseOutput))
		require.NoError(t, err)

		objectType := text.ChompBytes(gittest.Exec(t, cfg, "-C", repoPath, "cat-file", "-t", revision))
		objectContents := gittest.Exec(t, cfg, "-C", repoPath, "cat-file", objectType, revision)

		expected := &ObjectInfo{}
		expected.Oid = objectID
		expected.Type = objectType
		expected.Size = int64(len(objectContents))
		oiByRevision[revision] = expected
	}

	commitInfo := oiByRevision["refs/heads/master"]
	treeInfo := oiByRevision["refs/heads/master^{tree}"]
	blobInfo := oiByRevision["refs/heads/master:README"]
	tagInfo := oiByRevision["refs/tags/v1.1.1"]

	type testCase struct {
		desc         string
		revision     git.Revision
		expectedErr  error
		expectedInfo *ObjectInfo
	}

	testCases := []testCase{
		{desc: "commit by ref", revision: "refs/heads/master", expectedInfo: commitInfo},
		{desc: "commit by ID", revision: commitInfo.Oid.Revision(), expectedInfo: commitInfo},
		{desc: "tree", revision: treeInfo.Oid.Revision(), expectedInfo: treeInfo},
		{desc: "blob", revision: blobInfo.Oid.Revision(), expectedInfo: blobInfo},
		{desc: "tag", revision: tagInfo.Oid.Revision(), expectedInfo: tagInfo},
		{
			desc:        "nonexistent ref",
			revision:    "refs/heads/does-not-exist",
			expectedErr: NotFoundError{fmt.Errorf("object not found")},
		},
	}

	for _, tc := range testCases {
		t.Run(tc.desc, func(t *testing.T) {
			counter := prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"type"})

			// infoCount returns the current value of the counter's "info" label.
			infoCount := func() float64 {
				return testutil.ToFloat64(counter.WithLabelValues("info"))
			}

			reader, err := newObjectInfoReader(ctx, newRepoExecutor(t, cfg, repoProto), counter)
			require.NoError(t, err)

			require.Equal(t, float64(0), infoCount())

			info, err := reader.info(ctx, tc.revision)
			require.Equal(t, tc.expectedErr, err)
			require.Equal(t, tc.expectedInfo, info)

			require.Equal(t, float64(1), infoCount())

			// A follow-up request has to work no matter whether the previous one succeeded
			// or failed.
			_, err = reader.info(ctx, "refs/heads/master")
			require.NoError(t, err)

			require.Equal(t, float64(2), infoCount())
		})
	}
}
package catfile
import (
"context"
"os"
"testing"
"gitlab.com/gitlab-org/gitaly/v14/internal/command"
"gitlab.com/gitlab-org/gitaly/v14/internal/git"
"gitlab.com/gitlab-org/gitaly/v14/internal/git/repository"
"gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
)
......@@ -22,27 +17,3 @@ func testMain(m *testing.M) int {
defer cleanup()
return m.Run()
}
// repoExecutor is a test helper that pairs a repository with the command factory used to spawn Git
// commands for it.
type repoExecutor struct {
// GitRepo is the repository that is passed to the command factory on every Exec call.
repository.GitRepo
// gitCmdFactory creates the commands returned by Exec.
gitCmdFactory git.CommandFactory
}
// newRepoExecutor returns a git.RepositoryExecutor for repo whose commands are created by an exec
// command factory built from cfg. The testing handle is accepted but not used.
func newRepoExecutor(t *testing.T, cfg config.Cfg, repo repository.GitRepo) git.RepositoryExecutor {
	executor := repoExecutor{
		gitCmdFactory: git.NewExecCommandFactory(cfg),
		GitRepo:       repo,
	}
	return &executor
}
// Exec asks the configured command factory to create cmd for the wrapped repository and returns
// the resulting command together with any error from the factory.
func (e *repoExecutor) Exec(ctx context.Context, cmd git.Cmd, opts ...git.CmdOpt) (*command.Command, error) {
	factory := e.gitCmdFactory
	return factory.New(ctx, e.GitRepo, cmd, opts...)
}
// ExecAndWait creates cmd via Exec and then waits for it. It returns the error from Exec if the
// command could not be created, and otherwise whatever Wait returns.
func (e *repoExecutor) ExecAndWait(ctx context.Context, cmd git.Cmd, opts ...git.CmdOpt) error {
	proc, err := e.Exec(ctx, cmd, opts...)
	if err == nil {
		err = proc.Wait()
	}
	return err
}
package catfile
import (
"context"
"github.com/opentracing/opentracing-go"
"gitlab.com/gitlab-org/gitaly/v14/internal/git"
"gitlab.com/gitlab-org/labkit/correlation"
)
// revisionTag builds the tracing tag that records the given revision under the key "revision".
func revisionTag(revision git.Revision) opentracing.Tag {
	var tag opentracing.Tag
	tag.Key = "revision"
	tag.Value = revision
	return tag
}
// correlationIDTag builds the tracing tag that records the correlation ID extracted from ctx under
// the key "correlation_id".
func correlationIDTag(ctx context.Context) opentracing.Tag {
	correlationID := correlation.ExtractFromContext(ctx)

	var tag opentracing.Tag
	tag.Key = "correlation_id"
	tag.Value = correlationID
	return tag
}
// startSpan starts two tracing spans named methodName, both tagged with revision: one derived from
// innerCtx and one derived from outerCtx. The outer span is additionally tagged with the
// correlation ID extracted from the context of the inner span. The returned function finishes the
// inner span first and then the outer span.
func startSpan(innerCtx context.Context, outerCtx context.Context, methodName string, revision git.Revision) func() {
	revTag := revisionTag(revision)

	innerSpan, innerSpanCtx := opentracing.StartSpanFromContext(innerCtx, methodName, revTag)
	outerSpan, _ := opentracing.StartSpanFromContext(outerCtx, methodName, revTag, correlationIDTag(innerSpanCtx))

	finish := func() {
		innerSpan.Finish()
		outerSpan.Finish()
	}
	return finish
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment