Loading crates/gkg-server/src/pipeline/mod.rs +1 −0 Original line number Diff line number Diff line Loading @@ -5,6 +5,7 @@ mod stages; pub mod types; pub use helpers::{QueryRequest, receive_query_request, send_query_error}; pub use metrics::OTelPipelineObserver; pub use service::QueryPipelineService; pub use stages::{ AuthorizationStage, ClickHouseExecutor, HydrationStage, RedactionStage, SecurityStage, Loading crates/integration-tests/tests/server/telemetry.rs +4 −11 Original line number Diff line number Diff line Loading @@ -3,11 +3,12 @@ use std::time::Duration; use axum::body::Body; use axum::http::Request; use clickhouse_client::ArrowClickHouseClient; use gkg_server::query_pipeline::{PipelineObserver, PipelineOutput}; use gkg_server::pipeline::OTelPipelineObserver; use gkg_server::webserver::create_router; use opentelemetry::global; use opentelemetry_sdk::metrics::data::{AggregatedMetrics, HistogramDataPoint, MetricData}; use opentelemetry_sdk::metrics::{InMemoryMetricExporter, PeriodicReader, SdkMeterProvider}; use query_engine::pipeline::PipelineObserver; use tokio::time::sleep; use tower::ServiceExt; Loading Loading @@ -146,21 +147,13 @@ async fn correlation_id_generated_when_absent() { async fn pipeline_observer_records_query_metrics() { let (provider, exporter) = setup_meter_provider(); let mut obs = PipelineObserver::start(); let mut obs = OTelPipelineObserver::start(); obs.set_query_type("search"); obs.compiled(Duration::from_millis(5)); obs.executed(Duration::from_millis(50), 3); obs.authorized(Duration::from_millis(10)); obs.hydrated(Duration::from_millis(2)); let output = PipelineOutput { formatted_result: serde_json::json!({}), query_type: "search".into(), raw_query_strings: vec![], row_count: 42, redacted_count: 3, }; obs.finish(&output); obs.finish(42, 3); sleep(Duration::from_millis(150)).await; provider.force_flush().unwrap(); Loading Loading
crates/gkg-server/src/pipeline/mod.rs +1 −0 Original line number Diff line number Diff line Loading @@ -5,6 +5,7 @@ mod stages; pub mod types; pub use helpers::{QueryRequest, receive_query_request, send_query_error}; pub use metrics::OTelPipelineObserver; pub use service::QueryPipelineService; pub use stages::{ AuthorizationStage, ClickHouseExecutor, HydrationStage, RedactionStage, SecurityStage, Loading
crates/integration-tests/tests/server/telemetry.rs +4 −11 Original line number Diff line number Diff line Loading @@ -3,11 +3,12 @@ use std::time::Duration; use axum::body::Body; use axum::http::Request; use clickhouse_client::ArrowClickHouseClient; use gkg_server::query_pipeline::{PipelineObserver, PipelineOutput}; use gkg_server::pipeline::OTelPipelineObserver; use gkg_server::webserver::create_router; use opentelemetry::global; use opentelemetry_sdk::metrics::data::{AggregatedMetrics, HistogramDataPoint, MetricData}; use opentelemetry_sdk::metrics::{InMemoryMetricExporter, PeriodicReader, SdkMeterProvider}; use query_engine::pipeline::PipelineObserver; use tokio::time::sleep; use tower::ServiceExt; Loading Loading @@ -146,21 +147,13 @@ async fn correlation_id_generated_when_absent() { async fn pipeline_observer_records_query_metrics() { let (provider, exporter) = setup_meter_provider(); let mut obs = PipelineObserver::start(); let mut obs = OTelPipelineObserver::start(); obs.set_query_type("search"); obs.compiled(Duration::from_millis(5)); obs.executed(Duration::from_millis(50), 3); obs.authorized(Duration::from_millis(10)); obs.hydrated(Duration::from_millis(2)); let output = PipelineOutput { formatted_result: serde_json::json!({}), query_type: "search".into(), raw_query_strings: vec![], row_count: 42, redacted_count: 3, }; obs.finish(&output); obs.finish(42, 3); sleep(Duration::from_millis(150)).await; provider.force_flush().unwrap(); Loading