...
 
Commits (31)
......@@ -14,3 +14,4 @@
*.socket
git-env
/gitaly-debug
/praefect
......@@ -117,12 +117,6 @@ cover:
stage: test
script:
- make cover
after_script:
- bash <(curl -s https://codecov.io/bash) -t "${CODECOV_TOKEN}" -f _build/cover/all.merged -F unittests
artifacts:
paths:
- _build/cover/all.html
expire_in: 1 week
code_quality:
image: docker:stable
......
# Gitaly changelog
## v1.19.1
#### Fixed
- Use empty tree if initial commit
https://gitlab.com/gitlab-org/gitaly/merge_requests/1075
## v1.19.0
#### Fixed
- Return blank checksum for git repositories with only invalid refs
https://gitlab.com/gitlab-org/gitaly/merge_requests/1065
#### Other
- Use chunker in GetRawChanges
https://gitlab.com/gitlab-org/gitaly/merge_requests/1043
## v1.18.0
#### Other
- Make clear there is no []byte reuse bug in SearchFilesByContent
https://gitlab.com/gitlab-org/gitaly/merge_requests/1055
- Use chunker in FindCommits
https://gitlab.com/gitlab-org/gitaly/merge_requests/1059
- Statically link jaeger into Gitaly by default
https://gitlab.com/gitlab-org/gitaly/merge_requests/1063
## v1.17.0
#### Other
- Add glProjectPath to logs
https://gitlab.com/gitlab-org/gitaly/merge_requests/1049
- Switch from commitsSender to chunker
https://gitlab.com/gitlab-org/gitaly/merge_requests/1060
#### Security
- Disable git protocol v2 temporarily
https://gitlab.com/gitlab-org/gitaly/merge_requests/
## v1.16.0
......@@ -33,6 +71,12 @@
- Rewrite CommitStats in Go
https://gitlab.com/gitlab-org/gitaly/merge_requests/1048
## v1.14.1
#### Security
- Disable git protocol v2 temporarily
https://gitlab.com/gitlab-org/gitaly/merge_requests/
## v1.14.0
#### Fixed
......@@ -89,6 +133,20 @@
- Remove unused Ruby rebase/squash code
https://gitlab.com/gitlab-org/gitaly/merge_requests/1033
## v1.12.2
#### Security
- Disable git protocol v2 temporarily
https://gitlab.com/gitlab-org/gitaly/merge_requests/
## v1.12.1
#### Fixed
- Fix flaky rebase test
https://gitlab.com/gitlab-org/gitaly/merge_requests/1028
- Fix regression for https_proxy and unix socket connections
https://gitlab.com/gitlab-org/gitaly/merge_requests/1032
## v1.12.0
#### Fixed
......@@ -137,6 +195,16 @@
- README cleanup
https://gitlab.com/gitlab-org/gitaly/merge_requests/996
## v1.7.2
#### Other
- Fix tests failing due to test-repo change
https://gitlab.com/gitlab-org/gitaly/merge_requests/1004
#### Security
- Disable git protocol v2 temporarily
https://gitlab.com/gitlab-org/gitaly/merge_requests/
## v1.7.1
#### Other
......@@ -255,6 +323,16 @@
- Support SSH credentials for push mirroring
https://gitlab.com/gitlab-org/gitaly/merge_requests/959
## v0.129.1
#### Other
- Fix tests failing due to test-repo change
https://gitlab.com/gitlab-org/gitaly/merge_requests/1004
#### Security
- Disable git protocol v2 temporarily
https://gitlab.com/gitlab-org/gitaly/merge_requests/
## v0.129.0
#### Added
......
This diff is collapsed.
......@@ -232,6 +232,7 @@ PREFIX ?= /usr/local
INSTALL_DEST_DIR := $(DESTDIR)$(PREFIX)/bin/
BUNDLE_FLAGS ?= --deployment
ASSEMBLY_ROOT ?= {{ .BuildDir }}/assembly
BUILD_TAGS := tracer_static tracer_static_jaeger
unexport GOROOT
unexport GOBIN
......
---
title: Switch from commitsSender to chunker
merge_request: 1060
author:
type: other
---
title: Initial design document for High Availability
merge_request: 1058
author:
type: other
---
title: Reverse proxy pass thru for HA
merge_request: 1064
author:
type: other
---
title: Add glProjectPath to logs
merge_request: 1049
author:
type: other
package main
import (
"context"
"flag"
"net"
"os"
"os/signal"
"syscall"
"time"
log "github.com/sirupsen/logrus"
"gitlab.com/gitlab-org/gitaly/client"
"gitlab.com/gitlab-org/gitaly/internal/praefect"
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
)
var (
flagConfig = flag.String("config", "", "Location for the config.toml")
logger = log.New()
)
// main loads and validates the praefect configuration named by the -config
// flag, opens the TCP listener at the configured address and hands both to
// run.
//
// Any failure while reading the config, validating it, or listening is logged
// with Fatalf. The value returned by run is only treated as fatal when it is
// non-nil, so a graceful shutdown (run returning nil) lets main return
// normally instead of logging "<nil>" at fatal level.
func main() {
	flag.Parse()

	logger.Formatter = &log.JSONFormatter{}

	logger.WithField("config", *flagConfig).Info("reading config file")
	conf, err := config.FromFile(*flagConfig)
	if err != nil {
		logger.Fatalf("%s", err)
	}

	if err := conf.Validate(); err != nil {
		logger.Fatalf("%s", err)
	}

	l, err := net.Listen("tcp", conf.ListenAddr)
	if err != nil {
		logger.Fatalf("%s", err)
	}

	logger.WithField("address", conf.ListenAddr).Info("listening at tcp address")

	// run returns nil after a successful signal-triggered shutdown; that is
	// not an error condition and must not go through Fatalf.
	if err := run(l, conf); err != nil {
		logger.Fatalf("%v", err)
	}
}
// run creates a praefect server, registers one downstream connection per
// entry in conf.GitalyServers, and serves on l.
//
// It blocks until either the server stops on its own, in which case the
// error from Start is returned, or SIGTERM/SIGINT is received, in which case
// the result of a Shutdown bounded by a 10 second timeout is returned. A
// failure to dial any configured Gitaly server aborts start-up and returns
// that error.
func run(l net.Listener, conf *config.Config) error {
	// Hand the server the package logger so it shares the formatter that
	// main configured, rather than a fresh logger with default settings.
	// TODO: Add gRPC server opts as first argument
	srv := praefect.NewServer(nil, logger)

	for _, gitaly := range conf.GitalyServers {
		// TODO: set appropriate DialOptions
		conn, err := client.Dial(gitaly.ListenAddr, nil)
		if err != nil {
			return err
		}

		srv.RegisterNode(gitaly.Name, conn)
		logger.WithField("gitaly listen addr", gitaly.ListenAddr).Info("registered gitaly node")
	}

	signals := []os.Signal{syscall.SIGTERM, syscall.SIGINT}
	termCh := make(chan os.Signal, len(signals))
	signal.Notify(termCh, signals...)

	// Buffered so the serving goroutine can always deliver its result and
	// exit, even when the signal branch below wins the select.
	serverErrors := make(chan error, 1)
	go func() { serverErrors <- srv.Start(l) }()

	select {
	case s := <-termCh:
		logger.WithField("signal", s).Warn("received signal, shutting down gracefully")

		// Keep and call the cancel function so the timeout's timer is
		// released as soon as Shutdown returns.
		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
		defer cancel()

		return srv.Shutdown(ctx)
	case err := <-serverErrors:
		return err
	}
}
# Example Praefect configuration file
# # TCP address to listen on
listen_addr = "localhost:2305"
# socket_path = "/home/git/gitlab/tmp/sockets/private/praefect.socket"
# # One or more Gitaly servers need to be configured to be managed. The name
# of each server is used to link multiple nodes, or `gitaly_server`s, together
# as a shard. listen_addr must be unique across all nodes.
# Requires the protocol to be defined, e.g. tcp://host.tld:1234
[[gitaly_server]]
name = "default"
listen_addr = "tcp://localhost:9999"
......@@ -294,6 +294,14 @@ Now you can run the one test you're interested in:
go test -count 1 -run TestRepositoryExists ./internal/service/repository
```
When writing tests, prefer [testify]'s [require] and [assert] packages for
setting expectations over calling functions such as `t.Fatal()` directly on
`testing.T`.
[testify]: https://github.com/stretchr/testify
[require]: https://github.com/stretchr/testify/tree/master/require
[assert]: https://github.com/stretchr/testify/tree/master/assert
#### Rspec tests
It is possible to write end-to-end Rspec tests that run against a full
......
# Gitaly High Availability (HA) Design
Gitaly HA is an active-active cluster configuration for resilient git operations. [Refer to our specific requirements](https://gitlab.com/gitlab-org/gitaly/issues/1332).
Refer to epic &289 for current issues and discussions revolving around HA MVC development.
## Terminology
The following terminology may be used within the context of the Gitaly HA project:
- Shard - partition of the storage for all repos. Each shard will require redundancy in the form of multiple Gitaly nodes (at least 3 when optimal) to maintain HA.
- Praefect - a transparent front end to all Gitaly shards. This reverse proxy ensures that all gRPC calls are forwarded to the correct shard by consulting the coordinator. The reverse proxy also ensures that write actions are performed transactionally when needed.
- etymology: from Latin praefectus for _a person appointed to any of various positions of command, authority, or superintendence, as a chief magistrate in ancient Rome or the chief administrative official of a department of France or Italy._
- [pronounced _pree-fect_](https://www.youtube.com/watch?v=MHszCZjPmTQ)
- Node (TODO: we probably need a similar Latin name here) - performs the actual git read/write operations to/from disk. Has no knowledge of shards/praefects/coordinators just as the Gitaly service existed prior to HA.
## Design
The high level design takes a reverse proxy approach to fanning out write requests to the appropriate nodes:
<img src="https://docs.google.com/drawings/d/e/2PACX-1vRl7WS-6RBOWxyLSBbBBAoV9MupmTh5vTqMOw_AX9axlboqkybTbFqGqExLyyYOilqEW7S9euXdBHzX/pub?w=960&amp;h=720">
## Phases
An iterative, low-risk approach needs to be devised to add functionality and verify assumptions at a sustainable pace while not impeding the existing functionality.
### 1. Simple pass-through proxy - no added functionality
- allows us to set up telemetry for observability of new service
- allows us to evaluate a gRPC proxy library
### 2. Introduce State
The following details need to be persisted in Postgres:
- [x] Primary location for a project
- [ ] Redundant locations for a project
- [ ] Available storage locations (initially can be configuration file)
Initially, the state of the shard nodes will be static and loaded from a configuration file. Eventually, this will be made dynamic via a data store (Postgres).
### Resolving Location
The following existing interaction will remain intact for the first iteration of the HA feature:
```mermaid
sequenceDiagram
Client->>Rails: Modify repo X
Rails-->>Datastore: Where is Repo X?
Datastore-->> Rails: Repo X is at location A
Rails-->>Gitaly: Modify repo X at location A
Gitaly-->>Rails: Request succeeded/failed
```
Once the Rails app has resolved the primary location for the project, the request is made to the praefect. The praefect then resolves the redundant locations via the coordinator before applying the changes.
```mermaid
sequenceDiagram
Rails->>Praefect: Modify repo X at A
Praefect->>Coordinator: Which locations complement A for X?
Coordinator->>Praefect: Locations B and C complement A
Praefect->>Nodes ABC: Modify repo X
Nodes ABC->>Praefect: Modifications successful!
```
*Note: the above interaction between the praefect and nodes A-B-C is an all-or-nothing transaction. All nodes must complete in success, otherwise a single node failure will cause the entire transaction to fail. This will be improved when replication is introduced.*
### 3. Replication
The next phase is to enable replication of data between nodes. This makes transactions more efficient and fault tolerant. This could be done a few ways:
#### Node Orchestrated [👎]
Node orchestrated puts the intelligence of replication into one of the nodes being modified:
```mermaid
sequenceDiagram
Praefect->>Node A: Modify repo X
activate Node A
Node A->>Node B: Modify repo X
Node A->>Node C: Modify repo X
Node A->>Praefect: Modification successful!
```
Orchestration requires designating a leader node for the transaction. This leader node becomes a critical path for all nodes involved. Ideally, we want several simpler (less risky) operations that can succeed/fail independently of each other. This way, failure and recovery can be handled externally of the nodes.
#### Praefect Orchestrated [👍]
With the praefect orchestrating replication, we are isolating the critical path to a stateless service. Stateless services are preferred for the critical path since another praefect can pick up the task after a praefect failure.
```mermaid
sequenceDiagram
Praefect->>Node A: Modify repo X
Node A->>Praefect: Success!
Praefect->>Node B: Replicate From A
Praefect->>Node C: Replicate From A
Node B->>Praefect: Success!
Node C->>Praefect: Success!
```
*Note: Once Node-A propagates changes to a peer, Node-A is no longer the critical path for subsequent propagations. If Node-A fails after a second peer is propagated, that second peer can become the new leader and resume replications.*
## Notes
* Existing discussions
* Requirements: https://gitlab.com/gitlab-org/gitaly/issues/1332
* Design: https://gitlab.com/gitlab-org/gitaly/issues/1335
* Prior art
* Stemma by Palantir
* [Announcement](https://medium.com/palantir/stemma-distributed-git-server-70afbca0fc29)
* Extends jgit (java git implementation)
* Spokes by GitHub
* Application layer approach: uses underlying git software to propagate changes to other locations.
* Bitbucket Data Center (BDC)
* [BDC FAQ](https://confluence.atlassian.com/enterprise/bitbucket-data-center-faq-776663707.html)
* Ketch by Google (no longer maintained)
* [Sid's comment on performance issue](https://news.ycombinator.com/item?id=13934698)
* Also jgit based
* gRPC proxy considerations
* [gRPC Proxy library](https://github.com/mwitkow/grpc-proxy)
* Pros
* Handles all gRPC requests generically
* Cons
* Lack of support
* [See current importers of project](https://godoc.org/github.com/mwitkow/grpc-proxy/proxy?importers)
* Low level implementation requires knowledge of gRPC internals
* Custom code generation
* Pros
* Simple and maintainable
* Allows us to handwrite proxy code and later automate with lessons learned via code generation
* Cons
* Process heavy; requires custom tooling
* Requires a way to tell which methods are read/write
* [See WIP for marking modifying RPCs](https://gitlab.com/gitlab-org/gitaly-proto/merge_requests/228)
* See also:
* [nRPC](https://github.com/nats-rpc/nrpc) - gRPC via NATS
* [grpclb](https://github.com/bsm/grpclb) - gRPC load balancer
* Complications
* Existing Rails app indicates the Gitaly instance that a request is destined for (e.g. request to modify repo X should be directed to gitaly #1).
* This means that rails app must be kept in the loop about any changes made to the location of a repo.
* This may be mitigated by changing the proxy implementation to interpret the destination address as a reference to a shard rather than a specific host. This might open the door to allowing for something like consistent hashing.
* While Git is distributed in nature, some write operations need to be serialized to avoid race conditions. This includes ref updates.
* How do we coordinate proxies when applying ref updates? Do we need to?
......@@ -49,6 +49,17 @@ type GitlabShell struct {
// Git contains the settings for the Git executable
type Git struct {
	// BinPath is populated from the `bin_path` configuration key.
	// NOTE(review): presumably the path to the git binary; confirm against
	// the code that consumes it.
	BinPath string `toml:"bin_path"`

	// ProtocolV2Enabled can be set to true to enable the newer Git protocol
	// version. This should not be enabled until GitLab *either* stops
	// using transfer.hideRefs for security purposes, *or* Git protocol v2
	// respects this setting:
	//
	// https://public-inbox.org/git/20181213155817.27666-1-avarab@gmail.com/T/
	//
	// This is not user-configurable. Once a new Git version has been released,
	// we can add code to enable it if the detected git binary is new enough
	//
	// The `toml:"-"` tag excludes the field from TOML decoding. Without it
	// the decoder can match a key to the field by its Go name, which would
	// make the switch reachable from a configuration file after all.
	ProtocolV2Enabled bool `toml:"-"`
}
// Storage contains a single storage-shard
......
......@@ -7,6 +7,8 @@ import (
grpc_ctxtags "github.com/grpc-ecosystem/go-grpc-middleware/tags"
"github.com/prometheus/client_golang/prometheus"
"gitlab.com/gitlab-org/gitaly/internal/config"
)
const (
......@@ -39,9 +41,13 @@ func AddGitProtocolEnv(ctx context.Context, req RequestWithGitProtocol, env []st
service, method := methodFromContext(ctx)
if req.GetGitProtocol() == ProtocolV2 {
env = append(env, fmt.Sprintf("GIT_PROTOCOL=%s", ProtocolV2))
if config.Config.Git.ProtocolV2Enabled {
env = append(env, fmt.Sprintf("GIT_PROTOCOL=%s", ProtocolV2))
gitProtocolRequests.WithLabelValues(service, method, "v2").Inc()
gitProtocolRequests.WithLabelValues(service, method, "v2").Inc()
} else {
gitProtocolRequests.WithLabelValues(service, method, "v2-rejected").Inc()
}
} else {
gitProtocolRequests.WithLabelValues(service, method, "v0").Inc()
}
......
package git
import (
"context"
"testing"
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/internal/config"
)
// fakeProtocolMessage is a test double that carries nothing but a Git
// protocol string, exposed through GetGitProtocol.
type fakeProtocolMessage struct {
	protocol string
}

// GetGitProtocol hands back the protocol string stored in the fake.
func (msg fakeProtocolMessage) GetGitProtocol() string {
	proto := msg.protocol
	return proto
}
// setGitProtocolV2Support overwrites config.Config.Git.ProtocolV2Enabled with
// value and returns a function that puts the previous setting back. Callers
// are expected to defer the returned function.
func setGitProtocolV2Support(value bool) func() {
	previous := config.Config.Git.ProtocolV2Enabled
	restore := func() {
		config.Config.Git.ProtocolV2Enabled = previous
	}

	config.Config.Git.ProtocolV2Enabled = value
	return restore
}
// TestAddGitProtocolEnvRespectsConfigEnabled checks that, with protocol v2
// enabled in the config, a request asking for "version=2" gets
// GIT_PROTOCOL=version=2 appended to the environment it passed in.
func TestAddGitProtocolEnvRespectsConfigEnabled(t *testing.T) {
	restore := setGitProtocolV2Support(true)
	defer restore()

	env := []string{"fake=value"}
	msg := fakeProtocolMessage{protocol: "version=2"}
	value := AddGitProtocolEnv(context.Background(), msg, env)

	// require.Equal takes (expected, actual); keeping that order makes a
	// failure report label the two values correctly.
	require.Equal(t, append(env, "GIT_PROTOCOL=version=2"), value)
}
// TestAddGitProtocolEnvWhenV2NotRequested checks that a request with an empty
// protocol string leaves the environment untouched even though protocol v2 is
// enabled in the config.
func TestAddGitProtocolEnvWhenV2NotRequested(t *testing.T) {
	restore := setGitProtocolV2Support(true)
	defer restore()

	env := []string{"fake=value"}
	msg := fakeProtocolMessage{protocol: ""}
	value := AddGitProtocolEnv(context.Background(), msg, env)

	// require.Equal takes (expected, actual).
	require.Equal(t, env, value)
}
// TestAddGitProtocolEnvRespectsConfigDisabled checks that a request which
// does ask for protocol v2 is still given an unchanged environment while v2
// is disabled in the config.
func TestAddGitProtocolEnvRespectsConfigDisabled(t *testing.T) {
	restore := setGitProtocolV2Support(false)
	defer restore()

	env := []string{"fake=value"}

	// The request must carry the bare protocol value "version=2" (the same
	// value the enabled-config test uses). The previous value,
	// "GIT_PROTOCOL=version=2", is not a v2 request, so the test passed
	// without ever reaching the disabled-config branch.
	msg := fakeProtocolMessage{protocol: "version=2"}
	value := AddGitProtocolEnv(context.Background(), msg, env)

	// require.Equal takes (expected, actual).
	require.Equal(t, env, value)
}
package config
import (
"fmt"
"os"
"github.com/BurntSushi/toml"
)
// Config is a container for everything found in the TOML config file
type Config struct {
	// ListenAddr is decoded from the `listen_addr` key.
	ListenAddr string `toml:"listen_addr" split_words:"true"`

	// GitalyServers is decoded from the `gitaly_server` array of tables.
	//
	// Struct tag pairs must be separated by spaces only. The comma that used
	// to follow the toml pair stopped tag parsing there, so the split_words
	// pair could never be looked up.
	GitalyServers []*GitalyServer `toml:"gitaly_server" split_words:"true"`
}

// GitalyServer allows configuring the servers that RPCs are proxied to
type GitalyServer struct {
	// Name is decoded from the `name` key.
	Name string `toml:"name"`

	// ListenAddr is decoded from the `listen_addr` key.
	ListenAddr string `toml:"listen_addr" split_words:"true"`
}
// FromFile loads the config for the passed file path.
//
// It opens filePath and decodes its TOML content into a new Config. If the
// file cannot be opened or decoding fails, it returns a nil Config together
// with the unmodified underlying error, so callers never receive a partially
// decoded configuration.
func FromFile(filePath string) (*Config, error) {
	cfgFile, err := os.Open(filePath)
	if err != nil {
		return nil, err
	}
	defer cfgFile.Close()

	config := &Config{}
	if _, err := toml.DecodeReader(cfgFile, config); err != nil {
		// Drop whatever was decoded before the failure.
		return nil, err
	}

	return config, nil
}
// Validate reports the first problem found in the config, or nil when there
// is none. It requires a non-empty ListenAddr, at least one Gitaly server, a
// non-empty Name on every server, and a ListenAddr that no earlier server in
// the list already uses.
func (c *Config) Validate() error {
	switch {
	case c.ListenAddr == "":
		return fmt.Errorf("no listen address configured")
	case len(c.GitalyServers) == 0:
		return fmt.Errorf("no gitaly backends configured")
	}

	// seen only ever stores true, so a plain lookup doubles as the
	// membership test.
	seen := make(map[string]bool, len(c.GitalyServers))
	for _, server := range c.GitalyServers {
		if server.Name == "" {
			return fmt.Errorf("expect %q to have a name", server)
		}

		addr := server.ListenAddr
		if seen[addr] {
			return fmt.Errorf("gitaly listen_addr: %s is not unique", addr)
		}
		seen[addr] = true
	}

	return nil
}
package config
import (
"testing"
"github.com/stretchr/testify/require"
)
// TestConfigValidation runs Validate against a table of configs and checks
// that an error is returned exactly for the cases marked expectError.
func TestConfigValidation(t *testing.T) {
	server := &GitalyServer{Name: "test", ListenAddr: "localhost:23456"}
	gitalySrvs := []*GitalyServer{server}

	testCases := []struct {
		desc        string
		config      *Config
		expectError bool
	}{
		{
			desc:        "No ListenAddr",
			config:      &Config{ListenAddr: "", GitalyServers: gitalySrvs},
			expectError: true,
		},
		{
			desc:        "No servers",
			config:      &Config{ListenAddr: "localhost:1234", GitalyServers: nil},
			expectError: true,
		},
		{
			// The same server listed twice yields a repeated listen address.
			desc:        "duplicate address",
			config:      &Config{ListenAddr: "localhost:1234", GitalyServers: []*GitalyServer{server, server}},
			expectError: true,
		},
		{
			desc:   "Valid config",
			config: &Config{ListenAddr: "localhost:1234", GitalyServers: gitalySrvs},
		},
	}

	for _, tc := range testCases {
		t.Run(tc.desc, func(t *testing.T) {
			err := tc.config.Validate()
			if !tc.expectError {
				require.NoError(t, err)
				return
			}
			require.Error(t, err)
		})
	}
}
package praefect_test
import (
"context"
"gitlab.com/gitlab-org/gitaly-proto/go/gitalypb"
"google.golang.org/grpc"
)
// mockRepoSvc is a mock implementation of gitalypb.RepositoryServiceServer
// for testing purposes. Every method is a stub: none of them inspects its
// request or stream argument.
type mockRepoSvc struct {
// srv is the gRPC server this mock is registered on.
srv *grpc.Server
}
// RepositoryExists is the only stub that returns a non-nil response; it
// replies with an empty RepositoryExistsResponse and no error.
func (m *mockRepoSvc) RepositoryExists(context.Context, *gitalypb.RepositoryExistsRequest) (*gitalypb.RepositoryExistsResponse, error) {
return &gitalypb.RepositoryExistsResponse{}, nil
}
// The remaining unary stubs return a nil response and a nil error; the
// streaming stubs return a nil error without touching the stream.
func (m *mockRepoSvc) RepackIncremental(context.Context, *gitalypb.RepackIncrementalRequest) (*gitalypb.RepackIncrementalResponse, error) {
return nil, nil
}
func (m *mockRepoSvc) RepackFull(context.Context, *gitalypb.RepackFullRequest) (*gitalypb.RepackFullResponse, error) {
return nil, nil
}
func (m *mockRepoSvc) GarbageCollect(context.Context, *gitalypb.GarbageCollectRequest) (*gitalypb.GarbageCollectResponse, error) {
return nil, nil
}
func (m *mockRepoSvc) RepositorySize(context.Context, *gitalypb.RepositorySizeRequest) (*gitalypb.RepositorySizeResponse, error) {
return nil, nil
}
func (m *mockRepoSvc) ApplyGitattributes(context.Context, *gitalypb.ApplyGitattributesRequest) (*gitalypb.ApplyGitattributesResponse, error) {
return nil, nil
}
func (m *mockRepoSvc) FetchRemote(context.Context, *gitalypb.FetchRemoteRequest) (*gitalypb.FetchRemoteResponse, error) {
return nil, nil
}
func (m *mockRepoSvc) CreateRepository(context.Context, *gitalypb.CreateRepositoryRequest) (*gitalypb.CreateRepositoryResponse, error) {
return nil, nil
}
func (m *mockRepoSvc) GetArchive(*gitalypb.GetArchiveRequest, gitalypb.RepositoryService_GetArchiveServer) error {
return nil
}
func (m *mockRepoSvc) HasLocalBranches(context.Context, *gitalypb.HasLocalBranchesRequest) (*gitalypb.HasLocalBranchesResponse, error) {
return nil, nil
}
func (m *mockRepoSvc) FetchSourceBranch(context.Context, *gitalypb.FetchSourceBranchRequest) (*gitalypb.FetchSourceBranchResponse, error) {
return nil, nil
}
func (m *mockRepoSvc) Fsck(context.Context, *gitalypb.FsckRequest) (*gitalypb.FsckResponse, error) {
return nil, nil
}
func (m *mockRepoSvc) WriteRef(context.Context, *gitalypb.WriteRefRequest) (*gitalypb.WriteRefResponse, error) {
return nil, nil
}
func (m *mockRepoSvc) FindMergeBase(context.Context, *gitalypb.FindMergeBaseRequest) (*gitalypb.FindMergeBaseResponse, error) {
return nil, nil
}
func (m *mockRepoSvc) CreateFork(context.Context, *gitalypb.CreateForkRequest) (*gitalypb.CreateForkResponse, error) {
return nil, nil
}
func (m *mockRepoSvc) IsRebaseInProgress(context.Context, *gitalypb.IsRebaseInProgressRequest) (*gitalypb.IsRebaseInProgressResponse, error) {
return nil, nil
}
func (m *mockRepoSvc) IsSquashInProgress(context.Context, *gitalypb.IsSquashInProgressRequest) (*gitalypb.IsSquashInProgressResponse, error) {
return nil, nil
}
func (m *mockRepoSvc) CreateRepositoryFromURL(context.Context, *gitalypb.CreateRepositoryFromURLRequest) (*gitalypb.CreateRepositoryFromURLResponse, error) {
return nil, nil
}
func (m *mockRepoSvc) CreateBundle(*gitalypb.CreateBundleRequest, gitalypb.RepositoryService_CreateBundleServer) error {
return nil
}
func (m *mockRepoSvc) CreateRepositoryFromBundle(gitalypb.RepositoryService_CreateRepositoryFromBundleServer) error {
return nil
}
func (m *mockRepoSvc) WriteConfig(context.Context, *gitalypb.WriteConfigRequest) (*gitalypb.WriteConfigResponse, error) {
return nil, nil
}
func (m *mockRepoSvc) SetConfig(context.Context, *gitalypb.SetConfigRequest) (*gitalypb.SetConfigResponse, error) {
return nil, nil
}
func (m *mockRepoSvc) DeleteConfig(context.Context, *gitalypb.DeleteConfigRequest) (*gitalypb.DeleteConfigResponse, error) {
return nil, nil
}
func (m *mockRepoSvc) FindLicense(context.Context, *gitalypb.FindLicenseRequest) (*gitalypb.FindLicenseResponse, error) {
return nil, nil
}
func (m *mockRepoSvc) GetInfoAttributes(*gitalypb.GetInfoAttributesRequest, gitalypb.RepositoryService_GetInfoAttributesServer) error {
return nil
}
func (m *mockRepoSvc) CalculateChecksum(context.Context, *gitalypb.CalculateChecksumRequest) (*gitalypb.CalculateChecksumResponse, error) {
return nil, nil
}
func (m *mockRepoSvc) Cleanup(context.Context, *gitalypb.CleanupRequest) (*gitalypb.CleanupResponse, error) {
return nil, nil
}
func (m *mockRepoSvc) GetSnapshot(*gitalypb.GetSnapshotRequest, gitalypb.RepositoryService_GetSnapshotServer) error {
return nil
}
func (m *mockRepoSvc) CreateRepositoryFromSnapshot(context.Context, *gitalypb.CreateRepositoryFromSnapshotRequest) (*gitalypb.CreateRepositoryFromSnapshotResponse, error) {
return nil, nil
}
func (m *mockRepoSvc) GetRawChanges(*gitalypb.GetRawChangesRequest, gitalypb.RepositoryService_GetRawChangesServer) error {
return nil
}
func (m *mockRepoSvc) SearchFilesByContent(*gitalypb.SearchFilesByContentRequest, gitalypb.RepositoryService_SearchFilesByContentServer) error {
return nil
}
func (m *mockRepoSvc) SearchFilesByName(*gitalypb.SearchFilesByNameRequest, gitalypb.RepositoryService_SearchFilesByNameServer) error {
return nil
}
func (m *mockRepoSvc) RestoreCustomHooks(gitalypb.RepositoryService_RestoreCustomHooksServer) error {
return nil
}
func (m *mockRepoSvc) BackupCustomHooks(*gitalypb.BackupCustomHooksRequest, gitalypb.RepositoryService_BackupCustomHooksServer) error {
return nil
}
/*Package praefect is a Gitaly reverse proxy for transparently routing gRPC
calls to a set of Gitaly services.*/
package praefect
import (
"context"
"errors"
"fmt"
"net"
"sync"
"github.com/mwitkow/grpc-proxy/proxy"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// Logger is a simple interface that allows loggers to be dependency injected
// into the praefect server
type Logger interface {
// Debugf takes a printf-style format string and its arguments. In this
// file it is called by streamDirector to report the method being routed.
Debugf(format string, args ...interface{})
}
// Coordinator takes care of directing client requests to the appropriate
// downstream server. The coordinator is thread safe; concurrent calls to
// register nodes are safe.
type Coordinator struct {
// log receives the debug message streamDirector emits for each call.
log Logger
// lock guards nodes: RegisterNode takes the write lock, streamDirector
// takes the read lock around its lookup.
lock sync.RWMutex
// nodes maps a storage location name to the connection of the downstream
// server registered for it.
nodes map[string]*grpc.ClientConn
}
// newCoordinator builds a Coordinator that logs through l and starts out with
// an empty, ready-to-use node table.
func newCoordinator(l Logger) *Coordinator {
	c := new(Coordinator)
	c.log = l
	c.nodes = make(map[string]*grpc.ClientConn)
	return c
}
// streamDirector picks the downstream connection a proxied call is sent to.
//
// It logs the incoming method name, looks up the connection registered for
// the storage location under the read lock, and returns it together with the
// unmodified ctx. When no node is registered for the location it returns a
// FailedPrecondition status error and nil for both other results.
func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string) (context.Context, *grpc.ClientConn, error) {
	// For phase 1, we need to route messages based on the storage location
	// to the appropriate Gitaly node.
	c.log.Debugf("Stream director received method %s", fullMethodName)

	// TODO: obtain storage location dynamically from RPC request message
	const storageLoc = "test"

	c.lock.RLock()
	conn, registered := c.nodes[storageLoc]
	c.lock.RUnlock()

	if registered {
		return ctx, conn, nil
	}

	msg := fmt.Sprintf("no downstream node for storage location %q", storageLoc)
	return nil, nil, status.Error(codes.FailedPrecondition, msg)
}
// Server is a praefect server
type Server struct {
// Coordinator is embedded, so its exported methods (such as RegisterNode)
// can be called directly on a Server.
*Coordinator
// s is the underlying gRPC server that Start serves on and Shutdown stops.
s *grpc.Server
}
// NewServer returns an initialized praefect gRPC proxy server configured
// with the provided gRPC server options. The options required for proxying
// are appended after the caller's options, and the server's coordinator logs
// through l.
func NewServer(grpcOpts []grpc.ServerOption, l Logger) *Server {
	coordinator := newCoordinator(l)
	grpcOpts = append(grpcOpts, proxyRequiredOpts(coordinator.streamDirector)...)

	srv := &Server{Coordinator: coordinator}
	srv.s = grpc.NewServer(grpcOpts...)
	return srv
}
// ErrStorageLocExists indicates a storage location has already been registered
// in the proxy for a downstream Gitaly node
//
// NOTE(review): nothing visible in this file returns this error; RegisterNode
// has no error result and replaces an existing entry. Confirm whether it is
// meant to be wired in.
var ErrStorageLocExists = errors.New("storage location already registered")
// RegisterNode records node as the downstream connection for storageLoc, so
// that traffic for that storage location is directed to it. Registering a
// location that already has a connection replaces the earlier one. The node
// table is updated under the write lock.
//
// TODO: Coordinator probably needs to handle dialing, or another entity
// needs to handle dialing to ensure keep alives and redialing logic
// exist for when downstream connections are severed.
func (c *Coordinator) RegisterNode(storageLoc string, node *grpc.ClientConn) {
	c.lock.Lock()
	defer c.lock.Unlock()

	c.nodes[storageLoc] = node
}
// proxyRequiredOpts returns the two server options the proxy depends on, in
// this order: the proxy codec, and an unknown-service handler that passes
// calls through the transparent handler built around director.
func proxyRequiredOpts(director proxy.StreamDirector) []grpc.ServerOption {
	codecOpt := grpc.CustomCodec(proxy.Codec())
	handlerOpt := grpc.UnknownServiceHandler(proxy.TransparentHandler(director))

	return []grpc.ServerOption{codecOpt, handlerOpt}
}
// Start serves the praefect gRPC proxy on lis and returns whatever the
// underlying gRPC server's Serve call returns. It blocks until the server is
// stopped or an unrecoverable error occurs.
func (srv *Server) Start(lis net.Listener) error {
	grpcServer := srv.s
	return grpcServer.Serve(lis)
}
// Shutdown first tries to stop the gRPC server gracefully. If ctx is done
// before the graceful stop completes, the server is stopped forcefully and
// ctx.Err() is returned; otherwise the result is nil.
func (srv *Server) Shutdown(ctx context.Context) error {
	stopped := make(chan struct{})

	// GracefulStop blocks, so it runs on its own goroutine and signals
	// completion by closing stopped.
	go func() {
		defer close(stopped)
		srv.s.GracefulStop()
	}()

	select {
	case <-stopped:
		return nil
	case <-ctx.Done():
		srv.s.Stop()
		return ctx.Err()
	}
}
package praefect_test
import (
"context"
"fmt"
"net"
"testing"
"time"
"github.com/mwitkow/grpc-proxy/proxy"
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly-proto/go/gitalypb"
"gitlab.com/gitlab-org/gitaly/client"
"gitlab.com/gitlab-org/gitaly/internal/praefect"
"google.golang.org/grpc"
)
// TestServerRouting starts a praefect server on a free local port, registers
// a mock downstream under the storage location "test", and checks that a
// RepositoryExists call made through the proxy succeeds. It then shuts the
// proxy down and expects both Shutdown and Start to return nil.
func TestServerRouting(t *testing.T) {
prf := praefect.NewServer(nil, testLogger{t})
listener, port := listenAvailPort(t)
t.Logf("proxy listening on port %d", port)
defer listener.Close()
// errQ carries the result of Start, which is read at the end of the test.
errQ := make(chan error)
go func() {
errQ <- prf.Start(listener)
}()
// dial client to proxy
cc := dialLocalPort(t, port, false)
defer cc.Close()
gCli := gitalypb.NewRepositoryServiceClient(cc)
mCli, _, cleanup := newMockDownstream(t)
defer cleanup() // clean up mock downstream server resources
// "test" is the storage location registered for the mock downstream.
prf.RegisterNode("test", mCli)
// The same one-second context bounds both the RPC and the shutdown below.
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
_, err := gCli.RepositoryExists(ctx, &gitalypb.RepositoryExistsRequest{})
require.NoError(t, err)
err = prf.Shutdown(ctx)
require.NoError(t, err)
require.NoError(t, <-errQ)
}
// listenAvailPort opens a TCP listener on ":0" and returns it together with
// the port number taken from the listener's address. A listen failure fails
// tb.
func listenAvailPort(tb testing.TB) (net.Listener, int) {
	lis, err := net.Listen("tcp", ":0")
	require.NoError(tb, err)

	tcpAddr := lis.Addr().(*net.TCPAddr)
	return lis, tcpAddr.Port
}
// dialLocalPort dials tcp://localhost:<port> with a blocking dial and returns
// the connection; a dial error fails tb. When backend is true the connection
// additionally uses the proxy codec as a default call option.
func dialLocalPort(tb testing.TB, port int, backend bool) *grpc.ClientConn {
	dialOpts := []grpc.DialOption{grpc.WithBlock()}

	if backend {
		codecOpt := grpc.WithDefaultCallOptions(grpc.CallCustomCodec(proxy.Codec()))
		dialOpts = append(dialOpts, codecOpt)
	}

	addr := fmt.Sprintf("tcp://localhost:%d", port)
	conn, err := client.Dial(addr, dialOpts)
	require.NoError(tb, err)

	return conn
}
// testLogger adapts a testing.TB so it can be used where a Debugf method is
// required, sending the output to the test log.
type testLogger struct {
	testing.TB
}

// Debugf forwards the format string and arguments to the embedded TB's Logf.
func (tl testLogger) Debugf(format string, args ...interface{}) {
	tl.Logf(format, args...)
}
// newMockDownstream initializes and returns a client to the downstream
// server, the downstream server itself, and a cleanup function.
//
// The mock repository service is registered on a new gRPC server listening
// on a free local port. The returned cleanup function gracefully stops that
// server, closes the listener and the client connection, and requires that
// Serve returned without error.
func newMockDownstream(tb testing.TB) (*grpc.ClientConn, gitalypb.RepositoryServiceServer, func()) {
	// setup mock server
	m := &mockRepoSvc{
		srv: grpc.NewServer(),
	}
	gitalypb.RegisterRepositoryServiceServer(m.srv, m)

	lis, port := listenAvailPort(tb)

	// Start serving before dialing. dialLocalPort dials with grpc.WithBlock,
	// so dialing first would depend on the connection becoming ready while
	// nothing is accepting on the listener yet. The channel is buffered so
	// the goroutine can always deliver its result and exit.
	errQ := make(chan error, 1)
	go func() {
		errQ <- m.srv.Serve(lis)
	}()

	// dial praefect to backend service
	cc := dialLocalPort(tb, port, true)

	cleanup := func() {
		m.srv.GracefulStop()
		lis.Close()
		cc.Close()
		require.NoError(tb, <-errQ)
	}

	return cc, m, cleanup
}
......@@ -13,6 +13,7 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/git/catfile"
"gitlab.com/gitlab-org/gitaly/internal/git/log"
"gitlab.com/gitlab-org/gitaly/internal/helper"
"gitlab.com/gitlab-org/gitaly/internal/helper/chunk"
)
const commitsPerPage int = 20
......@@ -115,40 +116,37 @@ func (g *GetCommits) Commit() (*gitalypb.GitCommit, error) {
return commit, nil
}
// findCommitsSender accumulates commits and sends them as a single
// FindCommitsResponse on the FindCommits stream. It provides the
// Reset/Append/Send methods handed to chunk.New.
type findCommitsSender struct {
	stream  gitalypb.CommitService_FindCommitsServer
	commits []*gitalypb.GitCommit
}

// Reset drops the accumulated commits by setting the slice to nil.
func (s *findCommitsSender) Reset() {
	s.commits = nil
}

// Append adds it, which must be a *gitalypb.GitCommit, to the pending commits.
func (s *findCommitsSender) Append(it chunk.Item) {
	commit := it.(*gitalypb.GitCommit)
	s.commits = append(s.commits, commit)
}

// Send writes the pending commits to the stream in one response message.
func (s *findCommitsSender) Send() error {
	resp := &gitalypb.FindCommitsResponse{Commits: s.commits}
	return s.stream.Send(resp)
}
func streamPaginatedCommits(getCommits *GetCommits, commitsPerPage int, stream gitalypb.CommitService_FindCommitsServer) error {
var commitPage []*gitalypb.GitCommit
chunker := chunk.New(&findCommitsSender{stream: stream})
for getCommits.Scan() {
commit, err := getCommits.Commit()
if err != nil {
return err
}
commitPage = append(commitPage, commit)
if len(commitPage) == commitsPerPage {
if err := stream.Send(
&gitalypb.FindCommitsResponse{
Commits: commitPage,
},
); err != nil {
return fmt.Errorf("error when sending stream response: %v", err)
}
commitPage = nil
if err := chunker.Send(commit); err != nil {
return err
}
}
if getCommits.Err() != nil {
return fmt.Errorf("get commits: %v", getCommits.Err())
}
// send the last page
if len(commitPage) > 0 {
if err := stream.Send(
&gitalypb.FindCommitsResponse{
Commits: commitPage,
},
); err != nil {
return fmt.Errorf("error when sending stream response: %v", err)
}
}
return nil
return chunker.Flush()
}
func getLogCommandFlags(req *gitalypb.FindCommitsRequest) []string {
......
......@@ -35,7 +35,15 @@ func commitStats(ctx context.Context, in *gitalypb.CommitStatsRequest) (*gitalyp
return nil, fmt.Errorf("commit not found: %q", in.Revision)
}
cmd, err := git.Command(ctx, in.Repository, "diff", "--numstat", commit.Id+"^", commit.Id)
args := []string{"diff", "--numstat"}
if len(commit.GetParentIds()) == 0 {
args = append(args, git.EmptyTreeID, commit.Id)
} else {
args = append(args, commit.Id+"^", commit.Id)
}
cmd, err := git.Command(ctx, in.Repository, args...)
if err != nil {
return nil, err
}
......
......@@ -58,6 +58,13 @@ func TestCommitStatsSuccess(t *testing.T) {
additions: 0,
deletions: 0,
},
{
desc: "initial commit",
revision: "1a0b36b3",
oid: "1a0b36b3cdad1d2ee32457c102a8c0b7056fa863",
additions: 43,
deletions: 0,
},
}
for _, tc := range tests {
......
......@@ -71,7 +71,7 @@ func (s *server) CalculateChecksum(ctx context.Context, in *gitalypb.CalculateCh
return nil, status.Errorf(codes.Internal, err.Error())
}
if err := cmd.Wait(); err != nil {
if err := cmd.Wait(); checksum == nil || err != nil {
if isValidRepo(ctx, repo) {
return &gitalypb.CalculateChecksumResponse{Checksum: blankChecksum}, nil
}
......
......@@ -143,3 +143,29 @@ func TestFailedCalculateChecksum(t *testing.T) {
testhelper.RequireGrpcError(t, err, testCase.code)
}
}
// TestInvalidRefsCalculateChecksum checks that CalculateChecksum succeeds and
// reports the all-zero checksum for a repository whose refs directory has been
// emptied and whose packed-refs file is replaced by the
// testdata/checksum-test-invalid-refs fixture.
func TestInvalidRefsCalculateChecksum(t *testing.T) {
	srv, socketPath := runRepoServer(t)
	defer srv.Stop()

	client, conn := newRepositoryClient(t, socketPath)
	defer conn.Close()

	repo, repoPath, cleanup := testhelper.NewTestRepo(t)
	defer cleanup()

	// Put the refs database into a known state: wipe refs/ entirely, then
	// recreate only the empty directory skeleton.
	refsRoot := path.Join(repoPath, "refs")
	require.NoError(t, os.RemoveAll(refsRoot))

	refDirs := []string{"refs/heads", "refs/tags", "refs/notes"}
	for i := range refDirs {
		dir := path.Join(repoPath, refDirs[i])
		require.NoError(t, os.MkdirAll(dir, 0755))
	}

	// Install the fixture as the repository's packed-refs file.
	packedRefs := path.Join(repoPath, "packed-refs")
	copyFixture := exec.Command("cp", "testdata/checksum-test-invalid-refs", packedRefs)
	require.NoError(t, copyFixture.Run())

	req := &gitalypb.CalculateChecksumRequest{Repository: repo}

	ctx, cancel := testhelper.Context()
	defer cancel()

	resp, err := client.CalculateChecksum(ctx, req)
	require.NoError(t, err)
	require.Equal(t, "0000000000000000000000000000000000000000", resp.Checksum)
}
......@@ -11,6 +11,7 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/git/catfile"
"gitlab.com/gitlab-org/gitaly/internal/git/rawdiff"
"gitlab.com/gitlab-org/gitaly/internal/helper"
"gitlab.com/gitlab-org/gitaly/internal/helper/chunk"
)
func (s *server) GetRawChanges(req *gitalypb.GetRawChangesRequest, stream gitalypb.RepositoryService_GetRawChangesServer) error {
......@@ -62,17 +63,16 @@ func getRawChanges(stream gitalypb.RepositoryService_GetRawChangesServer, repo *
if err != nil {
return fmt.Errorf("start git diff: %v", err)
}
p := rawdiff.NewParser(diffCmd)
var chunk []*gitalypb.GetRawChangesResponse_RawChange
p := rawdiff.NewParser(diffCmd)
chunker := chunk.New(&rawChangesSender{stream: stream})
for {
d, err := p.NextDiff()
if err == io.EOF {
break // happy path
}
if err != nil {
if err == io.EOF {
break // happy path
}
return fmt.Errorf("read diff: %v", err)
}
......@@ -80,21 +80,8 @@ func getRawChanges(stream gitalypb.RepositoryService_GetRawChangesServer, repo *
if err != nil {
return fmt.Errorf("build change from diff line: %v", err)
}
chunk = append(chunk, change)
const chunkSize = 50
if len(chunk) >= chunkSize {
resp := &gitalypb.GetRawChangesResponse{RawChanges: chunk}
if err := stream.Send(resp); err != nil {
return fmt.Errorf("send response: %v", err)
}
chunk = nil
}
}
if len(chunk) > 0 {
resp := &gitalypb.GetRawChangesResponse{RawChanges: chunk}
if err := stream.Send(resp); err != nil {
if err := chunker.Send(change); err != nil {
return fmt.Errorf("send response: %v", err)
}
}
......@@ -103,7 +90,21 @@ func getRawChanges(stream gitalypb.RepositoryService_GetRawChangesServer, repo *
return fmt.Errorf("wait git diff: %v", err)
}
return nil
return chunker.Flush()
}
// rawChangesSender accumulates raw changes and emits them on a GetRawChanges
// response stream. getRawChanges hands a pointer to it to chunk.New, which
// then drives it through the Reset, Append and Send methods below.
type rawChangesSender struct {
	stream  gitalypb.RepositoryService_GetRawChangesServer
	changes []*gitalypb.GetRawChangesResponse_RawChange
}

// Reset discards the changes gathered so far.
func (rs *rawChangesSender) Reset() {
	rs.changes = nil
}

// Append adds it to the pending batch. The item must be a
// *gitalypb.GetRawChangesResponse_RawChange; any other dynamic type makes the
// type assertion panic.
func (rs *rawChangesSender) Append(it chunk.Item) {
	change := it.(*gitalypb.GetRawChangesResponse_RawChange)
	rs.changes = append(rs.changes, change)
}

// Send writes the pending changes to the stream as one GetRawChangesResponse
// and returns the stream's error, if any.
func (rs *rawChangesSender) Send() error {
	return rs.stream.Send(&gitalypb.GetRawChangesResponse{RawChanges: rs.changes})
}
// Ordinarily, Git uses 0000000000000000000000000000000000000000, the
......
......@@ -122,8 +122,13 @@ func sendSearchFilesResultChunked(cmd *command.Command, stream gitalypb.Reposito
scanner := bufio.NewScanner(cmd)
for scanner.Scan() {
line := append(scanner.Bytes(), '\n')
if bytes.Equal(line, contentDelimiter) {
// Intentionally avoid scanner.Bytes() because that returns a []byte that
// becomes invalid on the next loop iteration, and we want to hold on to
// the contents of the current line for a while. Scanner.Text() is a
// string and hence immutable.
line := scanner.Text() + "\n"
if line == string(contentDelimiter) {
if err := sendMatchInChunks(buf, stream); err != nil {
return err
}
......
# pack-refs with: peeled fully-peeled sorted
1450cd639e0bc6721eb02800169e464f212cde06 foo/bar
259a6fba859cc91c54cd86a2cbd4c2f720e3a19d foo/bar:baz
d0a293c0ac821fadfdc086fe528f79423004229d refs/foo/bar:baz
21751bf5cb2b556543a11018c1f13b35e44a99d7 tags/v0.0.1
498214de67004b1da3d820901307bed2a68a8ef6 keep-around/498214de67004b1da3d820901307bed2a68a8ef6
38008cb17ce1466d8fec2dfa6f6ab8dcfe5cf49e merge-requests/11/head
c347ca2e140aa667b968e51ed0ffe055501fe4f4 environments/3/head
4ed78158b5b018c43005cec917129861541e25bc notes/commits
\ No newline at end of file
......@@ -10,7 +10,6 @@ import (
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly-proto/go/gitalypb"
"gitlab.com/gitlab-org/gitaly/internal/config"
"gitlab.com/gitlab-org/gitaly/internal/git"
"gitlab.com/gitlab-org/gitaly/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/streamio"
......@@ -54,10 +53,8 @@ func TestSuccessfulInfoRefsUploadPackWithGitConfigOptions(t *testing.T) {
}
func TestSuccessfulInfoRefsUploadPackWithGitProtocol(t *testing.T) {
defer func(old string) {
config.Config.Git.BinPath = old
}(config.Config.Git.BinPath)
config.Config.Git.BinPath = "../../testhelper/env_git"
restore := testhelper.EnableGitProtocolV2Support()
defer restore()
server, serverSocketPath := runSmartHTTPServer(t)
defer server.Stop()
......@@ -148,7 +145,7 @@ func TestFailureRepoNotFoundInfoRefsReceivePack(t *testing.T) {
client, conn := newSmartHTTPClient(t, serverSocketPath)
defer conn.Close()
repo := &gitalypb.Repository{StorageName: "default", RelativePath: "testdata/data/another_repo"}
repo := &gitalypb.Repository{StorageName: "default", RelativePath: "testdata/scratch/another_repo"}
rpcRequest := &gitalypb.InfoRefsRequest{Repository: repo}
ctx, cancel := context.WithCancel(context.Background())
......
......@@ -68,10 +68,8 @@ func TestSuccessfulReceivePackRequest(t *testing.T) {
}
func TestSuccessfulReceivePackRequestWithGitProtocol(t *testing.T) {
defer func(old string) {
config.Config.Git.BinPath = old
}(config.Config.Git.BinPath)
config.Config.Git.BinPath = "../../testhelper/env_git"
restore := testhelper.EnableGitProtocolV2Support()
defer restore()
server, serverSocketPath := runSmartHTTPServer(t)
defer server.Stop()
......
../../../testhelper/env_git
\ No newline at end of file
......@@ -15,7 +15,6 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly-proto/go/gitalypb"
"gitlab.com/gitlab-org/gitaly/internal/config"
"gitlab.com/gitlab-org/gitaly/internal/git"