Loading Cargo.lock +2 −0 Original line number Diff line number Diff line Loading @@ -5161,9 +5161,11 @@ dependencies = [ name = "gkg-billing" version = "0.1.0" dependencies = [ "gkg-observability", "gkg-server-config", "labkit", "labkit-events", "opentelemetry", "query-engine", "serde_json", "tracing", Loading crates/gkg-billing/Cargo.toml +2 −0 Original line number Diff line number Diff line Loading @@ -10,7 +10,9 @@ path = "src/lib.rs" [dependencies] labkit = { workspace = true } labkit-events = { workspace = true } gkg-observability = { workspace = true } gkg-server-config = { path = "../gkg-server-config" } opentelemetry = { workspace = true } query-engine = { path = "../query-engine" } serde_json = { workspace = true } tracing = { workspace = true } Loading crates/gkg-billing/src/lib.rs +2 −0 Original line number Diff line number Diff line Loading @@ -8,9 +8,11 @@ pub mod constants; pub mod inputs; mod metrics; mod observer; mod tracker; pub use inputs::BillingInputs; pub use metrics::register as register_metrics; pub use observer::BillingObserver; pub use tracker::{BillingTracker, SnowplowBillingTracker}; crates/gkg-billing/src/metrics.rs 0 → 100644 +46 −0 Original line number Diff line number Diff line use std::sync::LazyLock; use gkg_observability::billing::events as spec; use opentelemetry::KeyValue; use opentelemetry::metrics::Counter; pub(crate) const REASON_REALM_MISSING: &str = "realm_missing"; pub(crate) const REASON_REALM_UNRECOGNIZED: &str = "realm_unrecognized"; pub(crate) const REASON_EVENT_BUILD_FAILED: &str = "event_build_failed"; pub(crate) static METRICS: LazyLock<BillingMetrics> = LazyLock::new(BillingMetrics::new); pub(crate) struct BillingMetrics { pub emitted: Counter<u64>, pub dropped: Counter<u64>, pub rejected: Counter<u64>, } impl BillingMetrics { fn new() -> Self { let meter = gkg_observability::meter(); Self { emitted: spec::EVENTS_EMITTED.build_counter_u64(&meter), dropped: spec::EVENTS_DROPPED.build_counter_u64(&meter), rejected: spec::EVENTS_REJECTED.build_counter_u64(&meter), } } } /// Register every series at 0 at startup so `rate(...) == 0` alerts compare /// against zero rather than an absent series — OTel only exposes a series /// once it has been observed at least once. The labelled `dropped` counter /// is touched per known `reason` value so each labelled series appears. pub fn register() { METRICS.emitted.add(0, &[]); METRICS.rejected.add(0, &[]); for reason in [ REASON_REALM_MISSING, REASON_REALM_UNRECOGNIZED, REASON_EVENT_BUILD_FAILED, ] { METRICS .dropped .add(0, &[KeyValue::new(spec::labels::REASON, reason)]); } } crates/gkg-billing/src/observer.rs +32 −1 Original line number Diff line number Diff line Loading @@ -2,14 +2,31 @@ use std::cell::Cell; use std::sync::Arc; use std::time::Duration; use gkg_observability::billing::events as spec; use labkit_events::BillingEvent; use opentelemetry::KeyValue; use query_engine::pipeline::{PipelineError, PipelineObserver}; use serde_json::json; use crate::constants::{CATEGORY, EVENT_TYPE, UNIT_OF_MEASURE, normalize_realm}; use crate::inputs::BillingInputs; use crate::metrics::{ METRICS, REASON_EVENT_BUILD_FAILED, REASON_REALM_MISSING, REASON_REALM_UNRECOGNIZED, }; use crate::tracker::BillingTracker; fn record_dropped(reason: &'static str) { METRICS .dropped .add(1, &[KeyValue::new(spec::labels::REASON, reason)]); } fn correlation_id_string() -> String { labkit::correlation::current() .map(|id| id.as_str().to_string()) .unwrap_or_default() } pub struct BillingObserver { tracker: Option<Arc<dyn BillingTracker>>, inputs: BillingInputs, Loading @@ -28,19 +45,24 @@ impl BillingObserver { } fn build_event(&self) -> Option<BillingEvent> { let correlation_id = correlation_id_string(); let Some(raw_realm) = self.inputs.realm.as_deref() else { tracing::warn!( user_id = self.inputs.user_id, correlation_id = %correlation_id, "billing event skipped: realm missing from JWT claims" ); record_dropped(REASON_REALM_MISSING); return None; }; let Some(realm) = normalize_realm(raw_realm) else { tracing::warn!( user_id = self.inputs.user_id, raw_realm = raw_realm, correlation_id = %correlation_id, "billing event skipped: unrecognized realm value" ); record_dropped(REASON_REALM_UNRECOGNIZED); return None; }; Loading Loading @@ -86,7 +108,13 @@ impl BillingObserver { match builder.build() { Ok(event) => Some(event), Err(e) => { tracing::error!(error = %e, "failed to build billing event"); tracing::error!( error = %e, user_id = self.inputs.user_id, correlation_id = %correlation_id, "failed to build billing event" ); record_dropped(REASON_EVENT_BUILD_FAILED); None } } Loading Loading @@ -119,7 +147,10 @@ impl PipelineObserver for BillingObserver { if let Some(ref tracker) = self.tracker && let Some(event) = self.build_event() { let _span = tracing::info_span!("billing.track", query_type = self.query_type).entered(); tracker.track(event); METRICS.emitted.add(1, &[]); } } } Loading Loading
Cargo.lock +2 −0 Original line number Diff line number Diff line Loading @@ -5161,9 +5161,11 @@ dependencies = [ name = "gkg-billing" version = "0.1.0" dependencies = [ "gkg-observability", "gkg-server-config", "labkit", "labkit-events", "opentelemetry", "query-engine", "serde_json", "tracing", Loading
crates/gkg-billing/Cargo.toml +2 −0 Original line number Diff line number Diff line Loading @@ -10,7 +10,9 @@ path = "src/lib.rs" [dependencies] labkit = { workspace = true } labkit-events = { workspace = true } gkg-observability = { workspace = true } gkg-server-config = { path = "../gkg-server-config" } opentelemetry = { workspace = true } query-engine = { path = "../query-engine" } serde_json = { workspace = true } tracing = { workspace = true } Loading
crates/gkg-billing/src/lib.rs +2 −0 Original line number Diff line number Diff line Loading @@ -8,9 +8,11 @@ pub mod constants; pub mod inputs; mod metrics; mod observer; mod tracker; pub use inputs::BillingInputs; pub use metrics::register as register_metrics; pub use observer::BillingObserver; pub use tracker::{BillingTracker, SnowplowBillingTracker};
crates/gkg-billing/src/metrics.rs 0 → 100644 +46 −0 Original line number Diff line number Diff line use std::sync::LazyLock; use gkg_observability::billing::events as spec; use opentelemetry::KeyValue; use opentelemetry::metrics::Counter; pub(crate) const REASON_REALM_MISSING: &str = "realm_missing"; pub(crate) const REASON_REALM_UNRECOGNIZED: &str = "realm_unrecognized"; pub(crate) const REASON_EVENT_BUILD_FAILED: &str = "event_build_failed"; pub(crate) static METRICS: LazyLock<BillingMetrics> = LazyLock::new(BillingMetrics::new); pub(crate) struct BillingMetrics { pub emitted: Counter<u64>, pub dropped: Counter<u64>, pub rejected: Counter<u64>, } impl BillingMetrics { fn new() -> Self { let meter = gkg_observability::meter(); Self { emitted: spec::EVENTS_EMITTED.build_counter_u64(&meter), dropped: spec::EVENTS_DROPPED.build_counter_u64(&meter), rejected: spec::EVENTS_REJECTED.build_counter_u64(&meter), } } } /// Register every series at 0 at startup so `rate(...) == 0` alerts compare /// against zero rather than an absent series — OTel only exposes a series /// once it has been observed at least once. The labelled `dropped` counter /// is touched per known `reason` value so each labelled series appears. pub fn register() { METRICS.emitted.add(0, &[]); METRICS.rejected.add(0, &[]); for reason in [ REASON_REALM_MISSING, REASON_REALM_UNRECOGNIZED, REASON_EVENT_BUILD_FAILED, ] { METRICS .dropped .add(0, &[KeyValue::new(spec::labels::REASON, reason)]); } }
crates/gkg-billing/src/observer.rs +32 −1 Original line number Diff line number Diff line Loading @@ -2,14 +2,31 @@ use std::cell::Cell; use std::sync::Arc; use std::time::Duration; use gkg_observability::billing::events as spec; use labkit_events::BillingEvent; use opentelemetry::KeyValue; use query_engine::pipeline::{PipelineError, PipelineObserver}; use serde_json::json; use crate::constants::{CATEGORY, EVENT_TYPE, UNIT_OF_MEASURE, normalize_realm}; use crate::inputs::BillingInputs; use crate::metrics::{ METRICS, REASON_EVENT_BUILD_FAILED, REASON_REALM_MISSING, REASON_REALM_UNRECOGNIZED, }; use crate::tracker::BillingTracker; fn record_dropped(reason: &'static str) { METRICS .dropped .add(1, &[KeyValue::new(spec::labels::REASON, reason)]); } fn correlation_id_string() -> String { labkit::correlation::current() .map(|id| id.as_str().to_string()) .unwrap_or_default() } pub struct BillingObserver { tracker: Option<Arc<dyn BillingTracker>>, inputs: BillingInputs, Loading @@ -28,19 +45,24 @@ impl BillingObserver { } fn build_event(&self) -> Option<BillingEvent> { let correlation_id = correlation_id_string(); let Some(raw_realm) = self.inputs.realm.as_deref() else { tracing::warn!( user_id = self.inputs.user_id, correlation_id = %correlation_id, "billing event skipped: realm missing from JWT claims" ); record_dropped(REASON_REALM_MISSING); return None; }; let Some(realm) = normalize_realm(raw_realm) else { tracing::warn!( user_id = self.inputs.user_id, raw_realm = raw_realm, correlation_id = %correlation_id, "billing event skipped: unrecognized realm value" ); record_dropped(REASON_REALM_UNRECOGNIZED); return None; }; Loading Loading @@ -86,7 +108,13 @@ impl BillingObserver { match builder.build() { Ok(event) => Some(event), Err(e) => { tracing::error!(error = %e, "failed to build billing event"); tracing::error!( error = %e, user_id = self.inputs.user_id, correlation_id = %correlation_id, "failed to build billing event" ); record_dropped(REASON_EVENT_BUILD_FAILED); None } } Loading Loading @@ -119,7 +147,10 @@ impl PipelineObserver for BillingObserver { if let Some(ref tracker) = self.tracker && let Some(event) = self.build_event() { let _span = tracing::info_span!("billing.track", query_type = self.query_type).entered(); tracker.track(event); METRICS.emitted.add(1, &[]); } } } Loading