Verified Commit 0c602453 authored by Jean-Gabriel Doyon's avatar Jean-Gabriel Doyon Committed by GitLab
Browse files

refactor(indexer): rename Topic to Subscription with explicit options

parent 90f70e59
Loading
Loading
Loading
Loading
+7 −7
Original line number Diff line number Diff line
@@ -13,7 +13,7 @@ The crate has two layers:
1. **Engine** - generic message routing, concurrency control, and destination abstraction
2. **Domain modules** - the actual indexing logic for SDLC entities and code

The engine subscribes to NATS topics, dispatches messages to handlers, and acks or nacks based on the handler result. Domain modules provide the handlers that transform GitLab events into property graph records.
The engine subscribes to NATS JetStream subscriptions, dispatches messages to handlers, and acks or nacks based on the handler result. Domain modules provide the handlers that transform GitLab events into property graph records.

## Quick start

@@ -44,7 +44,7 @@ Indexes git repositories via the Rails internal API. Fetches archives on code in

### Handlers

A handler listens to one topic and processes messages from it. Return `Ok(())` to ack, return an error to nack (the message gets redelivered). Each handler provides its own `engine_config()` controlling retry policy and concurrency group.
A handler listens on one subscription and processes messages from it. Return `Ok(())` to ack, return an error to nack (the message gets redelivered). Each handler provides its own `engine_config()` controlling retry policy and concurrency group.

```rust
pub struct UserCreatedHandler;
@@ -55,8 +55,8 @@ impl Handler for UserCreatedHandler {
        "user-created"
    }

    fn topic(&self) -> Topic {
        Topic::new("users", "user.created")
    fn subscription(&self) -> Subscription {
        Subscription::new("users", "user.created")
    }

    fn engine_config(&self) -> &HandlerConfiguration {
@@ -187,10 +187,10 @@ Three error types, nested:
- `HandlerError` has `Processing(String)` and `Deserialization(serde_json::Error)`
- `BrokerError` has variants for publish, subscribe, ack, nack, connection issues, etc.

When a handler returns an error, the engine nacks the message so the broker can redeliver it (if retries are configured for that handler). When retries are exhausted, the outcome depends on the topic:
When a handler returns an error, the engine nacks the message so the broker can redeliver it (if retries are configured for that handler). When retries are exhausted, the outcome depends on the subscription's `dead_letter_on_exhaustion` setting:

- **External topics** (e.g. Siphon CDC): the message is published to the `GKG_DEAD_LETTERS` stream for inspection and replay, then acked. If the DLQ publish fails, the message is nacked for redelivery instead.
- **Owned topics** (internal dispatch): the message is term-acked, since it will be regenerated on the next dispatch cycle.
- **`dead_letter_on_exhaustion: true`** (e.g. Siphon CDC subscriptions): the message is published to the `GKG_DEAD_LETTERS` stream for inspection and replay, then acked. If the DLQ publish fails, the message is nacked for redelivery instead.
- **`dead_letter_on_exhaustion: false`** (default, used by internal dispatch subscriptions): the message is term-acked, since it will be regenerated on the next dispatch cycle.

`IndexerError` wraps top-level failures: NATS connection, ClickHouse connection, engine errors, and handler initialization.

+14 −14
Original line number Diff line number Diff line
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};

use crate::types::{Envelope, Topic};
use crate::types::{Envelope, Subscription};

pub const DEAD_LETTER_STREAM: &str = "GKG_DEAD_LETTERS";
pub const DEAD_LETTER_SUBJECT_PREFIX: &str = "dlq";
@@ -19,14 +19,14 @@ pub struct DeadLetterEnvelope {
}

impl DeadLetterEnvelope {
    pub fn new(original_topic: &Topic, envelope: &Envelope, error: &str) -> Self {
    pub fn new(original_subscription: &Subscription, envelope: &Envelope, error: &str) -> Self {
        let original_payload = serde_json::from_slice(&envelope.payload).unwrap_or_else(|_| {
            serde_json::Value::String(String::from_utf8_lossy(&envelope.payload).into_owned())
        });

        Self {
            original_subject: original_topic.subject.to_string(),
            original_stream: original_topic.stream.to_string(),
            original_subject: original_subscription.subject.to_string(),
            original_stream: original_subscription.stream.to_string(),
            original_payload,
            original_message_id: envelope.id.0.to_string(),
            original_timestamp: envelope.timestamp,
@@ -37,15 +37,15 @@ impl DeadLetterEnvelope {
    }
}

pub fn dead_letter_subject(topic: &Topic) -> String {
pub fn dead_letter_subject(subscription: &Subscription) -> String {
    format!(
        "{}.{}.{}",
        DEAD_LETTER_SUBJECT_PREFIX, topic.stream, topic.subject
        DEAD_LETTER_SUBJECT_PREFIX, subscription.stream, subscription.subject
    )
}

pub fn dead_letter_topic(topic: &Topic) -> Topic {
    Topic::owned(DEAD_LETTER_STREAM, dead_letter_subject(topic))
pub fn dead_letter_subscription(subscription: &Subscription) -> Subscription {
    Subscription::new(DEAD_LETTER_STREAM, dead_letter_subject(subscription))
}

#[cfg(test)]
@@ -54,19 +54,19 @@ mod tests {

    #[test]
    fn dead_letter_subject_formats_correctly() {
        let topic = Topic::external("siphon_db", "tables.merge_requests");
        let subscription = Subscription::new("siphon_db", "tables.merge_requests");
        assert_eq!(
            dead_letter_subject(&topic),
            dead_letter_subject(&subscription),
            "dlq.siphon_db.tables.merge_requests"
        );
    }

    #[test]
    fn dead_letter_topic_points_to_dlq_stream() {
        let topic = Topic::external("siphon_db", "tables.users");
        let dlq = dead_letter_topic(&topic);
    fn dead_letter_subscription_points_to_dlq_stream() {
        let subscription = Subscription::new("siphon_db", "tables.users");
        let dlq = dead_letter_subscription(&subscription);
        assert_eq!(&*dlq.stream, DEAD_LETTER_STREAM);
        assert_eq!(&*dlq.subject, "dlq.siphon_db.tables.users");
        assert!(dlq.owned);
        assert!(dlq.manage_stream);
    }
}
+26 −22
Original line number Diff line number Diff line
@@ -44,7 +44,7 @@ use crate::handler::{Handler, HandlerContext, HandlerError, HandlerRegistry};
use crate::locking::{LockService, NatsLockService};
use crate::metrics::EngineMetrics;
use crate::nats::{DlqResult, NatsBroker, NatsError, NatsMessage, NatsServices, NatsServicesImpl};
use crate::types::{Envelope, Topic};
use crate::types::{Envelope, Subscription};
use crate::worker_pool::WorkerPool;

/// Errors that can occur during engine operation.
@@ -167,51 +167,55 @@ impl Engine {
    ///
    /// Returns when stopped via [`Engine::stop`] or when all subscriptions end.
    pub async fn run(&self, configuration: &EngineConfiguration) -> Result<(), EngineError> {
        let topics = self.registry.topics();
        if topics.is_empty() {
        let subscriptions = self.registry.subscriptions();
        if subscriptions.is_empty() {
            return Ok(());
        }

        self.validate_concurrency_groups(configuration)?;

        self.broker.ensure_streams(&topics).await?;
        self.broker.ensure_streams(&subscriptions).await?;

        let runtime = Arc::new(EngineRuntime {
            worker_pool: WorkerPool::new(configuration, self.metrics.clone()),
            metrics: self.metrics.clone(),
        });
        let tasks: Vec<_> = topics
        let tasks: Vec<_> = subscriptions
            .into_iter()
            .map(|topic| self.listen(topic, runtime.clone()))
            .map(|subscription| self.listen(subscription, runtime.clone()))
            .collect();
        futures::future::try_join_all(tasks).await?;

        Ok(())
    }

    async fn listen(&self, topic: Topic, runtime: Arc<EngineRuntime>) -> Result<(), EngineError> {
        let topic_name = format!("{}.{}", topic.stream, topic.subject);
    async fn listen(
        &self,
        subscription: Subscription,
        runtime: Arc<EngineRuntime>,
    ) -> Result<(), EngineError> {
        let topic_name = format!("{}.{}", subscription.stream, subscription.subject);
        info!(topic = %topic_name, "topic listener starting");

        let mut subscription = self
        let mut messages = self
            .broker
            .subscribe(&topic, runtime.metrics.clone())
            .subscribe(&subscription, runtime.metrics.clone())
            .await?;
        let mut inflight = tokio::task::JoinSet::new();

        loop {
            tokio::select! {
                _ = self.cancel.cancelled() => break,
                Some(message) = subscription.next() => {
                Some(message) = messages.next() => {
                    let message = message?;
                    let progress = message.progress_notifier();
                    inflight.spawn(process_message(
                        message,
                        self.registry.handlers_for(&topic),
                        self.registry.handlers_for(&subscription),
                        HandlerContext::new(self.destination.clone(), self.nats_services.clone(), self.lock_service.clone(), progress),
                        self.broker.clone(),
                        runtime.clone(),
                        topic.clone(),
                        subscription.clone(),
                        topic_name.clone(),
                    ));
                }
@@ -233,8 +237,8 @@ impl Engine {
        &self,
        configuration: &EngineConfiguration,
    ) -> Result<(), EngineError> {
        for topic in &self.registry.topics() {
            for handler in self.registry.handlers_for(topic) {
        for subscription in &self.registry.subscriptions() {
            for handler in self.registry.handlers_for(subscription) {
                if let Some(group) = &handler.engine_config().concurrency_group
                    && !configuration.concurrency_groups.contains_key(group)
                {
@@ -274,7 +278,7 @@ async fn process_message(
    context: HandlerContext,
    broker: Arc<NatsBroker>,
    runtime: Arc<EngineRuntime>,
    topic: Topic,
    subscription: Subscription,
    topic_name: String,
) {
    let message_id = message.envelope.id.0.clone();
@@ -333,16 +337,16 @@ async fn process_message(
            "nack"
        }
        HandlersOutcome::Exhausted { error } => {
            if topic.owned {
            if subscription.dead_letter_on_exhaustion {
                match message.to_dlq(&broker, &subscription, &error).await {
                    DlqResult::Published => "dead_letter",
                    DlqResult::Nacked => "nack",
                }
            } else {
                if let Err(term_error) = message.term_ack().await {
                    warn!(%term_error, %message_id, "failed to term-ack exhausted message");
                }
                "term"
            } else {
                match message.to_dlq(&broker, &topic, &error).await {
                    DlqResult::Published => "dead_letter",
                    DlqResult::Nacked => "nack",
                }
            }
        }
    };
+34 −29
Original line number Diff line number Diff line
@@ -2,7 +2,7 @@
//!
//! ```ignore
//! use etl_engine::handler::{Handler, HandlerContext, HandlerError};
//! use etl_engine::types::{Envelope, Topic};
//! use etl_engine::types::{Envelope, Subscription};
//! use async_trait::async_trait;
//!
//! struct MyHandler;
@@ -10,7 +10,7 @@
//! #[async_trait]
//! impl Handler for MyHandler {
//!     fn name(&self) -> &str { "my-handler" }
//!     fn topic(&self) -> Topic { Topic::owned("my-stream", "my-subject") }
//!     fn subscription(&self) -> Subscription { Subscription::new("my-stream", "my-subject") }
//!
//!     async fn handle(&self, ctx: HandlerContext, msg: Envelope) -> Result<(), HandlerError> {
//!         // ctx.destination has your writers
@@ -32,7 +32,7 @@ use crate::{
    destination::Destination,
    locking::LockService,
    nats::{NatsServices, ProgressNotifier},
    types::{Envelope, Topic},
    types::{Envelope, Subscription},
};

/// Errors that can occur during message handling.
@@ -116,8 +116,8 @@ pub trait Handler: Send + Sync {
    /// Used for metrics labeling, config lookup, and debugging. Should be a stable identifier.
    fn name(&self) -> &str;

    /// Returns the topic this handler subscribes to.
    fn topic(&self) -> Topic;
    /// Returns the subscription this handler listens on.
    fn subscription(&self) -> Subscription;

    /// Returns the engine configuration for this handler (retry policy, concurrency group).
    ///
@@ -139,39 +139,44 @@ pub trait Handler: Send + Sync {
/// for the engine to dispatch messages.
#[derive(Default)]
pub struct HandlerRegistry {
    handlers_by_topic: RwLock<HashMap<Topic, Vec<Arc<dyn Handler>>>>,
    handlers_by_subscription: RwLock<HashMap<Subscription, Vec<Arc<dyn Handler>>>>,
}

impl HandlerRegistry {
    /// Registers a handler, adding it to the registry under its topic.
    /// Registers a handler, adding it to the registry under its subscription.
    pub fn register_handler(&self, handler: Box<dyn Handler>) {
        let mut handlers_by_topic = self.handlers_by_topic.write();
        let topic = handler.topic();
        handlers_by_topic
            .entry(topic)
        let mut handlers_by_subscription = self.handlers_by_subscription.write();
        let subscription = handler.subscription();
        handlers_by_subscription
            .entry(subscription)
            .or_default()
            .push(Arc::from(handler));
    }

    /// Returns all handlers registered for a given topic.
    pub fn handlers_for(&self, topic: &Topic) -> Vec<Arc<dyn Handler>> {
        self.handlers_by_topic
    /// Returns all handlers registered for a given subscription.
    pub fn handlers_for(&self, subscription: &Subscription) -> Vec<Arc<dyn Handler>> {
        self.handlers_by_subscription
            .read()
            .get(topic)
            .get(subscription)
            .cloned()
            .unwrap_or_default()
    }

    /// Returns all unique topics that have registered handlers.
    pub fn topics(&self) -> Vec<Topic> {
        let mut topics: Vec<_> = self.handlers_by_topic.read().keys().cloned().collect();
        topics.sort_by(|a, b| (&*a.stream, &*a.subject).cmp(&(&*b.stream, &*b.subject)));
        topics
    /// Returns all unique subscriptions that have registered handlers.
    pub fn subscriptions(&self) -> Vec<Subscription> {
        let mut subscriptions: Vec<_> = self
            .handlers_by_subscription
            .read()
            .keys()
            .cloned()
            .collect();
        subscriptions.sort_by(|a, b| (&*a.stream, &*a.subject).cmp(&(&*b.stream, &*b.subject)));
        subscriptions
    }

    /// Finds a handler by name across all topics.
    /// Finds a handler by name across all subscriptions.
    pub fn find_by_name(&self, name: &str) -> Option<Arc<dyn Handler>> {
        self.handlers_by_topic
        self.handlers_by_subscription
            .read()
            .values()
            .flatten()
@@ -192,14 +197,14 @@ mod tests {
        registry.register_handler(Box::new(MockHandler::new("stream1", "subject1")));
        registry.register_handler(Box::new(MockHandler::new("stream1", "subject1")));

        let topic = Topic::owned("stream1", "subject1");
        let handlers = registry.handlers_for(&topic);
        let subscription = Subscription::new("stream1", "subject1");
        let handlers = registry.handlers_for(&subscription);
        assert_eq!(handlers.len(), 2);

        let unknown = Topic::owned("unknown", "unknown");
        let unknown = Subscription::new("unknown", "unknown");
        assert!(registry.handlers_for(&unknown).is_empty());

        assert_eq!(registry.topics(), vec![topic]);
        assert_eq!(registry.subscriptions(), vec![subscription]);
    }

    #[tokio::test]
@@ -210,9 +215,9 @@ mod tests {
        registry.register_handler(Box::new(MockHandler::new("stream", "s1")));
        registry.register_handler(Box::new(MockHandler::new("stream", "s2")));

        let t0 = Topic::owned("stream", "s0");
        let t1 = Topic::owned("stream", "s1");
        let t2 = Topic::owned("stream", "s2");
        let t0 = Subscription::new("stream", "s0");
        let t1 = Subscription::new("stream", "s1");
        let t2 = Subscription::new("stream", "s2");

        let handles: Vec<_> = (0..50)
            .map(|_| {
+4 −1
Original line number Diff line number Diff line
@@ -152,7 +152,10 @@ pub async fn run(
    info!("initializing Namespace Deletion handler");
    modules::namespace_deletion::register_handlers(&registry, config, &ontology)?;

    info!(topics = registry.topics().len(), "registered handlers");
    info!(
        subscriptions = registry.subscriptions().len(),
        "registered handlers"
    );

    let gitlab_client = config
        .gitlab
Loading