Commit bc448528 authored by Tomasz Maczukin's avatar Tomasz Maczukin

Merge branch 'improve-support-for-big-traces' into 'master'

Optimise trace handling for big traces

See merge request gitlab-org/gitlab-runner!1292
parents 83323c12 df7efc0b
......@@ -13,7 +13,8 @@ const HealthyChecks = 3
const HealthCheckInterval = 3600
const DefaultWaitForServicesTimeout = 30
const ShutdownTimeout = 30
const DefaultOutputLimit = 4096 // 4MB in kilobytes
const DefaultOutputLimit = 4 * 1024 * 1024 // in bytes
const DefaultTracePatchLimit = 1024 * 1024 // in bytes
const ForceTraceSentInterval = 30 * time.Second
const PreparationRetries = 3
const DefaultGetSourcesAttempts = 1
......
// Code generated by mockery v1.0.0. DO NOT EDIT.
// This comment works around https://github.com/vektra/mockery/issues/155
package common
import mock "github.com/stretchr/testify/mock"
// MockJobTracePatch is an autogenerated mock type for the JobTracePatch type
type MockJobTracePatch struct {
mock.Mock
}
// Offset provides a mock function with given fields:
func (_m *MockJobTracePatch) Offset() int {
ret := _m.Called()
var r0 int
if rf, ok := ret.Get(0).(func() int); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(int)
}
return r0
}
// Patch provides a mock function with given fields:
func (_m *MockJobTracePatch) Patch() []byte {
ret := _m.Called()
var r0 []byte
if rf, ok := ret.Get(0).(func() []byte); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]byte)
}
}
return r0
}
// SetNewOffset provides a mock function with given fields: newOffset
func (_m *MockJobTracePatch) SetNewOffset(newOffset int) {
_m.Called(newOffset)
}
// TotalSize provides a mock function with given fields:
func (_m *MockJobTracePatch) TotalSize() int {
ret := _m.Called()
var r0 int
if rf, ok := ret.Get(0).(func() int); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(int)
}
return r0
}
// ValidateRange provides a mock function with given fields:
func (_m *MockJobTracePatch) ValidateRange() bool {
ret := _m.Called()
var r0 bool
if rf, ok := ret.Get(0).(func() bool); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(bool)
}
return r0
}
......@@ -27,17 +27,13 @@ func (_m *MockNetwork) DownloadArtifacts(config JobCredentials, artifactsFile st
}
// PatchTrace provides a mock function with given fields: config, jobCredentials, tracePart
func (_m *MockNetwork) PatchTrace(config RunnerConfig, jobCredentials *JobCredentials, tracePart JobTracePatch) UpdateState {
ret := _m.Called(config, jobCredentials, tracePart)
func (m *MockNetwork) PatchTrace(config RunnerConfig, jobCredentials *JobCredentials, content []byte, startOffset int) (int, UpdateState) {
ret := m.Called(config, jobCredentials, content, startOffset)
var r0 UpdateState
if rf, ok := ret.Get(0).(func(RunnerConfig, *JobCredentials, JobTracePatch) UpdateState); ok {
r0 = rf(config, jobCredentials, tracePart)
} else {
r0 = ret.Get(0).(UpdateState)
}
r0 := ret.Get(0).(int)
r1 := ret.Get(1).(UpdateState)
return r0
return r0, r1
}
// ProcessJob provides a mock function with given fields: config, buildCredentials
......
......@@ -304,7 +304,6 @@ type UpdateJobRequest struct {
Token string `json:"token,omitempty"`
State JobState `json:"state,omitempty"`
FailureReason JobFailureReason `json:"failure_reason,omitempty"`
Trace *string `json:"trace,omitempty"`
}
type JobCredentials struct {
......@@ -339,7 +338,6 @@ func (j *JobCredentials) GetToken() string {
type UpdateJobInfo struct {
ID int
State JobState
Trace *string
FailureReason JobFailureReason
}
......@@ -364,21 +362,13 @@ type JobTrace interface {
IsStdout() bool
}
type JobTracePatch interface {
Patch() []byte
Offset() int
TotalSize() int
SetNewOffset(newOffset int)
ValidateRange() bool
}
type Network interface {
RegisterRunner(config RunnerCredentials, parameters RegisterRunnerParameters) *RegisterRunnerResponse
VerifyRunner(config RunnerCredentials) bool
UnregisterRunner(config RunnerCredentials) bool
RequestJob(config RunnerConfig, sessionInfo *SessionInfo) (*JobResponse, bool)
UpdateJob(config RunnerConfig, jobCredentials *JobCredentials, jobInfo UpdateJobInfo) UpdateState
PatchTrace(config RunnerConfig, jobCredentials *JobCredentials, tracePart JobTracePatch) UpdateState
PatchTrace(config RunnerConfig, jobCredentials *JobCredentials, content []byte, startOffset int) (int, UpdateState)
DownloadArtifacts(config JobCredentials, artifactsFile string) DownloadState
UploadRawArtifacts(config JobCredentials, reader io.Reader, options ArtifactsOptions) UploadState
ProcessJob(config RunnerConfig, buildCredentials *JobCredentials) JobTrace
......
......@@ -303,7 +303,6 @@ func (n *GitLabClient) UpdateJob(config common.RunnerConfig, jobCredentials *com
Token: jobCredentials.Token,
State: jobInfo.State,
FailureReason: jobInfo.FailureReason,
Trace: jobInfo.Trace,
}
result, statusText, _, response := n.doJSON(&config.RunnerCredentials, "PUT", fmt.Sprintf("jobs/%d", jobInfo.ID), http.StatusOK, &request, nil)
......@@ -338,28 +337,29 @@ func (n *GitLabClient) UpdateJob(config common.RunnerConfig, jobCredentials *com
}
}
func (n *GitLabClient) PatchTrace(config common.RunnerConfig, jobCredentials *common.JobCredentials, tracePatch common.JobTracePatch) common.UpdateState {
func (n *GitLabClient) PatchTrace(config common.RunnerConfig, jobCredentials *common.JobCredentials, content []byte, startOffset int) (int, common.UpdateState) {
id := jobCredentials.ID
baseLog := config.Log().WithField("job", id)
if tracePatch.Offset() == tracePatch.TotalSize() {
baseLog.Warningln("Appending trace to coordinator...", "skipped due to empty patch")
return common.UpdateFailed
if len(content) == 0 {
baseLog.Debugln("Appending trace to coordinator...", "skipped due to empty patch")
return startOffset, common.UpdateSucceeded
}
contentRange := fmt.Sprintf("%d-%d", tracePatch.Offset(), tracePatch.TotalSize()-1)
endOffset := startOffset + len(content)
contentRange := fmt.Sprintf("%d-%d", startOffset, endOffset-1)
headers := make(http.Header)
headers.Set("Content-Range", contentRange)
headers.Set("JOB-TOKEN", jobCredentials.Token)
uri := fmt.Sprintf("jobs/%d/trace", id)
request := bytes.NewReader(tracePatch.Patch())
request := bytes.NewReader(content)
response, err := n.doRaw(&config.RunnerCredentials, "PATCH", uri, request, "text/plain", headers)
if err != nil {
config.Log().Errorln("Appending trace to coordinator...", "error", err.Error())
return common.UpdateFailed
return startOffset, common.UpdateFailed
}
n.requestsStatusesMap.Append(config.RunnerCredentials.ShortDescription(), APIEndpointPatchTrace, response.StatusCode)
......@@ -379,23 +379,22 @@ func (n *GitLabClient) PatchTrace(config common.RunnerConfig, jobCredentials *co
switch {
case tracePatchResponse.IsAborted():
log.Warningln("Appending trace to coordinator...", "aborted")
return common.UpdateAbort
return startOffset, common.UpdateAbort
case response.StatusCode == http.StatusAccepted:
log.Debugln("Appending trace to coordinator...", "ok")
return common.UpdateSucceeded
return endOffset, common.UpdateSucceeded
case response.StatusCode == http.StatusNotFound:
log.Warningln("Appending trace to coordinator...", "not-found")
return common.UpdateNotFound
return startOffset, common.UpdateNotFound
case response.StatusCode == http.StatusRequestedRangeNotSatisfiable:
log.Warningln("Appending trace to coordinator...", "range mismatch")
tracePatch.SetNewOffset(tracePatchResponse.NewOffset())
return common.UpdateRangeMismatch
return tracePatchResponse.NewOffset(), common.UpdateRangeMismatch
case response.StatusCode == clientError:
log.Errorln("Appending trace to coordinator...", "error")
return common.UpdateAbort
return startOffset, common.UpdateAbort
default:
log.Warningln("Appending trace to coordinator...", "failed")
return common.UpdateFailed
return startOffset, common.UpdateFailed
}
}
......
This diff is collapsed.
......@@ -14,9 +14,12 @@ type TracePatchResponse struct {
func (p *TracePatchResponse) NewOffset() int {
remoteRangeParts := strings.Split(p.RemoteRange, "-")
newOffset, _ := strconv.Atoi(remoteRangeParts[1])
if len(remoteRangeParts) == 2 {
newOffset, _ := strconv.Atoi(remoteRangeParts[1])
return newOffset
}
return newOffset
return 0
}
func NewTracePatchResponse(response *http.Response) *TracePatchResponse {
......
......@@ -30,6 +30,7 @@ type clientJobTrace struct {
updateInterval time.Duration
forceSendInterval time.Duration
finishRetryInterval time.Duration
maxTracePatchSize int
failuresCollector common.FailuresCollector
}
......@@ -92,118 +93,101 @@ func (c *clientJobTrace) start() {
go c.watch()
}
func (c *clientJobTrace) finish() {
c.buffer.Close()
c.finished <- true
// Do final upload of job trace
for {
if c.fullUpdate() != common.UpdateFailed {
func (c *clientJobTrace) finalTraceUpdate() {
for c.anyTraceToSend() {
switch c.sendPatch() {
case common.UpdateSucceeded:
// we continue sending till we succeed
continue
case common.UpdateAbort:
return
case common.UpdateNotFound:
return
case common.UpdateRangeMismatch:
time.Sleep(c.finishRetryInterval)
case common.UpdateFailed:
time.Sleep(c.finishRetryInterval)
}
time.Sleep(c.finishRetryInterval)
}
}
func (c *clientJobTrace) incrementalUpdate() common.UpdateState {
c.lock.RLock()
state := c.state
trace := c.buffer.Bytes()
c.lock.RUnlock()
if c.sentTrace != len(trace) {
result := c.sendPatch(trace)
if result != common.UpdateSucceeded {
return result
}
}
if c.sentState != state || time.Since(c.sentTime) > c.forceSendInterval {
if state == common.Running { // we should only follow-up with Running!
result := c.sendUpdate(state)
if result != common.UpdateSucceeded {
return result
}
func (c *clientJobTrace) finalStatusUpdate() {
for {
switch c.sendUpdate(true) {
case common.UpdateSucceeded:
return
case common.UpdateAbort:
return
case common.UpdateNotFound:
return
case common.UpdateRangeMismatch:
return
case common.UpdateFailed:
time.Sleep(c.finishRetryInterval)
}
}
return common.UpdateSucceeded
}
func (c *clientJobTrace) sendPatch(trace []byte) common.UpdateState {
tracePatch, err := newTracePatch(trace, c.sentTrace)
if err != nil {
c.config.Log().Errorln("Error while creating a tracePatch", err.Error())
}
update := c.client.PatchTrace(c.config, c.jobCredentials, tracePatch)
if update == common.UpdateNotFound {
return update
}
if update == common.UpdateRangeMismatch {
update = c.resendPatch(c.jobCredentials.ID, c.config, c.jobCredentials, tracePatch)
}
func (c *clientJobTrace) finish() {
c.buffer.Close()
c.finished <- true
c.finalTraceUpdate()
c.finalStatusUpdate()
}
if update == common.UpdateSucceeded {
c.sentTrace = tracePatch.totalSize
c.sentTime = time.Now()
func (c *clientJobTrace) incrementalUpdate() common.UpdateState {
state := c.sendPatch()
if state != common.UpdateSucceeded {
return state
}
return update
return c.sendUpdate(false)
}
func (c *clientJobTrace) resendPatch(id int, config common.RunnerConfig, jobCredentials *common.JobCredentials, tracePatch common.JobTracePatch) (update common.UpdateState) {
if !tracePatch.ValidateRange() {
config.Log().Warningln(id, "Full job update is needed")
fullTrace := string(c.buffer.Bytes())
func (c *clientJobTrace) anyTraceToSend() bool {
c.lock.RLock()
defer c.lock.RUnlock()
return len(c.buffer.Bytes()) != c.sentTrace
}
jobInfo := common.UpdateJobInfo{
ID: c.id,
State: c.state,
Trace: &fullTrace,
FailureReason: c.failureReason,
}
func (c *clientJobTrace) sendPatch() common.UpdateState {
c.lock.RLock()
trace := c.buffer.Bytes()
sentTrace := c.sentTrace
c.lock.RUnlock()
return c.client.UpdateJob(c.config, jobCredentials, jobInfo)
if len(trace) == sentTrace {
return common.UpdateSucceeded
}
config.Log().Warningln(id, "Resending trace patch due to range mismatch")
update = c.client.PatchTrace(config, jobCredentials, tracePatch)
if update == common.UpdateRangeMismatch {
config.Log().Errorln(id, "Appending trace to coordinator...", "failed due to range mismatch")
return common.UpdateFailed
// we send at most `maxTracePatchSize` in single patch
content := trace[sentTrace:]
if len(content) > c.maxTracePatchSize {
content = content[:c.maxTracePatchSize]
}
return
}
func (c *clientJobTrace) sendUpdate(state common.JobState) common.UpdateState {
jobInfo := common.UpdateJobInfo{
ID: c.id,
State: state,
FailureReason: c.failureReason,
}
sentOffset, state := c.client.PatchTrace(
c.config, c.jobCredentials, content, sentTrace)
status := c.client.UpdateJob(c.config, c.jobCredentials, jobInfo)
if status == common.UpdateSucceeded {
c.sentState = state
if state == common.UpdateSucceeded || state == common.UpdateRangeMismatch {
c.lock.Lock()
c.sentTime = time.Now()
c.sentTrace = sentOffset
c.lock.Unlock()
}
return status
return state
}
func (c *clientJobTrace) fullUpdate() common.UpdateState {
func (c *clientJobTrace) sendUpdate(force bool) common.UpdateState {
c.lock.RLock()
state := c.state
trace := c.buffer.Bytes()
shouldUpdateState := c.state != c.sentState
shouldRefresh := time.Since(c.sentTime) > c.forceSendInterval
c.lock.RUnlock()
if c.sentTrace != len(trace) {
c.sendPatch(trace) // we don't care about sendPatch() result, in the worst case we will re-send the trace
if !force && !shouldUpdateState && !shouldRefresh {
return common.UpdateSucceeded
}
jobInfo := common.UpdateJobInfo{
......@@ -212,19 +196,16 @@ func (c *clientJobTrace) fullUpdate() common.UpdateState {
FailureReason: c.failureReason,
}
if c.sentTrace != len(trace) {
traceString := string(trace)
jobInfo.Trace = &traceString
}
status := c.client.UpdateJob(c.config, c.jobCredentials, jobInfo)
update := c.client.UpdateJob(c.config, c.jobCredentials, jobInfo)
if update == common.UpdateSucceeded {
c.sentTrace = len(trace)
c.sentState = state
if status == common.UpdateSucceeded {
c.lock.Lock()
c.sentTime = time.Now()
c.sentState = state
c.lock.Unlock()
}
return update
return status
}
func (c *clientJobTrace) abort() bool {
......@@ -254,12 +235,10 @@ func (c *clientJobTrace) watch() {
}
func (c *clientJobTrace) setupLogLimit() {
bytesLimit := c.config.OutputLimit
bytesLimit := c.config.OutputLimit * 1024 // convert to bytes
if bytesLimit == 0 {
bytesLimit = common.DefaultOutputLimit
}
// configuration values are expressed in KB
bytesLimit *= 1024
c.buffer.SetLimit(bytesLimit)
}
......@@ -271,6 +250,7 @@ func newJobTrace(client common.Network, config common.RunnerConfig, jobCredentia
buffer: trace.New(),
jobCredentials: jobCredentials,
id: jobCredentials.ID,
maxTracePatchSize: common.DefaultTracePatchLimit,
updateInterval: common.UpdateInterval,
forceSendInterval: common.ForceTraceSentInterval,
finishRetryInterval: common.UpdateRetryInterval,
......
package network
import (
"errors"
)
type tracePatch struct {
trace []byte
offset int
totalSize int
}
func (tp *tracePatch) Patch() []byte {
return tp.trace[tp.offset:tp.totalSize]
}
func (tp *tracePatch) Offset() int {
return tp.offset
}
func (tp *tracePatch) TotalSize() int {
return tp.totalSize
}
func (tp *tracePatch) SetNewOffset(newOffset int) {
tp.offset = newOffset
}
func (tp *tracePatch) ValidateRange() bool {
if tp.totalSize >= tp.offset {
return true
}
return false
}
func newTracePatch(trace []byte, offset int) (*tracePatch, error) {
patch := &tracePatch{
trace: trace,
offset: offset,
totalSize: len(trace),
}
if !patch.ValidateRange() {
return nil, errors.New("Range is invalid, limit can't be less than offset")
}
return patch, nil
}
package network
import (
"testing"
"github.com/stretchr/testify/assert"
)
var traceContent = "test content"
func TestNewTracePatch(t *testing.T) {
tp, err := newTracePatch([]byte(traceContent), 0)
assert.NoError(t, err)
assert.Equal(t, 0, tp.Offset())
assert.Equal(t, len(traceContent), tp.TotalSize())
assert.Equal(t, []byte(traceContent), tp.Patch())
}
func TestInvalidTracePatchInitialOffsetValue(t *testing.T) {
_, err := newTracePatch([]byte(traceContent), len(traceContent)+10)
assert.EqualError(t, err, "Range is invalid, limit can't be less than offset")
}
func TestTracePatch_PatchAfterSetNewOffset(t *testing.T) {
tp, err := newTracePatch([]byte(traceContent), 0)
assert.NoError(t, err)
tp.SetNewOffset(5)
assert.Equal(t, []byte("content"), tp.Patch())
}
func TestTracePatchEmptyPatch(t *testing.T) {
tp, err := newTracePatch([]byte(traceContent), len(traceContent))
assert.NoError(t, err)
assert.Empty(t, tp.Patch())
}
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment