Skip to content
Snippets Groups Projects
Verified Commit 23747451 authored by Tomasz Maczukin's avatar Tomasz Maczukin 💬
Browse files

Add dedicated keep-alive endpoint

parent 0d424989
No related branches found
No related tags found
No related merge requests found
This commit is part of merge request !906. Comments created here will be created in the context of that merge request.
......@@ -26,6 +26,20 @@ func (_m *MockNetwork) DownloadArtifacts(config JobCredentials, artifactsFile st
return r0
}
// KeepAlive provides a mock function with given fields: config, jobCredentials
func (_m *MockNetwork) KeepAlive(config RunnerConfig, jobCredentials *JobCredentials) UpdateState {
ret := _m.Called(config, jobCredentials)
var r0 UpdateState
if rf, ok := ret.Get(0).(func(RunnerConfig, *JobCredentials) UpdateState); ok {
r0 = rf(config, jobCredentials)
} else {
r0 = ret.Get(0).(UpdateState)
}
return r0
}
// PatchTrace provides a mock function with given fields: config, jobCredentials, tracePart
func (_m *MockNetwork) PatchTrace(config RunnerConfig, jobCredentials *JobCredentials, tracePart JobTracePatch) UpdateState {
ret := _m.Called(config, jobCredentials, tracePart)
......
......@@ -346,6 +346,7 @@ type Network interface {
UnregisterRunner(config RunnerCredentials) bool
RequestJob(config RunnerConfig) (*JobResponse, bool)
UpdateJob(config RunnerConfig, jobCredentials *JobCredentials, jobInfo UpdateJobInfo) UpdateState
KeepAlive(config RunnerConfig, jobCredentials *JobCredentials) UpdateState
PatchTrace(config RunnerConfig, jobCredentials *JobCredentials, tracePart JobTracePatch) UpdateState
DownloadArtifacts(config JobCredentials, artifactsFile string) DownloadState
UploadRawArtifacts(config JobCredentials, reader io.Reader, baseName string, expireIn string) UploadState
......
......@@ -33,9 +33,10 @@ var apiRequestStatuses = prometheus.NewDesc(
type APIEndpoint string
const (
APIEndpointRequestJob APIEndpoint = "request_job"
APIEndpointUpdateJob APIEndpoint = "update_job"
APIEndpointPatchTrace APIEndpoint = "patch_trace"
APIEndpointRequestJob APIEndpoint = "request_job"
APIEndpointUpdateJob APIEndpoint = "update_job"
APIEndpointJobKeepAlive APIEndpoint = "job_keep_alive"
APIEndpointPatchTrace APIEndpoint = "patch_trace"
)
type apiRequestStatusPermutation struct {
......@@ -326,6 +327,43 @@ func (n *GitLabClient) UpdateJob(config common.RunnerConfig, jobCredentials *com
}
}
func (n *GitLabClient) KeepAlive(config common.RunnerConfig, jobCredentials *common.JobCredentials) common.UpdateState {
id := jobCredentials.ID
headers := make(http.Header)
headers.Set("JOB-TOKEN", jobCredentials.Token)
uri := fmt.Sprintf("jobs/%d/keep-alive", id)
response, err := n.doRaw(&config.RunnerCredentials, "POST", uri, nil, "", headers)
if err != nil {
config.Log().Errorln("Sending keep-alive request...", "error", err.Error())
return common.UpdateFailed
}
n.requestsStatusesMap.Append(config.RunnerCredentials.ShortDescription(), APIEndpointJobKeepAlive, response.StatusCode)
keepAliveResponse := NewKeepAliveResponse(response)
log := config.Log().WithFields(logrus.Fields{
"job": id,
"code": response.StatusCode,
"job-status": keepAliveResponse.RemoteState,
"status": response.Status,
})
switch {
case keepAliveResponse.IsAborted():
log.Warningln("Sending keep-alive request...", "aborted")
return common.UpdateAbort
case response.StatusCode == http.StatusAccepted:
log.Debugln("Sending keep-alive request...", "ok")
return common.UpdateSucceeded
default:
log.Warningln("Sending keep-alive request...", "failed")
return common.UpdateFailed
}
}
func (n *GitLabClient) PatchTrace(config common.RunnerConfig, jobCredentials *common.JobCredentials, tracePatch common.JobTracePatch) common.UpdateState {
id := jobCredentials.ID
......@@ -361,7 +399,7 @@ func (n *GitLabClient) PatchTrace(config common.RunnerConfig, jobCredentials *co
switch {
case tracePatchResponse.IsAborted():
log.Warningln("Appending trace to coordinator", "aborted")
log.Warningln("Appending trace to coordinator...", "aborted")
return common.UpdateAbort
case response.StatusCode == http.StatusAccepted:
log.Debugln("Appending trace to coordinator...", "ok")
......
......@@ -595,6 +595,79 @@ func TestUpdateJob(t *testing.T) {
assert.Equal(t, UpdateFailed, state, "Update should fail for badly formatted request")
}
var keepAliveToken = "token"
func getKeepAliveServer(t *testing.T, handler func(w http.ResponseWriter, r *http.Request)) (*httptest.Server, *GitLabClient, RunnerConfig) {
keepAliveHandler := func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path != "/api/v4/jobs/1/keep-alive" {
w.WriteHeader(http.StatusNotFound)
return
}
if r.Method != "POST" {
w.WriteHeader(http.StatusNotAcceptable)
return
}
assert.Equal(t, keepAliveToken, r.Header.Get("JOB-TOKEN"))
handler(w, r)
}
server := httptest.NewServer(http.HandlerFunc(keepAliveHandler))
config := RunnerConfig{
RunnerCredentials: RunnerCredentials{
URL: server.URL,
},
}
return server, NewGitLabClient(), config
}
func TestKeepAlive(t *testing.T) {
handler := func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusAccepted)
}
server, client, config := getKeepAliveServer(t, handler)
defer server.Close()
state := client.KeepAlive(config, &JobCredentials{ID: 1, Token: keepAliveToken})
assert.Equal(t, UpdateSucceeded, state)
}
func TestKeepAliveUnavailable(t *testing.T) {
handler := func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusBadGateway)
}
server, client, config := getKeepAliveServer(t, handler)
defer server.Close()
state := client.KeepAlive(config, &JobCredentials{ID: 1, Token: keepAliveToken})
assert.Equal(t, UpdateFailed, state)
}
func TestKeepAliveRemoteAbort(t *testing.T) {
statuses := []string{"canceled", "failed"}
for _, status := range statuses {
t.Run(status, func(t *testing.T) {
handler := func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Job-Status", status)
w.WriteHeader(http.StatusAccepted)
}
server, client, config := getKeepAliveServer(t, handler)
defer server.Close()
state := client.KeepAlive(config, &JobCredentials{ID: 1, Token: keepAliveToken})
assert.Equal(t, UpdateAbort, state)
})
}
}
var patchToken = "token"
var patchTraceString = "trace trace trace"
......
package network
import (
"net/http"
)
type KeepAliveResponse struct {
response *http.Response
RemoteState string
}
func (k *KeepAliveResponse) IsAborted() bool {
if k.RemoteState == "canceled" || k.RemoteState == "failed" {
return true
}
if k.response.StatusCode == http.StatusForbidden {
return true
}
return false
}
func NewKeepAliveResponse(response *http.Response) *KeepAliveResponse {
return &KeepAliveResponse{
response: response,
RemoteState: response.Header.Get("Job-Status"),
}
}
......@@ -7,24 +7,11 @@ import (
)
type TracePatchResponse struct {
response *http.Response
*KeepAliveResponse
RemoteState string
RemoteRange string
}
func (p *TracePatchResponse) IsAborted() bool {
if p.RemoteState == "canceled" || p.RemoteState == "failed" {
return true
}
if p.response.StatusCode == http.StatusForbidden {
return true
}
return false
}
func (p *TracePatchResponse) NewOffset() int {
remoteRangeParts := strings.Split(p.RemoteRange, "-")
newOffset, _ := strconv.Atoi(remoteRangeParts[1])
......@@ -34,8 +21,7 @@ func (p *TracePatchResponse) NewOffset() int {
func NewTracePatchResponse(response *http.Response) *TracePatchResponse {
return &TracePatchResponse{
response: response,
RemoteState: response.Header.Get("Job-Status"),
KeepAliveResponse: NewKeepAliveResponse(response),
RemoteRange: response.Header.Get("Range"),
}
}
......@@ -194,18 +194,28 @@ func (c *clientJobTrace) process(pipe *io.PipeReader) {
}
}
func (c *clientJobTrace) isUpToDate(state common.JobState, traceLenght int) bool {
return c.sentState == state && c.sentTrace == traceLenght
}
func (c *clientJobTrace) incrementalUpdate() common.UpdateState {
c.lock.RLock()
state := c.state
trace := c.log
c.lock.RUnlock()
if c.sentState == state &&
c.sentTrace == trace.Len() &&
time.Since(c.sentTime) < c.forceSendInterval {
return common.UpdateSucceeded
if c.isUpToDate(state, trace.Len()) {
if time.Since(c.sentTime) < c.forceSendInterval {
return common.UpdateSucceeded
}
return c.client.KeepAlive(c.config, c.jobCredentials)
}
return c.sendPatch(state, trace)
}
func (c *clientJobTrace) sendPatch(state common.JobState, trace bytes.Buffer) common.UpdateState {
if c.sentState != state {
jobInfo := common.UpdateJobInfo{
ID: c.id,
......@@ -271,8 +281,7 @@ func (c *clientJobTrace) fullUpdate() common.UpdateState {
trace := c.log.String()
c.lock.RUnlock()
if c.sentState == state &&
c.sentTrace == len(trace) &&
if c.isUpToDate(state, len(trace)) &&
time.Since(c.sentTime) < c.forceSendInterval {
return common.UpdateSucceeded
}
......
......@@ -134,11 +134,10 @@ func TestJobAbort(t *testing.T) {
updateMatcher := generateJobInfoMatcherWithAnyTrace(jobCredentials.ID, common.Success, common.NoneFailure)
net := new(common.MockNetwork)
net.On("PatchTrace", jobConfig, jobCredentials, mock.AnythingOfType("*network.tracePatch")).Return(common.UpdateAbort)
net.On("KeepAlive", jobConfig, jobCredentials).Return(common.UpdateAbort)
net.On("UpdateJob", jobConfig, jobCredentials, updateMatcher).Return(common.UpdateAbort)
b := newJobTrace(net, jobConfig, jobCredentials)
// force immediate call to `UpdateJob`
b.updateInterval = 0
b.SetCancelFunc(cancel)
......@@ -207,16 +206,13 @@ func TestJobForceSend(t *testing.T) {
firstPatchMatcher := mock.MatchedBy(func(tracePatch common.JobTracePatch) bool {
return tracePatch.FirstByte() == 0 && string(tracePatch.Patch()) == traceMessage
})
nextEmptyPatchMatcher := mock.MatchedBy(func(tracePatch common.JobTracePatch) bool {
return tracePatch.FirstByte() == len(traceMessage) && string(tracePatch.Patch()) == ""
})
updateMatcher := generateJobInfoMatcherWithAnyTrace(jobCredentials.ID, common.Success, common.NoneFailure)
wg.Add(1)
net := new(common.MockNetwork)
net.On("PatchTrace", jobConfig, jobCredentials, firstPatchMatcher).Return(common.UpdateSucceeded).Once()
net.On("PatchTrace", jobConfig, jobCredentials, nextEmptyPatchMatcher).Return(common.UpdateSucceeded).Run(func(_ mock.Arguments) { wg.Done() })
net.On("KeepAlive", jobConfig, jobCredentials).Run(func(_ mock.Arguments) { wg.Done() }).Return(common.UpdateSucceeded)
net.On("UpdateJob", jobConfig, jobCredentials, updateMatcher).Return(common.UpdateSucceeded).Once()
defer net.AssertExpectations(t)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment