From d676fe3739edbf4f26209553ec06c85029138b9d Mon Sep 17 00:00:00 2001 From: Tomasz Maczukin <tomasz@maczukin.pl> Date: Mon, 21 May 2018 18:26:41 +0200 Subject: [PATCH 01/13] Fix Content-Range header for trace PATCH request --- common/network.go | 1 + network/gitlab.go | 3 +- network/gitlab_test.go | 93 +++++++++++++++++++++++++++++++++++++++++- network/trace.go | 8 ++++ 4 files changed, 102 insertions(+), 3 deletions(-) diff --git a/common/network.go b/common/network.go index 4959740859b..54ba9a7955d 100644 --- a/common/network.go +++ b/common/network.go @@ -351,6 +351,7 @@ type JobTracePatch interface { Patch() []byte Offset() int Limit() int + ContentRange() string SetNewOffset(newOffset int) ValidateRange() bool } diff --git a/network/gitlab.go b/network/gitlab.go index f954f1674c2..eaac0e84ca9 100644 --- a/network/gitlab.go +++ b/network/gitlab.go @@ -333,7 +333,8 @@ func (n *GitLabClient) UpdateJob(config common.RunnerConfig, jobCredentials *com func (n *GitLabClient) PatchTrace(config common.RunnerConfig, jobCredentials *common.JobCredentials, tracePatch common.JobTracePatch) common.UpdateState { id := jobCredentials.ID - contentRange := fmt.Sprintf("%d-%d", tracePatch.Offset(), tracePatch.Limit()) + contentRange := tracePatch.ContentRange() + headers := make(http.Header) headers.Set("Content-Range", contentRange) headers.Set("JOB-TOKEN", jobCredentials.Token) diff --git a/network/gitlab_test.go b/network/gitlab_test.go index 7f5c47de68f..73b25ee79b4 100644 --- a/network/gitlab_test.go +++ b/network/gitlab_test.go @@ -674,8 +674,7 @@ func TestForbiddenPatchTrace(t *testing.T) { func TestPatchTrace(t *testing.T) { handler := func(w http.ResponseWriter, r *http.Request, body string, offset, limit int) { - assert.Equal(t, patchTraceString[offset:limit], body) - + assert.Equal(t, patchTraceString[offset:limit+1], body) w.WriteHeader(http.StatusAccepted) } @@ -791,6 +790,96 @@ func TestPatchTraceCantConnect(t *testing.T) { assert.Equal(t, UpdateFailed, state) } +func TestPatchTraceUpdatedTrace(t *testing.T) { + sentTrace := 0 + traceString := "" + + updates := []struct { + traceUpdate string + expectedContentRange string + expectedContentLength int64 + }{ + {traceUpdate: "test", expectedContentRange: "0-3", expectedContentLength: 4}, + {traceUpdate: "", expectedContentRange: "4-3", expectedContentLength: 0}, + {traceUpdate: " ", expectedContentRange: "4-4", expectedContentLength: 1}, + {traceUpdate: "test", expectedContentRange: "5-8", expectedContentLength: 4}, + } + + for id, update := range updates { + t.Run(fmt.Sprintf("update-%d", id+1), func(t *testing.T) { + handler := func(w http.ResponseWriter, r *http.Request, body string, offset, limit int) { + assert.Equal(t, traceString[offset:limit+1], body) + assert.Equal(t, update.traceUpdate, body) + assert.Equal(t, update.expectedContentRange, r.Header.Get("Content-Range")) + assert.Equal(t, update.expectedContentLength, r.ContentLength) + w.WriteHeader(http.StatusAccepted) + } + + server, client, config := getPatchServer(t, handler) + defer server.Close() + + + traceString += update.traceUpdate + tracePatch := getTracePatch(traceString, sentTrace) + client.PatchTrace(config, &JobCredentials{ID: 1, Token: patchToken}, tracePatch) + sentTrace = tracePatch.Limit() + }) + } +} + +func TestPatchTraceContentRangeAndLength(t *testing.T) { + tests := []struct { + name string + trace string + expectedContentRange string + expectedContentLength int64 + }{ + {name: "0 bytes", trace: "", expectedContentRange: "0-0", expectedContentLength: 0}, + {name: "1 byte", trace: "1", expectedContentRange: "0-0", expectedContentLength: 1}, + {name: "2 bytes", trace: "12", expectedContentRange: "0-1", expectedContentLength: 2}, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + handler := func(w http.ResponseWriter, r *http.Request, body string, offset, limit int) { + assert.Equal(t, test.expectedContentRange, r.Header.Get("Content-Range")) + assert.Equal(t, test.expectedContentLength, r.ContentLength) + w.WriteHeader(http.StatusAccepted) + } + + server, client, config := getPatchServer(t, handler) + defer server.Close() + + tracePatch := getTracePatch(test.trace, 0) + client.PatchTrace(config, &JobCredentials{ID: 1, Token: patchToken}, tracePatch) + }) + } +} + +func TestPatchTraceContentRangeHeaderValues(t *testing.T) { + handler := func(w http.ResponseWriter, r *http.Request, body string, offset, limit int) { + contentRange := r.Header.Get("Content-Range") + bytes := strings.Split(contentRange, "-") + + startByte, err := strconv.Atoi(bytes[0]) + require.NoError(t, err, "Should not cet error when parsing Content-Range startByte component") + + endByte, err := strconv.Atoi(bytes[1]) + require.NoError(t, err, "Should not cet error when parsing Content-Range endByte component") + + assert.Equal(t, 0, startByte, "Content-Range should contain start byte as first field") + assert.Equal(t, len(patchTraceString)-1, endByte, "Content-Range should contain end byte as second field") + + w.WriteHeader(http.StatusAccepted) + } + + server, client, config := getPatchServer(t, handler) + defer server.Close() + + tracePatch := getTracePatch(patchTraceString, 0) + client.PatchTrace(config, &JobCredentials{ID: 1, Token: patchToken}, tracePatch) +} + func testArtifactsUploadHandler(w http.ResponseWriter, r *http.Request, t *testing.T) { if r.URL.Path != "/api/v4/jobs/10/artifacts" { w.WriteHeader(http.StatusNotFound) diff --git a/network/trace.go b/network/trace.go index 370b5522bb2..46096d1f0d6 100644 --- a/network/trace.go +++ b/network/trace.go @@ -32,6 +32,14 @@ func (tp *tracePatch) Limit() int { return tp.limit } +func (tp *tracePatch) ContentRange() string { + limit := tp.Limit() + if limit > 0 { + limit-- + } + return fmt.Sprintf("%d-%d", tp.offset, limit) +} + func (tp *tracePatch) SetNewOffset(newOffset int) { tp.offset = newOffset } -- GitLab From 9e381d089d923af02b2147b1cd778191d54bf3ed Mon Sep 17 00:00:00 2001 From: Tomasz Maczukin <tomasz@maczukin.pl> Date: Tue, 22 May 2018 00:27:50 +0200 Subject: [PATCH 02/13] Handle 0-byte patch when updating the non-empty trace --- network/gitlab_test.go | 9 +++++---- network/trace.go | 2 +- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/network/gitlab_test.go b/network/gitlab_test.go index 73b25ee79b4..c6db884163f 100644 --- a/network/gitlab_test.go +++ b/network/gitlab_test.go @@ -800,7 +800,7 @@ func TestPatchTraceUpdatedTrace(t *testing.T) { expectedContentLength int64 }{ {traceUpdate: "test", expectedContentRange: "0-3", expectedContentLength: 4}, - {traceUpdate: "", expectedContentRange: "4-3", expectedContentLength: 0}, + {traceUpdate: "", expectedContentRange: "4-4", expectedContentLength: 0}, {traceUpdate: " ", expectedContentRange: "4-4", expectedContentLength: 1}, {traceUpdate: "test", expectedContentRange: "5-8", expectedContentLength: 4}, } @@ -808,7 +808,9 @@ func TestPatchTraceUpdatedTrace(t *testing.T) { for id, update := range updates { t.Run(fmt.Sprintf("update-%d", id+1), func(t *testing.T) { handler := func(w http.ResponseWriter, r *http.Request, body string, offset, limit int) { - assert.Equal(t, traceString[offset:limit+1], body) + if limit+1 <= len(traceString) { + assert.Equal(t, traceString[offset:limit+1], body) + } assert.Equal(t, update.traceUpdate, body) assert.Equal(t, update.expectedContentRange, r.Header.Get("Content-Range")) assert.Equal(t, update.expectedContentLength, r.ContentLength) @@ -818,8 +820,7 @@ func TestPatchTraceUpdatedTrace(t *testing.T) { server, client, config := getPatchServer(t, handler) defer server.Close() - - traceString += update.traceUpdate + traceString += update.traceUpdate tracePatch := getTracePatch(traceString, sentTrace) client.PatchTrace(config, &JobCredentials{ID: 1, Token: patchToken}, tracePatch) sentTrace = tracePatch.Limit() diff --git a/network/trace.go b/network/trace.go index 46096d1f0d6..b8bd7642129 100644 --- a/network/trace.go +++ b/network/trace.go @@ -34,7 +34,7 @@ func (tp *tracePatch) Limit() int { func (tp *tracePatch) ContentRange() string { limit := tp.Limit() - if limit > 0 { + if limit > 0 && limit > tp.offset { limit-- } return fmt.Sprintf("%d-%d", tp.offset, limit) -- GitLab From c2fb033fbf5ee22b3b9981b22134c30e7841d7e7 Mon Sep 17 00:00:00 2001 From: Tomasz Maczukin <tomasz@maczukin.pl> Date: Thu, 7 Jun 2018 12:18:48 +0200 Subject: [PATCH 03/13] Move Content-Range header composition to network/gitlab.go --- common/mock_JobTracePatch.go | 8 ++++---- common/network.go | 5 ++--- network/gitlab.go | 2 +- network/gitlab_test.go | 2 +- network/trace.go | 17 ++++++----------- network/trace_test.go | 6 +++--- 6 files changed, 17 insertions(+), 23 deletions(-) diff --git a/common/mock_JobTracePatch.go b/common/mock_JobTracePatch.go index 9a82ef6a7d5..12dbca55caa 100644 --- a/common/mock_JobTracePatch.go +++ b/common/mock_JobTracePatch.go @@ -11,8 +11,8 @@ type MockJobTracePatch struct { mock.Mock } -// Limit provides a mock function with given fields: -func (_m *MockJobTracePatch) Limit() int { +// FirstByte provides a mock function with given fields: +func (_m *MockJobTracePatch) FirstByte() int { ret := _m.Called() var r0 int @@ -25,8 +25,8 @@ func (_m *MockJobTracePatch) Limit() int { return r0 } -// Offset provides a mock function with given fields: -func (_m *MockJobTracePatch) Offset() int { +// LastByte provides a mock function with given fields: +func (_m *MockJobTracePatch) LastByte() int { ret := _m.Called() var r0 int diff --git a/common/network.go b/common/network.go index 54ba9a7955d..fe22094521c 100644 --- a/common/network.go +++ b/common/network.go @@ -349,9 +349,8 @@ type JobTrace interface { type JobTracePatch interface { Patch() []byte - Offset() int - Limit() int - ContentRange() string + FirstByte() int + LastByte() int SetNewOffset(newOffset int) ValidateRange() bool } diff --git a/network/gitlab.go b/network/gitlab.go index eaac0e84ca9..b9e6104efc7 100644 --- a/network/gitlab.go +++ b/network/gitlab.go @@ -333,7 +333,7 @@ func (n *GitLabClient) UpdateJob(config common.RunnerConfig, jobCredentials *com func (n *GitLabClient) PatchTrace(config common.RunnerConfig, jobCredentials *common.JobCredentials, tracePatch common.JobTracePatch) common.UpdateState { id := jobCredentials.ID - contentRange := tracePatch.ContentRange() + contentRange := fmt.Sprintf("%d-%d", tracePatch.FirstByte(), tracePatch.LastByte()) headers := make(http.Header) headers.Set("Content-Range", contentRange) diff --git a/network/gitlab_test.go b/network/gitlab_test.go index c6db884163f..f03307bfa8e 100644 --- a/network/gitlab_test.go +++ b/network/gitlab_test.go @@ -823,7 +823,7 @@ func TestPatchTraceUpdatedTrace(t *testing.T) { traceString += update.traceUpdate tracePatch := getTracePatch(traceString, sentTrace) client.PatchTrace(config, &JobCredentials{ID: 1, Token: patchToken}, tracePatch) - sentTrace = tracePatch.Limit() + sentTrace = tracePatch.limit }) } } diff --git a/network/trace.go b/network/trace.go index b8bd7642129..601206433fc 100644 --- a/network/trace.go +++ b/network/trace.go @@ -24,20 +24,15 @@ func (tp *tracePatch) Patch() []byte { return tp.trace.Bytes()[tp.offset:tp.limit] } -func (tp *tracePatch) Offset() int { +func (tp *tracePatch) FirstByte() int { return tp.offset } -func (tp *tracePatch) Limit() int { - return tp.limit -} - -func (tp *tracePatch) ContentRange() string { - limit := tp.Limit() - if limit > 0 && limit > tp.offset { - limit-- +func (tp *tracePatch) LastByte() int { + if tp.limit > 0 && tp.limit > tp.offset { + return tp.limit - 1 } - return fmt.Sprintf("%d-%d", tp.offset, limit) + return tp.limit } func (tp *tracePatch) SetNewOffset(newOffset int) { @@ -236,7 +231,7 @@ func (c *clientJobTrace) incrementalUpdate() common.UpdateState { } if update == common.UpdateSucceeded { - c.sentTrace = tracePatch.Limit() + c.sentTrace = tracePatch.limit c.sentTime = time.Now() } diff --git a/network/trace_test.go b/network/trace_test.go index 1c701419522..7cdd7c9ed61 100644 --- a/network/trace_test.go +++ b/network/trace_test.go @@ -59,7 +59,7 @@ func generateJobInfoMatcherWithAnyTrace(id int, state common.JobState, failureRe func TestJobTraceUpdateSucceeded(t *testing.T) { traceMessage := "test content" patchTraceMatcher := mock.MatchedBy(func(tracePatch common.JobTracePatch) bool { - return tracePatch.Offset() == 0 && string(tracePatch.Patch()) == traceMessage + return tracePatch.FirstByte() == 0 && string(tracePatch.Patch()) == traceMessage }) tests := []struct { @@ -205,10 +205,10 @@ func TestJobForceSend(t *testing.T) { var wg sync.WaitGroup traceMessage := "test content" firstPatchMatcher := mock.MatchedBy(func(tracePatch common.JobTracePatch) bool { - return tracePatch.Offset() == 0 && string(tracePatch.Patch()) == traceMessage + return tracePatch.FirstByte() == 0 && string(tracePatch.Patch()) == traceMessage }) nextEmptyPatchMatcher := mock.MatchedBy(func(tracePatch common.JobTracePatch) bool { - return tracePatch.Offset() == len(traceMessage) && string(tracePatch.Patch()) == "" + return tracePatch.FirstByte() == len(traceMessage) && string(tracePatch.Patch()) == "" }) updateMatcher := generateJobInfoMatcherWithAnyTrace(jobCredentials.ID, common.Success, common.NoneFailure) -- GitLab From f662a93f59743125cd42779b0c7354588d9e53cb Mon Sep 17 00:00:00 2001 From: Tomasz Maczukin <tomasz@maczukin.pl> Date: Thu, 7 Jun 2018 13:21:30 +0200 Subject: [PATCH 04/13] Add dedicated keep-alive endpoint --- common/mock_Network.go | 14 +++++++ common/network.go | 1 + network/gitlab.go | 45 +++++++++++++++++++-- network/gitlab_test.go | 73 ++++++++++++++++++++++++++++++++++ network/keep_alive_response.go | 31 +++++++++++++++ network/patch_response.go | 18 +-------- network/trace.go | 21 +++++++--- network/trace_test.go | 8 +--- 8 files changed, 179 insertions(+), 32 deletions(-) create mode 100644 network/keep_alive_response.go diff --git a/common/mock_Network.go b/common/mock_Network.go index 332926e87af..c6f12b3bb7b 100644 --- a/common/mock_Network.go +++ b/common/mock_Network.go @@ -26,6 +26,20 @@ func (_m *MockNetwork) DownloadArtifacts(config JobCredentials, artifactsFile st return r0 } +// KeepAlive provides a mock function with given fields: config, jobCredentials +func (_m *MockNetwork) KeepAlive(config RunnerConfig, jobCredentials *JobCredentials) UpdateState { + ret := _m.Called(config, jobCredentials) + + var r0 UpdateState + if rf, ok := ret.Get(0).(func(RunnerConfig, *JobCredentials) UpdateState); ok { + r0 = rf(config, jobCredentials) + } else { + r0 = ret.Get(0).(UpdateState) + } + + return r0 +} + // PatchTrace provides a mock function with given fields: config, jobCredentials, tracePart func (_m *MockNetwork) PatchTrace(config RunnerConfig, jobCredentials *JobCredentials, tracePart JobTracePatch) UpdateState { ret := _m.Called(config, jobCredentials, tracePart) diff --git a/common/network.go b/common/network.go index fe22094521c..edc774f3133 100644 --- a/common/network.go +++ b/common/network.go @@ -361,6 +361,7 @@ type Network interface { UnregisterRunner(config RunnerCredentials) bool RequestJob(config RunnerConfig) (*JobResponse, bool) UpdateJob(config RunnerConfig, jobCredentials *JobCredentials, jobInfo UpdateJobInfo) UpdateState + KeepAlive(config RunnerConfig, jobCredentials *JobCredentials) UpdateState PatchTrace(config RunnerConfig, jobCredentials *JobCredentials, tracePart JobTracePatch) UpdateState DownloadArtifacts(config JobCredentials, artifactsFile string) DownloadState UploadRawArtifacts(config JobCredentials, reader io.Reader, options ArtifactsOptions) UploadState diff --git a/network/gitlab.go b/network/gitlab.go index b9e6104efc7..3d6db4613eb 100644 --- a/network/gitlab.go +++ b/network/gitlab.go @@ -32,9 +32,10 @@ var apiRequestStatuses = prometheus.NewDesc( type APIEndpoint string const ( - APIEndpointRequestJob APIEndpoint = "request_job" - APIEndpointUpdateJob APIEndpoint = "update_job" - APIEndpointPatchTrace APIEndpoint = "patch_trace" + APIEndpointRequestJob APIEndpoint = "request_job" + APIEndpointUpdateJob APIEndpoint = "update_job" + APIEndpointJobKeepAlive APIEndpoint = "job_keep_alive" + APIEndpointPatchTrace APIEndpoint = "patch_trace" ) type apiRequestStatusPermutation struct { @@ -330,6 +331,42 @@ func (n *GitLabClient) UpdateJob(config common.RunnerConfig, jobCredentials *com } } +func (n *GitLabClient) KeepAlive(config common.RunnerConfig, jobCredentials *common.JobCredentials) common.UpdateState { + id := jobCredentials.ID + + headers := make(http.Header) + headers.Set("JOB-TOKEN", jobCredentials.Token) + + uri := fmt.Sprintf("jobs/%d/keep-alive", id) + + response, err := n.doRaw(&config.RunnerCredentials, "POST", uri, nil, "", headers) + if err != nil { + config.Log().Errorln("Sending keep-alive request...", "error", err.Error()) + return common.UpdateFailed + } + + n.requestsStatusesMap.Append(config.RunnerCredentials.ShortDescription(), APIEndpointJobKeepAlive, response.StatusCode) + + keepAliveResponse := NewKeepAliveResponse(response) + log := config.Log().WithFields(logrus.Fields{ + "job": id, + "code": response.StatusCode, + "job-status": keepAliveResponse.RemoteState, + }) + + switch { + case keepAliveResponse.IsAborted(): + log.Warningln("Sending keep-alive request...", "aborted") + return common.UpdateAbort + case response.StatusCode == http.StatusAccepted: + log.Debugln("Sending keep-alive request...", "ok") + return common.UpdateSucceeded + default: + log.Warningln("Sending keep-alive request...", "failed") + return common.UpdateFailed + } +} + func (n *GitLabClient) PatchTrace(config common.RunnerConfig, jobCredentials *common.JobCredentials, tracePatch common.JobTracePatch) common.UpdateState { id := jobCredentials.ID @@ -365,7 +402,7 @@ func (n *GitLabClient) PatchTrace(config common.RunnerConfig, jobCredentials *co switch { case tracePatchResponse.IsAborted(): - log.Warningln("Appending trace to coordinator", "aborted") + log.Warningln("Appending trace to coordinator...", "aborted") return common.UpdateAbort case response.StatusCode == http.StatusAccepted: log.Debugln("Appending trace to coordinator...", "ok") diff --git a/network/gitlab_test.go b/network/gitlab_test.go index f03307bfa8e..bb5fb9ac595 100644 --- a/network/gitlab_test.go +++ b/network/gitlab_test.go @@ -595,6 +595,79 @@ func TestUpdateJob(t *testing.T) { assert.Equal(t, UpdateFailed, state, "Update should fail for badly formatted request") } +var keepAliveToken = "token" + +func getKeepAliveServer(t *testing.T, handler func(w http.ResponseWriter, r *http.Request)) (*httptest.Server, *GitLabClient, RunnerConfig) { + keepAliveHandler := func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/api/v4/jobs/1/keep-alive" { + w.WriteHeader(http.StatusNotFound) + return + } + + if r.Method != "POST" { + w.WriteHeader(http.StatusNotAcceptable) + return + } + + assert.Equal(t, keepAliveToken, r.Header.Get("JOB-TOKEN")) + + handler(w, r) + } + + server := httptest.NewServer(http.HandlerFunc(keepAliveHandler)) + + config := RunnerConfig{ + RunnerCredentials: RunnerCredentials{ + URL: server.URL, + }, + } + + return server, NewGitLabClient(), config +} + +func TestKeepAlive(t *testing.T) { + handler := func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusAccepted) + } + + server, client, config := getKeepAliveServer(t, handler) + defer server.Close() + + state := client.KeepAlive(config, &JobCredentials{ID: 1, Token: keepAliveToken}) + assert.Equal(t, UpdateSucceeded, state) +} + +func TestKeepAliveUnavailable(t *testing.T) { + handler := func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusBadGateway) + } + + server, client, config := getKeepAliveServer(t, handler) + defer server.Close() + + state := client.KeepAlive(config, &JobCredentials{ID: 1, Token: keepAliveToken}) + assert.Equal(t, UpdateFailed, state) +} + +func TestKeepAliveRemoteAbort(t *testing.T) { + statuses := []string{"canceled", "failed"} + + for _, status := range statuses { + t.Run(status, func(t *testing.T) { + handler := func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Job-Status", status) + w.WriteHeader(http.StatusAccepted) + } + + server, client, config := getKeepAliveServer(t, handler) + defer server.Close() + + state := client.KeepAlive(config, &JobCredentials{ID: 1, Token: keepAliveToken}) + assert.Equal(t, UpdateAbort, state) + }) + } +} + var patchToken = "token" var patchTraceString = "trace trace trace" diff --git a/network/keep_alive_response.go b/network/keep_alive_response.go new file mode 100644 index 00000000000..763b314ec63 --- /dev/null +++ b/network/keep_alive_response.go @@ -0,0 +1,31 @@ +package network + +import ( + "net/http" +) + +type KeepAliveResponse struct { + response *http.Response + + RemoteState string +} + +func (k *KeepAliveResponse) IsAborted() bool { + if k.RemoteState == "canceled" || k.RemoteState == "failed" { + return true + } + + if k.response.StatusCode == http.StatusForbidden { + return true + } + + return false +} + + +func NewKeepAliveResponse(response *http.Response) *KeepAliveResponse { + return &KeepAliveResponse{ + response: response, + RemoteState: response.Header.Get("Job-Status"), + } +} diff --git a/network/patch_response.go b/network/patch_response.go index 759727ab017..0f9edf3a60f 100644 --- a/network/patch_response.go +++ b/network/patch_response.go @@ -7,24 +7,11 @@ import ( ) type TracePatchResponse struct { - response *http.Response + *KeepAliveResponse - RemoteState string RemoteRange string } -func (p *TracePatchResponse) IsAborted() bool { - if p.RemoteState == "canceled" || p.RemoteState == "failed" { - return true - } - - if p.response.StatusCode == http.StatusForbidden { - return true - } - - return false -} - func (p *TracePatchResponse) NewOffset() int { remoteRangeParts := strings.Split(p.RemoteRange, "-") newOffset, _ := strconv.Atoi(remoteRangeParts[1]) @@ -34,8 +21,7 @@ func (p *TracePatchResponse) NewOffset() int { func NewTracePatchResponse(response *http.Response) *TracePatchResponse { return &TracePatchResponse{ - response: response, - RemoteState: response.Header.Get("Job-Status"), + KeepAliveResponse: NewKeepAliveResponse(response), RemoteRange: response.Header.Get("Range"), } } diff --git a/network/trace.go b/network/trace.go index 601206433fc..0e88e8dcf7c 100644 --- a/network/trace.go +++ b/network/trace.go @@ -194,18 +194,28 @@ func (c *clientJobTrace) process(pipe *io.PipeReader) { } } +func (c *clientJobTrace) isUpToDate(state common.JobState, traceLenght int) bool { + return c.sentState == state && c.sentTrace == traceLenght +} + func (c *clientJobTrace) incrementalUpdate() common.UpdateState { c.lock.RLock() state := c.state trace := c.log c.lock.RUnlock() - if c.sentState == state && - c.sentTrace == trace.Len() && - time.Since(c.sentTime) < c.forceSendInterval { - return common.UpdateSucceeded + if c.isUpToDate(state, trace.Len()) { + if time.Since(c.sentTime) < c.forceSendInterval { + return common.UpdateSucceeded + } + + return c.client.KeepAlive(c.config, c.jobCredentials) } + return c.sendPatch(state, trace) +} + +func (c *clientJobTrace) sendPatch(state common.JobState, trace bytes.Buffer) common.UpdateState { if c.sentState != state { jobInfo := common.UpdateJobInfo{ ID: c.id, @@ -271,8 +281,7 @@ func (c *clientJobTrace) fullUpdate() common.UpdateState { trace := c.log.String() c.lock.RUnlock() - if c.sentState == state && - c.sentTrace == len(trace) && + if c.isUpToDate(state, len(trace)) && time.Since(c.sentTime) < c.forceSendInterval { return common.UpdateSucceeded } diff --git a/network/trace_test.go b/network/trace_test.go index 7cdd7c9ed61..18140a8ead9 100644 --- a/network/trace_test.go +++ b/network/trace_test.go @@ -134,11 +134,10 @@ func TestJobAbort(t *testing.T) { updateMatcher := generateJobInfoMatcherWithAnyTrace(jobCredentials.ID, common.Success, common.NoneFailure) net := new(common.MockNetwork) - net.On("PatchTrace", jobConfig, jobCredentials, mock.AnythingOfType("*network.tracePatch")).Return(common.UpdateAbort) + net.On("KeepAlive", jobConfig, jobCredentials).Return(common.UpdateAbort) net.On("UpdateJob", jobConfig, jobCredentials, updateMatcher).Return(common.UpdateAbort) b := newJobTrace(net, jobConfig, jobCredentials) - // force immediate call to `UpdateJob` b.updateInterval = 0 b.SetCancelFunc(cancel) @@ -207,16 +206,13 @@ func TestJobForceSend(t *testing.T) { firstPatchMatcher := mock.MatchedBy(func(tracePatch common.JobTracePatch) bool { return tracePatch.FirstByte() == 0 && string(tracePatch.Patch()) == traceMessage }) - nextEmptyPatchMatcher := mock.MatchedBy(func(tracePatch common.JobTracePatch) bool { - return tracePatch.FirstByte() == len(traceMessage) && string(tracePatch.Patch()) == "" - }) updateMatcher := generateJobInfoMatcherWithAnyTrace(jobCredentials.ID, common.Success, common.NoneFailure) wg.Add(1) net := new(common.MockNetwork) net.On("PatchTrace", jobConfig, jobCredentials, firstPatchMatcher).Return(common.UpdateSucceeded).Once() - net.On("PatchTrace", jobConfig, jobCredentials, nextEmptyPatchMatcher).Return(common.UpdateSucceeded).Run(func(_ mock.Arguments) { wg.Done() }) + net.On("KeepAlive", jobConfig, jobCredentials).Run(func(_ mock.Arguments) { wg.Done() }).Return(common.UpdateSucceeded) net.On("UpdateJob", jobConfig, jobCredentials, updateMatcher).Return(common.UpdateSucceeded).Once() defer net.AssertExpectations(t) -- GitLab From ae1615ec4c96f487d40d3bd90acaf5b1b71016e9 Mon Sep 17 00:00:00 2001 From: Tomasz Maczukin <tomasz@maczukin.pl> Date: Thu, 7 Jun 2018 19:33:04 +0200 Subject: [PATCH 05/13] Use UpdateJob endpoint instead of dedicated one --- common/mock_Network.go | 14 ---- common/network.go | 1 - network/client.go | 12 ++-- network/client_test.go | 30 ++++----- network/gitlab.go | 73 +++++++-------------- network/gitlab_test.go | 114 ++++++++++++++++++--------------- network/keep_alive_response.go | 12 ++-- network/trace.go | 7 +- network/trace_test.go | 6 +- 9 files changed, 121 insertions(+), 148 deletions(-) diff --git a/common/mock_Network.go b/common/mock_Network.go index c6f12b3bb7b..332926e87af 100644 --- a/common/mock_Network.go +++ b/common/mock_Network.go @@ -26,20 +26,6 @@ func (_m *MockNetwork) DownloadArtifacts(config JobCredentials, artifactsFile st return r0 } -// KeepAlive provides a mock function with given fields: config, jobCredentials -func (_m *MockNetwork) KeepAlive(config RunnerConfig, jobCredentials *JobCredentials) UpdateState { - ret := _m.Called(config, jobCredentials) - - var r0 UpdateState - if rf, ok := ret.Get(0).(func(RunnerConfig, *JobCredentials) UpdateState); ok { - r0 = rf(config, jobCredentials) - } else { - r0 = ret.Get(0).(UpdateState) - } - - return r0 -} - // PatchTrace provides a mock function with given fields: config, jobCredentials, tracePart func (_m *MockNetwork) PatchTrace(config RunnerConfig, jobCredentials *JobCredentials, tracePart JobTracePatch) UpdateState { ret := _m.Called(config, jobCredentials, tracePart) diff --git a/common/network.go b/common/network.go index edc774f3133..fe22094521c 100644 --- a/common/network.go +++ b/common/network.go @@ -361,7 +361,6 @@ type Network interface { UnregisterRunner(config RunnerCredentials) bool RequestJob(config RunnerConfig) (*JobResponse, bool) UpdateJob(config RunnerConfig, jobCredentials *JobCredentials, jobInfo UpdateJobInfo) UpdateState - KeepAlive(config RunnerConfig, jobCredentials *JobCredentials) UpdateState PatchTrace(config RunnerConfig, jobCredentials *JobCredentials, tracePart JobTracePatch) UpdateState DownloadArtifacts(config JobCredentials, artifactsFile string) DownloadState UploadRawArtifacts(config JobCredentials, reader io.Reader, options ArtifactsOptions) UploadState diff --git a/network/client.go b/network/client.go index 6fbae41c206..c5c2d6c205b 100644 --- a/network/client.go +++ b/network/client.go @@ -272,13 +272,13 @@ func (n *client) do(uri, method string, request io.Reader, requestType string, h return } -func (n *client) doJSON(uri, method string, statusCode int, request interface{}, response interface{}) (int, string, ResponseTLSData) { +func (n *client) doJSON(uri, method string, statusCode int, request interface{}, response interface{}) (int, string, ResponseTLSData, *http.Response) { var body io.Reader if request != nil { requestBody, err := json.Marshal(request) if err != nil { - return -1, fmt.Sprintf("failed to marshal project object: %v", err), ResponseTLSData{} + return -1, fmt.Sprintf("failed to marshal project object: %v", err), ResponseTLSData{}, nil } body = bytes.NewReader(requestBody) } @@ -290,7 +290,7 @@ func (n *client) doJSON(uri, method string, statusCode int, request interface{}, res, err := n.do(uri, method, body, "application/json", headers) if err != nil { - return -1, err.Error(), ResponseTLSData{} + return -1, err.Error(), ResponseTLSData{}, nil } defer res.Body.Close() defer io.Copy(ioutil.Discard, res.Body) @@ -299,13 +299,13 @@ func (n *client) doJSON(uri, method string, statusCode int, request interface{}, if response != nil { isApplicationJSON, err := isResponseApplicationJSON(res) if !isApplicationJSON { - return -1, err.Error(), ResponseTLSData{} + return -1, err.Error(), ResponseTLSData{}, nil } d := json.NewDecoder(res.Body) err = d.Decode(response) if err != nil { - return -1, fmt.Sprintf("Error decoding json payload %v", err), ResponseTLSData{} + return -1, fmt.Sprintf("Error decoding json payload %v", err), ResponseTLSData{}, nil } } } @@ -318,7 +318,7 @@ func (n *client) doJSON(uri, method string, statusCode int, request interface{}, KeyFile: n.keyFile, } - return res.StatusCode, res.Status, TLSData + return res.StatusCode, res.Status, TLSData, res } func isResponseApplicationJSON(res *http.Response) (result bool, err error) { diff --git a/network/client_test.go b/network/client_test.go index ee97bd73e43..8730698ba89 100644 --- a/network/client_test.go +++ b/network/client_test.go @@ -117,7 +117,7 @@ func TestClientDo(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, c) - statusCode, statusText, _ := c.doJSON("test/auth", "GET", http.StatusOK, nil, nil) + statusCode, statusText, _, _ := c.doJSON("test/auth", "GET", http.StatusOK, nil, nil) assert.Equal(t, http.StatusForbidden, statusCode, statusText) req := struct { @@ -130,16 +130,16 @@ func TestClientDo(t *testing.T) { Key string `json:"key"` }{} - statusCode, statusText, _ = c.doJSON("test/json", "GET", http.StatusOK, nil, &res) + statusCode, statusText, _, _ = c.doJSON("test/json", "GET", http.StatusOK, nil, &res) assert.Equal(t, http.StatusBadRequest, statusCode, statusText) - statusCode, statusText, _ = c.doJSON("test/json", "GET", http.StatusOK, &req, nil) + statusCode, statusText, _, _ = c.doJSON("test/json", "GET", http.StatusOK, &req, nil) assert.Equal(t, http.StatusNotAcceptable, statusCode, statusText) - statusCode, statusText, _ = c.doJSON("test/json", "GET", http.StatusOK, nil, nil) + statusCode, statusText, _, _ = c.doJSON("test/json", "GET", http.StatusOK, nil, nil) assert.Equal(t, http.StatusBadRequest, statusCode, statusText) - statusCode, statusText, _ = c.doJSON("test/json", "GET", http.StatusOK, &req, &res) + statusCode, statusText, _, _ = c.doJSON("test/json", "GET", http.StatusOK, &req, &res) assert.Equal(t, http.StatusOK, statusCode, statusText) assert.Equal(t, "value", res.Key, statusText) } @@ -151,7 +151,7 @@ func TestClientInvalidSSL(t *testing.T) { c, _ := newClient(&RunnerCredentials{ URL: s.URL, }) - statusCode, statusText, _ := c.doJSON("test/ok", "GET", http.StatusOK, nil, nil) + statusCode, statusText, _, _ := c.doJSON("test/ok", "GET", http.StatusOK, nil, nil) assert.Equal(t, -1, statusCode, statusText) assert.Contains(t, statusText, "certificate signed by unknown authority") } @@ -172,7 +172,7 @@ func TestClientTLSCAFile(t *testing.T) { URL: s.URL, TLSCAFile: file.Name(), }) - statusCode, statusText, tlsData := c.doJSON("test/ok", "GET", http.StatusOK, nil, nil) + statusCode, statusText, tlsData, _ := c.doJSON("test/ok", "GET", http.StatusOK, nil, nil) assert.Equal(t, http.StatusOK, statusCode, statusText) assert.NotEmpty(t, tlsData.CAChain) } @@ -197,7 +197,7 @@ func TestClientCertificateInPredefinedDirectory(t *testing.T) { c, _ := newClient(&RunnerCredentials{ URL: s.URL, }) - statusCode, statusText, tlsData := c.doJSON("test/ok", "GET", http.StatusOK, nil, nil) + statusCode, statusText, tlsData, _ := c.doJSON("test/ok", "GET", http.StatusOK, nil, nil) assert.Equal(t, http.StatusOK, statusCode, statusText) assert.NotEmpty(t, tlsData.CAChain) } @@ -221,7 +221,7 @@ func TestClientInvalidTLSAuth(t *testing.T) { URL: s.URL, TLSCAFile: ca.Name(), }) - statusCode, statusText, _ := c.doJSON("test/ok", "GET", http.StatusOK, nil, nil) + statusCode, statusText, _, _ := c.doJSON("test/ok", "GET", http.StatusOK, nil, nil) assert.Equal(t, -1, statusCode, statusText) assert.Contains(t, statusText, "tls: bad certificate") } @@ -260,7 +260,7 @@ func TestClientTLSAuth(t *testing.T) { TLSCertFile: cert.Name(), TLSKeyFile: key.Name(), }) - statusCode, statusText, tlsData := c.doJSON("test/ok", "GET", http.StatusOK, nil, nil) + statusCode, statusText, tlsData, _ := c.doJSON("test/ok", "GET", http.StatusOK, nil, nil) assert.Equal(t, http.StatusOK, statusCode, statusText) assert.NotEmpty(t, tlsData.CAChain) assert.Equal(t, cert.Name(), tlsData.CertFile) @@ -295,7 +295,7 @@ func TestClientTLSAuthCertificatesInPredefinedDirectory(t *testing.T) { c, _ := newClient(&RunnerCredentials{ URL: s.URL, }) - statusCode, statusText, tlsData := c.doJSON("test/ok", "GET", http.StatusOK, nil, nil) + statusCode, statusText, tlsData, _ := c.doJSON("test/ok", "GET", http.StatusOK, nil, nil) assert.Equal(t, http.StatusOK, statusCode, statusText) assert.NotEmpty(t, tlsData.CAChain) assert.NotEmpty(t, tlsData.CertFile) @@ -350,16 +350,16 @@ func TestClientHandleCharsetInContentType(t *testing.T) { Key string `json:"key"` }{} - statusCode, statusText, _ := c.doJSON("with-charset", "GET", http.StatusOK, nil, &res) + statusCode, statusText, _, _ := c.doJSON("with-charset", "GET", http.StatusOK, nil, &res) assert.Equal(t, http.StatusOK, statusCode, statusText) - statusCode, statusText, _ = c.doJSON("without-charset", "GET", http.StatusOK, nil, &res) + statusCode, statusText, _, _ = c.doJSON("without-charset", "GET", http.StatusOK, nil, &res) assert.Equal(t, http.StatusOK, statusCode, statusText) - statusCode, statusText, _ = c.doJSON("without-json", "GET", http.StatusOK, nil, &res) + statusCode, statusText, _, _ = c.doJSON("without-json", "GET", http.StatusOK, nil, &res) assert.Equal(t, -1, statusCode, statusText) - statusCode, statusText, _ = c.doJSON("invalid-header", "GET", http.StatusOK, nil, &res) + statusCode, statusText, _, _ = c.doJSON("invalid-header", "GET", http.StatusOK, nil, &res) assert.Equal(t, -1, statusCode, statusText) } diff --git a/network/gitlab.go b/network/gitlab.go index 3d6db4613eb..0aeb185e33a 100644 --- a/network/gitlab.go +++ b/network/gitlab.go @@ -160,10 +160,10 @@ func (n *GitLabClient) doRaw(credentials requestCredentials, method, uri string, return c.do(uri, method, request, requestType, headers) } -func (n *GitLabClient) doJSON(credentials requestCredentials, method, uri string, statusCode int, request interface{}, response interface{}) (int, string, ResponseTLSData) { +func (n *GitLabClient) doJSON(credentials requestCredentials, method, uri string, statusCode int, request interface{}, response interface{}) (int, string, ResponseTLSData, *http.Response) { c, err := n.getClient(credentials) if err != nil { - return clientError, err.Error(), ResponseTLSData{} + return clientError, err.Error(), ResponseTLSData{}, nil } return c.doJSON(uri, method, statusCode, request, response) @@ -178,7 +178,7 @@ func (n *GitLabClient) RegisterRunner(runner common.RunnerCredentials, parameter } var response common.RegisterRunnerResponse - result, statusText, _ := n.doJSON(&runner, "POST", "runners", http.StatusCreated, &request, &response) + result, statusText, _, _ := n.doJSON(&runner, "POST", "runners", http.StatusCreated, &request, &response) switch result { case http.StatusCreated: @@ -201,7 +201,7 @@ func (n *GitLabClient) VerifyRunner(runner common.RunnerCredentials) bool { Token: runner.Token, } - result, statusText, _ := n.doJSON(&runner, "POST", "runners/verify", http.StatusOK, &request, nil) + result, statusText, _, _ := n.doJSON(&runner, "POST", "runners/verify", http.StatusOK, &request, nil) switch result { case http.StatusOK: @@ -225,7 +225,7 @@ func (n *GitLabClient) UnregisterRunner(runner common.RunnerCredentials) bool { Token: runner.Token, } - result, statusText, _ := n.doJSON(&runner, "DELETE", "runners", http.StatusNoContent, &request, nil) + result, statusText, _, _ := n.doJSON(&runner, "DELETE", "runners", http.StatusNoContent, &request, nil) const baseLogText = "Unregistering runner from GitLab" switch result { @@ -270,7 +270,7 @@ func (n *GitLabClient) RequestJob(config common.RunnerConfig) (*common.JobRespon } var response common.JobResponse - result, statusText, tlsData := n.doJSON(&config.RunnerCredentials, "POST", "jobs/request", http.StatusCreated, &request, &response) + result, statusText, tlsData, _ := n.doJSON(&config.RunnerCredentials, "POST", "jobs/request", http.StatusCreated, &request, &response) n.requestsStatusesMap.Append(config.RunnerCredentials.ShortDescription(), APIEndpointRequestJob, result) @@ -306,63 +306,34 @@ func (n *GitLabClient) UpdateJob(config common.RunnerConfig, jobCredentials *com Trace: jobInfo.Trace, } - log := config.Log().WithField("job", jobInfo.ID) - - result, statusText, _ := n.doJSON(&config.RunnerCredentials, "PUT", fmt.Sprintf("jobs/%d", jobInfo.ID), http.StatusOK, &request, nil) - + result, statusText, _, response := n.doJSON(&config.RunnerCredentials, "PUT", fmt.Sprintf("jobs/%d", jobInfo.ID), http.StatusOK, &request, nil) n.requestsStatusesMap.Append(config.RunnerCredentials.ShortDescription(), APIEndpointUpdateJob, result) - switch result { - case http.StatusOK: - log.Debugln("Submitting job to coordinator...", "ok") - return common.UpdateSucceeded - case http.StatusNotFound: - log.Warningln("Submitting job to coordinator...", "aborted") - return common.UpdateAbort - case http.StatusForbidden: - log.WithField("status", statusText).Errorln("Submitting job to coordinator...", "forbidden") - return common.UpdateAbort - case clientError: - log.WithField("status", statusText).Errorln("Submitting job to coordinator...", "error") - return common.UpdateAbort - default: - log.WithField("status", statusText).Warningln("Submitting job to coordinator...", "failed") - return common.UpdateFailed - } -} - -func (n *GitLabClient) KeepAlive(config common.RunnerConfig, jobCredentials *common.JobCredentials) common.UpdateState { - id := jobCredentials.ID - - headers := make(http.Header) - headers.Set("JOB-TOKEN", jobCredentials.Token) - - uri := fmt.Sprintf("jobs/%d/keep-alive", id) - - response, err := n.doRaw(&config.RunnerCredentials, "POST", uri, nil, "", headers) - if err != nil { - config.Log().Errorln("Sending keep-alive request...", "error", err.Error()) - return common.UpdateFailed - } - - n.requestsStatusesMap.Append(config.RunnerCredentials.ShortDescription(), APIEndpointJobKeepAlive, response.StatusCode) - keepAliveResponse := NewKeepAliveResponse(response) log := config.Log().WithFields(logrus.Fields{ - "job": id, - "code": response.StatusCode, + "code": result, + "job": jobInfo.ID, "job-status": keepAliveResponse.RemoteState, }) switch { case keepAliveResponse.IsAborted(): - log.Warningln("Sending keep-alive request...", "aborted") + log.Warningln("Submitting job to coordinator...", "aborted") return common.UpdateAbort - case response.StatusCode == http.StatusAccepted: - log.Debugln("Sending keep-alive request...", "ok") + case result == http.StatusOK: + log.Debugln("Submitting job to coordinator...", "ok") return common.UpdateSucceeded + case result == http.StatusNotFound: + log.Warningln("Submitting job to coordinator...", "aborted") + return common.UpdateAbort + case result == http.StatusForbidden: + log.WithField("status", statusText).Errorln("Submitting job to coordinator...", "forbidden") + return common.UpdateAbort + case result == clientError: + log.WithField("status", statusText).Errorln("Submitting job to coordinator...", "error") + return common.UpdateAbort default: - log.Warningln("Sending keep-alive request...", "failed") + log.WithField("status", statusText).Warningln("Submitting job to coordinator...", "failed") return common.UpdateFailed } } diff --git a/network/gitlab_test.go b/network/gitlab_test.go index bb5fb9ac595..ff5efb19e6a 100644 --- a/network/gitlab_test.go +++ b/network/gitlab_test.go @@ -595,77 +595,65 @@ func TestUpdateJob(t *testing.T) { assert.Equal(t, UpdateFailed, state, "Update should fail for badly formatted request") } -var keepAliveToken = "token" - -func getKeepAliveServer(t *testing.T, handler func(w http.ResponseWriter, r *http.Request)) (*httptest.Server, *GitLabClient, RunnerConfig) { - keepAliveHandler := func(w http.ResponseWriter, r *http.Request) { - if r.URL.Path != "/api/v4/jobs/1/keep-alive" { - w.WriteHeader(http.StatusNotFound) - return - } - - if r.Method != "POST" { - w.WriteHeader(http.StatusNotAcceptable) - return - } - - assert.Equal(t, keepAliveToken, r.Header.Get("JOB-TOKEN")) +func testUpdateJobKeepAliveHandler(w http.ResponseWriter, r *http.Request, t *testing.T) { + if r.Method != "PUT" { + w.WriteHeader(http.StatusNotAcceptable) + return + } - handler(w, r) + switch r.URL.Path { + case "/api/v4/jobs/10": + case "/api/v4/jobs/11": + w.Header().Set("Job-Status", "canceled") + case "/api/v4/jobs/12": + w.Header().Set("Job-Status", "failed") + default: + w.WriteHeader(http.StatusNotFound) + return } - server := httptest.NewServer(http.HandlerFunc(keepAliveHandler)) + body, err := ioutil.ReadAll(r.Body) + assert.NoError(t, err) - config := RunnerConfig{ - RunnerCredentials: RunnerCredentials{ - URL: server.URL, - }, - } + var req map[string]interface{} + err = json.Unmarshal(body, &req) + assert.NoError(t, err) - return server, NewGitLabClient(), config + assert.Equal(t, "token", req["token"]) + + w.WriteHeader(http.StatusOK) } -func TestKeepAlive(t *testing.T) { +func TestUpdateJobAsKeepAlive(t *testing.T) { handler := func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusAccepted) + testUpdateJobKeepAliveHandler(w, r, t) } - server, client, config := getKeepAliveServer(t, handler) - defer server.Close() - - state := client.KeepAlive(config, &JobCredentials{ID: 1, Token: keepAliveToken}) - assert.Equal(t, UpdateSucceeded, state) -} + s := httptest.NewServer(http.HandlerFunc(handler)) + defer s.Close() -func TestKeepAliveUnavailable(t *testing.T) { - handler := func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusBadGateway) + config := RunnerConfig{ + RunnerCredentials: RunnerCredentials{ + URL: s.URL, + }, } - server, client, config := getKeepAliveServer(t, handler) - defer server.Close() + jobCredentials := &JobCredentials{ + Token: "token", + } - state := client.KeepAlive(config, &JobCredentials{ID: 1, Token: keepAliveToken}) - assert.Equal(t, UpdateFailed, state) -} + c := NewGitLabClient() -func TestKeepAliveRemoteAbort(t *testing.T) { - statuses := []string{"canceled", "failed"} + var state UpdateState - for _, status := range statuses { - t.Run(status, func(t *testing.T) { - handler := func(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Job-Status", status) - w.WriteHeader(http.StatusAccepted) - } + state = c.UpdateJob(config, jobCredentials, UpdateJobInfo{ID: 10, State: "running"}) + assert.Equal(t, UpdateSucceeded, state, "Update should continue when running") - server, client, config := getKeepAliveServer(t, handler) - defer server.Close() + state = c.UpdateJob(config, jobCredentials, UpdateJobInfo{ID: 11, State: "running"}) + assert.Equal(t, UpdateAbort, state, "Update should be aborted when Job-Status=canceled") - state := client.KeepAlive(config, &JobCredentials{ID: 1, Token: keepAliveToken}) - assert.Equal(t, UpdateAbort, state) - }) - } + state = c.UpdateJob(config, jobCredentials, UpdateJobInfo{ID: 12, State: "running"}) + assert.Equal(t, UpdateAbort, state, "Update should continue when Job-Status=failed") } var patchToken = "token" @@ -954,6 +942,26 @@ func TestPatchTraceContentRangeHeaderValues(t *testing.T) { client.PatchTrace(config, &JobCredentials{ID: 1, Token: patchToken}, tracePatch) } +func TestAbortedPatchTrace(t *testing.T) { + statuses := []string{"canceled", "failed"} + + for _, status := range statuses { + t.Run(status, func(t *testing.T) { + handler := func(w http.ResponseWriter, r *http.Request, body string, offset, limit int) { + w.Header().Set("Job-Status", status) + w.WriteHeader(http.StatusAccepted) + } + + server, client, config := getPatchServer(t, handler) + defer server.Close() + + tracePatch := getTracePatch(patchTraceString, 0) + state := client.PatchTrace(config, &JobCredentials{ID: 1, Token: patchToken}, tracePatch) + assert.Equal(t, UpdateAbort, state) + }) + } +} + func testArtifactsUploadHandler(w http.ResponseWriter, r *http.Request, t *testing.T) { if r.URL.Path != "/api/v4/jobs/10/artifacts" { w.WriteHeader(http.StatusNotFound) diff --git a/network/keep_alive_response.go b/network/keep_alive_response.go index 763b314ec63..5ab645f4af5 100644 --- a/network/keep_alive_response.go +++ b/network/keep_alive_response.go @@ -5,8 +5,7 @@ import ( ) type KeepAliveResponse struct { - response *http.Response - + StatusCode int RemoteState string } @@ -15,17 +14,20 @@ func (k *KeepAliveResponse) IsAborted() bool { return true } - if k.response.StatusCode == http.StatusForbidden { + if k.StatusCode == http.StatusForbidden { return true } return false } - func NewKeepAliveResponse(response *http.Response) *KeepAliveResponse { + if response == nil { + return &KeepAliveResponse{} + } + return &KeepAliveResponse{ - response: response, + StatusCode: response.StatusCode, RemoteState: response.Header.Get("Job-Status"), } } diff --git a/network/trace.go b/network/trace.go index 0e88e8dcf7c..02824732d05 100644 --- a/network/trace.go +++ b/network/trace.go @@ -209,7 +209,12 @@ func (c *clientJobTrace) incrementalUpdate() common.UpdateState { return common.UpdateSucceeded } - return c.client.KeepAlive(c.config, c.jobCredentials) + jobInfo := common.UpdateJobInfo{ + ID: c.id, + State: c.state, + } + + return c.client.UpdateJob(c.config, c.jobCredentials, jobInfo) } return c.sendPatch(state, trace) diff --git a/network/trace_test.go b/network/trace_test.go index 18140a8ead9..1537eff6996 100644 --- a/network/trace_test.go +++ b/network/trace_test.go @@ -131,10 +131,11 @@ func TestJobAbort(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() + keepAliveUpdateMatcher := generateJobInfoMatcher(jobCredentials.ID, common.Running, nil, "") updateMatcher := generateJobInfoMatcherWithAnyTrace(jobCredentials.ID, common.Success, common.NoneFailure) net := new(common.MockNetwork) - net.On("KeepAlive", jobConfig, jobCredentials).Return(common.UpdateAbort) + net.On("UpdateJob", jobConfig, jobCredentials, keepAliveUpdateMatcher).Return(common.UpdateAbort) net.On("UpdateJob", jobConfig, jobCredentials, updateMatcher).Return(common.UpdateAbort) b := newJobTrace(net, jobConfig, jobCredentials) @@ -206,13 +207,14 @@ func TestJobForceSend(t *testing.T) { firstPatchMatcher := mock.MatchedBy(func(tracePatch common.JobTracePatch) bool { return tracePatch.FirstByte() == 0 && string(tracePatch.Patch()) == traceMessage }) + keepAliveUpdateMatcher := generateJobInfoMatcher(jobCredentials.ID, common.Running, nil, "") updateMatcher := generateJobInfoMatcherWithAnyTrace(jobCredentials.ID, common.Success, common.NoneFailure) wg.Add(1) net := new(common.MockNetwork) net.On("PatchTrace", jobConfig, jobCredentials, firstPatchMatcher).Return(common.UpdateSucceeded).Once() - net.On("KeepAlive", jobConfig, jobCredentials).Run(func(_ mock.Arguments) { wg.Done() }).Return(common.UpdateSucceeded) + net.On("UpdateJob", jobConfig, jobCredentials, keepAliveUpdateMatcher).Run(func(_ mock.Arguments) { wg.Done() }).Return(common.UpdateSucceeded) net.On("UpdateJob", jobConfig, jobCredentials, updateMatcher).Return(common.UpdateSucceeded).Once() defer net.AssertExpectations(t) -- GitLab From b021d585cf776bc11d64fdc76c035463c664dc70 Mon Sep 17 00:00:00 2001 From: Tomasz Maczukin <tomasz@maczukin.pl> Date: Fri, 8 Jun 2018 14:43:29 +0200 Subject: [PATCH 06/13] Remove unused constant --- network/gitlab.go | 1 - 1 file changed, 1 deletion(-) diff --git a/network/gitlab.go b/network/gitlab.go index 0aeb185e33a..9429054dd28 100644 --- a/network/gitlab.go +++ b/network/gitlab.go @@ -34,7 +34,6 @@ type APIEndpoint string const ( APIEndpointRequestJob APIEndpoint = "request_job" APIEndpointUpdateJob APIEndpoint = "update_job" - APIEndpointJobKeepAlive APIEndpoint = "job_keep_alive" APIEndpointPatchTrace APIEndpoint = "patch_trace" ) -- GitLab From 107c4acccbb87ecac9d3f624af6268b0bde3a40f Mon Sep 17 00:00:00 2001 From: Tomasz Maczukin <tomasz@maczukin.pl> Date: Mon, 2 Jul 2018 13:05:20 +0200 Subject: [PATCH 07/13] Fix a typo --- network/trace.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/network/trace.go b/network/trace.go index 02824732d05..cb080047920 100644 --- a/network/trace.go +++ b/network/trace.go @@ -194,8 +194,8 @@ func (c *clientJobTrace) process(pipe *io.PipeReader) { } } -func (c *clientJobTrace) isUpToDate(state common.JobState, traceLenght int) bool { - return c.sentState == state && c.sentTrace == traceLenght +func (c *clientJobTrace) isUpToDate(state common.JobState, traceLength int) bool { + return c.sentState == state && c.sentTrace == traceLength } func (c *clientJobTrace) incrementalUpdate() common.UpdateState { -- GitLab From 69b344e19b158c5c4f4720083c39b6e954d5f311 Mon Sep 17 00:00:00 2001 From: Tomasz Maczukin <tomasz@maczukin.pl> Date: Mon, 2 Jul 2018 13:06:26 +0200 Subject: [PATCH 08/13] Rename struct to RemoteJobStateResponse --- network/gitlab.go | 6 ++--- network/keep_alive_response.go | 33 ---------------------------- network/patch_response.go | 6 ++--- network/remote_job_state_response.go | 33 ++++++++++++++++++++++++++++ 4 files changed, 39 insertions(+), 39 deletions(-) delete mode 100644 network/keep_alive_response.go create mode 100644 network/remote_job_state_response.go diff --git a/network/gitlab.go b/network/gitlab.go index 9429054dd28..f1a637a9d37 100644 --- a/network/gitlab.go +++ b/network/gitlab.go @@ -308,15 +308,15 @@ func (n *GitLabClient) UpdateJob(config common.RunnerConfig, jobCredentials *com result, statusText, _, response := n.doJSON(&config.RunnerCredentials, "PUT", fmt.Sprintf("jobs/%d", jobInfo.ID), http.StatusOK, &request, nil) n.requestsStatusesMap.Append(config.RunnerCredentials.ShortDescription(), APIEndpointUpdateJob, result) - keepAliveResponse := NewKeepAliveResponse(response) + remoteJobStateResponse := NewRemoteJobStateResponse(response) log := config.Log().WithFields(logrus.Fields{ "code": result, "job": jobInfo.ID, - "job-status": keepAliveResponse.RemoteState, + "job-status": remoteJobStateResponse.RemoteState, }) switch { - case keepAliveResponse.IsAborted(): + case remoteJobStateResponse.IsAborted(): log.Warningln("Submitting job to coordinator...", "aborted") return common.UpdateAbort case result == http.StatusOK: diff --git a/network/keep_alive_response.go b/network/keep_alive_response.go deleted file mode 100644 index 5ab645f4af5..00000000000 --- a/network/keep_alive_response.go +++ /dev/null @@ -1,33 +0,0 @@ -package network - -import ( - "net/http" -) - -type KeepAliveResponse struct { - StatusCode int - RemoteState string -} - -func (k *KeepAliveResponse) IsAborted() bool { - if k.RemoteState == "canceled" || k.RemoteState == "failed" { - return true - } - - if k.StatusCode == http.StatusForbidden { - return true - } - - return false -} - -func NewKeepAliveResponse(response *http.Response) *KeepAliveResponse { - if response == nil { - return &KeepAliveResponse{} - } - - return &KeepAliveResponse{ - StatusCode: response.StatusCode, - RemoteState: response.Header.Get("Job-Status"), - } -} diff --git a/network/patch_response.go b/network/patch_response.go index 0f9edf3a60f..822867554b2 100644 --- a/network/patch_response.go +++ b/network/patch_response.go @@ -7,7 +7,7 @@ import ( ) type TracePatchResponse struct { - *KeepAliveResponse + *RemoteJobStateResponse RemoteRange string } @@ -21,7 +21,7 @@ func (p *TracePatchResponse) NewOffset() int { func NewTracePatchResponse(response *http.Response) *TracePatchResponse { return &TracePatchResponse{ - KeepAliveResponse: NewKeepAliveResponse(response), - RemoteRange: response.Header.Get("Range"), + RemoteJobStateResponse: NewRemoteJobStateResponse(response), + RemoteRange: response.Header.Get("Range"), } } diff --git a/network/remote_job_state_response.go b/network/remote_job_state_response.go new file mode 100644 index 00000000000..612158461f5 --- /dev/null +++ b/network/remote_job_state_response.go @@ -0,0 +1,33 @@ +package network + +import ( + "net/http" +) + +type RemoteJobStateResponse struct { + StatusCode int + RemoteState string +} + +func (r *RemoteJobStateResponse) IsAborted() bool { + if r.RemoteState == "canceled" || r.RemoteState == "failed" { + return true + } + + if r.StatusCode == http.StatusForbidden { + return true + } + + return false +} + +func NewRemoteJobStateResponse(response *http.Response) *RemoteJobStateResponse { + if response == nil { + return &RemoteJobStateResponse{} + } + + return &RemoteJobStateResponse{ + StatusCode: response.StatusCode, + RemoteState: response.Header.Get("Job-Status"), + } +} -- GitLab From e7d1764b3445d6cd73d7870ce6e161b8edcb7748 Mon Sep 17 00:00:00 2001 From: Tomasz Maczukin <tomasz@maczukin.pl> Date: Wed, 4 Jul 2018 16:14:48 +0200 Subject: [PATCH 09/13] Increase test coverage of ./network/trace.go --- network/trace_test.go | 118 ++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 114 insertions(+), 4 deletions(-) diff --git a/network/trace_test.go b/network/trace_test.go index 1537eff6996..3332e8b4891 100644 --- a/network/trace_test.go +++ b/network/trace_test.go @@ -1,6 +1,7 @@ package network import ( + "bytes" "context" "errors" "fmt" @@ -8,8 +9,10 @@ import ( "testing" "time" + "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitlab-runner/common" ) @@ -82,11 +85,11 @@ func TestJobTraceUpdateSucceeded(t *testing.T) { var expectedFailureReason common.JobFailureReason switch test.jobState { case common.Success: - expectedFailureReason = common.JobFailureReason("") + expectedFailureReason = common.NoneFailure case common.Failed: - expectedFailureReason = common.JobFailureReason("script_failure") + expectedFailureReason = common.ScriptFailure } - updateMatcher := generateJobInfoMatcher(idx, test.jobState, &traceMessage, expectedFailureReason) + updateMatcher := generateJobInfoMatcher(idx, test.jobState, nil, expectedFailureReason) net.On("UpdateJob", jobConfig, jobCredentials, updateMatcher).Return(common.UpdateSucceeded) b := newJobTrace(net, jobConfig, jobCredentials) @@ -102,7 +105,7 @@ func TestJobTraceUpdateSucceeded(t *testing.T) { case common.Success: b.Success() case common.Failed: - b.Fail(errors.New("test"), "script_failure") + b.Fail(errors.New("test"), common.ScriptFailure) } net.AssertExpectations(t) @@ -228,3 +231,110 @@ func TestJobForceSend(t *testing.T) { fmt.Fprint(b, traceMessage) wg.Wait() } + +func runOnHijackedLogrusOutput(t *testing.T, handler func(t *testing.T, output *bytes.Buffer)) { + oldOutput := logrus.StandardLogger().Out + defer func() { logrus.StandardLogger().Out = oldOutput }() + + buf := bytes.NewBuffer([]byte{}) + logrus.StandardLogger().Out = buf + + handler(t, buf) +} + +func TestPatchTraceRangeMismatch(t *testing.T) { + runOnHijackedLogrusOutput(t, func(t *testing.T, output *bytes.Buffer) { + var wg sync.WaitGroup + + traceMessage := "test content" + + wg.Add(1) + + updateTraceOffsetFn := func(args mock.Arguments) { + patch, ok := args.Get(2).(*tracePatch) + require.True(t, ok, "Argument needs to be a proper *tracePatch instance") + patch.SetNewOffset(15) + } + + fullUpdateMatcher := generateJobInfoMatcher(jobCredentials.ID, common.Running, &traceMessage, common.NoneFailure) + + net := new(common.MockNetwork) + net.On("PatchTrace", jobConfig, jobCredentials, mock.Anything).Run(updateTraceOffsetFn).Return(common.UpdateRangeMismatch).Once() + net.On("UpdateJob", jobConfig, jobCredentials, fullUpdateMatcher).Run(func(_ mock.Arguments) { wg.Done() }).Return(common.UpdateSucceeded).Once() + net.On("UpdateJob", jobConfig, jobCredentials, mock.Anything).Return(common.UpdateSucceeded).Once() + defer net.AssertExpectations(t) + + b := newJobTrace(net, jobConfig, jobCredentials) + + b.updateInterval = 500 * time.Microsecond + + b.start() + defer b.Success() + + fmt.Fprint(b, traceMessage) + + wg.Wait() + assert.Regexp(t, "Full job update is needed", output.String()) + }) +} + +func TestPatchTraceDoubleRangeMismatch(t *testing.T) { + runOnHijackedLogrusOutput(t, func(t *testing.T, output *bytes.Buffer) { + var wg sync.WaitGroup + + traceMessage := "test content" + + wg.Add(1) + + net := new(common.MockNetwork) + net.On("PatchTrace", jobConfig, jobCredentials, mock.Anything).Return(common.UpdateRangeMismatch).Once() + net.On("PatchTrace", jobConfig, jobCredentials, mock.Anything).Run(func(_ mock.Arguments) { wg.Done() }).Return(common.UpdateRangeMismatch).Once() + net.On("UpdateJob", jobConfig, jobCredentials, mock.Anything).Return(common.UpdateSucceeded).Once() + defer net.AssertExpectations(t) + + b := newJobTrace(net, jobConfig, jobCredentials) + + b.updateInterval = 500 * time.Microsecond + + b.start() + defer b.Success() + + fmt.Fprint(b, traceMessage) + + wg.Wait() + assert.Regexp(t, "Resending trace patch due to range mismatch", output.String()) + assert.Regexp(t, "failed due to range mismatch", output.String()) + }) +} + +func TestUpdateWhenStateChanged(t *testing.T) { + var wg sync.WaitGroup + + traceMessage := "test content" + + wg.Add(1) + + updateMatcher := generateJobInfoMatcher(jobCredentials.ID, common.Failed, nil, common.RunnerSystemFailure) + + net := new(common.MockNetwork) + net.On("UpdateJob", jobConfig, jobCredentials, updateMatcher).Return(common.UpdateSucceeded).Once() + net.On("PatchTrace", jobConfig, jobCredentials, mock.Anything).Run(func(_ mock.Arguments) { wg.Done() }).Return(common.UpdateSucceeded).Once() + defer net.AssertExpectations(t) + + b := newJobTrace(net, jobConfig, jobCredentials) + + b.updateInterval = 500 * time.Microsecond + + b.start() + b.setFailure(common.RunnerSystemFailure) + + fmt.Fprint(b, traceMessage) + + wg.Wait() +} + +func TestInvalidTracePatchInitialOffsetValue(t *testing.T) { + trace := bytes.NewBufferString("test") + _, err := newTracePatch(*trace, trace.Len()+10) + assert.EqualError(t, err, "Range is invalid, limit can't be less than offset") +} \ No newline at end of file -- GitLab From 16eb0256fc244248f926909be2b37c368f4be982 Mon Sep 17 00:00:00 2001 From: Tomasz Maczukin <tomasz@maczukin.pl> Date: Wed, 4 Jul 2018 19:07:17 +0200 Subject: [PATCH 10/13] Change the flow of PatchTrace and UpdateJob calls --- network/trace.go | 68 ++++++++++++++++++++++++------------------- network/trace_test.go | 54 +++++++++++++++++++++------------- 2 files changed, 71 insertions(+), 51 deletions(-) diff --git a/network/trace.go b/network/trace.go index cb080047920..ea0e13649d5 100644 --- a/network/trace.go +++ b/network/trace.go @@ -194,43 +194,32 @@ func (c *clientJobTrace) process(pipe *io.PipeReader) { } } -func (c *clientJobTrace) isUpToDate(state common.JobState, traceLength int) bool { - return c.sentState == state && c.sentTrace == traceLength -} - func (c *clientJobTrace) incrementalUpdate() common.UpdateState { c.lock.RLock() state := c.state trace := c.log c.lock.RUnlock() - if c.isUpToDate(state, trace.Len()) { - if time.Since(c.sentTime) < c.forceSendInterval { - return common.UpdateSucceeded + if c.sentTrace != trace.Len() { + result := c.sendPatch(trace) + if result != common.UpdateSucceeded { + return result } + } - jobInfo := common.UpdateJobInfo{ - ID: c.id, - State: c.state, + if c.sentState != state || time.Since(c.sentTime) > c.forceSendInterval { + if state == common.Running { // we should only follow-up with Running! + result := c.sendUpdate(state) + if result != common.UpdateSucceeded { + return result + } } - - return c.client.UpdateJob(c.config, c.jobCredentials, jobInfo) } - return c.sendPatch(state, trace) + return common.UpdateSucceeded } -func (c *clientJobTrace) sendPatch(state common.JobState, trace bytes.Buffer) common.UpdateState { - if c.sentState != state { - jobInfo := common.UpdateJobInfo{ - ID: c.id, - State: state, - FailureReason: c.failureReason, - } - c.client.UpdateJob(c.config, c.jobCredentials, jobInfo) - c.sentState = state - } - +func (c *clientJobTrace) sendPatch(trace bytes.Buffer) common.UpdateState { tracePatch, err := newTracePatch(trace, c.sentTrace) if err != nil { c.config.Log().Errorln("Error while creating a tracePatch", err.Error()) @@ -280,27 +269,46 @@ func (c *clientJobTrace) resendPatch(id int, config common.RunnerConfig, jobCred return } +func (c *clientJobTrace) sendUpdate(state common.JobState) common.UpdateState { + jobInfo := common.UpdateJobInfo{ + ID: c.id, + State: state, + FailureReason: c.failureReason, + } + + status := c.client.UpdateJob(c.config, c.jobCredentials, jobInfo) + if status == common.UpdateSucceeded { + c.sentState = state + c.sentTime = time.Now() + } + + return status +} + func (c *clientJobTrace) fullUpdate() common.UpdateState { c.lock.RLock() state := c.state - trace := c.log.String() + trace := c.log c.lock.RUnlock() - if c.isUpToDate(state, len(trace)) && - time.Since(c.sentTime) < c.forceSendInterval { - return common.UpdateSucceeded + if c.sentTrace != trace.Len() { + c.sendPatch(trace) // we don't care about sendPatch() result, in the worst case we will re-send the trace } jobInfo := common.UpdateJobInfo{ ID: c.id, State: state, - Trace: &trace, FailureReason: c.failureReason, } + if c.sentTrace != trace.Len() { + traceString := trace.String() + jobInfo.Trace = &traceString + } + update := c.client.UpdateJob(c.config, c.jobCredentials, jobInfo) if update == common.UpdateSucceeded { - c.sentTrace = len(trace) + c.sentTrace = trace.Len() c.sentState = state c.sentTime = time.Now() } diff --git a/network/trace_test.go b/network/trace_test.go index 3332e8b4891..1fab227bfa2 100644 --- a/network/trace_test.go +++ b/network/trace_test.go @@ -157,26 +157,22 @@ func TestJobOutputLimit(t *testing.T) { traceMessage := "abcde" net := new(common.MockNetwork) + b := newJobTrace(net, jobOutputLimit, jobCredentials) // prevent any UpdateJob before `b.Success()` call b.updateInterval = 25 * time.Second - updateMatcher := generateJobInfoMatcherWithAnyTrace(jobCredentials.ID, common.Success, common.NoneFailure) - net.On("UpdateJob", jobOutputLimit, jobCredentials, updateMatcher).Return(common.UpdateSucceeded).Run(func(args mock.Arguments) { - if updateInfo, ok := args.Get(2).(common.UpdateJobInfo); ok { - trace := updateInfo.Trace - - expectedLogLimitExceededMsg := b.limitExceededMessage() - bytesLimit := b.bytesLimit + len(expectedLogLimitExceededMsg) - traceSize := len(*trace) + updateMatcher := generateJobInfoMatcher(jobCredentials.ID, common.Success, nil, common.NoneFailure) - assert.Equal(bytesLimit, traceSize, "the trace should be exaclty %v bytes", bytesLimit) - assert.Contains(*trace, traceMessage) - assert.Contains(*trace, b.limitExceededMessage()) + receivedTrace := bytes.NewBuffer([]byte{}) + net.On("PatchTrace", jobOutputLimit, jobCredentials, mock.AnythingOfType("*network.tracePatch")).Return(common.UpdateSucceeded).Run(func(args mock.Arguments) { + if patch, ok := args.Get(2).(*tracePatch); ok { + receivedTrace.Write(patch.Patch()) } else { - assert.FailNow("Unexpected type on UpdateJob jobInfo parameter") + assert.FailNow("Unexpected type on PatchTrace tracePatch parameter") } }) + net.On("UpdateJob", jobOutputLimit, jobCredentials, updateMatcher).Return(common.UpdateSucceeded).Once() b.start() // Write 5k to the buffer @@ -185,6 +181,15 @@ func TestJobOutputLimit(t *testing.T) { } b.Success() + expectedLogLimitExceededMsg := b.limitExceededMessage() + bytesLimit := b.bytesLimit + len(expectedLogLimitExceededMsg) + trace := receivedTrace.String() + traceSize := len(trace) + + assert.Equal(bytesLimit, traceSize, "the trace should be exaclty %v bytes", bytesLimit) + assert.Contains(trace, traceMessage) + assert.Contains(trace, expectedLogLimitExceededMsg) + net.AssertExpectations(t) } @@ -289,6 +294,7 @@ func TestPatchTraceDoubleRangeMismatch(t *testing.T) { net := new(common.MockNetwork) net.On("PatchTrace", jobConfig, jobCredentials, mock.Anything).Return(common.UpdateRangeMismatch).Once() net.On("PatchTrace", jobConfig, jobCredentials, mock.Anything).Run(func(_ mock.Arguments) { wg.Done() }).Return(common.UpdateRangeMismatch).Once() + net.On("PatchTrace", jobConfig, jobCredentials, mock.Anything).Return(common.UpdateSucceeded).Once() net.On("UpdateJob", jobConfig, jobCredentials, mock.Anything).Return(common.UpdateSucceeded).Once() defer net.AssertExpectations(t) @@ -307,26 +313,32 @@ func TestPatchTraceDoubleRangeMismatch(t *testing.T) { }) } -func TestUpdateWhenStateChanged(t *testing.T) { +func TestFinalUpdateWithTrace(t *testing.T) { var wg sync.WaitGroup traceMessage := "test content" wg.Add(1) + net := new(common.MockNetwork) + b := newJobTrace(net, jobConfig, jobCredentials) - updateMatcher := generateJobInfoMatcher(jobCredentials.ID, common.Failed, nil, common.RunnerSystemFailure) + finalUpdateMatcher := mock.MatchedBy(func(jobInfo common.UpdateJobInfo) bool { + return *jobInfo.Trace == (traceMessage+traceMessage) + }) - net := new(common.MockNetwork) - net.On("UpdateJob", jobConfig, jobCredentials, updateMatcher).Return(common.UpdateSucceeded).Once() - net.On("PatchTrace", jobConfig, jobCredentials, mock.Anything).Run(func(_ mock.Arguments) { wg.Done() }).Return(common.UpdateSucceeded).Once() + net.On("PatchTrace", jobConfig, jobCredentials, mock.Anything).Return(common.UpdateSucceeded).Once().Run(func(_ mock.Arguments) { + b.updateInterval = 10 * time.Second + fmt.Fprint(b, traceMessage) + go b.Success() + }) + net.On("PatchTrace", jobConfig, jobCredentials, mock.Anything).Return(common.UpdateFailed).Once() + net.On("UpdateJob", jobConfig, jobCredentials, finalUpdateMatcher).Return(common.UpdateSucceeded).Once().Run(func(_ mock.Arguments) { + wg.Done() + }) defer net.AssertExpectations(t) - b := newJobTrace(net, jobConfig, jobCredentials) - b.updateInterval = 500 * time.Microsecond - b.start() - b.setFailure(common.RunnerSystemFailure) fmt.Fprint(b, traceMessage) -- GitLab From e99421caa79e1ef417569500d9531d6ebe777c5a Mon Sep 17 00:00:00 2001 From: Tomasz Maczukin <tomasz@maczukin.pl> Date: Wed, 4 Jul 2018 19:58:22 +0200 Subject: [PATCH 11/13] Disable sending empty PatchTrace call --- common/network.go | 5 ++-- network/gitlab.go | 17 +++++++---- network/gitlab_test.go | 59 ++++++++++++++++++++++++------------- network/trace.go | 50 +------------------------------ network/trace_patch.go | 54 +++++++++++++++++++++++++++++++++ network/trace_patch_test.go | 46 +++++++++++++++++++++++++++++ network/trace_test.go | 10 ++----- 7 files changed, 156 insertions(+), 85 deletions(-) create mode 100644 network/trace_patch.go create mode 100644 network/trace_patch_test.go diff --git a/common/network.go b/common/network.go index fe22094521c..b6f21ee03b8 100644 --- a/common/network.go +++ b/common/network.go @@ -349,10 +349,11 @@ type JobTrace interface { type JobTracePatch interface { Patch() []byte - FirstByte() int - LastByte() int + Offset() int + TotalSize() int SetNewOffset(newOffset int) ValidateRange() bool + PatchEmpty() bool } type Network interface { diff --git a/network/gitlab.go b/network/gitlab.go index f1a637a9d37..7caf75325b5 100644 --- a/network/gitlab.go +++ b/network/gitlab.go @@ -32,9 +32,9 @@ var apiRequestStatuses = prometheus.NewDesc( type APIEndpoint string const ( - APIEndpointRequestJob APIEndpoint = "request_job" - APIEndpointUpdateJob APIEndpoint = "update_job" - APIEndpointPatchTrace APIEndpoint = "patch_trace" + APIEndpointRequestJob APIEndpoint = "request_job" + APIEndpointUpdateJob APIEndpoint = "update_job" + APIEndpointPatchTrace APIEndpoint = "patch_trace" ) type apiRequestStatusPermutation struct { @@ -340,7 +340,13 @@ func (n *GitLabClient) UpdateJob(config common.RunnerConfig, jobCredentials *com func (n *GitLabClient) PatchTrace(config common.RunnerConfig, jobCredentials *common.JobCredentials, tracePatch common.JobTracePatch) common.UpdateState { id := jobCredentials.ID - contentRange := fmt.Sprintf("%d-%d", tracePatch.FirstByte(), tracePatch.LastByte()) + baseLog := config.Log().WithField("job", id) + if tracePatch.PatchEmpty() { + baseLog.Warningln("Appending trace to coordinator...", "skipped due to empty patch") + return common.UpdateFailed + } + + contentRange := fmt.Sprintf("%d-%d", tracePatch.Offset(), tracePatch.TotalSize()-1) headers := make(http.Header) headers.Set("Content-Range", contentRange) @@ -361,8 +367,7 @@ func (n *GitLabClient) PatchTrace(config common.RunnerConfig, jobCredentials *co defer io.Copy(ioutil.Discard, response.Body) tracePatchResponse := NewTracePatchResponse(response) - log := config.Log().WithFields(logrus.Fields{ - "job": id, + log := baseLog.WithFields(logrus.Fields{ "sent-log": contentRange, "job-log": tracePatchResponse.RemoteRange, "job-status": tracePatchResponse.RemoteState, diff --git a/network/gitlab_test.go b/network/gitlab_test.go index ff5efb19e6a..87a3fda4352 100644 --- a/network/gitlab_test.go +++ b/network/gitlab_test.go @@ -856,22 +856,30 @@ func TestPatchTraceUpdatedTrace(t *testing.T) { traceString := "" updates := []struct { - traceUpdate string - expectedContentRange string - expectedContentLength int64 + traceUpdate string + expectedContentRange string + expectedContentLength int64 + expectedResult UpdateState + shouldNotCallPatchTrace bool }{ - {traceUpdate: "test", expectedContentRange: "0-3", expectedContentLength: 4}, - {traceUpdate: "", expectedContentRange: "4-4", expectedContentLength: 0}, - {traceUpdate: " ", expectedContentRange: "4-4", expectedContentLength: 1}, - {traceUpdate: "test", expectedContentRange: "5-8", expectedContentLength: 4}, + {traceUpdate: "test", expectedContentRange: "0-3", expectedContentLength: 4, expectedResult: UpdateSucceeded}, + {traceUpdate: "", expectedResult: UpdateFailed, shouldNotCallPatchTrace: true}, + {traceUpdate: " ", expectedContentRange: "4-4", expectedContentLength: 1, expectedResult: UpdateSucceeded}, + {traceUpdate: "test", expectedContentRange: "5-8", expectedContentLength: 4, expectedResult: UpdateSucceeded}, } for id, update := range updates { - t.Run(fmt.Sprintf("update-%d", id+1), func(t *testing.T) { + t.Run(fmt.Sprintf("patch-%d", id+1), func(t *testing.T) { handler := func(w http.ResponseWriter, r *http.Request, body string, offset, limit int) { + if update.shouldNotCallPatchTrace { + t.Error("PatchTrace endpoint should not be called") + return + } + if limit+1 <= len(traceString) { assert.Equal(t, traceString[offset:limit+1], body) } + assert.Equal(t, update.traceUpdate, body) assert.Equal(t, update.expectedContentRange, r.Header.Get("Content-Range")) assert.Equal(t, update.expectedContentLength, r.ContentLength) @@ -883,27 +891,37 @@ func TestPatchTraceUpdatedTrace(t *testing.T) { traceString += update.traceUpdate tracePatch := getTracePatch(traceString, sentTrace) - client.PatchTrace(config, &JobCredentials{ID: 1, Token: patchToken}, tracePatch) - sentTrace = tracePatch.limit + + result := client.PatchTrace(config, &JobCredentials{ID: 1, Token: patchToken}, tracePatch) + assert.Equal(t, update.expectedResult, result) + + sentTrace = tracePatch.totalSize }) } } func TestPatchTraceContentRangeAndLength(t *testing.T) { tests := []struct { - name string - trace string - expectedContentRange string - expectedContentLength int64 + name string + trace string + expectedContentRange string + expectedContentLength int64 + expectedResult UpdateState + shouldNotCallPatchTrace bool }{ - {name: "0 bytes", trace: "", expectedContentRange: "0-0", expectedContentLength: 0}, - {name: "1 byte", trace: "1", expectedContentRange: "0-0", expectedContentLength: 1}, - {name: "2 bytes", trace: "12", expectedContentRange: "0-1", expectedContentLength: 2}, + {name: "0 bytes", trace: "", expectedResult: UpdateFailed, shouldNotCallPatchTrace: true}, + {name: "1 byte", trace: "1", expectedContentRange: "0-0", expectedContentLength: 1, expectedResult: UpdateSucceeded, shouldNotCallPatchTrace: false}, + {name: "2 bytes", trace: "12", expectedContentRange: "0-1", expectedContentLength: 2, expectedResult: UpdateSucceeded, shouldNotCallPatchTrace: false}, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { handler := func(w http.ResponseWriter, r *http.Request, body string, offset, limit int) { + if test.shouldNotCallPatchTrace { + t.Error("PatchTrace endpoint should not be called") + return + } + assert.Equal(t, test.expectedContentRange, r.Header.Get("Content-Range")) assert.Equal(t, test.expectedContentLength, r.ContentLength) w.WriteHeader(http.StatusAccepted) @@ -913,7 +931,8 @@ func TestPatchTraceContentRangeAndLength(t *testing.T) { defer server.Close() tracePatch := getTracePatch(test.trace, 0) - client.PatchTrace(config, &JobCredentials{ID: 1, Token: patchToken}, tracePatch) + result := client.PatchTrace(config, &JobCredentials{ID: 1, Token: patchToken}, tracePatch) + assert.Equal(t, test.expectedResult, result) }) } } @@ -924,10 +943,10 @@ func TestPatchTraceContentRangeHeaderValues(t *testing.T) { bytes := strings.Split(contentRange, "-") startByte, err := strconv.Atoi(bytes[0]) - require.NoError(t, err, "Should not cet error when parsing Content-Range startByte component") + require.NoError(t, err, "Should not set error when parsing Content-Range startByte component") endByte, err := strconv.Atoi(bytes[1]) - require.NoError(t, err, "Should not cet error when parsing Content-Range endByte component") + require.NoError(t, err, "Should not set error when parsing Content-Range endByte component") assert.Equal(t, 0, startByte, "Content-Range should contain start byte as first field") assert.Equal(t, len(patchTraceString)-1, endByte, "Content-Range should contain end byte as second field") diff --git a/network/trace.go b/network/trace.go index ea0e13649d5..7be6298e446 100644 --- a/network/trace.go +++ b/network/trace.go @@ -4,7 +4,6 @@ import ( "bufio" "bytes" "context" - "errors" "fmt" "io" "sync" @@ -14,53 +13,6 @@ import ( "gitlab.com/gitlab-org/gitlab-runner/helpers" ) -type tracePatch struct { - trace bytes.Buffer - offset int - limit int -} - -func (tp *tracePatch) Patch() []byte { - return tp.trace.Bytes()[tp.offset:tp.limit] -} - -func (tp *tracePatch) FirstByte() int { - return tp.offset -} - -func (tp *tracePatch) LastByte() int { - if tp.limit > 0 && tp.limit > tp.offset { - return tp.limit - 1 - } - return tp.limit -} - -func (tp *tracePatch) SetNewOffset(newOffset int) { - tp.offset = newOffset -} - -func (tp *tracePatch) ValidateRange() bool { - if tp.limit >= tp.offset { - return true - } - - return false -} - -func newTracePatch(trace bytes.Buffer, offset int) (*tracePatch, error) { - patch := &tracePatch{ - trace: trace, - offset: offset, - limit: trace.Len(), - } - - if !patch.ValidateRange() { - return nil, errors.New("Range is invalid, limit can't be less than offset") - } - - return patch, nil -} - type clientJobTrace struct { *io.PipeWriter @@ -235,7 +187,7 @@ func (c *clientJobTrace) sendPatch(trace bytes.Buffer) common.UpdateState { } if update == common.UpdateSucceeded { - c.sentTrace = tracePatch.limit + c.sentTrace = tracePatch.totalSize c.sentTime = time.Now() } diff --git a/network/trace_patch.go b/network/trace_patch.go new file mode 100644 index 00000000000..e103266aeec --- /dev/null +++ b/network/trace_patch.go @@ -0,0 +1,54 @@ +package network + +import ( + "bytes" + "errors" +) + +type tracePatch struct { + trace bytes.Buffer + offset int + totalSize int +} + +func (tp *tracePatch) Patch() []byte { + return tp.trace.Bytes()[tp.offset:tp.totalSize] +} + +func (tp *tracePatch) Offset() int { + return tp.offset +} + +func (tp *tracePatch) TotalSize() int { + return tp.totalSize +} + +func (tp *tracePatch) SetNewOffset(newOffset int) { + tp.offset = newOffset +} + +func (tp *tracePatch) ValidateRange() bool { + if tp.totalSize >= tp.offset { + return true + } + + return false +} + +func (tp *tracePatch) PatchEmpty() bool { + return tp.offset == tp.totalSize +} + +func newTracePatch(trace bytes.Buffer, offset int) (*tracePatch, error) { + patch := &tracePatch{ + trace: trace, + offset: offset, + totalSize: trace.Len(), + } + + if !patch.ValidateRange() { + return nil, errors.New("Range is invalid, limit can't be less than offset") + } + + return patch, nil +} diff --git a/network/trace_patch_test.go b/network/trace_patch_test.go new file mode 100644 index 00000000000..4734673ed1b --- /dev/null +++ b/network/trace_patch_test.go @@ -0,0 +1,46 @@ +package network + +import ( + "bytes" + "testing" + + "github.com/stretchr/testify/assert" +) + +var traceContent = "test content" + +func TestNewTracePatch(t *testing.T) { + trace := bytes.NewBufferString(traceContent) + tp, err := newTracePatch(*trace, 0) + assert.NoError(t, err) + + assert.Equal(t, 0, tp.Offset()) + assert.Equal(t, len(traceContent), tp.TotalSize()) + assert.Equal(t, []byte(traceContent), tp.Patch()) + assert.False(t, tp.PatchEmpty()) +} + +func TestInvalidTracePatchInitialOffsetValue(t *testing.T) { + trace := bytes.NewBufferString("test") + _, err := newTracePatch(*trace, trace.Len()+10) + assert.EqualError(t, err, "Range is invalid, limit can't be less than offset") +} + +func TestTracePatch_PatchAfterSetNewOffset(t *testing.T) { + trace := bytes.NewBufferString(traceContent) + tp, err := newTracePatch(*trace, 0) + assert.NoError(t, err) + + tp.SetNewOffset(5) + assert.Equal(t, []byte("content"), tp.Patch()) +} + +func TestTracePatchEmptyPatch(t *testing.T) { + trace := bytes.NewBufferString(traceContent) + tp, err := newTracePatch(*trace, len(traceContent)) + assert.NoError(t, err) + + assert.Empty(t, tp.Patch()) + assert.True(t, tp.PatchEmpty()) +} + diff --git a/network/trace_test.go b/network/trace_test.go index 1fab227bfa2..42d2020bf8e 100644 --- a/network/trace_test.go +++ b/network/trace_test.go @@ -62,7 +62,7 @@ func generateJobInfoMatcherWithAnyTrace(id int, state common.JobState, failureRe func TestJobTraceUpdateSucceeded(t *testing.T) { traceMessage := "test content" patchTraceMatcher := mock.MatchedBy(func(tracePatch common.JobTracePatch) bool { - return tracePatch.FirstByte() == 0 && string(tracePatch.Patch()) == traceMessage + return tracePatch.Offset() == 0 && string(tracePatch.Patch()) == traceMessage }) tests := []struct { @@ -213,7 +213,7 @@ func TestJobForceSend(t *testing.T) { var wg sync.WaitGroup traceMessage := "test content" firstPatchMatcher := mock.MatchedBy(func(tracePatch common.JobTracePatch) bool { - return tracePatch.FirstByte() == 0 && string(tracePatch.Patch()) == traceMessage + return tracePatch.Offset() == 0 && string(tracePatch.Patch()) == traceMessage }) keepAliveUpdateMatcher := generateJobInfoMatcher(jobCredentials.ID, common.Running, nil, "") updateMatcher := generateJobInfoMatcherWithAnyTrace(jobCredentials.ID, common.Success, common.NoneFailure) @@ -344,9 +344,3 @@ func TestFinalUpdateWithTrace(t *testing.T) { wg.Wait() } - -func TestInvalidTracePatchInitialOffsetValue(t *testing.T) { - trace := bytes.NewBufferString("test") - _, err := newTracePatch(*trace, trace.Len()+10) - assert.EqualError(t, err, "Range is invalid, limit can't be less than offset") -} \ No newline at end of file -- GitLab From cd8ff13aec3e3348d305d42747c463d85e6c23cc Mon Sep 17 00:00:00 2001 From: Tomasz Maczukin <tomasz@maczukin.pl> Date: Wed, 4 Jul 2018 20:14:09 +0200 Subject: [PATCH 12/13] Regenerate mocks --- common/mock_JobTracePatch.go | 42 ++++++++++++++++++++++++------------ 1 file changed, 28 insertions(+), 14 deletions(-) diff --git a/common/mock_JobTracePatch.go b/common/mock_JobTracePatch.go index 12dbca55caa..02c48b70078 100644 --- a/common/mock_JobTracePatch.go +++ b/common/mock_JobTracePatch.go @@ -11,8 +11,8 @@ type MockJobTracePatch struct { mock.Mock } -// FirstByte provides a mock function with given fields: -func (_m *MockJobTracePatch) FirstByte() int { +// Offset provides a mock function with given fields: +func (_m *MockJobTracePatch) Offset() int { ret := _m.Called() var r0 int @@ -25,31 +25,31 @@ func (_m *MockJobTracePatch) FirstByte() int { return r0 } -// LastByte provides a mock function with given fields: -func (_m *MockJobTracePatch) LastByte() int { +// Patch provides a mock function with given fields: +func (_m *MockJobTracePatch) Patch() []byte { ret := _m.Called() - var r0 int - if rf, ok := ret.Get(0).(func() int); ok { + var r0 []byte + if rf, ok := ret.Get(0).(func() []byte); ok { r0 = rf() } else { - r0 = ret.Get(0).(int) + if ret.Get(0) != nil { + r0 = ret.Get(0).([]byte) + } } return r0 } -// Patch provides a mock function with given fields: -func (_m *MockJobTracePatch) Patch() []byte { +// PatchEmpty provides a mock function with given fields: +func (_m *MockJobTracePatch) PatchEmpty() bool { ret := _m.Called() - var r0 []byte - if rf, ok := ret.Get(0).(func() []byte); ok { + var r0 bool + if rf, ok := ret.Get(0).(func() bool); ok { r0 = rf() } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).([]byte) - } + r0 = ret.Get(0).(bool) } return r0 @@ -60,6 +60,20 @@ func (_m *MockJobTracePatch) SetNewOffset(newOffset int) { _m.Called(newOffset) } +// TotalSize provides a mock function with given fields: +func (_m *MockJobTracePatch) TotalSize() int { + ret := _m.Called() + + var r0 int + if rf, ok := ret.Get(0).(func() int); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(int) + } + + return r0 +} + // ValidateRange provides a mock function with given fields: func (_m *MockJobTracePatch) ValidateRange() bool { ret := _m.Called() -- GitLab From db196768bc3c489f73fec1ec715a8e26e069cecb Mon Sep 17 00:00:00 2001 From: Tomasz Maczukin <tomasz@maczukin.pl> Date: Thu, 9 Aug 2018 22:12:07 +0200 Subject: [PATCH 13/13] Remove PatchEmpty() from the interface --- common/mock_JobTracePatch.go | 14 -------------- common/network.go | 1 - network/gitlab.go | 2 +- network/trace_patch.go | 4 ---- network/trace_patch_test.go | 2 -- 5 files changed, 1 insertion(+), 22 deletions(-) diff --git a/common/mock_JobTracePatch.go b/common/mock_JobTracePatch.go index 02c48b70078..25ac92d6ea8 100644 --- a/common/mock_JobTracePatch.go +++ b/common/mock_JobTracePatch.go @@ -41,20 +41,6 @@ func (_m *MockJobTracePatch) Patch() []byte { return r0 } -// PatchEmpty provides a mock function with given fields: -func (_m *MockJobTracePatch) PatchEmpty() bool { - ret := _m.Called() - - var r0 bool - if rf, ok := ret.Get(0).(func() bool); ok { - r0 = rf() - } else { - r0 = ret.Get(0).(bool) - } - - return r0 -} - // SetNewOffset provides a mock function with given fields: newOffset func (_m *MockJobTracePatch) SetNewOffset(newOffset int) { _m.Called(newOffset) diff --git a/common/network.go b/common/network.go index b6f21ee03b8..e9c4a02d921 100644 --- a/common/network.go +++ b/common/network.go @@ -353,7 +353,6 @@ type JobTracePatch interface { TotalSize() int SetNewOffset(newOffset int) ValidateRange() bool - PatchEmpty() bool } type Network interface { diff --git a/network/gitlab.go b/network/gitlab.go index 7caf75325b5..ae08a262e2c 100644 --- a/network/gitlab.go +++ b/network/gitlab.go @@ -341,7 +341,7 @@ func (n *GitLabClient) PatchTrace(config common.RunnerConfig, jobCredentials *co id := jobCredentials.ID baseLog := config.Log().WithField("job", id) - if tracePatch.PatchEmpty() { + if tracePatch.Offset() == tracePatch.TotalSize() { baseLog.Warningln("Appending trace to coordinator...", "skipped due to empty patch") return common.UpdateFailed } diff --git a/network/trace_patch.go b/network/trace_patch.go index e103266aeec..46e5abab8e9 100644 --- a/network/trace_patch.go +++ b/network/trace_patch.go @@ -35,10 +35,6 @@ func (tp *tracePatch) ValidateRange() bool { return false } -func (tp *tracePatch) PatchEmpty() bool { - return tp.offset == tp.totalSize -} - func newTracePatch(trace bytes.Buffer, offset int) (*tracePatch, error) { patch := &tracePatch{ trace: trace, diff --git a/network/trace_patch_test.go b/network/trace_patch_test.go index 4734673ed1b..a5aa9e8560c 100644 --- a/network/trace_patch_test.go +++ b/network/trace_patch_test.go @@ -17,7 +17,6 @@ func TestNewTracePatch(t *testing.T) { assert.Equal(t, 0, tp.Offset()) assert.Equal(t, len(traceContent), tp.TotalSize()) assert.Equal(t, []byte(traceContent), tp.Patch()) - assert.False(t, tp.PatchEmpty()) } func TestInvalidTracePatchInitialOffsetValue(t *testing.T) { @@ -41,6 +40,5 @@ func TestTracePatchEmptyPatch(t *testing.T) { assert.NoError(t, err) assert.Empty(t, tp.Patch()) - assert.True(t, tp.PatchEmpty()) } -- GitLab