Verified Commit ad32db33 authored by Michael Angelo Rivera's avatar Michael Angelo Rivera Committed by GitLab
Browse files

feat(indexer): configure code indexing v2 pipeline

parent ee5878e6
Loading
Loading
Loading
Loading
+6 −1
Original line number Diff line number Diff line
@@ -62,6 +62,12 @@ engine:
      concurrency_group: code
      max_attempts: 5  # event-driven, must retry to avoid data loss
      retry_interval_secs: 60
      pipeline:
        max_file_size_bytes: 5000000
        max_files: 1000000
        respect_gitignore: true
        worker_threads: 0
        max_concurrent_languages: 0
    namespace-deletion:
      concurrency_group: code
      max_attempts: 1  # re-dispatched daily, no need to retry
@@ -126,4 +132,3 @@ analytics:
billing:
  enabled: false
  collector_url: "http://localhost:9090"
+65 −0
Original line number Diff line number Diff line
@@ -52,6 +52,13 @@
          "code-indexing-task": {
            "concurrency_group": null,
            "max_attempts": null,
            "pipeline": {
              "max_concurrent_languages": 0,
              "max_file_size_bytes": 5000000,
              "max_files": 1000000,
              "respect_gitignore": true,
              "worker_threads": 0
            },
            "retry_interval_secs": null
          },
          "global-handler": {
@@ -309,6 +316,40 @@
        "username"
      ]
    },
    "CodeIndexingPipelineConfig": {
      "type": "object",
      "properties": {
        "max_concurrent_languages": {
          "type": "integer",
          "format": "uint",
          "default": 0,
          "minimum": 0
        },
        "max_file_size_bytes": {
          "type": "integer",
          "format": "uint64",
          "default": 5000000,
          "minimum": 0
        },
        "max_files": {
          "type": "integer",
          "format": "uint",
          "default": 1000000,
          "minimum": 0
        },
        "respect_gitignore": {
          "type": "boolean",
          "default": true
        },
        "worker_threads": {
          "type": "integer",
          "format": "uint",
          "default": 0,
          "minimum": 0
        }
      },
      "additionalProperties": false
    },
    "CodeIndexingTaskHandlerConfig": {
      "description": "Per-handler engine configuration (retry policy, concurrency group).\n\nEach handler embeds this via `#[serde(flatten)]` in its own typed config struct.\nThe engine reads it via `handler.engine_config()`.\n\nRetries are opt-in: a handler with no retry config will ack on failure.",
      "type": "object",
@@ -331,6 +372,16 @@
          "default": null,
          "minimum": 0
        },
        "pipeline": {
          "$ref": "#/$defs/CodeIndexingPipelineConfig",
          "default": {
            "max_concurrent_languages": 0,
            "max_file_size_bytes": 5000000,
            "max_files": 1000000,
            "respect_gitignore": true,
            "worker_threads": 0
          }
        },
        "retry_interval_secs": {
          "description": "Delay in seconds between retry attempts. Used as the NATS nack delay.\nWhen absent, nacks use immediate redelivery.",
          "type": [
@@ -394,6 +445,13 @@
            "code-indexing-task": {
              "concurrency_group": null,
              "max_attempts": null,
              "pipeline": {
                "max_concurrent_languages": 0,
                "max_file_size_bytes": 5000000,
                "max_files": 1000000,
                "respect_gitignore": true,
                "worker_threads": 0
              },
              "retry_interval_secs": null
            },
            "global-handler": {
@@ -575,6 +633,13 @@
          "default": {
            "concurrency_group": null,
            "max_attempts": null,
            "pipeline": {
              "max_concurrent_languages": 0,
              "max_file_size_bytes": 5000000,
              "max_files": 1000000,
              "respect_gitignore": true,
              "worker_threads": 0
            },
            "retry_interval_secs": null
          }
        },
+29 −0
Original line number Diff line number Diff line
@@ -191,6 +191,9 @@ pub trait LanguagePipeline {

pub struct PipelineConfig {
    pub max_file_size: u64,
    /// Max language-supported files accepted for one pipeline run.
    /// 0 = no limit.
    pub max_files: usize,
    pub respect_gitignore: bool,
    pub cancel: CancellationToken,
    /// Rayon threads per language. 0 = use all available cores.
@@ -205,6 +208,7 @@ impl Default for PipelineConfig {
    fn default() -> Self {
        Self {
            max_file_size: 1_000_000,
            max_files: 0,
            respect_gitignore: true,
            cancel: CancellationToken::new(),
            worker_threads: 0,
@@ -411,6 +415,7 @@ impl Pipeline {

    fn walk_and_group(root: &Path, config: &PipelineConfig) -> FxHashMap<Language, Vec<FileInput>> {
        let mut groups: FxHashMap<Language, Vec<FileInput>> = FxHashMap::default();
        let mut accepted_files = 0usize;

        let walker = WalkBuilder::new(root)
            .git_ignore(config.respect_gitignore)
@@ -436,6 +441,10 @@ impl Pipeline {

            let rel_path = path.strip_prefix(root).unwrap_or(path).to_string_lossy();
            if let Some(lang) = detect_language_from_extension(ext) {
                if config.max_files > 0 && accepted_files >= config.max_files {
                    break;
                }

                if lang
                    .exclude_extensions()
                    .iter()
@@ -449,6 +458,7 @@ impl Pipeline {
                    continue;
                }

                accepted_files += 1;
                groups.entry(lang).or_default().push(rel_path.to_string());
            }
        }
@@ -869,6 +879,25 @@ mod tests {
        graphs.remove(0)
    }

    // Three language-supported files exist on disk, but `max_files: 2` must cap
    // the total number of files accepted across all language groups at two.
    #[test]
    fn walk_and_group_respects_max_files() {
        let dir = tempfile::tempdir().expect("temp dir");
        let root = dir.path();

        std::fs::write(root.join("a.java"), "class A {}").expect("write a");
        std::fs::write(root.join("b.java"), "class B {}").expect("write b");
        std::fs::write(root.join("c.java"), "class C {}").expect("write c");

        // Everything except the file cap keeps its default value.
        let config = PipelineConfig {
            max_files: 2,
            ..PipelineConfig::default()
        };
        let groups = Pipeline::walk_and_group(root, &config);

        // Sum over every language bucket rather than assuming a single one.
        let accepted: usize = groups.values().map(|files| files.len()).sum();

        assert_eq!(accepted, 2);
    }

    // ── Python fixture ──────────────────────────────────────────────

    #[test]
+37 −0
Original line number Diff line number Diff line
@@ -164,6 +164,12 @@ handlers:
    concurrency_group: code
    max_attempts: 5
    retry_interval_secs: 60
    pipeline:
      max_file_size_bytes: 10000000
      max_files: 200000
      respect_gitignore: false
      worker_threads: 2
      max_concurrent_languages: 3
  namespace-deletion:
    concurrency_group: code
    max_attempts: 1
@@ -203,6 +209,37 @@ handlers:
            engine.handlers.code_indexing_task.engine.max_attempts,
            Some(5)
        );
        assert_eq!(
            engine
                .handlers
                .code_indexing_task
                .pipeline
                .max_file_size_bytes,
            10_000_000
        );
        assert_eq!(
            engine.handlers.code_indexing_task.pipeline.max_files,
            200_000
        );
        assert!(
            !engine
                .handlers
                .code_indexing_task
                .pipeline
                .respect_gitignore
        );
        assert_eq!(
            engine.handlers.code_indexing_task.pipeline.worker_threads,
            2
        );
        assert_eq!(
            engine
                .handlers
                .code_indexing_task
                .pipeline
                .max_concurrent_languages,
            3
        );
        assert_eq!(
            engine
                .handlers
+41 −0
Original line number Diff line number Diff line
@@ -142,10 +142,51 @@ impl Default for NamespaceHandlerConfig {
    }
}

/// Serde default for `CodeIndexingPipelineConfig::max_file_size_bytes`:
/// 5,000,000 bytes.
fn default_code_indexing_max_file_size_bytes() -> u64 {
    const DEFAULT_MAX_FILE_SIZE_BYTES: u64 = 5_000_000;
    DEFAULT_MAX_FILE_SIZE_BYTES
}

/// Serde default for `CodeIndexingPipelineConfig::max_files`: 1,000,000 files.
fn default_code_indexing_max_files() -> usize {
    const DEFAULT_MAX_FILES: usize = 1_000_000;
    DEFAULT_MAX_FILES
}

/// Serde default for `CodeIndexingPipelineConfig::respect_gitignore`: enabled.
fn default_code_indexing_respect_gitignore() -> bool {
    const RESPECT_GITIGNORE_BY_DEFAULT: bool = true;
    RESPECT_GITIGNORE_BY_DEFAULT
}

// Tunable settings for the code-indexing pipeline, read from the `pipeline`
// key of the code-indexing task handler configuration.
//
// Plain `//` comments are used on purpose: `///` doc comments on a
// `JsonSchema` type are emitted as `description` entries in the generated
// schema, which would change the checked-in schema output.
//
// `schemars(deny_unknown_fields)` makes the generated schema reject unknown
// keys (`additionalProperties: false`).
// NOTE(review): there is no matching `#[serde(deny_unknown_fields)]`, so
// deserialization itself presumably still accepts unknown keys — confirm
// this asymmetry is intended.
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)]
#[schemars(deny_unknown_fields)]
pub struct CodeIndexingPipelineConfig {
    // File-size limit in bytes; defaults to 5_000_000 when the key is absent.
    #[serde(default = "default_code_indexing_max_file_size_bytes")]
    pub max_file_size_bytes: u64,
    // File-count limit; defaults to 1_000_000 when the key is absent.
    // NOTE(review): presumably 0 means "no limit" once forwarded to the
    // pipeline — confirm against the consumer of this value.
    #[serde(default = "default_code_indexing_max_files")]
    pub max_files: usize,
    // Whether `.gitignore` rules are honoured; defaults to `true`.
    #[serde(default = "default_code_indexing_respect_gitignore")]
    pub respect_gitignore: bool,
    // Defaults to 0 (the `usize` default) when the key is absent.
    // NOTE(review): presumably 0 means "use all available cores" — confirm.
    #[serde(default)]
    pub worker_threads: usize,
    // Defaults to 0 (the `usize` default) when the key is absent.
    // NOTE(review): meaning of 0 is not visible here — confirm with the consumer.
    #[serde(default)]
    pub max_concurrent_languages: usize,
}

impl Default for CodeIndexingPipelineConfig {
    // Builds the same values serde fills in when keys are missing: the three
    // named default functions, plus zero for both thread/concurrency knobs.
    fn default() -> Self {
        let max_file_size_bytes = default_code_indexing_max_file_size_bytes();
        let max_files = default_code_indexing_max_files();
        let respect_gitignore = default_code_indexing_respect_gitignore();

        Self {
            max_file_size_bytes,
            max_files,
            respect_gitignore,
            // Matches the bare `#[serde(default)]` on these fields (usize -> 0).
            worker_threads: usize::default(),
            max_concurrent_languages: usize::default(),
        }
    }
}

// Typed configuration for the code-indexing task handler.
//
// Plain `//` comments are used on purpose: `///` doc comments on a
// `JsonSchema` type would alter the `description` in the generated schema.
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema)]
pub struct CodeIndexingTaskHandlerConfig {
    // Generic per-handler engine settings. `flatten` means their keys sit at
    // the same level as `pipeline` in the config rather than under an
    // `engine` key.
    #[serde(flatten)]
    pub engine: HandlerConfiguration,
    // Pipeline tuning; the whole section falls back to
    // `CodeIndexingPipelineConfig::default()` when the key is absent.
    #[serde(default)]
    pub pipeline: CodeIndexingPipelineConfig,
}

#[derive(Debug, Clone, Default, Serialize, Deserialize, JsonSchema)]
Loading