Commit 5a3aa812 authored by Kamil Trzciński's avatar Kamil Trzciński

Introduce Acquire/Release to Executor

- This allow to allocate resources in context of executor for time of fetching the builds
parent 867071e5
package commands
import (
"gitlab.com/gitlab-org/gitlab-ci-multi-runner/common"
"sync"
)
type buildsHelper struct {
builds []*common.Build
buildsLock sync.Mutex
}
func (b *buildsHelper) count(runner *common.RunnerConfig) int {
count := 0
for _, build := range b.builds {
if build.Runner.ShortDescription() == runner.ShortDescription() {
count++
}
}
return count
}
func (b *buildsHelper) acquire(runner *runnerAcquire) (build *common.Build) {
b.buildsLock.Lock()
defer b.buildsLock.Unlock()
// Check number of builds
count := b.count(&runner.RunnerConfig)
if runner.Limit > 0 && count >= runner.Limit {
// Too many builds
return
}
// Create a new build
build = &common.Build{
Runner: &runner.RunnerConfig,
ExecutorData: runner.data,
}
build.AssignID(b.builds...)
b.builds = append(b.builds, build)
return
}
func (b *buildsHelper) release(deleteBuild *common.Build) bool {
b.buildsLock.Lock()
defer b.buildsLock.Unlock()
for idx, build := range b.builds {
if build == deleteBuild {
b.builds = append(b.builds[0:idx], b.builds[idx+1:]...)
return true
}
}
return false
}
package commands
import (
"sync"
"time"
"github.com/Sirupsen/logrus"
"gitlab.com/gitlab-org/gitlab-ci-multi-runner/common"
)
type healthData struct {
failures int
lastCheck time.Time
}
type healthHelper struct {
healthy map[string]*healthData
healthyLock sync.Mutex
}
func (mr *healthHelper) getHealth(id string) *healthData {
mr.healthyLock.Lock()
defer mr.healthyLock.Unlock()
if mr.healthy == nil {
mr.healthy = map[string]*healthData{}
}
health := mr.healthy[id]
if health == nil {
health = &healthData{
lastCheck: time.Now(),
}
mr.healthy[id] = health
}
return health
}
func (mr *healthHelper) isHealthy(id string) bool {
health := mr.getHealth(id)
if health.failures < common.HealthyChecks {
return true
}
if time.Since(health.lastCheck) > common.HealthCheckInterval*time.Second {
logrus.Errorln("Runner", id, "is not healthy, but will be checked!")
health.failures = 0
health.lastCheck = time.Now()
return true
}
return false
}
func (mr *healthHelper) makeHealthy(id string, healthy bool) {
health := mr.getHealth(id)
if healthy {
health.failures = 0
health.lastCheck = time.Now()
} else {
health.failures++
if health.failures >= common.HealthyChecks {
logrus.Errorln("Runner", id, "is not healthy and will be disabled!")
}
}
}
...@@ -6,7 +6,6 @@ import ( ...@@ -6,7 +6,6 @@ import (
"os" "os"
"os/signal" "os/signal"
"runtime" "runtime"
"sync"
"syscall" "syscall"
"time" "time"
...@@ -16,28 +15,32 @@ import ( ...@@ -16,28 +15,32 @@ import (
log "github.com/Sirupsen/logrus" log "github.com/Sirupsen/logrus"
"gitlab.com/gitlab-org/gitlab-ci-multi-runner/common" "gitlab.com/gitlab-org/gitlab-ci-multi-runner/common"
"gitlab.com/gitlab-org/gitlab-ci-multi-runner/helpers"
"gitlab.com/gitlab-org/gitlab-ci-multi-runner/helpers/service" "gitlab.com/gitlab-org/gitlab-ci-multi-runner/helpers/service"
"gitlab.com/gitlab-org/gitlab-ci-multi-runner/network" "gitlab.com/gitlab-org/gitlab-ci-multi-runner/network"
) )
type RunnerHealth struct { type runnerAcquire struct {
failures int common.RunnerConfig
lastCheck time.Time provider common.ExecutorProvider
data common.ExecutorData
}
func (r *runnerAcquire) Release() {
r.provider.Release(&r.RunnerConfig, r.data)
} }
type RunCommand struct { type RunCommand struct {
configOptions configOptions
network common.Network network common.Network
healthHelper
buildsHelper
ServiceName string `short:"n" long:"service" description:"Use different names for different services"` ServiceName string `short:"n" long:"service" description:"Use different names for different services"`
WorkingDirectory string `short:"d" long:"working-directory" description:"Specify custom working directory"` WorkingDirectory string `short:"d" long:"working-directory" description:"Specify custom working directory"`
User string `short:"u" long:"user" description:"Use specific user to execute shell scripts"` User string `short:"u" long:"user" description:"Use specific user to execute shell scripts"`
Syslog bool `long:"syslog" description:"Log to syslog"` Syslog bool `long:"syslog" description:"Log to syslog"`
builds []*common.Build
buildsLock sync.RWMutex
healthy map[string]*RunnerHealth
healthyLock sync.Mutex
finished bool finished bool
abortBuilds chan os.Signal abortBuilds chan os.Signal
interruptSignal chan os.Signal interruptSignal chan os.Signal
...@@ -49,148 +52,66 @@ func (mr *RunCommand) log() *log.Entry { ...@@ -49,148 +52,66 @@ func (mr *RunCommand) log() *log.Entry {
return log.WithField("builds", len(mr.builds)) return log.WithField("builds", len(mr.builds))
} }
func (mr *RunCommand) getHealth(runner *common.RunnerConfig) *RunnerHealth { func (mr *RunCommand) feedRunner(runner *common.RunnerConfig, runners chan *runnerAcquire) {
mr.healthyLock.Lock() if !mr.isHealthy(runner.UniqueID()) {
defer mr.healthyLock.Unlock() return
if mr.healthy == nil {
mr.healthy = map[string]*RunnerHealth{}
}
health := mr.healthy[runner.UniqueID()]
if health == nil {
health = &RunnerHealth{
lastCheck: time.Now(),
}
mr.healthy[runner.UniqueID()] = health
}
return health
}
func (mr *RunCommand) isHealthy(runner *common.RunnerConfig) bool {
health := mr.getHealth(runner)
if health.failures < common.HealthyChecks {
return true
} }
if time.Since(health.lastCheck) > common.HealthCheckInterval*time.Second { provider := common.GetExecutor(runner.Executor)
mr.log().Errorln("Runner", runner.ShortDescription(), "is not healthy, but will be checked!") if provider == nil {
health.failures = 0 return
health.lastCheck = time.Now()
return true
} }
return false data, err := provider.Acquire(runner)
} if err != nil {
log.Warningln("Failed to update executor", runner.Executor, "for", runner.ShortDescription(), err)
func (mr *RunCommand) makeHealthy(runner *common.RunnerConfig) { return
health := mr.getHealth(runner)
health.failures = 0
health.lastCheck = time.Now()
}
func (mr *RunCommand) makeUnhealthy(runner *common.RunnerConfig) {
health := mr.getHealth(runner)
health.failures++
if health.failures >= common.HealthyChecks {
mr.log().Errorln("Runner", runner.ShortDescription(), "is not healthy and will be disabled!")
} }
}
func (mr *RunCommand) addBuild(newBuild *common.Build) {
mr.buildsLock.Lock()
defer mr.buildsLock.Unlock()
newBuild.AssignID(mr.builds...) runners <- &runnerAcquire{*runner, provider, data}
mr.builds = append(mr.builds, newBuild)
mr.log().Debugln("Added a new build", newBuild)
} }
func (mr *RunCommand) removeBuild(deleteBuild *common.Build) bool { func (mr *RunCommand) feedRunners(runners chan *runnerAcquire) {
mr.buildsLock.Lock() for !mr.finished {
defer mr.buildsLock.Unlock() mr.log().Debugln("Feeding runners to channel")
config := mr.config
for idx, build := range mr.builds { for _, runner := range config.Runners {
if build == deleteBuild { mr.feedRunner(runner, runners)
mr.builds = append(mr.builds[0:idx], mr.builds[idx+1:]...)
mr.log().Debugln("Build removed", deleteBuild)
return true
}
}
return false
}
func (mr *RunCommand) buildsForRunner(runner *common.RunnerConfig) int {
count := 0
for _, build := range mr.builds {
if build.Runner == runner {
count++
} }
time.Sleep(common.CheckInterval * time.Second)
} }
return count
} }
func (mr *RunCommand) requestBuild(runner *common.RunnerConfig) *common.Build { func (mr *RunCommand) processRunner(id int, runner *runnerAcquire) {
if runner == nil { defer runner.Release()
return nil
}
if !mr.isHealthy(runner) { // Acquire build slot
return nil build := mr.buildsHelper.acquire(runner)
} if build == nil {
return
count := mr.buildsForRunner(runner)
if runner.Limit > 0 && count >= runner.Limit {
return nil
}
buildData, healthy := mr.network.GetBuild(*runner)
if healthy {
mr.makeHealthy(runner)
} else {
mr.makeUnhealthy(runner)
} }
defer mr.buildsHelper.release(build)
// Receive a new build
buildData, healthy := mr.network.GetBuild(runner.RunnerConfig)
mr.makeHealthy(runner.UniqueID(), healthy)
if buildData == nil { if buildData == nil {
return nil return
}
mr.log().Debugln("Received new build for", runner.ShortDescription(), "build", buildData.ID)
newBuild := &common.Build{
GetBuildResponse: *buildData,
Runner: runner,
BuildAbort: mr.abortBuilds,
Network: mr.network,
} }
return newBuild
}
func (mr *RunCommand) feedRunners(runners chan *common.RunnerConfig) { // Process a build
for !mr.finished { build.GetBuildResponse = *buildData
mr.log().Debugln("Feeding runners to channel") build.BuildAbort = mr.abortBuilds
config := mr.config build.Network = mr.network
for _, runner := range config.Runners { build.Run(mr.config)
runners <- runner
}
time.Sleep(common.CheckInterval * time.Second)
}
} }
func (mr *RunCommand) processRunners(id int, stopWorker chan bool, runners chan *common.RunnerConfig) { func (mr *RunCommand) processRunners(id int, stopWorker chan bool, runners chan *runnerAcquire) {
mr.log().Debugln("Starting worker", id) mr.log().Debugln("Starting worker", id)
for !mr.finished { for !mr.finished {
select { select {
case runner := <-runners: case runner := <-runners:
mr.log().Debugln("Checking runner", runner, "on", id) mr.processRunner(id, runner)
newJob := mr.requestBuild(runner)
if newJob == nil {
break
}
mr.addBuild(newJob)
newJob.Run(mr.config)
mr.removeBuild(newJob)
newJob = nil
// force GC cycle after processing build // force GC cycle after processing build
runtime.GC() runtime.GC()
...@@ -203,7 +124,7 @@ func (mr *RunCommand) processRunners(id int, stopWorker chan bool, runners chan ...@@ -203,7 +124,7 @@ func (mr *RunCommand) processRunners(id int, stopWorker chan bool, runners chan
<-stopWorker <-stopWorker
} }
func (mr *RunCommand) startWorkers(startWorker chan int, stopWorker chan bool, runners chan *common.RunnerConfig) { func (mr *RunCommand) startWorkers(startWorker chan int, stopWorker chan bool, runners chan *runnerAcquire) {
for !mr.finished { for !mr.finished {
id := <-startWorker id := <-startWorker
go mr.processRunners(id, stopWorker, runners) go mr.processRunners(id, stopWorker, runners)
...@@ -222,7 +143,7 @@ func (mr *RunCommand) loadConfig() error { ...@@ -222,7 +143,7 @@ func (mr *RunCommand) loadConfig() error {
} }
mr.healthy = nil mr.healthy = nil
mr.log().Println("Config loaded.") mr.log().Println("Config loaded:", helpers.ToYAML(mr.config))
return nil return nil
} }
...@@ -320,7 +241,7 @@ func (mr *RunCommand) updateConfig() os.Signal { ...@@ -320,7 +241,7 @@ func (mr *RunCommand) updateConfig() os.Signal {
} }
func (mr *RunCommand) Run() { func (mr *RunCommand) Run() {
runners := make(chan *common.RunnerConfig) runners := make(chan *runnerAcquire)
go mr.feedRunners(runners) go mr.feedRunners(runners)
startWorker := make(chan int) startWorker := make(chan int)
......
...@@ -50,7 +50,7 @@ func waitForInterrupts(finished *bool, abortSignal chan os.Signal, doneSignal ch ...@@ -50,7 +50,7 @@ func waitForInterrupts(finished *bool, abortSignal chan os.Signal, doneSignal ch
} }
} }
func (r *RunSingleCommand) processBuild(abortSignal chan os.Signal) { func (r *RunSingleCommand) processBuild(data common.ExecutorData, abortSignal chan os.Signal) {
buildData, healthy := r.network.GetBuild(r.RunnerConfig) buildData, healthy := r.network.GetBuild(r.RunnerConfig)
if !healthy { if !healthy {
log.Println("Runner is not healthy!") log.Println("Runner is not healthy!")
...@@ -76,6 +76,7 @@ func (r *RunSingleCommand) processBuild(abortSignal chan os.Signal) { ...@@ -76,6 +76,7 @@ func (r *RunSingleCommand) processBuild(abortSignal chan os.Signal) {
Runner: &r.RunnerConfig, Runner: &r.RunnerConfig,
BuildAbort: abortSignal, BuildAbort: abortSignal,
Network: r.network, Network: r.network,
ExecutorData: data,
} }
newBuild.AssignID() newBuild.AssignID()
newBuild.Run(config) newBuild.Run(config)
...@@ -92,6 +93,11 @@ func (r *RunSingleCommand) Execute(c *cli.Context) { ...@@ -92,6 +93,11 @@ func (r *RunSingleCommand) Execute(c *cli.Context) {
log.Fatalln("Missing Executor") log.Fatalln("Missing Executor")
} }
executorProvider := common.GetExecutor(r.Executor)
if executorProvider == nil {
log.Fatalln("Uknown executor:", r.Executor)
}
log.Println("Starting runner for", r.URL, "with token", r.ShortDescription(), "...") log.Println("Starting runner for", r.URL, "with token", r.ShortDescription(), "...")
finished := false finished := false
...@@ -101,7 +107,13 @@ func (r *RunSingleCommand) Execute(c *cli.Context) { ...@@ -101,7 +107,13 @@ func (r *RunSingleCommand) Execute(c *cli.Context) {
go waitForInterrupts(&finished, abortSignal, doneSignal) go waitForInterrupts(&finished, abortSignal, doneSignal)
for !finished { for !finished {
r.processBuild(abortSignal) data, err := executorProvider.Acquire(&r.RunnerConfig)
if err != nil {
log.Warningln("Executor update:", err)
}
r.processBuild(data, abortSignal)
executorProvider.Release(&r.RunnerConfig, data)
} }
doneSignal <- 0 doneSignal <- 0
......
...@@ -4,6 +4,7 @@ import ( ...@@ -4,6 +4,7 @@ import (
"bytes" "bytes"
"errors" "errors"
"fmt" "fmt"
"github.com/Sirupsen/logrus"
"gitlab.com/gitlab-org/gitlab-ci-multi-runner/helpers" "gitlab.com/gitlab-org/gitlab-ci-multi-runner/helpers"
"net/url" "net/url"
"os" "os"
...@@ -37,6 +38,7 @@ type Build struct { ...@@ -37,6 +38,7 @@ type Build struct {
CacheDir string `json:"-" yaml:"-"` CacheDir string `json:"-" yaml:"-"`
Hostname string `json:"-" yaml:"-"` Hostname string `json:"-" yaml:"-"`
Runner *RunnerConfig `json:"runner"` Runner *RunnerConfig `json:"runner"`
ExecutorData ExecutorData
// Unique ID for all running builds on this runner // Unique ID for all running builds on this runner
RunnerID int `json:"runner_id"` RunnerID int `json:"runner_id"`
...@@ -212,13 +214,21 @@ func (b *Build) SendBuildLog() { ...@@ -212,13 +214,21 @@ func (b *Build) SendBuildLog() {
} }
func (b *Build) Run(globalConfig *Config) error { func (b *Build) Run(globalConfig *Config) error {
executor := NewExecutor(b.Runner.Executor) logrus.Debugln("Starting a new build:", helpers.ToYAML(b))
if executor == nil { provider := GetExecutor(b.Runner.Executor)
if provider == nil {
b.WriteString("Executor not found: " + b.Runner.Executor) b.WriteString("Executor not found: " + b.Runner.Executor)
b.SendBuildLog() b.SendBuildLog()
return errors.New("executor not found") return errors.New("executor not found")
} }
executor := provider.Create()
if executor == nil {
b.WriteString("Failed to create executor: " + b.Runner.Executor)
b.SendBuildLog()
return errors.New("executor not found")
}
err := executor.Prepare(globalConfig, b.Runner, b) err := executor.Prepare(globalConfig, b.Runner, b)
if err == nil { if err == nil {
err = executor.Start() err = executor.Start()
......
...@@ -4,6 +4,8 @@ import ( ...@@ -4,6 +4,8 @@ import (
log "github.com/Sirupsen/logrus" log "github.com/Sirupsen/logrus"
) )
type ExecutorData interface{}
type Executor interface { type Executor interface {
Prepare(globalConfig *Config, config *RunnerConfig, build *Build) error Prepare(globalConfig *Config, config *RunnerConfig, build *Build) error
Start() error Start() error
...@@ -15,6 +17,8 @@ type Executor interface { ...@@ -15,6 +17,8 @@ type Executor interface {
type ExecutorProvider interface { type ExecutorProvider interface {
CanCreate() bool CanCreate() bool
Create() Executor Create() Executor
Acquire(config *RunnerConfig) (ExecutorData, error)
Release(config *RunnerConfig, data ExecutorData) error
GetFeatures(features *FeaturesInfo) GetFeatures(features *FeaturesInfo)
} }
......
...@@ -18,6 +18,14 @@ func (e DefaultExecutorProvider) Create() common.Executor { ...@@ -18,6 +18,14 @@ func (e DefaultExecutorProvider) Create() common.Executor {
return e.Creator() return e.Creator()
} }
func (e DefaultExecutorProvider) Acquire(config *common.RunnerConfig) (common.ExecutorData, error) {
return nil, nil
}
func (e DefaultExecutorProvider) Release(config *common.RunnerConfig, data common.ExecutorData) error {
return nil
}
func (e DefaultExecutorProvider) GetFeatures(features *common.FeaturesInfo) { func (e DefaultExecutorProvider) GetFeatures(features *common.FeaturesInfo) {
if e.FeaturesUpdater != nil { if e.FeaturesUpdater != nil {
e.FeaturesUpdater(features) e.FeaturesUpdater(features)
......
package helpers package helpers
import ( import (
"testing"
"github.com/BurntSushi/toml" "github.com/BurntSushi/toml"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"testing"
) )
func TestTOMLOmitEmpty(t *testing.T) { func TestTOMLOmitEmpty(t *testing.T) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment