Verified commit 165ea180, authored by Michael Angelo Rivera, committed by GitLab
Browse files

feat(query): consolidate dynamic hydration into single UNION ALL query

parent 7bf41715
Loading
Loading
Loading
Loading
+215 −72
Original line number Diff line number Diff line
@@ -7,7 +7,8 @@ use arrow::record_batch::RecordBatch;
use clickhouse_client::{ArrowClickHouseClient, ProfilingConfig};
use futures::future::try_join_all;
use query_engine::compiler::{
    DynamicColumnMode, HydrationPlan, HydrationTemplate, QueryType, compile,
    ColumnSelection, DynamicColumnMode, HydrationPlan, HydrationTemplate, Input, InputNode,
    QueryType, compile, compile_input,
};

use gkg_utils::arrow::{ArrowUtils, ColumnValue};
@@ -31,14 +32,12 @@ type PropertyMap = HashMap<(String, i64), HashMap<String, ColumnValue>>;
pub struct HydrationStage;

impl HydrationStage {
    /// Look up the shared ClickHouse client registered in the server
    /// extension map, failing with a pipeline execution error when the
    /// server never installed one.
    fn client(ctx: &QueryPipelineContext) -> Result<&Arc<ArrowClickHouseClient>, PipelineError> {
        match ctx.server_extensions.get::<Arc<ArrowClickHouseClient>>() {
            Some(client) => Ok(client),
            None => Err(PipelineError::Execution(
                "ClickHouse client not available".into(),
            )),
        }
    }

    /// Static hydration: use pre-built templates from compile time.
    async fn hydrate_static(
        ctx: &QueryPipelineContext,
        templates: &[HydrationTemplate],
@@ -72,33 +71,222 @@ impl HydrationStage {
        Ok((merged, debug_queries, executions))
    }

    /// Dynamic hydration: build search queries from scratch at runtime.
    async fn hydrate_dynamic(
    /// Consolidated dynamic hydration: builds an `Input` with one node per entity
    /// type, compiles it as `QueryType::Hydration` (which generates a UNION ALL
    /// with proper parameterization), and executes a single query.
    async fn hydrate_dynamic_consolidated(
        ctx: &QueryPipelineContext,
        refs: &HashMap<String, Vec<i64>>,
    ) -> Result<(PropertyMap, Vec<DebugQuery>, Vec<QueryExecution>), PipelineError> {
        let futures: Vec<_> = refs
        let client = Self::client(ctx)?;
        let profiling = ctx
            .server_extensions
            .get::<ProfilingConfig>()
            .cloned()
            .unwrap_or_default();
        let base_input = &ctx.compiled()?.input;

        let mut nodes = Vec::new();
        let mut total_ids: usize = 0;

        for (entity_type, ids) in refs {
            if ids.is_empty() {
                continue;
            }
            let node = ctx.ontology.get_node(entity_type).ok_or_else(|| {
                PipelineError::Execution(format!(
                    "entity type not found in ontology: {entity_type}"
                ))
            })?;

            let columns = match base_input.options.dynamic_columns {
                DynamicColumnMode::All => node
                    .fields
                    .iter()
                    .filter(|f| f.name != "_version" && f.name != "_deleted")
                    .map(|f| f.name.clone())
                    .collect::<Vec<_>>(),
                DynamicColumnMode::Default => {
                    if node.default_columns.is_empty() {
                        continue;
                    }
                    node.default_columns.clone()
                }
            };

            let capped_ids: Vec<i64> = ids
                .iter()
            .filter(|(_, ids)| !ids.is_empty())
            .map(|(entity_type, ids)| {
                let query_json = Self::build_dynamic_search_query(ctx, entity_type, ids)?;
                Ok(Self::compile_and_fetch(ctx, entity_type, query_json))
                .copied()
                .take(MAX_DYNAMIC_HYDRATION_RESULTS)
                .collect();
            total_ids += capped_ids.len();

            nodes.push(InputNode {
                id: HYDRATION_NODE_ALIAS.to_string(),
                entity: Some(entity_type.clone()),
                table: Some(node.destination_table.clone()),
                columns: Some(ColumnSelection::List(columns)),
                node_ids: capped_ids,
                ..InputNode::default()
            });
        }

        if nodes.is_empty() {
            return Ok((HashMap::new(), Vec::new(), Vec::new()));
        }

        let hydration_input = Input {
            query_type: QueryType::Hydration,
            nodes,
            limit: total_ids as u32,
            ..Input::default()
        };

        let compiled = compile_input(hydration_input, ctx.security_context()?)
            .map_err(|e| PipelineError::Compile(e.to_string()))?;

        let rendered_sql = compiled.base.render();
        let debug = DebugQuery {
            sql: compiled.base.sql.clone(),
            rendered: rendered_sql.clone(),
        };

        let (batches, execution) = if profiling.enabled {
            let http_params: Vec<(String, String)> = compiled
                .base
                .params
                .iter()
                .map(|(k, v)| (k.clone(), v.render_http_param()))
                .collect();

            let t = Instant::now();
            let (batches, query_stats) = client
                .profiler()
                .execute_with_stats(&compiled.base.sql, &http_params, &[])
                .await
                .map_err(|e| PipelineError::Execution(e.to_string()))?;
            let elapsed = t.elapsed();

            let mut execution = QueryExecution {
                label: "hydration:consolidated".into(),
                rendered_sql,
                query_id: query_stats.query_id.clone(),
                elapsed_ms: elapsed.as_secs_f64() * 1000.0,
                stats: QueryExecutionStats {
                    read_rows: query_stats.read_rows,
                    read_bytes: query_stats.read_bytes,
                    result_rows: query_stats.result_rows,
                    result_bytes: query_stats.result_bytes,
                    elapsed_ns: query_stats.elapsed_ns,
                    memory_usage: query_stats.memory_usage,
                },
                explain_plan: None,
                explain_pipeline: None,
                query_log: None,
                processors: None,
            };

            if profiling.explain {
                execution.explain_plan = client.profiler().explain_plan(&debug.rendered).await.ok();
            }

            (batches, execution)
        } else {
            let t = Instant::now();
            let mut query = client.query(&compiled.base.sql);
            for (key, param) in &compiled.base.params {
                query = ArrowClickHouseClient::bind_param(query, key, &param.value, &param.ch_type);
            }
            let batches = query
                .fetch_arrow()
                .await
                .map_err(|e| PipelineError::Execution(e.to_string()))?;
            let elapsed = t.elapsed();
            let result_rows = batches.iter().map(|b| b.num_rows()).sum::<usize>() as u64;

            let execution = QueryExecution {
                label: "hydration:consolidated".into(),
                rendered_sql,
                query_id: String::new(),
                elapsed_ms: elapsed.as_secs_f64() * 1000.0,
                stats: QueryExecutionStats {
                    result_rows,
                    elapsed_ns: elapsed.as_nanos() as u64,
                    ..Default::default()
                },
                explain_plan: None,
                explain_pipeline: None,
                query_log: None,
                processors: None,
            };

            (batches, execution)
        };

        let props = Self::parse_consolidated_batches(&batches)?;
        Ok((props, vec![debug], vec![execution]))
    }

    fn parse_consolidated_batches(batches: &[RecordBatch]) -> Result<PropertyMap, PipelineError> {
        let alias = HYDRATION_NODE_ALIAS;
        let entity_type_col = format!("{alias}_entity_type");
        let props_col = format!("{alias}_props");
        let id_col = format!("{alias}_id");

        let mut result = HashMap::new();

        for batch in batches {
            for row_idx in 0..batch.num_rows() {
                let Some(id) = ArrowUtils::get_column::<Int64Type>(batch, &id_col, row_idx) else {
                    continue;
                };

                let row_data = ArrowUtils::extract_row(batch, row_idx);

                let entity_type = row_data
                    .iter()
                    .find(|(name, _)| name.as_str() == entity_type_col)
                    .and_then(|(_, v)| v.as_string().cloned());

                let Some(entity_type) = entity_type else {
                    continue;
                };

                let props: HashMap<String, ColumnValue> = row_data
                    .iter()
                    .find(|(name, _)| name.as_str() == props_col)
                    .and_then(|(_, v)| v.as_string())
                    .and_then(|json_str| {
                        serde_json::from_str::<HashMap<String, serde_json::Value>>(json_str).ok()
                    })
            .collect::<Result<Vec<_>, PipelineError>>()?;
                    .map(|m| {
                        m.into_iter()
                            .filter_map(|(k, v)| match v {
                                serde_json::Value::String(s) => Some((k, ColumnValue::String(s))),
                                serde_json::Value::Number(n) => {
                                    if let Some(i) = n.as_i64() {
                                        Some((k, ColumnValue::Int64(i)))
                                    } else {
                                        n.as_f64().map(|f| (k, ColumnValue::Float64(f)))
                                    }
                                }
                                serde_json::Value::Bool(b) => {
                                    Some((k, ColumnValue::String(b.to_string())))
                                }
                                serde_json::Value::Null => None,
                                _ => Some((k, ColumnValue::String(v.to_string()))),
                            })
                            .collect()
                    })
                    .unwrap_or_default();

        let results = try_join_all(futures).await?;
        let mut merged = HashMap::new();
        let mut debug_queries = Vec::new();
        let mut executions = Vec::new();
        for (props, debug, execution) in results {
            merged.extend(props);
            debug_queries.push(debug);
            executions.push(execution);
                result.insert((entity_type, id), props);
            }
        Ok((merged, debug_queries, executions))
        }

    /// Compile a hydration query JSON string, execute it, and parse the results.
        Ok(result)
    }

    async fn compile_and_fetch(
        ctx: &QueryPipelineContext,
        entity_type: &str,
@@ -196,7 +384,6 @@ impl HydrationStage {
        Ok((props, debug, execution))
    }

    /// Collect entity IDs for a static template from `_gkg_{alias}_id` columns.
    fn collect_static_ids(result: &QueryResult, template: &HydrationTemplate) -> Vec<i64> {
        let id_column = redaction_id_column(&template.node_alias);
        let mut ids: Vec<i64> = result
@@ -208,7 +395,6 @@ impl HydrationStage {
        ids
    }

    /// Merge static hydration results back into rows as flat columns.
    fn merge_static_properties(
        result: &mut QueryResult,
        property_map: &PropertyMap,
@@ -228,7 +414,6 @@ impl HydrationStage {
        }
    }

    /// Collect unique entity (type, id) pairs from dynamic_nodes across all authorized rows.
    fn extract_dynamic_refs(result: &QueryResult) -> HashMap<String, Vec<i64>> {
        let mut refs: HashMap<String, Vec<i64>> = HashMap::new();

@@ -248,7 +433,6 @@ impl HydrationStage {
        refs
    }

    /// Merge dynamic hydration results into NodeRef.properties on dynamic_nodes.
    fn merge_dynamic_properties(result: &mut QueryResult, property_map: &PropertyMap) {
        for row in result.authorized_rows_mut() {
            for node_ref in row.dynamic_nodes_mut() {
@@ -260,48 +444,6 @@ impl HydrationStage {
        }
    }

    /// Build a search query JSON from scratch for dynamic hydration.
    /// Reads `input.options.dynamic_columns` to decide whether to fetch all columns
    /// or only the entity's `default_columns` from the ontology.
    fn build_dynamic_search_query(
        ctx: &QueryPipelineContext,
        entity_type: &str,
        ids: &[i64],
    ) -> Result<String, PipelineError> {
        let input = &ctx.compiled()?.input;
        let node = ctx.ontology.get_node(entity_type).ok_or_else(|| {
            PipelineError::Execution(format!(
                "entity type not found in ontology during dynamic hydration: {entity_type}"
            ))
        })?;

        let columns: serde_json::Value = match input.options.dynamic_columns {
            DynamicColumnMode::All => serde_json::json!("*"),
            DynamicColumnMode::Default => {
                if node.default_columns.is_empty() {
                    return Err(PipelineError::Execution(format!(
                        "no default_columns defined for {entity_type}"
                    )));
                }
                serde_json::json!(node.default_columns)
            }
        };

        let query_json = serde_json::json!({
            "query_type": QueryType::Search.to_string(),
            "node": {
                "id": HYDRATION_NODE_ALIAS,
                "entity": entity_type,
                "columns": columns,
                "node_ids": ids
            },
            "limit": ids.len().min(MAX_DYNAMIC_HYDRATION_RESULTS)
        })
        .to_string();

        Ok(query_json)
    }

    fn parse_property_batches(
        entity_type: &str,
        batches: &[RecordBatch],
@@ -382,7 +524,8 @@ impl PipelineStage for HydrationStage {
            HydrationPlan::Dynamic => {
                let refs = Self::extract_dynamic_refs(&query_result);
                if !refs.is_empty() {
                    let (property_map, debug, executions) = Self::hydrate_dynamic(ctx, &refs)
                    let (property_map, debug, executions) =
                        Self::hydrate_dynamic_consolidated(ctx, &refs)
                            .await
                            .inspect_err(|e| obs.record_error(e))?;
                    hydration_queries = debug;
+1 −0
Original line number Diff line number Diff line
@@ -111,6 +111,7 @@ impl QueryRequirements for Input {
            QueryType::Traversal | QueryType::Search => {
                reqs.insert(Requirement::NodeCount);
            }
            QueryType::Hydration => {}
        }

        // Traversal queries with joins produce edges the test must verify per type.
+200 −0
Original line number Diff line number Diff line
@@ -467,6 +467,23 @@ async fn traversal_produces_no_hydration_plan(_ctx: &TestContext) {
    );
}

/// `hydration` is an internal-only query type: user-supplied JSON naming it
/// must fail compilation.
async fn hydration_query_type_rejected_from_user_input(_ctx: &TestContext) {
    let ontology = load_ontology();
    let security_ctx = test_security_context();

    let json = r#"{
        "query_type": "hydration",
        "node": {"id": "h", "entity": "User", "node_ids": [1]},
        "limit": 10
    }"#;

    let result = compile(json, &ontology, &security_ctx);
    let rejected = result.is_err();
    assert!(
        rejected,
        "hydration query type must be rejected when submitted via user-facing compile(): {result:?}"
    );
}

// ─────────────────────────────────────────────────────────────────────────────
// Full Pipeline: Redact → Hydrate
// ─────────────────────────────────────────────────────────────────────────────
@@ -622,6 +639,183 @@ async fn path_finding_all_denied_then_hydrate(ctx: &TestContext) {
    );
}

// ─────────────────────────────────────────────────────────────────────────────
// Consolidated Hydration Data Correctness
// ─────────────────────────────────────────────────────────────────────────────

/// The consolidated path User→Group→Project hydrates all three entity types
/// in a single UNION ALL query. Verify each node has the correct property values.
async fn consolidated_path_hydrates_all_entity_types(ctx: &TestContext) {
    let (ontology, client) = make_test_resources(ctx);
    let security_ctx = test_security_context();

    let json = r#"{
        "query_type": "path_finding",
        "nodes": [
            {"id": "start", "entity": "User", "node_ids": [1]},
            {"id": "end", "entity": "Project", "node_ids": [1000]}
        ],
        "path": {"type": "shortest", "from": "start", "to": "end", "max_depth": 3}
    }"#;

    let (result, _ctx_ref, _plan) =
        compile_execute_hydrate(ctx, json, &ontology, &security_ctx, &client).await;

    let row = result.authorized_rows().next().expect("should have a path");
    let nodes = row.path_nodes();
    assert_eq!(nodes.len(), 3, "path should be User→Group→Project");

    let entity_types: Vec<&str> = nodes.iter().map(|n| n.entity_type.as_str()).collect();
    assert_eq!(entity_types, vec!["User", "Group", "Project"]);

    // Helper: read one hydrated string property off a path node by position.
    let string_prop = |idx: usize, key: &str| {
        nodes[idx]
            .properties
            .get(key)
            .and_then(|v| v.as_string().map(|s| s.as_str()))
    };
    assert_eq!(string_prop(0, "username"), Some("alice"));
    assert_eq!(string_prop(1, "name"), Some("Public Group"));
    assert_eq!(string_prop(2, "name"), Some("Public Project"));
}

/// Null values from ClickHouse (e.g. full_path=NULL) should be filtered out,
/// not appear as empty strings or crash the parser.
async fn consolidated_hydration_filters_null_properties(ctx: &TestContext) {
    let (ontology, client) = make_test_resources(ctx);
    let security_ctx = test_security_context();

    let json = r#"{
        "query_type": "neighbors",
        "node": {"id": "u", "entity": "User", "node_ids": [1]},
        "neighbors": {"node": "u", "direction": "outgoing"}
    }"#;

    let (result, _ctx_ref, _plan) =
        compile_execute_hydrate(ctx, json, &ontology, &security_ctx, &client).await;

    // Every hydrated neighbor property must carry a real value: JSON nulls are
    // dropped during parsing rather than surfaced as "null"/"" strings.
    for row in result.authorized_rows() {
        let neighbor = row.neighbor_node().expect("should have neighbor");
        for (key, value) in &neighbor.properties {
            let is_nullish = value
                .as_string()
                .is_some_and(|s| s == "null" || s.is_empty());
            assert!(
                !is_nullish,
                "property '{key}' should not be null or empty string, got: {value:?}"
            );
        }
    }
}

/// When multiple IDs of the same entity type need hydration, all should be returned.
async fn consolidated_hydration_multiple_ids_same_type(ctx: &TestContext) {
    let (ontology, client) = make_test_resources(ctx);
    let security_ctx = test_security_context();

    // Both users 1 and 2 are members of groups, producing a path through both
    let json = r#"{
        "query_type": "path_finding",
        "nodes": [
            {"id": "start", "entity": "User", "node_ids": [1, 2]},
            {"id": "end", "entity": "Project", "node_ids": [1000, 1001]}
        ],
        "path": {"type": "shortest", "from": "start", "to": "end", "max_depth": 3}
    }"#;

    let (result, _ctx_ref, _plan) =
        compile_execute_hydrate(ctx, json, &ontology, &security_ctx, &client).await;

    assert!(
        result.authorized_count() >= 2,
        "should find paths for both users"
    );

    // The first node on each path is the starting User; collect their ids.
    let starting_users: HashSet<i64> = result
        .authorized_rows()
        .filter_map(|r| r.path_nodes().first().map(|n| n.id))
        .collect();
    assert!(starting_users.contains(&1), "User 1 path should exist");
    assert!(starting_users.contains(&2), "User 2 path should exist");

    // Every node on every authorized path must have been hydrated.
    for row in result.authorized_rows() {
        for node in row.path_nodes() {
            let hydrated = !node.properties.is_empty();
            assert!(
                hydrated,
                "node {} ({}) should have hydrated properties",
                node.id,
                node.entity_type
            );
        }
    }
}

/// Verify that the consolidated query produces exactly one ClickHouse query
/// for hydration, regardless of how many entity types are discovered.
async fn consolidated_hydration_single_query_execution(ctx: &TestContext) {
    let (ontology, client) = make_test_resources(ctx);
    let security_ctx = test_security_context();

    let json = r#"{
        "query_type": "path_finding",
        "nodes": [
            {"id": "start", "entity": "User", "node_ids": [1]},
            {"id": "end", "entity": "Project", "node_ids": [1000]}
        ],
        "path": {"type": "shortest", "from": "start", "to": "end", "max_depth": 3}
    }"#;

    // Run the base query by hand so the hydration stage is the only component
    // issuing follow-up ClickHouse queries.
    let compiled = compile(json, &ontology, &security_ctx).unwrap();
    let batches = ctx.query_parameterized(&compiled.base).await;
    let query_result = QueryResult::from_batches(&batches, &compiled.base.result_context);

    let mut server_extensions = TypeMap::default();
    server_extensions.insert(Arc::clone(&client));

    let mut pipeline_ctx = QueryPipelineContext {
        query_json: String::new(),
        compiled: Some(Arc::new(compiled)),
        ontology: Arc::clone(&ontology),
        security_context: Some(security_ctx.clone()),
        server_extensions,
        phases: TypeMap::default(),
    };
    // Seed the phase map as if redaction already ran with nothing redacted.
    pipeline_ctx.phases.insert(RedactionOutput {
        query_result,
        redacted_count: 0,
    });

    let mut obs = NoOpObserver;
    let output = HydrationStage
        .execute(&mut pipeline_ctx, &mut obs)
        .await
        .expect("hydration should succeed");

    let queries = &output.hydration_queries;
    assert_eq!(
        queries.len(),
        1,
        "consolidated hydration should produce exactly 1 debug query entry, got {}",
        queries.len()
    );
    let is_union_all =
        queries[0].sql.contains("UNION ALL") || queries[0].rendered.contains("UNION ALL");
    assert!(is_union_all, "the single hydration query should be a UNION ALL");
}

// ─────────────────────────────────────────────────────────────────────────────
// Orchestrator
// ─────────────────────────────────────────────────────────────────────────────
@@ -644,9 +838,15 @@ async fn hydration_integration() {
        // hydration plan selection
        search_produces_no_hydration_plan,
        traversal_produces_no_hydration_plan,
        hydration_query_type_rejected_from_user_input,
        // full pipeline: redact then hydrate
        path_finding_hydration_after_partial_redaction,
        neighbors_hydration_after_partial_redaction,
        path_finding_all_denied_then_hydrate,
        // consolidated hydration data correctness
        consolidated_path_hydrates_all_entity_types,
        consolidated_hydration_filters_null_properties,
        consolidated_hydration_multiple_ids_same_type,
        consolidated_hydration_single_query_execution,
    );
}
+1 −1
Original line number Diff line number Diff line
@@ -139,7 +139,7 @@ pub fn enforce_return(node: &mut Node, input: &Input) -> Result<ResultContext> {
        QueryType::Traversal | QueryType::Search | QueryType::Neighbors => {
            input.nodes.iter().map(|n| n.id.as_str()).collect()
        }
        QueryType::PathFinding => HashSet::new(),
        QueryType::PathFinding | QueryType::Hydration => HashSet::new(),
    };

    match node {
+5 −0
Original line number Diff line number Diff line
@@ -151,6 +151,11 @@ pub enum QueryType {
    PathFinding,
    Search,
    Neighbors,
    /// Internal-only: consolidated hydration for multiple entity types.
    /// Generates a UNION ALL of search-like arms, one per node. Skips
    /// security context injection (IDs are pre-authorized by the pipeline).
    #[serde(skip)]
    Hydration,
}

// ─────────────────────────────────────────────────────────────────────────────
Loading