Skip to content
Snippets Groups Projects

Full MVC of gitlab-zoekt-indexer

Merged Dylan Griffith requested to merge add-new-api-paths into main
All threads resolved!
Compare and Show latest version
4 files
+ 206
116
Compare changes
  • Side-by-side
  • Inline
Files
4
@@ -6,7 +6,6 @@ import (
@@ -6,7 +6,6 @@ import (
"errors"
"errors"
"flag"
"flag"
"fmt"
"fmt"
"io"
"log"
"log"
"net/http"
"net/http"
"os"
"os"
@@ -19,6 +18,7 @@ import (
@@ -19,6 +18,7 @@ import (
"github.com/prometheus/client_golang/prometheus/collectors"
"github.com/prometheus/client_golang/prometheus/collectors"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/prometheus/client_golang/prometheus/promhttp"
 
"gitlab.com/gitlab-org/search-team/gitlab-zoekt-indexer/correlation"
"gitlab.com/gitlab-org/search-team/gitlab-zoekt-indexer/indexer"
"gitlab.com/gitlab-org/search-team/gitlab-zoekt-indexer/indexer"
)
)
@@ -29,6 +29,7 @@ type options struct {
@@ -29,6 +29,7 @@ type options struct {
type indexServer struct {
type indexServer struct {
indexDir string
indexDir string
 
indexBuilder indexBuilder
promRegistry *prometheus.Registry
promRegistry *prometheus.Registry
metricsRequestsTotal *prometheus.CounterVec
metricsRequestsTotal *prometheus.CounterVec
}
}
@@ -43,11 +44,50 @@ type gitalyConnectionInfo struct {
@@ -43,11 +44,50 @@ type gitalyConnectionInfo struct {
type indexRequest struct {
type indexRequest struct {
Timeout string
Timeout string
RepoID uint32
RepoID uint32
CorrelationID string
GitalyConnectionInfo *gitalyConnectionInfo
GitalyConnectionInfo *gitalyConnectionInfo
FileSizeLimit int
FileSizeLimit int
}
}
 
type indexBuilder interface {
 
indexRepository(ctx context.Context, req indexRequest, indexDir string) error
 
}
 
 
type defaultIndexBuilder struct{}
 
 
func (b defaultIndexBuilder) indexRepository(ctx context.Context, req indexRequest, indexDir string) error {
 
if req.GitalyConnectionInfo == nil {
 
return errors.New("gitalyConnectionInfo is not set")
 
}
 
 
if req.FileSizeLimit == 0 {
 
return errors.New("fileSizeLimit is not set")
 
}
 
 
timeout, err := time.ParseDuration(req.Timeout)
 
if err != nil {
 
return fmt.Errorf("failed to parse Timeout: %v with error %v", req.Timeout, err)
 
}
 
 
ctx, cancel := context.WithTimeout(ctx, timeout)
 
defer cancel()
 
 
idx := &indexer.Indexer{
 
IndexDir: indexDir,
 
ProjectID: req.RepoID,
 
CorrelationID: correlation.GetCorrelationID(ctx),
 
GitalyAddress: req.GitalyConnectionInfo.Address,
 
GitalyStorageName: req.GitalyConnectionInfo.Storage,
 
GitalyRelativePath: req.GitalyConnectionInfo.Path,
 
LimitFileSize: req.FileSizeLimit,
 
}
 
 
if err := idx.IndexRepository(ctx); err != nil {
 
return err
 
}
 
 
return nil
 
}
 
func (s *indexServer) createIndexDir() {
func (s *indexServer) createIndexDir() {
if err := os.MkdirAll(s.indexDir, 0o755); err != nil {
if err := os.MkdirAll(s.indexDir, 0o755); err != nil {
log.Fatalf("createIndexDir %s: %v", s.indexDir, err)
log.Fatalf("createIndexDir %s: %v", s.indexDir, err)
@@ -57,6 +97,11 @@ func (s *indexServer) createIndexDir() {
@@ -57,6 +97,11 @@ func (s *indexServer) createIndexDir() {
func (s *indexServer) handleStatus() http.HandlerFunc {
func (s *indexServer) handleStatus() http.HandlerFunc {
route := "status"
route := "status"
 
type response struct {
 
Success bool
 
SHA string
 
}
 
return func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
param := chi.URLParam(r, "id")
param := chi.URLParam(r, "id")
repoID, err := strconv.ParseUint(param, 10, 32)
repoID, err := strconv.ParseUint(param, 10, 32)
@@ -83,12 +128,12 @@ func (s *indexServer) handleStatus() http.HandlerFunc {
@@ -83,12 +128,12 @@ func (s *indexServer) handleStatus() http.HandlerFunc {
return
return
}
}
response := map[string]any{
resp := response{
"Success": true,
Success: true,
"SHA": currentSHA,
SHA: currentSHA,
}
}
s.respondWith(w, r, route, response)
s.respondWith(w, r, route, resp)
}
}
}
}
@@ -98,79 +143,76 @@ func (s *indexServer) handleMetrics() http.HandlerFunc {
@@ -98,79 +143,76 @@ func (s *indexServer) handleMetrics() http.HandlerFunc {
}
}
}
}
 
func (s *indexServer) decode(r *http.Request, v interface{}) error {
 
dec := json.NewDecoder(r.Body)
 
dec.DisallowUnknownFields()
 
return dec.Decode(v)
 
}
 
func (s *indexServer) handleIndex() http.HandlerFunc {
func (s *indexServer) handleIndex() http.HandlerFunc {
route := "index"
route := "index"
parseRequest := func(r io.Reader) (indexRequest, error) {
type response struct {
dec := json.NewDecoder(r)
Success bool
dec.DisallowUnknownFields()
}
 
 
parseRequest := func(r *http.Request) (indexRequest, error) {
var req indexRequest
var req indexRequest
err := dec.Decode(&req)
err := s.decode(r, &req)
if err != nil {
if err != nil {
return req, errors.New("json parser error")
return req, errors.New("json parser error")
}
}
if req.GitalyConnectionInfo == nil {
return req, errors.New("gitalyConnectionInfo Error")
}
if req.FileSizeLimit == 0 {
return req, errors.New("fileSizeLimit is not set")
}
return req, nil
return req, nil
}
}
return func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
req, err := parseRequest(r.Body)
req, err := parseRequest(r)
if err != nil {
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
http.Error(w, err.Error(), http.StatusBadRequest)
return
return
}
}
err = s.indexRepository(req)
err = s.indexBuilder.indexRepository(r.Context(), req, s.indexDir)
if err != nil {
if err != nil {
s.respondWithError(w, r, route, err)
s.respondWithError(w, r, route, err)
return
return
}
}
response := map[string]any{
resp := response{
"Success": true,
Success: true,
}
}
s.respondWith(w, r, route, response)
s.respondWith(w, r, route, resp)
}
}
}
}
func (s *indexServer) indexRepository(req indexRequest) error {
func (s *indexServer) handleTruncate() http.HandlerFunc {
timeout, err := time.ParseDuration(req.Timeout)
route := "truncate"
if err != nil {
return fmt.Errorf("failed to parse Timeout: %v with error %v", req.Timeout, err)
type response struct {
 
Success bool
}
}
ctx, cancel := context.WithTimeout(context.Background(), timeout)
emptyDirectory := func(dir string) error {
defer cancel()
files, err := os.ReadDir(dir)
idx := &indexer.Indexer{
if err != nil {
IndexDir: s.indexDir,
return err
ProjectID: req.RepoID,
}
CorrelationID: req.CorrelationID,
GitalyAddress: req.GitalyConnectionInfo.Address,
GitalyStorageName: req.GitalyConnectionInfo.Storage,
GitalyRelativePath: req.GitalyConnectionInfo.Path,
LimitFileSize: req.FileSizeLimit,
}
if err := idx.IndexRepository(ctx); err != nil {
for _, file := range files {
return err
filePath := filepath.Join(dir, file.Name())
}
err := os.RemoveAll(filePath)
 
if err != nil {
 
return err
 
}
 
}
return nil
return nil
}
}
func (s *indexServer) handleTruncate() http.HandlerFunc {
route := "truncate"
return func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
err := emptyDirectory(s.indexDir)
err := emptyDirectory(s.indexDir)
@@ -181,11 +223,11 @@ func (s *indexServer) handleTruncate() http.HandlerFunc {
@@ -181,11 +223,11 @@ func (s *indexServer) handleTruncate() http.HandlerFunc {
return
return
}
}
response := map[string]any{
resp := response{
"Success": true,
Success: true,
}
}
s.respondWith(w, r, route, response)
s.respondWith(w, r, route, resp)
}
}
}
}
@@ -194,24 +236,31 @@ func (s *indexServer) respondWith(w http.ResponseWriter, r *http.Request, route
@@ -194,24 +236,31 @@ func (s *indexServer) respondWith(w http.ResponseWriter, r *http.Request, route
if err := json.NewEncoder(w).Encode(data); err != nil {
if err := json.NewEncoder(w).Encode(data); err != nil {
s.respondWithError(w, r, route, err)
s.respondWithError(w, r, route, err)
 
return
}
}
s.incrementRequestsTotal(r.Method, route, http.StatusOK)
s.incrementRequestsTotal(r.Method, route, http.StatusOK)
}
}
func (s *indexServer) respondWithError(w http.ResponseWriter, r *http.Request, route string, err error) {
func (s *indexServer) respondWithError(w http.ResponseWriter, r *http.Request, route string, err error) {
responseCode := http.StatusInternalServerError
type response struct {
 
Success bool
 
Error string
 
}
s.incrementRequestsTotal(r.Method, route, responseCode)
responseCode := http.StatusInternalServerError
w.Header().Set("Content-Type", "application/json")
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(responseCode)
w.WriteHeader(responseCode)
response := map[string]any{
"Success": false,
resp := response{
"Error": err.Error(),
Success: false,
 
Error: err.Error(),
}
}
_ = json.NewEncoder(w).Encode(response)
_ = json.NewEncoder(w).Encode(resp)
 
 
s.incrementRequestsTotal(r.Method, route, responseCode)
}
}
func (s *indexServer) incrementRequestsTotal(method, route string, responseCode int) {
func (s *indexServer) incrementRequestsTotal(method, route string, responseCode int) {
@@ -236,24 +285,6 @@ func (s *indexServer) initMetrics() {
@@ -236,24 +285,6 @@ func (s *indexServer) initMetrics() {
)
)
}
}
func emptyDirectory(dir string) error {
files, err := os.ReadDir(dir)
if err != nil {
return err
}
for _, file := range files {
filePath := filepath.Join(dir, file.Name())
err := os.RemoveAll(filePath)
if err != nil {
return err
}
}
return nil
}
func parseOptions() options {
func parseOptions() options {
indexDir := flag.String("index_dir", "", "directory holding index shards.")
indexDir := flag.String("index_dir", "", "directory holding index shards.")
listen := flag.String("listen", ":6060", "listen on this address.")
listen := flag.String("listen", ":6060", "listen on this address.")
@@ -273,7 +304,8 @@ func main() {
@@ -273,7 +304,8 @@ func main() {
opts := parseOptions()
opts := parseOptions()
server := indexServer{
server := indexServer{
indexDir: opts.indexDir,
indexDir: opts.indexDir,
 
indexBuilder: defaultIndexBuilder{},
}
}
server.startIndexingApi(opts.listen)
server.startIndexingApi(opts.listen)
@@ -282,11 +314,15 @@ func main() {
@@ -282,11 +314,15 @@ func main() {
func (s *indexServer) startIndexingApi(listen string) {
func (s *indexServer) startIndexingApi(listen string) {
s.initMetrics()
s.initMetrics()
s.createIndexDir()
s.createIndexDir()
router := s.router()
 
httpServer := http.Server{
 
Addr: listen,
 
Handler: s.router(),
 
}
log.Printf("Starting server on %s", listen)
log.Printf("Starting server on %s", listen)
if err := http.ListenAndServe(listen, router); err != nil {
if err := httpServer.ListenAndServe(); err != nil {
log.Fatal(err)
log.Fatal(err)
}
}
}
}
Loading