Verified Commit 09d33651 authored by Jean-Gabriel Doyon PTO until 2024-04-17's avatar Jean-Gabriel Doyon PTO until 2024-04-17 Committed by GitLab
Browse files

feat(indexer): clean-up node tables with deleted entries

parent e46f5a94
Loading
Loading
Loading
Loading
+4 −4
Original line number Diff line number Diff line
use anyhow::{Context, Result};
use indexer::dispatcher::ScheduledTask;
use indexer::dispatcher::ScheduledTaskMetrics;
use indexer::modules::sdlc::dispatch::{
    GlobalDispatcher, GlobalDispatcherConfig, NamespaceDispatcher, NamespaceDispatcherConfig,
};
use indexer::scheduler::ScheduledTask;
use indexer::scheduler::ScheduledTaskMetrics;
use tracing::info;

use crate::config::SimulatorConfig;
@@ -23,7 +23,7 @@ pub async fn run_dispatch_indexing(config: &SimulatorConfig) -> Result<()> {
        password: config.datalake.password.clone(),
    };

    let services = indexer::dispatcher::connect(&nats_config)
    let services = indexer::scheduler::connect(&nats_config)
        .await
        .context("dispatcher connect failed")?;

@@ -44,7 +44,7 @@ pub async fn run_dispatch_indexing(config: &SimulatorConfig) -> Result<()> {
        )),
    ];

    indexer::dispatcher::run(&tasks, &*lock_service)
    indexer::scheduler::run(&tasks, &*lock_service)
        .await
        .context("dispatch indexing failed")?;

+1 −1
Original line number Diff line number Diff line
@@ -5,8 +5,8 @@ use gitlab_client::GitlabClientConfiguration;
use health_check::HealthCheckConfig;
use indexer::clickhouse::ClickHouseConfiguration;
use indexer::configuration::EngineConfiguration;
use indexer::dispatcher::ScheduleConfig;
use indexer::nats::NatsConfiguration;
use indexer::scheduler::ScheduleConfig;
use labkit_rs::metrics::MetricsConfig;
use serde::{Deserialize, Serialize};

+11 −5
Original line number Diff line number Diff line
@@ -10,10 +10,11 @@ use gkg_server::health_check as health_check_mode;
use gkg_server::shutdown;
use gkg_server::webserver::Server as HttpServer;
use indexer::IndexerConfig;
use indexer::dispatcher::ScheduledTask;
use indexer::dispatcher::ScheduledTaskMetrics;
use indexer::modules::code::dispatch::ProjectCodeDispatcher;
use indexer::modules::sdlc::dispatch::{GlobalDispatcher, NamespaceDispatcher};
use indexer::scheduler::ScheduledTask;
use indexer::scheduler::ScheduledTaskMetrics;
use indexer::scheduler::TableCleanup;
use tokio_util::sync::CancellationToken;
use tracing::info;

@@ -38,7 +39,7 @@ async fn main() -> anyhow::Result<()> {

    let result = match args.mode {
        Mode::DispatchIndexing => {
            let services = indexer::dispatcher::connect(&config.nats).await?;
            let services = indexer::scheduler::connect(&config.nats).await?;
            let graph = config.graph.build_client();
            let datalake = config.datalake.build_client();
            let metrics = ScheduledTaskMetrics::new();
@@ -57,12 +58,17 @@ async fn main() -> anyhow::Result<()> {
                )),
                Box::new(ProjectCodeDispatcher::new(
                    services.nats,
                    graph.clone(),
                    metrics.clone(),
                    config.schedule.tasks.project_code.clone(),
                )),
                Box::new(TableCleanup::new(
                    graph,
                    metrics,
                    config.schedule.tasks.project_code.clone(),
                    config.schedule.tasks.table_cleanup.clone(),
                )),
            ];
            indexer::dispatcher::run(&tasks, &*lock_service)
            indexer::scheduler::run(&tasks, &*lock_service)
                .await
                .map_err(Into::into)
        }
+3 −0
Original line number Diff line number Diff line
@@ -8,6 +8,7 @@ use crate::modules::code::dispatch::ProjectCodeDispatcherConfig;
use crate::modules::code::{ProjectCodeIndexingHandlerConfig, PushEventHandlerConfig};
use crate::modules::sdlc::dispatch::{GlobalDispatcherConfig, NamespaceDispatcherConfig};
use crate::modules::sdlc::{GlobalHandlerConfig, NamespaceHandlerConfig};
use crate::scheduler::TableCleanupConfig;

/// Per-handler engine configuration (retry policy, concurrency group).
///
@@ -86,6 +87,8 @@ pub struct ScheduledTasksConfiguration {
    pub namespace: NamespaceDispatcherConfig,
    #[serde(default)]
    pub project_code: ProjectCodeDispatcherConfig,
    #[serde(default)]
    pub table_cleanup: TableCleanupConfig,
}

/// ETL engine configuration.
+2 −2
Original line number Diff line number Diff line
@@ -29,7 +29,6 @@
pub mod clickhouse;
pub mod configuration;
pub mod destination;
pub mod dispatcher;
pub mod engine;
pub(crate) mod env;
pub mod handler;
@@ -38,6 +37,7 @@ pub mod locking;
pub mod metrics;
pub mod modules;
pub mod nats;
pub mod scheduler;
pub mod topic;
pub mod types;
pub mod worker_pool;
@@ -51,13 +51,13 @@ use std::sync::Arc;
use clickhouse::ClickHouseConfiguration;
use clickhouse::ClickHouseDestination;
use configuration::EngineConfiguration;
use dispatcher::ScheduleConfig;
use engine::EngineBuilder;
use gitlab_client::GitlabClientConfiguration;
use handler::{HandlerInitError, HandlerRegistry};
use health::{HealthState, run_health_server};
use locking::INDEXING_LOCKS_BUCKET;
use nats::{KvBucketConfig, NatsBroker, NatsConfiguration};
use scheduler::ScheduleConfig;
use thiserror::Error;
use tokio_util::sync::CancellationToken;
use tracing::info;
Loading