multi.go 16.8 KB
Newer Older
Kamil Trzciński's avatar
Kamil Trzciński committed
1
package commands
2 3

import (
4 5
	"errors"
	"fmt"
6 7
	"net"
	"net/http"
8
	_ "net/http/pprof" // pprof package adds everything itself inside its init() function
9 10
	"os"
	"os/signal"
11
	"runtime"
12
	"syscall"
13
	"time"
Kamil Trzciński's avatar
Kamil Trzciński committed
14

15
	"github.com/ayufan/golang-kardianos-service"
16 17
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
18
	log "github.com/sirupsen/logrus"
19
	"github.com/urfave/cli"
Kamil Trzciński's avatar
Kamil Trzciński committed
20

21 22
	"gitlab.com/gitlab-org/gitlab-runner/common"
	"gitlab.com/gitlab-org/gitlab-runner/helpers"
23
	"gitlab.com/gitlab-org/gitlab-runner/helpers/certificate"
24 25 26 27 28
	"gitlab.com/gitlab-org/gitlab-runner/helpers/cli"
	prometheus_helper "gitlab.com/gitlab-org/gitlab-runner/helpers/prometheus"
	"gitlab.com/gitlab-org/gitlab-runner/helpers/sentry"
	"gitlab.com/gitlab-org/gitlab-runner/helpers/service"
	"gitlab.com/gitlab-org/gitlab-runner/network"
29
	"gitlab.com/gitlab-org/gitlab-runner/session"
30 31
)

32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
var (
	// concurrentDesc describes the gauge exposing the global `concurrent`
	// setting from the configuration.
	concurrentDesc = prometheus.NewDesc(
		"gitlab_runner_concurrent",
		"The current value of concurrent setting",
		nil,
		nil,
	)

	// limitDesc describes the per-runner gauge exposing each runner's
	// `limit` setting, labelled by runner.
	limitDesc = prometheus.NewDesc(
		"gitlab_runner_limit",
		// Fixed copy-paste error: this metric reports `limit`, not `concurrent`.
		"The current value of limit setting",
		[]string{"runner"},
		nil,
	)
)

48
// RunCommand implements the `run` command: the long-running multi-runner
// service that polls GitLab for jobs on every configured runner and
// executes them through a pool of worker goroutines. It also implements
// service.Service (Start/Stop/runWait) and prometheus.Collector
// (Describe/Collect).
type RunCommand struct {
	configOptionsWithListenAddress
	network common.Network
	healthHelper

	// buildsHelper tracks running builds and per-runner build/request counters.
	buildsHelper buildsHelper

	ServiceName      string `short:"n" long:"service" description:"Use different names for different services"`
	WorkingDirectory string `short:"d" long:"working-directory" description:"Specify custom working directory"`
	User             string `short:"u" long:"user" description:"Use specific user to execute shell scripts"`
	Syslog           bool   `long:"syslog" description:"Log to syslog"`

	// Logrus hooks installed in Execute(); sentryLogHook is (re)configured
	// on every loadConfig().
	sentryLogHook     sentry.LogHook
	prometheusLogHook prometheus_helper.LogHook

	failuresCollector               *prometheus_helper.FailuresCollector
	networkRequestStatusesCollector prometheus.Collector

	// sessionServer serves interactive job sessions; nil when disabled.
	sessionServer *session.Server

	// abortBuilds is used to abort running builds
	abortBuilds chan os.Signal

	// runSignal is used to abort current operation (scaling workers, waiting for config)
	runSignal chan os.Signal

	// reloadSignal is used to trigger forceful config reload
	reloadSignal chan os.Signal

	// stopSignals is used to catch the signals notified to the process:
	// SIGTERM, SIGQUIT, Interrupt, Kill
	stopSignals chan os.Signal

	// stopSignal preserves the signal that was used to stop the process.
	// When it is SIGQUIT, the service waits for all builds and the session
	// server to finish before exiting.
	stopSignal os.Signal

	// runFinished is used to notify that Run() did finish
	runFinished chan bool

	// currentWorkers counts the worker goroutines currently started.
	currentWorkers int
}

91
func (mr *RunCommand) log() *log.Entry {
92
	return log.WithField("builds", mr.buildsHelper.buildsCount())
Kamil Trzciński's avatar
Kamil Trzciński committed
93 94
}

95
func (mr *RunCommand) feedRunner(runner *common.RunnerConfig, runners chan *common.RunnerConfig) {
96 97
	if !mr.isHealthy(runner.UniqueID()) {
		return
Kamil Trzciński's avatar
Kamil Trzciński committed
98 99
	}

100
	runners <- runner
101
}
102

103
func (mr *RunCommand) feedRunners(runners chan *common.RunnerConfig) {
104
	for mr.stopSignal == nil {
105 106
		mr.log().Debugln("Feeding runners to channel")
		config := mr.config
107 108 109

		// If no runners wait full interval to test again
		if len(config.Runners) == 0 {
110
			time.Sleep(config.GetCheckInterval())
111 112 113
			continue
		}

114 115
		interval := config.GetCheckInterval() / time.Duration(len(config.Runners))

116
		// Feed runner with waiting exact amount of time
117 118
		for _, runner := range config.Runners {
			mr.feedRunner(runner, runners)
119
			time.Sleep(interval)
120
		}
Kamil Trzciński's avatar
Kamil Trzciński committed
121
	}
122 123
}

124
func (mr *RunCommand) requestJob(runner *common.RunnerConfig, sessionInfo *common.SessionInfo) (*common.JobResponse, bool) {
125 126 127 128 129
	if !mr.buildsHelper.acquireRequest(runner) {
		return nil, false
	}
	defer mr.buildsHelper.releaseRequest(runner)

130
	jobData, healthy := mr.network.RequestJob(*runner, sessionInfo)
131 132 133 134
	mr.makeHealthy(runner.UniqueID(), healthy)
	return jobData, true
}

135
// processRunner performs one job-request/execute cycle for the given
// runner: acquire an executor and a build slot, ask GitLab for a job and
// run it if one was returned. Early (nil) returns mean "nothing to do":
// no executor, no free slot, or no job available.
func (mr *RunCommand) processRunner(id int, runner *common.RunnerConfig, runners chan *common.RunnerConfig) (err error) {
	provider := common.GetExecutor(runner.Executor)
	if provider == nil {
		return
	}

	context, err := provider.Acquire(runner)
	if err != nil {
		log.Warningln("Failed to update executor", runner.Executor, "for", runner.ShortDescription(), err)
		return
	}
	defer provider.Release(runner, context)

	// Acquire build slot
	if !mr.buildsHelper.acquireBuild(runner) {
		mr.log().WithField("runner", runner.ShortDescription()).
			Debugln("Failed to request job: runner limit meet")
		return
	}
	defer mr.buildsHelper.releaseBuild(runner)

	var features common.FeaturesInfo
	provider.GetFeatures(&features)
	buildSession, sessionInfo, err := mr.createSession(features)
	if err != nil {
		return
	}

	// Receive a new build
	jobData, result := mr.requestJob(runner, sessionInfo)
	if !result {
		mr.log().WithField("runner", runner.ShortDescription()).
			Debugln("Failed to request job: runner requestConcurrency meet")
		return
	}
	// No job available at the moment.
	if jobData == nil {
		return
	}

	// Make sure to always close output
	jobCredentials := &common.JobCredentials{
		ID:    jobData.ID,
		Token: jobData.Token,
	}
	trace := mr.network.ProcessJob(*runner, jobCredentials)
	// The named return `err` is evaluated at function exit, so this
	// deferred Fail reports the final outcome of build.Run below.
	defer trace.Fail(err, common.NoneFailure)

	trace.SetFailuresCollector(mr.failuresCollector)

	// Create a new build
	build := &common.Build{
		JobResponse:     *jobData,
		Runner:          runner,
		ExecutorData:    context,
		SystemInterrupt: mr.abortBuilds,
		Session:         buildSession,
	}

	// Add build to list of builds to assign numbers
	mr.buildsHelper.addBuild(build)
	defer mr.buildsHelper.removeBuild(build)

	// Process the same runner by different worker again
	// to speed up taking the builds
	select {
	case runners <- runner:
		mr.log().WithField("runner", runner.ShortDescription()).Debugln("Requeued the runner")

	default:
		mr.log().WithField("runner", runner.ShortDescription()).Debugln("Failed to requeue the runner: ")
	}

	// Process a build
	return build.Run(mr.config, trace)
}

211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229
// createSession builds an interactive session for a job when the session
// server is running and the executor supports sessions; otherwise it
// returns all nils so the job proceeds without a session.
func (mr *RunCommand) createSession(features common.FeaturesInfo) (*session.Session, *common.SessionInfo, error) {
	if !features.Session || mr.sessionServer == nil {
		return nil, nil, nil
	}

	sess, err := session.NewSession(mr.log())
	if err != nil {
		return nil, nil, err
	}

	info := &common.SessionInfo{
		URL:           mr.sessionServer.AdvertiseAddress + sess.Endpoint,
		Certificate:   string(mr.sessionServer.CertificatePublicKey),
		Authorization: sess.Token,
	}
	return sess, info, nil
}

230
// processRunners is the body of one worker goroutine: it consumes runner
// configurations from the runners channel and performs one job attempt
// per received runner, until asked to stop.
func (mr *RunCommand) processRunners(id int, stopWorker chan bool, runners chan *common.RunnerConfig) {
	mr.log().WithField("worker", id).Debugln("Starting worker")
	for mr.stopSignal == nil {
		select {
		case runner := <-runners:
			mr.processRunner(id, runner, runners)

			// force GC cycle after processing build
			runtime.GC()

		case <-stopWorker:
			mr.log().WithField("worker", id).Debugln("Stopping worker")
			return
		}
	}
	// A stop signal has been recorded: block until the manager collects
	// this worker through stopWorker, keeping worker accounting correct.
	<-stopWorker
}

248
func (mr *RunCommand) startWorkers(startWorker chan int, stopWorker chan bool, runners chan *common.RunnerConfig) {
249
	for mr.stopSignal == nil {
250 251
		id := <-startWorker
		go mr.processRunners(id, stopWorker, runners)
252
	}
253 254
}

255 256
func (mr *RunCommand) loadConfig() error {
	err := mr.configOptions.loadConfig()
Kamil Trzciński's avatar
Kamil Trzciński committed
257
	if err != nil {
258
		return err
259
	}
260 261

	// Set log level
262
	if !cli_helpers.CustomLogLevelSet && mr.config.LogLevel != nil {
263 264 265 266 267 268
		level, err := log.ParseLevel(*mr.config.LogLevel)
		if err != nil {
			log.Fatalf(err.Error())
		}
		log.SetLevel(level)
	}
Kamil Trzciński's avatar
Kamil Trzciński committed
269

270
	// pass user to execute scripts as specific user
271
	if mr.User != "" {
272
		mr.config.User = mr.User
273 274
	}

275
	mr.healthy = nil
276 277
	mr.log().Println("Configuration loaded")
	mr.log().Debugln(helpers.ToYAML(mr.config))
278 279 280

	// initialize sentry
	if mr.config.SentryDSN != nil {
Kamil Trzciński's avatar
Kamil Trzciński committed
281 282 283 284 285
		var err error
		mr.sentryLogHook, err = sentry.NewLogHook(*mr.config.SentryDSN)
		if err != nil {
			mr.log().WithError(err).Errorln("Sentry failure")
		}
286 287 288 289
	} else {
		mr.sentryLogHook = sentry.LogHook{}
	}

290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309
	return nil
}

func (mr *RunCommand) checkConfig() (err error) {
	info, err := os.Stat(mr.ConfigFile)
	if err != nil {
		return err
	}

	if !mr.config.ModTime.Before(info.ModTime()) {
		return nil
	}

	err = mr.loadConfig()
	if err != nil {
		mr.log().Errorln("Failed to load config", err)
		// don't reload the same file
		mr.config.ModTime = info.ModTime()
		return
	}
310 311 312
	return nil
}

313
func (mr *RunCommand) Start(s service.Service) error {
314
	mr.abortBuilds = make(chan os.Signal)
315
	mr.runSignal = make(chan os.Signal, 1)
316
	mr.reloadSignal = make(chan os.Signal, 1)
317 318
	mr.runFinished = make(chan bool, 1)
	mr.stopSignals = make(chan os.Signal)
319
	mr.log().Println("Starting multi-runner from", mr.ConfigFile, "...")
320

321 322
	userModeWarning(false)

323 324
	if len(mr.WorkingDirectory) > 0 {
		err := os.Chdir(mr.WorkingDirectory)
325 326 327 328 329
		if err != nil {
			return err
		}
	}

330 331
	err := mr.loadConfig()
	if err != nil {
332
		return err
333 334
	}

335 336 337 338 339 340
	// Start should not block. Do the actual work async.
	go mr.Run()

	return nil
}

341
func (mr *RunCommand) updateWorkers(workerIndex *int, startWorker chan int, stopWorker chan bool) os.Signal {
342 343
	buildLimit := mr.config.Concurrent

344 345 346 347
	if buildLimit < 1 {
		mr.log().Fatalln("Concurrent is less than 1 - no jobs will be processed")
	}

348
	for mr.currentWorkers > buildLimit {
349 350
		select {
		case stopWorker <- true:
351
		case signaled := <-mr.runSignal:
352 353
			return signaled
		}
354
		mr.currentWorkers--
355 356
	}

357
	for mr.currentWorkers < buildLimit {
358 359
		select {
		case startWorker <- *workerIndex:
360
		case signaled := <-mr.runSignal:
361 362
			return signaled
		}
363
		mr.currentWorkers++
364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383
		*workerIndex++
	}

	return nil
}

func (mr *RunCommand) updateConfig() os.Signal {
	select {
	case <-time.After(common.ReloadConfigInterval * time.Second):
		err := mr.checkConfig()
		if err != nil {
			mr.log().Errorln("Failed to load config", err)
		}

	case <-mr.reloadSignal:
		err := mr.loadConfig()
		if err != nil {
			mr.log().Errorln("Failed to load config", err)
		}

384
	case signaled := <-mr.runSignal:
385 386 387 388 389
		return signaled
	}
	return nil
}

390 391 392 393 394 395 396
// runWait blocks until a stop signal arrives and records it in
// mr.stopSignal, which lets the service machinery proceed to Stop().
func (mr *RunCommand) runWait() {
	mr.log().Debugln("Waiting for stop signal")

	received := <-mr.stopSignals
	mr.stopSignal = received
}

397
func (mr *RunCommand) serveMetrics(mux *http.ServeMux) {
398 399
	registry := prometheus.NewRegistry()
	// Metrics about the runner's business logic.
400
	registry.MustRegister(&mr.buildsHelper)
401
	registry.MustRegister(mr)
402
	// Metrics about API connections
403
	registry.MustRegister(mr.networkRequestStatusesCollector)
404 405
	// Metrics about jobs failures
	registry.MustRegister(mr.failuresCollector)
406 407
	// Metrics about catched errors
	registry.MustRegister(&mr.prometheusLogHook)
408 409 410 411 412 413 414
	// Metrics about the program's build version.
	registry.MustRegister(common.AppVersion.NewMetricsCollector())
	// Go-specific metrics about the process (GC stats, goroutines, etc.).
	registry.MustRegister(prometheus.NewGoCollector())
	// Go-unrelated process metrics (memory usage, file descriptors, etc.).
	registry.MustRegister(prometheus.NewProcessCollector(os.Getpid(), ""))

415 416 417 418 419 420 421
	// Register all executor provider collectors
	for _, provider := range common.GetExecutorProviders() {
		if collector, ok := provider.(prometheus.Collector); ok && collector != nil {
			registry.MustRegister(collector)
		}
	}

422
	mux.Handle("/metrics", promhttp.HandlerFor(registry, promhttp.HandlerOpts{}))
423 424
}

425
// serveDebugData mounts the debug endpoint listing currently running
// jobs on the given mux.
func (mr *RunCommand) serveDebugData(mux *http.ServeMux) {
	mux.HandleFunc("/debug/jobs/list", mr.buildsHelper.ListJobsHandler)
}

func (mr *RunCommand) setupMetricsAndDebugServer() {
430
	listenAddress, err := mr.listenAddress()
431 432

	if err != nil {
433
		mr.log().Errorf("invalid listen address: %s", err.Error())
434 435 436
		return
	}

437
	if listenAddress == "" {
438
		mr.log().Info("Listen address not defined, metrics server disabled")
439
		return
440 441
	}

442 443
	// We separate out the listener creation here so that we can return an error if
	// the provided address is invalid or there is some other listener error.
444
	listener, err := net.Listen("tcp", listenAddress)
445
	if err != nil {
446
		mr.log().WithError(err).Fatal("Failed to create listener for metrics server")
447 448
	}

449 450
	mux := http.NewServeMux()

451
	go func() {
452 453 454 455
		err := http.Serve(listener, mux)
		if err != nil {
			mr.log().WithError(err).Fatal("Metrics server terminated")
		}
456 457
	}()

458 459
	mr.serveMetrics(mux)
	mr.serveDebugData(mux)
460

461 462 463
	mr.log().
		WithField("address", listenAddress).
		Info("Metrics server listening")
464 465
}

466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489
// setupSessionServer starts the interactive session server when a listen
// address is configured; otherwise the feature stays disabled. Any
// failure to create or run the server is fatal.
func (mr *RunCommand) setupSessionServer() {
	if mr.config.SessionServer.ListenAddress == "" {
		mr.log().Info("Listen address not defined, session server disabled")
		return
	}

	var err error
	mr.sessionServer, err = session.NewServer(
		session.ServerConfig{
			AdvertiseAddress: mr.config.SessionServer.AdvertiseAddress,
			ListenAddress:    mr.config.SessionServer.ListenAddress,
			ShutdownTimeout:  common.ShutdownTimeout * time.Second,
		},
		mr.log(),
		certificate.X509Generator{},
		mr.buildsHelper.findSessionByURL,
	)
	if err != nil {
		mr.log().WithError(err).Fatal("Failed to create session server")
	}

	// Serve in the background.
	go func() {
		err := mr.sessionServer.Start()
		if err != nil {
			mr.log().WithError(err).Fatal("Session server terminated")
		}
	}()

	mr.log().
		WithField("address", mr.config.SessionServer.ListenAddress).
		Info("Session server listening")
}

499 500
// Run is the main loop of the multi-runner service. It wires up the
// metrics/debug and session servers, signal handling, the runner feeder
// and the worker pool, then alternates between scaling workers and
// reloading configuration until a stop signal is recorded. Finally it
// drains all workers and announces completion on runFinished.
func (mr *RunCommand) Run() {
	mr.setupMetricsAndDebugServer()
	mr.setupSessionServer()

	runners := make(chan *common.RunnerConfig)
	go mr.feedRunners(runners)

	signal.Notify(mr.stopSignals, syscall.SIGQUIT, syscall.SIGTERM, os.Interrupt, os.Kill)
	signal.Notify(mr.reloadSignal, syscall.SIGHUP)

	startWorker := make(chan int)
	stopWorker := make(chan bool)
	go mr.startWorkers(startWorker, stopWorker, runners)

	// Monotonic worker id handed to each newly started worker.
	workerIndex := 0

	for mr.stopSignal == nil {
		signaled := mr.updateWorkers(&workerIndex, startWorker, stopWorker)
		if signaled != nil {
			break
		}

		signaled = mr.updateConfig()
		if signaled != nil {
			break
		}
	}

	// Wait for workers to shutdown
	for mr.currentWorkers > 0 {
		stopWorker <- true
		mr.currentWorkers--
	}
	mr.log().Println("All workers stopped. Can exit now")
	mr.runFinished <- true
}
535

536
// interruptRun repeatedly pumps the recorded stop signal into runSignal
// so every select waiting on it (worker scaling, config reload) is
// unblocked; it deliberately loops until the process exits.
func (mr *RunCommand) interruptRun() {
	// Pump interrupt signal
	for {
		mr.runSignal <- mr.stopSignal
	}
}

// abortAllBuilds repeatedly pumps the recorded stop signal into
// abortBuilds so every running build receives the interrupt; it
// deliberately loops until the process exits.
func (mr *RunCommand) abortAllBuilds() {
	// Pump signal to abort all current builds
	for {
		mr.abortBuilds <- mr.stopSignal
	}
}
549

550
// handleGracefulShutdown waits for running builds to finish after a
// SIGQUIT. It returns nil when Run() completed, or an error naming the
// signal when a different (forceful) signal arrives in the meantime.
func (mr *RunCommand) handleGracefulShutdown() error {
	// We wait till we have a SIGQUIT
	for mr.stopSignal == syscall.SIGQUIT {
		mr.log().Warningln("Requested quit, waiting for builds to finish")

		// Wait for other signals to finish builds
		select {
		case mr.stopSignal = <-mr.stopSignals:
		// We received a new signal

		case <-mr.runFinished:
			// Everything finished we can exit now
			return nil
		}
	}

	return fmt.Errorf("received: %v", mr.stopSignal)
}

func (mr *RunCommand) handleShutdown() error {
570 571
	mr.log().Warningln("Requested service stop:", mr.stopSignal)

572
	go mr.abortAllBuilds()
573

574 575 576 577
	if mr.sessionServer != nil {
		mr.sessionServer.Close()
	}

578
	// Wait for graceful shutdown or abort after timeout
579 580 581 582 583 584 585 586 587
	for {
		select {
		case mr.stopSignal = <-mr.stopSignals:
			return fmt.Errorf("forced exit: %v", mr.stopSignal)

		case <-time.After(common.ShutdownTimeout * time.Second):
			return errors.New("shutdown timedout")

		case <-mr.runFinished:
Kamil Trzciński's avatar
Kamil Trzciński committed
588
			// Everything finished we can exit now
589 590
			return nil
		}
591
	}
592
}
Kamil Trzciński's avatar
Kamil Trzciński committed
593

594 595 596 597 598 599 600 601 602 603
// Stop implements service.Service. It interrupts the main loop, first
// attempting a graceful shutdown (SIGQUIT waits for builds to finish)
// and falling back to a forceful shutdown when that reports an error.
func (mr *RunCommand) Stop(s service.Service) error {
	go mr.interruptRun()

	if err := mr.handleGracefulShutdown(); err == nil {
		return nil
	}

	return mr.handleShutdown()
}

604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629
// Describe implements prometheus.Collector by emitting the descriptors
// of every metric this command exposes.
func (mr *RunCommand) Describe(ch chan<- *prometheus.Desc) {
	for _, desc := range []*prometheus.Desc{concurrentDesc, limitDesc} {
		ch <- desc
	}
}

// Collect implements prometheus.Collector. It emits the current global
// `concurrent` setting and, for every configured runner, its `limit`.
func (mr *RunCommand) Collect(ch chan<- prometheus.Metric) {
	cfg := mr.config

	ch <- prometheus.MustNewConstMetric(
		concurrentDesc,
		prometheus.GaugeValue,
		float64(cfg.Concurrent),
	)

	for _, runner := range cfg.Runners {
		ch <- prometheus.MustNewConstMetric(
			limitDesc,
			prometheus.GaugeValue,
			float64(runner.Limit),
			runner.ShortDescription(),
		)
	}
}

630
func (mr *RunCommand) Execute(context *cli.Context) {
631
	svcConfig := &service.Config{
632 633
		Name:        mr.ServiceName,
		DisplayName: mr.ServiceName,
634
		Description: defaultDescription,
635
		Arguments:   []string{"run"},
636 637 638
		Option: service.KeyValue{
			"RunWait": mr.runWait,
		},
639 640
	}

641
	service, err := service_helpers.New(mr, svcConfig)
642
	if err != nil {
643
		log.Fatalln(err)
644 645
	}

646
	if mr.Syslog {
647
		log.SetFormatter(new(log.TextFormatter))
648
		logger, err := service.SystemLogger(nil)
649
		if err == nil {
650
			log.AddHook(&ServiceLogHook{logger, log.InfoLevel})
651
		} else {
652
			log.Errorln(err)
653
		}
654 655
	}

656
	log.AddHook(&mr.sentryLogHook)
657
	log.AddHook(&mr.prometheusLogHook)
658

659
	err = service.Run()
660
	if err != nil {
661
		log.Fatalln(err)
662 663 664 665
	}
}

func init() {
666 667
	requestStatusesCollector := network.NewAPIRequestStatusesMap()

668
	common.RegisterCommand2("run", "run multi runner service", &RunCommand{
Alessio Caiazza's avatar
Alessio Caiazza committed
669 670
		ServiceName: defaultServiceName,
		network:     network.NewGitLabClientWithRequestStatusesMap(requestStatusesCollector),
671 672 673
		networkRequestStatusesCollector: requestStatusesCollector,
		prometheusLogHook:               prometheus_helper.NewLogHook(),
		failuresCollector:               prometheus_helper.NewFailuresCollector(),
674
		buildsHelper:                    newBuildsHelper(),
675
	})
676
}