Verified Commit e4fa2e61 authored by Michael Usachenko's avatar Michael Usachenko Committed by GitLab
Browse files

fix: virtual column resolution for all query types

parent 7809b60b
Loading
Loading
Loading
Loading
+36 −8
Original line number Diff line number Diff line
@@ -43,14 +43,8 @@ impl HydrationStage {
            .ok_or_else(|| PipelineError::Execution("ClickHouse client not available".into()))
    }

    /// Resolve virtual columns from remote services and merge results into
    /// the property map. Dispatches to the appropriate [`ColumnResolver`]
    /// by the `service` name declared in the ontology.
    ///
    /// Currently a no-op in practice because all virtual fields are
    /// `disabled: true` in the ontology. The full pipeline is wired up so
    /// that enabling a virtual field only requires removing the `disabled`
    /// flag and registering the service in [`ColumnResolverRegistry`].
    /// Resolve virtual columns via [`ColumnResolverRegistry`] from server
    /// extensions. Returns `ContentResolution` error if absent.
    pub async fn resolve_virtual_columns(
        ctx: &QueryPipelineContext,
        entity_virtual_columns: &[EntityVirtualColumns<'_>],
@@ -150,6 +144,26 @@ impl HydrationStage {
        Ok(())
    }

    /// Remove columns that were injected as dependencies for virtual column
    /// resolvers but not explicitly requested by the user.
    fn strip_injected_columns<'a>(
        property_map: &mut PropertyMap,
        specs: impl Iterator<Item = (&'a str, &'a Vec<String>)>,
    ) {
        for (entity_type, injected) in specs {
            if injected.is_empty() {
                continue;
            }
            for ((et, _), props) in property_map.iter_mut() {
                if et == entity_type {
                    for col in injected {
                        props.remove(col);
                    }
                }
            }
        }
    }

    async fn hydrate_static(
        ctx: &QueryPipelineContext,
        templates: &[HydrationTemplate],
@@ -494,6 +508,13 @@ impl PipelineStage for HydrationStage {
                    .collect();
                Self::resolve_virtual_columns(ctx, &entity_virtuals, &mut property_map).await?;

                Self::strip_injected_columns(
                    &mut property_map,
                    templates
                        .iter()
                        .map(|t| (t.entity_type.as_str(), &t.injected_columns)),
                );

                if !property_map.is_empty() {
                    Self::merge_static_properties(&mut query_result, &property_map, templates);
                }
@@ -525,6 +546,13 @@ impl PipelineStage for HydrationStage {
                        .collect();
                    Self::resolve_virtual_columns(ctx, &entity_virtuals, &mut property_map).await?;

                    Self::strip_injected_columns(
                        &mut property_map,
                        entity_specs
                            .iter()
                            .map(|s| (s.entity_type.as_str(), &s.injected_columns)),
                    );

                    Self::merge_dynamic_properties(&mut query_result, &property_map);
                }
            }
+1 −0
Original line number Diff line number Diff line
@@ -5,3 +5,4 @@ mod pipeline;
mod setup;
#[allow(dead_code)]
mod utils;
mod virtual_columns;
+283 −0
Original line number Diff line number Diff line
//! Virtual column handling: SQL stripping, hydration plans, depends_on injection.

use ontology::Ontology;
use query_engine::compiler::{HydrationPlan, SecurityContext, compile};

fn compile_query(json: &str) -> query_engine::compiler::CompiledQueryContext {
    let ontology = Ontology::load_embedded().unwrap();
    let security_ctx = SecurityContext::new(1, vec!["1/".into()]).unwrap();
    compile(json, &ontology, &security_ctx).unwrap()
}

#[test]
fn search_with_wildcard_excludes_virtual_columns_from_sql() {
    let compiled = compile_query(
        r#"{"query_type": "search", "node": {"id": "f", "entity": "File", "columns": "*"}, "limit": 5}"#,
    );
    let sql = &compiled.base.sql;
    assert!(
        !sql.contains("f_content") && !sql.contains("f.content"),
        "virtual column 'content' should not appear in search SQL, got:\n{sql}"
    );
    assert!(
        sql.contains("f_name") || sql.contains("f.name"),
        "normal columns should be in SQL"
    );
}

#[test]
fn search_with_explicit_content_excludes_from_sql() {
    let compiled = compile_query(
        r#"{"query_type": "search", "node": {"id": "f", "entity": "File", "columns": ["id", "name", "content"]}, "limit": 5}"#,
    );
    let sql = &compiled.base.sql;
    assert!(
        !sql.contains("f_content") && !sql.contains("f.content"),
        "explicitly requested virtual column 'content' should be stripped from search SQL"
    );
    assert!(sql.contains("f_name") || sql.contains("f.name"));
}

#[test]
fn search_with_content_produces_hydration_plan() {
    let compiled = compile_query(
        r#"{"query_type": "search", "node": {"id": "f", "entity": "File", "columns": ["id", "name", "content"]}, "limit": 5}"#,
    );
    match &compiled.hydration {
        HydrationPlan::Static(templates) => {
            assert_eq!(templates.len(), 1);
            let t = &templates[0];
            assert_eq!(t.entity_type, "File");
            assert_eq!(t.node_alias, "f");
            assert!(
                t.virtual_columns
                    .iter()
                    .any(|vc| vc.column_name == "content" && vc.service == "gitaly"),
                "search hydration plan should include content VCR"
            );
            for dep in &["project_id", "commit_sha", "branch", "path"] {
                assert!(
                    t.columns.contains(&dep.to_string()),
                    "depends_on column '{dep}' should be in search hydration plan"
                );
                // User only asked for ["id", "name", "content"], so deps are injected
                assert!(
                    t.injected_columns.contains(&dep.to_string()),
                    "'{dep}' should be in injected_columns"
                );
            }
        }
        other => {
            panic!("expected Static hydration plan for search with virtual cols, got: {other:?}")
        }
    }
}

#[test]
fn search_without_content_has_no_hydration_plan() {
    let compiled = compile_query(
        r#"{"query_type": "search", "node": {"id": "f", "entity": "File", "columns": ["id", "name", "path"]}, "limit": 5}"#,
    );
    assert!(
        matches!(&compiled.hydration, HydrationPlan::None),
        "search without virtual cols should have HydrationPlan::None, got: {:?}",
        compiled.hydration
    );
}

#[test]
fn aggregation_with_content_produces_hydration_plan() {
    let compiled = compile_query(
        r#"{
            "query_type": "aggregation",
            "nodes": [{"id": "f", "entity": "File", "columns": ["id", "content"]}],
            "aggregations": [{"function": "count", "target": "f", "alias": "total"}],
            "limit": 5
        }"#,
    );
    match &compiled.hydration {
        HydrationPlan::Static(templates) => {
            assert!(
                templates.iter().any(|t| t
                    .virtual_columns
                    .iter()
                    .any(|vc| vc.column_name == "content")),
                "aggregation hydration plan should include content VCR"
            );
        }
        other => panic!(
            "expected Static hydration plan for aggregation with virtual cols, got: {other:?}"
        ),
    }
}

#[test]
fn aggregation_with_wildcard_excludes_virtual_columns_from_sql() {
    let compiled = compile_query(
        r#"{
            "query_type": "aggregation",
            "nodes": [{"id": "f", "entity": "File", "columns": "*"}],
            "aggregations": [{"function": "count", "target": "f", "alias": "total"}],
            "limit": 5
        }"#,
    );
    let sql = &compiled.base.sql;
    assert!(
        !sql.contains("f_content") && !sql.contains("f.content"),
        "virtual column 'content' should not appear in aggregation SQL"
    );
}

#[test]
fn traversal_with_content_includes_virtual_in_hydration_plan() {
    let compiled = compile_query(
        r#"{
            "query_type": "traversal",
            "nodes": [
                {"id": "f", "entity": "File", "columns": ["id", "name", "content"]},
                {"id": "b", "entity": "Branch", "columns": ["id", "name"]}
            ],
            "relationships": [{"type": "ON_BRANCH", "from": "f", "to": "b"}],
            "limit": 5
        }"#,
    );

    let sql = &compiled.base.sql;
    assert!(
        !sql.contains("f_content") && !sql.contains("f.content"),
        "virtual column should not be in traversal base SQL"
    );

    match &compiled.hydration {
        HydrationPlan::Static(templates) => {
            let file_template = templates.iter().find(|t| t.entity_type == "File");
            assert!(
                file_template.is_some(),
                "File should have a hydration template"
            );
            let vcs = &file_template.unwrap().virtual_columns;
            assert!(
                vcs.iter()
                    .any(|vc| vc.column_name == "content" && vc.service == "gitaly"),
                "hydration plan should include content virtual column, got: {vcs:?}"
            );
        }
        other => panic!("expected Static hydration plan for traversal, got: {other:?}"),
    }
}

#[test]
fn traversal_hydration_injects_depends_on_columns() {
    let compiled = compile_query(
        r#"{
            "query_type": "traversal",
            "nodes": [
                {"id": "b", "entity": "Branch", "columns": ["id", "name"]},
                {"id": "f", "entity": "File", "columns": ["id", "content"]}
            ],
            "relationships": [{"type": "ON_BRANCH", "from": "f", "to": "b"}],
            "limit": 5
        }"#,
    );

    match &compiled.hydration {
        HydrationPlan::Static(templates) => {
            let file_template = templates.iter().find(|t| t.entity_type == "File").unwrap();
            let cols = &file_template.columns;
            for dep in &["project_id", "commit_sha", "branch", "path"] {
                assert!(
                    cols.iter().any(|c| c == dep),
                    "depends_on column '{dep}' should be auto-injected into hydration columns, got: {cols:?}"
                );
                assert!(
                    file_template.injected_columns.contains(&dep.to_string()),
                    "'{dep}' should be tracked in injected_columns for stripping"
                );
            }
        }
        other => panic!("expected Static hydration plan, got: {other:?}"),
    }
}

#[test]
fn definition_content_also_handled() {
    let compiled = compile_query(
        r#"{
            "query_type": "traversal",
            "nodes": [
                {"id": "f", "entity": "File", "columns": ["id", "path"]},
                {"id": "def", "entity": "Definition", "columns": ["id", "name", "content"]}
            ],
            "relationships": [{"type": "DEFINES", "from": "f", "to": "def"}],
            "limit": 5
        }"#,
    );

    match &compiled.hydration {
        HydrationPlan::Static(templates) => {
            let def_template = templates
                .iter()
                .find(|t| t.entity_type == "Definition")
                .unwrap();
            let vcs = &def_template.virtual_columns;
            assert!(
                vcs.iter().any(|vc| vc.column_name == "content"),
                "Definition should have content virtual column in hydration plan"
            );
            let cols = &def_template.columns;
            for dep in &[
                "project_id",
                "commit_sha",
                "branch",
                "file_path",
                "start_byte",
                "end_byte",
            ] {
                assert!(
                    cols.iter().any(|c| c == dep),
                    "depends_on column '{dep}' should be injected for Definition.content, got: {cols:?}"
                );
            }
        }
        other => panic!("expected Static hydration plan, got: {other:?}"),
    }
}

#[test]
fn multiple_virtual_entities_in_same_query() {
    let compiled = compile_query(
        r#"{
            "query_type": "traversal",
            "nodes": [
                {"id": "f", "entity": "File", "columns": ["id", "path", "content"]},
                {"id": "def", "entity": "Definition", "columns": ["id", "name", "content"]}
            ],
            "relationships": [{"type": "DEFINES", "from": "f", "to": "def"}],
            "limit": 5
        }"#,
    );

    match &compiled.hydration {
        HydrationPlan::Static(templates) => {
            let def_vcs = &templates
                .iter()
                .find(|t| t.entity_type == "Definition")
                .unwrap()
                .virtual_columns;
            let file_vcs = &templates
                .iter()
                .find(|t| t.entity_type == "File")
                .unwrap()
                .virtual_columns;
            assert!(
                def_vcs.iter().any(|vc| vc.column_name == "content"),
                "Definition missing content VC"
            );
            assert!(
                file_vcs.iter().any(|vc| vc.column_name == "content"),
                "File missing content VC"
            );
        }
        other => panic!("expected Static hydration plan, got: {other:?}"),
    }
}
+4 −0
Original line number Diff line number Diff line
@@ -228,6 +228,9 @@ pub struct InputNode {
    /// Always set before enforce.rs runs; do not add fallbacks in downstream code.
    #[serde(skip)]
    pub redaction_id_column: String,
    /// Virtual columns stripped by normalize, consumed by the hydration plan.
    #[serde(skip)]
    pub virtual_columns: Vec<crate::passes::hydrate::VirtualColumnRequest>,
}

impl Default for InputNode {
@@ -242,6 +245,7 @@ impl Default for InputNode {
            id_range: None,
            id_property: DEFAULT_PRIMARY_KEY.to_string(),
            redaction_id_column: DEFAULT_PRIMARY_KEY.to_string(),
            virtual_columns: Vec::new(),
        }
    }
}
+53 −31
Original line number Diff line number Diff line
@@ -26,31 +26,31 @@ pub enum HydrationPlan {
pub struct HydrationTemplate {
    pub entity_type: String,
    /// Alias from the base query (e.g. "u", "p"). Used to correlate hydration
    /// results back to the base query's `_gkg_{alias}_pk` (or `_gkg_{alias}_id`
    /// when PK == auth ID) column.
    /// results back to the base query's `_gkg_{alias}_pk` column.
    pub node_alias: String,
    /// ClickHouse table to query (resolved from ontology at compile time).
    pub destination_table: String,
    /// Column-backed columns to fetch from ClickHouse. Resolved at compile time
    /// from the user's explicit column selection or the ontology's default_columns,
    /// with virtual columns filtered out.
    /// DB columns to fetch from ClickHouse (user-requested columns with
    /// virtual columns filtered out, plus injected dependencies).
    pub columns: Vec<String>,
    /// Virtual columns that need to be resolved from remote services after
    /// ClickHouse hydration completes.
    pub virtual_columns: Vec<VirtualColumnRequest>,
    /// Dependency columns injected for virtual column resolvers that the
    /// user didn't explicitly request. These should be stripped from the
    /// final output after content resolution.
    pub injected_columns: Vec<String>,
}

/// Pre-resolved column spec for an entity type in dynamic hydration.
/// Built at compile time for every entity type in the ontology so the
/// server avoids runtime ontology lookups.
#[derive(Debug, Clone, PartialEq)]
pub struct DynamicEntityColumns {
    pub entity_type: String,
    pub destination_table: String,
    /// Column-backed columns to fetch from ClickHouse.
    pub columns: Vec<String>,
    /// Virtual columns that need remote resolution.
    pub virtual_columns: Vec<VirtualColumnRequest>,
    /// Columns injected as dependencies, not user-requested.
    pub injected_columns: Vec<String>,
}

/// A column that must be resolved from a remote service rather than ClickHouse.
@@ -70,22 +70,31 @@ pub struct VirtualColumnRequest {

/// Build the hydration plan for a compiled query.
///
/// - Aggregation: no hydration (results are aggregate values, not entity rows).
/// - Search: no hydration (base query already carries node columns).
/// - Traversal (edge-centric): static hydration — entity types are known at
///   compile time, so we build one template per entity type with pre-resolved
///   destination table, column-backed columns, and virtual column requests.
/// - PathFinding/Neighbors: dynamic hydration — entity types are discovered at
///   runtime. Column specs are pre-resolved for all ontology entity types so
///   the server just does a lookup, no ontology re-queries.
/// - Search/Aggregation/Traversal: static plan from input nodes. Virtual
///   columns come from `node.virtual_columns` (populated by normalize).
///   Search/Aggregation only get a plan when VCRs are present.
/// - PathFinding/Neighbors: dynamic plan over all ontology entity types.
pub fn generate_hydration_plan(input: &Input, ontology: &Ontology) -> HydrationPlan {
    match input.query_type {
        QueryType::Aggregation | QueryType::Hydration => HydrationPlan::None,
        QueryType::Hydration => HydrationPlan::None,
        QueryType::PathFinding | QueryType::Neighbors => {
            HydrationPlan::Dynamic(build_dynamic_specs(input, ontology))
        }
        QueryType::Search => HydrationPlan::None,
        QueryType::Traversal => HydrationPlan::Static(build_static_templates(input, ontology)),
        QueryType::Search | QueryType::Aggregation | QueryType::Traversal => {
            let mut templates = build_static_templates(input, ontology);

            // Search/Aggregation only need templates with VCRs.
            // Traversal needs all templates for DB-column hydration.
            if matches!(input.query_type, QueryType::Search | QueryType::Aggregation) {
                templates.retain(|t| !t.virtual_columns.is_empty());
            }

            if templates.is_empty() {
                HydrationPlan::None
            } else {
                HydrationPlan::Static(templates)
            }
        }
    }
}

@@ -97,17 +106,19 @@ fn build_static_templates(input: &Input, ontology: &Ontology) -> Vec<HydrationTe
            let entity = node.entity.as_ref()?;
            let ont_node = ontology.get_node(entity)?;

            // Normalize expands All and None into List before this pass runs.
            let Some(ColumnSelection::List(requested)) = &node.columns else {
                return None;
            };

            let (mut columns, virtual_columns) = split_columns(requested, ont_node);
            // DB-only columns (virtual already stripped by normalize).
            let mut columns: Vec<String> = requested.clone();
            let virtual_columns = node.virtual_columns.clone();

            if columns.is_empty() && virtual_columns.is_empty() {
                return None;
            }

            let injected_columns =
                inject_virtual_dependencies(&mut columns, &virtual_columns, ont_node);

            Some(HydrationTemplate {
@@ -116,6 +127,7 @@ fn build_static_templates(input: &Input, ontology: &Ontology) -> Vec<HydrationTe
                destination_table: ont_node.destination_table.clone(),
                columns,
                virtual_columns,
                injected_columns,
            })
        })
        .collect()
@@ -147,30 +159,34 @@ fn build_dynamic_specs(input: &Input, ontology: &Ontology) -> Vec<DynamicEntityC
                return None;
            }

            let (columns, virtual_columns) = split_columns(&requested, node);
            let (mut columns, virtual_columns) = split_columns(&requested, node);

            if columns.is_empty() && virtual_columns.is_empty() {
                return None;
            }

            let injected_columns =
                inject_virtual_dependencies(&mut columns, &virtual_columns, node);

            Some(DynamicEntityColumns {
                entity_type: name.to_string(),
                destination_table: node.destination_table.clone(),
                columns,
                virtual_columns,
                injected_columns,
            })
        })
        .collect()
}

/// Ensure that database-backed columns required by virtual column resolvers
/// are included in the hydration column list, even if the user didn't
/// request them. Reads `depends_on` from the ontology's `VirtualSource`.
/// Inject depends_on columns required by virtual column resolvers.
/// Returns the list of columns that were injected (not originally requested).
fn inject_virtual_dependencies(
    columns: &mut Vec<String>,
    virtual_columns: &[VirtualColumnRequest],
    node: &ontology::NodeEntity,
) {
) -> Vec<String> {
    let mut injected = Vec::new();
    for vc in virtual_columns {
        let Some(field) = node.fields.iter().find(|f| f.name == vc.column_name) else {
            continue;
@@ -183,10 +199,12 @@ fn inject_virtual_dependencies(
                    })
                {
                    columns.push(dep.clone());
                    injected.push(dep.clone());
                }
            }
        }
    }
    injected
}

/// Partition requested column names into CH-backed and virtual based on
@@ -303,11 +321,12 @@ mod tests {
        let vcs = vec![vc_req("content", "gitaly", "blob_content")];
        let mut columns = vec!["name".to_string()];

        inject_virtual_dependencies(&mut columns, &vcs, &node);
        let injected = inject_virtual_dependencies(&mut columns, &vcs, &node);

        assert!(columns.contains(&"project_id".to_string()));
        assert!(columns.contains(&"branch".to_string()));
        assert!(columns.contains(&"path".to_string()));
        assert_eq!(injected, vec!["project_id", "branch", "path"]);
    }

    #[test]
@@ -326,11 +345,13 @@ mod tests {
        let vcs = vec![vc_req("content", "gitaly", "blob_content")];
        let mut columns = vec!["project_id".to_string()];

        inject_virtual_dependencies(&mut columns, &vcs, &node);
        let injected = inject_virtual_dependencies(&mut columns, &vcs, &node);

        let count = columns.iter().filter(|c| *c == "project_id").count();
        assert_eq!(count, 1, "project_id should not be duplicated");
        assert!(columns.contains(&"branch".to_string()));
        // project_id was already present, so only branch is injected
        assert_eq!(injected, vec!["branch"]);
    }

    #[test]
@@ -339,9 +360,10 @@ mod tests {
        let vcs: Vec<VirtualColumnRequest> = vec![];
        let mut columns = vec!["name".to_string()];

        inject_virtual_dependencies(&mut columns, &vcs, &node);
        let injected = inject_virtual_dependencies(&mut columns, &vcs, &node);

        assert_eq!(columns, vec!["name".to_string()]);
        assert!(injected.is_empty());
    }

    #[test]
Loading