Verified Commit 4ce265fb authored by Bohdan Parkhomchuk's avatar Bohdan Parkhomchuk 💬 Committed by GitLab
Browse files

feat(indexer): loop-based dispatcher with cron scheduling

parent 2362a478
Loading
Loading
Loading
Loading
+84 −4
Original line number Diff line number Diff line
@@ -1471,6 +1471,17 @@ dependencies = [
 "itertools 0.13.0",
]

[[package]]
name = "croner"
version = "3.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4aa42bcd3d846ebf66e15bd528d1087f75d1c6c1c66ebff626178a106353c576"
dependencies = [
 "chrono",
 "derive_builder",
 "strum 0.27.2",
]

[[package]]
name = "crossbeam-channel"
version = "0.5.15"
@@ -1581,14 +1592,38 @@ dependencies = [
 "syn 2.0.117",
]

[[package]]
name = "darling"
version = "0.20.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fc7f46116c46ff9ab3eb1597a45688b6715c6e628b5c133e288e709a29bcb4ee"
dependencies = [
 "darling_core 0.20.11",
 "darling_macro 0.20.11",
]

[[package]]
name = "darling"
version = "0.23.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "25ae13da2f202d56bd7f91c25fba009e7717a1e4a1cc98a76d844b65ae912e9d"
dependencies = [
 "darling_core",
 "darling_macro",
 "darling_core 0.23.0",
 "darling_macro 0.23.0",
]

[[package]]
name = "darling_core"
version = "0.20.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0d00b9596d185e565c2207a0b01f8bd1a135483d02d9b7b0a54b11da8d53412e"
dependencies = [
 "fnv",
 "ident_case",
 "proc-macro2",
 "quote",
 "strsim",
 "syn 2.0.117",
]

[[package]]
@@ -1604,13 +1639,24 @@ dependencies = [
 "syn 2.0.117",
]

[[package]]
name = "darling_macro"
version = "0.20.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fc34b93ccb385b40dc71c6fceac4b2ad23662c7eeb248cf10d529b7e055b6ead"
dependencies = [
 "darling_core 0.20.11",
 "quote",
 "syn 2.0.117",
]

[[package]]
name = "darling_macro"
version = "0.23.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac3984ec7bd6cfa798e62b4a642426a5be0e68f9401cfc2a01e3fa9ea2fcdb8d"
dependencies = [
 "darling_core",
 "darling_core 0.23.0",
 "quote",
 "syn 2.0.117",
]
@@ -2305,6 +2351,37 @@ dependencies = [
 "syn 2.0.117",
]

[[package]]
name = "derive_builder"
version = "0.20.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "507dfb09ea8b7fa618fcf76e953f4f5e192547945816d5358edffe39f6f94947"
dependencies = [
 "derive_builder_macro",
]

[[package]]
name = "derive_builder_core"
version = "0.20.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2d5bcf7b024d6835cfb3d473887cd966994907effbe9227e8c8219824d06c4e8"
dependencies = [
 "darling 0.20.11",
 "proc-macro2",
 "quote",
 "syn 2.0.117",
]

[[package]]
name = "derive_builder_macro"
version = "0.20.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ab63b0e2bf4d5928aff72e83a7dace85d7bba5fe12dcc3c5a572d78caffd3f3c"
dependencies = [
 "derive_builder_core",
 "syn 2.0.117",
]

[[package]]
name = "derive_more"
version = "2.1.1"
@@ -3038,7 +3115,9 @@ dependencies = [
name = "gkg-server-config"
version = "0.1.0"
dependencies = [
 "chrono",
 "config",
 "croner",
 "schemars 1.2.1",
 "serde",
 "serde_json",
@@ -3605,6 +3684,7 @@ dependencies = [
 "clickhouse",
 "clickhouse-client",
 "code-graph",
 "croner",
 "datafusion",
 "flate2",
 "futures",
@@ -6629,7 +6709,7 @@ version = "3.18.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d3db8978e608f1fe7357e211969fd9abdcae80bac1ba7a3369bb7eb6b404eb65"
dependencies = [
 "darling",
 "darling 0.23.0",
 "proc-macro2",
 "quote",
 "syn 2.0.117",
+1 −0
Original line number Diff line number Diff line
@@ -48,6 +48,7 @@ async-trait = "0.1.89"
bytes = "1.11.0"
flate2 = "1.1.9"
chrono = { version = "0.4.43", features = ["serde"] }
croner = "3.0.1"
futures = "0.3.31"
internment = "0.8.6"
mimalloc = "0.1.48"
+1 −0
Original line number Diff line number Diff line
@@ -112,6 +112,7 @@ k8s_yaml(helm(
))

# Skip readiness checks for components that may take time to connect
k8s_resource('gkg-dispatcher', pod_readiness='ignore')
k8s_resource('gkg-indexer', pod_readiness='ignore')
k8s_resource('gkg-webserver', pod_readiness='ignore', port_forwards=['8080:8080'])
k8s_resource('gkg-health-check', pod_readiness='ignore', port_forwards=['4201:4201'])
+13 −1
Original line number Diff line number Diff line
@@ -67,7 +67,19 @@ engine:
      max_attempts: 1  # re-dispatched daily, no need to retry

schedule:
  tasks: {}
  tasks:
    global:
      cron: "0 */1 * * * *"           # every minute
    namespace:
      cron: "0 */1 * * * *"           # every minute
    code-indexing-task:
      cron: "0 */1 * * * *"           # every minute
    namespace-code-backfill:
      cron: "0 */1 * * * *"           # every minute
    table-cleanup:
      cron: "0 0 3 * * *"             # daily at 03:00 UTC
    namespace-deletion:
      cron: "0 0 3 * * *"             # daily at 03:00 UTC

health_check:
  bind_address: "0.0.0.0:4201"
+62 −74
Original line number Diff line number Diff line
@@ -180,25 +180,25 @@
        "tasks": {
          "code-indexing-task": {
            "batch_size": 100,
            "events_stream_name": "siphon_stream_main_db",
            "interval_secs": null
            "cron": null,
            "events_stream_name": "siphon_stream_main_db"
          },
          "global": {
            "interval_secs": null
            "cron": null
          },
          "namespace": {
            "interval_secs": null
            "cron": null
          },
          "namespace-code-backfill": {
            "batch_size": 100,
            "events_stream_name": "siphon_stream_main_db",
            "interval_secs": null
            "cron": null,
            "events_stream_name": "siphon_stream_main_db"
          },
          "namespace-deletion": {
            "interval_secs": 86400
            "cron": "0 0 3 * * *"
          },
          "table-cleanup": {
            "interval_secs": 86400
            "cron": "0 0 3 * * *"
          }
        }
      }
@@ -371,18 +371,16 @@
      "additionalProperties": false
    },
    "GlobalDispatcherConfig": {
      "description": "Per-task schedule configuration (cadence interval).\n\nEach scheduled task embeds this via `#[serde(flatten)]` in its own typed config struct.\nThe scheduler loop reads it via `task.schedule()`.",
      "description": "Per-task schedule configuration.\n\nEach scheduled task embeds this via `#[serde(flatten)]` in its own typed config struct.\nThe scheduler reads it via `task.schedule()`.",
      "type": "object",
      "properties": {
        "interval_secs": {
          "description": "Interval in seconds between task runs.\nWhen absent, the task runs every cycle.",
        "cron": {
          "description": "Cron expression with seconds field (6-field: `sec min hour dom mon dow`).\nWhen absent, the task runs on a default 60-second interval.",
          "type": [
            "integer",
            "string",
            "null"
          ],
          "format": "uint64",
          "default": null,
          "minimum": 0
          "default": null
        }
      }
    },
@@ -595,7 +593,7 @@
      "additionalProperties": false
    },
    "NamespaceCodeBackfillDispatcherConfig": {
      "description": "Per-task schedule configuration (cadence interval).\n\nEach scheduled task embeds this via `#[serde(flatten)]` in its own typed config struct.\nThe scheduler loop reads it via `task.schedule()`.",
      "description": "Per-task schedule configuration.\n\nEach scheduled task embeds this via `#[serde(flatten)]` in its own typed config struct.\nThe scheduler reads it via `task.schedule()`.",
      "type": "object",
      "properties": {
        "batch_size": {
@@ -604,19 +602,17 @@
          "default": 100,
          "minimum": 0
        },
        "events_stream_name": {
          "type": "string",
          "default": "siphon_stream_main_db"
        },
        "interval_secs": {
          "description": "Interval in seconds between task runs.\nWhen absent, the task runs every cycle.",
        "cron": {
          "description": "Cron expression with seconds field (6-field: `sec min hour dom mon dow`).\nWhen absent, the task runs on a default 60-second interval.",
          "type": [
            "integer",
            "string",
            "null"
          ],
          "format": "uint64",
          "default": null,
          "minimum": 0
          "default": null
        },
        "events_stream_name": {
          "type": "string",
          "default": "siphon_stream_main_db"
        }
      }
    },
@@ -655,34 +651,30 @@
      }
    },
    "NamespaceDeletionSchedulerConfig": {
      "description": "Per-task schedule configuration (cadence interval).\n\nEach scheduled task embeds this via `#[serde(flatten)]` in its own typed config struct.\nThe scheduler loop reads it via `task.schedule()`.",
      "description": "Per-task schedule configuration.\n\nEach scheduled task embeds this via `#[serde(flatten)]` in its own typed config struct.\nThe scheduler reads it via `task.schedule()`.",
      "type": "object",
      "properties": {
        "interval_secs": {
          "description": "Interval in seconds between task runs.\nWhen absent, the task runs every cycle.",
        "cron": {
          "description": "Cron expression with seconds field (6-field: `sec min hour dom mon dow`).\nWhen absent, the task runs on a default 60-second interval.",
          "type": [
            "integer",
            "string",
            "null"
          ],
          "format": "uint64",
          "default": null,
          "minimum": 0
          "default": null
        }
      }
    },
    "NamespaceDispatcherConfig": {
      "description": "Per-task schedule configuration (cadence interval).\n\nEach scheduled task embeds this via `#[serde(flatten)]` in its own typed config struct.\nThe scheduler loop reads it via `task.schedule()`.",
      "description": "Per-task schedule configuration.\n\nEach scheduled task embeds this via `#[serde(flatten)]` in its own typed config struct.\nThe scheduler reads it via `task.schedule()`.",
      "type": "object",
      "properties": {
        "interval_secs": {
          "description": "Interval in seconds between task runs.\nWhen absent, the task runs every cycle.",
        "cron": {
          "description": "Cron expression with seconds field (6-field: `sec min hour dom mon dow`).\nWhen absent, the task runs on a default 60-second interval.",
          "type": [
            "integer",
            "string",
            "null"
          ],
          "format": "uint64",
          "default": null,
          "minimum": 0
          "default": null
        }
      }
    },
@@ -991,25 +983,25 @@
          "default": {
            "code-indexing-task": {
              "batch_size": 100,
              "events_stream_name": "siphon_stream_main_db",
              "interval_secs": null
              "cron": null,
              "events_stream_name": "siphon_stream_main_db"
            },
            "global": {
              "interval_secs": null
              "cron": null
            },
            "namespace": {
              "interval_secs": null
              "cron": null
            },
            "namespace-code-backfill": {
              "batch_size": 100,
              "events_stream_name": "siphon_stream_main_db",
              "interval_secs": null
              "cron": null,
              "events_stream_name": "siphon_stream_main_db"
            },
            "namespace-deletion": {
              "interval_secs": 86400
              "cron": "0 0 3 * * *"
            },
            "table-cleanup": {
              "interval_secs": 86400
              "cron": "0 0 3 * * *"
            }
          }
        }
@@ -1024,47 +1016,47 @@
          "$ref": "#/$defs/SiphonCodeIndexingTaskDispatcherConfig",
          "default": {
            "batch_size": 100,
            "events_stream_name": "siphon_stream_main_db",
            "interval_secs": null
            "cron": null,
            "events_stream_name": "siphon_stream_main_db"
          }
        },
        "global": {
          "$ref": "#/$defs/GlobalDispatcherConfig",
          "default": {
            "interval_secs": null
            "cron": null
          }
        },
        "namespace": {
          "$ref": "#/$defs/NamespaceDispatcherConfig",
          "default": {
            "interval_secs": null
            "cron": null
          }
        },
        "namespace-code-backfill": {
          "$ref": "#/$defs/NamespaceCodeBackfillDispatcherConfig",
          "default": {
            "batch_size": 100,
            "events_stream_name": "siphon_stream_main_db",
            "interval_secs": null
            "cron": null,
            "events_stream_name": "siphon_stream_main_db"
          }
        },
        "namespace-deletion": {
          "$ref": "#/$defs/NamespaceDeletionSchedulerConfig",
          "default": {
            "interval_secs": 86400
            "cron": "0 0 3 * * *"
          }
        },
        "table-cleanup": {
          "$ref": "#/$defs/TableCleanupConfig",
          "default": {
            "interval_secs": 86400
            "cron": "0 0 3 * * *"
          }
        }
      },
      "additionalProperties": false
    },
    "SiphonCodeIndexingTaskDispatcherConfig": {
      "description": "Per-task schedule configuration (cadence interval).\n\nEach scheduled task embeds this via `#[serde(flatten)]` in its own typed config struct.\nThe scheduler loop reads it via `task.schedule()`.",
      "description": "Per-task schedule configuration.\n\nEach scheduled task embeds this via `#[serde(flatten)]` in its own typed config struct.\nThe scheduler reads it via `task.schedule()`.",
      "type": "object",
      "properties": {
        "batch_size": {
@@ -1073,35 +1065,31 @@
          "default": 100,
          "minimum": 0
        },
        "events_stream_name": {
          "type": "string",
          "default": "siphon_stream_main_db"
        },
        "interval_secs": {
          "description": "Interval in seconds between task runs.\nWhen absent, the task runs every cycle.",
        "cron": {
          "description": "Cron expression with seconds field (6-field: `sec min hour dom mon dow`).\nWhen absent, the task runs on a default 60-second interval.",
          "type": [
            "integer",
            "string",
            "null"
          ],
          "format": "uint64",
          "default": null,
          "minimum": 0
          "default": null
        },
        "events_stream_name": {
          "type": "string",
          "default": "siphon_stream_main_db"
        }
      }
    },
    "TableCleanupConfig": {
      "description": "Per-task schedule configuration (cadence interval).\n\nEach scheduled task embeds this via `#[serde(flatten)]` in its own typed config struct.\nThe scheduler loop reads it via `task.schedule()`.",
      "description": "Per-task schedule configuration.\n\nEach scheduled task embeds this via `#[serde(flatten)]` in its own typed config struct.\nThe scheduler reads it via `task.schedule()`.",
      "type": "object",
      "properties": {
        "interval_secs": {
          "description": "Interval in seconds between task runs.\nWhen absent, the task runs every cycle.",
        "cron": {
          "description": "Cron expression with seconds field (6-field: `sec min hour dom mon dow`).\nWhen absent, the task runs on a default 60-second interval.",
          "type": [
            "integer",
            "string",
            "null"
          ],
          "format": "uint64",
          "default": null,
          "minimum": 0
          "default": null
        }
      }
    },
Loading