Skip to content
Snippets Groups Projects

Full MVC of gitlab-zoekt-indexer

Merged Dylan Griffith requested to merge add-new-api-paths into main
All threads resolved!
Compare and Show latest version
4 files
+ 197
110
Compare changes
  • Side-by-side
  • Inline
Files
4
@@ -18,6 +18,7 @@ import (
@@ -18,6 +18,7 @@ import (
"github.com/prometheus/client_golang/prometheus/collectors"
"github.com/prometheus/client_golang/prometheus/collectors"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/prometheus/client_golang/prometheus/promhttp"
 
"gitlab.com/gitlab-org/search-team/gitlab-zoekt-indexer/correlation"
"gitlab.com/gitlab-org/search-team/gitlab-zoekt-indexer/indexer"
"gitlab.com/gitlab-org/search-team/gitlab-zoekt-indexer/indexer"
)
)
@@ -28,6 +29,7 @@ type options struct {
@@ -28,6 +29,7 @@ type options struct {
type indexServer struct {
type indexServer struct {
indexDir string
indexDir string
 
indexBuilder indexBuilder
promRegistry *prometheus.Registry
promRegistry *prometheus.Registry
metricsRequestsTotal *prometheus.CounterVec
metricsRequestsTotal *prometheus.CounterVec
}
}
@@ -42,11 +44,50 @@ type gitalyConnectionInfo struct {
@@ -42,11 +44,50 @@ type gitalyConnectionInfo struct {
type indexRequest struct {
type indexRequest struct {
Timeout string
Timeout string
RepoID uint32
RepoID uint32
CorrelationID string
GitalyConnectionInfo *gitalyConnectionInfo
GitalyConnectionInfo *gitalyConnectionInfo
FileSizeLimit int
FileSizeLimit int
}
}
 
type indexBuilder interface {
 
indexRepository(ctx context.Context, req indexRequest, indexDir string) error
 
}
 
 
type defaultIndexBuilder struct{}
 
 
func (b defaultIndexBuilder) indexRepository(ctx context.Context, req indexRequest, indexDir string) error {
 
if req.GitalyConnectionInfo == nil {
 
return errors.New("gitalyConnectionInfo is not set")
 
}
 
 
if req.FileSizeLimit == 0 {
 
return errors.New("fileSizeLimit is not set")
 
}
 
 
timeout, err := time.ParseDuration(req.Timeout)
 
if err != nil {
 
return fmt.Errorf("failed to parse Timeout: %v with error %v", req.Timeout, err)
 
}
 
 
ctx, cancel := context.WithTimeout(ctx, timeout)
 
defer cancel()
 
 
idx := &indexer.Indexer{
 
IndexDir: indexDir,
 
ProjectID: req.RepoID,
 
CorrelationID: correlation.GetCorrelationID(ctx),
 
GitalyAddress: req.GitalyConnectionInfo.Address,
 
GitalyStorageName: req.GitalyConnectionInfo.Storage,
 
GitalyRelativePath: req.GitalyConnectionInfo.Path,
 
LimitFileSize: req.FileSizeLimit,
 
}
 
 
if err := idx.IndexRepository(ctx); err != nil {
 
return err
 
}
 
 
return nil
 
}
 
func (s *indexServer) createIndexDir() {
func (s *indexServer) createIndexDir() {
if err := os.MkdirAll(s.indexDir, 0o755); err != nil {
if err := os.MkdirAll(s.indexDir, 0o755); err != nil {
log.Fatalf("createIndexDir %s: %v", s.indexDir, err)
log.Fatalf("createIndexDir %s: %v", s.indexDir, err)
@@ -56,6 +97,11 @@ func (s *indexServer) createIndexDir() {
@@ -56,6 +97,11 @@ func (s *indexServer) createIndexDir() {
func (s *indexServer) handleStatus() http.HandlerFunc {
func (s *indexServer) handleStatus() http.HandlerFunc {
route := "status"
route := "status"
 
type response struct {
 
Success bool
 
SHA string
 
}
 
return func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
param := chi.URLParam(r, "id")
param := chi.URLParam(r, "id")
repoID, err := strconv.ParseUint(param, 10, 32)
repoID, err := strconv.ParseUint(param, 10, 32)
@@ -82,12 +128,12 @@ func (s *indexServer) handleStatus() http.HandlerFunc {
@@ -82,12 +128,12 @@ func (s *indexServer) handleStatus() http.HandlerFunc {
return
return
}
}
response := map[string]any{
resp := response{
"Success": true,
Success: true,
"SHA": currentSHA,
SHA: currentSHA,
}
}
s.respondWith(w, r, route, response)
s.respondWith(w, r, route, resp)
}
}
}
}
@@ -106,6 +152,10 @@ func (s *indexServer) decode(r *http.Request, v interface{}) error {
@@ -106,6 +152,10 @@ func (s *indexServer) decode(r *http.Request, v interface{}) error {
func (s *indexServer) handleIndex() http.HandlerFunc {
func (s *indexServer) handleIndex() http.HandlerFunc {
route := "index"
route := "index"
 
type response struct {
 
Success bool
 
}
 
parseRequest := func(r *http.Request) (indexRequest, error) {
parseRequest := func(r *http.Request) (indexRequest, error) {
var req indexRequest
var req indexRequest
err := s.decode(r, &req)
err := s.decode(r, &req)
@@ -114,14 +164,6 @@ func (s *indexServer) handleIndex() http.HandlerFunc {
@@ -114,14 +164,6 @@ func (s *indexServer) handleIndex() http.HandlerFunc {
return req, errors.New("json parser error")
return req, errors.New("json parser error")
}
}
if req.GitalyConnectionInfo == nil {
return req, errors.New("gitalyConnectionInfo Error")
}
if req.FileSizeLimit == 0 {
return req, errors.New("fileSizeLimit is not set")
}
return req, nil
return req, nil
}
}
@@ -132,48 +174,45 @@ func (s *indexServer) handleIndex() http.HandlerFunc {
@@ -132,48 +174,45 @@ func (s *indexServer) handleIndex() http.HandlerFunc {
return
return
}
}
err = s.indexRepository(req)
err = s.indexBuilder.indexRepository(r.Context(), req, s.indexDir)
if err != nil {
if err != nil {
s.respondWithError(w, r, route, err)
s.respondWithError(w, r, route, err)
return
return
}
}
response := map[string]any{
resp := response{
"Success": true,
Success: true,
}
}
s.respondWith(w, r, route, response)
s.respondWith(w, r, route, resp)
}
}
}
}
func (s *indexServer) indexRepository(req indexRequest) error {
func (s *indexServer) handleTruncate() http.HandlerFunc {
timeout, err := time.ParseDuration(req.Timeout)
route := "truncate"
if err != nil {
return fmt.Errorf("failed to parse Timeout: %v with error %v", req.Timeout, err)
type response struct {
 
Success bool
}
}
ctx, cancel := context.WithTimeout(context.Background(), timeout)
emptyDirectory := func(dir string) error {
defer cancel()
files, err := os.ReadDir(dir)
idx := &indexer.Indexer{
if err != nil {
IndexDir: s.indexDir,
return err
ProjectID: req.RepoID,
}
CorrelationID: req.CorrelationID,
GitalyAddress: req.GitalyConnectionInfo.Address,
GitalyStorageName: req.GitalyConnectionInfo.Storage,
GitalyRelativePath: req.GitalyConnectionInfo.Path,
LimitFileSize: req.FileSizeLimit,
}
if err := idx.IndexRepository(ctx); err != nil {
for _, file := range files {
return err
filePath := filepath.Join(dir, file.Name())
}
err := os.RemoveAll(filePath)
 
if err != nil {
 
return err
 
}
 
}
return nil
return nil
}
}
func (s *indexServer) handleTruncate() http.HandlerFunc {
route := "truncate"
return func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
err := emptyDirectory(s.indexDir)
err := emptyDirectory(s.indexDir)
@@ -184,11 +223,11 @@ func (s *indexServer) handleTruncate() http.HandlerFunc {
@@ -184,11 +223,11 @@ func (s *indexServer) handleTruncate() http.HandlerFunc {
return
return
}
}
response := map[string]any{
resp := response{
"Success": true,
Success: true,
}
}
s.respondWith(w, r, route, response)
s.respondWith(w, r, route, resp)
}
}
}
}
@@ -197,24 +236,31 @@ func (s *indexServer) respondWith(w http.ResponseWriter, r *http.Request, route
@@ -197,24 +236,31 @@ func (s *indexServer) respondWith(w http.ResponseWriter, r *http.Request, route
if err := json.NewEncoder(w).Encode(data); err != nil {
if err := json.NewEncoder(w).Encode(data); err != nil {
s.respondWithError(w, r, route, err)
s.respondWithError(w, r, route, err)
 
return
}
}
s.incrementRequestsTotal(r.Method, route, http.StatusOK)
s.incrementRequestsTotal(r.Method, route, http.StatusOK)
}
}
func (s *indexServer) respondWithError(w http.ResponseWriter, r *http.Request, route string, err error) {
func (s *indexServer) respondWithError(w http.ResponseWriter, r *http.Request, route string, err error) {
responseCode := http.StatusInternalServerError
type response struct {
 
Success bool
 
Error string
 
}
s.incrementRequestsTotal(r.Method, route, responseCode)
responseCode := http.StatusInternalServerError
w.Header().Set("Content-Type", "application/json")
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(responseCode)
w.WriteHeader(responseCode)
response := map[string]any{
"Success": false,
resp := response{
"Error": err.Error(),
Success: false,
 
Error: err.Error(),
}
}
_ = json.NewEncoder(w).Encode(response)
_ = json.NewEncoder(w).Encode(resp)
 
 
s.incrementRequestsTotal(r.Method, route, responseCode)
}
}
func (s *indexServer) incrementRequestsTotal(method, route string, responseCode int) {
func (s *indexServer) incrementRequestsTotal(method, route string, responseCode int) {
@@ -239,24 +285,6 @@ func (s *indexServer) initMetrics() {
@@ -239,24 +285,6 @@ func (s *indexServer) initMetrics() {
)
)
}
}
func emptyDirectory(dir string) error {
files, err := os.ReadDir(dir)
if err != nil {
return err
}
for _, file := range files {
filePath := filepath.Join(dir, file.Name())
err := os.RemoveAll(filePath)
if err != nil {
return err
}
}
return nil
}
func parseOptions() options {
func parseOptions() options {
indexDir := flag.String("index_dir", "", "directory holding index shards.")
indexDir := flag.String("index_dir", "", "directory holding index shards.")
listen := flag.String("listen", ":6060", "listen on this address.")
listen := flag.String("listen", ":6060", "listen on this address.")
@@ -276,7 +304,8 @@ func main() {
@@ -276,7 +304,8 @@ func main() {
opts := parseOptions()
opts := parseOptions()
server := indexServer{
server := indexServer{
indexDir: opts.indexDir,
indexDir: opts.indexDir,
 
indexBuilder: defaultIndexBuilder{},
}
}
server.startIndexingApi(opts.listen)
server.startIndexingApi(opts.listen)
@@ -285,11 +314,15 @@ func main() {
@@ -285,11 +314,15 @@ func main() {
func (s *indexServer) startIndexingApi(listen string) {
func (s *indexServer) startIndexingApi(listen string) {
s.initMetrics()
s.initMetrics()
s.createIndexDir()
s.createIndexDir()
router := s.router()
 
httpServer := http.Server{
 
Addr: listen,
 
Handler: s.router(),
 
}
log.Printf("Starting server on %s", listen)
log.Printf("Starting server on %s", listen)
if err := http.ListenAndServe(listen, router); err != nil {
if err := httpServer.ListenAndServe(); err != nil {
log.Fatal(err)
log.Fatal(err)
}
}
}
}
Loading