Skip to content
Snippets Groups Projects
Commit 1fed0790 authored by Pavlo Strokov's avatar Pavlo Strokov
Browse files

Improve query to identify up to date storages for reads distribution

Instead of checking all storages, the approach has changed to
check only a defined set.
This avoids having to define the unique groups of storages and
sort them in order to get the latest actionable rows in each group.
The new query defines the list of the latest events per-storage
based on the passed in list of storage names.
It also relies on a new index based on the updated_at column,
which helps to identify the latest actionable rows faster.

Closes: #2944
parent 8c546351
No related branches found
No related tags found
Loading
Pipeline #165099982 passed
---
title: Improve query to identify up to date storages for reads distribution
merge_request:
author:
type: fixed
......@@ -124,6 +124,19 @@ func Uint64sToInterfaces(vs ...uint64) []interface{} {
return rs
}
// StringsToInterfaces boxes each of the provided strings into an empty
// interface and returns them as a slice in the same order.
// A nil input yields a nil result; an empty non-nil input yields an empty
// non-nil slice.
func StringsToInterfaces(vs ...string) []interface{} {
	if vs == nil {
		return nil
	}

	// The final length is known up front, so reserve it once and append.
	boxed := make([]interface{}, 0, len(vs))
	for _, s := range vs {
		boxed = append(boxed, s)
	}
	return boxed
}
// GeneratePlaceholders returns string with 'count' placeholders starting from 'start' index.
// 1 will be used if provided value for 'start' is less than 1.
// 1 will be used if provided value for 'count' is less than 1.
......
......@@ -313,6 +313,21 @@ func TestUint64sToInterfaces(t *testing.T) {
}
}
func TestStringsToInterfaces(t *testing.T) {
for _, tc := range []struct {
From []string
Exp []interface{}
}{
{From: nil, Exp: nil},
{From: []string{"1"}, Exp: []interface{}{"1"}},
{From: []string{" ", "#$@!%^&*()-=\\{}", "こんにちは"}, Exp: []interface{}{" ", "#$@!%^&*()-=\\{}", "こんにちは"}},
} {
t.Run("", func(t *testing.T) {
require.Equal(t, tc.Exp, StringsToInterfaces(tc.From...))
})
}
}
func TestUint64Provider(t *testing.T) {
var provider Uint64Provider
......
......@@ -195,7 +195,7 @@ func (s *memoryReplicationEventQueue) GetOutdatedRepositories(ctx context.Contex
return outdatedRepositories, nil
}
func (s *memoryReplicationEventQueue) GetUpToDateStorages(_ context.Context, virtualStorage, repoPath string) ([]string, error) {
func (s *memoryReplicationEventQueue) GetUpToDateStorages(_ context.Context, virtualStorage, repoPath string, checkNames []string) ([]string, error) {
s.RLock()
dirtyStorages := make(map[string]struct{})
for dst, event := range s.lastEventByDest {
......@@ -212,8 +212,14 @@ func (s *memoryReplicationEventQueue) GetUpToDateStorages(_ context.Context, vir
var result []string
for _, storage := range storageNames {
if _, found := dirtyStorages[storage]; !found {
result = append(result, storage)
for _, checkName := range checkNames {
if storage != checkName {
continue
}
if _, found := dirtyStorages[storage]; !found {
result = append(result, storage)
}
}
}
return result, nil
......
package migrations
import migrate "github.com/rubenv/sql-migrate"
func init() {
m := &migrate.Migration{
Id: "20200710060654_index_on_updated_at_relative_path",
Up: []string{
// This index is used by the query in glsql.PostgresReplicationEventQueue#GetUpToDateStorages method.
`CREATE INDEX IF NOT EXISTS updated_at_relative_replication_queue_idx
ON replication_queue USING BTREE (updated_at DESC, (job ->> 'relative_path'))`,
},
Down: []string{
`DROP INDEX IF EXISTS updated_at_relative_replication_queue_idx`,
},
}
allMigrations = append(allMigrations, m)
}
......@@ -28,8 +28,9 @@ type ReplicationEventQueue interface {
// from the reference storage.
GetOutdatedRepositories(ctx context.Context, virtualStorage string, referenceStorage string) (map[string][]string, error)
// GetUpToDateStorages returns list of target storages where latest replication job is in 'completed' state.
// It returns a subset of the passed in 'storageNames' values, which are the only storages that are checked.
// It returns no results if there are no up to date storages or there were no replication events yet.
GetUpToDateStorages(ctx context.Context, virtualStorage, repoPath string) ([]string, error)
GetUpToDateStorages(ctx context.Context, virtualStorage, repoPath string, storageNames []string) ([]string, error)
// StartHealthUpdate starts periodical update of the event's health identifier.
// The events with fresh health identifier won't be considered as stale.
// The health update will be executed on each new entry received from trigger channel passed in.
......@@ -357,19 +358,42 @@ ORDER BY repository, target
return nodesByRepo, rows.Err()
}
func (rq PostgresReplicationEventQueue) GetUpToDateStorages(ctx context.Context, virtualStorage, repoPath string) ([]string, error) {
func (rq PostgresReplicationEventQueue) GetUpToDateStorages(ctx context.Context, virtualStorage, repoPath string, storageNames []string) ([]string, error) {
if len(storageNames) == 0 {
return nil, nil
}
params := glsql.NewParamsAssembler()
virtualStoragePlaceholder := params.AddParam(virtualStorage)
repoPathPlaceholder := params.AddParam(repoPath)
storagesPlaceholder := params.AddParams(glsql.StringsToInterfaces(storageNames...))
statePlaceholder := params.AddParam(JobStateCompleted)
// The performance of this query depends on the internal/praefect/datastore/migrations/20200710060654_index_on_updated_at_relative_path.go
// If implementation changes consider if the index is still useful.
query := `
SELECT storage
FROM (
SELECT DISTINCT ON (job ->> 'target_node_storage')
WITH latest AS (
SELECT
job ->> 'target_node_storage' AS storage,
state
CASE WHEN SUM(CASE WHEN updated_at IS NULL THEN 1 ELSE 0 END) > 0
THEN NULL ELSE MAX(updated_at)
END AS updated_at
FROM replication_queue
WHERE job ->> 'virtual_storage' = $1 AND job ->> 'relative_path' = $2
ORDER BY job ->> 'target_node_storage', updated_at DESC NULLS FIRST
) t
WHERE state = 'completed'`
rows, err := rq.qc.QueryContext(ctx, query, virtualStorage, repoPath)
WHERE job ->> 'virtual_storage' = ` + virtualStoragePlaceholder + `
AND job ->> 'relative_path' = ` + repoPathPlaceholder + `
AND job ->> 'target_node_storage' IN (` + storagesPlaceholder + `)
GROUP BY job ->> 'target_node_storage'
)
SELECT DISTINCT queue.job ->> 'target_node_storage'
FROM replication_queue AS queue
JOIN latest ON latest.storage = queue.job ->> 'target_node_storage'
WHERE queue.job ->> 'virtual_storage' = ` + virtualStoragePlaceholder + `
AND queue.job ->> 'relative_path' = ` + repoPathPlaceholder + `
AND queue.job ->> 'target_node_storage' IN (` + storagesPlaceholder + `)
AND queue.updated_at = latest.updated_at
AND queue.state = ` + statePlaceholder
rows, err := rq.qc.QueryContext(ctx, query, params.Params()...)
if err != nil {
return nil, fmt.Errorf("query: %w", err)
}
......
......@@ -563,7 +563,7 @@ func TestPostgresReplicationEventQueue_AcknowledgeMultiple(t *testing.T) {
acknowledge2, err := queue.Acknowledge(ctx, JobStateCompleted, []uint64{1, 3})
require.NoError(t, err)
require.Equal(t, []uint64{1, 3}, acknowledge2)
require.ElementsMatch(t, []uint64{1, 3}, acknowledge2)
requireLocks(t, ctx, db, []LockRow{
{ID: "praefect|gitaly-1|/project/path-1", Acquired: true},
{ID: "praefect|gitaly-1|/project/path-2", Acquired: true},
......@@ -594,7 +594,7 @@ func TestPostgresReplicationEventQueue_AcknowledgeMultiple(t *testing.T) {
acknowledged3, err := queue.Acknowledge(ctx, JobStateCompleted, []uint64{2, 5, 6, 7})
require.NoError(t, err)
require.Equal(t, []uint64{2, 5, 6, 7}, acknowledged3)
require.ElementsMatch(t, []uint64{2, 5, 6, 7}, acknowledged3)
requireLocks(t, ctx, db, []LockRow{
{ID: "praefect|gitaly-1|/project/path-1", Acquired: false},
{ID: "praefect|gitaly-1|/project/path-2", Acquired: false},
......@@ -659,6 +659,19 @@ func TestPostgresReplicationEventQueue_GetUpToDateStorages(t *testing.T) {
source := PostgresReplicationEventQueue{qc: db}
t.Run("no storage names provided", func(t *testing.T) {
db.MustExec(t, `
INSERT INTO replication_queue
(job, updated_at, state)
VALUES
('{"virtual_storage": "vs1", "target_node_storage": "s1", "relative_path": "path-1"}', '2020-01-01 00:00:00', 'completed')`,
)
ss, err := source.GetUpToDateStorages(ctx, "vs1", "path-1", nil)
require.NoError(t, err)
require.ElementsMatch(t, nil, ss)
})
t.Run("single 'ready' job for single storage", func(t *testing.T) {
db.TruncateAll(t)
......@@ -666,10 +679,10 @@ func TestPostgresReplicationEventQueue_GetUpToDateStorages(t *testing.T) {
INSERT INTO replication_queue
(job, updated_at, state)
VALUES
('{"virtual_storage": "vs1", "target_node_storage": "s1", "relative_path": "path-1"}', '2020-01-01 00:00:00', 'ready')`,
('{"virtual_storage": "vs1", "target_node_storage": "s1", "relative_path": "path-1"}', NULL, 'ready')`,
)
ss, err := source.GetUpToDateStorages(ctx, "vs1", "path-1")
ss, err := source.GetUpToDateStorages(ctx, "vs1", "path-1", []string{"s1"})
require.NoError(t, err)
require.ElementsMatch(t, []string{}, ss)
})
......@@ -684,7 +697,7 @@ func TestPostgresReplicationEventQueue_GetUpToDateStorages(t *testing.T) {
('{"virtual_storage": "vs1", "target_node_storage": "s1", "relative_path": "path-1"}', '2020-01-01 00:00:00', 'dead')`,
)
ss, err := source.GetUpToDateStorages(ctx, "vs1", "path-1")
ss, err := source.GetUpToDateStorages(ctx, "vs1", "path-1", []string{"s1"})
require.NoError(t, err)
require.ElementsMatch(t, []string{}, ss)
})
......@@ -699,7 +712,7 @@ func TestPostgresReplicationEventQueue_GetUpToDateStorages(t *testing.T) {
('{"virtual_storage": "vs1", "target_node_storage": "s1", "relative_path": "path-1"}', '2020-01-01 00:00:00', 'failed')`,
)
ss, err := source.GetUpToDateStorages(ctx, "vs1", "path-1")
ss, err := source.GetUpToDateStorages(ctx, "vs1", "path-1", []string{"s1"})
require.NoError(t, err)
require.ElementsMatch(t, []string{}, ss)
})
......@@ -714,7 +727,7 @@ func TestPostgresReplicationEventQueue_GetUpToDateStorages(t *testing.T) {
('{"virtual_storage": "vs1", "target_node_storage": "s1", "relative_path": "path-1"}', '2020-01-01 00:00:00', 'completed')`,
)
ss, err := source.GetUpToDateStorages(ctx, "vs1", "path-1")
ss, err := source.GetUpToDateStorages(ctx, "vs1", "path-1", []string{"s1"})
require.NoError(t, err)
require.ElementsMatch(t, []string{"s1"}, ss)
})
......@@ -730,7 +743,7 @@ func TestPostgresReplicationEventQueue_GetUpToDateStorages(t *testing.T) {
('{"virtual_storage": "vs1", "target_node_storage": "s2", "relative_path": "path-2"}', '2020-01-01 00:00:00', 'completed')`,
)
ss, err := source.GetUpToDateStorages(ctx, "vs1", "path-1")
ss, err := source.GetUpToDateStorages(ctx, "vs1", "path-1", []string{"s1", "s2"})
require.NoError(t, err)
require.ElementsMatch(t, []string{"s1"}, ss)
})
......@@ -746,9 +759,9 @@ func TestPostgresReplicationEventQueue_GetUpToDateStorages(t *testing.T) {
('{"virtual_storage": "vs1", "target_node_storage": "s2", "relative_path": "path-1"}', '2020-01-01 00:00:00', 'completed')`,
)
ss, err := source.GetUpToDateStorages(ctx, "vs1", "path-1")
ss, err := source.GetUpToDateStorages(ctx, "vs1", "path-1", []string{"s1", "s2"})
require.NoError(t, err)
require.ElementsMatch(t, []string{"s1", "s2"}, ss)
require.ElementsMatch(t, []string{"s1", "s2"}, ss, ss)
})
t.Run("last jobs are 'completed' for multiple storages but different virtuals", func(t *testing.T) {
......@@ -762,7 +775,7 @@ func TestPostgresReplicationEventQueue_GetUpToDateStorages(t *testing.T) {
('{"virtual_storage": "vs2", "target_node_storage": "s1", "relative_path": "path-1"}', '2020-01-01 00:00:00', 'completed')`,
)
ss, err := source.GetUpToDateStorages(ctx, "vs1", "path-1")
ss, err := source.GetUpToDateStorages(ctx, "vs1", "path-1", []string{"s1"})
require.NoError(t, err)
require.ElementsMatch(t, []string{"s1"}, ss)
})
......@@ -778,7 +791,7 @@ func TestPostgresReplicationEventQueue_GetUpToDateStorages(t *testing.T) {
('{"virtual_storage": "vs1", "target_node_storage": "s2", "relative_path": "path-1"}', '2020-01-01 00:00:00', 'in_progress')`,
)
ss, err := source.GetUpToDateStorages(ctx, "vs1", "path-1")
ss, err := source.GetUpToDateStorages(ctx, "vs1", "path-1", []string{"s1", "s2"})
require.NoError(t, err)
require.ElementsMatch(t, []string{"s1"}, ss)
})
......@@ -791,12 +804,12 @@ func TestPostgresReplicationEventQueue_GetUpToDateStorages(t *testing.T) {
(job, updated_at, state)
VALUES
('{"virtual_storage": "vs1", "target_node_storage": "s1", "relative_path": "path-1"}', '2020-01-01 00:00:00', 'dead'),
('{"virtual_storage": "vs1", "target_node_storage": "s2", "relative_path": "path-1"}', '2020-01-01 00:00:00', 'ready'),
('{"virtual_storage": "vs1", "target_node_storage": "s2", "relative_path": "path-1"}', NULL, 'ready'),
('{"virtual_storage": "vs1", "target_node_storage": "s3", "relative_path": "path-1"}', '2020-01-01 00:00:00', 'failed'),
('{"virtual_storage": "vs1", "target_node_storage": "s4", "relative_path": "path-1"}', '2020-01-01 00:00:00', 'in_progress')`,
)
ss, err := source.GetUpToDateStorages(ctx, "vs1", "path-1")
ss, err := source.GetUpToDateStorages(ctx, "vs1", "path-1", []string{"s1", "s2", "s3", "s4"})
require.NoError(t, err)
require.ElementsMatch(t, []string{}, ss)
})
......@@ -811,7 +824,7 @@ func TestPostgresReplicationEventQueue_GetUpToDateStorages(t *testing.T) {
('{"virtual_storage": "vs1", "target_node_storage": "s1", "relative_path": "path-1"}', '2020-01-01 00:00:01', 'dead'),
('{"virtual_storage": "vs1", "target_node_storage": "s1", "relative_path": "path-1"}', '2020-01-01 00:00:00', 'completed'),
('{"virtual_storage": "vs1", "target_node_storage": "s2", "relative_path": "path-1"}', '2020-01-01 00:00:01', 'ready'),
('{"virtual_storage": "vs1", "target_node_storage": "s2", "relative_path": "path-1"}', NULL, 'ready'),
('{"virtual_storage": "vs1", "target_node_storage": "s2", "relative_path": "path-1"}', '2020-01-01 00:00:00', 'completed'),
('{"virtual_storage": "vs1", "target_node_storage": "s3", "relative_path": "path-1"}', '2020-01-01 00:00:01', 'failed'),
......@@ -821,9 +834,9 @@ func TestPostgresReplicationEventQueue_GetUpToDateStorages(t *testing.T) {
('{"virtual_storage": "vs1", "target_node_storage": "s4", "relative_path": "path-1"}', '2020-01-01 00:00:00', 'completed')`,
)
ss, err := source.GetUpToDateStorages(ctx, "vs1", "path-1")
ss, err := source.GetUpToDateStorages(ctx, "vs1", "path-1", []string{"s1", "s2", "s3", "s4"})
require.NoError(t, err)
require.ElementsMatch(t, []string{}, ss)
require.ElementsMatch(t, []string{}, ss, ss)
})
t.Run("multiple virtuals with multiple storages", func(t *testing.T) {
......@@ -848,10 +861,50 @@ func TestPostgresReplicationEventQueue_GetUpToDateStorages(t *testing.T) {
('{"virtual_storage": "vs1", "target_node_storage": "s5", "relative_path": "path-2"}', '2020-01-01 00:00:00', 'completed')`,
)
ss, err := source.GetUpToDateStorages(ctx, "vs1", "path-1")
ss, err := source.GetUpToDateStorages(ctx, "vs1", "path-1", []string{"s1", "s2", "s3", "s4", "s5"})
require.NoError(t, err)
require.ElementsMatch(t, []string{"s2"}, ss)
})
t.Run("multiple virtuals with multiple storages but reduced by the provided list", func(t *testing.T) {
db.TruncateAll(t)
db.MustExec(t, `
INSERT INTO replication_queue
(job, updated_at, state)
VALUES
('{"virtual_storage": "vs1", "target_node_storage": "s1", "relative_path": "path-1"}', '2020-01-01 00:00:00', 'completed'),
('{"virtual_storage": "vs1", "target_node_storage": "s2", "relative_path": "path-1"}', '2020-01-01 00:00:01', 'completed'),
('{"virtual_storage": "vs1", "target_node_storage": "s2", "relative_path": "path-1"}', '2020-01-01 00:00:00', 'dead'),
('{"virtual_storage": "vs2", "target_node_storage": "s3", "relative_path": "path-1"}', '2020-01-01 00:00:01', 'completed'),
('{"virtual_storage": "vs1", "target_node_storage": "s4", "relative_path": "path-1"}', '2020-01-01 00:00:00', 'completed'),
('{"virtual_storage": "vs1", "target_node_storage": "s5", "relative_path": "path-2"}', '2020-01-01 00:00:00', 'completed')`,
)
ss, err := source.GetUpToDateStorages(ctx, "vs1", "path-1", []string{"s1", "s2"})
require.NoError(t, err)
require.ElementsMatch(t, []string{"s1", "s2"}, ss)
})
t.Run("storage with 'completed' and 'failed' events with same update time considered up to date", func(t *testing.T) {
db.TruncateAll(t)
db.MustExec(t, `
INSERT INTO replication_queue
(job, updated_at, state)
VALUES
('{"virtual_storage": "vs1", "target_node_storage": "s1", "relative_path": "path-1"}', '2020-01-01 00:00:00', 'completed'),
('{"virtual_storage": "vs1", "target_node_storage": "s1", "relative_path": "path-1"}', '2020-01-01 00:00:00', 'failed')`,
)
ss, err := source.GetUpToDateStorages(ctx, "vs1", "path-1", []string{"s1"})
require.NoError(t, err)
require.ElementsMatch(t, []string{"s1"}, ss)
})
}
func TestPostgresReplicationEventQueue_StartHealthUpdate(t *testing.T) {
......
......@@ -226,8 +226,17 @@ func (n *Mgr) GetSyncedNode(ctx context.Context, virtualStorageName, repoPath st
return shard.Primary, nil
}
storageNames := make([]string, 0, len(shard.Secondaries))
for _, secondary := range shard.Secondaries {
if !secondary.IsHealthy() {
continue
}
storageNames = append(storageNames, secondary.GetStorage())
}
logger := ctxlogrus.Extract(ctx).WithFields(logrus.Fields{"virtual_storage_name": virtualStorageName, "repo_path": repoPath})
upToDateStorages, err := n.queue.GetUpToDateStorages(ctx, virtualStorageName, repoPath)
upToDateStorages, err := n.queue.GetUpToDateStorages(ctx, virtualStorageName, repoPath, storageNames)
if err != nil {
// this is recoverable error - proceed with primary node
logger.WithError(err).Warn("get up to date secondaries")
......@@ -245,10 +254,6 @@ func (n *Mgr) GetSyncedNode(ctx context.Context, virtualStorageName, repoPath st
logger.WithError(err).Warn("storage returned as up-to-date")
}
if !node.IsHealthy() {
continue
}
storages[node] = struct{}{}
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment