Verified Commit 5660a250 authored by Michael Usachenko's avatar Michael Usachenko Committed by GitLab
Browse files

feat(querying): add options field to json queries, apply it to dynamic hydration

parent 6ae6ecfa
Loading
Loading
Loading
Loading
+31 −14
Original line number Diff line number Diff line
@@ -6,7 +6,9 @@ use arrow::record_batch::RecordBatch;
use clickhouse_client::ArrowClickHouseClient;
use futures::future::try_join_all;
use ontology::Ontology;
use query_engine::{HydrationPlan, HydrationTemplate, SecurityContext, compile};
use query_engine::{
    DynamicColumnMode, HydrationPlan, HydrationTemplate, Input, QueryType, SecurityContext, compile,
};

use crate::redaction::{ColumnValue, QueryResult, RedactionMessage};

@@ -17,7 +19,9 @@ use super::super::types::{
};
use super::PipelineStage;

use query_engine::constants::{GKG_COLUMN_PREFIX, HYDRATION_NODE_ALIAS, redaction_id_column};
use query_engine::constants::{
    GKG_COLUMN_PREFIX, HYDRATION_NODE_ALIAS, MAX_DYNAMIC_HYDRATION_RESULTS, redaction_id_column,
};

type PropertyMap = HashMap<(String, i64), HashMap<String, ColumnValue>>;

@@ -64,13 +68,14 @@ impl HydrationStage {
        client: &ArrowClickHouseClient,
        ontology: &Ontology,
        refs: &HashMap<String, Vec<i64>>,
        input: &Input,
        security_context: &SecurityContext,
    ) -> Result<PropertyMap, PipelineError> {
        let futures: Vec<_> = refs
            .iter()
            .filter(|(_, ids)| !ids.is_empty())
            .map(|(entity_type, ids)| {
                let query_json = build_dynamic_search_query(entity_type, ids, ontology)?;
                let query_json = build_dynamic_search_query(entity_type, ids, input, ontology)?;
                Ok(compile_and_fetch(
                    client,
                    ontology,
@@ -160,6 +165,7 @@ impl<M: RedactionMessage> PipelineStage<M> for HydrationStage {
                            &ctx.client,
                            &ctx.ontology,
                            &refs,
                            &ctx.compiled()?.input,
                            ctx.security_context()?,
                        )
                        .await,
@@ -247,30 +253,41 @@ fn merge_dynamic_properties(result: &mut QueryResult, property_map: &PropertyMap

/// Build a search query JSON from scratch for dynamic hydration.
/// Only used when entity types are discovered at runtime (PathFinding, Neighbors).
/// Reads `input.options.dynamic_columns` to decide whether to fetch all columns
/// or only the entity's `default_columns` from the ontology.
fn build_dynamic_search_query(
    entity_type: &str,
    ids: &[i64],
    input: &Input,
    ontology: &ontology::Ontology,
) -> Result<String, PipelineError> {
    let columns: serde_json::Value = ontology
        .get_node(entity_type)
        .filter(|n| !n.default_columns.is_empty())
        .map(|n| serde_json::json!(n.default_columns))
        .ok_or_else(|| {
    let node = ontology.get_node(entity_type).ok_or_else(|| {
        PipelineError::Execution(format!(
                "entity type not found in ontology or has no default_columns during dynamic hydration: {entity_type}"
            "entity type not found in ontology during dynamic hydration: {entity_type}"
        ))
    })?;

    let columns: serde_json::Value = match input.options.dynamic_columns {
        DynamicColumnMode::All => serde_json::json!("*"),
        DynamicColumnMode::Default => {
            if node.default_columns.is_empty() {
                return Err(PipelineError::Execution(format!(
                    "no default_columns defined for {entity_type}"
                )));
            }
            serde_json::json!(node.default_columns)
        }
    };

    let query_json = serde_json::json!({
        "query_type": "search",
        "query_type": QueryType::Search.to_string(),
        "node": {
            "id": HYDRATION_NODE_ALIAS,
            "entity": entity_type,
            "columns": columns,
            "node_ids": ids
        },
        "limit": 1000
        "limit": ids.len().min(MAX_DYNAMIC_HYDRATION_RESULTS)
    })
    .to_string();

+2 −2
Original line number Diff line number Diff line
@@ -80,8 +80,8 @@ mod tests {
        let condensed = condensed_query_schema().expect("Should condense");

        assert!(
            condensed.len() < 15000,
            "Condensed schema should be under 15KB, got {} bytes",
            condensed.len() < 16000,
            "Condensed schema should be under 16KB, got {} bytes",
            condensed.len()
        );
    }
+22 −0
Original line number Diff line number Diff line
@@ -79,6 +79,10 @@
    "aggregation_sort": {
      "$ref": "#/$defs/AggregationSort",
      "description": "Ordering for aggregation outputs"
    },
    "options": {
      "$ref": "#/$defs/QueryOptions",
      "description": "Consumer-level preferences that affect result presentation, not query semantics."
    }
  },
  "allOf": [
@@ -601,6 +605,24 @@
      "description": "Sort direction",
      "enum": ["ASC", "DESC"],
      "default": "ASC"
    },
    "QueryOptions": {
      "type": "object",
      "description": "Consumer-level preferences that affect result presentation, not query semantics.",
      "properties": {
        "dynamic_columns": {
          "$ref": "#/$defs/DynamicColumnMode",
          "description": "Columns fetched for dynamically-discovered entities (PathFinding, Neighbors hydration). '*' returns all columns; 'default' returns the entity's default_columns from the ontology. Defaults to 'default'.",
          "default": "default"
        }
      },
      "additionalProperties": false
    },
    "DynamicColumnMode": {
      "type": "string",
      "description": "Controls column selection for dynamically-discovered entities during hydration.",
      "enum": ["*", "default"],
      "default": "default"
    }
  }
}
+3 −0
Original line number Diff line number Diff line
@@ -46,3 +46,6 @@ pub fn redaction_type_column(alias: &str) -> String {
/// Node alias used in synthetic hydration search queries, where it is emitted
/// as the query node's `id`.
/// `parse_property_batches` strips this prefix so consumers see clean keys.
pub const HYDRATION_NODE_ALIAS: &str = "hydrate";

/// Upper bound on rows fetched per entity type during dynamic hydration.
///
/// The synthetic hydration search query uses the smaller of the requested id
/// count and this value as its `limit`.
// NOTE(review): when more ids than this are referenced for one entity type the
// surplus presumably stays unhydrated — confirm that silent truncation is intended.
pub const MAX_DYNAMIC_HYDRATION_RESULTS: usize = 1000;
+79 −1
Original line number Diff line number Diff line
@@ -11,6 +11,28 @@ use std::collections::HashMap;
// Top-level input
// ─────────────────────────────────────────────────────────────────────────────

/// Controls which columns are fetched for dynamically-discovered entities
/// during hydration (PathFinding, Neighbors).
///
/// Deserialized from a JSON string; the accepted spellings are `"*"` and
/// `"default"`. `Default::default()` yields [`DynamicColumnMode::Default`].
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Deserialize)]
pub enum DynamicColumnMode {
    /// Fetch all columns from the ontology for each entity type.
    /// Written as `"*"` in the query JSON.
    #[serde(rename = "*")]
    All,
    /// Fetch only the entity's `default_columns` from the ontology.
    /// Written as `"default"` in the query JSON; this is the default variant.
    #[default]
    #[serde(rename = "default")]
    Default,
}

/// Consumer-level preferences that affect result presentation, not query semantics.
///
/// Every field carries a serde default, so an empty JSON object (`{}`)
/// deserializes to the same value as `QueryOptions::default()`.
#[derive(Debug, Clone, Default, Deserialize)]
pub struct QueryOptions {
    /// Columns fetched for dynamically-discovered entities during hydration.
    /// `All` returns every column; `Default` returns the entity's `default_columns`.
    /// Falls back to `DynamicColumnMode::Default` when the key is absent.
    #[serde(default)]
    pub dynamic_columns: DynamicColumnMode,
}

/// Authorization config for an entity type, derived from the ontology and carried
/// through the compilation pipeline so the server never re-consults the ontology at
/// request time.
@@ -45,6 +67,8 @@ pub struct Input {
    pub range: Option<InputRange>,
    pub order_by: Option<InputOrderBy>,
    pub aggregation_sort: Option<InputAggSort>,
    #[serde(default)]
    pub options: QueryOptions,
    /// Auth config for every entity type with redaction configured. Populated by
    /// normalization; covers all ontology entities (not just those in this query)
    /// so dynamic nodes (path/neighbors) can be resolved without re-consulting the ontology.
@@ -65,6 +89,7 @@ impl Default for Input {
            range: None,
            order_by: None,
            aggregation_sort: None,
            options: QueryOptions::default(),
            entity_auth: HashMap::new(),
        }
    }
@@ -106,7 +131,7 @@ pub struct InputRange {
    pub end: u32,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Deserialize, strum::IntoStaticStr)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Deserialize, strum::Display, strum::IntoStaticStr)]
#[serde(rename_all = "snake_case")]
#[strum(serialize_all = "snake_case")]
pub enum QueryType {
@@ -673,4 +698,57 @@ mod tests {
        assert!(input.range.is_none());
        assert_eq!(input.limit, 30);
    }

    /// A query without any `options` key falls back to `DynamicColumnMode::Default`.
    #[test]
    fn options_default_when_omitted() {
        let raw = r#"{"query_type": "search", "node": {"id": "u", "entity": "User"}}"#;
        let parsed = parse_input(raw).unwrap();
        let mode = parsed.options.dynamic_columns;

        assert_eq!(mode, DynamicColumnMode::Default);
    }

    /// An explicit `"*"` for `dynamic_columns` selects `DynamicColumnMode::All`.
    #[test]
    fn options_dynamic_columns_all() {
        let raw = r#"{
            "query_type": "neighbors",
            "node": {"id": "u", "entity": "User", "node_ids": [1]},
            "neighbors": {"node": "u"},
            "options": {"dynamic_columns": "*"}
        }"#;
        let parsed = parse_input(raw).unwrap();
        let mode = parsed.options.dynamic_columns;

        assert_eq!(mode, DynamicColumnMode::All);
    }

    /// An explicit `"default"` for `dynamic_columns` selects `DynamicColumnMode::Default`.
    #[test]
    fn options_dynamic_columns_default() {
        let raw = r#"{
            "query_type": "neighbors",
            "node": {"id": "u", "entity": "User", "node_ids": [1]},
            "neighbors": {"node": "u"},
            "options": {"dynamic_columns": "default"}
        }"#;
        let parsed = parse_input(raw).unwrap();
        let mode = parsed.options.dynamic_columns;

        assert_eq!(mode, DynamicColumnMode::Default);
    }

    /// An empty `options` object behaves the same as omitting the key entirely.
    #[test]
    fn options_empty_object_uses_defaults() {
        let raw = r#"{
            "query_type": "search",
            "node": {"id": "u", "entity": "User"},
            "options": {}
        }"#;
        let parsed = parse_input(raw).unwrap();
        let mode = parsed.options.dynamic_columns;

        assert_eq!(mode, DynamicColumnMode::Default);
    }
}
Loading