Commit e5917d58 authored by Kamil Trzciński's avatar Kamil Trzciński

Acquire context before asking for build instead of doing that in main loop

parent 65e809f3
......@@ -11,7 +11,7 @@ type buildsHelper struct {
lock sync.Mutex
}
func (b *buildsHelper) acquire(runner *runnerAcquire) bool {
func (b *buildsHelper) acquire(runner *common.RunnerConfig) bool {
b.lock.Lock()
defer b.lock.Unlock()
......@@ -30,7 +30,7 @@ func (b *buildsHelper) acquire(runner *runnerAcquire) bool {
return true
}
func (b *buildsHelper) release(runner *runnerAcquire) bool {
func (b *buildsHelper) release(runner *common.RunnerConfig) bool {
b.lock.Lock()
defer b.lock.Unlock()
......
......@@ -20,16 +20,6 @@ import (
"gitlab.com/gitlab-org/gitlab-ci-multi-runner/network"
)
type runnerAcquire struct {
common.RunnerConfig
provider common.ExecutorProvider
data common.ExecutorData
}
func (r *runnerAcquire) Release() {
r.provider.Release(&r.RunnerConfig, r.data)
}
type RunCommand struct {
configOptions
network common.Network
......@@ -66,26 +56,15 @@ func (mr *RunCommand) log() *log.Entry {
return log.WithField("builds", len(mr.buildsHelper.builds))
}
func (mr *RunCommand) feedRunner(runner *common.RunnerConfig, runners chan *runnerAcquire) {
func (mr *RunCommand) feedRunner(runner *common.RunnerConfig, runners chan *common.RunnerConfig) {
if !mr.isHealthy(runner.UniqueID()) {
return
}
provider := common.GetExecutor(runner.Executor)
if provider == nil {
return
}
data, err := provider.Acquire(runner)
if err != nil {
log.Warningln("Failed to update executor", runner.Executor, "for", runner.ShortDescription(), err)
return
}
runners <- &runnerAcquire{*runner, provider, data}
runners <- runner
}
func (mr *RunCommand) feedRunners(runners chan *runnerAcquire) {
func (mr *RunCommand) feedRunners(runners chan *common.RunnerConfig) {
for mr.stopSignal == nil {
mr.log().Debugln("Feeding runners to channel")
config := mr.config
......@@ -96,8 +75,18 @@ func (mr *RunCommand) feedRunners(runners chan *runnerAcquire) {
}
}
func (mr *RunCommand) processRunner(id int, runner *runnerAcquire) (err error) {
defer runner.Release()
func (mr *RunCommand) processRunner(id int, runner *common.RunnerConfig) (err error) {
provider := common.GetExecutor(runner.Executor)
if provider == nil {
return
}
context, err := provider.Acquire(runner)
if err != nil {
log.Warningln("Failed to update executor", runner.Executor, "for", runner.ShortDescription(), err)
return
}
defer provider.Release(runner, context)
// Acquire build slot
if !mr.buildsHelper.acquire(runner) {
......@@ -106,21 +95,21 @@ func (mr *RunCommand) processRunner(id int, runner *runnerAcquire) (err error) {
defer mr.buildsHelper.release(runner)
// Receive a new build
buildData, healthy := mr.network.GetBuild(runner.RunnerConfig)
buildData, healthy := mr.network.GetBuild(*runner)
mr.makeHealthy(runner.UniqueID(), healthy)
if buildData == nil {
return
}
// Make sure to always close output
trace := mr.network.ProcessBuild(runner.RunnerConfig, buildData.ID)
trace := mr.network.ProcessBuild(*runner, buildData.ID)
defer trace.Fail(err)
// Create a new build
build := &common.Build{
GetBuildResponse: *buildData,
Runner: &runner.RunnerConfig,
ExecutorData: runner.data,
Runner: runner,
ExecutorData: context,
BuildAbort: mr.abortBuilds,
}
......@@ -132,7 +121,7 @@ func (mr *RunCommand) processRunner(id int, runner *runnerAcquire) (err error) {
return build.Run(mr.config, trace)
}
func (mr *RunCommand) processRunners(id int, stopWorker chan bool, runners chan *runnerAcquire) {
func (mr *RunCommand) processRunners(id int, stopWorker chan bool, runners chan *common.RunnerConfig) {
mr.log().Debugln("Starting worker", id)
for mr.stopSignal == nil {
select {
......@@ -150,7 +139,7 @@ func (mr *RunCommand) processRunners(id int, stopWorker chan bool, runners chan
<-stopWorker
}
func (mr *RunCommand) startWorkers(startWorker chan int, stopWorker chan bool, runners chan *runnerAcquire) {
func (mr *RunCommand) startWorkers(startWorker chan int, stopWorker chan bool, runners chan *common.RunnerConfig) {
for mr.stopSignal == nil {
id := <-startWorker
go mr.processRunners(id, stopWorker, runners)
......@@ -274,7 +263,7 @@ func (mr *RunCommand) runWait() {
}
func (mr *RunCommand) Run() {
runners := make(chan *runnerAcquire)
runners := make(chan *common.RunnerConfig)
go mr.feedRunners(runners)
signal.Notify(mr.stopSignals, syscall.SIGQUIT, syscall.SIGTERM, os.Interrupt, os.Kill)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment