From de66c8bbab2be88361739db8cb4775e615dca020 Mon Sep 17 00:00:00 2001 From: Jacob Vosmaer Date: Wed, 25 Aug 2021 16:58:36 +0000 Subject: [PATCH 1/2] Remove redundant variable --- internal/streamcache/cursor.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/internal/streamcache/cursor.go b/internal/streamcache/cursor.go index 04006b0a36..cf1a9be6a3 100644 --- a/internal/streamcache/cursor.go +++ b/internal/streamcache/cursor.go @@ -13,7 +13,6 @@ type cursor struct { subscribers []*notifier m sync.RWMutex doneChan chan struct{} - done bool } func newCursor() *cursor { return &cursor{doneChan: make(chan struct{})} } @@ -38,9 +37,12 @@ func (c *cursor) Unsubscribe(n *notifier) { } } - if len(c.subscribers) == 0 && !c.done { - c.done = true - close(c.doneChan) + if len(c.subscribers) == 0 { + select { + case <-c.doneChan: + default: + close(c.doneChan) + } } } -- GitLab From 070c69dc5f0f5b5aedbc1020ee5d3db8ce13127b Mon Sep 17 00:00:00 2001 From: Jacob Vosmaer Date: Wed, 25 Aug 2021 16:58:52 +0000 Subject: [PATCH 2/2] Add sendfile(2) support to streamcache This will make io.Copy from a streamcache entry into a network socket more efficient. It will not necessarily be faster than a regular io.Copy, but the Gitaly server will spend fewer CPU cycles copying the data. Changelog: performance --- internal/streamcache/cache.go | 18 ++-- internal/streamcache/pipe.go | 39 +++++--- internal/streamcache/pipe_linux.go | 118 ++++++++++++++++++++++ internal/streamcache/pipe_test.go | 2 +- internal/streamcache/sendfile_test.go | 138 ++++++++++++++++++++++++++ 5 files changed, 293 insertions(+), 22 deletions(-) create mode 100644 internal/streamcache/pipe_linux.go create mode 100644 internal/streamcache/sendfile_test.go diff --git a/internal/streamcache/cache.go b/internal/streamcache/cache.go index 939366f6d6..ac2fc28992 100644 --- a/internal/streamcache/cache.go +++ b/internal/streamcache/cache.go @@ -109,7 +109,7 @@ func (NullCache) FindOrCreate(key string, create func(io.Writer) error) (s *Stre pr, pw := io.Pipe() w := newWaiter() go func() { w.SetError(runCreate(pw, create)) }() - return &Stream{reader: pr, waiter: w}, true, nil + return &Stream{ReadCloser: pr, waiter: w}, true, nil } // Stop is a no-op. @@ -233,18 +233,22 @@ type entry struct { // Wait()). Callers must always call Close() to prevent resource leaks. type Stream struct { waiter *waiter - reader io.ReadCloser + io.ReadCloser } // Wait returns the error value of the Stream. If ctx is canceled, // Wait unblocks and returns early. func (s *Stream) Wait(ctx context.Context) error { return s.waiter.Wait(ctx) } -// Read reads from the underlying stream of the stream. -func (s *Stream) Read(p []byte) (int, error) { return s.reader.Read(p) } +// WriteTo implements io.WriterTo. For some w on some platforms, this +// uses sendfile to make copying data more efficient. +func (s *Stream) WriteTo(w io.Writer) (int64, error) { + if wt, ok := s.ReadCloser.(io.WriterTo); ok { + return wt.WriteTo(w) + } -// Close releases the underlying resources of the stream. -func (s *Stream) Close() error { return s.reader.Close() } + return io.Copy(w, s.ReadCloser) +} func (c *cache) newEntry(key string, create func(io.Writer) error) (_ *Stream, _ *entry, err error) { e := &entry{ @@ -296,7 +300,7 @@ func (c *cache) newEntry(key string, create func(io.Writer) error) (_ *Stream, _ } func (e *entry) wrapReadCloser(r io.ReadCloser) *Stream { - return &Stream{reader: r, waiter: e.waiter} + return &Stream{ReadCloser: r, waiter: e.waiter} } func runCreate(w io.WriteCloser, create func(io.Writer) error) (err error) { diff --git a/internal/streamcache/pipe.go b/internal/streamcache/pipe.go index 714e178920..881f37f7ec 100644 --- a/internal/streamcache/pipe.go +++ b/internal/streamcache/pipe.go @@ -79,7 +79,7 @@ type pipe struct { wnotifier *notifier } -func newPipe(w namedWriteCloser) (io.ReadCloser, *pipe, error) { +func newPipe(w namedWriteCloser) (*pipeReader, *pipe, error) { p := &pipe{ name: w.Name(), w: w, @@ -98,19 +98,15 @@ func newPipe(w namedWriteCloser) (io.ReadCloser, *pipe, error) { func (p *pipe) Write(b []byte) (int, error) { // Loop (block) until at least one reader catches up with our last write. - for done := false; !done && p.wcursor.Position() > p.rcursor.Position(); { + for p.wcursor.Position() > p.rcursor.Position() { select { case <-p.wcursor.Done(): - done = true + // Prevent writing bytes no-one will read + return 0, errWrongCloseOrder case <-p.wnotifier.C: } } - // Prevent writing bytes no-one will read - if p.wcursor.IsDone() { - return 0, errWrongCloseOrder - } - n, err := p.w.Write(b) // Notify blocked readers, if any, of new data that is available. @@ -146,7 +142,7 @@ func (p *pipe) Close() error { func (p *pipe) RemoveFile() error { return os.Remove(p.name) } -func (p *pipe) OpenReader() (io.ReadCloser, error) { +func (p *pipe) OpenReader() (*pipeReader, error) { p.m.Lock() defer p.m.Unlock() @@ -183,6 +179,12 @@ type pipeReader struct { reader io.ReadCloser position int64 notifier *notifier + + // golangci-lint does not like this struct field because it is only used + // on Linux. On macOS, it complains the field is unused. On Linux, it + // complains that "nolint:unused" is unused. So we need "unused" for + // platforms other than Linux, and "nolintlint" for Linux. + sendfileCalledSuccessfully bool //nolint:unused,nolintlint } func (pr *pipeReader) Close() error { @@ -190,26 +192,35 @@ func (pr *pipeReader) Close() error { return pr.reader.Close() } -func (pr *pipeReader) Read(b []byte) (int, error) { +func (pr *pipeReader) waitReadable() bool { // Block until there is data for us to read. Note that it can actually - // happen that r.position > pr.pipe.wcursor, so we really want >= here, not + // happen that pr.position > pr.pipe.wcursor, so we really want >= here, not // ==. There is a race between the moment the write end finishes writing // a chunk of data to the file and the moment pr.pipe.wcursor gets // updated. - for done := false; !done && pr.position >= pr.pipe.wcursor.Position(); { +wait: + for pr.position >= pr.pipe.wcursor.Position() { select { case <-pr.pipe.rcursor.Done(): - done = true + break wait case <-pr.notifier.C: } } - n, err := pr.reader.Read(b) + return pr.position < pr.pipe.wcursor.Position() +} + +func (pr *pipeReader) advancePosition(n int) { pr.position += int64(n) // The writer is subscribed to changes in pr.pipe.rcursor. If it is // currently blocked, this call to SetPosition() will unblock it. pr.pipe.rcursor.SetPosition(pr.position) +} +func (pr *pipeReader) Read(b []byte) (int, error) { + pr.waitReadable() + n, err := pr.reader.Read(b) + pr.advancePosition(n) return n, err } diff --git a/internal/streamcache/pipe_linux.go b/internal/streamcache/pipe_linux.go new file mode 100644 index 0000000000..a1ff386d4c --- /dev/null +++ b/internal/streamcache/pipe_linux.go @@ -0,0 +1,118 @@ +package streamcache + +import ( + "errors" + "io" + "syscall" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +var ( + sendfileCounter = promauto.NewCounter( + prometheus.CounterOpts{ + Name: "gitaly_streamcache_sendfile_bytes_total", + Help: "Number of bytes sent using sendfile", + }, + ) +) + +func (pr *pipeReader) WriteTo(w io.Writer) (int64, error) { + if n, err := pr.writeTo(w); n > 0 || err == nil { + return n, err + } + + // If n == 0 and err != nil then we were unable to use sendfile(2), so + // try again using io.Copy and Read. Use struct{ io.Reader } to prevent + // infinite recursion into pr.WriteTo(). + return io.Copy(w, struct{ io.Reader }{pr}) +} + +// writeTo tries to copy the pipe stream to w using the Linux sendfile(2) +// system call. This may fail for various reasons (type of w, type of +// pr.reader, Linux kernel version). If writeTo returns 0 and a non-nil +// error, the caller should try again with io.Copy and pipeReader.Read(). +func (pr *pipeReader) writeTo(w io.Writer) (int64, error) { + // If w does not have a file descriptor (maybe it is a TLS connection, or + // a *bytes.Buffer), this first step will fail. + dst, err := getRawconn(w) + if err != nil { + return 0, err + } + + // pr.reader must also be a thing with a file descriptor. + src, err := getRawconn(pr.reader) + if err != nil { + return 0, err + } + + start := pr.position + var errRead, errWrite, errSendfile error + + // src.Read gives us the file descriptor of the underlying file of the + // pipe. + errRead = src.Read(func(srcFd uintptr) bool { + // dst.Write gives us the file descriptor of the thing we write into, + // typically a network socket. + errWrite = dst.Write(func(dstFd uintptr) bool { + errSendfile = pr.sendfile(int(dstFd), int(srcFd)) + + // If errSendfile is EAGAIN, ask Go runtime to wait for dst to become + // writeable again by returning false. + return errSendfile != syscall.EAGAIN + }) + + return true + }) + written := pr.position - start + + for _, err := range []error{errRead, errWrite, errSendfile} { + if err != nil { + return written, err + } + } + + return written, nil +} + +func getRawconn(v interface{}) (syscall.RawConn, error) { + if sc, ok := v.(syscall.Conn); ok { + return sc.SyscallConn() + } + + return nil, errors.New("value does not implement syscall.Conn") +} + +func (pr *pipeReader) sendfile(dst int, src int) error { + for { + // There is no point in calling sendfile if we already know there is no + // unread data in the underlying file of the pipe. So let's wait for the + // pipe to tell us there is data. + if !pr.waitReadable() { + // We are at EOF. + return nil + } + + // We need to give sendfile a maximum number of bytes to copy. It is OK + // if it is too big because sendfile won't block trying to copy bytes + // that aren't in the file. We picked 4MB because that is the same + // maximum the Go stdlib uses. + // https://github.com/golang/go/blob/go1.16.7/src/internal/poll/sendfile_linux.go#L11 + const maxBytes = 4 << 20 + n, err := syscall.Sendfile(dst, src, nil, maxBytes) + + // sendfile returns -1 in case of errors, so we must check if n is + // positive. + if n > 0 { + pr.advancePosition(n) + sendfileCounter.Add(float64(n)) + pr.sendfileCalledSuccessfully = true + } + + // In case of EINTR, ignore the error and retry immediately + if err != nil && err != syscall.EINTR { + return err + } + } +} diff --git a/internal/streamcache/pipe_test.go b/internal/streamcache/pipe_test.go index 291ed574ed..87372cfc9e 100644 --- a/internal/streamcache/pipe_test.go +++ b/internal/streamcache/pipe_test.go @@ -13,7 +13,7 @@ import ( "github.com/stretchr/testify/require" ) -func createPipe(t *testing.T) (io.ReadCloser, *pipe) { +func createPipe(t *testing.T) (*pipeReader, *pipe) { t.Helper() f, err := ioutil.TempFile("", "gitaly-streamcache-test") diff --git a/internal/streamcache/sendfile_test.go b/internal/streamcache/sendfile_test.go new file mode 100644 index 0000000000..786c085b42 --- /dev/null +++ b/internal/streamcache/sendfile_test.go @@ -0,0 +1,138 @@ +//go:build linux +// +build linux + +package streamcache + +import ( + "bytes" + "io" + "io/ioutil" + "math/rand" + "os" + "testing" + "testing/iotest" + + "github.com/stretchr/testify/require" +) + +type wrappedFile struct{ f *os.File } + +func (wf *wrappedFile) Write(p []byte) (int, error) { return wf.f.Write(p) } +func (wf *wrappedFile) Close() error { return wf.f.Close() } +func (wf *wrappedFile) Name() string { return wf.f.Name() } + +func TestPipe_WriteTo(t *testing.T) { + data := make([]byte, 10*1024*1024) + _, err := rand.Read(data) + require.NoError(t, err) + + testCases := []struct { + desc string + create func(t *testing.T) namedWriteCloser + sendfile bool + }{ + { + desc: "os.File", + create: func(t *testing.T) namedWriteCloser { + f, err := ioutil.TempFile("", "pipe write to") + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, os.Remove(f.Name())) }) + return f + }, + sendfile: true, + }, + { + desc: "non-file writer", + create: func(t *testing.T) namedWriteCloser { + f, err := ioutil.TempFile("", "pipe write to") + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, os.Remove(f.Name())) }) + return &wrappedFile{f} + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + pr, p := createPipe(t) + defer pr.Close() + + errC := make(chan error, 1) + go func() { + errC <- func() error { + defer p.Close() + + // To exercise pipe blocking logic, we want to prevent writing all of + // data at once. + r := iotest.HalfReader(bytes.NewReader(data)) + if _, err := io.Copy(p, r); err != nil { + return err + } + return p.Close() + }() + }() + + outW := tc.create(t) + require.NoError(t, err) + defer outW.Close() + + n, err := pr.WriteTo(outW) + require.NoError(t, err) + require.Equal(t, int64(len(data)), n) + + require.NoError(t, outW.Close()) + require.NoError(t, <-errC) + + outBytes, err := ioutil.ReadFile(outW.Name()) + require.NoError(t, err) + // Don't use require.Equal because we don't want a 10MB error message. + require.True(t, bytes.Equal(data, outBytes)) + + require.Equal(t, tc.sendfile, pr.sendfileCalledSuccessfully) + }) + } +} + +func TestPipe_WriteTo_EAGAIN(t *testing.T) { + data := make([]byte, 10*1024*1024) + _, err := rand.Read(data) + require.NoError(t, err) + + pr, p := createPipe(t) + defer pr.Close() + defer p.Close() + + _, err = p.Write(data) + require.NoError(t, err) + require.NoError(t, p.Close()) + + fr, fw, err := os.Pipe() + require.NoError(t, err) + defer fr.Close() + defer fw.Close() + + errC := make(chan error, 1) + go func() { + errC <- func() error { + defer fw.Close() + + // This will try to write 10MB into fw at once, which will fail because + // the pipe buffer is too small. Then sendfile will return EAGAIN. Doing + // this tests our ability to handle EAGAIN correctly. + _, err := pr.WriteTo(fw) + if err != nil { + return err + } + + return fw.Close() + }() + }() + + out, err := ioutil.ReadAll(fr) + require.NoError(t, err) + // Don't use require.Equal because we don't want a 10MB error message. + require.True(t, bytes.Equal(data, out)) + + require.NoError(t, <-errC) + require.True(t, pr.sendfileCalledSuccessfully) +} -- GitLab