Commit f57f76eb authored by Tomasz Maczukin's avatar Tomasz Maczukin

Move debug server to it's own package

parent 215227b4
Pipeline #71534210 failed with stages
in 95 minutes and 27 seconds
......@@ -146,6 +146,7 @@ mocks: $(MOCKERY)
GOPATH=$(ORIGINAL_GOPATH) mockery $(MOCKERY_FLAGS) -dir=./vendor/github.com/ayufan/golang-kardianos-service -output=./helpers/service/mocks -name='(Interface)'
GOPATH=$(ORIGINAL_GOPATH) mockery $(MOCKERY_FLAGS) -dir=./helpers/docker -all -inpkg
GOPATH=$(ORIGINAL_GOPATH) mockery $(MOCKERY_FLAGS) -dir=./helpers/certificate -all -inpkg
GOPATH=$(ORIGINAL_GOPATH) mockery $(MOCKERY_FLAGS) -dir=./helpers/servers/debug -all -inpkg
GOPATH=$(ORIGINAL_GOPATH) mockery $(MOCKERY_FLAGS) -dir=./executors/docker -all -inpkg
GOPATH=$(ORIGINAL_GOPATH) mockery $(MOCKERY_FLAGS) -dir=./executors/custom -all -inpkg
GOPATH=$(ORIGINAL_GOPATH) mockery $(MOCKERY_FLAGS) -dir=./cache -all -inpkg
......
......@@ -4,8 +4,6 @@ import (
"errors"
"fmt"
"net"
"net/http"
"net/http/pprof"
"os"
"os/signal"
"runtime"
......@@ -14,7 +12,6 @@ import (
"github.com/ayufan/golang-kardianos-service"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/sirupsen/logrus"
"github.com/urfave/cli"
......@@ -23,6 +20,7 @@ import (
"gitlab.com/gitlab-org/gitlab-runner/helpers/certificate"
prometheus_helper "gitlab.com/gitlab-org/gitlab-runner/helpers/prometheus"
"gitlab.com/gitlab-org/gitlab-runner/helpers/sentry"
"gitlab.com/gitlab-org/gitlab-runner/helpers/servers/debug"
"gitlab.com/gitlab-org/gitlab-runner/helpers/service"
"gitlab.com/gitlab-org/gitlab-runner/log"
"gitlab.com/gitlab-org/gitlab-runner/network"
......@@ -464,45 +462,35 @@ func (mr *RunCommand) runWait() {
mr.stopSignal = <-mr.stopSignals
}
func (mr *RunCommand) serveMetrics(mux *http.ServeMux) {
registry := prometheus.NewRegistry()
// Metrics about the runner's business logic.
registry.MustRegister(&mr.buildsHelper)
registry.MustRegister(mr)
// Metrics about API connections
registry.MustRegister(mr.networkRequestStatusesCollector)
// Metrics about jobs failures
registry.MustRegister(mr.failuresCollector)
// Metrics about catched errors
registry.MustRegister(&mr.prometheusLogHook)
// Metrics about the program's build version.
registry.MustRegister(common.AppVersion.NewMetricsCollector())
// Go-specific metrics about the process (GC stats, goroutines, etc.).
registry.MustRegister(prometheus.NewGoCollector())
// Go-unrelated process metrics (memory usage, file descriptors, etc.).
registry.MustRegister(prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{}))
func (mr *RunCommand) setupPrometheusCollectors(server debug.Server) {
collectors := debug.CollectorsMap{
"jobs metrics": &mr.buildsHelper,
"limits": mr,
"api connections": mr.networkRequestStatusesCollector,
"job failures": mr.failuresCollector,
"errors": &mr.prometheusLogHook,
"version": common.AppVersion.NewMetricsCollector(),
}
// Register all executor provider collectors
for _, provider := range common.GetExecutorProviders() {
for name, provider := range common.GetExecutorProviders() {
if collector, ok := provider.(prometheus.Collector); ok && collector != nil {
registry.MustRegister(collector)
name := fmt.Sprintf("executor %s", name)
collectors[name] = collector
}
}
mux.Handle("/metrics", promhttp.HandlerFor(registry, promhttp.HandlerOpts{}))
err := server.RegisterPrometheusCollectors(collectors)
if err != nil {
mr.log().WithError(err).Fatal("Failed to register a Prometheus Collector")
}
}
func (mr *RunCommand) serveDebugData(mux *http.ServeMux) {
mux.HandleFunc("/debug/jobs/list", mr.buildsHelper.ListJobsHandler)
func (mr *RunCommand) setupJobsDebugHandler(server debug.Server) {
server.RegisterDebugEndpoint("jobs/list", mr.buildsHelper.ListJobsHandler)
}
func (mr *RunCommand) servePprof(mux *http.ServeMux) {
mux.HandleFunc("/debug/pprof/", pprof.Index)
mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
mux.HandleFunc("/debug/pprof/trace", pprof.Trace)
}
var debugServerFactory = debug.NewServer
func (mr *RunCommand) setupMetricsAndDebugServer() {
listenAddress, err := mr.listenAddress()
......@@ -524,19 +512,21 @@ func (mr *RunCommand) setupMetricsAndDebugServer() {
mr.log().WithError(err).Fatal("Failed to create listener for metrics server")
}
mux := http.NewServeMux()
server, err := debugServerFactory()
if err != nil {
mr.log().WithError(err).Fatal("Failed to initialize metrics server")
}
mr.setupPrometheusCollectors(server)
mr.setupJobsDebugHandler(server)
go func() {
err := http.Serve(listener, mux)
err := server.Start(listener)
if err != nil {
mr.log().WithError(err).Fatal("Metrics server terminated")
}
}()
mr.serveMetrics(mux)
mr.serveDebugData(mux)
mr.servePprof(mux)
mr.log().
WithField("address", listenAddress).
Info("Metrics server listening")
......@@ -735,15 +725,19 @@ func (mr *RunCommand) Execute(context *cli.Context) {
}
}
func init() {
func newCommand() *RunCommand {
requestStatusesCollector := network.NewAPIRequestStatusesMap()
common.RegisterCommand2("run", "run multi runner service", &RunCommand{
return &RunCommand{
ServiceName: defaultServiceName,
network: network.NewGitLabClientWithRequestStatusesMap(requestStatusesCollector),
networkRequestStatusesCollector: requestStatusesCollector,
prometheusLogHook: prometheus_helper.NewLogHook(),
failuresCollector: prometheus_helper.NewFailuresCollector(),
buildsHelper: newBuildsHelper(),
})
}
}
func init() {
common.RegisterCommand2("run", "run multi runner service", newCommand())
}
......@@ -9,6 +9,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"gitlab.com/gitlab-org/gitlab-runner/common"
"gitlab.com/gitlab-org/gitlab-runner/helpers/servers/debug"
)
func TestProcessRunner_BuildLimit(t *testing.T) {
......@@ -109,3 +110,44 @@ func TestProcessRunner_BuildLimit(t *testing.T) {
// Wait for all builds to finish.
wg.Wait()
}
func TestDebugServerInitialization(t *testing.T) {
server := new(debug.MockServer)
defer server.AssertExpectations(t)
oldDebugServerFactory := debugServerFactory
defer func() {
debugServerFactory = oldDebugServerFactory
}()
debugServerFactory = func() (debug.Server, error) {
return server, nil
}
collectorsMatcher := mock.MatchedBy(func(collectors debug.CollectorsMap) bool {
return len(collectors) > 0
})
wg := new(sync.WaitGroup)
wg.Add(1)
server.On("RegisterPrometheusCollectors", collectorsMatcher).
Return(nil).
Once()
server.On("RegisterDebugEndpoint", "jobs/list", mock.AnythingOfType("http.HandlerFunc")).
Return(nil).
Once()
server.On("Start", mock.Anything).
Return(nil).
Once().
Run(func(args mock.Arguments) {
wg.Done()
})
cmd := newCommand()
cmd.ListenAddress = "127.0.0.1:12345"
err := cmd.Start(nil)
assert.NoError(t, err)
wg.Wait()
}
......@@ -72,7 +72,9 @@ func MakeBuildError(format string, args ...interface{}) error {
}
}
var executors map[string]ExecutorProvider
type ExecutorProvidersMap map[string]ExecutorProvider
var executors ExecutorProvidersMap
func validateExecutorProvider(provider ExecutorProvider) error {
if provider.GetDefaultShell() == "" {
......@@ -125,13 +127,12 @@ func GetExecutors() []string {
return names
}
func GetExecutorProviders() (providers []ExecutorProvider) {
func GetExecutorProviders() (providers ExecutorProvidersMap) {
if executors != nil {
for _, executorProvider := range executors {
providers = append(providers, executorProvider)
}
return executors
}
return
return make(ExecutorProvidersMap)
}
func NewExecutor(executor string) Executor {
......
package debug
import (
"fmt"
"net"
"net/http"
"net/http/pprof"
"strings"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
dto "github.com/prometheus/client_model/go"
)
type serveMux interface {
http.Handler
Handle(pattern string, handler http.Handler)
HandleFunc(pattern string, handler func(http.ResponseWriter, *http.Request))
}
var newServeMux = func() serveMux {
return http.NewServeMux()
}
type httpServer interface {
Serve(listener net.Listener) error
}
var newHttpServer = func(handler http.Handler) httpServer {
return &http.Server{Handler: handler}
}
type prometheusRegistry interface {
Register(collector prometheus.Collector) error
Gather() ([]*dto.MetricFamily, error)
}
var newPrometheusRegistry = func() prometheusRegistry {
return prometheus.NewRegistry()
}
type defaultServer struct {
mux serveMux
prometheusRegistry prometheusRegistry
}
func newDefaultServer() (*defaultServer, error) {
server := new(defaultServer)
err := server.init()
if err != nil {
return nil, err
}
return server, nil
}
func (s *defaultServer) init() error {
initializers := []func() error{
s.initMux,
s.initPrometheus,
s.initPprof,
}
for _, initializer := range initializers {
err := initializer()
if err != nil {
return err
}
}
return nil
}
func (s *defaultServer) initMux() error {
s.mux = newServeMux()
return nil
}
func (s *defaultServer) initPrometheus() error {
s.prometheusRegistry = newPrometheusRegistry()
s.mux.Handle("/metrics", promhttp.HandlerFor(s.prometheusRegistry, promhttp.HandlerOpts{}))
collectors := CollectorsMap{
"go stats": prometheus.NewGoCollector(),
"process stats": prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{}),
}
return s.RegisterPrometheusCollectors(collectors)
}
func (s *defaultServer) RegisterPrometheusCollectors(collectors CollectorsMap) error {
for name, collector := range collectors {
err := s.RegisterPrometheusCollector(collector)
if err != nil {
return fmt.Errorf("error while registering %q Prometheus collector: %v", name, err)
}
}
return nil
}
func (s *defaultServer) RegisterPrometheusCollector(collector prometheus.Collector) error {
return s.prometheusRegistry.Register(collector)
}
func (s *defaultServer) initPprof() error {
endpoints := EndpointsMap{
"pprof/": pprof.Index,
"pprof/cmdline": pprof.Cmdline,
"pprof/profile": pprof.Profile,
"pprof/symbol": pprof.Symbol,
"pprof/trace": pprof.Trace,
}
s.RegisterDebugEndpoints(endpoints)
return nil
}
func (s *defaultServer) RegisterDebugEndpoints(endpoints EndpointsMap) {
for path, endpoint := range endpoints {
s.RegisterDebugEndpoint(path, endpoint)
}
}
func (s *defaultServer) RegisterDebugEndpoint(path string, handlerFn http.HandlerFunc) {
path = fmt.Sprintf("/debug/%s", strings.TrimLeft(path, "/"))
s.mux.HandleFunc(path, handlerFn)
}
func (s *defaultServer) Start(listener net.Listener) error {
return newHttpServer(s.mux).Serve(listener)
}
package debug
import (
"errors"
"fmt"
"net"
"net/http"
"testing"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)
type fakeListener struct{}
func (l *fakeListener) Accept() (net.Conn, error) { return nil, nil }
func (l *fakeListener) Close() error { return nil }
func (l *fakeListener) Addr() net.Addr { return nil }
type fakeCollector struct{}
func (c *fakeCollector) Describe(chan<- *prometheus.Desc) {}
func (c *fakeCollector) Collect(chan<- prometheus.Metric) {}
func mockUpHttpServer(t *testing.T) (*mockHttpServer, func()) {
httpServ := new(mockHttpServer)
oldNewHttpServer := newHttpServer
newHttpServer = func(handler http.Handler) httpServer {
return httpServ
}
deferFn := func() {
newHttpServer = oldNewHttpServer
httpServ.AssertExpectations(t)
}
return httpServ, deferFn
}
func TestDefaultServer_Start(t *testing.T) {
testCases := map[string]struct {
httpServeError error
}{
"http serve returns an error": {httpServeError: errors.New("test error")},
"http serve doesn't return an error": {httpServeError: nil},
}
for testName, testCase := range testCases {
t.Run(testName, func(t *testing.T) {
httpServ, deferFn := mockUpHttpServer(t)
defer deferFn()
httpServ.On("Serve", mock.Anything).Return(testCase.httpServeError).Once()
s := &defaultServer{}
err := s.Start(new(fakeListener))
assert.Equal(t, testCase.httpServeError, err)
})
}
}
func TestDefaultServer_RegisterPrometheusCollector(t *testing.T) {
testCases := map[string]struct {
registryError error
}{
"registry returns an error": {registryError: errors.New("test error")},
"registry doesn't return an error": {registryError: nil},
}
for name, testCase := range testCases {
t.Run(name, func(t *testing.T) {
registry := new(mockPrometheusRegistry)
defer registry.AssertExpectations(t)
registry.On("Register", mock.Anything).Return(testCase.registryError).Once()
s := &defaultServer{prometheusRegistry: registry}
err := s.RegisterPrometheusCollector(new(fakeCollector))
assert.Equal(t, testCase.registryError, err)
})
}
}
func TestDefaultServer_RegisterPrometheusCollectors(t *testing.T) {
testCases := map[string]struct {
registerError error
}{
"register returns an error": {registerError: errors.New("test error")},
"register doesn't return an error": {registerError: nil},
}
for testName, testCase := range testCases {
t.Run(testName, func(t *testing.T) {
collector1 := new(fakeCollector)
collectors := CollectorsMap{
"collector 1": collector1,
}
registry := new(mockPrometheusRegistry)
defer registry.AssertExpectations(t)
registry.On("Register", collector1).Return(testCase.registerError).Once()
s := &defaultServer{prometheusRegistry: registry}
err := s.RegisterPrometheusCollectors(collectors)
if testCase.registerError != nil {
assert.EqualError(t, err, fmt.Sprintf("error while registering \"collector 1\" Prometheus collector: test error"))
} else {
assert.NoError(t, err)
}
})
}
}
func TestDefaultServer_RegisterDebugEndpoint(t *testing.T) {
paths := map[string]string{
"some/path": "/debug/some/path",
"some/path/": "/debug/some/path/",
"/some/path": "/debug/some/path",
"/some/path/": "/debug/some/path/",
}
for path, expectedPath := range paths {
t.Run(path, func(t *testing.T) {
mux := new(mockServeMux)
defer mux.AssertExpectations(t)
mux.On("HandleFunc", expectedPath, mock.Anything).Once()
s := &defaultServer{mux: mux}
s.RegisterDebugEndpoint(path, func(w http.ResponseWriter, r *http.Request) {})
})
}
}
func TestDefaultServer_RegisterDebugEndpoints(t *testing.T) {
endpoints := EndpointsMap{
"/some/path/1": func(w http.ResponseWriter, r *http.Request) {},
"/some/path/2": func(w http.ResponseWriter, r *http.Request) {},
}
mux := new(mockServeMux)
defer mux.AssertExpectations(t)
mux.On("HandleFunc", "/debug/some/path/1", mock.Anything).Once()
mux.On("HandleFunc", "/debug/some/path/2", mock.Anything).Once()
s := &defaultServer{mux: mux}
s.RegisterDebugEndpoints(endpoints)
}
func mockUpServeMux(t *testing.T) (*mockServeMux, *bool, func()) {
mux := new(mockServeMux)
oldNewServeMux := newServeMux
deferFn := func() {
newServeMux = oldNewServeMux
mux.AssertExpectations(t)
}
newServeMuxCalled := false
newServeMux = func() serveMux {
newServeMuxCalled = true
return mux
}
return mux, &newServeMuxCalled, deferFn
}
func mockUpPrometheusRegistry(t *testing.T) (*mockPrometheusRegistry, *bool, func()) {
registry := new(mockPrometheusRegistry)
oldNewPrometheusRegistry := newPrometheusRegistry
deferFn := func() {
newPrometheusRegistry = oldNewPrometheusRegistry
registry.AssertExpectations(t)
}
newPrometheusRegistryCalled := false
newPrometheusRegistry = func() prometheusRegistry {
newPrometheusRegistryCalled = true
return registry
}
return registry, &newPrometheusRegistryCalled, deferFn
}
func TestDefaultServerInitialization(t *testing.T) {
testCases := map[string]struct {
prometheusRegisterError error
}{
"prometheus register returns an error": {prometheusRegisterError: errors.New("test error")},
"prometheus register doesn't return an error": {prometheusRegisterError: nil},
}
for testName, testCase := range testCases {
t.Run(testName, func(t *testing.T) {
mux, newServeMuxCalled, muxDeferFn := mockUpServeMux(t)
registry, newPrometheusRegistryCalled, registryDeferFn := mockUpPrometheusRegistry(t)
defer muxDeferFn()
defer registryDeferFn()
mux.On("Handle", "/metrics", mock.Anything).Once()
if testCase.prometheusRegisterError == nil {
mux.On("HandleFunc", "/debug/pprof/", mock.Anything).Once()
mux.On("HandleFunc", "/debug/pprof/cmdline", mock.Anything).Once()
mux.On("HandleFunc", "/debug/pprof/profile", mock.Anything).Once()
mux.On("HandleFunc", "/debug/pprof/symbol", mock.Anything).Once()
mux.On("HandleFunc", "/debug/pprof/trace", mock.Anything).Once()
registry.On("Register", mock.Anything).Return(nil).Twice()
} else {
registry.On("Register", mock.Anything).Return(testCase.prometheusRegisterError).Once()
}
server, err := newDefaultServer()
assert.True(t, *newServeMuxCalled, "newServeMux() needs to be called")
assert.True(t, *newPrometheusRegistryCalled, "newPrometheusRegistry() needs to be called")
if testCase.prometheusRegisterError == nil {
assert.NotNil(t, server)
assert.NoError(t, err)
} else {
assert.Nil(t, server)
assert.Error(t, err)
}
})
}
}
// Code generated by mockery v1.0.0. DO NOT EDIT.
// This comment works around https://github.com/vektra/mockery/issues/155
package debug
import http "net/http"
import mock "github.com/stretchr/testify/mock"
import net "net"
import prometheus "github.com/prometheus/client_golang/prometheus"
// MockServer is an autogenerated mock type for the Server type
type MockServer struct {
mock.Mock
}
// RegisterDebugEndpoint provides a mock function with given fields: path, handlerFn
func (_m *MockServer) RegisterDebugEndpoint(path string, handlerFn http.HandlerFunc) {
_m.Called(path, handlerFn)
}
// RegisterDebugEndpoints provides a mock function with given fields: endpoints
func (_m *MockServer) RegisterDebugEndpoints(endpoints EndpointsMap) {
_m.Called(endpoints)
}
// RegisterPrometheusCollector provides a mock function with given fields: collector
func (_m *MockServer) RegisterPrometheusCollector(collector prometheus.Collector) error {
ret := _m.Called(collector)
var r0 error
if rf, ok := ret.Get(0).(func(prometheus.Collector) error); ok {
r0 = rf(collector)
} else {
r0 = ret.Error(0)
}
return r0
}
// RegisterPrometheusCollectors provides a mock function with given fields: collectors
func (_m *MockServer) RegisterPrometheusCollectors(collectors CollectorsMap) error {
ret := _m.Called(collectors)
var r0 error
if rf, ok := ret.Get(0).(func(CollectorsMap) error); ok {
r0 = rf(collectors)
} else {
r0 = ret.Error(0)
}
return r0
}
// Start provides a mock function with given fields: listener
func (_m *MockServer) Start(listener net.Listener) error {
ret := _m.Called(listener)
var r0 error
if rf, ok := ret.Get(0).(func(net.Listener) error); ok {
r0 = rf(listener)
} else {
r0 = ret.Error(0)
}
return r0
}
// Code generated by mockery v1.0.0. DO NOT EDIT.
// This comment works around https://github.com/vektra/mockery/issues/155
package debug
import mock "github.com/stretchr/testify/mock"
import net "net"
// mockHttpServer is an autogenerated mock type for the httpServer type
type mockHttpServer struct {
mock.Mock
}
// Serve provides a mock function with given fields: listener
func (_m *mockHttpServer) Serve(listener net.Listener) error {
ret := _m.Called(listener)
var r0 error
if rf, ok := ret.Get(0).(func(net.Listener) error); ok {
r0 = rf(listener)
} else {
r0 = ret.Error(0)
}
return r0
}
// Code generated by mockery v1.0.0. DO NOT EDIT.
// This comment works around https://github.com/vektra/mockery/issues/155
package debug
import io_prometheus_client "github.com/prometheus/client_model/go"
import mock "github.com/stretchr/testify/mock"