Loading crates/gkg-server/src/pipeline/stages/hydration.rs +11 −5 Original line number Diff line number Diff line Loading @@ -70,6 +70,7 @@ impl HydrationStage { ctx: &QueryPipelineContext, nodes: Vec<InputNode>, total_ids: usize, kind: &str, ) -> Result<(PropertyMap, Vec<DebugQuery>, Vec<QueryExecution>), PipelineError> { if nodes.is_empty() { return Ok((HashMap::new(), Vec::new(), Vec::new())); Loading Loading @@ -108,10 +109,15 @@ impl HydrationStage { let start = Instant::now(); let mut query = client.query(&compiled.base.sql); let mut log_comment = match labkit::correlation::current() { Some(id) => format!("gkg;hydration:{kind};correlation_id={id}"), None => format!("gkg;hydration:{kind}"), }; if let Some(ref pid) = profiling_id { let log_comment = format!("gkg;hydration;profiling_id={pid}"); query = query.with_setting("log_comment", log_comment); log_comment.push_str(&format!(";profiling_id={pid}")); } query = query.with_setting("log_comment", log_comment); for (key, param) in &compiled.base.params { query = ArrowClickHouseClient::bind_param(query, key, ¶m.value, ¶m.ch_type); } Loading @@ -132,7 +138,7 @@ impl HydrationStage { ); let mut execution = QueryExecution { label: "hydration:dynamic".into(), label: format!("hydration:{kind}"), rendered_sql, query_id: String::new(), elapsed_ms: elapsed.as_secs_f64() * 1000.0, Loading Loading @@ -194,7 +200,7 @@ impl PipelineStage for HydrationStage { hydration_helpers::hydrate_static(templates, &query_result) .inspect_err(|e| obs.record_error(e))?; let (property_map, debug, executions) = Self::execute_hydration(ctx, nodes, ids_count) Self::execute_hydration(ctx, nodes, ids_count, "static") .await .inspect_err(|e| obs.record_error(e))?; hydration_queries = debug; Loading Loading @@ -258,7 +264,7 @@ impl PipelineStage for HydrationStage { hydration_helpers::hydrate_dynamic(entity_specs, &refs, &tps) .inspect_err(|e| obs.record_error(e))?; let (property_map, debug, executions) = Self::execute_hydration(ctx, nodes, ids_count) Self::execute_hydration(ctx, nodes, ids_count, "dynamic") .await .inspect_err(|e| obs.record_error(e))?; hydration_queries = debug; Loading Loading
crates/gkg-server/src/pipeline/stages/hydration.rs +11 −5 Original line number Diff line number Diff line Loading @@ -70,6 +70,7 @@ impl HydrationStage { ctx: &QueryPipelineContext, nodes: Vec<InputNode>, total_ids: usize, kind: &str, ) -> Result<(PropertyMap, Vec<DebugQuery>, Vec<QueryExecution>), PipelineError> { if nodes.is_empty() { return Ok((HashMap::new(), Vec::new(), Vec::new())); Loading Loading @@ -108,10 +109,15 @@ impl HydrationStage { let start = Instant::now(); let mut query = client.query(&compiled.base.sql); let mut log_comment = match labkit::correlation::current() { Some(id) => format!("gkg;hydration:{kind};correlation_id={id}"), None => format!("gkg;hydration:{kind}"), }; if let Some(ref pid) = profiling_id { let log_comment = format!("gkg;hydration;profiling_id={pid}"); query = query.with_setting("log_comment", log_comment); log_comment.push_str(&format!(";profiling_id={pid}")); } query = query.with_setting("log_comment", log_comment); for (key, param) in &compiled.base.params { query = ArrowClickHouseClient::bind_param(query, key, ¶m.value, ¶m.ch_type); } Loading @@ -132,7 +138,7 @@ impl HydrationStage { ); let mut execution = QueryExecution { label: "hydration:dynamic".into(), label: format!("hydration:{kind}"), rendered_sql, query_id: String::new(), elapsed_ms: elapsed.as_secs_f64() * 1000.0, Loading Loading @@ -194,7 +200,7 @@ impl PipelineStage for HydrationStage { hydration_helpers::hydrate_static(templates, &query_result) .inspect_err(|e| obs.record_error(e))?; let (property_map, debug, executions) = Self::execute_hydration(ctx, nodes, ids_count) Self::execute_hydration(ctx, nodes, ids_count, "static") .await .inspect_err(|e| obs.record_error(e))?; hydration_queries = debug; Loading Loading @@ -258,7 +264,7 @@ impl PipelineStage for HydrationStage { hydration_helpers::hydrate_dynamic(entity_specs, &refs, &tps) .inspect_err(|e| obs.record_error(e))?; let (property_map, debug, executions) = Self::execute_hydration(ctx, nodes, ids_count) Self::execute_hydration(ctx, nodes, ids_count, "dynamic") .await .inspect_err(|e| obs.record_error(e))?; hydration_queries = debug; Loading