Verified Commit f734de92 authored by Jean-Gabriel Doyon PTO until 2024-04-17's avatar Jean-Gabriel Doyon PTO until 2024-04-17 Committed by GitLab
Browse files

refactor(indexer): extract ScheduledTask abstraction from Dispatcher

parent b74b37ae
Loading
Loading
Loading
Loading
+2 −2
Original line number Diff line number Diff line
@@ -35,7 +35,7 @@ flowchart LR
- **GitLab Core** -- PostgreSQL (OLTP), Gitaly (Git storage), and Rails (application server). The source of all SDLC and code data. Handles authentication and authorization for graph queries.
- **Data Insights Platform** -- Siphon (CDC) streams PostgreSQL logical replication events through NATS JetStream into ClickHouse.
- **ClickHouse** -- Columnar database serving two logical databases on one instance: the datalake (raw CDC rows from Siphon) and the graph database (indexed property graph tables).
- **Knowledge Graph (Orbit)** -- Rust service that transforms datalake rows into a property graph, parses code via Gitaly, and serves graph queries over gRPC. Single binary running as indexer, webserver, dispatcher, and health-check.
- **Knowledge Graph (Orbit)** -- Rust service that transforms datalake rows into a property graph, parses code via Gitaly, and serves graph queries over gRPC. Single binary running as indexer, webserver, scheduler, and health-check.

| Resource | Location |
|---|---|
@@ -91,7 +91,7 @@ Filtered by `knowledge graph` label:

| Repository | Purpose |
|---|---|
| [gitlab-org/orbit/knowledge-graph](https://gitlab.com/gitlab-org/orbit/knowledge-graph) | Main GKG service -- 14 Rust crates covering parsing, indexing, query compilation, serving, and infrastructure. Single `gkg-server` binary runs in 4 modes (webserver, indexer, dispatcher, health-check). |
| [gitlab-org/orbit/knowledge-graph](https://gitlab.com/gitlab-org/orbit/knowledge-graph) | Main GKG service -- 14 Rust crates covering parsing, indexing, query compilation, serving, and infrastructure. Single `gkg-server` binary runs in 4 modes (webserver, indexer, scheduler, health-check). |
| [gitlab-org/orbit/build-images](https://gitlab.com/gitlab-org/orbit/build-images) | CI builder images (Rust toolchain, pre-compiled tools, sccache) used by the knowledge-graph pipeline |
| [gitlab-org/orbit/gkg-helm-charts](https://gitlab.com/gitlab-org/orbit/gkg-helm-charts) | Official production Helm chart for GKG (v1.0.0, application chart, uses [common-ci-tasks](https://gitlab.com/gitlab-com/gl-infra/common-ci-tasks) patterns) |
| [gitlab-org/orbit/documentation/orbit-artifacts](https://gitlab.com/gitlab-org/orbit/documentation/orbit-artifacts) | Offsite transcripts and session notes (Feb 3-5, 2026) |
+2 −3
Original line number Diff line number Diff line
@@ -48,9 +48,8 @@ engine:
      concurrency_group: code
      max_attempts: 1  # re-dispatched every 30 minutes, no need to retry

dispatch:
  intervals: {}
  batch_size: 1000
schedule:
  tasks: {}

health_check:
  bind_address: "0.0.0.0:4201"
+6 −6
Original line number Diff line number Diff line
use anyhow::{Context, Result};
use indexer::dispatcher::Dispatcher;
use indexer::dispatcher::ScheduledTask;
use indexer::dispatcher::ScheduledTaskMetrics;
use indexer::modules::sdlc::dispatch::{
    DispatchMetrics, GlobalDispatcher, GlobalDispatcherConfig, NamespaceDispatcher,
    NamespaceDispatcherConfig,
    GlobalDispatcher, GlobalDispatcherConfig, NamespaceDispatcher, NamespaceDispatcherConfig,
};
use tracing::info;

@@ -28,9 +28,9 @@ pub async fn run_dispatch_indexing(config: &SimulatorConfig) -> Result<()> {
        .context("dispatcher connect failed")?;

    let datalake = datalake_config.build_client();
    let metrics = DispatchMetrics::new();
    let metrics = ScheduledTaskMetrics::new();
    let lock_service = services.lock_service.clone();
    let dispatchers: Vec<Box<dyn Dispatcher>> = vec![
    let tasks: Vec<Box<dyn ScheduledTask>> = vec![
        Box::new(GlobalDispatcher::new(
            services.nats.clone(),
            metrics.clone(),
@@ -44,7 +44,7 @@ pub async fn run_dispatch_indexing(config: &SimulatorConfig) -> Result<()> {
        )),
    ];

    indexer::dispatcher::run(&dispatchers, &*lock_service)
    indexer::dispatcher::run(&tasks, &*lock_service)
        .await
        .context("dispatch indexing failed")?;

+2 −2
Original line number Diff line number Diff line
@@ -5,7 +5,7 @@ use gitlab_client::GitlabClientConfiguration;
use health_check::HealthCheckConfig;
use indexer::clickhouse::ClickHouseConfiguration;
use indexer::configuration::EngineConfiguration;
use indexer::dispatcher::DispatchConfig;
use indexer::dispatcher::ScheduleConfig;
use indexer::nats::NatsConfiguration;
use labkit_rs::metrics::MetricsConfig;
use serde::{Deserialize, Serialize};
@@ -77,7 +77,7 @@ pub struct AppConfig {
    #[serde(default)]
    pub gitlab: GitlabConfig,
    #[serde(default)]
    pub dispatch: DispatchConfig,
    pub schedule: ScheduleConfig,
    #[serde(default)]
    pub health_check: HealthCheckConfig,
    #[serde(default = "default_indexer_health_bind_address")]
+10 −9
Original line number Diff line number Diff line
@@ -10,9 +10,10 @@ use gkg_server::health_check as health_check_mode;
use gkg_server::shutdown;
use gkg_server::webserver::Server as HttpServer;
use indexer::IndexerConfig;
use indexer::dispatcher::Dispatcher;
use indexer::dispatcher::ScheduledTask;
use indexer::dispatcher::ScheduledTaskMetrics;
use indexer::modules::code::dispatch::ProjectCodeDispatcher;
use indexer::modules::sdlc::dispatch::{DispatchMetrics, GlobalDispatcher, NamespaceDispatcher};
use indexer::modules::sdlc::dispatch::{GlobalDispatcher, NamespaceDispatcher};
use tokio_util::sync::CancellationToken;
use tracing::info;

@@ -40,28 +41,28 @@ async fn main() -> anyhow::Result<()> {
            let services = indexer::dispatcher::connect(&config.nats).await?;
            let graph = config.graph.build_client();
            let datalake = config.datalake.build_client();
            let metrics = DispatchMetrics::new();
            let metrics = ScheduledTaskMetrics::new();
            let lock_service = services.lock_service.clone();
            let dispatchers: Vec<Box<dyn Dispatcher>> = vec![
            let tasks: Vec<Box<dyn ScheduledTask>> = vec![
                Box::new(GlobalDispatcher::new(
                    services.nats.clone(),
                    metrics.clone(),
                    config.dispatch.dispatchers.global.clone(),
                    config.schedule.tasks.global.clone(),
                )),
                Box::new(NamespaceDispatcher::new(
                    services.nats.clone(),
                    datalake.clone(),
                    metrics.clone(),
                    config.dispatch.dispatchers.namespace.clone(),
                    config.schedule.tasks.namespace.clone(),
                )),
                Box::new(ProjectCodeDispatcher::new(
                    services.nats,
                    graph,
                    metrics,
                    config.dispatch.dispatchers.project_code.clone(),
                    config.schedule.tasks.project_code.clone(),
                )),
            ];
            indexer::dispatcher::run(&dispatchers, &*lock_service)
            indexer::dispatcher::run(&tasks, &*lock_service)
                .await
                .map_err(Into::into)
        }
@@ -73,7 +74,7 @@ async fn main() -> anyhow::Result<()> {
                datalake: config.datalake.clone(),
                engine: config.engine.clone(),
                gitlab: config.gitlab_client_config(),
                dispatch: config.dispatch.clone(),
                schedule: config.schedule.clone(),
                health_bind_address: config.indexer_health_bind_address,
            };
            indexer::run(&indexer_config, shutdown)
Loading