Skip to content
Snippets Groups Projects

Full MVC of gitlab-zoekt-indexer

Merged Dylan Griffith requested to merge add-new-api-paths into main
All threads resolved!
Compare and Show latest version
6 files
+ 313
224
Compare changes
  • Side-by-side
  • Inline
Files
6
@@ -6,7 +6,6 @@ import (
@@ -6,7 +6,6 @@ import (
"errors"
"errors"
"flag"
"flag"
"fmt"
"fmt"
"io"
"log"
"log"
"net/http"
"net/http"
"os"
"os"
@@ -14,20 +13,22 @@ import (
@@ -14,20 +13,22 @@ import (
"strconv"
"strconv"
"time"
"time"
 
"github.com/go-chi/chi/v5"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
"github.com/prometheus/client_golang/prometheus/collectors"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/prometheus/client_golang/prometheus/promhttp"
 
"gitlab.com/gitlab-org/search-team/gitlab-zoekt-indexer/correlation"
"gitlab.com/gitlab-org/search-team/gitlab-zoekt-indexer/indexer"
"gitlab.com/gitlab-org/search-team/gitlab-zoekt-indexer/indexer"
)
)
type Options struct {
type options struct {
indexDir string
indexDir string
listen string
listen string
}
}
type indexServer struct {
type indexServer struct {
opts Options
indexDir string
promRegistry *prometheus.Registry
promRegistry *prometheus.Registry
metricsRequestsTotal *prometheus.CounterVec
metricsRequestsTotal *prometheus.CounterVec
}
}
@@ -42,145 +43,217 @@ type gitalyConnectionInfo struct {
@@ -42,145 +43,217 @@ type gitalyConnectionInfo struct {
type indexRequest struct {
type indexRequest struct {
Timeout string
Timeout string
RepoID uint32
RepoID uint32
CorrelationID string
GitalyConnectionInfo *gitalyConnectionInfo
GitalyConnectionInfo *gitalyConnectionInfo
FileSizeLimit int
FileSizeLimit int
}
}
type statusRequest struct {
func (s *indexServer) indexRepository(req indexRequest, ctx context.Context) error {
RepoID uint32
timeout, err := time.ParseDuration(req.Timeout)
}
if err != nil {
 
return fmt.Errorf("failed to parse Timeout: %v with error %v", req.Timeout, err)
 
}
 
 
ctx, cancel := context.WithTimeout(ctx, timeout)
 
defer cancel()
func (o *Options) createIndexDir() {
idx := &indexer.Indexer{
if err := os.MkdirAll(o.indexDir, 0o755); err != nil {
IndexDir: s.indexDir,
log.Fatalf("createIndexDir %s: %v", o.indexDir, err)
ProjectID: req.RepoID,
 
CorrelationID: correlation.GetCorrelationID(ctx),
 
GitalyAddress: req.GitalyConnectionInfo.Address,
 
GitalyStorageName: req.GitalyConnectionInfo.Storage,
 
GitalyRelativePath: req.GitalyConnectionInfo.Path,
 
LimitFileSize: req.FileSizeLimit,
}
}
 
 
if err := idx.IndexRepository(ctx); err != nil {
 
return err
 
}
 
 
return nil
}
}
func (s *indexServer) serveHealthCheck(w http.ResponseWriter, r *http.Request) {
func (s *indexServer) createIndexDir() {
// Nothing to do. Just return 200
if err := os.MkdirAll(s.indexDir, 0o755); err != nil {
 
log.Fatalf("createIndexDir %s: %v", s.indexDir, err)
 
}
}
}
func (s *indexServer) serveStatus(w http.ResponseWriter, r *http.Request) {
func (s *indexServer) handleStatus() http.HandlerFunc {
route := "status"
route := "status"
dec := json.NewDecoder(r.Body)
type response struct {
dec.DisallowUnknownFields()
Success bool
var req statusRequest
SHA string
err := dec.Decode(&req)
if err != nil {
http.Error(w, "JSON parser error", http.StatusBadRequest)
return
}
}
idx := &indexer.Indexer{
return func(w http.ResponseWriter, r *http.Request) {
IndexDir: s.opts.indexDir,
param := chi.URLParam(r, "id")
ProjectID: req.RepoID,
repoID, err := strconv.ParseUint(param, 10, 32)
}
currentSHA, err := idx.CurrentSHA()
if err != nil {
 
http.Error(w, err.Error(), http.StatusBadRequest)
 
return
 
}
if err != nil {
idx := &indexer.Indexer{
s.respondWithError(w, r.Method, route, err)
IndexDir: s.indexDir,
return
ProjectID: uint32(repoID),
}
}
response := map[string]any{
currentSHA, err := idx.CurrentSHA()
"Success": true,
"SHA": currentSHA,
}
if err != nil {
if err != nil {
s.respondWithError(w, r.Method, route, err)
s.respondWithError(w, r, route, err)
return
return
}
}
w.Header().Set("Content-Type", "application/json")
if err != nil {
_ = json.NewEncoder(w).Encode(response)
s.respondWithError(w, r, route, err)
 
return
 
}
s.incrementRequestsTotal(r.Method, route, http.StatusOK)
resp := response{
 
Success: true,
 
SHA: currentSHA,
 
}
 
 
s.respondWith(w, r, route, resp)
 
}
}
}
func (s *indexServer) serveMetrics(w http.ResponseWriter, r *http.Request) {
func (s *indexServer) handleMetrics() http.HandlerFunc {
promhttp.HandlerFor(s.promRegistry, promhttp.HandlerOpts{Registry: s.promRegistry}).ServeHTTP(w, r)
return func(w http.ResponseWriter, r *http.Request) {
 
promhttp.HandlerFor(s.promRegistry, promhttp.HandlerOpts{Registry: s.promRegistry}).ServeHTTP(w, r)
 
}
}
}
func parseRequest(r io.Reader) (indexRequest, error) {
func (s *indexServer) decode(r *http.Request, v interface{}) error {
dec := json.NewDecoder(r)
dec := json.NewDecoder(r.Body)
dec.DisallowUnknownFields()
dec.DisallowUnknownFields()
var req indexRequest
return dec.Decode(v)
err := dec.Decode(&req)
}
if err != nil {
func (s *indexServer) handleIndex() http.HandlerFunc {
return req, errors.New("json parser error")
route := "index"
}
if req.GitalyConnectionInfo == nil {
type response struct {
return req, errors.New("gitalyConnectionInfo Error")
Success bool
}
}
if req.FileSizeLimit == 0 {
parseRequest := func(r *http.Request) (indexRequest, error) {
return req, errors.New("fileSizeLimit is not set")
var req indexRequest
}
err := s.decode(r, &req)
return req, nil
if err != nil {
}
return req, errors.New("json parser error")
 
}
func (s *indexServer) serveIndex(w http.ResponseWriter, r *http.Request) {
if req.GitalyConnectionInfo == nil {
route := "index"
return req, errors.New("gitalyConnectionInfo Error")
 
}
req, err := parseRequest(r.Body)
if req.FileSizeLimit == 0 {
if err != nil {
return req, errors.New("fileSizeLimit is not set")
http.Error(w, err.Error(), http.StatusBadRequest)
}
return
}
response, err := s.indexRepository(req)
return req, nil
if err != nil {
s.respondWithError(w, r.Method, route, err)
return
}
}
w.Header().Set("Content-Type", "application/json")
return func(w http.ResponseWriter, r *http.Request) {
_ = json.NewEncoder(w).Encode(response)
req, err := parseRequest(r)
 
if err != nil {
 
http.Error(w, err.Error(), http.StatusBadRequest)
 
return
 
}
s.incrementRequestsTotal(r.Method, route, http.StatusOK)
err = s.indexRepository(req, r.Context())
 
if err != nil {
 
s.respondWithError(w, r, route, err)
 
return
 
}
 
 
resp := response{
 
Success: true,
 
}
 
 
s.respondWith(w, r, route, resp)
 
}
}
}
func (s *indexServer) serveTruncate(w http.ResponseWriter, r *http.Request) {
func (s *indexServer) handleTruncate() http.HandlerFunc {
route := "truncate"
route := "truncate"
err := emptyDirectory(s.opts.indexDir)
if err != nil {
type response struct {
err = fmt.Errorf("failed to empty indexDir: %v with error: %v", s.opts.indexDir, err)
Success bool
 
}
s.respondWithError(w, r.Method, route, err)
emptyDirectory := func(dir string) error {
return
files, err := os.ReadDir(dir)
 
 
if err != nil {
 
return err
 
}
 
 
for _, file := range files {
 
filePath := filepath.Join(dir, file.Name())
 
err := os.RemoveAll(filePath)
 
if err != nil {
 
return err
 
}
 
}
 
 
return nil
}
}
response := map[string]any{
return func(w http.ResponseWriter, r *http.Request) {
"Success": true,
err := emptyDirectory(s.indexDir)
 
 
if err != nil {
 
err = fmt.Errorf("failed to empty indexDir: %v with error: %v", s.indexDir, err)
 
 
s.respondWithError(w, r, route, err)
 
return
 
}
 
 
resp := response{
 
Success: true,
 
}
 
 
s.respondWith(w, r, route, resp)
}
}
 
}
 
 
func (s *indexServer) respondWith(w http.ResponseWriter, r *http.Request, route string, data interface{}) {
w.Header().Set("Content-Type", "application/json")
w.Header().Set("Content-Type", "application/json")
_ = json.NewEncoder(w).Encode(response)
 
if err := json.NewEncoder(w).Encode(data); err != nil {
 
s.respondWithError(w, r, route, err)
 
return
 
}
s.incrementRequestsTotal(r.Method, route, http.StatusOK)
s.incrementRequestsTotal(r.Method, route, http.StatusOK)
}
}
func (s *indexServer) respondWithError(w http.ResponseWriter, method, route string, err error) {
func (s *indexServer) respondWithError(w http.ResponseWriter, r *http.Request, route string, err error) {
responseCode := http.StatusInternalServerError
type response struct {
 
Success bool
 
Error string
 
}
s.incrementRequestsTotal(method, route, responseCode)
responseCode := http.StatusInternalServerError
w.Header().Set("Content-Type", "application/json")
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(responseCode)
w.WriteHeader(responseCode)
response := map[string]any{
"Success": false,
resp := response{
"Error": err.Error(),
Success: false,
 
Error: err.Error(),
}
}
_ = json.NewEncoder(w).Encode(response)
_ = json.NewEncoder(w).Encode(resp)
 
 
s.incrementRequestsTotal(r.Method, route, responseCode)
}
}
func (s *indexServer) incrementRequestsTotal(method, route string, responseCode int) {
func (s *indexServer) incrementRequestsTotal(method, route string, responseCode int) {
@@ -205,70 +278,7 @@ func (s *indexServer) initMetrics() {
@@ -205,70 +278,7 @@ func (s *indexServer) initMetrics() {
)
)
}
}
func (s *indexServer) startIndexingApi() {
func parseOptions() options {
s.initMetrics()
s.opts.createIndexDir()
http.HandleFunc("/", s.serveHealthCheck)
http.HandleFunc("/status", s.serveStatus)
http.HandleFunc("/index", s.serveIndex)
http.HandleFunc("/truncate", s.serveTruncate)
http.HandleFunc("/metrics", s.serveMetrics)
if err := http.ListenAndServe(s.opts.listen, nil); err != nil {
log.Fatal(err)
}
}
func emptyDirectory(dir string) error {
files, err := os.ReadDir(dir)
if err != nil {
return err
}
for _, file := range files {
filePath := filepath.Join(dir, file.Name())
err := os.RemoveAll(filePath)
if err != nil {
return err
}
}
return nil
}
func (s *indexServer) indexRepository(req indexRequest) (map[string]any, error) {
timeout, err := time.ParseDuration(req.Timeout)
if err != nil {
return nil, fmt.Errorf("failed to parse Timeout: %v with error %v", req.Timeout, err)
}
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
idx := &indexer.Indexer{
IndexDir: s.opts.indexDir,
ProjectID: req.RepoID,
CorrelationID: req.CorrelationID,
GitalyAddress: req.GitalyConnectionInfo.Address,
GitalyStorageName: req.GitalyConnectionInfo.Storage,
GitalyRelativePath: req.GitalyConnectionInfo.Path,
LimitFileSize: req.FileSizeLimit,
}
if err := idx.IndexRepository(ctx); err != nil {
return nil, err
}
response := map[string]any{
"Success": true,
}
return response, nil
}
func parseOptions() Options {
indexDir := flag.String("index_dir", "", "directory holding index shards.")
indexDir := flag.String("index_dir", "", "directory holding index shards.")
listen := flag.String("listen", ":6060", "listen on this address.")
listen := flag.String("listen", ":6060", "listen on this address.")
flag.Parse()
flag.Parse()
@@ -277,18 +287,34 @@ func parseOptions() Options {
@@ -277,18 +287,34 @@ func parseOptions() Options {
log.Fatal("must set -index_dir")
log.Fatal("must set -index_dir")
}
}
return Options{
return options{
indexDir: *indexDir,
indexDir: *indexDir,
listen: *listen,
listen: *listen,
}
}
}
}
func main() {
func main() {
 
opts := parseOptions()
 
server := indexServer{
server := indexServer{
opts: parseOptions(),
indexDir: opts.indexDir,
}
}
log.Printf("Starting server on %s", server.opts.listen)
server.startIndexingApi(opts.listen)
 
}
server.startIndexingApi()
func (s *indexServer) startIndexingApi(listen string) {
 
s.initMetrics()
 
s.createIndexDir()
 
 
httpServer := http.Server{
 
Addr: listen,
 
Handler: s.router(),
 
}
 
 
log.Printf("Starting server on %s", listen)
 
 
if err := httpServer.ListenAndServe(); err != nil {
 
log.Fatal(err)
 
}
}
}
Loading