Commit 690e599b authored by Tomasz Maczukin's avatar Tomasz Maczukin 🌴 Committed by Kamil Trzciński

Fix wrongly generated `Content-Range` header for `PATCH /api/v4/jobs/:id/trace` request

parent fb161916
......@@ -11,20 +11,6 @@ type MockJobTracePatch struct {
mock.Mock
}
// Limit provides a mock function with given fields:
func (_m *MockJobTracePatch) Limit() int {
ret := _m.Called()
var r0 int
if rf, ok := ret.Get(0).(func() int); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(int)
}
return r0
}
// Offset provides a mock function with given fields:
func (_m *MockJobTracePatch) Offset() int {
ret := _m.Called()
......@@ -60,6 +46,20 @@ func (_m *MockJobTracePatch) SetNewOffset(newOffset int) {
_m.Called(newOffset)
}
// TotalSize provides a mock function with given fields:
func (_m *MockJobTracePatch) TotalSize() int {
ret := _m.Called()
var r0 int
if rf, ok := ret.Get(0).(func() int); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(int)
}
return r0
}
// ValidateRange provides a mock function with given fields:
func (_m *MockJobTracePatch) ValidateRange() bool {
ret := _m.Called()
......
......@@ -350,7 +350,7 @@ type JobTrace interface {
type JobTracePatch interface {
Patch() []byte
Offset() int
Limit() int
TotalSize() int
SetNewOffset(newOffset int)
ValidateRange() bool
}
......
......@@ -272,13 +272,13 @@ func (n *client) do(uri, method string, request io.Reader, requestType string, h
return
}
func (n *client) doJSON(uri, method string, statusCode int, request interface{}, response interface{}) (int, string, ResponseTLSData) {
func (n *client) doJSON(uri, method string, statusCode int, request interface{}, response interface{}) (int, string, ResponseTLSData, *http.Response) {
var body io.Reader
if request != nil {
requestBody, err := json.Marshal(request)
if err != nil {
return -1, fmt.Sprintf("failed to marshal project object: %v", err), ResponseTLSData{}
return -1, fmt.Sprintf("failed to marshal project object: %v", err), ResponseTLSData{}, nil
}
body = bytes.NewReader(requestBody)
}
......@@ -290,7 +290,7 @@ func (n *client) doJSON(uri, method string, statusCode int, request interface{},
res, err := n.do(uri, method, body, "application/json", headers)
if err != nil {
return -1, err.Error(), ResponseTLSData{}
return -1, err.Error(), ResponseTLSData{}, nil
}
defer res.Body.Close()
defer io.Copy(ioutil.Discard, res.Body)
......@@ -299,13 +299,13 @@ func (n *client) doJSON(uri, method string, statusCode int, request interface{},
if response != nil {
isApplicationJSON, err := isResponseApplicationJSON(res)
if !isApplicationJSON {
return -1, err.Error(), ResponseTLSData{}
return -1, err.Error(), ResponseTLSData{}, nil
}
d := json.NewDecoder(res.Body)
err = d.Decode(response)
if err != nil {
return -1, fmt.Sprintf("Error decoding json payload %v", err), ResponseTLSData{}
return -1, fmt.Sprintf("Error decoding json payload %v", err), ResponseTLSData{}, nil
}
}
}
......@@ -318,7 +318,7 @@ func (n *client) doJSON(uri, method string, statusCode int, request interface{},
KeyFile: n.keyFile,
}
return res.StatusCode, res.Status, TLSData
return res.StatusCode, res.Status, TLSData, res
}
func isResponseApplicationJSON(res *http.Response) (result bool, err error) {
......
......@@ -117,7 +117,7 @@ func TestClientDo(t *testing.T) {
assert.NoError(t, err)
assert.NotNil(t, c)
statusCode, statusText, _ := c.doJSON("test/auth", "GET", http.StatusOK, nil, nil)
statusCode, statusText, _, _ := c.doJSON("test/auth", "GET", http.StatusOK, nil, nil)
assert.Equal(t, http.StatusForbidden, statusCode, statusText)
req := struct {
......@@ -130,16 +130,16 @@ func TestClientDo(t *testing.T) {
Key string `json:"key"`
}{}
statusCode, statusText, _ = c.doJSON("test/json", "GET", http.StatusOK, nil, &res)
statusCode, statusText, _, _ = c.doJSON("test/json", "GET", http.StatusOK, nil, &res)
assert.Equal(t, http.StatusBadRequest, statusCode, statusText)
statusCode, statusText, _ = c.doJSON("test/json", "GET", http.StatusOK, &req, nil)
statusCode, statusText, _, _ = c.doJSON("test/json", "GET", http.StatusOK, &req, nil)
assert.Equal(t, http.StatusNotAcceptable, statusCode, statusText)
statusCode, statusText, _ = c.doJSON("test/json", "GET", http.StatusOK, nil, nil)
statusCode, statusText, _, _ = c.doJSON("test/json", "GET", http.StatusOK, nil, nil)
assert.Equal(t, http.StatusBadRequest, statusCode, statusText)
statusCode, statusText, _ = c.doJSON("test/json", "GET", http.StatusOK, &req, &res)
statusCode, statusText, _, _ = c.doJSON("test/json", "GET", http.StatusOK, &req, &res)
assert.Equal(t, http.StatusOK, statusCode, statusText)
assert.Equal(t, "value", res.Key, statusText)
}
......@@ -151,7 +151,7 @@ func TestClientInvalidSSL(t *testing.T) {
c, _ := newClient(&RunnerCredentials{
URL: s.URL,
})
statusCode, statusText, _ := c.doJSON("test/ok", "GET", http.StatusOK, nil, nil)
statusCode, statusText, _, _ := c.doJSON("test/ok", "GET", http.StatusOK, nil, nil)
assert.Equal(t, -1, statusCode, statusText)
assert.Contains(t, statusText, "certificate signed by unknown authority")
}
......@@ -172,7 +172,7 @@ func TestClientTLSCAFile(t *testing.T) {
URL: s.URL,
TLSCAFile: file.Name(),
})
statusCode, statusText, tlsData := c.doJSON("test/ok", "GET", http.StatusOK, nil, nil)
statusCode, statusText, tlsData, _ := c.doJSON("test/ok", "GET", http.StatusOK, nil, nil)
assert.Equal(t, http.StatusOK, statusCode, statusText)
assert.NotEmpty(t, tlsData.CAChain)
}
......@@ -197,7 +197,7 @@ func TestClientCertificateInPredefinedDirectory(t *testing.T) {
c, _ := newClient(&RunnerCredentials{
URL: s.URL,
})
statusCode, statusText, tlsData := c.doJSON("test/ok", "GET", http.StatusOK, nil, nil)
statusCode, statusText, tlsData, _ := c.doJSON("test/ok", "GET", http.StatusOK, nil, nil)
assert.Equal(t, http.StatusOK, statusCode, statusText)
assert.NotEmpty(t, tlsData.CAChain)
}
......@@ -221,7 +221,7 @@ func TestClientInvalidTLSAuth(t *testing.T) {
URL: s.URL,
TLSCAFile: ca.Name(),
})
statusCode, statusText, _ := c.doJSON("test/ok", "GET", http.StatusOK, nil, nil)
statusCode, statusText, _, _ := c.doJSON("test/ok", "GET", http.StatusOK, nil, nil)
assert.Equal(t, -1, statusCode, statusText)
assert.Contains(t, statusText, "tls: bad certificate")
}
......@@ -260,7 +260,7 @@ func TestClientTLSAuth(t *testing.T) {
TLSCertFile: cert.Name(),
TLSKeyFile: key.Name(),
})
statusCode, statusText, tlsData := c.doJSON("test/ok", "GET", http.StatusOK, nil, nil)
statusCode, statusText, tlsData, _ := c.doJSON("test/ok", "GET", http.StatusOK, nil, nil)
assert.Equal(t, http.StatusOK, statusCode, statusText)
assert.NotEmpty(t, tlsData.CAChain)
assert.Equal(t, cert.Name(), tlsData.CertFile)
......@@ -295,7 +295,7 @@ func TestClientTLSAuthCertificatesInPredefinedDirectory(t *testing.T) {
c, _ := newClient(&RunnerCredentials{
URL: s.URL,
})
statusCode, statusText, tlsData := c.doJSON("test/ok", "GET", http.StatusOK, nil, nil)
statusCode, statusText, tlsData, _ := c.doJSON("test/ok", "GET", http.StatusOK, nil, nil)
assert.Equal(t, http.StatusOK, statusCode, statusText)
assert.NotEmpty(t, tlsData.CAChain)
assert.NotEmpty(t, tlsData.CertFile)
......@@ -350,16 +350,16 @@ func TestClientHandleCharsetInContentType(t *testing.T) {
Key string `json:"key"`
}{}
statusCode, statusText, _ := c.doJSON("with-charset", "GET", http.StatusOK, nil, &res)
statusCode, statusText, _, _ := c.doJSON("with-charset", "GET", http.StatusOK, nil, &res)
assert.Equal(t, http.StatusOK, statusCode, statusText)
statusCode, statusText, _ = c.doJSON("without-charset", "GET", http.StatusOK, nil, &res)
statusCode, statusText, _, _ = c.doJSON("without-charset", "GET", http.StatusOK, nil, &res)
assert.Equal(t, http.StatusOK, statusCode, statusText)
statusCode, statusText, _ = c.doJSON("without-json", "GET", http.StatusOK, nil, &res)
statusCode, statusText, _, _ = c.doJSON("without-json", "GET", http.StatusOK, nil, &res)
assert.Equal(t, -1, statusCode, statusText)
statusCode, statusText, _ = c.doJSON("invalid-header", "GET", http.StatusOK, nil, &res)
statusCode, statusText, _, _ = c.doJSON("invalid-header", "GET", http.StatusOK, nil, &res)
assert.Equal(t, -1, statusCode, statusText)
}
......
......@@ -159,10 +159,10 @@ func (n *GitLabClient) doRaw(credentials requestCredentials, method, uri string,
return c.do(uri, method, request, requestType, headers)
}
func (n *GitLabClient) doJSON(credentials requestCredentials, method, uri string, statusCode int, request interface{}, response interface{}) (int, string, ResponseTLSData) {
func (n *GitLabClient) doJSON(credentials requestCredentials, method, uri string, statusCode int, request interface{}, response interface{}) (int, string, ResponseTLSData, *http.Response) {
c, err := n.getClient(credentials)
if err != nil {
return clientError, err.Error(), ResponseTLSData{}
return clientError, err.Error(), ResponseTLSData{}, nil
}
return c.doJSON(uri, method, statusCode, request, response)
......@@ -177,7 +177,7 @@ func (n *GitLabClient) RegisterRunner(runner common.RunnerCredentials, parameter
}
var response common.RegisterRunnerResponse
result, statusText, _ := n.doJSON(&runner, "POST", "runners", http.StatusCreated, &request, &response)
result, statusText, _, _ := n.doJSON(&runner, "POST", "runners", http.StatusCreated, &request, &response)
switch result {
case http.StatusCreated:
......@@ -200,7 +200,7 @@ func (n *GitLabClient) VerifyRunner(runner common.RunnerCredentials) bool {
Token: runner.Token,
}
result, statusText, _ := n.doJSON(&runner, "POST", "runners/verify", http.StatusOK, &request, nil)
result, statusText, _, _ := n.doJSON(&runner, "POST", "runners/verify", http.StatusOK, &request, nil)
switch result {
case http.StatusOK:
......@@ -224,7 +224,7 @@ func (n *GitLabClient) UnregisterRunner(runner common.RunnerCredentials) bool {
Token: runner.Token,
}
result, statusText, _ := n.doJSON(&runner, "DELETE", "runners", http.StatusNoContent, &request, nil)
result, statusText, _, _ := n.doJSON(&runner, "DELETE", "runners", http.StatusNoContent, &request, nil)
const baseLogText = "Unregistering runner from GitLab"
switch result {
......@@ -269,7 +269,7 @@ func (n *GitLabClient) RequestJob(config common.RunnerConfig) (*common.JobRespon
}
var response common.JobResponse
result, statusText, tlsData := n.doJSON(&config.RunnerCredentials, "POST", "jobs/request", http.StatusCreated, &request, &response)
result, statusText, tlsData, _ := n.doJSON(&config.RunnerCredentials, "POST", "jobs/request", http.StatusCreated, &request, &response)
n.requestsStatusesMap.Append(config.RunnerCredentials.ShortDescription(), APIEndpointRequestJob, result)
......@@ -305,23 +305,30 @@ func (n *GitLabClient) UpdateJob(config common.RunnerConfig, jobCredentials *com
Trace: jobInfo.Trace,
}
log := config.Log().WithField("job", jobInfo.ID)
result, statusText, _ := n.doJSON(&config.RunnerCredentials, "PUT", fmt.Sprintf("jobs/%d", jobInfo.ID), http.StatusOK, &request, nil)
result, statusText, _, response := n.doJSON(&config.RunnerCredentials, "PUT", fmt.Sprintf("jobs/%d", jobInfo.ID), http.StatusOK, &request, nil)
n.requestsStatusesMap.Append(config.RunnerCredentials.ShortDescription(), APIEndpointUpdateJob, result)
switch result {
case http.StatusOK:
remoteJobStateResponse := NewRemoteJobStateResponse(response)
log := config.Log().WithFields(logrus.Fields{
"code": result,
"job": jobInfo.ID,
"job-status": remoteJobStateResponse.RemoteState,
})
switch {
case remoteJobStateResponse.IsAborted():
log.Warningln("Submitting job to coordinator...", "aborted")
return common.UpdateAbort
case result == http.StatusOK:
log.Debugln("Submitting job to coordinator...", "ok")
return common.UpdateSucceeded
case http.StatusNotFound:
case result == http.StatusNotFound:
log.Warningln("Submitting job to coordinator...", "aborted")
return common.UpdateAbort
case http.StatusForbidden:
case result == http.StatusForbidden:
log.WithField("status", statusText).Errorln("Submitting job to coordinator...", "forbidden")
return common.UpdateAbort
case clientError:
case result == clientError:
log.WithField("status", statusText).Errorln("Submitting job to coordinator...", "error")
return common.UpdateAbort
default:
......@@ -333,7 +340,14 @@ func (n *GitLabClient) UpdateJob(config common.RunnerConfig, jobCredentials *com
func (n *GitLabClient) PatchTrace(config common.RunnerConfig, jobCredentials *common.JobCredentials, tracePatch common.JobTracePatch) common.UpdateState {
id := jobCredentials.ID
contentRange := fmt.Sprintf("%d-%d", tracePatch.Offset(), tracePatch.Limit())
baseLog := config.Log().WithField("job", id)
if tracePatch.Offset() == tracePatch.TotalSize() {
baseLog.Warningln("Appending trace to coordinator...", "skipped due to empty patch")
return common.UpdateFailed
}
contentRange := fmt.Sprintf("%d-%d", tracePatch.Offset(), tracePatch.TotalSize()-1)
headers := make(http.Header)
headers.Set("Content-Range", contentRange)
headers.Set("JOB-TOKEN", jobCredentials.Token)
......@@ -353,8 +367,7 @@ func (n *GitLabClient) PatchTrace(config common.RunnerConfig, jobCredentials *co
defer io.Copy(ioutil.Discard, response.Body)
tracePatchResponse := NewTracePatchResponse(response)
log := config.Log().WithFields(logrus.Fields{
"job": id,
log := baseLog.WithFields(logrus.Fields{
"sent-log": contentRange,
"job-log": tracePatchResponse.RemoteRange,
"job-status": tracePatchResponse.RemoteState,
......@@ -364,7 +377,7 @@ func (n *GitLabClient) PatchTrace(config common.RunnerConfig, jobCredentials *co
switch {
case tracePatchResponse.IsAborted():
log.Warningln("Appending trace to coordinator", "aborted")
log.Warningln("Appending trace to coordinator...", "aborted")
return common.UpdateAbort
case response.StatusCode == http.StatusAccepted:
log.Debugln("Appending trace to coordinator...", "ok")
......
......@@ -595,6 +595,67 @@ func TestUpdateJob(t *testing.T) {
assert.Equal(t, UpdateFailed, state, "Update should fail for badly formatted request")
}
func testUpdateJobKeepAliveHandler(w http.ResponseWriter, r *http.Request, t *testing.T) {
if r.Method != "PUT" {
w.WriteHeader(http.StatusNotAcceptable)
return
}
switch r.URL.Path {
case "/api/v4/jobs/10":
case "/api/v4/jobs/11":
w.Header().Set("Job-Status", "canceled")
case "/api/v4/jobs/12":
w.Header().Set("Job-Status", "failed")
default:
w.WriteHeader(http.StatusNotFound)
return
}
body, err := ioutil.ReadAll(r.Body)
assert.NoError(t, err)
var req map[string]interface{}
err = json.Unmarshal(body, &req)
assert.NoError(t, err)
assert.Equal(t, "token", req["token"])
w.WriteHeader(http.StatusOK)
}
func TestUpdateJobAsKeepAlive(t *testing.T) {
handler := func(w http.ResponseWriter, r *http.Request) {
testUpdateJobKeepAliveHandler(w, r, t)
}
s := httptest.NewServer(http.HandlerFunc(handler))
defer s.Close()
config := RunnerConfig{
RunnerCredentials: RunnerCredentials{
URL: s.URL,
},
}
jobCredentials := &JobCredentials{
Token: "token",
}
c := NewGitLabClient()
var state UpdateState
state = c.UpdateJob(config, jobCredentials, UpdateJobInfo{ID: 10, State: "running"})
assert.Equal(t, UpdateSucceeded, state, "Update should continue when running")
state = c.UpdateJob(config, jobCredentials, UpdateJobInfo{ID: 11, State: "running"})
assert.Equal(t, UpdateAbort, state, "Update should be aborted when Job-Status=canceled")
state = c.UpdateJob(config, jobCredentials, UpdateJobInfo{ID: 12, State: "running"})
assert.Equal(t, UpdateAbort, state, "Update should continue when Job-Status=failed")
}
var patchToken = "token"
var patchTraceString = "trace trace trace"
......@@ -674,8 +735,7 @@ func TestForbiddenPatchTrace(t *testing.T) {
func TestPatchTrace(t *testing.T) {
handler := func(w http.ResponseWriter, r *http.Request, body string, offset, limit int) {
assert.Equal(t, patchTraceString[offset:limit], body)
assert.Equal(t, patchTraceString[offset:limit+1], body)
w.WriteHeader(http.StatusAccepted)
}
......@@ -791,6 +851,136 @@ func TestPatchTraceCantConnect(t *testing.T) {
assert.Equal(t, UpdateFailed, state)
}
func TestPatchTraceUpdatedTrace(t *testing.T) {
sentTrace := 0
traceString := ""
updates := []struct {
traceUpdate string
expectedContentRange string
expectedContentLength int64
expectedResult UpdateState
shouldNotCallPatchTrace bool
}{
{traceUpdate: "test", expectedContentRange: "0-3", expectedContentLength: 4, expectedResult: UpdateSucceeded},
{traceUpdate: "", expectedResult: UpdateFailed, shouldNotCallPatchTrace: true},
{traceUpdate: " ", expectedContentRange: "4-4", expectedContentLength: 1, expectedResult: UpdateSucceeded},
{traceUpdate: "test", expectedContentRange: "5-8", expectedContentLength: 4, expectedResult: UpdateSucceeded},
}
for id, update := range updates {
t.Run(fmt.Sprintf("patch-%d", id+1), func(t *testing.T) {
handler := func(w http.ResponseWriter, r *http.Request, body string, offset, limit int) {
if update.shouldNotCallPatchTrace {
t.Error("PatchTrace endpoint should not be called")
return
}
if limit+1 <= len(traceString) {
assert.Equal(t, traceString[offset:limit+1], body)
}
assert.Equal(t, update.traceUpdate, body)
assert.Equal(t, update.expectedContentRange, r.Header.Get("Content-Range"))
assert.Equal(t, update.expectedContentLength, r.ContentLength)
w.WriteHeader(http.StatusAccepted)
}
server, client, config := getPatchServer(t, handler)
defer server.Close()
traceString += update.traceUpdate
tracePatch := getTracePatch(traceString, sentTrace)
result := client.PatchTrace(config, &JobCredentials{ID: 1, Token: patchToken}, tracePatch)
assert.Equal(t, update.expectedResult, result)
sentTrace = tracePatch.totalSize
})
}
}
func TestPatchTraceContentRangeAndLength(t *testing.T) {
tests := []struct {
name string
trace string
expectedContentRange string
expectedContentLength int64
expectedResult UpdateState
shouldNotCallPatchTrace bool
}{
{name: "0 bytes", trace: "", expectedResult: UpdateFailed, shouldNotCallPatchTrace: true},
{name: "1 byte", trace: "1", expectedContentRange: "0-0", expectedContentLength: 1, expectedResult: UpdateSucceeded, shouldNotCallPatchTrace: false},
{name: "2 bytes", trace: "12", expectedContentRange: "0-1", expectedContentLength: 2, expectedResult: UpdateSucceeded, shouldNotCallPatchTrace: false},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
handler := func(w http.ResponseWriter, r *http.Request, body string, offset, limit int) {
if test.shouldNotCallPatchTrace {
t.Error("PatchTrace endpoint should not be called")
return
}
assert.Equal(t, test.expectedContentRange, r.Header.Get("Content-Range"))
assert.Equal(t, test.expectedContentLength, r.ContentLength)
w.WriteHeader(http.StatusAccepted)
}
server, client, config := getPatchServer(t, handler)
defer server.Close()
tracePatch := getTracePatch(test.trace, 0)
result := client.PatchTrace(config, &JobCredentials{ID: 1, Token: patchToken}, tracePatch)
assert.Equal(t, test.expectedResult, result)
})
}
}
func TestPatchTraceContentRangeHeaderValues(t *testing.T) {
handler := func(w http.ResponseWriter, r *http.Request, body string, offset, limit int) {
contentRange := r.Header.Get("Content-Range")
bytes := strings.Split(contentRange, "-")
startByte, err := strconv.Atoi(bytes[0])
require.NoError(t, err, "Should not set error when parsing Content-Range startByte component")
endByte, err := strconv.Atoi(bytes[1])
require.NoError(t, err, "Should not set error when parsing Content-Range endByte component")
assert.Equal(t, 0, startByte, "Content-Range should contain start byte as first field")
assert.Equal(t, len(patchTraceString)-1, endByte, "Content-Range should contain end byte as second field")
w.WriteHeader(http.StatusAccepted)
}
server, client, config := getPatchServer(t, handler)
defer server.Close()
tracePatch := getTracePatch(patchTraceString, 0)
client.PatchTrace(config, &JobCredentials{ID: 1, Token: patchToken}, tracePatch)
}
func TestAbortedPatchTrace(t *testing.T) {
statuses := []string{"canceled", "failed"}
for _, status := range statuses {
t.Run(status, func(t *testing.T) {
handler := func(w http.ResponseWriter, r *http.Request, body string, offset, limit int) {
w.Header().Set("Job-Status", status)
w.WriteHeader(http.StatusAccepted)
}
server, client, config := getPatchServer(t, handler)
defer server.Close()
tracePatch := getTracePatch(patchTraceString, 0)
state := client.PatchTrace(config, &JobCredentials{ID: 1, Token: patchToken}, tracePatch)
assert.Equal(t, UpdateAbort, state)
})
}
}
func testArtifactsUploadHandler(w http.ResponseWriter, r *http.Request, t *testing.T) {
if r.URL.Path != "/api/v4/jobs/10/artifacts" {
w.WriteHeader(http.StatusNotFound)
......
......@@ -7,24 +7,11 @@ import (
)
type TracePatchResponse struct {
response *http.Response
*RemoteJobStateResponse
RemoteState string
RemoteRange string
}
func (p *TracePatchResponse) IsAborted() bool {
if p.RemoteState == "canceled" || p.RemoteState == "failed" {
return true
}
if p.response.StatusCode == http.StatusForbidden {
return true
}
return false
}
func (p *TracePatchResponse) NewOffset() int {
remoteRangeParts := strings.Split(p.RemoteRange, "-")
newOffset, _ := strconv.Atoi(remoteRangeParts[1])
......@@ -34,8 +21,7 @@ func (p *TracePatchResponse) NewOffset() int {
func NewTracePatchResponse(response *http.Response) *TracePatchResponse {
return &TracePatchResponse{
response: response,
RemoteState: response.Header.Get("Job-Status"),
RemoteRange: response.Header.Get("Range"),
RemoteJobStateResponse: NewRemoteJobStateResponse(response),
RemoteRange: response.Header.Get("Range"),
}
}
package network
import (
"net/http"
)
type RemoteJobStateResponse struct {
StatusCode int
RemoteState string
}
func (r *RemoteJobStateResponse) IsAborted() bool {
if r.RemoteState == "canceled" || r.RemoteState == "failed" {
return true
}
if r.StatusCode == http.StatusForbidden {
return true
}
return false
}
func NewRemoteJobStateResponse(response *http.Response) *RemoteJobStateResponse {
if response == nil {
return &RemoteJobStateResponse{}
}
return &RemoteJobStateResponse{
StatusCode: response.StatusCode,
RemoteState: response.Header.Get("Job-Status"),
}
}
......@@ -4,7 +4,6 @@ import (
"bufio"
"bytes"
"context"
"errors"
"fmt"
"io"
"sync"
......@@ -14,50 +13,6 @@ import (
"gitlab.com/gitlab-org/gitlab-runner/helpers"
)
type tracePatch struct {
trace bytes.Buffer
offset int
limit int
}
func (tp *tracePatch) Patch() []byte {
return tp.trace.Bytes()[tp.offset:tp.limit]
}
func (tp *tracePatch) Offset() int {
return tp.offset
}
func (tp *tracePatch) Limit() int {
return tp.limit
}
func (tp *tracePatch) SetNewOffset(newOffset int) {
tp.offset = newOffset
}
func (tp *tracePatch) ValidateRange() bool {
if tp.limit >= tp.offset {
return true
}
return false
}
func newTracePatch(trace bytes.Buffer, offset int) (*tracePatch, error) {
patch := &tracePatch{
trace: trace,
offset: offset,
limit: trace.Len(),
}
if !patch.ValidateRange() {
return nil, errors.New("Range is invalid, limit can't be less than offset")
}
return patch, nil
}
type clientJobTrace struct {
*io.PipeWriter
......@@ -197,22 +152,26 @@ func (c *clientJobTrace) incrementalUpdate() common.UpdateState {
trace := c.log
c.lock.RUnlock()
if c.sentState == state &&
c.sentTrace == trace.Len() &&
time.Since(c.sentTime) < c.forceSendInterval {
return common.UpdateSucceeded
if c.sentTrace != trace.Len() {
result := c.sendPatch(trace)
if result != common.UpdateSucceeded {
return result
}
}
if c.sentState != state {
jobInfo := common.UpdateJobInfo{
ID: c.id,
State: state,
FailureReason: c.failureReason,
if c.sentState != state || time.Since(c.sentTime) > c.forceSendInterval {
if state == common.Running { // we should only follow-up with Running!
result := c.sendUpdate(state)
if result != common.UpdateSucceeded {
return result
}
}
c.client.UpdateJob(c.config, c.jobCredentials, jobInfo)
c.sentState = state
}
return common.UpdateSucceeded
}
func (c *clientJobTrace) sendPatch(trace bytes.Buffer) common.UpdateState {
tracePatch, err := newTracePatch(trace, c.sentTrace)
if err != nil {
c.config.Log().Errorln("Error while creating a tracePatch", err.Error())
......@@ -228,7 +187,7 @@ func (c *clientJobTrace) incrementalUpdate() common.UpdateState {