Commit 0160cf3e authored by St. John Johnson's avatar St. John Johnson Committed by Kamil Trzciński

Prevent Executors from modifying Runner configuration

parent 0fba7a3a
......@@ -82,7 +82,7 @@ func (c *ExecCommand) createBuild(repoURL string, abortSignal chan os.Signal) (b
RunnerSettings: c.RunnerSettings,
}
build = common.NewBuild(jobResponse, runner, abortSignal, nil)
build, err = common.NewBuild(jobResponse, runner, abortSignal, nil)
return
}
......
......@@ -121,15 +121,21 @@ func (mr *RunCommand) feedRunners(runners chan *common.RunnerConfig) {
}
}
func (mr *RunCommand) requestJob(runner *common.RunnerConfig, sessionInfo *common.SessionInfo) (*common.JobResponse, bool) {
// requestJob will check if the runner can send another concurrent request to
// GitLab, if not the return value is nil.
func (mr *RunCommand) requestJob(runner *common.RunnerConfig, sessionInfo *common.SessionInfo) *common.JobResponse {
if !mr.buildsHelper.acquireRequest(runner) {
return nil, false
mr.log().WithField("runner", runner.ShortDescription()).
Debugln("Failed to request job: runner requestConcurrency meet")
return nil
}
defer mr.buildsHelper.releaseRequest(runner)
jobData, healthy := mr.network.RequestJob(*runner, sessionInfo)
mr.makeHealthy(runner.UniqueID(), healthy)
return jobData, true
return jobData
}
func (mr *RunCommand) processRunner(id int, runner *common.RunnerConfig, runners chan *common.RunnerConfig) (err error) {
......@@ -138,20 +144,16 @@ func (mr *RunCommand) processRunner(id int, runner *common.RunnerConfig, runners
return
}
context, err := provider.Acquire(runner)
executorData, releaseFn, err := mr.acquireRunnerResources(provider, runner)
if err != nil {
logrus.Warningln("Failed to update executor", runner.Executor, "for", runner.ShortDescription(), err)
mr.log().WithFields(logrus.Fields{
"runner": runner.ShortDescription(),
"executor": runner.Executor,
}).WithError(err).
Warn("Failed to acquire runner resource")
return
}
defer provider.Release(runner, context)
// Acquire build slot
if !mr.buildsHelper.acquireBuild(runner) {
mr.log().WithField("runner", runner.ShortDescription()).
Debugln("Failed to request job: runner limit meet")
return
}
defer mr.buildsHelper.releaseBuild(runner)
defer releaseFn()
var features common.FeaturesInfo
provider.GetFeatures(&features)
......@@ -161,12 +163,7 @@ func (mr *RunCommand) processRunner(id int, runner *common.RunnerConfig, runners
}
// Receive a new build
jobData, result := mr.requestJob(runner, sessionInfo)
if !result {
mr.log().WithField("runner", runner.ShortDescription()).
Debugln("Failed to request job: runner requestConcurrency meet")
return
}
jobData := mr.requestJob(runner, sessionInfo)
if jobData == nil {
return
}
......@@ -177,12 +174,22 @@ func (mr *RunCommand) processRunner(id int, runner *common.RunnerConfig, runners
Token: jobData.Token,
}
trace := mr.network.ProcessJob(*runner, jobCredentials)
defer trace.Fail(err, common.NoneFailure)
defer func() {
if err != nil {
fmt.Fprintln(trace, err.Error())
trace.Fail(err, common.RunnerSystemFailure)
} else {
trace.Fail(nil, common.NoneFailure)
}
}()
trace.SetFailuresCollector(mr.failuresCollector)
// Create a new build
build := common.NewBuild(*jobData, runner, mr.abortBuilds, context)
build, err := common.NewBuild(*jobData, runner, mr.abortBuilds, executorData)
if err != nil {
return
}
build.Session = buildSession
// Add build to list of builds to assign numbers
......@@ -203,6 +210,32 @@ func (mr *RunCommand) processRunner(id int, runner *common.RunnerConfig, runners
return build.Run(mr.config, trace)
}
func (mr *RunCommand) acquireRunnerResources(provider common.ExecutorProvider, runner *common.RunnerConfig) (common.ExecutorData, func(), error) {
executorData, err := provider.Acquire(runner)
if err != nil {
return nil, func() {}, fmt.Errorf("failed to update executor: %v", err)
}
releaseProviderFn := func() {
err := provider.Release(runner, executorData)
if err != nil {
logrus.WithError(err).Error("Failed to release executor")
}
}
if !mr.buildsHelper.acquireBuild(runner) {
releaseProviderFn()
return nil, nil, errors.New("failed to request job, runner limit met")
}
releaseFn := func() {
mr.buildsHelper.releaseBuild(runner)
releaseProviderFn()
}
return executorData, releaseFn, nil
}
func (mr *RunCommand) createSession(features common.FeaturesInfo) (*session.Session, *common.SessionInfo, error) {
if mr.sessionServer == nil || !features.Session {
return nil, nil, nil
......@@ -227,7 +260,10 @@ func (mr *RunCommand) processRunners(id int, stopWorker chan bool, runners chan
for mr.stopSignal == nil {
select {
case runner := <-runners:
mr.processRunner(id, runner, runners)
err := mr.processRunner(id, runner, runners)
if err != nil {
logrus.WithError(err).Error("Failed to process runner")
}
// force GC cycle after processing build
runtime.GC()
......
......@@ -87,7 +87,10 @@ func (r *RunSingleCommand) processBuild(data common.ExecutorData, abortSignal ch
}
config := common.NewConfig()
newBuild := common.NewBuild(*jobData, &r.RunnerConfig, abortSignal, data)
newBuild, err := common.NewBuild(*jobData, &r.RunnerConfig, abortSignal, data)
if err != nil {
return
}
jobCredentials := &common.JobCredentials{
ID: jobData.ID,
......@@ -148,7 +151,11 @@ func (r *RunSingleCommand) Execute(c *cli.Context) {
log.Warningln("Executor update:", err)
}
r.processBuild(data, abortSignal)
pErr := r.processBuild(data, abortSignal)
if pErr != nil {
log.WithError(pErr).Error("Failed to process build")
}
r.checkFinishedConditions()
executorProvider.Release(&r.RunnerConfig, data)
}
......
......@@ -706,14 +706,20 @@ func (b *Build) Duration() time.Duration {
return time.Since(b.createdAt)
}
func NewBuild(jobData JobResponse, runnerConfig *RunnerConfig, systemInterrupt chan os.Signal, executorData ExecutorData) *Build {
func NewBuild(jobData JobResponse, runnerConfig *RunnerConfig, systemInterrupt chan os.Signal, executorData ExecutorData) (*Build, error) {
// Attempt to perform a deep copy of the RunnerConfig
runnerConfigCopy, err := runnerConfig.DeepCopy()
if err != nil {
return nil, fmt.Errorf("deep copy of runner config failed: %v", err)
}
return &Build{
JobResponse: jobData,
Runner: runnerConfig,
Runner: runnerConfigCopy,
SystemInterrupt: systemInterrupt,
ExecutorData: executorData,
createdAt: time.Now(),
}
}, nil
}
func (b *Build) IsFeatureFlagOn(name string) bool {
......
......@@ -18,6 +18,7 @@ import (
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitlab-runner/helpers/docker"
"gitlab.com/gitlab-org/gitlab-runner/session"
"gitlab.com/gitlab-org/gitlab-runner/session/terminal"
)
......@@ -75,6 +76,63 @@ func TestBuildRun(t *testing.T) {
assert.NoError(t, err)
}
func TestBuildRunNoModifyConfig(t *testing.T) {
e := MockExecutor{}
defer e.AssertExpectations(t)
p := MockExecutorProvider{}
defer p.AssertExpectations(t)
// Create executor only once
p.On("CanCreate").Return(true).Once()
p.On("GetDefaultShell").Return("bash").Once()
p.On("GetFeatures", mock.Anything).Return(nil).Twice()
p.On("Create").Return(&e).Once()
// Attempt to modify the Config object
e.On("Prepare", mock.Anything, mock.Anything, mock.Anything).
Return(func(options ExecutorPrepareOptions) error {
options.Config.Docker.DockerCredentials.Host = "10.0.0.2"
return nil
}).Once()
// We run everything else once
e.On("Finish", nil).Return().Once()
e.On("Cleanup").Return().Once()
// Run script successfully
e.On("Shell").Return(&ShellScriptInfo{Shell: "script-shell"})
e.On("Run", matchBuildStage(BuildStagePrepare)).Return(nil).Once()
e.On("Run", matchBuildStage(BuildStageGetSources)).Return(nil).Once()
e.On("Run", matchBuildStage(BuildStageRestoreCache)).Return(nil).Once()
e.On("Run", matchBuildStage(BuildStageDownloadArtifacts)).Return(nil).Once()
e.On("Run", matchBuildStage(BuildStageUserScript)).Return(nil).Once()
e.On("Run", matchBuildStage(BuildStageAfterScript)).Return(nil).Once()
e.On("Run", matchBuildStage(BuildStageArchiveCache)).Return(nil).Once()
e.On("Run", matchBuildStage(BuildStageUploadOnSuccessArtifacts)).Return(nil).Once()
RegisterExecutor("build-run-nomodify-test", &p)
successfulBuild, err := GetSuccessfulBuild()
assert.NoError(t, err)
rc := &RunnerConfig{
RunnerSettings: RunnerSettings{
Executor: "build-run-nomodify-test",
Docker: &DockerConfig{
DockerCredentials: docker_helpers.DockerCredentials{
Host: "10.0.0.1",
},
},
},
}
build, err := NewBuild(successfulBuild, rc, nil, nil)
assert.NoError(t, err)
err = build.Run(&Config{}, &Trace{Writer: os.Stdout})
assert.NoError(t, err)
assert.Equal(t, "10.0.0.1", rc.Docker.DockerCredentials.Host)
}
func TestRetryPrepare(t *testing.T) {
PreparationRetryInterval = 0
......
......@@ -3,6 +3,7 @@ package common
import (
"bufio"
"bytes"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
......@@ -646,6 +647,23 @@ func (c *RunnerConfig) GetVariables() JobVariables {
return variables
}
// DeepCopy attempts to make a deep clone of the object
func (c *RunnerConfig) DeepCopy() (*RunnerConfig, error) {
var r RunnerConfig
bytes, err := json.Marshal(c)
if err != nil {
return nil, fmt.Errorf("serialization of runner config failed: %v", err)
}
err = json.Unmarshal(bytes, &r)
if err != nil {
return nil, fmt.Errorf("deserialization of runner config failed: %v", err)
}
return &r, err
}
func NewConfig() *Config {
return &Config{
Concurrent: 1,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment