Loading crates/datalake-generator/src/dispatch.rs +4 −1 Original line number Diff line number Diff line use anyhow::{Context, Result}; use indexer::dispatcher::Dispatcher; use indexer::modules::sdlc::dispatch::{GlobalDispatcher, NamespaceDispatcher}; use indexer::modules::sdlc::dispatch::{DispatchMetrics, GlobalDispatcher, NamespaceDispatcher}; use tracing::info; use crate::config::SimulatorConfig; Loading @@ -25,15 +25,18 @@ pub async fn run_dispatch_indexing(config: &SimulatorConfig) -> Result<()> { .context("dispatcher connect failed")?; let datalake = datalake_config.build_client(); let metrics = DispatchMetrics::new(); let dispatchers: Vec<Box<dyn Dispatcher>> = vec![ Box::new(GlobalDispatcher::new( services.nats.clone(), services.lock_service.clone(), metrics.clone(), )), Box::new(NamespaceDispatcher::new( services.nats, services.lock_service, datalake, metrics, )), ]; Loading crates/gkg-server/src/main.rs +4 −1 Original line number Diff line number Diff line Loading @@ -10,7 +10,7 @@ use gkg_server::shutdown; use gkg_server::webserver::Server as HttpServer; use indexer::IndexerConfig; use indexer::dispatcher::Dispatcher; use indexer::modules::sdlc::dispatch::{GlobalDispatcher, NamespaceDispatcher}; use indexer::modules::sdlc::dispatch::{DispatchMetrics, GlobalDispatcher, NamespaceDispatcher}; use tokio_util::sync::CancellationToken; use tracing::info; Loading @@ -36,15 +36,18 @@ async fn main() -> anyhow::Result<()> { Mode::DispatchIndexing => { let services = indexer::dispatcher::connect(&config.nats).await?; let datalake = config.datalake.build_client(); let metrics = DispatchMetrics::new(); let dispatchers: Vec<Box<dyn Dispatcher>> = vec![ Box::new(GlobalDispatcher::new( services.nats.clone(), services.lock_service.clone(), metrics.clone(), )), Box::new(NamespaceDispatcher::new( services.nats, services.lock_service, datalake, metrics, )), ]; indexer::dispatcher::run(&dispatchers) Loading crates/indexer/src/modules/sdlc/dispatch/global_dispatch.rs +41 −5 Original line number Diff line number Diff line use std::sync::Arc; use std::time::Instant; use async_trait::async_trait; use chrono::Utc; use tracing::info; use super::metrics::DispatchMetrics; use crate::dispatcher::{DispatchError, Dispatcher}; use crate::locking::LockService; use crate::modules::sdlc::locking::{LOCK_TTL, global_lock_key}; Loading @@ -14,11 +16,20 @@ use crate::types::{Envelope, Event}; pub struct GlobalDispatcher { nats: Arc<dyn NatsServices>, lock_service: Arc<dyn LockService>, metrics: DispatchMetrics, } impl GlobalDispatcher { pub fn new(nats: Arc<dyn NatsServices>, lock_service: Arc<dyn LockService>) -> Self { Self { nats, lock_service } pub fn new( nats: Arc<dyn NatsServices>, lock_service: Arc<dyn LockService>, metrics: DispatchMetrics, ) -> Self { Self { nats, lock_service, metrics, } } } Loading @@ -29,27 +40,52 @@ impl Dispatcher for GlobalDispatcher { } async fn dispatch(&self) -> Result<(), DispatchError> { let start = Instant::now(); let result = self.dispatch_inner().await; let duration = start.elapsed().as_secs_f64(); let outcome = if result.is_ok() { "success" } else { "error" }; self.metrics.record_run(self.name(), outcome, duration); result } } impl GlobalDispatcher { async fn dispatch_inner(&self) -> Result<(), DispatchError> { let acquired = self .lock_service .try_acquire(global_lock_key(), LOCK_TTL) .await .map_err(DispatchError::new)?; .map_err(|error| { self.metrics.record_error(self.name(), "lock"); DispatchError::new(error) })?; if !acquired { info!("skipping global indexing request, lock already held"); self.metrics.record_requests_skipped(self.name(), 1); return Ok(()); } let envelope = Envelope::new(&GlobalIndexingRequest { watermark: Utc::now(), }) .map_err(DispatchError::new)?; .map_err(|error| { self.metrics.record_error(self.name(), "publish"); DispatchError::new(error) })?; self.nats .publish(&GlobalIndexingRequest::topic(), &envelope) .await .map_err(DispatchError::new)?; .map_err(|error| { self.metrics.record_error(self.name(), "publish"); DispatchError::new(error) })?; self.metrics.record_requests_published(self.name(), 1); info!("dispatched global indexing request"); Ok(()) } Loading crates/indexer/src/modules/sdlc/dispatch/metrics.rs 0 → 100644 +111 −0 Original line number Diff line number Diff line use opentelemetry::KeyValue; use opentelemetry::global; use opentelemetry::metrics::{Counter, Histogram, Meter}; const DURATION_BUCKETS: &[f64] = &[ 0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1.0, 2.5, 5.0, 7.5, 10.0, ]; #[derive(Clone)] pub struct DispatchMetrics { runs: Counter<u64>, duration: Histogram<f64>, requests_published: Counter<u64>, requests_skipped: Counter<u64>, query_duration: Histogram<f64>, errors: Counter<u64>, } impl DispatchMetrics { pub fn new() -> Self { let meter = global::meter("indexer_dispatch"); Self::with_meter(&meter) } pub fn with_meter(meter: &Meter) -> Self { let runs = meter .u64_counter("indexer.dispatch.runs") .with_description("Total dispatch runs by dispatcher") .build(); let duration = meter .f64_histogram("indexer.dispatch.duration") .with_unit("s") .with_description("End-to-end duration of a dispatch cycle") .with_boundaries(DURATION_BUCKETS.to_vec()) .build(); let requests_published = meter .u64_counter("indexer.dispatch.requests.published") .with_description("Namespace/global requests successfully published") .build(); let requests_skipped = meter .u64_counter("indexer.dispatch.requests.skipped") .with_description("Requests skipped due to lock contention") .build(); let query_duration = meter .f64_histogram("indexer.dispatch.query.duration") .with_unit("s") .with_description("Duration of the enabled-namespaces ClickHouse query") .with_boundaries(DURATION_BUCKETS.to_vec()) .build(); let errors = meter .u64_counter("indexer.dispatch.errors") .with_description("Dispatch errors by stage") .build(); Self { runs, duration, requests_published, requests_skipped, query_duration, errors, } } pub fn record_run(&self, dispatcher: &str, outcome: &str, duration: f64) { let labels = [ KeyValue::new("dispatcher", dispatcher.to_owned()), KeyValue::new("outcome", outcome.to_owned()), ]; self.runs.add(1, &labels); self.duration.record( duration, &[KeyValue::new("dispatcher", dispatcher.to_owned())], ); } pub fn record_requests_published(&self, dispatcher: &str, count: u64) { self.requests_published .add(count, &[KeyValue::new("dispatcher", dispatcher.to_owned())]); } pub fn record_requests_skipped(&self, dispatcher: &str, count: u64) { self.requests_skipped .add(count, &[KeyValue::new("dispatcher", dispatcher.to_owned())]); } pub fn record_query_duration(&self, duration: f64) { self.query_duration.record(duration, &[]); } pub fn record_error(&self, dispatcher: &str, stage: &str) { self.errors.add( 1, &[ KeyValue::new("dispatcher", dispatcher.to_owned()), KeyValue::new("stage", stage.to_owned()), ], ); } } impl Default for DispatchMetrics { fn default() -> Self { Self::new() } } crates/indexer/src/modules/sdlc/dispatch/mod.rs +2 −0 Original line number Diff line number Diff line mod global_dispatch; mod metrics; mod namespace_dispatch; pub use global_dispatch::GlobalDispatcher; pub use metrics::DispatchMetrics; pub use namespace_dispatch::NamespaceDispatcher; Loading
crates/datalake-generator/src/dispatch.rs +4 −1 Original line number Diff line number Diff line use anyhow::{Context, Result}; use indexer::dispatcher::Dispatcher; use indexer::modules::sdlc::dispatch::{GlobalDispatcher, NamespaceDispatcher}; use indexer::modules::sdlc::dispatch::{DispatchMetrics, GlobalDispatcher, NamespaceDispatcher}; use tracing::info; use crate::config::SimulatorConfig; Loading @@ -25,15 +25,18 @@ pub async fn run_dispatch_indexing(config: &SimulatorConfig) -> Result<()> { .context("dispatcher connect failed")?; let datalake = datalake_config.build_client(); let metrics = DispatchMetrics::new(); let dispatchers: Vec<Box<dyn Dispatcher>> = vec![ Box::new(GlobalDispatcher::new( services.nats.clone(), services.lock_service.clone(), metrics.clone(), )), Box::new(NamespaceDispatcher::new( services.nats, services.lock_service, datalake, metrics, )), ]; Loading
crates/gkg-server/src/main.rs +4 −1 Original line number Diff line number Diff line Loading @@ -10,7 +10,7 @@ use gkg_server::shutdown; use gkg_server::webserver::Server as HttpServer; use indexer::IndexerConfig; use indexer::dispatcher::Dispatcher; use indexer::modules::sdlc::dispatch::{GlobalDispatcher, NamespaceDispatcher}; use indexer::modules::sdlc::dispatch::{DispatchMetrics, GlobalDispatcher, NamespaceDispatcher}; use tokio_util::sync::CancellationToken; use tracing::info; Loading @@ -36,15 +36,18 @@ async fn main() -> anyhow::Result<()> { Mode::DispatchIndexing => { let services = indexer::dispatcher::connect(&config.nats).await?; let datalake = config.datalake.build_client(); let metrics = DispatchMetrics::new(); let dispatchers: Vec<Box<dyn Dispatcher>> = vec![ Box::new(GlobalDispatcher::new( services.nats.clone(), services.lock_service.clone(), metrics.clone(), )), Box::new(NamespaceDispatcher::new( services.nats, services.lock_service, datalake, metrics, )), ]; indexer::dispatcher::run(&dispatchers) Loading
crates/indexer/src/modules/sdlc/dispatch/global_dispatch.rs +41 −5 Original line number Diff line number Diff line use std::sync::Arc; use std::time::Instant; use async_trait::async_trait; use chrono::Utc; use tracing::info; use super::metrics::DispatchMetrics; use crate::dispatcher::{DispatchError, Dispatcher}; use crate::locking::LockService; use crate::modules::sdlc::locking::{LOCK_TTL, global_lock_key}; Loading @@ -14,11 +16,20 @@ use crate::types::{Envelope, Event}; pub struct GlobalDispatcher { nats: Arc<dyn NatsServices>, lock_service: Arc<dyn LockService>, metrics: DispatchMetrics, } impl GlobalDispatcher { pub fn new(nats: Arc<dyn NatsServices>, lock_service: Arc<dyn LockService>) -> Self { Self { nats, lock_service } pub fn new( nats: Arc<dyn NatsServices>, lock_service: Arc<dyn LockService>, metrics: DispatchMetrics, ) -> Self { Self { nats, lock_service, metrics, } } } Loading @@ -29,27 +40,52 @@ impl Dispatcher for GlobalDispatcher { } async fn dispatch(&self) -> Result<(), DispatchError> { let start = Instant::now(); let result = self.dispatch_inner().await; let duration = start.elapsed().as_secs_f64(); let outcome = if result.is_ok() { "success" } else { "error" }; self.metrics.record_run(self.name(), outcome, duration); result } } impl GlobalDispatcher { async fn dispatch_inner(&self) -> Result<(), DispatchError> { let acquired = self .lock_service .try_acquire(global_lock_key(), LOCK_TTL) .await .map_err(DispatchError::new)?; .map_err(|error| { self.metrics.record_error(self.name(), "lock"); DispatchError::new(error) })?; if !acquired { info!("skipping global indexing request, lock already held"); self.metrics.record_requests_skipped(self.name(), 1); return Ok(()); } let envelope = Envelope::new(&GlobalIndexingRequest { watermark: Utc::now(), }) .map_err(DispatchError::new)?; .map_err(|error| { self.metrics.record_error(self.name(), "publish"); DispatchError::new(error) })?; self.nats .publish(&GlobalIndexingRequest::topic(), &envelope) .await .map_err(DispatchError::new)?; .map_err(|error| { self.metrics.record_error(self.name(), "publish"); DispatchError::new(error) })?; self.metrics.record_requests_published(self.name(), 1); info!("dispatched global indexing request"); Ok(()) } Loading
crates/indexer/src/modules/sdlc/dispatch/metrics.rs 0 → 100644 +111 −0 Original line number Diff line number Diff line use opentelemetry::KeyValue; use opentelemetry::global; use opentelemetry::metrics::{Counter, Histogram, Meter}; const DURATION_BUCKETS: &[f64] = &[ 0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1.0, 2.5, 5.0, 7.5, 10.0, ]; #[derive(Clone)] pub struct DispatchMetrics { runs: Counter<u64>, duration: Histogram<f64>, requests_published: Counter<u64>, requests_skipped: Counter<u64>, query_duration: Histogram<f64>, errors: Counter<u64>, } impl DispatchMetrics { pub fn new() -> Self { let meter = global::meter("indexer_dispatch"); Self::with_meter(&meter) } pub fn with_meter(meter: &Meter) -> Self { let runs = meter .u64_counter("indexer.dispatch.runs") .with_description("Total dispatch runs by dispatcher") .build(); let duration = meter .f64_histogram("indexer.dispatch.duration") .with_unit("s") .with_description("End-to-end duration of a dispatch cycle") .with_boundaries(DURATION_BUCKETS.to_vec()) .build(); let requests_published = meter .u64_counter("indexer.dispatch.requests.published") .with_description("Namespace/global requests successfully published") .build(); let requests_skipped = meter .u64_counter("indexer.dispatch.requests.skipped") .with_description("Requests skipped due to lock contention") .build(); let query_duration = meter .f64_histogram("indexer.dispatch.query.duration") .with_unit("s") .with_description("Duration of the enabled-namespaces ClickHouse query") .with_boundaries(DURATION_BUCKETS.to_vec()) .build(); let errors = meter .u64_counter("indexer.dispatch.errors") .with_description("Dispatch errors by stage") .build(); Self { runs, duration, requests_published, requests_skipped, query_duration, errors, } } pub fn record_run(&self, dispatcher: &str, outcome: &str, duration: f64) { let labels = [ KeyValue::new("dispatcher", dispatcher.to_owned()), KeyValue::new("outcome", outcome.to_owned()), ]; self.runs.add(1, &labels); self.duration.record( duration, &[KeyValue::new("dispatcher", dispatcher.to_owned())], ); } pub fn record_requests_published(&self, dispatcher: &str, count: u64) { self.requests_published .add(count, &[KeyValue::new("dispatcher", dispatcher.to_owned())]); } pub fn record_requests_skipped(&self, dispatcher: &str, count: u64) { self.requests_skipped .add(count, &[KeyValue::new("dispatcher", dispatcher.to_owned())]); } pub fn record_query_duration(&self, duration: f64) { self.query_duration.record(duration, &[]); } pub fn record_error(&self, dispatcher: &str, stage: &str) { self.errors.add( 1, &[ KeyValue::new("dispatcher", dispatcher.to_owned()), KeyValue::new("stage", stage.to_owned()), ], ); } } impl Default for DispatchMetrics { fn default() -> Self { Self::new() } }
crates/indexer/src/modules/sdlc/dispatch/mod.rs +2 −0 Original line number Diff line number Diff line mod global_dispatch; mod metrics; mod namespace_dispatch; pub use global_dispatch::GlobalDispatcher; pub use metrics::DispatchMetrics; pub use namespace_dispatch::NamespaceDispatcher;