Loading crates/integration-tests/tests/server/data_correctness/dedup.rs +171 −3 Original line number Diff line number Diff line Loading @@ -14,10 +14,12 @@ use super::helpers::*; fn dedup_svc() -> MockRedactionService { let mut svc = allow_all(); svc.allow("user", &[9001, 9002, 9003, 9010, 9011, 9300, 9301]); svc.allow("user", &[9001, 9002, 9003, 9010, 9011, 9300, 9301, 9600]); svc.allow( "merge_request", &[9100, 9101, 9200, 9201, 9400, 9401, 9500, 9501], &[ 9100, 9101, 9200, 9201, 9400, 9401, 9500, 9501, 9700, 9701, 9800, 9801, 9900, ], ); svc } Loading Loading @@ -416,7 +418,7 @@ pub(super) async fn neighbors_dedup_returns_unique_edges(ctx: &TestContext) { /// don't join non-center node tables. In production the indexer soft-deletes /// FK edge rows alongside their parent node, so this scenario is synthetic. pub(super) async fn neighbors_deleted_node_visible_via_edge(ctx: &TestContext) { // User 9301: v1 alive, v2 deleted. Should not appear as a neighbor. // User 9301: v1 alive, v2 deleted. Still visible via edge (known limitation). ctx.execute( "INSERT INTO gl_user (id, username, name, state, user_type, _version, _deleted) VALUES (9301, 'del_nbr', 'Deleted Neighbor', 'active', 'human', '2024-01-01 00:00:00', false), Loading Loading @@ -450,3 +452,169 @@ pub(super) async fn neighbors_deleted_node_visible_via_edge(ctx: &TestContext) { // The edge still appears because the edge row has _deleted=false. resp.assert_edge_exists("User", 9301, "MergeRequest", 9101, "AUTHORED"); } /// Hydration returns properties from the latest version, not a stale one. /// User 9600 has v1 username='hydrate_old', v2 username='hydrate_new'. /// The hydrated properties should show 'hydrate_new'. pub(super) async fn hydration_returns_latest_properties(ctx: &TestContext) { ctx.execute( "INSERT INTO gl_user (id, username, name, state, user_type, _version, _deleted) VALUES (9600, 'hydrate_old', 'Old Hydrated', 'active', 'human', '2024-01-01 00:00:00', false), (9600, 'hydrate_new', 'New Hydrated', 'active', 'human', '2024-06-01 00:00:00', false)", ) .await; ctx.execute( "INSERT INTO gl_edge (traversal_path, source_id, source_kind, relationship_kind, target_id, target_kind) VALUES ('1/100/1000/', 9600, 'User', 'MEMBER_OF', 100, 'Group')", ) .await; let resp = run_query( ctx, r#"{ "query_type": "traversal", "nodes": [ {"id": "u", "entity": "User", "node_ids": [9600], "columns": ["username", "name"]}, {"id": "g", "entity": "Group"} ], "relationships": [{"type": "MEMBER_OF", "from": "u", "to": "g"}], "limit": 10 }"#, &dedup_svc(), ) .await; resp.skip_requirement(Requirement::NodeCount); resp.assert_node_ids("User", &[9600]); // Hydration should return v2 properties, not v1. let node = resp.find_node("User", 9600).unwrap(); node.assert_str("username", "hydrate_new"); node.assert_str("name", "New Hydrated"); resp.assert_edge_exists("User", 9600, "Group", 100, "MEMBER_OF"); } /// Soft-deleted edge row excluded from traversal results. /// The edge itself has _deleted=true (as the indexer would set it). pub(super) async fn traversal_excludes_deleted_edge(ctx: &TestContext) { // MR 9700: alive, with a deleted edge to project 1003 // MR 9701: alive, with a non-deleted edge to project 1003 ctx.execute( "INSERT INTO gl_merge_request (id, iid, title, state, traversal_path, _version, _deleted) VALUES (9700, 700, 'Alive MR deleted edge', 'merged', '1/100/1003/', '2024-06-01 00:00:00', false), (9701, 701, 'Alive MR good edge', 'merged', '1/100/1003/', '2024-06-01 00:00:00', false)", ) .await; ctx.execute( "INSERT INTO gl_edge (traversal_path, source_id, source_kind, relationship_kind, target_id, target_kind, _deleted) VALUES ('1/100/1003/', 9700, 'MergeRequest', 'IN_PROJECT', 1003, 'Project', true), ('1/100/1003/', 9701, 'MergeRequest', 'IN_PROJECT', 1003, 'Project', false)", ) .await; let resp = run_query( ctx, r#"{ "query_type": "traversal", "nodes": [ {"id": "mr", "entity": "MergeRequest"}, {"id": "p", "entity": "Project", "node_ids": [1003]} ], "relationships": [{"type": "IN_PROJECT", "from": "mr", "to": "p"}], "limit": 10 }"#, &dedup_svc(), ) .await; // Only MR 9701's edge should appear. MR 9700's edge is soft-deleted. // Other tests may insert edges to project 1003 concurrently, so we // verify the deleted edge is absent rather than asserting exact count. resp.skip_requirement(Requirement::NodeCount); resp.assert_node_ids("Project", &[1003]); resp.assert_edge_exists("MergeRequest", 9701, "Project", 1003, "IN_PROJECT"); assert!( resp.find_edge("MergeRequest", 9700, "Project", 1003, "IN_PROJECT") .is_none(), "edge for MR 9700 should be excluded (_deleted=true)" ); } /// Three versions of the same entity: dedup picks the latest (v3), not v2. /// Confirms ORDER BY _version DESC picks the true latest across multiple versions. pub(super) async fn search_three_versions_returns_latest(ctx: &TestContext) { ctx.execute( "INSERT INTO gl_merge_request (id, iid, title, state, traversal_path, _version, _deleted) VALUES (9800, 800, 'MR v1', 'opened', '1/100/1000/', '2024-01-01 00:00:00', false), (9800, 800, 'MR v2', 'merged', '1/100/1000/', '2024-03-01 00:00:00', false), (9800, 800, 'MR v3', 'closed', '1/100/1000/', '2024-06-01 00:00:00', false), (9801, 801, 'Control MR', 'merged', '1/100/1000/', '2024-06-01 00:00:00', false)", ) .await; let resp = run_query( ctx, r#"{ "query_type": "search", "node": {"id": "mr", "entity": "MergeRequest", "columns": ["title", "state"], "node_ids": [9800, 9801]}, "limit": 10 }"#, &dedup_svc(), ) .await; resp.assert_node_count(2); resp.assert_node_ids("MergeRequest", &[9800, 9801]); // MR 9800 should show v3 (state='closed'), not v2 (state='merged'). let node = resp.find_node("MergeRequest", 9800).unwrap(); node.assert_str("state", "closed"); node.assert_str("title", "MR v3"); // Control: MR 9801 has a single version. let control = resp.find_node("MergeRequest", 9801).unwrap(); control.assert_str("state", "merged"); } /// Aggregation excludes deleted entities from count via _nf_* CTE argMax. /// MR 9900 has latest version _deleted=true. It should not be counted. pub(super) async fn aggregation_excludes_deleted_from_count(ctx: &TestContext) { // MR 9900: v1 alive, v2 deleted -- should NOT be counted ctx.execute( "INSERT INTO gl_merge_request (id, iid, title, state, traversal_path, _version, _deleted) VALUES (9900, 900, 'Counted then deleted', 'merged', '1/100/1002/', '2024-01-01 00:00:00', false), (9900, 900, 'Counted then deleted', 'merged', '1/100/1002/', '2024-06-01 00:00:00', true)", ) .await; ctx.execute( "INSERT INTO gl_edge (traversal_path, source_id, source_kind, relationship_kind, target_id, target_kind) VALUES ('1/100/1002/', 9900, 'MergeRequest', 'IN_PROJECT', 1002, 'Project')", ) .await; let resp = run_query( ctx, r#"{ "query_type": "aggregation", "nodes": [ {"id": "mr", "entity": "MergeRequest", "filters": {"state": "merged"}}, {"id": "p", "entity": "Project", "columns": ["name"], "node_ids": [1002]} ], "relationships": [{"type": "IN_PROJECT", "from": "mr", "to": "p"}], "aggregations": [{"function": "count", "target": "mr", "group_by": "p", "alias": "mr_count"}], "limit": 10 }"#, &dedup_svc(), ) .await; resp.assert_node_count(1); resp.assert_node_ids("Project", &[1002]); resp.skip_requirement(Requirement::Filter { field: "state".into(), }); // MR 9900 is deleted, so only MR 9201 (from earlier test) should be counted. // The _nf_mr CTE's argMax HAVING excludes deleted entities. resp.assert_node("Project", 1002, |n| { n.prop_i64("mr_count") == Some(1) && n.prop_str("name") == Some("Internal Project") }); } crates/integration-tests/tests/server/data_correctness/mod.rs +4 −0 Original line number Diff line number Diff line Loading @@ -176,5 +176,9 @@ async fn data_correctness() { dedup::traversal_deleted_node_visible_via_edge, dedup::neighbors_dedup_returns_unique_edges, dedup::neighbors_deleted_node_visible_via_edge, dedup::hydration_returns_latest_properties, dedup::traversal_excludes_deleted_edge, dedup::search_three_versions_returns_latest, dedup::aggregation_excludes_deleted_from_count, ); } crates/query-engine/compiler/src/passes/deduplicate.rs +9 −1 Original line number Diff line number Diff line Loading @@ -80,6 +80,10 @@ fn dedup_query(q: &mut Query, input: &Input, ontology: &Ontology) { /// Edge tables use ReplacingMergeTree with `_deleted` but are not wrapped /// in dedup subqueries (their full-tuple ORDER BY makes RMT dedup effective). /// Between merges, soft-deleted edge rows can still appear. /// /// Only handles Scan and Join variants. Union arms and Subquery inner queries /// are covered by `dedup_query`'s recursion, which calls this function on /// each nested query's own FROM/WHERE. fn add_edge_deleted_filters(from: &TableRef, where_clause: &mut Option<Expr>) { match from { TableRef::Scan { table, alias } if is_edge_table(table) => { Loading Loading @@ -531,7 +535,7 @@ mod tests { } #[test] fn skips_edge_table() { fn skips_edge_table_but_adds_deleted_filter() { let ont = ontology(); let mut node = Node::Query(Box::new(Query { select: vec![SelectExpr::new(Expr::col("e", "source_id"), "src")], Loading @@ -542,6 +546,10 @@ mod tests { let Node::Query(q) = &node; assert!(matches!(&q.from, TableRef::Scan { table, .. } if table == "gl_edge")); assert!( where_contains(&q.where_clause, "_deleted"), "edge scan should have _deleted filter" ); } #[test] Loading Loading
crates/integration-tests/tests/server/data_correctness/dedup.rs +171 −3 Original line number Diff line number Diff line Loading @@ -14,10 +14,12 @@ use super::helpers::*; fn dedup_svc() -> MockRedactionService { let mut svc = allow_all(); svc.allow("user", &[9001, 9002, 9003, 9010, 9011, 9300, 9301]); svc.allow("user", &[9001, 9002, 9003, 9010, 9011, 9300, 9301, 9600]); svc.allow( "merge_request", &[9100, 9101, 9200, 9201, 9400, 9401, 9500, 9501], &[ 9100, 9101, 9200, 9201, 9400, 9401, 9500, 9501, 9700, 9701, 9800, 9801, 9900, ], ); svc } Loading Loading @@ -416,7 +418,7 @@ pub(super) async fn neighbors_dedup_returns_unique_edges(ctx: &TestContext) { /// don't join non-center node tables. In production the indexer soft-deletes /// FK edge rows alongside their parent node, so this scenario is synthetic. pub(super) async fn neighbors_deleted_node_visible_via_edge(ctx: &TestContext) { // User 9301: v1 alive, v2 deleted. Should not appear as a neighbor. // User 9301: v1 alive, v2 deleted. Still visible via edge (known limitation). ctx.execute( "INSERT INTO gl_user (id, username, name, state, user_type, _version, _deleted) VALUES (9301, 'del_nbr', 'Deleted Neighbor', 'active', 'human', '2024-01-01 00:00:00', false), Loading Loading @@ -450,3 +452,169 @@ pub(super) async fn neighbors_deleted_node_visible_via_edge(ctx: &TestContext) { // The edge still appears because the edge row has _deleted=false. resp.assert_edge_exists("User", 9301, "MergeRequest", 9101, "AUTHORED"); } /// Hydration returns properties from the latest version, not a stale one. /// User 9600 has v1 username='hydrate_old', v2 username='hydrate_new'. /// The hydrated properties should show 'hydrate_new'. pub(super) async fn hydration_returns_latest_properties(ctx: &TestContext) { ctx.execute( "INSERT INTO gl_user (id, username, name, state, user_type, _version, _deleted) VALUES (9600, 'hydrate_old', 'Old Hydrated', 'active', 'human', '2024-01-01 00:00:00', false), (9600, 'hydrate_new', 'New Hydrated', 'active', 'human', '2024-06-01 00:00:00', false)", ) .await; ctx.execute( "INSERT INTO gl_edge (traversal_path, source_id, source_kind, relationship_kind, target_id, target_kind) VALUES ('1/100/1000/', 9600, 'User', 'MEMBER_OF', 100, 'Group')", ) .await; let resp = run_query( ctx, r#"{ "query_type": "traversal", "nodes": [ {"id": "u", "entity": "User", "node_ids": [9600], "columns": ["username", "name"]}, {"id": "g", "entity": "Group"} ], "relationships": [{"type": "MEMBER_OF", "from": "u", "to": "g"}], "limit": 10 }"#, &dedup_svc(), ) .await; resp.skip_requirement(Requirement::NodeCount); resp.assert_node_ids("User", &[9600]); // Hydration should return v2 properties, not v1. let node = resp.find_node("User", 9600).unwrap(); node.assert_str("username", "hydrate_new"); node.assert_str("name", "New Hydrated"); resp.assert_edge_exists("User", 9600, "Group", 100, "MEMBER_OF"); } /// Soft-deleted edge row excluded from traversal results. /// The edge itself has _deleted=true (as the indexer would set it). pub(super) async fn traversal_excludes_deleted_edge(ctx: &TestContext) { // MR 9700: alive, with a deleted edge to project 1003 // MR 9701: alive, with a non-deleted edge to project 1003 ctx.execute( "INSERT INTO gl_merge_request (id, iid, title, state, traversal_path, _version, _deleted) VALUES (9700, 700, 'Alive MR deleted edge', 'merged', '1/100/1003/', '2024-06-01 00:00:00', false), (9701, 701, 'Alive MR good edge', 'merged', '1/100/1003/', '2024-06-01 00:00:00', false)", ) .await; ctx.execute( "INSERT INTO gl_edge (traversal_path, source_id, source_kind, relationship_kind, target_id, target_kind, _deleted) VALUES ('1/100/1003/', 9700, 'MergeRequest', 'IN_PROJECT', 1003, 'Project', true), ('1/100/1003/', 9701, 'MergeRequest', 'IN_PROJECT', 1003, 'Project', false)", ) .await; let resp = run_query( ctx, r#"{ "query_type": "traversal", "nodes": [ {"id": "mr", "entity": "MergeRequest"}, {"id": "p", "entity": "Project", "node_ids": [1003]} ], "relationships": [{"type": "IN_PROJECT", "from": "mr", "to": "p"}], "limit": 10 }"#, &dedup_svc(), ) .await; // Only MR 9701's edge should appear. MR 9700's edge is soft-deleted. // Other tests may insert edges to project 1003 concurrently, so we // verify the deleted edge is absent rather than asserting exact count. resp.skip_requirement(Requirement::NodeCount); resp.assert_node_ids("Project", &[1003]); resp.assert_edge_exists("MergeRequest", 9701, "Project", 1003, "IN_PROJECT"); assert!( resp.find_edge("MergeRequest", 9700, "Project", 1003, "IN_PROJECT") .is_none(), "edge for MR 9700 should be excluded (_deleted=true)" ); } /// Three versions of the same entity: dedup picks the latest (v3), not v2. /// Confirms ORDER BY _version DESC picks the true latest across multiple versions. pub(super) async fn search_three_versions_returns_latest(ctx: &TestContext) { ctx.execute( "INSERT INTO gl_merge_request (id, iid, title, state, traversal_path, _version, _deleted) VALUES (9800, 800, 'MR v1', 'opened', '1/100/1000/', '2024-01-01 00:00:00', false), (9800, 800, 'MR v2', 'merged', '1/100/1000/', '2024-03-01 00:00:00', false), (9800, 800, 'MR v3', 'closed', '1/100/1000/', '2024-06-01 00:00:00', false), (9801, 801, 'Control MR', 'merged', '1/100/1000/', '2024-06-01 00:00:00', false)", ) .await; let resp = run_query( ctx, r#"{ "query_type": "search", "node": {"id": "mr", "entity": "MergeRequest", "columns": ["title", "state"], "node_ids": [9800, 9801]}, "limit": 10 }"#, &dedup_svc(), ) .await; resp.assert_node_count(2); resp.assert_node_ids("MergeRequest", &[9800, 9801]); // MR 9800 should show v3 (state='closed'), not v2 (state='merged'). let node = resp.find_node("MergeRequest", 9800).unwrap(); node.assert_str("state", "closed"); node.assert_str("title", "MR v3"); // Control: MR 9801 has a single version. let control = resp.find_node("MergeRequest", 9801).unwrap(); control.assert_str("state", "merged"); } /// Aggregation excludes deleted entities from count via _nf_* CTE argMax. /// MR 9900 has latest version _deleted=true. It should not be counted. pub(super) async fn aggregation_excludes_deleted_from_count(ctx: &TestContext) { // MR 9900: v1 alive, v2 deleted -- should NOT be counted ctx.execute( "INSERT INTO gl_merge_request (id, iid, title, state, traversal_path, _version, _deleted) VALUES (9900, 900, 'Counted then deleted', 'merged', '1/100/1002/', '2024-01-01 00:00:00', false), (9900, 900, 'Counted then deleted', 'merged', '1/100/1002/', '2024-06-01 00:00:00', true)", ) .await; ctx.execute( "INSERT INTO gl_edge (traversal_path, source_id, source_kind, relationship_kind, target_id, target_kind) VALUES ('1/100/1002/', 9900, 'MergeRequest', 'IN_PROJECT', 1002, 'Project')", ) .await; let resp = run_query( ctx, r#"{ "query_type": "aggregation", "nodes": [ {"id": "mr", "entity": "MergeRequest", "filters": {"state": "merged"}}, {"id": "p", "entity": "Project", "columns": ["name"], "node_ids": [1002]} ], "relationships": [{"type": "IN_PROJECT", "from": "mr", "to": "p"}], "aggregations": [{"function": "count", "target": "mr", "group_by": "p", "alias": "mr_count"}], "limit": 10 }"#, &dedup_svc(), ) .await; resp.assert_node_count(1); resp.assert_node_ids("Project", &[1002]); resp.skip_requirement(Requirement::Filter { field: "state".into(), }); // MR 9900 is deleted, so only MR 9201 (from earlier test) should be counted. // The _nf_mr CTE's argMax HAVING excludes deleted entities. resp.assert_node("Project", 1002, |n| { n.prop_i64("mr_count") == Some(1) && n.prop_str("name") == Some("Internal Project") }); }
crates/integration-tests/tests/server/data_correctness/mod.rs +4 −0 Original line number Diff line number Diff line Loading @@ -176,5 +176,9 @@ async fn data_correctness() { dedup::traversal_deleted_node_visible_via_edge, dedup::neighbors_dedup_returns_unique_edges, dedup::neighbors_deleted_node_visible_via_edge, dedup::hydration_returns_latest_properties, dedup::traversal_excludes_deleted_edge, dedup::search_three_versions_returns_latest, dedup::aggregation_excludes_deleted_from_count, ); }
crates/query-engine/compiler/src/passes/deduplicate.rs +9 −1 Original line number Diff line number Diff line Loading @@ -80,6 +80,10 @@ fn dedup_query(q: &mut Query, input: &Input, ontology: &Ontology) { /// Edge tables use ReplacingMergeTree with `_deleted` but are not wrapped /// in dedup subqueries (their full-tuple ORDER BY makes RMT dedup effective). /// Between merges, soft-deleted edge rows can still appear. /// /// Only handles Scan and Join variants. Union arms and Subquery inner queries /// are covered by `dedup_query`'s recursion, which calls this function on /// each nested query's own FROM/WHERE. fn add_edge_deleted_filters(from: &TableRef, where_clause: &mut Option<Expr>) { match from { TableRef::Scan { table, alias } if is_edge_table(table) => { Loading Loading @@ -531,7 +535,7 @@ mod tests { } #[test] fn skips_edge_table() { fn skips_edge_table_but_adds_deleted_filter() { let ont = ontology(); let mut node = Node::Query(Box::new(Query { select: vec![SelectExpr::new(Expr::col("e", "source_id"), "src")], Loading @@ -542,6 +546,10 @@ mod tests { let Node::Query(q) = &node; assert!(matches!(&q.from, TableRef::Scan { table, .. } if table == "gl_edge")); assert!( where_contains(&q.where_clause, "_deleted"), "edge scan should have _deleted filter" ); } #[test] Loading