Verified commit 77586ca8, authored by Jean-Gabriel Doyon and committed by GitLab
Browse files

feat(indexer): decouple code indexing dispatch from handler via NATS

parent 8165914c
Loading
Loading
Loading
Loading
+3 −4
Original line number Diff line number Diff line
@@ -11,7 +11,7 @@ use gkg_server::shutdown;
use gkg_server::webserver::Server as HttpServer;
use indexer::IndexerConfig;
use indexer::checkpoint::ClickHouseCheckpointStore;
use indexer::modules::code::dispatch::ProjectCodeDispatcher;
use indexer::modules::code::SiphonCodeIndexingTaskDispatcher;
use indexer::modules::namespace_deletion::{
    ClickHouseNamespaceDeletionStore, NamespaceDeletionScheduler, NamespaceDeletionStore,
};
@@ -70,11 +70,10 @@ async fn main() -> anyhow::Result<()> {
                    metrics.clone(),
                    config.schedule.tasks.namespace.clone(),
                )),
                Box::new(ProjectCodeDispatcher::new(
                Box::new(SiphonCodeIndexingTaskDispatcher::new(
                    services.nats.clone(),
                    graph.clone(),
                    metrics.clone(),
                    config.schedule.tasks.project_code.clone(),
                    config.schedule.tasks.code_indexing_task.clone(),
                )),
                Box::new(TableCleanup::new(
                    graph,
+3 −0
Original line number Diff line number Diff line
@@ -4,6 +4,7 @@ use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::time::Duration;

use crate::modules::code::SiphonCodeIndexingTaskDispatcherConfig;
use crate::modules::code::dispatch::ProjectCodeDispatcherConfig;
use crate::modules::code::{CodeIndexingTaskHandlerConfig, ProjectCodeIndexingHandlerConfig};
use crate::modules::namespace_deletion::{
@@ -93,6 +94,8 @@ pub struct ScheduledTasksConfiguration {
    #[serde(default)]
    pub project_code: ProjectCodeDispatcherConfig,
    #[serde(default)]
    pub code_indexing_task: SiphonCodeIndexingTaskDispatcherConfig,
    #[serde(default)]
    pub table_cleanup: TableCleanupConfig,
    #[serde(default)]
    pub namespace_deletion: NamespaceDeletionSchedulerConfig,
+66 −172
Original line number Diff line number Diff line
//! Handler for processing code indexing tasks dispatched by Rails.

use std::sync::Arc;
use std::time::Instant;

use async_trait::async_trait;
use siphon_proto::replication_event::Operation;
use tracing::{debug, info, warn};

use serde::{Deserialize, Serialize};

use super::checkpoint_store::CodeCheckpointStore;
use super::config::CODE_LOCK_TTL;
use super::config::subjects;
use super::indexing_pipeline::{CodeIndexingPipeline, IndexingRequest};
use super::locking::project_lock_key;
use super::metrics::{CodeMetrics, RecordStageError};
use super::siphon_decoder::{ColumnExtractor, decode_logical_replication_events};
use super::metrics::CodeMetrics;
use crate::configuration::HandlerConfiguration;
use crate::handler::{Handler, HandlerContext, HandlerError};
use crate::types::{Envelope, Subscription};

/// Returns the value used for `events_stream_name` when the configuration
/// does not provide one.
fn default_events_stream_name() -> String {
    String::from("siphon_stream_main_db")
}
use crate::topic::CodeIndexingTaskRequest;
use crate::types::{Envelope, Event, Subscription};

#[derive(Debug, Clone, Deserialize, Serialize)]
#[derive(Debug, Clone, Default, Deserialize, Serialize)]
pub struct CodeIndexingTaskHandlerConfig {
    #[serde(flatten)]
    pub engine: HandlerConfiguration,

    #[serde(default = "default_events_stream_name")]
    pub events_stream_name: String,
}

impl Default for CodeIndexingTaskHandlerConfig {
    /// Builds a configuration from the default engine settings and the same
    /// stream name that serde falls back to when `events_stream_name` is
    /// absent from the input.
    fn default() -> Self {
        let engine = HandlerConfiguration::default();
        let events_stream_name = default_events_stream_name();

        Self {
            engine,
            events_stream_name,
        }
    }
}

pub struct CodeIndexingTaskHandler {
@@ -72,16 +52,7 @@ impl Handler for CodeIndexingTaskHandler {
    }

    fn subscription(&self) -> Subscription {
        Subscription::new(
            self.config.events_stream_name.clone(),
            format!(
                "{}.{}",
                self.config.events_stream_name,
                subjects::CODE_INDEXING_TASKS
            ),
        )
        .manage_stream(false)
        .dead_letter_on_exhaustion(true)
        CodeIndexingTaskRequest::subscription().dead_letter_on_exhaustion(true)
    }

    fn engine_config(&self) -> &HandlerConfiguration {
@@ -89,43 +60,21 @@ impl Handler for CodeIndexingTaskHandler {
    }

    async fn handle(&self, context: HandlerContext, message: Envelope) -> Result<(), HandlerError> {
        debug!(message_id = %message.id.0, "received code indexing task");

        let replication_events = decode_logical_replication_events(&message.payload)
            .inspect_err(|e| warn!(message_id = %message.id.0, error = %e, "failed to decode code indexing task"))
            .map_err(HandlerError::Processing)
            .record_error_stage(&self.metrics, "decode")?;

        let extractor = ColumnExtractor::new(&replication_events);

        for event in &replication_events.events {
            if event.operation == Operation::InitialSnapshot as i32 {
                debug!("skipping initial snapshot event");
                continue;
            }

            let Some(task) = CodeIndexingTask::extract(&extractor, event) else {
                debug!("failed to extract code indexing task, skipping");
                continue;
            };
        let request: CodeIndexingTaskRequest =
            serde_json::from_slice(&message.payload).map_err(|e| {
                HandlerError::Processing(format!(
                    "failed to deserialize code indexing task request: {e}"
                ))
            })?;

        debug!(
                task_id = task.id,
                project_id = task.project_id,
                "processing code indexing task"
            task_id = request.task_id,
            project_id = request.project_id,
            branch = %request.branch,
            "received code indexing task"
        );

            if let Err(e) = self.process_task(&context, &task).await {
                warn!(
                    task_id = task.id,
                    project_id = task.project_id,
                    error = %e,
                    "failed to process code indexing task"
                );
            }
        }

        Ok(())
        self.process_task(&context, &request).await
    }
}

@@ -133,24 +82,23 @@ impl CodeIndexingTaskHandler {
    async fn process_task(
        &self,
        context: &HandlerContext,
        task: &CodeIndexingTask,
        request: &CodeIndexingTaskRequest,
    ) -> Result<(), HandlerError> {
        let started_at = Instant::now();
        let branch = task.branch_name();

        if self.is_already_indexed(task, &branch).await {
        if self.is_already_indexed(request).await {
            self.metrics.record_outcome("skipped_checkpoint");
            return Ok(());
        }

        info!(
            task_id = task.id,
            project_id = task.project_id,
            branch = %branch,
            task_id = request.task_id,
            project_id = request.project_id,
            branch = %request.branch,
            "starting code indexing"
        );

        let result = self.index_with_lock(context, task, &branch).await;
        let result = self.index_with_lock(context, request).await;

        let outcome = if result.is_ok() { "indexed" } else { "error" };
        self.metrics.record_outcome(outcome);
@@ -164,14 +112,14 @@ impl CodeIndexingTaskHandler {
    async fn index_with_lock(
        &self,
        context: &HandlerContext,
        task: &CodeIndexingTask,
        branch: &str,
        request: &CodeIndexingTaskRequest,
    ) -> Result<(), HandlerError> {
        let project_id = task.project_id;
        let project_id = request.project_id;
        let branch = &request.branch;

        if !self.try_acquire_lock(context, project_id, branch).await? {
            debug!(
                task_id = task.id,
                task_id = request.task_id,
                project_id,
                branch = %branch,
                "lock held by another indexer, skipping"
@@ -187,9 +135,9 @@ impl CodeIndexingTaskHandler {
                &IndexingRequest {
                    project_id,
                    branch: branch.to_string(),
                    traversal_path: task.traversal_path.clone(),
                    task_id: task.id,
                    commit_sha: Some(task.commit_sha.clone()),
                    traversal_path: request.traversal_path.clone(),
                    task_id: request.task_id,
                    commit_sha: Some(request.commit_sha.clone()),
                },
            )
            .await;
@@ -207,14 +155,14 @@ impl CodeIndexingTaskHandler {
}

impl CodeIndexingTaskHandler {
    async fn is_already_indexed(&self, task: &CodeIndexingTask, branch: &str) -> bool {
    async fn is_already_indexed(&self, request: &CodeIndexingTaskRequest) -> bool {
        if let Ok(Some(checkpoint)) = self
            .checkpoint_store
            .get_checkpoint(&task.traversal_path, task.project_id, branch)
            .get_checkpoint(&request.traversal_path, request.project_id, &request.branch)
            .await
            && checkpoint.last_task_id >= task.id
            && checkpoint.last_task_id >= request.task_id
        {
            debug!(task_id = task.id, "already indexed, skipping");
            debug!(task_id = request.task_id, "already indexed, skipping");
            return true;
        }
        false
@@ -249,37 +197,6 @@ impl CodeIndexingTaskHandler {
    }
}

/// A code indexing task assembled from the columns of a single replication
/// event (see `CodeIndexingTask::extract`).
#[derive(Debug, Clone)]
struct CodeIndexingTask {
    /// Value of the event's `id` column; compared against the checkpoint's
    /// `last_task_id` to decide whether the task was already indexed.
    id: i64,
    /// Value of the event's `project_id` column.
    project_id: i64,
    /// Value of the event's `ref` column. May carry a `refs/heads/` prefix,
    /// which `branch_name` strips.
    ref_name: String,
    /// Value of the event's `commit_sha` column; forwarded to the indexing
    /// request.
    commit_sha: String,
    /// Value of the event's `traversal_path` column; used for checkpoint
    /// lookups and forwarded to the indexing request.
    traversal_path: String,
}

impl CodeIndexingTask {
    /// Reads the `id`, `project_id`, `ref`, `commit_sha` and `traversal_path`
    /// columns of `event` through `extractor`.
    ///
    /// Returns `None` as soon as any of those columns cannot be read.
    fn extract(
        extractor: &ColumnExtractor<'_>,
        event: &siphon_proto::ReplicationEvent,
    ) -> Option<Self> {
        // Columns are read in declaration order; the first missing one aborts.
        let id = extractor.get_i64(event, "id")?;
        let project_id = extractor.get_i64(event, "project_id")?;
        let ref_name = extractor.get_string(event, "ref")?.to_string();
        let commit_sha = extractor.get_string(event, "commit_sha")?.to_string();
        let traversal_path = extractor.get_string(event, "traversal_path")?.to_string();

        Some(Self {
            id,
            project_id,
            ref_name,
            commit_sha,
            traversal_path,
        })
    }

    /// Returns the ref with a leading `refs/heads/` removed, or the ref
    /// unchanged when it has no such prefix.
    fn branch_name(&self) -> String {
        match self.ref_name.strip_prefix("refs/heads/") {
            Some(short_name) => short_name.to_string(),
            None => self.ref_name.to_string(),
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
@@ -290,11 +207,8 @@ mod tests {
    use crate::modules::code::metrics::CodeMetrics;
    use crate::modules::code::repository_service::test_utils::MockRepositoryService;
    use crate::modules::code::stale_data_cleaner::test_utils::MockStaleDataCleaner;
    use crate::modules::code::test_helpers::{
        build_replication_events, code_indexing_task_columns,
    };
    use crate::nats::ProgressNotifier;
    use crate::testkit::{MockDestination, MockLockService, MockNatsServices, TestEnvelopeFactory};
    use crate::testkit::{MockDestination, MockLockService, MockNatsServices};
    use chrono::Utc;

    fn test_metrics() -> CodeMetrics {
@@ -357,6 +271,17 @@ mod tests {
            )
        }

        /// Wraps a `CodeIndexingTaskRequest` for the given task, project and
        /// branch in an `Envelope`, using the fixed commit SHA `abc123` and a
        /// traversal path derived from `project_id`.
        ///
        /// Panics if `Envelope::new` fails.
        fn make_request(task_id: i64, project_id: i64, branch: &str) -> Envelope {
            let traversal_path = format!("/org/project-{}", project_id);
            let request = CodeIndexingTaskRequest {
                task_id,
                project_id,
                branch: branch.to_string(),
                commit_sha: "abc123".to_string(),
                traversal_path,
            };

            Envelope::new(&request).unwrap()
        }

        async fn set_checkpoint(
            &self,
            project_id: i64,
@@ -389,16 +314,12 @@ mod tests {
    }

    #[tokio::test]
    async fn skips_already_indexed_commits() {
    async fn skips_already_indexed_tasks() {
        let ctx = TestContext::new();
        ctx.set_checkpoint(123, "/org/project-123", "main", 100)
            .await;

        let payload = build_replication_events(vec![
            code_indexing_task_columns(50, 123, "main", "abc123", "/org/project-123").build(),
        ]);
        let envelope = TestEnvelopeFactory::with_bytes(payload);

        let envelope = TestContext::make_request(50, 123, "main");
        let result = ctx.handler.handle(ctx.handler_context(), envelope).await;

        assert!(result.is_ok());
@@ -410,58 +331,31 @@ mod tests {
        let ctx = TestContext::new();
        ctx.set_lock(123, "main");

        let payload = build_replication_events(vec![
            code_indexing_task_columns(100, 123, "main", "abc123", "/org/project-123").build(),
        ]);
        let envelope = TestEnvelopeFactory::with_bytes(payload);

        let envelope = TestContext::make_request(100, 123, "main");
        let result = ctx.handler.handle(ctx.handler_context(), envelope).await;

        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn skips_initial_snapshot_events() {
        use siphon_proto::replication_event::Operation;

    #[test]
    fn handler_name() {
        let ctx = TestContext::new();

        let payload = build_replication_events(vec![
            code_indexing_task_columns(100, 123, "main", "abc123", "/org/project-123")
                .with_operation(Operation::InitialSnapshot as i32)
                .build(),
        ]);
        let envelope = TestEnvelopeFactory::with_bytes(payload);

        let result = ctx.handler.handle(ctx.handler_context(), envelope).await;

        assert!(result.is_ok());
        assert!(!ctx.lock_exists(123, "main"));
        assert_eq!(ctx.handler.name(), "code_indexing_task");
    }

    #[tokio::test]
    async fn strips_refs_heads_prefix_from_ref() {
        let task = CodeIndexingTask {
            id: 1,
            project_id: 123,
            ref_name: "refs/heads/main".to_string(),
            commit_sha: "abc123".to_string(),
            traversal_path: "/org/project-123".to_string(),
        };

        assert_eq!(task.branch_name(), "main");
    #[test]
    fn handler_subscription_matches_request_subscription() {
        let ctx = TestContext::new();
        let subscription = ctx.handler.subscription();
        let expected = CodeIndexingTaskRequest::subscription();
        assert_eq!(subscription.stream, expected.stream);
        assert_eq!(subscription.subject, expected.subject);
    }

    #[tokio::test]
    async fn handles_ref_without_prefix() {
        let task = CodeIndexingTask {
            id: 1,
            project_id: 123,
            ref_name: "main".to_string(),
            commit_sha: "abc123".to_string(),
            traversal_path: "/org/project-123".to_string(),
        };

        assert_eq!(task.branch_name(), "main");
    #[test]
    fn handler_subscription_has_dead_letter_on_exhaustion() {
        let ctx = TestContext::new();
        let subscription = ctx.handler.subscription();
        assert!(subscription.dead_letter_on_exhaustion);
    }
}
+4 −0
Original line number Diff line number Diff line
@@ -17,6 +17,7 @@ mod project_code_indexing_handler;
mod project_store;
mod push_event_store;
mod repository_service;
mod siphon_code_indexing_task_dispatcher;
mod siphon_decoder;
mod stale_data_cleaner;
#[cfg(test)]
@@ -31,6 +32,9 @@ use config::CodeTableNames;
use gitlab_client::GitlabClient;
use metrics::CodeMetrics;
pub use project_code_indexing_handler::ProjectCodeIndexingHandlerConfig;
pub use siphon_code_indexing_task_dispatcher::{
    SiphonCodeIndexingTaskDispatcher, SiphonCodeIndexingTaskDispatcherConfig,
};

pub use checkpoint_store::ClickHouseCodeCheckpointStore;
pub use code_indexing_task_handler::CodeIndexingTaskHandler;
+463 −0

File added.

Preview size limit exceeded, changes collapsed.

Loading