Skip to content
Snippets Groups Projects
Commit 26006f80 authored by John Cai's avatar John Cai
Browse files

Add track-repository praefect subcmd

Adds track-repository subcommand that allows an admin to add a repository that
exists on one of the gitaly nodes to the praefect database. Does some
safety checks before inserting the records.

Changelog: added
parent 743b3226
No related branches found
No related tags found
1 merge request!3918Add track-repository praefect subcmd
......@@ -33,6 +33,7 @@ var subcommands = map[string]subcmd{
"accept-dataloss": &acceptDatalossSubcommand{},
"set-replication-factor": newSetReplicatioFactorSubcommand(os.Stdout),
removeRepositoryCmdName: newRemoveRepository(logger),
trackRepositoryCmdName: newTrackRepository(logger),
}
// subCommand returns an exit code, to be fed into os.Exit.
......
package main
import (
"context"
"database/sql"
"errors"
"flag"
"fmt"
"math/rand"
"time"
"github.com/sirupsen/logrus"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/commonerr"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
"gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
"gitlab.com/gitlab-org/labkit/correlation"
"google.golang.org/grpc/metadata"
)
// trackRepositoryCmdName is the name of the track-repository subcommand. It is
// used as the flag set name and as the client_name sent to Gitaly.
const trackRepositoryCmdName = "track-repository"
// trackRepository implements the track-repository subcommand, which adds a
// repository that already exists on a Gitaly node to the Praefect database.
type trackRepository struct {
// logger receives the command's log output.
logger logrus.FieldLogger
// virtualStorage is the name of the repository's virtual storage.
virtualStorage string
// relativePath is the relative path to the repository.
relativePath string
// authoritativeStorage is the storage holding the copy of the repository
// to consider authoritative. It is required when per-repository election
// is configured.
authoritativeStorage string
}
// errAuthoritativeRepositoryNotExist is returned when the repository could not
// be confirmed to exist on the storage that is expected to hold it.
var errAuthoritativeRepositoryNotExist = errors.New("authoritative repository does not exist")
// newTrackRepository builds a track-repository subcommand that logs through
// the given logger. The remaining fields are populated by flag parsing.
func newTrackRepository(logger logrus.FieldLogger) *trackRepository {
	cmd := new(trackRepository)
	cmd.logger = logger
	return cmd
}
// FlagSet returns the flag set of the track-repository subcommand. Parsing it
// stores the virtual storage, relative path and authoritative storage values
// on cmd. Its usage function prints a short description followed by the flag
// defaults.
func (cmd *trackRepository) FlagSet() *flag.FlagSet {
	fs := flag.NewFlagSet(trackRepositoryCmdName, flag.ExitOnError)
	fs.StringVar(&cmd.virtualStorage, paramVirtualStorage, "", "name of the repository's virtual storage")
	fs.StringVar(&cmd.relativePath, paramRelativePath, "", "relative path to the repository")
	fs.StringVar(&cmd.authoritativeStorage, paramAuthoritativeStorage, "", "storage with the repository to consider as authoritative")
	fs.Usage = func() {
		// The description ends with a newline so that the flag defaults
		// printed right after it start on their own line instead of being
		// appended to the last sentence.
		_, _ = printfErr("Description:\n" +
			" This command adds a given repository to be tracked by Praefect.\n" +
			" It checks if the repository exists on disk on the authoritative storage," +
			" and whether database records are absent from tracking the repository.\n")
		fs.PrintDefaults()
	}
	return fs
}
// Exec validates the parsed arguments, opens the database configured in cfg
// and runs the command under a fresh correlation ID.
//
// It returns an error when positional arguments were supplied, when the
// virtual storage or relative path is missing, when the authoritative storage
// is missing while per-repository election is configured, when the database
// cannot be opened, or when tracking the repository fails.
func (cmd trackRepository) Exec(flags *flag.FlagSet, cfg config.Config) error {
	if flags.NArg() > 0 {
		return unexpectedPositionalArgsError{Command: flags.Name()}
	}
	if cmd.virtualStorage == "" {
		return requiredParameterError(paramVirtualStorage)
	}
	if cmd.relativePath == "" {
		return requiredParameterError(paramRelativePath)
	}

	// The authoritative storage is only mandatory with per-repository
	// election; otherwise the primary is looked up in the database.
	perRepositoryElection := cfg.Failover.ElectionStrategy == config.ElectionStrategyPerRepository
	if cmd.authoritativeStorage == "" && perRepositoryElection {
		return requiredParameterError(paramAuthoritativeStorage)
	}

	db, openErr := glsql.OpenDB(cfg.DB)
	if openErr != nil {
		return fmt.Errorf("connect to database: %w", openErr)
	}
	defer func() { _ = db.Close() }()

	correlationID := correlation.SafeRandomID()
	ctx := correlation.ContextWithCorrelation(context.Background(), correlationID)
	correlatedLogger := cmd.logger.WithField("correlation_id", correlation.ExtractFromContext(ctx))

	return cmd.exec(ctx, correlatedLogger, db, cfg)
}
// trackRepoErrorPrefix is prepended to the errors returned while tracking a
// repository.
const trackRepoErrorPrefix = "attempting to track repository in praefect database"
// exec determines the primary and secondary storages for the repository,
// verifies that the repository exists on the primary and then stores the
// tracking records in the database.
//
// With per-repository election the authoritative storage becomes the primary
// and the remaining nodes of the virtual storage become secondaries. If a
// default replication factor is configured for the virtual storage, a random
// subset of the secondaries is kept so that primary plus secondaries match the
// factor. With any other election strategy the primary is read from the
// shard_primaries table and no secondaries are passed on.
//
// All returned errors are prefixed with trackRepoErrorPrefix. When the
// repository is not found on the primary, the error wraps
// errAuthoritativeRepositoryNotExist.
func (cmd *trackRepository) exec(ctx context.Context, logger logrus.FieldLogger, db *sql.DB, cfg config.Config) error {
	logger.WithFields(logrus.Fields{
		"virtual_storage":       cmd.virtualStorage,
		"relative_path":         cmd.relativePath,
		"authoritative_storage": cmd.authoritativeStorage,
	}).Debug("track repository")

	var primary string
	var secondaries []string
	var variableReplicationFactorEnabled, savePrimary bool

	if cfg.Failover.ElectionStrategy == config.ElectionStrategyPerRepository {
		savePrimary = true
		primary = cmd.authoritativeStorage

		for _, vs := range cfg.VirtualStorages {
			if vs.Name != cmd.virtualStorage {
				continue
			}

			for _, node := range vs.Nodes {
				if node.Storage != cmd.authoritativeStorage {
					secondaries = append(secondaries, node.Storage)
				}
			}
		}

		if replicationFactor := cfg.DefaultReplicationFactors()[cmd.virtualStorage]; replicationFactor > 0 {
			variableReplicationFactorEnabled = true

			// Select random secondaries according to the default replication factor.
			r := rand.New(rand.NewSource(time.Now().UnixNano()))
			r.Shuffle(len(secondaries), func(i, j int) {
				secondaries[i], secondaries[j] = secondaries[j], secondaries[i]
			})

			// Only trim when there are more candidates than needed. Slicing
			// past the length would either panic or, within the slice's
			// capacity, expose empty storage names.
			if wanted := replicationFactor - 1; wanted < len(secondaries) {
				secondaries = secondaries[:wanted]
			}
		}
	} else {
		savePrimary = false

		if err := db.QueryRowContext(ctx, `SELECT node_name FROM shard_primaries WHERE shard_name = $1 AND demoted = 'false'`, cmd.virtualStorage).Scan(&primary); err != nil {
			if errors.Is(err, sql.ErrNoRows) {
				return fmt.Errorf("%s: no primaries found", trackRepoErrorPrefix)
			}
			return fmt.Errorf("%s: %w", trackRepoErrorPrefix, err)
		}
	}

	authoritativeRepoExists, err := cmd.authoritativeRepositoryExists(ctx, cfg, primary)
	if err != nil {
		return fmt.Errorf("%s: %w", trackRepoErrorPrefix, err)
	}

	if !authoritativeRepoExists {
		return fmt.Errorf("%s: %w", trackRepoErrorPrefix, errAuthoritativeRepositoryNotExist)
	}

	if err := cmd.trackRepository(
		ctx,
		datastore.NewPostgresRepositoryStore(db, cfg.StorageNames()),
		primary,
		secondaries,
		savePrimary,
		variableReplicationFactorEnabled,
	); err != nil {
		return fmt.Errorf("%s: %w", trackRepoErrorPrefix, err)
	}

	logger.Debug("finished adding new repository to be tracked in praefect database.")

	return nil
}
// trackRepository reserves a repository ID for the command's virtual storage
// and relative path and creates the repository records with the given primary
// and secondaries.
//
// A repository that is already tracked is not an error: a message is logged
// and nil is returned without creating anything.
func (cmd *trackRepository) trackRepository(
	ctx context.Context,
	ds *datastore.PostgresRepositoryStore,
	primary string,
	secondaries []string,
	savePrimary bool,
	variableReplicationFactorEnabled bool,
) error {
	id, reserveErr := ds.ReserveRepositoryID(ctx, cmd.virtualStorage, cmd.relativePath)
	switch {
	case reserveErr == nil:
		// The ID was reserved; carry on and create the records.
	case errors.Is(reserveErr, commonerr.ErrRepositoryAlreadyExists):
		cmd.logger.Print("repository is already tracked in praefect database")
		return nil
	default:
		return fmt.Errorf("ReserveRepositoryID: %w", reserveErr)
	}

	createErr := ds.CreateRepository(
		ctx,
		id,
		cmd.virtualStorage,
		cmd.relativePath,
		primary,
		nil,
		secondaries,
		savePrimary,
		variableReplicationFactorEnabled,
	)
	if createErr != nil {
		return fmt.Errorf("CreateRepository: %w", createErr)
	}

	return nil
}
// repositoryExists dials the Gitaly node at addr using token and asks it, via
// the RepositoryExists RPC, whether repo exists. The connection is closed
// before returning. Dial failures are wrapped; RPC errors are returned as is.
func (cmd *trackRepository) repositoryExists(ctx context.Context, repo *gitalypb.Repository, addr, token string) (bool, error) {
	conn, dialErr := subCmdDial(addr, token)
	if dialErr != nil {
		return false, fmt.Errorf("error dialing: %w", dialErr)
	}
	defer func() { _ = conn.Close() }()

	// Identify this command to the server through outgoing metadata.
	outgoingCtx := metadata.AppendToOutgoingContext(ctx, "client_name", trackRepositoryCmdName)
	request := &gitalypb.RepositoryExistsRequest{Repository: repo}

	response, rpcErr := gitalypb.NewRepositoryServiceClient(conn).RepositoryExists(outgoingCtx, request)
	if rpcErr != nil {
		return false, rpcErr
	}

	return response.GetExists(), nil
}
// authoritativeRepositoryExists reports whether the repository at
// cmd.relativePath exists on the node named nodeName of the command's virtual
// storage.
//
// It returns an error when the virtual storage is not part of cfg or when the
// virtual storage has no node called nodeName. A failure to query the node is
// logged as a warning and reported as "does not exist" rather than as an
// error.
func (cmd *trackRepository) authoritativeRepositoryExists(ctx context.Context, cfg config.Config, nodeName string) (bool, error) {
	for _, vs := range cfg.VirtualStorages {
		if vs.Name != cmd.virtualStorage {
			continue
		}

		for _, node := range vs.Nodes {
			if node.Storage != nodeName {
				continue
			}

			cmd.logger.Debugf("check if repository %q exists on gitaly %q at %q", cmd.relativePath, node.Storage, node.Address)

			repo := &gitalypb.Repository{
				StorageName:  node.Storage,
				RelativePath: cmd.relativePath,
			}

			exists, err := cmd.repositoryExists(ctx, repo, node.Address, node.Token)
			if err != nil {
				// The error is deliberately not propagated: a node that
				// cannot be queried is treated like one without the repository.
				cmd.logger.WithError(err).Warnf("checking if repository exists %q, %q", node.Storage, cmd.relativePath)
				return false, nil
			}

			return exists, nil
		}

		// Report the node that was actually looked up. It differs from
		// cmd.authoritativeStorage when the primary came from the database.
		return false, fmt.Errorf("node %q not found", nodeName)
	}

	return false, fmt.Errorf("virtual storage %q not found", cmd.virtualStorage)
}
package main
import (
"flag"
"fmt"
"path/filepath"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/v14/client"
"gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/setup"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/nodes"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/protoregistry"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/promtest"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testserver"
"gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
)
// TestAddRepository_FlagSet checks that parsing the command's flags stores
// each value in the matching field of the command.
func TestAddRepository_FlagSet(t *testing.T) {
	t.Parallel()

	args := []string{
		"--virtual-storage", "vs",
		"--repository", "repo",
		"--authoritative-storage", "storage-0",
	}

	var cmd trackRepository
	require.NoError(t, cmd.FlagSet().Parse(args))

	require.Equal(t, "vs", cmd.virtualStorage)
	require.Equal(t, "repo", cmd.relativePath)
	require.Equal(t, "storage-0", cmd.authoritativeStorage)
}
func TestAddRepository_Exec_invalidArgs(t *testing.T) {
t.Parallel()
t.Run("not all flag values processed", func(t *testing.T) {
cmd := trackRepository{}
flagSet := flag.NewFlagSet("cmd", flag.PanicOnError)
require.NoError(t, flagSet.Parse([]string{"stub"}))
err := cmd.Exec(flagSet, config.Config{})
require.EqualError(t, err, "cmd doesn't accept positional arguments")
})
t.Run("virtual-storage is not set", func(t *testing.T) {
cmd := trackRepository{}
err := cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), config.Config{})
require.EqualError(t, err, `"virtual-storage" is a required parameter`)
})
t.Run("repository is not set", func(t *testing.T) {
cmd := trackRepository{virtualStorage: "stub"}
err := cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), config.Config{})
require.EqualError(t, err, `"repository" is a required parameter`)
})
t.Run("authoritative-storage is not set", func(t *testing.T) {
cmd := trackRepository{virtualStorage: "stub", relativePath: "path/to/repo"}
err := cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), config.Config{Failover: config.Failover{ElectionStrategy: config.ElectionStrategyPerRepository}})
require.EqualError(t, err, `"authoritative-storage" is a required parameter`)
})
t.Run("db connection error", func(t *testing.T) {
cmd := trackRepository{virtualStorage: "stub", relativePath: "stub", authoritativeStorage: "storage-0"}
cfg := config.Config{DB: config.DB{Host: "stub", SSLMode: "disable"}}
err := cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), cfg)
require.Error(t, err)
require.Contains(t, err.Error(), "connect to database: dial tcp: lookup stub")
})
}
// TestAddRepository_Exec runs the track-repository command against two Gitaly
// servers and a test database, once per election strategy. For each strategy
// it covers successfully tracking a repository, a repository missing from the
// authoritative storage, and a repository that is already tracked.
func TestAddRepository_Exec(t *testing.T) {
t.Parallel()
// Start two Gitaly servers, one storage each; they act as the nodes of a
// single virtual storage.
g1Cfg := testcfg.Build(t, testcfg.WithStorages("gitaly-1"))
g2Cfg := testcfg.Build(t, testcfg.WithStorages("gitaly-2"))
g1Srv := testserver.StartGitalyServer(t, g1Cfg, nil, setup.RegisterAll, testserver.WithDisablePraefect())
g2Srv := testserver.StartGitalyServer(t, g2Cfg, nil, setup.RegisterAll, testserver.WithDisablePraefect())
defer g2Srv.Shutdown()
defer g1Srv.Shutdown()
g1Addr := g1Srv.Address()
db := glsql.NewDB(t)
// The command opens its own database connection from the config, so look
// up the name of the test database to build a matching DB config.
var database string
require.NoError(t, db.QueryRow(`SELECT current_database()`).Scan(&database))
dbConf := glsql.GetDBConfig(t, database)
virtualStorageName := "praefect"
conf := config.Config{
AllowLegacyElectors: true,
SocketPath: testhelper.GetTemporaryGitalySocketFileName(t),
VirtualStorages: []*config.VirtualStorage{
{
Name: virtualStorageName,
Nodes: []*config.Node{
{Storage: g1Cfg.Storages[0].Name, Address: g1Addr},
{Storage: g2Cfg.Storages[0].Name, Address: g2Srv.Address()},
},
DefaultReplicationFactor: 2,
},
},
DB: dbConf,
}
gitalyCC, err := client.Dial(g1Addr, nil)
require.NoError(t, err)
defer func() { require.NoError(t, gitalyCC.Close()) }()
ctx, cancel := testhelper.Context()
defer cancel()
gitaly1RepositoryClient := gitalypb.NewRepositoryServiceClient(gitalyCC)
// createRepoThroughGitaly1 creates a repository directly on the first
// Gitaly server's storage.
createRepoThroughGitaly1 := func(relativePath string) error {
_, err := gitaly1RepositoryClient.CreateRepository(
ctx,
&gitalypb.CreateRepositoryRequest{
Repository: &gitalypb.Repository{
StorageName: g1Cfg.Storages[0].Name,
RelativePath: relativePath,
},
})
return err
}
// Each case selects an election strategy. The authoritative storage is
// only supplied for per-repository election.
testCases := map[string]struct {
failoverConfig config.Failover
authoritativeStorage string
}{
"sql election": {
failoverConfig: config.Failover{
Enabled: true,
ElectionStrategy: config.ElectionStrategySQL,
},
authoritativeStorage: "",
},
"per repository election": {
failoverConfig: config.Failover{
Enabled: true,
ElectionStrategy: config.ElectionStrategyPerRepository,
},
authoritativeStorage: g1Cfg.Storages[0].Name,
},
}
logger := testhelper.NewTestLogger(t)
for tn, tc := range testCases {
t.Run(tn, func(t *testing.T) {
addCmdConf := conf
addCmdConf.Failover = tc.failoverConfig
t.Run("ok", func(t *testing.T) {
// NOTE(review): presumably the node manager is started so that a primary
// is recorded for the SQL election case — confirm.
nodeMgr, err := nodes.NewManager(testhelper.DiscardTestEntry(t), addCmdConf, db.DB, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil)
require.NoError(t, err)
nodeMgr.Start(0, time.Hour)
relativePath := fmt.Sprintf("path/to/test/repo_%s", tn)
repoDS := datastore.NewPostgresRepositoryStore(db, conf.StorageNames())
require.NoError(t, createRepoThroughGitaly1(relativePath))
// Run the remove-repository command first, then recreate the repository
// on disk below.
rmRepoCmd := &removeRepository{
logger: logger,
virtualStorage: virtualStorageName,
relativePath: relativePath,
}
require.NoError(t, rmRepoCmd.Exec(flag.NewFlagSet("", flag.PanicOnError), conf))
// create the repo on Gitaly without Praefect knowing
require.NoError(t, createRepoThroughGitaly1(relativePath))
require.DirExists(t, filepath.Join(g1Cfg.Storages[0].Path, relativePath))
require.NoDirExists(t, filepath.Join(g2Cfg.Storages[0].Path, relativePath))
addRepoCmd := &trackRepository{
logger: logger,
virtualStorage: virtualStorageName,
relativePath: relativePath,
authoritativeStorage: tc.authoritativeStorage,
}
require.NoError(t, addRepoCmd.Exec(flag.NewFlagSet("", flag.PanicOnError), addCmdConf))
// After tracking, both storages must be assigned as hosts and the
// repository must be known to the repository store.
as := datastore.NewAssignmentStore(db, conf.StorageNames())
assignments, err := as.GetHostAssignments(ctx, virtualStorageName, relativePath)
require.NoError(t, err)
require.Len(t, assignments, 2)
assert.Contains(t, assignments, g1Cfg.Storages[0].Name)
assert.Contains(t, assignments, g2Cfg.Storages[0].Name)
exists, err := repoDS.RepositoryExists(ctx, virtualStorageName, relativePath)
require.NoError(t, err)
assert.True(t, exists)
})
t.Run("repository does not exist", func(t *testing.T) {
// No repository is created at this path, so the command must fail with
// errAuthoritativeRepositoryNotExist.
relativePath := fmt.Sprintf("path/to/test/repo_1_%s", tn)
cmd := &trackRepository{
logger: testhelper.NewTestLogger(t),
virtualStorage: "praefect",
relativePath: relativePath,
authoritativeStorage: tc.authoritativeStorage,
}
assert.ErrorIs(t, cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), addCmdConf), errAuthoritativeRepositoryNotExist)
})
t.Run("records already exist", func(t *testing.T) {
relativePath := fmt.Sprintf("path/to/test/repo_2_%s", tn)
require.NoError(t, createRepoThroughGitaly1(relativePath))
require.DirExists(t, filepath.Join(g1Cfg.Storages[0].Path, relativePath))
require.NoDirExists(t, filepath.Join(g2Cfg.Storages[0].Path, relativePath))
// Insert the repository records up front; running the command afterwards
// must succeed without an error.
ds := datastore.NewPostgresRepositoryStore(db, conf.StorageNames())
id, err := ds.ReserveRepositoryID(ctx, virtualStorageName, relativePath)
require.NoError(t, err)
require.NoError(t, ds.CreateRepository(ctx, id, virtualStorageName, relativePath, g1Cfg.Storages[0].Name, nil, nil, true, true))
cmd := &trackRepository{
logger: testhelper.NewTestLogger(t),
virtualStorage: virtualStorageName,
relativePath: relativePath,
authoritativeStorage: tc.authoritativeStorage,
}
assert.NoError(t, cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), addCmdConf))
})
})
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment