Commit 857d72e5 authored by Kamil Trzciński's avatar Kamil Trzciński

Add multiple Prometheus metrics:

Metrics from Machine Provider (Auto-scaling):
- ci_machines_provider{state=acquired|creating|idle|used|removing} - current number of machines in given state,
- ci_machines_provided{type=created|used|removed} - total number of machines created, used and removed,

Metrics from Builds with breakdown on stage and runtime state:
- ci_runner_builds{state=pending|running|finished|canceled|terminated|timedout, stage=prepare_script|build_script|after_script|archive_cache|upload_artifacts} - current number of builds in given state
parent c1f3cdd1
Pipeline #5240722 failed with stages
in 10 minutes and 5 seconds
package commands
import (
"gitlab.com/gitlab-org/gitlab-ci-multi-runner/common"
"sync"
"gitlab.com/gitlab-org/gitlab-ci-multi-runner/common"
"github.com/prometheus/client_golang/prometheus"
)
// numBuildsDesc describes the ci_runner_builds metric. It declares two
// variable labels, "state" and "stage", so every metric built from it must
// supply exactly two label values in that order.
var numBuildsDesc = prometheus.NewDesc("ci_runner_builds", "The current number of running builds.", []string{"state", "stage"}, nil)
type buildsHelper struct {
counts map[string]int
builds []*common.Build
......@@ -98,3 +103,34 @@ func (b *buildsHelper) buildsCount() int {
return len(b.builds)
}
// statesAndStages tallies the tracked builds, first by CurrentState and then
// by CurrentStage. The helper's lock is held for the duration of the scan.
func (b *buildsHelper) statesAndStages() map[common.BuildRuntimeState]map[common.ShellScriptStage]int {
	b.lock.Lock()
	defer b.lock.Unlock()

	counts := make(map[common.BuildRuntimeState]map[common.ShellScriptStage]int)
	for _, current := range b.builds {
		// Lazily create the per-state bucket the first time a state is seen.
		perStage, seen := counts[current.CurrentState]
		if !seen {
			perStage = make(map[common.ShellScriptStage]int)
			counts[current.CurrentState] = perStage
		}
		perStage[current.CurrentStage]++
	}
	return counts
}
// Describe implements prometheus.Collector by sending the single
// ci_runner_builds descriptor on ch.
func (b *buildsHelper) Describe(ch chan<- *prometheus.Desc) {
	desc := numBuildsDesc
	ch <- desc
}
// Collect implements prometheus.Collector.
//
// It sends one ci_runner_builds gauge for every (state, stage) pair reported
// by statesAndStages, with the number of builds in that pair as the value.
func (b *buildsHelper) Collect(ch chan<- prometheus.Metric) {
	for state, stages := range b.statesAndStages() {
		for stage, count := range stages {
			// Label values are positional and must be the bare value: the
			// label names ("state", "stage") already come from numBuildsDesc.
			ch <- prometheus.MustNewConstMetric(
				numBuildsDesc,
				prometheus.GaugeValue,
				float64(count),
				string(state),
				string(stage),
			)
		}
	}
}
......@@ -25,8 +25,6 @@ import (
"gitlab.com/gitlab-org/gitlab-ci-multi-runner/network"
)
// numBuildsDesc describes the ci_runner_builds metric; this declaration has
// no variable labels and no constant labels.
var numBuildsDesc = prometheus.NewDesc("ci_runner_builds", "The current number of running builds.", nil, nil)
type RunCommand struct {
configOptionsWithMetricsServer
network common.Network
......@@ -59,22 +57,14 @@ type RunCommand struct {
// runFinished is used to notify that Run() did finish
runFinished chan bool
currentWorkers int
}
// log returns a log entry carrying a "builds" field set to the current
// build count reported by the builds helper.
func (mr *RunCommand) log() *log.Entry {
	builds := mr.buildsHelper.buildsCount()
	return log.WithField("builds", builds)
}
// Describe implements prometheus.Collector by sending the ci_runner_builds
// descriptor on ch.
func (mr *RunCommand) Describe(ch chan<- *prometheus.Desc) {
	desc := numBuildsDesc
	ch <- desc
}
// Collect implements prometheus.Collector by sending a single gauge whose
// value is the builds helper's current build count.
func (mr *RunCommand) Collect(ch chan<- prometheus.Metric) {
	total := float64(mr.buildsHelper.buildsCount())
	ch <- prometheus.MustNewConstMetric(numBuildsDesc, prometheus.GaugeValue, total)
}
func (mr *RunCommand) feedRunner(runner *common.RunnerConfig, runners chan *common.RunnerConfig) {
if !mr.isHealthy(runner.UniqueID()) {
return
......@@ -266,25 +256,25 @@ func (mr *RunCommand) Start(s service.Service) error {
return nil
}
func (mr *RunCommand) updateWorkers(currentWorkers, workerIndex *int, startWorker chan int, stopWorker chan bool) os.Signal {
func (mr *RunCommand) updateWorkers(workerIndex *int, startWorker chan int, stopWorker chan bool) os.Signal {
buildLimit := mr.config.Concurrent
for *currentWorkers > buildLimit {
for mr.currentWorkers > buildLimit {
select {
case stopWorker <- true:
case signaled := <-mr.runSignal:
return signaled
}
*currentWorkers--
mr.currentWorkers--
}
for *currentWorkers < buildLimit {
for mr.currentWorkers < buildLimit {
select {
case startWorker <- *workerIndex:
case signaled := <-mr.runSignal:
return signaled
}
*currentWorkers++
mr.currentWorkers++
*workerIndex++
}
......@@ -328,7 +318,7 @@ func (mr *RunCommand) serveMetrics() error {
registry := prometheus.NewRegistry()
// Metrics about the runner's business logic.
registry.MustRegister(mr)
registry.MustRegister(&mr.buildsHelper)
// Metrics about the program's build version.
registry.MustRegister(common.AppVersion.NewMetricsCollector())
// Go-specific metrics about the process (GC stats, goroutines, etc.).
......@@ -336,6 +326,13 @@ func (mr *RunCommand) serveMetrics() error {
// Go-unrelated process metrics (memory usage, file descriptors, etc.).
registry.MustRegister(prometheus.NewProcessCollector(os.Getpid(), ""))
// Register all executor provider collectors
for _, provider := range common.GetExecutorProviders() {
if collector, ok := provider.(prometheus.Collector); ok && collector != nil {
registry.MustRegister(collector)
}
}
http.Handle("/metrics", promhttp.HandlerFor(registry, promhttp.HandlerOpts{}))
go func() {
log.Fatalln(http.Serve(listener, nil))
......@@ -364,11 +361,10 @@ func (mr *RunCommand) Run() {
stopWorker := make(chan bool)
go mr.startWorkers(startWorker, stopWorker, runners)
currentWorkers := 0
workerIndex := 0
for mr.stopSignal == nil {
signaled := mr.updateWorkers(&currentWorkers, &workerIndex, startWorker, stopWorker)
signaled := mr.updateWorkers(&workerIndex, startWorker, stopWorker)
if signaled != nil {
break
}
......@@ -380,9 +376,9 @@ func (mr *RunCommand) Run() {
}
// Wait for workers to shutdown
for currentWorkers > 0 {
for mr.currentWorkers > 0 {
stopWorker <- true
currentWorkers--
mr.currentWorkers--
}
mr.log().Println("All workers stopped. Can exit now")
mr.runFinished <- true
......
......@@ -14,7 +14,7 @@ import (
"gitlab.com/gitlab-org/gitlab-ci-multi-runner/helpers"
)
// BuildState is a string-based build state.
type BuildState string
// BuildRuntimeState is a string-based runtime state of a build; it is the
// type of Build.CurrentState.
type BuildRuntimeState string
// GitStrategy is an int-based enumeration.
type GitStrategy int
......@@ -25,10 +25,12 @@ const (
)
// Build states and build runtime states.
//
// NOTE(review): only the first constant of each group carries an explicit
// type (Pending is a BuildState, RuntimePending is a BuildRuntimeState).
// Every other entry has its own expression and no type, so it is an untyped
// string constant and is assignable to any string-based type, not just the
// one its name suggests.
const (
Pending BuildState = "pending"
Running = "running"
Failed = "failed"
Success = "success"
RuntimePending BuildRuntimeState = "pending"
RuntimeRunning = "running"
RuntimeFinished = "finished"
RuntimeCanceled = "canceled"
RuntimeTerminated = "terminated"
RuntimeTimedout = "timedout"
)
type Build struct {
......@@ -44,10 +46,13 @@ type Build struct {
ExecutorData ExecutorData
// Unique ID for all running builds on this runner
RunnerID int `json:"runner_id"`
RunnerID int `json:"runner_id"`
// Unique ID for all running builds on this runner and this project
ProjectRunnerID int `json:"project_runner_id"`
CurrentStage ShellScriptStage
CurrentState BuildRuntimeState
}
func (b *Build) Log() *logrus.Entry {
......@@ -109,7 +114,9 @@ func (b *Build) StartBuild(rootDir, cacheDir string, sharedDir bool) {
b.CacheDir = path.Join(cacheDir, b.ProjectUniqueDir(false))
}
func (b *Build) executeShellScript(scriptType ShellScriptType, executor Executor, abort chan interface{}) error {
func (b *Build) executeShellScript(scriptType ShellScriptStage, executor Executor, abort chan interface{}) error {
b.CurrentStage = scriptType
shell := executor.Shell()
if shell == nil {
return errors.New("No shell defined")
......@@ -188,6 +195,8 @@ func (b *Build) executeScript(executor Executor, abort chan interface{}) error {
}
func (b *Build) run(executor Executor) (err error) {
b.CurrentState = RuntimeRunning
buildTimeout := b.Timeout
if buildTimeout <= 0 {
buildTimeout = DefaultTimeout
......@@ -206,14 +215,18 @@ func (b *Build) run(executor Executor) (err error) {
// Wait for the first terminal event and record the matching runtime state.
// The runtime state belongs in CurrentState; CurrentStage holds the shell
// script stage and is left untouched so it keeps naming the stage the build
// was in when it stopped.
select {
case <-b.Trace.Aborted():
	err = &BuildError{Inner: errors.New("canceled")}
	b.CurrentState = RuntimeCanceled

case <-time.After(time.Duration(buildTimeout) * time.Second):
	err = &BuildError{Inner: fmt.Errorf("execution took longer than %v seconds", buildTimeout)}
	b.CurrentState = RuntimeTimedout

case signal := <-b.SystemInterrupt:
	err = fmt.Errorf("aborted: %v", signal)
	b.CurrentState = RuntimeTerminated

case err = <-buildFinish:
	// The build completed on its own; return its result directly.
	b.CurrentState = RuntimeFinished
	return err
}
......@@ -262,6 +275,8 @@ func (b *Build) Run(globalConfig *Config, trace BuildTrace) (err error) {
logger := NewBuildLogger(trace, b.Log())
logger.Println("Running with " + AppVersion.Line() + helpers.ANSI_RESET)
b.CurrentState = RuntimePending
defer func() {
if _, ok := err.(*BuildError); ok {
logger.SoftErrorln("Build failed:", err)
......
......@@ -73,6 +73,15 @@ func GetExecutors() []string {
return names
}
// GetExecutorProviders returns every provider held in the package-level
// executors registry. The result is nil when the registry is nil or empty.
func GetExecutorProviders() (providers []ExecutorProvider) {
	// Ranging over a nil collection is a no-op, so no explicit nil guard is
	// required before the loop.
	for _, registered := range executors {
		providers = append(providers, registered)
	}
	return providers
}
func NewExecutor(executor string) Executor {
provider := GetExecutor(executor)
if provider != nil {
......
......@@ -44,7 +44,7 @@ func (m *MockShell) GetConfiguration(info ShellScriptInfo) (*ShellConfiguration,
return r0, r1
}
func (m *MockShell) GenerateScript(scriptType ShellScriptType, info ShellScriptInfo) (string, error) {
func (m *MockShell) GenerateScript(scriptType ShellScriptStage, info ShellScriptInfo) (string, error) {
ret := m.Called(scriptType, info)
r0 := ret.Get(0).(string)
......
......@@ -9,6 +9,14 @@ import (
// UpdateState is an int-based state enumeration.
type UpdateState int
// UploadState is an int-based state enumeration.
type UploadState int
// DownloadState is an int-based state enumeration.
type DownloadState int
// BuildState is a string-based build state.
type BuildState string

// Build state values.
//
// NOTE(review): only Pending is declared with the BuildState type; Running,
// Failed and Success have their own expression and no type, so they are
// untyped string constants.
const (
Pending BuildState = "pending"
Running = "running"
Failed = "failed"
Success = "success"
)
const (
UpdateSucceeded UpdateState = iota
......
......@@ -22,14 +22,14 @@ const (
LoginShell
)
type ShellScriptType string
type ShellScriptStage string
const (
ShellPrepareScript ShellScriptType = "prepare_script"
ShellBuildScript = "build_script"
ShellAfterScript = "after_script"
ShellArchiveCache = "archive_cache"
ShellUploadArtifacts = "upload_artifacts"
ShellPrepareScript ShellScriptStage = "prepare_script"
ShellBuildScript = "build_script"
ShellAfterScript = "after_script"
ShellArchiveCache = "archive_cache"
ShellUploadArtifacts = "upload_artifacts"
)
func (s *ShellConfiguration) GetCommandWithArguments() []string {
......@@ -61,7 +61,7 @@ type Shell interface {
IsDefault() bool
GetConfiguration(info ShellScriptInfo) (*ShellConfiguration, error)
GenerateScript(scriptType ShellScriptType, info ShellScriptInfo) (string, error)
GenerateScript(scriptType ShellScriptStage, info ShellScriptInfo) (string, error)
}
var shells map[string]Shell
......@@ -105,7 +105,7 @@ func GetShellConfiguration(info ShellScriptInfo) (*ShellConfiguration, error) {
return shell.GetConfiguration(info)
}
func GenerateShellScript(scriptType ShellScriptType, info ShellScriptInfo) (string, error) {
func GenerateShellScript(scriptType ShellScriptStage, info ShellScriptInfo) (string, error) {
shell := GetShell(info.Shell)
if shell == nil {
return "", fmt.Errorf("shell %s not found", info.Shell)
......
package machine
import (
"github.com/prometheus/client_golang/prometheus"
)
// machinesDataDesc describes the ci_machines_provider metric, which has a
// single variable label, "state".
var machinesDataDesc = prometheus.NewDesc("ci_machines_provider", "The current number of machines in given state.", []string{"state"}, nil)
// providerStatisticsDesc describes the ci_machines_provided metric, which has
// a single variable label, "type".
var providerStatisticsDesc = prometheus.NewDesc("ci_machines_provided", "The total number of machines created.", []string{"type"}, nil)
// collectDetails walks the known machine details under the read lock and
// feeds each machine's State into the returned machinesData via Add.
func (m *machineProvider) collectDetails() (data machinesData) {
	m.lock.RLock()
	defer m.lock.RUnlock()

	for _, entry := range m.details {
		data.Add(entry.State)
	}
	return data
}
// Describe implements prometheus.Collector by sending both machine metric
// descriptors on ch, machinesDataDesc first.
func (m *machineProvider) Describe(ch chan<- *prometheus.Desc) {
	for _, desc := range []*prometheus.Desc{machinesDataDesc, providerStatisticsDesc} {
		ch <- desc
	}
}
// Collect implements prometheus.Collector.
//
// It sends five ci_machines_provider gauges (one per machine state, taken
// from collectDetails) followed by three ci_machines_provided counters (one
// per statistics field), eight metrics in total.
func (m *machineProvider) Collect(ch chan<- prometheus.Metric) {
	data := m.collectDetails()

	// Label values are passed bare; the label name comes from the descriptor.
	gauge := func(state string, value float64) {
		ch <- prometheus.MustNewConstMetric(machinesDataDesc, prometheus.GaugeValue, value, state)
	}
	gauge("acquired", float64(data.Acquired))
	gauge("creating", float64(data.Creating))
	gauge("idle", float64(data.Idle))
	gauge("used", float64(data.Used))
	gauge("removing", float64(data.Removing))

	// The lifetime totals belong to providerStatisticsDesc, whose label is
	// "type"; machinesDataDesc only knows the "state" label.
	// NOTE(review): m.statistics is read here without holding m.lock —
	// confirm how the writers of these counters are synchronised.
	counter := func(kind string, value float64) {
		ch <- prometheus.MustNewConstMetric(providerStatisticsDesc, prometheus.CounterValue, value, kind)
	}
	counter("created", float64(m.statistics.Created))
	counter("used", float64(m.statistics.Used))
	counter("removed", float64(m.statistics.Removed))
}
package machine
import (
"testing"
"gitlab.com/gitlab-org/gitlab-ci-multi-runner/common"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
)
// TestIfMachineProviderExposesCollectInterface checks that a machineProvider
// held as a common.ExecutorProvider can be type-asserted to
// prometheus.Collector.
func TestIfMachineProviderExposesCollectInterface(t *testing.T) {
	var provider common.ExecutorProvider = &machineProvider{}

	collector, isCollector := provider.(prometheus.Collector)
	assert.True(t, isCollector)
	assert.NotNil(t, collector)
}
// TestMachineProviderDescribe checks that Describe sends exactly two
// descriptors.
func TestMachineProviderDescribe(t *testing.T) {
	descs := make(chan *prometheus.Desc, 10)

	(&machineProvider{}).Describe(descs)

	assert.Len(t, descs, 2)
}
// TestMachineProviderCollect checks the number of metrics Collect sends for
// an empty provider: five per-state gauges plus three statistics counters.
func TestMachineProviderCollect(t *testing.T) {
	ch := make(chan prometheus.Metric, 50)

	provider := &machineProvider{}
	provider.Collect(ch)

	// 5 state gauges + 3 statistics counters.
	assert.Len(t, ch, 8)
}
......@@ -11,12 +11,13 @@ import (
)
type machineDetails struct {
Name string
Created time.Time `yaml:"-"`
Used time.Time `yaml:"-"`
UsedCount int
State machineState
Reason string
Name string
Created time.Time `yaml:"-"`
Used time.Time `yaml:"-"`
UsedCount int
State machineState
Reason string
RetryCount int
}
func (m *machineDetails) isUsed() bool {
......
......@@ -12,11 +12,18 @@ import (
"gitlab.com/gitlab-org/gitlab-ci-multi-runner/helpers/docker"
)
// machineProviderStatistics holds running totals kept by the machine
// provider: Created is incremented after a machine is created, Used each
// time a machine is handed out for use, and Removed after a machine is
// removed.
type machineProviderStatistics struct {
	Created, Used, Removed int
}
// machineProvider keeps track of docker-machine instances and wraps a real
// executor provider that runs the builds. It also exposes its state as
// Prometheus metrics through Describe and Collect.
type machineProvider struct {
// machine is the docker-machine helper used to create machines.
machine docker_helpers.Machine
// details holds the per-machine bookkeeping; it is accessed under lock.
details machinesDetails
// lock guards details (read-locked while details is iterated).
lock sync.RWMutex
// NOTE(review): presumably serialises machine acquisition — usage is not
// visible here, confirm against the callers.
acquireLock sync.Mutex
// statistics holds the created/used/removed running totals.
statistics machineProviderStatistics
// provider stores a real executor that is used to run the builds
provider common.ExecutorProvider
}
......@@ -52,6 +59,7 @@ func (m *machineProvider) create(config *common.RunnerConfig, state machineState
details = m.machineDetails(name, true)
details.State = machineStateCreating
details.UsedCount = 0
details.RetryCount = 0
errCh = make(chan error, 1)
// Create machine asynchronously
......@@ -59,6 +67,7 @@ func (m *machineProvider) create(config *common.RunnerConfig, state machineState
started := time.Now()
err := m.machine.Create(config.Machine.MachineDriver, details.Name, config.Machine.MachineOptions...)
for i := 0; i < 3 && err != nil; i++ {
details.RetryCount++
logrus.WithField("name", details.Name).WithError(err).
Warningln("Machine creation failed, trying to provision")
time.Sleep(provisionRetryInterval)
......@@ -77,7 +86,9 @@ func (m *machineProvider) create(config *common.RunnerConfig, state machineState
logrus.WithField("time", time.Since(started)).
WithField("name", details.Name).
WithField("now", time.Now()).
WithField("retries", details.RetryCount).
Infoln("Machine created")
m.statistics.Created++
}
errCh <- err
}()
......@@ -152,6 +163,7 @@ func (m *machineProvider) finalizeRemoval(details *machineDetails) {
WithField("used", time.Since(details.Used)).
WithField("reason", details.Reason).
Warningln("Retrying removal")
details.RetryCount++
}
m.lock.Lock()
......@@ -163,7 +175,10 @@ func (m *machineProvider) finalizeRemoval(details *machineDetails) {
WithField("used", time.Since(details.Used)).
WithField("reason", details.Reason).
WithField("now", time.Now()).
WithField("retries", details.RetryCount).
Infoln("Machine removed")
m.statistics.Removed++
}
func (m *machineProvider) remove(machineName string, reason ...interface{}) error {
......@@ -177,6 +192,7 @@ func (m *machineProvider) remove(machineName string, reason ...interface{}) erro
details.Reason = fmt.Sprint(reason...)
details.State = machineStateRemoving
details.RetryCount = 0
logrus.WithField("name", machineName).
WithField("created", time.Since(details.Created)).
WithField("used", time.Since(details.Used)).
......@@ -331,6 +347,7 @@ func (m *machineProvider) Use(config *common.RunnerConfig, data common.ExecutorD
details.State = machineStateUsed
details.Used = time.Now()
details.UsedCount++
m.statistics.Used++
return
}
......
......@@ -453,7 +453,7 @@ func (b *AbstractShell) writeUploadArtifactsScript(w ShellWriter, info common.Sh
return
}
func (b *AbstractShell) writeScript(w ShellWriter, scriptType common.ShellScriptType, info common.ShellScriptInfo) (err error) {
func (b *AbstractShell) writeScript(w ShellWriter, scriptType common.ShellScriptStage, info common.ShellScriptInfo) (err error) {
switch scriptType {
case common.ShellPrepareScript:
return b.writePrepareScript(w, info)
......
......@@ -217,7 +217,7 @@ func (b *BashShell) GetConfiguration(info common.ShellScriptInfo) (script *commo
return
}
func (b *BashShell) GenerateScript(scriptType common.ShellScriptType, info common.ShellScriptInfo) (script string, err error) {
func (b *BashShell) GenerateScript(scriptType common.ShellScriptStage, info common.ShellScriptInfo) (script string, err error) {
w := &BashWriter{
TemporaryPath: info.Build.FullProjectDir() + ".tmp",
}
......
......@@ -199,7 +199,7 @@ func (b *CmdShell) GetConfiguration(info common.ShellScriptInfo) (script *common
return
}
func (b *CmdShell) GenerateScript(scriptType common.ShellScriptType, info common.ShellScriptInfo) (script string, err error) {
func (b *CmdShell) GenerateScript(scriptType common.ShellScriptStage, info common.ShellScriptInfo) (script string, err error) {
w := &CmdWriter{
TemporaryPath: info.Build.FullProjectDir() + ".tmp",
}
......
......@@ -213,7 +213,7 @@ func (b *PowerShell) GetConfiguration(info common.ShellScriptInfo) (script *comm
return
}
func (b *PowerShell) GenerateScript(scriptType common.ShellScriptType, info common.ShellScriptInfo) (script string, err error) {
func (b *PowerShell) GenerateScript(scriptType common.ShellScriptStage, info common.ShellScriptInfo) (script string, err error) {
w := &PsWriter{
TemporaryPath: info.Build.FullProjectDir() + ".tmp",
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment