Skip to content
Snippets Groups Projects
Commit b54bafce authored by Pavlo Strokov's avatar Pavlo Strokov
Browse files

Removal of distributed_reads feature flag

As feature is enabled and shows expected results we remove it
and make it a default behaviour for the praefect.

Relates to: #2944
parent d8e2f9db
No related branches found
No related tags found
Loading
Pipeline #177583769 passed with warnings
---
title: Removal of distributed_reads feature flag
merge_request: 2470
author:
type: removed
......@@ -13,8 +13,6 @@ var (
GoUpdateHook = FeatureFlag{Name: "go_update_hook", OnByDefault: true}
// GoFetchSourceBranch enables a go implementation of FetchSourceBranch
GoFetchSourceBranch = FeatureFlag{Name: "go_fetch_source_branch", OnByDefault: false}
// DistributedReads allows praefect to redirect accessor operations to up-to-date secondaries
DistributedReads = FeatureFlag{Name: "distributed_reads", OnByDefault: false}
// GoPreReceiveHook will bypass the ruby pre-receive hook and use the go implementation
GoPreReceiveHook = FeatureFlag{Name: "go_prereceive_hook", OnByDefault: true}
// GoPostReceiveHook will bypass the ruby post-receive hook and use the go implementation
......@@ -39,7 +37,6 @@ var (
var All = []FeatureFlag{
GoUpdateHook,
GoFetchSourceBranch,
DistributedReads,
GoPreReceiveHook,
GoPostReceiveHook,
ReferenceTransactions,
......
......@@ -454,83 +454,7 @@ func TestStreamDirectorMutator_Transaction(t *testing.T) {
}
}
func TestStreamDirectorAccessor(t *testing.T) {
gitalySocket := testhelper.GetTemporaryGitalySocketFileName()
srv, _ := testhelper.NewServerWithHealth(t, gitalySocket)
defer srv.Stop()
gitalyAddress := "unix://" + gitalySocket
conf := config.Config{
VirtualStorages: []*config.VirtualStorage{
{
Name: "praefect",
Nodes: []*config.Node{
{
Address: gitalyAddress,
Storage: "praefect-internal-1",
},
},
},
},
}
queue := datastore.NewMemoryReplicationEventQueue(conf)
targetRepo := gitalypb.Repository{
StorageName: "praefect",
RelativePath: "/path/to/hashed/storage",
}
ctx, cancel := testhelper.Context()
defer cancel()
ctx = featureflag.IncomingCtxWithDisabledFeatureFlag(ctx, featureflag.DistributedReads)
entry := testhelper.DiscardTestEntry(t)
nodeMgr, err := nodes.NewManager(entry, conf, nil, nil, promtest.NewMockHistogramVec())
require.NoError(t, err)
nodeMgr.Start(0, time.Minute)
txMgr := transactions.NewManager()
coordinator := NewCoordinator(
queue,
datastore.NewMemoryRepositoryStore(conf.StorageNames()),
nodeMgr,
txMgr,
conf,
protoregistry.GitalyProtoPreregistered,
)
frame, err := proto.Marshal(&gitalypb.FindAllBranchesRequest{Repository: &targetRepo})
require.NoError(t, err)
fullMethod := "/gitaly.RefService/FindAllBranches"
peeker := &mockPeeker{frame: frame}
streamParams, err := coordinator.StreamDirector(correlation.ContextWithCorrelation(ctx, "my-correlation-id"), fullMethod, peeker)
require.NoError(t, err)
require.Equal(t, gitalyAddress, streamParams.Primary().Conn.Target())
md, ok := metadata.FromOutgoingContext(streamParams.Primary().Ctx)
require.True(t, ok)
require.Contains(t, md, praefect_metadata.PraefectMetadataKey)
mi, err := coordinator.registry.LookupMethod(fullMethod)
require.NoError(t, err)
require.Equal(t, protoregistry.ScopeRepository, mi.Scope, "method must be repository scoped")
require.Equal(t, protoregistry.OpAccessor, mi.Operation, "method must be an accessor")
m, err := mi.UnmarshalRequestProto(streamParams.Primary().Msg)
require.NoError(t, err)
rewrittenTargetRepo, err := mi.TargetRepo(m)
require.NoError(t, err)
require.Equal(t, "praefect-internal-1", rewrittenTargetRepo.GetStorageName(), "stream director should have rewritten the storage name")
}
func TestCoordinatorStreamDirector_distributesReads(t *testing.T) {
func TestCoordinatorStreamDirector(t *testing.T) {
gitalySocket0, gitalySocket1 := testhelper.GetTemporaryGitalySocketFileName(), testhelper.GetTemporaryGitalySocketFileName()
srv1, _ := testhelper.NewServerWithHealth(t, gitalySocket0)
defer srv1.Stop()
......
......@@ -15,7 +15,6 @@ import (
"github.com/sirupsen/logrus"
gitalyauth "gitlab.com/gitlab-org/gitaly/auth"
"gitlab.com/gitlab-org/gitaly/client"
"gitlab.com/gitlab-org/gitaly/internal/metadata/featureflag"
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
"gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy"
......@@ -222,10 +221,6 @@ func (n *Mgr) GetSyncedNode(ctx context.Context, virtualStorageName, repoPath st
return nil, fmt.Errorf("get shard for %q: %w", virtualStorageName, err)
}
if !featureflag.IsEnabled(ctx, featureflag.DistributedReads) {
return shard.Primary, nil
}
logger := ctxlogrus.Extract(ctx).WithFields(logrus.Fields{"virtual_storage_name": virtualStorageName, "repo_path": repoPath})
upToDateStorages, err := n.rs.GetConsistentSecondaries(ctx, virtualStorageName, repoPath, shard.Primary.GetStorage())
if err != nil {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment