Verified Commit 55c949e4 authored by Michael Angelo Rivera's avatar Michael Angelo Rivera Committed by GitLab
Browse files

fix(indexer): checkpoint empty repositories as terminal

parent 62732cc3
Loading
Loading
Loading
Loading
+14 −4
Original line number Diff line number Diff line
@@ -258,11 +258,16 @@ impl GitlabClient {
        response: &reqwest::Response,
        project_id: i64,
    ) -> Result<(), GitlabClientError> {
        match response.status() {
        let status = response.status();
        match status {
            StatusCode::OK => Ok(()),
            StatusCode::UNAUTHORIZED => Err(GitlabClientError::Unauthorized),
            StatusCode::NOT_FOUND => Err(GitlabClientError::NotFound(project_id)),
            status => Err(GitlabClientError::Unexpected(format!(
            _ if status.is_server_error() => Err(GitlabClientError::ServerError {
                project_id,
                status: status.as_u16(),
            }),
            _ => Err(GitlabClientError::Unexpected(format!(
                "unexpected status {status}"
            ))),
        }
@@ -275,11 +280,16 @@ impl GitlabClient {
        if response.status() == StatusCode::BAD_REQUEST {
            return Err(GitlabClientError::ForcePush(project_id));
        }
        match response.status() {
        let status = response.status();
        match status {
            StatusCode::OK => Ok(()),
            StatusCode::UNAUTHORIZED => Err(GitlabClientError::Unauthorized),
            StatusCode::NOT_FOUND => Err(GitlabClientError::NotFound(project_id)),
            status => Err(GitlabClientError::Unexpected(format!(
            _ if status.is_server_error() => Err(GitlabClientError::ServerError {
                project_id,
                status: status.as_u16(),
            }),
            _ => Err(GitlabClientError::Unexpected(format!(
                "unexpected status {status} for project {project_id}"
            ))),
        }
+3 −0
Original line number Diff line number Diff line
@@ -9,6 +9,9 @@ pub enum GitlabClientError {
    #[error("project {0} not found (404)")]
    NotFound(i64),

    #[error("server error for project {project_id}: status {status}")]
    ServerError { project_id: i64, status: u16 },

    #[error("force push detected for project {0}")]
    ForcePush(i64),

+70 −11
Original line number Diff line number Diff line
@@ -6,7 +6,7 @@ use tracing::{debug, info, warn};

use super::checkpoint_store::CodeCheckpointStore;
use super::config::CODE_LOCK_TTL;
use super::indexing_pipeline::{CodeIndexingPipeline, IndexingRequest};
use super::indexing_pipeline::{CodeIndexingPipeline, IndexOutcome, IndexingRequest};
use super::locking::project_lock_key;
use super::metrics::CodeMetrics;
use super::repository::RepositoryService;
@@ -117,13 +117,18 @@ impl CodeIndexingTaskHandler {

        let result = self.index_with_lock(context, request, &branch).await;

        let outcome = if result.is_ok() { "indexed" } else { "error" };
        let outcome = match &result {
            Ok(Some(IndexOutcome::Indexed)) => "indexed",
            Ok(Some(IndexOutcome::EmptyRepository)) => "empty_repository",
            Ok(None) => "skipped_lock",
            Err(_) => "error",
        };
        self.metrics.record_outcome(outcome);
        self.metrics
            .handler_duration
            .record(started_at.elapsed().as_secs_f64(), &[]);

        result
        result.map(|_| ())
    }

    async fn index_with_lock(
@@ -131,7 +136,7 @@ impl CodeIndexingTaskHandler {
        context: &HandlerContext,
        request: &CodeIndexingTaskRequest,
        branch: &str,
    ) -> Result<(), HandlerError> {
    ) -> Result<Option<IndexOutcome>, HandlerError> {
        let project_id = request.project_id;

        if !self.try_acquire_lock(context, project_id, branch).await? {
@@ -141,8 +146,7 @@ impl CodeIndexingTaskHandler {
                branch = %branch,
                "lock held by another indexer, skipping"
            );
            self.metrics.record_outcome("skipped_lock");
            return Ok(());
            return Ok(None);
        }

        let result = self
@@ -167,7 +171,7 @@ impl CodeIndexingTaskHandler {
            warn!(project_id, branch = %branch, error = %e, "failed to index code");
        }

        result
        result.map(Some)
    }
}

@@ -239,13 +243,13 @@ mod tests {
        mock_nats: Arc<MockNatsServices>,
        mock_locks: Arc<MockLockService>,
        mock_checkpoints: Arc<MockCodeCheckpointStore>,
        mock_repo: Arc<MockRepositoryService>,
        _cache_dir: tempfile::TempDir,
    }

    impl TestContext {
        fn new() -> Self {
            let mock_repo: Arc<dyn RepositoryService> =
                MockRepositoryService::with_default_branch(123, "main");
            let mock_repo = MockRepositoryService::with_default_branch(123, "main");
            let mock_nats = Arc::new(MockNatsServices::new());
            let mock_locks = Arc::new(MockLockService::new());
            let mock_checkpoints = Arc::new(MockCodeCheckpointStore::new());
@@ -253,6 +257,7 @@ mod tests {
            let metrics = test_metrics();

            let checkpoint_store: Arc<dyn CodeCheckpointStore> = mock_checkpoints.clone();
            let repo_service: Arc<dyn RepositoryService> = mock_repo.clone();

            let ontology = ontology::Ontology::load_embedded().expect("ontology must load");
            let table_names = Arc::new(
@@ -263,7 +268,8 @@ mod tests {
            let temp_dir = tempfile::TempDir::new().expect("failed to create temp dir");
            let cache: Arc<dyn crate::modules::code::repository::RepositoryCache> =
                Arc::new(LocalRepositoryCache::new(temp_dir.path().to_path_buf()));
            let resolver = RepositoryResolver::new(Arc::clone(&mock_repo), cache, metrics.clone());
            let resolver =
                RepositoryResolver::new(Arc::clone(&repo_service), cache, metrics.clone());

            let pipeline = Arc::new(CodeIndexingPipeline::new(
                resolver,
@@ -275,7 +281,7 @@ mod tests {

            let handler = CodeIndexingTaskHandler::new(
                pipeline,
                mock_repo,
                repo_service,
                Arc::clone(&checkpoint_store),
                metrics,
                CodeIndexingTaskHandlerConfig::default(),
@@ -286,6 +292,7 @@ mod tests {
                mock_nats,
                mock_locks,
                mock_checkpoints,
                mock_repo,
                _cache_dir: temp_dir,
            }
        }
@@ -385,6 +392,58 @@ mod tests {
        assert!(!ctx.lock_exists(123, "main"));
    }

    #[tokio::test]
    async fn empty_repository_sets_checkpoint_and_acks() {
        use crate::modules::code::repository::RepositoryServiceError;
        use gitlab_client::GitlabClientError;

        let ctx = TestContext::new();
        ctx.mock_repo.set_download_error(
            123,
            RepositoryServiceError::GitlabApi(GitlabClientError::NotFound(123)),
        );

        let envelope = TestContext::make_request(42, 123, "main");
        let result = ctx.handler.handle(ctx.handler_context(), envelope).await;

        assert!(result.is_ok(), "empty repo should ack, got {result:?}");
        let checkpoint = ctx
            .mock_checkpoints
            .get_checkpoint("/org/project-123", 123, "main")
            .await
            .unwrap()
            .expect("checkpoint should be set for empty repo");
        assert_eq!(checkpoint.last_task_id, 42);
        assert!(checkpoint.last_commit.is_none());
    }

    #[tokio::test]
    async fn server_error_sets_checkpoint_and_acks() {
        use crate::modules::code::repository::RepositoryServiceError;
        use gitlab_client::GitlabClientError;

        let ctx = TestContext::new();
        ctx.mock_repo.set_download_error(
            123,
            RepositoryServiceError::GitlabApi(GitlabClientError::ServerError {
                project_id: 123,
                status: 500,
            }),
        );

        let envelope = TestContext::make_request(7, 123, "main");
        let result = ctx.handler.handle(ctx.handler_context(), envelope).await;

        assert!(result.is_ok());
        let checkpoint = ctx
            .mock_checkpoints
            .get_checkpoint("/org/project-123", 123, "main")
            .await
            .unwrap()
            .expect("checkpoint should be set for missing repository");
        assert_eq!(checkpoint.last_task_id, 7);
    }

    #[test]
    fn handler_name() {
        let ctx = TestContext::new();
+57 −5
Original line number Diff line number Diff line
@@ -12,9 +12,10 @@ use super::arrow_converter::ArrowConverter;
use super::checkpoint_store::{CodeCheckpointStore, CodeIndexingCheckpoint};
use super::config::CodeTableNames;
use super::metrics::{CodeMetrics, RecordStageError};
use super::repository::RepositoryResolver;
use super::repository::{RepositoryResolver, ResolveError};
use super::stale_data_cleaner::StaleDataCleaner;
use crate::handler::{HandlerContext, HandlerError};
use opentelemetry::KeyValue;

pub struct IndexingRequest {
    pub project_id: i64,
@@ -24,6 +25,19 @@ pub struct IndexingRequest {
    pub commit_sha: Option<String>,
}

/// Terminal outcome of `CodeIndexingPipeline::index_project`.
///
/// The handler records a single `events_processed` outcome label based on
/// this variant — keeping `indexed` and `empty_repository` mutually exclusive.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IndexOutcome {
    /// Repository downloaded, parsed, written to the graph, and checkpointed.
    Indexed,
    /// Archive endpoint signalled no repository content (404 or 5xx); the
    /// checkpoint was still set so retries and DLQ are avoided.
    EmptyRepository,
}

pub struct CodeIndexingPipeline {
    resolver: RepositoryResolver,
    checkpoint_store: Arc<dyn CodeCheckpointStore>,
@@ -53,9 +67,9 @@ impl CodeIndexingPipeline {
        &self,
        context: &HandlerContext,
        request: &IndexingRequest,
    ) -> Result<(), HandlerError> {
    ) -> Result<IndexOutcome, HandlerError> {
        let fetch_start = Instant::now();
        let repo_path = self
        let repo_path = match self
            .resolver
            .resolve(
                request.project_id,
@@ -63,7 +77,43 @@ impl CodeIndexingPipeline {
                request.commit_sha.as_deref(),
            )
            .await
            .record_error_stage(&self.metrics, "repository_fetch")?;
        {
            Ok(path) => {
                self.metrics.record_resolution_strategy("full_download");
                path
            }
            Err(ResolveError::EmptyRepository { reason, detail }) => {
                warn!(
                    project_id = request.project_id,
                    branch = %request.branch,
                    reason = %reason,
                    detail,
                    "project has no repository content; checkpointing as indexed-empty"
                );
                self.metrics.record_resolution_strategy("empty_repository");
                self.metrics
                    .record_empty_repository(reason.as_metric_label());
                self.metrics
                    .repository_fetch_duration
                    .record(fetch_start.elapsed().as_secs_f64(), &[]);
                self.set_checkpoint(
                    &request.traversal_path,
                    request.project_id,
                    &request.branch,
                    request.task_id,
                    None,
                    Utc::now(),
                )
                .await?;
                return Ok(IndexOutcome::EmptyRepository);
            }
            Err(ResolveError::Other(err)) => {
                self.metrics
                    .errors
                    .add(1, &[KeyValue::new("stage", "repository_fetch")]);
                return Err(err);
            }
        };
        self.metrics
            .repository_fetch_duration
            .record(fetch_start.elapsed().as_secs_f64(), &[]);
@@ -110,7 +160,9 @@ impl CodeIndexingPipeline {
            request.commit_sha.as_deref(),
            indexed_at,
        )
        .await
        .await?;

        Ok(IndexOutcome::Indexed)
    }

    async fn set_checkpoint(
+17 −1
Original line number Diff line number Diff line
@@ -13,6 +13,7 @@ pub struct CodeMetrics {
    pub(super) repository_fetch_duration: Histogram<f64>,
    pub(super) repository_resolution_strategy: Counter<u64>,
    pub(super) repository_cleanup: Counter<u64>,
    pub(super) repository_empty: Counter<u64>,
    pub(super) indexing_duration: Histogram<f64>,
    pub(super) files_processed: Counter<u64>,
    pub(super) nodes_indexed: Counter<u64>,
@@ -47,7 +48,9 @@ impl CodeMetrics {

        let repository_resolution_strategy = meter
            .u64_counter("gkg.indexer.code.repository.resolution")
            .with_description("Repository resolution strategy used (full_download)")
            .with_description(
                "Repository resolution strategy used (full_download, empty_repository)",
            )
            .build();

        let repository_cleanup = meter
@@ -55,6 +58,13 @@ impl CodeMetrics {
            .with_description("Repository disk cleanup outcomes (success, failure)")
            .build();

        let repository_empty = meter
            .u64_counter("gkg.indexer.code.repository.empty")
            .with_description(
                "Projects short-circuited as terminal-empty at fetch time, labelled by reason (not_found, server_error)",
            )
            .build();

        let indexing_duration = meter
            .f64_histogram("gkg.indexer.code.indexing.duration")
            .with_unit("s")
@@ -83,6 +93,7 @@ impl CodeMetrics {
            repository_fetch_duration,
            repository_resolution_strategy,
            repository_cleanup,
            repository_empty,
            indexing_duration,
            files_processed,
            nodes_indexed,
@@ -107,6 +118,11 @@ impl CodeMetrics {
            .add(1, &[KeyValue::new("outcome", outcome)]);
    }

    pub(super) fn record_empty_repository(&self, reason: &'static str) {
        self.repository_empty
            .add(1, &[KeyValue::new("reason", reason)]);
    }

    pub(super) fn record_files_processed(&self, count: u64, outcome: &'static str) {
        self.files_processed
            .add(count, &[KeyValue::new("outcome", outcome)]);
Loading