Verified Commit bbc5aac2 authored by Jean-Gabriel Doyon's avatar Jean-Gabriel Doyon Committed by GitLab
Browse files

feat(indexer): add dead letter queue for exhausted retry messages

parent 6a0231a5
Loading
Loading
Loading
Loading
+4 −1
Original line number Diff line number Diff line
@@ -187,7 +187,10 @@ Three error types, nested:
- `HandlerError` has `Processing(String)` and `Deserialization(serde_json::Error)`
- `BrokerError` has variants for publish, subscribe, ack, nack, connection issues, etc.

When a handler returns an error, the engine nacks the message so the broker can redeliver it (if retries are configured for that handler).
When a handler returns an error, the engine nacks the message so the broker can redeliver it (if retries are configured for that handler). When retries are exhausted, the outcome depends on the topic:

- **External topics** (e.g. Siphon CDC): the message is published to the `GKG_DEAD_LETTERS` stream for inspection and replay, then acked. If the DLQ publish fails, the message is nacked for redelivery instead.
- **Owned topics** (internal dispatch): the message is term-acked, since it will be regenerated on the next dispatch cycle.

`IndexerError` wraps top-level failures: NATS connection, ClickHouse connection, engine errors, and handler initialization.

+72 −0
Original line number Diff line number Diff line
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};

use crate::types::{Envelope, Topic};

pub const DEAD_LETTER_STREAM: &str = "GKG_DEAD_LETTERS";
pub const DEAD_LETTER_SUBJECT_PREFIX: &str = "dlq";

/// Snapshot of a message whose retries were exhausted, persisted to the
/// dead-letter stream (`GKG_DEAD_LETTERS`) for inspection and replay.
///
/// Field names form the on-stream JSON schema — do not rename or reorder
/// without considering stored records.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeadLetterEnvelope {
    // Subject the message was originally published on.
    pub original_subject: String,
    // Stream the message originally lived in.
    pub original_stream: String,
    // Original payload decoded as JSON when possible; otherwise stored as a
    // lossy-UTF-8 string value (see `DeadLetterEnvelope::new`).
    pub original_payload: serde_json::Value,
    // Identifier of the failed message (stringified from `Envelope::id`).
    pub original_message_id: String,
    // Timestamp carried by the original envelope.
    pub original_timestamp: DateTime<Utc>,
    // When the dead-letter record was created (retries exhausted).
    pub failed_at: DateTime<Utc>,
    // Delivery attempt count at the time of failure.
    pub attempts: u32,
    // Human-readable description of the last handler error.
    pub last_error: String,
}

impl DeadLetterEnvelope {
    /// Builds a dead-letter record for `envelope`, capturing the originating
    /// topic, the failure reason, and a `failed_at` timestamp taken now.
    pub fn new(original_topic: &Topic, envelope: &Envelope, error: &str) -> Self {
        // Keep the payload structured when it parses as JSON; otherwise fall
        // back to a lossily-decoded string so no bytes are silently dropped.
        let payload = match serde_json::from_slice(&envelope.payload) {
            Ok(value) => value,
            Err(_) => serde_json::Value::String(
                String::from_utf8_lossy(&envelope.payload).into_owned(),
            ),
        };

        Self {
            original_subject: original_topic.subject.to_string(),
            original_stream: original_topic.stream.to_string(),
            original_payload: payload,
            original_message_id: envelope.id.0.to_string(),
            original_timestamp: envelope.timestamp,
            failed_at: Utc::now(),
            attempts: envelope.attempt,
            last_error: error.to_string(),
        }
    }
}

/// Derives the DLQ subject for `topic`: `dlq.<stream>.<subject>`.
pub fn dead_letter_subject(topic: &Topic) -> String {
    let stream = &topic.stream;
    let subject = &topic.subject;
    format!("{DEAD_LETTER_SUBJECT_PREFIX}.{stream}.{subject}")
}

/// Builds the owned DLQ topic that mirrors `topic` inside the
/// `GKG_DEAD_LETTERS` stream.
pub fn dead_letter_topic(topic: &Topic) -> Topic {
    let subject = dead_letter_subject(topic);
    Topic::owned(DEAD_LETTER_STREAM, subject)
}

#[cfg(test)]
mod tests {
    use super::*;

    // The DLQ subject mirrors the source stream and subject under `dlq.`.
    #[test]
    fn dead_letter_subject_formats_correctly() {
        let source = Topic::external("siphon_db", "tables.merge_requests");
        let subject = dead_letter_subject(&source);
        assert_eq!(subject, "dlq.siphon_db.tables.merge_requests");
    }

    // The derived topic must land in the DLQ stream and be marked as owned.
    #[test]
    fn dead_letter_topic_points_to_dlq_stream() {
        let source = Topic::external("siphon_db", "tables.users");
        let dlq = dead_letter_topic(&source);
        assert!(dlq.owned);
        assert_eq!(&*dlq.subject, "dlq.siphon_db.tables.users");
        assert_eq!(&*dlq.stream, DEAD_LETTER_STREAM);
    }
}
+34 −13
Original line number Diff line number Diff line
@@ -43,7 +43,7 @@ use crate::destination::Destination;
use crate::handler::{Handler, HandlerContext, HandlerError, HandlerRegistry};
use crate::locking::{LockService, NatsLockService};
use crate::metrics::EngineMetrics;
use crate::nats::{NatsBroker, NatsError, NatsMessage, NatsServices, NatsServicesImpl};
use crate::nats::{DlqResult, NatsBroker, NatsError, NatsMessage, NatsServices, NatsServicesImpl};
use crate::types::{Envelope, Topic};
use crate::worker_pool::WorkerPool;

@@ -209,7 +209,9 @@ impl Engine {
                        message,
                        self.registry.handlers_for(&topic),
                        HandlerContext::new(self.destination.clone(), self.nats_services.clone(), self.lock_service.clone(), progress),
                        self.broker.clone(),
                        runtime.clone(),
                        topic.clone(),
                        topic_name.clone(),
                    ));
                }
@@ -254,10 +256,11 @@ impl Engine {
    }
}

#[derive(Debug)]
enum HandlersOutcome {
    Success,
    Failed { retry_delay: Option<Duration> },
    TerminalFailure,
    Exhausted { error: String },
}

struct EngineRuntime {
@@ -269,7 +272,9 @@ async fn process_message(
    message: NatsMessage,
    handlers: Vec<Arc<dyn Handler>>,
    context: HandlerContext,
    broker: Arc<NatsBroker>,
    runtime: Arc<EngineRuntime>,
    topic: Topic,
    topic_name: String,
) {
    let message_id = message.envelope.id.0.clone();
@@ -301,9 +306,11 @@ async fn process_message(
                %message_id,
                attempt,
                panic = %panic_message,
                "handler panicked, term-acking to free subject slot"
                "handler panicked"
            );
            HandlersOutcome::TerminalFailure
            HandlersOutcome::Exhausted {
                error: format!("handler panicked: {panic_message}"),
            }
        }
    };

@@ -325,11 +332,18 @@ async fn process_message(
            }
            "nack"
        }
        HandlersOutcome::TerminalFailure => {
            if let Err(error) = message.term_ack().await {
                warn!(%error, %message_id, "failed to term-ack message");
        HandlersOutcome::Exhausted { error } => {
            if topic.owned {
                if let Err(term_error) = message.term_ack().await {
                    warn!(%term_error, %message_id, "failed to term-ack exhausted message");
                }
                "term"
            } else {
                match message.to_dlq(&broker, &topic, &error).await {
                    DlqResult::Published => "dead_letter",
                    DlqResult::Nacked => "nack",
                }
            }
        }
    };

@@ -379,15 +393,17 @@ async fn run_handlers(

            if let Some(max_attempts) = max_attempts {
                if envelope.attempt >= max_attempts {
                    error!(
                    warn!(
                        handler = handler.name(),
                        message_id = %envelope.id.0,
                        attempt = envelope.attempt,
                        %max_attempts,
                        %error,
                        "max attempts reached, term-acking message"
                        "retry attempts exhausted"
                    );
                    return HandlersOutcome::TerminalFailure;
                    return HandlersOutcome::Exhausted {
                        error: error.to_string(),
                    };
                }

                let retry_delay = handler_config.retry_interval();
@@ -465,7 +481,7 @@ mod tests {
    }

    #[tokio::test]
    async fn handler_failure_at_retry_limit_returns_terminal_failure() {
    async fn handler_failure_at_retry_limit_returns_exhausted() {
        let configuration = EngineConfiguration::default();

        let handler = MockHandler::new("stream", "subject")
@@ -481,7 +497,12 @@ mod tests {
        let runtime = test_runtime(&configuration);
        let outcome = run_handlers(&handlers, &test_context(), &envelope, &runtime).await;

        assert!(matches!(outcome, HandlersOutcome::TerminalFailure));
        match outcome {
            HandlersOutcome::Exhausted { error } => {
                assert!(error.contains("boom"));
            }
            other => panic!("expected Exhausted, got {other:?}"),
        }
    }

    #[tokio::test]
+1 −0
Original line number Diff line number Diff line
@@ -29,6 +29,7 @@
pub mod checkpoint;
pub mod clickhouse;
pub mod configuration;
pub mod dead_letter;
pub mod destination;
pub mod engine;
pub(crate) mod env;
+40 −0
Original line number Diff line number Diff line
@@ -21,6 +21,9 @@ use tokio_util::sync::CancellationToken;

use tracing::{debug, info, warn};

use crate::dead_letter::{
    DEAD_LETTER_STREAM, DEAD_LETTER_SUBJECT_PREFIX, DeadLetterEnvelope, dead_letter_subject,
};
use crate::metrics::EngineMetrics;
use crate::types::{Envelope, MessageId, Topic};

@@ -121,6 +124,43 @@ impl NatsBroker {
            self.get_stream(stream_name).await?;
        }

        self.ensure_dead_letter_stream().await?;

        Ok(())
    }

    /// Creates (or updates) the JetStream stream backing the DLQ, bound to
    /// every subject under the `dlq.` prefix.
    async fn ensure_dead_letter_stream(&self) -> Result<(), NatsError> {
        let name: Arc<str> = Arc::from(DEAD_LETTER_STREAM);
        // `dlq.>` matches all dead-letter subjects, whatever the source topic.
        let subjects = vec![format!("{DEAD_LETTER_SUBJECT_PREFIX}.>")];
        self.create_or_update_stream(&name, subjects).await?;
        Ok(())
    }

    pub async fn publish_dead_letter(
        &self,
        original_topic: &Topic,
        envelope: &Envelope,
        error: &str,
    ) -> Result<(), NatsError> {
        let dead_letter = DeadLetterEnvelope::new(original_topic, envelope, error);

        let payload = serde_json::to_vec(&dead_letter)
            .map(Bytes::from)
            .map_err(|error| {
                NatsError::Publish(format!("failed to serialize dead letter: {error}"))
            })?;

        let subject = dead_letter_subject(original_topic);
        self.jetstream
            .publish(subject.clone(), payload)
            .await
            .map_err(|error| {
                NatsError::Publish(format!(
                    "failed to publish dead letter to '{subject}': {error}"
                ))
            })?;

        Ok(())
    }

Loading