Verified Commit 96adcd93 authored by Michael Angelo Rivera's avatar Michael Angelo Rivera Committed by GitLab
Browse files

perf(compiler): hoist sort-key filters into aggregation dedup subquery

parent db181f67
Loading
Loading
Loading
Loading
+94 −0
Original line number Diff line number Diff line
@@ -1162,3 +1162,97 @@ fn filterable_allows_traversal_path_in_columns() {
        .is_ok()
    );
}

// ─────────────────────────────────────────────────────────────────────────────
// Aggregation filter pushdown (Bug 1 regression guard)
//
// Single-aggregate queries with a sort-key filter must push the filter into
// the LIMIT 1 BY dedup subquery so ClickHouse uses the primary-key index to
// skip granules. Without this, the dedup subquery scans the full authorized
// table before the outer countIf filters, costing ~15s on a 5,086-row count
// in production.
// ─────────────────────────────────────────────────────────────────────────────

#[test]
fn aggregation_count_pushes_project_id_into_dedup_subquery() {
    let json = r#"{
        "query_type": "aggregation",
        "nodes": [{"id": "d", "entity": "Definition",
                   "filters": {"project_id": {"op": "eq", "value": 278964}}}],
        "aggregations": [{"function": "count", "target": "d", "alias": "total"}]
    }"#;
    let result = compile(json, &embedded_ontology(), &admin_ctx()).unwrap();
    let rendered = result.base.render();

    // The countIf must contain the filter (folded as today).
    assert!(
        rendered.contains("countIf"),
        "should fold into countIf: {rendered}"
    );
    // The dedup subquery (the inner SELECT before LIMIT 1 BY) must also
    // carry the project_id filter so the granule index narrows the read.
    let inner = rendered
        .split("LIMIT 1 BY")
        .next()
        .expect("rendered SQL should contain LIMIT 1 BY");
    assert!(
        inner.contains("project_id"),
        "project_id must appear inside the LIMIT 1 BY dedup subquery: {rendered}"
    );
}

#[test]
fn pinned_traversal_narrows_joined_node_via_nf_cte() {
    // Bug 2: when one node has node_ids pinned and joins to another via an
    // edge, the joined-side node table must be narrowed to ids reachable
    // from the pinned source. Without the fix, the joined Definition table
    // dedups the full authorized scope (~tens of millions of rows on
    // production data) before the JOIN.
    let json = r#"{
        "query_type": "traversal",
        "nodes": [
            {"id": "f", "entity": "File", "node_ids": ["12345"], "columns": ["path"]},
            {"id": "d", "entity": "Definition", "columns": ["name"]}
        ],
        "relationships": [{"type": "DEFINES", "from": "f", "to": "d"}],
        "limit": 50
    }"#;
    let result = compile(json, &embedded_ontology(), &admin_ctx()).unwrap();
    let rendered = result.base.render();

    // Both _nf_f (existing) and _nf_d (new) CTEs must be present.
    assert!(
        rendered.contains("_nf_f"),
        "_nf_f CTE must be defined: {rendered}"
    );
    assert!(
        rendered.contains("_nf_d"),
        "_nf_d CTE must be derived from edge filtered by _nf_f: {rendered}"
    );
    // The Definition dedup subquery must filter by _nf_d.
    assert!(
        rendered.contains("d.id IN (SELECT id FROM _nf_d)"),
        "Definition subquery must be narrowed by _nf_d: {rendered}"
    );
}

#[test]
fn aggregation_count_in_clause_pushes_project_id() {
    let json = r#"{
        "query_type": "aggregation",
        "nodes": [{"id": "d", "entity": "Definition",
                   "filters": {"project_id": {"op": "in", "value": [69095239, 278964, 74646916]}}}],
        "aggregations": [{"function": "count", "target": "d", "alias": "total"}]
    }"#;
    let result = compile(json, &embedded_ontology(), &admin_ctx()).unwrap();
    let rendered = result.base.render();

    let inner = rendered
        .split("LIMIT 1 BY")
        .next()
        .expect("rendered SQL should contain LIMIT 1 BY");
    assert!(
        inner.contains("project_id"),
        "project_id IN must appear inside dedup subquery: {rendered}"
    );
}
+30 −0
Original line number Diff line number Diff line
@@ -561,6 +561,36 @@ mod tests {
        assert!(has_limit_by(inner));
    }

    #[test]
    fn aggregation_pushes_sort_key_filter_inside() {
        // When fold_filters_into_aggregates retains a structural conjunct in
        // the outer WHERE for a single-aggregate target, deduplicate must
        // hoist it into the LIMIT 1 BY subquery so ClickHouse can use the
        // primary-key index to skip granules. Regression guard for the
        // 411x slowdown on count(Definition where project_id=X).
        let ont = ontology();
        let mut node = Node::Query(Box::new(Query {
            select: vec![SelectExpr::new(Expr::col("mr", "id"), "id")],
            from: TableRef::scan("gl_merge_request", "mr"),
            where_clause: Some(Expr::and(
                Expr::eq(Expr::col("mr", "id"), Expr::lit(42)),
                Expr::eq(Expr::col("mr", "state"), Expr::lit("opened")),
            )),
            ..Default::default()
        }));
        deduplicate(&mut node, &input_for(QueryType::Aggregation), &ont);

        let Node::Query(q) = &node else {
            unreachable!()
        };
        let inner = find_subquery(&q.from, "mr").expect("should be wrapped");
        // id is in the sort key -- pushed inside.
        assert!(where_contains(&inner.where_clause, "\"id\""));
        // state is mutable -- stays in outer WHERE.
        assert!(!where_contains(&inner.where_clause, "state"));
        assert!(where_contains(&q.where_clause, "state"));
    }

    #[test]
    fn neighbors_wraps_node_scan() {
        let ont = ontology();
+248 −3
Original line number Diff line number Diff line
@@ -51,6 +51,9 @@ pub fn optimize(node: &mut Node, input: &mut Input) {
            if input.query_type == QueryType::Traversal && input.relationships.len() > 1 {
                cascade_node_filter_ctes(q, input);
            }
            if input.query_type == QueryType::Traversal {
                narrow_joined_nodes_via_pinned_neighbors(q, input);
            }
            if input.query_type == QueryType::Aggregation {
                apply_target_sip_prefilter(q, input);
                fold_filters_into_aggregates(q, input);
@@ -700,6 +703,19 @@ fn fold_filters_into_aggregates(q: &mut Query, input: &Input) {
        .filter(|t| !input.compiler.node_edge_col.contains_key(*t))
        .collect();

    // Count aggregations per target alias. When a single aggregation targets
    // an alias, folded conjuncts can be retained in WHERE so DeduplicatePass
    // can hoist sort-key (structural) ones into the LIMIT 1 BY subquery for
    // granule pruning. With multiple aggregations targeting the same alias
    // (e.g. countIf(state='opened') + countIf(state='closed')), per-If
    // filters disagree and a retained outer WHERE would corrupt the counts.
    let mut aggs_per_alias: HashMap<&str, usize> = HashMap::new();
    for agg in &input.aggregations {
        if let Some(t) = agg.target.as_deref() {
            *aggs_per_alias.entry(t).or_default() += 1;
        }
    }

    // Build group-by alias set to avoid folding their filters.
    let group_aliases: HashSet<&str> = input
        .aggregations
@@ -741,6 +757,17 @@ fn fold_filters_into_aggregates(q: &mut Query, input: &Input) {
        if should_keep {
            remaining.push(conjunct);
        } else if let Some(alias) = aliases.into_iter().next() {
            // Retain in WHERE when this alias has exactly one aggregation
            // target. DeduplicatePass.partition_filters will hoist sort-key
            // columns (id, project_id, traversal_path, branch) into the
            // dedup subquery's WHERE, enabling granule pruning. Mutable
            // columns stay in the outer WHERE, where they correctly
            // evaluate against the deduped row. The countIf(_, conjunct)
            // becomes redundant in this case but the cost is negligible.
            let single_target = aggs_per_alias.get(alias.as_str()).copied().unwrap_or(0) <= 1;
            if single_target {
                remaining.push(conjunct.clone());
            }
            folded_by_alias.entry(alias).or_default().push(conjunct);
        }
    }
@@ -902,6 +929,70 @@ fn cascade_node_filter_ctes(q: &mut Query, input: &Input) {
    }
}

/// For traversal queries, derive `_nf_{neighbor}` CTEs for un-pinned nodes
/// that are reachable via a single hop from a pinned node.
///
/// Without this, a query like `File[node_ids: [X]] --DEFINES--> Definition`
/// builds `_nf_f` for the pinned File but leaves the joined-side Definition
/// table unrestricted. DeduplicatePass.wrap_join_scans then dedups the full
/// authorized Definition table before the JOIN, which on production data
/// scans tens of millions of rows for a single file's ~30 definitions.
///
/// We materialize the neighbor's reachable ids in `_nf_{neighbor}` once.
/// `wrap_join_scans` (deduplicate.rs) already injects the standard
/// `neighbor.id IN (SELECT id FROM _nf_{neighbor})` filter into the
/// neighbor's dedup subquery whenever such a CTE exists.
fn narrow_joined_nodes_via_pinned_neighbors(q: &mut Query, input: &Input) {
    if input.relationships.is_empty() {
        return;
    }

    let pinned: HashSet<String> = input
        .nodes
        .iter()
        .filter(|n| !n.node_ids.is_empty())
        .map(|n| n.id.clone())
        .collect();
    if pinned.is_empty() {
        return;
    }

    for rel in &input.relationships {
        let (start_col, end_col) = rel.direction.edge_columns();

        // Choose direction: pinned -> neighbor.
        let (source_id, target_id, edge_filter_col, edge_select_col) =
            if pinned.contains(&rel.from) && !pinned.contains(&rel.to) {
                (&rel.from, &rel.to, start_col, end_col)
            } else if pinned.contains(&rel.to) && !pinned.contains(&rel.from) {
                (&rel.to, &rel.from, end_col, start_col)
            } else {
                continue;
            };

        let source_nf = node_filter_cte(source_id);
        let target_nf = node_filter_cte(target_id);

        if !q.ctes.iter().any(|c| c.name == source_nf) {
            continue;
        }
        if q.ctes.iter().any(|c| c.name == target_nf) {
            continue;
        }

        if let Some(cte_query) = build_cascade_for_node(
            input,
            target_id,
            edge_select_col,
            edge_filter_col,
            &source_nf,
            &rel.types,
        ) {
            q.ctes.push(Cte::new(&target_nf, cte_query));
        }
    }
}

/// Path hop frontier optimization.
///
/// For path-finding queries with max_depth > 2, materializes the reachable
@@ -1253,10 +1344,12 @@ mod tests {
            other => panic!("expected FuncCall, got {other:?}"),
        }

        // Group-by node filter stays in WHERE.
        // Group-by node filter stays in WHERE; target's filter is retained
        // alongside the countIf so DeduplicatePass can hoist sort-key
        // columns into the LIMIT 1 BY subquery for granule pruning.
        let where_aliases = q.where_clause.as_ref().unwrap().column_aliases();
        assert!(where_aliases.contains("p"));
        assert!(!where_aliases.contains("mr"));
        assert!(where_aliases.contains("mr"));
    }

    #[test]
@@ -1306,7 +1399,8 @@ mod tests {
            }
            other => panic!("expected countIf, got {other:?}"),
        }
        assert!(q.where_clause.is_none());
        // Single-aggregate target: filter retained in WHERE for granule pruning.
        assert!(q.where_clause.is_some());
    }

    #[test]
@@ -1365,6 +1459,46 @@ mod tests {
            other => panic!("expected countIf, got {other:?}"),
        }

        // Single-aggregate target: both conjuncts retained in WHERE alongside
        // the per-If filters, so DeduplicatePass can hoist them.
        let where_aliases = q.where_clause.as_ref().unwrap().column_aliases();
        assert!(where_aliases.contains("mr"));
    }

    #[test]
    fn multi_aggregate_does_not_retain_conjuncts() {
        // Two aggregations target the same alias with conflicting per-If
        // filters. Retaining either filter in outer WHERE would corrupt the
        // other count, so fold must REMOVE conjuncts from WHERE in this case.
        let input = agg_input(vec![
            InputAggregation {
                function: AggFunction::Count,
                target: Some("mr".to_string()),
                group_by: None,
                property: None,
                alias: Some("opened".to_string()),
            },
            InputAggregation {
                function: AggFunction::Count,
                target: Some("mr".to_string()),
                group_by: None,
                property: None,
                alias: Some("merged".to_string()),
            },
        ]);
        let mut q = Query {
            select: vec![
                SelectExpr::new(count_expr("mr", "id"), "opened"),
                SelectExpr::new(count_expr("mr", "id"), "merged"),
            ],
            from: TableRef::scan("gl_merge_request", "mr"),
            where_clause: Some(eq_filter("mr", "state", "opened")),
            ..Default::default()
        };

        fold_filters_into_aggregates(&mut q, &input);

        // Filter must NOT be retained — would corrupt the other countIf.
        assert!(q.where_clause.is_none());
    }

@@ -1443,6 +1577,117 @@ mod tests {
        assert!(q.where_clause.is_some());
    }

    #[test]
    fn pinned_traversal_creates_neighbor_nf_cte() {
        use crate::input::{Direction, InputNode, InputRelationship};

        // Source node is pinned via node_ids; target is unpinned.
        // The pass must create _nf_<target> by deriving target ids from the
        // edge filtered by _nf_<source>, so DeduplicatePass can narrow the
        // target's dedup subquery.
        let input = Input {
            query_type: QueryType::Traversal,
            nodes: vec![
                InputNode {
                    id: "f".into(),
                    entity: Some("File".into()),
                    table: Some("gl_file".into()),
                    node_ids: vec![42i64],
                    ..Default::default()
                },
                InputNode {
                    id: "d".into(),
                    entity: Some("Definition".into()),
                    table: Some("gl_definition".into()),
                    ..Default::default()
                },
            ],
            relationships: vec![InputRelationship {
                types: vec!["DEFINES".into()],
                from: "f".into(),
                to: "d".into(),
                min_hops: 1,
                max_hops: 1,
                direction: Direction::Outgoing,
                filters: Default::default(),
            }],
            ..Default::default()
        };

        // Simulate the lowerer having created _nf_f for the pinned File.
        let mut q = Query {
            select: vec![SelectExpr::new(Expr::col("d", "name"), "name")],
            from: TableRef::scan("gl_definition", "d"),
            ctes: vec![Cte::new(
                "_nf_f",
                Query {
                    select: vec![SelectExpr::new(Expr::col("f", "id"), "id")],
                    from: TableRef::scan("gl_file", "f"),
                    where_clause: Some(Expr::eq(Expr::col("f", "id"), Expr::lit(42))),
                    ..Default::default()
                },
            )],
            ..Default::default()
        };

        narrow_joined_nodes_via_pinned_neighbors(&mut q, &input);

        // _nf_d should now exist alongside _nf_f.
        assert!(
            q.ctes.iter().any(|c| c.name == "_nf_d"),
            "expected _nf_d CTE to be derived from edge filtered by _nf_f"
        );
    }

    #[test]
    fn pinned_traversal_skips_when_both_sides_pinned() {
        use crate::input::{Direction, InputNode, InputRelationship};

        // Both pinned: nothing to derive.
        let input = Input {
            query_type: QueryType::Traversal,
            nodes: vec![
                InputNode {
                    id: "f".into(),
                    entity: Some("File".into()),
                    table: Some("gl_file".into()),
                    node_ids: vec![1i64],
                    ..Default::default()
                },
                InputNode {
                    id: "d".into(),
                    entity: Some("Definition".into()),
                    table: Some("gl_definition".into()),
                    node_ids: vec![2i64],
                    ..Default::default()
                },
            ],
            relationships: vec![InputRelationship {
                types: vec!["DEFINES".into()],
                from: "f".into(),
                to: "d".into(),
                min_hops: 1,
                max_hops: 1,
                direction: Direction::Outgoing,
                filters: Default::default(),
            }],
            ..Default::default()
        };

        let mut q = Query {
            select: vec![SelectExpr::new(Expr::col("d", "name"), "name")],
            from: TableRef::scan("gl_definition", "d"),
            ..Default::default()
        };

        narrow_joined_nodes_via_pinned_neighbors(&mut q, &input);

        assert!(
            q.ctes.is_empty(),
            "no CTEs should be created when both sides are pinned"
        );
    }

    #[test]
    fn target_sip_injects_cte_for_aggregation_target_with_filters() {
        use crate::input::{Direction, InputNode, InputRelationship};