Commit fd06fc02 authored by Jacob Vosmaer's avatar Jacob Vosmaer Committed by Nick Thomas

Add gRPC admin health check

parent c2fb2a8f
......@@ -2,6 +2,8 @@
shared/pages/.update
/gitlab-pages
admin.socket
# Used by the makefile
/.GOPATH
/bin
......@@ -3,7 +3,7 @@
verify: list fmt vet lint complexity
fmt: bin/goimports .GOPATH/.ok
$Q ./bin/goimports -l $(allfiles) | awk '{ print "Please run go fmt"; exit 1 }'
$Q ./bin/goimports -l $(allfiles) | awk '{ print } END { if (NR>1) { print "Please run go fmt"; exit 1 } }'
vet: .GOPATH/.ok
$Q go vet $(allpackages)
......
package main
import (
"context"
"crypto/tls"
"net"
"net/http"
"net/http/httptest"
"testing"
"time"
"github.com/stretchr/testify/require"
gitalyauth "gitlab.com/gitlab-org/gitaly/auth"
"golang.org/x/net/http2"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/status"
)
var (
// Use ../../ because the pages binary interprets the path in ./shared/pages
adminSecretArgs = []string{"-admin-secret-path", "../../testdata/.admin-secret"}
adminToken = "super-secret\n"
)
func TestAdminHealthCheckUnix(t *testing.T) {
socketPath := "admin.socket"
// Use "../../" because the pages executable cd's into shared/pages
adminArgs := append(adminSecretArgs, "-admin-unix-listener", "../../"+socketPath)
teardown := RunPagesProcessWithoutWait(t, *pagesBinary, listeners, "", adminArgs...)
defer teardown()
waitHTTP2RoundTripUnix(t, socketPath)
testCases := []struct {
desc string
dialOpt grpc.DialOption
code codes.Code
}{
{
desc: "no auth provided",
code: codes.Unauthenticated,
},
{
desc: "wrong auth provided",
dialOpt: grpc.WithPerRPCCredentials(gitalyauth.RPCCredentials("wrong token")),
code: codes.PermissionDenied,
},
{
desc: "correct auth provided",
dialOpt: grpc.WithPerRPCCredentials(gitalyauth.RPCCredentials(adminToken)),
code: codes.OK,
},
}
for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
connOpts := []grpc.DialOption{
grpc.WithInsecure(),
grpcUnixDialOpt(),
}
if tc.dialOpt != nil {
connOpts = append(connOpts, tc.dialOpt)
}
conn, err := grpc.Dial(socketPath, connOpts...)
require.NoError(t, err, "dial")
defer conn.Close()
err = healthCheck(conn)
require.Equal(t, tc.code, status.Code(err), "wrong grpc code: %v", err)
})
}
}
func TestAdminHealthCheckHTTPS(t *testing.T) {
key, cert := CreateHTTPSFixtureFiles(t)
creds, err := credentials.NewClientTLSFromFile(cert, "")
require.NoError(t, err, "grpc client credentials")
adminAddr := newAddr()
adminArgs := []string{"-admin-https-listener", adminAddr, "-admin-https-key", key, "-admin-https-cert", cert}
adminArgs = append(adminArgs, adminSecretArgs...)
teardown := RunPagesProcessWithoutWait(t, *pagesBinary, listeners, "", adminArgs...)
defer teardown()
waitHTTP2RoundTrip(t, adminAddr)
testCases := []struct {
desc string
dialOpt grpc.DialOption
code codes.Code
}{
{
desc: "no auth provided",
code: codes.Unauthenticated,
},
{
desc: "wrong auth provided",
dialOpt: grpc.WithPerRPCCredentials(gitalyauth.RPCCredentials("wrong token")),
code: codes.PermissionDenied,
},
{
desc: "correct auth provided",
dialOpt: grpc.WithPerRPCCredentials(gitalyauth.RPCCredentials(adminToken)),
code: codes.OK,
},
}
for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
connOpts := []grpc.DialOption{
grpc.WithTransportCredentials(creds),
}
if tc.dialOpt != nil {
connOpts = append(connOpts, tc.dialOpt)
}
conn, err := grpc.Dial(adminAddr, connOpts...)
require.NoError(t, err, "dial")
defer conn.Close()
err = healthCheck(conn)
require.Equal(t, tc.code, status.Code(err), "wrong grpc code: %v", err)
})
}
}
func newAddr() string {
s := httptest.NewServer(http.NotFoundHandler())
s.Close()
return s.Listener.Addr().String()
}
func waitHTTP2RoundTrip(t *testing.T, addr string) {
transport := &http2.Transport{
TLSClientConfig: &tls.Config{RootCAs: TestCertPool},
}
req, err := http.NewRequest("get", "https://"+addr, nil)
require.NoError(t, err)
for start := time.Now(); time.Since(start) < 5*time.Second; time.Sleep(100 * time.Millisecond) {
var response *http.Response
response, err = transport.RoundTrip(req)
if err == nil {
response.Body.Close()
return
}
}
t.Fatal(err)
}
func grpcUnixDialOpt() grpc.DialOption {
return grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) {
return net.DialTimeout("unix", addr, timeout)
})
}
func waitHTTP2RoundTripUnix(t *testing.T, socketPath string) {
var err error
for start := time.Now(); time.Since(start) < 5*time.Second; time.Sleep(100 * time.Millisecond) {
err = roundtripHTTP2Unix(socketPath)
if err == nil {
return
}
}
t.Fatal(err)
}
func roundtripHTTP2Unix(socketPath string) error {
transport := &http2.Transport{
DialTLS: func(network, addr string, cfg *tls.Config) (net.Conn, error) {
return net.Dial("unix", socketPath)
},
}
req, err := http.NewRequest("get", "https://localhost/", nil)
if err != nil {
return err
}
resp, err := transport.RoundTrip(req)
if err != nil {
return err
}
return resp.Body.Close()
}
func healthCheck(conn *grpc.ClientConn) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
client := healthpb.NewHealthClient(conn)
_, err := client.Check(ctx, &healthpb.HealthCheckRequest{})
return err
}
......@@ -2,8 +2,10 @@ package main
import (
"crypto/tls"
"fmt"
"net"
"net/http"
"os"
"strconv"
"strings"
"sync"
......@@ -15,6 +17,7 @@ import (
"github.com/rs/cors"
log "github.com/sirupsen/logrus"
"gitlab.com/gitlab-org/gitlab-pages/internal/admin"
"gitlab.com/gitlab-org/gitlab-pages/internal/artifact"
"gitlab.com/gitlab-org/gitlab-pages/internal/domain"
"gitlab.com/gitlab-org/gitlab-pages/internal/httperrors"
......@@ -217,11 +220,63 @@ func (a *theApp) Run() {
}(a.ListenMetrics)
}
a.listenAdminUnix(&wg)
a.listenAdminHTTPS(&wg)
go domain.Watch(a.Domain, a.UpdateDomains, time.Second)
wg.Wait()
}
func (a *theApp) listenAdminUnix(wg *sync.WaitGroup) {
fd := a.ListenAdminUnix
if fd == 0 {
return
}
wg.Add(1)
go func() {
defer wg.Done()
l, err := net.FileListener(os.NewFile(fd, "[admin-socket-unix]"))
if err != nil {
fatal(fmt.Errorf("failed to listen on FD %d: %v", fd, err))
}
defer l.Close()
if err := admin.NewServer(string(a.AdminToken)).Serve(l); err != nil {
fatal(err)
}
}()
}
func (a *theApp) listenAdminHTTPS(wg *sync.WaitGroup) {
fd := a.ListenAdminHTTPS
if fd == 0 {
return
}
cert, err := tls.X509KeyPair(a.AdminCertificate, a.AdminKey)
if err != nil {
fatal(err)
}
wg.Add(1)
go func() {
defer wg.Done()
l, err := net.FileListener(os.NewFile(fd, "[admin-socket-https]"))
if err != nil {
fatal(fmt.Errorf("failed to listen on FD %d: %v", fd, err))
}
defer l.Close()
if err := admin.NewTLSServer(string(a.AdminToken), &cert).Serve(l); err != nil {
fatal(err)
}
}()
}
func runApp(config appConfig) {
a := theApp{appConfig: config}
......
......@@ -6,11 +6,16 @@ type appConfig struct {
ArtifactsServerTimeout int
RootCertificate []byte
RootKey []byte
AdminCertificate []byte
AdminKey []byte
AdminToken []byte
ListenHTTP []uintptr
ListenHTTPS []uintptr
ListenProxy []uintptr
ListenMetrics uintptr
ListenHTTP []uintptr
ListenHTTPS []uintptr
ListenProxy []uintptr
ListenMetrics uintptr
ListenAdminUnix uintptr
ListenAdminHTTPS uintptr
HTTP2 bool
RedirectHTTP bool
......
......@@ -153,14 +153,7 @@ func daemonChroot(cmd *exec.Cmd) (*jail.Jail, error) {
return cage, nil
}
func daemonize(config appConfig, uid, gid uint) {
var err error
defer func() {
if err != nil {
fatal(err)
}
}()
func daemonize(config appConfig, uid, gid uint) error {
log.WithFields(log.Fields{
"uid": uid,
"gid": gid,
......@@ -168,7 +161,7 @@ func daemonize(config appConfig, uid, gid uint) {
cmd, err := daemonReexec(uid, gid, daemonRunProgram)
if err != nil {
return
return err
}
defer killProcess(cmd)
......@@ -176,41 +169,35 @@ func daemonize(config appConfig, uid, gid uint) {
chroot, err := daemonChroot(cmd)
if err != nil {
log.WithError(err).Print("chroot failed")
return
return err
}
defer chroot.Dispose()
// Create a pipe to pass the configuration
configReader, configWriter, err := os.Pipe()
if err != nil {
return
return err
}
defer configWriter.Close()
cmd.ExtraFiles = append(cmd.ExtraFiles, configReader)
// Create a new file and store the FD for each listener
daemonUpdateFds(cmd, config.ListenHTTP)
daemonUpdateFds(cmd, config.ListenHTTPS)
daemonUpdateFds(cmd, config.ListenProxy)
if config.ListenMetrics != 0 {
config.ListenMetrics = daemonUpdateFd(cmd, config.ListenMetrics)
}
updateFds(&config, cmd)
// Start the process
if err = cmd.Start(); err != nil {
if err := cmd.Start(); err != nil {
log.WithError(err).Error("start failed")
return
return err
}
//detach binded mountpoints
if err = chroot.LazyUnbind(); err != nil {
if err := chroot.LazyUnbind(); err != nil {
log.WithError(err).Print("chroot lazy umount failed")
return
return err
}
// Write the configuration
if err = json.NewEncoder(configWriter).Encode(config); err != nil {
return
if err := json.NewEncoder(configWriter).Encode(config); err != nil {
return err
}
configWriter.Close()
......@@ -218,5 +205,25 @@ func daemonize(config appConfig, uid, gid uint) {
passSignals(cmd)
// Wait for process to exit
err = cmd.Wait()
return cmd.Wait()
}
func updateFds(config *appConfig, cmd *exec.Cmd) {
for _, fds := range [][]uintptr{
config.ListenHTTP,
config.ListenHTTPS,
config.ListenProxy,
} {
daemonUpdateFds(cmd, fds)
}
for _, fdPtr := range []*uintptr{
&config.ListenMetrics,
&config.ListenAdminUnix,
&config.ListenAdminHTTPS,
} {
if *fdPtr != 0 {
*fdPtr = daemonUpdateFd(cmd, *fdPtr)
}
}
}
......@@ -3,6 +3,7 @@ package main
import (
"io/ioutil"
"net"
"os"
)
func readFile(file string) (result []byte) {
......@@ -13,17 +14,41 @@ func readFile(file string) (result []byte) {
return
}
func createSocket(addr string) (l net.Listener, fd uintptr) {
// Be careful: if you let either of the return values get garbage
// collected by Go they will be closed automatically.
func createSocket(addr string) (net.Listener, *os.File) {
l, err := net.Listen("tcp", addr)
if err != nil {
fatal(err)
}
f, err := l.(*net.TCPListener).File()
return l, fileForListener(l)
}
// Be careful: if you let either of the return values get garbage
// collected by Go they will be closed automatically.
func createUnixSocket(addr string) (net.Listener, *os.File) {
if err := os.Remove(addr); err != nil && !os.IsNotExist(err) {
fatal(err)
}
l, err := net.Listen("unix", addr)
if err != nil {
fatal(err)
}
fd = f.Fd()
return
return l, fileForListener(l)
}
func fileForListener(l net.Listener) *os.File {
type filer interface {
File() (*os.File, error)
}
f, err := l.(filer).File()
if err != nil {
fatal(err)
}
return f
}
package admin
import (
context "golang.org/x/net/context"
gitalyauth "gitlab.com/gitlab-org/gitaly/auth"
)
func authFunc(token string) func(context.Context) (context.Context, error) {
return func(ctx context.Context) (context.Context, error) {
return ctx, gitalyauth.CheckToken(ctx, token)
}
}
package admin
import (
"crypto/tls"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpc_auth "github.com/grpc-ecosystem/go-grpc-middleware/auth"
grpc_logrus "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus"
grpc_recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/health"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
)
var logrusEntry *log.Entry
func init() {
logger := log.StandardLogger()
logrusEntry = log.NewEntry(logger)
grpc_logrus.ReplaceGrpcLogger(logrusEntry)
}
// NewServer creates a new unencrypted gRPC server for the gitlab-pages admin API.
func NewServer(secret string) *grpc.Server {
grpcServer := grpc.NewServer(serverOpts(secret)...)
registerServices(grpcServer)
return grpcServer
}
// NewTLSServer creates a new gRPC server with encryption for the gitlab-pages admin API.
func NewTLSServer(secret string, cert *tls.Certificate) *grpc.Server {
grpcServer := grpc.NewServer(append(
serverOpts(secret),
grpc.Creds(credentials.NewServerTLSFromCert(cert)),
)...)
registerServices(grpcServer)
return grpcServer
}
func serverOpts(secret string) []grpc.ServerOption {
return []grpc.ServerOption{
grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(
grpc_prometheus.StreamServerInterceptor,
grpc_logrus.StreamServerInterceptor(logrusEntry),
grpc_auth.StreamServerInterceptor(authFunc(secret)),
grpc_recovery.StreamServerInterceptor(),
)),
grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(
grpc_prometheus.UnaryServerInterceptor,
grpc_logrus.UnaryServerInterceptor(logrusEntry),
grpc_auth.UnaryServerInterceptor(authFunc(secret)),
grpc_recovery.UnaryServerInterceptor(),
)),
}
}
func registerServices(g *grpc.Server) {
healthpb.RegisterHealthServer(g, health.NewServer())
}
......@@ -3,6 +3,7 @@ package main
import (
"errors"
"fmt"
"io"
"net/url"
"os"
"strings"
......@@ -39,6 +40,11 @@ var (
daemonGID = flag.Uint("daemon-gid", 0, "Drop privileges to this group")
logFormat = flag.String("log-format", "text", "The log output format: 'text' or 'json'")
logVerbose = flag.Bool("log-verbose", false, "Verbose logging")
adminSecretPath = flag.String("admin-secret-path", "", "Path to the file containing the admin secret token")
adminUnixListener = flag.String("admin-unix-listener", "", "The path for the admin API unix socket listener (optional)")
adminHTTPSListener = flag.String("admin-https-listener", "", "The listen address for the admin API HTTPS listener (optional)")
adminHTTPSCert = flag.String("admin-https-cert", "", "The path to the certificate file for the admin API (optional)")
adminHTTPSKey = flag.String("admin-https-key", "", "The path to the key file for the admin API (optional)")
disableCrossOriginRequests = flag.Bool("disable-cross-origin-requests", false, "Disable cross-origin requests")
......@@ -64,12 +70,19 @@ func configFromFlags() appConfig {
config.LogFormat = *logFormat
config.LogVerbose = *logVerbose
if *pagesRootCert != "" {
config.RootCertificate = readFile(*pagesRootCert)
}
if *pagesRootKey != "" {
config.RootKey = readFile(*pagesRootKey)
for _, file := range []struct {
contents *[]byte
path string
}{
{&config.RootCertificate, *pagesRootCert},
{&config.RootKey, *pagesRootKey},
{&config.AdminCertificate, *adminHTTPSCert},
{&config.AdminKey, *adminHTTPSKey},
{&config.AdminToken, *adminSecretPath},
} {
if file.path != "" {
*file.contents = readFile(file.path)
}
}
if *artifactsServerTimeout < 1 {
......@@ -120,6 +133,11 @@ func appMain() {
config := configFromFlags()
log.WithFields(log.Fields{
"admin-https-cert": *adminHTTPSCert,
"admin-https-key": *adminHTTPSKey,
"admin-https-listener": *adminHTTPSListener,
"admin-unix-listener": *adminUnixListener,
"admin-secret-path": *adminSecretPath,
"artifacts-server": *artifactsServer,
"artifacts-server-timeout": *artifactsServerTimeout,
"daemon-gid": *daemonGID,
......@@ -142,56 +160,137 @@ func appMain() {
"use-http-2": config.HTTP2,
}).Debug("Start daemon with configuration")
for _, cs := range [][]io.Closer{
createAppListeners(&config),
createMetricsListener(&config),
createAdminUnixListener(&config),
createAdminHTTPSListener(&config),
} {
defer closeAll(cs)
}
if *daemonUID != 0 || *daemonGID != 0 {
if err := daemonize(config, *daemonUID, *daemonGID); err != nil {
fatal(err)
}
return
}
runApp(config)
}
func closeAll(cs []io.Closer) {
for _, c := range cs {
c.Close()
}
}
// createAppListeners returns net.Listener and *os.File instances. The
// caller must ensure they don't get closed or garbage-collected (which
// implies closing) too soon.
func createAppListeners(config *appConfig) []io.Closer {
var closers []io.Closer
for _, addr := range listenHTTP.Split() {
l, fd := createSocket(addr)
defer l.Close()
l, f := createSocket(addr)
closers = append(closers, l, f)
log.WithFields(log.Fields{
"listener": addr,
}).Debug("Set up HTTP listener")
config.ListenHTTP = append(config.ListenHTTP, fd)
config.ListenHTTP = append(config.ListenHTTP, f.Fd())
}
for _, addr := range listenHTTPS.Split() {
l, fd := createSocket(addr)
defer l.Close()
l, f := createSocket(addr)
closers = append(closers, l, f)
log.WithFields(log.Fields{
"listener": addr,
}).Debug("Set up HTTPS listener")
config.ListenHTTPS = append(config.ListenHTTPS, fd)
config.ListenHTTPS = append(config.ListenHTTPS, f.Fd())
}
for _, addr := range listenProxy.Split() {
l, fd := createSocket(addr)
defer l.Close()
l, f := createSocket(addr)
closers = append(closers, l, f)
log.WithFields(log.Fields{
"listener": addr,
}).Debug("Set up proxy listener")
config.ListenProxy = append(config.ListenProxy, fd)
config.ListenProxy = append(config.ListenProxy, f.Fd())
}
if *metricsAddress != "" {
l, fd := createSocket(*metricsAddress)
defer l.Close()
return closers
}
log.WithFields(log.Fields{
"listener": *metricsAddress,
}).Debug("Set up metrics listener")
// createMetricsListener returns net.Listener and *os.File instances. The
// caller must ensure they don't get closed or garbage-collected (which
// implies closing) too soon.
func createMetricsListener(config *appConfig) []io.Closer {
addr := *metricsAddress
if addr == "" {
return nil
}
config.ListenMetrics = fd
l, f := createSocket(addr)
config.ListenMetrics = f.Fd()
log.WithFields(log.Fields{
"listener": addr,
}).Debug("Set up metrics listener")
return []io.Closer{l, f}
}
// createAdminUnixListener returns net.Listener and *os.File instances. The
// caller must ensure they don't get closed or garbage-collected (which
// implies closing) too soon.
func createAdminUnixListener(config *appConfig) []io.Closer {
unixPath := *adminUnixListener
if unixPath == "" {
return nil
}
if *daemonUID != 0 || *daemonGID != 0 {
daemonize(config, *daemonUID, *daemonGID)
return
if *adminSecretPath == "" {
fatal(fmt.Errorf("missing admin secret token file"))
}
runApp(config)
l, f := createUnixSocket(unixPath)
config.ListenAdminUnix = f.Fd()
log.WithFields(log.Fields{
"listener": unixPath,
}).Debug("Set up admin unix socket")