Skip to content
Snippets Groups Projects
Commit 7f1fd9af authored by Patrick Steinhardt's avatar Patrick Steinhardt
Browse files

catfile: Split out object info reader queue

Similar to the preceding commit, this commit splits out a new object
info reader queue such that we can eventually change callers to use
batched requests for object info.

No change in behaviour is expected given that the queue is not yet used
by anything except as implementation detail of the `Info()` function.
parent aa318bf0
No related branches found
No related tags found
Loading
This commit is part of merge request !4032. Comments created here will be created in the context of that merge request.
...@@ -354,7 +354,7 @@ func TestCache_ObjectInfoReader(t *testing.T) { ...@@ -354,7 +354,7 @@ func TestCache_ObjectInfoReader(t *testing.T) {
// We're cheating a bit here to avoid creating a racy test by reaching into the // We're cheating a bit here to avoid creating a racy test by reaching into the
// process and trying to read from its stdout. If the cancel did kill the process as // process and trying to read from its stdout. If the cancel did kill the process as
// expected, then the stdout should be closed and we'll get an EOF. // expected, then the stdout should be closed and we'll get an EOF.
output, err := io.ReadAll(objectInfoReaderImpl.stdout) output, err := io.ReadAll(objectInfoReaderImpl.queue.stdout)
if err != nil { if err != nil {
require.True(t, errors.Is(err, os.ErrClosed)) require.True(t, errors.Is(err, os.ErrClosed))
} else { } else {
......
...@@ -4,9 +4,11 @@ import ( ...@@ -4,9 +4,11 @@ import (
"bufio" "bufio"
"context" "context"
"fmt" "fmt"
"io"
"os"
"strconv" "strconv"
"strings" "strings"
"sync" "sync/atomic"
"github.com/opentracing/opentracing-go" "github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
...@@ -96,17 +98,16 @@ type ObjectInfoReader interface { ...@@ -96,17 +98,16 @@ type ObjectInfoReader interface {
// long-lived `git cat-file --batch-check` process such that we do not have to spawn a separate // long-lived `git cat-file --batch-check` process such that we do not have to spawn a separate
// process per object info we're about to read. // process per object info we're about to read.
type objectInfoReader struct { type objectInfoReader struct {
cmd *command.Command cmd *command.Command
stdout *bufio.Reader
sync.Mutex
closed bool
// creationCtx is the context in which this reader has been created. This context may // creationCtx is the context in which this reader has been created. This context may
// potentially be decorrelated from the "real" RPC context in case the reader is going to be // potentially be decorrelated from the "real" RPC context in case the reader is going to be
// cached. // cached.
creationCtx context.Context creationCtx context.Context
counter *prometheus.CounterVec counter *prometheus.CounterVec
queue objectInfoQueue
queueInUse int32
} }
func newObjectInfoReader( func newObjectInfoReader(
...@@ -131,9 +132,12 @@ func newObjectInfoReader( ...@@ -131,9 +132,12 @@ func newObjectInfoReader(
objectInfoReader := &objectInfoReader{ objectInfoReader := &objectInfoReader{
cmd: batchCmd, cmd: batchCmd,
stdout: bufio.NewReader(batchCmd),
creationCtx: ctx, creationCtx: ctx,
counter: counter, counter: counter,
queue: objectInfoQueue{
stdout: bufio.NewReader(batchCmd),
stdin: batchCmd,
},
} }
go func() { go func() {
<-ctx.Done() <-ctx.Done()
...@@ -146,36 +150,112 @@ func newObjectInfoReader( ...@@ -146,36 +150,112 @@ func newObjectInfoReader(
} }
func (o *objectInfoReader) close() { func (o *objectInfoReader) close() {
o.Lock() o.queue.close()
defer o.Unlock()
_ = o.cmd.Wait() _ = o.cmd.Wait()
o.closed = true
} }
func (o *objectInfoReader) isClosed() bool { func (o *objectInfoReader) isClosed() bool {
o.Lock() return o.queue.isClosed()
defer o.Unlock()
return o.closed
} }
func (o *objectInfoReader) isDirty() bool { func (o *objectInfoReader) isDirty() bool {
// We always consume object info directly, so the reader cannot ever be dirty. return o.queue.isDirty()
return false }
func (o *objectInfoReader) infoQueue(ctx context.Context, tracedMethod string) (*objectInfoQueue, func(), error) {
if !atomic.CompareAndSwapInt32(&o.queueInUse, 0, 1) {
return nil, nil, fmt.Errorf("object info queue already in use")
}
trace, finish := startTrace(ctx, o.creationCtx, o.counter, tracedMethod)
o.queue.trace = trace
return &o.queue, func() {
atomic.StoreInt32(&o.queueInUse, 0)
finish()
}, nil
} }
func (o *objectInfoReader) Info(ctx context.Context, revision git.Revision) (*ObjectInfo, error) { func (o *objectInfoReader) Info(ctx context.Context, revision git.Revision) (*ObjectInfo, error) {
trace, finish := startTrace(ctx, o.creationCtx, o.counter, "catfile.Info") queue, cleanup, err := o.infoQueue(ctx, "catfile.Info")
defer finish() if err != nil {
return nil, err
}
defer cleanup()
o.Lock() if err := queue.RequestInfo(revision); err != nil {
defer o.Unlock() return nil, err
}
if _, err := fmt.Fprintln(o.cmd, revision.String()); err != nil { objectInfo, err := queue.ReadInfo()
if err != nil {
return nil, err return nil, err
} }
trace.recordRequest("info")
return ParseObjectInfo(o.stdout) return objectInfo, nil
}
// objectInfoQueue allows pipelining object-info requests onto a single
// long-lived process: revisions are written to stdin via RequestInfo() and
// the matching results are read back in order from stdout via ReadInfo().
type objectInfoQueue struct {
	// stdout is the buffered standard output of the underlying process from
	// which object info is parsed.
	stdout *bufio.Reader
	// stdin is the standard input of the underlying process to which
	// object-info requests are written.
	stdin io.Writer
	// outstandingRequests is the number of requests which have been queued up. Gets incremented
	// on request, and decremented when starting to read an object (not when that object has
	// been fully consumed). Accessed atomically.
	outstandingRequests int64
	// closed indicates whether the queue is closed for additional requests.
	// Accessed atomically; a value of 1 means closed.
	closed int32
	// trace is the current tracing span.
	trace *trace
}
  • Contributor

    It is pretty similar to the objectReaderQueue. Can we re-use code from it here?

  • Patrick Steinhardt @pks-gitlab

    changed this line in version 4 of the diff

    ·

    changed this line in version 4 of the diff

    Toggle commit list
  • Author Maintainer

    True. I bailed on that in my first iteration given that I was happy to finally have something that works because I've been trying to solve this problem for multiple days in a row. But it's not too hard to rectify that now, and it's definitely welcome to not have to duplicate code which makes use of atomics and locks and whatnot.

    So: done.

  • Please register or sign in to reply
// isDirty tells whether requests have been queued up whose responses have not
// yet started to be read.
func (q *objectInfoQueue) isDirty() bool {
	outstanding := atomic.LoadInt64(&q.outstandingRequests)
	return outstanding != 0
}
// isClosed tells whether the queue has been closed for further use.
func (q *objectInfoQueue) isClosed() bool {
	closed := atomic.LoadInt32(&q.closed)
	return closed == 1
}
// close marks the queue as closed such that both requesting and reading
// object info will be refused from here on.
func (q *objectInfoQueue) close() {
	const closedMarker int32 = 1
	atomic.StoreInt32(&q.closed, closedMarker)
}
// RequestInfo writes a request for the given revision's object info to the
// underlying process. The response must afterwards be retrieved via
// ReadInfo(). Returns an error wrapping os.ErrClosed when the queue has
// already been closed.
func (q *objectInfoQueue) RequestInfo(revision git.Revision) error {
	if q.isClosed() {
		return fmt.Errorf("cannot request object info: %w", os.ErrClosed)
	}

	_, err := fmt.Fprintln(q.stdin, revision.String())
	if err != nil {
		return fmt.Errorf("requesting object info: %w", err)
	}

	// Only account for the request after it has been written successfully.
	atomic.AddInt64(&q.outstandingRequests, 1)

	return nil
}
func (q *objectInfoQueue) ReadInfo() (*ObjectInfo, error) {
if q.isClosed() {
return nil, fmt.Errorf("cannot read object info: %w", os.ErrClosed)
}
// We first need to determine wether there are any queued requests at all. If not, then we
// cannot read anything.
queuedRequests := atomic.LoadInt64(&q.outstandingRequests)
if queuedRequests == 0 {
return nil, fmt.Errorf("no outstanding request")
}
// And when there are, we need to remove one of these queued requests. We do so via
// `CompareAndSwapInt64()`, which easily allows us to detect concurrent access to the queue.
if !atomic.CompareAndSwapInt64(&q.outstandingRequests, queuedRequests, queuedRequests-1) {
return nil, fmt.Errorf("concurrent access to object info queue")
}
q.trace.recordRequest("info")
return ParseObjectInfo(q.stdout)
} }
...@@ -2,7 +2,9 @@ package catfile ...@@ -2,7 +2,9 @@ package catfile
import ( import (
"bufio" "bufio"
"errors"
"fmt" "fmt"
"os"
"strings" "strings"
"testing" "testing"
...@@ -162,3 +164,201 @@ func TestObjectInfoReader(t *testing.T) { ...@@ -162,3 +164,201 @@ func TestObjectInfoReader(t *testing.T) {
}) })
} }
} }
// TestObjectInfoReader_queue verifies the request/read semantics of the object
// info queue: single and pipelined requests, error propagation for missing
// objects, dirtiness tracking, exclusive queue ownership and close behaviour.
func TestObjectInfoReader_queue(t *testing.T) {
	ctx, cancel := testhelper.Context()
	defer cancel()

	cfg, repoProto, repoPath := testcfg.BuildWithRepo(t)

	blobOID := gittest.WriteBlob(t, cfg, repoPath, []byte("foobar"))
	blobInfo := ObjectInfo{
		Oid:  blobOID,
		Type: "blob",
		Size: int64(len("foobar")),
	}

	commitOID := gittest.WriteCommit(t, cfg, repoPath)
	commitInfo := ObjectInfo{
		Oid:  commitOID,
		Type: "commit",
		// NOTE(review): hardcoded size of the commit written by the test
		// helper with default options — confirm it stays stable if the helper
		// defaults ever change.
		Size: 225,
	}

	t.Run("read single info", func(t *testing.T) {
		reader, err := newObjectInfoReader(ctx, newRepoExecutor(t, cfg, repoProto), nil)
		require.NoError(t, err)

		queue, cleanup, err := reader.infoQueue(ctx, "trace")
		require.NoError(t, err)
		defer cleanup()

		require.NoError(t, queue.RequestInfo(blobOID.Revision()))

		info, err := queue.ReadInfo()
		require.NoError(t, err)
		require.Equal(t, &blobInfo, info)
	})

	t.Run("read multiple object infos", func(t *testing.T) {
		reader, err := newObjectInfoReader(ctx, newRepoExecutor(t, cfg, repoProto), nil)
		require.NoError(t, err)

		queue, cleanup, err := reader.infoQueue(ctx, "trace")
		require.NoError(t, err)
		defer cleanup()

		// Request and read one object at a time; map iteration order does not
		// matter because each response is consumed before the next request.
		for oid, expectedInfo := range map[git.ObjectID]ObjectInfo{
			blobOID:   blobInfo,
			commitOID: commitInfo,
		} {
			require.NoError(t, queue.RequestInfo(oid.Revision()))

			info, err := queue.ReadInfo()
			require.NoError(t, err)
			require.Equal(t, &expectedInfo, info)
		}
	})

	t.Run("request multiple object infos", func(t *testing.T) {
		reader, err := newObjectInfoReader(ctx, newRepoExecutor(t, cfg, repoProto), nil)
		require.NoError(t, err)

		queue, cleanup, err := reader.infoQueue(ctx, "trace")
		require.NoError(t, err)
		defer cleanup()

		// Pipeline both requests before reading: responses must come back in
		// request order.
		require.NoError(t, queue.RequestInfo(blobOID.Revision()))
		require.NoError(t, queue.RequestInfo(commitOID.Revision()))

		for _, expectedInfo := range []ObjectInfo{blobInfo, commitInfo} {
			info, err := queue.ReadInfo()
			require.NoError(t, err)
			require.Equal(t, &expectedInfo, info)
		}
	})

	t.Run("read without request", func(t *testing.T) {
		reader, err := newObjectInfoReader(ctx, newRepoExecutor(t, cfg, repoProto), nil)
		require.NoError(t, err)

		queue, cleanup, err := reader.infoQueue(ctx, "trace")
		require.NoError(t, err)
		defer cleanup()

		_, err = queue.ReadInfo()
		require.Equal(t, errors.New("no outstanding request"), err)
	})

	t.Run("request invalid object info", func(t *testing.T) {
		reader, err := newObjectInfoReader(ctx, newRepoExecutor(t, cfg, repoProto), nil)
		require.NoError(t, err)

		queue, cleanup, err := reader.infoQueue(ctx, "trace")
		require.NoError(t, err)
		defer cleanup()

		require.NoError(t, queue.RequestInfo("does-not-exist"))

		_, err = queue.ReadInfo()
		require.Equal(t, NotFoundError{errors.New("object not found")}, err)
	})

	t.Run("can continue reading after NotFoundError", func(t *testing.T) {
		reader, err := newObjectInfoReader(ctx, newRepoExecutor(t, cfg, repoProto), nil)
		require.NoError(t, err)

		queue, cleanup, err := reader.infoQueue(ctx, "trace")
		require.NoError(t, err)
		defer cleanup()

		require.NoError(t, queue.RequestInfo("does-not-exist"))

		_, err = queue.ReadInfo()
		require.Equal(t, NotFoundError{errors.New("object not found")}, err)

		// Requesting another object info after the previous one has failed should continue
		// to work alright.
		require.NoError(t, queue.RequestInfo(blobOID.Revision()))

		info, err := queue.ReadInfo()
		require.NoError(t, err)
		require.Equal(t, &blobInfo, info)
	})

	t.Run("requesting multiple queues fails", func(t *testing.T) {
		reader, err := newObjectInfoReader(ctx, newRepoExecutor(t, cfg, repoProto), nil)
		require.NoError(t, err)

		_, cleanup, err := reader.infoQueue(ctx, "trace")
		require.NoError(t, err)

		_, _, err = reader.infoQueue(ctx, "trace")
		require.Equal(t, errors.New("object info queue already in use"), err)

		// After calling cleanup we should be able to create an object queue again.
		// Deliberately not deferred: deferring it, calling it explicitly and then
		// reassigning the variable would invoke the second queue's cleanup twice
		// at function exit.
		cleanup()

		_, cleanupSecond, err := reader.infoQueue(ctx, "trace")
		require.NoError(t, err)
		defer cleanupSecond()
	})

	t.Run("requesting object dirties reader", func(t *testing.T) {
		reader, err := newObjectInfoReader(ctx, newRepoExecutor(t, cfg, repoProto), nil)
		require.NoError(t, err)

		queue, cleanup, err := reader.infoQueue(ctx, "trace")
		require.NoError(t, err)
		defer cleanup()

		require.False(t, reader.isDirty())
		require.False(t, queue.isDirty())

		require.NoError(t, queue.RequestInfo(blobOID.Revision()))

		require.True(t, reader.isDirty())
		require.True(t, queue.isDirty())

		_, err = queue.ReadInfo()
		require.NoError(t, err)

		require.False(t, reader.isDirty())
		require.False(t, queue.isDirty())
	})

	t.Run("closing queue blocks request", func(t *testing.T) {
		reader, err := newObjectInfoReader(ctx, newRepoExecutor(t, cfg, repoProto), nil)
		require.NoError(t, err)

		queue, cleanup, err := reader.infoQueue(ctx, "trace")
		require.NoError(t, err)
		defer cleanup()

		queue.close()

		require.True(t, reader.isClosed())
		require.True(t, queue.isClosed())

		require.Equal(t, fmt.Errorf("cannot request object info: %w", os.ErrClosed), queue.RequestInfo(blobOID.Revision()))
	})

	t.Run("closing queue blocks read", func(t *testing.T) {
		reader, err := newObjectInfoReader(ctx, newRepoExecutor(t, cfg, repoProto), nil)
		require.NoError(t, err)

		queue, cleanup, err := reader.infoQueue(ctx, "trace")
		require.NoError(t, err)
		defer cleanup()

		// Request the object before we close the queue.
		require.NoError(t, queue.RequestInfo(blobOID.Revision()))

		queue.close()

		require.True(t, reader.isClosed())
		require.True(t, queue.isClosed())

		_, err = queue.ReadInfo()
		require.Equal(t, fmt.Errorf("cannot read object info: %w", os.ErrClosed), err)
	})
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment