From f45cf7e319a9505d0eaa5e6a09f908bf97578450 Mon Sep 17 00:00:00 2001 From: "Georgi N. Georgiev" <ggeorgiev@gitlab.com> Date: Thu, 30 Jun 2022 13:03:12 +0300 Subject: [PATCH] Add fault tolerance with file storage Try 2 --- commands/exec.go | 2 +- commands/helpers/artifact_metadata.go | 29 +- commands/helpers/artifact_metadata_test.go | 4 +- commands/multi.go | 87 +++++- commands/single.go | 4 +- common/build.go | 141 ++++++---- common/network.go | 5 +- common/store.go | 201 ++++++++++++++ common/time.go | 31 +++ common/trace.go | 21 +- executors/kubernetes/kubernetes.go | 308 +++++++++++++++++---- executors/kubernetes/log_processor.go | 29 +- executors/kubernetes/util.go | 4 + go.mod | 3 +- go.sum | 2 + helpers/trace/buffer.go | 19 +- network/gitlab.go | 3 +- network/trace.go | 32 ++- shells/abstract_test.go | 48 ++-- 19 files changed, 792 insertions(+), 181 deletions(-) create mode 100644 common/store.go create mode 100644 common/time.go diff --git a/commands/exec.go b/commands/exec.go index 243d234ed48..e4f8fdc7d6e 100644 --- a/commands/exec.go +++ b/commands/exec.go @@ -86,7 +86,7 @@ func (c *ExecCommand) createBuild(repoURL string, abortSignal chan os.Signal) (* RunnerSettings: c.RunnerSettings, } - return common.NewBuild(jobResponse, runner, abortSignal, nil) + return common.NewBuild(jobResponse, runner, abortSignal, nil, nil, nil) } func (c *ExecCommand) Execute(context *cli.Context) { diff --git a/commands/helpers/artifact_metadata.go b/commands/helpers/artifact_metadata.go index 49c9838639a..84537efddc6 100644 --- a/commands/helpers/artifact_metadata.go +++ b/commands/helpers/artifact_metadata.go @@ -9,7 +9,6 @@ import ( "io/ioutil" "os" "path/filepath" - "strconv" "strings" "time" @@ -88,8 +87,8 @@ type AttestationPredicateInvocationEnvironment struct { type AttestationPredicateInvocationParameters map[string]string type AttestationMetadataInfo struct { - BuildStartedOn TimeRFC3339 `json:"buildStartedOn"` - BuildFinishedOn TimeRFC3339 `json:"buildFinishedOn"` + BuildStartedOn common.TimeRFC3339 `json:"buildStartedOn"` + BuildFinishedOn common.TimeRFC3339 `json:"buildFinishedOn"` Reproducible bool `json:"reproducible"` Completeness AttestationMetadataInfoCompleteness `json:"completeness"` } @@ -100,26 +99,6 @@ type AttestationMetadataInfoCompleteness struct { Materials bool `json:"materials"` } -// TimeRFC3339 is used specifically to marshal and unmarshal time to/from RFC3339 strings -// That's because the metadata is user-facing and using Go's built-in time parsing will not be portable -type TimeRFC3339 struct { - time.Time -} - -func (t *TimeRFC3339) UnmarshalJSON(b []byte) error { - var err error - t.Time, err = time.Parse(time.RFC3339, strings.Trim(string(b), `"`)) - return err -} - -func (t TimeRFC3339) MarshalJSON() ([]byte, error) { - if t.IsZero() { - return nil, nil - } - - return []byte(strconv.Quote(t.Time.Format(time.RFC3339))), nil -} - type generateMetadataOptions struct { files map[string]os.FileInfo wd string @@ -181,8 +160,8 @@ func (g *artifactMetadataGenerator) metadata(opts generateMetadataOptions) (Atte }, }, Metadata: AttestationMetadataInfo{ - BuildStartedOn: TimeRFC3339{Time: startedAt}, - BuildFinishedOn: TimeRFC3339{Time: endedAt}, + BuildStartedOn: common.TimeRFC3339{Time: startedAt}, + BuildFinishedOn: common.TimeRFC3339{Time: endedAt}, Reproducible: false, Completeness: AttestationMetadataInfoCompleteness{ Parameters: true, diff --git a/commands/helpers/artifact_metadata_test.go b/commands/helpers/artifact_metadata_test.go index 
430d7aba860..68c5eafc6f4 100644 --- a/commands/helpers/artifact_metadata_test.go +++ b/commands/helpers/artifact_metadata_test.go @@ -80,10 +80,10 @@ func TestGenerateMetadataToFile(t *testing.T) { }, Materials: make([]interface{}, 0), Metadata: AttestationMetadataInfo{ - BuildStartedOn: TimeRFC3339{ + BuildStartedOn: common.TimeRFC3339{ Time: startedAt, }, - BuildFinishedOn: TimeRFC3339{ + BuildFinishedOn: common.TimeRFC3339{ Time: endedAt, }, Reproducible: false, diff --git a/commands/multi.go b/commands/multi.go index 14812c6efb5..1ac8f952f22 100644 --- a/commands/multi.go +++ b/commands/multi.go @@ -52,6 +52,7 @@ type RunCommand struct { healthHelper buildsHelper buildsHelper + jobStore *common.MultiJobStore ServiceName string `short:"n" long:"service" description:"Use different names for different services"` WorkingDirectory string `short:"d" long:"working-directory" description:"Specify custom working directory"` @@ -104,6 +105,10 @@ func (mr *RunCommand) Start(_ service.Service) error { mr.runFinished = make(chan bool, 1) mr.stopSignals = make(chan os.Signal) + mr.jobStore = common.NewMultiJobStore(func(config *common.RunnerConfig) common.JobStore { + return common.NewFileJobStore("/tmp/", config) + }) + mr.log().Info("Starting multi-runner from ", mr.ConfigFile, "...") userModeWarning(false) @@ -502,14 +507,21 @@ func (mr *RunCommand) processRunner( } // Receive a new build - trace, jobData, err := mr.requestJob(runner, sessionInfo) + trace, jobData, executionState, err := mr.requestJob(runner, sessionInfo) if err != nil || jobData == nil { return } defer func() { mr.traceOutcome(trace, err) }() // Create a new build - build, err := common.NewBuild(*jobData, runner, mr.abortBuilds, executorData) + build, err := common.NewBuild( + *jobData, + runner, + mr.abortBuilds, + executorData, + executionState, + mr.jobStore.Get(runner), + ) if err != nil { return } @@ -525,7 +537,13 @@ func (mr *RunCommand) processRunner( mr.requeueRunner(runner, runners) // Process a build - return build.Run(mr.config, trace) + if err := build.Run(mr.config, trace); err != nil { + // TODO: + _ = mr.jobStore.Get(runner).Remove(jobData.ID) + return err + } + + return nil } func (mr *RunCommand) traceOutcome(trace common.JobTrace, err error) { @@ -570,28 +588,63 @@ func (mr *RunCommand) createSession(provider common.ExecutorProvider) (*session. func (mr *RunCommand) requestJob( runner *common.RunnerConfig, sessionInfo *common.SessionInfo, -) (common.JobTrace, *common.JobResponse, error) { +) (common.JobTrace, *common.JobResponse, *common.JobExecutionState, error) { if !mr.buildsHelper.acquireRequest(runner) { mr.log().WithField("runner", runner.ShortDescription()). Debugln("Failed to request job: runner requestConcurrency meet") - return nil, nil, nil + return nil, nil, nil, nil } defer mr.buildsHelper.releaseRequest(runner) - jobData, healthy := mr.doJobRequest(context.Background(), runner, sessionInfo) - mr.makeHealthy(runner.UniqueID(), healthy) + store := mr.jobStore.Get(runner) - if jobData == nil { - return nil, nil, nil + var jobData *common.JobResponse + executionState, err := store.FindJobToResume() + if err != nil { + // TODO: handle invalid json + return nil, nil, nil, err + } + if executionState != nil { + // TODO: should we check early the job status and fail if e.g. it's been cancelled? 
+ jobData = executionState.Job + executionState.Resumes++ + executionState.ResumedFromStage = executionState.Stage + // TODO: + _ = store.Save(executionState) + mr.makeHealthy(runner.UniqueID(), true) + } else { + var healthy bool + jobData, healthy = mr.doJobRequest(context.Background(), runner, sessionInfo) + mr.makeHealthy(runner.UniqueID(), healthy) + + if jobData == nil { + return nil, nil, nil, nil + } + + executionState = common.NewJobExecutionState(jobData) + if err := store.Save(executionState); err != nil { + return nil, nil, nil, err + } } + go func() { + t := time.NewTicker(10 * time.Second) + for { + <-t.C + executionState.UpdateHealth() + if err := store.Save(executionState); err != nil { + mr.log().WithField("update job execution health error:", err) + } + } + }() + // Make sure to always close output jobCredentials := &common.JobCredentials{ ID: jobData.ID, Token: jobData.Token, } - trace, err := mr.network.ProcessJob(*runner, jobCredentials) + trace, err := mr.network.ProcessJob(*runner, jobCredentials, executionState.SentTrace) if err != nil { jobInfo := common.UpdateJobInfo{ ID: jobCredentials.ID, @@ -601,11 +654,21 @@ func (mr *RunCommand) requestJob( // send failure once mr.network.UpdateJob(*runner, jobCredentials, jobInfo) - return nil, nil, err + return nil, nil, nil, err + } + + if executionState.IsResumed() { + trace.Disable() } + trace.SetOnWriteFunc(func(sentTraceLen int) { + executionState.SentTrace = sentTraceLen + // TODO: + _ = store.Save(executionState) + }) + trace.SetFailuresCollector(mr.failuresCollector) - return trace, jobData, nil + return trace, jobData, executionState, nil } // doJobRequest will execute the request for a new job, respecting an interruption diff --git a/commands/single.go b/commands/single.go index d5c90e85c76..76db6b8b3ca 100644 --- a/commands/single.go +++ b/commands/single.go @@ -93,7 +93,7 @@ func (r *RunSingleCommand) processBuild(data common.ExecutorData, abortSignal ch } config := common.NewConfig() - newBuild, err := common.NewBuild(*jobData, &r.RunnerConfig, abortSignal, data) + newBuild, err := common.NewBuild(*jobData, &r.RunnerConfig, abortSignal, data, nil, nil) if err != nil { return err } @@ -102,7 +102,7 @@ func (r *RunSingleCommand) processBuild(data common.ExecutorData, abortSignal ch ID: jobData.ID, Token: jobData.Token, } - trace, err := r.network.ProcessJob(r.RunnerConfig, jobCredentials) + trace, err := r.network.ProcessJob(r.RunnerConfig, jobCredentials, 0) if err != nil { return err } diff --git a/common/build.go b/common/build.go index 10f8764d5f0..344c37a71dd 100644 --- a/common/build.go +++ b/common/build.go @@ -141,12 +141,10 @@ type Build struct { // Unique ID for all running builds on this runner and this project ProjectRunnerID int `json:"project_runner_id"` + statusLock sync.Mutex // CurrentStage(), CurrentState() and CurrentExecutorStage() are called // from the metrics go routine whilst a build is in-flight, so access // to these variables requires a lock. 
- statusLock sync.Mutex - currentStage BuildStage - currentState BuildRuntimeState executorStageResolver func() ExecutorStage secretsResolver func(l logger, registry SecretResolverRegistry) (SecretsResolver, error) @@ -160,36 +158,48 @@ type Build struct { createdAt time.Time + ExecutionState *JobExecutionState + Store JobStore + Referees []referees.Referee ArtifactUploader func(config JobCredentials, reader io.ReadCloser, options ArtifactsOptions) (UploadState, string) } func (b *Build) setCurrentStage(stage BuildStage) { - b.statusLock.Lock() - defer b.statusLock.Unlock() + b.ExecutionState.Lock() + defer b.ExecutionState.Unlock() - b.currentStage = stage + if b.ExecutionState.Stage != stage { + b.ExecutionState.Stage = stage + // TODO: + _ = b.Store.Save(b.ExecutionState) + } } func (b *Build) CurrentStage() BuildStage { - b.statusLock.Lock() - defer b.statusLock.Unlock() + b.ExecutionState.Lock() + defer b.ExecutionState.Unlock() + + return b.ExecutionState.Stage - return b.currentStage } func (b *Build) setCurrentState(state BuildRuntimeState) { - b.statusLock.Lock() - defer b.statusLock.Unlock() + b.ExecutionState.Lock() + defer b.ExecutionState.Unlock() - b.currentState = state + if b.ExecutionState.State != state { + b.ExecutionState.State = state + // TODO: + _ = b.Store.Save(b.ExecutionState) + } } func (b *Build) CurrentState() BuildRuntimeState { - b.statusLock.Lock() - defer b.statusLock.Unlock() + b.ExecutionState.Lock() + defer b.ExecutionState.Unlock() - return b.currentState + return b.ExecutionState.State } func (b *Build) Log() *logrus.Entry { @@ -320,7 +330,18 @@ func (b *Build) StartBuild(rootDir, cacheDir string, customBuildDirEnabled, shar return nil } -func (b *Build) executeStage(ctx context.Context, buildStage BuildStage, executor Executor) error { +func (b *Build) executeStage(ctx context.Context, buildStage BuildStage, executor Executor, trace JobTrace) error { + if b.ExecutionState.IsResumed() && b.ExecutionState.Stage != "" && buildStage != b.ExecutionState.Stage { + return nil + } + + if b.ExecutionState.IsResumed() && b.ExecutionState.Stage == b.ExecutionState.ResumedFromStage { + // TODO: executors should still perform similar checks to make sure they won't print lines for a second time after resuming + // Enable the trace at the end of the method to avoid some more logs + // TODO: restore/close build sections? 
+ defer trace.Enable() + } + if ctx.Err() != nil { return ctx.Err() } @@ -423,29 +444,29 @@ func GetStageDescription(stage BuildStage) string { return description } -func (b *Build) executeUploadArtifacts(ctx context.Context, state error, executor Executor) (err error) { +func (b *Build) executeUploadArtifacts(ctx context.Context, state error, executor Executor, trace JobTrace) (err error) { if state == nil { - return b.executeStage(ctx, BuildStageUploadOnSuccessArtifacts, executor) + return b.executeStage(ctx, BuildStageUploadOnSuccessArtifacts, executor, trace) } - return b.executeStage(ctx, BuildStageUploadOnFailureArtifacts, executor) + return b.executeStage(ctx, BuildStageUploadOnFailureArtifacts, executor, trace) } -func (b *Build) executeArchiveCache(ctx context.Context, state error, executor Executor) (err error) { +func (b *Build) executeArchiveCache(ctx context.Context, state error, executor Executor, trace JobTrace) (err error) { if state == nil { - return b.executeStage(ctx, BuildStageArchiveOnSuccessCache, executor) + return b.executeStage(ctx, BuildStageArchiveOnSuccessCache, executor, trace) } - return b.executeStage(ctx, BuildStageArchiveOnFailureCache, executor) + return b.executeStage(ctx, BuildStageArchiveOnFailureCache, executor, trace) } -func (b *Build) executeScript(ctx context.Context, executor Executor) error { +func (b *Build) executeScript(ctx context.Context, executor Executor, trace JobTrace) error { // track job start and create referees startTime := time.Now() b.createReferees(executor) // Prepare stage - err := b.executeStage(ctx, BuildStagePrepare, executor) + err := b.executeStage(ctx, BuildStagePrepare, executor, trace) if err != nil { return fmt.Errorf( "prepare environment: %w. "+ @@ -454,13 +475,13 @@ func (b *Build) executeScript(ctx context.Context, executor Executor) error { ) } - err = b.attemptExecuteStage(ctx, BuildStageGetSources, executor, b.GetGetSourcesAttempts()) + err = b.attemptExecuteStage(ctx, BuildStageGetSources, executor, b.GetGetSourcesAttempts(), trace) if err == nil { - err = b.attemptExecuteStage(ctx, BuildStageRestoreCache, executor, b.GetRestoreCacheAttempts()) + err = b.attemptExecuteStage(ctx, BuildStageRestoreCache, executor, b.GetRestoreCacheAttempts(), trace) } if err == nil { - err = b.attemptExecuteStage(ctx, BuildStageDownloadArtifacts, executor, b.GetDownloadArtifactsAttempts()) + err = b.attemptExecuteStage(ctx, BuildStageDownloadArtifacts, executor, b.GetDownloadArtifactsAttempts(), trace) } if err == nil { @@ -469,24 +490,24 @@ func (b *Build) executeScript(ctx context.Context, executor Executor) error { if s.Name == StepNameAfterScript { continue } - err = b.executeStage(ctx, StepToBuildStage(s), executor) + err = b.executeStage(ctx, StepToBuildStage(s), executor, trace) if err != nil { break } } - b.executeAfterScript(ctx, err, executor) + b.executeAfterScript(ctx, err, executor, trace) } - archiveCacheErr := b.executeArchiveCache(ctx, err, executor) + archiveCacheErr := b.executeArchiveCache(ctx, err, executor, trace) - artifactUploadErr := b.executeUploadArtifacts(ctx, err, executor) + artifactUploadErr := b.executeUploadArtifacts(ctx, err, executor, trace) // track job end and execute referees endTime := time.Now() b.executeUploadReferees(ctx, startTime, endTime) - b.removeFileBasedVariables(ctx, executor) + b.removeFileBasedVariables(ctx, executor, trace) return b.pickPriorityError(err, archiveCacheErr, artifactUploadErr) } @@ -505,7 +526,7 @@ func (b *Build) pickPriorityError(jobErr error, archiveCacheErr 
error, artifactU return artifactUploadErr } -func (b *Build) executeAfterScript(ctx context.Context, err error, executor Executor) { +func (b *Build) executeAfterScript(ctx context.Context, err error, executor Executor, trace JobTrace) { state, _ := b.runtimeStateAndError(err) b.GetAllVariables().OverwriteKey("CI_JOB_STATUS", JobVariable{ Key: "CI_JOB_STATUS", @@ -515,7 +536,7 @@ func (b *Build) executeAfterScript(ctx context.Context, err error, executor Exec ctx, cancel := context.WithTimeout(ctx, AfterScriptTimeout) defer cancel() - _ = b.executeStage(ctx, BuildStageAfterScript, executor) + _ = b.executeStage(ctx, BuildStageAfterScript, executor, trace) } // StepToBuildStage returns the BuildStage corresponding to a step. @@ -527,8 +548,8 @@ func (b *Build) createReferees(executor Executor) { b.Referees = referees.CreateReferees(executor, b.Runner.Referees, b.Log()) } -func (b *Build) removeFileBasedVariables(ctx context.Context, executor Executor) { - err := b.executeStage(ctx, BuildStageCleanup, executor) +func (b *Build) removeFileBasedVariables(ctx context.Context, executor Executor, trace JobTrace) { + err := b.executeStage(ctx, BuildStageCleanup, executor, trace) if err != nil { b.Log().WithError(err).Warning("Error while executing file based variables removal script") } @@ -572,12 +593,13 @@ func (b *Build) attemptExecuteStage( buildStage BuildStage, executor Executor, attempts int, + trace JobTrace, ) (err error) { if attempts < 1 || attempts > 10 { return fmt.Errorf("number of attempts out of the range [1, 10] for stage: %s", buildStage) } for attempt := 0; attempt < attempts; attempt++ { - if err = b.executeStage(ctx, buildStage, executor); err == nil { + if err = b.executeStage(ctx, buildStage, executor, trace); err == nil { return } } @@ -621,8 +643,10 @@ func (b *Build) runtimeStateAndError(err error) (BuildRuntimeState, error) { } } -func (b *Build) run(ctx context.Context, executor Executor) (err error) { - b.setCurrentState(BuildRunRuntimeRunning) +func (b *Build) run(ctx context.Context, executor Executor, trace JobTrace) (err error) { + if !b.ExecutionState.IsResumed() { + b.setCurrentState(BuildRunRuntimeRunning) + } buildFinish := make(chan error, 1) buildPanic := make(chan error, 1) @@ -646,7 +670,7 @@ func (b *Build) run(ctx context.Context, executor Executor) (err error) { } }() - buildFinish <- b.executeScript(runContext, executor) + buildFinish <- b.executeScript(runContext, executor, trace) }() // Wait for signals: cancel, timeout, abort or finish @@ -880,9 +904,11 @@ func (b *Build) Run(globalConfig *Config, trace JobTrace) (err error) { } b.logger = NewBuildLogger(trace, b.Log()) - b.printRunningWithHeader() - b.setCurrentState(BuildRunStatePending) + if !b.ExecutionState.IsResumed() { + b.printRunningWithHeader() + b.setCurrentState(BuildRunStatePending) + } // These defers are ordered because runBuild could panic and the recover needs to handle that panic. 
// setTraceStatus needs to be last since it needs a correct error value to report the job's status @@ -901,7 +927,12 @@ func (b *Build) Run(globalConfig *Config, trace JobTrace) (err error) { return err } - ctx, cancel := context.WithTimeout(context.Background(), b.GetBuildTimeout()) + contextTimeout := b.GetBuildTimeout() + if b.ExecutionState.IsResumed() { + contextTimeout -= time.Since(b.ExecutionState.StartedAt.Time) + } + + ctx, cancel := context.WithTimeout(context.Background(), contextTimeout) defer cancel() trace.SetCancelFunc(cancel) @@ -922,7 +953,7 @@ func (b *Build) Run(globalConfig *Config, trace JobTrace) (err error) { executor, err = b.executeBuildSection(executor, options, provider) if err == nil { - err = b.run(ctx, executor) + err = b.run(ctx, executor, trace) if errWait := b.waitForTerminal(ctx, globalConfig.SessionServer.GetSessionTimeout()); errWait != nil { b.Log().WithError(errWait).Debug("Stopped waiting for terminal") } @@ -990,13 +1021,17 @@ func (b *Build) executeBuildSection( Name: string(BuildStagePrepareExecutor), SkipMetrics: !b.JobResponse.Features.TraceSections, Run: func() error { - msg := fmt.Sprintf( - "%sPreparing the %q executor%s", - helpers.ANSI_BOLD_CYAN, - b.Runner.Executor, - helpers.ANSI_RESET, - ) - b.logger.Println(msg) + if !b.ExecutionState.IsResumed() { + msg := fmt.Sprintf( + "%sPreparing the %q executor%s", + helpers.ANSI_BOLD_CYAN, + b.Runner.Executor, + helpers.ANSI_RESET, + ) + + b.logger.Println(msg) + } + executor, err = b.retryCreateExecutor(options, provider, b.logger) return err }, @@ -1365,6 +1400,8 @@ func NewBuild( runnerConfig *RunnerConfig, systemInterrupt chan os.Signal, executorData ExecutorData, + executionState *JobExecutionState, + store JobStore, ) (*Build, error) { // Attempt to perform a deep copy of the RunnerConfig runnerConfigCopy, err := runnerConfig.DeepCopy() @@ -1379,6 +1416,8 @@ func NewBuild( ExecutorData: executorData, createdAt: time.Now(), secretsResolver: newSecretsResolver, + ExecutionState: executionState, + Store: store, }, nil } diff --git a/common/network.go b/common/network.go index c9fc85c2f3c..e5be730f54e 100644 --- a/common/network.go +++ b/common/network.go @@ -566,10 +566,13 @@ type JobTrace interface { SetCancelFunc(cancelFunc context.CancelFunc) Cancel() bool SetAbortFunc(abortFunc context.CancelFunc) + SetOnWriteFunc(func(sentTraceLen int)) Abort() bool SetFailuresCollector(fc FailuresCollector) SetMasked(values []string) IsStdout() bool + Disable() + Enable() } type UpdateJobResult struct { @@ -602,5 +605,5 @@ type Network interface { PatchTrace(config RunnerConfig, jobCredentials *JobCredentials, content []byte, startOffset int) PatchTraceResult DownloadArtifacts(config JobCredentials, artifactsFile io.WriteCloser, directDownload *bool) DownloadState UploadRawArtifacts(config JobCredentials, reader io.ReadCloser, options ArtifactsOptions) (UploadState, string) - ProcessJob(config RunnerConfig, buildCredentials *JobCredentials) (JobTrace, error) + ProcessJob(config RunnerConfig, buildCredentials *JobCredentials, startOffset int) (JobTrace, error) } diff --git a/common/store.go b/common/store.go new file mode 100644 index 00000000000..766cca403b1 --- /dev/null +++ b/common/store.go @@ -0,0 +1,201 @@ +package common + +import ( + "encoding/json" + "fmt" + "github.com/gofrs/flock" + "io/ioutil" + "os" + "path/filepath" + "strings" + "sync" + "time" +) + +type JobExecutionState struct { + sync.Mutex + + Job *JobResponse `json:"job"` + Resumes int `json:"retries"` + State BuildRuntimeState 
`json:"state"` + Stage BuildStage `json:"stage"` + HealthCheckAt TimeRFC3339 `json:"health_check_at"` + StartedAt TimeRFC3339 `json:"started_at"` + SentTrace int `json:"sent_trace"` + ExecutorMetadata any `json:"executor_metadata"` + + ResumedFromStage BuildStage +} + +func NewJobExecutionState(job *JobResponse) *JobExecutionState { + return &JobExecutionState{ + Job: job, + Resumes: 0, + HealthCheckAt: NewTimeRFC3339(time.Now()), + StartedAt: NewTimeRFC3339(time.Now()), + } +} + +func (s *JobExecutionState) UpdateHealth() { + s.HealthCheckAt = NewTimeRFC3339(time.Now()) +} + +func (s *JobExecutionState) IsResumed() bool { + return s.Resumes > 0 +} + +func (s *JobExecutionState) SetExecutorMetadata(data any) { + s.Lock() + defer s.Unlock() + s.ExecutorMetadata = data +} + +func (s *JobExecutionState) UpdateExecutorMetadata(mutator func(data any)) { + s.Lock() + defer s.Unlock() + mutator(s.ExecutorMetadata) +} + +type StoreFilter func(job *JobExecutionState) bool + +var findRunningFilter = func(execution *JobExecutionState) bool { + return time.Since(execution.HealthCheckAt.Time) > 30*time.Second && + (execution.State == BuildRunStatePending || execution.State == BuildRunRuntimeRunning) +} + +type JobStore interface { + Save(job *JobExecutionState) error + Load(jobID int64) (*JobExecutionState, error) + Remove(jobID int64) error + FindJobToResume() (*JobExecutionState, error) +} + +type StoreFactory func(*RunnerConfig) JobStore + +type MultiJobStore struct { + factory StoreFactory + stores map[string]JobStore +} + +func NewMultiJobStore(factory StoreFactory) *MultiJobStore { + return &MultiJobStore{stores: map[string]JobStore{}, factory: factory} +} + +func (m *MultiJobStore) Get(runner *RunnerConfig) JobStore { + if m.stores[runner.UniqueID()] == nil { + m.stores[runner.UniqueID()] = m.factory(runner) + } + + return m.stores[runner.UniqueID()] +} + +type FileJobStore struct { + dir string + runner *RunnerConfig +} + +func NewFileJobStore(dir string, runner *RunnerConfig) *FileJobStore { + return &FileJobStore{dir, runner} +} + +func (f *FileJobStore) FindJobToResume() (*JobExecutionState, error) { + files, err := ioutil.ReadDir(f.dir) + if err != nil { + return nil, err + } + + for _, file := range files { + if !strings.HasSuffix(file.Name(), fmt.Sprintf("runner.%s-%s.json", f.runner.Name, f.runner.ShortDescription())) { + continue + } + + state, err := func() (*JobExecutionState, error) { + path := filepath.Join(f.dir, file.Name()) + lock := flock.New(path) + if err := lock.Lock(); err != nil { + return nil, err + } + defer lock.Unlock() + + state, err := f.load(path) + if err != nil { + return nil, err + } + + if findRunningFilter(state) { + state.UpdateHealth() + if err := f.saveNoLocK(state); err != nil { + return nil, err + } + + return state, nil + } + + return nil, nil + }() + + if err != nil { + return nil, err + } + + if state != nil { + return state, nil + } + } + + return nil, nil +} + +func (f *FileJobStore) statePath(jobID int64) string { + return filepath.Join(f.dir, fmt.Sprintf("state.job.%d.runner.%s-%s.json", jobID, f.runner.Name, f.runner.ShortDescription())) +} + +func (f *FileJobStore) saveNoLocK(state *JobExecutionState) error { + file := f.statePath(state.Job.ID) + b, err := json.Marshal(state) + if err != nil { + return err + } + + return ioutil.WriteFile(file, b, 0700) +} + +func (f *FileJobStore) Save(state *JobExecutionState) error { + file := f.statePath(state.Job.ID) + lock := flock.New(file) + if err := lock.Lock(); err != nil { + return err + } + defer 
lock.Unlock() + + return f.saveNoLocK(state) +} + +func (f *FileJobStore) load(file string) (*JobExecutionState, error) { + b, err := ioutil.ReadFile(file) + if err != nil { + return nil, err + } + + var state *JobExecutionState + if err := json.Unmarshal(b, &state); err != nil { + return nil, err + } + + return state, nil +} + +func (f *FileJobStore) Load(jobID int64) (*JobExecutionState, error) { + return f.load(f.statePath(jobID)) +} + +func (f *FileJobStore) Remove(jobID int64) error { + file := f.statePath(jobID) + lock := flock.New(file) + if err := lock.Lock(); err != nil { + return err + } + defer lock.Unlock() + + return os.Remove(file) +} diff --git a/common/time.go b/common/time.go new file mode 100644 index 00000000000..9f59dec8be8 --- /dev/null +++ b/common/time.go @@ -0,0 +1,31 @@ +package common + +import ( + "strconv" + "strings" + "time" +) + +// TimeRFC3339 is used specifically to marshal and unmarshal time to/from RFC3339 strings +// That's because the metadata is user-facing and using Go's built-in time parsing will not be portable +type TimeRFC3339 struct { + time.Time +} + +func NewTimeRFC3339(t time.Time) TimeRFC3339 { + return TimeRFC3339{t} +} + +func (t *TimeRFC3339) UnmarshalJSON(b []byte) error { + var err error + t.Time, err = time.Parse(time.RFC3339, strings.Trim(string(b), `"`)) + return err +} + +func (t TimeRFC3339) MarshalJSON() ([]byte, error) { + if t.IsZero() { + return nil, nil + } + + return []byte(strconv.Quote(t.Time.Format(time.RFC3339))), nil +} diff --git a/common/trace.go b/common/trace.go index 070181a36e2..1e116ce246f 100644 --- a/common/trace.go +++ b/common/trace.go @@ -8,10 +8,11 @@ import ( ) type Trace struct { - Writer io.Writer - cancelFunc context.CancelFunc - abortFunc context.CancelFunc - mutex sync.Mutex + Writer io.Writer + cancelFunc context.CancelFunc + abortFunc context.CancelFunc + onWriteFunc func(sentTraceLen int) + mutex sync.Mutex } type masker interface { @@ -92,3 +93,15 @@ func (s *Trace) SetFailuresCollector(fc FailuresCollector) {} func (s *Trace) IsStdout() bool { return true } + +func (s *Trace) SetOnWriteFunc(onWriteFunc func(sentTraceLen int)) { + s.onWriteFunc = onWriteFunc +} + +func (s *Trace) Enable() { + +} + +func (s *Trace) Disable() { + +} diff --git a/executors/kubernetes/kubernetes.go b/executors/kubernetes/kubernetes.go index 5fe3f9e5505..ea39f713e22 100644 --- a/executors/kubernetes/kubernetes.go +++ b/executors/kubernetes/kubernetes.go @@ -79,6 +79,26 @@ var ( resourceTypePullSecret = "ImagePullSecret" ) +type jobStateResource struct { + Name string `json:"name"` + Namespace string `json:"namespace"` +} + +func resourceFromKubernetesType(from metav1.ObjectMeta) *jobStateResource { + return &jobStateResource{ + Name: from.Name, + Namespace: from.Namespace, + } +} + +type jobStateMetadata struct { + Credentials *jobStateResource `json:"credentials"` + ConfigMap *jobStateResource `json:"config_map"` + Pod *jobStateResource `json:"pod"` + Services []*jobStateResource `json:"services"` + Offset int64 `json:"offset"` +} + type commandTerminatedError struct { exitCode int } @@ -161,7 +181,7 @@ type executor struct { featureChecker featureChecker - newLogProcessor func() logProcessor + newLogProcessor func(offset int64) logProcessor remoteProcessTerminated chan shells.TrapCommandExitStatus @@ -169,6 +189,8 @@ type executor struct { // Flag if a repo mount and emptyDir volume are needed requireDefaultBuildsDirVolume *bool + + processingLogs bool } type serviceCreateResponse struct { @@ -182,6 +204,15 @@ func (s 
*executor) Name() string { func (s *executor) Prepare(options common.ExecutorPrepareOptions) (err error) { s.AbstractExecutor.PrepareConfiguration(options) + if s.Build.ExecutionState.IsResumed() && s.Build.ExecutionState.ExecutorMetadata != nil { + // When loaded from this this is a map[string]interface{}, we need to convert it + b, _ := json.Marshal(s.Build.ExecutionState.ExecutorMetadata) + var meta *jobStateMetadata + _ = json.Unmarshal(b, &meta) + s.Build.ExecutionState.ExecutorMetadata = meta + } else { + s.Build.ExecutionState.ExecutorMetadata = &jobStateMetadata{} + } if err = s.prepareOverwrites(options.Build.GetAllVariables()); err != nil { return fmt.Errorf("couldn't prepare overwrites: %w", err) @@ -225,9 +256,11 @@ func (s *executor) Prepare(options common.ExecutorPrepareOptions) (err error) { imageName := s.Build.GetAllVariables().ExpandValue(s.options.Image.Name) - s.Println("Using Kubernetes executor with image", imageName, "...") - if !s.Build.IsFeatureFlagOn(featureflags.UseLegacyKubernetesExecutionStrategy) { - s.Println("Using attach strategy to execute scripts...") + if !s.Build.ExecutionState.IsResumed() { + s.Println("Using Kubernetes executor with image", imageName, "...") + if !s.Build.IsFeatureFlagOn(featureflags.UseLegacyKubernetesExecutionStrategy) { + s.Println("Using attach strategy to execute scripts...") + } } s.Debugln(fmt.Sprintf("Using helper image: %s:%s", s.helperImageInfo.Name, s.helperImageInfo.Tag)) @@ -347,6 +380,10 @@ func (s *executor) runWithExecLegacy(cmd common.ExecutorCommand) error { } func (s *executor) runWithAttach(cmd common.ExecutorCommand) error { + if s.Build.ExecutionState.IsResumed() { + fmt.Println("Resuming stage ", s.Build.ExecutionState.Stage) + } + err := s.ensurePodsConfigured(cmd.Context) if err != nil { return err @@ -355,25 +392,28 @@ func (s *executor) runWithAttach(cmd common.ExecutorCommand) error { ctx, cancel := context.WithCancel(cmd.Context) defer cancel() - containerName, containerCommand := s.getContainerInfo(cmd) + podStatusCh := s.watchPodStatus(ctx) - s.Debugln(fmt.Sprintf( - "Starting in container %q the command %q with script: %s", - containerName, - containerCommand, - cmd.Script, - )) + var runInContainerCh <-chan error + if s.Build.ExecutionState.Stage != s.Build.ExecutionState.ResumedFromStage { + containerName, containerCommand := s.getContainerInfo(cmd) - podStatusCh := s.watchPodStatus(ctx) + s.Debugln(fmt.Sprintf( + "Starting in container %q the command %q with script: %s", + containerName, + containerCommand, + cmd.Script, + )) - select { - case err := <-s.runInContainer(containerName, containerCommand): - s.Debugln(fmt.Sprintf("Container %q exited with error: %v", containerName, err)) - var terminatedError *commandTerminatedError - if err != nil && errors.As(err, &terminatedError) { - return &common.BuildError{Inner: err, ExitCode: terminatedError.exitCode} - } + // If we resume from a stage we don't want to run the scripts for that stage again, + // rather let the log processor read the logs from the correct index and move on to the next stage + runInContainerCh = s.runInContainer(containerName, containerCommand) + } else { + runInContainerCh = s.listenForCommandExit() + } + select { + case err := <-runInContainerCh: return err case err := <-podStatusCh: if IsKubernetesPodNotFoundError(err) { @@ -387,10 +427,6 @@ func (s *executor) runWithAttach(cmd common.ExecutorCommand) error { } func (s *executor) ensurePodsConfigured(ctx context.Context) error { - if s.pod != nil { - return nil - } - err := 
s.setupCredentials() if err != nil { return fmt.Errorf("setting up credentials: %w", err) @@ -401,13 +437,9 @@ func (s *executor) ensurePodsConfigured(ctx context.Context) error { return fmt.Errorf("setting up scripts configMap: %w", err) } - permissionsInitContainer, err := s.buildPermissionsInitContainer(s.helperImageInfo.OSType) + err = s.setupPods() if err != nil { - return fmt.Errorf("building permissions init container: %w", err) - } - err = s.setupBuildPod([]api.Container{permissionsInitContainer}) - if err != nil { - return fmt.Errorf("setting up build pod: %w", err) + return fmt.Errorf("setting up pods: %w", err) } status, err := waitForPodRunning(ctx, s.kubeClient, s.pod, s.Trace, s.Config.Kubernetes) @@ -419,7 +451,63 @@ func (s *executor) ensurePodsConfigured(ctx context.Context) error { return fmt.Errorf("pod failed to enter running state: %s", status) } - go s.processLogs(ctx) + // TODO: can we and should we handle the case of: + // Runner is restarted + // Job is running in background + // Job completes while no running is handling it + // Runner starts but the pod is already done with its work + if !s.processingLogs { + s.processingLogs = true + go s.processLogs(ctx) + } + + return nil +} + +func (s *executor) setupPods() error { + if s.pod != nil { + return nil + } + + existingPod := s.stateExecutorMetadata().Pod + existingServices := s.stateExecutorMetadata().Services + if s.Build.ExecutionState.IsResumed() && existingPod != nil { + // TODO: Ideally we will separate the creation/restoration of Pod and Services but it's not really + // necessary for now + s.Debugln("Restoring pods") + + var err error + s.pod, err = s.kubeClient.CoreV1(). + Pods(existingPod.Namespace). + Get(context.TODO(), existingPod.Name, metav1.GetOptions{}) + if err != nil { + return err + } + + for _, existingSrv := range existingServices { + srv, err := s.kubeClient.CoreV1(). + Services(existingSrv.Namespace). 
+ Get(context.TODO(), existingSrv.Name, metav1.GetOptions{}) + if err != nil { + return err + } + + // TODO: might be nil but we don't care right now + s.services = append(s.services, *srv) + } + + return nil + } + + permissionsInitContainer, err := s.buildPermissionsInitContainer(s.helperImageInfo.OSType) + if err != nil { + return fmt.Errorf("building permissions init container: %w", err) + } + + err = s.setupBuildPod([]api.Container{permissionsInitContainer}) + if err != nil { + return fmt.Errorf("setting up build pod: %w", err) + } return nil } @@ -550,15 +638,56 @@ func (s *executor) buildRedirectionCmd(shell string) string { } func (s *executor) processLogs(ctx context.Context) { - processor := s.newLogProcessor() + processor := s.newLogProcessor(s.stateExecutorMetadata().Offset) logsCh, errCh := processor.Process(ctx) + var offsetToWriteMutex sync.RWMutex + var offsetToWrite int64 + var offsetLastWritten int64 + go func() { + writeOffset := func() { + s.updateStateExecutorMetadata(func(d *jobStateMetadata) { + offsetToWriteMutex.RLock() + if offsetLastWritten == offsetToWrite { + offsetToWriteMutex.RUnlock() + return + } + offsetToWriteMutex.RUnlock() + + d.Offset = offsetToWrite + offsetToWriteMutex.Lock() + offsetLastWritten = offsetToWrite + offsetToWriteMutex.Unlock() + }) + + } + + t := time.NewTicker(100 * time.Millisecond) + for { + select { + case <-t.C: + writeOffset() + case <-ctx.Done(): + writeOffset() + return + } + } + }() + for { select { - case line, ok := <-logsCh: + case lineData, ok := <-logsCh: if !ok { return } + + line := lineData.line + offset := lineData.offset + + offsetToWriteMutex.Lock() + offsetToWrite = offset + offsetToWriteMutex.Unlock() + var status shells.TrapCommandExitStatus if status.TryUnmarshal(line) { s.remoteProcessTerminated <- status @@ -596,6 +725,21 @@ func getExitCode(err error) int { } func (s *executor) setupScriptsConfigMap() error { + if s.configMap != nil { + return nil + } + + existingConfigMap := s.stateExecutorMetadata().ConfigMap + if s.Build.ExecutionState.IsResumed() && existingConfigMap != nil { + s.Debugln("Restoring scripts config map") + + var err error + s.configMap, err = s.kubeClient.CoreV1(). + ConfigMaps(existingConfigMap.Namespace). + Get(context.TODO(), existingConfigMap.Name, metav1.GetOptions{}) + return err + } + s.Debugln("Setting up scripts config map") // After issue https://gitlab.com/gitlab-org/gitlab-runner/issues/10342 is resolved and @@ -628,6 +772,10 @@ func (s *executor) setupScriptsConfigMap() error { return fmt.Errorf("generating scripts config map: %w", err) } + s.updateStateExecutorMetadata(func(d *jobStateMetadata) { + d.ConfigMap = resourceFromKubernetesType(s.configMap.ObjectMeta) + }) + return nil } @@ -1197,6 +1345,21 @@ func (s *executor) isSharedBuildsDirRequired() bool { } func (s *executor) setupCredentials() error { + if s.credentials != nil { + return nil + } + + existingCredentials := s.stateExecutorMetadata().Credentials + if s.Build.ExecutionState.IsResumed() && existingCredentials != nil { + s.Debugln("Restoring secrets") + + var err error + s.credentials, err = s.kubeClient.CoreV1(). + Secrets(existingCredentials.Namespace). 
+ Get(context.TODO(), existingCredentials.Name, metav1.GetOptions{}) + return err + } + s.Debugln("Setting up secrets") authConfigs, err := auth.ResolveConfigs(s.Build.GetDockerAuthConfig(), s.Shell().User, s.Build.Credentials) @@ -1235,9 +1398,28 @@ func (s *executor) setupCredentials() error { } s.credentials = creds + + s.updateStateExecutorMetadata(func(d *jobStateMetadata) { + d.Credentials = resourceFromKubernetesType(s.credentials.ObjectMeta) + }) + return nil } +func (s *executor) updateStateExecutorMetadata(mutator func(d *jobStateMetadata)) { + s.Build.ExecutionState.UpdateExecutorMetadata(func(data any) { + mutator(data.(*jobStateMetadata)) + }) + + if err := s.Build.Store.Save(s.Build.ExecutionState); err != nil { + s.Errorln("Error saving execution state to store: ", err) + } +} + +func (s *executor) stateExecutorMetadata() *jobStateMetadata { + return s.Build.ExecutionState.ExecutorMetadata.(*jobStateMetadata) +} + func (s *executor) getHostAliases() ([]api.HostAlias, error) { supportsHostAliases, err := s.featureChecker.IsHostAliasSupported() switch { @@ -1313,6 +1495,13 @@ func (s *executor) setupBuildPod(initContainers []api.Container) error { return err } + s.updateStateExecutorMetadata(func(d *jobStateMetadata) { + d.Pod = resourceFromKubernetesType(s.pod.ObjectMeta) + for _, srv := range s.services { + d.Services = append(d.Services, resourceFromKubernetesType(srv.ObjectMeta)) + } + }) + return nil } @@ -1477,6 +1666,9 @@ func (s *executor) createBuildAndHelperContainers() (api.Container, api.Containe } func (s *executor) setOwnerReferencesForResources(ownerReferences []metav1.OwnerReference) error { + // TODO: we could make all these operations more granular in terms of reliability and add + // owner reference state in the executor state object + if s.credentials != nil { credentials := s.credentials.DeepCopy() credentials.SetOwnerReferences(ownerReferences) @@ -1741,10 +1933,8 @@ func (s *executor) checkPodStatus() error { } func (s *executor) runInContainer(name string, command []string) <-chan error { - errCh := make(chan error, 1) + errCh := s.listenForCommandExit() go func() { - defer close(errCh) - attach := AttachOptions{ PodName: s.pod.Name, Namespace: s.pod.Namespace, @@ -1759,8 +1949,23 @@ func (s *executor) runInContainer(name string, command []string) <-chan error { retryable := retry.New(retry.WithBuildLog(&attach, &s.BuildLogger)) err := retryable.Run() if err != nil { + s.Debugln(fmt.Sprintf("Container %q exited with error: %v", name, err)) + var terminatedError *commandTerminatedError + if err != nil && errors.As(err, &terminatedError) { + err = &common.BuildError{Inner: err, ExitCode: terminatedError.exitCode} + } + errCh <- err } + }() + + return errCh +} + +func (s *executor) listenForCommandExit() chan error { + errCh := make(chan error, 1) + go func() { + defer close(errCh) exitStatus := <-s.remoteProcessTerminated s.Debugln("Remote process exited with the status:", exitStatus) @@ -1890,13 +2095,19 @@ func (s *executor) checkDefaults() error { } if s.configurationOverwrites.namespace == "" { - s.Warningln( - fmt.Printf("Namespace is empty, therefore assuming '%s'.", DefaultResourceIdentifier), - ) + // TODO: make all these trace printing steps smarter, e.g. 
disable trace until we reach the stage to resume from + if !s.Build.ExecutionState.IsResumed() { + s.Warningln( + fmt.Printf("Namespace is empty, therefore assuming '%s'.", DefaultResourceIdentifier), + ) + } + s.configurationOverwrites.namespace = DefaultResourceIdentifier } - s.Println("Using Kubernetes namespace:", s.configurationOverwrites.namespace) + if !s.Build.ExecutionState.IsResumed() { + s.Println("Using Kubernetes namespace:", s.configurationOverwrites.namespace) + } return nil } @@ -1917,19 +2128,22 @@ func newExecutor() *executor { remoteProcessTerminated: make(chan shells.TrapCommandExitStatus), } - e.newLogProcessor = func() logProcessor { + e.newLogProcessor = func(offset int64) logProcessor { + podConfig := kubernetesLogProcessorPodConfig{ + namespace: e.pod.Namespace, + pod: e.pod.Name, + container: helperContainerName, + logPath: e.logFile(), + waitLogFileTimeout: waitLogFileTimeout, + } + return newKubernetesLogProcessor( e.kubeClient, e.kubeConfig, &backoff.Backoff{Min: time.Second, Max: 30 * time.Second}, e.Build.Log(), - kubernetesLogProcessorPodConfig{ - namespace: e.pod.Namespace, - pod: e.pod.Name, - container: helperContainerName, - logPath: e.logFile(), - waitLogFileTimeout: waitLogFileTimeout, - }, + podConfig, + offset, ) } diff --git a/executors/kubernetes/log_processor.go b/executors/kubernetes/log_processor.go index 3e6ba7c6e69..5b16166e44c 100644 --- a/executors/kubernetes/log_processor.go +++ b/executors/kubernetes/log_processor.go @@ -21,6 +21,11 @@ type logStreamer interface { fmt.Stringer } +type logLineData struct { + line string + offset int64 +} + type kubernetesLogStreamer struct { kubernetesLogProcessorPodConfig @@ -63,7 +68,7 @@ type logProcessor interface { // Process listens for log lines // consumers must read from the channel until it's closed // consumers are also notified in case of error through the error channel - Process(ctx context.Context) (<-chan string, <-chan error) + Process(ctx context.Context) (<-chan logLineData, <-chan error) } type backoffCalculator interface { @@ -94,6 +99,7 @@ func newKubernetesLogProcessor( backoff backoffCalculator, logger logrus.FieldLogger, podCfg kubernetesLogProcessorPodConfig, + offset int64, ) *kubernetesLogProcessor { logStreamer := &kubernetesLogStreamer{ kubernetesLogProcessorPodConfig: podCfg, @@ -106,11 +112,13 @@ func newKubernetesLogProcessor( backoff: backoff, logger: logger, logStreamer: logStreamer, + + logsOffset: offset, } } -func (l *kubernetesLogProcessor) Process(ctx context.Context) (<-chan string, <-chan error) { - outCh := make(chan string) +func (l *kubernetesLogProcessor) Process(ctx context.Context) (<-chan logLineData, <-chan error) { + outCh := make(chan logLineData) errCh := make(chan error) go func() { defer close(outCh) @@ -121,7 +129,7 @@ func (l *kubernetesLogProcessor) Process(ctx context.Context) (<-chan string, <- return outCh, errCh } -func (l *kubernetesLogProcessor) attach(ctx context.Context, outCh chan string, errCh chan error) { +func (l *kubernetesLogProcessor) attach(ctx context.Context, outCh chan logLineData, errCh chan error) { var ( attempt float64 = -1 backoffDuration time.Duration @@ -167,7 +175,11 @@ func (l *kubernetesLogProcessor) attach(ctx context.Context, outCh chan string, } } -func (l *kubernetesLogProcessor) processStream(ctx context.Context, outCh chan string) error { +func (l *kubernetesLogProcessor) setLogsOffset(newOffset int64) { + l.logsOffset = newOffset +} + +func (l *kubernetesLogProcessor) processStream(ctx context.Context, outCh chan 
logLineData) error { reader, writer := io.Pipe() defer func() { _ = reader.Close() @@ -212,7 +224,7 @@ func (l *kubernetesLogProcessor) processStream(ctx context.Context, outCh chan s return gr.Wait() } -func (l *kubernetesLogProcessor) readLogs(ctx context.Context, logs io.Reader, outCh chan string) error { +func (l *kubernetesLogProcessor) readLogs(ctx context.Context, logs io.Reader, outCh chan logLineData) error { logsScanner, linesCh := l.scan(ctx, logs) for { @@ -230,7 +242,10 @@ func (l *kubernetesLogProcessor) readLogs(ctx context.Context, logs io.Reader, o l.logsOffset = newLogsOffset } - outCh <- logLine + outCh <- logLineData{ + line: logLine, + offset: l.logsOffset, + } } } } diff --git a/executors/kubernetes/util.go b/executors/kubernetes/util.go index 86a140eea9b..4c1e60652cb 100644 --- a/executors/kubernetes/util.go +++ b/executors/kubernetes/util.go @@ -228,6 +228,10 @@ func waitForPodRunning( out io.Writer, config *common.KubernetesConfig, ) (api.PodPhase, error) { + if running, err := isRunning(pod); running { + return pod.Status.Phase, err + } + pollInterval := config.GetPollInterval() pollAttempts := config.GetPollAttempts() for i := 0; i <= pollAttempts; i++ { diff --git a/go.mod b/go.mod index 2ba4332db99..4c1f0c933f1 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module gitlab.com/gitlab-org/gitlab-runner -go 1.17 +go 1.18 require ( cloud.google.com/go v0.72.0 @@ -87,6 +87,7 @@ require ( github.com/dustin/go-humanize v1.0.0 // indirect github.com/form3tech-oss/jwt-go v3.2.2+incompatible // indirect github.com/go-logr/logr v0.4.0 // indirect + github.com/gofrs/flock v0.8.1 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e // indirect github.com/golang/protobuf v1.5.0 // indirect diff --git a/go.sum b/go.sum index 7b673a07751..b2ba295a07b 100644 --- a/go.sum +++ b/go.sum @@ -225,6 +225,8 @@ github.com/go-test/deep v1.0.2-0.20181118220953-042da051cf31/go.mod h1:wGDj63lr6 github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee/go.mod h1:L0fX3K22YWvt/FAX9NnzrNzcI4wNYi9Yku4O0LKYflo= github.com/gobwas/pool v0.2.0/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw= github.com/gobwas/ws v1.0.2/go.mod h1:szmBTxLgaFppYjEmNtny/v3w89xOydFnnZMcgRRu/EM= +github.com/gofrs/flock v0.8.1 h1:+gYjHKf32LDeiEEFhQaotPbLuUXjY5ZqxKgXy7n59aw= +github.com/gofrs/flock v0.8.1/go.mod h1:F1TvTiK9OcQqauNUHlbJvyl9Qa1QvF/gOUDKA14jxHU= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= diff --git a/helpers/trace/buffer.go b/helpers/trace/buffer.go index 6d366bf4630..c5e4a94f370 100644 --- a/helpers/trace/buffer.go +++ b/helpers/trace/buffer.go @@ -35,6 +35,7 @@ type Buffer struct { type options struct { urlParamMasking bool + offset int } type Option func(*options) error @@ -46,6 +47,13 @@ func WithURLParamMasking(enabled bool) Option { } } +func WithOffset(offset int) Option { + return func(o *options) error { + o.offset = offset + return nil + } +} + type inverseLengthSort []string func (s inverseLengthSort) Len() int { @@ -224,6 +232,7 @@ func New(opts ...Option) (*Buffer, error) { options := options{ urlParamMasking: true, + offset: 0, } for _, o := range opts { @@ -233,6 +242,14 @@ func New(opts ...Option) (*Buffer, error) { } } + // TODO: + // This might not work exactly the same on a new log file, locally the file 
exists + // but in a new pod it won't in case the manager is running in a pod which it should + _, err = logFile.Seek(int64(options.offset), 0) + if err != nil { + return nil, err + } + buffer := &Buffer{ logFile: logFile, checksum: crc32.NewIEEE(), @@ -241,7 +258,7 @@ func New(opts ...Option) (*Buffer, error) { buffer.lw = &limitWriter{ w: io.MultiWriter(buffer.logFile, buffer.checksum), - written: 0, + written: int64(options.offset), limit: defaultBytesLimit, } diff --git a/network/gitlab.go b/network/gitlab.go index 518716fc147..bdd629bc92c 100644 --- a/network/gitlab.go +++ b/network/gitlab.go @@ -823,8 +823,9 @@ func (n *GitLabClient) downloadArtifactFile( func (n *GitLabClient) ProcessJob( config common.RunnerConfig, jobCredentials *common.JobCredentials, + startOffset int, ) (common.JobTrace, error) { - trace, err := newJobTrace(n, config, jobCredentials) + trace, err := newJobTrace(n, config, jobCredentials, startOffset) if err != nil { return nil, err } diff --git a/network/trace.go b/network/trace.go index f9d1949fe85..00f6a818f6e 100644 --- a/network/trace.go +++ b/network/trace.go @@ -3,6 +3,7 @@ package network import ( "context" "sync" + "sync/atomic" "time" "gitlab.com/gitlab-org/gitlab-runner/common" @@ -17,6 +18,7 @@ type clientJobTrace struct { id int64 cancelFunc context.CancelFunc abortFunc context.CancelFunc + onWriteFunc func(sentTraceLen int) buffer *trace.Buffer @@ -34,6 +36,8 @@ type clientJobTrace struct { failuresCollector common.FailuresCollector exitCode int + + enabled int32 } func (c *clientJobTrace) Success() { @@ -63,6 +67,10 @@ func (c *clientJobTrace) Fail(err error, failureData common.JobFailureData) { } func (c *clientJobTrace) Write(data []byte) (n int, err error) { + if atomic.LoadInt32(&c.enabled) == 0 { + return 0, nil + } + return c.buffer.Write(data) } @@ -113,6 +121,10 @@ func (c *clientJobTrace) SetAbortFunc(cancelFunc context.CancelFunc) { c.abortFunc = cancelFunc } +func (c *clientJobTrace) SetOnWriteFunc(onWriteFunc func(sentTraceLen int)) { + c.onWriteFunc = onWriteFunc +} + // Abort consumes function set by SetAbortFunc // The abort always have much higher importance than Cancel // as abort interrupts the execution, thus cancel is never @@ -244,7 +256,8 @@ func (c *clientJobTrace) anyTraceToSend() bool { c.lock.RLock() defer c.lock.RUnlock() - return c.buffer.Size() != c.sentTrace + size := c.buffer.Size() + return size != c.sentTrace } func (c *clientJobTrace) sendPatch() common.PatchTraceResult { @@ -269,6 +282,7 @@ func (c *clientJobTrace) sendPatch() common.PatchTraceResult { c.lock.Lock() c.sentTime = time.Now() c.sentTrace = result.SentOffset + c.onWriteFunc(c.sentTrace) c.lock.Unlock() } @@ -405,12 +419,24 @@ func (c *clientJobTrace) IsMaskingURLParams() bool { return c.config.IsFeatureFlagOn(featureflags.UseImprovedURLMasking) } +func (c *clientJobTrace) Disable() { + atomic.StoreInt32(&c.enabled, 0) +} + +func (c *clientJobTrace) Enable() { + atomic.StoreInt32(&c.enabled, 1) +} + func newJobTrace( client common.Network, config common.RunnerConfig, jobCredentials *common.JobCredentials, + startOffset int, ) (*clientJobTrace, error) { - buffer, err := trace.New(trace.WithURLParamMasking(config.IsFeatureFlagOn(featureflags.UseImprovedURLMasking))) + buffer, err := trace.New( + trace.WithURLParamMasking(config.IsFeatureFlagOn(featureflags.UseImprovedURLMasking)), + trace.WithOffset(startOffset), + ) if err != nil { return nil, err } @@ -424,5 +450,7 @@ func newJobTrace( maxTracePatchSize: common.DefaultTracePatchLimit, updateInterval: 
common.DefaultUpdateInterval, forceSendInterval: common.MinTraceForceSendInterval, + sentTrace: startOffset, + enabled: 1, }, nil } diff --git a/shells/abstract_test.go b/shells/abstract_test.go index 26f32fd6f16..221c0225c4c 100644 --- a/shells/abstract_test.go +++ b/shells/abstract_test.go @@ -1101,7 +1101,7 @@ func TestWriteUserScript(t *testing.T) { m.On("Variable", mock.Anything) m.On("Cd", mock.AnythingOfType("string")) m.On("Noticef", "$ %s", "echo hello").Once() - m.On("Line", "echo hello").Once() + m.On("line", "echo hello").Once() m.On("CheckForErrors").Once() }, expectedErr: nil, @@ -1130,9 +1130,9 @@ func TestWriteUserScript(t *testing.T) { m.On("Noticef", "$ %s", "echo prebuild").Once() m.On("Noticef", "$ %s", "echo release").Once() m.On("Noticef", "$ %s", "echo postbuild").Once() - m.On("Line", "echo prebuild").Once() - m.On("Line", "echo release").Once() - m.On("Line", "echo postbuild").Once() + m.On("line", "echo prebuild").Once() + m.On("line", "echo release").Once() + m.On("line", "echo postbuild").Once() m.On("CheckForErrors").Times(3) }, expectedErr: nil, @@ -1192,11 +1192,11 @@ func TestScriptSections(t *testing.T) { m.On("SectionEnd", mock.AnythingOfType("string")).Once() m.On("SectionStart", mock.AnythingOfType("string"), "$ echo postbuild").Once() m.On("SectionEnd", mock.AnythingOfType("string")).Once() - m.On("Line", "echo prebuild").Once() - m.On("Line", "script 1").Once() - m.On("Line", "script 2").Once() - m.On("Line", "script 3").Once() - m.On("Line", "echo postbuild").Once() + m.On("line", "echo prebuild").Once() + m.On("line", "script 1").Once() + m.On("line", "script 2").Once() + m.On("line", "script 3").Once() + m.On("line", "echo postbuild").Once() m.On("CheckForErrors").Times(5) }, }, @@ -1217,11 +1217,11 @@ func TestScriptSections(t *testing.T) { m.On("Noticef", "$ %s", "script 2").Once() m.On("Noticef", "$ %s", "script 3").Once() m.On("Noticef", "$ %s", "echo postbuild").Once() - m.On("Line", "echo prebuild").Once() - m.On("Line", "script 1").Once() - m.On("Line", "script 2").Once() - m.On("Line", "script 3").Once() - m.On("Line", "echo postbuild").Once() + m.On("line", "echo prebuild").Once() + m.On("line", "script 1").Once() + m.On("line", "script 2").Once() + m.On("line", "script 3").Once() + m.On("line", "echo postbuild").Once() m.On("CheckForErrors").Times(5) }, }, @@ -1242,11 +1242,11 @@ func TestScriptSections(t *testing.T) { m.On("Noticef", "$ %s", "script 2").Once() m.On("Noticef", "$ %s", "script 3").Once() m.On("Noticef", "$ %s", "echo postbuild").Once() - m.On("Line", "echo prebuild").Once() - m.On("Line", "script 1").Once() - m.On("Line", "script 2").Once() - m.On("Line", "script 3").Once() - m.On("Line", "echo postbuild").Once() + m.On("line", "echo prebuild").Once() + m.On("line", "script 1").Once() + m.On("line", "script 2").Once() + m.On("line", "script 3").Once() + m.On("line", "echo postbuild").Once() m.On("CheckForErrors").Times(5) }, }, @@ -1267,11 +1267,11 @@ func TestScriptSections(t *testing.T) { m.On("Noticef", "$ %s", "script 2").Once() m.On("Noticef", "$ %s", "script 3").Once() m.On("Noticef", "$ %s", "echo postbuild").Once() - m.On("Line", "echo prebuild").Once() - m.On("Line", "script 1").Once() - m.On("Line", "script 2").Once() - m.On("Line", "script 3").Once() - m.On("Line", "echo postbuild").Once() + m.On("line", "echo prebuild").Once() + m.On("line", "script 1").Once() + m.On("line", "script 2").Once() + m.On("line", "script 3").Once() + m.On("line", "echo postbuild").Once() m.On("CheckForErrors").Times(5) }, }, -- GitLab
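
A usage sketch of the persistence API this patch introduces, kept outside the diff itself: the snippet below mirrors the resume path that commands/multi.go gains in this change, using only identifiers the patch adds in common/store.go (JobStore, JobExecutionState, NewJobExecutionState, FindJobToResume). The requestNewJob helper and the package layout are placeholders for illustration, not part of the change.

package main

import "gitlab.com/gitlab-org/gitlab-runner/common"

// requestNewJob is a stand-in for the runner's real GitLab job request.
func requestNewJob(config *common.RunnerConfig) *common.JobResponse { return nil }

// resumeOrRequest prefers a job whose on-disk execution state went stale while
// still pending/running (an interrupted job) and otherwise requests a fresh
// job, persisting its state so a later runner process can resume it.
func resumeOrRequest(store common.JobStore, config *common.RunnerConfig) (*common.JobExecutionState, error) {
	state, err := store.FindJobToResume()
	if err != nil {
		return nil, err
	}

	if state != nil {
		// Resuming: count the resume and remember which stage was interrupted,
		// as requestJob does in commands/multi.go.
		state.Resumes++
		state.ResumedFromStage = state.Stage
		return state, store.Save(state)
	}

	job := requestNewJob(config)
	if job == nil {
		return nil, nil
	}

	state = common.NewJobExecutionState(job)
	return state, store.Save(state)
}

In commands/multi.go the store is created per runner through common.NewMultiJobStore(func(config *common.RunnerConfig) common.JobStore { return common.NewFileJobStore("/tmp/", config) }), so each runner gets a file-backed store whose state files under /tmp are keyed by the job ID plus the runner's name and short description.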