Skip to content
Snippets Groups Projects

catfile: Introduce request queues to allow batching reads

Merged Patrick Steinhardt requested to merge pks-catfile-queue into master
1 unresolved thread
1 file
+ 21
6
Compare changes
  • Side-by-side
  • Inline
  • Same as with the preceding commit, the object reaches into the parent to
    determine whether the object reade is closed or not. This comlicates
    locking and is unclean design-wise given that concerns of the child
    should be disentangled from its parent.
    
    Fix this design issue by keeping track of the closed status in object
    reader and object separately. This allows us to now remove the reference
    to the parent reader completely.
@@ -84,8 +84,10 @@ func (o *objectReader) close() {
defer o.Unlock()
_ = o.cmd.Wait()
o.closed = true
if o.currentObject != nil {
o.currentObject.close()
}
}
func (o *objectReader) isClosed() bool {
@@ -115,12 +117,17 @@ func (o *objectReader) Object(
o.Lock()
defer o.Unlock()
if o.closed {
return nil, fmt.Errorf("cannot read object: %w", os.ErrClosed)
}
if o.currentObject != nil {
// If the current object is still dirty, then we must not try to read a new object.
if o.currentObject.isDirty() {
return nil, fmt.Errorf("current object has not been fully read")
}
o.currentObject.close()
o.currentObject = nil
// If we have already read an object before, then we must consume the trailing
@@ -142,7 +149,6 @@ func (o *objectReader) Object(
o.currentObject = &Object{
ObjectInfo: *oi,
parent: o,
dataReader: io.LimitedReader{
R: o.stdout,
N: oi.Size,
@@ -157,8 +163,6 @@ func (o *objectReader) Object(
type Object struct {
// ObjectInfo represents main information about object
ObjectInfo
// parent is the objectReader which has created the Object.
parent *objectReader
// dataReader is reader which has all the object data.
dataReader io.LimitedReader
@@ -170,6 +174,9 @@ type Object struct {
// during the whole read duration -- and thus it'd become impossible to check for dirtiness
// at the same time.
bytesRemaining int64
// closed determines whether the object is closed for reading.
closed int32
}
// isDirty determines whether the object is still dirty, that is whether there are still unconsumed
@@ -178,8 +185,16 @@ func (o *Object) isDirty() bool {
return atomic.LoadInt64(&o.bytesRemaining) != 0
}
func (o *Object) isClosed() bool {
return atomic.LoadInt32(&o.closed) == 1
}
func (o *Object) close() {
atomic.StoreInt32(&o.closed, 1)
}
func (o *Object) Read(p []byte) (int, error) {
if o.parent.closed {
if o.isClosed() {
return 0, os.ErrClosed
}
@@ -195,7 +210,7 @@ func (o *Object) Read(p []byte) (int, error) {
// via `io.Copy()`, which in turn will use `WriteTo()` or `ReadFrom()` in case these interfaces are
// implemented by the respective reader or writer.
func (o *Object) WriteTo(w io.Writer) (int64, error) {
if o.parent.closed {
if o.isClosed() {
return 0, os.ErrClosed
}
Loading