Loading crates/query-engine/compiler/src/passes/deduplicate.rs +153 −15 Original line number Diff line number Diff line Loading @@ -76,13 +76,26 @@ fn dedup_nf_cte(q: &mut Query, input: &Input, ontology: &Ontology) { let selects_traversal_path = select .iter() .any(|expr| expr.alias.as_deref() == Some(TRAVERSAL_PATH_COLUMN)); if selects_traversal_path { // When the CTE is fed by a cascade (WHERE id IN (SELECT id FROM // _cascade_*)), use [id] as the sort key so the ORDER BY becomes // `id ASC, _version DESC`. This lets ClickHouse use the by_id // projection instead of the main table's primary key order, // avoiding a full-table sort on traversal_path. let has_cascade = q .where_clause .as_ref() .is_some_and(|w| w.contains_in_subquery()); if selects_traversal_path && !has_cascade { apply_limit_by_dedup_with_inner_filters( &mut q.from, &mut q.where_clause, &table, ontology, ); } else if has_cascade { apply_limit_by_dedup_id_only(&mut q.from, &mut q.where_clause, selects_traversal_path); } else { apply_limit_by_dedup(&mut q.from, &mut q.where_clause, &table, ontology); } Loading Loading @@ -360,6 +373,27 @@ fn make_dedup_subquery( ) } /// Cascade-optimized dedup: uses `ORDER BY id, _version DESC` so ClickHouse /// can pick the `by_id` projection instead of the main table's primary key /// order. All WHERE filters are pushed inside since cascade CTEs have an /// `id IN (...)` filter that already restricts the scan. fn apply_limit_by_dedup_id_only( from: &mut TableRef, where_clause: &mut Option<Expr>, push_all_filters_inside: bool, ) { let (table_name, alias) = match from { TableRef::Scan { table, alias, .. } => (table.clone(), alias.clone()), _ => return, }; let id_sort_key: &[String] = &[DEFAULT_PRIMARY_KEY.to_string()]; if push_all_filters_inside { wrap_scan_with_limit_by_inner_filters(from, where_clause, table_name, alias, id_sort_key); } else { wrap_scan_with_limit_by(from, where_clause, table_name, alias, None, id_sort_key); } } fn apply_limit_by_dedup( from: &mut TableRef, where_clause: &mut Option<Expr>, Loading Loading @@ -447,21 +481,37 @@ fn wrap_join_scans( // _ce.source_id that don't exist in the dedup subquery scope). // Cascade-derived CTEs are identified by containing InSubquery // anywhere in their WHERE tree. let nf_filter = cte_filters.get(&nf_cte).and_then(|cte_where| { let cte_where = cte_where.as_ref()?; let (nf_filter, is_cascade) = cte_filters.get(&nf_cte).map_or((None, false), |cte_where| { let Some(cte_where) = cte_where.as_ref() else { return (None, false); }; if cte_where.contains_in_subquery() { // Cascade-derived: fall back to CTE reference ( Some(Expr::InSubquery { expr: Box::new(Expr::col(&alias_str, DEFAULT_PRIMARY_KEY)), cte_name: nf_cte, column: DEFAULT_PRIMARY_KEY.to_string(), }) }), true, ) } else { // Lowerer-created: inline the WHERE conditions Some(cte_where.clone()) (Some(cte_where.clone()), false) } }); // When cascade-derived, use [id] as sort key so ClickHouse // picks the by_id projection instead of the main table order. let effective_sort_key: Vec<String>; let sort_key = if is_cascade { effective_sort_key = vec![DEFAULT_PRIMARY_KEY.to_string()]; &effective_sort_key } else { sort_key }; wrap_scan_with_limit_by( from, where_clause, Loading Loading @@ -1022,6 +1072,94 @@ mod tests { assert!(arm2.having.is_some(), "second arm should have HAVING"); } #[test] fn cascade_fed_nf_cte_uses_id_only_sort_key() { let ont = ontology(); let mut node = Node::Query(Box::new(Query { ctes: vec![Cte::new( "_nf_mr", Query { select: vec![SelectExpr::new(Expr::col("mr", "id"), "id")], from: TableRef::scan("gl_merge_request", "mr"), where_clause: Some(Expr::and( Expr::func( "startsWith", vec![ Expr::col("mr", TRAVERSAL_PATH_COLUMN), Expr::string("1/100/"), ], ), Expr::InSubquery { expr: Box::new(Expr::col("mr", "id")), cte_name: "_cascade_mr".to_string(), column: "id".to_string(), }, )), ..Default::default() }, )], select: vec![SelectExpr::new(Expr::col("b", "id"), "id")], from: TableRef::scan("_nf_mr", "b"), ..Default::default() })); deduplicate(&mut node, &input_for(QueryType::Traversal), &ont); let Node::Query(q) = &node else { unreachable!() }; let cte_q = &q.ctes[0].query; let inner = find_subquery(&cte_q.from, "mr").expect("CTE scan should be wrapped"); assert!(has_limit_by(inner), "inner should have LIMIT 1 BY"); // CASCADE-FED: ORDER BY should be [id ASC, _version DESC], NOT // [traversal_path ASC, id ASC, _version DESC]. This lets // ClickHouse use the by_id projection. assert_eq!( inner.order_by.len(), 2, "should have exactly 2 ORDER BY columns (id, _version)" ); let first = &inner.order_by[0]; assert!( matches!(&first.expr, Expr::Column { column, .. } if column == "id"), "first ORDER BY should be id" ); assert!(!first.desc, "id should be ASC"); let second = &inner.order_by[1]; assert!(second.desc, "second ORDER BY should be _version DESC"); } #[test] fn non_cascade_nf_cte_uses_full_sort_key() { let ont = ontology(); let mut node = Node::Query(Box::new(Query { ctes: vec![Cte::new( "_nf_mr", Query { select: vec![SelectExpr::new(Expr::col("mr", "id"), "id")], from: TableRef::scan("gl_merge_request", "mr"), where_clause: Some(Expr::eq(Expr::col("mr", "state"), Expr::string("merged"))), ..Default::default() }, )], select: vec![SelectExpr::new(Expr::col("b", "id"), "id")], from: TableRef::scan("_nf_mr", "b"), ..Default::default() })); deduplicate(&mut node, &input_for(QueryType::Traversal), &ont); let Node::Query(q) = &node else { unreachable!() }; let cte_q = &q.ctes[0].query; let inner = find_subquery(&cte_q.from, "mr").expect("CTE scan should be wrapped"); assert!(has_limit_by(inner), "inner should have LIMIT 1 BY"); // NON-CASCADE: ORDER BY should include traversal_path (full sort key) assert!( inner.order_by.len() > 2, "non-cascade should have full sort key + _version" ); } #[test] fn gl_table_regex_matches_prefixed_and_unprefixed() { assert!(GL_TABLE_RE.is_match("gl_user")); Loading Loading
crates/query-engine/compiler/src/passes/deduplicate.rs +153 −15 Original line number Diff line number Diff line Loading @@ -76,13 +76,26 @@ fn dedup_nf_cte(q: &mut Query, input: &Input, ontology: &Ontology) { let selects_traversal_path = select .iter() .any(|expr| expr.alias.as_deref() == Some(TRAVERSAL_PATH_COLUMN)); if selects_traversal_path { // When the CTE is fed by a cascade (WHERE id IN (SELECT id FROM // _cascade_*)), use [id] as the sort key so the ORDER BY becomes // `id ASC, _version DESC`. This lets ClickHouse use the by_id // projection instead of the main table's primary key order, // avoiding a full-table sort on traversal_path. let has_cascade = q .where_clause .as_ref() .is_some_and(|w| w.contains_in_subquery()); if selects_traversal_path && !has_cascade { apply_limit_by_dedup_with_inner_filters( &mut q.from, &mut q.where_clause, &table, ontology, ); } else if has_cascade { apply_limit_by_dedup_id_only(&mut q.from, &mut q.where_clause, selects_traversal_path); } else { apply_limit_by_dedup(&mut q.from, &mut q.where_clause, &table, ontology); } Loading Loading @@ -360,6 +373,27 @@ fn make_dedup_subquery( ) } /// Cascade-optimized dedup: uses `ORDER BY id, _version DESC` so ClickHouse /// can pick the `by_id` projection instead of the main table's primary key /// order. All WHERE filters are pushed inside since cascade CTEs have an /// `id IN (...)` filter that already restricts the scan. fn apply_limit_by_dedup_id_only( from: &mut TableRef, where_clause: &mut Option<Expr>, push_all_filters_inside: bool, ) { let (table_name, alias) = match from { TableRef::Scan { table, alias, .. } => (table.clone(), alias.clone()), _ => return, }; let id_sort_key: &[String] = &[DEFAULT_PRIMARY_KEY.to_string()]; if push_all_filters_inside { wrap_scan_with_limit_by_inner_filters(from, where_clause, table_name, alias, id_sort_key); } else { wrap_scan_with_limit_by(from, where_clause, table_name, alias, None, id_sort_key); } } fn apply_limit_by_dedup( from: &mut TableRef, where_clause: &mut Option<Expr>, Loading Loading @@ -447,21 +481,37 @@ fn wrap_join_scans( // _ce.source_id that don't exist in the dedup subquery scope). // Cascade-derived CTEs are identified by containing InSubquery // anywhere in their WHERE tree. let nf_filter = cte_filters.get(&nf_cte).and_then(|cte_where| { let cte_where = cte_where.as_ref()?; let (nf_filter, is_cascade) = cte_filters.get(&nf_cte).map_or((None, false), |cte_where| { let Some(cte_where) = cte_where.as_ref() else { return (None, false); }; if cte_where.contains_in_subquery() { // Cascade-derived: fall back to CTE reference ( Some(Expr::InSubquery { expr: Box::new(Expr::col(&alias_str, DEFAULT_PRIMARY_KEY)), cte_name: nf_cte, column: DEFAULT_PRIMARY_KEY.to_string(), }) }), true, ) } else { // Lowerer-created: inline the WHERE conditions Some(cte_where.clone()) (Some(cte_where.clone()), false) } }); // When cascade-derived, use [id] as sort key so ClickHouse // picks the by_id projection instead of the main table order. let effective_sort_key: Vec<String>; let sort_key = if is_cascade { effective_sort_key = vec![DEFAULT_PRIMARY_KEY.to_string()]; &effective_sort_key } else { sort_key }; wrap_scan_with_limit_by( from, where_clause, Loading Loading @@ -1022,6 +1072,94 @@ mod tests { assert!(arm2.having.is_some(), "second arm should have HAVING"); } #[test] fn cascade_fed_nf_cte_uses_id_only_sort_key() { let ont = ontology(); let mut node = Node::Query(Box::new(Query { ctes: vec![Cte::new( "_nf_mr", Query { select: vec![SelectExpr::new(Expr::col("mr", "id"), "id")], from: TableRef::scan("gl_merge_request", "mr"), where_clause: Some(Expr::and( Expr::func( "startsWith", vec![ Expr::col("mr", TRAVERSAL_PATH_COLUMN), Expr::string("1/100/"), ], ), Expr::InSubquery { expr: Box::new(Expr::col("mr", "id")), cte_name: "_cascade_mr".to_string(), column: "id".to_string(), }, )), ..Default::default() }, )], select: vec![SelectExpr::new(Expr::col("b", "id"), "id")], from: TableRef::scan("_nf_mr", "b"), ..Default::default() })); deduplicate(&mut node, &input_for(QueryType::Traversal), &ont); let Node::Query(q) = &node else { unreachable!() }; let cte_q = &q.ctes[0].query; let inner = find_subquery(&cte_q.from, "mr").expect("CTE scan should be wrapped"); assert!(has_limit_by(inner), "inner should have LIMIT 1 BY"); // CASCADE-FED: ORDER BY should be [id ASC, _version DESC], NOT // [traversal_path ASC, id ASC, _version DESC]. This lets // ClickHouse use the by_id projection. assert_eq!( inner.order_by.len(), 2, "should have exactly 2 ORDER BY columns (id, _version)" ); let first = &inner.order_by[0]; assert!( matches!(&first.expr, Expr::Column { column, .. } if column == "id"), "first ORDER BY should be id" ); assert!(!first.desc, "id should be ASC"); let second = &inner.order_by[1]; assert!(second.desc, "second ORDER BY should be _version DESC"); } #[test] fn non_cascade_nf_cte_uses_full_sort_key() { let ont = ontology(); let mut node = Node::Query(Box::new(Query { ctes: vec![Cte::new( "_nf_mr", Query { select: vec![SelectExpr::new(Expr::col("mr", "id"), "id")], from: TableRef::scan("gl_merge_request", "mr"), where_clause: Some(Expr::eq(Expr::col("mr", "state"), Expr::string("merged"))), ..Default::default() }, )], select: vec![SelectExpr::new(Expr::col("b", "id"), "id")], from: TableRef::scan("_nf_mr", "b"), ..Default::default() })); deduplicate(&mut node, &input_for(QueryType::Traversal), &ont); let Node::Query(q) = &node else { unreachable!() }; let cte_q = &q.ctes[0].query; let inner = find_subquery(&cte_q.from, "mr").expect("CTE scan should be wrapped"); assert!(has_limit_by(inner), "inner should have LIMIT 1 BY"); // NON-CASCADE: ORDER BY should include traversal_path (full sort key) assert!( inner.order_by.len() > 2, "non-cascade should have full sort key + _version" ); } #[test] fn gl_table_regex_matches_prefixed_and_unprefixed() { assert!(GL_TABLE_RE.is_match("gl_user")); Loading