Verified Commit 5b01b40b authored by Michael Angelo Rivera's avatar Michael Angelo Rivera Committed by GitLab
Browse files

feat(duckdb): add duckdb-client crate with schema and Arrow inserts

parent c2f3ac3c
Loading
Loading
Loading
Loading
+572 −87

File changed.

Preview size limit exceeded, changes collapsed.

+2 −0
Original line number Diff line number Diff line
@@ -30,6 +30,7 @@ members = [
  "crates/integration-testkit",
  "crates/utils",
  "crates/integration-tests",
  "crates/duckdb-client",
]

[workspace.dependencies]
@@ -85,6 +86,7 @@ tracing-test = "0.2.5"
# Parser-core
parser-core = { path = "crates/code-parser" }
clickhouse-client = { path = "crates/clickhouse-client" }
duckdb = { version = "1.10501.0", features = ["bundled", "vtab-arrow", "appender-arrow"] }
gitalisk-core = { git = "https://gitlab.com/gitlab-org/rust/gitalisk.git", tag = "v0.7.0" }
thiserror = "2.0.18"
tokio-stream = "0.1.18"

config/graph_local.sql

0 → 100644
+72 −0
Original line number Diff line number Diff line
-- DuckDB schema for local code graph tables.
--
-- Mirrors the code-indexing subset of graph.sql (ClickHouse).
-- Differences:
--   - No ENGINE, CODEC, PROJECTION, INDEX, or SETTINGS clauses
--   - _version is BIGINT (not DateTime64) — local mode uses a simple counter
--   - No _deleted column — local mode does full delete-and-reinsert
--
-- All statements are idempotent (CREATE TABLE IF NOT EXISTS), so this file
-- can be executed as a batch on every startup.

-- Directory nodes. traversal_path defaults to '0/' — in local mode every
-- row shares this fixed path (one DB file per project).
CREATE TABLE IF NOT EXISTS gl_directory (
    id BIGINT NOT NULL,
    traversal_path VARCHAR NOT NULL DEFAULT '0/',
    project_id BIGINT NOT NULL,
    branch VARCHAR NOT NULL,
    path VARCHAR NOT NULL,
    name VARCHAR NOT NULL,
    _version BIGINT NOT NULL DEFAULT 0
);

-- File nodes. extension/language default to '' rather than NULL so that
-- inserts may omit them.
CREATE TABLE IF NOT EXISTS gl_file (
    id BIGINT NOT NULL,
    traversal_path VARCHAR NOT NULL DEFAULT '0/',
    project_id BIGINT NOT NULL,
    branch VARCHAR NOT NULL,
    path VARCHAR NOT NULL,
    name VARCHAR NOT NULL,
    extension VARCHAR NOT NULL DEFAULT '',
    language VARCHAR NOT NULL DEFAULT '',
    _version BIGINT NOT NULL DEFAULT 0
);

-- Definition nodes (functions, classes, …). Line/byte spans locate the
-- definition within file_path.
CREATE TABLE IF NOT EXISTS gl_definition (
    id BIGINT NOT NULL,
    traversal_path VARCHAR NOT NULL DEFAULT '0/',
    project_id BIGINT NOT NULL,
    branch VARCHAR NOT NULL,
    file_path VARCHAR NOT NULL,
    fqn VARCHAR NOT NULL,
    name VARCHAR NOT NULL,
    definition_type VARCHAR NOT NULL,
    start_line BIGINT NOT NULL,
    end_line BIGINT NOT NULL,
    start_byte BIGINT NOT NULL,
    end_byte BIGINT NOT NULL,
    _version BIGINT NOT NULL DEFAULT 0
);

-- Imported-symbol nodes. identifier_name/identifier_alias are nullable:
-- not every import form binds a named identifier or alias.
CREATE TABLE IF NOT EXISTS gl_imported_symbol (
    id BIGINT NOT NULL,
    traversal_path VARCHAR NOT NULL DEFAULT '0/',
    project_id BIGINT NOT NULL,
    branch VARCHAR NOT NULL,
    file_path VARCHAR NOT NULL,
    import_type VARCHAR NOT NULL,
    import_path VARCHAR NOT NULL,
    identifier_name VARCHAR,
    identifier_alias VARCHAR,
    start_line BIGINT NOT NULL,
    end_line BIGINT NOT NULL,
    start_byte BIGINT NOT NULL,
    end_byte BIGINT NOT NULL,
    _version BIGINT NOT NULL DEFAULT 0
);

-- Edges between nodes. Note: no project_id/branch columns — edges are
-- scoped by traversal_path only (mirrors the ClickHouse gl_edge table).
CREATE TABLE IF NOT EXISTS gl_edge (
    traversal_path VARCHAR NOT NULL DEFAULT '0/',
    source_id BIGINT NOT NULL,
    source_kind VARCHAR NOT NULL,
    relationship_kind VARCHAR NOT NULL,
    target_id BIGINT NOT NULL,
    target_kind VARCHAR NOT NULL,
    _version BIGINT NOT NULL DEFAULT 0
);
+18 −0
Original line number Diff line number Diff line
[package]
name = "duckdb-client"
version.workspace = true
edition = "2024"
license = "LicenseRef-EE"

[dependencies]
# Workspace pins duckdb with "bundled", "vtab-arrow", and "appender-arrow"
# features — Arrow-based bulk inserts depend on the appender feature.
duckdb = { workspace = true }
arrow = { workspace = true }
gkg-utils = { path = "../utils" }
serde_json = { workspace = true }
thiserror = { workspace = true }

[dev-dependencies]
# tempfile: used by the file-backed persistence test.
tempfile = { workspace = true }

[lints]
workspace = true
+304 −0
Original line number Diff line number Diff line
use std::path::Path;

use arrow::record_batch::RecordBatch;
use duckdb::params;

use crate::error::{DuckDbError, Result};
use crate::schema::{CODE_GRAPH_TABLES, SCHEMA_DDL};

/// Synchronous client owning a single DuckDB connection to the local
/// code-graph database (file-backed in production, in-memory in tests).
pub struct DuckDbClient {
    // All schema setup, inserts, queries, and deletes in this module go
    // through this one connection.
    conn: duckdb::Connection,
}

impl DuckDbClient {
    /// Traversal path shared by all rows in local mode; mirrors the
    /// `DEFAULT '0/'` on `traversal_path` in `graph_local.sql`.
    const LOCAL_TRAVERSAL_PATH: &'static str = "0/";

    /// Opens (or creates) a file-backed database at `path`, creating any
    /// missing parent directories first.
    ///
    /// # Errors
    /// Returns `DuckDbError::Schema` (with the failing directory in the
    /// message) if the parent directory cannot be created, or the DuckDB
    /// error if the database file cannot be opened.
    pub fn open(path: &Path) -> Result<Self> {
        if let Some(parent) = path.parent() {
            // Include the path in the message so ENOENT/permission
            // failures are diagnosable from the error alone.
            std::fs::create_dir_all(parent).map_err(|e| {
                DuckDbError::Schema(format!(
                    "failed to create parent directory {}: {e}",
                    parent.display()
                ))
            })?;
        }
        let conn = duckdb::Connection::open(path)?;
        Ok(Self { conn })
    }

    /// Opens an in-memory database; test-only convenience.
    #[cfg(test)]
    pub(crate) fn open_in_memory() -> Result<Self> {
        let conn = duckdb::Connection::open_in_memory()?;
        Ok(Self { conn })
    }

    /// Creates all code-graph tables. Idempotent: the DDL in `SCHEMA_DDL`
    /// uses `CREATE TABLE IF NOT EXISTS`, so this is safe on every startup.
    pub fn initialize_schema(&self) -> Result<()> {
        self.conn
            .execute_batch(SCHEMA_DDL)
            .map_err(|e| DuckDbError::Schema(e.to_string()))?;
        Ok(())
    }

    /// Bulk insert via DuckDB's Appender, which converts Arrow RecordBatch
    /// directly to DuckDB DataChunks — no SQL parsing, no vtab overhead.
    ///
    /// `table` is validated against the static `CODE_GRAPH_TABLES`
    /// allow-list before reaching the appender, so callers cannot target
    /// arbitrary tables. An empty batch is a no-op.
    pub fn insert_arrow(&self, table: &str, batch: RecordBatch) -> Result<()> {
        if !CODE_GRAPH_TABLES.contains(&table) {
            return Err(DuckDbError::Schema(format!("unknown table: {table}")));
        }
        if batch.num_rows() == 0 {
            return Ok(());
        }
        let mut appender = self.conn.appender(table)?;
        appender.append_record_batch(batch)?;
        appender.flush()?;
        Ok(())
    }

    /// Runs `sql` (no bind parameters) and collects the entire result set
    /// as Arrow record batches.
    pub fn query_arrow(&self, sql: &str) -> Result<Vec<RecordBatch>> {
        let mut stmt = self.conn.prepare(sql)?;
        let batches = stmt.query_arrow([])?.collect();
        Ok(batches)
    }

    /// Runs `sql` with positional bind parameters and collects the entire
    /// result set as Arrow record batches.
    pub fn query_arrow_params(
        &self,
        sql: &str,
        params: &[Box<dyn duckdb::ToSql>],
    ) -> Result<Vec<RecordBatch>> {
        let mut stmt = self.conn.prepare(sql)?;
        let batches = stmt
            .query_arrow(duckdb::params_from_iter(params.iter()))?
            .collect();
        Ok(batches)
    }

    /// Deletes all data for a project/branch across node tables and edges.
    ///
    /// Edge table uses `traversal_path` for scoping (matching the ClickHouse schema
    /// where `gl_edge` has no `project_id`/`branch` columns). In local mode, each
    /// DB file is one project, so deleting by the fixed traversal path is correct.
    pub fn delete_project_data(&self, project_id: i64, branch: &str) -> Result<()> {
        for table in CODE_GRAPH_TABLES {
            // gl_edge has no project_id/branch columns; handled below.
            if *table == "gl_edge" {
                continue;
            }
            // Table names come from the static allow-list, never from
            // caller input, so the format! here is not an injection risk.
            self.conn.execute(
                &format!("DELETE FROM {table} WHERE project_id = ? AND branch = ?"),
                params![project_id, branch],
            )?;
        }
        self.conn.execute(
            "DELETE FROM gl_edge WHERE traversal_path = ?",
            params![Self::LOCAL_TRAVERSAL_PATH],
        )?;
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::{Int64Array, StringArray};
    use arrow::datatypes::{DataType, Field, Schema};
    use std::sync::Arc;

    /// Arrow schema matching the `gl_file` DDL. Note: `extension` and
    /// `language` are declared nullable here even though the DDL marks them
    /// NOT NULL with defaults — the batches built below always supply
    /// non-null values, so the appender never inserts a NULL.
    fn file_schema() -> Arc<Schema> {
        Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("traversal_path", DataType::Utf8, false),
            Field::new("project_id", DataType::Int64, false),
            Field::new("branch", DataType::Utf8, false),
            Field::new("path", DataType::Utf8, false),
            Field::new("name", DataType::Utf8, false),
            Field::new("extension", DataType::Utf8, true),
            Field::new("language", DataType::Utf8, true),
            Field::new("_version", DataType::Int64, false),
        ]))
    }

    /// Builds a gl_file RecordBatch with the given ids/names and fixed
    /// filler values (project 42, branch "main", traversal path "0/").
    /// `names` is reused for both `path` and `name` columns.
    fn make_file_batch(ids: &[i64], names: &[&str]) -> RecordBatch {
        let n = ids.len();
        RecordBatch::try_new(
            file_schema(),
            vec![
                Arc::new(Int64Array::from(ids.to_vec())),
                Arc::new(StringArray::from(vec!["0/"; n])),
                Arc::new(Int64Array::from(vec![42; n])),
                Arc::new(StringArray::from(vec!["main"; n])),
                Arc::new(StringArray::from(names.to_vec())),
                Arc::new(StringArray::from(names.to_vec())),
                Arc::new(StringArray::from(vec![Some("rs"); n])),
                Arc::new(StringArray::from(vec![Some("Rust"); n])),
                Arc::new(Int64Array::from(vec![0; n])),
            ],
        )
        .unwrap()
    }

    // SQL INSERT in, Arrow batches out: exercises initialize_schema plus
    // the query_arrow path end-to-end and checks individual column values.
    #[test]
    fn schema_creation_and_sql_roundtrip() {
        let client = DuckDbClient::open_in_memory().unwrap();
        client.initialize_schema().unwrap();

        client
            .conn
            .execute(
                "INSERT INTO gl_file (id, traversal_path, project_id, branch, path, name, extension, language, _version) \
                 VALUES (1, '0/', 42, 'main', 'src/lib.rs', 'lib.rs', 'rs', 'Rust', 0)",
                [],
            )
            .unwrap();

        let batches = client
            .query_arrow("SELECT id, project_id, name, language FROM gl_file")
            .unwrap();
        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].num_rows(), 1);

        let ids = batches[0]
            .column(0)
            .as_any()
            .downcast_ref::<Int64Array>()
            .unwrap();
        assert_eq!(ids.value(0), 1);

        // Column index 2 is `name` in the SELECT list above.
        let names = batches[0]
            .column(2)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert_eq!(names.value(0), "lib.rs");
    }

    // Arrow-appender insert of a small batch, verified via SQL query.
    #[test]
    fn appender_insert_and_query() {
        let client = DuckDbClient::open_in_memory().unwrap();
        client.initialize_schema().unwrap();

        let batch = make_file_batch(&[10, 11], &["a.rs", "b.rs"]);
        client.insert_arrow("gl_file", batch).unwrap();

        let result = client
            .query_arrow("SELECT id, name FROM gl_file ORDER BY id")
            .unwrap();
        assert_eq!(result.len(), 1);
        assert_eq!(result[0].num_rows(), 2);

        let ids = result[0]
            .column(0)
            .as_any()
            .downcast_ref::<Int64Array>()
            .unwrap();
        assert_eq!(ids.value(0), 10);
        assert_eq!(ids.value(1), 11);
    }

    // 5000-row batch through the appender; only the count is asserted.
    #[test]
    fn large_batch_appender() {
        let client = DuckDbClient::open_in_memory().unwrap();
        client.initialize_schema().unwrap();

        let n = 5000;
        let ids: Vec<i64> = (0..n).collect();
        let names: Vec<String> = (0..n).map(|i| format!("file_{i}.rs")).collect();
        let name_refs: Vec<&str> = names.iter().map(|s| s.as_str()).collect();

        let batch = make_file_batch(&ids, &name_refs);
        client.insert_arrow("gl_file", batch).unwrap();

        let result = client
            .query_arrow("SELECT count(*) as cnt FROM gl_file")
            .unwrap();
        // DuckDB's count(*) comes back as Int64 in the Arrow result.
        let count = result[0]
            .column(0)
            .as_any()
            .downcast_ref::<Int64Array>()
            .unwrap();
        assert_eq!(count.value(0), n);
    }

    // Two projects (42 and 99) in gl_file; deleting project 42 must leave
    // project 99's row untouched.
    #[test]
    fn delete_project_data_isolates_projects() {
        let client = DuckDbClient::open_in_memory().unwrap();
        client.initialize_schema().unwrap();

        client
            .conn
            .execute(
                "INSERT INTO gl_file (id, project_id, branch, path, name, _version) VALUES (1, 42, 'main', 'a.rs', 'a.rs', 0)",
                [],
            )
            .unwrap();
        client
            .conn
            .execute(
                "INSERT INTO gl_file (id, project_id, branch, path, name, _version) VALUES (2, 99, 'main', 'b.rs', 'b.rs', 0)",
                [],
            )
            .unwrap();

        client.delete_project_data(42, "main").unwrap();

        let batches = client.query_arrow("SELECT id FROM gl_file").unwrap();
        assert_eq!(batches[0].num_rows(), 1);

        let ids = batches[0]
            .column(0)
            .as_any()
            .downcast_ref::<Int64Array>()
            .unwrap();
        assert_eq!(ids.value(0), 2);
    }

    // Data written through one client must be readable after reopening the
    // same file with a fresh client (drop closes the first connection).
    #[test]
    fn file_backed_persistence() {
        let dir = tempfile::tempdir().unwrap();
        let db_path = dir.path().join("test.duckdb");

        let client = DuckDbClient::open(&db_path).unwrap();
        client.initialize_schema().unwrap();
        client
            .conn
            .execute(
                "INSERT INTO gl_directory (id, project_id, branch, path, name, _version) VALUES (1, 1, 'main', 'src', 'src', 0)",
                [],
            )
            .unwrap();
        drop(client);

        let client2 = DuckDbClient::open(&db_path).unwrap();
        let batches = client2
            .query_arrow("SELECT name FROM gl_directory")
            .unwrap();
        assert_eq!(batches[0].num_rows(), 1);

        let names = batches[0]
            .column(0)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert_eq!(names.value(0), "src");
    }

    // The table allow-list must reject names outside CODE_GRAPH_TABLES
    // before any appender is created.
    #[test]
    fn insert_arrow_rejects_unknown_table() {
        let client = DuckDbClient::open_in_memory().unwrap();
        client.initialize_schema().unwrap();

        let batch = make_file_batch(&[1], &["a.rs"]);
        let err = client.insert_arrow("evil_table", batch).unwrap_err();
        assert!(err.to_string().contains("unknown table"));
    }

    // A zero-row batch must short-circuit without touching the appender.
    #[test]
    fn insert_empty_batch_is_noop() {
        let client = DuckDbClient::open_in_memory().unwrap();
        client.initialize_schema().unwrap();

        let batch = make_file_batch(&[], &[]);
        client.insert_arrow("gl_file", batch).unwrap();

        let result = client
            .query_arrow("SELECT count(*) as cnt FROM gl_file")
            .unwrap();
        let count = result[0]
            .column(0)
            .as_any()
            .downcast_ref::<Int64Array>()
            .unwrap();
        assert_eq!(count.value(0), 0);
    }
}
Loading