From f45cf7e319a9505d0eaa5e6a09f908bf97578450 Mon Sep 17 00:00:00 2001 From: "Georgi N. Georgiev" <ggeorgiev@gitlab.com> Date: Thu, 30 Jun 2022 13:03:12 +0300 Subject: [PATCH] Add fault tolerance with file storage Try 2 --- commands/exec.go | 2 +- commands/helpers/artifact_metadata.go | 29 +- commands/helpers/artifact_metadata_test.go | 4 +- commands/multi.go | 87 +++++- commands/single.go | 4 +- common/build.go | 141 ++++++---- common/network.go | 5 +- common/store.go | 201 ++++++++++++++ common/time.go | 31 +++ common/trace.go | 21 +- executors/kubernetes/kubernetes.go | 308 +++++++++++++++++---- executors/kubernetes/log_processor.go | 29 +- executors/kubernetes/util.go | 4 + go.mod | 3 +- go.sum | 2 + helpers/trace/buffer.go | 19 +- network/gitlab.go | 3 +- network/trace.go | 32 ++- shells/abstract_test.go | 48 ++-- 19 files changed, 792 insertions(+), 181 deletions(-) create mode 100644 common/store.go create mode 100644 common/time.go diff --git a/commands/exec.go b/commands/exec.go index 243d234ed48..e4f8fdc7d6e 100644 --- a/commands/exec.go +++ b/commands/exec.go @@ -86,7 +86,7 @@ func (c *ExecCommand) createBuild(repoURL string, abortSignal chan os.Signal) (* RunnerSettings: c.RunnerSettings, } - return common.NewBuild(jobResponse, runner, abortSignal, nil) + return common.NewBuild(jobResponse, runner, abortSignal, nil, nil, nil) } func (c *ExecCommand) Execute(context *cli.Context) { diff --git a/commands/helpers/artifact_metadata.go b/commands/helpers/artifact_metadata.go index 49c9838639a..84537efddc6 100644 --- a/commands/helpers/artifact_metadata.go +++ b/commands/helpers/artifact_metadata.go @@ -9,7 +9,6 @@ import ( "io/ioutil" "os" "path/filepath" - "strconv" "strings" "time" @@ -88,8 +87,8 @@ type AttestationPredicateInvocationEnvironment struct { type AttestationPredicateInvocationParameters map[string]string type AttestationMetadataInfo struct { - BuildStartedOn TimeRFC3339 `json:"buildStartedOn"` - BuildFinishedOn TimeRFC3339 `json:"buildFinishedOn"` + BuildStartedOn common.TimeRFC3339 `json:"buildStartedOn"` + BuildFinishedOn common.TimeRFC3339 `json:"buildFinishedOn"` Reproducible bool `json:"reproducible"` Completeness AttestationMetadataInfoCompleteness `json:"completeness"` } @@ -100,26 +99,6 @@ type AttestationMetadataInfoCompleteness struct { Materials bool `json:"materials"` } -// TimeRFC3339 is used specifically to marshal and unmarshal time to/from RFC3339 strings -// That's because the metadata is user-facing and using Go's built-in time parsing will not be portable -type TimeRFC3339 struct { - time.Time -} - -func (t *TimeRFC3339) UnmarshalJSON(b []byte) error { - var err error - t.Time, err = time.Parse(time.RFC3339, strings.Trim(string(b), `"`)) - return err -} - -func (t TimeRFC3339) MarshalJSON() ([]byte, error) { - if t.IsZero() { - return nil, nil - } - - return []byte(strconv.Quote(t.Time.Format(time.RFC3339))), nil -} - type generateMetadataOptions struct { files map[string]os.FileInfo wd string @@ -181,8 +160,8 @@ func (g *artifactMetadataGenerator) metadata(opts generateMetadataOptions) (Atte }, }, Metadata: AttestationMetadataInfo{ - BuildStartedOn: TimeRFC3339{Time: startedAt}, - BuildFinishedOn: TimeRFC3339{Time: endedAt}, + BuildStartedOn: common.TimeRFC3339{Time: startedAt}, + BuildFinishedOn: common.TimeRFC3339{Time: endedAt}, Reproducible: false, Completeness: AttestationMetadataInfoCompleteness{ Parameters: true, diff --git a/commands/helpers/artifact_metadata_test.go b/commands/helpers/artifact_metadata_test.go index 
430d7aba860..68c5eafc6f4 100644 --- a/commands/helpers/artifact_metadata_test.go +++ b/commands/helpers/artifact_metadata_test.go @@ -80,10 +80,10 @@ func TestGenerateMetadataToFile(t *testing.T) { }, Materials: make([]interface{}, 0), Metadata: AttestationMetadataInfo{ - BuildStartedOn: TimeRFC3339{ + BuildStartedOn: common.TimeRFC3339{ Time: startedAt, }, - BuildFinishedOn: TimeRFC3339{ + BuildFinishedOn: common.TimeRFC3339{ Time: endedAt, }, Reproducible: false, diff --git a/commands/multi.go b/commands/multi.go index 14812c6efb5..1ac8f952f22 100644 --- a/commands/multi.go +++ b/commands/multi.go @@ -52,6 +52,7 @@ type RunCommand struct { healthHelper buildsHelper buildsHelper + jobStore *common.MultiJobStore ServiceName string `short:"n" long:"service" description:"Use different names for different services"` WorkingDirectory string `short:"d" long:"working-directory" description:"Specify custom working directory"` @@ -104,6 +105,10 @@ func (mr *RunCommand) Start(_ service.Service) error { mr.runFinished = make(chan bool, 1) mr.stopSignals = make(chan os.Signal) + mr.jobStore = common.NewMultiJobStore(func(config *common.RunnerConfig) common.JobStore { + return common.NewFileJobStore("/tmp/", config) + }) + mr.log().Info("Starting multi-runner from ", mr.ConfigFile, "...") userModeWarning(false) @@ -502,14 +507,21 @@ func (mr *RunCommand) processRunner( } // Receive a new build - trace, jobData, err := mr.requestJob(runner, sessionInfo) + trace, jobData, executionState, err := mr.requestJob(runner, sessionInfo) if err != nil || jobData == nil { return } defer func() { mr.traceOutcome(trace, err) }() // Create a new build - build, err := common.NewBuild(*jobData, runner, mr.abortBuilds, executorData) + build, err := common.NewBuild( + *jobData, + runner, + mr.abortBuilds, + executorData, + executionState, + mr.jobStore.Get(runner), + ) if err != nil { return } @@ -525,7 +537,13 @@ func (mr *RunCommand) processRunner( mr.requeueRunner(runner, runners) // Process a build - return build.Run(mr.config, trace) + if err := build.Run(mr.config, trace); err != nil { + // TODO: + _ = mr.jobStore.Get(runner).Remove(jobData.ID) + return err + } + + return nil } func (mr *RunCommand) traceOutcome(trace common.JobTrace, err error) { @@ -570,28 +588,63 @@ func (mr *RunCommand) createSession(provider common.ExecutorProvider) (*session. func (mr *RunCommand) requestJob( runner *common.RunnerConfig, sessionInfo *common.SessionInfo, -) (common.JobTrace, *common.JobResponse, error) { +) (common.JobTrace, *common.JobResponse, *common.JobExecutionState, error) { if !mr.buildsHelper.acquireRequest(runner) { mr.log().WithField("runner", runner.ShortDescription()). Debugln("Failed to request job: runner requestConcurrency meet") - return nil, nil, nil + return nil, nil, nil, nil } defer mr.buildsHelper.releaseRequest(runner) - jobData, healthy := mr.doJobRequest(context.Background(), runner, sessionInfo) - mr.makeHealthy(runner.UniqueID(), healthy) + store := mr.jobStore.Get(runner) - if jobData == nil { - return nil, nil, nil + var jobData *common.JobResponse + executionState, err := store.FindJobToResume() + if err != nil { + // TODO: handle invalid json + return nil, nil, nil, err + } + if executionState != nil { + // TODO: should we check early the job status and fail if e.g. it's been cancelled? 
+ jobData = executionState.Job + executionState.Resumes++ + executionState.ResumedFromStage = executionState.Stage + // TODO: + _ = store.Save(executionState) + mr.makeHealthy(runner.UniqueID(), true) + } else { + var healthy bool + jobData, healthy = mr.doJobRequest(context.Background(), runner, sessionInfo) + mr.makeHealthy(runner.UniqueID(), healthy) + + if jobData == nil { + return nil, nil, nil, nil + } + + executionState = common.NewJobExecutionState(jobData) + if err := store.Save(executionState); err != nil { + return nil, nil, nil, err + } } + go func() { + t := time.NewTicker(10 * time.Second) + for { + <-t.C + executionState.UpdateHealth() + if err := store.Save(executionState); err != nil { + mr.log().WithField("update job execution health error:", err) + } + } + }() + // Make sure to always close output jobCredentials := &common.JobCredentials{ ID: jobData.ID, Token: jobData.Token, } - trace, err := mr.network.ProcessJob(*runner, jobCredentials) + trace, err := mr.network.ProcessJob(*runner, jobCredentials, executionState.SentTrace) if err != nil { jobInfo := common.UpdateJobInfo{ ID: jobCredentials.ID, @@ -601,11 +654,21 @@ func (mr *RunCommand) requestJob( // send failure once mr.network.UpdateJob(*runner, jobCredentials, jobInfo) - return nil, nil, err + return nil, nil, nil, err + } + + if executionState.IsResumed() { + trace.Disable() } + trace.SetOnWriteFunc(func(sentTraceLen int) { + executionState.SentTrace = sentTraceLen + // TODO: + _ = store.Save(executionState) + }) + trace.SetFailuresCollector(mr.failuresCollector) - return trace, jobData, nil + return trace, jobData, executionState, nil } // doJobRequest will execute the request for a new job, respecting an interruption diff --git a/commands/single.go b/commands/single.go index d5c90e85c76..76db6b8b3ca 100644 --- a/commands/single.go +++ b/commands/single.go @@ -93,7 +93,7 @@ func (r *RunSingleCommand) processBuild(data common.ExecutorData, abortSignal ch } config := common.NewConfig() - newBuild, err := common.NewBuild(*jobData, &r.RunnerConfig, abortSignal, data) + newBuild, err := common.NewBuild(*jobData, &r.RunnerConfig, abortSignal, data, nil, nil) if err != nil { return err } @@ -102,7 +102,7 @@ func (r *RunSingleCommand) processBuild(data common.ExecutorData, abortSignal ch ID: jobData.ID, Token: jobData.Token, } - trace, err := r.network.ProcessJob(r.RunnerConfig, jobCredentials) + trace, err := r.network.ProcessJob(r.RunnerConfig, jobCredentials, 0) if err != nil { return err } diff --git a/common/build.go b/common/build.go index 10f8764d5f0..344c37a71dd 100644 --- a/common/build.go +++ b/common/build.go @@ -141,12 +141,10 @@ type Build struct { // Unique ID for all running builds on this runner and this project ProjectRunnerID int `json:"project_runner_id"` + statusLock sync.Mutex // CurrentStage(), CurrentState() and CurrentExecutorStage() are called // from the metrics go routine whilst a build is in-flight, so access // to these variables requires a lock. 
- statusLock sync.Mutex - currentStage BuildStage - currentState BuildRuntimeState executorStageResolver func() ExecutorStage secretsResolver func(l logger, registry SecretResolverRegistry) (SecretsResolver, error) @@ -160,36 +158,48 @@ type Build struct { createdAt time.Time + ExecutionState *JobExecutionState + Store JobStore + Referees []referees.Referee ArtifactUploader func(config JobCredentials, reader io.ReadCloser, options ArtifactsOptions) (UploadState, string) } func (b *Build) setCurrentStage(stage BuildStage) { - b.statusLock.Lock() - defer b.statusLock.Unlock() + b.ExecutionState.Lock() + defer b.ExecutionState.Unlock() - b.currentStage = stage + if b.ExecutionState.Stage != stage { + b.ExecutionState.Stage = stage + // TODO: + _ = b.Store.Save(b.ExecutionState) + } } func (b *Build) CurrentStage() BuildStage { - b.statusLock.Lock() - defer b.statusLock.Unlock() + b.ExecutionState.Lock() + defer b.ExecutionState.Unlock() + + return b.ExecutionState.Stage - return b.currentStage } func (b *Build) setCurrentState(state BuildRuntimeState) { - b.statusLock.Lock() - defer b.statusLock.Unlock() + b.ExecutionState.Lock() + defer b.ExecutionState.Unlock() - b.currentState = state + if b.ExecutionState.State != state { + b.ExecutionState.State = state + // TODO: + _ = b.Store.Save(b.ExecutionState) + } } func (b *Build) CurrentState() BuildRuntimeState { - b.statusLock.Lock() - defer b.statusLock.Unlock() + b.ExecutionState.Lock() + defer b.ExecutionState.Unlock() - return b.currentState + return b.ExecutionState.State } func (b *Build) Log() *logrus.Entry { @@ -320,7 +330,18 @@ func (b *Build) StartBuild(rootDir, cacheDir string, customBuildDirEnabled, shar return nil } -func (b *Build) executeStage(ctx context.Context, buildStage BuildStage, executor Executor) error { +func (b *Build) executeStage(ctx context.Context, buildStage BuildStage, executor Executor, trace JobTrace) error { + if b.ExecutionState.IsResumed() && b.ExecutionState.Stage != "" && buildStage != b.ExecutionState.Stage { + return nil + } + + if b.ExecutionState.IsResumed() && b.ExecutionState.Stage == b.ExecutionState.ResumedFromStage { + // TODO: executors should still perform similar checks to make sure they won't print lines for a second time after resuming + // Enable the trace at the end of the method to avoid some more logs + // TODO: restore/close build sections? 
+ defer trace.Enable() + } + if ctx.Err() != nil { return ctx.Err() } @@ -423,29 +444,29 @@ func GetStageDescription(stage BuildStage) string { return description } -func (b *Build) executeUploadArtifacts(ctx context.Context, state error, executor Executor) (err error) { +func (b *Build) executeUploadArtifacts(ctx context.Context, state error, executor Executor, trace JobTrace) (err error) { if state == nil { - return b.executeStage(ctx, BuildStageUploadOnSuccessArtifacts, executor) + return b.executeStage(ctx, BuildStageUploadOnSuccessArtifacts, executor, trace) } - return b.executeStage(ctx, BuildStageUploadOnFailureArtifacts, executor) + return b.executeStage(ctx, BuildStageUploadOnFailureArtifacts, executor, trace) } -func (b *Build) executeArchiveCache(ctx context.Context, state error, executor Executor) (err error) { +func (b *Build) executeArchiveCache(ctx context.Context, state error, executor Executor, trace JobTrace) (err error) { if state == nil { - return b.executeStage(ctx, BuildStageArchiveOnSuccessCache, executor) + return b.executeStage(ctx, BuildStageArchiveOnSuccessCache, executor, trace) } - return b.executeStage(ctx, BuildStageArchiveOnFailureCache, executor) + return b.executeStage(ctx, BuildStageArchiveOnFailureCache, executor, trace) } -func (b *Build) executeScript(ctx context.Context, executor Executor) error { +func (b *Build) executeScript(ctx context.Context, executor Executor, trace JobTrace) error { // track job start and create referees startTime := time.Now() b.createReferees(executor) // Prepare stage - err := b.executeStage(ctx, BuildStagePrepare, executor) + err := b.executeStage(ctx, BuildStagePrepare, executor, trace) if err != nil { return fmt.Errorf( "prepare environment: %w. "+ @@ -454,13 +475,13 @@ func (b *Build) executeScript(ctx context.Context, executor Executor) error { ) } - err = b.attemptExecuteStage(ctx, BuildStageGetSources, executor, b.GetGetSourcesAttempts()) + err = b.attemptExecuteStage(ctx, BuildStageGetSources, executor, b.GetGetSourcesAttempts(), trace) if err == nil { - err = b.attemptExecuteStage(ctx, BuildStageRestoreCache, executor, b.GetRestoreCacheAttempts()) + err = b.attemptExecuteStage(ctx, BuildStageRestoreCache, executor, b.GetRestoreCacheAttempts(), trace) } if err == nil { - err = b.attemptExecuteStage(ctx, BuildStageDownloadArtifacts, executor, b.GetDownloadArtifactsAttempts()) + err = b.attemptExecuteStage(ctx, BuildStageDownloadArtifacts, executor, b.GetDownloadArtifactsAttempts(), trace) } if err == nil { @@ -469,24 +490,24 @@ func (b *Build) executeScript(ctx context.Context, executor Executor) error { if s.Name == StepNameAfterScript { continue } - err = b.executeStage(ctx, StepToBuildStage(s), executor) + err = b.executeStage(ctx, StepToBuildStage(s), executor, trace) if err != nil { break } } - b.executeAfterScript(ctx, err, executor) + b.executeAfterScript(ctx, err, executor, trace) } - archiveCacheErr := b.executeArchiveCache(ctx, err, executor) + archiveCacheErr := b.executeArchiveCache(ctx, err, executor, trace) - artifactUploadErr := b.executeUploadArtifacts(ctx, err, executor) + artifactUploadErr := b.executeUploadArtifacts(ctx, err, executor, trace) // track job end and execute referees endTime := time.Now() b.executeUploadReferees(ctx, startTime, endTime) - b.removeFileBasedVariables(ctx, executor) + b.removeFileBasedVariables(ctx, executor, trace) return b.pickPriorityError(err, archiveCacheErr, artifactUploadErr) } @@ -505,7 +526,7 @@ func (b *Build) pickPriorityError(jobErr error, archiveCacheErr 
error, artifactU return artifactUploadErr } -func (b *Build) executeAfterScript(ctx context.Context, err error, executor Executor) { +func (b *Build) executeAfterScript(ctx context.Context, err error, executor Executor, trace JobTrace) { state, _ := b.runtimeStateAndError(err) b.GetAllVariables().OverwriteKey("CI_JOB_STATUS", JobVariable{ Key: "CI_JOB_STATUS", @@ -515,7 +536,7 @@ func (b *Build) executeAfterScript(ctx context.Context, err error, executor Exec ctx, cancel := context.WithTimeout(ctx, AfterScriptTimeout) defer cancel() - _ = b.executeStage(ctx, BuildStageAfterScript, executor) + _ = b.executeStage(ctx, BuildStageAfterScript, executor, trace) } // StepToBuildStage returns the BuildStage corresponding to a step. @@ -527,8 +548,8 @@ func (b *Build) createReferees(executor Executor) { b.Referees = referees.CreateReferees(executor, b.Runner.Referees, b.Log()) } -func (b *Build) removeFileBasedVariables(ctx context.Context, executor Executor) { - err := b.executeStage(ctx, BuildStageCleanup, executor) +func (b *Build) removeFileBasedVariables(ctx context.Context, executor Executor, trace JobTrace) { + err := b.executeStage(ctx, BuildStageCleanup, executor, trace) if err != nil { b.Log().WithError(err).Warning("Error while executing file based variables removal script") } @@ -572,12 +593,13 @@ func (b *Build) attemptExecuteStage( buildStage BuildStage, executor Executor, attempts int, + trace JobTrace, ) (err error) { if attempts < 1 || attempts > 10 { return fmt.Errorf("number of attempts out of the range [1, 10] for stage: %s", buildStage) } for attempt := 0; attempt < attempts; attempt++ { - if err = b.executeStage(ctx, buildStage, executor); err == nil { + if err = b.executeStage(ctx, buildStage, executor, trace); err == nil { return } } @@ -621,8 +643,10 @@ func (b *Build) runtimeStateAndError(err error) (BuildRuntimeState, error) { } } -func (b *Build) run(ctx context.Context, executor Executor) (err error) { - b.setCurrentState(BuildRunRuntimeRunning) +func (b *Build) run(ctx context.Context, executor Executor, trace JobTrace) (err error) { + if !b.ExecutionState.IsResumed() { + b.setCurrentState(BuildRunRuntimeRunning) + } buildFinish := make(chan error, 1) buildPanic := make(chan error, 1) @@ -646,7 +670,7 @@ func (b *Build) run(ctx context.Context, executor Executor) (err error) { } }() - buildFinish <- b.executeScript(runContext, executor) + buildFinish <- b.executeScript(runContext, executor, trace) }() // Wait for signals: cancel, timeout, abort or finish @@ -880,9 +904,11 @@ func (b *Build) Run(globalConfig *Config, trace JobTrace) (err error) { } b.logger = NewBuildLogger(trace, b.Log()) - b.printRunningWithHeader() - b.setCurrentState(BuildRunStatePending) + if !b.ExecutionState.IsResumed() { + b.printRunningWithHeader() + b.setCurrentState(BuildRunStatePending) + } // These defers are ordered because runBuild could panic and the recover needs to handle that panic. 
// setTraceStatus needs to be last since it needs a correct error value to report the job's status @@ -901,7 +927,12 @@ func (b *Build) Run(globalConfig *Config, trace JobTrace) (err error) { return err } - ctx, cancel := context.WithTimeout(context.Background(), b.GetBuildTimeout()) + contextTimeout := b.GetBuildTimeout() + if b.ExecutionState.IsResumed() { + contextTimeout -= time.Since(b.ExecutionState.StartedAt.Time) + } + + ctx, cancel := context.WithTimeout(context.Background(), contextTimeout) defer cancel() trace.SetCancelFunc(cancel) @@ -922,7 +953,7 @@ func (b *Build) Run(globalConfig *Config, trace JobTrace) (err error) { executor, err = b.executeBuildSection(executor, options, provider) if err == nil { - err = b.run(ctx, executor) + err = b.run(ctx, executor, trace) if errWait := b.waitForTerminal(ctx, globalConfig.SessionServer.GetSessionTimeout()); errWait != nil { b.Log().WithError(errWait).Debug("Stopped waiting for terminal") } @@ -990,13 +1021,17 @@ func (b *Build) executeBuildSection( Name: string(BuildStagePrepareExecutor), SkipMetrics: !b.JobResponse.Features.TraceSections, Run: func() error { - msg := fmt.Sprintf( - "%sPreparing the %q executor%s", - helpers.ANSI_BOLD_CYAN, - b.Runner.Executor, - helpers.ANSI_RESET, - ) - b.logger.Println(msg) + if !b.ExecutionState.IsResumed() { + msg := fmt.Sprintf( + "%sPreparing the %q executor%s", + helpers.ANSI_BOLD_CYAN, + b.Runner.Executor, + helpers.ANSI_RESET, + ) + + b.logger.Println(msg) + } + executor, err = b.retryCreateExecutor(options, provider, b.logger) return err }, @@ -1365,6 +1400,8 @@ func NewBuild( runnerConfig *RunnerConfig, systemInterrupt chan os.Signal, executorData ExecutorData, + executionState *JobExecutionState, + store JobStore, ) (*Build, error) { // Attempt to perform a deep copy of the RunnerConfig runnerConfigCopy, err := runnerConfig.DeepCopy() @@ -1379,6 +1416,8 @@ func NewBuild( ExecutorData: executorData, createdAt: time.Now(), secretsResolver: newSecretsResolver, + ExecutionState: executionState, + Store: store, }, nil } diff --git a/common/network.go b/common/network.go index c9fc85c2f3c..e5be730f54e 100644 --- a/common/network.go +++ b/common/network.go @@ -566,10 +566,13 @@ type JobTrace interface { SetCancelFunc(cancelFunc context.CancelFunc) Cancel() bool SetAbortFunc(abortFunc context.CancelFunc) + SetOnWriteFunc(func(sentTraceLen int)) Abort() bool SetFailuresCollector(fc FailuresCollector) SetMasked(values []string) IsStdout() bool + Disable() + Enable() } type UpdateJobResult struct { @@ -602,5 +605,5 @@ type Network interface { PatchTrace(config RunnerConfig, jobCredentials *JobCredentials, content []byte, startOffset int) PatchTraceResult DownloadArtifacts(config JobCredentials, artifactsFile io.WriteCloser, directDownload *bool) DownloadState UploadRawArtifacts(config JobCredentials, reader io.ReadCloser, options ArtifactsOptions) (UploadState, string) - ProcessJob(config RunnerConfig, buildCredentials *JobCredentials) (JobTrace, error) + ProcessJob(config RunnerConfig, buildCredentials *JobCredentials, startOffset int) (JobTrace, error) } diff --git a/common/store.go b/common/store.go new file mode 100644 index 00000000000..766cca403b1 --- /dev/null +++ b/common/store.go @@ -0,0 +1,201 @@ +package common + +import ( + "encoding/json" + "fmt" + "github.com/gofrs/flock" + "io/ioutil" + "os" + "path/filepath" + "strings" + "sync" + "time" +) + +type JobExecutionState struct { + sync.Mutex + + Job *JobResponse `json:"job"` + Resumes int `json:"retries"` + State BuildRuntimeState 
`json:"state"` + Stage BuildStage `json:"stage"` + HealthCheckAt TimeRFC3339 `json:"health_check_at"` + StartedAt TimeRFC3339 `json:"started_at"` + SentTrace int `json:"sent_trace"` + ExecutorMetadata any `json:"executor_metadata"` + + ResumedFromStage BuildStage +} + +func NewJobExecutionState(job *JobResponse) *JobExecutionState { + return &JobExecutionState{ + Job: job, + Resumes: 0, + HealthCheckAt: NewTimeRFC3339(time.Now()), + StartedAt: NewTimeRFC3339(time.Now()), + } +} + +func (s *JobExecutionState) UpdateHealth() { + s.HealthCheckAt = NewTimeRFC3339(time.Now()) +} + +func (s *JobExecutionState) IsResumed() bool { + return s.Resumes > 0 +} + +func (s *JobExecutionState) SetExecutorMetadata(data any) { + s.Lock() + defer s.Unlock() + s.ExecutorMetadata = data +} + +func (s *JobExecutionState) UpdateExecutorMetadata(mutator func(data any)) { + s.Lock() + defer s.Unlock() + mutator(s.ExecutorMetadata) +} + +type StoreFilter func(job *JobExecutionState) bool + +var findRunningFilter = func(execution *JobExecutionState) bool { + return time.Since(execution.HealthCheckAt.Time) > 30*time.Second && + (execution.State == BuildRunStatePending || execution.State == BuildRunRuntimeRunning) +} + +type JobStore interface { + Save(job *JobExecutionState) error + Load(jobID int64) (*JobExecutionState, error) + Remove(jobID int64) error + FindJobToResume() (*JobExecutionState, error) +} + +type StoreFactory func(*RunnerConfig) JobStore + +type MultiJobStore struct { + factory StoreFactory + stores map[string]JobStore +} + +func NewMultiJobStore(factory StoreFactory) *MultiJobStore { + return &MultiJobStore{stores: map[string]JobStore{}, factory: factory} +} + +func (m *MultiJobStore) Get(runner *RunnerConfig) JobStore { + if m.stores[runner.UniqueID()] == nil { + m.stores[runner.UniqueID()] = m.factory(runner) + } + + return m.stores[runner.UniqueID()] +} + +type FileJobStore struct { + dir string + runner *RunnerConfig +} + +func NewFileJobStore(dir string, runner *RunnerConfig) *FileJobStore { + return &FileJobStore{dir, runner} +} + +func (f *FileJobStore) FindJobToResume() (*JobExecutionState, error) { + files, err := ioutil.ReadDir(f.dir) + if err != nil { + return nil, err + } + + for _, file := range files { + if !strings.HasSuffix(file.Name(), fmt.Sprintf("runner.%s-%s.json", f.runner.Name, f.runner.ShortDescription())) { + continue + } + + state, err := func() (*JobExecutionState, error) { + path := filepath.Join(f.dir, file.Name()) + lock := flock.New(path) + if err := lock.Lock(); err != nil { + return nil, err + } + defer lock.Unlock() + + state, err := f.load(path) + if err != nil { + return nil, err + } + + if findRunningFilter(state) { + state.UpdateHealth() + if err := f.saveNoLocK(state); err != nil { + return nil, err + } + + return state, nil + } + + return nil, nil + }() + + if err != nil { + return nil, err + } + + if state != nil { + return state, nil + } + } + + return nil, nil +} + +func (f *FileJobStore) statePath(jobID int64) string { + return filepath.Join(f.dir, fmt.Sprintf("state.job.%d.runner.%s-%s.json", jobID, f.runner.Name, f.runner.ShortDescription())) +} + +func (f *FileJobStore) saveNoLocK(state *JobExecutionState) error { + file := f.statePath(state.Job.ID) + b, err := json.Marshal(state) + if err != nil { + return err + } + + return ioutil.WriteFile(file, b, 0700) +} + +func (f *FileJobStore) Save(state *JobExecutionState) error { + file := f.statePath(state.Job.ID) + lock := flock.New(file) + if err := lock.Lock(); err != nil { + return err + } + defer 
lock.Unlock() + + return f.saveNoLocK(state) +} + +func (f *FileJobStore) load(file string) (*JobExecutionState, error) { + b, err := ioutil.ReadFile(file) + if err != nil { + return nil, err + } + + var state *JobExecutionState + if err := json.Unmarshal(b, &state); err != nil { + return nil, err + } + + return state, nil +} + +func (f *FileJobStore) Load(jobID int64) (*JobExecutionState, error) { + return f.load(f.statePath(jobID)) +} + +func (f *FileJobStore) Remove(jobID int64) error { + file := f.statePath(jobID) + lock := flock.New(file) + if err := lock.Lock(); err != nil { + return err + } + defer lock.Unlock() + + return os.Remove(file) +} diff --git a/common/time.go b/common/time.go new file mode 100644 index 00000000000..9f59dec8be8 --- /dev/null +++ b/common/time.go @@ -0,0 +1,31 @@ +package common + +import ( + "strconv" + "strings" + "time" +) + +// TimeRFC3339 is used specifically to marshal and unmarshal time to/from RFC3339 strings +// That's because the metadata is user-facing and using Go's built-in time parsing will not be portable +type TimeRFC3339 struct { + time.Time +} + +func NewTimeRFC3339(t time.Time) TimeRFC3339 { + return TimeRFC3339{t} +} + +func (t *TimeRFC3339) UnmarshalJSON(b []byte) error { + var err error + t.Time, err = time.Parse(time.RFC3339, strings.Trim(string(b), `"`)) + return err +} + +func (t TimeRFC3339) MarshalJSON() ([]byte, error) { + if t.IsZero() { + return nil, nil + } + + return []byte(strconv.Quote(t.Time.Format(time.RFC3339))), nil +} diff --git a/common/trace.go b/common/trace.go index 070181a36e2..1e116ce246f 100644 --- a/common/trace.go +++ b/common/trace.go @@ -8,10 +8,11 @@ import ( ) type Trace struct { - Writer io.Writer - cancelFunc context.CancelFunc - abortFunc context.CancelFunc - mutex sync.Mutex + Writer io.Writer + cancelFunc context.CancelFunc + abortFunc context.CancelFunc + onWriteFunc func(sentTraceLen int) + mutex sync.Mutex } type masker interface { @@ -92,3 +93,15 @@ func (s *Trace) SetFailuresCollector(fc FailuresCollector) {} func (s *Trace) IsStdout() bool { return true } + +func (s *Trace) SetOnWriteFunc(onWriteFunc func(sentTraceLen int)) { + s.onWriteFunc = onWriteFunc +} + +func (s *Trace) Enable() { + +} + +func (s *Trace) Disable() { + +} diff --git a/executors/kubernetes/kubernetes.go b/executors/kubernetes/kubernetes.go index 5fe3f9e5505..ea39f713e22 100644 --- a/executors/kubernetes/kubernetes.go +++ b/executors/kubernetes/kubernetes.go @@ -79,6 +79,26 @@ var ( resourceTypePullSecret = "ImagePullSecret" ) +type jobStateResource struct { + Name string `json:"name"` + Namespace string `json:"namespace"` +} + +func resourceFromKubernetesType(from metav1.ObjectMeta) *jobStateResource { + return &jobStateResource{ + Name: from.Name, + Namespace: from.Namespace, + } +} + +type jobStateMetadata struct { + Credentials *jobStateResource `json:"credentials"` + ConfigMap *jobStateResource `json:"config_map"` + Pod *jobStateResource `json:"pod"` + Services []*jobStateResource `json:"services"` + Offset int64 `json:"offset"` +} + type commandTerminatedError struct { exitCode int } @@ -161,7 +181,7 @@ type executor struct { featureChecker featureChecker - newLogProcessor func() logProcessor + newLogProcessor func(offset int64) logProcessor remoteProcessTerminated chan shells.TrapCommandExitStatus @@ -169,6 +189,8 @@ type executor struct { // Flag if a repo mount and emptyDir volume are needed requireDefaultBuildsDirVolume *bool + + processingLogs bool } type serviceCreateResponse struct { @@ -182,6 +204,15 @@ func (s 
*executor) Name() string { func (s *executor) Prepare(options common.ExecutorPrepareOptions) (err error) { s.AbstractExecutor.PrepareConfiguration(options) + if s.Build.ExecutionState.IsResumed() && s.Build.ExecutionState.ExecutorMetadata != nil { + // When loaded from this this is a map[string]interface{}, we need to convert it + b, _ := json.Marshal(s.Build.ExecutionState.ExecutorMetadata) + var meta *jobStateMetadata + _ = json.Unmarshal(b, &meta) + s.Build.ExecutionState.ExecutorMetadata = meta + } else { + s.Build.ExecutionState.ExecutorMetadata = &jobStateMetadata{} + } if err = s.prepareOverwrites(options.Build.GetAllVariables()); err != nil { return fmt.Errorf("couldn't prepare overwrites: %w", err) @@ -225,9 +256,11 @@ func (s *executor) Prepare(options common.ExecutorPrepareOptions) (err error) { imageName := s.Build.GetAllVariables().ExpandValue(s.options.Image.Name) - s.Println("Using Kubernetes executor with image", imageName, "...") - if !s.Build.IsFeatureFlagOn(featureflags.UseLegacyKubernetesExecutionStrategy) { - s.Println("Using attach strategy to execute scripts...") + if !s.Build.ExecutionState.IsResumed() { + s.Println("Using Kubernetes executor with image", imageName, "...") + if !s.Build.IsFeatureFlagOn(featureflags.UseLegacyKubernetesExecutionStrategy) { + s.Println("Using attach strategy to execute scripts...") + } } s.Debugln(fmt.Sprintf("Using helper image: %s:%s", s.helperImageInfo.Name, s.helperImageInfo.Tag)) @@ -347,6 +380,10 @@ func (s *executor) runWithExecLegacy(cmd common.ExecutorCommand) error { } func (s *executor) runWithAttach(cmd common.ExecutorCommand) error { + if s.Build.ExecutionState.IsResumed() { + fmt.Println("Resuming stage ", s.Build.ExecutionState.Stage) + } + err := s.ensurePodsConfigured(cmd.Context) if err != nil { return err @@ -355,25 +392,28 @@ func (s *executor) runWithAttach(cmd common.ExecutorCommand) error { ctx, cancel := context.WithCancel(cmd.Context) defer cancel() - containerName, containerCommand := s.getContainerInfo(cmd) + podStatusCh := s.watchPodStatus(ctx) - s.Debugln(fmt.Sprintf( - "Starting in container %q the command %q with script: %s", - containerName, - containerCommand, - cmd.Script, - )) + var runInContainerCh <-chan error + if s.Build.ExecutionState.Stage != s.Build.ExecutionState.ResumedFromStage { + containerName, containerCommand := s.getContainerInfo(cmd) - podStatusCh := s.watchPodStatus(ctx) + s.Debugln(fmt.Sprintf( + "Starting in container %q the command %q with script: %s", + containerName, + containerCommand, + cmd.Script, + )) - select { - case err := <-s.runInContainer(containerName, containerCommand): - s.Debugln(fmt.Sprintf("Container %q exited with error: %v", containerName, err)) - var terminatedError *commandTerminatedError - if err != nil && errors.As(err, &terminatedError) { - return &common.BuildError{Inner: err, ExitCode: terminatedError.exitCode} - } + // If we resume from a stage we don't want to run the scripts for that stage again, + // rather let the log processor read the logs from the correct index and move on to the next stage + runInContainerCh = s.runInContainer(containerName, containerCommand) + } else { + runInContainerCh = s.listenForCommandExit() + } + select { + case err := <-runInContainerCh: return err case err := <-podStatusCh: if IsKubernetesPodNotFoundError(err) { @@ -387,10 +427,6 @@ func (s *executor) runWithAttach(cmd common.ExecutorCommand) error { } func (s *executor) ensurePodsConfigured(ctx context.Context) error { - if s.pod != nil { - return nil - } - err := 
s.setupCredentials() if err != nil { return fmt.Errorf("setting up credentials: %w", err) @@ -401,13 +437,9 @@ func (s *executor) ensurePodsConfigured(ctx context.Context) error { return fmt.Errorf("setting up scripts configMap: %w", err) } - permissionsInitContainer, err := s.buildPermissionsInitContainer(s.helperImageInfo.OSType) + err = s.setupPods() if err != nil { - return fmt.Errorf("building permissions init container: %w", err) - } - err = s.setupBuildPod([]api.Container{permissionsInitContainer}) - if err != nil { - return fmt.Errorf("setting up build pod: %w", err) + return fmt.Errorf("setting up pods: %w", err) } status, err := waitForPodRunning(ctx, s.kubeClient, s.pod, s.Trace, s.Config.Kubernetes) @@ -419,7 +451,63 @@ func (s *executor) ensurePodsConfigured(ctx context.Context) error { return fmt.Errorf("pod failed to enter running state: %s", status) } - go s.processLogs(ctx) + // TODO: can we and should we handle the case of: + // Runner is restarted + // Job is running in background + // Job completes while no running is handling it + // Runner starts but the pod is already done with its work + if !s.processingLogs { + s.processingLogs = true + go s.processLogs(ctx) + } + + return nil +} + +func (s *executor) setupPods() error { + if s.pod != nil { + return nil + } + + existingPod := s.stateExecutorMetadata().Pod + existingServices := s.stateExecutorMetadata().Services + if s.Build.ExecutionState.IsResumed() && existingPod != nil { + // TODO: Ideally we will separate the creation/restoration of Pod and Services but it's not really + // necessary for now + s.Debugln("Restoring pods") + + var err error + s.pod, err = s.kubeClient.CoreV1(). + Pods(existingPod.Namespace). + Get(context.TODO(), existingPod.Name, metav1.GetOptions{}) + if err != nil { + return err + } + + for _, existingSrv := range existingServices { + srv, err := s.kubeClient.CoreV1(). + Services(existingSrv.Namespace). 
+ Get(context.TODO(), existingSrv.Name, metav1.GetOptions{}) + if err != nil { + return err + } + + // TODO: might be nil but we don't care right now + s.services = append(s.services, *srv) + } + + return nil + } + + permissionsInitContainer, err := s.buildPermissionsInitContainer(s.helperImageInfo.OSType) + if err != nil { + return fmt.Errorf("building permissions init container: %w", err) + } + + err = s.setupBuildPod([]api.Container{permissionsInitContainer}) + if err != nil { + return fmt.Errorf("setting up build pod: %w", err) + } return nil } @@ -550,15 +638,56 @@ func (s *executor) buildRedirectionCmd(shell string) string { } func (s *executor) processLogs(ctx context.Context) { - processor := s.newLogProcessor() + processor := s.newLogProcessor(s.stateExecutorMetadata().Offset) logsCh, errCh := processor.Process(ctx) + var offsetToWriteMutex sync.RWMutex + var offsetToWrite int64 + var offsetLastWritten int64 + go func() { + writeOffset := func() { + s.updateStateExecutorMetadata(func(d *jobStateMetadata) { + offsetToWriteMutex.RLock() + if offsetLastWritten == offsetToWrite { + offsetToWriteMutex.RUnlock() + return + } + offsetToWriteMutex.RUnlock() + + d.Offset = offsetToWrite + offsetToWriteMutex.Lock() + offsetLastWritten = offsetToWrite + offsetToWriteMutex.Unlock() + }) + + } + + t := time.NewTicker(100 * time.Millisecond) + for { + select { + case <-t.C: + writeOffset() + case <-ctx.Done(): + writeOffset() + return + } + } + }() + for { select { - case line, ok := <-logsCh: + case lineData, ok := <-logsCh: if !ok { return } + + line := lineData.line + offset := lineData.offset + + offsetToWriteMutex.Lock() + offsetToWrite = offset + offsetToWriteMutex.Unlock() + var status shells.TrapCommandExitStatus if status.TryUnmarshal(line) { s.remoteProcessTerminated <- status @@ -596,6 +725,21 @@ func getExitCode(err error) int { } func (s *executor) setupScriptsConfigMap() error { + if s.configMap != nil { + return nil + } + + existingConfigMap := s.stateExecutorMetadata().ConfigMap + if s.Build.ExecutionState.IsResumed() && existingConfigMap != nil { + s.Debugln("Restoring scripts config map") + + var err error + s.configMap, err = s.kubeClient.CoreV1(). + ConfigMaps(existingConfigMap.Namespace). + Get(context.TODO(), existingConfigMap.Name, metav1.GetOptions{}) + return err + } + s.Debugln("Setting up scripts config map") // After issue https://gitlab.com/gitlab-org/gitlab-runner/issues/10342 is resolved and @@ -628,6 +772,10 @@ func (s *executor) setupScriptsConfigMap() error { return fmt.Errorf("generating scripts config map: %w", err) } + s.updateStateExecutorMetadata(func(d *jobStateMetadata) { + d.ConfigMap = resourceFromKubernetesType(s.configMap.ObjectMeta) + }) + return nil } @@ -1197,6 +1345,21 @@ func (s *executor) isSharedBuildsDirRequired() bool { } func (s *executor) setupCredentials() error { + if s.credentials != nil { + return nil + } + + existingCredentials := s.stateExecutorMetadata().Credentials + if s.Build.ExecutionState.IsResumed() && existingCredentials != nil { + s.Debugln("Restoring secrets") + + var err error + s.credentials, err = s.kubeClient.CoreV1(). + Secrets(existingCredentials.Namespace). 
+ Get(context.TODO(), existingCredentials.Name, metav1.GetOptions{}) + return err + } + s.Debugln("Setting up secrets") authConfigs, err := auth.ResolveConfigs(s.Build.GetDockerAuthConfig(), s.Shell().User, s.Build.Credentials) @@ -1235,9 +1398,28 @@ func (s *executor) setupCredentials() error { } s.credentials = creds + + s.updateStateExecutorMetadata(func(d *jobStateMetadata) { + d.Credentials = resourceFromKubernetesType(s.credentials.ObjectMeta) + }) + return nil } +func (s *executor) updateStateExecutorMetadata(mutator func(d *jobStateMetadata)) { + s.Build.ExecutionState.UpdateExecutorMetadata(func(data any) { + mutator(data.(*jobStateMetadata)) + }) + + if err := s.Build.Store.Save(s.Build.ExecutionState); err != nil { + s.Errorln("Error saving execution state to store: ", err) + } +} + +func (s *executor) stateExecutorMetadata() *jobStateMetadata { + return s.Build.ExecutionState.ExecutorMetadata.(*jobStateMetadata) +} + func (s *executor) getHostAliases() ([]api.HostAlias, error) { supportsHostAliases, err := s.featureChecker.IsHostAliasSupported() switch { @@ -1313,6 +1495,13 @@ func (s *executor) setupBuildPod(initContainers []api.Container) error { return err } + s.updateStateExecutorMetadata(func(d *jobStateMetadata) { + d.Pod = resourceFromKubernetesType(s.pod.ObjectMeta) + for _, srv := range s.services { + d.Services = append(d.Services, resourceFromKubernetesType(srv.ObjectMeta)) + } + }) + return nil } @@ -1477,6 +1666,9 @@ func (s *executor) createBuildAndHelperContainers() (api.Container, api.Containe } func (s *executor) setOwnerReferencesForResources(ownerReferences []metav1.OwnerReference) error { + // TODO: we could make all these operations more granular in terms of reliability and add + // owner reference state in the executor state object + if s.credentials != nil { credentials := s.credentials.DeepCopy() credentials.SetOwnerReferences(ownerReferences) @@ -1741,10 +1933,8 @@ func (s *executor) checkPodStatus() error { } func (s *executor) runInContainer(name string, command []string) <-chan error { - errCh := make(chan error, 1) + errCh := s.listenForCommandExit() go func() { - defer close(errCh) - attach := AttachOptions{ PodName: s.pod.Name, Namespace: s.pod.Namespace, @@ -1759,8 +1949,23 @@ func (s *executor) runInContainer(name string, command []string) <-chan error { retryable := retry.New(retry.WithBuildLog(&attach, &s.BuildLogger)) err := retryable.Run() if err != nil { + s.Debugln(fmt.Sprintf("Container %q exited with error: %v", name, err)) + var terminatedError *commandTerminatedError + if err != nil && errors.As(err, &terminatedError) { + err = &common.BuildError{Inner: err, ExitCode: terminatedError.exitCode} + } + errCh <- err } + }() + + return errCh +} + +func (s *executor) listenForCommandExit() chan error { + errCh := make(chan error, 1) + go func() { + defer close(errCh) exitStatus := <-s.remoteProcessTerminated s.Debugln("Remote process exited with the status:", exitStatus) @@ -1890,13 +2095,19 @@ func (s *executor) checkDefaults() error { } if s.configurationOverwrites.namespace == "" { - s.Warningln( - fmt.Printf("Namespace is empty, therefore assuming '%s'.", DefaultResourceIdentifier), - ) + // TODO: make all these trace printing steps smarter, e.g. 
disable trace until we reach the stage to resume from + if !s.Build.ExecutionState.IsResumed() { + s.Warningln( + fmt.Printf("Namespace is empty, therefore assuming '%s'.", DefaultResourceIdentifier), + ) + } + s.configurationOverwrites.namespace = DefaultResourceIdentifier } - s.Println("Using Kubernetes namespace:", s.configurationOverwrites.namespace) + if !s.Build.ExecutionState.IsResumed() { + s.Println("Using Kubernetes namespace:", s.configurationOverwrites.namespace) + } return nil } @@ -1917,19 +2128,22 @@ func newExecutor() *executor { remoteProcessTerminated: make(chan shells.TrapCommandExitStatus), } - e.newLogProcessor = func() logProcessor { + e.newLogProcessor = func(offset int64) logProcessor { + podConfig := kubernetesLogProcessorPodConfig{ + namespace: e.pod.Namespace, + pod: e.pod.Name, + container: helperContainerName, + logPath: e.logFile(), + waitLogFileTimeout: waitLogFileTimeout, + } + return newKubernetesLogProcessor( e.kubeClient, e.kubeConfig, &backoff.Backoff{Min: time.Second, Max: 30 * time.Second}, e.Build.Log(), - kubernetesLogProcessorPodConfig{ - namespace: e.pod.Namespace, - pod: e.pod.Name, - container: helperContainerName, - logPath: e.logFile(), - waitLogFileTimeout: waitLogFileTimeout, - }, + podConfig, + offset, ) } diff --git a/executors/kubernetes/log_processor.go b/executors/kubernetes/log_processor.go index 3e6ba7c6e69..5b16166e44c 100644 --- a/executors/kubernetes/log_processor.go +++ b/executors/kubernetes/log_processor.go @@ -21,6 +21,11 @@ type logStreamer interface { fmt.Stringer } +type logLineData struct { + line string + offset int64 +} + type kubernetesLogStreamer struct { kubernetesLogProcessorPodConfig @@ -63,7 +68,7 @@ type logProcessor interface { // Process listens for log lines // consumers must read from the channel until it's closed // consumers are also notified in case of error through the error channel - Process(ctx context.Context) (<-chan string, <-chan error) + Process(ctx context.Context) (<-chan logLineData, <-chan error) } type backoffCalculator interface { @@ -94,6 +99,7 @@ func newKubernetesLogProcessor( backoff backoffCalculator, logger logrus.FieldLogger, podCfg kubernetesLogProcessorPodConfig, + offset int64, ) *kubernetesLogProcessor { logStreamer := &kubernetesLogStreamer{ kubernetesLogProcessorPodConfig: podCfg, @@ -106,11 +112,13 @@ func newKubernetesLogProcessor( backoff: backoff, logger: logger, logStreamer: logStreamer, + + logsOffset: offset, } } -func (l *kubernetesLogProcessor) Process(ctx context.Context) (<-chan string, <-chan error) { - outCh := make(chan string) +func (l *kubernetesLogProcessor) Process(ctx context.Context) (<-chan logLineData, <-chan error) { + outCh := make(chan logLineData) errCh := make(chan error) go func() { defer close(outCh) @@ -121,7 +129,7 @@ func (l *kubernetesLogProcessor) Process(ctx context.Context) (<-chan string, <- return outCh, errCh } -func (l *kubernetesLogProcessor) attach(ctx context.Context, outCh chan string, errCh chan error) { +func (l *kubernetesLogProcessor) attach(ctx context.Context, outCh chan logLineData, errCh chan error) { var ( attempt float64 = -1 backoffDuration time.Duration @@ -167,7 +175,11 @@ func (l *kubernetesLogProcessor) attach(ctx context.Context, outCh chan string, } } -func (l *kubernetesLogProcessor) processStream(ctx context.Context, outCh chan string) error { +func (l *kubernetesLogProcessor) setLogsOffset(newOffset int64) { + l.logsOffset = newOffset +} + +func (l *kubernetesLogProcessor) processStream(ctx context.Context, outCh chan 
logLineData) error { reader, writer := io.Pipe() defer func() { _ = reader.Close() @@ -212,7 +224,7 @@ func (l *kubernetesLogProcessor) processStream(ctx context.Context, outCh chan s return gr.Wait() } -func (l *kubernetesLogProcessor) readLogs(ctx context.Context, logs io.Reader, outCh chan string) error { +func (l *kubernetesLogProcessor) readLogs(ctx context.Context, logs io.Reader, outCh chan logLineData) error { logsScanner, linesCh := l.scan(ctx, logs) for { @@ -230,7 +242,10 @@ func (l *kubernetesLogProcessor) readLogs(ctx context.Context, logs io.Reader, o l.logsOffset = newLogsOffset } - outCh <- logLine + outCh <- logLineData{ + line: logLine, + offset: l.logsOffset, + } } } } diff --git a/executors/kubernetes/util.go b/executors/kubernetes/util.go index 86a140eea9b..4c1e60652cb 100644 --- a/executors/kubernetes/util.go +++ b/executors/kubernetes/util.go @@ -228,6 +228,10 @@ func waitForPodRunning( out io.Writer, config *common.KubernetesConfig, ) (api.PodPhase, error) { + if running, err := isRunning(pod); running { + return pod.Status.Phase, err + } + pollInterval := config.GetPollInterval() pollAttempts := config.GetPollAttempts() for i := 0; i <= pollAttempts; i++ { diff --git a/go.mod b/go.mod index 2ba4332db99..4c1f0c933f1 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module gitlab.com/gitlab-org/gitlab-runner -go 1.17 +go 1.18 require ( cloud.google.com/go v0.72.0 @@ -87,6 +87,7 @@ require ( github.com/dustin/go-humanize v1.0.0 // indirect github.com/form3tech-oss/jwt-go v3.2.2+incompatible // indirect github.com/go-logr/logr v0.4.0 // indirect + github.com/gofrs/flock v0.8.1 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e // indirect github.com/golang/protobuf v1.5.0 // indirect diff --git a/go.sum b/go.sum index 7b673a07751..b2ba295a07b 100644 --- a/go.sum +++ b/go.sum @@ -225,6 +225,8 @@ github.com/go-test/deep v1.0.2-0.20181118220953-042da051cf31/go.mod h1:wGDj63lr6 github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee/go.mod h1:L0fX3K22YWvt/FAX9NnzrNzcI4wNYi9Yku4O0LKYflo= github.com/gobwas/pool v0.2.0/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw= github.com/gobwas/ws v1.0.2/go.mod h1:szmBTxLgaFppYjEmNtny/v3w89xOydFnnZMcgRRu/EM= +github.com/gofrs/flock v0.8.1 h1:+gYjHKf32LDeiEEFhQaotPbLuUXjY5ZqxKgXy7n59aw= +github.com/gofrs/flock v0.8.1/go.mod h1:F1TvTiK9OcQqauNUHlbJvyl9Qa1QvF/gOUDKA14jxHU= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= diff --git a/helpers/trace/buffer.go b/helpers/trace/buffer.go index 6d366bf4630..c5e4a94f370 100644 --- a/helpers/trace/buffer.go +++ b/helpers/trace/buffer.go @@ -35,6 +35,7 @@ type Buffer struct { type options struct { urlParamMasking bool + offset int } type Option func(*options) error @@ -46,6 +47,13 @@ func WithURLParamMasking(enabled bool) Option { } } +func WithOffset(offset int) Option { + return func(o *options) error { + o.offset = offset + return nil + } +} + type inverseLengthSort []string func (s inverseLengthSort) Len() int { @@ -224,6 +232,7 @@ func New(opts ...Option) (*Buffer, error) { options := options{ urlParamMasking: true, + offset: 0, } for _, o := range opts { @@ -233,6 +242,14 @@ func New(opts ...Option) (*Buffer, error) { } } + // TODO: + // This might not work exactly the same on a new log file, locally the file 
exists + // but in a new pod it won't in case the manager is running in a pod which it should + _, err = logFile.Seek(int64(options.offset), 0) + if err != nil { + return nil, err + } + buffer := &Buffer{ logFile: logFile, checksum: crc32.NewIEEE(), @@ -241,7 +258,7 @@ func New(opts ...Option) (*Buffer, error) { buffer.lw = &limitWriter{ w: io.MultiWriter(buffer.logFile, buffer.checksum), - written: 0, + written: int64(options.offset), limit: defaultBytesLimit, } diff --git a/network/gitlab.go b/network/gitlab.go index 518716fc147..bdd629bc92c 100644 --- a/network/gitlab.go +++ b/network/gitlab.go @@ -823,8 +823,9 @@ func (n *GitLabClient) downloadArtifactFile( func (n *GitLabClient) ProcessJob( config common.RunnerConfig, jobCredentials *common.JobCredentials, + startOffset int, ) (common.JobTrace, error) { - trace, err := newJobTrace(n, config, jobCredentials) + trace, err := newJobTrace(n, config, jobCredentials, startOffset) if err != nil { return nil, err } diff --git a/network/trace.go b/network/trace.go index f9d1949fe85..00f6a818f6e 100644 --- a/network/trace.go +++ b/network/trace.go @@ -3,6 +3,7 @@ package network import ( "context" "sync" + "sync/atomic" "time" "gitlab.com/gitlab-org/gitlab-runner/common" @@ -17,6 +18,7 @@ type clientJobTrace struct { id int64 cancelFunc context.CancelFunc abortFunc context.CancelFunc + onWriteFunc func(sentTraceLen int) buffer *trace.Buffer @@ -34,6 +36,8 @@ type clientJobTrace struct { failuresCollector common.FailuresCollector exitCode int + + enabled int32 } func (c *clientJobTrace) Success() { @@ -63,6 +67,10 @@ func (c *clientJobTrace) Fail(err error, failureData common.JobFailureData) { } func (c *clientJobTrace) Write(data []byte) (n int, err error) { + if atomic.LoadInt32(&c.enabled) == 0 { + return 0, nil + } + return c.buffer.Write(data) } @@ -113,6 +121,10 @@ func (c *clientJobTrace) SetAbortFunc(cancelFunc context.CancelFunc) { c.abortFunc = cancelFunc } +func (c *clientJobTrace) SetOnWriteFunc(onWriteFunc func(sentTraceLen int)) { + c.onWriteFunc = onWriteFunc +} + // Abort consumes function set by SetAbortFunc // The abort always have much higher importance than Cancel // as abort interrupts the execution, thus cancel is never @@ -244,7 +256,8 @@ func (c *clientJobTrace) anyTraceToSend() bool { c.lock.RLock() defer c.lock.RUnlock() - return c.buffer.Size() != c.sentTrace + size := c.buffer.Size() + return size != c.sentTrace } func (c *clientJobTrace) sendPatch() common.PatchTraceResult { @@ -269,6 +282,7 @@ func (c *clientJobTrace) sendPatch() common.PatchTraceResult { c.lock.Lock() c.sentTime = time.Now() c.sentTrace = result.SentOffset + c.onWriteFunc(c.sentTrace) c.lock.Unlock() } @@ -405,12 +419,24 @@ func (c *clientJobTrace) IsMaskingURLParams() bool { return c.config.IsFeatureFlagOn(featureflags.UseImprovedURLMasking) } +func (c *clientJobTrace) Disable() { + atomic.StoreInt32(&c.enabled, 0) +} + +func (c *clientJobTrace) Enable() { + atomic.StoreInt32(&c.enabled, 1) +} + func newJobTrace( client common.Network, config common.RunnerConfig, jobCredentials *common.JobCredentials, + startOffset int, ) (*clientJobTrace, error) { - buffer, err := trace.New(trace.WithURLParamMasking(config.IsFeatureFlagOn(featureflags.UseImprovedURLMasking))) + buffer, err := trace.New( + trace.WithURLParamMasking(config.IsFeatureFlagOn(featureflags.UseImprovedURLMasking)), + trace.WithOffset(startOffset), + ) if err != nil { return nil, err } @@ -424,5 +450,7 @@ func newJobTrace( maxTracePatchSize: common.DefaultTracePatchLimit, updateInterval: 
common.DefaultUpdateInterval, forceSendInterval: common.MinTraceForceSendInterval, + sentTrace: startOffset, + enabled: 1, }, nil } diff --git a/shells/abstract_test.go b/shells/abstract_test.go index 26f32fd6f16..221c0225c4c 100644 --- a/shells/abstract_test.go +++ b/shells/abstract_test.go @@ -1101,7 +1101,7 @@ func TestWriteUserScript(t *testing.T) { m.On("Variable", mock.Anything) m.On("Cd", mock.AnythingOfType("string")) m.On("Noticef", "$ %s", "echo hello").Once() - m.On("Line", "echo hello").Once() + m.On("line", "echo hello").Once() m.On("CheckForErrors").Once() }, expectedErr: nil, @@ -1130,9 +1130,9 @@ func TestWriteUserScript(t *testing.T) { m.On("Noticef", "$ %s", "echo prebuild").Once() m.On("Noticef", "$ %s", "echo release").Once() m.On("Noticef", "$ %s", "echo postbuild").Once() - m.On("Line", "echo prebuild").Once() - m.On("Line", "echo release").Once() - m.On("Line", "echo postbuild").Once() + m.On("line", "echo prebuild").Once() + m.On("line", "echo release").Once() + m.On("line", "echo postbuild").Once() m.On("CheckForErrors").Times(3) }, expectedErr: nil, @@ -1192,11 +1192,11 @@ func TestScriptSections(t *testing.T) { m.On("SectionEnd", mock.AnythingOfType("string")).Once() m.On("SectionStart", mock.AnythingOfType("string"), "$ echo postbuild").Once() m.On("SectionEnd", mock.AnythingOfType("string")).Once() - m.On("Line", "echo prebuild").Once() - m.On("Line", "script 1").Once() - m.On("Line", "script 2").Once() - m.On("Line", "script 3").Once() - m.On("Line", "echo postbuild").Once() + m.On("line", "echo prebuild").Once() + m.On("line", "script 1").Once() + m.On("line", "script 2").Once() + m.On("line", "script 3").Once() + m.On("line", "echo postbuild").Once() m.On("CheckForErrors").Times(5) }, }, @@ -1217,11 +1217,11 @@ func TestScriptSections(t *testing.T) { m.On("Noticef", "$ %s", "script 2").Once() m.On("Noticef", "$ %s", "script 3").Once() m.On("Noticef", "$ %s", "echo postbuild").Once() - m.On("Line", "echo prebuild").Once() - m.On("Line", "script 1").Once() - m.On("Line", "script 2").Once() - m.On("Line", "script 3").Once() - m.On("Line", "echo postbuild").Once() + m.On("line", "echo prebuild").Once() + m.On("line", "script 1").Once() + m.On("line", "script 2").Once() + m.On("line", "script 3").Once() + m.On("line", "echo postbuild").Once() m.On("CheckForErrors").Times(5) }, }, @@ -1242,11 +1242,11 @@ func TestScriptSections(t *testing.T) { m.On("Noticef", "$ %s", "script 2").Once() m.On("Noticef", "$ %s", "script 3").Once() m.On("Noticef", "$ %s", "echo postbuild").Once() - m.On("Line", "echo prebuild").Once() - m.On("Line", "script 1").Once() - m.On("Line", "script 2").Once() - m.On("Line", "script 3").Once() - m.On("Line", "echo postbuild").Once() + m.On("line", "echo prebuild").Once() + m.On("line", "script 1").Once() + m.On("line", "script 2").Once() + m.On("line", "script 3").Once() + m.On("line", "echo postbuild").Once() m.On("CheckForErrors").Times(5) }, }, @@ -1267,11 +1267,11 @@ func TestScriptSections(t *testing.T) { m.On("Noticef", "$ %s", "script 2").Once() m.On("Noticef", "$ %s", "script 3").Once() m.On("Noticef", "$ %s", "echo postbuild").Once() - m.On("Line", "echo prebuild").Once() - m.On("Line", "script 1").Once() - m.On("Line", "script 2").Once() - m.On("Line", "script 3").Once() - m.On("Line", "echo postbuild").Once() + m.On("line", "echo prebuild").Once() + m.On("line", "script 1").Once() + m.On("line", "script 2").Once() + m.On("line", "script 3").Once() + m.On("line", "echo postbuild").Once() m.On("CheckForErrors").Times(5) }, }, -- GitLab
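
A usage sketch of the persistence API this patch introduces, kept outside the diff itself: the snippet below mirrors the resume path that commands/multi.go gains in this change, using only identifiers the patch adds in common/store.go (JobStore, JobExecutionState, NewJobExecutionState, FindJobToResume). The requestNewJob helper and the package layout are placeholders for illustration, not part of the change.

package main

import "gitlab.com/gitlab-org/gitlab-runner/common"

// requestNewJob is a stand-in for the runner's real GitLab job request.
func requestNewJob(config *common.RunnerConfig) *common.JobResponse { return nil }

// resumeOrRequest prefers a job whose on-disk execution state went stale while
// still pending/running (an interrupted job) and otherwise requests a fresh
// job, persisting its state so a later runner process can resume it.
func resumeOrRequest(store common.JobStore, config *common.RunnerConfig) (*common.JobExecutionState, error) {
	state, err := store.FindJobToResume()
	if err != nil {
		return nil, err
	}

	if state != nil {
		// Resuming: count the resume and remember which stage was interrupted,
		// as requestJob does in commands/multi.go.
		state.Resumes++
		state.ResumedFromStage = state.Stage
		return state, store.Save(state)
	}

	job := requestNewJob(config)
	if job == nil {
		return nil, nil
	}

	state = common.NewJobExecutionState(job)
	return state, store.Save(state)
}

In commands/multi.go the store is created per runner through common.NewMultiJobStore(func(config *common.RunnerConfig) common.JobStore { return common.NewFileJobStore("/tmp/", config) }), so each runner gets a file-backed store whose state files under /tmp are keyed by the job ID plus the runner's name and short description.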