Skip to content
Snippets Groups Projects
Commit ee6b6388 authored by Pavlo Strokov's avatar Pavlo Strokov
Browse files

remove-repository: A new sub-command for the praefect to remove repository

Users of the praefect are missing tools to manage state of the cluster.
One of such tools is a repository removal cli. It should be used to
remove any repository from the cluster. The removal covers cleanup of
the database and removal from the gitaly storages. The command tries
to remove as much as possible first by removing from praefect, replication
event queue and each gitaly node configured in the provided config file.

Part of: #3771

Changelog: added
parent 229ba800
No related branches found
No related tags found
Loading
......@@ -32,6 +32,7 @@ var subcommands = map[string]subcmd{
"dataloss": newDatalossSubcommand(),
"accept-dataloss": &acceptDatalossSubcommand{},
"set-replication-factor": newSetReplicatioFactorSubcommand(os.Stdout),
removeRepositoryCmdName: newRemoveRepository(logger),
}
// subCommand returns an exit code, to be fed into os.Exit.
......@@ -71,6 +72,8 @@ func getNodeAddress(cfg config.Config) (string, error) {
return "unix:" + cfg.SocketPath, nil
case cfg.ListenAddr != "":
return "tcp://" + cfg.ListenAddr, nil
case cfg.TLSListenAddr != "":
return "tls://" + cfg.TLSListenAddr, nil
default:
return "", errors.New("no Praefect address configured")
}
......
package main
import (
"context"
"database/sql"
"errors"
"flag"
"fmt"
"strings"
"sync"
"time"
"github.com/sirupsen/logrus"
"gitlab.com/gitlab-org/gitaly/v14/internal/helper"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
"gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
"gitlab.com/gitlab-org/labkit/correlation"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
)
const (
removeRepositoryCmdName = "remove-repository"
)
type removeRepository struct {
logger logrus.FieldLogger
virtualStorage string
relativePath string
}
func newRemoveRepository(logger logrus.FieldLogger) *removeRepository {
return &removeRepository{logger: logger}
}
func (cmd *removeRepository) FlagSet() *flag.FlagSet {
fs := flag.NewFlagSet(removeRepositoryCmdName, flag.ExitOnError)
fs.StringVar(&cmd.virtualStorage, paramVirtualStorage, "", "name of the repository's virtual storage")
fs.StringVar(&cmd.relativePath, paramRelativePath, "", "relative path to the repository")
fs.Usage = func() {
_, _ = printfErr("Description:\n" +
" This command removes all state associated with a given repository from the Gitaly Cluster.\n" +
" This includes both on-disk repositories on all relevant Gitaly nodes as well as any potential\n" +
" database state as tracked by Praefect.\n")
fs.PrintDefaults()
_, _ = printfErr("NOTE:\n" +
" It may happen that parts of the repository continue to exist after this command, either because\n" +
" of an error that happened during deletion or because of in-flight RPC calls targeting the repository.\n" +
" It is safe and recommended to re-run this command in such a case.\n")
}
return fs
}
func (cmd removeRepository) Exec(flags *flag.FlagSet, cfg config.Config) error {
switch {
case flags.NArg() > 0:
return unexpectedPositionalArgsError{Command: flags.Name()}
case cmd.virtualStorage == "":
return requiredParameterError(paramVirtualStorage)
case cmd.relativePath == "":
return requiredParameterError(paramRelativePath)
}
db, err := glsql.OpenDB(cfg.DB)
if err != nil {
return fmt.Errorf("connect to database: %w", err)
}
defer func() { _ = db.Close() }()
ctx := correlation.ContextWithCorrelation(context.Background(), correlation.SafeRandomID())
logger := cmd.logger.WithField("correlation_id", correlation.ExtractFromContext(ctx))
return cmd.exec(ctx, logger, db, cfg)
}
func (cmd *removeRepository) exec(ctx context.Context, logger logrus.FieldLogger, db *sql.DB, cfg config.Config) error {
// Remove repository explicitly from all storages and clean up database info.
// This prevents creation of the new replication events.
logger.WithFields(logrus.Fields{
"virtual_storage": cmd.virtualStorage,
"relative_path": cmd.relativePath,
}).Debug("remove repository")
addr, err := getNodeAddress(cfg)
if err != nil {
return fmt.Errorf("get praefect address from config: %w", err)
}
logger.Debugf("remove repository info from praefect database %q", addr)
removed, err := cmd.removeRepositoryFromDatabase(ctx, db)
if err != nil {
return fmt.Errorf("remove repository info from praefect database: %w", err)
}
if !removed {
logger.Warn("praefect database has no info about the repository")
}
logger.Debug("removal of the repository info from praefect database completed")
logger.Debug("remove replication events")
ticker := helper.NewTimerTicker(time.Second)
defer ticker.Stop()
if err := cmd.removeReplicationEvents(ctx, logger, db, ticker); err != nil {
return fmt.Errorf("remove scheduled replication events: %w", err)
}
logger.Debug("replication events removal completed")
// We should try to remove repository from each of gitaly nodes.
logger.Debug("remove repository directly by each gitaly node")
cmd.removeRepositoryForEachGitaly(ctx, cfg, logger)
logger.Debug("direct repository removal by each gitaly node completed")
return nil
}
func (cmd *removeRepository) removeRepositoryFromDatabase(ctx context.Context, db *sql.DB) (bool, error) {
var removed bool
if err := db.QueryRowContext(
ctx,
`WITH remove_storages_info AS (
DELETE FROM storage_repositories
WHERE virtual_storage = $1 AND relative_path = $2
)
DELETE FROM repositories
WHERE virtual_storage = $1 AND relative_path = $2
RETURNING TRUE`,
cmd.virtualStorage,
cmd.relativePath,
).Scan(&removed); err != nil {
if errors.Is(err, sql.ErrNoRows) {
return false, nil
}
return false, fmt.Errorf("query row: %w", err)
}
return removed, nil
}
func (cmd *removeRepository) removeRepository(ctx context.Context, repo *gitalypb.Repository, addr, token string) (bool, error) {
conn, err := subCmdDial(addr, token)
if err != nil {
return false, fmt.Errorf("error dialing: %w", err)
}
defer func() { _ = conn.Close() }()
ctx = metadata.AppendToOutgoingContext(ctx, "client_name", removeRepositoryCmdName)
repositoryClient := gitalypb.NewRepositoryServiceClient(conn)
if _, err := repositoryClient.RemoveRepository(ctx, &gitalypb.RemoveRepositoryRequest{Repository: repo}); err != nil {
s, ok := status.FromError(err)
if !ok {
return false, fmt.Errorf("RemoveRepository: %w", err)
}
if !strings.Contains(s.Message(), fmt.Sprintf("get primary: repository %q/%q not found", cmd.virtualStorage, cmd.relativePath)) {
return false, fmt.Errorf("RemoveRepository: %w", err)
}
return false, nil
}
return true, nil
}
func (cmd *removeRepository) removeReplicationEvents(ctx context.Context, logger logrus.FieldLogger, db *sql.DB, ticker helper.Ticker) error {
// Wait for the completion of the repository replication jobs.
// As some of them could be a repository creation jobs we need to remove those newly created
// repositories after replication finished.
start := time.Now()
for found := true; found; {
ticker.Reset()
<-ticker.C()
if int(time.Since(start).Seconds())%5 == 0 {
logger.Debug("awaiting for the repository in_progress replication jobs to complete...")
}
row := db.QueryRowContext(
ctx,
`WITH remove_replication_jobs AS (
DELETE FROM replication_queue
WHERE job->>'virtual_storage' = $1
AND job->>'relative_path' = $2
-- Do not remove ongoing replication events as we need to wait
-- for their completion.
AND state != 'in_progress'
)
SELECT EXISTS(
SELECT
FROM replication_queue
WHERE job->>'virtual_storage' = $1
AND job->>'relative_path' = $2
AND state = 'in_progress')`,
cmd.virtualStorage,
cmd.relativePath,
)
if err := row.Scan(&found); err != nil {
return fmt.Errorf("scan in progress jobs: %w", err)
}
}
return nil
}
func (cmd *removeRepository) removeRepositoryForEachGitaly(ctx context.Context, cfg config.Config, logger logrus.FieldLogger) {
for _, vs := range cfg.VirtualStorages {
if vs.Name == cmd.virtualStorage {
var wg sync.WaitGroup
for i := 0; i < len(vs.Nodes); i++ {
wg.Add(1)
go func(node *config.Node) {
defer wg.Done()
logger.Debugf("remove repository with gitaly %q at %q", node.Storage, node.Address)
repo := &gitalypb.Repository{
StorageName: node.Storage,
RelativePath: cmd.relativePath,
}
_, err := cmd.removeRepository(ctx, repo, node.Address, node.Token)
if err != nil {
logger.WithError(err).Warnf("repository removal failed for gitaly %q", node.Storage)
}
logger.Debugf("repository removal call to gitaly %q completed", node.Storage)
}(vs.Nodes[i])
}
wg.Wait()
break
}
}
}
package main
import (
"flag"
"path/filepath"
"strings"
"syscall"
"testing"
"time"
"github.com/sirupsen/logrus"
"github.com/sirupsen/logrus/hooks/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/v14/client"
"gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/setup"
"gitlab.com/gitlab-org/gitaly/v14/internal/helper"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testserver"
"gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
)
func TestRemoveRepository_FlagSet(t *testing.T) {
t.Parallel()
cmd := &removeRepository{}
fs := cmd.FlagSet()
require.NoError(t, fs.Parse([]string{"--virtual-storage", "vs", "--repository", "repo"}))
require.Equal(t, "vs", cmd.virtualStorage)
require.Equal(t, "repo", cmd.relativePath)
}
func TestRemoveRepository_Exec_invalidArgs(t *testing.T) {
t.Parallel()
t.Run("not all flag values processed", func(t *testing.T) {
cmd := removeRepository{}
flagSet := flag.NewFlagSet("cmd", flag.PanicOnError)
require.NoError(t, flagSet.Parse([]string{"stub"}))
err := cmd.Exec(flagSet, config.Config{})
require.EqualError(t, err, "cmd doesn't accept positional arguments")
})
t.Run("virtual-storage is not set", func(t *testing.T) {
cmd := removeRepository{}
err := cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), config.Config{})
require.EqualError(t, err, `"virtual-storage" is a required parameter`)
})
t.Run("repository is not set", func(t *testing.T) {
cmd := removeRepository{virtualStorage: "stub"}
err := cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), config.Config{})
require.EqualError(t, err, `"repository" is a required parameter`)
})
t.Run("db connection error", func(t *testing.T) {
cmd := removeRepository{virtualStorage: "stub", relativePath: "stub"}
cfg := config.Config{DB: config.DB{Host: "stub", SSLMode: "disable"}}
err := cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), cfg)
require.Error(t, err)
require.Contains(t, err.Error(), "connect to database: dial tcp: lookup stub")
})
t.Run("praefect address is not set in config ", func(t *testing.T) {
cmd := removeRepository{virtualStorage: "stub", relativePath: "stub", logger: testhelper.NewTestLogger(t)}
db := glsql.NewDB(t)
var database string
require.NoError(t, db.QueryRow(`SELECT current_database()`).Scan(&database))
dbConf := glsql.GetDBConfig(t, database)
err := cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), config.Config{DB: dbConf})
require.EqualError(t, err, "get praefect address from config: no Praefect address configured")
})
}
func TestRemoveRepository_Exec(t *testing.T) {
t.Parallel()
g1Cfg := testcfg.Build(t, testcfg.WithStorages("gitaly-1"))
g2Cfg := testcfg.Build(t, testcfg.WithStorages("gitaly-2"))
g1Addr := testserver.RunGitalyServer(t, g1Cfg, nil, setup.RegisterAll, testserver.WithDisablePraefect())
g2Srv := testserver.StartGitalyServer(t, g2Cfg, nil, setup.RegisterAll, testserver.WithDisablePraefect())
db := glsql.NewDB(t)
var database string
require.NoError(t, db.QueryRow(`SELECT current_database()`).Scan(&database))
dbConf := glsql.GetDBConfig(t, database)
conf := config.Config{
SocketPath: testhelper.GetTemporaryGitalySocketFileName(t),
VirtualStorages: []*config.VirtualStorage{
{
Name: "praefect",
Nodes: []*config.Node{
{Storage: g1Cfg.Storages[0].Name, Address: g1Addr},
{Storage: g2Cfg.Storages[0].Name, Address: g2Srv.Address()},
},
},
},
DB: dbConf,
Failover: config.Failover{
Enabled: true,
ElectionStrategy: config.ElectionStrategyPerRepository,
},
}
starterConfigs, err := getStarterConfigs(conf)
require.NoError(t, err)
stopped := make(chan struct{})
go func() {
defer close(stopped)
err := run(starterConfigs, conf)
assert.EqualError(t, err, `received signal "terminated"`)
}()
cc, err := client.Dial("unix://"+conf.SocketPath, nil)
require.NoError(t, err)
defer func() { require.NoError(t, cc.Close()) }()
repoClient := gitalypb.NewRepositoryServiceClient(cc)
ctx, cancel := testhelper.Context()
defer cancel()
createRepo := func(t *testing.T, storageName, relativePath string) *gitalypb.Repository {
t.Helper()
repo := &gitalypb.Repository{
StorageName: storageName,
RelativePath: relativePath,
}
for i := 0; true; i++ {
_, err := repoClient.CreateRepository(ctx, &gitalypb.CreateRepositoryRequest{Repository: repo})
if err != nil {
require.Regexp(t, "(no healthy nodes)|(no such file or directory)|(connection refused)", err.Error())
require.Less(t, i, 100, "praefect doesn't serve for too long")
time.Sleep(50 * time.Millisecond)
} else {
break
}
}
return repo
}
praefectStorage := conf.VirtualStorages[0].Name
t.Run("ok", func(t *testing.T) {
repo := createRepo(t, praefectStorage, "path/to/test/repo")
cmd := &removeRepository{
logger: testhelper.NewTestLogger(t),
virtualStorage: repo.StorageName,
relativePath: repo.RelativePath,
}
require.NoError(t, cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), conf))
require.NoDirExists(t, filepath.Join(g1Cfg.Storages[0].Path, repo.RelativePath))
require.NoDirExists(t, filepath.Join(g2Cfg.Storages[0].Path, repo.RelativePath))
var repositoryRowExists bool
require.NoError(t, db.QueryRow(
`SELECT EXISTS(SELECT FROM repositories WHERE virtual_storage = $1 AND relative_path = $2)`,
cmd.virtualStorage, cmd.relativePath,
).Scan(&repositoryRowExists))
require.False(t, repositoryRowExists)
})
t.Run("no info about repository on praefect", func(t *testing.T) {
repo := createRepo(t, praefectStorage, "path/to/test/repo")
repoStore := datastore.NewPostgresRepositoryStore(db.DB, nil)
require.NoError(t, repoStore.DeleteRepository(
ctx, repo.StorageName, repo.RelativePath, []string{g1Cfg.Storages[0].Name, g2Cfg.Storages[0].Name},
))
logger := testhelper.NewTestLogger(t)
loggerHook := test.NewLocal(logger)
cmd := &removeRepository{
logger: logrus.NewEntry(logger),
virtualStorage: praefectStorage,
relativePath: repo.RelativePath,
}
require.NoError(t, cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), conf))
var found bool
for _, entry := range loggerHook.AllEntries() {
if strings.Contains(entry.Message, "praefect database has no info about the repository") {
found = true
}
}
require.True(t, found, "no expected message in the log")
require.NoDirExists(t, filepath.Join(g1Cfg.Storages[0].Path, repo.RelativePath))
require.NoDirExists(t, filepath.Join(g2Cfg.Storages[0].Path, repo.RelativePath))
requireNoDatabaseInfo(t, db, cmd)
})
t.Run("one of gitalies is out of service", func(t *testing.T) {
repo := createRepo(t, praefectStorage, "path/to/test/repo")
g2Srv.Shutdown()
logger := testhelper.NewTestLogger(t)
loggerHook := test.NewLocal(logger)
cmd := &removeRepository{
logger: logrus.NewEntry(logger),
virtualStorage: praefectStorage,
relativePath: repo.RelativePath,
}
for {
err := cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), conf)
if err == nil {
break
}
regexp := "(transport: Error while dialing dial unix /" + strings.TrimPrefix(g2Srv.Address(), "unix:/") + ")|(primary gitaly is not healthy)"
require.Regexp(t, regexp, err.Error())
time.Sleep(time.Second)
}
var found bool
for _, entry := range loggerHook.AllEntries() {
if strings.Contains(entry.Message, `repository removal failed for gitaly "gitaly-2"`) {
found = true
break
}
}
require.True(t, found, "no expected message in the log")
require.NoDirExists(t, filepath.Join(g1Cfg.Storages[0].Path, repo.RelativePath))
require.DirExists(t, filepath.Join(g2Cfg.Storages[0].Path, repo.RelativePath))
requireNoDatabaseInfo(t, db, cmd)
})
require.NoError(t, syscall.Kill(syscall.Getpid(), syscall.SIGTERM))
<-stopped
}
func requireNoDatabaseInfo(t *testing.T, db glsql.DB, cmd *removeRepository) {
t.Helper()
var repositoryRowExists bool
require.NoError(t, db.QueryRow(
`SELECT EXISTS(SELECT FROM repositories WHERE virtual_storage = $1 AND relative_path = $2)`,
cmd.virtualStorage, cmd.relativePath,
).Scan(&repositoryRowExists))
require.False(t, repositoryRowExists)
var storageRowExists bool
require.NoError(t, db.QueryRow(
`SELECT EXISTS(SELECT FROM storage_repositories WHERE virtual_storage = $1 AND relative_path = $2)`,
cmd.virtualStorage, cmd.relativePath,
).Scan(&storageRowExists))
require.False(t, storageRowExists)
}
func TestRemoveRepository_removeReplicationEvents(t *testing.T) {
t.Parallel()
const (
virtualStorage = "praefect"
relativePath = "relative_path/to/repo.git"
)
ctx, cancel := testhelper.Context()
defer cancel()
db := glsql.NewDB(t)
queue := datastore.NewPostgresReplicationEventQueue(db)
// Set replication event in_progress.
inProgressEvent, err := queue.Enqueue(ctx, datastore.ReplicationEvent{
Job: datastore.ReplicationJob{
Change: datastore.CreateRepo,
VirtualStorage: virtualStorage,
TargetNodeStorage: "gitaly-2",
RelativePath: relativePath,
},
})
require.NoError(t, err)
inProgress1, err := queue.Dequeue(ctx, virtualStorage, "gitaly-2", 10)
require.NoError(t, err)
require.Len(t, inProgress1, 1)
// New event - events in the 'ready' state should be removed.
_, err = queue.Enqueue(ctx, datastore.ReplicationEvent{
Job: datastore.ReplicationJob{
Change: datastore.UpdateRepo,
VirtualStorage: virtualStorage,
TargetNodeStorage: "gitaly-3",
SourceNodeStorage: "gitaly-1",
RelativePath: relativePath,
},
})
require.NoError(t, err)
// Failed event - should be removed as well.
failedEvent, err := queue.Enqueue(ctx, datastore.ReplicationEvent{
Job: datastore.ReplicationJob{
Change: datastore.UpdateRepo,
VirtualStorage: virtualStorage,
TargetNodeStorage: "gitaly-4",
SourceNodeStorage: "gitaly-0",
RelativePath: relativePath,
},
})
require.NoError(t, err)
inProgress2, err := queue.Dequeue(ctx, virtualStorage, "gitaly-4", 10)
require.NoError(t, err)
require.Len(t, inProgress2, 1)
// Acknowledge with failed status, so it will remain in the database for the next processing
// attempt or until it is deleted by the 'removeReplicationEvents' method.
acks2, err := queue.Acknowledge(ctx, datastore.JobStateFailed, []uint64{inProgress2[0].ID})
require.NoError(t, err)
require.Equal(t, []uint64{inProgress2[0].ID}, acks2)
ticker := helper.NewTimerTicker(time.Millisecond)
defer ticker.Stop()
errChan := make(chan error, 1)
go func() {
cmd := &removeRepository{virtualStorage: virtualStorage, relativePath: relativePath}
errChan <- cmd.removeReplicationEvents(ctx, testhelper.NewTestLogger(t), db.DB, ticker)
}()
go func() {
// We acknowledge in_progress job, so it unblocks the waiting loop.
acks, err := queue.Acknowledge(ctx, datastore.JobStateCompleted, []uint64{inProgressEvent.ID})
if assert.NoError(t, err) {
assert.Equal(t, []uint64{inProgress1[0].ID}, acks)
}
}()
for checkChan, exists := errChan, true; exists; {
select {
case err := <-checkChan:
require.NoError(t, err)
close(errChan)
checkChan = nil
default:
}
// Wait until job removed
row := db.QueryRow(`SELECT EXISTS(SELECT FROM replication_queue WHERE id = $1)`, failedEvent.ID)
require.NoError(t, row.Scan(&exists))
}
// Once there are no in_progress jobs anymore the method returns.
require.NoError(t, <-errChan)
var notExists bool
row := db.QueryRow(`SELECT NOT EXISTS(SELECT FROM replication_queue)`)
require.NoError(t, row.Scan(&notExists))
require.True(t, notExists)
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment