Verified Commit d72d3e23 authored by Jean-Gabriel Doyon PTO until 2024-04-17's avatar Jean-Gabriel Doyon PTO until 2024-04-17 Committed by GitLab
Browse files

feat(indexer): add metrics and improve logging for code indexing handler

parent 28db0201
Loading
Loading
Loading
Loading
+147 −0
Original line number Diff line number Diff line
use code_graph::analysis::types::GraphData;
use opentelemetry::KeyValue;
use opentelemetry::global;
use opentelemetry::metrics::{Counter, Histogram, Meter};

use crate::module::HandlerError;

/// Histogram bucket boundaries in seconds (5 ms to 10 s), matching the
/// OTel-recommended buckets used by the indexer's other duration histograms.
const DURATION_BUCKETS: &[f64] = &[
    0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1.0, 2.5, 5.0, 7.5, 10.0,
];

/// OpenTelemetry instruments for the code-indexing push-event handler.
///
/// Cheap to clone: OTel counters/histograms are handles to shared state.
#[derive(Clone)]
pub struct CodeMetrics {
    // Push events processed, labelled by `outcome` (indexed, skipped_*, error).
    pub(super) events_processed: Counter<u64>,
    // End-to-end duration of handling one push event, in seconds.
    pub(super) handler_duration: Histogram<f64>,
    // Duration of fetching/extracting a repository from Gitaly, in seconds.
    pub(super) repository_fetch_duration: Histogram<f64>,
    // Duration of code-graph parsing and analysis, in seconds.
    pub(super) indexing_duration: Histogram<f64>,
    // Duration of writing all graph tables to ClickHouse, in seconds.
    pub(super) write_duration: Histogram<f64>,
    // Files seen by the indexer, labelled by `outcome` (parsed, skipped, errored).
    pub(super) files_processed: Counter<u64>,
    // Graph nodes and edges indexed, labelled by `kind`.
    pub(super) nodes_indexed: Counter<u64>,
    // Errors, labelled by pipeline `stage` (decode, indexing, write, ...).
    pub(super) errors: Counter<u64>,
}

impl CodeMetrics {
    /// Creates the instruments on the globally registered `indexer_code` meter.
    pub fn new() -> Self {
        Self::with_meter(&global::meter("indexer_code"))
    }

    /// Creates every counter and histogram on the supplied meter.
    ///
    /// Useful in tests, where a non-global meter keeps metrics isolated.
    pub fn with_meter(meter: &Meter) -> Self {
        // Local helpers so all instruments are declared uniformly:
        // plain u64 counters, and f64 duration histograms in seconds
        // with the shared bucket boundaries.
        let counter = |name: &'static str, description: &'static str| {
            meter.u64_counter(name).with_description(description).build()
        };
        let duration = |name: &'static str, description: &'static str| {
            meter
                .f64_histogram(name)
                .with_unit("s")
                .with_description(description)
                .with_boundaries(DURATION_BUCKETS.to_vec())
                .build()
        };

        Self {
            events_processed: counter(
                "indexer.code.events.processed",
                "Total push events processed by the code indexing handler",
            ),
            handler_duration: duration(
                "indexer.code.handler.duration",
                "End-to-end duration of processing a single push event",
            ),
            repository_fetch_duration: duration(
                "indexer.code.repository.fetch.duration",
                "Duration of fetching and extracting a repository from Gitaly",
            ),
            indexing_duration: duration(
                "indexer.code.indexing.duration",
                "Duration of code-graph parsing and analysis",
            ),
            write_duration: duration(
                "indexer.code.write.duration",
                "Duration of writing all graph tables to ClickHouse",
            ),
            files_processed: counter(
                "indexer.code.files.processed",
                "Total files seen by the code-graph indexer",
            ),
            nodes_indexed: counter(
                "indexer.code.nodes.indexed",
                "Total graph nodes and edges indexed by the code handler",
            ),
            errors: counter(
                "indexer.code.errors",
                "Total code indexing errors by pipeline stage",
            ),
        }
    }
}

impl CodeMetrics {
    /// Counts one processed push event under the given `outcome` label.
    pub(super) fn record_outcome(&self, outcome: &'static str) {
        let attributes = [KeyValue::new("outcome", outcome)];
        self.events_processed.add(1, &attributes);
    }

    /// Records the per-kind node and edge totals of a finished indexing run
    /// on the `nodes_indexed` counter.
    pub(super) fn record_node_counts(&self, graph_data: &GraphData) {
        // One (label, count) pair per node collection, plus relationships
        // reported under the "edge" kind.
        let counts = [
            ("directory", graph_data.directory_nodes.len()),
            ("file", graph_data.file_nodes.len()),
            ("definition", graph_data.definition_nodes.len()),
            ("imported_symbol", graph_data.imported_symbol_nodes.len()),
            ("edge", graph_data.relationships.len()),
        ];
        for (kind, count) in counts {
            self.nodes_indexed
                .add(count as u64, &[KeyValue::new("kind", kind)]);
        }
    }
}

impl Default for CodeMetrics {
    fn default() -> Self {
        Self::new()
    }
}

/// Extension trait that lets a fallible pipeline step record a
/// stage-labelled error metric inline, without disturbing the `Result`.
pub(super) trait RecordStageError<T> {
    /// Increments the `errors` counter with the given `stage` label when
    /// `self` is an `Err`, then returns `self` unchanged so it can be
    /// chained straight into `?`.
    fn record_error_stage(
        self,
        metrics: &CodeMetrics,
        stage: &'static str,
    ) -> Result<T, HandlerError>;
}

impl<T> RecordStageError<T> for Result<T, HandlerError> {
    /// Bumps the stage-labelled error counter on `Err` and passes the
    /// result through untouched, so callers can chain it before `?`.
    fn record_error_stage(
        self,
        metrics: &CodeMetrics,
        stage: &'static str,
    ) -> Result<T, HandlerError> {
        // `inspect_err` expresses "observe the error, return self" directly,
        // matching the `inspect_err` style used by the handler itself, and
        // drops the separate `is_err()` check before returning `self`.
        self.inspect_err(|_| metrics.errors.add(1, &[KeyValue::new("stage", stage)]))
    }
}
+5 −0
Original line number Diff line number Diff line
@@ -6,6 +6,7 @@

mod arrow_converter;
pub mod config;
pub mod metrics;
mod project_store;
mod push_event_handler;
mod repository_service;
@@ -20,6 +21,7 @@ use std::sync::Arc;
use crate::clickhouse::ClickHouseConfiguration;
use crate::module::{Handler, Module, ModuleInitError};
use gitlab_client::GitlabClient;
use metrics::CodeMetrics;

pub use config::CodeIndexingConfig;
pub use project_store::ClickHouseProjectStore;
@@ -36,6 +38,7 @@ pub struct CodeModule {
    project_store: Arc<dyn project_store::ProjectStore>,
    stale_data_cleaner: Arc<dyn stale_data_cleaner::StaleDataCleaner>,
    config: CodeIndexingConfig,
    metrics: CodeMetrics,
}

impl CodeModule {
@@ -56,6 +59,7 @@ impl CodeModule {
                client,
            )),
            config,
            metrics: CodeMetrics::new(),
        })
    }
}
@@ -72,6 +76,7 @@ impl Module for CodeModule {
            Arc::clone(&self.project_store),
            Arc::clone(&self.stale_data_cleaner),
            self.config.clone(),
            self.metrics.clone(),
        ))]
    }

+118 −32
Original line number Diff line number Diff line
@@ -2,14 +2,17 @@

use std::path::Path;
use std::sync::Arc;
use std::time::Instant;

use crate::module::{Handler, HandlerContext, HandlerError};
use crate::types::{Envelope, Topic};
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use code_graph::analysis::types::GraphData;
use code_graph::indexer::{IndexingConfig, RepositoryIndexer};
use code_graph::loading::DirectoryFileSource;
use ontology::EDGE_TABLE;
use opentelemetry::KeyValue;
use siphon_proto::replication_event::Operation;
use tempfile::TempDir;
use tracing::{debug, info, warn};
@@ -17,6 +20,7 @@ use tracing::{debug, info, warn};
use super::arrow_converter::ArrowConverter;
use super::config::LOCK_TTL;
use super::config::{CodeIndexingConfig, siphon_actions, siphon_ref_types, subjects, tables};
use super::metrics::{CodeMetrics, RecordStageError};
use super::project_store::{ProjectInfo, ProjectStore};
use super::repository_service::RepositoryService;
use super::siphon_decoder::{ColumnExtractor, decode_logical_replication_events};
@@ -31,6 +35,7 @@ pub struct PushEventHandler {
    project_store: Arc<dyn ProjectStore>,
    stale_data_cleaner: Arc<dyn StaleDataCleaner>,
    config: CodeIndexingConfig,
    metrics: CodeMetrics,
}

impl PushEventHandler {
@@ -40,6 +45,7 @@ impl PushEventHandler {
        project_store: Arc<dyn ProjectStore>,
        stale_data_cleaner: Arc<dyn StaleDataCleaner>,
        config: CodeIndexingConfig,
        metrics: CodeMetrics,
    ) -> Self {
        Self {
            repository_service,
@@ -47,6 +53,7 @@ impl PushEventHandler {
            project_store,
            stale_data_cleaner,
            config,
            metrics,
        }
    }
}
@@ -68,15 +75,13 @@ impl Handler for PushEventHandler {
        )
    }

    // TODO: Add metrics around the processed events
    async fn handle(&self, context: HandlerContext, message: Envelope) -> Result<(), HandlerError> {
        debug!(message_id = %message.id.0, "received push event payload");

        let replication_events =
            decode_logical_replication_events(&message.payload).map_err(|e| {
                warn!(message_id = %message.id.0, error = %e, "failed to decode");
                HandlerError::Processing(e)
            })?;
        let replication_events = decode_logical_replication_events(&message.payload)
            .inspect_err(|e| warn!(message_id = %message.id.0, error = %e, "failed to decode push event payload"))
            .map_err(HandlerError::Processing)
            .record_error_stage(&self.metrics, "decode")?;

        let extractor = ColumnExtractor::new(&replication_events);

@@ -93,13 +98,19 @@ impl Handler for PushEventHandler {

            debug!(
                event_id = push_event.event_id,
                project_id = ?push_event.project_id,
                project_id = push_event.project_id,
                ref_name = %push_event.ref_name,
                "processing push event"
            );

            if let Err(e) = self.process_push_event(&context, &push_event).await {
                warn!(event_id = push_event.event_id, error = %e, "failed to process");
                warn!(
                    event_id = push_event.event_id,
                    project_id = push_event.project_id,
                    ref_name = %push_event.ref_name,
                    error = %e,
                    "failed to process push event"
                );
            }
        }

@@ -113,6 +124,8 @@ impl PushEventHandler {
        context: &HandlerContext,
        event: &PushEventPayload,
    ) -> Result<(), HandlerError> {
        let started_at = Instant::now();

        let Some(branch) = self.validate_push_event(event) else {
            return Ok(());
        };
@@ -123,9 +136,8 @@ impl PushEventHandler {
            .repository_service
            .repository_info(project_id)
            .await
            .map_err(|e| {
                HandlerError::Processing(format!("failed to fetch repository info: {e}"))
            })?;
            .map_err(|e| HandlerError::Processing(format!("failed to fetch repository info: {e}")))
            .record_error_stage(&self.metrics, "repository_fetch")?;

        let default_branch = repository
            .default_branch
@@ -139,16 +151,19 @@ impl PushEventHandler {
                branch = %branch,
                "skipping non-default branch"
            );
            self.metrics.record_outcome("skipped_branch");
            return Ok(());
        }

        let Some(project) = self.lookup_project(event.event_id, project_id).await? else {
            self.metrics.record_outcome("skipped_project_not_found");
            return Err(HandlerError::Processing(
                "project not found in knowledge graph".into(),
            ));
        };

        if self.is_already_indexed(event, project_id, &branch).await {
            self.metrics.record_outcome("skipped_watermark");
            return Ok(());
        }

@@ -159,8 +174,17 @@ impl PushEventHandler {
            "starting code indexing"
        );

        self.index_with_lock(context, event, project_id, &branch, &project, &repository)
            .await
        let result = self
            .index_with_lock(context, event, project_id, &branch, &project, &repository)
            .await;

        let outcome = if result.is_ok() { "indexed" } else { "error" };
        self.metrics.record_outcome(outcome);
        self.metrics
            .handler_duration
            .record(started_at.elapsed().as_secs_f64(), &[]);

        result
    }

    async fn index_with_lock(
@@ -179,6 +203,7 @@ impl PushEventHandler {
                branch = %branch,
                "lock held by another indexer, skipping"
            );
            self.metrics.record_outcome("skipped_lock");
            return Ok(());
        }

@@ -187,10 +212,15 @@ impl PushEventHandler {
        let temp_dir = TempDir::new()
            .map_err(|e| HandlerError::Processing(format!("failed to create temp dir: {e}")))?;

        let fetch_start = Instant::now();
        self.repository_service
            .extract_repository(repository, temp_dir.path(), &event.revision_after)
            .await
            .map_err(|e| HandlerError::Processing(format!("failed to extract repository: {e}")))?;
            .map_err(|e| HandlerError::Processing(format!("failed to extract repository: {e}")))
            .record_error_stage(&self.metrics, "repository_extract")?;
        self.metrics
            .repository_fetch_duration
            .record(fetch_start.elapsed().as_secs_f64(), &[]);

        let result = self
            .run_indexing(
@@ -224,14 +254,49 @@ impl PushEventHandler {
        let indexer = RepositoryIndexer::new(format!("project-{project_id}"), repo_path.clone());
        let file_source = DirectoryFileSource::new(repo_path);

        let indexing_start = Instant::now();
        let result = indexer
            .index_files(file_source, &IndexingConfig::default())
            .await
            .map_err(|e| HandlerError::Processing(format!("failed to index code: {e}")))?;
            .map_err(|e| HandlerError::Processing(format!("failed to index code: {e}")))
            .record_error_stage(&self.metrics, "indexing")?;
        self.metrics
            .indexing_duration
            .record(indexing_start.elapsed().as_secs_f64(), &[]);

        self.metrics.files_processed.add(
            result.skipped_files.len() as u64,
            &[KeyValue::new("outcome", "skipped")],
        );
        self.metrics.files_processed.add(
            result.errored_files.len() as u64,
            &[KeyValue::new("outcome", "errored")],
        );

        if !result.errored_files.is_empty() {
            warn!(
                project_id,
                branch = %branch,
                count = result.errored_files.len(),
                "some files failed to parse during code indexing"
            );
        }

        let Some(mut graph_data) = result.graph_data else {
            debug!(project_id, branch = %branch, "indexing produced no graph data, skipping write");
            return Ok(());
        };

        if let Some(mut graph_data) = result.graph_data {
        // TODO: This should be done on construction of the GraphData struct.
        graph_data.assign_node_ids(project_id, branch);

        self.metrics.files_processed.add(
            graph_data.file_nodes.len() as u64,
            &[KeyValue::new("outcome", "parsed")],
        );
        self.metrics.record_node_counts(&graph_data);

        let write_start = Instant::now();
        self.write_graph_data(
            context,
            project_id,
@@ -241,14 +306,21 @@ impl PushEventHandler {
            &graph_data,
        )
        .await?;
        }
        self.metrics
            .write_duration
            .record(write_start.elapsed().as_secs_f64(), &[]);

        if let Err(error) = self
            .stale_data_cleaner
            .delete_stale_data(traversal_path, project_id, branch, indexed_at)
            .await
        {
            warn!(%error, "failed to delete stale data, will retry on next push");
            warn!(
                project_id,
                branch = %branch,
                %error,
                "failed to delete stale data, will retry on next push"
            );
        }

        Ok(())
@@ -278,13 +350,15 @@ impl PushEventHandler {
        self.watermark_store
            .set_watermark(&watermark)
            .await
            .map_err(|e| HandlerError::Processing(format!("failed to set watermark: {e}")))?;
            .map_err(|e| HandlerError::Processing(format!("failed to set watermark: {e}")))
            .record_error_stage(&self.metrics, "watermark")?;

        info!(
            project_id,
            branch = %branch,
            commit = %event.revision_after,
            "successfully indexed code"
            event_id = event.event_id,
            "completed code indexing"
        );

        Ok(())
@@ -385,7 +459,7 @@ impl PushEventHandler {
        branch: &str,
        traversal_path: &str,
        indexed_at: DateTime<Utc>,
        graph_data: &code_graph::analysis::types::GraphData,
        graph_data: &GraphData,
    ) -> Result<(), HandlerError> {
        let converter = ArrowConverter::new(
            traversal_path.to_string(),
@@ -396,7 +470,8 @@ impl PushEventHandler {

        let converted = converter
            .convert_all(graph_data)
            .map_err(|e| HandlerError::Processing(format!("arrow conversion failed: {e}")))?;
            .map_err(|e| HandlerError::Processing(format!("arrow conversion failed: {e}")))
            .record_error_stage(&self.metrics, "arrow_conversion")?;

        self.write_batch(ctx, tables::GL_DIRECTORY, &converted.directories)
            .await?;
@@ -425,12 +500,14 @@ impl PushEventHandler {
            .destination
            .new_batch_writer(table)
            .await
            .map_err(|e| HandlerError::Processing(format!("writer creation failed: {e}")))?;
            .map_err(|e| HandlerError::Processing(format!("writer creation failed: {e}")))
            .record_error_stage(&self.metrics, "write")?;

        writer
            .write_batch(std::slice::from_ref(batch))
            .await
            .map_err(|e| HandlerError::Processing(format!("write to {table} failed: {e}")))
            .record_error_stage(&self.metrics, "write")
    }
}

@@ -468,6 +545,7 @@ impl PushEventPayload {
mod tests {
    use super::*;
    use crate::module::Handler;
    use crate::modules::code::metrics::CodeMetrics;
    use crate::modules::code::project_store::ProjectInfo;
    use crate::modules::code::project_store::test_utils::MockProjectStore;
    use crate::modules::code::repository_service::test_utils::MockRepositoryService;
@@ -475,6 +553,13 @@ mod tests {
    use crate::modules::code::test_helpers::{build_replication_events, push_payload_columns};
    use crate::modules::code::watermark_store::test_utils::MockCodeWatermarkStore;
    use crate::testkit::{MockDestination, MockLockService, MockNatsServices, TestEnvelopeFactory};

    fn test_metrics() -> CodeMetrics {
        let provider = opentelemetry::global::meter_provider();
        let meter = provider.meter("test");
        CodeMetrics::with_meter(&meter)
    }

    struct TestContext {
        handler: PushEventHandler,
        mock_nats: Arc<MockNatsServices>,
@@ -497,6 +582,7 @@ mod tests {
                project_store.clone(),
                stale_data_cleaner,
                CodeIndexingConfig::default(),
                test_metrics(),
            );

            Self {
+3 −1
Original line number Diff line number Diff line
@@ -10,7 +10,7 @@ use gitlab_client::{GitalyConnectionInfo, RepositoryInfo};
use indexer::module::{Handler, HandlerContext};
use indexer::modules::code::{
    ClickHouseCodeWatermarkStore, ClickHouseProjectStore, ClickHouseStaleDataCleaner,
    CodeIndexingConfig, PushEventHandler, RepositoryService,
    CodeIndexingConfig, PushEventHandler, RepositoryService, metrics::CodeMetrics,
};
use indexer::testkit::{MockLockService, MockNatsServices, TestEnvelopeFactory};
use prost::Message;
@@ -124,6 +124,7 @@ async fn indexes_repository_from_gitaly() {
        Arc::new(ClickHouseProjectStore::new(Arc::clone(&clickhouse_client))),
        Arc::new(ClickHouseStaleDataCleaner::new(clickhouse_client)),
        CodeIndexingConfig::default(),
        CodeMetrics::new(),
    );

    let context = HandlerContext::new(
@@ -304,6 +305,7 @@ fn create_push_event_handler(gitaly_address: &str, clickhouse: &TestContext) ->
        Arc::new(ClickHouseProjectStore::new(Arc::clone(&clickhouse_client))),
        Arc::new(ClickHouseStaleDataCleaner::new(clickhouse_client)),
        CodeIndexingConfig::default(),
        CodeMetrics::new(),
    )
}

+14 −1
Original line number Diff line number Diff line
@@ -36,7 +36,7 @@ Each service exposes a Prometheus `/metrics` endpoint. We use LabKit for instrum

**KG Indexer Service:**

The indexer emits metrics under two OpenTelemetry meters: `etl_engine` for the core engine and `indexer_sdlc` for the SDLC module. All duration histograms use OTel-recommended buckets (5 ms to 10 s).
The indexer emits metrics under three OpenTelemetry meters: `etl_engine` for the core engine, `indexer_sdlc` for the SDLC module, and `indexer_code` for the code indexing module. All duration histograms use OTel-recommended buckets (5 ms to 10 s).

*Engine metrics (`etl_engine`):*

@@ -68,6 +68,19 @@ The indexer emits metrics under two OpenTelemetry meters: `etl_engine` for the c
| `indexer.sdlc.transform.duration` | Histogram | s | `entity` | Duration of DataFusion SQL transform per batch |
| `indexer.sdlc.watermark.lag` | Gauge | s | `entity` | Seconds between the current watermark and wall clock (data freshness) |

*Code module metrics (`indexer_code`):*

| Metric | Type | Unit | Labels | Description |
|---|---|---|---|---|
| `indexer.code.events.processed` | Counter | count | `outcome` (indexed, skipped_branch, skipped_watermark, skipped_lock, skipped_project_not_found, error) | Total push events processed by the code handler |
| `indexer.code.handler.duration` | Histogram | s | | End-to-end duration of processing a single push event |
| `indexer.code.repository.fetch.duration` | Histogram | s | | Duration of fetching and extracting a repository from Gitaly |
| `indexer.code.indexing.duration` | Histogram | s | | Duration of code-graph parsing and analysis |
| `indexer.code.write.duration` | Histogram | s | | Duration of writing all graph tables to ClickHouse |
| `indexer.code.files.processed` | Counter | count | `outcome` (parsed, skipped, errored) | Total files seen by the code-graph indexer |
| `indexer.code.nodes.indexed` | Counter | count | `kind` (directory, file, definition, imported_symbol, edge) | Total graph nodes and edges indexed |
| `indexer.code.errors` | Counter | count | `stage` (decode, repository_fetch, repository_extract, indexing, arrow_conversion, write, watermark) | Code indexing errors by pipeline stage |

**KG Web Service:**

- **Query Health**: p50/p95 latency by tool (`find_nodes`, `traverse`, `explore`, `aggregate`), memory spikes, and rows/bytes read per query.