Commit f7467bbf authored by Pedro Pombeiro

Merge branch 'pedropombeiro/428146/remove-duplicates-from-batches' into 'master'

Revert 400010d1 merge commit

See merge request gitlab-org/gitlab!138066



Merged-by: Pedro Pombeiro <noreply@pedro.pombei.ro>
parents b3e08f29 353b8370
@@ -18,6 +18,7 @@ def initialize(worker_index: 0, total_workers: 1)
     @runtime_limiter = Analytics::CycleAnalytics::RuntimeLimiter.new(MAX_RUNTIME)
     @worker_index = worker_index
     @total_workers = total_workers
+    @processed_record_ids = Set.new
   end

   def execute
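
Editor's note: the core of the restored deduplication is a Set that survives across batches on the service instance. A condensed, standalone sketch of the pattern follows; BatchDeduplicator is a hypothetical name for illustration, not part of the service.

require 'set'

# Tracks which record ids were already ingested in this run.
class BatchDeduplicator
  def initialize
    @processed_record_ids = Set.new # Set gives O(1) membership checks
  end

  # Returns only ids not seen in an earlier batch, then marks them as seen.
  def new_ids(batch_ids)
    fresh = batch_ids.reject { |id| @processed_record_ids.include?(id) }
    @processed_record_ids.merge(fresh)
    fresh
  end
end

dedup = BatchDeduplicator.new
dedup.new_ids([1, 2, 3]) # => [1, 2, 3]
dedup.new_ids([3, 4])    # => [4] -- id 3 was already processed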
@@ -49,18 +50,6 @@ def execute

   private

-  def builds_batch_size
-    return BUILDS_BATCH_SIZE if Feature.enabled?(:large_finished_builds_batch_size)
-
-    BUILDS_BATCH_SIZE * 2
-  end
-
-  def builds_batch_count
-    return BUILDS_BATCH_COUNT if Feature.enabled?(:large_finished_builds_batch_size)
-
-    BUILDS_BATCH_COUNT / 2
-  end
-
   def enabled?
     Feature.enabled?(:ci_data_ingestion_to_click_house)
   end
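
Editor's note: the removed helpers inverse-scaled the two constants, so a full run ingested the same number of rows with the flag on or off; only the batch granularity changed. A quick check with hypothetical values (the real constants are elided from this diff):

# Hypothetical values for illustration only.
BUILDS_BATCH_SIZE = 500
BUILDS_BATCH_COUNT = 10

(BUILDS_BATCH_SIZE * BUILDS_BATCH_COUNT) ==
  ((BUILDS_BATCH_SIZE * 2) * (BUILDS_BATCH_COUNT / 2)) # => true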
@@ -77,11 +66,10 @@ def service_payload
   end

   def insert_new_finished_builds
-    # Read BUILDS_BATCH_COUNT batches of developer until the timeout in MAX_RUNTIME is reached
+    # Read BUILDS_BATCH_COUNT batches of BUILDS_BATCH_SIZE until the timeout in MAX_RUNTIME is reached
     # We can expect to process around 2M builds/hour with a single worker,
     # and a bit over 5M builds/hour with three workers (measured in prod).
     @reached_end_of_table = false
-    @processed_record_ids = []

     csv_batches.each do |csv_batch|
       break unless continue?
@@ -91,7 +79,7 @@ def insert_new_finished_builds
       next if csv_builder.rows_written == 0

       File.open(tempfile.path) do |f|
-        ClickHouse::Client.insert_csv(insert_finished_builds_query, f, :main)
+        ClickHouse::Client.insert_csv(INSERT_FINISHED_BUILDS_QUERY, f, :main)
       end
     end
   end
@@ -110,7 +98,7 @@ def insert_new_finished_builds
   def csv_batches
     events_batches_enumerator = Enumerator.new do |small_batches_yielder|
       # Main loop to page through the events
-      keyset_iterator_scope.each_batch(of: builds_batch_size) { |batch| small_batches_yielder << batch }
+      keyset_iterator_scope.each_batch(of: BUILDS_BATCH_SIZE) { |batch| small_batches_yielder << batch }

       @reached_end_of_table = true
     end
@@ -119,7 +107,7 @@ def csv_batches
       while continue?
         batches_yielder << Enumerator.new do |records_yielder|
           # records_yielder sends rows to the CSV builder
-          builds_batch_count.times do
+          BUILDS_BATCH_COUNT.times do
             break unless continue?

             yield_builds(events_batches_enumerator.next, records_yielder)
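
Editor's note: the nesting in csv_batches is easy to misread. The outer Enumerator yields one inner Enumerator per CSV file, and each inner one streams up to BUILDS_BATCH_COUNT database batches into that file. A stripped-down sketch of the same shape, with plain arrays standing in for keyset batches:

# Each inner array plays the role of one keyset batch of events.
event_batches = [[1, 2], [3, 4], [5, 6], [7, 8]].each
batches_per_csv = 2 # plays the role of BUILDS_BATCH_COUNT

csv_batches = Enumerator.new do |batches_yielder|
  loop do # unbounded here; the caller stops consumption (first(2) below)
    batches_yielder << Enumerator.new do |records_yielder|
      batches_per_csv.times do
        event_batches.next.each { |record| records_yielder << record }
      end
    end
  end
end

csv_batches.first(2).map(&:to_a) # => [[1, 2, 3, 4], [5, 6, 7, 8]]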
@@ -134,7 +122,7 @@ def csv_batches
   def yield_builds(events_batch, records_yielder)
     build_ids = events_batch.pluck(:build_id) # rubocop: disable CodeReuse/ActiveRecord

-    Ci::Build.id_in(build_ids)
+    Ci::Build.id_in(build_ids.select { |id| @processed_record_ids.exclude?(id) })
       .left_outer_joins(:runner, :runner_manager)
       .select(:finished_at, *finished_build_projections)
       .each { |build| records_yielder << build }
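
Editor's note: exclude? is ActiveSupport's negated include?, and it is why @processed_record_ids is a Set rather than an Array: membership checks stay O(1) per id as thousands of ids accumulate over a run. The filter in isolation, as a minimal runnable sketch:

require 'set'
require 'active_support/core_ext/enumerable' # defines Enumerable#exclude?

processed = Set.new([101, 102])
batch_ids = [101, 103, 104]

batch_ids.select { |id| processed.exclude?(id) }
# => [103, 104] -- 101 was already ingested by an earlier batch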
@@ -165,19 +153,10 @@ def finished_build_projections
       **RUNNER_MANAGER_FIELD_NAMES.map { |n| :"runner_manager_#{n}" }.index_with { |n| n }
     }.freeze

-  def insert_finished_builds_query
-    deduplicate = Feature.enabled?(:ci_deduplicate_build_ingestion_to_click_house) ? 1 : 0
-
-    <<~SQL.squish
-      INSERT INTO ci_finished_builds (#{CSV_MAPPING.keys.join(',')})
-      SETTINGS
-        async_insert=1,
-        async_insert_deduplicate=#{deduplicate},
-        deduplicate_blocks_in_dependent_materialized_views=#{deduplicate},
-        wait_for_async_insert=1
-      FORMAT CSV
-    SQL
-  end
+  INSERT_FINISHED_BUILDS_QUERY = <<~SQL.squish
+    INSERT INTO ci_finished_builds (#{CSV_MAPPING.keys.join(',')})
+    SETTINGS async_insert=1, wait_for_async_insert=1 FORMAT CSV
+  SQL

   def keyset_iterator_scope
     lower_bound = (@worker_index * BUILD_ID_PARTITIONS / @total_workers).to_i
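Editor's note: with .squish, the restored INSERT_FINISHED_BUILDS_QUERY collapses to one line. async_insert=1 makes ClickHouse buffer incoming rows server-side, and wait_for_async_insert=1 blocks the client until that buffer is flushed; the removed async_insert_deduplicate setting had deduplicated identical insert blocks server-side, a job the @processed_record_ids set now does in Ruby before the insert. Roughly what the constant evaluates to, with a hypothetical abbreviated column list standing in for CSV_MAPPING.keys:

# Hypothetical columns for illustration; the real list comes from CSV_MAPPING.keys.
"INSERT INTO ci_finished_builds (id,project_id,status,finished_at) " \
  "SETTINGS async_insert=1, wait_for_async_insert=1 FORMAT CSV"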
@@ -5,7 +5,7 @@ class CiFinishedBuildsSyncWorker
   include ApplicationWorker

   idempotent!
-  data_consistency :delayed, feature_flag: :load_balancing_for_ci_finished_builds_sync_worker
+  data_consistency :delayed
   urgency :throttled
   feature_category :runner_fleet
   loggable_arguments 1, 2
---
name: ci_deduplicate_build_ingestion_to_click_house
introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/135733
rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/424866
milestone: '16.6'
type: development
group: group::runner
default_enabled: false
---
name: large_finished_builds_batch_size
introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/137394
rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/424866
milestone: '16.7'
type: development
group: group::runner
default_enabled: false
---
name: load_balancing_for_ci_finished_builds_sync_worker
introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/136864
rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/424866
milestone: '16.7'
type: development
group: group::runner
default_enabled: true
@@ -21,8 +21,6 @@
   before_all do
     create_sync_events(*Ci::Build.finished.order(id: :desc))
-
-    stub_feature_flags(large_finished_builds_batch_size: false)
   end

   context 'when the ci_data_ingestion_to_click_house feature flag is on' do
@@ -32,10 +30,7 @@
     context 'when all builds fit in a single batch' do
       it 'processes the builds' do
-        expect(ClickHouse::Client).to receive(:insert_csv)
-          .once
-          .with(a_string_including('async_insert_deduplicate=1'), an_instance_of(File), :main)
-          .and_call_original
+        expect(ClickHouse::Client).to receive(:insert_csv).once.and_call_original

         expect { execute }.to change { ci_finished_builds_row_count }.by(3)
         expect(execute).to have_attributes({
@@ -57,38 +52,6 @@
         )
       end

-      context 'when :ci_deduplicate_build_ingestion_to_click_house FF is disabled' do
-        before do
-          stub_feature_flags(ci_deduplicate_build_ingestion_to_click_house: false)
-        end
-
-        it 'processes the builds' do
-          expect(ClickHouse::Client).to receive(:insert_csv)
-            .once
-            .with(a_string_including('async_insert_deduplicate=0'), an_instance_of(File), :main)
-            .and_call_original
-
-          expect { execute }.to change { ci_finished_builds_row_count }.by(3)
-          expect(execute).to have_attributes({
-            payload: {
-              reached_end_of_table: true,
-              records_inserted: 3,
-              worker_index: 0, total_workers: 1,
-              min_build_id: build1.id,
-              max_build_id: build3.id
-            }
-          })
-
-          records = ci_finished_builds
-          expect(records.count).to eq 3
-          expect(records).to contain_exactly(
-            a_hash_including(**expected_build_attributes(build1)),
-            a_hash_including(**expected_build_attributes(build2)),
-            a_hash_including(**expected_build_attributes(build3))
-          )
-        end
-      end
-
       it 'processes only builds from Ci::FinishedBuildChSyncEvent' do
         build = create(:ci_build, :failed)
@@ -134,13 +97,16 @@
         stub_const("#{described_class}::BUILDS_BATCH_SIZE", 2)
       end

-      it 'processes the builds' do
+      it 'processes the builds ignoring duplicate sync events' do
         expect(ClickHouse::Client).to receive(:insert_csv).once.and_call_original

         expect { execute }.to change { ci_finished_builds_row_count }.by(3)
         expect(execute).to have_attributes({
           payload: a_hash_including(reached_end_of_table: true, records_inserted: 3)
         })
+
+        Ci::FinishedBuildChSyncEvent.last.update!(processed: false) # simulate an out-of-date replica
+        expect { service.execute }.not_to change { ci_finished_builds_row_count }
       end
     end
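
Editor's note: this new assertion ties the spec to the worker change above. With data_consistency :delayed, the job may read Ci::FinishedBuildChSyncEvent from a lagging replica and see an already-processed event as pending. Because the processed-ids set lives on the service instance, a second execute on the same instance skips those builds. In terms of the hypothetical BatchDeduplicator sketched earlier:

dedup = BatchDeduplicator.new
dedup.new_ids([10, 11, 12]) # first run ingests three builds
# A lagging replica re-serves id 12 as unprocessed on the next run:
dedup.new_ids([12])         # => [] -- nothing is re-inserted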