Connect audit event streams from new tables

What does this MR do and why?

  • Pass the audit event model's class name to the streaming worker to find the audit event proeprly
  • Pass new audit event models if feature flag is turned on
  • Override the log_events_and_stream in the auditor class as checking for entity is only available on the ee model

The audit events table is being split into four tables:

  1. user_audit_events
  2. project_audit_events
  3. group_audit_events
  4. instance_audit_events

We should still be able to stream audit events from these new tables: https://docs.gitlab.com/administration/compliance/audit_event_streaming/

Connect audit event streams from new tables

Changelog: added EE: true

References

Please include cross links to any resources that are relevant to this MR. This will give reviewers and future readers helpful context to give an efficient review of the changes introduced.

MR acceptance checklist

Please evaluate this MR against the MR acceptance checklist. It helps you analyze changes to reduce risks in quality, performance, reliability, security, and maintainability.

Screenshots or screen recordings

Screenshots are required for UI changes, and strongly recommended for all other merge requests.

Before After

How to set up and validate locally

Numbered steps to set up and validate the change are strongly suggested.

  1. Create a HTTP streaming destination (instance or group level): https://docs.gitlab.com/administration/compliance/audit_event_streaming/
  2. Use https://webhook.site/ or equivalent website to have a HTTP destination
  3. Enable the stream_audit_events_from_new_tables feature flag
  4. Events should stream to your destination with / without the feature

Or use this script to see what it does:

# Test script for running in Rails console to verify AuditEventStreamingWorker behavior

# 1. Enable feature flag if not already enabled
Feature.enable(:stream_audit_events_from_new_tables)
ActiveRecord::Base.logger = nil

# 2. Create worker instance
worker = AuditEvents::AuditEventStreamingWorker.new
puts "Created worker instance: #{worker.class.name}"

# 3. Helper method to create audit event JSON payloads with only the needed attributes
def create_test_event_json(entity_type, entity_id)
  base_attrs = {
    author_id: User.first.id,
    created_at: Time.current,
    details: { custom_message: "Test #{entity_type} audit event" }
  }
  
  case entity_type.to_s.downcase
  when 'group'
    base_attrs.merge(group_id: entity_id)
  when 'project'
    base_attrs.merge(project_id: entity_id)
  when 'user'
    base_attrs.merge(user_id: entity_id)
  when 'instance'
    base_attrs
  end.to_json
end

# 4. Helper to test direct model instantiation (without the worker)
def test_direct_model_creation(entity_type, entity_id)
  puts "\n== Testing Direct Model Creation for #{entity_type.capitalize} =="
  
  base_attrs = {
    author_id: User.first.id,
    created_at: Time.current,
    details: { custom_message: "Test #{entity_type} audit event" }
  }
  
  begin
    model = case entity_type.to_s.downcase
    when 'group'
      AuditEvents::GroupAuditEvent.new(base_attrs.merge(group_id: entity_id))
    when 'project'
      AuditEvents::ProjectAuditEvent.new(base_attrs.merge(project_id: entity_id))
    when 'user'
      AuditEvents::UserAuditEvent.new(base_attrs.merge(user_id: entity_id))
    when 'instance'
      AuditEvents::InstanceAuditEvent.new(base_attrs)
    end
    
    puts "✅ Successfully created model directly:"
    puts "- Class: #{model.class.name}"
    puts "- Valid: #{model.valid?}"
    puts "- Errors: #{model.errors.full_messages}" unless model.valid?
    model
  rescue => e
    puts "❌ Error directly creating #{entity_type} model: #{e.message}"
    nil
  end
end

# 5. Test worker's perform method with JSON
def test_worker_perform_with_json(worker, entity_type, entity_id)
  puts "\n== Testing Worker's perform with JSON for #{entity_type.capitalize} =="
  
  json = create_test_event_json(entity_type, entity_id)
  
  begin
    # Print the json to debug what we're sending
    puts "JSON being sent to worker:"
    pp Gitlab::Json.parse(json)
    
    # Note: This will make real external calls if configured to do so
    puts "⚠️ Warning: This test will make real external calls if external streaming is configured"
    
    # Call the worker's perform method
    result = worker.perform('create', nil, json)
    
    puts "✅ Successfully performed worker action with JSON"
    result
  rescue => e
    puts "❌ Error in worker perform with JSON for #{entity_type}: #{e.message}"
    puts e.backtrace.take(5)
    nil
  end
end

# 6. Test worker's perform method with ID
def test_worker_perform_with_id(worker, entity_type, entity_id, model_class)
  puts "\n== Testing Worker's perform with ID for #{entity_type.capitalize} =="
  
  # First create and save an audit event to get an ID
  model = nil
  
  begin
    base_attrs = {
      author_id: User.first.id,
      created_at: Time.current,
      details: { custom_message: "Test #{entity_type} audit event for ID test" }
    }
    
    model = case entity_type.to_s.downcase
    when 'group'
      AuditEvents::GroupAuditEvent.new(base_attrs.merge(group_id: entity_id))
    when 'project'
      AuditEvents::ProjectAuditEvent.new(base_attrs.merge(project_id: entity_id))
    when 'user'
      AuditEvents::UserAuditEvent.new(base_attrs.merge(user_id: entity_id))
    when 'instance'
      AuditEvents::InstanceAuditEvent.new(base_attrs)
    end
    
    if model.save
      puts "Created test audit event with ID: #{model.id}"
      
      # Note: This will make real external calls if configured to do so
      puts "⚠️ Warning: This test will make real external calls if external streaming is configured"
      
      # Call the worker's perform method with ID
      result = worker.perform('update', model.id, nil, model_class)
      
      puts "✅ Successfully performed worker action with ID"
      result
    else
      puts "❌ Could not save model for ID test: #{model.errors.full_messages}"
      nil
    end
  rescue => e
    puts "❌ Error in worker perform with ID for #{entity_type}: #{e.message}"
    puts e.backtrace.take(5)
    nil
  end
end

# 7. Test AuditEvents::Processor public methods
def test_processor_methods(entity_type, entity_id)
  puts "\n== Testing AuditEvents::Processor for #{entity_type.capitalize} =="
  
  json = create_test_event_json(entity_type, entity_id)
  
  begin
    # Test fetch_from_json
    puts "\nTesting fetch_from_json:"
    result = AuditEvents::Processor.fetch_from_json(json)
    
    if result
      puts "✅ Successfully processed with fetch_from_json:"
      puts "- Class: #{result.class.name}"
      case entity_type.to_s.downcase
      when 'group'
        puts "- Group ID: #{result.group_id}" if result.respond_to?(:group_id)
      when 'project'
        puts "- Project ID: #{result.project_id}" if result.respond_to?(:project_id)
      when 'user'
        puts "- User ID: #{result.user_id}" if result.respond_to?(:user_id)
      when 'instance'
        puts "- Type: Instance Audit Event"
      end
    else
      puts "❌ fetch_from_json returned nil"
    end
    
    # Test determine_audit_model_entity
    puts "\nTesting determine_audit_model_entity:"
    parsed_json = Gitlab::Json.parse(json).with_indifferent_access
    model_class, entity = AuditEvents::Processor.determine_audit_model_entity(parsed_json)
    
    puts "✅ determine_audit_model_entity results:"
    puts "- Model Class: #{model_class}"
    puts "- Entity: #{entity.class.name}" if entity.is_a?(ApplicationRecord)
    puts "- Entity: #{entity}" unless entity.is_a?(ApplicationRecord)
    
    # Test create_scoped_audit_event
    puts "\nTesting create_scoped_audit_event:"
    scoped_event = AuditEvents::Processor.create_scoped_audit_event(model_class, parsed_json)
    
    puts "✅ create_scoped_audit_event result:"
    puts "- Class: #{scoped_event.class.name}"
    case entity_type.to_s.downcase
    when 'group'
      puts "- Group ID: #{scoped_event.group_id}" if scoped_event.respond_to?(:group_id)
    when 'project'
      puts "- Project ID: #{scoped_event.project_id}" if scoped_event.respond_to?(:project_id)
    when 'user'
      puts "- User ID: #{scoped_event.user_id}" if scoped_event.respond_to?(:user_id)
    when 'instance'
      puts "- Type: Instance Audit Event"
    end
    
    # Test primary fetch method
    puts "\nTesting primary fetch method:"
    
    # Test with JSON
    json_result = AuditEvents::Processor.fetch(audit_event_json: json)
    puts "✅ fetch with JSON result:"
    puts "- Class: #{json_result.class.name}" if json_result
    puts "- Returned nil" unless json_result
    
    # Test with ID (if we have a saved instance)
    if scoped_event.respond_to?(:save) && scoped_event.save
      id_result = AuditEvents::Processor.fetch(audit_event_id: scoped_event.id, model_class: model_class.name)
      puts "✅ fetch with ID result:"
      puts "- Class: #{id_result.class.name}" if id_result
      puts "- ID: #{id_result.id}" if id_result
      puts "- Returned nil" unless id_result
    else
      puts "⚠️ Could not test fetch with ID because model couldn't be saved"
    end
    
  rescue => e
    puts "❌ Error testing Processor methods for #{entity_type}: #{e.message}"
    puts e.backtrace.take(5)
    nil
  end
end

# 8. Find example entities (adjust IDs as needed for your environment)
group_id = Group.first&.id
project_id = Project.first&.id
user_id = User.first&.id

puts "\n=== Starting Tests ==="

# 9. Test direct model creation first (to verify model schemas)
test_direct_model_creation('group', group_id) if group_id
test_direct_model_creation('project', project_id) if project_id
test_direct_model_creation('user', user_id) if user_id
test_direct_model_creation('instance', nil)

# 10. Test the Processor's methods
test_processor_methods('group', group_id) if group_id
test_processor_methods('project', project_id) if project_id
test_processor_methods('user', user_id) if user_id
test_processor_methods('instance', nil)

# 11. Test the worker's perform method with JSON
test_worker_perform_with_json(worker, 'group', group_id) if group_id
test_worker_perform_with_json(worker, 'project', project_id) if project_id
test_worker_perform_with_json(worker, 'user', user_id) if user_id
test_worker_perform_with_json(worker, 'instance', nil)

# 12. Test the worker's perform method with ID
test_worker_perform_with_id(worker, 'group', group_id, "AuditEvents::GroupAuditEvent") if group_id
test_worker_perform_with_id(worker, 'project', project_id, "AuditEvents::ProjectAuditEvent") if project_id
test_worker_perform_with_id(worker, 'user', user_id, "AuditEvents::UserAuditEvent") if user_id
test_worker_perform_with_id(worker, 'instance', nil, "AuditEvents::InstanceAuditEvent")

puts "\n=== Tests Completed ==="
Edited by Andrew Jung

Merge request reports

Loading