Close CI graph gaps: parent/child pipelines, runner attribution, bridge jobs
## The story
Today you can ask the orbit graph "what are gitlab-org/gitlab's longest pipelines," and you get an answer. You cannot ask "and which jobs made them slow," "which runner ran them," "what triggered the parent pipeline," or "did this pipeline get auto-canceled by a newer one." The data is mostly there in the Siphon mirrors. The ontology just stops short.
This issue closes that gap. We extend the CI ontology so the graph models pipelines, stages, jobs, and runners as a connected network, with parent/child links between pipelines, runner attribution on every job, and bridges flagged as bridges. The reader should finish this issue knowing exactly what the end-state looks like, and an implementing agent should be able to work top-down through the workstreams without re-exploring.
## Target ontology
### Nodes after this issue lands
| Node | New properties | New edges (direction) |
|---|---|---|
| Pipeline | `auto_canceled_by_id`, `committed_at`, `before_sha`, `yaml_errors`, `protected`, `updated_at` | `HAS_JOB` (out, direct; workstream 2 Option B), `AUTO_CANCELED_BY` (out, self), `CHILD_OF` (out, self) |
| Stage | unchanged | unchanged |
| Job | `runner_id`, `type`, `timeout`, `timeout_source`, `exit_code`, `scheduling_type`, `interruptible`, `expanded_environment_name` | `RUNS_ON` (out → Runner), `TRIGGERS_PIPELINE` (out → Pipeline, bridges only), `IN_PIPELINE` (out → Pipeline, redundant with two-hop but cheap), `AUTO_CANCELED_BY` (out, self) |
| **Runner (new)** | `id`, `runner_type`, `description`, `name`, `active`, `locked`, `run_untagged`, `access_level`, `maximum_timeout`, `created_at`, `contacted_at`, `organization_id` | `RUNS_ON` (in from Job), `RUNS_FOR` (out → Group via `siphon_ci_runner_namespaces`), `RUNS_FOR` (out → Project via `siphon_ci_runner_projects`) |
### Edges after this issue lands (CI domain only)
| Edge | From → To | Source |
|---|---|---|
| `IN_PROJECT` | Pipeline / Stage / Job / Runner → Project | FK on each node |
| `HAS_STAGE` | Pipeline → Stage | `stages.pipeline_id` |
| `HAS_JOB` | Stage → Job, **Pipeline → Job** (workstream 2 Option B) | `builds.stage_id`, `builds.commit_id` |
| `TRIGGERED` | User / MergeRequest → Pipeline / Job | `pipelines.user_id`, `builds.user_id`, `pipelines.merge_request_id` |
| `RUNS_ON` (new) | Job → Runner | `builds.runner_id` |
| `RUNS_FOR` (new) | Runner → Group / Project | `ci_runner_namespaces`, `ci_runner_projects` |
| `AUTO_CANCELED_BY` (new) | Pipeline → Pipeline, Job → Job | `auto_canceled_by_id` |
| `CHILD_OF` (new) | Pipeline → Pipeline | `ci_sources_pipelines.source_pipeline_id` |
| `TRIGGERS_PIPELINE` (new) | Job → Pipeline | `ci_sources_pipelines.source_job_id` (bridge job) |
| `IN_PIPELINE` (new for Job) | Job → Pipeline | `builds.commit_id` |
### What you can ask after this lands
```text
Longest pipelines for gitlab-org/gitlab and the slowest job in each:
Pipeline -[HAS_JOB]-> Job ORDER BY (job.finished_at - job.started_at) DESC
Which runner ran the slowest jobs:
Job -[RUNS_ON]-> Runner
Bridge-driven downstream pipelines and their jobs:
Pipeline -[CHILD_OF]-> ParentPipeline -[HAS_JOB]-> Bridge (Job WHERE type='Ci::Bridge')
Was this pipeline auto-canceled by a newer push:
Pipeline -[AUTO_CANCELED_BY]-> CancelingPipeline
```
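The first pattern above can already be grounded against the siphon mirrors, before any new edge lands. A sketch in ClickHouse SQL, assuming lake access; the `1/9970/` traversal prefix mirrors the checks in workstream 10:

```sql
-- Slowest job per pipeline for gitlab-org/gitlab, joining builds to
-- pipelines via builds.commit_id (the legacy pipeline FK).
SELECT
    p.id AS pipeline_id,
    p.duration AS pipeline_duration,
    argMax(b.name, dateDiff('second', b.started_at, b.finished_at)) AS slowest_job,
    max(dateDiff('second', b.started_at, b.finished_at)) AS slowest_job_seconds
FROM siphon_p_ci_pipelines AS p
INNER JOIN siphon_p_ci_builds AS b ON b.commit_id = p.id
WHERE startsWith(p.traversal_path, '1/9970/')
GROUP BY p.id, p.duration
ORDER BY p.duration DESC
LIMIT 20
```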
## Workstreams
Eleven workstreams, ordered by dependency. Each is small. Items 1, 2, 5, 6, 7, 8, and 11 are pure orbit YAML. Items 3 and 9 add monolith ClickHouse migrations first, then orbit YAML; item 4 copies existing monolith DDL into orbit's fixtures. Item 10 is a config check. None of these should be deferred.
| # | Workstream | Repos touched | Dependencies / notes |
|---|---|---|---|
| 1 | Pipeline self-edges (`AUTO_CANCELED_BY`, `IN_PIPELINE` for Job) | orbit | none |
| 2 | Direct `Pipeline -[HAS_JOB]-> Job` | orbit | none |
| 3 | Bridge child pipelines via `ci_sources_pipelines` | monolith + orbit | adds new siphon table |
| 4 | Runner node + `RUNS_ON` + `RUNS_FOR` | orbit (+ orbit fixture sync from monolith) | runner siphon already in monolith |
| 5 | Job perf fields (`runner_id`, `timeout`, `exit_code`, `type`, `scheduling_type`) | orbit | none |
| 6 | Bridge discrimination (`type` enum on Job, store `Ci::Bridge` rows distinctly) | orbit | depends on 5 |
| 7 | Pipeline metadata (`committed_at`, `before_sha`, `yaml_errors`, `protected`, `updated_at`) | orbit | none |
| 8 | `Job -[TRIGGERS_PIPELINE]-> Pipeline` via `upstream_pipeline_id` | orbit | depends on 6 |
| 9 | `interruptible`, `timeout` policy via `siphon_p_ci_builds_metadata` | monolith + orbit | adds new siphon table |
| 10 | Verify Siphon CDC subscription for `siphon_p_ci_builds` is live for gitlab-org/gitlab | siphon repo | unblocks the observed zero-Job count |
| 11 | Bump `config/SCHEMA_VERSION` from 11 to 12 and fix the doc drift in `data_model.md` | orbit | last |
The remainder of the issue is the implementation reference. Each section below is collapsed; expand the one you are working on.
---
<details>
<summary><b>Reference: source data inventory</b> (read this first)</summary>
#### `siphon_p_ci_pipelines` columns currently mapped to Pipeline node
`id, iid, sha, ref, status, source, tag, duration, failure_reason, created_at, started_at, finished_at`
#### `siphon_p_ci_pipelines` columns NOT mapped (candidates)
| Column | Type | Use |
|---|---|---|
| `auto_canceled_by_id` | `Nullable(Int64)` | self-edge target |
| `committed_at` | `Nullable(DateTime64)` | property |
| `before_sha` | `Nullable(String)` | property |
| `yaml_errors` | `Nullable(String)` | property |
| `protected` | `Nullable(Bool)` | property |
| `updated_at` | `DateTime64` | property |
| `pipeline_schedule_id` | `Nullable(Int64)` | future Schedule node FK |
| `trigger_id` | `Nullable(Int64)` | future Trigger FK |
#### `siphon_p_ci_builds` columns currently mapped to Job
`id, name, status, ref, tag, allow_failure, coverage, environment, when, retried, failure_reason, created_at, started_at, finished_at, queued_at`
#### `siphon_p_ci_builds` columns NOT mapped (candidates)
| Column | Type | Use |
|---|---|---|
| `type` | `LowCardinality(String)` | enum property: `Ci::Build` vs `Ci::Bridge` |
| `runner_id` | `Nullable(Int64)` | edge to Runner |
| `commit_id` | `Nullable(Int64)` | edge to Pipeline (this is the pipeline FK; legacy column name) |
| `upstream_pipeline_id` | `Nullable(Int64)` | edge from bridge to triggered pipeline |
| `auto_canceled_by_id` | `Nullable(Int64)` | self-edge target |
| `erased_by_id` | `Nullable(Int64)` | future User edge |
| `timeout` | `Nullable(Int64)` | property |
| `timeout_source` | `Nullable(Int16)` | property |
| `exit_code` | `Nullable(Int16)` | property |
| `scheduling_type` | `Nullable(Int16)` | property |
| `resource_group_id` | `Nullable(Int64)` | future ResourceGroup FK |
#### `siphon_ci_runners`, `siphon_ci_runner_namespaces`, `siphon_ci_runner_projects`
Exist in the monolith ClickHouse migrations as of `20260216174855`, `20260216175519`, `20260216175533`. Not yet in `fixtures/siphon.sql` of orbit. Must be copied in. See workstream 4 for the DDL.
#### Tables not yet in any siphon mirror
| Table | Adds | Needed for |
|---|---|---|
| `ci_sources_pipelines` | `pipeline_id`, `source_pipeline_id`, `source_project_id`, `source_job_id`, `partition_id`, `source_partition_id` | workstream 3 (Pipeline `CHILD_OF`) |
| `p_ci_builds_metadata` | `interruptible`, `timeout`, `timeout_source`, `expanded_environment_name`, `id_tokens`, `secrets`, `build_id` (FK back to job) | workstream 9 |
Both must be added to the monolith first via a new ClickHouse migration in `db/click_house/migrate/main/`, then mirrored into orbit's `fixtures/siphon.sql`.
</details>
<details>
<summary><b>Workstream 1: Pipeline self-edges (<code>AUTO_CANCELED_BY</code>) and Job <code>IN_PIPELINE</code></b></summary>
#### Goal
Add `Pipeline -[AUTO_CANCELED_BY]-> Pipeline` and `Job -[IN_PIPELINE]-> Pipeline` (the latter is the cheap one-hop replacement for going through Stage). Both source columns already exist in the siphon mirrors.
#### Files to change
- `config/ontology/nodes/ci/pipeline.yaml`: add FK `auto_canceled_by_id` to `etl.edges`.
- `config/ontology/nodes/ci/job.yaml`: add FK `commit_id` to `etl.edges` (this column on builds is the pipeline FK).
- `config/ontology/edges/auto_canceled_by.yaml`: new edge with two variants (Pipeline→Pipeline, Job→Job).
- `config/ontology/edges/in_pipeline.yaml`: extend existing file (currently has SecurityScan→Pipeline) with Job→Pipeline variant.
#### Exact YAML diffs
`pipeline.yaml`:
```yaml
etl:
  edges:
    project_id: { to: Project, as: IN_PROJECT, direction: outgoing }
    user_id: { to: User, as: TRIGGERED, direction: incoming }
    merge_request_id: { to: MergeRequest, as: TRIGGERED, direction: incoming }
    auto_canceled_by_id: { to: Pipeline, as: AUTO_CANCELED_BY, direction: outgoing } # new
```
`job.yaml`:
```yaml
etl:
  edges:
    project_id: { to: Project, as: IN_PROJECT, direction: outgoing }
    stage_id: { to: Stage, as: HAS_JOB, direction: incoming }
    user_id: { to: User, as: TRIGGERED, direction: incoming }
    commit_id: { to: Pipeline, as: IN_PIPELINE, direction: outgoing } # new
    auto_canceled_by_id: { to: Job, as: AUTO_CANCELED_BY, direction: outgoing } # new
```
`config/ontology/edges/auto_canceled_by.yaml` (new file):
```yaml
description: Entity was auto-canceled by a newer entity of the same kind
variants:
  - from_node: { type: Pipeline, id: id }
    to_node: { type: Pipeline, id: id }
    description: "Pipeline was auto-canceled by a newer pipeline."
  - from_node: { type: Job, id: id }
    to_node: { type: Job, id: id }
    description: "Job was auto-canceled when its pipeline was superseded."
```
Append to `config/ontology/edges/in_pipeline.yaml`:
```yaml
  - from_node: { type: Job, id: id }
    to_node: { type: Pipeline, id: id }
    description: "Job belongs to pipeline (bypasses Stage hop)."
```
#### Tests
Extend `crates/integration-tests/tests/indexer/sdlc/ci.rs::processes_pipelines` to insert two pipelines where the second sets `auto_canceled_by_id` to the first, then assert one `AUTO_CANCELED_BY` edge with the right traversal path. Mirror the pattern from `processes_issue_links` in `work_items.rs`. For `IN_PIPELINE`, extend `processes_jobs` to assert one Job→Pipeline edge per Job row.
#### How the FK→edge transform works
The indexer turns each `etl.edges` entry into a `FkEdgeTransform` (see `crates/indexer/src/modules/sdlc/plan/lower.rs::lower_fk_edge_transform`). For an `outgoing` direction with `auto_canceled_by_id`, the generated SQL projects `id AS source_id, 'Pipeline' AS source_kind, 'AUTO_CANCELED_BY' AS relationship_kind, auto_canceled_by_id AS target_id, 'Pipeline' AS target_kind` from `siphon_p_ci_pipelines` rows where the FK is non-null. No code change needed beyond YAML.
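Concretely, the generated statement for the new pipeline FK should look roughly like this — a sketch of the projection described above, not the literal output of `lower_fk_edge_transform`:

```sql
SELECT
    id                  AS source_id,
    'Pipeline'          AS source_kind,
    'AUTO_CANCELED_BY'  AS relationship_kind,
    auto_canceled_by_id AS target_id,
    'Pipeline'          AS target_kind
FROM siphon_p_ci_pipelines
WHERE auto_canceled_by_id IS NOT NULL
```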
</details>
<details>
<summary><b>Workstream 2: Direct <code>Pipeline -[HAS_JOB]-> Job</code></b></summary>
Two ways to get a direct Pipeline-to-Job hop; they yield the same traversal result. Pick one.
**Option A (preferred, less data duplication)**: rely on workstream 1's `IN_PIPELINE` (Job → Pipeline). Queries that today do `Pipeline → Stage → Job` can do `Pipeline <- IN_PIPELINE - Job` instead, one hop.
**Option B (more idiomatic)**: extend `edges/has_job.yaml` to a second variant `Pipeline → Job`, and add `commit_id` to `job.yaml::etl.edges` as `as: HAS_JOB, direction: incoming`. Doubles the rows in the edge table for jobs but matches the original CI mental model.
Recommendation: implement A in workstream 1, evaluate query patterns in production, then decide whether B is worth the storage. Open a follow-up issue if so.
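The tradeoff is easiest to see as the two query shapes against the siphon mirrors (column names as inventoried above; `{pipeline_id:Int64}` is a ClickHouse query parameter):

```sql
-- Two-hop today: Pipeline -> Stage -> Job
SELECT b.id
FROM siphon_p_ci_stages AS s
INNER JOIN siphon_p_ci_builds AS b ON b.stage_id = s.id
WHERE s.pipeline_id = {pipeline_id:Int64};

-- One hop via the IN_PIPELINE source column from workstream 1
SELECT b.id
FROM siphon_p_ci_builds AS b
WHERE b.commit_id = {pipeline_id:Int64};
```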
</details>
<details>
<summary><b>Workstream 3: Bridge child pipelines via <code>ci_sources_pipelines</code></b></summary>
#### Goal
Add `Pipeline -[CHILD_OF]-> Pipeline` and `Job (Bridge) -[TRIGGERS_PIPELINE]-> Pipeline` sourced from `ci_sources_pipelines`. This is the canonical link for downstream pipelines triggered by bridges.
#### Step 1, monolith side
New migration in `/Users/angelo.rivera/gitlab/gdk/gitlab/db/click_house/migrate/main/`. Filename pattern matches the existing 2026-02 batch:
```
YYYYMMDDHHMMSS_create_siphon_ci_sources_pipelines.rb
```
```ruby
# frozen_string_literal: true

class CreateSiphonCiSourcesPipelines < ClickHouse::Migration
  def up
    execute <<-SQL
      CREATE TABLE IF NOT EXISTS siphon_ci_sources_pipelines
      (
        id Int64 CODEC(DoubleDelta, ZSTD),
        project_id Nullable(Int64),
        source_project_id Nullable(Int64),
        source_job_id Nullable(Int64),
        partition_id Int64,
        source_partition_id Int64,
        pipeline_id Nullable(Int64),
        source_pipeline_id Nullable(Int64),
        traversal_path String DEFAULT multiIf(
          coalesce(project_id, 0) != 0,
          dictGetOrDefault('project_traversal_paths_dict', 'traversal_path', project_id, '0/'),
          '0/'
        ) CODEC(ZSTD(3)),
        _siphon_replicated_at DateTime64(6, 'UTC') DEFAULT now() CODEC(ZSTD(1)),
        _siphon_deleted Bool DEFAULT FALSE CODEC(ZSTD(1))
      )
      ENGINE = ReplacingMergeTree(_siphon_replicated_at, _siphon_deleted)
      PRIMARY KEY (traversal_path, id, partition_id)
      ORDER BY (traversal_path, id, partition_id)
      SETTINGS index_granularity = 2048
    SQL
  end

  def down
    execute "DROP TABLE IF EXISTS siphon_ci_sources_pipelines"
  end
end
```
#### Step 2, orbit fixture
Append the equivalent CREATE TABLE to `/Users/angelo.rivera/gitlab/orbit/knowledge-graph/fixtures/siphon.sql` (look for the block of `siphon_p_ci_*` definitions near line 669 and place it next to them).
#### Step 3, orbit ontology
`config/ontology/edges/child_of.yaml` (new file, follows the `related_to.yaml` template for join-table sources):
```yaml
description: Pipeline was triggered as a child of another pipeline (bridge job downstream)
variants:
  - from_node: { type: Pipeline, id: id }
    to_node: { type: Pipeline, id: id }
    description: "Pipeline is a downstream child of source pipeline."
etl:
  scope: namespaced
  source: siphon_ci_sources_pipelines
  order_by: [traversal_path, id, partition_id]
  from:
    id: pipeline_id
    type: Pipeline
  to:
    id: source_pipeline_id
    type: Pipeline
```
`config/ontology/edges/triggers_pipeline.yaml` (new file):
```yaml
description: Bridge job triggered a downstream pipeline
variants:
  - from_node: { type: Job, id: id }
    to_node: { type: Pipeline, id: id }
    description: "Bridge job triggered downstream pipeline."
etl:
  scope: namespaced
  source: siphon_ci_sources_pipelines
  order_by: [traversal_path, id, partition_id]
  from:
    id: source_job_id
    type: Job
  to:
    id: pipeline_id
    type: Pipeline
```
#### Tests
New subtest `processes_ci_sources_pipelines` in `crates/integration-tests/tests/indexer/sdlc/ci.rs`. Insert parent pipeline, child pipeline, bridge job in parent, source row linking them. Assert one `CHILD_OF` edge and one `TRIGGERS_PIPELINE` edge.
</details>
<details>
<summary><b>Workstream 4: Runner node + <code>RUNS_ON</code> + <code>RUNS_FOR</code></b></summary>
#### Step 1, orbit fixture
Copy DDL from `/Users/angelo.rivera/gitlab/gdk/gitlab/db/click_house/migrate/main/20260216174855_create_siphon_ci_runners.rb` (and the two related runner namespace/project migrations) into `/Users/angelo.rivera/gitlab/orbit/knowledge-graph/fixtures/siphon.sql`. Match the SQL the migration emits, not the Ruby wrapping.
#### Step 2, ontology
`config/ontology/nodes/ci/runner.yaml` (new file):
```yaml
node_type: Runner
domain: ci
description: A CI/CD runner that executes jobs
label: name
destination_table: gl_runner
default_columns: [id, runner_type, name, active]
style: { size: 22, color: "#A78BFA" }
redaction:
  resource_type: ci_runner
  id_column: id
  ability: read_runner
properties:
  id: { type: int64, source: id, nullable: false }
  runner_type:
    type: enum
    source: runner_type
    nullable: false
    enum_type: string
    values: { 1: instance_type, 2: group_type, 3: project_type }
  name: { type: string, source: name, nullable: true }
  description: { type: string, source: description, nullable: true }
  active: { type: boolean, source: active, nullable: false }
  locked: { type: boolean, source: locked, nullable: false }
  run_untagged: { type: boolean, source: run_untagged, nullable: false }
  access_level: { type: int64, source: access_level, nullable: false }
  maximum_timeout: { type: int64, source: maximum_timeout, nullable: true }
  organization_id: { type: int64, source: organization_id, nullable: true }
  created_at: { type: timestamp, source: created_at, nullable: true }
  contacted_at: { type: timestamp, source: contacted_at, nullable: true }
  traversal_path: { type: string, source: traversal_path, filterable: false, nullable: false }
etl:
  type: table
  scope: global # runners are scoped per-instance; use 'namespaced' if traversal_path is reliable
  source: siphon_ci_runners
  order_by: [id, runner_type]
storage:
  primary_key: [id]
  columns:
    - { name: id, type: "Int64", codec: ["Delta(8)", "ZSTD(1)"] }
    - { name: runner_type, type: "LowCardinality(String)", default: "''", codec: ["LZ4"] }
    - { name: name, type: "Nullable(String)", codec: ["ZSTD(1)"] }
    - { name: description, type: "Nullable(String)", codec: ["ZSTD(1)"] }
    - { name: active, type: "Bool", default: "true", codec: ["LZ4"] }
    - { name: locked, type: "Bool", default: "false", codec: ["LZ4"] }
    - { name: run_untagged, type: "Bool", default: "true", codec: ["LZ4"] }
    - { name: access_level, type: "Int64", default: "0", codec: ["LZ4"] }
    - { name: maximum_timeout, type: "Nullable(Int64)", codec: ["ZSTD(1)"] }
    - { name: organization_id, type: "Nullable(Int64)", codec: ["ZSTD(1)"] }
    - { name: created_at, type: "Nullable(DateTime64(6, 'UTC'))", codec: ["Delta(8)", "ZSTD(1)"] }
    - { name: contacted_at, type: "Nullable(DateTime64(6, 'UTC'))", codec: ["Delta(8)", "ZSTD(1)"] }
    - { name: traversal_path, type: "String", default: "'0/'", codec: ["ZSTD(1)"] }
```
`config/ontology/edges/runs_on.yaml`:
```yaml
description: Job executed on a runner
variants:
  - from_node: { type: Job, id: id }
    to_node: { type: Runner, id: id }
    description: "Job ran on this runner."
```
`config/ontology/edges/runs_for.yaml`:
```yaml
description: Runner is associated with a group or project
variants:
  - from_node: { type: Runner, id: id }
    to_node: { type: Group, id: id }
  - from_node: { type: Runner, id: id }
    to_node: { type: Project, id: id }
# Two etl blocks would be needed; siphon_ci_runner_namespaces and siphon_ci_runner_projects
# are separate join tables, so this needs two edge files OR one with separate variant-scoped
# etl. The cleaner pattern is two separate edge files (e.g. runs_for_group.yaml and
# runs_for_project.yaml). See related_to.yaml for the single-source single-edge template.
```
Recommendation: split into two edge files for clarity, since the schema does not support per-variant `etl` blocks today.
`config/ontology/edges/runs_for_group.yaml`:
```yaml
description: Runner is registered for a group
variants:
  - from_node: { type: Runner, id: id }
    to_node: { type: Group, id: id }
etl:
  scope: namespaced
  source: siphon_ci_runner_namespaces
  order_by: [traversal_path, id]
  from: { id: runner_id, type: Runner }
  to: { id: namespace_id, type: Group }
```
`config/ontology/edges/runs_for_project.yaml`:
```yaml
description: Runner is registered for a project
variants:
  - from_node: { type: Runner, id: id }
    to_node: { type: Project, id: id }
etl:
  scope: namespaced
  source: siphon_ci_runner_projects
  order_by: [traversal_path, id]
  from: { id: runner_id, type: Runner }
  to: { id: project_id, type: Project }
```
#### Step 3, wire RUNS_ON onto Job
Add to `job.yaml::etl.edges`:
```yaml
runner_id: { to: Runner, as: RUNS_ON, direction: outgoing }
```
#### Step 4, fixtures and tests
Add helpers to `crates/integration-tests/tests/indexer/common/siphon.rs`: `create_runner`, `create_runner_namespace`, `create_runner_project`. Mirror the signatures of `create_project` and `create_user`. New subtest `processes_runners` in `ci.rs` covering Runner node, RUNS_ON edge, RUNS_FOR_GROUP and RUNS_FOR_PROJECT.
</details>
<details>
<summary><b>Workstream 5: Job perf fields</b></summary>
Purely additive changes in `nodes/ci/job.yaml`. Add the following entries under `properties:` and matching rows under `storage.columns`:
```yaml
runner_id: { type: int64, source: runner_id, nullable: true }
type: { type: string, source: type, nullable: false, description: "Ci::Build or Ci::Bridge" }
timeout: { type: int64, source: timeout, nullable: true }
timeout_source: { type: int64, source: timeout_source, nullable: true }
exit_code: { type: int64, source: exit_code, nullable: true }
scheduling_type: { type: int64, source: scheduling_type, nullable: true }
```
Storage columns (LowCardinality for type since cardinality is bounded):
```yaml
- { name: runner_id, type: "Nullable(Int64)", codec: ["ZSTD(1)"] }
- { name: type, type: "LowCardinality(String)", default: "''", codec: ["LZ4"] }
- { name: timeout, type: "Nullable(Int64)", codec: ["ZSTD(1)"] }
- { name: timeout_source, type: "Nullable(Int64)", codec: ["ZSTD(1)"] }
- { name: exit_code, type: "Nullable(Int64)", codec: ["ZSTD(1)"] }
- { name: scheduling_type, type: "Nullable(Int64)", codec: ["ZSTD(1)"] }
```
Extend `processes_jobs` test fixtures to set these and assert they round-trip.
</details>
<details>
<summary><b>Workstream 6: Bridge discrimination</b></summary>
Already covered by exposing `type` in workstream 5. To make queries readable, add a virtual property or document the convention: rows with `type='Ci::Bridge'` are bridges and target a downstream pipeline via `TRIGGERS_PIPELINE` (workstream 3).
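Once `type` is exposed (workstream 5), the convention is queryable directly against the builds mirror; a sketch:

```sql
-- Bridges per pipeline; rows with type = 'Ci::Bridge' are the jobs
-- that trigger downstream pipelines (builds.commit_id is the pipeline FK).
SELECT commit_id AS pipeline_id, count() AS bridge_count
FROM siphon_p_ci_builds
WHERE `type` = 'Ci::Bridge'
GROUP BY commit_id
ORDER BY bridge_count DESC
```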
Optional follow-up: split into a separate `Bridge` node by promoting `type` to a discriminator. Track that as a separate issue once query patterns settle. Not in scope here.
</details>
<details>
<summary><b>Workstream 7: Pipeline metadata fields</b></summary>
Add to `nodes/ci/pipeline.yaml::properties` and `storage.columns`:
```yaml
committed_at: { type: timestamp, source: committed_at, nullable: true }
before_sha: { type: string, source: before_sha, nullable: true }
yaml_errors: { type: string, source: yaml_errors, nullable: true }
protected: { type: boolean, source: protected, nullable: true }
updated_at: { type: timestamp, source: updated_at, nullable: true }
```
Storage rows:
```yaml
- { name: committed_at, type: "Nullable(DateTime64(6, 'UTC'))", codec: ["Delta(8)", "ZSTD(1)"] }
- { name: before_sha, type: "Nullable(String)", codec: ["ZSTD(1)"] }
- { name: yaml_errors, type: "Nullable(String)", codec: ["ZSTD(3)"] }
- { name: protected, type: "Nullable(Bool)", codec: ["LZ4"] }
- { name: updated_at, type: "Nullable(DateTime64(6, 'UTC'))", codec: ["Delta(8)", "ZSTD(1)"] }
```
</details>
<details>
<summary><b>Workstream 8: <code>Job -[TRIGGERS_PIPELINE]-> Pipeline</code> via <code>upstream_pipeline_id</code></b></summary>
This is the cheap path that does not require the new `siphon_ci_sources_pipelines` table. `siphon_p_ci_builds.upstream_pipeline_id` is already populated for jobs in child pipelines and points at the parent pipeline.
Either expose this column under a distinct edge name (the YAML below uses `TRIGGERED_BY_PIPELINE`), or wait for workstream 3 to land and use the canonical `TRIGGERS_PIPELINE` from the bridge side. The simpler short-term version:
`job.yaml::etl.edges`:
```yaml
upstream_pipeline_id: { to: Pipeline, as: TRIGGERED_BY_PIPELINE, direction: outgoing }
```
If workstream 3 lands first, drop this in favor of the join-table-sourced edge. Keep both for a release if you want graceful migration.
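Before choosing, it is worth checking how widely the column is actually populated; a sketch against the lake:

```sql
-- Share of jobs that carry an upstream pipeline link (i.e. live in a
-- child pipeline). A near-zero count argues for waiting on workstream 3.
SELECT
    countIf(upstream_pipeline_id IS NOT NULL) AS linked_jobs,
    count() AS total_jobs
FROM siphon_p_ci_builds
```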
</details>
<details>
<summary><b>Workstream 9: <code>siphon_p_ci_builds_metadata</code> for <code>interruptible</code> and timeout policy</b></summary>
#### Step 1, monolith migration
```ruby
# frozen_string_literal: true

class CreateSiphonPCiBuildsMetadata < ClickHouse::Migration
  def up
    execute <<-SQL
      CREATE TABLE IF NOT EXISTS siphon_p_ci_builds_metadata
      (
        id Int64 CODEC(DoubleDelta, ZSTD),
        build_id Int64,
        project_id Int64,
        partition_id Int64,
        timeout Nullable(Int64),
        timeout_source Int64 DEFAULT 1,
        interruptible Nullable(Bool),
        has_exposed_artifacts Nullable(Bool),
        environment_auto_stop_in Nullable(String),
        expanded_environment_name Nullable(String),
        debug_trace_enabled Bool DEFAULT false,
        exit_code Nullable(Int16),
        traversal_path String DEFAULT multiIf(
          coalesce(project_id, 0) != 0,
          dictGetOrDefault('project_traversal_paths_dict', 'traversal_path', project_id, '0/'),
          '0/'
        ) CODEC(ZSTD(3)),
        _siphon_replicated_at DateTime64(6, 'UTC') DEFAULT now() CODEC(ZSTD(1)),
        _siphon_deleted Bool DEFAULT FALSE CODEC(ZSTD(1))
      )
      ENGINE = ReplacingMergeTree(_siphon_replicated_at, _siphon_deleted)
      PRIMARY KEY (traversal_path, build_id, partition_id)
      ORDER BY (traversal_path, build_id, partition_id)
      SETTINGS index_granularity = 2048
    SQL
  end

  def down
    execute "DROP TABLE IF EXISTS siphon_p_ci_builds_metadata"
  end
end
```
#### Step 2, orbit
Two options for surfacing these fields:
**Option A**: separate `JobMetadata` node + `HAS_METADATA` edge. Cleaner but adds a node.
**Option B**: merge into Job via a virtual-column resolver (look at `column_resolver.rs` patterns in `query-engine/shared`). Same query surface, no new node.
Recommendation: Option A unless query patterns demand B.
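Whichever option wins, the join shape is the same, keyed on the `build_id` FK back to the job; a sketch:

```sql
-- Surface metadata fields alongside jobs via the build_id FK.
SELECT b.id, m.interruptible, m.timeout, m.expanded_environment_name
FROM siphon_p_ci_builds AS b
INNER JOIN siphon_p_ci_builds_metadata AS m ON m.build_id = b.id
```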
</details>
<details>
<summary><b>Workstream 10: Verify Siphon CDC subscription</b></summary>
The orbit graph today returns zero `Job` rows for `gitlab-org/gitlab` even though the indexer is healthy and the CI integration test passes. Confirm in the lake:
```sql
SELECT count() FROM siphon_p_ci_builds WHERE startsWith(traversal_path, '1/9970/');
SELECT count() FROM siphon_p_ci_stages WHERE startsWith(traversal_path, '1/9970/');
SELECT count() FROM siphon_p_ci_pipelines WHERE startsWith(traversal_path, '1/9970/');
```
If `siphon_p_ci_builds` is zero or much smaller than `siphon_p_ci_pipelines`, the subscription is missing or filtered. Fix in the Siphon repo's `table_mapping`. Track as a sub-task with the data-platform team.
</details>
<details>
<summary><b>Workstream 11: Schema version + doc fix</b></summary>
After all workstreams above land:
1. Bump `config/SCHEMA_VERSION` from `11` to `12`. The lefthook check (`schema-version-check`) will fail otherwise.
2. Fix `docs/design-documents/data_model.md` lines 97 and 114: `HAS_JOB` is `Stage → Job`, not `Pipeline → Job` (with workstream 2 Option A, also document `IN_PIPELINE` as the Job→Pipeline edge). `HAS_PIPELINE: Project → Pipeline` should be `IN_PROJECT: Pipeline → Project`.
</details>
---
Parent epic: gitlab-org&21720
<!-- AI-Sessions
dir: ~/.claude/projects/-Users-angelo-rivera-gitlab-orbit-knowledge-graph/
f14e21e4-c319-4c24-a6a3-3c5694d71414.jsonl (2026-04-26)
-->