Verified commit eb6be9bb authored by Harsimar Sandhu, committed by GitLab

Remove sync_audit_events_to_clickhouse feature flag

For now we are splitting the audit events table into four new tables to support
the Cells architecture, so the ClickHouse migration is blocked. Therefore we can
remove the feature flag along with the code behind it.

Changelog: removed
parent 794ec884
3 merge requests: !181325 Fix ambiguous `created_at` in project.rb, !179611 Draft: Rebase CR approach for zoekt assignments, !175370 Remove sync_audit_events_to_clickhouse feature flag
Showing 22 additions and 354 deletions
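The flag being retired was a routine `gitlab_com_derisk` flag toggled through GitLab's `Feature` API; the guarded code paths themselves are shown in the diffs below. As a hedged illustration (a Rails console sketch, not part of this diff), the gate could be exercised like this before the removal:

# Illustrative Rails console session; not part of the commit.
Feature.enabled?(:sync_audit_events_to_clickhouse, type: :gitlab_com_derisk) # => false unless opted in
Feature.enable(:sync_audit_events_to_clickhouse)   # opt an instance in at runtime
Feature.disable(:sync_audit_events_to_clickhouse)  # roll back without a deploy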
@@ -1430,7 +1430,6 @@ Gitlab/BoundedContexts:
- 'app/services/captcha/captcha_verification_service.rb'
- 'app/services/chat_names/authorize_user_service.rb'
- 'app/services/chat_names/find_user_service.rb'
- 'app/services/click_house/sync_strategies/audit_event_sync_strategy.rb'
- 'app/services/click_house/sync_strategies/base_sync_strategy.rb'
- 'app/services/cloud_seed/google_cloud/base_service.rb'
- 'app/services/cloud_seed/google_cloud/create_cloudsql_instance_service.rb'
@@ -1893,8 +1892,6 @@ Gitlab/BoundedContexts:
- 'app/workers/chaos/sleep_worker.rb'
- 'app/workers/chat_notification_worker.rb'
- 'app/workers/cleanup_container_repository_worker.rb'
- 'app/workers/click_house/audit_event_partition_sync_worker.rb'
- 'app/workers/click_house/audit_events_sync_worker.rb'
- 'app/workers/click_house/concerns/consistency_worker.rb'
- 'app/workers/cluster_configure_istio_worker.rb'
- 'app/workers/cluster_install_app_worker.rb'
......
@@ -47,7 +47,6 @@ Gitlab/FeatureFlagWithoutActor:
- 'app/serializers/web_ide_terminal_entity.rb'
- 'app/services/ci/job_artifacts/update_unknown_locked_status_service.rb'
- 'app/services/ci/register_job_service.rb'
- 'app/services/click_house/sync_strategies/audit_event_sync_strategy.rb'
- 'app/services/concerns/measurable.rb'
- 'app/services/groups/participants_service.rb'
- 'app/services/integrations/propagation/bulk_create_service.rb'
@@ -71,7 +70,6 @@ Gitlab/FeatureFlagWithoutActor:
- 'app/workers/bulk_imports/pipeline_batch_worker.rb'
- 'app/workers/bulk_imports/pipeline_worker.rb'
- 'app/workers/ci/unlock_pipelines_in_queue_worker.rb'
- 'app/workers/click_house/audit_events_sync_worker.rb'
- 'app/workers/concerns/worker_attributes.rb'
- 'app/workers/container_registry/cleanup_worker.rb'
- 'app/workers/container_registry/record_data_repair_detail_worker.rb'
......
# frozen_string_literal: true

module ClickHouse
  module SyncStrategies
    class AuditEventSyncStrategy < BaseSyncStrategy
      def execute(identifier)
        @serialized_model = build_serialized_class(identifier)
        @non_serialized_model = build_non_serialized_class(identifier)

        super()
      end

      private

      def build_serialized_class(identifier)
        Class.new(ApplicationRecord) do
          self.table_name = identifier

          include EachBatch

          self.primary_key = :id

          serialize :details, Hash
        end
      end

      def build_non_serialized_class(identifier)
        Class.new(ApplicationRecord) do
          self.table_name = identifier

          include EachBatch

          self.primary_key = :id

          attr_accessor :casted_created_at
        end
      end

      def model_class
        @serialized_model
      end

      def enabled?
        super && Feature.enabled?(:sync_audit_events_to_clickhouse, type: :gitlab_com_derisk)
      end

      def transform_row(row)
        convert_to_non_serialized_model(row)
      end

      def convert_to_non_serialized_model(serialized_model)
        non_serialized_model = @non_serialized_model.new(serialized_model.attributes)
        non_serialized_model.details = serialized_model.details.to_json
        non_serialized_model
      end

      def csv_mapping
        {
          id: :id,
          author_id: :author_id,
          author_name: :author_name,
          details: :details,
          entity_id: :entity_id,
          entity_path: :entity_path,
          entity_type: :entity_type,
          ip_address: :ip_address,
          target_details: :target_details,
          target_id: :target_id,
          target_type: :target_type,
          created_at: :casted_created_at
        }
      end

      def projections
        [
          :id,
          :author_id,
          :author_name,
          :details,
          :entity_id,
          :entity_path,
          :entity_type,
          :ip_address,
          :target_details,
          :target_id,
          :target_type,
          'EXTRACT(epoch FROM created_at) AS casted_created_at'
        ]
      end

      def insert_query
        <<~SQL.squish
          INSERT INTO audit_events (#{csv_mapping.keys.join(', ')})
          SETTINGS async_insert=1, wait_for_async_insert=1 FORMAT CSV
        SQL
      end
    end
  end
end
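The strategy above was keyed by a partition identifier: `projections` selected the PostgreSQL columns (casting `created_at` to an epoch value), `csv_mapping` named the matching ClickHouse columns, and `insert_query` interpolated those column names. A hedged sketch of a direct invocation and the interpolated statement (the partition name below is a hypothetical example):

# Illustrative only; the partition identifier is hypothetical.
ClickHouse::SyncStrategies::AuditEventSyncStrategy.new.execute('gitlab_partitions_dynamic.audit_events_000001')

# insert_query interpolates to roughly:
#   INSERT INTO audit_events (id, author_id, author_name, details, entity_id, entity_path,
#     entity_type, ip_address, target_details, target_id, target_type, created_at)
#   SETTINGS async_insert=1, wait_for_async_insert=1 FORMAT CSV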
@@ -390,15 +390,6 @@
:weight: 1
:idempotent: false
:tags: []
- :name: cronjob:click_house_audit_events_sync
:worker_name: ClickHouse::AuditEventsSyncWorker
:feature_category: :compliance_management
:has_external_dependencies: true
:urgency: :low
:resource_boundary: :unknown
:weight: 1
:idempotent: true
:tags: []
- :name: cronjob:concurrency_limit_resume
:worker_name: ConcurrencyLimit::ResumeWorker
:feature_category: :scalability
@@ -3000,15 +2991,6 @@
:weight: 1
:idempotent: true
:tags: []
- :name: click_house_audit_event_partition_sync
:worker_name: ClickHouse::AuditEventPartitionSyncWorker
:feature_category: :compliance_management
:has_external_dependencies: true
:urgency: :low
:resource_boundary: :unknown
:weight: 1
:idempotent: true
:tags: []
- :name: counters_cleanup_refresh
:worker_name: Counters::CleanupRefreshWorker
:feature_category: :not_owned
......
# frozen_string_literal: true

module ClickHouse
  class AuditEventPartitionSyncWorker
    include ApplicationWorker
    include ClickHouseWorker

    idempotent!
    data_consistency :delayed
    feature_category :compliance_management
    deduplicate :until_executed, including_scheduled: true # The second job can be skipped if the first job hasn't run yet.

    def perform(identifier)
      result = ::ClickHouse::SyncStrategies::AuditEventSyncStrategy.new.execute(identifier)

      log_extra_metadata_on_done(:result, result)
    end
  end
end
# frozen_string_literal: true

module ClickHouse
  class AuditEventsSyncWorker
    include ApplicationWorker
    include ClickHouseWorker

    idempotent!
    queue_namespace :cronjob
    data_consistency :delayed
    feature_category :compliance_management

    def perform
      return unless enabled?

      partition_identifiers.each do |identifier|
        ::ClickHouse::AuditEventPartitionSyncWorker.perform_async(identifier)
      end
    end

    private

    def partition_identifiers
      ::Gitlab::Database::PostgresPartition.for_parent_table(:audit_events).map(&:identifier)
    end

    def enabled?
      Gitlab::ClickHouse.configured? && Feature.enabled?(:sync_audit_events_to_clickhouse,
        type: :gitlab_com_derisk)
    end
  end
end
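The cron worker fanned out one `AuditEventPartitionSyncWorker` job per PostgreSQL partition of `audit_events`. A hedged sketch of that fan-out (the identifiers shown are hypothetical examples of what `PostgresPartition#identifier` might return):

# Illustrative fan-out; identifiers shown are hypothetical.
identifiers = ::Gitlab::Database::PostgresPartition.for_parent_table(:audit_events).map(&:identifier)
# => ["gitlab_partitions_dynamic.audit_events_000001", "gitlab_partitions_dynamic.audit_events_000002"]
identifiers.each { |identifier| ::ClickHouse::AuditEventPartitionSyncWorker.perform_async(identifier) }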
---
name: sync_audit_events_to_clickhouse
feature_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/404405
introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/140924
rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/427738
milestone: '16.9'
group: group::compliance
type: gitlab_com_derisk
default_enabled: false
@@ -982,9 +982,6 @@
Settings.cron_jobs['gitlab_subscriptions_schedule_refresh_seats_worker'] ||= {}
Settings.cron_jobs['gitlab_subscriptions_schedule_refresh_seats_worker']['cron'] ||= "0 */6 * * *"
Settings.cron_jobs['gitlab_subscriptions_schedule_refresh_seats_worker']['job_class'] = 'GitlabSubscriptions::ScheduleRefreshSeatsWorker'
Settings.cron_jobs['click_house_audit_events_sync_worker'] ||= {}
Settings.cron_jobs['click_house_audit_events_sync_worker']['cron'] ||= "*/3 * * * *"
Settings.cron_jobs['click_house_audit_events_sync_worker']['job_class'] = 'ClickHouse::AuditEventsSyncWorker'
Settings.cron_jobs['namespaces_schedule_dormant_member_removal_worker'] ||= {}
Settings.cron_jobs['namespaces_schedule_dormant_member_removal_worker']['cron'] ||= "0 */6 * * *"
Settings.cron_jobs['namespaces_schedule_dormant_member_removal_worker']['job_class'] = 'Namespaces::ScheduleDormantMemberRemoval'
......
@@ -203,8 +203,6 @@
- 1
- - ci_user_cancel_pipeline
- 1
- - click_house_audit_event_partition_sync
- 1
- - click_house_buffer_sync
- 1
- - click_house_ci_finished_builds_sync
......
# frozen_string_literal: true

class RemoveAuditEventSyncWorkerJobs < Gitlab::Database::Migration[2.2]
  milestone '17.8'
  disable_ddl_transaction!

  DEPRECATED_JOB_CLASSES = %w[
    ::ClickHouse::AuditEventsSyncWorker
    ::ClickHouse::AuditEventPartitionSyncWorker
  ].freeze

  def up
    # Removes scheduled instances from Sidekiq queues
    sidekiq_remove_jobs(job_klasses: DEPRECATED_JOB_CLASSES)
  end

  def down
    # This migration removes any instances of deprecated workers and cannot be undone.
  end
end
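The post-deploy migration above drops any already-enqueued instances of the two removed workers via `sidekiq_remove_jobs`. A hedged verification sketch (a console check using the standard Sidekiq API, not part of the commit) to confirm nothing is left in the scheduled set afterwards:

# Illustrative verification; not part of the migration.
require 'sidekiq/api'

deprecated = %w[ClickHouse::AuditEventsSyncWorker ClickHouse::AuditEventPartitionSyncWorker]
leftover = Sidekiq::ScheduledSet.new.select { |job| deprecated.include?(job.klass) }
puts "deprecated jobs still scheduled: #{leftover.size}"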
13737980c435158408a40624e5bd2ccb181071634b15d506a39ffac33aa26186
\ No newline at end of file
# frozen_string_literal: true

require 'spec_helper'

RSpec.describe ClickHouse::SyncStrategies::AuditEventSyncStrategy, '#execute', :click_house, feature_category: :compliance_management do
  let(:strategy) { described_class.new }

  let_it_be(:group_audit_event) { create(:audit_event, :group_event) }
  let_it_be(:group_audit_event_2) { create(:audit_event, :group_event) }
  let_it_be(:project_audit_event) { create(:audit_event, :project_event) }
  let_it_be(:project_audit_event_2) { create(:audit_event, :project_event) }

  subject(:execute) { strategy.execute(:audit_events) }

  it 'inserts all records' do
    expect(execute).to eq({ status: :processed, records_inserted: 4, reached_end_of_table: true })

    expected_records = [
      an_audit_event_sync_model(group_audit_event),
      an_audit_event_sync_model(group_audit_event_2),
      an_audit_event_sync_model(project_audit_event),
      an_audit_event_sync_model(project_audit_event_2)
    ]

    audit_events = ClickHouse::Client.select('SELECT * FROM audit_events FINAL ORDER BY id', :main)
    expect(audit_events).to match(expected_records)

    last_processed_id = ClickHouse::SyncCursor.cursor_for(:audit_events)
    expect(last_processed_id).to eq(project_audit_event_2.id)
  end

  context 'when the feature flag is disabled' do
    before do
      stub_feature_flags(sync_audit_events_to_clickhouse: false)
    end

    it 'inserts no records' do
      expect(execute).to eq({ status: :disabled })

      audit_events = ClickHouse::Client.select('SELECT * FROM audit_events FINAL ORDER BY id', :main)
      expect(audit_events).to be_empty
    end
  end

  context 'when the clickhouse database is not configured' do
    before do
      allow(Gitlab::ClickHouse).to receive(:configured?).and_return(false)
    end

    it 'inserts no records' do
      expect(execute).to eq({ status: :disabled })

      audit_events = ClickHouse::Client.select('SELECT * FROM audit_events FINAL ORDER BY id', :main)
      expect(audit_events).to be_empty
    end
  end

  def an_audit_event_sync_model(audit_event)
    hash_including(
      'id' => audit_event.id,
      'entity_type' => audit_event.entity_type,
      'entity_id' => audit_event.entity_id.to_s,
      'author_id' => audit_event.author_id.to_s,
      'target_id' => audit_event.target_id.to_s,
      'target_type' => audit_event.target_type,
      'entity_path' => audit_event.entity_path,
      'target_details' => audit_event.target_details,
      'ip_address' => audit_event.ip_address,
      'details' => audit_event.details.to_json,
      'created_at' => a_value_within(0.01).of(audit_event.created_at))
  end
end
# frozen_string_literal: true

require 'spec_helper'

RSpec.describe ClickHouse::AuditEventPartitionSyncWorker, feature_category: :compliance_management do
  let(:worker) { described_class.new }

  subject(:perform) { worker.perform("audit_events") }

  it_behaves_like 'an idempotent worker' do
    let(:job_args) { [:audit_events] }

    context 'when worker is enqueued' do
      it 'calls ::ClickHouse::SyncStrategies::AuditEventSyncStrategy with correct args' do
        expect_next_instance_of(::ClickHouse::SyncStrategies::AuditEventSyncStrategy) do |instance|
          expect(instance).to receive(:execute).with("audit_events")
        end

        perform
      end

      it 'correctly logs the metadata on done' do
        expect_next_instance_of(::ClickHouse::SyncStrategies::AuditEventSyncStrategy) do |instance|
          expect(instance).to receive(:execute).and_return({ status: :ok })
        end

        expect(worker).to receive(:log_extra_metadata_on_done).with(:result, { status: :ok })

        perform
      end
    end
  end
end
# frozen_string_literal: true

require 'spec_helper'

RSpec.describe ClickHouse::AuditEventsSyncWorker, '#perform', :click_house, feature_category: :compliance_management do
  let(:worker) { described_class.new }
  let(:partition_identifiers) { %w[audit_event_001 audit_event_002] }

  subject(:perform) { worker.perform }

  it_behaves_like 'an idempotent worker' do
    context 'when audit event partitions are present' do
      before do
        allow(worker).to receive(:partition_identifiers).and_return(partition_identifiers)
      end

      it 'enqueues identifiers for syncing' do
        partition_identifiers.each do |identifier|
          expect(::ClickHouse::AuditEventPartitionSyncWorker).to receive(:perform_async).with(identifier)
        end

        perform
      end

      context 'when no partition is present' do
        before do
          allow(worker).to receive(:partition_identifiers).and_return([])
        end

        it 'does not enqueue for syncing' do
          expect(::ClickHouse::AuditEventPartitionSyncWorker).not_to receive(:perform_async)

          perform
        end
      end
    end

    context 'when clickhouse is not configured' do
      before do
        allow(Gitlab::ClickHouse).to receive(:configured?).and_return(false)
      end

      it 'skips execution' do
        expect(::ClickHouse::AuditEventPartitionSyncWorker).not_to receive(:perform_async)

        perform
      end
    end

    context 'when feature flag `sync_audit_events_to_clickhouse` is disabled' do
      before do
        stub_feature_flags(sync_audit_events_to_clickhouse: false)
      end

      it 'does not enqueue for syncing' do
        expect(::ClickHouse::AuditEventPartitionSyncWorker).not_to receive(:perform_async)

        perform
      end
    end
  end
end