Background syncing of catalog_resources and projects
What does this MR do and why?
This MR implements background syncing of catalog_resources and projects (#429376 (closed)). The full implementation involves a PG trigger which adds sync events to a queue table; then the sync events are processed by a worker.
Specifically, in this MR we:
- Added the p_catalog_resource_sync_events table, which will serve as the queue table for sync events.
  - This table is partitioned with the List strategy, as suggested in !137238 (comment 1665396792).
- Added the corresponding Ci::Catalog::Resources::SyncEvent model, which is structured similarly to Namespaces::SyncEvent.
  - This allows us to leverage the existing Ci::ProcessSyncEventsService for the worker.
  - Ci::ProcessSyncEventsService is updated to support the existing SyncEvent models (Projects::SyncEvent and Namespaces::SyncEvent) as well as the new Ci::Catalog::Resources::SyncEvent model. The latter marks records as processed instead of deleting them after processing.
- Added a model callback for Ci::Catalog::Resource to sync the record on creation.
- Implemented a PG trigger that adds a SyncEvent when the associated projects record of a catalog resource updates its name, description, or visibility_level column.
- Created a worker, Ci::Catalog::Resources::ProcessSyncEventsWorker, that follows a similar setup to Namespaces::ProcessSyncEventsWorker.
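The queue flow described above can be sketched in plain Ruby. This is only an in-memory illustration, not the code in this MR: SyncQueueSketch, enqueue, and process_batch are hypothetical names, and the status values 1 (pending) and 2 (processed) are taken from the queries shown later in this description.

```ruby
# In-memory stand-in for the sync events queue table (illustration only).
# Status values mirror the queries in this MR: 1 = pending, 2 = processed.
PENDING = 1
PROCESSED = 2

SyncEvent = Struct.new(:id, :catalog_resource_id, :project_id, :status)

# Hypothetical class; it only models the enqueue/process lifecycle.
class SyncQueueSketch
  attr_reader :events

  def initialize
    @events = []
    @next_id = 1
  end

  # Plays the role of the PG trigger: append one pending event.
  def enqueue(catalog_resource_id:, project_id:)
    event = SyncEvent.new(@next_id, catalog_resource_id, project_id, PENDING)
    @next_id += 1
    @events << event
    event
  end

  # Plays the role of the worker: take pending events in id order,
  # sync each one, then mark them processed instead of deleting them.
  def process_batch(limit: 1000)
    batch = @events.select { |e| e.status == PENDING }.sort_by(&:id).first(limit)
    batch.each { |event| yield event }
    batch.each { |event| event.status = PROCESSED }
    batch.size
  end
end

queue = SyncQueueSketch.new
queue.enqueue(catalog_resource_id: 10, project_id: 100)
queue.enqueue(catalog_resource_id: 11, project_id: 101)

synced = []
processed_count = queue.process_batch(limit: 1000) do |event|
  synced << event.catalog_resource_id
end
```

Because processed events keep their rows, running the batch again finds nothing pending and does no work.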
Resolves Steps 1-3 of #429376 (closed).
Rollout issue for FF ci_process_catalog_resource_sync_events: #432963 (closed)
How to set up and validate locally
- Enable the feature flag: Feature.enable(:ci_process_catalog_resource_sync_events).
- Create a catalog resource.
- First observe that the catalog resource attributes match the project's.
project = Project.find(<YOUR-PROJECT-ID>)
project.slice(:name, :description, :visibility_level)
project.catalog_resource.reload.slice(:name, :description, :visibility_level)
- Update any one of name, description, or visibility_level on the project with .update!.
project.update!(name: 'My new name')
project.catalog_resource.reload.name # We see that the name matches. Typically the worker processes the sync event almost instantaneously since our local queue is so small.
- Update any one of name, description, or visibility_level on the project. Use update_column to ensure the model's callbacks aren't triggered; that way we can verify that the PG trigger works as expected.
project.update_column(:visibility_level, 20)
project.catalog_resource.reload.visibility_level # The value is not updated yet
Ci::Catalog::Resources::SyncEvent.last # We have a new sync event in pending status
Ci::Catalog::Resources::ProcessSyncEventsWorker.perform_async # Run the sync event worker
project.catalog_resource.reload.visibility_level # Now the value matches
Migration output
Up
main: == [advisory_lock_connection] object_id: 184020, pg_backend_pid: 94588
main: == 20231124191759 AddCatalogResourceSyncEventsTable: migrating ================
main: -- create_table(:p_catalog_resource_sync_events, {:primary_key=>[:id, :partition_id], :options=>"PARTITION BY LIST (partition_id)", :if_not_exists=>true})
main: -> 0.0103s
main: == 20231124191759 AddCatalogResourceSyncEventsTable: migrated (0.1227s) =======
main: == [advisory_lock_connection] object_id: 184020, pg_backend_pid: 94588
ci: == [advisory_lock_connection] object_id: 184120, pg_backend_pid: 95019
ci: == 20231124191759 AddCatalogResourceSyncEventsTable: migrating ================
ci: -- create_table(:p_catalog_resource_sync_events, {:primary_key=>[:id, :partition_id], :options=>"PARTITION BY LIST (partition_id)", :if_not_exists=>true})
ci: -> 0.0097s
I, [2023-11-28T18:03:04.480192 #94861] INFO -- : Database: 'ci', Table: 'p_catalog_resource_sync_events': Lock Writes
ci: == 20231124191759 AddCatalogResourceSyncEventsTable: migrated (0.1423s) =======
ci: == [advisory_lock_connection] object_id: 184120, pg_backend_pid: 95019
main: == [advisory_lock_connection] object_id: 184200, pg_backend_pid: 95464
main: == 20231124282441 AddCatalogResourceSyncEventTriggers: migrating ==============
main: -- execute("CREATE OR REPLACE FUNCTION insert_catalog_resource_sync_event()\nRETURNS TRIGGER AS\n$$\nBEGIN\nINSERT INTO p_catalog_resource_sync_events (catalog_resource_id, project_id)\nSELECT id, OLD.id FROM catalog_resources\nWHERE project_id = OLD.id;\nRETURN NULL;\n\nEND\n$$ LANGUAGE PLPGSQL\n")
main: -> 0.0027s
main: -- execute("CREATE TRIGGER trigger_catalog_resource_sync_event_on_project_update\nAFTER UPDATE ON projects\nFOR EACH ROW\nWHEN (\n OLD.name IS DISTINCT FROM NEW.name OR\n OLD.description IS DISTINCT FROM NEW.description OR\n OLD.visibility_level IS DISTINCT FROM NEW.visibility_level\n)\n\nEXECUTE FUNCTION insert_catalog_resource_sync_event()\n")
main: -> 0.0014s
main: == 20231124282441 AddCatalogResourceSyncEventTriggers: migrated (0.0077s) =====
main: == [advisory_lock_connection] object_id: 184200, pg_backend_pid: 95464
ci: == [advisory_lock_connection] object_id: 184200, pg_backend_pid: 95912
ci: == 20231124282441 AddCatalogResourceSyncEventTriggers: migrating ==============
ci: -- execute("CREATE OR REPLACE FUNCTION insert_catalog_resource_sync_event()\nRETURNS TRIGGER AS\n$$\nBEGIN\nINSERT INTO p_catalog_resource_sync_events (catalog_resource_id, project_id)\nSELECT id, OLD.id FROM catalog_resources\nWHERE project_id = OLD.id;\nRETURN NULL;\n\nEND\n$$ LANGUAGE PLPGSQL\n")
ci: -> 0.0054s
ci: -- execute("CREATE TRIGGER trigger_catalog_resource_sync_event_on_project_update\nAFTER UPDATE ON projects\nFOR EACH ROW\nWHEN (\n OLD.name IS DISTINCT FROM NEW.name OR\n OLD.description IS DISTINCT FROM NEW.description OR\n OLD.visibility_level IS DISTINCT FROM NEW.visibility_level\n)\n\nEXECUTE FUNCTION insert_catalog_resource_sync_event()\n")
ci: -> 0.0024s
ci: == 20231124282441 AddCatalogResourceSyncEventTriggers: migrated (0.0171s) =====
ci: == [advisory_lock_connection] object_id: 184200, pg_backend_pid: 95912
Down
main: == [advisory_lock_connection] object_id: 184220, pg_backend_pid: 98637
main: == 20231124282441 AddCatalogResourceSyncEventTriggers: reverting ==============
main: -- execute("DROP TRIGGER IF EXISTS trigger_catalog_resource_sync_event_on_project_update ON projects")
main: -> 0.0016s
main: -- execute("DROP FUNCTION IF EXISTS insert_catalog_resource_sync_event()")
main: -> 0.0004s
main: == 20231124282441 AddCatalogResourceSyncEventTriggers: reverted (0.0060s) =====
main: == [advisory_lock_connection] object_id: 184220, pg_backend_pid: 98637
ci: == [advisory_lock_connection] object_id: 184220, pg_backend_pid: 99074
ci: == 20231124282441 AddCatalogResourceSyncEventTriggers: reverting ==============
ci: -- execute("DROP TRIGGER IF EXISTS trigger_catalog_resource_sync_event_on_project_update ON projects")
ci: -> 0.0017s
ci: -- execute("DROP FUNCTION IF EXISTS insert_catalog_resource_sync_event()")
ci: -> 0.0003s
ci: == 20231124282441 AddCatalogResourceSyncEventTriggers: reverted (0.0100s) =====
ci: == [advisory_lock_connection] object_id: 184220, pg_backend_pid: 99074
main: == [advisory_lock_connection] object_id: 184200, pg_backend_pid: 99508
main: == 20231124191759 AddCatalogResourceSyncEventsTable: reverting ================
main: -- drop_table(:p_catalog_resource_sync_events)
main: -> 0.0019s
main: == 20231124191759 AddCatalogResourceSyncEventsTable: reverted (0.0050s) =======
main: == [advisory_lock_connection] object_id: 184200, pg_backend_pid: 99508
ci: == [advisory_lock_connection] object_id: 184080, pg_backend_pid: 99941
ci: == 20231124191759 AddCatalogResourceSyncEventsTable: reverting ================
ci: -- drop_table(:p_catalog_resource_sync_events)
ci: -> 0.0019s
ci: == 20231124191759 AddCatalogResourceSyncEventsTable: reverted (0.0103s) =======
ci: == [advisory_lock_connection] object_id: 184080, pg_backend_pid: 99941
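For readers who want to reason about when the trigger from the Up migration fires, the following is a rough Ruby model of its WHEN clause and function body. It is a sketch under assumptions, not part of the MR: the helper names watched_column_changed? and sync_events_for_update are hypothetical, rows are plain hashes, and the path column is used only as an example of an unwatched column.

```ruby
# Columns compared in the trigger's WHEN clause.
WATCHED_COLUMNS = %i[name description visibility_level].freeze

# Ruby's != treats nil as an ordinary value, which matches the
# NULL-safe comparison that IS DISTINCT FROM performs in PostgreSQL.
def watched_column_changed?(old_row, new_row)
  WATCHED_COLUMNS.any? { |column| old_row[column] != new_row[column] }
end

# Models insert_catalog_resource_sync_event(): one event per catalog
# resource that belongs to the updated project, keyed on OLD.id.
def sync_events_for_update(old_row, new_row, catalog_resources)
  return [] unless watched_column_changed?(old_row, new_row)

  catalog_resources
    .select { |resource| resource[:project_id] == old_row[:id] }
    .map { |resource| { catalog_resource_id: resource[:id], project_id: old_row[:id] } }
end

catalog_resources = [{ id: 7, project_id: 1 }]
old_row = { id: 1, name: 'Old', description: nil, visibility_level: 0, path: 'old' }

# A watched column changes: an event is queued.
renamed = sync_events_for_update(old_row, old_row.merge(name: 'New'), catalog_resources)
# Only an unwatched column changes: nothing is queued.
path_only = sync_events_for_update(old_row, old_row.merge(path: 'new'), catalog_resources)
# NULL to non-NULL counts as a change.
described = sync_events_for_update(old_row, old_row.merge(description: 'Text'), catalog_resources)
# The project has no catalog resource: nothing is queued.
no_resource = sync_events_for_update(old_row, old_row.merge(name: 'New'), [])
```

The last case reflects the INSERT ... SELECT in the function: a project without a catalog resource selects no rows, so no sync event is inserted.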
Query plans
- Load a batch of records for syncing:
Ci::Catalog::Resources::SyncEvent.unprocessed_events.first(1000)
SELECT "p_catalog_resource_sync_events"."id", "p_catalog_resource_sync_events"."catalog_resource_id", "p_catalog_resource_sync_events"."partition_id" AS partition
FROM "p_catalog_resource_sync_events"
WHERE "p_catalog_resource_sync_events"."status" = 1
ORDER BY "p_catalog_resource_sync_events"."id" ASC
LIMIT 1000
Plan:
Limit (cost=0.14..5.22 rows=5 width=24) (actual time=0.008..0.039 rows=35 loops=1)
-> Index Scan using p_catalog_resource_sync_events_1_id_idx on p_catalog_resource_sync_events_1 p_catalog_resource_sync_events (cost=0.14..5.22 rows=5 width=24) (actual time=0.007..0.036 rows=35 loops=1)
Planning Time: 0.417 ms
Execution Time: 0.056 ms
- Mark the batch of records processed:
Ci::Catalog::Resources::SyncEvent.mark_records_processed(processed_events)
UPDATE "p_catalog_resource_sync_events"
SET "status" = 2
WHERE "p_catalog_resource_sync_events"."status" = 1
AND "p_catalog_resource_sync_events"."partition_id" = 1
AND "p_catalog_resource_sync_events"."id" IN (1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
Plan:
Update on p_catalog_resource_sync_events (cost=0.14..4.54 rows=0 width=0) (actual time=0.169..0.169 rows=0 loops=1)
Update on p_catalog_resource_sync_events_1
-> Index Scan using p_catalog_resource_sync_events_1_id_idx on p_catalog_resource_sync_events_1 (cost=0.14..4.54 rows=1 width=12) (actual time=0.022..0.026 rows=10 loops=1)
Index Cond: (id = ANY ('{1,2,3,4,5,6,7,8,9,10}'::bigint[]))
Filter: ((status = 1) AND (partition_id = 1))
Planning Time: 0.199 ms
Execution Time: 0.189 ms
MR acceptance checklist
This checklist encourages us to confirm any changes have been analyzed to reduce risks in quality, performance, reliability, security, and maintainability.
- I have evaluated the MR acceptance checklist for this MR.
Related to #429376 (closed)