Unverified Commit a348b800 authored by Bohdan Parkhomchuk's avatar Bohdan Parkhomchuk 💬
Browse files

refactor(webserver): replace health probes with /live and /ready

parent 793abcce
Loading
Loading
Loading
Loading
Loading
+125 −0
Original line number Diff line number Diff line
@@ -214,6 +214,75 @@ impl Default for ClusterHealthChecker {
#[cfg(test)]
mod tests {
    use super::*;
    use axum::{Json, Router, routing::get};
    use health_check::{ComponentHealth as HcComponentHealth, HealthStatus, ServiceHealth, Status};
    use tokio::net::TcpListener;

    /// Registers the `ring`-backed rustls crypto provider as the process default.
    ///
    /// The outcome is discarded on purpose: several tests call this helper, so
    /// it must not panic when the installation reports a failure.
    fn install_crypto_provider() {
        drop(rustls::crypto::ring::default_provider().install_default());
    }

    /// Spawns an in-process HTTP server that answers `GET /health` with `health`
    /// serialized as JSON, and returns its base URL (`http://<addr>`).
    ///
    /// The listener binds to port 0 on loopback, so the OS picks the port.
    /// The server task is detached and is never shut down explicitly here.
    async fn start_mock_sidecar(health: HealthStatus) -> String {
        install_crypto_provider();

        // Each request gets its own clone of the canned payload.
        let handler = move || {
            let payload = health.clone();
            async move { Json(payload) }
        };
        let router = Router::new().route("/health", get(handler));

        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let bound = listener.local_addr().unwrap();

        tokio::spawn(async move {
            axum::serve(listener, router).await.unwrap();
        });

        format!("http://{bound}")
    }

    /// Canned sidecar payload in which everything is healthy: a `webserver`
    /// service at 2/2 replicas, an `indexer` service at 1/1 replicas, and a
    /// ClickHouse component with no error.
    fn healthy_sidecar_response() -> HealthStatus {
        let webserver = ServiceHealth {
            name: String::from("webserver"),
            status: Status::Healthy,
            ready_replicas: 2,
            desired_replicas: 2,
        };
        let indexer = ServiceHealth {
            name: String::from("indexer"),
            status: Status::Healthy,
            ready_replicas: 1,
            desired_replicas: 1,
        };
        let clickhouse = HcComponentHealth {
            status: Status::Healthy,
            error: None,
        };

        HealthStatus {
            status: Status::Healthy,
            services: vec![webserver, indexer],
            clickhouse,
        }
    }

    /// Canned sidecar payload with an overall `Unhealthy` status: the only
    /// listed service is an `indexer` at 0/2 replicas, while the ClickHouse
    /// component itself is healthy.
    fn degraded_sidecar_response() -> HealthStatus {
        let indexer = ServiceHealth {
            name: String::from("indexer"),
            status: Status::Unhealthy,
            ready_replicas: 0,
            desired_replicas: 2,
        };
        let clickhouse = HcComponentHealth {
            status: Status::Healthy,
            error: None,
        };

        HealthStatus {
            status: Status::Unhealthy,
            services: vec![indexer],
            clickhouse,
        }
    }

    /// Unwraps the structured variant of a cluster-health response.
    ///
    /// # Panics
    ///
    /// Panics with "Expected structured response" when the content is missing
    /// or holds any other variant.
    fn extract_structured(response: GetClusterHealthResponse) -> StructuredClusterHealth {
        let Some(get_cluster_health_response::Content::Structured(structured)) = response.content
        else {
            panic!("Expected structured response");
        };
        structured
    }

    #[tokio::test]
    async fn test_stubbed_health_returns_healthy_structured() {
@@ -339,4 +408,60 @@ mod tests {
        let checker = ClusterHealthChecker::default();
        assert!(checker.health_client.is_none());
    }

    /// With a reachable sidecar reporting all-healthy, the checker must report
    /// `Healthy`, list all three components, and carry over the webserver
    /// replica counts.
    #[tokio::test]
    async fn real_mode_healthy_sidecar() {
        let sidecar_url = start_mock_sidecar(healthy_sidecar_response()).await;
        let checker = ClusterHealthChecker::new(Some(sidecar_url));

        let response = checker
            .get_cluster_health(ResponseFormat::Raw as i32)
            .await;
        let health = extract_structured(response);

        assert_eq!(health.status, ClusterStatus::Healthy as i32);

        // Every expected component must appear by name.
        for expected in ["webserver", "indexer", "clickhouse"] {
            assert!(health.components.iter().any(|c| c.name == expected));
        }

        let webserver = health
            .components
            .iter()
            .find(|c| c.name == "webserver")
            .unwrap();
        let replicas = webserver.replicas.as_ref().unwrap();
        assert_eq!(replicas.ready, 2);
        assert_eq!(replicas.desired, 2);
    }

    /// An unhealthy service reported by the sidecar must surface both in the
    /// overall status and on the matching component, including replica counts.
    #[tokio::test]
    async fn real_mode_unhealthy_component_propagates() {
        let sidecar_url = start_mock_sidecar(degraded_sidecar_response()).await;
        let checker = ClusterHealthChecker::new(Some(sidecar_url));

        let response = checker
            .get_cluster_health(ResponseFormat::Raw as i32)
            .await;
        let health = extract_structured(response);

        assert_eq!(health.status, ClusterStatus::Unhealthy as i32);

        let indexer = health
            .components
            .iter()
            .find(|c| c.name == "indexer")
            .unwrap();
        assert_eq!(indexer.status, ClusterStatus::Unhealthy as i32);

        let replicas = indexer.replicas.as_ref().unwrap();
        assert_eq!(replicas.ready, 0);
        assert_eq!(replicas.desired, 2);
    }

    /// When the configured sidecar URL cannot be reached, the checker must
    /// report `Unhealthy` and attach an "unreachable" error to the
    /// `clickhouse` component's metrics.
    #[tokio::test]
    async fn real_mode_unreachable_sidecar_returns_unhealthy() {
        install_crypto_provider();

        // Port 1 on loopback: the test expects nothing to be listening there.
        let dead_url = "http://127.0.0.1:1".to_string();
        let checker = ClusterHealthChecker::new(Some(dead_url));

        let response = checker
            .get_cluster_health(ResponseFormat::Raw as i32)
            .await;
        let health = extract_structured(response);

        assert_eq!(health.status, ClusterStatus::Unhealthy as i32);

        let clickhouse = health
            .components
            .iter()
            .find(|c| c.name == "clickhouse")
            .unwrap();
        let error = clickhouse.metrics.get("error").unwrap();
        assert!(error.contains("unreachable"));
    }
}
+3 −7
Original line number Diff line number Diff line
@@ -124,19 +124,15 @@ async fn run_webserver(config: &AppConfig) -> anyhow::Result<()> {

    let cluster_health = ClusterHealthChecker::new(config.health_check_url.clone()).into_arc();

    let http_server = HttpServer::bind(
        config.bind_address,
        (*validator).clone(),
        Arc::clone(&cluster_health),
    )
    .await?;
    let graph_client = config.graph.build_client();
    let http_server = HttpServer::bind(config.bind_address, graph_client).await?;
    info!(addr = %config.bind_address, "HTTP server bound");

    let grpc_server = GrpcServer::new(
        config.grpc_bind_address,
        validator,
        &config.graph,
        Arc::clone(&cluster_health),
        cluster_health,
    );
    info!(addr = %config.grpc_bind_address, "gRPC server starting");

+3 −7
Original line number Diff line number Diff line
@@ -2,14 +2,11 @@ mod health_client;
mod router;

use std::net::SocketAddr;
use std::sync::Arc;

use clickhouse_client::ArrowClickHouseClient;
use tokio::net::TcpListener;
use tracing::info;

use crate::auth::JwtValidator;
use crate::cluster_health::ClusterHealthChecker;

pub use health_client::InfrastructureHealthClient;
pub use router::create_router;

@@ -21,11 +18,10 @@ pub struct Server {
impl Server {
    pub async fn bind(
        addr: SocketAddr,
        validator: JwtValidator,
        cluster_health: Arc<ClusterHealthChecker>,
        graph_client: ArrowClickHouseClient,
    ) -> std::io::Result<Self> {
        let listener = TcpListener::bind(addr).await?;
        let router = create_router(validator, cluster_health);
        let router = create_router(graph_client);
        Ok(Self { listener, router })
    }

+43 −77
Original line number Diff line number Diff line
use std::sync::Arc;
use std::time::Duration;

use axum::extract::State;
use axum::http::StatusCode;
use axum::response::IntoResponse;
use axum::{Json, Router, routing::get};
use clickhouse_client::ArrowClickHouseClient;
use labkit_rs::correlation::http::{CorrelationIdLayer, PropagateCorrelationIdLayer};
use labkit_rs::metrics::http::HttpMetricsLayer;
use serde::Serialize;
use tokio::time::timeout;
use tower_http::trace::TraceLayer;

use crate::cluster_health::ClusterHealthChecker;
use crate::proto::{ClusterStatus, ResponseFormat, get_cluster_health_response};
use crate::webserver::JwtValidator;
const HEALTH_CHECK_TIMEOUT: Duration = Duration::from_secs(5);

#[derive(Clone)]
pub struct AppState {
    pub cluster_health: Arc<ClusterHealthChecker>,
    pub graph_client: ArrowClickHouseClient,
}

/// JSON body returned by the probe handlers in this file.
#[derive(Serialize)]
struct HealthResponse {
    /// Short status label; the handlers here emit `"ok"` or `"unhealthy"`.
    status: &'static str,
    /// Version string reported alongside the status.
    version: &'static str,
    /// Names of the components that failed their check. The field is left out
    /// of the serialized output entirely when the list is empty.
    #[serde(skip_serializing_if = "Vec::is_empty")]
    unhealthy_components: Vec<&'static str>,
}

#[derive(Serialize)]
struct ClusterHealthResponse {
    status: String,
    timestamp: String,
    version: String,
    components: Vec<ComponentHealthResponse>,
fn version() -> &'static str {
    match option_env!("GKG_VERSION") {
        Some(v) => v,
        None => env!("CARGO_PKG_VERSION"),
    }

#[derive(Serialize)]
struct ComponentHealthResponse {
    name: String,
    status: String,
    replicas: Option<ReplicaStatusResponse>,
    metrics: std::collections::HashMap<String, String>,
}

#[derive(Serialize)]
struct ReplicaStatusResponse {
    ready: i32,
    desired: i32,
}

async fn health() -> Json<HealthResponse> {
async fn live() -> Json<HealthResponse> {
    Json(HealthResponse {
        status: "ok",
        version: option_env!("GKG_VERSION").unwrap_or(env!("CARGO_PKG_VERSION")),
        version: version(),
        unhealthy_components: Vec::new(),
    })
}

/// Translates a raw `ClusterStatus` integer into its lowercase label.
///
/// Values that do not convert to a `ClusterStatus`, and any variant other than
/// `Healthy`, `Degraded` or `Unhealthy`, map to `"unknown"`.
fn status_label(val: i32) -> &'static str {
    let status = ClusterStatus::try_from(val).ok();
    match status {
        Some(ClusterStatus::Healthy) => "healthy",
        Some(ClusterStatus::Degraded) => "degraded",
        Some(ClusterStatus::Unhealthy) => "unhealthy",
        _ => "unknown",
    }
}

async fn cluster_health_handler(State(state): State<AppState>) -> Json<ClusterHealthResponse> {
    let health = state
        .cluster_health
        .get_cluster_health(ResponseFormat::Raw as i32)
        .await;
async fn ready(State(state): State<AppState>) -> impl IntoResponse {
    let graph_healthy = timeout(HEALTH_CHECK_TIMEOUT, state.graph_client.execute("SELECT 1"))
        .await
        .is_ok_and(|r| r.is_ok());

    let structured = match health.content {
        Some(get_cluster_health_response::Content::Structured(s)) => s,
        _ => {
            return Json(ClusterHealthResponse {
                status: "unknown".to_string(),
                timestamp: String::new(),
                version: String::new(),
                components: vec![],
            });
    let mut unhealthy_components = Vec::new();
    if !graph_healthy {
        unhealthy_components.push("clickhouse_graph");
    }

    let healthy = unhealthy_components.is_empty();
    let status_code = if healthy {
        StatusCode::OK
    } else {
        StatusCode::SERVICE_UNAVAILABLE
    };
    let label = if healthy { "ok" } else { "unhealthy" };

    let components = structured
        .components
        .into_iter()
        .map(|c| ComponentHealthResponse {
            name: c.name,
            status: status_label(c.status).to_string(),
            replicas: c.replicas.map(|r| ReplicaStatusResponse {
                ready: r.ready,
                desired: r.desired,
    (
        status_code,
        Json(HealthResponse {
            status: label,
            version: version(),
            unhealthy_components,
        }),
            metrics: c.metrics,
        })
        .collect();

    Json(ClusterHealthResponse {
        status: status_label(structured.status).to_string(),
        timestamp: structured.timestamp,
        version: structured.version,
        components,
    })
    )
}

pub fn create_router(
    _validator: JwtValidator,
    cluster_health: Arc<ClusterHealthChecker>,
) -> Router {
    let state = AppState { cluster_health };
pub fn create_router(graph_client: ArrowClickHouseClient) -> Router {
    let state = AppState { graph_client };

    Router::new()
        .route("/health", get(health))
        .route("/api/v1/cluster_health", get(cluster_health_handler))
        .route("/live", get(live))
        .route("/ready", get(ready))
        .with_state(state)
        .layer(HttpMetricsLayer::new())
        .layer(CorrelationIdLayer::new())
+38 −260
Original line number Diff line number Diff line
use std::net::SocketAddr;
use std::sync::Arc;

use axum::body::Body;
use axum::http::{Request, StatusCode};
use axum::{Json, Router, routing::get};
use gkg_server::auth::JwtValidator;
use gkg_server::cluster_health::ClusterHealthChecker;
use gkg_server::proto::ResponseFormat;
use clickhouse_client::ArrowClickHouseClient;
use gkg_server::webserver::create_router;
use health_check::{ComponentHealth, HealthStatus, ServiceHealth, Status};
use tokio::net::TcpListener;
use integration_testkit::TestContext;
use tower::ServiceExt;

/// Builds the `JwtValidator` used by these tests from a fixed secret.
///
/// Panics if the validator rejects the hard-coded arguments.
fn test_validator() -> JwtValidator {
    let secret = "test-secret-that-is-at-least-32-bytes-long";
    JwtValidator::new(secret, 0).unwrap()
}

// ---------------------------------------------------------------------------
// Stubbed mode (no sidecar configured)
// ---------------------------------------------------------------------------
const GRAPH_SCHEMA_SQL: &str = include_str!("../../../../config/graph.sql");

#[tokio::test]
async fn liveness_probe_returns_ok() {
    let checker = ClusterHealthChecker::default().into_arc();
    let router = create_router(test_validator(), checker);

    let response = router
        .oneshot(Request::get("/health").body(Body::empty()).unwrap())
        .await
        .unwrap();

    assert_eq!(response.status(), StatusCode::OK);

    let body = axum::body::to_bytes(response.into_body(), 1024)
        .await
        .unwrap();
    let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
    assert_eq!(json["status"], "ok");
    assert!(json["version"].is_string());
/// Builds a body-less `GET /live` request for driving the router in tests.
fn live_request() -> Request<Body> {
    let empty = Body::empty();
    Request::get("/live").body(empty).unwrap()
}

#[tokio::test]
async fn stubbed_cluster_health_returns_all_components() {
    let checker = ClusterHealthChecker::default().into_arc();
    let router = create_router(test_validator(), checker);

    let response = router
        .oneshot(
            Request::get("/api/v1/cluster_health")
                .body(Body::empty())
                .unwrap(),
        )
        .await
        .unwrap();

    assert_eq!(response.status(), StatusCode::OK);
/// Builds a body-less `GET /ready` request for driving the router in tests.
fn ready_request() -> Request<Body> {
    let empty = Body::empty();
    Request::get("/ready").body(empty).unwrap()
}

async fn parse_response(response: axum::response::Response) -> (StatusCode, serde_json::Value) {
    let status = response.status();
    let body = axum::body::to_bytes(response.into_body(), 4096)
        .await
        .unwrap();
    let json: serde_json::Value = serde_json::from_slice(&body).unwrap();

    assert_eq!(json["status"], "healthy");

    let components = json["components"].as_array().unwrap();
    let names: Vec<&str> = components
        .iter()
        .map(|c| c["name"].as_str().unwrap())
        .collect();
    assert!(names.contains(&"webserver"));
    assert!(names.contains(&"clickhouse"));
    assert!(names.contains(&"indexer"));

    for component in components {
        assert_eq!(component["status"], "healthy");
        assert_eq!(component["metrics"]["mode"], "stubbed");
    }
}

#[tokio::test]
async fn shared_checker_serves_same_components_over_http_and_grpc() {
    let checker = ClusterHealthChecker::default().into_arc();
    let router = create_router(test_validator(), Arc::clone(&checker));

    let http_response = router
        .oneshot(
            Request::get("/api/v1/cluster_health")
                .body(Body::empty())
                .unwrap(),
        )
        .await
        .unwrap();

    let http_body = axum::body::to_bytes(http_response.into_body(), 4096)
        .await
        .unwrap();
    let http_json: serde_json::Value = serde_json::from_slice(&http_body).unwrap();

    let grpc_response = checker.get_cluster_health(ResponseFormat::Raw as i32).await;

    let grpc_structured = match grpc_response.content {
        Some(gkg_server::proto::get_cluster_health_response::Content::Structured(s)) => s,
        _ => panic!("expected structured response from gRPC"),
    };

    assert_eq!(
        http_json["components"].as_array().unwrap().len(),
        grpc_structured.components.len()
    );

    let http_names: Vec<&str> = http_json["components"]
        .as_array()
        .unwrap()
        .iter()
        .map(|c| c["name"].as_str().unwrap())
        .collect();
    let grpc_names: Vec<&str> = grpc_structured
        .components
        .iter()
        .map(|c| c.name.as_str())
        .collect();
    assert_eq!(http_names, grpc_names);
}

// ---------------------------------------------------------------------------
// Real mode (sidecar returns health data over HTTP)
// ---------------------------------------------------------------------------

async fn start_mock_sidecar(health: HealthStatus) -> SocketAddr {
    let app = Router::new().route(
        "/health",
        get(move || {
            let h = health.clone();
            async move { Json(h) }
        }),
    );

    let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
    let addr = listener.local_addr().unwrap();

    tokio::spawn(async move {
        axum::serve(listener, app).await.unwrap();
    });

    addr
}

fn healthy_sidecar_response() -> HealthStatus {
    HealthStatus {
        status: Status::Healthy,
        services: vec![
            ServiceHealth {
                name: "webserver".to_string(),
                status: Status::Healthy,
                ready_replicas: 2,
                desired_replicas: 2,
            },
            ServiceHealth {
                name: "indexer".to_string(),
                status: Status::Healthy,
                ready_replicas: 1,
                desired_replicas: 1,
            },
        ],
        clickhouse: ComponentHealth {
            status: Status::Healthy,
            error: None,
        },
    }
}

fn degraded_sidecar_response() -> HealthStatus {
    HealthStatus {
        status: Status::Unhealthy,
        services: vec![ServiceHealth {
            name: "indexer".to_string(),
            status: Status::Unhealthy,
            ready_replicas: 0,
            desired_replicas: 2,
        }],
        clickhouse: ComponentHealth {
            status: Status::Healthy,
            error: None,
        },
    }
    (status, json)
}

#[tokio::test]
async fn real_mode_healthy_sidecar() {
    let addr = start_mock_sidecar(healthy_sidecar_response()).await;
    let checker = ClusterHealthChecker::new(Some(format!("http://{addr}"))).into_arc();
    let router = create_router(test_validator(), Arc::clone(&checker));

    let response = router
        .oneshot(
            Request::get("/api/v1/cluster_health")
                .body(Body::empty())
                .unwrap(),
        )
        .await
        .unwrap();

    let body = axum::body::to_bytes(response.into_body(), 4096)
        .await
        .unwrap();
    let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
async fn live_returns_ok() {
    let client = ArrowClickHouseClient::new("http://127.0.0.1:1", "default", "x", None);
    let router = create_router(client);

    assert_eq!(json["status"], "healthy");
    let (status, json) = parse_response(router.oneshot(live_request()).await.unwrap()).await;

    let components = json["components"].as_array().unwrap();
    let names: Vec<&str> = components
        .iter()
        .map(|c| c["name"].as_str().unwrap())
        .collect();
    assert!(names.contains(&"webserver"));
    assert!(names.contains(&"indexer"));
    assert!(names.contains(&"clickhouse"));

    let webserver = components
        .iter()
        .find(|c| c["name"] == "webserver")
        .unwrap();
    assert_eq!(webserver["replicas"]["ready"], 2);
    assert_eq!(webserver["replicas"]["desired"], 2);
    assert_eq!(status, StatusCode::OK);
    assert_eq!(json["status"], "ok");
    assert!(json["version"].is_string());
}

#[tokio::test]
async fn real_mode_unhealthy_component_propagates() {
    let addr = start_mock_sidecar(degraded_sidecar_response()).await;
    let checker = ClusterHealthChecker::new(Some(format!("http://{addr}"))).into_arc();
    let router = create_router(test_validator(), checker);
async fn ready_returns_ok_when_clickhouse_healthy() {
    let ctx = TestContext::new(&[GRAPH_SCHEMA_SQL]).await;
    let router = create_router(ctx.create_client());

    let response = router
        .oneshot(
            Request::get("/api/v1/cluster_health")
                .body(Body::empty())
                .unwrap(),
        )
        .await
        .unwrap();
    let (status, json) = parse_response(router.oneshot(ready_request()).await.unwrap()).await;

    let body = axum::body::to_bytes(response.into_body(), 4096)
        .await
        .unwrap();
    let json: serde_json::Value = serde_json::from_slice(&body).unwrap();

    assert_eq!(json["status"], "unhealthy");

    let indexer = json["components"]
        .as_array()
        .unwrap()
        .iter()
        .find(|c| c["name"] == "indexer")
        .unwrap();
    assert_eq!(indexer["status"], "unhealthy");
    assert_eq!(indexer["replicas"]["ready"], 0);
    assert_eq!(indexer["replicas"]["desired"], 2);
    assert_eq!(status, StatusCode::OK);
    assert_eq!(json["status"], "ok");
    assert!(
        json.get("unhealthy_components")
            .and_then(|v| v.as_array())
            .is_none_or(|a| a.is_empty())
    );
}

#[tokio::test]
async fn real_mode_unreachable_sidecar_returns_unhealthy() {
    let checker = ClusterHealthChecker::new(Some("http://127.0.0.1:1".to_string())).into_arc();
    let router = create_router(test_validator(), checker);
async fn ready_returns_503_when_clickhouse_unreachable() {
    let client = ArrowClickHouseClient::new("http://127.0.0.1:1", "default", "x", None);
    let router = create_router(client);

    let response = router
        .oneshot(
            Request::get("/api/v1/cluster_health")
                .body(Body::empty())
                .unwrap(),
        )
        .await
        .unwrap();

    let body = axum::body::to_bytes(response.into_body(), 4096)
        .await
        .unwrap();
    let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
    let (status, json) = parse_response(router.oneshot(ready_request()).await.unwrap()).await;

    assert_eq!(status, StatusCode::SERVICE_UNAVAILABLE);
    assert_eq!(json["status"], "unhealthy");

    let clickhouse = json["components"]
    let components: Vec<String> = json["unhealthy_components"]
        .as_array()
        .unwrap()
        .iter()
        .find(|c| c["name"] == "clickhouse")
        .unwrap();
    assert!(
        clickhouse["metrics"]["error"]
            .as_str()
            .unwrap()
            .contains("unreachable")
    );
        .filter_map(|v| v.as_str().map(String::from))
        .collect();
    assert!(components.contains(&"clickhouse_graph".to_string()));
}