Verified Commit 96f86301 authored by Jean-Gabriel Doyon PTO until 2024-04-17's avatar Jean-Gabriel Doyon PTO until 2024-04-17 Committed by GitLab
Browse files

feat(indexer): add health endpoint for Kubernetes probes

parent 0c092f9f
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -3355,6 +3355,7 @@ dependencies = [
 "arrow",
 "async-nats",
 "async-trait",
 "axum",
 "base64 0.22.1",
 "bytes",
 "chrono",
+6 −0
Original line number Diff line number Diff line
@@ -18,6 +18,10 @@ fn default_grpc_bind_address() -> SocketAddr {
    "127.0.0.1:50054".parse().unwrap()
}

fn default_indexer_health_bind_address() -> SocketAddr {
    "0.0.0.0:4202".parse().unwrap()
}

fn default_jwt_clock_skew_secs() -> u64 {
    60
}
@@ -48,6 +52,8 @@ pub struct AppConfig {
    pub modules: ModulesConfig,
    #[serde(default)]
    pub health_check: HealthCheckConfig,
    #[serde(default = "default_indexer_health_bind_address")]
    pub indexer_health_bind_address: SocketAddr,
    #[serde(default)]
    pub metrics: MetricsConfig,
}
+1 −0
Original line number Diff line number Diff line
@@ -60,6 +60,7 @@ async fn main() -> anyhow::Result<()> {
                engine: config.engine.clone(),
                gitlab: config.gitlab.clone(),
                modules: config.modules.clone(),
                health_bind_address: config.indexer_health_bind_address,
            };
            indexer::run(&indexer_config, shutdown)
                .await
+1 −0
Original line number Diff line number Diff line
@@ -38,6 +38,7 @@ tokio-stream = { workspace = true }
tokio-util = { workspace = true }
tracing = { workspace = true }
uuid = { workspace = true }
axum = { workspace = true }
zstd = "0.13.3"

[dev-dependencies]
+102 −0
Original line number Diff line number Diff line
use std::net::SocketAddr;
use std::time::Duration;

use axum::{Json, Router, extract::State, http::StatusCode, response::IntoResponse, routing::get};
use clickhouse_client::ArrowClickHouseClient;
use serde::Serialize;
use tokio::net::TcpListener;
use tokio::time::timeout;
use tracing::info;

const HEALTH_CHECK_TIMEOUT: Duration = Duration::from_secs(5);

#[derive(Clone)]
pub struct HealthState {
    pub nats_client: async_nats::Client,
    pub graph_client: ArrowClickHouseClient,
    pub datalake_client: ArrowClickHouseClient,
}

#[derive(Serialize)]
struct HealthResponse {
    status: &'static str,
    version: &'static str,
    #[serde(skip_serializing_if = "Vec::is_empty")]
    unhealthy_components: Vec<&'static str>,
}

fn version() -> &'static str {
    match option_env!("GKG_VERSION") {
        Some(v) => v,
        None => env!("CARGO_PKG_VERSION"),
    }
}

async fn live() -> Json<HealthResponse> {
    Json(HealthResponse {
        status: "ok",
        version: version(),
        unhealthy_components: Vec::new(),
    })
}

async fn ready(State(state): State<HealthState>) -> impl IntoResponse {
    let nats_healthy =
        state.nats_client.connection_state() == async_nats::connection::State::Connected;

    let graph_healthy = timeout(HEALTH_CHECK_TIMEOUT, state.graph_client.execute("SELECT 1"))
        .await
        .is_ok_and(|r| r.is_ok());
    let datalake_healthy = timeout(
        HEALTH_CHECK_TIMEOUT,
        state.datalake_client.execute("SELECT 1"),
    )
    .await
    .is_ok_and(|r| r.is_ok());

    let mut unhealthy_components = Vec::new();
    if !nats_healthy {
        unhealthy_components.push("nats");
    }
    if !graph_healthy {
        unhealthy_components.push("clickhouse_graph");
    }
    if !datalake_healthy {
        unhealthy_components.push("clickhouse_datalake");
    }

    let healthy = unhealthy_components.is_empty();

    let status = if healthy {
        StatusCode::OK
    } else {
        StatusCode::SERVICE_UNAVAILABLE
    };

    let label = if healthy { "ok" } else { "unhealthy" };

    (
        status,
        Json(HealthResponse {
            status: label,
            version: version(),
            unhealthy_components,
        }),
    )
}

pub async fn run_health_server(
    bind_address: SocketAddr,
    state: HealthState,
) -> Result<(), std::io::Error> {
    let app = Router::new()
        .route("/live", get(live))
        .route("/ready", get(ready))
        .with_state(state);

    let listener = TcpListener::bind(bind_address).await?;

    info!(%bind_address, "indexer health server listening");

    axum::serve(listener, app).await
}
Loading