Skip to content
Snippets Groups Projects
Unverified Commit e437e5b8 authored by Kamil Trzciński's avatar Kamil Trzciński :speech_balloon:
Browse files

Introduce cancelable prepare contexes

parent c7c183b7
No related branches found
No related tags found
1 merge request!559Use contexes everywhere
......@@ -292,6 +292,13 @@ func (b *Build) run(executor Executor) (err error) {
}
func (b *Build) retryCreateExecutor(globalConfig *Config, provider ExecutorProvider, logger BuildLogger) (executor Executor, err error) {
options := ExecutorPrepareOptions{
Config: b.Runner,
Build: b,
Trace: b.Trace,
User: globalConfig.User,
}
for tries := 0; tries < PreparationRetries; tries++ {
executor = provider.Create()
if executor == nil {
......@@ -301,7 +308,7 @@ func (b *Build) retryCreateExecutor(globalConfig *Config, provider ExecutorProvi
b.executorStageResolver = executor.GetCurrentStage
err = executor.Prepare(globalConfig, b.Runner, b)
err = executor.Prepare(options)
if err == nil {
break
}
......
......@@ -21,9 +21,16 @@ const (
ExecutorStageCleanup ExecutorStage = "cleanup"
)
type ExecutorPrepareOptions struct {
Config *RunnerConfig
Build *Build
Trace JobTrace
User string
}
type Executor interface {
Shell() *ShellScriptInfo
Prepare(globalConfig *Config, config *RunnerConfig, build *Build) error
Prepare(options ExecutorPrepareOptions) error
Run(cmd ExecutorCommand) error
Finish(err error)
Cleanup()
......
package docker
import "time"
const DockerAPIVersion = "1.18"
const dockerLabelPrefix = "com.gitlab.gitlab-runner"
const prebuiltImageName = "gitlab/gitlab-runner-helper"
const prebuiltImageExtension = ".tar.xz"
const dockerCleanupTimeout = 5 * time.Minute
......@@ -48,17 +48,19 @@ type dockerOptions struct {
type executor struct {
executors.AbstractExecutor
client docker_helpers.Client
failures []string // IDs of containers that have failed in some way
builds []*types.Container
services []*types.Container
caches []string // IDs of cache containers
options dockerOptions
info types.Info
binds []string
volumesFrom []string
devices []container.DeviceMapping
links []string
client docker_helpers.Client
failures []string // IDs of containers that have failed in some way
builds []*types.Container
services []*types.Container
caches []string // IDs of cache containers
options dockerOptions
info types.Info
binds []string
volumesFrom []string
devices []container.DeviceMapping
links []string
context context.Context
contextCancel context.CancelFunc
}
func (s *executor) getServiceVariables() []string {
......@@ -147,14 +149,14 @@ func (s *executor) pullDockerImage(imageName string, ac *types.AuthConfig) (*typ
options.RegistryAuth, _ = docker_helpers.EncodeAuthConfig(ac)
}
if err := s.client.ImagePullBlocking(context.TODO(), ref, options); err != nil {
if err := s.client.ImagePullBlocking(s.context, ref, options); err != nil {
if strings.Contains(err.Error(), "not found") {
return nil, &common.BuildError{Inner: err}
}
return nil, err
}
image, _, err := s.client.ImageInspectWithRaw(context.TODO(), imageName)
image, _, err := s.client.ImageInspectWithRaw(s.context, imageName)
return &image, err
}
......@@ -167,7 +169,7 @@ func (s *executor) getDockerImage(imageName string) (*types.ImageInspect, error)
authConfig := s.getAuthConfig(imageName)
s.Debugln("Looking for image", imageName, "...")
image, _, err := s.client.ImageInspectWithRaw(context.TODO(), imageName)
image, _, err := s.client.ImageInspectWithRaw(s.context, imageName)
// If never is specified then we return what inspect did return
if pullPolicy == common.PullPolicyNever {
......@@ -223,7 +225,7 @@ func (s *executor) getPrebuiltImage() (*types.ImageInspect, error) {
imageName := prebuiltImageName + ":" + architecture + "-" + common.REVISION
s.Debugln("Looking for prebuilt image", imageName, "...")
image, _, err := s.client.ImageInspectWithRaw(context.TODO(), imageName)
image, _, err := s.client.ImageInspectWithRaw(s.context, imageName)
if err == nil {
return &image, nil
}
......@@ -244,11 +246,11 @@ func (s *executor) getPrebuiltImage() (*types.ImageInspect, error) {
Tag: architecture + "-" + common.REVISION,
}
if err := s.client.ImageImportBlocking(context.TODO(), source, ref, options); err != nil {
if err := s.client.ImageImportBlocking(s.context, source, ref, options); err != nil {
return nil, fmt.Errorf("Failed to import image: %s", err)
}
image, _, err = s.client.ImageInspectWithRaw(context.TODO(), imageName)
image, _, err = s.client.ImageInspectWithRaw(s.context, imageName)
if err != nil {
s.Debugln("Inspecting imported image", imageName, "failed:", err)
return nil, err
......@@ -315,7 +317,7 @@ func (s *executor) createCacheVolume(containerName, containerPath string) (strin
},
}
resp, err := s.client.ContainerCreate(context.TODO(), config, hostConfig, nil, containerName)
resp, err := s.client.ContainerCreate(s.context, config, hostConfig, nil, containerName)
if err != nil {
if resp.ID != "" {
s.failures = append(s.failures, resp.ID)
......@@ -324,7 +326,7 @@ func (s *executor) createCacheVolume(containerName, containerPath string) (strin
}
s.Debugln("Starting cache container", resp.ID, "...")
err = s.client.ContainerStart(context.TODO(), resp.ID, types.ContainerStartOptions{})
err = s.client.ContainerStart(s.context, resp.ID, types.ContainerStartOptions{})
if err != nil {
s.failures = append(s.failures, resp.ID)
return "", err
......@@ -367,11 +369,11 @@ func (s *executor) addCacheVolume(containerPath string) error {
// get existing cache container
var containerID string
containerName := fmt.Sprintf("%s-cache-%x", s.Build.ProjectUniqueName(), hash)
if inspected, err := s.client.ContainerInspect(context.TODO(), containerName); err == nil {
if inspected, err := s.client.ContainerInspect(s.context, containerName); err == nil {
// check if we have valid cache, if not remove the broken container
if _, ok := inspected.Config.Volumes[containerPath]; !ok {
s.Debugln("Removing broken cache container for ", containerPath, "path")
s.removeContainer(inspected.ID)
s.removeContainer(s.context, inspected.ID)
} else {
containerID = inspected.ID
}
......@@ -575,7 +577,7 @@ func (s *executor) createService(service, version, image string) (*types.Contain
containerName := s.Build.ProjectUniqueName() + "-" + strings.Replace(service, "/", "__", -1)
// this will fail potentially some builds if there's name collision
s.removeContainer(containerName)
s.removeContainer(s.context, containerName)
config := &container.Config{
Image: serviceImage.ID,
......@@ -596,13 +598,13 @@ func (s *executor) createService(service, version, image string) (*types.Contain
}
s.Debugln("Creating service container", containerName, "...")
resp, err := s.client.ContainerCreate(context.TODO(), config, hostConfig, nil, containerName)
resp, err := s.client.ContainerCreate(s.context, config, hostConfig, nil, containerName)
if err != nil {
return nil, err
}
s.Debugln("Starting service container", resp.ID, "...")
err = s.client.ContainerStart(context.TODO(), resp.ID, types.ContainerStartOptions{})
err = s.client.ContainerStart(s.context, resp.ID, types.ContainerStartOptions{})
if err != nil {
s.failures = append(s.failures, resp.ID)
return nil, err
......@@ -650,7 +652,7 @@ func (s *executor) waitForServices() {
func (s *executor) buildServiceLinks(linksMap map[string]*types.Container) (links []string) {
for linkName, linkee := range linksMap {
newContainer, err := s.client.ContainerInspect(context.TODO(), linkee.ID)
newContainer, err := s.client.ContainerInspect(s.context, linkee.ID)
if err != nil {
continue
}
......@@ -761,10 +763,10 @@ func (s *executor) createContainer(containerType, imageName string, cmd []string
}
// this will fail potentially some builds if there's name collision
s.removeContainer(containerName)
s.removeContainer(s.context, containerName)
s.Debugln("Creating container", containerName, "...")
resp, err := s.client.ContainerCreate(context.TODO(), config, hostConfig, nil, containerName)
resp, err := s.client.ContainerCreate(s.context, config, hostConfig, nil, containerName)
if err != nil {
if resp.ID != "" {
s.failures = append(s.failures, resp.ID)
......@@ -772,7 +774,7 @@ func (s *executor) createContainer(containerType, imageName string, cmd []string
return nil, err
}
inspect, err := s.client.ContainerInspect(context.TODO(), resp.ID)
inspect, err := s.client.ContainerInspect(s.context, resp.ID)
if err != nil {
s.failures = append(s.failures, resp.ID)
return nil, err
......@@ -782,9 +784,9 @@ func (s *executor) createContainer(containerType, imageName string, cmd []string
func (s *executor) killContainer(id string, waitCh chan error) (err error) {
for {
s.disconnectNetwork(id)
s.disconnectNetwork(s.context, id)
s.Debugln("Killing container", id, "...")
s.client.ContainerKill(context.TODO(), id, "SIGKILL")
s.client.ContainerKill(s.context, id, "SIGKILL")
// Wait for signal that container were killed
// or retry after some time
......@@ -804,7 +806,7 @@ func (s *executor) waitForContainer(id string) error {
// Use active wait
for {
container, err := s.client.ContainerInspect(context.TODO(), id)
container, err := s.client.ContainerInspect(s.context, id)
if err != nil {
if docker_helpers.IsErrNotFound(err) {
return err
......@@ -846,14 +848,14 @@ func (s *executor) watchContainer(id string, input io.Reader, abort chan interfa
}
s.Debugln("Attaching to container", id, "...")
hijacked, err := s.client.ContainerAttach(context.TODO(), id, options)
hijacked, err := s.client.ContainerAttach(s.context, id, options)
if err != nil {
return
}
defer hijacked.Close()
s.Debugln("Starting container", id, "...")
err = s.client.ContainerStart(context.TODO(), id, types.ContainerStartOptions{})
err = s.client.ContainerStart(s.context, id, types.ContainerStartOptions{})
if err != nil {
return
}
......@@ -898,19 +900,19 @@ func (s *executor) watchContainer(id string, input io.Reader, abort chan interfa
return
}
func (s *executor) removeContainer(id string) error {
s.disconnectNetwork(id)
func (s *executor) removeContainer(ctx context.Context, id string) error {
s.disconnectNetwork(ctx, id)
options := types.ContainerRemoveOptions{
RemoveVolumes: true,
Force: true,
}
err := s.client.ContainerRemove(context.TODO(), id, options)
err := s.client.ContainerRemove(ctx, id, options)
s.Debugln("Removed container", id, "with", err)
return err
}
func (s *executor) disconnectNetwork(id string) error {
netList, err := s.client.NetworkList(context.TODO(), types.NetworkListOptions{})
func (s *executor) disconnectNetwork(ctx context.Context, id string) error {
netList, err := s.client.NetworkList(ctx, types.NetworkListOptions{})
if err != nil {
s.Debugln("Can't get network list. ListNetworks exited with", err)
return err
......@@ -919,7 +921,7 @@ func (s *executor) disconnectNetwork(id string) error {
for _, network := range netList {
for _, pluggedContainer := range network.Containers {
if id == pluggedContainer.Name {
err = s.client.NetworkDisconnect(context.TODO(), network.ID, id, true)
err = s.client.NetworkDisconnect(ctx, network.ID, id, true)
if err != nil {
s.Warningln("Can't disconnect possibly zombie container", pluggedContainer.Name, "from network", network.Name, "->", err)
} else {
......@@ -986,7 +988,7 @@ func (s *executor) connectDocker() (err error) {
}
s.client = client
s.info, err = client.Info(context.TODO())
s.info, err = client.Info(s.context)
if err != nil {
return err
}
......@@ -1024,13 +1026,13 @@ func (s *executor) createDependencies() (err error) {
return
}
func (s *executor) Prepare(globalConfig *common.Config, config *common.RunnerConfig, build *common.Build) error {
err := s.prepareBuildsDir(config)
func (s *executor) Prepare(options common.ExecutorPrepareOptions) error {
err := s.prepareBuildsDir(options.Config)
if err != nil {
return err
}
err = s.AbstractExecutor.Prepare(globalConfig, config, build)
err = s.AbstractExecutor.Prepare(options)
if err != nil {
return err
}
......@@ -1039,7 +1041,7 @@ func (s *executor) Prepare(globalConfig *common.Config, config *common.RunnerCon
return errors.New("Docker doesn't support shells that require script file")
}
if config.Docker == nil {
if options.Config.Docker == nil {
return errors.New("Missing docker configuration")
}
......@@ -1092,10 +1094,14 @@ func (s *executor) Cleanup() {
s.SetCurrentStage(DockerExecutorStageCleanup)
var wg sync.WaitGroup
ctx, cancel := context.WithTimeout(context.Background(), dockerCleanupTimeout)
defer cancel()
remove := func(id string) {
wg.Add(1)
go func() {
s.removeContainer(id)
s.removeContainer(ctx, id)
wg.Done()
}()
}
......@@ -1147,12 +1153,12 @@ func (s *executor) runServiceHealthCheckContainer(service *types.Container, time
},
}
s.Debugln("Waiting for service container", containerName, "to be up and running...")
resp, err := s.client.ContainerCreate(context.TODO(), config, hostConfig, nil, containerName)
resp, err := s.client.ContainerCreate(s.context, config, hostConfig, nil, containerName)
if err != nil {
return err
}
defer s.removeContainer(resp.ID)
err = s.client.ContainerStart(context.TODO(), resp.ID, types.ContainerStartOptions{})
defer s.removeContainer(s.context, resp.ID)
err = s.client.ContainerStart(s.context, resp.ID, types.ContainerStartOptions{})
if err != nil {
return err
}
......@@ -1191,7 +1197,7 @@ func (s *executor) waitForServiceContainer(service *types.Container, timeout tim
Timestamps: true,
}
hijacked, err := s.client.ContainerLogs(context.TODO(), service.ID, options)
hijacked, err := s.client.ContainerLogs(s.context, service.ID, options)
if err == nil {
defer hijacked.Close()
stdcopy.StdCopy(&containerBuffer, &containerBuffer, hijacked)
......
......@@ -15,8 +15,8 @@ type commandExecutor struct {
buildContainer *types.ContainerJSON
}
func (s *commandExecutor) Prepare(globalConfig *common.Config, config *common.RunnerConfig, build *common.Build) error {
err := s.executor.Prepare(globalConfig, config, build)
func (s *commandExecutor) Prepare(options common.ExecutorPrepareOptions) error {
err := s.executor.Prepare(options)
if err != nil {
return err
}
......
......@@ -8,8 +8,6 @@ import (
"gitlab.com/gitlab-org/gitlab-ci-multi-runner/common"
"gitlab.com/gitlab-org/gitlab-ci-multi-runner/executors"
"gitlab.com/gitlab-org/gitlab-ci-multi-runner/helpers/ssh"
"golang.org/x/net/context"
)
type sshExecutor struct {
......@@ -17,8 +15,8 @@ type sshExecutor struct {
sshCommand ssh.Client
}
func (s *sshExecutor) Prepare(globalConfig *common.Config, config *common.RunnerConfig, build *common.Build) error {
err := s.executor.Prepare(globalConfig, config, build)
func (s *sshExecutor) Prepare(options common.ExecutorPrepareOptions) error {
err := s.executor.Prepare(options)
if err != nil {
return err
}
......@@ -41,12 +39,12 @@ func (s *sshExecutor) Prepare(globalConfig *common.Config, config *common.Runner
}
s.Debugln("Starting container", container.ID, "...")
err = s.client.ContainerStart(context.TODO(), container.ID, types.ContainerStartOptions{})
err = s.client.ContainerStart(s.context, container.ID, types.ContainerStartOptions{})
if err != nil {
return err
}
containerData, err := s.client.ContainerInspect(context.TODO(), container.ID)
containerData, err := s.client.ContainerInspect(s.context, container.ID)
if err != nil {
return err
}
......
......@@ -56,22 +56,22 @@ func (e *machineExecutor) Shell() *common.ShellScriptInfo {
return e.executor.Shell()
}
func (e *machineExecutor) Prepare(globalConfig *common.Config, config *common.RunnerConfig, build *common.Build) (err error) {
e.build = build
func (e *machineExecutor) Prepare(options common.ExecutorPrepareOptions) (err error) {
e.build = options.Build
// Use the machine
e.SetCurrentStage(DockerMachineExecutorStageUseMachine)
e.config, e.data, err = e.provider.Use(config, build.ExecutorData)
e.config, e.data, err = e.provider.Use(options.Config, options.Build.ExecutorData)
if err != nil {
return err
}
// TODO: Currently the docker-machine doesn't support multiple builds
build.ProjectRunnerID = 0
if details, _ := build.ExecutorData.(*machineDetails); details != nil {
build.Hostname = details.Name
e.build.ProjectRunnerID = 0
if details, _ := options.Build.ExecutorData.(*machineDetails); details != nil {
options.Build.Hostname = details.Name
} else if details, _ := e.data.(*machineDetails); details != nil {
build.Hostname = details.Name
options.Build.Hostname = details.Name
}
e.log().Infoln("Starting docker-machine build...")
......@@ -81,7 +81,7 @@ func (e *machineExecutor) Prepare(globalConfig *common.Config, config *common.Ru
if e.executor == nil {
return errors.New("failed to create an executor")
}
return e.executor.Prepare(globalConfig, &e.config, build)
return e.executor.Prepare(options)
}
func (e *machineExecutor) Run(cmd common.ExecutorCommand) error {
......
......@@ -71,12 +71,12 @@ func (e *AbstractExecutor) Shell() *common.ShellScriptInfo {
return &e.ExecutorOptions.Shell
}
func (e *AbstractExecutor) Prepare(globalConfig *common.Config, config *common.RunnerConfig, build *common.Build) error {
func (e *AbstractExecutor) Prepare(options common.ExecutorPrepareOptions) error {
e.currentStage = common.ExecutorStagePrepare
e.Config = *config
e.Build = build
e.BuildTrace = build.Trace
e.BuildLogger = common.NewBuildLogger(build.Trace, build.Log())
e.Config = *options.Config
e.Build = options.Build
e.BuildTrace = options.Trace
e.BuildLogger = common.NewBuildLogger(options.Build.Trace, options.Build.Log())
err := e.startBuild()
if err != nil {
......
......@@ -83,8 +83,8 @@ func (s *executor) setupResources() error {
return nil
}
func (s *executor) Prepare(globalConfig *common.Config, config *common.RunnerConfig, job *common.Build) (err error) {
if err = s.AbstractExecutor.Prepare(globalConfig, config, job); err != nil {
func (s *executor) Prepare(options common.ExecutorPrepareOptions) (err error) {
if err = s.AbstractExecutor.Prepare(options); err != nil {
return err
}
......@@ -92,7 +92,7 @@ func (s *executor) Prepare(globalConfig *common.Config, config *common.RunnerCon
return fmt.Errorf("kubernetes doesn't support shells that require script file")
}
if s.kubeClient, err = getKubeClient(config.Kubernetes); err != nil {
if s.kubeClient, err = getKubeClient(options.Config.Kubernetes); err != nil {
return fmt.Errorf("error connecting to Kubernetes: %s", err.Error())
}
......@@ -104,11 +104,11 @@ func (s *executor) Prepare(globalConfig *common.Config, config *common.RunnerCon
return err
}
if err = s.overwriteNamespace(job); err != nil {
if err = s.overwriteNamespace(options.Build); err != nil {
return err
}
s.prepareOptions(job)
s.prepareOptions(options.Build)
if err = s.checkDefaults(); err != nil {
return err
......
......@@ -153,8 +153,8 @@ func (s *executor) createVM() error {
return nil
}
func (s *executor) Prepare(globalConfig *common.Config, config *common.RunnerConfig, build *common.Build) error {
err := s.AbstractExecutor.Prepare(globalConfig, config, build)
func (s *executor) Prepare(options common.ExecutorPrepareOptions) error {
err := s.AbstractExecutor.Prepare(options)
if err != nil {
return err
}
......
......@@ -21,9 +21,9 @@ type executor struct {
executors.AbstractExecutor
}
func (s *executor) Prepare(globalConfig *common.Config, config *common.RunnerConfig, build *common.Build) error {
if globalConfig != nil {
s.Shell().User = globalConfig.User
func (s *executor) Prepare(options common.ExecutorPrepareOptions) error {
if options.User != "" {
s.Shell().User = options.User
}
// expand environment variables to have current directory
......@@ -45,7 +45,7 @@ func (s *executor) Prepare(globalConfig *common.Config, config *common.RunnerCon
s.DefaultCacheDir = os.Expand(s.DefaultCacheDir, mapping)
// Pass control to executor
err = s.AbstractExecutor.Prepare(globalConfig, config, build)
err = s.AbstractExecutor.Prepare(options)
if err != nil {
return err
}
......
......@@ -13,8 +13,8 @@ type executor struct {
sshCommand ssh.Client
}
func (s *executor) Prepare(globalConfig *common.Config, config *common.RunnerConfig, build *common.Build) error {
err := s.AbstractExecutor.Prepare(globalConfig, config, build)
func (s *executor) Prepare(options common.ExecutorPrepareOptions) error {
err := s.AbstractExecutor.Prepare(options)
if err != nil {
return err
}
......
......@@ -147,8 +147,8 @@ func (s *executor) createVM(vmName string) (err error) {
return nil
}
func (s *executor) Prepare(globalConfig *common.Config, config *common.RunnerConfig, build *common.Build) error {
err := s.AbstractExecutor.Prepare(globalConfig, config, build)
func (s *executor) Prepare(options common.ExecutorPrepareOptions) error {
err := s.AbstractExecutor.Prepare(options)
if err != nil {
return err
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment