Commit 0f9ba5fc authored by Kamil Trzciński's avatar Kamil Trzciński 🔴

Merge branch 'fix/docker-executor-output-attaching' into 'master'

Replace io.Copy with stdcopy.StdCopy for docker output handling

See merge request !503
parents ef4b0e73 d41e38e6
......@@ -203,6 +203,11 @@
"Comment": "v1.13.0",
"Rev": "49bf474f9ed7ce7143a59d1964ff7b7fd9b52178"
},
{
"ImportPath": "github.com/docker/docker/pkg/stdcopy",
"Comment": "v1.13.0",
"Rev": "49bf474f9ed7ce7143a59d1964ff7b7fd9b52178"
},
{
"ImportPath": "github.com/docker/docker/pkg/tlsconfig",
"Comment": "v1.13.0",
......
......@@ -18,6 +18,8 @@ import (
"github.com/docker/distribution/reference"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/pkg/stdcopy"
"gitlab.com/gitlab-org/gitlab-ci-multi-runner/common"
"gitlab.com/gitlab-org/gitlab-ci-multi-runner/executors"
"gitlab.com/gitlab-org/gitlab-ci-multi-runner/helpers"
......@@ -832,7 +834,7 @@ func (s *executor) watchContainer(id string, input io.Reader, abort chan interfa
// Copy any output to the build trace
go func() {
_, err := io.Copy(s.BuildTrace, hijacked.Reader)
_, err := stdcopy.StdCopy(s.BuildTrace, s.BuildTrace, hijacked.Reader)
if err != nil {
attachCh <- err
}
......@@ -1149,7 +1151,7 @@ func (s *executor) waitForServiceContainer(service *types.Container, timeout tim
hijacked, err := s.client.ContainerLogs(context.TODO(), service.ID, options)
if err == nil {
defer hijacked.Close()
io.Copy(&containerBuffer, hijacked)
stdcopy.StdCopy(&containerBuffer, &containerBuffer, hijacked)
if containerLog := containerBuffer.String(); containerLog != "" {
buffer.WriteString("\n")
buffer.WriteString(strings.TrimSpace(containerLog))
......
package docker_test
import (
"bytes"
"net/url"
"os"
"os/exec"
"regexp"
"strings"
"testing"
"time"
......@@ -185,6 +187,36 @@ func TestDockerCommandBuildCancel(t *testing.T) {
assert.EqualError(t, err, "canceled")
}
func TestDockerCommandOutput(t *testing.T) {
if helpers.SkipIntegrationTests(t, "docker", "info") {
return
}
successfulBuild, err := common.GetRemoteSuccessfulBuild()
assert.NoError(t, err)
build := &common.Build{
GetBuildResponse: successfulBuild,
Runner: &common.RunnerConfig{
RunnerSettings: common.RunnerSettings{
Executor: "docker",
Docker: &common.DockerConfig{
Image: "alpine",
},
},
},
}
var buf []byte
buffer := bytes.NewBuffer(buf)
err = build.Run(&common.Config{}, &common.Trace{Writer: buffer})
assert.NoError(t, err)
re, err := regexp.Compile("(?m)^Cloning into '/builds/gitlab-org/gitlab-test'...")
assert.NoError(t, err)
assert.Regexp(t, re, buffer.String())
}
func TestDockerPrivilegedServiceAccessingBuildsFolder(t *testing.T) {
if helpers.SkipIntegrationTests(t, "docker", "info") {
return
......
package stdcopy
import (
"bytes"
"encoding/binary"
"errors"
"fmt"
"io"
"sync"
)
// StdType is the type of standard stream
// a writer can multiplex to.
type StdType byte
const (
// Stdin represents standard input stream type.
Stdin StdType = iota
// Stdout represents standard output stream type.
Stdout
// Stderr represents standard error steam type.
Stderr
stdWriterPrefixLen = 8
stdWriterFdIndex = 0
stdWriterSizeIndex = 4
startingBufLen = 32*1024 + stdWriterPrefixLen + 1
)
var bufPool = &sync.Pool{New: func() interface{} { return bytes.NewBuffer(nil) }}
// stdWriter is wrapper of io.Writer with extra customized info.
type stdWriter struct {
io.Writer
prefix byte
}
// Write sends the buffer to the underneath writer.
// It inserts the prefix header before the buffer,
// so stdcopy.StdCopy knows where to multiplex the output.
// It makes stdWriter to implement io.Writer.
func (w *stdWriter) Write(p []byte) (n int, err error) {
if w == nil || w.Writer == nil {
return 0, errors.New("Writer not instantiated")
}
if p == nil {
return 0, nil
}
header := [stdWriterPrefixLen]byte{stdWriterFdIndex: w.prefix}
binary.BigEndian.PutUint32(header[stdWriterSizeIndex:], uint32(len(p)))
buf := bufPool.Get().(*bytes.Buffer)
buf.Write(header[:])
buf.Write(p)
n, err = w.Writer.Write(buf.Bytes())
n -= stdWriterPrefixLen
if n < 0 {
n = 0
}
buf.Reset()
bufPool.Put(buf)
return
}
// NewStdWriter instantiates a new Writer.
// Everything written to it will be encapsulated using a custom format,
// and written to the underlying `w` stream.
// This allows multiple write streams (e.g. stdout and stderr) to be muxed into a single connection.
// `t` indicates the id of the stream to encapsulate.
// It can be stdcopy.Stdin, stdcopy.Stdout, stdcopy.Stderr.
func NewStdWriter(w io.Writer, t StdType) io.Writer {
return &stdWriter{
Writer: w,
prefix: byte(t),
}
}
// StdCopy is a modified version of io.Copy.
//
// StdCopy will demultiplex `src`, assuming that it contains two streams,
// previously multiplexed together using a StdWriter instance.
// As it reads from `src`, StdCopy will write to `dstout` and `dsterr`.
//
// StdCopy will read until it hits EOF on `src`. It will then return a nil error.
// In other words: if `err` is non nil, it indicates a real underlying error.
//
// `written` will hold the total number of bytes written to `dstout` and `dsterr`.
func StdCopy(dstout, dsterr io.Writer, src io.Reader) (written int64, err error) {
var (
buf = make([]byte, startingBufLen)
bufLen = len(buf)
nr, nw int
er, ew error
out io.Writer
frameSize int
)
for {
// Make sure we have at least a full header
for nr < stdWriterPrefixLen {
var nr2 int
nr2, er = src.Read(buf[nr:])
nr += nr2
if er == io.EOF {
if nr < stdWriterPrefixLen {
return written, nil
}
break
}
if er != nil {
return 0, er
}
}
// Check the first byte to know where to write
switch StdType(buf[stdWriterFdIndex]) {
case Stdin:
fallthrough
case Stdout:
// Write on stdout
out = dstout
case Stderr:
// Write on stderr
out = dsterr
default:
return 0, fmt.Errorf("Unrecognized input header: %d", buf[stdWriterFdIndex])
}
// Retrieve the size of the frame
frameSize = int(binary.BigEndian.Uint32(buf[stdWriterSizeIndex : stdWriterSizeIndex+4]))
// Check if the buffer is big enough to read the frame.
// Extend it if necessary.
if frameSize+stdWriterPrefixLen > bufLen {
buf = append(buf, make([]byte, frameSize+stdWriterPrefixLen-bufLen+1)...)
bufLen = len(buf)
}
// While the amount of bytes read is less than the size of the frame + header, we keep reading
for nr < frameSize+stdWriterPrefixLen {
var nr2 int
nr2, er = src.Read(buf[nr:])
nr += nr2
if er == io.EOF {
if nr < frameSize+stdWriterPrefixLen {
return written, nil
}
break
}
if er != nil {
return 0, er
}
}
// Write the retrieved frame (without header)
nw, ew = out.Write(buf[stdWriterPrefixLen : frameSize+stdWriterPrefixLen])
if ew != nil {
return 0, ew
}
// If the frame has not been fully written: error
if nw != frameSize {
return 0, io.ErrShortWrite
}
written += int64(nw)
// Move the rest of the buffer to the beginning
copy(buf, buf[frameSize+stdWriterPrefixLen:])
// Move the index
nr -= frameSize + stdWriterPrefixLen
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment