Verified commit 0970e23a, authored by Bohdan Parkhomchuk; committed by GitLab
Browse files

fix(indexer): backfill code indexing during schema migration

parent e0968761
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -314,6 +314,7 @@ pub async fn run_dispatcher(
        )),
        Box::new(NamespaceCodeBackfillDispatcher::new(
            services.nats.clone(),
            config.graph.build_client(),
            config.datalake.build_client(),
            metrics.clone(),
            config.schedule.tasks.namespace_code_backfill.clone(),
+101 −7
Original line number Diff line number Diff line
@@ -10,6 +10,7 @@ use super::siphon_decoder::{ColumnExtractor, decode_logical_replication_events};
use crate::clickhouse::ArrowClickHouseClient;
use crate::nats::NatsServices;
use crate::scheduler::{ScheduledTask, ScheduledTaskMetrics, TaskError};
use crate::schema::version::read_migrating_version;
use crate::topic::CodeIndexingTaskRequest;
use crate::types::{Envelope, Subscription};
use clickhouse_client::FromArrowColumn;
@@ -30,8 +31,17 @@ WHERE deleted = false
  AND startsWith(traversal_path, {traversal_path:String})
"#;

/// All enabled namespace IDs from the datalake. Used to trigger a full
/// backfill when a schema migration is in progress.
///
/// `DISTINCT` collapses duplicate rows per namespace, and the
/// `_siphon_deleted = false` predicate excludes soft-deleted rows so
/// namespaces that are no longer enabled are not backfilled.
const ENABLED_NAMESPACE_IDS_QUERY: &str = r#"
SELECT DISTINCT root_namespace_id
FROM siphon_knowledge_graph_enabled_namespaces
WHERE _siphon_deleted = false
"#;

pub struct NamespaceCodeBackfillDispatcher {
    nats: Arc<dyn NatsServices>,
    graph: ArrowClickHouseClient,
    datalake: ArrowClickHouseClient,
    metrics: ScheduledTaskMetrics,
    config: NamespaceCodeBackfillDispatcherConfig,
@@ -40,12 +50,14 @@ pub struct NamespaceCodeBackfillDispatcher {
impl NamespaceCodeBackfillDispatcher {
    pub fn new(
        nats: Arc<dyn NatsServices>,
        graph: ArrowClickHouseClient,
        datalake: ArrowClickHouseClient,
        metrics: ScheduledTaskMetrics,
        config: NamespaceCodeBackfillDispatcherConfig,
    ) -> Self {
        Self {
            nats,
            graph,
            datalake,
            metrics,
            config,
@@ -100,6 +112,13 @@ struct PendingProject {

impl NamespaceCodeBackfillDispatcher {
    /// Run both dispatch paths: the incremental CDC-driven one first,
    /// then the migration-wide sweep over all enabled namespaces.
    async fn dispatch_inner(&self) -> Result<(), TaskError> {
        self.dispatch_cdc_events().await?;
        self.dispatch_migration_backfill().await
    }

    /// Consume CDC events for newly-enabled namespaces and dispatch backfill.
    async fn dispatch_cdc_events(&self) -> Result<(), TaskError> {
        let subscription = self.siphon_subscription();
        let mut total = DispatchOutcome {
            dispatched: 0,
@@ -152,6 +171,77 @@ impl NamespaceCodeBackfillDispatcher {
        Ok(())
    }

    /// Dispatch code backfill for every enabled namespace while a schema
    /// migration is in progress, so the new-prefix checkpoint table gets
    /// populated. Without this sweep, migration completion would block
    /// indefinitely because the code checkpoint table stays empty.
    ///
    /// A failure to read the migration status is logged and counted but
    /// deliberately does not fail the task.
    async fn dispatch_migration_backfill(&self) -> Result<(), TaskError> {
        // Collapse "no migration running" and "status check failed" into
        // early returns; only a live `migrating` version proceeds.
        let version = match read_migrating_version(&self.graph).await {
            Ok(Some(version)) => version,
            Ok(None) => return Ok(()),
            Err(e) => {
                self.metrics.record_error(self.name(), "migration_status");
                warn!(error = %e, "failed to check migration status, skipping migration backfill");
                return Ok(());
            }
        };

        let namespace_ids = self.fetch_enabled_namespace_ids().await?;
        if namespace_ids.is_empty() {
            return Ok(());
        }

        info!(
            version,
            namespace_count = namespace_ids.len(),
            "schema migration in progress, dispatching code backfill for all enabled namespaces"
        );

        // Accumulate per-namespace outcomes into one running total.
        let mut totals = DispatchOutcome {
            dispatched: 0,
            skipped: 0,
        };
        for &namespace_id in &namespace_ids {
            let outcome = self.dispatch_projects_code_indexing(namespace_id).await?;
            totals.dispatched += outcome.dispatched;
            totals.skipped += outcome.skipped;
        }

        // Only emit metrics/logs when at least one project was considered.
        let did_work = totals.dispatched > 0 || totals.skipped > 0;
        if did_work {
            self.metrics
                .record_requests_published(self.name(), totals.dispatched);
            self.metrics
                .record_requests_skipped(self.name(), totals.skipped);

            info!(
                version,
                dispatched = totals.dispatched,
                skipped = totals.skipped,
                "dispatched migration code backfill requests"
            );
        }

        Ok(())
    }

    async fn fetch_enabled_namespace_ids(&self) -> Result<Vec<i64>, TaskError> {
        let batches = self
            .datalake
            .query(ENABLED_NAMESPACE_IDS_QUERY)
            .fetch_arrow()
            .await
            .map_err(|error| {
                self.metrics.record_error(self.name(), "query");
                TaskError::new(error)
            })?;

        i64::extract_column(&batches, 0).map_err(TaskError::new)
    }

    fn extract_namespace_ids(
        &self,
        messages: &[crate::nats::NatsMessage],
@@ -331,15 +421,14 @@ mod tests {
    }

    fn create_dispatcher(nats: Arc<MockNatsServices>) -> NamespaceCodeBackfillDispatcher {
        let datalake = ArrowClickHouseClient::new(
            "http://localhost:0",
            "default",
            "default",
            None,
            &std::collections::HashMap::new(),
        );
        let empty = &std::collections::HashMap::new();
        let graph =
            ArrowClickHouseClient::new("http://localhost:0", "default", "default", None, empty);
        let datalake =
            ArrowClickHouseClient::new("http://localhost:0", "default", "default", None, empty);
        NamespaceCodeBackfillDispatcher::new(
            nats,
            graph,
            datalake,
            test_metrics(),
            NamespaceCodeBackfillDispatcherConfig::default(),
@@ -403,6 +492,11 @@ mod tests {
        assert_eq!(ids, vec![100, 200]);
    }

    #[test]
    fn enabled_namespace_query_filters_deleted() {
        // The backfill query must never pick up soft-deleted namespaces.
        let filters_deleted = ENABLED_NAMESPACE_IDS_QUERY.contains("_siphon_deleted = false");
        assert!(filters_deleted);
    }

    #[tokio::test]
    async fn extracts_namespace_ids_from_snapshot_events() {
        let nats = Arc::new(MockNatsServices::new());
+243 −0
Original line number Diff line number Diff line
//! Integration test: migration-triggered code backfill.
//!
//! When a schema migration is in progress (`gkg_schema_version` has a
//! `migrating` row), `NamespaceCodeBackfillDispatcher` must dispatch
//! code indexing tasks for **all** enabled namespaces, not only for
//! newly-enabled ones arriving via CDC events.

use std::collections::HashSet;

use clickhouse_client::ClickHouseConfigurationExt;
use gkg_server_config::{
    NamespaceCodeBackfillDispatcherConfig, NatsConfiguration, ScheduleConfiguration,
};
use indexer::modules::code::NamespaceCodeBackfillDispatcher;
use indexer::scheduler::{ScheduledTask, ScheduledTaskMetrics};
use indexer::schema::version::{
    ensure_version_table, write_migrating_version, write_schema_version,
};
use indexer::topic::{CODE_INDEXING_TASK_SUBJECT_PATTERN, INDEXER_STREAM};
use serde::Deserialize;
use testcontainers::ImageExt;
use testcontainers::core::{ContainerPort, WaitFor};
use testcontainers::runners::AsyncRunner;
use testcontainers_modules::nats::{Nats, NatsServerCmd};

use super::super::common;
use common::TestContext as ClickHouseContext;

/// Shape of a code indexing request published to NATS, reduced to the
/// fields these tests assert on.
#[derive(Deserialize)]
struct CodeIndexingRequest {
    // Task identifier; the assertions below expect 0 for backfill requests.
    task_id: i64,
    // Project the indexing request targets.
    project_id: i64,
}

/// Test fixture bundling a ClickHouse test database with a
/// JetStream-enabled NATS container.
struct TestContext {
    // ClickHouse context pre-loaded with the siphon and graph schemas.
    clickhouse: ClickHouseContext,
    // Held only to keep the NATS container alive for the test's duration.
    _nats: testcontainers::ContainerAsync<Nats>,
    // `host:port` address of the NATS server mapped from the container.
    nats_url: String,
}

impl TestContext {
    /// Stand up the full fixture: ClickHouse with both schemas, a NATS
    /// container with JetStream, and the two streams the dispatcher uses.
    async fn new() -> Self {
        let clickhouse =
            ClickHouseContext::new(&[common::SIPHON_SCHEMA_SQL, *common::GRAPH_SCHEMA_SQL]).await;
        let (nats, nats_url) = Self::start_nats().await;
        Self::create_streams(&nats_url).await;
        Self {
            clickhouse,
            _nats: nats,
            nats_url,
        }
    }

    /// NATS configuration pointing at the test container; every other
    /// field keeps its default.
    fn nats_config(&self) -> NatsConfiguration {
        NatsConfiguration {
            url: self.nats_url.clone(),
            ..Default::default()
        }
    }

    /// Insert one enabled-namespaces row per given namespace id.
    /// Row ids are assigned sequentially starting at 1.
    async fn given_enabled_namespaces(&self, namespace_ids: impl IntoIterator<Item = i64>) {
        for (i, ns_id) in namespace_ids.into_iter().enumerate() {
            self.clickhouse
                .execute(&format!(
                    "INSERT INTO siphon_knowledge_graph_enabled_namespaces \
                     (id, root_namespace_id, created_at, updated_at) \
                     VALUES ({}, {ns_id}, now(), now())",
                    i + 1
                ))
                .await;
        }
    }

    /// Drain up to 100 pending code indexing requests from the indexer
    /// stream, acking each, and return the deserialized payloads.
    async fn consume_code_indexing_requests(&self) -> Vec<CodeIndexingRequest> {
        use futures::StreamExt;

        let client = async_nats::connect(format!("nats://{}", self.nats_url))
            .await
            .unwrap();
        let jetstream = async_nats::jetstream::new(client);

        let consumer = jetstream
            .create_consumer_on_stream(
                async_nats::jetstream::consumer::pull::Config {
                    filter_subject: CODE_INDEXING_TASK_SUBJECT_PATTERN.into(),
                    ..Default::default()
                },
                INDEXER_STREAM,
            )
            .await
            .unwrap();

        // `fetch()` returns only already-buffered messages, so this does
        // not block waiting for new publishes.
        let mut messages = consumer.fetch().max_messages(100).messages().await.unwrap();
        let mut results = Vec::new();

        while let Some(Ok(msg)) = messages.next().await {
            results.push(serde_json::from_slice(&msg.payload).unwrap());
            msg.ack().await.unwrap();
        }

        results
    }

    /// Start a JetStream-enabled NATS container on a random host port and
    /// return the container handle plus its `host:port` address.
    async fn start_nats() -> (testcontainers::ContainerAsync<Nats>, String) {
        let container = Nats::default()
            .with_cmd(&NatsServerCmd::default().with_jetstream())
            .with_tag("2.11-alpine")
            .with_mapped_port(0, ContainerPort::Tcp(4222))
            // NOTE(review): fixed 3s wait rather than a log/port readiness
            // probe — may flake on slow hosts.
            .with_ready_conditions(vec![WaitFor::seconds(3)])
            .start()
            .await
            .unwrap();

        let host = container.get_host().await.unwrap();
        let port = container.get_host_port_ipv4(4222).await.unwrap();

        (container, format!("{host}:{port}"))
    }

    /// Create the two JetStream streams the dispatcher interacts with.
    async fn create_streams(url: &str) {
        let client = async_nats::connect(format!("nats://{url}")).await.unwrap();
        let jetstream = async_nats::jetstream::new(client);

        // Stream for dispatched code indexing tasks.
        // Work-queue retention with max one message per subject and
        // discard-new dedupes repeat dispatches for the same subject.
        jetstream
            .create_stream(async_nats::jetstream::stream::Config {
                name: INDEXER_STREAM.into(),
                subjects: vec![CODE_INDEXING_TASK_SUBJECT_PATTERN.into()],
                retention: async_nats::jetstream::stream::RetentionPolicy::WorkQueue,
                max_messages_per_subject: 1,
                discard: async_nats::jetstream::stream::DiscardPolicy::New,
                discard_new_per_subject: true,
                ..Default::default()
            })
            .await
            .unwrap();

        // Siphon stream consumed by the CDC path of the backfill dispatcher.
        jetstream
            .create_stream(async_nats::jetstream::stream::Config {
                name: "siphon_stream_main_db".into(),
                subjects: vec!["siphon_stream_main_db.>".into()],
                ..Default::default()
            })
            .await
            .unwrap();
    }
}

#[tokio::test]
async fn migration_triggers_backfill_for_all_enabled_namespaces() {
    let ctx = TestContext::new().await;

    // Seed: namespace 100 holds one project, namespace 200 holds two.
    common::create_namespace(&ctx.clickhouse, 100, None, 20, "1/100/").await;
    common::create_namespace(&ctx.clickhouse, 200, None, 20, "1/200/").await;
    common::create_project(&ctx.clickhouse, 10, 100, 1, 20, "1/100/10/").await;
    common::create_project(&ctx.clickhouse, 20, 200, 1, 20, "1/200/20/").await;
    common::create_project(&ctx.clickhouse, 21, 200, 1, 20, "1/200/21/").await;
    ctx.given_enabled_namespaces([100, 200]).await;

    // Record an in-progress migration (0 -> 1) in gkg_schema_version.
    let graph = ctx.clickhouse.create_client();
    ensure_version_table(&graph).await.unwrap();
    write_schema_version(&graph, 0).await.unwrap();
    write_migrating_version(&graph, 1).await.unwrap();

    // Assemble the dispatcher and run it exactly one cycle.
    let services = indexer::scheduler::connect(&ctx.nats_config())
        .await
        .unwrap();
    let dispatcher = NamespaceCodeBackfillDispatcher::new(
        services.nats.clone(),
        ctx.clickhouse.create_client(),
        ctx.clickhouse.config.build_client(),
        ScheduledTaskMetrics::new(),
        NamespaceCodeBackfillDispatcherConfig {
            schedule: ScheduleConfiguration::default(),
            ..Default::default()
        },
    );
    let task: Box<dyn ScheduledTask> = Box::new(dispatcher);

    indexer::scheduler::run_once(&[task], &*services.lock_service)
        .await
        .unwrap();

    // Every project under every enabled namespace must receive a request.
    let requests = ctx.consume_code_indexing_requests().await;
    let project_ids: HashSet<i64> = requests.iter().map(|r| r.project_id).collect();

    assert_eq!(
        project_ids,
        HashSet::from([10, 20, 21]),
        "expected backfill for all projects in enabled namespaces"
    );

    // task_id=0 is the marker distinguishing backfill from regular tasks.
    assert!(
        requests.iter().all(|r| r.task_id == 0),
        "migration backfill requests should use task_id=0"
    );
}

#[tokio::test]
async fn no_backfill_without_migrating_version() {
    let ctx = TestContext::new().await;

    // Seed one enabled namespace with a project, and write only a settled
    // schema version — deliberately no `migrating` row.
    common::create_namespace(&ctx.clickhouse, 100, None, 20, "1/100/").await;
    common::create_project(&ctx.clickhouse, 10, 100, 1, 20, "1/100/10/").await;
    ctx.given_enabled_namespaces([100]).await;

    let graph = ctx.clickhouse.create_client();
    ensure_version_table(&graph).await.unwrap();
    write_schema_version(&graph, 0).await.unwrap();

    // Assemble the dispatcher and run it exactly one cycle.
    let services = indexer::scheduler::connect(&ctx.nats_config())
        .await
        .unwrap();
    let dispatcher = NamespaceCodeBackfillDispatcher::new(
        services.nats.clone(),
        ctx.clickhouse.create_client(),
        ctx.clickhouse.config.build_client(),
        ScheduledTaskMetrics::new(),
        NamespaceCodeBackfillDispatcherConfig {
            schedule: ScheduleConfiguration::default(),
            ..Default::default()
        },
    );
    let task: Box<dyn ScheduledTask> = Box::new(dispatcher);

    indexer::scheduler::run_once(&[task], &*services.lock_service)
        .await
        .unwrap();

    // With no migration in progress, the dispatcher must stay idle.
    let requests = ctx.consume_code_indexing_requests().await;
    assert!(
        requests.is_empty(),
        "no backfill expected when no migration in progress"
    );
}
+1 −0
Original line number Diff line number Diff line
mod migration;
mod migration_backfill;