......@@ -117,12 +117,6 @@ cover:
stage: test
script:
- make cover
after_script:
- bash <(curl -s https://codecov.io/bash) -t "${CODECOV_TOKEN}" -f _build/cover/all.merged -F unittests
artifacts:
paths:
- _build/cover/all.html
expire_in: 1 week
code_quality:
image: docker:stable
......
# Gitaly changelog
## v1.19.1
#### Fixed
- Use empty tree if initial commit
https://gitlab.com/gitlab-org/gitaly/merge_requests/1075
## v1.19.0
#### Fixed
- Return blank checksum for git repositories with only invalid refs
https://gitlab.com/gitlab-org/gitaly/merge_requests/1065
#### Other
- Use chunker in GetRawChanges
https://gitlab.com/gitlab-org/gitaly/merge_requests/1043
## v1.18.0
#### Other
......
---
title: Initial design document for High Availability
merge_request: 1058
author:
type: other
---
title: Reverse proxy pass thru for HA
merge_request: 1064
author:
type: other
---
title: Reimplement DeleteRefs in Go
merge_request: 1069
author:
type: performance
......@@ -294,6 +294,14 @@ Now you can run the one test you're interested in:
go test -count 1 -run TestRepositoryExists ./internal/service/repository
```
When writing tests, prefer [testify]'s [require] and [assert] packages for
setting expectations, rather than calling functions like `t.Fatal()` directly
on `testing.T`.
[testify]: https://github.com/stretchr/testify
[require]: https://github.com/stretchr/testify/tree/master/require
[assert]: https://github.com/stretchr/testify/tree/master/assert
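For illustration, a minimal, self-contained test (not taken from the Gitaly suite) showing the difference in behaviour between the two packages:

```go
package example

import (
	"strconv"
	"testing"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
)

func TestAtoi(t *testing.T) {
	value, err := strconv.Atoi("42")
	require.NoError(t, err)    // stops this test immediately on failure, like t.Fatal
	assert.Equal(t, 42, value) // records a failure but lets the test continue
}
```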
#### Rspec tests
It is possible to write end-to-end Rspec tests that run against a full
......
# Gitaly High Availability (HA) Design
Gitaly HA is an active-active cluster configuration for resilient git operations. [Refer to our specific requirements](https://gitlab.com/gitlab-org/gitaly/issues/1332).
Refer to epic &289 for current issues and discussions revolving around HA MVC development.
## Terminology
The following terminology may be used within the context of the Gitaly HA project:
- Shard - a partition of the storage for all repos. Each shard requires redundancy in the form of multiple Gitaly nodes (ideally at least three) to maintain HA.
- Praefect - a transparent front end to all Gitaly shards. This reverse proxy ensures that all gRPC calls are forwarded to the correct shard by consulting the coordinator. The reverse proxy also ensures that write actions are performed transactionally when needed.
- etymology: from Latin praefectus for _a person appointed to any of various positions of command, authority, or superintendence, as a chief magistrate in ancient Rome or the chief administrative official of a department of France or Italy._
- [pronounced _pree-fect_](https://www.youtube.com/watch?v=MHszCZjPmTQ)
- Node (TODO: we probably need a similar Latin name here) - performs the actual git read/write operations to/from disk. Has no knowledge of shards, praefects, or coordinators, just like the Gitaly service as it existed prior to HA.
## Design
The high-level design takes a reverse proxy approach to fanning out write requests to the appropriate nodes:
<img src="https://docs.google.com/drawings/d/e/2PACX-1vRl7WS-6RBOWxyLSBbBBAoV9MupmTh5vTqMOw_AX9axlboqkybTbFqGqExLyyYOilqEW7S9euXdBHzX/pub?w=960&amp;h=720">
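To make the diagram concrete, here is a rough sketch of how the proxy introduced in this merge request could be wired up and started. The node address, storage name, and listen port are placeholders, the logger is a throwaway implementation of the `Logger` interface, and the snippet assumes it lives inside the Gitaly module (the `praefect` package is internal):

```go
package main

import (
	"log"
	"net"

	"github.com/mwitkow/grpc-proxy/proxy"
	"gitlab.com/gitlab-org/gitaly/internal/praefect"
	"google.golang.org/grpc"
)

// stdLogger satisfies praefect.Logger using the standard library logger.
type stdLogger struct{}

func (stdLogger) Debugf(format string, args ...interface{}) { log.Printf(format, args...) }

func main() {
	srv := praefect.NewServer(nil, stdLogger{})

	// Dial a downstream Gitaly node; address and storage name are placeholders.
	// The proxy codec lets the praefect forward raw gRPC frames over this connection.
	conn, err := grpc.Dial(
		"gitaly-1.internal:9999",
		grpc.WithInsecure(),
		grpc.WithDefaultCallOptions(grpc.CallCustomCodec(proxy.Codec())),
	)
	if err != nil {
		log.Fatal(err)
	}
	srv.RegisterNode("default", conn)

	lis, err := net.Listen("tcp", ":2305")
	if err != nil {
		log.Fatal(err)
	}
	log.Fatal(srv.Start(lis)) // blocks until the server stops
}
```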
## Phases
An iterative, low-risk approach needs to be devised to add functionality and verify assumptions at a sustainable pace without impeding existing functionality.
### 1. Simple pass-through proxy - no added functionality
- allows us to set up telemetry for observability of the new service
- allows us to evaluate a gRPC proxy library
### 2. Introduce State
The following details need to be persisted in Postgres:
- [x] Primary location for a project
- [ ] Redundant locations for a project
- [ ] Available storage locations (initially can be configuration file)
Initially, the state of the shard nodes will be static and loaded from a configuration file. Eventually, this will be made dynamic via a data store (Postgres).
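As a purely illustrative sketch of the state listed above, with no claim about the eventual schema, the data might look something like this:

```go
package haexample

// These types are hypothetical; the exact schema is not decided in this document.

// ProjectLocations records where a single project lives.
type ProjectLocations struct {
	Project  string   // e.g. "group/project.git"
	Primary  string   // storage location holding the authoritative copy
	Replicas []string // redundant storage locations for the project
}

// StaticState is the state initially read from a configuration file and
// later moved into a data store (Postgres).
type StaticState struct {
	// Available storage locations: storage name -> Gitaly node address.
	StorageNodes map[string]string
	Projects     []ProjectLocations
}
```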
### Resolving Location
The following existing interaction will remain intact for the first iteration of the HA feature:
```mermaid
sequenceDiagram
Client->>Rails: Modify repo X
Rails-->>Datastore: Where is Repo X?
Datastore-->> Rails: Repo X is at location A
Rails-->>Gitaly: Modify repo X at location A
Gitaly-->>Rails: Request succeeded/failed
```
Once the Rails app has resolved the primary location for the project, the request is made to the praefect. The praefect then resolves the redundant locations via the coordinator before applying the changes.
```mermaid
sequenceDiagram
Rails->>Praefect: Modify repo X at A
Praefect->>Coordinator: Which locations complement A for X?
Coordinator->>Praefect: Locations B and C complement A
Praefect->>Nodes ABC: Modify repo X
Nodes ABC->>Praefect: Modifications successful!
```
*Note: the above interaction between the praefect and nodes A, B, and C is an all-or-nothing transaction: all nodes must succeed, otherwise a single node failure causes the entire transaction to fail. This will be improved when replication is introduced.*
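A rough sketch, not part of this merge request, of what such an all-or-nothing fan-out could look like inside the praefect; the `modify` callback stands in for the actual per-node RPC:

```go
package haexample

import (
	"context"

	"golang.org/x/sync/errgroup"
	"google.golang.org/grpc"
)

// fanOutModify issues the same mutation against every node in parallel and
// fails if any single node fails, matching the all-or-nothing semantics above.
func fanOutModify(ctx context.Context, nodes []*grpc.ClientConn, modify func(context.Context, *grpc.ClientConn) error) error {
	g, ctx := errgroup.WithContext(ctx)
	for _, node := range nodes {
		node := node // capture the loop variable for the goroutine below
		g.Go(func() error { return modify(ctx, node) })
	}
	return g.Wait() // first error wins; success only if every node succeeded
}
```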
### 3. Replication
The next phase is to enable replication of data between nodes. This makes transactions more efficient and fault tolerant. This could be done in a few ways:
#### Node Orchestrated [👎]
Node orchestrated puts the intelligence of replication into one of the nodes being modified:
```mermaid
sequenceDiagram
Praefect->>Node A: Modify repo X
activate Node A
Node A->>Node B: Modify repo X
Node A->>Node C: Modify repo X
Node A->>Praefect: Modification successful!
```
Orchestration requires designating a leader node for the transaction. This leader node becomes a critical path for all nodes involved. Ideally, we want several simpler (less risky) operations that can succeed or fail independently of each other. This way, failure and recovery can be handled outside the nodes.
#### Praefect Orchestrated [👍]
With the praefect orchestrating replication, we are isolating the critical path to a stateless service. Stateless services are preferred for the critical path since another praefect can pick up the task after a praefect failure.
```mermaid
sequenceDiagram
Praefect->>Node A: Modify repo X
Node A->>Praefect: Success!
Praefect->>Node B: Replicate From A
Praefect->>Node C: Replicate From A
Node B->>Praefect: Success!
Node C->>Praefect: Success!
```
*Note: once Node A propagates changes to a peer, Node A is no longer the critical path for subsequent propagations. If Node A fails after propagating to a second peer, that peer can become the new leader and resume replication.*
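One hypothetical way to represent the replication work the praefect would need to track for this flow (no such type exists in this merge request):

```go
package haexample

import "time"

// ReplJob is a hypothetical record of pending replication work: once the
// primary (node A) accepts a write, the praefect enqueues one job per
// secondary and retries it until that secondary has caught up.
type ReplJob struct {
	Repo      string    // repository to replicate, e.g. "group/project.git"
	Source    string    // storage location holding the up-to-date copy (node A)
	Target    string    // storage location that must catch up (node B or C)
	CreatedAt time.Time // used for ordering and retry back-off
	Attempts  int       // failed replication attempts so far
}
```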
## Notes
* Existing discussions
* Requirements: https://gitlab.com/gitlab-org/gitaly/issues/1332
* Design: https://gitlab.com/gitlab-org/gitaly/issues/1335
* Prior art
* Stemma by Palantir
* [Announcement](https://medium.com/palantir/stemma-distributed-git-server-70afbca0fc29)
* Extends jgit (java git implementation)
* Spokes by GitHub
* Application layer approach: uses underlying git software to propagate changes to other locations.
* Bitbucket Data Center (BDC)
* [BDC FAQ](https://confluence.atlassian.com/enterprise/bitbucket-data-center-faq-776663707.html)
* Ketch by Google (no longer maintained)
* [Sid's comment on performance issue](https://news.ycombinator.com/item?id=13934698)
* Also jgit based
* gRPC proxy considerations
* [gRPC Proxy library](https://github.com/mwitkow/grpc-proxy)
* Pros
* Handles all gRPC requests generically
* Cons
* Lack of support
* [See current importers of project](https://godoc.org/github.com/mwitkow/grpc-proxy/proxy?importers)
* Low level implementation requires knowledge of gRPC internals
* Custom code generation
* Pros
* Simple and maintainable
* Allows us to handwrite proxy code and later automate with lessons learned via code generation
* Cons
* Process heavy; requires custom tooling
* Requires a way to tell which methods are read/write
* [See WIP for marking modifying RPCs](https://gitlab.com/gitlab-org/gitaly-proto/merge_requests/228)
* See also:
* [nRPC](https://github.com/nats-rpc/nrpc) - gRPC via NATS
* [grpclb](https://github.com/bsm/grpclb) - gRPC load balancer
* Complications
* The existing Rails app indicates which Gitaly instance a request is destined for (e.g. a request to modify repo X should be directed to gitaly #1).
* This means the Rails app must be kept in the loop about any changes made to the location of a repo.
* This may be mitigated by changing the proxy implementation to interpret the destination address as a reference to a shard rather than a specific host. This might open the door to something like consistent hashing.
* While Git is distributed in nature, some write operations need to be serialized to avoid race conditions. This includes ref updates.
* How do we coordinate proxies when applying ref updates? Do we need to?
package praefect_test
import (
"context"
"gitlab.com/gitlab-org/gitaly-proto/go/gitalypb"
"google.golang.org/grpc"
)
// mockRepoSvc is a mock implementation of gitalypb.RepositoryServiceServer
// for testing purposes
type mockRepoSvc struct {
srv *grpc.Server
}
func (m *mockRepoSvc) RepositoryExists(context.Context, *gitalypb.RepositoryExistsRequest) (*gitalypb.RepositoryExistsResponse, error) {
return &gitalypb.RepositoryExistsResponse{}, nil
}
func (m *mockRepoSvc) RepackIncremental(context.Context, *gitalypb.RepackIncrementalRequest) (*gitalypb.RepackIncrementalResponse, error) {
return nil, nil
}
func (m *mockRepoSvc) RepackFull(context.Context, *gitalypb.RepackFullRequest) (*gitalypb.RepackFullResponse, error) {
return nil, nil
}
func (m *mockRepoSvc) GarbageCollect(context.Context, *gitalypb.GarbageCollectRequest) (*gitalypb.GarbageCollectResponse, error) {
return nil, nil
}
func (m *mockRepoSvc) RepositorySize(context.Context, *gitalypb.RepositorySizeRequest) (*gitalypb.RepositorySizeResponse, error) {
return nil, nil
}
func (m *mockRepoSvc) ApplyGitattributes(context.Context, *gitalypb.ApplyGitattributesRequest) (*gitalypb.ApplyGitattributesResponse, error) {
return nil, nil
}
func (m *mockRepoSvc) FetchRemote(context.Context, *gitalypb.FetchRemoteRequest) (*gitalypb.FetchRemoteResponse, error) {
return nil, nil
}
func (m *mockRepoSvc) CreateRepository(context.Context, *gitalypb.CreateRepositoryRequest) (*gitalypb.CreateRepositoryResponse, error) {
return nil, nil
}
func (m *mockRepoSvc) GetArchive(*gitalypb.GetArchiveRequest, gitalypb.RepositoryService_GetArchiveServer) error {
return nil
}
func (m *mockRepoSvc) HasLocalBranches(context.Context, *gitalypb.HasLocalBranchesRequest) (*gitalypb.HasLocalBranchesResponse, error) {
return nil, nil
}
func (m *mockRepoSvc) FetchSourceBranch(context.Context, *gitalypb.FetchSourceBranchRequest) (*gitalypb.FetchSourceBranchResponse, error) {
return nil, nil
}
func (m *mockRepoSvc) Fsck(context.Context, *gitalypb.FsckRequest) (*gitalypb.FsckResponse, error) {
return nil, nil
}
func (m *mockRepoSvc) WriteRef(context.Context, *gitalypb.WriteRefRequest) (*gitalypb.WriteRefResponse, error) {
return nil, nil
}
func (m *mockRepoSvc) FindMergeBase(context.Context, *gitalypb.FindMergeBaseRequest) (*gitalypb.FindMergeBaseResponse, error) {
return nil, nil
}
func (m *mockRepoSvc) CreateFork(context.Context, *gitalypb.CreateForkRequest) (*gitalypb.CreateForkResponse, error) {
return nil, nil
}
func (m *mockRepoSvc) IsRebaseInProgress(context.Context, *gitalypb.IsRebaseInProgressRequest) (*gitalypb.IsRebaseInProgressResponse, error) {
return nil, nil
}
func (m *mockRepoSvc) IsSquashInProgress(context.Context, *gitalypb.IsSquashInProgressRequest) (*gitalypb.IsSquashInProgressResponse, error) {
return nil, nil
}
func (m *mockRepoSvc) CreateRepositoryFromURL(context.Context, *gitalypb.CreateRepositoryFromURLRequest) (*gitalypb.CreateRepositoryFromURLResponse, error) {
return nil, nil
}
func (m *mockRepoSvc) CreateBundle(*gitalypb.CreateBundleRequest, gitalypb.RepositoryService_CreateBundleServer) error {
return nil
}
func (m *mockRepoSvc) CreateRepositoryFromBundle(gitalypb.RepositoryService_CreateRepositoryFromBundleServer) error {
return nil
}
func (m *mockRepoSvc) WriteConfig(context.Context, *gitalypb.WriteConfigRequest) (*gitalypb.WriteConfigResponse, error) {
return nil, nil
}
func (m *mockRepoSvc) SetConfig(context.Context, *gitalypb.SetConfigRequest) (*gitalypb.SetConfigResponse, error) {
return nil, nil
}
func (m *mockRepoSvc) DeleteConfig(context.Context, *gitalypb.DeleteConfigRequest) (*gitalypb.DeleteConfigResponse, error) {
return nil, nil
}
func (m *mockRepoSvc) FindLicense(context.Context, *gitalypb.FindLicenseRequest) (*gitalypb.FindLicenseResponse, error) {
return nil, nil
}
func (m *mockRepoSvc) GetInfoAttributes(*gitalypb.GetInfoAttributesRequest, gitalypb.RepositoryService_GetInfoAttributesServer) error {
return nil
}
func (m *mockRepoSvc) CalculateChecksum(context.Context, *gitalypb.CalculateChecksumRequest) (*gitalypb.CalculateChecksumResponse, error) {
return nil, nil
}
func (m *mockRepoSvc) Cleanup(context.Context, *gitalypb.CleanupRequest) (*gitalypb.CleanupResponse, error) {
return nil, nil
}
func (m *mockRepoSvc) GetSnapshot(*gitalypb.GetSnapshotRequest, gitalypb.RepositoryService_GetSnapshotServer) error {
return nil
}
func (m *mockRepoSvc) CreateRepositoryFromSnapshot(context.Context, *gitalypb.CreateRepositoryFromSnapshotRequest) (*gitalypb.CreateRepositoryFromSnapshotResponse, error) {
return nil, nil
}
func (m *mockRepoSvc) GetRawChanges(*gitalypb.GetRawChangesRequest, gitalypb.RepositoryService_GetRawChangesServer) error {
return nil
}
func (m *mockRepoSvc) SearchFilesByContent(*gitalypb.SearchFilesByContentRequest, gitalypb.RepositoryService_SearchFilesByContentServer) error {
return nil
}
func (m *mockRepoSvc) SearchFilesByName(*gitalypb.SearchFilesByNameRequest, gitalypb.RepositoryService_SearchFilesByNameServer) error {
return nil
}
func (m *mockRepoSvc) RestoreCustomHooks(gitalypb.RepositoryService_RestoreCustomHooksServer) error {
return nil
}
func (m *mockRepoSvc) BackupCustomHooks(*gitalypb.BackupCustomHooksRequest, gitalypb.RepositoryService_BackupCustomHooksServer) error {
return nil
}
/*Package praefect is a Gitaly reverse proxy for transparently routing gRPC
calls to a set of Gitaly services.*/
package praefect
import (
"context"
"errors"
"fmt"
"net"
"sync"
"github.com/mwitkow/grpc-proxy/proxy"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// Logger is a simple interface that allows loggers to be dependency injected
// into the praefect server
type Logger interface {
Debugf(format string, args ...interface{})
}
// Coordinator takes care of directing client requests to the appropriate
// downstream server. The coordinator is thread safe; concurrent calls to
// register nodes are safe.
type Coordinator struct {
log Logger
lock sync.RWMutex
nodes map[string]*grpc.ClientConn
}
// newCoordinator returns a new Coordinator that utilizes the provided logger
func newCoordinator(l Logger) *Coordinator {
return &Coordinator{
log: l,
nodes: make(map[string]*grpc.ClientConn),
}
}
// streamDirector determines which downstream servers receive requests
func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string) (context.Context, *grpc.ClientConn, error) {
// For phase 1, we need to route messages based on the storage location
// to the appropriate Gitaly node.
c.log.Debugf("Stream director received method %s", fullMethodName)
// TODO: obtain storage location dynamically from RPC request message
storageLoc := "test"
c.lock.RLock()
cc, ok := c.nodes[storageLoc]
c.lock.RUnlock()
if !ok {
err := status.Error(
codes.FailedPrecondition,
fmt.Sprintf("no downstream node for storage location %q", storageLoc),
)
return nil, nil, err
}
return ctx, cc, nil
}
// Server is a praefect server
type Server struct {
*Coordinator
s *grpc.Server
}
// NewServer returns an initialized praefect gRPC proxy server configured
// with the provided gRPC server options
func NewServer(grpcOpts []grpc.ServerOption, l Logger) *Server {
c := newCoordinator(l)
grpcOpts = append(grpcOpts, proxyRequiredOpts(c.streamDirector)...)
return &Server{
s: grpc.NewServer(grpcOpts...),
Coordinator: c,
}
}
// ErrStorageLocExists indicates a storage location has already been registered
// in the proxy for a downstream Gitaly node
var ErrStorageLocExists = errors.New("storage location already registered")
// RegisterNode will direct traffic to the supplied downstream connection when the storage location
// is encountered.
//
// TODO: Coordinator probably needs to handle dialing, or another entity
// needs to handle dialing to ensure keep alives and redialing logic
// exist for when downstream connections are severed.
func (c *Coordinator) RegisterNode(storageLoc string, node *grpc.ClientConn) {
c.lock.Lock()
c.nodes[storageLoc] = node
c.lock.Unlock()
}
func proxyRequiredOpts(director proxy.StreamDirector) []grpc.ServerOption {
return []grpc.ServerOption{
grpc.CustomCodec(proxy.Codec()),
grpc.UnknownServiceHandler(proxy.TransparentHandler(director)),
}
}
// Start starts the praefect gRPC proxy server listening on the provided
// listener. It blocks until the server is stopped or an
// unrecoverable error occurs.
func (srv *Server) Start(lis net.Listener) error {
return srv.s.Serve(lis)
}
// Shutdown attempts a graceful shutdown of the gRPC server. If it cannot
// shut down gracefully within the context deadline, it forcefully stops the
// server and returns a context cancellation error.
func (srv *Server) Shutdown(ctx context.Context) error {
done := make(chan struct{})
go func() {
srv.s.GracefulStop()
close(done)
}()
select {
case <-ctx.Done():
srv.s.Stop()
return ctx.Err()
case <-done:
return nil
}
}
package praefect_test
import (
"context"
"fmt"
"net"
"testing"
"time"
"github.com/mwitkow/grpc-proxy/proxy"
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly-proto/go/gitalypb"
"gitlab.com/gitlab-org/gitaly/client"
"gitlab.com/gitlab-org/gitaly/internal/praefect"
"google.golang.org/grpc"
)
func TestServerRouting(t *testing.T) {
prf := praefect.NewServer(nil, testLogger{t})
listener, port := listenAvailPort(t)
t.Logf("proxy listening on port %d", port)
defer listener.Close()
errQ := make(chan error)
go func() {
errQ <- prf.Start(listener)
}()
// dial client to proxy
cc := dialLocalPort(t, port, false)
defer cc.Close()
gCli := gitalypb.NewRepositoryServiceClient(cc)
mCli, _, cleanup := newMockDownstream(t)
defer cleanup() // clean up mock downstream server resources
prf.RegisterNode("test", mCli)
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
_, err := gCli.RepositoryExists(ctx, &gitalypb.RepositoryExistsRequest{})
require.NoError(t, err)
err = prf.Shutdown(ctx)
require.NoError(t, err)
require.NoError(t, <-errQ)
}
func listenAvailPort(tb testing.TB) (net.Listener, int) {
listener, err := net.Listen("tcp", ":0")
require.NoError(tb, err)
return listener, listener.Addr().(*net.TCPAddr).Port
}
func dialLocalPort(tb testing.TB, port int, backend bool) *grpc.ClientConn {
opts := []grpc.DialOption{
grpc.WithBlock(),
}
if backend {
opts = append(
opts,
grpc.WithDefaultCallOptions(grpc.CallCustomCodec(proxy.Codec())),
)
}
cc, err := client.Dial(
fmt.Sprintf("tcp://localhost:%d", port),
opts,
)
require.NoError(tb, err)
return cc
}
type testLogger struct {
testing.TB
}
func (tl testLogger) Debugf(format string, args ...interface{}) {
tl.TB.Logf(format, args...)
}
// newMockDownstream starts a mock downstream server and returns a client connection to it, the server implementation, and a cleanup function
func newMockDownstream(tb testing.TB) (*grpc.ClientConn, gitalypb.RepositoryServiceServer, func()) {
// setup mock server
m := &mockRepoSvc{
srv: grpc.NewServer(),
}
gitalypb.RegisterRepositoryServiceServer(m.srv, m)
lis, port := listenAvailPort(tb)
// dial praefect to backend service
cc := dialLocalPort(tb, port, true)
errQ := make(chan error)
go func() {
errQ <- m.srv.Serve(lis)
}()
cleanup := func() {
m.srv.GracefulStop()
lis.Close()
cc.Close()
require.NoError(tb, <-errQ)
}
return cc, m, cleanup
}
......@@ -35,7 +35,15 @@ func commitStats(ctx context.Context, in *gitalypb.CommitStatsRequest) (*gitalyp
return nil, fmt.Errorf("commit not found: %q", in.Revision)
}
cmd, err := git.Command(ctx, in.Repository, "diff", "--numstat", commit.Id+"^", commit.Id)
args := []string{"diff", "--numstat"}
if len(commit.GetParentIds()) == 0 {
args = append(args, git.EmptyTreeID, commit.Id)
} else {
args = append(args, commit.Id+"^", commit.Id)
}
cmd, err := git.Command(ctx, in.Repository, args...)
if err != nil {
return nil, err
}
......
......@@ -58,6 +58,13 @@ func TestCommitStatsSuccess(t *testing.T) {
additions: 0,
deletions: 0,
},
{
desc: "initial commit",
revision: "1a0b36b3",
oid: "1a0b36b3cdad1d2ee32457c102a8c0b7056fa863",
additions: 43,
deletions: 0,
},
}
for _, tc := range tests {
......
package ref
import (
"bufio"
"context"
"fmt"
"strings"
"gitlab.com/gitlab-org/gitaly-proto/go/gitalypb"
"gitlab.com/gitlab-org/gitaly/internal/rubyserver"
"gitlab.com/gitlab-org/gitaly/internal/git"
"gitlab.com/gitlab-org/gitaly/internal/git/updateref"
"gitlab.com/gitlab-org/gitaly/internal/helper"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
......@@ -15,17 +19,71 @@ func (s *server) DeleteRefs(ctx context.Context, in *gitalypb.DeleteRefsRequest)
return nil, status.Errorf(codes.InvalidArgument, "DeleteRefs: %v", err)
}
client, err := s.RefServiceClient(ctx)
updater, err := updateref.New(ctx, in.GetRepository())
if err != nil {
return nil, err
}
clientCtx, err := rubyserver.SetHeaders(ctx, in.GetRepository())
refnames, err := refsToRemove(ctx, in)
if err != nil {
// TODO decorate error
return nil, err
}
return client.DeleteRefs(clientCtx, in)
for _, ref := range refnames {
if err := updater.Delete(ref); err != nil {
return &gitalypb.DeleteRefsResponse{GitError: err.Error()}, nil
}
}
if err := updater.Wait(); err != nil {
return &gitalypb.DeleteRefsResponse{GitError: fmt.Sprintf("unable to delete refs: %s", err.Error())}, nil
}
return &gitalypb.DeleteRefsResponse{}, nil
}
func refsToRemove(ctx context.Context, req *gitalypb.DeleteRefsRequest) ([]string, error) {
var refs []string
if len(req.Refs) > 0 {
for _, ref := range req.Refs {
refs = append(refs, string(ref))
}
return refs, nil
}
cmd, err := git.Command(ctx, req.GetRepository(), "for-each-ref", "--format=%(refname)")
if err != nil {
return nil, err
}
var prefixes []string
for _, prefix := range req.ExceptWithPrefix {
prefixes = append(prefixes, string(prefix))
}
scanner := bufio.NewScanner(cmd)
for scanner.Scan() {
refName := scanner.Text()
if hasAnyPrefix(refName, prefixes) {
continue
}
refs = append(refs, string(refName))
}
if err := cmd.Wait(); err != nil {
return nil, helper.ErrInternal(fmt.Errorf("error listing refs: %v", err))
}
return refs, nil
}
func hasAnyPrefix(s string, prefixes []string) bool {
for _, prefix := range prefixes {
if strings.HasPrefix(s, prefix) {
return true
}
}
return false
}
func validateDeleteRefRequest(req *gitalypb.DeleteRefsRequest) error {
......
......@@ -3,6 +3,7 @@ package ref
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly-proto/go/gitalypb"
"gitlab.com/gitlab-org/gitaly/internal/testhelper"
......@@ -83,7 +84,7 @@ func TestFailedDeleteRefsRequestDueToGitError(t *testing.T) {
response, err := client.DeleteRefs(ctx, request)
require.NoError(t, err)
require.Contains(t, response.GitError, "Could not delete refs")
assert.Contains(t, response.GitError, "unable to delete refs")
}
func TestFailedDeleteRefsDueToValidation(t *testing.T) {
......
......@@ -71,7 +71,7 @@ func (s *server) CalculateChecksum(ctx context.Context, in *gitalypb.CalculateCh
return nil, status.Errorf(codes.Internal, err.Error())
}
if err := cmd.Wait(); err != nil {
if err := cmd.Wait(); checksum == nil || err != nil {
if isValidRepo(ctx, repo) {
return &gitalypb.CalculateChecksumResponse{Checksum: blankChecksum}, nil
}
......
......@@ -143,3 +143,29 @@ func TestFailedCalculateChecksum(t *testing.T) {
testhelper.RequireGrpcError(t, err, testCase.code)
}
}
func TestInvalidRefsCalculateChecksum(t *testing.T) {
server, serverSocketPath := runRepoServer(t)
defer server.Stop()
client, conn := newRepositoryClient(t, serverSocketPath)
defer conn.Close()
testRepo, testRepoPath, cleanupFn := testhelper.NewTestRepo(t)
defer cleanupFn()
// Force the refs database of testRepo into a known state
require.NoError(t, os.RemoveAll(path.Join(testRepoPath, "refs")))
for _, d := range []string{"refs/heads", "refs/tags", "refs/notes"} {
require.NoError(t, os.MkdirAll(path.Join(testRepoPath, d), 0755))
}
require.NoError(t, exec.Command("cp", "testdata/checksum-test-invalid-refs", path.Join(testRepoPath, "packed-refs")).Run())
request := &gitalypb.CalculateChecksumRequest{Repository: testRepo}
testCtx, cancelCtx := testhelper.Context()
defer cancelCtx()
response, err := client.CalculateChecksum(testCtx, request)
require.NoError(t, err)
require.Equal(t, "0000000000000000000000000000000000000000", response.Checksum)
}
......@@ -11,6 +11,7 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/git/catfile"
"gitlab.com/gitlab-org/gitaly/internal/git/rawdiff"
"gitlab.com/gitlab-org/gitaly/internal/helper"
"gitlab.com/gitlab-org/gitaly/internal/helper/chunk"
)
func (s *server) GetRawChanges(req *gitalypb.GetRawChangesRequest, stream gitalypb.RepositoryService_GetRawChangesServer) error {
......@@ -62,17 +63,16 @@ func getRawChanges(stream gitalypb.RepositoryService_GetRawChangesServer, repo *
if err != nil {
return fmt.Errorf("start git diff: %v", err)
}
p := rawdiff.NewParser(diffCmd)
var chunk []*gitalypb.GetRawChangesResponse_RawChange
p := rawdiff.NewParser(diffCmd)
chunker := chunk.New(&rawChangesSender{stream: stream})
for {
d, err := p.NextDiff()
if err == io.EOF {
break // happy path
}
if err != nil {
if err == io.EOF {
break // happy path
}
return fmt.Errorf("read diff: %v", err)
}
......@@ -80,21 +80,8 @@ func getRawChanges(stream gitalypb.RepositoryService_GetRawChangesServer, repo *
if err != nil {
return fmt.Errorf("build change from diff line: %v", err)
}
chunk = append(chunk, change)
const chunkSize = 50
if len(chunk) >= chunkSize {
resp := &gitalypb.GetRawChangesResponse{RawChanges: chunk}
if err := stream.Send(resp); err != nil {
return fmt.Errorf("send response: %v", err)
}
chunk = nil
}
}
if len(chunk) > 0 {
resp := &gitalypb.GetRawChangesResponse{RawChanges: chunk}
if err := stream.Send(resp); err != nil {
if err := chunker.Send(change); err != nil {
return fmt.Errorf("send response: %v", err)
}
}
......@@ -103,7 +90,21 @@ func getRawChanges(stream gitalypb.RepositoryService_GetRawChangesServer, repo *
return fmt.Errorf("wait git diff: %v", err)
}
return nil
return chunker.Flush()
}
type rawChangesSender struct {
stream gitalypb.RepositoryService_GetRawChangesServer
changes []*gitalypb.GetRawChangesResponse_RawChange
}
func (s *rawChangesSender) Reset() { s.changes = nil }
func (s *rawChangesSender) Append(it chunk.Item) {
s.changes = append(s.changes, it.(*gitalypb.GetRawChangesResponse_RawChange))
}
func (s *rawChangesSender) Send() error {
response := &gitalypb.GetRawChangesResponse{RawChanges: s.changes}
return s.stream.Send(response)
}
// Ordinarily, Git uses 0000000000000000000000000000000000000000, the
......
# pack-refs with: peeled fully-peeled sorted
1450cd639e0bc6721eb02800169e464f212cde06 foo/bar
259a6fba859cc91c54cd86a2cbd4c2f720e3a19d foo/bar:baz
d0a293c0ac821fadfdc086fe528f79423004229d refs/foo/bar:baz
21751bf5cb2b556543a11018c1f13b35e44a99d7 tags/v0.0.1
498214de67004b1da3d820901307bed2a68a8ef6 keep-around/498214de67004b1da3d820901307bed2a68a8ef6
38008cb17ce1466d8fec2dfa6f6ab8dcfe5cf49e merge-requests/11/head
c347ca2e140aa667b968e51ed0ffe055501fe4f4 environments/3/head
4ed78158b5b018c43005cec917129861541e25bc notes/commits
\ No newline at end of file
......@@ -76,6 +76,7 @@ module GitalyServer
end
end
# Post 11.10 this method can be removed
def delete_refs(request, call)
repo = Gitlab::Git::Repository.from_gitaly(request.repository, call)
......
# proxy
--
import "github.com/mwitkow/grpc-proxy/proxy"
Package proxy provides a reverse proxy handler for gRPC.
The implementation allows a `grpc.Server` to pass a received ServerStream to a
ClientStream without understanding the semantics of the messages exchanged. It
basically provides a transparent reverse-proxy.
This package is intentionally generic, exposing a `StreamDirector` function that
allows users of this package to implement whatever logic of backend-picking,
dialing and service verification to perform.
See examples on documented functions.
## Usage
#### func Codec
```go
func Codec() grpc.Codec
```
Codec returns a proxying grpc.Codec with the default protobuf codec as parent.
See CodecWithParent.
#### func CodecWithParent
```go
func CodecWithParent(fallback grpc.Codec) grpc.Codec
```
CodecWithParent returns a proxying grpc.Codec with a user provided codec as
parent.
This codec is *crucial* to the functioning of the proxy. It allows the proxy
server to be oblivious to the schema of the forwarded messages. It basically
treats a gRPC message frame as raw bytes. However, if the server handler, or the
client caller are not proxy-internal functions it will fall back to trying to
decode the message using a fallback codec.
#### func RegisterService
```go
func RegisterService(server *grpc.Server, director StreamDirector, serviceName string, methodNames ...string)
```
RegisterService sets up a proxy handler for a particular gRPC service and
method. The behaviour is the same as if you were registering a handler method,
e.g. from a codegenerated pb.go file.
This can *only* be used if the `server` also uses grpcproxy.CodecForServer()
ServerOption.
#### func TransparentHandler
```go
func TransparentHandler(director StreamDirector) grpc.StreamHandler
```
TransparentHandler returns a handler that attempts to proxy all requests that
are not registered in the server. The intended use here is as a transparent
proxy, where the server doesn't know about the services implemented by the
backends. It should be used as a `grpc.UnknownServiceHandler`.
This can *only* be used if the `server` also uses grpcproxy.CodecForServer()
ServerOption.
#### type StreamDirector
```go
type StreamDirector func(ctx context.Context, fullMethodName string) (context.Context, *grpc.ClientConn, error)
```
StreamDirector returns a gRPC ClientConn to be used to forward the call to.
The presence of the `Context` allows for rich filtering, e.g. based on Metadata
(headers). If no handling is meant to be done, a `codes.NotImplemented` gRPC
error should be returned.
It is worth noting that the StreamDirector will be fired *after* all server-side
stream interceptors are invoked. So decisions around authorization, monitoring
etc. are better to be handled there.
See the rather rich example.
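For context, a minimal sketch of how these pieces fit together (essentially what `proxyRequiredOpts` does in this merge request). The director below always refuses; a real director would pick a backend `*grpc.ClientConn` based on the method name or request metadata:

```go
package main

import (
	"context"
	"log"
	"net"

	"github.com/mwitkow/grpc-proxy/proxy"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

func main() {
	// Placeholder director: every call is rejected. A real one returns a
	// dialed backend connection and the context to use for the outgoing call.
	director := func(ctx context.Context, fullMethodName string) (context.Context, *grpc.ClientConn, error) {
		return nil, nil, status.Error(codes.Unimplemented, "no backend configured")
	}

	srv := grpc.NewServer(
		grpc.CustomCodec(proxy.Codec()), // treat message frames as raw bytes
		grpc.UnknownServiceHandler(proxy.TransparentHandler(director)),
	)

	lis, err := net.Listen("tcp", ":0")
	if err != nil {
		log.Fatal(err)
	}
	log.Fatal(srv.Serve(lis))
}
```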
package proxy
import (
"fmt"
"github.com/golang/protobuf/proto"
"google.golang.org/grpc"
)
// Codec returns a proxying grpc.Codec with the default protobuf codec as parent.
//
// See CodecWithParent.
func Codec() grpc.Codec {
return CodecWithParent(&protoCodec{})
}
// CodecWithParent returns a proxying grpc.Codec with a user provided codec as parent.
//
// This codec is *crucial* to the functioning of the proxy. It allows the proxy server to be oblivious
// to the schema of the forwarded messages. It basically treats a gRPC message frame as raw bytes.
// However, if the server handler, or the client caller are not proxy-internal functions it will fall back
// to trying to decode the message using a fallback codec.
func CodecWithParent(fallback grpc.Codec) grpc.Codec {
return &rawCodec{fallback}
}
type rawCodec struct {
parentCodec grpc.Codec
}
type frame struct {
payload []byte
}
func (c *rawCodec) Marshal(v interface{}) ([]byte, error) {
out, ok := v.(*frame)
if !ok {
return c.parentCodec.Marshal(v)
}
return out.payload, nil
}
func (c *rawCodec) Unmarshal(data []byte, v interface{}) error {
dst, ok := v.(*frame)
if !ok {
return c.parentCodec.Unmarshal(data, v)
}
dst.payload = data
return nil
}
func (c *rawCodec) String() string {
return fmt.Sprintf("proxy>%s", c.parentCodec.String())
}
// protoCodec is a Codec implementation with protobuf. It is the default rawCodec for gRPC.
type protoCodec struct{}
func (protoCodec) Marshal(v interface{}) ([]byte, error) {
return proto.Marshal(v.(proto.Message))
}
func (protoCodec) Unmarshal(data []byte, v interface{}) error {
return proto.Unmarshal(data, v.(proto.Message))
}
func (protoCodec) String() string {
return "proto"
}
// Copyright 2017 Michal Witkowski. All Rights Reserved.
// See LICENSE for licensing terms.
package proxy
import (
"golang.org/x/net/context"
"google.golang.org/grpc"
)
// StreamDirector returns a gRPC ClientConn to be used to forward the call to.
//
// The presence of the `Context` allows for rich filtering, e.g. based on Metadata (headers).
// If no handling is meant to be done, a `codes.NotImplemented` gRPC error should be returned.
//
// The context returned from this function should be the context for the *outgoing* (to backend) call. In case you want
// to forward any Metadata between the inbound request and outbound requests, you should do it manually. However, you
// *must* propagate the cancel function (`context.WithCancel`) of the inbound context to the one returned.
//
// It is worth noting that the StreamDirector will be fired *after* all server-side stream interceptors
// are invoked. So decisions around authorization, monitoring etc. are better to be handled there.
//
// See the rather rich example.
type StreamDirector func(ctx context.Context, fullMethodName string) (context.Context, *grpc.ClientConn, error)
// Copyright 2017 Michal Witkowski. All Rights Reserved.
// See LICENSE for licensing terms.
/*
Package proxy provides a reverse proxy handler for gRPC.
The implementation allows a `grpc.Server` to pass a received ServerStream to a ClientStream without understanding
the semantics of the messages exchanged. It basically provides a transparent reverse-proxy.
This package is intentionally generic, exposing a `StreamDirector` function that allows users of this package
to implement whatever logic of backend-picking, dialing and service verification to perform.
See examples on documented functions.
*/
package proxy
// Copyright 2017 Michal Witkowski. All Rights Reserved.
// See LICENSE for licensing terms.
package proxy
import (
"io"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
)
var (
clientStreamDescForProxying = &grpc.StreamDesc{
ServerStreams: true,
ClientStreams: true,
}
)
// RegisterService sets up a proxy handler for a particular gRPC service and method.
// The behaviour is the same as if you were registering a handler method, e.g. from a codegenerated pb.go file.
//
// This can *only* be used if the `server` also uses grpcproxy.CodecForServer() ServerOption.
func RegisterService(server *grpc.Server, director StreamDirector, serviceName string, methodNames ...string) {
streamer := &handler{director}
fakeDesc := &grpc.ServiceDesc{
ServiceName: serviceName,
HandlerType: (*interface{})(nil),
}
for _, m := range methodNames {
streamDesc := grpc.StreamDesc{
StreamName: m,
Handler: streamer.handler,
ServerStreams: true,
ClientStreams: true,
}
fakeDesc.Streams = append(fakeDesc.Streams, streamDesc)
}
server.RegisterService(fakeDesc, streamer)
}
// TransparentHandler returns a handler that attempts to proxy all requests that are not registered in the server.
// The intended use here is as a transparent proxy, where the server doesn't know about the services implemented by the
// backends. It should be used as a `grpc.UnknownServiceHandler`.
//
// This can *only* be used if the `server` also uses grpcproxy.CodecForServer() ServerOption.
func TransparentHandler(director StreamDirector) grpc.StreamHandler {
streamer := &handler{director}
return streamer.handler
}
type handler struct {
director StreamDirector
}
// handler is where the real magic of proxying happens.
// It is invoked like any gRPC server stream and uses the gRPC server framing to get and receive bytes from the wire,
// forwarding it to a ClientStream established against the relevant ClientConn.
func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) error {
// little bit of gRPC internals never hurt anyone
fullMethodName, ok := grpc.MethodFromServerStream(serverStream)
if !ok {
return grpc.Errorf(codes.Internal, "lowLevelServerStream not exists in context")
}
// We require that the director's returned context inherits from the serverStream.Context().
outgoingCtx, backendConn, err := s.director(serverStream.Context(), fullMethodName)
if err != nil {
return err
}
clientCtx, clientCancel := context.WithCancel(outgoingCtx)
// TODO(mwitkow): Add a `forwarded` header to metadata, https://en.wikipedia.org/wiki/X-Forwarded-For.
clientStream, err := grpc.NewClientStream(clientCtx, clientStreamDescForProxying, backendConn, fullMethodName)
if err != nil {
return err
}
// Explicitly *do not close* s2cErrChan and c2sErrChan, otherwise the select below will not terminate.
// Channels do not have to be closed, it is just a control flow mechanism, see
// https://groups.google.com/forum/#!msg/golang-nuts/pZwdYRGxCIk/qpbHxRRPJdUJ
s2cErrChan := s.forwardServerToClient(serverStream, clientStream)
c2sErrChan := s.forwardClientToServer(clientStream, serverStream)
// We don't know which side is going to stop sending first, so we need a select between the two.
for i := 0; i < 2; i++ {
select {
case s2cErr := <-s2cErrChan:
if s2cErr == io.EOF {
// this is the happy case where the sender has encountered io.EOF, and won't be sending anymore./
// the clientStream>serverStream may continue pumping though.
clientStream.CloseSend()
break
} else {
// however, we may have gotten a receive error (stream disconnected, a read error etc) in which case we need
// to cancel the clientStream to the backend, let all of its goroutines be freed up by the CancelFunc and
// exit with an error to the stack
clientCancel()
return grpc.Errorf(codes.Internal, "failed proxying s2c: %v", s2cErr)
}
case c2sErr := <-c2sErrChan:
// This happens when the clientStream has nothing else to offer (io.EOF), returned a gRPC error. In those two
// cases we may have received Trailers as part of the call. In case of other errors (stream closed) the trailers
// will be nil.
serverStream.SetTrailer(clientStream.Trailer())
// c2sErr will contain RPC error from client code. If not io.EOF return the RPC error as server stream error.
if c2sErr != io.EOF {
return c2sErr
}
return nil
}
}
return grpc.Errorf(codes.Internal, "gRPC proxying should never reach this stage.")
}
func (s *handler) forwardClientToServer(src grpc.ClientStream, dst grpc.ServerStream) chan error {
ret := make(chan error, 1)
go func() {
f := &frame{}
for i := 0; ; i++ {
if err := src.RecvMsg(f); err != nil {
ret <- err // this can be io.EOF which is happy case
break
}
if i == 0 {
// This is a bit of a hack, but client to server headers are only readable after first client msg is
// received but must be written to server stream before the first msg is flushed.
// This is the only place to do it nicely.
md, err := src.Header()
if err != nil {
ret <- err
break
}
if err := dst.SendHeader(md); err != nil {
ret <- err
break
}
}
if err := dst.SendMsg(f); err != nil {
ret <- err
break
}
}
}()
return ret
}
func (s *handler) forwardServerToClient(src grpc.ServerStream, dst grpc.ClientStream) chan error {
ret := make(chan error, 1)
go func() {
f := &frame{}
for i := 0; ; i++ {
if err := src.RecvMsg(f); err != nil {
ret <- err // this can be io.EOF which is happy case
break
}
if err := dst.SendMsg(f); err != nil {
ret <- err
break
}
}
}()
return ret
}
......@@ -228,6 +228,12 @@
"version": "v1",
"versionExact": "v1.0.1"
},
{
"checksumSHA1": "603wOI+hKduL75jXGGxfLKP5GE4=",
"path": "github.com/mwitkow/grpc-proxy/proxy",
"revision": "0f1106ef9c766333b9acb4b81e705da4bade7215",
"revisionTime": "2018-10-17T16:41:39Z"
},
{
"checksumSHA1": "jqEjDv//suCrQUg8iOGI7oxwfRU=",
"path": "github.com/opentracing/opentracing-go",
......@@ -502,14 +508,6 @@
"version": "v1.10.0",
"versionExact": "v1.10.0"
},
{
"checksumSHA1": "R6fNN36q3UydLODupK0vdx9h/CY=",
"path": "gitlab.com/gitlab-org/labkit/correlation",
"revision": "2a3e1f5415a890402696dcbb2c31b5a79c17b579",
"revisionTime": "2019-01-08T10:46:58Z",
"version": "master",
"versionExact": "master"
},
{
"checksumSHA1": "R6fNN36q3UydLODupK0vdx9h/CY=",
"path": "gitlab.com/gitlab-org/labkit/correlation",
......@@ -518,6 +516,14 @@
"version": "=d-tracing",
"versionExact": "d-tracing"
},
{
"checksumSHA1": "R6fNN36q3UydLODupK0vdx9h/CY=",
"path": "gitlab.com/gitlab-org/labkit/correlation",
"revision": "2a3e1f5415a890402696dcbb2c31b5a79c17b579",
"revisionTime": "2019-01-08T10:46:58Z",
"version": "master",
"versionExact": "master"
},
{
"checksumSHA1": "UFBFulprWZHuL9GHhjCKoHXm+Ww=",
"path": "gitlab.com/gitlab-org/labkit/correlation/grpc",
......