Verified Commit 62db4eaf authored by Jean-Gabriel Doyon PTO until 2024-04-17's avatar Jean-Gabriel Doyon PTO until 2024-04-17 Committed by GitLab
Browse files

refactor(engine): replace string-keyed handler configs with typed structs

parent e0c8ff0d
Loading
Loading
Loading
Loading
+10 −21
Original line number Diff line number Diff line
@@ -4,11 +4,9 @@ use std::sync::Arc;

use chrono::{DateTime, Utc};
use indexer::clickhouse::ClickHouseDestination;
use indexer::handler::{Handler, HandlerContext};
use indexer::handler::{Handler, HandlerContext, HandlerRegistry};
use indexer::metrics::EngineMetrics;
use indexer::modules::create_sdlc_handlers;
use indexer::testkit::{MockLockService, MockNatsServices};
use std::collections::HashMap;
use indexer::testkit::{MockLockService, MockNatsServices, create_test_indexer_config};

pub use integration_testkit::{
    TestContext, get_boolean_column, get_int64_column, get_string_column, get_uint64_column,
@@ -20,7 +18,7 @@ pub const GRAPH_SCHEMA_SQL: &str = include_str!("../../../../fixtures/schema/gra
pub trait GkgServerTestExt {
    fn create_destination(&self) -> ClickHouseDestination;
    fn create_handler_context(&self) -> HandlerContext;
    async fn get_namespace_handler(&self) -> Box<dyn Handler>;
    async fn get_namespace_handler(&self) -> Arc<dyn Handler>;
    async fn assert_edge_count(
        &self,
        relationship_kind: &str,
@@ -44,24 +42,15 @@ impl GkgServerTestExt for TestContext {
        )
    }

    async fn get_namespace_handler(&self) -> Box<dyn Handler> {
        let handler_configs = HashMap::from([
            (
                "global-handler".to_string(),
                serde_json::json!({ "datalake_batch_size": 1 }),
            ),
            (
                "namespace-handler".to_string(),
                serde_json::json!({ "datalake_batch_size": 1 }),
            ),
        ]);
        let handlers = create_sdlc_handlers(&self.config, &self.config, &handler_configs)
    async fn get_namespace_handler(&self) -> Arc<dyn Handler> {
        let indexer_config = create_test_indexer_config(&self.config);
        let registry = HandlerRegistry::default();
        indexer::modules::sdlc::register_handlers(&registry, &indexer_config)
            .await
            .expect("failed to create SDLC handlers");
        handlers
            .into_iter()
            .find(|h| h.name() == "namespace-handler")
            .expect("namespace-handler not found")
        registry
            .find_by_name("namespace_handler")
            .expect("namespace_handler not found")
    }

    async fn assert_edge_count(
+6 −5
Original line number Diff line number Diff line
@@ -25,8 +25,8 @@ NATS JetStream → Engine → Handler Registry → ClickHouse

| Module | Directory | Purpose |
|--------|-----------|---------|
| `create_code_handlers` | `modules/code/` | Git repository indexing via Gitaly, call graph extraction |
| `create_sdlc_handlers` | `modules/sdlc/` | SDLC entity indexing (projects, MRs, CI, issues, etc.) |
| `code::register_handlers` | `modules/code/` | Git repository indexing via Gitaly, call graph extraction |
| `sdlc::register_handlers` | `modules/sdlc/` | SDLC entity indexing (projects, MRs, CI, issues, etc.) |

### Traits

@@ -36,9 +36,9 @@ NATS JetStream → Engine → Handler Registry → ClickHouse

### Entry point

The `run()` function in `lib.rs` wires everything together: connects to NATS and ClickHouse, creates handlers via `create_sdlc_handlers()` and `create_code_handlers()`, registers them in a `HandlerRegistry`, builds the engine, and runs until shutdown.
The `run()` function in `lib.rs` wires everything together: connects to NATS and ClickHouse, registers handlers via `sdlc::register_handlers()` and `code::register_handlers()`, builds the engine, and runs until shutdown.

`IndexerConfig` holds all configuration (NATS, ClickHouse graph/datalake, engine concurrency, Gitaly, code indexing).
`IndexerConfig` holds all configuration (NATS, ClickHouse graph/datalake, engine concurrency, handler configs, Gitaly). Handler configs are typed via `HandlersConfiguration` in `configuration.rs` — no string-keyed lookups.

## Development

@@ -66,7 +66,8 @@ Located in `testkit/`:

1. Define event type implementing `Event`
2. Create handler implementing `Handler` (including `engine_config()`)
3. Register in `create_sdlc_handlers()` or `create_code_handlers()`
3. Add a typed config field to `HandlersConfiguration` in `configuration.rs`
4. Register in `sdlc::register_handlers()` or `code::register_handlers()`

### Concurrency

+20 −5
Original line number Diff line number Diff line
@@ -4,6 +4,9 @@ use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::time::Duration;

use crate::modules::code::{ProjectCodeIndexingHandlerConfig, PushEventHandlerConfig};
use crate::modules::sdlc::{GlobalHandlerConfig, NamespaceHandlerConfig};

/// Per-handler engine configuration (retry policy, concurrency group).
///
/// Each handler embeds this via `#[serde(flatten)]` in its own typed config struct.
@@ -41,13 +44,26 @@ impl HandlerConfiguration {
    }
}

/// Strongly typed configuration for every registered handler, one field per handler.
///
/// The container-level `#[serde(default)]` means any handler section omitted from the
/// input is filled in from this struct's `Default` implementation.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
#[serde(default)]
pub struct HandlersConfiguration {
    /// Settings for the SDLC global handler.
    pub global_handler: GlobalHandlerConfig,
    /// Settings for the SDLC namespace handler.
    pub namespace_handler: NamespaceHandlerConfig,
    /// Settings for the code push-event handler.
    pub code_push_event: PushEventHandlerConfig,
    /// Settings for the code project-reconciliation (project code indexing) handler.
    pub code_project_reconciliation: ProjectCodeIndexingHandlerConfig,
}

/// ETL engine configuration.
///
/// # Defaults
///
/// - `max_concurrent_workers`: 16
/// - `concurrency_groups`: empty
/// - `handlers`: empty
/// - `handlers`: defaults for all handlers
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EngineConfiguration {
    /// Maximum concurrent message handlers across all modules. Defaults to 16.
@@ -59,10 +75,9 @@ pub struct EngineConfiguration {
    #[serde(default)]
    pub concurrency_groups: HashMap<String, usize>,

    /// Per-handler configuration, keyed by handler name.
    /// Each value is raw JSON that modules deserialize into their typed config structs.
    /// Per-handler configuration.
    #[serde(default)]
    pub handlers: HashMap<String, serde_json::Value>,
    pub handlers: HandlersConfiguration,
}

impl Default for EngineConfiguration {
@@ -70,7 +85,7 @@ impl Default for EngineConfiguration {
        EngineConfiguration {
            max_concurrent_workers: Self::default_max_concurrent_workers(),
            concurrency_groups: HashMap::new(),
            handlers: HashMap::new(),
            handlers: HandlersConfiguration::default(),
        }
    }
}
+10 −30
Original line number Diff line number Diff line
@@ -26,7 +26,6 @@ use std::sync::Arc;
use async_trait::async_trait;
use parking_lot::RwLock;
use thiserror::Error;
use tracing::info;

use crate::{
    configuration::HandlerConfiguration,
@@ -57,13 +56,6 @@ impl HandlerError {
    }
}

/// Error describing why a named handler could not be created.
///
/// Displays as `failed to create handler '<handler_name>': <reason>`.
#[derive(Debug, Clone, Error)]
#[error("failed to create handler '{handler_name}': {reason}")]
pub struct HandlerCreationError {
    /// Name of the handler whose creation failed.
    pub handler_name: String,
    /// Textual description of what went wrong.
    pub reason: String,
}

/// Errors that can occur during handler initialization.
#[derive(Debug, Error)]
#[error("{0}")]
@@ -76,28 +68,6 @@ impl HandlerInitError {
    }
}

/// Looks up `handler_name` in the raw handler config map and decodes its value into `T`.
///
/// A missing key is not an error: it is logged at info level and `T::default()` is
/// returned instead.
///
/// # Errors
///
/// Returns a [`HandlerInitError`] wrapping a [`HandlerCreationError`] when the key is
/// present but its JSON value cannot be deserialized into `T`.
pub fn deserialize_handler_config<T: serde::de::DeserializeOwned + Default>(
    handler_configs: &HashMap<String, serde_json::Value>,
    handler_name: &str,
) -> Result<T, HandlerInitError> {
    // Guard clause: no entry for this handler means "use the defaults".
    let Some(raw) = handler_configs.get(handler_name) else {
        info!(handler = handler_name, "no config found, using defaults");
        return Ok(T::default());
    };

    // `from_value` consumes its argument, so the borrowed map entry is cloned first.
    serde_json::from_value(raw.clone()).map_err(|source| {
        let failure = HandlerCreationError {
            handler_name: handler_name.to_string(),
            reason: source.to_string(),
        };
        HandlerInitError::new(failure)
    })
}

/// Context provided to handlers during message processing.
///
/// Contains shared resources that handlers need to process messages
@@ -193,6 +163,16 @@ impl HandlerRegistry {
        topics.sort_by(|a, b| (&*a.stream, &*a.subject).cmp(&(&*b.stream, &*b.subject)));
        topics
    }

    /// Searches every topic's handlers for one whose `name()` equals `name`.
    ///
    /// Returns a clone of the first matching `Arc` in the collection's iteration order,
    /// or `None` when no registered handler has that name. The guard obtained from
    /// `handlers_by_topic.read()` is held for the duration of the search.
    pub fn find_by_name(&self, name: &str) -> Option<Arc<dyn Handler>> {
        let by_topic = self.handlers_by_topic.read();
        for topic_handlers in by_topic.values() {
            for candidate in topic_handlers {
                if candidate.name() == name {
                    return Some(Arc::clone(candidate));
                }
            }
        }
        None
    }
}

#[cfg(test)]
+18 −22
Original line number Diff line number Diff line
@@ -54,11 +54,10 @@ use clickhouse::ClickHouseDestination;
use configuration::EngineConfiguration;
use dispatcher::DispatchConfig;
use engine::EngineBuilder;
use gitlab_client::{GitlabClient, GitlabClientConfiguration};
use gitlab_client::GitlabClientConfiguration;
use handler::{HandlerInitError, HandlerRegistry};
use health::{HealthState, run_health_server};
use modules::sdlc::locking::INDEXING_LOCKS_BUCKET;
use modules::{create_code_handlers, create_sdlc_handlers};
use nats::{KvBucketConfig, NatsBroker, NatsConfiguration};
use thiserror::Error;
use tokio_util::sync::CancellationToken;
@@ -86,6 +85,20 @@ pub struct IndexerConfig {
    pub health_bind_address: SocketAddr,
}

impl Default for IndexerConfig {
    fn default() -> Self {
        Self {
            nats: NatsConfiguration::default(),
            graph: ClickHouseConfiguration::default(),
            datalake: ClickHouseConfiguration::default(),
            engine: EngineConfiguration::default(),
            gitlab: None,
            dispatch: DispatchConfig::default(),
            health_bind_address: default_health_bind_address(),
        }
    }
}

#[derive(Debug, Error)]
pub enum IndexerError {
    #[error("NATS connection failed: {0}")]
@@ -125,27 +138,10 @@ pub async fn run(config: &IndexerConfig, shutdown: CancellationToken) -> Result<
    let registry = Arc::new(HandlerRegistry::default());

    info!("initializing SDLC handlers");
    for handler in
        create_sdlc_handlers(&config.datalake, &config.graph, &config.engine.handlers).await?
    {
        registry.register_handler(handler);
    }
    modules::sdlc::register_handlers(&registry, config).await?;

    if let Some(gitlab_config) = &config.gitlab {
    info!("initializing Code handlers");
        let gitlab_client =
            Arc::new(GitlabClient::new(gitlab_config.clone()).map_err(HandlerInitError::new)?);
        for handler in create_code_handlers(
            &config.graph,
            &config.datalake,
            gitlab_client,
            &config.engine.handlers,
        )? {
            registry.register_handler(handler);
        }
    } else {
        info!("Code handlers disabled (GitLab client not configured)");
    }
    modules::code::register_handlers(&registry, config)?;

    info!(topics = registry.topics().len(), "registered handlers");

Loading