multi.go (9.31 KiB), forked from GitLab.org / gitlab-runner, 11526 commits behind and 9 commits ahead of the upstream repository.
package commands

import (
	"errors"
	"fmt"
	"os"
	"os/signal"
	"runtime"
	"syscall"
	"time"

	service "github.com/ayufan/golang-kardianos-service"
	"github.com/codegangsta/cli"

	log "github.com/Sirupsen/logrus"

	"gitlab.com/gitlab-org/gitlab-ci-multi-runner/common"
	"gitlab.com/gitlab-org/gitlab-ci-multi-runner/helpers"
	"gitlab.com/gitlab-org/gitlab-ci-multi-runner/helpers/service"
	"gitlab.com/gitlab-org/gitlab-ci-multi-runner/network"
)

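// runnerAcquire couples a runner configuration with the executor provider
// and the executor data acquired from it; Release returns that data to the
// provider once the runner is no longer needed.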
type runnerAcquire struct {
	common.RunnerConfig
	provider common.ExecutorProvider
	data     common.ExecutorData
}

func (r *runnerAcquire) Release() {
	r.provider.Release(&r.RunnerConfig, r.data)
}

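// RunCommand implements the `run` command: it loads the configuration,
// maintains a pool of build workers sized by the concurrency setting and
// reacts to reload and shutdown signals.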
type RunCommand struct {
	configOptions
	network common.Network
	healthHelper

	buildsHelper buildsHelper

	ServiceName      string `short:"n" long:"service" description:"Use different names for different services"`
	WorkingDirectory string `short:"d" long:"working-directory" description:"Specify custom working directory"`
	User             string `short:"u" long:"user" description:"Use specific user to execute shell scripts"`
	Syslog           bool   `long:"syslog" description:"Log to syslog"`

	// abortBuilds is used to abort running builds
	abortBuilds chan os.Signal

	// runSignal is used to abort current operation (scaling workers, waiting for config)
	runSignal chan os.Signal

	// reloadSignal is used to trigger forceful config reload
	reloadSignal chan os.Signal

	// stopSignals is used to catch signals sent to the process: SIGTERM, SIGQUIT, Interrupt, Kill
	stopSignals chan os.Signal

	// stopSignal preserves the signal that was used to stop the process.
	// If this is SIGQUIT, all running builds are allowed to finish before exit.
	stopSignal os.Signal

	// runFinished is used to notify that Run() did finish
	runFinished chan bool
}

func (mr *RunCommand) log() *log.Entry {
	return log.WithField("builds", len(mr.buildsHelper.builds))
}

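// feedRunner pushes a single runner onto the runners channel, but only if
// the runner is healthy and its executor provider can be acquired.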
func (mr *RunCommand) feedRunner(runner *common.RunnerConfig, runners chan *runnerAcquire) {
	if !mr.isHealthy(runner.UniqueID()) {
		return
	}

	provider := common.GetExecutor(runner.Executor)
	if provider == nil {
		return
	}

	data, err := provider.Acquire(runner)
	if err != nil {
		log.Warningln("Failed to update executor", runner.Executor, "for", runner.ShortDescription(), err)
		return
	}

	runners <- &runnerAcquire{*runner, provider, data}
}

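// feedRunners feeds every configured runner to the channel in a loop,
// sleeping CheckInterval seconds between rounds, until a stop signal is set.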
func (mr *RunCommand) feedRunners(runners chan *runnerAcquire) {
	for mr.stopSignal == nil {
		mr.log().Debugln("Feeding runners to channel")
		config := mr.config
		for _, runner := range config.Runners {
			mr.feedRunner(runner, runners)
		}
		time.Sleep(common.CheckInterval * time.Second)
	}
}

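// processRunner acquires a build slot for the runner, requests a new build
// from the network and, if one was returned, runs it to completion.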
func (mr *RunCommand) processRunner(id int, runner *runnerAcquire) (err error) {
	defer runner.Release()

	// Acquire build slot
	if !mr.buildsHelper.acquire(runner) {
		return
	}
	defer mr.buildsHelper.release(runner)

	// Receive a new build
	buildData, healthy := mr.network.GetBuild(runner.RunnerConfig)
	mr.makeHealthy(runner.UniqueID(), healthy)
	if buildData == nil {
		return
	}

	// Make sure to always close output
	trace := mr.network.ProcessBuild(runner.RunnerConfig, buildData.ID)
	defer trace.Fail(err)

	// Create a new build
	build := &common.Build{
		GetBuildResponse: *buildData,
		Runner:           &runner.RunnerConfig,
		ExecutorData:     runner.data,
		BuildAbort:       mr.abortBuilds,
	}

	// Add build to list of builds to assign numbers
	mr.buildsHelper.addBuild(build)
	defer mr.buildsHelper.removeBuild(build)

	// Process a build
	return build.Run(mr.config, trace)
}

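// processRunners is the body of a single worker: it takes runners from the
// channel and processes them until the worker is asked to stop.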
func (mr *RunCommand) processRunners(id int, stopWorker chan bool, runners chan *runnerAcquire) {
	mr.log().Debugln("Starting worker", id)
	for mr.stopSignal == nil {
		select {
		case runner := <-runners:
			mr.processRunner(id, runner)

			// force GC cycle after processing build
			runtime.GC()

		case <-stopWorker:
			mr.log().Debugln("Stopping worker", id)
			return
		}
	}
	<-stopWorker
}

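// startWorkers spawns a new worker goroutine for every id received on the
// startWorker channel.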
func (mr *RunCommand) startWorkers(startWorker chan int, stopWorker chan bool, runners chan *runnerAcquire) {
	for mr.stopSignal == nil {
		id := <-startWorker
		go mr.processRunners(id, stopWorker, runners)
	}
}

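// loadConfig reads the configuration file, applies the command line user
// override and resets the recorded health state of the runners.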
func (mr *RunCommand) loadConfig() error {
	err := mr.configOptions.loadConfig()
	if err != nil {
		return err
	}

	// pass user to execute scripts as specific user
	if mr.User != "" {
		mr.config.User = mr.User
	}

	mr.healthy = nil
	mr.log().Println("Config loaded:", helpers.ToYAML(mr.config))
	return nil
}

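// checkConfig reloads the configuration, but only if the config file has
// been modified since it was last loaded.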
func (mr *RunCommand) checkConfig() (err error) {
	info, err := os.Stat(mr.ConfigFile)
	if err != nil {
		return err
	}

	if !mr.config.ModTime.Before(info.ModTime()) {
		return nil
	}

	err = mr.loadConfig()
	if err != nil {
		mr.log().Errorln("Failed to load config", err)
		// don't reload the same file
		mr.config.ModTime = info.ModTime()
		return
	}
	return nil
}

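// Start is the service start handler: it prepares the signal channels,
// switches to the requested working directory, loads the configuration and
// launches the main loop asynchronously.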
func (mr *RunCommand) Start(s service.Service) error {
	mr.abortBuilds = make(chan os.Signal)
	mr.runSignal = make(chan os.Signal, 1)
	mr.reloadSignal = make(chan os.Signal, 1)
	mr.runFinished = make(chan bool, 1)
	mr.stopSignals = make(chan os.Signal)
	mr.log().Println("Starting multi-runner from", mr.ConfigFile, "...")

	userModeWarning(false)

	if len(mr.WorkingDirectory) > 0 {
		err := os.Chdir(mr.WorkingDirectory)
		if err != nil {
			return err
		}
	}

	err := mr.loadConfig()
	if err != nil {
		return err
	}

	// Start should not block. Do the actual work async.
	go mr.Run()

	return nil
}

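// updateWorkers scales the worker pool up or down until it matches the
// configured concurrency, aborting early if a run signal arrives.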
func (mr *RunCommand) updateWorkers(currentWorkers, workerIndex *int, startWorker chan int, stopWorker chan bool) os.Signal {
	buildLimit := mr.config.Concurrent

	for *currentWorkers > buildLimit {
		select {
		case stopWorker <- true:
		case signaled := <-mr.runSignal:
			return signaled
		}
		*currentWorkers--
	}

	for *currentWorkers < buildLimit {
		select {
		case startWorker <- *workerIndex:
		case signaled := <-mr.runSignal:
			return signaled
		}
		*currentWorkers++
		*workerIndex++
	}

	return nil
}

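// updateConfig waits for the periodic reload interval, an explicit reload
// signal or a run signal, reloading the configuration as appropriate.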
func (mr *RunCommand) updateConfig() os.Signal {
	select {
	case <-time.After(common.ReloadConfigInterval * time.Second):
		err := mr.checkConfig()
		if err != nil {
			mr.log().Errorln("Failed to load config", err)
		}

	case <-mr.reloadSignal:
		err := mr.loadConfig()
		if err != nil {
			mr.log().Errorln("Failed to load config", err)
		}

	case signaled := <-mr.runSignal:
		return signaled
	}
	return nil
}

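// runWait blocks until a stop signal is received and records it for the
// shutdown handlers.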
func (mr *RunCommand) runWait() {
	mr.log().Debugln("Waiting for stop signal")

	// Save the stop signal and exit to execute Stop()
	mr.stopSignal = <-mr.stopSignals
}

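// Run is the main loop: it feeds runners to the workers, keeps the worker
// pool and the configuration up to date, and stops all workers once a stop
// signal has been received.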
func (mr *RunCommand) Run() {
	runners := make(chan *runnerAcquire)
	go mr.feedRunners(runners)

	signal.Notify(mr.stopSignals, syscall.SIGQUIT, syscall.SIGTERM, os.Interrupt, os.Kill)
	signal.Notify(mr.reloadSignal, syscall.SIGHUP)

	startWorker := make(chan int)
	stopWorker := make(chan bool)
	go mr.startWorkers(startWorker, stopWorker, runners)

	currentWorkers := 0
	workerIndex := 0

	for mr.stopSignal == nil {
		signaled := mr.updateWorkers(&currentWorkers, &workerIndex, startWorker, stopWorker)
		if signaled != nil {
			break
		}

		signaled = mr.updateConfig()
		if signaled != nil {
			break
		}
	}

	// Wait for workers to shutdown
	for currentWorkers > 0 {
		stopWorker <- true
		currentWorkers--
	}
	mr.log().Println("All workers stopped. Can exit now")
	mr.runFinished <- true
}

func (mr *RunCommand) interruptRun() {
	// Pump interrupt signal
	for {
		mr.runSignal <- mr.stopSignal
	}
}

func (mr *RunCommand) abortAllBuilds() {
	// Pump signal to abort all current builds
	for {
		mr.abortBuilds <- mr.stopSignal
	}
}

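// handleGracefulShutdown waits for all running builds to finish as long as
// the stop signal remains SIGQUIT; any other signal ends the graceful phase.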
func (mr *RunCommand) handleGracefulShutdown() error {
	// We keep waiting as long as the stop signal is SIGQUIT
	for mr.stopSignal == syscall.SIGQUIT {
		mr.log().Warningln("Requested quit, waiting for builds to finish")

		// Wait for other signals to finish builds
		select {
		case mr.stopSignal = <-mr.stopSignals:
		// We received a new signal

		case <-mr.runFinished:
			// Everything finished we can exit now
			return nil
		}
	}

	return fmt.Errorf("received: %v", mr.stopSignal)
}

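// handleShutdown aborts all running builds and then waits for the main loop
// to finish, another stop signal or the shutdown timeout.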
func (mr *RunCommand) handleShutdown() error {
	mr.log().Warningln("Requested service stop:", mr.stopSignal)

	go mr.abortAllBuilds()

	// Wait for graceful shutdown or abort after timeout
	for {
		select {
		case mr.stopSignal = <-mr.stopSignals:
			return fmt.Errorf("forced exit: %v", mr.stopSignal)

		case <-time.After(common.ShutdownTimeout * time.Second):
			return errors.New("shutdown timedout")

		case <-mr.runFinished:
			// Everything finished we can exit now
			return nil
		}
	}
}

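// Stop is the service stop handler: it interrupts the main loop, tries a
// graceful shutdown first and falls back to a forced one.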
func (mr *RunCommand) Stop(s service.Service) (err error) {
	go mr.interruptRun()
	err = mr.handleGracefulShutdown()
	if err == nil {
		return
	}
	err = mr.handleShutdown()
	return
}

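// Execute wraps the command in a system service, optionally attaching a
// syslog hook, and runs it.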
func (mr *RunCommand) Execute(context *cli.Context) {
	svcConfig := &service.Config{
		Name:        mr.ServiceName,
		DisplayName: mr.ServiceName,
		Description: defaultDescription,
		Arguments:   []string{"run"},
		Option: service.KeyValue{
			"RunWait": mr.runWait,
		},
	}

	service, err := service_helpers.New(mr, svcConfig)
	if err != nil {
		log.Fatalln(err)
	}

	if mr.Syslog {
		log.SetFormatter(new(log.TextFormatter))
		logger, err := service.SystemLogger(nil)
		if err == nil {
			log.AddHook(&ServiceLogHook{logger})
		} else {
			log.Errorln(err)
		}
	}

	err = service.Run()
	if err != nil {
		log.Fatalln(err)
	}
}

func init() {
	common.RegisterCommand2("run", "run multi runner service", &RunCommand{
		ServiceName: defaultServiceName,
		network:     &network.GitLabClient{},
	})
}