Verified Commit 8575aa42 authored by Jean-Gabriel Doyon's avatar Jean-Gabriel Doyon Committed by GitLab
Browse files

feat(indexer): replace polling code reconciliation with event-driven backfill

parent f2908ac7
Loading
Loading
Loading
Loading
+8 −1
Original line number Diff line number Diff line
@@ -11,7 +11,7 @@ use gkg_server::shutdown;
use gkg_server::webserver::Server as HttpServer;
use indexer::IndexerConfig;
use indexer::checkpoint::ClickHouseCheckpointStore;
use indexer::modules::code::SiphonCodeIndexingTaskDispatcher;
use indexer::modules::code::{NamespaceCodeBackfillDispatcher, SiphonCodeIndexingTaskDispatcher};
use indexer::modules::namespace_deletion::{
    ClickHouseNamespaceDeletionStore, NamespaceDeletionScheduler, NamespaceDeletionStore,
};
@@ -75,6 +75,13 @@ async fn main() -> anyhow::Result<()> {
                    metrics.clone(),
                    config.schedule.tasks.code_indexing_task.clone(),
                )),
                Box::new(NamespaceCodeBackfillDispatcher::new(
                    services.nats.clone(),
                    config.datalake.build_client(),
                    config.graph.build_client(),
                    metrics.clone(),
                    config.schedule.tasks.namespace_code_backfill.clone(),
                )),
                Box::new(TableCleanup::new(
                    graph,
                    metrics.clone(),
+6 −7
Original line number Diff line number Diff line
@@ -4,9 +4,10 @@ use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::time::Duration;

use crate::modules::code::SiphonCodeIndexingTaskDispatcherConfig;
use crate::modules::code::dispatch::ProjectCodeDispatcherConfig;
use crate::modules::code::{CodeIndexingTaskHandlerConfig, ProjectCodeIndexingHandlerConfig};
use crate::modules::code::{
    CodeIndexingTaskHandlerConfig, NamespaceCodeBackfillDispatcherConfig,
    SiphonCodeIndexingTaskDispatcherConfig,
};
use crate::modules::namespace_deletion::{
    NamespaceDeletionHandlerConfig, NamespaceDeletionSchedulerConfig,
};
@@ -61,8 +62,6 @@ pub struct HandlersConfiguration {
    #[serde(default)]
    pub code_indexing_task: CodeIndexingTaskHandlerConfig,
    #[serde(default)]
    pub code_project_reconciliation: ProjectCodeIndexingHandlerConfig,
    #[serde(default)]
    pub namespace_deletion: NamespaceDeletionHandlerConfig,
}

@@ -92,10 +91,10 @@ pub struct ScheduledTasksConfiguration {
    #[serde(default)]
    pub namespace: NamespaceDispatcherConfig,
    #[serde(default)]
    pub project_code: ProjectCodeDispatcherConfig,
    #[serde(default)]
    pub code_indexing_task: SiphonCodeIndexingTaskDispatcherConfig,
    #[serde(default)]
    pub namespace_code_backfill: NamespaceCodeBackfillDispatcherConfig,
    #[serde(default)]
    pub table_cleanup: TableCleanupConfig,
    #[serde(default)]
    pub namespace_deletion: NamespaceDeletionSchedulerConfig,
+1 −0
Original line number Diff line number Diff line
@@ -122,6 +122,7 @@ impl CodeCheckpointStore for ClickHouseCodeCheckpointStore {
            WHERE traversal_path = {traversal_path:String}
              AND project_id = {project_id:Int64}
              AND branch = {branch:String}
            HAVING count() > 0
        "#;

        let batches = self
+59 −12
Original line number Diff line number Diff line
@@ -11,6 +11,7 @@ use super::config::CODE_LOCK_TTL;
use super::indexing_pipeline::{CodeIndexingPipeline, IndexingRequest};
use super::locking::project_lock_key;
use super::metrics::CodeMetrics;
use super::repository_service::RepositoryService;
use crate::configuration::HandlerConfiguration;
use crate::handler::{Handler, HandlerContext, HandlerError};
use crate::topic::CodeIndexingTaskRequest;
@@ -24,6 +25,7 @@ pub struct CodeIndexingTaskHandlerConfig {

pub struct CodeIndexingTaskHandler {
    pipeline: Arc<CodeIndexingPipeline>,
    repository_service: Arc<dyn RepositoryService>,
    checkpoint_store: Arc<dyn CodeCheckpointStore>,
    metrics: CodeMetrics,
    config: CodeIndexingTaskHandlerConfig,
@@ -32,12 +34,14 @@ pub struct CodeIndexingTaskHandler {
impl CodeIndexingTaskHandler {
    pub fn new(
        pipeline: Arc<CodeIndexingPipeline>,
        repository_service: Arc<dyn RepositoryService>,
        checkpoint_store: Arc<dyn CodeCheckpointStore>,
        metrics: CodeMetrics,
        config: CodeIndexingTaskHandlerConfig,
    ) -> Self {
        Self {
            pipeline,
            repository_service,
            checkpoint_store,
            metrics,
            config,
@@ -70,7 +74,7 @@ impl Handler for CodeIndexingTaskHandler {
        debug!(
            task_id = request.task_id,
            project_id = request.project_id,
            branch = %request.branch,
            branch = ?request.branch,
            "received code indexing task"
        );

@@ -79,6 +83,25 @@ impl Handler for CodeIndexingTaskHandler {
}

impl CodeIndexingTaskHandler {
    async fn resolve_branch(
        &self,
        request: &CodeIndexingTaskRequest,
    ) -> Result<String, HandlerError> {
        match &request.branch {
            Some(branch) => Ok(branch.clone()),
            None => {
                let project_info = self
                    .repository_service
                    .project_info(request.project_id)
                    .await
                    .map_err(|e| {
                        HandlerError::Processing(format!("failed to fetch project info: {e}"))
                    })?;
                Ok(project_info.default_branch)
            }
        }
    }

    async fn process_task(
        &self,
        context: &HandlerContext,
@@ -86,7 +109,9 @@ impl CodeIndexingTaskHandler {
    ) -> Result<(), HandlerError> {
        let started_at = Instant::now();

        if self.is_already_indexed(request).await {
        let branch = self.resolve_branch(request).await?;

        if self.is_already_indexed(request, &branch).await {
            self.metrics.record_outcome("skipped_checkpoint");
            return Ok(());
        }
@@ -94,11 +119,11 @@ impl CodeIndexingTaskHandler {
        info!(
            task_id = request.task_id,
            project_id = request.project_id,
            branch = %request.branch,
            branch = %branch,
            "starting code indexing"
        );

        let result = self.index_with_lock(context, request).await;
        let result = self.index_with_lock(context, request, &branch).await;

        let outcome = if result.is_ok() { "indexed" } else { "error" };
        self.metrics.record_outcome(outcome);
@@ -113,9 +138,9 @@ impl CodeIndexingTaskHandler {
        &self,
        context: &HandlerContext,
        request: &CodeIndexingTaskRequest,
        branch: &str,
    ) -> Result<(), HandlerError> {
        let project_id = request.project_id;
        let branch = &request.branch;

        if !self.try_acquire_lock(context, project_id, branch).await? {
            debug!(
@@ -137,7 +162,7 @@ impl CodeIndexingTaskHandler {
                    branch: branch.to_string(),
                    traversal_path: request.traversal_path.clone(),
                    task_id: request.task_id,
                    commit_sha: Some(request.commit_sha.clone()),
                    commit_sha: request.commit_sha.clone(),
                },
            )
            .await;
@@ -155,10 +180,10 @@ impl CodeIndexingTaskHandler {
}

impl CodeIndexingTaskHandler {
    async fn is_already_indexed(&self, request: &CodeIndexingTaskRequest) -> bool {
    async fn is_already_indexed(&self, request: &CodeIndexingTaskRequest, branch: &str) -> bool {
        if let Ok(Some(checkpoint)) = self
            .checkpoint_store
            .get_checkpoint(&request.traversal_path, request.project_id, &request.branch)
            .get_checkpoint(&request.traversal_path, request.project_id, branch)
            .await
            && checkpoint.last_task_id >= request.task_id
        {
@@ -224,7 +249,8 @@ mod tests {

    impl TestContext {
        fn new() -> Self {
            let mock_repo = MockRepositoryService::with_default_branch(123, "main");
            let mock_repo: Arc<dyn RepositoryService> =
                MockRepositoryService::with_default_branch(123, "main");
            let mock_nats = Arc::new(MockNatsServices::new());
            let mock_locks = Arc::new(MockLockService::new());
            let mock_checkpoints = Arc::new(MockCodeCheckpointStore::new());
@@ -240,7 +266,7 @@ mod tests {
            );

            let pipeline = Arc::new(CodeIndexingPipeline::new(
                mock_repo,
                Arc::clone(&mock_repo),
                Arc::clone(&checkpoint_store),
                stale_data_cleaner,
                metrics.clone(),
@@ -249,6 +275,7 @@ mod tests {

            let handler = CodeIndexingTaskHandler::new(
                pipeline,
                mock_repo,
                Arc::clone(&checkpoint_store),
                metrics,
                CodeIndexingTaskHandlerConfig::default(),
@@ -275,8 +302,8 @@ mod tests {
            Envelope::new(&CodeIndexingTaskRequest {
                task_id,
                project_id,
                branch: branch.to_string(),
                commit_sha: "abc123".to_string(),
                branch: Some(branch.to_string()),
                commit_sha: Some("abc123".to_string()),
                traversal_path: format!("/org/project-{}", project_id),
            })
            .unwrap()
@@ -337,6 +364,26 @@ mod tests {
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn resolves_default_branch_when_branch_is_none() {
        let ctx = TestContext::new();
        ctx.set_checkpoint(123, "/org/project-123", "main", 100)
            .await;

        let envelope = Envelope::new(&CodeIndexingTaskRequest {
            task_id: 0,
            project_id: 123,
            branch: None,
            commit_sha: None,
            traversal_path: "/org/project-123".to_string(),
        })
        .unwrap();

        let result = ctx.handler.handle(ctx.handler_context(), envelope).await;
        assert!(result.is_ok());
        assert!(!ctx.lock_exists(123, "main"));
    }

    #[test]
    fn handler_name() {
        let ctx = TestContext::new();
+1 −0
Original line number Diff line number Diff line
@@ -8,6 +8,7 @@ pub const CODE_LOCK_TTL: Duration = Duration::from_secs(60);

pub mod subjects {
    pub const CODE_INDEXING_TASKS: &str = "p_knowledge_graph_code_indexing_tasks";
    pub const KNOWLEDGE_GRAPH_ENABLED_NAMESPACES: &str = "knowledge_graph_enabled_namespaces";
}

/// ClickHouse table names for code graph entities, derived from the ontology.
Loading