Verified Commit 04236f13 authored by Jean-Gabriel Doyon PTO until 2024-04-17's avatar Jean-Gabriel Doyon PTO until 2024-04-17 Committed by GitLab
Browse files

refactor(indexer): move dispatcher into indexer crate and hook datalake-generator

parent 304534e3
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -2070,6 +2070,7 @@ dependencies = [
 "clap",
 "clickhouse-client",
 "flate2",
 "indexer",
 "ontology",
 "opentelemetry",
 "rand 0.8.5",
+1 −0
Original line number Diff line number Diff line
@@ -26,4 +26,5 @@ tracing.workspace = true
tracing-subscriber.workspace = true

clickhouse-client = { path = "../clickhouse-client" }
indexer = { path = "../indexer" }
ontology = { path = "../ontology" }
+4 −0
Original line number Diff line number Diff line
@@ -8,6 +8,9 @@ datalake:
  database: gitlab_clickhouse_development
  username: default

nats:
  url: localhost:4222

generation:
  seed: 42
  batch_size: 100000
@@ -69,6 +72,7 @@ continuous:
  enabled: false
  cycles: 10
  cycle_interval_secs: 5
  dispatch_indexing: true
  inserts_per_cycle:
    MergeRequest: 20
    WorkItem: 10
+18 −0
Original line number Diff line number Diff line
@@ -8,6 +8,8 @@ use serde::{Deserialize, Serialize};
#[serde(deny_unknown_fields)]
pub struct SimulatorConfig {
    pub datalake: ClickHouseConfig,
    #[serde(default)]
    pub nats: NatsConfig,
    pub generation: GenerationConfig,
    #[serde(default)]
    pub continuous: ContinuousConfig,
@@ -17,6 +19,20 @@ pub struct SimulatorConfig {
    pub state: StateConfig,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default, deny_unknown_fields)]
pub struct NatsConfig {
    pub url: String,
}

impl Default for NatsConfig {
    fn default() -> Self {
        Self {
            url: "localhost:4222".to_string(),
        }
    }
}

impl SimulatorConfig {
    pub fn load(path: impl AsRef<Path>) -> Result<Self> {
        let path = path.as_ref();
@@ -170,6 +186,7 @@ pub struct ContinuousConfig {
    pub inserts_per_cycle: HashMap<String, usize>,
    pub updates_per_cycle: HashMap<String, usize>,
    pub deletes_per_cycle: HashMap<String, usize>,
    pub dispatch_indexing: bool,
}

impl Default for ContinuousConfig {
@@ -181,6 +198,7 @@ impl Default for ContinuousConfig {
            inserts_per_cycle: HashMap::new(),
            updates_per_cycle: HashMap::new(),
            deletes_per_cycle: HashMap::new(),
            dispatch_indexing: false,
        }
    }
}
+8 −1
Original line number Diff line number Diff line
@@ -6,13 +6,14 @@ use chrono::{DateTime, Utc};
use rand::rngs::SmallRng;
use rand::{Rng, SeedableRng};
use tokio::sync::RwLock;
use tracing::info;
use tracing::{info, warn};

use crate::clickhouse::ClickHouseWriter;
use crate::config::SimulatorConfig;
use crate::data_generation::SchemaRegistry;
use crate::data_generation::fake_values::SiphonFakeValueGenerator;
use crate::data_generation::row_builder::DirectBatchBuilder;
use crate::dispatch::run_dispatch_indexing;
use crate::seeding::catalog;
use crate::state::HierarchyState;

@@ -113,6 +114,12 @@ impl ContinuousGenerator {

            info!(cycle, inserts, updates, deletes, "cycle complete");

            if self.config.continuous.dispatch_indexing
                && let Err(error) = run_dispatch_indexing(&self.config).await
            {
                warn!(cycle, %error, "dispatch indexing failed, continuing");
            }

            tokio::time::sleep(std::time::Duration::from_secs(
                self.config.continuous.cycle_interval_secs,
            ))
Loading