From 43031e2d469829d9564319b10a30c7ab0a8d4dc2 Mon Sep 17 00:00:00 2001 From: Pavlo Strokov <pstrokov@gitlab.com> Date: Mon, 4 Oct 2021 21:21:11 +0300 Subject: [PATCH 1/4] bootstrap: Abstract bootstrapper for testing The old implementation of the bootstrapper initialization does not allow calling the 'run' function to start a service, because the tableflip library doesn't support creating multiple instances within one process. Starting the Praefect service is required in tests to verify sub-command execution. The bootstrapper initialization is now extracted out of the 'run' function, which allows using a new Noop bootstrapper to run the service without tableflip support. (cherry picked from commit 18ff3676f30667f70c8435903a4c9832a1c5e594) --- cmd/gitaly/main.go | 4 +- cmd/praefect/main.go | 17 +++-- cmd/praefect/subcmd_remove_repository_test.go | 8 +-- internal/bootstrap/bootstrap.go | 72 ++++++++++++++++--- internal/bootstrap/bootstrap_test.go | 38 +++++----- 5 files changed, 95 insertions(+), 44 deletions(-) diff --git a/cmd/gitaly/main.go b/cmd/gitaly/main.go index 3b901c45cd8..db7c73a0610 100644 --- a/cmd/gitaly/main.go +++ b/cmd/gitaly/main.go @@ -199,8 +199,6 @@ func run(cfg config.Cfg) error { return fmt.Errorf("linguist instance creation: %w", err) } - b.StopAction = gitalyServerFactory.GracefulStop - rubySrv := rubyserver.New(cfg) if err := rubySrv.Start(); err != nil { return fmt.Errorf("initialize gitaly-ruby: %v", err) } @@ -303,5 +301,5 @@ func run(cfg config.Cfg) error { } }() - return b.Wait(cfg.GracefulRestartTimeout.Duration()) + return b.Wait(cfg.GracefulRestartTimeout.Duration(), gitalyServerFactory.GracefulStop) } diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go index 44a0981f09b..128f30cea7c 100644 --- a/cmd/praefect/main.go +++ b/cmd/praefect/main.go @@ -144,7 +144,12 @@ func main() { logger.Fatalf("%s", err) } - if err := run(starterConfigs, conf); err != nil { + b, err := bootstrap.New() + if err != nil { + logger.Fatalf("unable to create a bootstrap: %v", err) + } + + if err := run(starterConfigs, conf, b); err != nil { logger.Fatalf("%v", err) } } @@ -187,7 +192,7 @@ func configure(conf config.Config) { sentry.ConfigureSentry(version.GetVersion(), conf.Sentry) } -func run(cfgs []starter.Config, conf config.Config) error { +func run(cfgs []starter.Config, conf config.Config, b bootstrap.Listener) error { nodeLatencyHistogram, err := metrics.RegisterNodeLatency(conf.Prometheus) if err != nil { return err @@ -391,12 +396,6 @@ func run(cfgs []starter.Config, conf config.Config) error { } prometheus.MustRegister(metricsCollectors...)
- b, err := bootstrap.New() - if err != nil { - return fmt.Errorf("unable to create a bootstrap: %v", err) - } - - b.StopAction = srvFactory.GracefulStop for _, cfg := range cfgs { srv, err := srvFactory.Create(cfg.IsSecure()) if err != nil { @@ -476,7 +475,7 @@ func run(cfgs []starter.Config, conf config.Config) error { logger.Warn(`Repository cleanup background task disabled as "repositories_cleanup.run_interval" is not set or 0.`) } - return b.Wait(conf.GracefulStopTimeout.Duration()) + return b.Wait(conf.GracefulStopTimeout.Duration(), srvFactory.GracefulStop) } func getStarterConfigs(conf config.Config) ([]starter.Config, error) { diff --git a/cmd/praefect/subcmd_remove_repository_test.go b/cmd/praefect/subcmd_remove_repository_test.go index 9dab2f0a663..039b16873e5 100644 --- a/cmd/praefect/subcmd_remove_repository_test.go +++ b/cmd/praefect/subcmd_remove_repository_test.go @@ -4,7 +4,6 @@ import ( "flag" "path/filepath" "strings" - "syscall" "testing" "time" @@ -13,6 +12,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/v14/client" + "gitlab.com/gitlab-org/gitaly/v14/internal/bootstrap" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/setup" "gitlab.com/gitlab-org/gitaly/v14/internal/helper" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config" @@ -108,10 +108,10 @@ func TestRemoveRepository_Exec(t *testing.T) { starterConfigs, err := getStarterConfigs(conf) require.NoError(t, err) stopped := make(chan struct{}) + bootstrapper := bootstrap.NewNoop() go func() { defer close(stopped) - err := run(starterConfigs, conf) - assert.EqualError(t, err, `received signal "terminated"`) + assert.NoError(t, run(starterConfigs, conf, bootstrapper)) }() cc, err := client.Dial("unix://"+conf.SocketPath, nil) @@ -229,7 +229,7 @@ func TestRemoveRepository_Exec(t *testing.T) { requireNoDatabaseInfo(t, db, cmd) }) - require.NoError(t, syscall.Kill(syscall.Getpid(), syscall.SIGTERM)) + bootstrapper.Terminate() <-stopped } diff --git a/internal/bootstrap/bootstrap.go b/internal/bootstrap/bootstrap.go index 95e43aa10f5..f1d355926de 100644 --- a/internal/bootstrap/bootstrap.go +++ b/internal/bootstrap/bootstrap.go @@ -22,11 +22,18 @@ const ( socketReusePortWarning = "Unable to set SO_REUSEPORT: zero downtime upgrades will not work" ) +// Listener is an interface of the bootstrap manager. +type Listener interface { + // RegisterStarter adds starter to the pool. + RegisterStarter(starter Starter) + // Start starts all registered starters to accept connections. + Start() error + // Wait terminates all registered starters. + Wait(gracefulTimeout time.Duration, stopAction func()) error +} + // Bootstrap handles graceful upgrades type Bootstrap struct { - // StopAction will be invoked during a graceful stop. It must wait until the shutdown is completed - StopAction func() - upgrader upgrader listenFunc ListenFunc errChan chan error @@ -152,7 +159,8 @@ func (b *Bootstrap) Start() error { // Wait will signal process readiness to the parent and than wait for an exit condition // SIGTERM, SIGINT and a runtime error will trigger an immediate shutdown // in case of an upgrade there will be a grace period to complete the ongoing requests -func (b *Bootstrap) Wait(gracefulTimeout time.Duration) error { +// stopAction will be invoked during a graceful stop. It must wait until the shutdown is completed. 
+func (b *Bootstrap) Wait(gracefulTimeout time.Duration, stopAction func()) error { signals := []os.Signal{syscall.SIGTERM, syscall.SIGINT} immediateShutdown := make(chan os.Signal, len(signals)) signal.Notify(immediateShutdown, signals...) @@ -168,7 +176,7 @@ func (b *Bootstrap) Wait(gracefulTimeout time.Duration) error { // the new process signaled its readiness and we started a graceful stop // however no further upgrades can be started until this process is running // we set a grace period and then we force a termination. - waitError := b.waitGracePeriod(gracefulTimeout, immediateShutdown) + waitError := b.waitGracePeriod(gracefulTimeout, immediateShutdown, stopAction) err = fmt.Errorf("graceful upgrade: %v", waitError) case s := <-immediateShutdown: @@ -180,13 +188,13 @@ func (b *Bootstrap) Wait(gracefulTimeout time.Duration) error { return err } -func (b *Bootstrap) waitGracePeriod(gracefulTimeout time.Duration, kill <-chan os.Signal) error { +func (b *Bootstrap) waitGracePeriod(gracefulTimeout time.Duration, kill <-chan os.Signal, stopAction func()) error { log.WithField("graceful_timeout", gracefulTimeout).Warn("starting grace period") allServersDone := make(chan struct{}) go func() { - if b.StopAction != nil { - b.StopAction() + if stopAction != nil { + stopAction() } close(allServersDone) }() @@ -210,3 +218,51 @@ func (b *Bootstrap) listen(network, path string) (net.Listener, error) { return b.listenFunc(network, path) } + +// Noop is a bootstrapper that does no additional configurations. +type Noop struct { + starters []Starter + shutdown chan struct{} + errChan chan error +} + +// NewNoop returns initialized instance of the *Noop. +func NewNoop() *Noop { + return &Noop{shutdown: make(chan struct{})} +} + +// RegisterStarter adds starter to the pool. +func (n *Noop) RegisterStarter(starter Starter) { + n.starters = append(n.starters, starter) +} + +// Start starts all registered starters to accept connections. +func (n *Noop) Start() error { + n.errChan = make(chan error, len(n.starters)) + + for _, start := range n.starters { + if err := start(net.Listen, n.errChan); err != nil { + return err + } + } + return nil +} + +// Wait terminates all registered starters. +func (n *Noop) Wait(_ time.Duration, stopAction func()) error { + select { + case <-n.shutdown: + if stopAction != nil { + stopAction() + } + case err := <-n.errChan: + return err + } + + return nil +} + +// Terminate unblocks Wait method and executes stopAction call-back passed into it. 
+func (n *Noop) Terminate() { + close(n.shutdown) +} diff --git a/internal/bootstrap/bootstrap_test.go b/internal/bootstrap/bootstrap_test.go index 1fb6c348562..83e0d2b5ede 100644 --- a/internal/bootstrap/bootstrap_test.go +++ b/internal/bootstrap/bootstrap_test.go @@ -109,10 +109,10 @@ func TestImmediateTerminationOnSocketError(t *testing.T) { ctx, cancel := testhelper.Context() defer cancel() - b, server := makeBootstrap(t, ctx) + b, server, stopAction := makeBootstrap(t, ctx) waitCh := make(chan error) - go func() { waitCh <- b.Wait(2 * time.Second) }() + go func() { waitCh <- b.Wait(2*time.Second, stopAction) }() require.NoError(t, server.listeners["tcp"].Close(), "Closing first listener") @@ -127,12 +127,12 @@ func TestImmediateTerminationOnSignal(t *testing.T) { ctx, cancel := testhelper.Context() defer cancel() - b, server := makeBootstrap(t, ctx) + b, server, stopAction := makeBootstrap(t, ctx) done := server.slowRequest(3 * time.Minute) waitCh := make(chan error) - go func() { waitCh <- b.Wait(2 * time.Second) }() + go func() { waitCh <- b.Wait(2*time.Second, stopAction) }() // make sure we are inside b.Wait() or we'll kill the test suite time.Sleep(100 * time.Millisecond) @@ -157,9 +157,9 @@ func TestGracefulTerminationStuck(t *testing.T) { ctx, cancel := testhelper.Context() defer cancel() - b, server := makeBootstrap(t, ctx) + b, server, stopAction := makeBootstrap(t, ctx) - err := testGracefulUpdate(t, server, b, 3*time.Second, 2*time.Second, nil) + err := testGracefulUpdate(t, server, b, 3*time.Second, 2*time.Second, nil, stopAction) require.Contains(t, err.Error(), "grace period expired") } @@ -171,11 +171,11 @@ func TestGracefulTerminationWithSignals(t *testing.T) { t.Run(sig.String(), func(t *testing.T) { ctx, cancel := testhelper.Context() defer cancel() - b, server := makeBootstrap(t, ctx) + b, server, stopAction := makeBootstrap(t, ctx) err := testGracefulUpdate(t, server, b, 1*time.Second, 2*time.Second, func() { require.NoError(t, self.Signal(sig)) - }) + }, stopAction) require.Contains(t, err.Error(), "force shutdown") }) } @@ -184,11 +184,11 @@ func TestGracefulTerminationWithSignals(t *testing.T) { func TestGracefulTerminationServerErrors(t *testing.T) { ctx, cancel := testhelper.Context() defer cancel() - b, server := makeBootstrap(t, ctx) + b, server, _ := makeBootstrap(t, ctx) done := make(chan error, 1) // This is a simulation of receiving a listener error during waitGracePeriod - b.StopAction = func() { + stopAction := func() { // we close the unix listener in order to test that the shutdown will not fail, but it keep waiting for the TCP request require.NoError(t, server.listeners["unix"].Close()) @@ -200,7 +200,7 @@ func TestGracefulTerminationServerErrors(t *testing.T) { require.NoError(t, server.server.Shutdown(context.Background())) } - err := testGracefulUpdate(t, server, b, 3*time.Second, 2*time.Second, nil) + err := testGracefulUpdate(t, server, b, 3*time.Second, 2*time.Second, nil, stopAction) require.Contains(t, err.Error(), "grace period expired") require.NoError(t, <-done) @@ -209,12 +209,12 @@ func TestGracefulTerminationServerErrors(t *testing.T) { func TestGracefulTermination(t *testing.T) { ctx, cancel := testhelper.Context() defer cancel() - b, server := makeBootstrap(t, ctx) + b, server, _ := makeBootstrap(t, ctx) // Using server.Close we bypass the graceful shutdown faking a completed shutdown - b.StopAction = func() { server.server.Close() } + stopAction := func() { server.server.Close() } - err := testGracefulUpdate(t, server, b, 
1*time.Second, 2*time.Second, nil) + err := testGracefulUpdate(t, server, b, 1*time.Second, 2*time.Second, nil, stopAction) require.Contains(t, err.Error(), "completed") } @@ -236,9 +236,9 @@ func TestPortReuse(t *testing.T) { b.upgrader.Stop() } -func testGracefulUpdate(t *testing.T, server *testServer, b *Bootstrap, waitTimeout, gracefulWait time.Duration, duringGracePeriodCallback func()) error { +func testGracefulUpdate(t *testing.T, server *testServer, b *Bootstrap, waitTimeout, gracefulWait time.Duration, duringGracePeriodCallback func(), stopAction func()) error { waitCh := make(chan error) - go func() { waitCh <- b.Wait(gracefulWait) }() + go func() { waitCh <- b.Wait(gracefulWait, stopAction) }() // Start a slow request to keep the old server from shutting down immediately. req := server.slowRequest(2 * gracefulWait) @@ -268,7 +268,7 @@ func testGracefulUpdate(t *testing.T, server *testServer, b *Bootstrap, waitTime return waitErr } -func makeBootstrap(t *testing.T, ctx context.Context) (*Bootstrap, *testServer) { +func makeBootstrap(t *testing.T, ctx context.Context) (*Bootstrap, *testServer, func()) { mux := http.NewServeMux() mux.HandleFunc("/", func(w http.ResponseWriter, _ *http.Request) { w.WriteHeader(200) @@ -292,8 +292,6 @@ func makeBootstrap(t *testing.T, ctx context.Context) (*Bootstrap, *testServer) b, err := _new(u, net.Listen, false) require.NoError(t, err) - b.StopAction = func() { require.NoError(t, s.Shutdown(context.Background())) } - listeners := make(map[string]net.Listener) start := func(network, address string) Starter { return func(listen ListenFunc, errors chan<- error) error { @@ -333,7 +331,7 @@ func makeBootstrap(t *testing.T, ctx context.Context) (*Bootstrap, *testServer) server: &s, listeners: listeners, url: url, - } + }, func() { require.NoError(t, s.Shutdown(context.Background())) } } func testAllListeners(t *testing.T, listeners map[string]net.Listener) { -- GitLab From aac5d5e573b6a48f724ea8337ea74e24e4c4167f Mon Sep 17 00:00:00 2001 From: Pavlo Strokov <pstrokov@gitlab.com> Date: Tue, 5 Oct 2021 10:55:54 +0300 Subject: [PATCH 2/4] prometheus: Avoid duplicated metrics registration Praefect uses prometheus to export its internal metrics. It relies on the prometheus library defaults to gather metrics and to register new ones, so new metrics get registered on the DefaultRegisterer - a global, pre-configured registerer. Because of that, the 'run' function can't be called multiple times (for testing purposes), as doing so results in a metrics registration error. To avoid that problem, the 'run' function is extended with a prometheus.Registerer parameter that is used to register Praefect's custom metrics. The production code still uses the same DefaultRegisterer as before, while the test code creates a new registerer for each 'run' invocation, so there are no duplicate registrations anymore.
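To illustrate the pattern, a minimal, self-contained sketch; the helper and metric names are made up for the example and are not the actual Praefect code:

    package main

    import "github.com/prometheus/client_golang/prometheus"

    // registerExampleHistogram registers its metric on whatever registerer the
    // caller passes in instead of the global prometheus.DefaultRegisterer.
    func registerExampleHistogram(reg prometheus.Registerer) (*prometheus.HistogramVec, error) {
        h := prometheus.NewHistogramVec(prometheus.HistogramOpts{
            Name: "example_latency_seconds",
            Help: "Example latency histogram.",
        }, []string{"type"})
        return h, reg.Register(h)
    }

    func main() {
        // Production keeps the old behaviour by passing the default registerer.
        _, _ = registerExampleHistogram(prometheus.DefaultRegisterer)

        // Tests pass a fresh registry per invocation, so calling the setup code
        // repeatedly does not trigger duplicate registration errors.
        _, _ = registerExampleHistogram(prometheus.NewRegistry())
    }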
(cherry picked from commit 81368d46df2af8b715b92db8552e4f30cbf8118c) --- cmd/praefect/main.go | 16 ++++++++-------- cmd/praefect/subcmd_remove_repository_test.go | 3 ++- internal/praefect/metrics/prometheus.go | 12 ++++++------ 3 files changed, 16 insertions(+), 15 deletions(-) diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go index 128f30cea7c..262b2f90d2c 100644 --- a/cmd/praefect/main.go +++ b/cmd/praefect/main.go @@ -149,7 +149,7 @@ func main() { logger.Fatalf("unable to create a bootstrap: %v", err) } - if err := run(starterConfigs, conf, b); err != nil { + if err := run(starterConfigs, conf, b, prometheus.DefaultRegisterer); err != nil { logger.Fatalf("%v", err) } } @@ -192,18 +192,18 @@ func configure(conf config.Config) { sentry.ConfigureSentry(version.GetVersion(), conf.Sentry) } -func run(cfgs []starter.Config, conf config.Config, b bootstrap.Listener) error { - nodeLatencyHistogram, err := metrics.RegisterNodeLatency(conf.Prometheus) +func run(cfgs []starter.Config, conf config.Config, b bootstrap.Listener, promreg prometheus.Registerer) error { + nodeLatencyHistogram, err := metrics.RegisterNodeLatency(conf.Prometheus, promreg) if err != nil { return err } - delayMetric, err := metrics.RegisterReplicationDelay(conf.Prometheus) + delayMetric, err := metrics.RegisterReplicationDelay(conf.Prometheus, promreg) if err != nil { return err } - latencyMetric, err := metrics.RegisterReplicationLatency(conf.Prometheus) + latencyMetric, err := metrics.RegisterReplicationLatency(conf.Prometheus, promreg) if err != nil { return err } @@ -390,11 +390,11 @@ func run(cfgs []starter.Config, conf config.Config, b bootstrap.Listener) error ) metricsCollectors = append(metricsCollectors, transactionManager, coordinator, repl) if db != nil { - prometheus.MustRegister( + promreg.MustRegister( datastore.NewRepositoryStoreCollector(logger, conf.VirtualStorageNames(), db, conf.Prometheus.ScrapeTimeout), ) } - prometheus.MustRegister(metricsCollectors...) + promreg.MustRegister(metricsCollectors...) 
for _, cfg := range cfgs { srv, err := srvFactory.Create(cfg.IsSecure()) @@ -447,7 +447,7 @@ func run(cfgs []starter.Config, conf config.Config, b bootstrap.Listener) error conf.StorageNames(), conf.Reconciliation.HistogramBuckets, ) - prometheus.MustRegister(r) + promreg.MustRegister(r) go r.Run(ctx, helper.NewTimerTicker(interval)) } } diff --git a/cmd/praefect/subcmd_remove_repository_test.go b/cmd/praefect/subcmd_remove_repository_test.go index 039b16873e5..abc4b93b9f6 100644 --- a/cmd/praefect/subcmd_remove_repository_test.go +++ b/cmd/praefect/subcmd_remove_repository_test.go @@ -7,6 +7,7 @@ import ( "testing" "time" + "github.com/prometheus/client_golang/prometheus" "github.com/sirupsen/logrus" "github.com/sirupsen/logrus/hooks/test" "github.com/stretchr/testify/assert" @@ -111,7 +112,7 @@ func TestRemoveRepository_Exec(t *testing.T) { bootstrapper := bootstrap.NewNoop() go func() { defer close(stopped) - assert.NoError(t, run(starterConfigs, conf, bootstrapper)) + assert.NoError(t, run(starterConfigs, conf, bootstrapper, prometheus.NewRegistry())) }() cc, err := client.Dial("unix://"+conf.SocketPath, nil) diff --git a/internal/praefect/metrics/prometheus.go b/internal/praefect/metrics/prometheus.go index b09ee9631c8..88886b5743d 100644 --- a/internal/praefect/metrics/prometheus.go +++ b/internal/praefect/metrics/prometheus.go @@ -9,7 +9,7 @@ import ( // RegisterReplicationDelay creates and registers a prometheus histogram // to observe replication delay times -func RegisterReplicationDelay(conf gitalycfgprom.Config) (metrics.HistogramVec, error) { +func RegisterReplicationDelay(conf gitalycfgprom.Config, registerer prometheus.Registerer) (metrics.HistogramVec, error) { replicationDelay := prometheus.NewHistogramVec( prometheus.HistogramOpts{ Namespace: "gitaly", @@ -20,12 +20,12 @@ func RegisterReplicationDelay(conf gitalycfgprom.Config) (metrics.HistogramVec, []string{"type"}, ) - return replicationDelay, prometheus.Register(replicationDelay) + return replicationDelay, registerer.Register(replicationDelay) } // RegisterReplicationLatency creates and registers a prometheus histogram // to observe replication latency times -func RegisterReplicationLatency(conf gitalycfgprom.Config) (metrics.HistogramVec, error) { +func RegisterReplicationLatency(conf gitalycfgprom.Config, registerer prometheus.Registerer) (metrics.HistogramVec, error) { replicationLatency := prometheus.NewHistogramVec( prometheus.HistogramOpts{ Namespace: "gitaly", @@ -36,12 +36,12 @@ func RegisterReplicationLatency(conf gitalycfgprom.Config) (metrics.HistogramVec []string{"type"}, ) - return replicationLatency, prometheus.Register(replicationLatency) + return replicationLatency, registerer.Register(replicationLatency) } // RegisterNodeLatency creates and registers a prometheus histogram to // observe internal node latency -func RegisterNodeLatency(conf gitalycfgprom.Config) (metrics.HistogramVec, error) { +func RegisterNodeLatency(conf gitalycfgprom.Config, registerer prometheus.Registerer) (metrics.HistogramVec, error) { nodeLatency := prometheus.NewHistogramVec( prometheus.HistogramOpts{ Namespace: "gitaly", @@ -51,7 +51,7 @@ func RegisterNodeLatency(conf gitalycfgprom.Config) (metrics.HistogramVec, error }, []string{"gitaly_storage"}, ) - return nodeLatency, prometheus.Register(nodeLatency) + return nodeLatency, registerer.Register(nodeLatency) } var MethodTypeCounter = promauto.NewCounterVec( -- GitLab From ebaade4a4816704e6c5bf5696f070fd16273fe09 Mon Sep 17 00:00:00 2001 From: John Cai <jcai@gitlab.com> Date: 
Tue, 16 Nov 2021 18:12:56 -0500 Subject: [PATCH 3/4] praefect: Add ability to have separate database metrics endpoint By default, when metrics are enabled, then each Praefect will expose information about how many read-only repositories there are, which requires Praefect to query the database. First, this will result in the same metrics being exposed by every Praefect given that the database is shared between all of them. And second, this will cause one query per Praefect per scraping run. This cost does add up and generate quite some load on the database, especially so if there is a lot of repositories in that database, up to a point where it may overload the database completely. Fix this issue by splitting metrics which hit the database into a separate endpoint "/db_metrics". This allows admins to set up a separate scraper with a different scraping interval for this metric, and furthermore it gives the ability to only scrape this metric for one of the Praefect instances so the work isn't unnecessarily duplicated. Given that this is a breaking change which will get backported, we must make this behaviour opt-in for now. We thus include a new configuration key "prometheus_use_database_endpoint" which enables the new behaviour such that existing installations' metrics won't break on a simple point release. The intent is to eventually remove this configuration though and enable it for all setups on a major release. Changelog: added (cherry picked from commit 7e74b7333ca6f2d1e55e7a17350cccc7c856c847) --- cmd/praefect/main.go | 36 ++++++++++++++++--- cmd/praefect/subcmd_remove_repository_test.go | 2 +- internal/praefect/config/config.go | 14 +++++--- 3 files changed, 42 insertions(+), 10 deletions(-) diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go index 262b2f90d2c..fdfbb278979 100644 --- a/cmd/praefect/main.go +++ b/cmd/praefect/main.go @@ -63,11 +63,13 @@ import ( "flag" "fmt" "math/rand" + "net/http" "os" "strings" "time" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/sirupsen/logrus" "gitlab.com/gitlab-org/gitaly/v14/internal/backchannel" "gitlab.com/gitlab-org/gitaly/v14/internal/bootstrap" @@ -149,7 +151,9 @@ func main() { logger.Fatalf("unable to create a bootstrap: %v", err) } - if err := run(starterConfigs, conf, b, prometheus.DefaultRegisterer); err != nil { + dbPromRegistry := prometheus.NewRegistry() + + if err := run(starterConfigs, conf, b, prometheus.DefaultRegisterer, dbPromRegistry); err != nil { logger.Fatalf("%v", err) } } @@ -192,7 +196,16 @@ func configure(conf config.Config) { sentry.ConfigureSentry(version.GetVersion(), conf.Sentry) } -func run(cfgs []starter.Config, conf config.Config, b bootstrap.Listener, promreg prometheus.Registerer) error { +func run( + cfgs []starter.Config, + conf config.Config, + b bootstrap.Listener, + promreg prometheus.Registerer, + dbPromRegistry interface { + prometheus.Registerer + prometheus.Gatherer + }, +) error { nodeLatencyHistogram, err := metrics.RegisterNodeLatency(conf.Prometheus, promreg) if err != nil { return err @@ -390,9 +403,18 @@ func run(cfgs []starter.Config, conf config.Config, b bootstrap.Listener, promre ) metricsCollectors = append(metricsCollectors, transactionManager, coordinator, repl) if db != nil { - promreg.MustRegister( - datastore.NewRepositoryStoreCollector(logger, conf.VirtualStorageNames(), db, conf.Prometheus.ScrapeTimeout), - ) + repositoryStoreCollector := datastore.NewRepositoryStoreCollector(logger, 
conf.VirtualStorageNames(), db, conf.Prometheus.ScrapeTimeout) + + // Eventually, database-related metrics will always be exported via a separate + // endpoint such that it's possible to set a different scraping interval and thus to + // reduce database load. For now though, we register the metrics twice, once for the + // standard and once for the database-specific endpoint. This is done to ensure a + // transitory period where deployments can be moved to the new endpoint without + // causing breakage if they still use the old endpoint. + dbPromRegistry.MustRegister(repositoryStoreCollector) + if !conf.PrometheusExcludeDatabaseFromDefaultMetrics { + promreg.MustRegister(repositoryStoreCollector) + } } promreg.MustRegister(metricsCollectors...) @@ -415,9 +437,13 @@ func run(cfgs []starter.Config, conf config.Config, b bootstrap.Listener, promre return err } + serveMux := http.NewServeMux() + serveMux.Handle("/db_metrics", promhttp.HandlerFor(dbPromRegistry, promhttp.HandlerOpts{})) + go func() { if err := monitoring.Start( monitoring.WithListener(l), + monitoring.WithServeMux(serveMux), monitoring.WithBuildInformation(praefect.GetVersion(), praefect.GetBuildTime())); err != nil { logger.WithError(err).Errorf("Unable to start prometheus listener: %v", conf.PrometheusListenAddr) } diff --git a/cmd/praefect/subcmd_remove_repository_test.go b/cmd/praefect/subcmd_remove_repository_test.go index abc4b93b9f6..bb56dee3c6f 100644 --- a/cmd/praefect/subcmd_remove_repository_test.go +++ b/cmd/praefect/subcmd_remove_repository_test.go @@ -112,7 +112,7 @@ func TestRemoveRepository_Exec(t *testing.T) { bootstrapper := bootstrap.NewNoop() go func() { defer close(stopped) - assert.NoError(t, run(starterConfigs, conf, bootstrapper, prometheus.NewRegistry())) + assert.NoError(t, run(starterConfigs, conf, bootstrapper, prometheus.NewRegistry(), prometheus.NewRegistry())) }() cc, err := client.Dial("unix://"+conf.SocketPath, nil) diff --git a/internal/praefect/config/config.go b/internal/praefect/config/config.go index bef7f8d6222..1e3cf682db5 100644 --- a/internal/praefect/config/config.go +++ b/internal/praefect/config/config.go @@ -123,10 +123,16 @@ type Config struct { Sentry sentry.Config `toml:"sentry"` PrometheusListenAddr string `toml:"prometheus_listen_addr"` Prometheus prometheus.Config `toml:"prometheus"` - Auth auth.Config `toml:"auth"` - TLS config.TLS `toml:"tls"` - DB `toml:"database"` - Failover Failover `toml:"failover"` + // PrometheusExcludeDatabaseFromDefaultMetrics excludes database-related metrics from the + // default metrics. If set to `false`, then database metrics will be available both via + // `/metrics` and `/db_metrics`. Otherwise, they will only be accessible via `/db_metrics`. + // Defaults to `false`. This is used as a transitory configuration key: eventually, database + // metrics will always be removed from the standard metrics endpoint. 
+ PrometheusExcludeDatabaseFromDefaultMetrics bool `toml:"prometheus_exclude_database_from_default_metrics"` + Auth auth.Config `toml:"auth"` + TLS config.TLS `toml:"tls"` + DB `toml:"database"` + Failover Failover `toml:"failover"` // Keep for legacy reasons: remove after Omnibus has switched FailoverEnabled bool `toml:"failover_enabled"` MemoryQueueEnabled bool `toml:"memory_queue_enabled"` -- GitLab From 3cde9b5e764616dc81e4986d4a8cdc0272bb23ac Mon Sep 17 00:00:00 2001 From: John Cai <jcai@gitlab.com> Date: Wed, 17 Nov 2021 19:38:30 -0500 Subject: [PATCH 4/4] praefect: Do not collect repository store metrics on startup Our current code path will trigger the RepositoryStoreCollector to query the database on startup, even if the prometheus listener is not listening. This is because we call DescribeByCollect in the Describe method. The Prometheus client will call Describe on Register, which ends up triggering the Collect method and hence runs the queries. Instead, we can just provide the descriptions separately from the Collect method. Changelog: fixed (cherry picked from commit 90cb7fb7b9f8703547fa62719650394478653c62) --- internal/praefect/datastore/collector.go | 40 +++++++++++-------- internal/praefect/datastore/collector_test.go | 34 ++++++++++++++++ 2 files changed, 57 insertions(+), 17 deletions(-) diff --git a/internal/praefect/datastore/collector.go b/internal/praefect/datastore/collector.go index 71f145e67da..40390b47af3 100644 --- a/internal/praefect/datastore/collector.go +++ b/internal/praefect/datastore/collector.go @@ -11,21 +11,25 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql" ) -// This is kept for backwards compatibility as some alerting rules depend on this. -// The unavailable repositories is a more accurate description for the metric and -// is exported below so we can migrate to it. -var descReadOnlyRepositories = prometheus.NewDesc( - "gitaly_praefect_read_only_repositories", - "Number of repositories in read-only mode within a virtual storage.", - []string{"virtual_storage"}, - nil, -) - -var descUnavailableRepositories = prometheus.NewDesc( - "gitaly_praefect_unavailable_repositories", - "Number of repositories that have no healthy, up to date replicas.", - []string{"virtual_storage"}, - nil, +var ( + // This is kept for backwards compatibility as some alerting rules depend on this. + // The unavailable repositories is a more accurate description for the metric and + // is exported below so we can migrate to it. + descReadOnlyRepositories = prometheus.NewDesc( + "gitaly_praefect_read_only_repositories", + "Number of repositories in read-only mode within a virtual storage.", + []string{"virtual_storage"}, + nil, + ) + + descUnavailableRepositories = prometheus.NewDesc( + "gitaly_praefect_unavailable_repositories", + "Number of repositories that have no healthy, up to date replicas.", + []string{"virtual_storage"}, + nil, + ) + + descriptions = []*prometheus.Desc{descReadOnlyRepositories, descUnavailableRepositories} ) // RepositoryStoreCollector collects metrics from the RepositoryStore.
@@ -47,7 +51,9 @@ func NewRepositoryStoreCollector(log logrus.FieldLogger, virtualStorages []strin } func (c *RepositoryStoreCollector) Describe(ch chan<- *prometheus.Desc) { - prometheus.DescribeByCollect(c, ch) + for _, desc := range descriptions { + ch <- desc + } } func (c *RepositoryStoreCollector) Collect(ch chan<- prometheus.Metric) { @@ -61,7 +67,7 @@ func (c *RepositoryStoreCollector) Collect(ch chan<- prometheus.Metric) { } for _, vs := range c.virtualStorages { - for _, desc := range []*prometheus.Desc{descReadOnlyRepositories, descUnavailableRepositories} { + for _, desc := range descriptions { ch <- prometheus.MustNewConstMetric(desc, prometheus.GaugeValue, float64(unavailableCounts[vs]), vs) } } diff --git a/internal/praefect/datastore/collector_test.go b/internal/praefect/datastore/collector_test.go index 3514b123937..3cc972e363a 100644 --- a/internal/praefect/datastore/collector_test.go +++ b/internal/praefect/datastore/collector_test.go @@ -2,14 +2,18 @@ package datastore import ( "context" + "database/sql" + "errors" "fmt" "strings" "testing" "time" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/testutil" "github.com/sirupsen/logrus" "github.com/sirupsen/logrus/hooks/test" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" @@ -211,3 +215,33 @@ gitaly_praefect_unavailable_repositories{virtual_storage="virtual-storage-2"} 0 }) } } + +type checkIfQueriedDB struct { + queried bool +} + +func (c *checkIfQueriedDB) QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error) { + c.queried = true + return nil, errors.New("QueryContext should not be called") +} + +func (c *checkIfQueriedDB) QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row { + c.queried = true + return &sql.Row{} +} + +func (c *checkIfQueriedDB) ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error) { + c.queried = true + return nil, errors.New("ExecContext should not be called") +} + +func TestRepositoryStoreCollector_CollectNotCalledOnRegister(t *testing.T) { + logger, _ := test.NewNullLogger() + + var db checkIfQueriedDB + c := NewRepositoryStoreCollector(logger, []string{"virtual-storage-1", "virtual-storage-2"}, &db, 2*time.Second) + registry := prometheus.NewRegistry() + registry.MustRegister(c) + + assert.False(t, db.queried) +} -- GitLab
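For illustration of the last patch's reasoning, a minimal, self-contained sketch of how Describe affects registration; the collector and metric names are made up for the example and are not Gitaly code:

    package main

    import (
        "fmt"

        "github.com/prometheus/client_golang/prometheus"
    )

    // expensiveCollector stands in for a collector whose Collect hits a database.
    type expensiveCollector struct {
        desc    *prometheus.Desc
        queried bool
    }

    func (c *expensiveCollector) Describe(ch chan<- *prometheus.Desc) {
        // Sending static descriptions keeps Register() from calling Collect().
        // With prometheus.DescribeByCollect(c, ch) here instead, registering the
        // collector would already run the expensive Collect path once.
        ch <- c.desc
    }

    func (c *expensiveCollector) Collect(ch chan<- prometheus.Metric) {
        c.queried = true // imagine a database query here
        ch <- prometheus.MustNewConstMetric(c.desc, prometheus.GaugeValue, 1)
    }

    func main() {
        c := &expensiveCollector{desc: prometheus.NewDesc("example_metric", "Example.", nil, nil)}
        reg := prometheus.NewRegistry()
        reg.MustRegister(c)
        fmt.Println("queried on register:", c.queried) // false with static descriptions
    }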