Verified Commit 5f576dcd authored by Dmitry Gruzd, committed by GitLab
Browse files

feat(indexer): add table-prefix-aware migration orchestrator

parent 7a071d86
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -87,7 +87,7 @@ Single binary: `gkg-server` (4 modes: Webserver, Indexer, DispatchIndexing, Heal
| `query-engine/pipeline` | Pipeline abstraction (stages, observers, context) |
| `query-engine/shared` | Shared pipeline stages (compilation, extraction, output), virtual column resolution (`ColumnResolver` trait, `ColumnResolverRegistry`, `resolve_virtual_columns`) |
| `query-engine/formatters` | Result formatters (graph, raw row, goon) |
| `indexer` | NATS consumer, SDLC + code + namespace deletion handler modules, worker pools, scheduler, `testkit/`, schema version tracking (`schema_version.rs`) |
| `indexer` | NATS consumer, SDLC + code + namespace deletion handler modules, worker pools, scheduler, `testkit/`, schema version tracking (`schema_version.rs`), migration orchestrator (`schema_migration.rs`) |
| `ontology` | Loads/validates YAML ontology, query validation helpers |
| `code-graph` | Parent crate for code parsing and graph construction; re-exports `treesitter-visit`, `parser-core`, `code-graph-linker` |
| `code-graph/treesitter-visit` | Tree-sitter language bindings wrapper |
+1 −1
Original line number Diff line number Diff line
@@ -87,7 +87,7 @@ Single binary: `gkg-server` (4 modes: Webserver, Indexer, DispatchIndexing, Heal
| `query-engine/pipeline` | Pipeline abstraction (stages, observers, context) |
| `query-engine/shared` | Shared pipeline stages (compilation, extraction, output), virtual column resolution (`ColumnResolver` trait, `ColumnResolverRegistry`, `resolve_virtual_columns`) |
| `query-engine/formatters` | Result formatters (graph, raw row, goon) |
| `indexer` | NATS consumer, SDLC + code + namespace deletion handler modules, worker pools, scheduler, `testkit/`, schema version tracking (`schema_version.rs`) |
| `indexer` | NATS consumer, SDLC + code + namespace deletion handler modules, worker pools, scheduler, `testkit/`, schema version tracking (`schema_version.rs`), migration orchestrator (`schema_migration.rs`) |
| `ontology` | Loads/validates YAML ontology, query validation helpers |
| `code-graph` | Parent crate for code parsing and graph construction; re-exports `treesitter-visit`, `parser-core`, `code-graph-linker` |
| `code-graph/treesitter-visit` | Tree-sitter language bindings wrapper |
+91 −0
Original line number Diff line number Diff line
@@ -92,6 +92,7 @@ impl AppConfig {
            .add_source(SecretFileSource::new(secret_dir))
            .add_source(
                config::Environment::with_prefix("GKG")
                    .prefix_separator("_")
                    .separator("__")
                    .try_parsing(true),
            )
@@ -267,4 +268,94 @@ handlers:
            Some("env-secret-at-least-32-bytes-long")
        );
    }

    /// Verifies `prefix_separator("_")` is required for `GKG_GRAPH__DATABASE`
    /// style env vars to work with real process env vars.
    ///
    /// Without it, the config crate defaults the prefix separator to the
    /// hierarchy separator (`__`), so it looks for `GKG__GRAPH__DATABASE`
    /// (double underscore after prefix) and silently ignores
    /// `GKG_GRAPH__DATABASE` (single underscore).
    ///
    /// Uses a subprocess since env vars must be set before config loading
    /// and `std::env::set_var` is unsafe in multi-threaded test runners.
    #[test]
    fn real_env_vars_override_yaml_defaults() {
        // Re-run this same test binary, filtered to the ignored helper test,
        // with the env vars under test injected into the child process.
        let self_exe = std::env::current_exe().unwrap();
        // The test binary must run from the workspace root so config/default.yaml is found.
        let workspace_root = std::path::Path::new(env!("CARGO_MANIFEST_DIR"))
            .parent()
            .and_then(std::path::Path::parent)
            .unwrap();
        let output = std::process::Command::new(&self_exe)
            .current_dir(workspace_root)
            .args([
                "app::tests::subprocess_env_config_loader",
                "--ignored",
                "--exact",
                "--nocapture",
            ])
            .env("GKG_GRAPH__DATABASE", "env_graph_db")
            .env("GKG_DATALAKE__DATABASE", "env_datalake_db")
            .env("GKG_NATS__URL", "nats://env-host:4222")
            .output()
            .expect("failed to spawn subprocess");

        let stdout = String::from_utf8_lossy(&output.stdout);
        let stderr = String::from_utf8_lossy(&output.stderr);

        // Parse the JSON line printed by the inner test.
        let Some(json_line) = stdout.lines().find(|line| line.starts_with('{')) else {
            panic!(
                "inner test did not print JSON.\nstdout: {stdout}\nstderr: {stderr}\nexit: {}",
                output.status
            )
        };
        let values: serde_json::Value =
            serde_json::from_str(json_line).expect("inner test should print valid JSON");

        // If the inner test returned an error, fail with it.
        if let Some(err) = values.get("error") {
            panic!("inner test config load failed: {err}");
        }

        // Each env var set on the subprocess must win over config/default.yaml.
        for (key, expected, var) in [
            ("graph_database", "env_graph_db", "GKG_GRAPH__DATABASE"),
            ("datalake_database", "env_datalake_db", "GKG_DATALAKE__DATABASE"),
            ("nats_url", "nats://env-host:4222", "GKG_NATS__URL"),
        ] {
            assert_eq!(
                values[key], expected,
                "{var} should override config/default.yaml"
            );
        }
    }

    /// Subprocess helper: loads config with real process env vars and prints
    /// the resolved values as JSON. Only runs when called by the outer test.
    #[test]
    #[ignore]
    fn subprocess_env_config_loader() {
        // Empty temp dir: no file-based secrets, so only env vars contribute overrides.
        let secret_dir = tempfile::TempDir::new().unwrap();
        match AppConfig::load_with_secret_dir(secret_dir.path().to_str().unwrap()) {
            // Surface load failures as JSON too, so the outer test can report them.
            Err(e) => println!("{}", serde_json::json!({"error": e.to_string()})),
            Ok(config) => println!(
                "{}",
                serde_json::json!({
                    "graph_database": config.graph.database,
                    "datalake_database": config.datalake.database,
                    "nats_url": config.nats.url,
                })
            ),
        }
    }
}
+3 −3
Original line number Diff line number Diff line
@@ -11,7 +11,7 @@ use gkg_server::health_check as health_check_mode;
use gkg_server::shutdown;
use gkg_server::webserver::Server as HttpServer;
use gkg_server_config::AppConfig;
use indexer::schema_version;
use indexer::schema;
use indexer::{DispatcherConfig, IndexerConfig};
use query_engine::compiler::input::QueryType;
use strum::VariantNames;
@@ -67,7 +67,7 @@ async fn main() -> anyhow::Result<()> {
            config.schema.validate()?;
            let graph = config.graph.build_client();
            info!("initializing schema version table");
            schema_version::init(&graph).await?;
            schema::version::init(&graph).await?;

            let dispatcher_config = DispatcherConfig {
                nats: config.nats.clone(),
@@ -100,7 +100,7 @@ async fn main() -> anyhow::Result<()> {
            config.schema.validate()?;
            let graph = config.graph.build_client();
            info!("initializing schema version table");
            schema_version::init(&graph).await?;
            schema::version::init(&graph).await?;
            run_webserver(&config, ontology).await
        }
    };
+14 −1
Original line number Diff line number Diff line
@@ -35,9 +35,22 @@ NATS JetStream → Engine → Handler Registry → ClickHouse
- **Destination**: Provides BatchWriter or StreamWriter
- **Event**: Type-safe message serialization

### Schema migration

Before the NATS engine starts, `schema_migration::run_if_needed()` compares the embedded
`SCHEMA_VERSION` with the active version in ClickHouse. On a mismatch, it acquires a NATS KV
distributed lock, generates DDL from the ontology via `generate_graph_tables_with_prefix()`,
creates new-prefix ClickHouse tables, and marks the new version as `migrating`. All write paths
(checkpoints, namespace deletion, ontology-driven tables) use
`prefixed_table_name(table, SCHEMA_VERSION)` so they always target the table set belonging to the
current schema version.

### Entry point

The `run()` function in `lib.rs` wires everything together: connects to NATS and ClickHouse, registers handlers via `sdlc::register_handlers()`, `code::register_handlers()`, and `namespace_deletion::register_handlers()`, builds the engine, and runs until shutdown.
The `run()` function in `lib.rs` wires everything together: runs the migration orchestrator,
connects to NATS and ClickHouse, registers handlers via `sdlc::register_handlers()`,
`code::register_handlers()`, and `namespace_deletion::register_handlers()`, builds the engine,
and runs until shutdown.

`IndexerConfig` holds all configuration (NATS, ClickHouse graph/datalake, engine concurrency, handler configs, GitLab client). Handler configs are typed via `HandlersConfiguration` in `configuration.rs` — no string-keyed lookups.

Loading