Commit 35fff541 authored by Tomasz Maczukin's avatar Tomasz Maczukin

Merge branch 'limit-number-of-concurrent-requests' into 'master'

Limit number of concurrent requests to builds/register.json

See merge request !518
parents 39f8affe 75efba7a
......@@ -10,38 +10,78 @@ import (
var numBuildsDesc = prometheus.NewDesc("ci_runner_builds", "The current number of running builds.", []string{"state", "stage"}, nil)
type runnerCounter struct {
builds int
requests int
}
type buildsHelper struct {
counts map[string]int
builds []*common.Build
lock sync.Mutex
counters map[string]*runnerCounter
builds []*common.Build
lock sync.Mutex
}
func (b *buildsHelper) getRunnerCounter(runner *common.RunnerConfig) *runnerCounter {
if b.counters == nil {
b.counters = make(map[string]*runnerCounter)
}
counter, _ := b.counters[runner.Token]
if counter == nil {
counter = &runnerCounter{}
b.counters[runner.Token] = counter
}
return counter
}
func (b *buildsHelper) acquire(runner *common.RunnerConfig) bool {
func (b *buildsHelper) acquireBuild(runner *common.RunnerConfig) bool {
b.lock.Lock()
defer b.lock.Unlock()
// Check number of builds
count, _ := b.counts[runner.Token]
if runner.Limit > 0 && count >= runner.Limit {
counter := b.getRunnerCounter(runner)
if runner.Limit > 0 && counter.builds >= runner.Limit {
// Too many builds
return false
}
// Create a new build
if b.counts == nil {
b.counts = make(map[string]int)
counter.builds++
return true
}
func (b *buildsHelper) releaseBuild(runner *common.RunnerConfig) bool {
b.lock.Lock()
defer b.lock.Unlock()
counter := b.getRunnerCounter(runner)
if counter.builds > 0 {
counter.builds--
return true
}
return false
}
func (b *buildsHelper) acquireRequest(runner *common.RunnerConfig) bool {
b.lock.Lock()
defer b.lock.Unlock()
counter := b.getRunnerCounter(runner)
if counter.requests >= runner.GetRequestConcurrency() {
return false
}
b.counts[runner.Token]++
counter.requests++
return true
}
func (b *buildsHelper) release(runner *common.RunnerConfig) bool {
func (b *buildsHelper) releaseRequest(runner *common.RunnerConfig) bool {
b.lock.Lock()
defer b.lock.Unlock()
_, ok := b.counts[runner.Token]
if ok {
b.counts[runner.Token]--
counter := b.getRunnerCounter(runner)
if counter.requests > 0 {
counter.requests--
return true
}
return false
......
......@@ -6,6 +6,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitlab-ci-multi-runner/common"
)
......@@ -19,3 +20,94 @@ func TestBuildsHelperCollect(t *testing.T) {
b.Collect(ch)
assert.Len(t, ch, 1)
}
func TestBuildsHelperAcquireRequestWithLimit(t *testing.T) {
runner := common.RunnerConfig{
RequestConcurrency: 2,
}
b := &buildsHelper{}
result := b.acquireRequest(&runner)
require.True(t, result)
result = b.acquireRequest(&runner)
require.True(t, result)
result = b.acquireRequest(&runner)
require.False(t, result, "allow only two requests")
result = b.releaseRequest(&runner)
require.True(t, result)
result = b.releaseRequest(&runner)
require.True(t, result)
result = b.releaseRequest(&runner)
require.False(t, result, "release only two requests")
}
func TestBuildsHelperAcquireRequestWithDefault(t *testing.T) {
runner := common.RunnerConfig{
RequestConcurrency: 0,
}
b := &buildsHelper{}
result := b.acquireRequest(&runner)
require.True(t, result)
result = b.acquireRequest(&runner)
require.False(t, result, "allow only one request")
result = b.releaseRequest(&runner)
require.True(t, result)
result = b.releaseRequest(&runner)
require.False(t, result, "release only one request")
result = b.acquireRequest(&runner)
require.True(t, result)
result = b.releaseRequest(&runner)
require.True(t, result)
result = b.releaseRequest(&runner)
require.False(t, result, "nothing to release")
}
func TestBuildsHelperAcquireBuildWithLimit(t *testing.T) {
runner := common.RunnerConfig{
Limit: 1,
}
b := &buildsHelper{}
result := b.acquireBuild(&runner)
require.True(t, result)
result = b.acquireBuild(&runner)
require.False(t, result, "allow only one build")
result = b.releaseBuild(&runner)
require.True(t, result)
result = b.releaseBuild(&runner)
require.False(t, result, "release only one build")
}
func TestBuildsHelperAcquireBuildUnlimited(t *testing.T) {
runner := common.RunnerConfig{
Limit: 0,
}
b := &buildsHelper{}
result := b.acquireBuild(&runner)
require.True(t, result)
result = b.acquireBuild(&runner)
require.True(t, result)
result = b.releaseBuild(&runner)
require.True(t, result)
result = b.releaseBuild(&runner)
require.True(t, result)
}
......@@ -97,6 +97,17 @@ func (mr *RunCommand) feedRunners(runners chan *common.RunnerConfig) {
}
}
func (mr *RunCommand) requestJob(runner *common.RunnerConfig) (*common.JobResponse, bool) {
if !mr.buildsHelper.acquireRequest(runner) {
return nil, false
}
defer mr.buildsHelper.releaseRequest(runner)
jobData, healthy := mr.network.RequestJob(*runner)
mr.makeHealthy(runner.UniqueID(), healthy)
return jobData, true
}
func (mr *RunCommand) processRunner(id int, runner *common.RunnerConfig, runners chan *common.RunnerConfig) (err error) {
provider := common.GetExecutor(runner.Executor)
if provider == nil {
......@@ -111,14 +122,20 @@ func (mr *RunCommand) processRunner(id int, runner *common.RunnerConfig, runners
defer provider.Release(runner, context)
// Acquire build slot
if !mr.buildsHelper.acquire(runner) {
if !mr.buildsHelper.acquireBuild(runner) {
mr.log().WithField("runner", runner.ShortDescription()).
Debugln("Failed to request job: runner limit meet")
return
}
defer mr.buildsHelper.release(runner)
defer mr.buildsHelper.releaseBuild(runner)
// Receive a new build
jobData, healthy := mr.network.RequestJob(*runner)
mr.makeHealthy(runner.UniqueID(), healthy)
jobData, result := mr.requestJob(runner)
if !result {
mr.log().WithField("runner", runner.ShortDescription()).
Debugln("Failed to request job: runner requestConcurrency meet")
return
}
if jobData == nil {
return
}
......
......@@ -193,9 +193,10 @@ type RunnerSettings struct {
}
type RunnerConfig struct {
Name string `toml:"name" json:"name" short:"name" long:"description" env:"RUNNER_NAME" description:"Runner name"`
Limit int `toml:"limit,omitzero" json:"limit" long:"limit" env:"RUNNER_LIMIT" description:"Maximum number of builds processed by this runner"`
OutputLimit int `toml:"output_limit,omitzero" long:"output-limit" env:"RUNNER_OUTPUT_LIMIT" description:"Maximum build trace size in kilobytes"`
Name string `toml:"name" json:"name" short:"name" long:"description" env:"RUNNER_NAME" description:"Runner name"`
Limit int `toml:"limit,omitzero" json:"limit" long:"limit" env:"RUNNER_LIMIT" description:"Maximum number of builds processed by this runner"`
OutputLimit int `toml:"output_limit,omitzero" long:"output-limit" env:"RUNNER_OUTPUT_LIMIT" description:"Maximum build trace size in kilobytes"`
RequestConcurrency int `toml:"request_concurrency,omitzero" long:"request-concurrency" env:"RUNNER_REQUEST_CONCURRENCY" description:"Maximum concurrency for job requests"`
RunnerCredentials
RunnerSettings
......@@ -305,6 +306,13 @@ func (c *RunnerConfig) String() string {
return fmt.Sprintf("%v url=%v token=%v executor=%v", c.Name, c.URL, c.Token, c.Executor)
}
func (c *RunnerConfig) GetRequestConcurrency() int {
if c.RequestConcurrency <= 0 {
return 1
}
return c.RequestConcurrency
}
func (c *RunnerConfig) GetVariables() JobVariables {
var variables JobVariables
......
......@@ -45,6 +45,7 @@ This defines one runner entry.
| `cache_dir` | directory where build caches will be stored in context of selected executor (Locally, Docker, SSH). If the `docker` executor is used, this directory needs to be included in its `volumes` parameter. |
| `environment` | append or overwrite environment variables |
| `disable_verbose` | don't print run commands |
| `request_concurrency` | limit number of concurrent requests for new jobs from GitLab (default 1) |
| `output_limit` | set maximum build log size in kilobytes, by default set to 4096 (4MB) |
| `pre_clone_script` | commands to be executed on the runner before cloning the Git repository. this can be used to adjust the Git client configuration first, for example. To insert multiple commands, use a (triple-quoted) multi-line string or "\n" character. |
| `pre_build_script` | commands to be executed on the runner after cloning the Git repository, but before executing the build. To insert multiple commands, use a (triple-quoted) multi-line string or "\n" character. |
......
......@@ -12,6 +12,7 @@ import (
"path/filepath"
"runtime"
"strconv"
"sync"
"github.com/Sirupsen/logrus"
"gitlab.com/gitlab-org/gitlab-ci-multi-runner/common"
......@@ -22,13 +23,17 @@ const clientError = -100
type GitLabClient struct {
clients map[string]*client
lock sync.Mutex
}
func (n *GitLabClient) getClient(credentials requestCredentials) (c *client, err error) {
n.lock.Lock()
defer n.lock.Unlock()
if n.clients == nil {
n.clients = make(map[string]*client)
}
key := fmt.Sprintf("%s_%s", credentials.GetURL(), credentials.GetTLSCAFile())
key := fmt.Sprintf("%s_%s_%s", credentials.GetURL(), credentials.GetToken(), credentials.GetTLSCAFile())
c = n.clients[key]
if c == nil {
c, err = newClient(credentials)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment