Commit 4bf5c2fd authored by Kamil Trzciński's avatar Kamil Trzciński

Add ability to mask variables.

This makes variables to be allowed to be `masked: true`.
The masked values will be replaced with `[MASKED]`.
The masking is done in-line, thus we delay sending trace
if the content is partial match for masked output.
parent 56efef02
......@@ -39,6 +39,7 @@ func TestProcessRunner_BuildLimit(t *testing.T) {
mJobTrace.On("Write", mock.Anything).Return(0, nil)
mJobTrace.On("IsStdout").Return(false)
mJobTrace.On("SetCancelFunc", mock.Anything)
mJobTrace.On("SetMasked", mock.Anything)
mJobTrace.On("Success")
mJobTrace.On("Fail", mock.Anything, mock.Anything)
......
......@@ -473,6 +473,7 @@ func (b *Build) Run(globalConfig *Config, trace JobTrace) (err error) {
defer cancel()
trace.SetCancelFunc(cancel)
trace.SetMasked(b.GetAllVariables().Masked())
options := ExecutorPrepareOptions{
Config: b.Runner,
......@@ -532,28 +533,45 @@ func (b *Build) GetSharedEnvVariable() JobVariable {
return env
}
func (b *Build) GetCITLSVariables() JobVariables {
func (b *Build) GetTLSVariables(caFile, certFile, keyFile string) JobVariables {
variables := JobVariables{}
if b.TLSCAChain != "" {
variables = append(variables, JobVariable{tls.VariableCAFile, b.TLSCAChain, true, true, true})
variables = append(variables, JobVariable{
Key: caFile,
Value: b.TLSCAChain,
Public: true,
Internal: true,
File: true,
})
}
if b.TLSAuthCert != "" && b.TLSAuthKey != "" {
variables = append(variables, JobVariable{tls.VariableCertFile, b.TLSAuthCert, true, true, true})
variables = append(variables, JobVariable{tls.VariableKeyFile, b.TLSAuthKey, true, true, true})
variables = append(variables, JobVariable{
Key: certFile,
Value: b.TLSAuthCert,
Public: true,
Internal: true,
File: true,
})
variables = append(variables, JobVariable{
Key: keyFile,
Value: b.TLSAuthKey,
Internal: true,
File: true,
})
}
return variables
}
func (b *Build) GetCITLSVariables() JobVariables {
return b.GetTLSVariables(tls.VariableCAFile, tls.VariableCertFile, tls.VariableKeyFile)
}
func (b *Build) GetGitTLSVariables() JobVariables {
variables := JobVariables{}
if b.TLSCAChain != "" {
variables = append(variables, JobVariable{"GIT_SSL_CAINFO", b.TLSCAChain, true, true, true})
}
if b.TLSAuthCert != "" && b.TLSAuthKey != "" {
variables = append(variables, JobVariable{"GIT_SSL_CERT", b.TLSAuthCert, true, true, true})
variables = append(variables, JobVariable{"GIT_SSL_KEY", b.TLSAuthKey, true, true, true})
}
return variables
return b.GetTLSVariables("GIT_SSL_CAINFO", "GIT_SSL_CERT", "GIT_SSL_KEY")
}
func (b *Build) IsSharedEnv() bool {
......
......@@ -17,6 +17,7 @@ func (fjt *fakeJobTrace) Success() {}
func (fjt *fakeJobTrace) Fail(err error, failureReason JobFailureReason) {}
func (fjt *fakeJobTrace) SetCancelFunc(cancelFunc context.CancelFunc) {}
func (fjt *fakeJobTrace) SetFailuresCollector(fc FailuresCollector) {}
func (fjt *fakeJobTrace) SetMasked(masked []string) {}
func (fjt *fakeJobTrace) IsStdout() bool { return false }
func (fjt *fakeJobTrace) Write(p []byte) (n int, err error) {
......
......@@ -292,6 +292,7 @@ func TestJobFailure(t *testing.T) {
trace.On("Write", mock.Anything).Return(0, nil)
trace.On("IsStdout").Return(true)
trace.On("SetCancelFunc", mock.Anything).Once()
trace.On("SetMasked", mock.Anything).Once()
trace.On("Fail", thrownErr, ScriptFailure).Once()
err = build.Run(&Config{}, trace)
......@@ -346,6 +347,7 @@ func TestJobFailureOnExecutionTimeout(t *testing.T) {
trace.On("Write", mock.Anything).Return(0, nil)
trace.On("IsStdout").Return(true)
trace.On("SetCancelFunc", mock.Anything).Once()
trace.On("SetMasked", mock.Anything).Once()
trace.On("Fail", mock.Anything, JobExecutionTimeout).Run(func(arguments mock.Arguments) {
assert.Error(t, arguments.Get(0).(error))
}).Once()
......@@ -680,7 +682,7 @@ func TestDebugTrace(t *testing.T) {
successfulBuild, err := GetSuccessfulBuild()
assert.NoError(t, err)
successfulBuild.Variables = append(successfulBuild.Variables, JobVariable{"CI_DEBUG_TRACE", "false", true, true, false})
successfulBuild.Variables = append(successfulBuild.Variables, JobVariable{Key: "CI_DEBUG_TRACE", Value: "false", Public: true, Internal: true})
build = &Build{
JobResponse: successfulBuild,
}
......@@ -689,7 +691,7 @@ func TestDebugTrace(t *testing.T) {
successfulBuild, err = GetSuccessfulBuild()
assert.NoError(t, err)
successfulBuild.Variables = append(successfulBuild.Variables, JobVariable{"CI_DEBUG_TRACE", "true", true, true, false})
successfulBuild.Variables = append(successfulBuild.Variables, JobVariable{Key: "CI_DEBUG_TRACE", Value: "true", Public: true, Internal: true})
build = &Build{
JobResponse: successfulBuild,
}
......
......@@ -41,6 +41,11 @@ func (_m *MockJobTrace) SetFailuresCollector(fc FailuresCollector) {
_m.Called(fc)
}
// SetMasked provides a mock function with given fields: values
func (_m *MockJobTrace) SetMasked(values []string) {
_m.Called(values)
}
// Success provides a mock function with given fields:
func (_m *MockJobTrace) Success() {
_m.Called()
......
......@@ -62,6 +62,7 @@ type FeaturesInfo struct {
Session bool `json:"session"`
Terminal bool `json:"terminal"`
Refspecs bool `json:"refspecs"`
Masking bool `json:"masking"`
}
type RegisterRunnerParameters struct {
......@@ -359,6 +360,7 @@ type JobTrace interface {
Fail(err error, failureReason JobFailureReason)
SetCancelFunc(cancelFunc context.CancelFunc)
SetFailuresCollector(fc FailuresCollector)
SetMasked(values []string)
IsStdout() bool
}
......
......@@ -23,6 +23,9 @@ func (s *Trace) Write(p []byte) (n int, err error) {
return s.Writer.Write(p)
}
func (s *Trace) SetMasked(values []string) {
}
func (s *Trace) Success() {
}
......
......@@ -13,6 +13,7 @@ type JobVariable struct {
Public bool `json:"public"`
Internal bool `json:"-"`
File bool `json:"file"`
Masked bool `json:"masked"`
}
type JobVariables []JobVariable
......@@ -64,6 +65,15 @@ func (b JobVariables) Expand() (variables JobVariables) {
return variables
}
func (b JobVariables) Masked() (masked []string) {
for _, variable := range b {
if variable.Masked {
masked = append(masked, variable.Value)
}
}
return
}
func ParseVariable(text string) (variable JobVariable, err error) {
keyValue := strings.SplitN(text, "=", 2)
if len(keyValue) != 2 {
......
......@@ -10,7 +10,7 @@ import (
func TestVariablesJSON(t *testing.T) {
var x JobVariable
data := []byte(`{"key": "FOO", "value": "bar", "public": true, "internal": true, "file": true}`)
data := []byte(`{"key": "FOO", "value": "bar", "public": true, "internal": true, "file": true, "masked": true}`)
err := json.Unmarshal(data, &x)
assert.NoError(t, err)
......@@ -19,17 +19,18 @@ func TestVariablesJSON(t *testing.T) {
assert.Equal(t, true, x.Public)
assert.Equal(t, false, x.Internal) // cannot be set from the network
assert.Equal(t, true, x.File)
assert.Equal(t, true, x.Masked)
}
func TestVariableString(t *testing.T) {
v := JobVariable{"key", "value", false, false, false}
v := JobVariable{Key: "key", Value: "value"}
assert.Equal(t, "key=value", v.String())
}
func TestPublicAndInternalVariables(t *testing.T) {
v1 := JobVariable{"key", "value", false, false, false}
v2 := JobVariable{"public", "value", true, false, false}
v3 := JobVariable{"private", "value", false, true, false}
v1 := JobVariable{Key: "key", Value: "value"}
v2 := JobVariable{Key: "public", Value: "value", Public: true}
v3 := JobVariable{Key: "private", Value: "value", Internal: true}
all := JobVariables{v1, v2, v3}
public := all.PublicOrInternal()
assert.NotContains(t, public, v1)
......@@ -37,15 +38,24 @@ func TestPublicAndInternalVariables(t *testing.T) {
assert.Contains(t, public, v3)
}
func TestMaskedVariables(t *testing.T) {
v1 := JobVariable{Key: "key", Value: "key_value"}
v2 := JobVariable{Key: "masked", Value: "masked_value", Masked: true}
all := JobVariables{v1, v2}
masked := all.Masked()
assert.NotContains(t, masked, v1.Value)
assert.Contains(t, masked, v2.Value)
}
func TestListVariables(t *testing.T) {
v := JobVariables{{"key", "value", false, false, false}}
v := JobVariables{{Key: "key", Value: "value"}}
assert.Equal(t, []string{"key=value"}, v.StringList())
}
func TestGetVariable(t *testing.T) {
v1 := JobVariable{"key", "key_value", false, false, false}
v2 := JobVariable{"public", "public_value", true, false, false}
v3 := JobVariable{"private", "private_value", false, false, false}
v1 := JobVariable{Key: "key", Value: "key_value"}
v2 := JobVariable{Key: "public", Value: "public_value", Public: true}
v3 := JobVariable{Key: "private", Value: "private_value"}
all := JobVariables{v1, v2, v3}
assert.Equal(t, "public_value", all.Get("public"))
......@@ -55,7 +65,7 @@ func TestGetVariable(t *testing.T) {
func TestParseVariable(t *testing.T) {
v, err := ParseVariable("key=value=value2")
assert.NoError(t, err)
assert.Equal(t, JobVariable{"key", "value=value2", false, false, false}, v)
assert.Equal(t, JobVariable{Key: "key", Value: "value=value2"}, v)
}
func TestInvalidParseVariable(t *testing.T) {
......@@ -65,10 +75,10 @@ func TestInvalidParseVariable(t *testing.T) {
func TestVariablesExpansion(t *testing.T) {
all := JobVariables{
{"key", "value_of_$public", false, false, false},
{"public", "some_value", true, false, false},
{"private", "value_of_${public}", false, false, false},
{"public", "value_of_$undefined", true, false, false},
{Key: "key", Value: "value_of_$public"},
{Key: "public", Value: "some_value", Public: true},
{Key: "private", Value: "value_of_${public}"},
{Key: "public", Value: "value_of_$undefined", Public: true},
}
expanded := all.Expand()
......@@ -81,10 +91,10 @@ func TestVariablesExpansion(t *testing.T) {
func TestSpecialVariablesExpansion(t *testing.T) {
all := JobVariables{
{"key", "$$", false, false, false},
{"key2", "$/dsa", true, false, false},
{"key3", "aa$@bb", false, false, false},
{"key4", "aa${@}bb", false, false, false},
{Key: "key", Value: "$$"},
{Key: "key2", Value: "$/dsa", Public: true},
{Key: "key3", Value: "aa$@bb"},
{Key: "key4", Value: "aa${@}bb"},
}
expanded := all.Expand()
......@@ -149,5 +159,4 @@ func TestMultipleUsageOfAKey(t *testing.T) {
}
})
}
}
......@@ -1868,6 +1868,7 @@ func (f FakeBuildTrace) Fail(err error, failureReason common.JobFailureReason) {
func (f FakeBuildTrace) Notify(func()) {}
func (f FakeBuildTrace) SetCancelFunc(cancelFunc context.CancelFunc) {}
func (f FakeBuildTrace) SetFailuresCollector(fc common.FailuresCollector) {}
func (f FakeBuildTrace) SetMasked(masked []string) {}
func (f FakeBuildTrace) IsStdout() bool {
return false
}
package trace
import (
"bufio"
"bytes"
"fmt"
"io"
"sync"
"github.com/markelog/trie"
"gitlab.com/gitlab-org/gitlab-runner/helpers"
)
const maskedText = "[MASKED]"
const defaultBytesLimit = 4 * 1024 * 1024 // 4MB
type Buffer struct {
writer io.WriteCloser
lock sync.RWMutex
log bytes.Buffer
logMaskedSize int
bytesLimit int
finish chan struct{}
maskTree *trie.Trie
}
func (b *Buffer) SetMasked(values []string) {
if len(values) == 0 {
b.maskTree = nil
return
}
maskTree := trie.New()
for _, value := range values {
maskTree.Add(value, nil)
}
b.maskTree = maskTree
}
func (b *Buffer) SetLimit(size int) {
b.bytesLimit = size
}
func (b *Buffer) limitExceededMessage() string {
return fmt.Sprintf("\n%sJob's log exceeded limit of %v bytes.%s\n", helpers.ANSI_BOLD_RED, b.bytesLimit, helpers.ANSI_RESET)
}
func (b *Buffer) Bytes() []byte {
b.lock.RLock()
defer b.lock.RUnlock()
return b.log.Bytes()[0:b.logMaskedSize]
}
func (b *Buffer) String() string {
return string(b.Bytes())
}
func (b *Buffer) Write(data []byte) (n int, err error) {
return b.writer.Write(data)
}
func (b *Buffer) Close() error {
// wait for trace to finish
err := b.writer.Close()
<-b.finish
return err
}
func (b *Buffer) advanceAllUnsafe() {
b.logMaskedSize = b.log.Len()
}
func (b *Buffer) advanceAll() {
b.lock.Lock()
defer b.lock.Unlock()
b.advanceAllUnsafe()
}
// advanceLogUnsafe is assumed to be run every character
func (b *Buffer) advanceLogUnsafe() error {
// advance all if no masking is enabled
if b.maskTree == nil {
b.advanceAllUnsafe()
return nil
}
rest := string(b.log.Bytes()[b.logMaskedSize:])
results := b.maskTree.Search(rest)
if len(results) == 0 {
// we can advance as no match was found
b.advanceAllUnsafe()
return nil
}
// full match was found
if len(results) == 1 && results[0].Key == rest {
b.log.Truncate(b.logMaskedSize)
b.log.WriteString(maskedText)
b.advanceAllUnsafe()
}
// partial match, wait for more characters
return nil
}
func (b *Buffer) writeRune(r rune) (int, error) {
b.lock.Lock()
defer b.lock.Unlock()
n, err := b.log.WriteRune(r)
if err != nil {
return n, err
}
err = b.advanceLogUnsafe()
if err != nil {
return n, err
}
if b.log.Len() < b.bytesLimit {
return n, nil
}
b.log.WriteString(b.limitExceededMessage())
return n, io.EOF
}
func (b *Buffer) process(pipe *io.PipeReader) {
defer pipe.Close()
stopped := false
reader := bufio.NewReader(pipe)
for {
r, s, err := reader.ReadRune()
if s <= 0 {
break
} else if stopped {
// ignore symbols if job log exceeded limit
continue
} else if err == nil {
_, err = b.writeRune(r)
if err == io.EOF {
stopped = true
}
} else {
// ignore invalid characters
continue
}
}
b.advanceAll()
close(b.finish)
}
func New() *Buffer {
reader, writer := io.Pipe()
buffer := &Buffer{
writer: writer,
bytesLimit: defaultBytesLimit,
finish: make(chan struct{}),
}
go buffer.process(reader)
return buffer
}
package trace
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestVariablesMasking(t *testing.T) {
traceMessage := "This is the secret message containing secret duplicateValues"
maskedValues := []string{
"is",
"duplicateValue",
"duplicateValue",
"secret",
"containing",
}
buffer := New()
buffer.SetMasked(maskedValues)
_, err := buffer.Write([]byte(traceMessage))
require.NoError(t, err)
err = buffer.Close()
require.NoError(t, err)
assert.Equal(t, "Th[MASKED] [MASKED] the [MASKED] message [MASKED] [MASKED] [MASKED]s", buffer.String())
}
func TestTraceLimit(t *testing.T) {
traceMessage := "This is the long message"
buffer := New()
buffer.SetLimit(10)
_, err := buffer.Write([]byte(traceMessage))
require.NoError(t, err)
err = buffer.Close()
require.NoError(t, err)
assert.Contains(t, buffer.String(), "Job's log exceeded limit of")
}
......@@ -701,10 +701,7 @@ func getPatchServer(t *testing.T, handler func(w http.ResponseWriter, r *http.Re
}
func getTracePatch(traceString string, offset int) *tracePatch {
trace := bytes.Buffer{}
trace.WriteString(traceString)
tracePatch, _ := newTracePatch(trace, offset)
tracePatch, _ := newTracePatch([]byte(traceString), offset)
return tracePatch
}
......
package network
import (
"bufio"
"bytes"
"context"
"fmt"
"io"
"sync"
"time"
"gitlab.com/gitlab-org/gitlab-runner/common"
"gitlab.com/gitlab-org/gitlab-runner/helpers"
"gitlab.com/gitlab-org/gitlab-runner/helpers/trace"
)
type clientJobTrace struct {
*io.PipeWriter
client common.Network
config common.RunnerConfig
jobCredentials *common.JobCredentials
id int
bytesLimit int
cancelFunc context.CancelFunc
log bytes.Buffer
buffer *trace.Buffer
lock sync.RWMutex
state common.JobState
failureReason common.JobFailureReason
......@@ -62,6 +56,14 @@ func (c *clientJobTrace) Fail(err error, failureReason common.JobFailureReason)
c.finish()
}
func (c *clientJobTrace) Write(data []byte) (n int, err error) {
return c.buffer.Write(data)
}
func (c *clientJobTrace) SetMasked(masked []string) {
c.buffer.SetMasked(masked)
}
func (c *clientJobTrace) SetCancelFunc(cancelFunc context.CancelFunc) {
c.cancelFunc = cancelFunc
}
......@@ -83,17 +85,15 @@ func (c *clientJobTrace) setFailure(reason common.JobFailureReason) {
}
func (c *clientJobTrace) start() {
reader, writer := io.Pipe()
c.PipeWriter = writer
c.finished = make(chan bool)
c.state = common.Running
c.sentState = common.Running
go c.process(reader)
c.setupLogLimit()
go c.watch()
}
func (c *clientJobTrace) finish() {
c.Close()
c.buffer.Close()
c.finished <- true
// Do final upload of job trace
......@@ -105,54 +105,13 @@ func (c *clientJobTrace) finish() {
}
}
func (c *clientJobTrace) writeRune(r rune) (n int, err error) {
c.lock.Lock()
defer c.lock.Unlock()
n, err = c.log.WriteRune(r)
if c.log.Len() < c.bytesLimit {
return
}
c.log.WriteString(c.limitExceededMessage())
err = io.EOF
return
}
func (c *clientJobTrace) process(pipe *io.PipeReader) {
defer pipe.Close()
stopped := false
reader := bufio.NewReader(pipe)
c.setupLogLimit()
for {
r, s, err := reader.ReadRune()
if s <= 0 {
break
} else if stopped {
// ignore symbols if job log exceeded limit
continue
} else if err == nil {
_, err = c.writeRune(r)
if err == io.EOF {
stopped = true
}
} else {
// ignore invalid characters
continue
}
}
}
func (c *clientJobTrace) incrementalUpdate() common.UpdateState {
c.lock.RLock()
state := c.state
trace := c.log
trace := c.buffer.Bytes()
c.lock.RUnlock()
if c.sentTrace != trace.Len() {
if c.sentTrace != len(trace) {
result := c.sendPatch(trace)
if result != common.UpdateSucceeded {
return result
......@@ -171,7 +130,7 @@ func (c *clientJobTrace) incrementalUpdate() common.UpdateState {
return common.UpdateSucceeded
}
func (c *clientJobTrace) sendPatch(trace bytes.Buffer) common.UpdateState {
func (c *clientJobTrace) sendPatch(trace []byte) common.UpdateState {
tracePatch, err := newTracePatch(trace, c.sentTrace)
if err != nil {
c.config.Log().Errorln("Error while creating a tracePatch", err.Error())
......@@ -197,7 +156,7 @@ func (c *clientJobTrace) sendPatch(trace bytes.Buffer) common.UpdateState {
func (c *clientJobTrace) resendPatch(id int, config common.RunnerConfig, jobCredentials *common.JobCredentials, tracePatch common.JobTracePatch) (update common.UpdateState) {
if !tracePatch.ValidateRange() {
config.Log().Warningln(id, "Full job update is needed")
fullTrace := c.log.String()
fullTrace := string(c.buffer.Bytes())
jobInfo := common.UpdateJobInfo{
ID: c.id,
......@@ -240,10 +199,10 @@ func (c *clientJobTrace) sendUpdate(state common.JobState) common.UpdateState {
func (c *clientJobTrace) fullUpdate() common.UpdateState {
c.lock.RLock()
state := c.state
trace := c.log
trace := c.buffer.Bytes()
c.lock.RUnlock()
if c.sentTrace != trace.Len() {
if c.sentTrace != len(trace) {
c.sendPatch(trace) // we don't care about sendPatch() result, in the worst case we will re-send the trace
}
......@@ -253,14 +212,14 @@ func (c *clientJobTrace) fullUpdate() common.UpdateState {
FailureReason: c.failureReason,
}
if c.sentTrace != trace.Len() {
traceString := trace.String()
if c.sentTrace != len(trace) {
traceString := string(trace)
jobInfo.Trace = &traceString
}
update := c.client.UpdateJob(c.config, c.jobCredentials, jobInfo)
if update == common.UpdateSucceeded {