Fix BBMs for Http audit event destinations
What does this MR do and why?
Headers, filters (event type, namespace filters) were not migrated properly to the streaming tables.
This MR resolves this and makes the previous BBMs a noop by:
- Filtering through all destinations (rather than only those where stream_destination_id is nil)
- Doing a 'sync' for already-migrated records by checking the filter (event type, namespace) and header tables
- Doing a full new migration for unmigrated records, using the same, more specific encrypted attribute method that the model uses
- Adding checks to find data that is missing between the tables
The original migrations had issues with the encrypted attribute columns, and the dependency tables were not properly migrated / created because the migrations only filtered by stream_destination_id: nil.
Additional context / examples: #457623 (comment 2476039841)
The diff is larger because the older migrations and their tests were turned into no-ops; the main files are:
- ee/lib/ee/gitlab/background_migration/fix_incomplete_external_audit_destinations.rb
- ee/lib/ee/gitlab/background_migration/fix_incomplete_instance_external_audit_destinations.rb
Diff between the two migrations:
git diff master:ee/lib/ee/gitlab/background_migration/backfill_external_group_audit_event_destinations_fixed.rb 457623/fix-http-bbm-partial-conversions:ee/lib/ee/gitlab/background_migration/fix_incomplete_external_audit_destinations.rb
Group migration diff
diff --git a/ee/lib/ee/gitlab/background_migration/backfill_external_group_audit_event_destinations_fixed.rb b/ee/lib/ee/gitlab/background_migration/fix_incomplete_external_audit_destinations.rb
index 627f48257e03..86e99d3f9e47 100644
--- a/ee/lib/ee/gitlab/background_migration/backfill_external_group_audit_event_destinations_fixed.rb
+++ b/ee/lib/ee/gitlab/background_migration/fix_incomplete_external_audit_destinations.rb
@@ -3,35 +3,43 @@
module EE
module Gitlab
module BackgroundMigration
- module BackfillExternalGroupAuditEventDestinationsFixed
+ module FixIncompleteExternalAuditDestinations
extend ActiveSupport::Concern
extend ::Gitlab::Utils::Override
- class AuditEventsExternalAuditEventDestination < ::ApplicationRecord
+ class ExternalAuditEventDestination < ::ApplicationRecord
self.table_name = 'audit_events_external_audit_event_destinations'
+
+ belongs_to :group, class_name: '::Group', foreign_key: 'namespace_id'
+ belongs_to :stream_destination, class_name: 'GroupStreamingDestination', optional: true
+ end
+
+ class LegacyHeader < ::ApplicationRecord
+ self.table_name = 'audit_events_streaming_headers'
+ end
+
+ class LegacyEventTypeFilter < ::ApplicationRecord
+ self.table_name = 'audit_events_streaming_event_type_filters'
+ end
+
+ class LegacyGroupNamespaceFilter < ::ApplicationRecord
+ self.table_name = 'audit_events_streaming_http_group_namespace_filters'
end
- class GroupExternalStreamingDestination < ::ApplicationRecord
+ class GroupStreamingDestination < ::ApplicationRecord
+ include ::Gitlab::EncryptedAttribute
+
self.table_name = 'audit_events_group_external_streaming_destinations'
enum :category, { http: 0, gcp: 1, aws: 2 }
attr_accessor :secret_token
- before_validation :encrypt_secret_token, if: :secret_token
-
- private
-
- def encrypt_secret_token
- key = Settings.db_key_base_keys_32_bytes.first
- cipher = OpenSSL::Cipher.new('aes-256-gcm')
- cipher.encrypt
- iv = cipher.random_iv
- cipher.key = Digest::SHA256.digest(key)[0...32]
- cipher.iv = iv
- encrypted = cipher.update(secret_token) + cipher.final
- self.encrypted_secret_token = encrypted
- self.encrypted_secret_token_iv = iv
- end
+ attr_encrypted :secret_token,
+ mode: :per_attribute_iv,
+ key: :db_key_base_32,
+ algorithm: 'aes-256-gcm',
+ encode: false,
+ encode_iv: false
end
class GroupEventTypeFilter < ::ApplicationRecord
@@ -42,24 +50,10 @@ class GroupNamespaceFilter < ::ApplicationRecord
self.table_name = 'audit_events_streaming_group_namespace_filters'
end
- class StreamingHeader < ::ApplicationRecord
- self.table_name = 'audit_events_streaming_headers'
- end
-
- class StreamingEventTypeFilter < ::ApplicationRecord
- self.table_name = 'audit_events_streaming_event_type_filters'
- end
-
- class StreamingHttpGroupNamespaceFilter < ::ApplicationRecord
- self.table_name = 'audit_events_streaming_http_group_namespace_filters'
- end
-
prepended do
- operation_name :backfill_external_group_audit_event_destinations
+ operation_name :fix_external_audit_destinations_migration
feature_category :audit_events
- scope_to ->(relation) do
- relation.where(stream_destination_id: nil)
- end
+ scope_to ->(relation) { relation }
end
override :perform
@@ -73,22 +67,29 @@ def perform
def process_batch(sub_batch)
sub_batch.each do |legacy_destination|
- ApplicationRecord.transaction do
- destination = create_streaming_destination(legacy_destination)
- next unless destination
-
- copy_event_type_filters(legacy_destination, destination)
- copy_namespace_filter(legacy_destination, destination)
- update_legacy_record(legacy_destination, destination)
+ ::ApplicationRecord.transaction do
+ if legacy_destination.stream_destination_id.present?
+ sync_migrated_record(legacy_destination)
+ else
+ migrate_new_record(legacy_destination)
+ end
end
end
end
- def create_streaming_destination(legacy_destination)
- token = legacy_destination.verification_token
- return unless token
+ def sync_migrated_record(legacy_destination)
+ destination = GroupStreamingDestination.find_by(id: legacy_destination.stream_destination_id)
+ return unless destination
+
+ sync_custom_headers(legacy_destination, destination)
+ sync_event_type_filters(legacy_destination, destination)
+ sync_namespace_filter(legacy_destination, destination)
+ end
+
+ def migrate_new_record(legacy_destination)
+ return unless legacy_destination.verification_token.present?
- destination = GroupExternalStreamingDestination.new(
+ destination = GroupStreamingDestination.new(
name: legacy_destination.name,
category: :http,
config: build_config(legacy_destination),
@@ -98,16 +99,20 @@ def create_streaming_destination(legacy_destination)
updated_at: legacy_destination.updated_at
)
- destination.secret_token = token
- destination.save!
+ destination.secret_token = legacy_destination.verification_token
+ return unless destination.save
+
+ legacy_destination.update_column(:stream_destination_id, destination.id)
+
+ migrate_event_type_filters(legacy_destination, destination)
+ migrate_namespace_filter(legacy_destination, destination)
+
destination
end
def build_config(legacy_destination)
- headers = StreamingHeader.where(
- external_audit_event_destination_id: legacy_destination.id,
- group_id: legacy_destination.namespace_id
- ).pluck(:key, :value, :active)
+ headers = LegacyHeader.where(external_audit_event_destination_id: legacy_destination.id)
+ .pluck(:key, :value, :active)
header_config = {
'X-Gitlab-Event-Streaming-Token' => {
@@ -129,10 +134,64 @@ def build_config(legacy_destination)
}
end
- def copy_event_type_filters(source, destination)
- filters = StreamingEventTypeFilter.where(
- external_audit_event_destination_id: source.id,
- group_id: source.namespace_id
+ def sync_custom_headers(legacy_destination, destination)
+ headers = LegacyHeader.where(external_audit_event_destination_id: legacy_destination.id)
+ .pluck(:key, :value, :active)
+
+ return if headers.empty?
+
+ config = destination.config.deep_dup
+ config['headers'] ||= {}
+
+ if legacy_destination.verification_token.present? &&
+ !config['headers'].key?('X-Gitlab-Event-Streaming-Token')
+ config['headers']['X-Gitlab-Event-Streaming-Token'] = {
+ 'value' => legacy_destination.verification_token,
+ 'active' => true
+ }
+ end
+
+ headers.each do |key, value, active|
+ next if config['headers'].key?(key)
+
+ config['headers'][key] = {
+ 'value' => value,
+ 'active' => active
+ }
+ end
+
+ destination.update!(config: config)
+ end
+
+ def sync_event_type_filters(legacy_destination, destination)
+ existing_filters = GroupEventTypeFilter
+ .where(external_streaming_destination_id: destination.id)
+ .pluck(:audit_event_type)
+
+ legacy_filters = LegacyEventTypeFilter
+ .where(external_audit_event_destination_id: legacy_destination.id)
+ .pluck(:audit_event_type, :created_at, :updated_at)
+
+ missing_filters = legacy_filters.reject { |filter| existing_filters.include?(filter[0]) }
+
+ return if missing_filters.empty?
+
+ attributes = missing_filters.map do |audit_event_type, created_at, updated_at|
+ {
+ audit_event_type: audit_event_type,
+ created_at: created_at,
+ updated_at: updated_at,
+ external_streaming_destination_id: destination.id,
+ namespace_id: legacy_destination.namespace_id
+ }
+ end
+
+ GroupEventTypeFilter.insert_all!(attributes)
+ end
+
+ def migrate_event_type_filters(legacy_destination, destination)
+ filters = LegacyEventTypeFilter.where(
+ external_audit_event_destination_id: legacy_destination.id
).pluck(:audit_event_type, :created_at, :updated_at)
return if filters.empty?
@@ -143,16 +202,34 @@ def copy_event_type_filters(source, destination)
created_at: created_at,
updated_at: updated_at,
external_streaming_destination_id: destination.id,
- namespace_id: source.namespace_id
+ namespace_id: legacy_destination.namespace_id
}
end
GroupEventTypeFilter.insert_all!(attributes)
end
- def copy_namespace_filter(source, destination)
- filter = StreamingHttpGroupNamespaceFilter.find_by(
- external_audit_event_destination_id: source.id
+ def sync_namespace_filter(legacy_destination, destination)
+ existing_filter = GroupNamespaceFilter.find_by(external_streaming_destination_id: destination.id)
+ return if existing_filter
+
+ legacy_filter = LegacyGroupNamespaceFilter.find_by(
+ external_audit_event_destination_id: legacy_destination.id
+ )
+
+ return unless legacy_filter
+
+ GroupNamespaceFilter.create!(
+ namespace_id: legacy_filter.namespace_id,
+ external_streaming_destination_id: destination.id,
+ created_at: legacy_filter.created_at,
+ updated_at: legacy_filter.updated_at
+ )
+ end
+
+ def migrate_namespace_filter(legacy_destination, destination)
+ filter = LegacyGroupNamespaceFilter.find_by(
+ external_audit_event_destination_id: legacy_destination.id
)
return unless filter
@@ -164,10 +241,6 @@ def copy_namespace_filter(source, destination)
updated_at: filter.updated_at
)
end
-
- def update_legacy_record(source, destination)
- source.update!(stream_destination_id: destination.id)
- end
end
end
end
git diff master:ee/lib/ee/gitlab/background_migration/backfill_external_instance_audit_event_destinations_fixed.rb 457623/fix-http-bbm-partial-conversions:ee/lib/ee/gitlab/background_migration/fix_incomplete_instance_external_audit_destinations.rb
Instance migration diff
diff --git a/ee/lib/ee/gitlab/background_migration/backfill_external_instance_audit_event_destinations_fixed.rb b/ee/lib/ee/gitlab/background_migration/fix_incomplete_instance_external_audit_destinations.rb
index 8d40fa44c3cf..48025a0b52d1 100644
--- a/ee/lib/ee/gitlab/background_migration/backfill_external_instance_audit_event_destinations_fixed.rb
+++ b/ee/lib/ee/gitlab/background_migration/fix_incomplete_instance_external_audit_destinations.rb
@@ -3,43 +3,66 @@
module EE
module Gitlab
module BackgroundMigration
- module BackfillExternalInstanceAuditEventDestinationsFixed
+ module FixIncompleteInstanceExternalAuditDestinations
extend ActiveSupport::Concern
extend ::Gitlab::Utils::Override
- class AuditEventsExternalAuditEventDestination < ::ApplicationRecord
+ class InstanceExternalAuditEventDestination < ::ApplicationRecord
+ include ::Gitlab::EncryptedAttribute
+
self.table_name = 'audit_events_instance_external_audit_event_destinations'
+
+ attr_accessor :verification_token
+
+ attr_encrypted :verification_token,
+ mode: :per_attribute_iv,
+ algorithm: 'aes-256-gcm',
+ key: :db_key_base_32,
+ encode: false,
+ encode_iv: false
end
- class InstanceExternalStreamingDestination < ::ApplicationRecord
- self.table_name = 'audit_events_instance_external_streaming_destinations'
- enum :category, { http: 0, gcp: 1, aws: 2 }
+ class LegacyInstanceHeader < ::ApplicationRecord
+ self.table_name = 'instance_audit_events_streaming_headers'
end
- class InstanceEventTypeFilter < ::ApplicationRecord
- self.table_name = 'audit_events_instance_streaming_event_type_filters'
+ class LegacyInstanceEventTypeFilter < ::ApplicationRecord
+ self.table_name = 'audit_events_streaming_instance_event_type_filters'
end
- class InstanceNamespaceFilter < ::ApplicationRecord
- self.table_name = 'audit_events_streaming_instance_namespace_filters'
+ class LegacyInstanceNamespaceFilter < ::ApplicationRecord
+ self.table_name = 'audit_events_streaming_http_instance_namespace_filters'
end
- class StreamingHeader < ::ApplicationRecord
- self.table_name = 'instance_audit_events_streaming_headers'
+ class InstanceStreamingDestination < ::ApplicationRecord
+ include ::Gitlab::EncryptedAttribute
+
+ self.table_name = 'audit_events_instance_external_streaming_destinations'
+ enum :category, { http: 0, gcp: 1, aws: 2 }
+
+ attr_accessor :secret_token
+
+ attr_encrypted :secret_token,
+ mode: :per_attribute_iv,
+ key: :db_key_base_32,
+ algorithm: 'aes-256-gcm',
+ encode: false,
+ encode_iv: false
end
- class StreamingEventTypeFilter < ::ApplicationRecord
- self.table_name = 'audit_events_streaming_instance_event_type_filters'
+ class InstanceEventTypeFilter < ::ApplicationRecord
+ self.table_name = 'audit_events_instance_streaming_event_type_filters'
end
- class StreamingHttpInstanceNamespaceFilter < ::ApplicationRecord
- self.table_name = 'audit_events_streaming_http_instance_namespace_filters'
+ class InstanceNamespaceFilter < ::ApplicationRecord
+ self.table_name = 'audit_events_streaming_instance_namespace_filters'
end
prepended do
- operation_name :backfill_external_instance_audit_event_destinations
+ operation_name :fix_instance_external_audit_destinations_migration
feature_category :audit_events
- scope_to ->(relation) { relation.where(stream_destination_id: nil) }
+
+ scope_to ->(relation) { relation }
end
override :perform
@@ -53,51 +76,62 @@ def perform
def process_batch(sub_batch)
sub_batch.each do |legacy_destination|
- ApplicationRecord.transaction do
- destination = create_streaming_destination(legacy_destination)
- next unless destination
-
- copy_event_type_filters(legacy_destination, destination)
- copy_namespace_filter(legacy_destination, destination)
- update_legacy_record(legacy_destination, destination)
+ ::ApplicationRecord.transaction do
+ if legacy_destination.stream_destination_id.present?
+ sync_migrated_record(legacy_destination)
+ else
+ migrate_new_record(legacy_destination)
+ end
end
end
end
- def create_streaming_destination(legacy_destination)
- encrypted_token, encrypted_iv = AuditEventsExternalAuditEventDestination
- .where(id: legacy_destination.id)
- .pick(:encrypted_verification_token, :encrypted_verification_token_iv)
+ def sync_migrated_record(legacy_destination)
+ destination = InstanceStreamingDestination.find_by(id: legacy_destination.stream_destination_id)
+ return unless destination
- return unless encrypted_token.present?
+ sync_custom_headers(legacy_destination, destination)
+ sync_event_type_filters(legacy_destination, destination)
+ sync_namespace_filters(legacy_destination, destination)
+ end
- token_for_config = get_verification_token(legacy_destination.id)
- return unless token_for_config
+ def migrate_new_record(legacy_destination)
+ token = decrypt_verification_token(legacy_destination)
+ return unless token
- destination = InstanceExternalStreamingDestination.new(
+ destination = InstanceStreamingDestination.new(
name: legacy_destination.name,
category: :http,
- config: build_config(legacy_destination, token_for_config),
+ config: build_config(legacy_destination, token),
legacy_destination_ref: legacy_destination.id,
created_at: legacy_destination.created_at,
updated_at: legacy_destination.updated_at
)
- destination.encrypted_secret_token = encrypted_token
- destination.encrypted_secret_token_iv = encrypted_iv
+ destination.secret_token = token
- destination.save!(validate: false)
- destination
+ return unless destination.save
+
+ legacy_destination.update_column(:stream_destination_id, destination.id)
+
+ sync_event_type_filters(legacy_destination, destination)
+ sync_namespace_filters(legacy_destination, destination)
end
- def get_verification_token(destination_id)
- model_class = ::AuditEvents::InstanceExternalAuditEventDestination
- legacy_model = model_class.find_by(id: destination_id)
- legacy_model&.verification_token
+ def decrypt_verification_token(legacy_destination)
+ return unless legacy_destination.encrypted_verification_token.present? &&
+ legacy_destination.encrypted_verification_token_iv.present?
+
+ ::Gitlab::CryptoHelper.aes256_gcm_decrypt(
+ legacy_destination.encrypted_verification_token,
+ nonce: legacy_destination.encrypted_verification_token_iv
+ )
+ rescue OpenSSL::Cipher::CipherError
+ nil
end
def build_config(legacy_destination, token)
- headers = StreamingHeader.where(
+ headers = LegacyInstanceHeader.where(
instance_external_audit_event_destination_id: legacy_destination.id
).pluck(:key, :value, :active)
@@ -121,14 +155,42 @@ def build_config(legacy_destination, token)
}
end
- def copy_event_type_filters(source, destination)
- filters = StreamingEventTypeFilter.where(
- instance_external_audit_event_destination_id: source.id
- ).pluck(:audit_event_type, :created_at, :updated_at)
+ def sync_custom_headers(legacy_destination, destination)
+ headers = LegacyInstanceHeader.where(
+ instance_external_audit_event_destination_id: legacy_destination.id
+ ).pluck(:key, :value, :active)
+
+ return if headers.empty?
- return if filters.empty?
+ config = destination.config.deep_dup
+ config['headers'] ||= {}
+
+ headers.each do |key, value, active|
+ next if config['headers'].key?(key)
+
+ config['headers'][key] = {
+ 'value' => value,
+ 'active' => active
+ }
+ end
- attributes = filters.map do |audit_event_type, created_at, updated_at|
+ destination.update!(config: config)
+ end
+
+ def sync_event_type_filters(legacy_destination, destination)
+ existing_filters = InstanceEventTypeFilter
+ .where(external_streaming_destination_id: destination.id)
+ .pluck(:audit_event_type)
+
+ legacy_filters = LegacyInstanceEventTypeFilter
+ .where(instance_external_audit_event_destination_id: legacy_destination.id)
+ .pluck(:audit_event_type, :created_at, :updated_at)
+
+ missing_filters = legacy_filters.reject { |filter| existing_filters.include?(filter[0]) }
+
+ return if missing_filters.empty?
+
+ attributes = missing_filters.map do |audit_event_type, created_at, updated_at|
{
audit_event_type: audit_event_type,
created_at: created_at,
@@ -140,23 +202,27 @@ def copy_event_type_filters(source, destination)
InstanceEventTypeFilter.insert_all!(attributes)
end
- def copy_namespace_filter(source, destination)
- filter = StreamingHttpInstanceNamespaceFilter.find_by(
- audit_events_instance_external_audit_event_destination_id: source.id
- )
+ def sync_namespace_filters(legacy_destination, destination)
+ existing_filter_namespace_ids = InstanceNamespaceFilter
+ .where(external_streaming_destination_id: destination.id)
+ .pluck(:namespace_id)
- return unless filter
+ legacy_filters = LegacyInstanceNamespaceFilter
+ .where(audit_events_instance_external_audit_event_destination_id: legacy_destination.id)
+ .where.not(namespace_id: existing_filter_namespace_ids)
- InstanceNamespaceFilter.create!(
- namespace_id: filter.namespace_id,
- external_streaming_destination_id: destination.id,
- created_at: filter.created_at,
- updated_at: filter.updated_at
- )
- end
+ return if legacy_filters.empty?
+
+ attributes = legacy_filters.map do |filter|
+ {
+ namespace_id: filter.namespace_id,
+ external_streaming_destination_id: destination.id,
+ created_at: filter.created_at,
+ updated_at: filter.updated_at
+ }
+ end
- def update_legacy_record(source, destination)
- source.update!(stream_destination_id: destination.id)
+ InstanceNamespaceFilter.insert_all!(attributes)
end
end
end
Query Plans
- Update stream_destination_id for legacy destination: https://console.postgres.ai/gitlab/gitlab-production-main/sessions/39274/commands/120879
- Update config for stream_destination: https://console.postgres.ai/gitlab/gitlab-production-main/sessions/39274/commands/120880
- Inserting bulk event type filters: https://console.postgres.ai/gitlab/gitlab-production-main/sessions/39274/commands/120881
References
Screenshots or screen recordings
| Before | After |
|---|---|
How to set up and validate locally
MR acceptance checklist
Evaluate this MR against the MR acceptance checklist. It helps you analyze changes to reduce risks in quality, performance, reliability, security, and maintainability.
Edited by Andrew Jung