Verified Commit 7e458212 authored by Jean-Gabriel Doyon's avatar Jean-Gabriel Doyon Committed by GitLab
Browse files

refactor(indexer): rename checkpoint fields and make commit_sha optional

parent d595032f
Loading
Loading
Loading
Loading
+2 −2
Original line number Diff line number Diff line
@@ -481,8 +481,8 @@ CREATE TABLE IF NOT EXISTS code_indexing_checkpoint (
    traversal_path String,
    project_id Int64,
    branch String,
    last_event_id Int64,
    last_commit String,
    last_task_id Int64,
    last_commit Nullable(String),
    indexed_at DateTime64(6, 'UTC'),
    _version UInt64,
    _deleted Bool DEFAULT false,
+15 −11
Original line number Diff line number Diff line
@@ -26,8 +26,8 @@ pub struct CodeIndexingCheckpoint {
    pub traversal_path: String,
    pub project_id: i64,
    pub branch: String,
    pub last_event_id: i64,
    pub last_commit: String,
    pub last_task_id: i64,
    pub last_commit: Option<String>,
    pub indexed_at: DateTime<Utc>,
}

@@ -68,7 +68,7 @@ impl ClickHouseCodeCheckpointStore {
            _ => return Ok(None),
        };

        let last_event_id_col = batch
        let last_task_id_col = batch
            .column(0)
            .as_any()
            .downcast_ref::<Int64Array>()
@@ -86,12 +86,16 @@ impl ClickHouseCodeCheckpointStore {
            .downcast_ref::<TimestampMicrosecondArray>()
            .ok_or(CheckpointError::InvalidType)?;

        if last_event_id_col.is_null(0) {
        if last_task_id_col.is_null(0) {
            return Ok(None);
        }

        let last_event_id = last_event_id_col.value(0);
        let last_commit = last_commit_col.value(0).to_string();
        let last_task_id = last_task_id_col.value(0);
        let last_commit = if last_commit_col.is_null(0) {
            None
        } else {
            Some(last_commit_col.value(0).to_string())
        };
        let indexed_at_micros = indexed_at_col.value(0);
        let indexed_at = Utc
            .timestamp_micros(indexed_at_micros)
@@ -102,7 +106,7 @@ impl ClickHouseCodeCheckpointStore {
            traversal_path: traversal_path.to_string(),
            project_id,
            branch: branch.to_string(),
            last_event_id,
            last_task_id,
            last_commit,
            indexed_at,
        }))
@@ -119,7 +123,7 @@ impl CodeCheckpointStore for ClickHouseCodeCheckpointStore {
    ) -> Result<Option<CodeIndexingCheckpoint>, CheckpointError> {
        let query = r#"
            SELECT
                argMax(last_event_id, _version) as last_event_id,
                argMax(last_task_id, _version) as last_task_id,
                argMax(last_commit, _version) as last_commit,
                argMax(indexed_at, _version) as indexed_at
            FROM code_indexing_checkpoint
@@ -151,14 +155,14 @@ impl CodeCheckpointStore for ClickHouseCodeCheckpointStore {
            .query(
                r#"
                INSERT INTO code_indexing_checkpoint
                (traversal_path, project_id, branch, last_event_id, last_commit, indexed_at)
                VALUES ({traversal_path:String}, {project_id:Int64}, {branch:String}, {last_event_id:Int64}, {last_commit:String}, {indexed_at:String})
                (traversal_path, project_id, branch, last_task_id, last_commit, indexed_at)
                VALUES ({traversal_path:String}, {project_id:Int64}, {branch:String}, {last_task_id:Int64}, {last_commit:Nullable(String)}, {indexed_at:String})
            "#,
            )
            .param("traversal_path", &checkpoint.traversal_path)
            .param("project_id", checkpoint.project_id)
            .param("branch", &checkpoint.branch)
            .param("last_event_id", checkpoint.last_event_id)
            .param("last_task_id", checkpoint.last_task_id)
            .param("last_commit", &checkpoint.last_commit)
            .param("indexed_at", formatted_timestamp)
            .execute()
+6 −6
Original line number Diff line number Diff line
@@ -188,8 +188,8 @@ impl CodeIndexingTaskHandler {
                    project_id,
                    branch: branch.to_string(),
                    traversal_path: task.traversal_path.clone(),
                    event_id: task.id,
                    commit_sha: task.commit_sha.clone(),
                    task_id: task.id,
                    commit_sha: Some(task.commit_sha.clone()),
                },
            )
            .await;
@@ -212,7 +212,7 @@ impl CodeIndexingTaskHandler {
            .checkpoint_store
            .get_checkpoint(&task.traversal_path, task.project_id, branch)
            .await
            && checkpoint.last_event_id >= task.id
            && checkpoint.last_task_id >= task.id
        {
            debug!(task_id = task.id, "already indexed, skipping");
            return true;
@@ -362,15 +362,15 @@ mod tests {
            project_id: i64,
            traversal_path: &str,
            branch: &str,
            last_event_id: i64,
            last_task_id: i64,
        ) {
            self.mock_checkpoints
                .set_checkpoint(&CodeIndexingCheckpoint {
                    traversal_path: traversal_path.to_string(),
                    project_id,
                    branch: branch.to_string(),
                    last_event_id,
                    last_commit: "abc".to_string(),
                    last_task_id,
                    last_commit: Some("abc".to_string()),
                    indexed_at: Utc::now(),
                })
                .await
+12 −11
Original line number Diff line number Diff line
@@ -22,8 +22,8 @@ pub struct IndexingRequest {
    pub project_id: i64,
    pub branch: String,
    pub traversal_path: String,
    pub event_id: i64,
    pub commit_sha: String,
    pub task_id: i64,
    pub commit_sha: Option<String>,
}

pub struct CodeIndexingPipeline {
@@ -60,9 +60,10 @@ impl CodeIndexingPipeline {
            .map_err(|e| HandlerError::Processing(format!("failed to create temp dir: {e}")))?;

        let fetch_start = Instant::now();
        let ref_name = request.commit_sha.as_deref().unwrap_or(&request.branch);
        let archive_bytes = self
            .repository_service
            .download_archive(request.project_id, &request.commit_sha)
            .download_archive(request.project_id, ref_name)
            .await
            .map_err(|e| HandlerError::Processing(format!("failed to download archive: {e}")))
            .record_error_stage(&self.metrics, "repository_fetch")?;
@@ -93,8 +94,8 @@ impl CodeIndexingPipeline {
            &request.traversal_path,
            request.project_id,
            &request.branch,
            request.event_id,
            &request.commit_sha,
            request.task_id,
            request.commit_sha.as_deref(),
            indexed_at,
        )
        .await
@@ -105,16 +106,16 @@ impl CodeIndexingPipeline {
        traversal_path: &str,
        project_id: i64,
        branch: &str,
        event_id: i64,
        commit_sha: &str,
        task_id: i64,
        last_commit: Option<&str>,
        indexed_at: DateTime<Utc>,
    ) -> Result<(), HandlerError> {
        let checkpoint = CodeIndexingCheckpoint {
            traversal_path: traversal_path.to_string(),
            project_id,
            branch: branch.to_string(),
            last_event_id: event_id,
            last_commit: commit_sha.to_string(),
            last_task_id: task_id,
            last_commit: last_commit.map(|s| s.to_string()),
            indexed_at,
        };

@@ -127,8 +128,8 @@ impl CodeIndexingPipeline {
        info!(
            project_id,
            branch = %branch,
            commit = %commit_sha,
            event_id,
            commit = ?last_commit,
            task_id,
            "completed code indexing"
        );

+5 −5
Original line number Diff line number Diff line
@@ -131,7 +131,7 @@ impl ProjectCodeIndexingHandler {
            .checkpoint_store
            .get_checkpoint(&project.traversal_path, project_id, default_branch)
            .await
            && checkpoint.last_event_id >= push_event.event_id
            && checkpoint.last_task_id >= push_event.event_id
        {
            debug!(project_id, "already indexed, skipping reconciliation");
            metrics.record_outcome("skipped_checkpoint");
@@ -166,8 +166,8 @@ impl ProjectCodeIndexingHandler {
                    project_id,
                    branch: default_branch.to_string(),
                    traversal_path: project.traversal_path.clone(),
                    event_id: push_event.event_id,
                    commit_sha: push_event.commit_sha.clone(),
                    task_id: push_event.event_id,
                    commit_sha: Some(push_event.commit_sha.clone()),
                },
            )
            .await;
@@ -335,8 +335,8 @@ mod tests {
                traversal_path: "/org/project-123".to_string(),
                project_id: 123,
                branch: "main".to_string(),
                last_event_id: 100,
                last_commit: "abc".to_string(),
                last_task_id: 100,
                last_commit: Some("abc".to_string()),
                indexed_at: Utc::now(),
            })
            .await
Loading