Skip to content

Background syncing of catalog_resources and projects

Leaminn Ma requested to merge add-catalog-resource-sync-events into master

What does this MR do and why?

This MR implements background syncing of catalog_resources and projects (#429376 (closed)). The full implementation involves a PG trigger which adds sync events to a queue table; then the sync events are processed by a worker.

Specifically, in this MR we:

  • Added catalog_resource_sync_events table which will serve as the queue table for sync events.
  • Added the corresponding Ci::Catalog::Resources::SyncEvent model which is structured similarly to Namespaces::SyncEvent.
    • This allows us to leverage the existing Ci::ProcessSyncEventsService for the worker.
    • Ci::ProcessSyncEventsService is updated to support the existing SyncEvent models (Projects::SyncEvent and Namespaces::SyncEvent) as well as the new Ci::Catalog::Resources::SyncEvent model. The latter marks records as processed instead of deleting them after processing.
  • Added a model callback for Ci::Catalog::Resource to sync the record on creation.
  • Implemented a PG trigger that adds a SyncEvent when the associated projects record of a catalog resource updates its name, description, or visibility_level column.
  • Created a worker Ci::Catalog::Resources::ProcessSyncEventsWorker that follows a similar set up as Namespaces::ProcessSyncEventsWorker.

Resolves Steps 1-3 of #429376 (closed).

Roll out issue for FF ci_process_catalog_resource_sync_events: #432963 (closed)

How to set up and validate locally

  1. Enable the feature flag: Feature.enable(:ci_process_catalog_resource_sync_events).
  2. Create a catalog resource.
  3. First observe that the catalog resource attributes match the project's.
project = Project.find(<YOUR-PROJECT-ID>)
project.slice(:name, :description, :visibility_level)
project.catalog_resource.reload.slice(:name, :description, :visibility_level)
  1. Update any of one of name, description, or visibility_level on the project with .update!.
project.update!(name: 'My new name')
project.catalog_resource.reload.name # We see that the name matches. Typically the worker processes the sync event almost instantaneously since our local queue is so small.
  1. Update any of one of name, description, or visibility_level on the project. Use update_column to ensure the model's callbacks aren't triggered; that way we can verify that the PG trigger works as expected.
project.update_column(:visibility_level, 20)
project.catalog_resource.reload.visibility_level # The value is not updated yet
Ci::Catalog::Resources::SyncEvent.last # We have a new sync event in pending status
Ci::Catalog::Resources::ProcessSyncEventsWorker.perform_async # Run the sync event worker
project.catalog_resource.reload.visibility_level # Now the value matches

Query plan

Up

Click to expand main: == [advisory_lock_connection] object_id: 184020, pg_backend_pid: 94588 main: == 20231124191759 AddCatalogResourceSyncEventsTable: migrating ================ main: -- create_table(:p_catalog_resource_sync_events, {:primary_key=>[:id, :partition_id], :options=>"PARTITION BY LIST (partition_id)", :if_not_exists=>true}) main: -> 0.0103s main: == 20231124191759 AddCatalogResourceSyncEventsTable: migrated (0.1227s) =======

main: == [advisory_lock_connection] object_id: 184020, pg_backend_pid: 94588 ci: == [advisory_lock_connection] object_id: 184120, pg_backend_pid: 95019 ci: == 20231124191759 AddCatalogResourceSyncEventsTable: migrating ================ ci: -- create_table(:p_catalog_resource_sync_events, {:primary_key=>[:id, :partition_id], :options=>"PARTITION BY LIST (partition_id)", :if_not_exists=>true}) ci: -> 0.0097s I, [2023-11-28T18:03:04.480192 #94861] INFO -- : Database: 'ci', Table: 'p_catalog_resource_sync_events': Lock Writes ci: == 20231124191759 AddCatalogResourceSyncEventsTable: migrated (0.1423s) =======

ci: == [advisory_lock_connection] object_id: 184120, pg_backend_pid: 95019 main: == [advisory_lock_connection] object_id: 184200, pg_backend_pid: 95464 main: == 20231124282441 AddCatalogResourceSyncEventTriggers: migrating ============== main: -- execute("CREATE OR REPLACE FUNCTION insert_catalog_resource_sync_event()\nRETURNS TRIGGER AS\n\nBEGIN\nINSERT INTO p_catalog_resource_sync_events (catalog_resource_id, project_id)\nSELECT id, OLD.id FROM catalog_resources\nWHERE project_id = OLD.id;\nRETURN NULL;\n\nEND\n LANGUAGE PLPGSQL\n") main: -> 0.0027s main: -- execute("CREATE TRIGGER trigger_catalog_resource_sync_event_on_project_update\nAFTER UPDATE ON projects\nFOR EACH ROW\nWHEN (\n OLD.name IS DISTINCT FROM NEW.name OR\n OLD.description IS DISTINCT FROM NEW.description OR\n OLD.visibility_level IS DISTINCT FROM NEW.visibility_level\n)\n\nEXECUTE FUNCTION insert_catalog_resource_sync_event()\n") main: -> 0.0014s main: == 20231124282441 AddCatalogResourceSyncEventTriggers: migrated (0.0077s) =====

main: == [advisory_lock_connection] object_id: 184200, pg_backend_pid: 95464 ci: == [advisory_lock_connection] object_id: 184200, pg_backend_pid: 95912 ci: == 20231124282441 AddCatalogResourceSyncEventTriggers: migrating ============== ci: -- execute("CREATE OR REPLACE FUNCTION insert_catalog_resource_sync_event()\nRETURNS TRIGGER AS\n\nBEGIN\nINSERT INTO p_catalog_resource_sync_events (catalog_resource_id, project_id)\nSELECT id, OLD.id FROM catalog_resources\nWHERE project_id = OLD.id;\nRETURN NULL;\n\nEND\n LANGUAGE PLPGSQL\n") ci: -> 0.0054s ci: -- execute("CREATE TRIGGER trigger_catalog_resource_sync_event_on_project_update\nAFTER UPDATE ON projects\nFOR EACH ROW\nWHEN (\n OLD.name IS DISTINCT FROM NEW.name OR\n OLD.description IS DISTINCT FROM NEW.description OR\n OLD.visibility_level IS DISTINCT FROM NEW.visibility_level\n)\n\nEXECUTE FUNCTION insert_catalog_resource_sync_event()\n") ci: -> 0.0024s ci: == 20231124282441 AddCatalogResourceSyncEventTriggers: migrated (0.0171s) =====

ci: == [advisory_lock_connection] object_id: 184200, pg_backend_pid: 95912

Down

Click to expand main: == [advisory_lock_connection] object_id: 184220, pg_backend_pid: 98637 main: == 20231124282441 AddCatalogResourceSyncEventTriggers: reverting ============== main: -- execute("DROP TRIGGER IF EXISTS trigger_catalog_resource_sync_event_on_project_update ON projects") main: -> 0.0016s main: -- execute("DROP FUNCTION IF EXISTS insert_catalog_resource_sync_event()") main: -> 0.0004s main: == 20231124282441 AddCatalogResourceSyncEventTriggers: reverted (0.0060s) =====

main: == [advisory_lock_connection] object_id: 184220, pg_backend_pid: 98637 ci: == [advisory_lock_connection] object_id: 184220, pg_backend_pid: 99074 ci: == 20231124282441 AddCatalogResourceSyncEventTriggers: reverting ============== ci: -- execute("DROP TRIGGER IF EXISTS trigger_catalog_resource_sync_event_on_project_update ON projects") ci: -> 0.0017s ci: -- execute("DROP FUNCTION IF EXISTS insert_catalog_resource_sync_event()") ci: -> 0.0003s ci: == 20231124282441 AddCatalogResourceSyncEventTriggers: reverted (0.0100s) =====

ci: == [advisory_lock_connection] object_id: 184220, pg_backend_pid: 99074 main: == [advisory_lock_connection] object_id: 184200, pg_backend_pid: 99508 main: == 20231124191759 AddCatalogResourceSyncEventsTable: reverting ================ main: -- drop_table(:p_catalog_resource_sync_events) main: -> 0.0019s main: == 20231124191759 AddCatalogResourceSyncEventsTable: reverted (0.0050s) =======

main: == [advisory_lock_connection] object_id: 184200, pg_backend_pid: 99508 ci: == [advisory_lock_connection] object_id: 184080, pg_backend_pid: 99941 ci: == 20231124191759 AddCatalogResourceSyncEventsTable: reverting ================ ci: -- drop_table(:p_catalog_resource_sync_events) ci: -> 0.0019s ci: == 20231124191759 AddCatalogResourceSyncEventsTable: reverted (0.0103s) =======

ci: == [advisory_lock_connection] object_id: 184080, pg_backend_pid: 99941

Query plans

  1. Load a batch of records for syncing:
Ci::Catalog::Resources::SyncEvent.unprocessed_events.first(1000)
SELECT "p_catalog_resource_sync_events"."id", "p_catalog_resource_sync_events"."catalog_resource_id", "p_catalog_resource_sync_events"."partition_id" AS partition
FROM "p_catalog_resource_sync_events"
WHERE "p_catalog_resource_sync_events"."status" = 1
ORDER BY "p_catalog_resource_sync_events"."id" ASC
LIMIT 1000

Plan:

 Limit  (cost=0.14..5.22 rows=5 width=24) (actual time=0.008..0.039 rows=35 loops=1)
   ->  Index Scan using p_catalog_resource_sync_events_1_id_idx on p_catalog_resource_sync_events_1 p_catalog_resource_sync_events  (cost=0.14..5.22 rows=5 width=24) (actual time=0.007..0.036 rows=35 loops=1)
 Planning Time: 0.417 ms
 Execution Time: 0.056 ms
  1. Mark the batch of records processed:
Ci::Catalog::Resources::SyncEvent.mark_records_processed(processed_events)
UPDATE "p_catalog_resource_sync_events"
SET "status" = 2
WHERE "p_catalog_resource_sync_events"."status" = 1
AND "p_catalog_resource_sync_events"."partition_id" = 1
AND "p_catalog_resource_sync_events"."id" IN (1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

Plan:

 Update on p_catalog_resource_sync_events  (cost=0.14..4.54 rows=0 width=0) (actual time=0.169..0.169 rows=0 loops=1)
   Update on p_catalog_resource_sync_events_1
   ->  Index Scan using p_catalog_resource_sync_events_1_id_idx on p_catalog_resource_sync_events_1  (cost=0.14..4.54 rows=1 width=12) (actual time=0.022..0.026 rows=10 loops=1)
         Index Cond: (id = ANY ('{1,2,3,4,5,6,7,8,9,10}'::bigint[]))
         Filter: ((status = 1) AND (partition_id = 1))
 Planning Time: 0.199 ms
 Execution Time: 0.189 ms

MR acceptance checklist

This checklist encourages us to confirm any changes have been analyzed to reduce risks in quality, performance, reliability, security, and maintainability.

Related to #429376 (closed)

Edited by Leaminn Ma

Merge request reports

Loading