
Proposal: Solving the de-normalized hierarchy problem

Problem Statement

Siphon correctly replicates data from PostgreSQL tables into ClickHouse (the siphon_ tables). This allows us to build materialized views on top of these tables that join in additional data. By denormalizing the data this way, we achieve very fast organization-, group-, and project-level reporting. However, the joined-in columns can become stale when the joined data changes.

This problem was highlighted in the Data Insights Platform Hierarchical Data Retrieval Optimization: https://handbook.gitlab.com/handbook/engineering/architecture/design-documents/data_insights_platform_hierarchical_data_retrieval_optimization/#data-inconsistency

Example: namespaces + merge requests

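By joining these two tables in a materialized view, we get the hierarchy_merge_requests table (primary key: traversal_path, id). It contains all columns from siphon_merge_requests (primary key: id) plus the special traversal_path column from the siphon_namespaces table, which the definition below reads through project_namespace_traversal_paths. The view also collects label, assignee, and approver IDs and merge request metrics.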

Materialized view definition
CREATE MATERIALIZED VIEW hierarchy_merge_requests_mv TO hierarchy_merge_requests
(
    `traversal_path` String, -- This is the joined column
    `id` Int64,
    `target_branch` String,
    `source_branch` String,
    `source_project_id` Nullable(Int64),
    `author_id` Nullable(Int64),
    `assignee_id` Nullable(Int64),
    `title` String,
    `created_at` DateTime64(6, 'UTC'),
    `updated_at` DateTime64(6, 'UTC'),
    `milestone_id` Nullable(Int64),
    `merge_status` LowCardinality(String),
    `target_project_id` Int64,
    `iid` Nullable(Int64),
    `description` String,
    `updated_by_id` Nullable(Int64),
    `merge_error` Nullable(String),
    `merge_params` Nullable(String),
    `merge_when_pipeline_succeeds` Bool,
    `merge_user_id` Nullable(Int64),
    `merge_commit_sha` Nullable(String),
    `approvals_before_merge` Nullable(Int64),
    `rebase_commit_sha` Nullable(String),
    `in_progress_merge_commit_sha` Nullable(String),
    `lock_version` Int64,
    `time_estimate` Nullable(Int64),
    `squash` Bool,
    `cached_markdown_version` Nullable(Int64),
    `last_edited_at` Nullable(DateTime64(6, 'UTC')),
    `last_edited_by_id` Nullable(Int64),
    `merge_jid` Nullable(String),
    `discussion_locked` Nullable(Bool),
    `latest_merge_request_diff_id` Nullable(Int64),
    `allow_maintainer_to_push` Nullable(Bool),
    `state_id` Int8,
    `rebase_jid` Nullable(String),
    `squash_commit_sha` Nullable(String),
    `sprint_id` Nullable(Int64),
    `merge_ref_sha` Nullable(String),
    `draft` Bool,
    `prepared_at` Nullable(DateTime64(6, 'UTC')),
    `merged_commit_sha` Nullable(String),
    `override_requested_changes` Bool,
    `head_pipeline_id` Nullable(Int64),
    `imported_from` Int8,
    `retargeted` Bool,
    `version` DateTime64(6, 'UTC'),
    `deleted` Bool,
    `label_ids` String,
    `assignee_ids` String,
    `approver_ids` String,
    `metric_latest_build_started_at` Nullable(DateTime64(6, 'UTC')),
    `metric_latest_build_finished_at` Nullable(DateTime64(6, 'UTC')),
    `metric_first_deployed_to_production_at` Nullable(DateTime64(6, 'UTC')),
    `metric_merged_at` Nullable(DateTime64(6, 'UTC')),
    `metric_merged_by_id` Nullable(Int64),
    `metric_latest_closed_by_id` Nullable(Int64),
    `metric_latest_closed_at` Nullable(DateTime64(6, 'UTC')),
    `metric_first_comment_at` Nullable(DateTime64(6, 'UTC')),
    `metric_first_commit_at` Nullable(DateTime64(6, 'UTC')),
    `metric_last_commit_at` Nullable(DateTime64(6, 'UTC')),
    `metric_diff_size` Nullable(Int64),
    `metric_modified_paths_size` Nullable(Int64),
    `metric_commits_count` Nullable(Int64),
    `metric_first_approved_at` Nullable(DateTime64(6, 'UTC')),
    `metric_first_reassigned_at` Nullable(DateTime64(6, 'UTC')),
    `metric_added_lines` Nullable(Int64),
    `metric_removed_lines` Nullable(Int64),
    `metric_first_contribution` Bool,
    `metric_pipeline_id` Nullable(Int64),
    `metric_reviewer_first_assigned_at` Nullable(DateTime64(6, 'UTC'))
)
AS WITH
    cte AS
    (
        SELECT *
        FROM siphon_merge_requests
    ),
    project_namespace_paths AS
    (
        SELECT *
        FROM
        (
            SELECT
                id,
                argMax(traversal_path, version) AS traversal_path,
                argMax(deleted, version) AS deleted
            FROM project_namespace_traversal_paths
            WHERE id IN (
                SELECT DISTINCT target_project_id
                FROM cte
            )
            GROUP BY id
        )
        WHERE deleted = false
    ),
    collected_label_ids AS
    (
        SELECT
            merge_request_id,
            concat('/', arrayStringConcat(arraySort(groupArray(label_id)), '/'), '/') AS label_ids
        FROM
        (
            SELECT
                merge_request_id,
                label_id,
                id,
                argMax(deleted, version) AS deleted
            FROM merge_request_label_links
            WHERE merge_request_id IN (
                SELECT id
                FROM cte
            )
            GROUP BY
                merge_request_id,
                label_id,
                id
        )
        WHERE deleted = false
        GROUP BY merge_request_id
    ),
    collected_assignee_ids AS
    (
        SELECT
            merge_request_id,
            concat('/', arrayStringConcat(arraySort(groupArray(user_id)), '/'), '/') AS user_ids
        FROM
        (
            SELECT
                merge_request_id,
                user_id,
                argMax(_siphon_deleted, _siphon_replicated_at) AS _siphon_deleted
            FROM siphon_merge_request_assignees
            WHERE merge_request_id IN (
                SELECT id
                FROM cte
            )
            GROUP BY
                merge_request_id,
                user_id
        )
        WHERE _siphon_deleted = false
        GROUP BY merge_request_id
    ),
    collected_approver_ids AS
    (
        SELECT
            merge_request_id,
            concat('/', arrayStringConcat(arraySort(groupArray(user_id)), '/'), '/') AS user_ids
        FROM
        (
            SELECT
                merge_request_id,
                user_id,
                argMax(_siphon_deleted, _siphon_replicated_at) AS _siphon_deleted
            FROM siphon_approvals
            WHERE merge_request_id IN (
                SELECT id
                FROM cte
            )
            GROUP BY
                merge_request_id,
                user_id
        )
        WHERE _siphon_deleted = false
        GROUP BY merge_request_id
    ),
    collected_merge_request_metrics AS
    (
        SELECT *
        FROM
        (
            SELECT
                merge_request_id,
                argMax(latest_build_started_at, _siphon_replicated_at) AS latest_build_started_at,
                argMax(latest_build_finished_at, _siphon_replicated_at) AS latest_build_finished_at,
                argMax(first_deployed_to_production_at, _siphon_replicated_at) AS first_deployed_to_production_at,
                argMax(merged_at, _siphon_replicated_at) AS merged_at,
                argMax(merged_by_id, _siphon_replicated_at) AS merged_by_id,
                argMax(latest_closed_by_id, _siphon_replicated_at) AS latest_closed_by_id,
                argMax(latest_closed_at, _siphon_replicated_at) AS latest_closed_at,
                argMax(first_comment_at, _siphon_replicated_at) AS first_comment_at,
                argMax(first_commit_at, _siphon_replicated_at) AS first_commit_at,
                argMax(last_commit_at, _siphon_replicated_at) AS last_commit_at,
                argMax(diff_size, _siphon_replicated_at) AS diff_size,
                argMax(modified_paths_size, _siphon_replicated_at) AS modified_paths_size,
                argMax(commits_count, _siphon_replicated_at) AS commits_count,
                argMax(first_approved_at, _siphon_replicated_at) AS first_approved_at,
                argMax(first_reassigned_at, _siphon_replicated_at) AS first_reassigned_at,
                argMax(added_lines, _siphon_replicated_at) AS added_lines,
                argMax(removed_lines, _siphon_replicated_at) AS removed_lines,
                argMax(first_contribution, _siphon_replicated_at) AS first_contribution,
                argMax(pipeline_id, _siphon_replicated_at) AS pipeline_id,
                argMax(reviewer_first_assigned_at, _siphon_replicated_at) AS reviewer_first_assigned_at,
                argMax(_siphon_deleted, _siphon_replicated_at) AS _siphon_deleted
            FROM siphon_merge_request_metrics
            WHERE merge_request_id IN (
                SELECT id
                FROM cte
            )
            GROUP BY
                merge_request_id,
                id
        )
        WHERE _siphon_deleted = false
    )
SELECT
    multiIf(cte.target_project_id != 0, project_namespace_paths.traversal_path, '0/') AS traversal_path,
    cte.id AS id,
    cte.target_branch AS target_branch,
    cte.source_branch AS source_branch,
    cte.source_project_id AS source_project_id,
    cte.author_id AS author_id,
    cte.assignee_id AS assignee_id,
    cte.title AS title,
    cte.created_at AS created_at,
    cte.updated_at AS updated_at,
    cte.milestone_id AS milestone_id,
    cte.merge_status AS merge_status,
    cte.target_project_id AS target_project_id,
    cte.iid AS iid,
    cte.description AS description,
    cte.updated_by_id AS updated_by_id,
    cte.merge_error AS merge_error,
    cte.merge_params AS merge_params,
    cte.merge_when_pipeline_succeeds AS merge_when_pipeline_succeeds,
    cte.merge_user_id AS merge_user_id,
    cte.merge_commit_sha AS merge_commit_sha,
    cte.approvals_before_merge AS approvals_before_merge,
    cte.rebase_commit_sha AS rebase_commit_sha,
    cte.in_progress_merge_commit_sha AS in_progress_merge_commit_sha,
    cte.lock_version AS lock_version,
    cte.time_estimate AS time_estimate,
    cte.squash AS squash,
    cte.cached_markdown_version AS cached_markdown_version,
    cte.last_edited_at AS last_edited_at,
    cte.last_edited_by_id AS last_edited_by_id,
    cte.merge_jid AS merge_jid,
    cte.discussion_locked AS discussion_locked,
    cte.latest_merge_request_diff_id AS latest_merge_request_diff_id,
    cte.allow_maintainer_to_push AS allow_maintainer_to_push,
    cte.state_id AS state_id,
    cte.rebase_jid AS rebase_jid,
    cte.squash_commit_sha AS squash_commit_sha,
    cte.sprint_id AS sprint_id,
    cte.merge_ref_sha AS merge_ref_sha,
    cte.draft AS draft,
    cte.prepared_at AS prepared_at,
    cte.merged_commit_sha AS merged_commit_sha,
    cte.override_requested_changes AS override_requested_changes,
    cte.head_pipeline_id AS head_pipeline_id,
    cte.imported_from AS imported_from,
    cte.retargeted AS retargeted,
    cte._siphon_replicated_at AS version,
    cte._siphon_deleted AS deleted,
    collected_label_ids.label_ids AS label_ids,
    collected_assignee_ids.user_ids AS assignee_ids,
    collected_approver_ids.user_ids AS approver_ids,
    collected_merge_request_metrics.latest_build_started_at AS metric_latest_build_started_at,
    collected_merge_request_metrics.latest_build_finished_at AS metric_latest_build_finished_at,
    collected_merge_request_metrics.first_deployed_to_production_at AS metric_first_deployed_to_production_at,
    collected_merge_request_metrics.merged_at AS metric_merged_at,
    collected_merge_request_metrics.merged_by_id AS metric_merged_by_id,
    collected_merge_request_metrics.latest_closed_by_id AS metric_latest_closed_by_id,
    collected_merge_request_metrics.latest_closed_at AS metric_latest_closed_at,
    collected_merge_request_metrics.first_comment_at AS metric_first_comment_at,
    collected_merge_request_metrics.first_commit_at AS metric_first_commit_at,
    collected_merge_request_metrics.last_commit_at AS metric_last_commit_at,
    collected_merge_request_metrics.diff_size AS metric_diff_size,
    collected_merge_request_metrics.modified_paths_size AS metric_modified_paths_size,
    collected_merge_request_metrics.commits_count AS metric_commits_count,
    collected_merge_request_metrics.first_approved_at AS metric_first_approved_at,
    collected_merge_request_metrics.first_reassigned_at AS metric_first_reassigned_at,
    collected_merge_request_metrics.added_lines AS metric_added_lines,
    collected_merge_request_metrics.removed_lines AS metric_removed_lines,
    collected_merge_request_metrics.first_contribution AS metric_first_contribution,
    collected_merge_request_metrics.pipeline_id AS metric_pipeline_id,
    collected_merge_request_metrics.reviewer_first_assigned_at AS metric_reviewer_first_assigned_at
FROM cte
LEFT JOIN project_namespace_paths ON project_namespace_paths.id = cte.target_project_id
LEFT JOIN collected_assignee_ids ON collected_assignee_ids.merge_request_id = cte.id
LEFT JOIN collected_label_ids ON collected_label_ids.merge_request_id = cte.id
LEFT JOIN collected_approver_ids ON collected_approver_ids.merge_request_id = cte.id
LEFT JOIN collected_merge_request_metrics ON collected_merge_request_metrics.merge_request_id = cte.id;

With this materialized view:
  • When any property changes on a merge_requests record in PostgreSQL, the change is automatically propagated to hierarchy_merge_requests.
  • When a project or group is moved, the traversal_path column in hierarchy_merge_requests is not updated.
    • Note: if a merge_requests record changes after the move, its traversal_path will then be set correctly.

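This behavior is inherent to ClickHouse materialized views. A materialized view acts as an insert trigger on its source table (here, siphon_merge_requests), so inserts into the JOINed tables do not trigger it.
As a result, hierarchy_merge_requests keeps stale traversal_path values and becomes inconsistent with the source data.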

Proposal

Track the traversal_path values of every namespace in a separate table, traversal_path_changes, populated by a materialized view.
A dedicated goroutine periodically scans this table for namespaces whose traversal_path has changed.
When a change is detected, Siphon refreshes the affected records in the hierarchy_merge_requests table using the refresh_on_change feature.


1. Project creation

Each project has an associated record in the namespaces table (its project namespace). After project creation, the traversal_path_changes table looks like this:

namespace_id  traversal_path  timestamp
3             1/2/3           Wed 29 Oct 08:57:13 CET 2025
  • 1: organization ID
  • 2: group ID (from the namespaces table)
  • 3: project namespace ID (from the namespaces table)

At this stage, querying this table yields no changes since no traversal_path updates have occurred.


2. Project moved to a subgroup

Assume the user has created a few merge requests and then moves the project into subgroup 6. The traversal_path_changes table now looks like this:

namespace_id  traversal_path  timestamp
3             1/2/3           Wed 29 Oct 08:57:13 CET 2025
3             1/2/6/3         Wed 29 Oct 08:57:28 CET 2025
  • 1: organization ID
  • 2: group ID (from the namespaces table)
  • 6: subgroup ID (from the namespaces table)
  • 3: project namespace ID (from the namespaces table)

When querying the table for changes, we detect that namespace_id = 3 has two versions: 1/2/3 (old) and 1/2/6/3 (new).

The latest one (by timestamp) represents the current version. For each outdated traversal_path (in this case, 1/2/3), perform the following steps:

  1. Query the hierarchy_merge_requests table to find affected rows:

    SELECT id FROM hierarchy_merge_requests WHERE traversal_path = '1/2/3';
  2. Enqueue a refresh package using the retrieved id values (this feature already exists).
    Siphon will reinsert the corresponding siphon_merge_requests records by id, automatically setting the new, correct traversal_path.

  3. Run an INSERT query marking records with the old traversal_path as deleted:

    INSERT INTO hierarchy_merge_requests (traversal_path, id, deleted) (...);
  4. Mark the traversal_path_changes record with traversal_path = '1/2/3' as deleted so it won't be picked up again.

Implementation details

Ideally, we want to handle this in the consumer. We can introduce an optional configuration entry:

reinsert_using_change_table:
  - table: traversal_path_changes # monitor this table for changes
    group_by:
      - namespace_id # group traversal_path changes by namespace_id
    column: traversal_path
    materialized_views_to_refresh: # materialized views to refresh, each with its refresh package configuration
      - name: hierarchy_issues
        target_stream_identifier: issues
        target_keys:
          - id
      - name: hierarchy_merge_requests
        target_stream_identifier: merge_requests
        target_keys:
          - id
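To make the configuration entry concrete, here is how it might map onto Go types. This assumes it is decoded with a YAML library that honours `yaml` struct tags, such as gopkg.in/yaml.v3.

The type names and AffectedKeysQuery are hypothetical. The helper shows how the step 1 lookup could be derived from `target_keys` and `column`. The `?` placeholder assumes a driver that supports positional bind parameters.

```go
package main

import (
	"fmt"
	"strings"
)

// MaterializedViewRefresh mirrors one materialized_views_to_refresh entry.
type MaterializedViewRefresh struct {
	Name                   string   `yaml:"name"`
	TargetStreamIdentifier string   `yaml:"target_stream_identifier"`
	TargetKeys             []string `yaml:"target_keys"`
}

// ChangeTableConfig mirrors one reinsert_using_change_table entry.
type ChangeTableConfig struct {
	Table                      string                    `yaml:"table"`
	GroupBy                    []string                  `yaml:"group_by"`
	Column                     string                    `yaml:"column"`
	MaterializedViewsToRefresh []MaterializedViewRefresh `yaml:"materialized_views_to_refresh"`
}

// AffectedKeysQuery builds the step 1 lookup for one configured view.
// The outdated value is left as a bind parameter (?) instead of being
// interpolated into the SQL string.
func AffectedKeysQuery(cfg ChangeTableConfig, mv MaterializedViewRefresh) string {
	return fmt.Sprintf("SELECT %s FROM %s WHERE %s = ?",
		strings.Join(mv.TargetKeys, ", "), mv.Name, cfg.Column)
}
```

For the hierarchy_merge_requests entry above, this yields `SELECT id FROM hierarchy_merge_requests WHERE traversal_path = ?`. The returned keys would then feed the refresh package for the `merge_requests` stream.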

Additional notes:

  • Parallel processing: changes could be processed in parallel by namespace_id. Maybe we could hash-partition the traversal_path_changes table and have N goroutines monitor the partitions?
Edited by Adam Hegyi