Add web terminal for docker executor

Use reflect.DeepEqual for session.closeTerminalConn since the struct
might have slices which are not comparable.

Make the docker executor implement the `session.Conn` &
`session.InteractiveTerminal` interface to support interactive web
terminals.

Update `gitlab-terminal` package to version to support docker stream.
parent c11d6abc
......@@ -624,11 +624,11 @@
revision = "a7cf72d604cdf0af6031dd5d54a4e513abeff0d4"
[[projects]]
digest = "1:a0d8ffbd3d159d9d2f43aaca906c5c16d65cec791c5ab3259194f0113d10ce5e"
digest = "1:3b5bf7524bfc4a050d46210fa51e7e92f2371dcf64d86f15a9d70e2ceea26db1"
name = "gitlab.com/gitlab-org/gitlab-terminal"
packages = ["."]
pruneopts = "N"
revision = "d523b4fd2bb3c8728724dce365809e09113430a9"
revision = "5af59b871b1bcc3f4b733f6db0ff3b6e8b247b92"
[[projects]]
digest = "1:90a0e11d13444dbf388ada729d5a9b8c57355de59924cb402c2a7711c3c306e6"
......
......@@ -148,7 +148,7 @@ ignored = ["test", "appengine"]
[[constraint]]
name = "gitlab.com/gitlab-org/gitlab-terminal"
revision = "d523b4fd2bb3c8728724dce365809e09113430a9"
revision = "5af59b871b1bcc3f4b733f6db0ff3b6e8b247b92"
##
## Refrain innovations ;)
......
......@@ -95,7 +95,7 @@ Supported features by different executors:
| Absolute paths: caching, artifacts | ✗ | ✗ | ✗ | ✗ | ✗ | ✓ |
| Passing artifacts between stages | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ |
| Use GitLab Container Registry private images | n/a | n/a | n/a | n/a | ✓ | ✓ |
| Interactive Web terminal | ✗ | ✓ (bash)| ✗ | ✗ | | ✓ |
| Interactive Web terminal | ✗ | ✓ (bash)| ✗ | ✗ | | ✓ |
Supported systems by different shells:
......
......@@ -9,3 +9,5 @@ const prebuiltImageName = "gitlab/gitlab-runner-helper"
const prebuiltImageExtension = ".tar.xz"
const dockerCleanupTimeout = 5 * time.Minute
const waitForContainerTimeout = 15 * time.Second
......@@ -983,6 +983,11 @@ func (e *executor) waitForContainer(id string) error {
continue
}
// If container has finished, kill session.
if e.Build.Session != nil {
e.Build.Session.Kill()
}
if container.State.ExitCode != 0 {
return &common.BuildError{
Inner: fmt.Errorf("exit code %d", container.State.ExitCode),
......
......@@ -118,6 +118,8 @@ func init() {
features.Variables = true
features.Image = true
features.Services = true
features.Session = true
features.Terminal = true
}
common.RegisterExecutor("docker", executors.DefaultExecutorProvider{
......
package docker
import (
"context"
"errors"
"net/http"
"time"
"github.com/docker/docker/api/types"
"gitlab.com/gitlab-org/gitlab-terminal"
"gitlab.com/gitlab-org/gitlab-runner/common"
"gitlab.com/gitlab-org/gitlab-runner/helpers/docker"
terminalsession "gitlab.com/gitlab-org/gitlab-runner/session/terminal"
)
func (s *commandExecutor) Connect() (terminalsession.Conn, error) {
// Waiting for the container to start, is not ideal as it might be hiding a
// real issue and the user is not aware of it. Ideally, the runner should
// inform the user in an interactive way that the container has no started
// yet and should wait/try again. This isn't an easy task to do since we
// can't access the WebSocket here since that is the responsibility of
// `gitlab-terminal` package. There are plans to improve this please take a
// look at https://gitlab.com/gitlab-org/gitlab-ce/issues/50384#proposal and
// https://gitlab.com/gitlab-org/gitlab-terminal/issues/4
containerStarted := make(chan struct{})
containerStartedErr := make(chan error)
go func() {
for {
if s.buildContainer == nil {
time.Sleep(10 * time.Millisecond)
continue
}
if s.buildContainer != nil {
container, err := s.client.ContainerInspect(s.Context, s.buildContainer.ID)
if err != nil {
containerStartedErr <- err
break
}
if container.State.Running {
containerStarted <- struct{}{}
break
}
continue
}
}
}()
ctx, cancel := context.WithTimeout(s.Context, waitForContainerTimeout)
defer cancel()
select {
case <-containerStarted:
return terminalConn{
logger: &s.BuildLogger,
ctx: s.Context,
client: s.client,
containerID: s.buildContainer.ID,
shell: s.BuildShell.DockerCommand,
}, nil
case err := <-containerStartedErr:
return nil, err
case <-ctx.Done():
s.Errorln("Timed out waiting for the container to start the terminal. Please retry")
return nil, errors.New("timeout for waiting for container")
}
}
type terminalConn struct {
logger *common.BuildLogger
ctx context.Context
client docker_helpers.Client
containerID string
shell []string
}
func (t terminalConn) Start(w http.ResponseWriter, r *http.Request, timeoutCh, disconnectCh chan error) {
execConfig := types.ExecConfig{
Tty: true,
AttachStdin: true,
AttachStderr: true,
AttachStdout: true,
Cmd: t.shell,
}
exec, err := t.client.ContainerExecCreate(t.ctx, t.containerID, execConfig)
if err != nil {
t.logger.Errorln("failed to create exec container for terminal:", err)
}
resp, err := t.client.ContainerExecAttach(t.ctx, exec.ID, execConfig)
if err != nil {
t.logger.Errorln("failed to exec attach to container for terminal:", err)
}
dockerTTY := newDockerTTY(&resp)
proxy := terminal.NewStreamProxy(1) // one stopper: terminal exit handler
terminalsession.ProxyTerminal(
timeoutCh,
disconnectCh,
proxy.StopCh,
func() {
terminal.ProxyStream(w, r, dockerTTY, proxy)
},
)
}
func (t terminalConn) Close() error {
return nil
}
package docker
import (
"bytes"
"fmt"
"net/http"
"net/http/httptest"
"net/url"
"os"
"strings"
"testing"
"time"
"github.com/gorilla/websocket"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitlab-runner/common"
"gitlab.com/gitlab-org/gitlab-runner/helpers"
"gitlab.com/gitlab-org/gitlab-runner/session"
)
func TestInteractiveTerminal(t *testing.T) {
if helpers.SkipIntegrationTests(t, "docker", "info") {
return
}
successfulBuild, err := common.GetRemoteLongRunningBuild()
assert.NoError(t, err)
sess, err := session.NewSession(nil)
require.NoError(t, err)
build := &common.Build{
JobResponse: successfulBuild,
Runner: &common.RunnerConfig{
RunnerSettings: common.RunnerSettings{
Executor: "docker",
Docker: &common.DockerConfig{
Image: common.TestAlpineImage,
PullPolicy: common.PullPolicyIfNotPresent,
},
},
},
Session: sess,
}
// Start build
buildLogWriter := bytes.NewBuffer(nil)
go func() {
err = build.Run(&common.Config{}, &common.Trace{Writer: buildLogWriter})
require.NoError(t, err)
}()
buildStarted := make(chan struct{})
go func() {
for {
if buildLogWriter.Len() == 0 {
time.Sleep(1 * time.Second)
continue
}
ln, _ := buildLogWriter.ReadBytes('\n')
// Print out to the user to aid debugging
if len(ln) > 0 {
fmt.Fprint(os.Stdout, string(ln))
}
// We signal that the build has start and we can start using terminal.
if strings.Contains(string(ln), "sleep") {
buildStarted <- struct{}{}
}
}
}()
<-buildStarted
srv := httptest.NewServer(build.Session.Mux())
defer srv.Close()
u := url.URL{Scheme: "ws", Host: srv.Listener.Addr().String(), Path: build.Session.Endpoint + "/exec"}
conn, resp, err := websocket.DefaultDialer.Dial(u.String(), http.Header{"Authorization": []string{build.Session.Token}})
require.NoError(t, err)
assert.Equal(t, http.StatusSwitchingProtocols, resp.StatusCode)
defer func() {
if conn != nil {
defer conn.Close()
}
}()
err = conn.WriteMessage(websocket.BinaryMessage, []byte("uname\n"))
require.NoError(t, err)
var unameResult string
for i := 0; i < 3; i++ {
typ, b, err := conn.ReadMessage()
require.NoError(t, err)
assert.Equal(t, websocket.BinaryMessage, typ)
unameResult = string(b)
}
assert.Contains(t, unameResult, "Linux")
}
func TestInteractiveWebTerminalWaitForContainerTimeout(t *testing.T) {
successfulBuild, err := common.GetRemoteLongRunningBuild()
assert.NoError(t, err)
sess, err := session.NewSession(nil)
require.NoError(t, err)
build := &common.Build{
JobResponse: successfulBuild,
Runner: &common.RunnerConfig{
RunnerSettings: common.RunnerSettings{
Executor: "docker",
Docker: &common.DockerConfig{
Image: common.TestAlpineImage,
PullPolicy: common.PullPolicyIfNotPresent,
},
},
},
Session: sess,
}
}
func TestInteractiveWebTerminalAttachStrategy(t *testing.T) {
if helpers.SkipIntegrationTests(t, "docker", "info") {
return
}
successfulBuild, err := common.GetRemoteSuccessfulBuild()
assert.NoError(t, err)
sess, err := session.NewSession(nil)
require.NoError(t, err)
build := &common.Build{
JobResponse: successfulBuild,
Runner: &common.RunnerConfig{
RunnerSettings: common.RunnerSettings{
Executor: "docker",
Docker: &common.DockerConfig{
Image: common.TestAlpineImage,
PullPolicy: common.PullPolicyIfNotPresent,
},
},
},
Session: sess,
}
err = build.Run(&common.Config{}, &common.Trace{Writer: os.Stdout})
require.NoError(t, err)
require.False(t, build.Session.Connected())
}
package docker
import "github.com/docker/docker/api/types"
func newDockerTTY(hijackedResp *types.HijackedResponse) *dockerTTY {
return &dockerTTY{
hijackedResp: hijackedResp,
}
}
type dockerTTY struct {
hijackedResp *types.HijackedResponse
}
func (d *dockerTTY) Read(p []byte) (int, error) {
return d.hijackedResp.Reader.Read(p)
}
func (d *dockerTTY) Write(p []byte) (int, error) {
return d.hijackedResp.Conn.Write(p)
}
func (d *dockerTTY) Close() error {
d.hijackedResp.Close()
return nil
}
......@@ -23,6 +23,8 @@ type Client interface {
ContainerAttach(ctx context.Context, container string, options types.ContainerAttachOptions) (types.HijackedResponse, error)
ContainerRemove(ctx context.Context, containerID string, options types.ContainerRemoveOptions) error
ContainerLogs(ctx context.Context, container string, options types.ContainerLogsOptions) (io.ReadCloser, error)
ContainerExecCreate(ctx context.Context, container string, config types.ExecConfig) (types.IDResponse, error)
ContainerExecAttach(ctx context.Context, execID string, config types.ExecConfig) (types.HijackedResponse, error)
NetworkDisconnect(ctx context.Context, networkID, containerID string, force bool) error
NetworkList(ctx context.Context, options types.NetworkListOptions) ([]types.NetworkResource, error)
......
......@@ -72,6 +72,48 @@ func (_m *MockClient) ContainerCreate(ctx context.Context, config *container.Con
return r0, r1
}
// ContainerExecAttach provides a mock function with given fields: ctx, execID, config
func (_m *MockClient) ContainerExecAttach(ctx context.Context, execID string, config types.ExecConfig) (types.HijackedResponse, error) {
ret := _m.Called(ctx, execID, config)
var r0 types.HijackedResponse
if rf, ok := ret.Get(0).(func(context.Context, string, types.ExecConfig) types.HijackedResponse); ok {
r0 = rf(ctx, execID, config)
} else {
r0 = ret.Get(0).(types.HijackedResponse)
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, string, types.ExecConfig) error); ok {
r1 = rf(ctx, execID, config)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// ContainerExecCreate provides a mock function with given fields: ctx, _a1, config
func (_m *MockClient) ContainerExecCreate(ctx context.Context, _a1 string, config types.ExecConfig) (types.IDResponse, error) {
ret := _m.Called(ctx, _a1, config)
var r0 types.IDResponse
if rf, ok := ret.Get(0).(func(context.Context, string, types.ExecConfig) types.IDResponse); ok {
r0 = rf(ctx, _a1, config)
} else {
r0 = ret.Get(0).(types.IDResponse)
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, string, types.ExecConfig) error); ok {
r1 = rf(ctx, _a1, config)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// ContainerInspect provides a mock function with given fields: ctx, containerID
func (_m *MockClient) ContainerInspect(ctx context.Context, containerID string) (types.ContainerJSON, error) {
ret := _m.Called(ctx, containerID)
......
......@@ -122,6 +122,18 @@ func (c *officialDockerClient) ContainerLogs(ctx context.Context, container stri
return rc, wrapError("ContainerLogs", err, started)
}
func (c *officialDockerClient) ContainerExecCreate(ctx context.Context, container string, config types.ExecConfig) (types.IDResponse, error) {
started := time.Now()
resp, err := c.client.ContainerExecCreate(ctx, container, config)
return resp, wrapError("ContainerExecCreate", err, started)
}
func (c *officialDockerClient) ContainerExecAttach(ctx context.Context, execID string, config types.ExecConfig) (types.HijackedResponse, error) {
started := time.Now()
resp, err := c.client.ContainerExecAttach(ctx, execID, config)
return resp, wrapError("ContainerExecAttach", err, started)
}
func (c *officialDockerClient) NetworkDisconnect(ctx context.Context, networkID string, containerID string, force bool) error {
started := time.Now()
err := c.client.NetworkDisconnect(ctx, networkID, containerID, force)
......
......@@ -2,6 +2,7 @@ package session
import (
"net/http"
"reflect"
"sync"
"github.com/gorilla/websocket"
......@@ -166,7 +167,7 @@ func (s *Session) closeTerminalConn(conn terminal.Conn) {
s.log.WithError(err).Warn("Failed to close terminal connection")
}
if s.terminalConn == conn {
if reflect.DeepEqual(s.terminalConn, conn) {
s.terminalConn = nil
}
}
......
The MIT License (MIT)
Copyright (c) 2018 GitLab B.V.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
package terminal
import (
"fmt"
"io"
)
type StreamProxy struct {
StopCh chan error
}
func NewStreamProxy(stoppers int) *StreamProxy {
return &StreamProxy{
StopCh: make(chan error, stoppers+2), // each proxy() call is a stopper
}
}
func (p *StreamProxy) GetStopCh() chan error {
return p.StopCh
}
func (p *StreamProxy) Serve(client io.ReadWriter, server io.ReadWriter) error {
go p.proxy(client, server)
go p.proxy(server, client)
err := <-p.StopCh
return err
}
func (p *StreamProxy) proxy(to, from io.ReadWriter) {
_, err := io.Copy(to, from)
if err != nil {
p.StopCh <- fmt.Errorf("failed to pipe stream: %v", err)
}
}
package terminal
import (
"bytes"
"errors"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
type chStringReadWriter struct {
wrieDone chan struct{}
bytes.Buffer
}
func (c *chStringReadWriter) Write(p []byte) (int, error) {
defer func() {
c.wrieDone <- struct{}{}
}()
return c.Buffer.Write(p)
}
func TestServe(t *testing.T) {
encProxy := NewStreamProxy(1)
downstream := bytes.Buffer{}
upstream := chStringReadWriter{
wrieDone: make(chan struct{}),
}
writeString := []byte("data from downstream")
downstream.Write([]byte(writeString))
go func() {
err := encProxy.Serve(&upstream, &downstream)
if err != nil {
t.Fatalf("unexpected error from serve: %v", err)
}
}()
// Wait until the write is done
<-upstream.wrieDone
b := make([]byte, 20)
_, err := upstream.Read(b)
require.NoError(t, err)
assert.Equal(t, writeString, b)
}
func TestServeError(t *testing.T) {
encProxy := NewStreamProxy(1)
downstream := errorReadWriter{}
upstream := bytes.Buffer{}
err := encProxy.Serve(&upstream, &downstream)
assert.Error(t, err)
}
type errorReadWriter struct {
}
func (rw *errorReadWriter) Read(p []byte) (int, error) {
return 0, errors.New("failed to read")
}
func (rw *errorReadWriter) Write(p []byte) (int, error) {
return 0, errors.New("failed to read")
}
package terminal
import (
"errors"
"fmt"
"io"
"net/http"
"os"
"time"
log "github.com/sirupsen/logrus"
"github.com/gorilla/websocket"
"fmt"
"errors"
log "github.com/sirupsen/logrus"
)
var (
// See doc/terminal.md for documentation of this subprotocol
subprotocols = []string{"terminal.gitlab.com", "base64.terminal.gitlab.com"}
upgrader = &websocket.Upgrader{Subprotocols: subprotocols}
BrowserPingInterval = 30 * time.Second
subprotocols = []string{"terminal.gitlab.com", "base64.terminal.gitlab.com"}
upgrader = &websocket.Upgrader{Subprotocols: subprotocols}
BrowserPingInterval = 30 * time.Second
)
// ProxyStream takes the given request, upgrades the connection to a WebSocket
// connection, and also takes a dst ReadWriteCloser where a
// bi-directional stream is set up, were the STDIN of the WebSocket it sent
// dst and the STDOUT/STDERR of dst is written to the WebSocket
// connection. The messages to the WebSocket are encoded into binary text.
func ProxyStream(w http.ResponseWriter, r *http.Request, stream io.ReadWriteCloser, proxy *StreamProxy) {
clientAddr := getClientAddr(r) // We can't know the port with confidence
logger := log.WithFields(log.Fields{
"clientAddr": clientAddr,
"pkg": "terminal",
})
clientConn, err := upgradeClient(w, r)
if err != nil {
logger.WithError(err).Error("failed to upgrade client connection to websocket")
return
}
defer func() {
err := clientConn.UnderlyingConn().Close()
if err != nil {
logger.WithError(err).Error("failed to close client connection")
}
err = stream.Close()
if err != nil {
logger.WithError(err).Error("failed to close stream")
}
}()
client := NewIOWrapper(clientConn)
// Regularly send ping messages to the browser to keep the websocket from
// being timed out by intervening proxies.
go pingLoop(client)
if err := proxy.Serve(client, stream); err != nil {
logger.WithError(err).Error("failed to proxy stream")
}
}
// ProxyWebSocket takes the given request, upgrades the connection to a
// WebSocket connection. The terminal settings are used to connect to the
// dst WebSocket connection where it establishes a bi-directional stream
// between both web sockets.
func ProxyWebSocket(w http.ResponseWriter, r *http.Request, terminal *TerminalSettings, proxy *WebSocketProxy) {
server, err := connectToServer(terminal, r)
if err != nil {
......@@ -55,6 +102,10 @@ func ProxyWebSocket(w http.ResponseWriter, r *http.Request, terminal *TerminalSe
}
}
// ProxyFileDescriptor takes the given request, upgrades the connection to a
// WebSocket connection. A bi-directional stream is opened between the WebSocket
// and FileDescriptor that pipes the STDIN from the WebSocket to the
// FileDescriptor , and STDERR/STDOUT back to the WebSocket.
func ProxyFileDescriptor(w http.ResponseWriter, r *http.Request, fd *os.File, proxy *FileDescriptorProxy) {
clientConn, err := upgradeClient(w, r)
if err != nil {
......
package terminal
import (
"bytes"
"net/http"
"net/http/httptest"
"testing"
"github.com/gorilla/websocket"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
const StreamMessage = "this is a test"
func TestProxyStream(t *testing.T) {
downstream := bufferCloser{}
downstream.Write([]byte(StreamMessage))
srv := streamServer{
downstream: downstream,
}
s := httptest.NewServer(&srv)
defer s.Close()
c, _, err := websocket.DefaultDialer.Dial("ws://"+s.Listener.Addr().String()+"/ws", nil)
require.NoError(t, err)