Skip to content

Fix BBMs for Http audit event destinations

What does this MR do and why?

Headers, filters (event type, namespace filters) were not migrated properly to the streaming tables.

This MR resolves this and makes the previous BBMs a noop by:

  • Filtering through all destinations (rather than just stream_destination is nil)
  • Do a 'sync' for already migrated records by checking the filters (event type, namepsace) and header tables
  • Do a full new migration for unmigrated records, using a more specific encrypted attribute method as the model uses
  • Additional checks to find missing data between the tables

The original migrations had issues with the encrypted attribute columns, the dependency tables were not properly migrated / created due to only filtering by stream_destination_id: nil,

Additional context / examples: #457623 (comment 2476039841)

Diff is larger due to making the older migrations and tests into aa noop, the main files are:

  • ee/lib/ee/gitlab/background_migration/fix_incomplete_external_audit_destinations.rb
  • ee/lib/ee/gitlab/background_migration/fix_incomplete_instance_external_audit_destinations.rb

Diff between the two migrations:

  • git diff master:ee/lib/ee/gitlab/background_migration/backfill_external_group_audit_event_destinations_fixed.rb 457623/fix-http-bbm-partial-conversions:ee/lib/ee/gitlab/background_migration/fix_incomplete_external_audit_destinations.rb
Group migration diff
diff --git a/ee/lib/ee/gitlab/background_migration/backfill_external_group_audit_event_destinations_fixed.rb b/ee/lib/ee/gitlab/background_migration/fix_incomplete_external_audit_destinations.rb
index 627f48257e03..86e99d3f9e47 100644
--- a/ee/lib/ee/gitlab/background_migration/backfill_external_group_audit_event_destinations_fixed.rb
+++ b/ee/lib/ee/gitlab/background_migration/fix_incomplete_external_audit_destinations.rb
@@ -3,35 +3,43 @@
 module EE
   module Gitlab
     module BackgroundMigration
-      module BackfillExternalGroupAuditEventDestinationsFixed
+      module FixIncompleteExternalAuditDestinations
         extend ActiveSupport::Concern
         extend ::Gitlab::Utils::Override
 
-        class AuditEventsExternalAuditEventDestination < ::ApplicationRecord
+        class ExternalAuditEventDestination < ::ApplicationRecord
           self.table_name = 'audit_events_external_audit_event_destinations'
+
+          belongs_to :group, class_name: '::Group', foreign_key: 'namespace_id'
+          belongs_to :stream_destination, class_name: 'GroupStreamingDestination', optional: true
+        end
+
+        class LegacyHeader < ::ApplicationRecord
+          self.table_name = 'audit_events_streaming_headers'
+        end
+
+        class LegacyEventTypeFilter < ::ApplicationRecord
+          self.table_name = 'audit_events_streaming_event_type_filters'
+        end
+
+        class LegacyGroupNamespaceFilter < ::ApplicationRecord
+          self.table_name = 'audit_events_streaming_http_group_namespace_filters'
         end
 
-        class GroupExternalStreamingDestination < ::ApplicationRecord
+        class GroupStreamingDestination < ::ApplicationRecord
+          include ::Gitlab::EncryptedAttribute
+
           self.table_name = 'audit_events_group_external_streaming_destinations'
           enum :category, { http: 0, gcp: 1, aws: 2 }
 
           attr_accessor :secret_token
 
-          before_validation :encrypt_secret_token, if: :secret_token
-
-          private
-
-          def encrypt_secret_token
-            key = Settings.db_key_base_keys_32_bytes.first
-            cipher = OpenSSL::Cipher.new('aes-256-gcm')
-            cipher.encrypt
-            iv = cipher.random_iv
-            cipher.key = Digest::SHA256.digest(key)[0...32]
-            cipher.iv = iv
-            encrypted = cipher.update(secret_token) + cipher.final
-            self.encrypted_secret_token = encrypted
-            self.encrypted_secret_token_iv = iv
-          end
+          attr_encrypted :secret_token,
+            mode: :per_attribute_iv,
+            key: :db_key_base_32,
+            algorithm: 'aes-256-gcm',
+            encode: false,
+            encode_iv: false
         end
 
         class GroupEventTypeFilter < ::ApplicationRecord
@@ -42,24 +50,10 @@ class GroupNamespaceFilter < ::ApplicationRecord
           self.table_name = 'audit_events_streaming_group_namespace_filters'
         end
 
-        class StreamingHeader < ::ApplicationRecord
-          self.table_name = 'audit_events_streaming_headers'
-        end
-
-        class StreamingEventTypeFilter < ::ApplicationRecord
-          self.table_name = 'audit_events_streaming_event_type_filters'
-        end
-
-        class StreamingHttpGroupNamespaceFilter < ::ApplicationRecord
-          self.table_name = 'audit_events_streaming_http_group_namespace_filters'
-        end
-
         prepended do
-          operation_name :backfill_external_group_audit_event_destinations
+          operation_name :fix_external_audit_destinations_migration
           feature_category :audit_events
-          scope_to ->(relation) do
-            relation.where(stream_destination_id: nil)
-          end
+          scope_to ->(relation) { relation }
         end
 
         override :perform
@@ -73,22 +67,29 @@ def perform
 
         def process_batch(sub_batch)
           sub_batch.each do |legacy_destination|
-            ApplicationRecord.transaction do
-              destination = create_streaming_destination(legacy_destination)
-              next unless destination
-
-              copy_event_type_filters(legacy_destination, destination)
-              copy_namespace_filter(legacy_destination, destination)
-              update_legacy_record(legacy_destination, destination)
+            ::ApplicationRecord.transaction do
+              if legacy_destination.stream_destination_id.present?
+                sync_migrated_record(legacy_destination)
+              else
+                migrate_new_record(legacy_destination)
+              end
             end
           end
         end
 
-        def create_streaming_destination(legacy_destination)
-          token = legacy_destination.verification_token
-          return unless token
+        def sync_migrated_record(legacy_destination)
+          destination = GroupStreamingDestination.find_by(id: legacy_destination.stream_destination_id)
+          return unless destination
+
+          sync_custom_headers(legacy_destination, destination)
+          sync_event_type_filters(legacy_destination, destination)
+          sync_namespace_filter(legacy_destination, destination)
+        end
+
+        def migrate_new_record(legacy_destination)
+          return unless legacy_destination.verification_token.present?
 
-          destination = GroupExternalStreamingDestination.new(
+          destination = GroupStreamingDestination.new(
             name: legacy_destination.name,
             category: :http,
             config: build_config(legacy_destination),
@@ -98,16 +99,20 @@ def create_streaming_destination(legacy_destination)
             updated_at: legacy_destination.updated_at
           )
 
-          destination.secret_token = token
-          destination.save!
+          destination.secret_token = legacy_destination.verification_token
+          return unless destination.save
+
+          legacy_destination.update_column(:stream_destination_id, destination.id)
+
+          migrate_event_type_filters(legacy_destination, destination)
+          migrate_namespace_filter(legacy_destination, destination)
+
           destination
         end
 
         def build_config(legacy_destination)
-          headers = StreamingHeader.where(
-            external_audit_event_destination_id: legacy_destination.id,
-            group_id: legacy_destination.namespace_id
-          ).pluck(:key, :value, :active)
+          headers = LegacyHeader.where(external_audit_event_destination_id: legacy_destination.id)
+                               .pluck(:key, :value, :active)
 
           header_config = {
             'X-Gitlab-Event-Streaming-Token' => {
@@ -129,10 +134,64 @@ def build_config(legacy_destination)
           }
         end
 
-        def copy_event_type_filters(source, destination)
-          filters = StreamingEventTypeFilter.where(
-            external_audit_event_destination_id: source.id,
-            group_id: source.namespace_id
+        def sync_custom_headers(legacy_destination, destination)
+          headers = LegacyHeader.where(external_audit_event_destination_id: legacy_destination.id)
+                              .pluck(:key, :value, :active)
+
+          return if headers.empty?
+
+          config = destination.config.deep_dup
+          config['headers'] ||= {}
+
+          if legacy_destination.verification_token.present? &&
+              !config['headers'].key?('X-Gitlab-Event-Streaming-Token')
+            config['headers']['X-Gitlab-Event-Streaming-Token'] = {
+              'value' => legacy_destination.verification_token,
+              'active' => true
+            }
+          end
+
+          headers.each do |key, value, active|
+            next if config['headers'].key?(key)
+
+            config['headers'][key] = {
+              'value' => value,
+              'active' => active
+            }
+          end
+
+          destination.update!(config: config)
+        end
+
+        def sync_event_type_filters(legacy_destination, destination)
+          existing_filters = GroupEventTypeFilter
+                              .where(external_streaming_destination_id: destination.id)
+                              .pluck(:audit_event_type)
+
+          legacy_filters = LegacyEventTypeFilter
+                            .where(external_audit_event_destination_id: legacy_destination.id)
+                            .pluck(:audit_event_type, :created_at, :updated_at)
+
+          missing_filters = legacy_filters.reject { |filter| existing_filters.include?(filter[0]) }
+
+          return if missing_filters.empty?
+
+          attributes = missing_filters.map do |audit_event_type, created_at, updated_at|
+            {
+              audit_event_type: audit_event_type,
+              created_at: created_at,
+              updated_at: updated_at,
+              external_streaming_destination_id: destination.id,
+              namespace_id: legacy_destination.namespace_id
+            }
+          end
+
+          GroupEventTypeFilter.insert_all!(attributes)
+        end
+
+        def migrate_event_type_filters(legacy_destination, destination)
+          filters = LegacyEventTypeFilter.where(
+            external_audit_event_destination_id: legacy_destination.id
           ).pluck(:audit_event_type, :created_at, :updated_at)
 
           return if filters.empty?
@@ -143,16 +202,34 @@ def copy_event_type_filters(source, destination)
               created_at: created_at,
               updated_at: updated_at,
               external_streaming_destination_id: destination.id,
-              namespace_id: source.namespace_id
+              namespace_id: legacy_destination.namespace_id
             }
           end
 
           GroupEventTypeFilter.insert_all!(attributes)
         end
 
-        def copy_namespace_filter(source, destination)
-          filter = StreamingHttpGroupNamespaceFilter.find_by(
-            external_audit_event_destination_id: source.id
+        def sync_namespace_filter(legacy_destination, destination)
+          existing_filter = GroupNamespaceFilter.find_by(external_streaming_destination_id: destination.id)
+          return if existing_filter
+
+          legacy_filter = LegacyGroupNamespaceFilter.find_by(
+            external_audit_event_destination_id: legacy_destination.id
+          )
+
+          return unless legacy_filter
+
+          GroupNamespaceFilter.create!(
+            namespace_id: legacy_filter.namespace_id,
+            external_streaming_destination_id: destination.id,
+            created_at: legacy_filter.created_at,
+            updated_at: legacy_filter.updated_at
+          )
+        end
+
+        def migrate_namespace_filter(legacy_destination, destination)
+          filter = LegacyGroupNamespaceFilter.find_by(
+            external_audit_event_destination_id: legacy_destination.id
           )
 
           return unless filter
@@ -164,10 +241,6 @@ def copy_namespace_filter(source, destination)
             updated_at: filter.updated_at
           )
         end
-
-        def update_legacy_record(source, destination)
-          source.update!(stream_destination_id: destination.id)
-        end
       end
     end
   end
  • git diff master:ee/lib/ee/gitlab/background_migration/backfill_external_instance_audit_event_destinations_fixed.rb 457623/fix-http-bbm-partial-conversions:ee/lib/ee/gitlab/background_migration/fix_incomplete_instance_external_audit_destinations.rb
Instance migration diff
diff --git a/ee/lib/ee/gitlab/background_migration/backfill_external_instance_audit_event_destinations_fixed.rb b/ee/lib/ee/gitlab/background_migration/fix_incomplete_instance_external_audit_destinations.rb
index 8d40fa44c3cf..48025a0b52d1 100644
--- a/ee/lib/ee/gitlab/background_migration/backfill_external_instance_audit_event_destinations_fixed.rb
+++ b/ee/lib/ee/gitlab/background_migration/fix_incomplete_instance_external_audit_destinations.rb
@@ -3,43 +3,66 @@
 module EE
   module Gitlab
     module BackgroundMigration
-      module BackfillExternalInstanceAuditEventDestinationsFixed
+      module FixIncompleteInstanceExternalAuditDestinations
         extend ActiveSupport::Concern
         extend ::Gitlab::Utils::Override
 
-        class AuditEventsExternalAuditEventDestination < ::ApplicationRecord
+        class InstanceExternalAuditEventDestination < ::ApplicationRecord
+          include ::Gitlab::EncryptedAttribute
+
           self.table_name = 'audit_events_instance_external_audit_event_destinations'
+
+          attr_accessor :verification_token
+
+          attr_encrypted :verification_token,
+            mode: :per_attribute_iv,
+            algorithm: 'aes-256-gcm',
+            key: :db_key_base_32,
+            encode: false,
+            encode_iv: false
         end
 
-        class InstanceExternalStreamingDestination < ::ApplicationRecord
-          self.table_name = 'audit_events_instance_external_streaming_destinations'
-          enum :category, { http: 0, gcp: 1, aws: 2 }
+        class LegacyInstanceHeader < ::ApplicationRecord
+          self.table_name = 'instance_audit_events_streaming_headers'
         end
 
-        class InstanceEventTypeFilter < ::ApplicationRecord
-          self.table_name = 'audit_events_instance_streaming_event_type_filters'
+        class LegacyInstanceEventTypeFilter < ::ApplicationRecord
+          self.table_name = 'audit_events_streaming_instance_event_type_filters'
         end
 
-        class InstanceNamespaceFilter < ::ApplicationRecord
-          self.table_name = 'audit_events_streaming_instance_namespace_filters'
+        class LegacyInstanceNamespaceFilter < ::ApplicationRecord
+          self.table_name = 'audit_events_streaming_http_instance_namespace_filters'
         end
 
-        class StreamingHeader < ::ApplicationRecord
-          self.table_name = 'instance_audit_events_streaming_headers'
+        class InstanceStreamingDestination < ::ApplicationRecord
+          include ::Gitlab::EncryptedAttribute
+
+          self.table_name = 'audit_events_instance_external_streaming_destinations'
+          enum :category, { http: 0, gcp: 1, aws: 2 }
+
+          attr_accessor :secret_token
+
+          attr_encrypted :secret_token,
+            mode: :per_attribute_iv,
+            key: :db_key_base_32,
+            algorithm: 'aes-256-gcm',
+            encode: false,
+            encode_iv: false
         end
 
-        class StreamingEventTypeFilter < ::ApplicationRecord
-          self.table_name = 'audit_events_streaming_instance_event_type_filters'
+        class InstanceEventTypeFilter < ::ApplicationRecord
+          self.table_name = 'audit_events_instance_streaming_event_type_filters'
         end
 
-        class StreamingHttpInstanceNamespaceFilter < ::ApplicationRecord
-          self.table_name = 'audit_events_streaming_http_instance_namespace_filters'
+        class InstanceNamespaceFilter < ::ApplicationRecord
+          self.table_name = 'audit_events_streaming_instance_namespace_filters'
         end
 
         prepended do
-          operation_name :backfill_external_instance_audit_event_destinations
+          operation_name :fix_instance_external_audit_destinations_migration
           feature_category :audit_events
-          scope_to ->(relation) { relation.where(stream_destination_id: nil) }
+
+          scope_to ->(relation) { relation }
         end
 
         override :perform
@@ -53,51 +76,62 @@ def perform
 
         def process_batch(sub_batch)
           sub_batch.each do |legacy_destination|
-            ApplicationRecord.transaction do
-              destination = create_streaming_destination(legacy_destination)
-              next unless destination
-
-              copy_event_type_filters(legacy_destination, destination)
-              copy_namespace_filter(legacy_destination, destination)
-              update_legacy_record(legacy_destination, destination)
+            ::ApplicationRecord.transaction do
+              if legacy_destination.stream_destination_id.present?
+                sync_migrated_record(legacy_destination)
+              else
+                migrate_new_record(legacy_destination)
+              end
             end
           end
         end
 
-        def create_streaming_destination(legacy_destination)
-          encrypted_token, encrypted_iv = AuditEventsExternalAuditEventDestination
-            .where(id: legacy_destination.id)
-            .pick(:encrypted_verification_token, :encrypted_verification_token_iv)
+        def sync_migrated_record(legacy_destination)
+          destination = InstanceStreamingDestination.find_by(id: legacy_destination.stream_destination_id)
+          return unless destination
 
-          return unless encrypted_token.present?
+          sync_custom_headers(legacy_destination, destination)
+          sync_event_type_filters(legacy_destination, destination)
+          sync_namespace_filters(legacy_destination, destination)
+        end
 
-          token_for_config = get_verification_token(legacy_destination.id)
-          return unless token_for_config
+        def migrate_new_record(legacy_destination)
+          token = decrypt_verification_token(legacy_destination)
+          return unless token
 
-          destination = InstanceExternalStreamingDestination.new(
+          destination = InstanceStreamingDestination.new(
             name: legacy_destination.name,
             category: :http,
-            config: build_config(legacy_destination, token_for_config),
+            config: build_config(legacy_destination, token),
             legacy_destination_ref: legacy_destination.id,
             created_at: legacy_destination.created_at,
             updated_at: legacy_destination.updated_at
           )
 
-          destination.encrypted_secret_token = encrypted_token
-          destination.encrypted_secret_token_iv = encrypted_iv
+          destination.secret_token = token
 
-          destination.save!(validate: false)
-          destination
+          return unless destination.save
+
+          legacy_destination.update_column(:stream_destination_id, destination.id)
+
+          sync_event_type_filters(legacy_destination, destination)
+          sync_namespace_filters(legacy_destination, destination)
         end
 
-        def get_verification_token(destination_id)
-          model_class = ::AuditEvents::InstanceExternalAuditEventDestination
-          legacy_model = model_class.find_by(id: destination_id)
-          legacy_model&.verification_token
+        def decrypt_verification_token(legacy_destination)
+          return unless legacy_destination.encrypted_verification_token.present? &&
+            legacy_destination.encrypted_verification_token_iv.present?
+
+          ::Gitlab::CryptoHelper.aes256_gcm_decrypt(
+            legacy_destination.encrypted_verification_token,
+            nonce: legacy_destination.encrypted_verification_token_iv
+          )
+        rescue OpenSSL::Cipher::CipherError
+          nil
         end
 
         def build_config(legacy_destination, token)
-          headers = StreamingHeader.where(
+          headers = LegacyInstanceHeader.where(
             instance_external_audit_event_destination_id: legacy_destination.id
           ).pluck(:key, :value, :active)
 
@@ -121,14 +155,42 @@ def build_config(legacy_destination, token)
           }
         end
 
-        def copy_event_type_filters(source, destination)
-          filters = StreamingEventTypeFilter.where(
-            instance_external_audit_event_destination_id: source.id
-          ).pluck(:audit_event_type, :created_at, :updated_at)
+        def sync_custom_headers(legacy_destination, destination)
+          headers = LegacyInstanceHeader.where(
+            instance_external_audit_event_destination_id: legacy_destination.id
+          ).pluck(:key, :value, :active)
+
+          return if headers.empty?
 
-          return if filters.empty?
+          config = destination.config.deep_dup
+          config['headers'] ||= {}
+
+          headers.each do |key, value, active|
+            next if config['headers'].key?(key)
+
+            config['headers'][key] = {
+              'value' => value,
+              'active' => active
+            }
+          end
 
-          attributes = filters.map do |audit_event_type, created_at, updated_at|
+          destination.update!(config: config)
+        end
+
+        def sync_event_type_filters(legacy_destination, destination)
+          existing_filters = InstanceEventTypeFilter
+                            .where(external_streaming_destination_id: destination.id)
+                            .pluck(:audit_event_type)
+
+          legacy_filters = LegacyInstanceEventTypeFilter
+                          .where(instance_external_audit_event_destination_id: legacy_destination.id)
+                          .pluck(:audit_event_type, :created_at, :updated_at)
+
+          missing_filters = legacy_filters.reject { |filter| existing_filters.include?(filter[0]) }
+
+          return if missing_filters.empty?
+
+          attributes = missing_filters.map do |audit_event_type, created_at, updated_at|
             {
               audit_event_type: audit_event_type,
               created_at: created_at,
@@ -140,23 +202,27 @@ def copy_event_type_filters(source, destination)
           InstanceEventTypeFilter.insert_all!(attributes)
         end
 
-        def copy_namespace_filter(source, destination)
-          filter = StreamingHttpInstanceNamespaceFilter.find_by(
-            audit_events_instance_external_audit_event_destination_id: source.id
-          )
+        def sync_namespace_filters(legacy_destination, destination)
+          existing_filter_namespace_ids = InstanceNamespaceFilter
+                                          .where(external_streaming_destination_id: destination.id)
+                                          .pluck(:namespace_id)
 
-          return unless filter
+          legacy_filters = LegacyInstanceNamespaceFilter
+                            .where(audit_events_instance_external_audit_event_destination_id: legacy_destination.id)
+                            .where.not(namespace_id: existing_filter_namespace_ids)
 
-          InstanceNamespaceFilter.create!(
-            namespace_id: filter.namespace_id,
-            external_streaming_destination_id: destination.id,
-            created_at: filter.created_at,
-            updated_at: filter.updated_at
-          )
-        end
+          return if legacy_filters.empty?
+
+          attributes = legacy_filters.map do |filter|
+            {
+              namespace_id: filter.namespace_id,
+              external_streaming_destination_id: destination.id,
+              created_at: filter.created_at,
+              updated_at: filter.updated_at
+            }
+          end
 
-        def update_legacy_record(source, destination)
-          source.update!(stream_destination_id: destination.id)
+          InstanceNamespaceFilter.insert_all!(attributes)
         end
       end
     end

Query Plans

References

Screenshots or screen recordings

Before After

How to set up and validate locally

MR acceptance checklist

Evaluate this MR against the MR acceptance checklist. It helps you analyze changes to reduce risks in quality, performance, reliability, security, and maintainability.

Edited by Andrew Jung

Merge request reports

Loading