Connect audit event streams from new tables
What does this MR do and why?
- Pass the audit event model's class name to the streaming worker to find the audit event proeprly
- Pass new audit event models if feature flag is turned on
- Override the
log_events_and_streamin the auditor class as checking forentityis only available on theeemodel
The audit events table is being split into four tables:
- user_audit_events
- project_audit_events
- group_audit_events
- instance_audit_events
We should still be able to stream audit events from these new tables: https://docs.gitlab.com/administration/compliance/audit_event_streaming/
Connect audit event streams from new tables
Changelog: added EE: true
References
Please include cross links to any resources that are relevant to this MR. This will give reviewers and future readers helpful context to give an efficient review of the changes introduced.
MR acceptance checklist
Please evaluate this MR against the MR acceptance checklist. It helps you analyze changes to reduce risks in quality, performance, reliability, security, and maintainability.
Screenshots or screen recordings
Screenshots are required for UI changes, and strongly recommended for all other merge requests.
| Before | After |
|---|---|
How to set up and validate locally
Numbered steps to set up and validate the change are strongly suggested.
- Create a HTTP streaming destination (instance or group level): https://docs.gitlab.com/administration/compliance/audit_event_streaming/
- Use https://webhook.site/ or equivalent website to have a HTTP destination
- Enable the
stream_audit_events_from_new_tablesfeature flag - Events should stream to your destination with / without the feature
Or use this script to see what it does:
# Test script for running in Rails console to verify AuditEventStreamingWorker behavior
# 1. Enable feature flag if not already enabled
Feature.enable(:stream_audit_events_from_new_tables)
ActiveRecord::Base.logger = nil
# 2. Create worker instance
worker = AuditEvents::AuditEventStreamingWorker.new
puts "Created worker instance: #{worker.class.name}"
# 3. Helper method to create audit event JSON payloads with only the needed attributes
def create_test_event_json(entity_type, entity_id)
base_attrs = {
author_id: User.first.id,
created_at: Time.current,
details: { custom_message: "Test #{entity_type} audit event" }
}
case entity_type.to_s.downcase
when 'group'
base_attrs.merge(group_id: entity_id)
when 'project'
base_attrs.merge(project_id: entity_id)
when 'user'
base_attrs.merge(user_id: entity_id)
when 'instance'
base_attrs
end.to_json
end
# 4. Helper to test direct model instantiation (without the worker)
def test_direct_model_creation(entity_type, entity_id)
puts "\n== Testing Direct Model Creation for #{entity_type.capitalize} =="
base_attrs = {
author_id: User.first.id,
created_at: Time.current,
details: { custom_message: "Test #{entity_type} audit event" }
}
begin
model = case entity_type.to_s.downcase
when 'group'
AuditEvents::GroupAuditEvent.new(base_attrs.merge(group_id: entity_id))
when 'project'
AuditEvents::ProjectAuditEvent.new(base_attrs.merge(project_id: entity_id))
when 'user'
AuditEvents::UserAuditEvent.new(base_attrs.merge(user_id: entity_id))
when 'instance'
AuditEvents::InstanceAuditEvent.new(base_attrs)
end
puts "✅ Successfully created model directly:"
puts "- Class: #{model.class.name}"
puts "- Valid: #{model.valid?}"
puts "- Errors: #{model.errors.full_messages}" unless model.valid?
model
rescue => e
puts "❌ Error directly creating #{entity_type} model: #{e.message}"
nil
end
end
# 5. Test worker's perform method with JSON
def test_worker_perform_with_json(worker, entity_type, entity_id)
puts "\n== Testing Worker's perform with JSON for #{entity_type.capitalize} =="
json = create_test_event_json(entity_type, entity_id)
begin
# Print the json to debug what we're sending
puts "JSON being sent to worker:"
pp Gitlab::Json.parse(json)
# Note: This will make real external calls if configured to do so
puts "⚠️ Warning: This test will make real external calls if external streaming is configured"
# Call the worker's perform method
result = worker.perform('create', nil, json)
puts "✅ Successfully performed worker action with JSON"
result
rescue => e
puts "❌ Error in worker perform with JSON for #{entity_type}: #{e.message}"
puts e.backtrace.take(5)
nil
end
end
# 6. Test worker's perform method with ID
def test_worker_perform_with_id(worker, entity_type, entity_id, model_class)
puts "\n== Testing Worker's perform with ID for #{entity_type.capitalize} =="
# First create and save an audit event to get an ID
model = nil
begin
base_attrs = {
author_id: User.first.id,
created_at: Time.current,
details: { custom_message: "Test #{entity_type} audit event for ID test" }
}
model = case entity_type.to_s.downcase
when 'group'
AuditEvents::GroupAuditEvent.new(base_attrs.merge(group_id: entity_id))
when 'project'
AuditEvents::ProjectAuditEvent.new(base_attrs.merge(project_id: entity_id))
when 'user'
AuditEvents::UserAuditEvent.new(base_attrs.merge(user_id: entity_id))
when 'instance'
AuditEvents::InstanceAuditEvent.new(base_attrs)
end
if model.save
puts "Created test audit event with ID: #{model.id}"
# Note: This will make real external calls if configured to do so
puts "⚠️ Warning: This test will make real external calls if external streaming is configured"
# Call the worker's perform method with ID
result = worker.perform('update', model.id, nil, model_class)
puts "✅ Successfully performed worker action with ID"
result
else
puts "❌ Could not save model for ID test: #{model.errors.full_messages}"
nil
end
rescue => e
puts "❌ Error in worker perform with ID for #{entity_type}: #{e.message}"
puts e.backtrace.take(5)
nil
end
end
# 7. Test AuditEvents::Processor public methods
def test_processor_methods(entity_type, entity_id)
puts "\n== Testing AuditEvents::Processor for #{entity_type.capitalize} =="
json = create_test_event_json(entity_type, entity_id)
begin
# Test fetch_from_json
puts "\nTesting fetch_from_json:"
result = AuditEvents::Processor.fetch_from_json(json)
if result
puts "✅ Successfully processed with fetch_from_json:"
puts "- Class: #{result.class.name}"
case entity_type.to_s.downcase
when 'group'
puts "- Group ID: #{result.group_id}" if result.respond_to?(:group_id)
when 'project'
puts "- Project ID: #{result.project_id}" if result.respond_to?(:project_id)
when 'user'
puts "- User ID: #{result.user_id}" if result.respond_to?(:user_id)
when 'instance'
puts "- Type: Instance Audit Event"
end
else
puts "❌ fetch_from_json returned nil"
end
# Test determine_audit_model_entity
puts "\nTesting determine_audit_model_entity:"
parsed_json = Gitlab::Json.parse(json).with_indifferent_access
model_class, entity = AuditEvents::Processor.determine_audit_model_entity(parsed_json)
puts "✅ determine_audit_model_entity results:"
puts "- Model Class: #{model_class}"
puts "- Entity: #{entity.class.name}" if entity.is_a?(ApplicationRecord)
puts "- Entity: #{entity}" unless entity.is_a?(ApplicationRecord)
# Test create_scoped_audit_event
puts "\nTesting create_scoped_audit_event:"
scoped_event = AuditEvents::Processor.create_scoped_audit_event(model_class, parsed_json)
puts "✅ create_scoped_audit_event result:"
puts "- Class: #{scoped_event.class.name}"
case entity_type.to_s.downcase
when 'group'
puts "- Group ID: #{scoped_event.group_id}" if scoped_event.respond_to?(:group_id)
when 'project'
puts "- Project ID: #{scoped_event.project_id}" if scoped_event.respond_to?(:project_id)
when 'user'
puts "- User ID: #{scoped_event.user_id}" if scoped_event.respond_to?(:user_id)
when 'instance'
puts "- Type: Instance Audit Event"
end
# Test primary fetch method
puts "\nTesting primary fetch method:"
# Test with JSON
json_result = AuditEvents::Processor.fetch(audit_event_json: json)
puts "✅ fetch with JSON result:"
puts "- Class: #{json_result.class.name}" if json_result
puts "- Returned nil" unless json_result
# Test with ID (if we have a saved instance)
if scoped_event.respond_to?(:save) && scoped_event.save
id_result = AuditEvents::Processor.fetch(audit_event_id: scoped_event.id, model_class: model_class.name)
puts "✅ fetch with ID result:"
puts "- Class: #{id_result.class.name}" if id_result
puts "- ID: #{id_result.id}" if id_result
puts "- Returned nil" unless id_result
else
puts "⚠️ Could not test fetch with ID because model couldn't be saved"
end
rescue => e
puts "❌ Error testing Processor methods for #{entity_type}: #{e.message}"
puts e.backtrace.take(5)
nil
end
end
# 8. Find example entities (adjust IDs as needed for your environment)
group_id = Group.first&.id
project_id = Project.first&.id
user_id = User.first&.id
puts "\n=== Starting Tests ==="
# 9. Test direct model creation first (to verify model schemas)
test_direct_model_creation('group', group_id) if group_id
test_direct_model_creation('project', project_id) if project_id
test_direct_model_creation('user', user_id) if user_id
test_direct_model_creation('instance', nil)
# 10. Test the Processor's methods
test_processor_methods('group', group_id) if group_id
test_processor_methods('project', project_id) if project_id
test_processor_methods('user', user_id) if user_id
test_processor_methods('instance', nil)
# 11. Test the worker's perform method with JSON
test_worker_perform_with_json(worker, 'group', group_id) if group_id
test_worker_perform_with_json(worker, 'project', project_id) if project_id
test_worker_perform_with_json(worker, 'user', user_id) if user_id
test_worker_perform_with_json(worker, 'instance', nil)
# 12. Test the worker's perform method with ID
test_worker_perform_with_id(worker, 'group', group_id, "AuditEvents::GroupAuditEvent") if group_id
test_worker_perform_with_id(worker, 'project', project_id, "AuditEvents::ProjectAuditEvent") if project_id
test_worker_perform_with_id(worker, 'user', user_id, "AuditEvents::UserAuditEvent") if user_id
test_worker_perform_with_id(worker, 'instance', nil, "AuditEvents::InstanceAuditEvent")
puts "\n=== Tests Completed ==="
Edited by Andrew Jung