Verified Commit fcca027c authored by Bohdan Parkhomchuk's avatar Bohdan Parkhomchuk 💬 Committed by GitLab
Browse files

feat(indexer): add mTLS support for NATS client

parent 2ccd31ee
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -3611,6 +3611,7 @@ dependencies = [
 "rustc-hash",
 "serde",
 "serde_json",
 "serde_yaml",
 "sha2",
 "siphon-proto",
 "smallvec",
+3 −0
Original line number Diff line number Diff line
@@ -6,6 +6,9 @@ nats:
  url: "localhost:4222"
  # username: ""
  # password: ""
  # tls_ca_cert_path: "/etc/nats/ca.pem"
  # tls_cert_path: "/etc/nats/client.pem"
  # tls_key_path: "/etc/nats/client-key.pem"

datalake:
  url: "http://127.0.0.1:8123"
+1 −0
Original line number Diff line number Diff line
@@ -49,6 +49,7 @@ zstd = "0.13.3"
[dev-dependencies]
internment = { workspace = true }
parser-core = { workspace = true }
serde_yaml = { workspace = true }
smallvec = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread", "time", "test-util", "macros"] }
tokio-stream = { workspace = true }
+19 −1
Original line number Diff line number Diff line
//! NATS JetStream message broker.

use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;

@@ -52,9 +53,13 @@ pub struct NatsBroker {

impl NatsBroker {
    pub async fn connect(config: &NatsConfiguration) -> Result<Self, NatsError> {
        config
            .validate_tls_config()
            .map_err(NatsError::Connection)?;

        let connect_options = Self::build_connect_options(config);

        let url = format!("nats://{}", config.url);
        let url = config.connection_url();
        let client = async_nats::connect_with_options(&url, connect_options)
            .await
            .map_err(map_connect_error)?;
@@ -85,6 +90,7 @@ impl NatsBroker {
        &self.client
    }

    /// Builds connect options. Must be called after `validate_tls_config()`.
    fn build_connect_options(config: &NatsConfiguration) -> async_nats::ConnectOptions {
        let mut options = async_nats::ConnectOptions::new()
            .connection_timeout(config.connection_timeout())
@@ -94,6 +100,18 @@ impl NatsBroker {
            options = options.user_and_password(user.clone(), pass.clone());
        }

        if config.tls_enabled() {
            options = options.require_tls(true);
        }

        if let Some(ca_path) = &config.tls_ca_cert_path {
            options = options.add_root_certificates(PathBuf::from(ca_path));
        }

        if let (Some(cert), Some(key)) = (&config.tls_cert_path, &config.tls_key_path) {
            options = options.add_client_certificate(PathBuf::from(cert), PathBuf::from(key));
        }

        options
    }

+257 −0
Original line number Diff line number Diff line
//! NATS broker configuration.

use std::path::Path;
use std::time::Duration;

use serde::{Deserialize, Serialize};
@@ -23,6 +24,21 @@ pub struct NatsConfiguration {
    #[serde(default)]
    pub password: Option<String>,

    /// Path to CA certificate (PEM) for verifying the NATS server.
    /// Setting this enables TLS (connection uses `tls://` scheme).
    #[serde(default)]
    pub tls_ca_cert_path: Option<String>,

    /// Path to client certificate (PEM) for mTLS authentication.
    /// Must be paired with `tls_key_path`.
    #[serde(default)]
    pub tls_cert_path: Option<String>,

    /// Path to client private key (PEM) for mTLS authentication.
    /// Must be paired with `tls_cert_path`.
    #[serde(default)]
    pub tls_key_path: Option<String>,

    /// Connection timeout in seconds. Defaults to 10.
    #[serde(default = "NatsConfiguration::default_connection_timeout_secs")]
    pub connection_timeout_secs: u64,
@@ -126,6 +142,67 @@ impl NatsConfiguration {
        1
    }

    /// Returns true when TLS is configured — either via cert paths or a `tls://` url scheme.
    pub fn tls_enabled(&self) -> bool {
        self.url.starts_with("tls://")
            || self.tls_ca_cert_path.is_some()
            || self.tls_cert_path.is_some()
            || self.tls_key_path.is_some()
    }

    /// Returns the full connection URL with the appropriate scheme.
    ///
    /// Accepts `url` in any of these formats:
    /// - `"host:port"` — scheme derived from TLS config
    /// - `"nats://host:port"` — plaintext
    /// - `"tls://host:port"` — TLS required
    pub fn connection_url(&self) -> String {
        if self.url.starts_with("nats://") || self.url.starts_with("tls://") {
            return self.url.clone();
        }

        let scheme = if self.tls_enabled() { "tls" } else { "nats" };
        format!("{scheme}://{}", self.url)
    }

    /// Validates TLS configuration completeness and file existence.
    ///
    /// Returns `Ok(())` when:
    /// - No TLS paths are configured (plaintext), or
    /// - All configured paths point to existing files and cert/key form a complete pair.
    ///
    /// Returns `Err` when:
    /// - `tls_cert_path` is set without `tls_key_path` (or vice versa)
    /// - Any configured path points to a nonexistent file
    pub fn validate_tls_config(&self) -> Result<(), String> {
        if !self.tls_enabled() {
            return Ok(());
        }

        match (&self.tls_cert_path, &self.tls_key_path) {
            (Some(_), None) => {
                return Err("tls_cert_path is set but tls_key_path is missing".into());
            }
            (None, Some(_)) => {
                return Err("tls_key_path is set but tls_cert_path is missing".into());
            }
            _ => {}
        }

        let checks: [(&str, Option<&String>); 3] = [
            ("tls_ca_cert_path", self.tls_ca_cert_path.as_ref()),
            ("tls_cert_path", self.tls_cert_path.as_ref()),
            ("tls_key_path", self.tls_key_path.as_ref()),
        ];
        for (field, path) in checks {
            if let Some(p) = path.filter(|p| !Path::new(p.as_str()).exists()) {
                return Err(format!("{field}: file not found at '{p}'"));
            }
        }

        Ok(())
    }

    pub fn connection_timeout(&self) -> Duration {
        Duration::from_secs(self.connection_timeout_secs)
    }
@@ -163,12 +240,18 @@ impl NatsConfiguration {
    /// - `NATS_STREAM_REPLICAS`: Number of stream replicas (default: 1)
    /// - `NATS_STREAM_MAX_AGE_SECS`: Maximum age of messages in seconds
    /// - `NATS_STREAM_MAX_BYTES`: Maximum bytes per stream
    /// - `NATS_TLS_CA_CERT_PATH`: Path to CA certificate (PEM)
    /// - `NATS_TLS_CERT_PATH`: Path to client certificate (PEM)
    /// - `NATS_TLS_KEY_PATH`: Path to client private key (PEM)
    /// - `NATS_STREAM_MAX_MESSAGES`: Maximum messages per stream
    pub fn from_env() -> Self {
        Self {
            url: std::env::var("NATS_URL").unwrap_or_else(|_| "localhost:4222".into()),
            username: std::env::var("NATS_USERNAME").ok(),
            password: std::env::var("NATS_PASSWORD").ok(),
            tls_ca_cert_path: std::env::var("NATS_TLS_CA_CERT_PATH").ok(),
            tls_cert_path: std::env::var("NATS_TLS_CERT_PATH").ok(),
            tls_key_path: std::env::var("NATS_TLS_KEY_PATH").ok(),
            consumer_name: std::env::var("NATS_CONSUMER_NAME").ok(),
            auto_create_streams: env_var_or(
                "NATS_AUTO_CREATE_STREAMS",
@@ -189,6 +272,9 @@ impl Default for NatsConfiguration {
            url: "localhost:4222".to_string(),
            username: None,
            password: None,
            tls_ca_cert_path: None,
            tls_cert_path: None,
            tls_key_path: None,
            connection_timeout_secs: Self::default_connection_timeout_secs(),
            request_timeout_secs: Self::default_request_timeout_secs(),
            ack_wait_secs: Self::default_ack_wait_secs(),
@@ -204,3 +290,174 @@ impl Default for NatsConfiguration {
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::NamedTempFile;

    #[test]
    fn bare_host_defaults_to_nats_scheme() {
        let config = NatsConfiguration::default();
        assert_eq!(config.connection_url(), "nats://localhost:4222");
        assert!(!config.tls_enabled());
    }

    #[test]
    fn bare_host_uses_tls_scheme_when_ca_set() {
        let config = NatsConfiguration {
            tls_ca_cert_path: Some("/tmp/ca.pem".into()),
            ..Default::default()
        };
        assert_eq!(config.connection_url(), "tls://localhost:4222");
        assert!(config.tls_enabled());
    }

    #[test]
    fn bare_host_uses_tls_scheme_when_client_cert_set() {
        let config = NatsConfiguration {
            tls_cert_path: Some("/tmp/cert.pem".into()),
            tls_key_path: Some("/tmp/key.pem".into()),
            ..Default::default()
        };
        assert_eq!(config.connection_url(), "tls://localhost:4222");
    }

    #[test]
    fn nats_scheme_in_url_is_preserved() {
        let config = NatsConfiguration {
            url: "nats://my-nats:4222".into(),
            ..Default::default()
        };
        assert_eq!(config.connection_url(), "nats://my-nats:4222");
        assert!(!config.tls_enabled());
    }

    #[test]
    fn tls_scheme_in_url_enables_tls() {
        let config = NatsConfiguration {
            url: "tls://secure-nats:4222".into(),
            ..Default::default()
        };
        assert_eq!(config.connection_url(), "tls://secure-nats:4222");
        assert!(config.tls_enabled());
    }

    #[test]
    fn tls_scheme_in_url_not_duplicated_with_cert_paths() {
        let config = NatsConfiguration {
            url: "tls://secure-nats:4222".into(),
            tls_ca_cert_path: Some("/tmp/ca.pem".into()),
            ..Default::default()
        };
        assert_eq!(config.connection_url(), "tls://secure-nats:4222");
    }

    #[test]
    fn validate_no_tls_is_valid() {
        let config = NatsConfiguration::default();
        assert!(config.validate_tls_config().is_ok());
    }

    #[test]
    fn validate_ca_only_is_valid() {
        let ca_file = NamedTempFile::new().unwrap();
        let config = NatsConfiguration {
            tls_ca_cert_path: Some(ca_file.path().to_str().unwrap().into()),
            ..Default::default()
        };
        assert!(config.validate_tls_config().is_ok());
    }

    #[test]
    fn validate_full_mtls_is_valid() {
        let ca = NamedTempFile::new().unwrap();
        let cert = NamedTempFile::new().unwrap();
        let key = NamedTempFile::new().unwrap();
        let config = NatsConfiguration {
            tls_ca_cert_path: Some(ca.path().to_str().unwrap().into()),
            tls_cert_path: Some(cert.path().to_str().unwrap().into()),
            tls_key_path: Some(key.path().to_str().unwrap().into()),
            ..Default::default()
        };
        assert!(config.validate_tls_config().is_ok());
    }

    #[test]
    fn validate_cert_without_key_is_invalid() {
        let cert = NamedTempFile::new().unwrap();
        let config = NatsConfiguration {
            tls_cert_path: Some(cert.path().to_str().unwrap().into()),
            ..Default::default()
        };
        let err = config.validate_tls_config().unwrap_err();
        assert!(err.contains("tls_key_path is missing"), "{err}");
    }

    #[test]
    fn validate_key_without_cert_is_invalid() {
        let key = NamedTempFile::new().unwrap();
        let config = NatsConfiguration {
            tls_key_path: Some(key.path().to_str().unwrap().into()),
            ..Default::default()
        };
        let err = config.validate_tls_config().unwrap_err();
        assert!(err.contains("tls_cert_path is missing"), "{err}");
    }

    #[test]
    fn validate_missing_file_is_invalid() {
        let config = NatsConfiguration {
            tls_ca_cert_path: Some("/nonexistent/ca.pem".into()),
            ..Default::default()
        };
        let err = config.validate_tls_config().unwrap_err();
        assert!(err.contains("tls_ca_cert_path"), "{err}");
        assert!(err.contains("file not found"), "{err}");
    }

    #[test]
    fn validate_existing_ca_but_missing_cert_file_is_invalid() {
        let ca = NamedTempFile::new().unwrap();
        let key = NamedTempFile::new().unwrap();
        let config = NatsConfiguration {
            tls_ca_cert_path: Some(ca.path().to_str().unwrap().into()),
            tls_cert_path: Some("/nonexistent/cert.pem".into()),
            tls_key_path: Some(key.path().to_str().unwrap().into()),
            ..Default::default()
        };
        let err = config.validate_tls_config().unwrap_err();
        assert!(err.contains("tls_cert_path"), "{err}");
    }

    #[test]
    fn deserialize_with_tls_fields() {
        let yaml = r#"
            url: "localhost:4222"
            tls_ca_cert_path: "/etc/nats/ca.pem"
            tls_cert_path: "/etc/nats/client.pem"
            tls_key_path: "/etc/nats/client-key.pem"
        "#;
        let config: NatsConfiguration = serde_yaml::from_str(yaml).unwrap();
        assert_eq!(config.tls_ca_cert_path.as_deref(), Some("/etc/nats/ca.pem"));
        assert_eq!(
            config.tls_cert_path.as_deref(),
            Some("/etc/nats/client.pem")
        );
        assert_eq!(
            config.tls_key_path.as_deref(),
            Some("/etc/nats/client-key.pem")
        );
        assert!(config.tls_enabled());
    }

    #[test]
    fn deserialize_without_tls_fields_uses_defaults() {
        let yaml = r#"url: "localhost:4222""#;
        let config: NatsConfiguration = serde_yaml::from_str(yaml).unwrap();
        assert!(config.tls_ca_cert_path.is_none());
        assert!(config.tls_cert_path.is_none());
        assert!(config.tls_key_path.is_none());
        assert!(!config.tls_enabled());
    }
}
Loading