Commit beb58a88 authored by Kamil Trzciński's avatar Kamil Trzciński

Fixes the #1078

parent 9bd45fd1
Pipeline #745149 failed with stage
......@@ -6,44 +6,82 @@ import (
)
type buildsHelper struct {
builds []*common.Build
buildsLock sync.Mutex
counts map[string]int
builds []*common.Build
lock sync.Mutex
}
func (b *buildsHelper) count(runner *common.RunnerConfig) int {
count := 0
for _, build := range b.builds {
if build.Runner.ShortDescription() == runner.ShortDescription() {
count++
}
}
return count
}
func (b *buildsHelper) acquire(runner *runnerAcquire) (build *common.Build) {
b.buildsLock.Lock()
defer b.buildsLock.Unlock()
func (b *buildsHelper) acquire(runner *runnerAcquire) bool {
b.lock.Lock()
defer b.lock.Unlock()
// Check number of builds
count := b.count(&runner.RunnerConfig)
count, _ := b.counts[runner.Token]
if runner.Limit > 0 && count >= runner.Limit {
// Too many builds
return
return false
}
// Create a new build
build = &common.Build{
Runner: &runner.RunnerConfig,
ExecutorData: runner.data,
if b.counts == nil {
b.counts = make(map[string]int)
}
build.AssignID(b.builds...)
b.counts[runner.Token]++
return
}
func (b *buildsHelper) release(runner *runnerAcquire) bool {
b.lock.Lock()
defer b.lock.Unlock()
_, ok := b.counts[runner.Token]
if ok {
b.counts[runner.Token]--
return true
}
return false
}
func (b *buildsHelper) addBuild(build *common.Build) {
b.lock.Lock()
defer b.lock.Unlock()
runners := make(map[int]bool)
projectRunners := make(map[int]bool)
for _, otherBuild := range b.builds {
if otherBuild.Runner.Token != build.Runner.Token {
continue
}
runners[otherBuild.RunnerID] = true
if otherBuild.ProjectID != build.ProjectID {
continue
}
projectRunners[otherBuild.ProjectRunnerID] = true
}
for {
if !runners[build.RunnerID] {
break
}
build.RunnerID++
}
for {
if !projectRunners[build.ProjectRunnerID] {
break
}
build.ProjectRunnerID++
}
b.builds = append(b.builds, build)
return
}
func (b *buildsHelper) release(deleteBuild *common.Build) bool {
b.buildsLock.Lock()
defer b.buildsLock.Unlock()
func (b *buildsHelper) removeBuild(deleteBuild *common.Build) bool {
b.lock.Lock()
defer b.lock.Unlock()
for idx, build := range b.builds {
if build == deleteBuild {
......
......@@ -254,8 +254,6 @@ func (c *ExecCommand) Execute(context *cli.Context) {
logrus.Fatalln(err)
}
build.AssignID()
err = build.Run(&common.Config{}, &stdoutTrace{})
if err != nil {
logrus.Fatalln(err)
......
......@@ -34,7 +34,8 @@ type RunCommand struct {
configOptions
network common.Network
healthHelper
buildsHelper
buildsHelper buildsHelper
ServiceName string `short:"n" long:"service" description:"Use different names for different services"`
WorkingDirectory string `short:"d" long:"working-directory" description:"Specify custom working directory"`
......@@ -62,7 +63,7 @@ type RunCommand struct {
}
func (mr *RunCommand) log() *log.Entry {
return log.WithField("builds", len(mr.builds))
return log.WithField("builds", len(mr.buildsHelper.builds))
}
func (mr *RunCommand) feedRunner(runner *common.RunnerConfig, runners chan *runnerAcquire) {
......@@ -99,11 +100,10 @@ func (mr *RunCommand) processRunner(id int, runner *runnerAcquire) (err error) {
defer runner.Release()
// Acquire build slot
build := mr.buildsHelper.acquire(runner)
if build == nil {
if !mr.buildsHelper.acquire(runner) {
return
}
defer mr.buildsHelper.release(build)
defer mr.buildsHelper.release(runner)
// Receive a new build
buildData, healthy := mr.network.GetBuild(runner.RunnerConfig)
......@@ -116,12 +116,21 @@ func (mr *RunCommand) processRunner(id int, runner *runnerAcquire) (err error) {
trace := mr.network.ProcessBuild(runner.RunnerConfig, buildData.ID)
defer trace.Fail(err)
// Create a new build
build := common.Build{
GetBuildResponse: *buildData,
Runner: &runner.RunnerConfig,
ExecutorData: runner.data,
BuildAbort: mr.abortBuilds,
Network: mr.network,
}
// Add build to list of builds to assign numbers
mr.buildsHelper.addBuild(build)
defer mr.buildsHelper.removeBuild(build)
// Process a build
build.GetBuildResponse = *buildData
build.BuildAbort = mr.abortBuilds
build.Network = mr.network
err = build.Run(mr.config, trace)
return
return build.Run(mr.config, trace)
}
func (mr *RunCommand) processRunners(id int, stopWorker chan bool, runners chan *runnerAcquire) {
......@@ -186,7 +195,7 @@ func (mr *RunCommand) checkConfig() (err error) {
}
func (mr *RunCommand) Start(s service.Service) error {
mr.builds = []*common.Build{}
mr.acquires = []*common.Build{}
mr.abortBuilds = make(chan os.Signal)
mr.runSignal = make(chan os.Signal, 1)
mr.reloadSignal = make(chan os.Signal, 1)
......
......@@ -78,7 +78,6 @@ func (r *RunSingleCommand) processBuild(data common.ExecutorData, abortSignal ch
Network: r.network,
ExecutorData: data,
}
newBuild.AssignID()
trace := r.network.ProcessBuild(r.RunnerConfig, buildData.ID)
defer trace.Fail(err)
......
......@@ -41,37 +41,6 @@ type Build struct {
ProjectRunnerID int `json:"project_runner_id"`
}
func (b *Build) AssignID(otherBuilds ...*Build) {
runners := make(map[int]bool)
projectRunners := make(map[int]bool)
for _, otherBuild := range otherBuilds {
if otherBuild.Runner.ShortDescription() != b.Runner.ShortDescription() {
continue
}
runners[otherBuild.RunnerID] = true
if otherBuild.ProjectID != b.ProjectID {
continue
}
projectRunners[otherBuild.ProjectRunnerID] = true
}
for {
if !runners[b.RunnerID] {
break
}
b.RunnerID++
}
for {
if !projectRunners[b.ProjectRunnerID] {
break
}
b.ProjectRunnerID++
}
}
func (b *Build) ProjectUniqueName() string {
return fmt.Sprintf("runner-%s-project-%d-concurrent-%d",
b.Runner.ShortDescription(), b.ProjectID, b.ProjectRunnerID)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment