Loading crates/indexer/src/engine.rs +22 −5 Original line number Diff line number Diff line Loading @@ -33,7 +33,7 @@ use futures::StreamExt; use opentelemetry::KeyValue; use thiserror::Error; use tokio_util::sync::CancellationToken; use tracing::warn; use tracing::{debug, info, warn}; use crate::configuration::EngineConfiguration; use crate::destination::Destination; Loading Loading @@ -185,9 +185,12 @@ impl Engine { } async fn listen(&self, topic: Topic, worker_pool: Arc<WorkerPool>) -> Result<(), EngineError> { let topic_name = format!("{}.{}", topic.stream, topic.subject); info!(topic = %topic_name, "topic listener starting"); let mut subscription = self.broker.subscribe(&topic, self.metrics.clone()).await?; let mut inflight = tokio::task::JoinSet::new(); let topic_label = KeyValue::new("topic", format!("{}.{}", topic.stream, topic.subject)); let topic_label = KeyValue::new("topic", topic_name.clone()); loop { tokio::select! { Loading @@ -201,17 +204,20 @@ impl Engine { worker_pool.clone(), self.metrics.clone(), topic_label.clone(), topic_name.clone(), )); } } } let drained = inflight.len(); while let Some(result) = inflight.join_next().await { if let Err(error) = result { warn!(%error, "message processing task panicked"); warn!(%error, topic = %topic_name, "message processing task panicked"); } } info!(topic = %topic_name, drained, "topic listener stopped"); Ok(()) } Loading @@ -230,7 +236,17 @@ async fn process_message( worker_pool: Arc<WorkerPool>, metrics: Arc<EngineMetrics>, topic_label: KeyValue, topic_name: String, ) { let message_id = message.envelope.id.0.clone(); let attempt = message.envelope.attempt; debug!(topic = %topic_name, %message_id, attempt, "message received"); if attempt > 1 { info!(topic = %topic_name, %message_id, attempt, "message retry received"); } let message_start = Instant::now(); let result = run_handlers(&handlers, &context, &message, &worker_pool, &metrics).await; Loading @@ -238,13 +254,14 @@ async fn process_message( let outcome = match result { Ok(()) => { if let Err(error) = message.ack().await { warn!(%error, "failed to ack message"); warn!(%error, %message_id, "failed to ack message"); } "ack" } Err(_) => { info!(topic = %topic_name, %message_id, "message nacked, handler failure"); if let Err(error) = message.nack().await { warn!(%error, "failed to nack message"); warn!(%error, %message_id, "failed to nack message"); } "nack" } Loading crates/indexer/src/locking.rs +18 −5 Original line number Diff line number Diff line use std::time::Duration; use async_trait::async_trait; use tracing::debug; #[derive(Debug, thiserror::Error)] pub enum LockError { Loading Loading @@ -37,18 +38,30 @@ impl LockService for NatsLockService { .kv_put(INDEXING_LOCKS_BUCKET, key, Bytes::new(), options) .await { Ok(KvPutResult::Success(_)) => Ok(true), Ok(KvPutResult::AlreadyExists | KvPutResult::RevisionMismatch) => Ok(false), Err(e) => Err(LockError::Backend(e.to_string())), Ok(KvPutResult::Success(_)) => { debug!(key, "lock acquired"); Ok(true) } Ok(KvPutResult::AlreadyExists | KvPutResult::RevisionMismatch) => { debug!(key, "lock contention, already held"); Ok(false) } Err(e) => { debug!(key, error = %e, "lock acquisition error"); Err(LockError::Backend(e.to_string())) } } } async fn release(&self, key: &str) -> Result<(), LockError> { use crate::modules::sdlc::locking::INDEXING_LOCKS_BUCKET; self.nats let result = self .nats .kv_delete(INDEXING_LOCKS_BUCKET, key) .await .map_err(|e| LockError::Backend(e.to_string())) .map_err(|e| LockError::Backend(e.to_string())); debug!(key, "lock released"); result } } crates/indexer/src/nats/broker.rs +28 −3 Original line number Diff line number Diff line Loading @@ -19,6 +19,8 @@ use tokio::task::JoinHandle; use tokio_stream::wrappers::ReceiverStream; use tokio_util::sync::CancellationToken; use tracing::{debug, info, warn}; use crate::metrics::EngineMetrics; use crate::types::{Envelope, MessageId, Topic}; Loading Loading @@ -63,6 +65,7 @@ impl NatsBroker { } pub async fn shutdown(self) { info!("broker shutdown initiated"); self.cancellation_token.cancel(); let handles: Vec<_> = self.subscription_handles.lock().drain(..).collect(); for handle in handles { Loading Loading @@ -122,6 +125,8 @@ impl NatsBroker { message: e.to_string(), })?; info!(bucket, "KV bucket ready"); let mut cache = self.kv_stores.write().await; cache.insert(bucket.to_string(), store); Ok(()) Loading @@ -133,14 +138,17 @@ impl NatsBroker { subjects: Vec<String>, ) -> Result<Stream, NatsError> { match self.get_stream(stream_name).await { Ok(stream) => return Ok(stream), Ok(stream) => { info!(stream = %stream_name, "stream found"); return Ok(stream); } Err(NatsError::StreamNotFound { .. }) => {} Err(e) => return Err(e), } let stream_config = async_nats::jetstream::stream::Config { name: stream_name.to_string(), subjects, subjects: subjects.clone(), num_replicas: self.config.stream_replicas, max_age: self.config.stream_max_age().unwrap_or_default(), max_bytes: self.config.stream_max_bytes.unwrap_or(-1), Loading @@ -159,6 +167,8 @@ impl NatsBroker { source: e, })?; info!(stream = %stream_name, ?subjects, "stream created"); let mut cache = self.streams.write().await; cache.insert(stream_name.clone(), stream.clone()); Ok(stream) Loading Loading @@ -277,10 +287,21 @@ impl NatsBroker { let stream = self.get_stream(&topic.stream).await?; let consumer = self.get_or_create_consumer(&stream, &topic.subject).await?; let consumer_type = match &self.config.consumer_name { Some(name) => format!("durable({})", name), None => "ephemeral".to_string(), }; let batch_size = self.config.batch_size(); info!( topic = %format!("{}.{}", topic.stream, topic.subject), consumer_type, batch_size, "subscription started" ); let (sender, receiver) = tokio::sync::mpsc::channel(self.config.subscription_buffer_size()); let cancel_token = self.cancellation_token.clone(); let batch_size = self.config.batch_size(); let handle = tokio::spawn(async move { loop { Loading @@ -292,6 +313,7 @@ impl NatsBroker { let batch = match consumer.fetch().max_messages(batch_size).messages().await { Ok(batch) => batch, Err(e) => { warn!(error = %e, "fetch batch error"); metrics.nats_fetch_duration.record( fetch_start.elapsed().as_secs_f64(), &[opentelemetry::KeyValue::new("outcome", "error")], Loading @@ -308,11 +330,13 @@ impl NatsBroker { tokio::pin!(batch); let mut batch_count: usize = 0; while let Some(result) = batch.next().await { if cancel_token.is_cancelled() { break; } batch_count += 1; let converted = match result { Ok(msg) => Self::convert_message(msg), Err(e) => Err(map_subscribe_error(e)), Loading @@ -322,6 +346,7 @@ impl NatsBroker { return; } } debug!(count = batch_count, "batch fetched"); } }); Loading crates/indexer/src/worker_pool.rs +15 −2 Original line number Diff line number Diff line Loading @@ -23,6 +23,7 @@ use std::time::Instant; use opentelemetry::KeyValue; use tokio::sync::{OwnedSemaphorePermit, Semaphore}; use tracing::{debug, info}; use crate::configuration::EngineConfiguration; use crate::metrics::EngineMetrics; Loading Loading @@ -71,7 +72,7 @@ impl WorkerPool { pub fn new(configuration: &EngineConfiguration, metrics: Arc<EngineMetrics>) -> Self { let global_semaphore = Arc::new(Semaphore::new(configuration.max_concurrent_workers)); let module_semaphores = configuration let module_semaphores: HashMap<String, Arc<Semaphore>> = configuration .modules .iter() .filter_map(|(name, config)| { Loading @@ -81,6 +82,12 @@ impl WorkerPool { }) .collect(); info!( global_limit = configuration.max_concurrent_workers, module_limits = ?module_semaphores.keys().collect::<Vec<_>>(), "worker pool created" ); WorkerPool { global_semaphore, module_semaphores, Loading @@ -106,13 +113,19 @@ impl WorkerPool { if let Some(semaphore) = self.module_semaphores.get(module_name) { let module_start = Instant::now(); module_permit = Some(semaphore.clone().acquire_owned().await.ok()?); let wait_duration = module_start.elapsed(); self.metrics.permit_wait_duration.record( module_start.elapsed().as_secs_f64(), wait_duration.as_secs_f64(), &[ KeyValue::new("scope", "module"), KeyValue::new("module", module_name.to_owned()), ], ); debug!( module = module_name, wait_ms = wait_duration.as_millis() as u64, "module permit acquired" ); attributes.push(KeyValue::new("scope", module_name.to_owned())); } Loading Loading
crates/indexer/src/engine.rs +22 −5 Original line number Diff line number Diff line Loading @@ -33,7 +33,7 @@ use futures::StreamExt; use opentelemetry::KeyValue; use thiserror::Error; use tokio_util::sync::CancellationToken; use tracing::warn; use tracing::{debug, info, warn}; use crate::configuration::EngineConfiguration; use crate::destination::Destination; Loading Loading @@ -185,9 +185,12 @@ impl Engine { } async fn listen(&self, topic: Topic, worker_pool: Arc<WorkerPool>) -> Result<(), EngineError> { let topic_name = format!("{}.{}", topic.stream, topic.subject); info!(topic = %topic_name, "topic listener starting"); let mut subscription = self.broker.subscribe(&topic, self.metrics.clone()).await?; let mut inflight = tokio::task::JoinSet::new(); let topic_label = KeyValue::new("topic", format!("{}.{}", topic.stream, topic.subject)); let topic_label = KeyValue::new("topic", topic_name.clone()); loop { tokio::select! { Loading @@ -201,17 +204,20 @@ impl Engine { worker_pool.clone(), self.metrics.clone(), topic_label.clone(), topic_name.clone(), )); } } } let drained = inflight.len(); while let Some(result) = inflight.join_next().await { if let Err(error) = result { warn!(%error, "message processing task panicked"); warn!(%error, topic = %topic_name, "message processing task panicked"); } } info!(topic = %topic_name, drained, "topic listener stopped"); Ok(()) } Loading @@ -230,7 +236,17 @@ async fn process_message( worker_pool: Arc<WorkerPool>, metrics: Arc<EngineMetrics>, topic_label: KeyValue, topic_name: String, ) { let message_id = message.envelope.id.0.clone(); let attempt = message.envelope.attempt; debug!(topic = %topic_name, %message_id, attempt, "message received"); if attempt > 1 { info!(topic = %topic_name, %message_id, attempt, "message retry received"); } let message_start = Instant::now(); let result = run_handlers(&handlers, &context, &message, &worker_pool, &metrics).await; Loading @@ -238,13 +254,14 @@ async fn process_message( let outcome = match result { Ok(()) => { if let Err(error) = message.ack().await { warn!(%error, "failed to ack message"); warn!(%error, %message_id, "failed to ack message"); } "ack" } Err(_) => { info!(topic = %topic_name, %message_id, "message nacked, handler failure"); if let Err(error) = message.nack().await { warn!(%error, "failed to nack message"); warn!(%error, %message_id, "failed to nack message"); } "nack" } Loading
crates/indexer/src/locking.rs +18 −5 Original line number Diff line number Diff line use std::time::Duration; use async_trait::async_trait; use tracing::debug; #[derive(Debug, thiserror::Error)] pub enum LockError { Loading Loading @@ -37,18 +38,30 @@ impl LockService for NatsLockService { .kv_put(INDEXING_LOCKS_BUCKET, key, Bytes::new(), options) .await { Ok(KvPutResult::Success(_)) => Ok(true), Ok(KvPutResult::AlreadyExists | KvPutResult::RevisionMismatch) => Ok(false), Err(e) => Err(LockError::Backend(e.to_string())), Ok(KvPutResult::Success(_)) => { debug!(key, "lock acquired"); Ok(true) } Ok(KvPutResult::AlreadyExists | KvPutResult::RevisionMismatch) => { debug!(key, "lock contention, already held"); Ok(false) } Err(e) => { debug!(key, error = %e, "lock acquisition error"); Err(LockError::Backend(e.to_string())) } } } async fn release(&self, key: &str) -> Result<(), LockError> { use crate::modules::sdlc::locking::INDEXING_LOCKS_BUCKET; self.nats let result = self .nats .kv_delete(INDEXING_LOCKS_BUCKET, key) .await .map_err(|e| LockError::Backend(e.to_string())) .map_err(|e| LockError::Backend(e.to_string())); debug!(key, "lock released"); result } }
crates/indexer/src/nats/broker.rs +28 −3 Original line number Diff line number Diff line Loading @@ -19,6 +19,8 @@ use tokio::task::JoinHandle; use tokio_stream::wrappers::ReceiverStream; use tokio_util::sync::CancellationToken; use tracing::{debug, info, warn}; use crate::metrics::EngineMetrics; use crate::types::{Envelope, MessageId, Topic}; Loading Loading @@ -63,6 +65,7 @@ impl NatsBroker { } pub async fn shutdown(self) { info!("broker shutdown initiated"); self.cancellation_token.cancel(); let handles: Vec<_> = self.subscription_handles.lock().drain(..).collect(); for handle in handles { Loading Loading @@ -122,6 +125,8 @@ impl NatsBroker { message: e.to_string(), })?; info!(bucket, "KV bucket ready"); let mut cache = self.kv_stores.write().await; cache.insert(bucket.to_string(), store); Ok(()) Loading @@ -133,14 +138,17 @@ impl NatsBroker { subjects: Vec<String>, ) -> Result<Stream, NatsError> { match self.get_stream(stream_name).await { Ok(stream) => return Ok(stream), Ok(stream) => { info!(stream = %stream_name, "stream found"); return Ok(stream); } Err(NatsError::StreamNotFound { .. }) => {} Err(e) => return Err(e), } let stream_config = async_nats::jetstream::stream::Config { name: stream_name.to_string(), subjects, subjects: subjects.clone(), num_replicas: self.config.stream_replicas, max_age: self.config.stream_max_age().unwrap_or_default(), max_bytes: self.config.stream_max_bytes.unwrap_or(-1), Loading @@ -159,6 +167,8 @@ impl NatsBroker { source: e, })?; info!(stream = %stream_name, ?subjects, "stream created"); let mut cache = self.streams.write().await; cache.insert(stream_name.clone(), stream.clone()); Ok(stream) Loading Loading @@ -277,10 +287,21 @@ impl NatsBroker { let stream = self.get_stream(&topic.stream).await?; let consumer = self.get_or_create_consumer(&stream, &topic.subject).await?; let consumer_type = match &self.config.consumer_name { Some(name) => format!("durable({})", name), None => "ephemeral".to_string(), }; let batch_size = self.config.batch_size(); info!( topic = %format!("{}.{}", topic.stream, topic.subject), consumer_type, batch_size, "subscription started" ); let (sender, receiver) = tokio::sync::mpsc::channel(self.config.subscription_buffer_size()); let cancel_token = self.cancellation_token.clone(); let batch_size = self.config.batch_size(); let handle = tokio::spawn(async move { loop { Loading @@ -292,6 +313,7 @@ impl NatsBroker { let batch = match consumer.fetch().max_messages(batch_size).messages().await { Ok(batch) => batch, Err(e) => { warn!(error = %e, "fetch batch error"); metrics.nats_fetch_duration.record( fetch_start.elapsed().as_secs_f64(), &[opentelemetry::KeyValue::new("outcome", "error")], Loading @@ -308,11 +330,13 @@ impl NatsBroker { tokio::pin!(batch); let mut batch_count: usize = 0; while let Some(result) = batch.next().await { if cancel_token.is_cancelled() { break; } batch_count += 1; let converted = match result { Ok(msg) => Self::convert_message(msg), Err(e) => Err(map_subscribe_error(e)), Loading @@ -322,6 +346,7 @@ impl NatsBroker { return; } } debug!(count = batch_count, "batch fetched"); } }); Loading
crates/indexer/src/worker_pool.rs +15 −2 Original line number Diff line number Diff line Loading @@ -23,6 +23,7 @@ use std::time::Instant; use opentelemetry::KeyValue; use tokio::sync::{OwnedSemaphorePermit, Semaphore}; use tracing::{debug, info}; use crate::configuration::EngineConfiguration; use crate::metrics::EngineMetrics; Loading Loading @@ -71,7 +72,7 @@ impl WorkerPool { pub fn new(configuration: &EngineConfiguration, metrics: Arc<EngineMetrics>) -> Self { let global_semaphore = Arc::new(Semaphore::new(configuration.max_concurrent_workers)); let module_semaphores = configuration let module_semaphores: HashMap<String, Arc<Semaphore>> = configuration .modules .iter() .filter_map(|(name, config)| { Loading @@ -81,6 +82,12 @@ impl WorkerPool { }) .collect(); info!( global_limit = configuration.max_concurrent_workers, module_limits = ?module_semaphores.keys().collect::<Vec<_>>(), "worker pool created" ); WorkerPool { global_semaphore, module_semaphores, Loading @@ -106,13 +113,19 @@ impl WorkerPool { if let Some(semaphore) = self.module_semaphores.get(module_name) { let module_start = Instant::now(); module_permit = Some(semaphore.clone().acquire_owned().await.ok()?); let wait_duration = module_start.elapsed(); self.metrics.permit_wait_duration.record( module_start.elapsed().as_secs_f64(), wait_duration.as_secs_f64(), &[ KeyValue::new("scope", "module"), KeyValue::new("module", module_name.to_owned()), ], ); debug!( module = module_name, wait_ms = wait_duration.as_millis() as u64, "module permit acquired" ); attributes.push(KeyValue::new("scope", module_name.to_owned())); } Loading