Commit 2979aa06 authored by Francisco Javier López's avatar Francisco Javier López 🌴 Committed by Kamil Trzciński

CI Web Terminal

parent a6f8ca8d
......@@ -283,6 +283,12 @@
packages = ["."]
revision = "599cba5e7b6137d46ddf58fb1765f5d928e69604"
[[projects]]
name = "github.com/gorilla/websocket"
packages = ["."]
revision = "ea4d1f681babbce9545c9c5f3d5194a789c89f5b"
version = "v1.2.0"
[[projects]]
name = "github.com/hashicorp/go-version"
packages = ["."]
......@@ -315,6 +321,12 @@
packages = ["."]
revision = "c2c54e542fb797ad986b31721e1baedf214ca413"
[[projects]]
name = "github.com/kr/pty"
packages = ["."]
revision = "282ce0e5322c82529687d609ee670fac7c7d917c"
version = "v1.1.1"
[[projects]]
branch = "master"
name = "github.com/mattn/go-zglob"
......@@ -469,6 +481,11 @@
packages = ["."]
revision = "a7cf72d604cdf0af6031dd5d54a4e513abeff0d4"
[[projects]]
name = "gitlab.com/gitlab-org/gitlab-terminal"
packages = ["."]
revision = "d523b4fd2bb3c8728724dce365809e09113430a9"
[[projects]]
name = "golang.org/x/crypto"
packages = [
......@@ -514,6 +531,22 @@
]
revision = "042a8f53ce82bbe081222da955159491e32146a0"
[[projects]]
name = "golang.org/x/text"
packages = [
"encoding",
"encoding/internal",
"encoding/internal/identifier",
"encoding/unicode",
"internal/gen",
"internal/utf8internal",
"runes",
"transform",
"unicode/cldr"
]
revision = "f21a4dfb5e38f5895301dc265a8def02365cc3d0"
version = "v0.3.0"
[[projects]]
branch = "master"
name = "golang.org/x/time"
......@@ -698,6 +731,6 @@
[solve-meta]
analyzer-name = "dep"
analyzer-version = 1
inputs-digest = "2f7c08f6c7e538cf19f9a9835d18574927a03b07f30557ea80c97d7a6151a875"
inputs-digest = "a2c6b9459afdbd0f1d9b17239b350f28df28a2db1a704b88d424fba2636280de"
solver-name = "gps-cdcl"
solver-version = 1
......@@ -280,3 +280,7 @@ ignored = ["test", "appengine"]
[[override]]
name = "google.golang.org/appengine"
revision = "e951d3868b377b14f4e60efa3a301532ee3c1ebf"
[[constraint]]
name = "gitlab.com/gitlab-org/gitlab-terminal"
revision = "d523b4fd2bb3c8728724dce365809e09113430a9"
......@@ -153,9 +153,12 @@ mocks: $(MOCKERY)
find . -type f ! -path '*vendor/*' -name 'mock_*' -delete
GOPATH=$(ORIGINAL_GOPATH) mockery $(MOCKERY_FLAGS) -dir=./vendor/github.com/ayufan/golang-kardianos-service -output=./helpers/service/mocks -name='(Interface|Logger)'
GOPATH=$(ORIGINAL_GOPATH) mockery $(MOCKERY_FLAGS) -dir=./helpers/docker -all -inpkg
GOPATH=$(ORIGINAL_GOPATH) mockery $(MOCKERY_FLAGS) -dir=./helpers/certificate -all -inpkg
GOPATH=$(ORIGINAL_GOPATH) mockery $(MOCKERY_FLAGS) -dir=./common -all -inpkg
GOPATH=$(ORIGINAL_GOPATH) mockery $(MOCKERY_FLAGS) -dir=./session -all -inpkg
GOPATH=$(ORIGINAL_GOPATH) mockery $(MOCKERY_FLAGS) -dir=./shells -all -inpkg
test-docker:
make test-docker-image IMAGE=centos:6 TYPE=rpm
make test-docker-image IMAGE=centos:7 TYPE=rpm
......
......@@ -3,10 +3,12 @@ package commands
import (
"fmt"
"net/http"
"strings"
"sync"
"gitlab.com/gitlab-org/gitlab-runner/common"
"gitlab.com/gitlab-org/gitlab-runner/helpers"
"gitlab.com/gitlab-org/gitlab-runner/session"
"github.com/prometheus/client_golang/prometheus"
)
......@@ -74,6 +76,19 @@ func (b *buildsHelper) getRunnerCounter(runner *common.RunnerConfig) *runnerCoun
return counter
}
func (b *buildsHelper) findSessionByURL(url string) *session.Session {
b.lock.Lock()
defer b.lock.Unlock()
for _, build := range b.builds {
if strings.HasPrefix(url, build.Session.Endpoint+"/") {
return build.Session
}
}
return nil
}
func (b *buildsHelper) acquireBuild(runner *common.RunnerConfig) bool {
b.lock.Lock()
defer b.lock.Unlock()
......
......@@ -6,6 +6,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitlab-runner/session"
"gitlab.com/gitlab-org/gitlab-runner/common"
)
......@@ -119,6 +120,23 @@ func TestBuildsHelperAcquireBuildUnlimited(t *testing.T) {
require.True(t, result)
}
func TestBuildsHelperFindSessionByURL(t *testing.T) {
sess, err := session.NewSession(nil)
require.NoError(t, err)
build := common.Build{
Session: sess,
}
h := &buildsHelper{}
h.addBuild(&build)
foundSession := h.findSessionByURL(sess.Endpoint + "/action")
assert.Equal(t, sess, foundSession)
foundSession = h.findSessionByURL("/session/hash/action")
assert.Nil(t, foundSession)
}
var testBuildCurrentID int
func getTestBuild() *common.Build {
......
......@@ -5,27 +5,28 @@ import (
"fmt"
"net"
"net/http"
_ "net/http/pprof" // PPROF package adds everything itself inside its init() function
_ "net/http/pprof" // pprof package adds everything itself inside its init() function
"os"
"os/signal"
"runtime"
"syscall"
"time"
service "github.com/ayufan/golang-kardianos-service"
"github.com/ayufan/golang-kardianos-service"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/urfave/cli"
log "github.com/sirupsen/logrus"
"github.com/urfave/cli"
"gitlab.com/gitlab-org/gitlab-runner/common"
"gitlab.com/gitlab-org/gitlab-runner/helpers"
"gitlab.com/gitlab-org/gitlab-runner/helpers/certificate"
"gitlab.com/gitlab-org/gitlab-runner/helpers/cli"
prometheus_helper "gitlab.com/gitlab-org/gitlab-runner/helpers/prometheus"
"gitlab.com/gitlab-org/gitlab-runner/helpers/sentry"
"gitlab.com/gitlab-org/gitlab-runner/helpers/service"
"gitlab.com/gitlab-org/gitlab-runner/network"
"gitlab.com/gitlab-org/gitlab-runner/session"
)
type RunCommand struct {
......@@ -46,6 +47,8 @@ type RunCommand struct {
failuresCollector *prometheus_helper.FailuresCollector
networkRequestStatusesCollector prometheus.Collector
sessionServer *session.Server
// abortBuilds is used to abort running builds
abortBuilds chan os.Signal
......@@ -58,8 +61,9 @@ type RunCommand struct {
// stopSignals is to catch a signals notified to process: SIGTERM, SIGQUIT, Interrupt, Kill
stopSignals chan os.Signal
// stopSignal is used to preserve the signal that was used to stop the process
// In case this is SIGQUIT it makes to finish all buids
// stopSignal is used to preserve the signal that was used to stop the
// process In case this is SIGQUIT it makes to finish all builds and session
// server.
stopSignal os.Signal
// runFinished is used to notify that Run() did finish
......@@ -101,13 +105,13 @@ func (mr *RunCommand) feedRunners(runners chan *common.RunnerConfig) {
}
}
func (mr *RunCommand) requestJob(runner *common.RunnerConfig) (*common.JobResponse, bool) {
func (mr *RunCommand) requestJob(runner *common.RunnerConfig, sessionInfo *common.SessionInfo) (*common.JobResponse, bool) {
if !mr.buildsHelper.acquireRequest(runner) {
return nil, false
}
defer mr.buildsHelper.releaseRequest(runner)
jobData, healthy := mr.network.RequestJob(*runner)
jobData, healthy := mr.network.RequestJob(*runner, sessionInfo)
mr.makeHealthy(runner.UniqueID(), healthy)
return jobData, true
}
......@@ -133,8 +137,15 @@ func (mr *RunCommand) processRunner(id int, runner *common.RunnerConfig, runners
}
defer mr.buildsHelper.releaseBuild(runner)
var features common.FeaturesInfo
provider.GetFeatures(&features)
buildSession, sessionInfo, err := mr.createSession(features)
if err != nil {
return
}
// Receive a new build
jobData, result := mr.requestJob(runner)
jobData, result := mr.requestJob(runner, sessionInfo)
if !result {
mr.log().WithField("runner", runner.ShortDescription()).
Debugln("Failed to request job: runner requestConcurrency meet")
......@@ -160,6 +171,7 @@ func (mr *RunCommand) processRunner(id int, runner *common.RunnerConfig, runners
Runner: runner,
ExecutorData: context,
SystemInterrupt: mr.abortBuilds,
Session: buildSession,
}
// Add build to list of builds to assign numbers
......@@ -180,6 +192,25 @@ func (mr *RunCommand) processRunner(id int, runner *common.RunnerConfig, runners
return build.Run(mr.config, trace)
}
func (mr *RunCommand) createSession(features common.FeaturesInfo) (*session.Session, *common.SessionInfo, error) {
if mr.sessionServer == nil || !features.Session {
return nil, nil, nil
}
sess, err := session.NewSession(mr.log())
if err != nil {
return nil, nil, err
}
sessionInfo := &common.SessionInfo{
URL: mr.sessionServer.AdvertiseAddress + sess.Endpoint,
Certificate: string(mr.sessionServer.CertificatePublicKey),
Authorization: sess.Token,
}
return sess, sessionInfo, err
}
func (mr *RunCommand) processRunners(id int, stopWorker chan bool, runners chan *common.RunnerConfig) {
mr.log().WithField("worker", id).Debugln("Starting worker")
for mr.stopSignal == nil {
......@@ -347,7 +378,7 @@ func (mr *RunCommand) runWait() {
mr.stopSignal = <-mr.stopSignals
}
func (mr *RunCommand) serveMetrics() {
func (mr *RunCommand) serveMetrics(mux *http.ServeMux) {
registry := prometheus.NewRegistry()
// Metrics about the runner's business logic.
registry.MustRegister(&mr.buildsHelper)
......@@ -371,11 +402,11 @@ func (mr *RunCommand) serveMetrics() {
}
}
http.Handle("/metrics", promhttp.HandlerFor(registry, promhttp.HandlerOpts{}))
mux.Handle("/metrics", promhttp.HandlerFor(registry, promhttp.HandlerOpts{}))
}
func (mr *RunCommand) serveDebugData() {
http.Handle("/debug/jobs/list", http.HandlerFunc(mr.buildsHelper.ListJobsHandler))
func (mr *RunCommand) serveDebugData(mux *http.ServeMux) {
mux.Handle("/debug/jobs/list", http.HandlerFunc(mr.buildsHelper.ListJobsHandler))
}
func (mr *RunCommand) setupMetricsAndDebugServer() {
......@@ -398,18 +429,54 @@ func (mr *RunCommand) setupMetricsAndDebugServer() {
log.Fatalln(err)
}
mux := http.NewServeMux()
go func() {
log.Fatalln(http.Serve(listener, nil))
log.Fatalln(http.Serve(listener, mux))
}()
mr.serveMetrics()
mr.serveDebugData()
mr.serveMetrics(mux)
mr.serveDebugData(mux)
log.Infoln("Metrics server listening at", listenAddress)
}
func (mr *RunCommand) setupSessionServer() {
if mr.config.SessionServer.ListenAddress == "" {
mr.log().Info("Listen address not defined, session server disabled")
return
}
var err error
mr.sessionServer, err = session.NewServer(
session.ServerConfig{
AdvertiseAddress: mr.config.SessionServer.AdvertiseAddress,
ListenAddress: mr.config.SessionServer.ListenAddress,
ShutdownTimeout: common.ShutdownTimeout * time.Second,
},
mr.log(),
certificate.X509Generator{},
mr.buildsHelper.findSessionByURL,
)
if err != nil {
mr.log().WithError(err).Fatal("Failed to create session server")
}
go func() {
err := mr.sessionServer.Start()
if err != nil {
mr.log().Fatal(err)
}
}()
mr.log().
WithField("address", mr.config.SessionServer.ListenAddress).
Info("Session server listening")
}
func (mr *RunCommand) Run() {
mr.setupMetricsAndDebugServer()
mr.setupSessionServer()
runners := make(chan *common.RunnerConfig)
go mr.feedRunners(runners)
......@@ -482,6 +549,10 @@ func (mr *RunCommand) handleShutdown() error {
go mr.abortAllBuilds()
if mr.sessionServer != nil {
mr.sessionServer.Close()
}
// Wait for graceful shutdown or abort after timeout
for {
select {
......
......@@ -68,7 +68,7 @@ func (r *RunSingleCommand) postBuild() {
}
func (r *RunSingleCommand) processBuild(data common.ExecutorData, abortSignal chan os.Signal) (err error) {
jobData, healthy := r.network.RequestJob(r.RunnerConfig)
jobData, healthy := r.network.RequestJob(r.RunnerConfig, nil)
if !healthy {
log.Println("Runner is not healthy!")
select {
......
......@@ -76,7 +76,7 @@ func mockingExecutionStack(t *testing.T, executorName string, maxBuilds int, job
jobData := common.JobResponse{}
_, cancel := context.WithCancel(context.Background())
jobTrace := common.Trace{Writer: ioutil.Discard, CancelFunc: cancel}
mockNetwork.On("RequestJob", mock.Anything).Return(&jobData, true).Times(maxBuilds)
mockNetwork.On("RequestJob", mock.Anything, mock.Anything).Return(&jobData, true).Times(maxBuilds)
processJob := mockNetwork.On("ProcessJob", mock.Anything, mock.Anything).Return(&jobTrace).Times(maxBuilds)
if job != nil {
processJob.Run(job)
......
......@@ -15,6 +15,8 @@ import (
"gitlab.com/gitlab-org/gitlab-runner/helpers"
"gitlab.com/gitlab-org/gitlab-runner/helpers/tls"
"gitlab.com/gitlab-org/gitlab-runner/session"
"gitlab.com/gitlab-org/gitlab-runner/session/terminal"
)
type GitStrategy int
......@@ -80,6 +82,8 @@ type Build struct {
CurrentStage BuildStage
CurrentState BuildRuntimeState
Session *session.Session
executorStageResolver func() ExecutorStage
logger BuildLogger
allVariables JobVariables
......@@ -276,6 +280,10 @@ func (b *Build) run(ctx context.Context, executor Executor) (err error) {
runContext, runCancel := context.WithCancel(context.Background())
defer runCancel()
if term, ok := executor.(terminal.InteractiveTerminal); b.Session != nil && ok {
b.Session.SetInteractiveTerminal(term)
}
// Run build script
go func() {
buildFinish <- b.executeScript(runContext, executor)
......@@ -335,6 +343,38 @@ func (b *Build) retryCreateExecutor(options ExecutorPrepareOptions, provider Exe
return
}
func (b *Build) waitForTerminal(timeout time.Duration) {
if b.Session == nil || !b.Session.Connected() {
return
}
b.logger.Infoln(
fmt.Sprintf(
"Terminal is connected, will time out in %s...",
timeout,
),
)
select {
case <-time.After(timeout):
err := fmt.Errorf(
"Terminal session timed out (maximum time allowed - %s)",
timeout,
)
b.logger.Infoln(err.Error())
b.Log().WithError(err).Debugln("Connection closed")
b.Session.TimeoutCh <- err
case err := <-b.Session.DisconnectCh:
b.logger.Infoln("Terminal disconnected")
b.Log().WithError(err).Debugln("Terminal disconnected")
case signal := <-b.SystemInterrupt:
err := fmt.Errorf("aborted: %v", signal)
b.logger.Infoln("Terminal disconnected")
b.Log().WithError(err).Debugln("Terminal disconnected")
b.Session.Kill()
}
}
func (b *Build) CurrentExecutorStage() ExecutorStage {
if b.executorStageResolver == nil {
b.executorStageResolver = func() ExecutorStage {
......@@ -372,7 +412,7 @@ func (b *Build) Run(globalConfig *Config, trace JobTrace) (err error) {
}
}()
context, cancel := context.WithTimeout(context.Background(), b.GetBuildTimeout())
ctx, cancel := context.WithTimeout(context.Background(), b.GetBuildTimeout())
defer cancel()
trace.SetCancelFunc(cancel)
......@@ -382,7 +422,7 @@ func (b *Build) Run(globalConfig *Config, trace JobTrace) (err error) {
Build: b,
Trace: trace,
User: globalConfig.User,
Context: context,
Context: ctx,
}
provider := GetExecutor(b.Runner.Executor)
......@@ -394,7 +434,8 @@ func (b *Build) Run(globalConfig *Config, trace JobTrace) (err error) {
executor, err = b.retryCreateExecutor(options, provider, b.logger)
if err == nil {
err = b.run(context, executor)
err = b.run(ctx, executor)
b.waitForTerminal(globalConfig.SessionServer.GetSessionTimeout())
}
if executor != nil {
executor.Finish(err)
......
......@@ -4,13 +4,12 @@ import (