Commit 74d738a0 authored by Tomasz Maczukin

Merge branch 'store-traces-on-disk' into 'master'

Store traces on disk

See merge request !1315
parents 3e07892e 4c991bc5
Pipeline #64121006 passed with stages
in 76 minutes and 54 seconds
......@@ -121,21 +121,54 @@ func (mr *RunCommand) feedRunners(runners chan *common.RunnerConfig) {
}
}
// requeueRunner puts the runner back on the worker queue so another worker
// can pick it up quickly. The send is non-blocking: if the channel is full
// the runner is simply not requeued (it will be fed again by feedRunners).
func (mr *RunCommand) requeueRunner(runner *common.RunnerConfig, runners chan *common.RunnerConfig) {
	runnerLog := mr.log().WithField("runner", runner.ShortDescription())

	select {
	case runners <- runner:
		runnerLog.Debugln("Requeued the runner")
	default:
		// Channel is full — drop the requeue attempt rather than block.
		// Fixed: the original message ended with a dangling ": " with
		// nothing appended after it.
		runnerLog.Debugln("Failed to requeue the runner")
	}
}
// requestJob will check if the runner can send another concurrent request to
// GitLab, if not the return value is nil.
func (mr *RunCommand) requestJob(runner *common.RunnerConfig, sessionInfo *common.SessionInfo) *common.JobResponse {
func (mr *RunCommand) requestJob(runner *common.RunnerConfig, sessionInfo *common.SessionInfo) (common.JobTrace, *common.JobResponse, error) {
if !mr.buildsHelper.acquireRequest(runner) {
mr.log().WithField("runner", runner.ShortDescription()).
Debugln("Failed to request job: runner requestConcurrency meet")
return nil
return nil, nil, nil
}
defer mr.buildsHelper.releaseRequest(runner)
jobData, healthy := mr.network.RequestJob(*runner, sessionInfo)
mr.makeHealthy(runner.UniqueID(), healthy)
return jobData
if jobData == nil {
return nil, nil, nil
}
// Make sure to always close output
jobCredentials := &common.JobCredentials{
ID: jobData.ID,
Token: jobData.Token,
}
trace, err := mr.network.ProcessJob(*runner, jobCredentials)
if err != nil {
jobInfo := common.UpdateJobInfo{
ID: jobCredentials.ID,
State: common.Failed,
FailureReason: common.RunnerSystemFailure,
}
// send failure once
mr.network.UpdateJob(*runner, jobCredentials, jobInfo)
return nil, nil, err
}
trace.SetFailuresCollector(mr.failuresCollector)
return trace, jobData, nil
}
func (mr *RunCommand) processRunner(id int, runner *common.RunnerConfig, runners chan *common.RunnerConfig) (err error) {
......@@ -150,25 +183,16 @@ func (mr *RunCommand) processRunner(id int, runner *common.RunnerConfig, runners
}
defer releaseFn()
var features common.FeaturesInfo
provider.GetFeatures(&features)
buildSession, sessionInfo, err := mr.createSession(features)
buildSession, sessionInfo, err := mr.createSession(provider)
if err != nil {
return
}
// Receive a new build
jobData := mr.requestJob(runner, sessionInfo)
if jobData == nil {
trace, jobData, err := mr.requestJob(runner, sessionInfo)
if err != nil || jobData == nil {
return
}
// Make sure to always close output
jobCredentials := &common.JobCredentials{
ID: jobData.ID,
Token: jobData.Token,
}
trace := mr.network.ProcessJob(*runner, jobCredentials)
defer func() {
if err != nil {
fmt.Fprintln(trace, err.Error())
......@@ -178,8 +202,6 @@ func (mr *RunCommand) processRunner(id int, runner *common.RunnerConfig, runners
}
}()
trace.SetFailuresCollector(mr.failuresCollector)
// Create a new build
build, err := common.NewBuild(*jobData, runner, mr.abortBuilds, executorData)
if err != nil {
......@@ -193,13 +215,7 @@ func (mr *RunCommand) processRunner(id int, runner *common.RunnerConfig, runners
// Process the same runner by different worker again
// to speed up taking the builds
select {
case runners <- runner:
mr.log().WithField("runner", runner.ShortDescription()).Debugln("Requeued the runner")
default:
mr.log().WithField("runner", runner.ShortDescription()).Debugln("Failed to requeue the runner: ")
}
mr.requeueRunner(runner, runners)
// Process a build
return build.Run(mr.config, trace)
......@@ -224,7 +240,13 @@ func (mr *RunCommand) acquireRunnerResources(provider common.ExecutorProvider, r
return executorData, releaseFn, nil
}
func (mr *RunCommand) createSession(features common.FeaturesInfo) (*session.Session, *common.SessionInfo, error) {
func (mr *RunCommand) createSession(provider common.ExecutorProvider) (*session.Session, *common.SessionInfo, error) {
var features common.FeaturesInfo
if err := provider.GetFeatures(&features); err != nil {
return nil, nil, err
}
if mr.sessionServer == nil || !features.Session {
return nil, nil, nil
}
......
......@@ -46,7 +46,7 @@ func TestProcessRunner_BuildLimit(t *testing.T) {
mNetwork := common.MockNetwork{}
defer mNetwork.AssertExpectations(t)
mNetwork.On("RequestJob", mock.Anything, mock.Anything).Return(&jobData, true)
mNetwork.On("ProcessJob", mock.Anything, mock.Anything).Return(&mJobTrace)
mNetwork.On("ProcessJob", mock.Anything, mock.Anything).Return(&mJobTrace, nil)
var runningBuilds uint32
e := common.MockExecutor{}
......
......@@ -96,7 +96,11 @@ func (r *RunSingleCommand) processBuild(data common.ExecutorData, abortSignal ch
ID: jobData.ID,
Token: jobData.Token,
}
trace := r.network.ProcessJob(r.RunnerConfig, jobCredentials)
trace, err := r.network.ProcessJob(r.RunnerConfig, jobCredentials)
if err != nil {
return err
}
defer trace.Fail(err, common.NoneFailure)
err = newBuild.Run(config, trace)
......
......@@ -77,7 +77,7 @@ func mockingExecutionStack(t *testing.T, executorName string, maxBuilds int, job
_, cancel := context.WithCancel(context.Background())
jobTrace := common.Trace{Writer: ioutil.Discard, CancelFunc: cancel}
mockNetwork.On("RequestJob", mock.Anything, mock.Anything).Return(&jobData, true).Times(maxBuilds)
processJob := mockNetwork.On("ProcessJob", mock.Anything, mock.Anything).Return(&jobTrace).Times(maxBuilds)
processJob := mockNetwork.On("ProcessJob", mock.Anything, mock.Anything).Return(&jobTrace, nil).Times(maxBuilds)
if job != nil {
processJob.Run(job)
}
......
......@@ -48,7 +48,7 @@ func (_m *MockNetwork) PatchTrace(config RunnerConfig, jobCredentials *JobCreden
}
// ProcessJob provides a mock function with given fields: config, buildCredentials
func (_m *MockNetwork) ProcessJob(config RunnerConfig, buildCredentials *JobCredentials) JobTrace {
func (_m *MockNetwork) ProcessJob(config RunnerConfig, buildCredentials *JobCredentials) (JobTrace, error) {
ret := _m.Called(config, buildCredentials)
var r0 JobTrace
......@@ -60,7 +60,14 @@ func (_m *MockNetwork) ProcessJob(config RunnerConfig, buildCredentials *JobCred
}
}
return r0
var r1 error
if rf, ok := ret.Get(1).(func(RunnerConfig, *JobCredentials) error); ok {
r1 = rf(config, buildCredentials)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// RegisterRunner provides a mock function with given fields: config, parameters
......
......@@ -380,5 +380,5 @@ type Network interface {
PatchTrace(config RunnerConfig, jobCredentials *JobCredentials, content []byte, startOffset int) (int, UpdateState)
DownloadArtifacts(config JobCredentials, artifactsFile string) DownloadState
UploadRawArtifacts(config JobCredentials, reader io.Reader, options ArtifactsOptions) UploadState
ProcessJob(config RunnerConfig, buildCredentials *JobCredentials) JobTrace
ProcessJob(config RunnerConfig, buildCredentials *JobCredentials) (JobTrace, error)
}
......@@ -5,6 +5,8 @@ import (
"bytes"
"fmt"
"io"
"io/ioutil"
"os"
"sync"
"github.com/markelog/trie"
......@@ -18,8 +20,10 @@ const defaultBytesLimit = 4 * 1024 * 1024 // 4MB
type Buffer struct {
writer io.WriteCloser
lock sync.RWMutex
log bytes.Buffer
logMaskedSize int
logFile *os.File
logSize int
logWriter *bufio.Writer
advanceBuffer bytes.Buffer
bytesLimit int
finish chan struct{}
......@@ -43,34 +47,50 @@ func (b *Buffer) SetLimit(size int) {
b.bytesLimit = size
}
func (b *Buffer) limitExceededMessage() string {
return fmt.Sprintf("\n%sJob's log exceeded limit of %v bytes.%s\n", helpers.ANSI_BOLD_RED, b.bytesLimit, helpers.ANSI_RESET)
// Size returns the number of bytes that have been flushed to the on-disk
// log so far (logSize is incremented in advanceAllUnsafe after each
// WriteTo into logWriter).
// NOTE(review): reads logSize without taking b.lock — presumably acceptable
// for approximate progress checks; confirm callers tolerate a stale value.
func (b *Buffer) Size() int {
return b.logSize
}
func (b *Buffer) Bytes() []byte {
b.lock.RLock()
defer b.lock.RUnlock()
func (b *Buffer) Reader(offset, n int) (io.ReadSeeker, error) {
b.lock.Lock()
defer b.lock.Unlock()
err := b.logWriter.Flush()
if err != nil {
return nil, err
}
return b.log.Bytes()[0:b.logMaskedSize]
return io.NewSectionReader(b.logFile, int64(offset), int64(n)), nil
}
func (b *Buffer) String() string {
return string(b.Bytes())
// Bytes reads up to n bytes of the job log starting at offset.
// It obtains a section reader over the backing log file (which flushes
// any buffered output first) and drains it fully into memory.
func (b *Buffer) Bytes(offset, n int) ([]byte, error) {
	section, readerErr := b.Reader(offset, n)
	if readerErr != nil {
		return nil, readerErr
	}

	return ioutil.ReadAll(section)
}
// Write implements io.Writer by forwarding the payload to the internal
// pipe writer; the processing goroutine consumes it on the other end.
func (b *Buffer) Write(p []byte) (int, error) {
	return b.writer.Write(p)
}
func (b *Buffer) Close() error {
func (b *Buffer) Finish() {
// wait for trace to finish
err := b.writer.Close()
b.writer.Close()
<-b.finish
return err
}
func (b *Buffer) advanceAllUnsafe() {
b.logMaskedSize = b.log.Len()
// Close releases the on-disk trace storage: the temporary log file is
// closed and then removed. Both errors are deliberately ignored — this is
// best-effort cleanup of a temp file.
func (b *Buffer) Close() {
	name := b.logFile.Name()
	_ = b.logFile.Close()
	_ = os.Remove(name)
}
// advanceAllUnsafe drains the pending advance buffer into the buffered
// log writer and accounts for the bytes written in logSize.
// NOTE(review): the "Unsafe" suffix suggests b.lock must already be held
// by the caller — confirm against the locking call sites.
func (b *Buffer) advanceAllUnsafe() error {
	written, writeErr := b.advanceBuffer.WriteTo(b.logWriter)
	b.logSize += int(written)

	return writeErr
}
func (b *Buffer) advanceAll() {
......@@ -84,70 +104,69 @@ func (b *Buffer) advanceAll() {
func (b *Buffer) advanceLogUnsafe() error {
// advance all if no masking is enabled
if b.maskTree == nil {
b.advanceAllUnsafe()
return nil
return b.advanceAllUnsafe()
}
rest := string(b.log.Bytes()[b.logMaskedSize:])
rest := b.advanceBuffer.String()
results := b.maskTree.Search(rest)
if len(results) == 0 {
// we can advance as no match was found
b.advanceAllUnsafe()
return nil
return b.advanceAllUnsafe()
}
// full match was found
if len(results) == 1 && results[0].Key == rest {
b.log.Truncate(b.logMaskedSize)
b.log.WriteString(maskedText)
b.advanceAllUnsafe()
b.advanceBuffer.Reset()
b.advanceBuffer.WriteString(maskedText)
return b.advanceAllUnsafe()
}
// partial match, wait for more characters
return nil
}
func (b *Buffer) writeRune(r rune) (int, error) {
// limitExceededMessage renders the red, bold notice appended to the log
// once the configured byte limit has been exceeded.
func (b *Buffer) limitExceededMessage() string {
	return fmt.Sprintf(
		"\n%sJob's log exceeded limit of %v bytes.%s\n",
		helpers.ANSI_BOLD_RED,
		b.bytesLimit,
		helpers.ANSI_RESET,
	)
}
func (b *Buffer) writeRune(r rune) error {
b.lock.Lock()
defer b.lock.Unlock()
n, err := b.log.WriteRune(r)
if err != nil {
return n, err
// over trace limit
if b.logSize > b.bytesLimit {
return io.EOF
}
err = b.advanceLogUnsafe()
if err != nil {
return n, err
if _, err := b.advanceBuffer.WriteRune(r); err != nil {
return err
}
if b.log.Len() < b.bytesLimit {
return n, nil
if err := b.advanceLogUnsafe(); err != nil {
return err
}
b.log.WriteString(b.limitExceededMessage())
return n, io.EOF
// under trace limit
if b.logSize <= b.bytesLimit {
return nil
}
b.advanceBuffer.Reset()
b.advanceBuffer.WriteString(b.limitExceededMessage())
return b.advanceAllUnsafe()
}
func (b *Buffer) process(pipe *io.PipeReader) {
defer pipe.Close()
stopped := false
reader := bufio.NewReader(pipe)
for {
r, s, err := reader.ReadRune()
if s <= 0 {
break
} else if stopped {
// ignore symbols if job log exceeded limit
continue
} else if err == nil {
_, err = b.writeRune(r)
if err == io.EOF {
stopped = true
}
b.writeRune(r)
} else {
// ignore invalid characters
continue
......@@ -158,13 +177,20 @@ func (b *Buffer) process(pipe *io.PipeReader) {
close(b.finish)
}
func New() *Buffer {
func New() (*Buffer, error) {
logFile, err := ioutil.TempFile("", "trace")
if err != nil {
return nil, err
}
reader, writer := io.Pipe()
buffer := &Buffer{
writer: writer,
bytesLimit: defaultBytesLimit,
finish: make(chan struct{}),
logFile: logFile,
logWriter: bufio.NewWriter(logFile),
}
go buffer.process(reader)
return buffer
return buffer, nil
}
......@@ -17,29 +17,42 @@ func TestVariablesMasking(t *testing.T) {
"containing",
}
buffer := New()
buffer, err := New()
require.NoError(t, err)
defer buffer.Close()
buffer.SetMasked(maskedValues)
_, err := buffer.Write([]byte(traceMessage))
_, err = buffer.Write([]byte(traceMessage))
require.NoError(t, err)
err = buffer.Close()
buffer.Finish()
content, err := buffer.Bytes(0, 1000)
require.NoError(t, err)
assert.Equal(t, "Th[MASKED] [MASKED] the [MASKED] message [MASKED] [MASKED] [MASKED]s", buffer.String())
assert.Equal(t, "Th[MASKED] [MASKED] the [MASKED] message [MASKED] [MASKED] [MASKED]s", string(content))
}
func TestTraceLimit(t *testing.T) {
traceMessage := "This is the long message"
buffer := New()
buffer, err := New()
require.NoError(t, err)
defer buffer.Close()
buffer.SetLimit(10)
_, err := buffer.Write([]byte(traceMessage))
require.NoError(t, err)
for i := 0; i < 100; i++ {
_, err = buffer.Write([]byte(traceMessage))
require.NoError(t, err)
}
buffer.Finish()
err = buffer.Close()
content, err := buffer.Bytes(0, 1000)
require.NoError(t, err)
assert.Contains(t, buffer.String(), "Job's log exceeded limit of")
assert.Equal(t, "This is the\n\x1b[31;1mJob's log exceeded limit of 10 bytes.\x1b[0;m\n",
string(content))
}
......@@ -530,10 +530,14 @@ func (n *GitLabClient) DownloadArtifacts(config common.JobCredentials, artifacts
}
}
func (n *GitLabClient) ProcessJob(config common.RunnerConfig, jobCredentials *common.JobCredentials) common.JobTrace {
trace := newJobTrace(n, config, jobCredentials)
func (n *GitLabClient) ProcessJob(config common.RunnerConfig, jobCredentials *common.JobCredentials) (common.JobTrace, error) {
trace, err := newJobTrace(n, config, jobCredentials)
if err != nil {
return nil, err
}
trace.start()
return trace
return trace, nil
}
func NewGitLabClientWithRequestStatusesMap(rsMap *APIRequestStatusesMap) *GitLabClient {
......
......@@ -765,12 +765,12 @@ func TestRangeMismatchPatchTrace(t *testing.T) {
endOffset, state := client.PatchTrace(config, &JobCredentials{ID: 1, Token: patchToken},
patchTraceContent[11:], 11)
assert.Equal(t, UpdateRangeMismatch, state)
assert.Equal(t, 10, endOffset) // TODO: (kamil) to verify behavior
assert.Equal(t, 10, endOffset)
endOffset, state = client.PatchTrace(config, &JobCredentials{ID: 1, Token: patchToken},
patchTraceContent[15:], 15)
assert.Equal(t, UpdateRangeMismatch, state)
assert.Equal(t, 10, endOffset) // TODO: (kamil) to verify behavior
assert.Equal(t, 10, endOffset)
endOffset, state = client.PatchTrace(config, &JobCredentials{ID: 1, Token: patchToken},
patchTraceContent[5:], 5)
......
......@@ -129,10 +129,11 @@ func (c *clientJobTrace) finalStatusUpdate() {
}
func (c *clientJobTrace) finish() {
c.buffer.Close()
c.buffer.Finish()
c.finished <- true
c.finalTraceUpdate()
c.finalStatusUpdate()
c.buffer.Close()
}
func (c *clientJobTrace) incrementalUpdate() common.UpdateState {
......@@ -147,23 +148,22 @@ func (c *clientJobTrace) incrementalUpdate() common.UpdateState {
func (c *clientJobTrace) anyTraceToSend() bool {
c.lock.RLock()
defer c.lock.RUnlock()
return len(c.buffer.Bytes()) != c.sentTrace
return c.buffer.Size() != c.sentTrace
}
func (c *clientJobTrace) sendPatch() common.UpdateState {
c.lock.RLock()
trace := c.buffer.Bytes()
content, err := c.buffer.Bytes(c.sentTrace, c.maxTracePatchSize)
sentTrace := c.sentTrace
c.lock.RUnlock()
if len(trace) == sentTrace {
return common.UpdateSucceeded
if err != nil {
return common.UpdateFailed
}
// we send at most `maxTracePatchSize` in single patch
content := trace[sentTrace:]
if len(content) > c.maxTracePatchSize {
content = content[:c.maxTracePatchSize]
if len(content) == 0 {
return common.UpdateSucceeded
}
sentOffset, state := c.client.PatchTrace(
......@@ -243,16 +243,21 @@ func (c *clientJobTrace) setupLogLimit() {
c.buffer.SetLimit(bytesLimit)
}
func newJobTrace(client common.Network, config common.RunnerConfig, jobCredentials *common.JobCredentials) *clientJobTrace {
func newJobTrace(client common.Network, config common.RunnerConfig, jobCredentials *common.JobCredentials) (*clientJobTrace, error) {
buffer, err := trace.New()
if err != nil {
return nil, err
}
return &clientJobTrace{
client: client,
config: config,
buffer: trace.New(),
buffer: buffer,
jobCredentials: jobCredentials,
id: jobCredentials.ID,
maxTracePatchSize: common.DefaultTracePatchLimit,
updateInterval: common.UpdateInterval,
forceSendInterval: common.ForceTraceSentInterval,
finishRetryInterval: common.UpdateRetryInterval,
}
}, nil
}
......@@ -53,7 +53,9 @@ func TestIgnoreStatusChange(t *testing.T) {
mockNetwork.On("UpdateJob", jobConfig, jobCredentials, jobInfoMatcher).
Return(common.UpdateSucceeded).Once()
b := newJobTrace(mockNetwork, jobConfig, jobCredentials)
b, err := newJobTrace(mockNetwork, jobConfig, jobCredentials)
require.NoError(t, err)
b.start()
b.Success()
b.Fail(errors.New("test"), "script_failure")
......@@ -77,7 +79,9 @@ func TestJobAbort(t *testing.T) {
mockNetwork.On("UpdateJob", jobConfig, jobCredentials, updateMatcher).
Return(common.UpdateAbort).Once()
b := newJobTrace(mockNetwork, jobConfig, jobCredentials)
b, err := newJobTrace(mockNetwork, jobConfig, jobCredentials)
require.NoError(t, err)
b.updateInterval = 0
b.SetCancelFunc(cancel)
......@@ -92,7 +96,9 @@ func TestJobOutputLimit(t *testing.T) {
mockNetwork := new(common.MockNetwork)
defer mockNetwork.AssertExpectations(t)
b := newJobTrace(mockNetwork, jobOutputLimit, jobCredentials)
b, err := newJobTrace(mockNetwork, jobOutputLimit, jobCredentials)
require.NoError(t, err)
// prevent any UpdateJob before `b.Success()` call
b.updateInterval = 25 * time.Second
......@@ -100,9 +106,10 @@ func TestJobOutputLimit(t *testing.T) {
receivedTrace := bytes.NewBuffer([]byte{})
mockNetwork.On("PatchTrace", jobOutputLimit, jobCredentials, mock.Anything, mock.Anything).
Return(1077, common.UpdateSucceeded).
Return(1078, common.UpdateSucceeded).