Skip to content
Snippets Groups Projects
Commit df7efc0b authored by Kamil Trzciński's avatar Kamil Trzciński :speech_balloon:
Browse files

Optimise trace handling

This makes all traces be sent in chunks
and use only the incremental sending endpoint.

This makes GitLab Runner append traces
efficiently in small chunks, and not resend them
unless explicitly requested.
parent 83323c12
No related branches found
No related tags found
1 merge request!1292Optimise trace handling for big traces
Pipeline #57137447 passed with warnings
......@@ -13,7 +13,8 @@ const HealthyChecks = 3
const HealthCheckInterval = 3600
const DefaultWaitForServicesTimeout = 30
const ShutdownTimeout = 30
const DefaultOutputLimit = 4096 // 4MB in kilobytes
const DefaultOutputLimit = 4 * 1024 * 1024 // in bytes
const DefaultTracePatchLimit = 1024 * 1024 // in bytes
const ForceTraceSentInterval = 30 * time.Second
const PreparationRetries = 3
const DefaultGetSourcesAttempts = 1
......
// Code generated by mockery v1.0.0. DO NOT EDIT.
// This comment works around https://github.com/vektra/mockery/issues/155
package common
import mock "github.com/stretchr/testify/mock"
// MockJobTracePatch is an autogenerated mock type for the JobTracePatch type.
// NOTE(review): this file is generated by mockery (see the file header) — do
// not edit by hand; regenerate with mockery instead.
type MockJobTracePatch struct {
	mock.Mock
}

// Offset provides a mock function with given fields:
func (_m *MockJobTracePatch) Offset() int {
	ret := _m.Called()

	var r0 int
	// A test may stub the return either as a plain int or as a func() int
	// computed at call time; support both forms.
	if rf, ok := ret.Get(0).(func() int); ok {
		r0 = rf()
	} else {
		r0 = ret.Get(0).(int)
	}

	return r0
}

// Patch provides a mock function with given fields:
func (_m *MockJobTracePatch) Patch() []byte {
	ret := _m.Called()

	var r0 []byte
	if rf, ok := ret.Get(0).(func() []byte); ok {
		r0 = rf()
	} else {
		// A stubbed nil stays nil; asserting a nil interface to []byte
		// directly would panic, hence the explicit nil check.
		if ret.Get(0) != nil {
			r0 = ret.Get(0).([]byte)
		}
	}

	return r0
}

// SetNewOffset provides a mock function with given fields: newOffset
func (_m *MockJobTracePatch) SetNewOffset(newOffset int) {
	_m.Called(newOffset)
}

// TotalSize provides a mock function with given fields:
func (_m *MockJobTracePatch) TotalSize() int {
	ret := _m.Called()

	var r0 int
	if rf, ok := ret.Get(0).(func() int); ok {
		r0 = rf()
	} else {
		r0 = ret.Get(0).(int)
	}

	return r0
}

// ValidateRange provides a mock function with given fields:
func (_m *MockJobTracePatch) ValidateRange() bool {
	ret := _m.Called()

	var r0 bool
	if rf, ok := ret.Get(0).(func() bool); ok {
		r0 = rf()
	} else {
		r0 = ret.Get(0).(bool)
	}

	return r0
}
......@@ -27,17 +27,13 @@ func (_m *MockNetwork) DownloadArtifacts(config JobCredentials, artifactsFile st
}
// PatchTrace provides a mock function with given fields: config, jobCredentials, tracePart
func (_m *MockNetwork) PatchTrace(config RunnerConfig, jobCredentials *JobCredentials, tracePart JobTracePatch) UpdateState {
ret := _m.Called(config, jobCredentials, tracePart)
func (m *MockNetwork) PatchTrace(config RunnerConfig, jobCredentials *JobCredentials, content []byte, startOffset int) (int, UpdateState) {
ret := m.Called(config, jobCredentials, content, startOffset)
var r0 UpdateState
if rf, ok := ret.Get(0).(func(RunnerConfig, *JobCredentials, JobTracePatch) UpdateState); ok {
r0 = rf(config, jobCredentials, tracePart)
} else {
r0 = ret.Get(0).(UpdateState)
}
r0 := ret.Get(0).(int)
r1 := ret.Get(1).(UpdateState)
return r0
return r0, r1
}
// ProcessJob provides a mock function with given fields: config, buildCredentials
......
......@@ -304,7 +304,6 @@ type UpdateJobRequest struct {
Token string `json:"token,omitempty"`
State JobState `json:"state,omitempty"`
FailureReason JobFailureReason `json:"failure_reason,omitempty"`
Trace *string `json:"trace,omitempty"`
}
type JobCredentials struct {
......@@ -339,7 +338,6 @@ func (j *JobCredentials) GetToken() string {
type UpdateJobInfo struct {
ID int
State JobState
Trace *string
FailureReason JobFailureReason
}
......@@ -364,21 +362,13 @@ type JobTrace interface {
IsStdout() bool
}
type JobTracePatch interface {
Patch() []byte
Offset() int
TotalSize() int
SetNewOffset(newOffset int)
ValidateRange() bool
}
type Network interface {
RegisterRunner(config RunnerCredentials, parameters RegisterRunnerParameters) *RegisterRunnerResponse
VerifyRunner(config RunnerCredentials) bool
UnregisterRunner(config RunnerCredentials) bool
RequestJob(config RunnerConfig, sessionInfo *SessionInfo) (*JobResponse, bool)
UpdateJob(config RunnerConfig, jobCredentials *JobCredentials, jobInfo UpdateJobInfo) UpdateState
PatchTrace(config RunnerConfig, jobCredentials *JobCredentials, tracePart JobTracePatch) UpdateState
PatchTrace(config RunnerConfig, jobCredentials *JobCredentials, content []byte, startOffset int) (int, UpdateState)
DownloadArtifacts(config JobCredentials, artifactsFile string) DownloadState
UploadRawArtifacts(config JobCredentials, reader io.Reader, options ArtifactsOptions) UploadState
ProcessJob(config RunnerConfig, buildCredentials *JobCredentials) JobTrace
......
......@@ -303,7 +303,6 @@ func (n *GitLabClient) UpdateJob(config common.RunnerConfig, jobCredentials *com
Token: jobCredentials.Token,
State: jobInfo.State,
FailureReason: jobInfo.FailureReason,
Trace: jobInfo.Trace,
}
result, statusText, _, response := n.doJSON(&config.RunnerCredentials, "PUT", fmt.Sprintf("jobs/%d", jobInfo.ID), http.StatusOK, &request, nil)
......@@ -338,28 +337,29 @@ func (n *GitLabClient) UpdateJob(config common.RunnerConfig, jobCredentials *com
}
}
func (n *GitLabClient) PatchTrace(config common.RunnerConfig, jobCredentials *common.JobCredentials, tracePatch common.JobTracePatch) common.UpdateState {
func (n *GitLabClient) PatchTrace(config common.RunnerConfig, jobCredentials *common.JobCredentials, content []byte, startOffset int) (int, common.UpdateState) {
id := jobCredentials.ID
baseLog := config.Log().WithField("job", id)
if tracePatch.Offset() == tracePatch.TotalSize() {
baseLog.Warningln("Appending trace to coordinator...", "skipped due to empty patch")
return common.UpdateFailed
if len(content) == 0 {
baseLog.Debugln("Appending trace to coordinator...", "skipped due to empty patch")
return startOffset, common.UpdateSucceeded
}
contentRange := fmt.Sprintf("%d-%d", tracePatch.Offset(), tracePatch.TotalSize()-1)
endOffset := startOffset + len(content)
contentRange := fmt.Sprintf("%d-%d", startOffset, endOffset-1)
headers := make(http.Header)
headers.Set("Content-Range", contentRange)
headers.Set("JOB-TOKEN", jobCredentials.Token)
uri := fmt.Sprintf("jobs/%d/trace", id)
request := bytes.NewReader(tracePatch.Patch())
request := bytes.NewReader(content)
response, err := n.doRaw(&config.RunnerCredentials, "PATCH", uri, request, "text/plain", headers)
if err != nil {
config.Log().Errorln("Appending trace to coordinator...", "error", err.Error())
return common.UpdateFailed
return startOffset, common.UpdateFailed
}
n.requestsStatusesMap.Append(config.RunnerCredentials.ShortDescription(), APIEndpointPatchTrace, response.StatusCode)
......@@ -379,23 +379,22 @@ func (n *GitLabClient) PatchTrace(config common.RunnerConfig, jobCredentials *co
switch {
case tracePatchResponse.IsAborted():
log.Warningln("Appending trace to coordinator...", "aborted")
return common.UpdateAbort
return startOffset, common.UpdateAbort
case response.StatusCode == http.StatusAccepted:
log.Debugln("Appending trace to coordinator...", "ok")
return common.UpdateSucceeded
return endOffset, common.UpdateSucceeded
case response.StatusCode == http.StatusNotFound:
log.Warningln("Appending trace to coordinator...", "not-found")
return common.UpdateNotFound
return startOffset, common.UpdateNotFound
case response.StatusCode == http.StatusRequestedRangeNotSatisfiable:
log.Warningln("Appending trace to coordinator...", "range mismatch")
tracePatch.SetNewOffset(tracePatchResponse.NewOffset())
return common.UpdateRangeMismatch
return tracePatchResponse.NewOffset(), common.UpdateRangeMismatch
case response.StatusCode == clientError:
log.Errorln("Appending trace to coordinator...", "error")
return common.UpdateAbort
return startOffset, common.UpdateAbort
default:
log.Warningln("Appending trace to coordinator...", "failed")
return common.UpdateFailed
return startOffset, common.UpdateFailed
}
}
......
......@@ -543,7 +543,6 @@ func testUpdateJobHandler(w http.ResponseWriter, r *http.Request, t *testing.T)
assert.NoError(t, err)
assert.Equal(t, "token", req["token"])
assert.Equal(t, "trace", req["trace"])
setStateForUpdateJobHandlerResponse(w, req)
}
......@@ -566,33 +565,32 @@ func TestUpdateJob(t *testing.T) {
Token: "token",
}
trace := "trace"
c := NewGitLabClient()
var state UpdateState
state = c.UpdateJob(config, jobCredentials, UpdateJobInfo{ID: 10, State: "running", Trace: &trace, FailureReason: ""})
state = c.UpdateJob(config, jobCredentials, UpdateJobInfo{ID: 10, State: "running", FailureReason: ""})
assert.Equal(t, UpdateSucceeded, state, "Update should continue when running")
state = c.UpdateJob(config, jobCredentials, UpdateJobInfo{ID: 10, State: "forbidden", Trace: &trace, FailureReason: ""})
state = c.UpdateJob(config, jobCredentials, UpdateJobInfo{ID: 10, State: "forbidden", FailureReason: ""})
assert.Equal(t, UpdateAbort, state, "Update should be aborted if the state is forbidden")
state = c.UpdateJob(config, jobCredentials, UpdateJobInfo{ID: 10, State: "other", Trace: &trace, FailureReason: ""})
state = c.UpdateJob(config, jobCredentials, UpdateJobInfo{ID: 10, State: "other", FailureReason: ""})
assert.Equal(t, UpdateFailed, state, "Update should fail for badly formatted request")
state = c.UpdateJob(config, jobCredentials, UpdateJobInfo{ID: 4, State: "state", Trace: &trace, FailureReason: ""})
state = c.UpdateJob(config, jobCredentials, UpdateJobInfo{ID: 4, State: "state", FailureReason: ""})
assert.Equal(t, UpdateAbort, state, "Update should abort for unknown job")
state = c.UpdateJob(brokenConfig, jobCredentials, UpdateJobInfo{ID: 4, State: "state", Trace: &trace, FailureReason: ""})
state = c.UpdateJob(brokenConfig, jobCredentials, UpdateJobInfo{ID: 4, State: "state", FailureReason: ""})
assert.Equal(t, UpdateAbort, state)
state = c.UpdateJob(config, jobCredentials, UpdateJobInfo{ID: 10, State: "failed", Trace: &trace, FailureReason: "script_failure"})
state = c.UpdateJob(config, jobCredentials, UpdateJobInfo{ID: 10, State: "failed", FailureReason: "script_failure"})
assert.Equal(t, UpdateSucceeded, state, "Update should continue when running")
state = c.UpdateJob(config, jobCredentials, UpdateJobInfo{ID: 10, State: "failed", Trace: &trace, FailureReason: "unknown_failure_reason"})
state = c.UpdateJob(config, jobCredentials, UpdateJobInfo{ID: 10, State: "failed", FailureReason: "unknown_failure_reason"})
assert.Equal(t, UpdateFailed, state, "Update should fail for badly formatted request")
state = c.UpdateJob(config, jobCredentials, UpdateJobInfo{ID: 10, State: "failed", Trace: &trace, FailureReason: ""})
state = c.UpdateJob(config, jobCredentials, UpdateJobInfo{ID: 10, State: "failed", FailureReason: ""})
assert.Equal(t, UpdateFailed, state, "Update should fail for badly formatted request")
}
......@@ -658,9 +656,9 @@ func TestUpdateJobAsKeepAlive(t *testing.T) {
}
var patchToken = "token"
var patchTraceString = "trace trace trace"
var patchTraceContent = []byte("trace trace trace")
func getPatchServer(t *testing.T, handler func(w http.ResponseWriter, r *http.Request, body string, offset, limit int)) (*httptest.Server, *GitLabClient, RunnerConfig) {
func getPatchServer(t *testing.T, handler func(w http.ResponseWriter, r *http.Request, body []byte, offset, limit int)) (*httptest.Server, *GitLabClient, RunnerConfig) {
patchHandler := func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path != "/api/v4/jobs/1/trace" {
w.WriteHeader(http.StatusNotFound)
......@@ -686,7 +684,7 @@ func getPatchServer(t *testing.T, handler func(w http.ResponseWriter, r *http.Re
limit, err := strconv.Atoi(ranges[1])
assert.NoError(t, err)
handler(w, r, string(body), offset, limit)
handler(w, r, body, offset, limit)
}
server := httptest.NewServer(http.HandlerFunc(patchHandler))
......@@ -700,61 +698,59 @@ func getPatchServer(t *testing.T, handler func(w http.ResponseWriter, r *http.Re
return server, NewGitLabClient(), config
}
// getTracePatch is a test helper that builds a *tracePatch covering the
// given trace content starting at offset. The constructor error is
// deliberately discarded — callers in these tests supply offsets that are
// expected to be valid.
func getTracePatch(traceString string, offset int) *tracePatch {
	patch, _ := newTracePatch([]byte(traceString), offset)

	return patch
}
func TestUnknownPatchTrace(t *testing.T) {
handler := func(w http.ResponseWriter, r *http.Request, body string, offset, limit int) {
handler := func(w http.ResponseWriter, r *http.Request, body []byte, offset, limit int) {
w.WriteHeader(http.StatusNotFound)
}
server, client, config := getPatchServer(t, handler)
defer server.Close()
tracePatch := getTracePatch(patchTraceString, 0)
state := client.PatchTrace(config, &JobCredentials{ID: 1, Token: patchToken}, tracePatch)
_, state := client.PatchTrace(config, &JobCredentials{ID: 1, Token: patchToken},
patchTraceContent, 0)
assert.Equal(t, UpdateNotFound, state)
}
func TestForbiddenPatchTrace(t *testing.T) {
handler := func(w http.ResponseWriter, r *http.Request, body string, offset, limit int) {
handler := func(w http.ResponseWriter, r *http.Request, body []byte, offset, limit int) {
w.WriteHeader(http.StatusForbidden)
}
server, client, config := getPatchServer(t, handler)
defer server.Close()
tracePatch := getTracePatch(patchTraceString, 0)
state := client.PatchTrace(config, &JobCredentials{ID: 1, Token: patchToken}, tracePatch)
_, state := client.PatchTrace(config, &JobCredentials{ID: 1, Token: patchToken},
patchTraceContent, 0)
assert.Equal(t, UpdateAbort, state)
}
func TestPatchTrace(t *testing.T) {
handler := func(w http.ResponseWriter, r *http.Request, body string, offset, limit int) {
assert.Equal(t, patchTraceString[offset:limit+1], body)
handler := func(w http.ResponseWriter, r *http.Request, body []byte, offset, limit int) {
assert.Equal(t, patchTraceContent[offset:limit+1], body)
w.WriteHeader(http.StatusAccepted)
}
server, client, config := getPatchServer(t, handler)
defer server.Close()
tracePatch := getTracePatch(patchTraceString, 0)
state := client.PatchTrace(config, &JobCredentials{ID: 1, Token: patchToken}, tracePatch)
endOffset, state := client.PatchTrace(config, &JobCredentials{ID: 1, Token: patchToken},
patchTraceContent, 0)
assert.Equal(t, UpdateSucceeded, state)
assert.Equal(t, len(patchTraceContent), endOffset)
tracePatch = getTracePatch(patchTraceString, 3)
state = client.PatchTrace(config, &JobCredentials{ID: 1, Token: patchToken}, tracePatch)
endOffset, state = client.PatchTrace(config, &JobCredentials{ID: 1, Token: patchToken},
patchTraceContent[3:], 3)
assert.Equal(t, UpdateSucceeded, state)
assert.Equal(t, len(patchTraceContent), endOffset)
tracePatch = getTracePatch(patchTraceString[:10], 3)
state = client.PatchTrace(config, &JobCredentials{ID: 1, Token: patchToken}, tracePatch)
endOffset, state = client.PatchTrace(config, &JobCredentials{ID: 1, Token: patchToken},
patchTraceContent[3:10], 3)
assert.Equal(t, UpdateSucceeded, state)
assert.Equal(t, 10, endOffset)
}
func TestRangeMismatchPatchTrace(t *testing.T) {
handler := func(w http.ResponseWriter, r *http.Request, body string, offset, limit int) {
handler := func(w http.ResponseWriter, r *http.Request, body []byte, offset, limit int) {
if offset > 10 {
w.Header().Set("Range", "0-10")
w.WriteHeader(http.StatusRequestedRangeNotSatisfiable)
......@@ -766,66 +762,24 @@ func TestRangeMismatchPatchTrace(t *testing.T) {
server, client, config := getPatchServer(t, handler)
defer server.Close()
tracePatch := getTracePatch(patchTraceString, 11)
state := client.PatchTrace(config, &JobCredentials{ID: 1, Token: patchToken}, tracePatch)
endOffset, state := client.PatchTrace(config, &JobCredentials{ID: 1, Token: patchToken},
patchTraceContent[11:], 11)
assert.Equal(t, UpdateRangeMismatch, state)
assert.Equal(t, 10, endOffset) // TODO: (kamil) to verify behavior
tracePatch = getTracePatch(patchTraceString, 15)
state = client.PatchTrace(config, &JobCredentials{ID: 1, Token: patchToken}, tracePatch)
assert.Equal(t, UpdateRangeMismatch, state)
tracePatch = getTracePatch(patchTraceString, 5)
state = client.PatchTrace(config, &JobCredentials{ID: 1, Token: patchToken}, tracePatch)
assert.Equal(t, UpdateSucceeded, state)
}
func TestResendPatchTrace(t *testing.T) {
handler := func(w http.ResponseWriter, r *http.Request, body string, offset, limit int) {
if offset > 10 {
w.Header().Set("Range", "0-10")
w.WriteHeader(http.StatusRequestedRangeNotSatisfiable)
}
w.WriteHeader(http.StatusAccepted)
}
server, client, config := getPatchServer(t, handler)
defer server.Close()
tracePatch := getTracePatch(patchTraceString, 11)
state := client.PatchTrace(config, &JobCredentials{ID: 1, Token: patchToken}, tracePatch)
endOffset, state = client.PatchTrace(config, &JobCredentials{ID: 1, Token: patchToken},
patchTraceContent[15:], 15)
assert.Equal(t, UpdateRangeMismatch, state)
assert.Equal(t, 10, endOffset) // TODO: (kamil) to verify behavior
state = client.PatchTrace(config, &JobCredentials{ID: 1, Token: patchToken}, tracePatch)
endOffset, state = client.PatchTrace(config, &JobCredentials{ID: 1, Token: patchToken},
patchTraceContent[5:], 5)
assert.Equal(t, UpdateSucceeded, state)
}
// We've had a situation where the same job was triggered a second time by GitLab. In GitLab the job trace
// was 17041 bytes long while the repeated job trace was only 66 bytes long. We got a `RangeMismatch`
// response, so the offset was updated (to 17041) and `client.PatchTrace` was repeated, as planned.
// Unfortunately the `tracePatch` struct was not resistant to a situation where the offset is set to a
// value bigger than the trace's length. This test simulates such a situation.
func TestResendDoubledJobPatchTrace(t *testing.T) {
handler := func(w http.ResponseWriter, r *http.Request, body string, offset, limit int) {
if offset > 10 {
w.Header().Set("Range", "0-100")
w.WriteHeader(http.StatusRequestedRangeNotSatisfiable)
}
w.WriteHeader(http.StatusAccepted)
}
server, client, config := getPatchServer(t, handler)
defer server.Close()
tracePatch := getTracePatch(patchTraceString, 11)
state := client.PatchTrace(config, &JobCredentials{ID: 1, Token: patchToken}, tracePatch)
assert.Equal(t, UpdateRangeMismatch, state)
assert.False(t, tracePatch.ValidateRange())
assert.Equal(t, len(patchTraceContent), endOffset)
}
func TestJobFailedStatePatchTrace(t *testing.T) {
handler := func(w http.ResponseWriter, r *http.Request, body string, offset, limit int) {
handler := func(w http.ResponseWriter, r *http.Request, body []byte, offset, limit int) {
w.Header().Set("Job-Status", "failed")
w.WriteHeader(http.StatusAccepted)
}
......@@ -833,49 +787,67 @@ func TestJobFailedStatePatchTrace(t *testing.T) {
server, client, config := getPatchServer(t, handler)
defer server.Close()
tracePatch := getTracePatch(patchTraceString, 0)
state := client.PatchTrace(config, &JobCredentials{ID: 1, Token: patchToken}, tracePatch)
_, state := client.PatchTrace(config, &JobCredentials{ID: 1, Token: patchToken},
patchTraceContent, 0)
assert.Equal(t, UpdateAbort, state)
}
func TestPatchTraceCantConnect(t *testing.T) {
handler := func(w http.ResponseWriter, r *http.Request, body string, offset, limit int) {}
handler := func(w http.ResponseWriter, r *http.Request, body []byte, offset, limit int) {}
server, client, config := getPatchServer(t, handler)
server.Close()
tracePatch := getTracePatch(patchTraceString, 0)
state := client.PatchTrace(config, &JobCredentials{ID: 1, Token: patchToken}, tracePatch)
_, state := client.PatchTrace(config, &JobCredentials{ID: 1, Token: patchToken},
patchTraceContent, 0)
assert.Equal(t, UpdateFailed, state)
}
func TestPatchTraceUpdatedTrace(t *testing.T) {
sentTrace := 0
traceString := ""
traceContent := []byte{}
updates := []struct {
traceUpdate string
traceUpdate []byte
expectedContentRange string
expectedContentLength int64
expectedResult UpdateState
shouldNotCallPatchTrace bool
}{
{traceUpdate: "test", expectedContentRange: "0-3", expectedContentLength: 4, expectedResult: UpdateSucceeded},
{traceUpdate: "", expectedResult: UpdateFailed, shouldNotCallPatchTrace: true},
{traceUpdate: " ", expectedContentRange: "4-4", expectedContentLength: 1, expectedResult: UpdateSucceeded},
{traceUpdate: "test", expectedContentRange: "5-8", expectedContentLength: 4, expectedResult: UpdateSucceeded},
{
traceUpdate: []byte("test"),
expectedContentRange: "0-3",
expectedContentLength: 4,
expectedResult: UpdateSucceeded,
},
{
traceUpdate: []byte{},
expectedContentLength: 4,
expectedResult: UpdateSucceeded,
shouldNotCallPatchTrace: true,
},
{
traceUpdate: []byte(" "),
expectedContentRange: "4-4", expectedContentLength: 1,
expectedResult: UpdateSucceeded,
},
{
traceUpdate: []byte("test"),
expectedContentRange: "5-8", expectedContentLength: 4,
expectedResult: UpdateSucceeded,
},
}
for id, update := range updates {
t.Run(fmt.Sprintf("patch-%d", id+1), func(t *testing.T) {
handler := func(w http.ResponseWriter, r *http.Request, body string, offset, limit int) {
handler := func(w http.ResponseWriter, r *http.Request, body []byte, offset, limit int) {
if update.shouldNotCallPatchTrace {
t.Error("PatchTrace endpoint should not be called")
return
}
if limit+1 <= len(traceString) {
assert.Equal(t, traceString[offset:limit+1], body)
if limit+1 <= len(traceContent) {
assert.Equal(t, traceContent[offset:limit+1], body)
}
assert.Equal(t, update.traceUpdate, body)
......@@ -887,34 +859,50 @@ func TestPatchTraceUpdatedTrace(t *testing.T) {
server, client, config := getPatchServer(t, handler)
defer server.Close()
traceString += update.traceUpdate
tracePatch := getTracePatch(traceString, sentTrace)
result := client.PatchTrace(config, &JobCredentials{ID: 1, Token: patchToken}, tracePatch)
traceContent = append(traceContent, update.traceUpdate...)
endOffset, result := client.PatchTrace(config, &JobCredentials{ID: 1, Token: patchToken},
traceContent[sentTrace:], sentTrace)
assert.Equal(t, update.expectedResult, result)
sentTrace = tracePatch.totalSize
sentTrace = endOffset
})
}
}
func TestPatchTraceContentRangeAndLength(t *testing.T) {
tests := []struct {
tests := map[string]struct {
name string
trace string
trace []byte
expectedContentRange string
expectedContentLength int64
expectedResult UpdateState
shouldNotCallPatchTrace bool
}{
{name: "0 bytes", trace: "", expectedResult: UpdateFailed, shouldNotCallPatchTrace: true},
{name: "1 byte", trace: "1", expectedContentRange: "0-0", expectedContentLength: 1, expectedResult: UpdateSucceeded, shouldNotCallPatchTrace: false},
{name: "2 bytes", trace: "12", expectedContentRange: "0-1", expectedContentLength: 2, expectedResult: UpdateSucceeded, shouldNotCallPatchTrace: false},
"0 bytes": {
trace: []byte{},
expectedResult: UpdateSucceeded,
shouldNotCallPatchTrace: true,
},
"1 byte": {
name: "1 byte",
trace: []byte("1"),
expectedContentRange: "0-0",
expectedContentLength: 1,
expectedResult: UpdateSucceeded,
shouldNotCallPatchTrace: false,
},
"2 bytes": {
trace: []byte("12"),
expectedContentRange: "0-1",
expectedContentLength: 2,
expectedResult: UpdateSucceeded,
shouldNotCallPatchTrace: false,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
handler := func(w http.ResponseWriter, r *http.Request, body string, offset, limit int) {
for name, test := range tests {
t.Run(name, func(t *testing.T) {
handler := func(w http.ResponseWriter, r *http.Request, body []byte, offset, limit int) {
if test.shouldNotCallPatchTrace {
t.Error("PatchTrace endpoint should not be called")
return
......@@ -928,15 +916,15 @@ func TestPatchTraceContentRangeAndLength(t *testing.T) {
server, client, config := getPatchServer(t, handler)
defer server.Close()
tracePatch := getTracePatch(test.trace, 0)
result := client.PatchTrace(config, &JobCredentials{ID: 1, Token: patchToken}, tracePatch)
_, result := client.PatchTrace(config, &JobCredentials{ID: 1, Token: patchToken},
test.trace, 0)
assert.Equal(t, test.expectedResult, result)
})
}
}
func TestPatchTraceContentRangeHeaderValues(t *testing.T) {
handler := func(w http.ResponseWriter, r *http.Request, body string, offset, limit int) {
handler := func(w http.ResponseWriter, r *http.Request, body []byte, offset, limit int) {
contentRange := r.Header.Get("Content-Range")
bytes := strings.Split(contentRange, "-")
......@@ -947,7 +935,7 @@ func TestPatchTraceContentRangeHeaderValues(t *testing.T) {
require.NoError(t, err, "Should not set error when parsing Content-Range endByte component")
assert.Equal(t, 0, startByte, "Content-Range should contain start byte as first field")
assert.Equal(t, len(patchTraceString)-1, endByte, "Content-Range should contain end byte as second field")
assert.Equal(t, len(patchTraceContent)-1, endByte, "Content-Range should contain end byte as second field")
w.WriteHeader(http.StatusAccepted)
}
......@@ -955,8 +943,8 @@ func TestPatchTraceContentRangeHeaderValues(t *testing.T) {
server, client, config := getPatchServer(t, handler)
defer server.Close()
tracePatch := getTracePatch(patchTraceString, 0)
client.PatchTrace(config, &JobCredentials{ID: 1, Token: patchToken}, tracePatch)
client.PatchTrace(config, &JobCredentials{ID: 1, Token: patchToken},
patchTraceContent, 0)
}
func TestAbortedPatchTrace(t *testing.T) {
......@@ -964,7 +952,7 @@ func TestAbortedPatchTrace(t *testing.T) {
for _, status := range statuses {
t.Run(status, func(t *testing.T) {
handler := func(w http.ResponseWriter, r *http.Request, body string, offset, limit int) {
handler := func(w http.ResponseWriter, r *http.Request, body []byte, offset, limit int) {
w.Header().Set("Job-Status", status)
w.WriteHeader(http.StatusAccepted)
}
......@@ -972,8 +960,8 @@ func TestAbortedPatchTrace(t *testing.T) {
server, client, config := getPatchServer(t, handler)
defer server.Close()
tracePatch := getTracePatch(patchTraceString, 0)
state := client.PatchTrace(config, &JobCredentials{ID: 1, Token: patchToken}, tracePatch)
_, state := client.PatchTrace(config, &JobCredentials{ID: 1, Token: patchToken},
patchTraceContent, 0)
assert.Equal(t, UpdateAbort, state)
})
}
......
......@@ -14,9 +14,12 @@ type TracePatchResponse struct {
func (p *TracePatchResponse) NewOffset() int {
remoteRangeParts := strings.Split(p.RemoteRange, "-")
newOffset, _ := strconv.Atoi(remoteRangeParts[1])
if len(remoteRangeParts) == 2 {
newOffset, _ := strconv.Atoi(remoteRangeParts[1])
return newOffset
}
return newOffset
return 0
}
func NewTracePatchResponse(response *http.Response) *TracePatchResponse {
......
......@@ -30,6 +30,7 @@ type clientJobTrace struct {
updateInterval time.Duration
forceSendInterval time.Duration
finishRetryInterval time.Duration
maxTracePatchSize int
failuresCollector common.FailuresCollector
}
......@@ -92,118 +93,101 @@ func (c *clientJobTrace) start() {
go c.watch()
}
func (c *clientJobTrace) finish() {
c.buffer.Close()
c.finished <- true
// Do final upload of job trace
for {
if c.fullUpdate() != common.UpdateFailed {
func (c *clientJobTrace) finalTraceUpdate() {
for c.anyTraceToSend() {
switch c.sendPatch() {
case common.UpdateSucceeded:
// we continue sending till we succeed
continue
case common.UpdateAbort:
return
case common.UpdateNotFound:
return
case common.UpdateRangeMismatch:
time.Sleep(c.finishRetryInterval)
case common.UpdateFailed:
time.Sleep(c.finishRetryInterval)
}
time.Sleep(c.finishRetryInterval)
}
}
func (c *clientJobTrace) incrementalUpdate() common.UpdateState {
c.lock.RLock()
state := c.state
trace := c.buffer.Bytes()
c.lock.RUnlock()
if c.sentTrace != len(trace) {
result := c.sendPatch(trace)
if result != common.UpdateSucceeded {
return result
}
}
if c.sentState != state || time.Since(c.sentTime) > c.forceSendInterval {
if state == common.Running { // we should only follow-up with Running!
result := c.sendUpdate(state)
if result != common.UpdateSucceeded {
return result
}
func (c *clientJobTrace) finalStatusUpdate() {
for {
switch c.sendUpdate(true) {
case common.UpdateSucceeded:
return
case common.UpdateAbort:
return
case common.UpdateNotFound:
return
case common.UpdateRangeMismatch:
return
case common.UpdateFailed:
time.Sleep(c.finishRetryInterval)
}
}
return common.UpdateSucceeded
}
func (c *clientJobTrace) sendPatch(trace []byte) common.UpdateState {
tracePatch, err := newTracePatch(trace, c.sentTrace)
if err != nil {
c.config.Log().Errorln("Error while creating a tracePatch", err.Error())
}
update := c.client.PatchTrace(c.config, c.jobCredentials, tracePatch)
if update == common.UpdateNotFound {
return update
}
if update == common.UpdateRangeMismatch {
update = c.resendPatch(c.jobCredentials.ID, c.config, c.jobCredentials, tracePatch)
}
func (c *clientJobTrace) finish() {
c.buffer.Close()
c.finished <- true
c.finalTraceUpdate()
c.finalStatusUpdate()
}
if update == common.UpdateSucceeded {
c.sentTrace = tracePatch.totalSize
c.sentTime = time.Now()
func (c *clientJobTrace) incrementalUpdate() common.UpdateState {
state := c.sendPatch()
if state != common.UpdateSucceeded {
return state
}
return update
return c.sendUpdate(false)
}
func (c *clientJobTrace) resendPatch(id int, config common.RunnerConfig, jobCredentials *common.JobCredentials, tracePatch common.JobTracePatch) (update common.UpdateState) {
if !tracePatch.ValidateRange() {
config.Log().Warningln(id, "Full job update is needed")
fullTrace := string(c.buffer.Bytes())
func (c *clientJobTrace) anyTraceToSend() bool {
c.lock.RLock()
defer c.lock.RUnlock()
return len(c.buffer.Bytes()) != c.sentTrace
}
jobInfo := common.UpdateJobInfo{
ID: c.id,
State: c.state,
Trace: &fullTrace,
FailureReason: c.failureReason,
}
func (c *clientJobTrace) sendPatch() common.UpdateState {
c.lock.RLock()
trace := c.buffer.Bytes()
sentTrace := c.sentTrace
c.lock.RUnlock()
return c.client.UpdateJob(c.config, jobCredentials, jobInfo)
if len(trace) == sentTrace {
return common.UpdateSucceeded
}
config.Log().Warningln(id, "Resending trace patch due to range mismatch")
update = c.client.PatchTrace(config, jobCredentials, tracePatch)
if update == common.UpdateRangeMismatch {
config.Log().Errorln(id, "Appending trace to coordinator...", "failed due to range mismatch")
return common.UpdateFailed
// we send at most `maxTracePatchSize` in single patch
content := trace[sentTrace:]
if len(content) > c.maxTracePatchSize {
content = content[:c.maxTracePatchSize]
}
return
}
func (c *clientJobTrace) sendUpdate(state common.JobState) common.UpdateState {
jobInfo := common.UpdateJobInfo{
ID: c.id,
State: state,
FailureReason: c.failureReason,
}
sentOffset, state := c.client.PatchTrace(
c.config, c.jobCredentials, content, sentTrace)
status := c.client.UpdateJob(c.config, c.jobCredentials, jobInfo)
if status == common.UpdateSucceeded {
c.sentState = state
if state == common.UpdateSucceeded || state == common.UpdateRangeMismatch {
c.lock.Lock()
c.sentTime = time.Now()
c.sentTrace = sentOffset
c.lock.Unlock()
}
return status
return state
}
func (c *clientJobTrace) fullUpdate() common.UpdateState {
func (c *clientJobTrace) sendUpdate(force bool) common.UpdateState {
c.lock.RLock()
state := c.state
trace := c.buffer.Bytes()
shouldUpdateState := c.state != c.sentState
shouldRefresh := time.Since(c.sentTime) > c.forceSendInterval
c.lock.RUnlock()
if c.sentTrace != len(trace) {
c.sendPatch(trace) // we don't care about sendPatch() result, in the worst case we will re-send the trace
if !force && !shouldUpdateState && !shouldRefresh {
return common.UpdateSucceeded
}
jobInfo := common.UpdateJobInfo{
......@@ -212,19 +196,16 @@ func (c *clientJobTrace) fullUpdate() common.UpdateState {
FailureReason: c.failureReason,
}
if c.sentTrace != len(trace) {
traceString := string(trace)
jobInfo.Trace = &traceString
}
status := c.client.UpdateJob(c.config, c.jobCredentials, jobInfo)
update := c.client.UpdateJob(c.config, c.jobCredentials, jobInfo)
if update == common.UpdateSucceeded {
c.sentTrace = len(trace)
c.sentState = state
if status == common.UpdateSucceeded {
c.lock.Lock()
c.sentTime = time.Now()
c.sentState = state
c.lock.Unlock()
}
return update
return status
}
func (c *clientJobTrace) abort() bool {
......@@ -254,12 +235,10 @@ func (c *clientJobTrace) watch() {
}
// setupLogLimit configures the output limit of the trace buffer.
//
// The configured OutputLimit is expressed in kilobytes, so it is converted
// to bytes here. When it is unset (0), common.DefaultOutputLimit — which is
// already expressed in bytes — is used as-is; it must NOT be multiplied
// again, which is why the conversion happens before the default fallback.
func (c *clientJobTrace) setupLogLimit() {
	bytesLimit := c.config.OutputLimit * 1024 // convert to bytes
	if bytesLimit == 0 {
		bytesLimit = common.DefaultOutputLimit
	}
	c.buffer.SetLimit(bytesLimit)
}
......@@ -271,6 +250,7 @@ func newJobTrace(client common.Network, config common.RunnerConfig, jobCredentia
buffer: trace.New(),
jobCredentials: jobCredentials,
id: jobCredentials.ID,
maxTracePatchSize: common.DefaultTracePatchLimit,
updateInterval: common.UpdateInterval,
forceSendInterval: common.ForceTraceSentInterval,
finishRetryInterval: common.UpdateRetryInterval,
......
package network
import (
"errors"
)
// tracePatch represents an incremental chunk of a job's build trace that is
// sent to GitLab via the trace patch endpoint.
type tracePatch struct {
	trace     []byte // the full trace buffer the patch is sliced from
	offset    int    // index of the first byte that still has to be sent
	totalSize int    // total length of the trace buffer (exclusive end of the patch)
}
// Patch returns the slice of the trace that still has to be sent, i.e. the
// bytes between the current offset and the end of the trace.
func (tp *tracePatch) Patch() []byte {
	unsent := tp.trace[tp.offset:tp.totalSize]
	return unsent
}
// Offset returns the position in the trace at which this patch starts.
func (tp *tracePatch) Offset() int {
	return tp.offset
}
// TotalSize returns the length of the full trace backing this patch.
func (tp *tracePatch) TotalSize() int {
	return tp.totalSize
}
// SetNewOffset moves the start of the patch to newOffset — used by callers
// to resynchronize with the server, e.g. after a range mismatch response.
// NOTE(review): no range validation is performed here; callers appear to be
// expected to pass an offset within [0, totalSize] — confirm.
func (tp *tracePatch) SetNewOffset(newOffset int) {
	tp.offset = newOffset
}
// ValidateRange reports whether the patch range is consistent, i.e. the
// starting offset does not point past the end of the trace.
// An offset equal to totalSize is valid and yields an empty patch.
func (tp *tracePatch) ValidateRange() bool {
	return tp.totalSize >= tp.offset
}
// newTracePatch builds a tracePatch covering trace[offset:len(trace)].
// It returns an error when offset points beyond the end of the trace.
func newTracePatch(trace []byte, offset int) (*tracePatch, error) {
	tp := &tracePatch{
		trace:     trace,
		offset:    offset,
		totalSize: len(trace),
	}

	if tp.ValidateRange() {
		return tp, nil
	}

	return nil, errors.New("Range is invalid, limit can't be less than offset")
}
package network
import (
"testing"
"github.com/stretchr/testify/assert"
)
// traceContent is the fixture trace payload shared by the tracePatch tests.
var traceContent = "test content"
// TestNewTracePatch verifies that a freshly created patch starts at the
// requested offset and exposes the whole trace as its payload.
func TestNewTracePatch(t *testing.T) {
	patch, err := newTracePatch([]byte(traceContent), 0)
	assert.NoError(t, err)

	assert.Equal(t, 0, patch.Offset())
	assert.Equal(t, len(traceContent), patch.TotalSize())
	assert.Equal(t, []byte(traceContent), patch.Patch())
}
func TestInvalidTracePatchInitialOffsetValue(t *testing.T) {
_, err := newTracePatch([]byte(traceContent), len(traceContent)+10)
assert.EqualError(t, err, "Range is invalid, limit can't be less than offset")
}
// TestTracePatch_PatchAfterSetNewOffset verifies that moving the offset
// forward shrinks the patch to the remaining, unsent part of the trace.
func TestTracePatch_PatchAfterSetNewOffset(t *testing.T) {
	patch, err := newTracePatch([]byte(traceContent), 0)
	assert.NoError(t, err)

	patch.SetNewOffset(5)

	// traceContent is "test content"; skipping 5 bytes leaves "content".
	assert.Equal(t, []byte("content"), patch.Patch())
}
func TestTracePatchEmptyPatch(t *testing.T) {
tp, err := newTracePatch([]byte(traceContent), len(traceContent))
assert.NoError(t, err)
assert.Empty(t, tp.Patch())
}
......@@ -9,7 +9,6 @@ import (
"testing"
"time"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
......@@ -38,141 +37,78 @@ func matchJobState(jobInfo common.UpdateJobInfo, id int, state common.JobState,
return true
}
func generateJobInfoMatcher(id int, state common.JobState, trace *string, failureReason common.JobFailureReason) interface{} {
func generateJobInfoMatcher(id int, state common.JobState, failureReason common.JobFailureReason) interface{} {
return mock.MatchedBy(func(jobInfo common.UpdateJobInfo) bool {
if jobInfo.Trace == nil && trace != nil {
return false
}
if jobInfo.Trace != nil && trace == nil {
return false
}
if jobInfo.Trace != nil && trace != nil && *jobInfo.Trace != *trace {
return false
}
return matchJobState(jobInfo, id, state, failureReason)
})
}
// generateJobInfoMatcherWithAnyTrace builds a testify argument matcher that
// only verifies the job's ID, state and failure reason, accepting any trace
// content.
func generateJobInfoMatcherWithAnyTrace(id int, state common.JobState, failureReason common.JobFailureReason) interface{} {
	predicate := func(jobInfo common.UpdateJobInfo) bool {
		return matchJobState(jobInfo, id, state, failureReason)
	}

	return mock.MatchedBy(predicate)
}
// TestJobTraceUpdateSucceeded checks the happy path of trace handling for
// both a succeeding and a failing job: the trace content must be delivered
// via PatchTrace, and the final state with the matching failure reason must
// be reported via UpdateJob.
func TestJobTraceUpdateSucceeded(t *testing.T) {
	traceMessage := "test content"

	// Matches a patch that carries the whole trace, starting at offset 0.
	patchTraceMatcher := mock.MatchedBy(func(tracePatch common.JobTracePatch) bool {
		return tracePatch.Offset() == 0 && string(tracePatch.Patch()) == traceMessage
	})

	tests := []struct {
		name     string
		jobState common.JobState
	}{
		{name: "Success", jobState: common.Success},
		{name: "Fail", jobState: common.Failed},
	}

	for idx, test := range tests {
		t.Run(test.name, func(t *testing.T) {
			var wg sync.WaitGroup
			jobCredentials := &common.JobCredentials{ID: idx}

			net := new(common.MockNetwork)
			// wg.Done() signals that the trace patch reached the network layer.
			net.On("PatchTrace", jobConfig, jobCredentials, patchTraceMatcher).Return(common.UpdateSucceeded).Run(func(_ mock.Arguments) { wg.Done() })

			// The reported failure reason must correspond to the final job state.
			var expectedFailureReason common.JobFailureReason
			switch test.jobState {
			case common.Success:
				expectedFailureReason = common.NoneFailure
			case common.Failed:
				expectedFailureReason = common.ScriptFailure
			}
			updateMatcher := generateJobInfoMatcher(idx, test.jobState, nil, expectedFailureReason)
			net.On("UpdateJob", jobConfig, jobCredentials, updateMatcher).Return(common.UpdateSucceeded)

			b := newJobTrace(net, jobConfig, jobCredentials)
			// speed up execution time
			b.updateInterval = 10 * time.Millisecond

			wg.Add(1)
			b.start()
			fmt.Fprint(b, traceMessage)
			// Block until PatchTrace has actually been called before finishing.
			wg.Wait()

			switch test.jobState {
			case common.Success:
				b.Success()
			case common.Failed:
				b.Fail(errors.New("test"), common.ScriptFailure)
			}

			net.AssertExpectations(t)
		})
	}
}
func TestIgnoreStatusChange(t *testing.T) {
jobInfoMatcher := generateJobInfoMatcherWithAnyTrace(jobCredentials.ID, common.Success, common.NoneFailure)
jobInfoMatcher := generateJobInfoMatcher(jobCredentials.ID, common.Success, common.NoneFailure)
net := new(common.MockNetwork)
net.On("UpdateJob", jobConfig, jobCredentials, jobInfoMatcher).Return(common.UpdateSucceeded)
mockNetwork := new(common.MockNetwork)
defer mockNetwork.AssertExpectations(t)
b := newJobTrace(net, jobConfig, jobCredentials)
// prevent any UpdateJob before `b.Success()` call
b.updateInterval = 25 * time.Second
// expect to receive just one status
mockNetwork.On("UpdateJob", jobConfig, jobCredentials, jobInfoMatcher).
Return(common.UpdateSucceeded).Once()
b := newJobTrace(mockNetwork, jobConfig, jobCredentials)
b.start()
b.Success()
b.Fail(errors.New("test"), "script_failure")
net.AssertExpectations(t)
}
func TestJobAbort(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
keepAliveUpdateMatcher := generateJobInfoMatcher(jobCredentials.ID, common.Running, nil, "")
updateMatcher := generateJobInfoMatcherWithAnyTrace(jobCredentials.ID, common.Success, common.NoneFailure)
keepAliveUpdateMatcher := generateJobInfoMatcher(jobCredentials.ID, common.Running, common.NoneFailure)
updateMatcher := generateJobInfoMatcher(jobCredentials.ID, common.Success, common.NoneFailure)
mockNetwork := new(common.MockNetwork)
defer mockNetwork.AssertExpectations(t)
// abort while running
mockNetwork.On("UpdateJob", jobConfig, jobCredentials, keepAliveUpdateMatcher).
Return(common.UpdateAbort).Once()
net := new(common.MockNetwork)
net.On("UpdateJob", jobConfig, jobCredentials, keepAliveUpdateMatcher).Return(common.UpdateAbort)
net.On("UpdateJob", jobConfig, jobCredentials, updateMatcher).Return(common.UpdateAbort)
// try to send status at least once more
mockNetwork.On("UpdateJob", jobConfig, jobCredentials, updateMatcher).
Return(common.UpdateAbort).Once()
b := newJobTrace(net, jobConfig, jobCredentials)
b := newJobTrace(mockNetwork, jobConfig, jobCredentials)
b.updateInterval = 0
b.SetCancelFunc(cancel)
b.start()
assert.NotNil(t, <-ctx.Done(), "should abort the job")
b.Success()
net.AssertExpectations(t)
}
func TestJobOutputLimit(t *testing.T) {
assert := assert.New(t)
traceMessage := "abcde"
net := new(common.MockNetwork)
mockNetwork := new(common.MockNetwork)
defer mockNetwork.AssertExpectations(t)
b := newJobTrace(net, jobOutputLimit, jobCredentials)
b := newJobTrace(mockNetwork, jobOutputLimit, jobCredentials)
// prevent any UpdateJob before `b.Success()` call
b.updateInterval = 25 * time.Second
updateMatcher := generateJobInfoMatcher(jobCredentials.ID, common.Success, nil, common.NoneFailure)
updateMatcher := generateJobInfoMatcher(jobCredentials.ID, common.Success, common.NoneFailure)
receivedTrace := bytes.NewBuffer([]byte{})
net.On("PatchTrace", jobOutputLimit, jobCredentials, mock.AnythingOfType("*network.tracePatch")).Return(common.UpdateSucceeded).Run(func(args mock.Arguments) {
if patch, ok := args.Get(2).(*tracePatch); ok {
receivedTrace.Write(patch.Patch())
} else {
assert.FailNow("Unexpected type on PatchTrace tracePatch parameter")
}
})
net.On("UpdateJob", jobOutputLimit, jobCredentials, updateMatcher).Return(common.UpdateSucceeded).Once()
mockNetwork.On("PatchTrace", jobOutputLimit, jobCredentials, mock.Anything, mock.Anything).
Return(1077, common.UpdateSucceeded).
Run(func(args mock.Arguments) {
// the 1077 == len(data)
data := args.Get(2).([]byte)
receivedTrace.Write(data)
})
mockNetwork.On("UpdateJob", jobOutputLimit, jobCredentials, updateMatcher).
Return(common.UpdateSucceeded).Once()
b.start()
// Write 5k to the buffer
......@@ -183,30 +119,23 @@ func TestJobOutputLimit(t *testing.T) {
expectedLogLimitExceededMsg := "Job's log exceeded limit of"
assert.Contains(receivedTrace.String(), traceMessage)
assert.Contains(receivedTrace.String(), expectedLogLimitExceededMsg)
net.AssertExpectations(t)
assert.Contains(t, receivedTrace.String(), traceMessage)
assert.Contains(t, receivedTrace.String(), expectedLogLimitExceededMsg)
}
func TestJobMasking(t *testing.T) {
maskedValues := []string{"masked"}
traceMessage := "This string should be masked"
traceMaskedMessage := "This string should be [MASKED]"
mockNetwork := new(common.MockNetwork)
defer mockNetwork.AssertExpectations(t)
// We disallow PatchTrace, as we want to receive fulltrace in UpdateJob
mockNetwork.On("PatchTrace", mock.Anything, mock.Anything, mock.Anything).
Return(common.UpdateFailed)
mockNetwork.On("PatchTrace", mock.Anything, mock.Anything, []byte(traceMaskedMessage), 0).
Return(len(traceMaskedMessage), common.UpdateSucceeded)
mockNetwork.On("UpdateJob", mock.Anything, mock.Anything, mock.Anything).
Return(common.UpdateSucceeded).
Run(func(args mock.Arguments) {
jobInfo, _ := args.Get(2).(common.UpdateJobInfo)
require.NotNil(t, jobInfo.Trace, "Trace should be set")
require.Equal(t, "This string should be [MASKED]", *jobInfo.Trace)
})
Return(common.UpdateSucceeded)
jobTrace := newJobTrace(mockNetwork, jobConfig, jobCredentials)
jobTrace.SetMasked(maskedValues)
......@@ -217,154 +146,150 @@ func TestJobMasking(t *testing.T) {
jobTrace.Success()
}
func TestJobFinishRetry(t *testing.T) {
updateMatcher := generateJobInfoMatcherWithAnyTrace(jobCredentials.ID, common.Success, common.NoneFailure)
func TestJobFinishTraceUpdateRetry(t *testing.T) {
updateMatcher := generateJobInfoMatcher(jobCredentials.ID, common.Success, common.NoneFailure)
net := new(common.MockNetwork)
net.On("UpdateJob", jobConfig, jobCredentials, updateMatcher).Return(common.UpdateFailed).Times(5)
net.On("UpdateJob", jobConfig, jobCredentials, updateMatcher).Return(common.UpdateSucceeded).Once()
mockNetwork := new(common.MockNetwork)
defer mockNetwork.AssertExpectations(t)
b := newJobTrace(net, jobConfig, jobCredentials)
b.finishRetryInterval = time.Microsecond
// accept just 3 bytes
mockNetwork.On("PatchTrace", jobConfig, jobCredentials, []byte("My trace send"), 0).
Return(3, common.UpdateSucceeded).Once()
b.start()
b.Success()
// retry when trying to send next bytes
mockNetwork.On("PatchTrace", jobConfig, jobCredentials, []byte("trace send"), 3).
Return(0, common.UpdateFailed).Once()
net.AssertExpectations(t)
}
// accept 6 more bytes
mockNetwork.On("PatchTrace", jobConfig, jobCredentials, []byte("trace send"), 3).
Return(9, common.UpdateSucceeded).Once()
func TestJobForceSend(t *testing.T) {
var wg sync.WaitGroup
traceMessage := "test content"
firstPatchMatcher := mock.MatchedBy(func(tracePatch common.JobTracePatch) bool {
return tracePatch.Offset() == 0 && string(tracePatch.Patch()) == traceMessage
})
keepAliveUpdateMatcher := generateJobInfoMatcher(jobCredentials.ID, common.Running, nil, "")
updateMatcher := generateJobInfoMatcherWithAnyTrace(jobCredentials.ID, common.Success, common.NoneFailure)
// restart most of trace
mockNetwork.On("PatchTrace", jobConfig, jobCredentials, []byte("send"), 9).
Return(6, common.UpdateRangeMismatch).Once()
wg.Add(1)
// accept rest of trace
mockNetwork.On("PatchTrace", jobConfig, jobCredentials, []byte("ce send"), 6).
Return(13, common.UpdateSucceeded).Once()
net := new(common.MockNetwork)
net.On("PatchTrace", jobConfig, jobCredentials, firstPatchMatcher).Return(common.UpdateSucceeded).Once()
net.On("UpdateJob", jobConfig, jobCredentials, keepAliveUpdateMatcher).Run(func(_ mock.Arguments) { wg.Done() }).Return(common.UpdateSucceeded)
net.On("UpdateJob", jobConfig, jobCredentials, updateMatcher).Return(common.UpdateSucceeded).Once()
defer net.AssertExpectations(t)
mockNetwork.On("UpdateJob", jobConfig, jobCredentials, updateMatcher).
Return(common.UpdateSucceeded).Once()
b := newJobTrace(net, jobConfig, jobCredentials)
b := newJobTrace(mockNetwork, jobConfig, jobCredentials)
b.finishRetryInterval = time.Microsecond
b.updateInterval = 500 * time.Microsecond
b.forceSendInterval = 4 * b.updateInterval
b.start()
defer b.Success()
fmt.Fprint(b, traceMessage)
wg.Wait()
fmt.Fprint(b, "My trace send")
b.Success()
}
func runOnHijackedLogrusOutput(t *testing.T, handler func(t *testing.T, output *bytes.Buffer)) {
oldOutput := logrus.StandardLogger().Out
defer func() { logrus.StandardLogger().Out = oldOutput }()
func TestJobMaxTracePatchSize(t *testing.T) {
updateMatcher := generateJobInfoMatcher(jobCredentials.ID, common.Success, common.NoneFailure)
buf := bytes.NewBuffer([]byte{})
logrus.StandardLogger().Out = buf
mockNetwork := new(common.MockNetwork)
defer mockNetwork.AssertExpectations(t)
handler(t, buf)
}
// expect just 5 bytes
mockNetwork.On("PatchTrace", jobConfig, jobCredentials, []byte("My tr"), 0).
Return(5, common.UpdateSucceeded).Once()
func TestPatchTraceRangeMismatch(t *testing.T) {
runOnHijackedLogrusOutput(t, func(t *testing.T, output *bytes.Buffer) {
var wg sync.WaitGroup
// expect next 5 bytes
mockNetwork.On("PatchTrace", jobConfig, jobCredentials, []byte("ace s"), 5).
Return(10, common.UpdateSucceeded).Once()
traceMessage := "test content"
// expect last 3 bytes
mockNetwork.On("PatchTrace", jobConfig, jobCredentials, []byte("end"), 10).
Return(13, common.UpdateSucceeded).Once()
wg.Add(1)
mockNetwork.On("UpdateJob", jobConfig, jobCredentials, updateMatcher).
Return(common.UpdateSucceeded).Once()
updateTraceOffsetFn := func(args mock.Arguments) {
patch, ok := args.Get(2).(*tracePatch)
require.True(t, ok, "Argument needs to be a proper *tracePatch instance")
patch.SetNewOffset(15)
}
b := newJobTrace(mockNetwork, jobConfig, jobCredentials)
b.finishRetryInterval = time.Microsecond
b.maxTracePatchSize = 5
fullUpdateMatcher := generateJobInfoMatcher(jobCredentials.ID, common.Running, &traceMessage, common.NoneFailure)
b.start()
fmt.Fprint(b, "My trace send")
b.Success()
}
net := new(common.MockNetwork)
net.On("PatchTrace", jobConfig, jobCredentials, mock.Anything).Run(updateTraceOffsetFn).Return(common.UpdateRangeMismatch).Once()
net.On("UpdateJob", jobConfig, jobCredentials, fullUpdateMatcher).Run(func(_ mock.Arguments) { wg.Done() }).Return(common.UpdateSucceeded).Once()
net.On("UpdateJob", jobConfig, jobCredentials, mock.Anything).Return(common.UpdateSucceeded).Once()
defer net.AssertExpectations(t)
func TestJobFinishStatusUpdateRetry(t *testing.T) {
updateMatcher := generateJobInfoMatcher(jobCredentials.ID, common.Success, common.NoneFailure)
b := newJobTrace(net, jobConfig, jobCredentials)
mockNetwork := new(common.MockNetwork)
defer mockNetwork.AssertExpectations(t)
b.updateInterval = 500 * time.Microsecond
// fail job 5 times
mockNetwork.On("UpdateJob", jobConfig, jobCredentials, updateMatcher).
Return(common.UpdateFailed).Times(5)
b.start()
defer b.Success()
// accept job
mockNetwork.On("UpdateJob", jobConfig, jobCredentials, updateMatcher).
Return(common.UpdateSucceeded).Once()
fmt.Fprint(b, traceMessage)
b := newJobTrace(mockNetwork, jobConfig, jobCredentials)
b.finishRetryInterval = time.Microsecond
wg.Wait()
assert.Regexp(t, "Full job update is needed", output.String())
})
b.start()
b.Success()
}
func TestPatchTraceDoubleRangeMismatch(t *testing.T) {
runOnHijackedLogrusOutput(t, func(t *testing.T, output *bytes.Buffer) {
var wg sync.WaitGroup
traceMessage := "test content"
wg.Add(1)
net := new(common.MockNetwork)
net.On("PatchTrace", jobConfig, jobCredentials, mock.Anything).Return(common.UpdateRangeMismatch).Once()
net.On("PatchTrace", jobConfig, jobCredentials, mock.Anything).Run(func(_ mock.Arguments) { wg.Done() }).Return(common.UpdateRangeMismatch).Once()
net.On("PatchTrace", jobConfig, jobCredentials, mock.Anything).Return(common.UpdateSucceeded).Once()
net.On("UpdateJob", jobConfig, jobCredentials, mock.Anything).Return(common.UpdateSucceeded).Once()
defer net.AssertExpectations(t)
func TestJobIncrementalPatchSend(t *testing.T) {
var wg sync.WaitGroup
b := newJobTrace(net, jobConfig, jobCredentials)
finalUpdateMatcher := generateJobInfoMatcher(
jobCredentials.ID, common.Success, common.NoneFailure)
b.updateInterval = 500 * time.Microsecond
mockNetwork := new(common.MockNetwork)
defer mockNetwork.AssertExpectations(t)
b.start()
defer b.Success()
// ensure that PatchTrace gets executed first
wg.Add(1)
mockNetwork.On("PatchTrace", jobConfig, jobCredentials, []byte("test trace"), 0).
Return(10, common.UpdateSucceeded).Once().
Run(func(args mock.Arguments) {
wg.Done()
})
fmt.Fprint(b, traceMessage)
// wait for the final `UpdateJob` to be executed
mockNetwork.On("UpdateJob", jobConfig, jobCredentials, finalUpdateMatcher).
Return(common.UpdateSucceeded).Once()
wg.Wait()
assert.Regexp(t, "Resending trace patch due to range mismatch", output.String())
assert.Regexp(t, "failed due to range mismatch", output.String())
})
b := newJobTrace(mockNetwork, jobConfig, jobCredentials)
b.updateInterval = time.Millisecond * 10
b.start()
fmt.Fprint(b, "test trace")
wg.Wait()
b.Success()
}
func TestFinalUpdateWithTrace(t *testing.T) {
func TestJobIncrementalStatusRefresh(t *testing.T) {
var wg sync.WaitGroup
traceMessage := "test content"
incrementalUpdateMatcher := generateJobInfoMatcher(
jobCredentials.ID, common.Running, common.NoneFailure)
wg.Add(1)
net := new(common.MockNetwork)
b := newJobTrace(net, jobConfig, jobCredentials)
finalUpdateMatcher := generateJobInfoMatcher(
jobCredentials.ID, common.Success, common.NoneFailure)
finalUpdateMatcher := mock.MatchedBy(func(jobInfo common.UpdateJobInfo) bool {
return *jobInfo.Trace == (traceMessage + traceMessage)
})
net.On("PatchTrace", jobConfig, jobCredentials, mock.Anything).Return(common.UpdateSucceeded).Once().Run(func(_ mock.Arguments) {
b.updateInterval = 10 * time.Second
fmt.Fprint(b, traceMessage)
go b.Success()
})
net.On("PatchTrace", jobConfig, jobCredentials, mock.Anything).Return(common.UpdateFailed).Once()
net.On("UpdateJob", jobConfig, jobCredentials, finalUpdateMatcher).Return(common.UpdateSucceeded).Once().Run(func(_ mock.Arguments) {
wg.Done()
})
defer net.AssertExpectations(t)
mockNetwork := new(common.MockNetwork)
defer mockNetwork.AssertExpectations(t)
b.updateInterval = 500 * time.Microsecond
b.start()
// ensure that incremental UpdateJob gets executed first
wg.Add(1)
mockNetwork.On("UpdateJob", jobConfig, jobCredentials, incrementalUpdateMatcher).
Return(common.UpdateSucceeded).Once().
Run(func(args mock.Arguments) {
wg.Done()
})
fmt.Fprint(b, traceMessage)
// wait for the final `UpdateJob` to be executed
mockNetwork.On("UpdateJob", jobConfig, jobCredentials, finalUpdateMatcher).
Return(common.UpdateSucceeded).Once()
b := newJobTrace(mockNetwork, jobConfig, jobCredentials)
b.updateInterval = time.Millisecond * 10
b.start()
wg.Wait()
b.Success()
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment