From 516edb0b684bb9a51db9f1d2056a08314b9d0f4a Mon Sep 17 00:00:00 2001
From: Douglas Barbosa Alexandre <dbalexandre@gmail.com>
Date: Thu, 9 Nov 2023 22:14:11 +0000
Subject: [PATCH] Merge branch 'mk/backport-16-5-fix-base-batcher' into
 '16-5-stable-ee'

For 16.5: Fix Geo verification state backfill job can exceed batch size

See merge request https://gitlab.com/gitlab-org/gitlab/-/merge_requests/136399

Merged-by: Douglas Barbosa Alexandre <dbalexandre@gmail.com>
Approved-by: Douglas Barbosa Alexandre <dbalexandre@gmail.com>


(cherry picked from commit 5cd26902f98b56679d3981c41fe6ab4b93b81489)

a62bf5f2 Merge branch 'mk/fix-base-batcher' into 'master'
---
 .../verification_state_backfill_service.rb    |   6 +-
 ee/lib/gitlab/geo/base_batcher.rb             |  27 +++-
 ee/spec/lib/gitlab/geo/base_batcher_spec.rb   |  42 ++++++
 .../lib/gitlab/geo/registry_batcher_spec.rb   | 118 +++++++++++++++++
 .../geo/registry_consistency_service_spec.rb  |   8 +-
 ee/spec/support/helpers/ee/geo_helpers.rb     |  28 ++--
 .../models/geo_batcher_shared_examples.rb     | 123 +++---------------
 7 files changed, 227 insertions(+), 125 deletions(-)
 create mode 100644 ee/spec/lib/gitlab/geo/base_batcher_spec.rb

diff --git a/ee/app/services/geo/verification_state_backfill_service.rb b/ee/app/services/geo/verification_state_backfill_service.rb
index 3f0ac2f0516f67d6..00345b87ace977c9 100644
--- a/ee/app/services/geo/verification_state_backfill_service.rb
+++ b/ee/app/services/geo/verification_state_backfill_service.rb
@@ -32,7 +32,11 @@ def execute
 
     # @return [Range] the next range of a batch of records
     def next_range!
-      Gitlab::Geo::BaseBatcher.new(replicable_model, verification_state_table_class, verification_state_model_key, key: batcher_key, batch_size: batch_size).next_range!
+      batcher.next_range!
+    end
+
+    def batcher
+      Gitlab::Geo::BaseBatcher.new(replicable_model, verification_state_table_class, verification_state_model_key, key: batcher_key, batch_size: batch_size)
     end
 
     def batcher_key
diff --git a/ee/lib/gitlab/geo/base_batcher.rb b/ee/lib/gitlab/geo/base_batcher.rb
index a897660ef7ec7773..3b4111b4b83657fe 100644
--- a/ee/lib/gitlab/geo/base_batcher.rb
+++ b/ee/lib/gitlab/geo/base_batcher.rb
@@ -44,16 +44,35 @@ def next_range!
       # @param [Integer] batch_first_id the first ID of the batch
       # @return [Integer] batch_last_id the last ID of the batch (not the table)
       def get_batch_last_id(batch_first_id)
-        source_class_last_id, more_records = get_source_batch_last_id(batch_first_id)
+        source_class_last_id, more_source_records = get_source_batch_last_id(batch_first_id)
         destination_class_last_id, more_destination_records = get_destination_batch_last_id(batch_first_id)
-
+        more_records = more_source_records || more_destination_records
+        batch_last_id = batch_first_id + batch_size - 1
+
+        # Performance optimization:
+        #
+        # We *could* use batch_last_id as-is, but that would be really slow when there are a lot
+        # of unused IDs. This is not an unusual case for old, large GitLab instances. So this
+        # logic helps skip the gaps.
         batch_last_id = if source_class_last_id && destination_class_last_id
-                          [source_class_last_id, destination_class_last_id].max
+                          if source_class_last_id > batch_last_id || destination_class_last_id > batch_last_id
+                            # We are about to return the smaller increment, therefore there are more records to be
+                            # processed.
+                            more_records = true
+
+                            # Use the smaller increment to avoid returning more rows than batch_size. In some cases,
+                            # this will return fewer rows than batch_size.
+                            # See https://gitlab.com/gitlab-org/gitlab/-/issues/430913
+                            [source_class_last_id, destination_class_last_id].min
+                          else
+                            # Both increment less than batch size, so take the larger one
+                            [source_class_last_id, destination_class_last_id].max
+                          end
                         else
                           source_class_last_id || destination_class_last_id
                         end
 
-        if more_records || more_destination_records
+        if more_records
           increment_batch(batch_last_id)
         elsif batch_first_id > 1
           reset
diff --git a/ee/spec/lib/gitlab/geo/base_batcher_spec.rb b/ee/spec/lib/gitlab/geo/base_batcher_spec.rb
new file mode 100644
index 0000000000000000..838179c15146c53e
--- /dev/null
+++ b/ee/spec/lib/gitlab/geo/base_batcher_spec.rb
@@ -0,0 +1,42 @@
+# frozen_string_literal: true
+
+require 'spec_helper'
+
+RSpec.describe Gitlab::Geo::BaseBatcher,
+  :use_clean_rails_memory_store_caching,
+  feature_category: :geo_replication do
+  include EE::GeoHelpers
+
+  # Models which are operated on by VerificationStateBackfillService
+  models_with_separate_verification_state_table = [
+    Ci::JobArtifact,
+    Ci::SecureFile,
+    ContainerRepository,
+    DependencyProxy::Blob,
+    DependencyProxy::Manifest,
+    DesignManagement::Repository,
+    GroupWikiRepository,
+    LfsObject,
+    MergeRequestDiff,
+    PagesDeployment,
+    Projects::WikiRepository,
+    Project,
+    Upload
+  ]
+
+  models_with_separate_verification_state_table.each do |model|
+    context "for #{model.name}" do
+      let(:source_class) { model }
+      let(:destination_class) { model.verification_state_table_class }
+      let(:destination_class_factory) { registry_factory_name(destination_class) }
+      let(:key) { "verification_backfill:#{model.name.parameterize}" }
+
+      def batcher(batch_size)
+        service = Geo::VerificationStateBackfillService.new(source_class, batch_size: batch_size)
+        service.send(:batcher)
+      end
+
+      include_examples 'is a Geo batcher'
+    end
+  end
+end
diff --git a/ee/spec/lib/gitlab/geo/registry_batcher_spec.rb b/ee/spec/lib/gitlab/geo/registry_batcher_spec.rb
index 2f616056ce00df42..6f3f3e72c8174819 100644
--- a/ee/spec/lib/gitlab/geo/registry_batcher_spec.rb
+++ b/ee/spec/lib/gitlab/geo/registry_batcher_spec.rb
@@ -10,6 +10,124 @@
   let(:source_class) { LfsObject }
   let(:destination_class) { Geo::LfsObjectRegistry }
   let(:destination_class_factory) { registry_factory_name(destination_class) }
+  let(:key) { 'looping_batcher_spec' }
+
+  def batcher(batch_size)
+    described_class.new(destination_class, key: key, batch_size: batch_size)
+  end
 
   include_examples 'is a Geo batcher'
+
+  # Only Geo registry rows can get into these situations because they cannot have foreign key
+  # constraints across databases.
+  #
+  # As opposed to Verification state backfill, which operates on tables which are stored in the same
+  # database.
+  describe '#next_range!' do
+    let(:source_foreign_key) { batcher(batch_size).send(:source_foreign_key) }
+    let(:batch_size) { 2 }
+
+    subject { batcher(batch_size).next_range! }
+
+    context 'when there are no records but there are orphaned destination_records' do
+      let!(:destination_records) { create_list(destination_class_factory, 3) }
+
+      before do
+        source_class.delete_all
+      end
+
+      context 'when it has never been called before' do
+        it { is_expected.to be_a Range }
+
+        it 'starts from the beginning' do
+          expect(subject.first).to eq(1)
+        end
+
+        it 'ends at a full batch' do
+          expect(subject.last).to eq(destination_records.second.public_send(source_foreign_key))
+        end
+
+        context 'when the batch size is greater than the number of destination_records' do
+          let(:batch_size) { 5 }
+
+          it 'ends at the last ID' do
+            expect(subject.last).to eq(destination_records.last.public_send(source_foreign_key))
+          end
+        end
+      end
+
+      context 'when it was called before' do
+        context 'when the previous batch included the end of the table' do
+          before do
+            batcher(destination_class.count).next_range!
+          end
+
+          it 'starts from the beginning' do
+            expect(subject).to eq(1..destination_records.second.public_send(source_foreign_key))
+          end
+        end
+
+        context 'when the previous batch did not include the end of the table' do
+          before do
+            batcher(destination_class.count - 1).next_range!
+          end
+
+          it 'starts after the previous batch' do
+            last_id = destination_records.last.public_send(source_foreign_key)
+            expect(subject).to eq(last_id..last_id)
+          end
+        end
+
+        context 'if cache is cleared' do
+          before do
+            batcher(batch_size).next_range!
+          end
+
+          it 'starts from the beginning' do
+            Rails.cache.clear
+
+            expect(subject).to eq(1..destination_records.second.public_send(source_foreign_key))
+          end
+        end
+      end
+    end
+
+    context 'when there are records and orphaned destination_records with foreign key greater than last record id' do
+      let!(:records) { create_list(factory_name(source_class), 3) }
+      let(:orphaned_destination_foreign_key_id) { records.last.id }
+      let!(:destination) do
+        create(destination_class_factory, source_foreign_key => orphaned_destination_foreign_key_id)
+      end
+
+      before do
+        source_class.primary_key_in(orphaned_destination_foreign_key_id).delete_all
+      end
+
+      context 'when it has never been called before' do
+        it { is_expected.to be_a Range }
+
+        it 'ends at a full batch' do
+          expect(subject).to eq(1..records.second.id)
+        end
+      end
+
+      context 'when it was called before' do
+        before do
+          batcher(batch_size).next_range!
+        end
+
+        it 'ends at the last destination foreign key ID' do
+          expect(subject).to eq(orphaned_destination_foreign_key_id..orphaned_destination_foreign_key_id)
+        end
+
+        context 'if cache is cleared' do
+          it 'ends at a full batch' do
+            Rails.cache.clear
+
+            expect(subject).to eq(1..records.second.id)
+          end
+        end
+      end
+    end
+  end
 end
diff --git a/ee/spec/services/geo/registry_consistency_service_spec.rb b/ee/spec/services/geo/registry_consistency_service_spec.rb
index 48b6f3ba00d7507e..1e0a7be5c11b51d7 100644
--- a/ee/spec/services/geo/registry_consistency_service_spec.rb
+++ b/ee/spec/services/geo/registry_consistency_service_spec.rb
@@ -137,13 +137,17 @@
           end
 
           it 'deletes unused registries', :sidekiq_inline do
-            subject.execute
+            # Executing two times because when the first source record's ID is 3 or greater, the
+            # first execution processes a range which only skips over the gap of unused IDs.
+            2.times { subject.execute }
 
             expect(registry_class.where(model_foreign_key => unused_registry_ids)).to be_empty
           end
 
           it 'returns truthy' do
-            expect(subject.execute).to be_truthy
+            # Executing two times because when the first source record's ID is 3 or greater, the
+            # first execution processes a range which only skips over the gap of unused IDs.
+            expect(2.times { subject.execute }).to be_truthy
           end
         end
       end
diff --git a/ee/spec/support/helpers/ee/geo_helpers.rb b/ee/spec/support/helpers/ee/geo_helpers.rb
index 9eb59c0057e5dad1..a7dcf4bc2cd9e04e 100644
--- a/ee/spec/support/helpers/ee/geo_helpers.rb
+++ b/ee/spec/support/helpers/ee/geo_helpers.rb
@@ -52,22 +52,26 @@ def stub_batch_counter_transaction_open_check
       end
     end
 
+    def factory_name(klass)
+      klass_name = klass.name
+      default_factory_name = klass_name.underscore.tr('/', '_').to_sym
+
+      custom_mapping = {
+        'Ci::JobArtifact' => :ee_ci_job_artifact,
+        'MergeRequestDiff' => :external_merge_request_diff,
+        'Packages::PackageFile' => :package_file,
+        'Projects::WikiRepository' => :project_wiki_repository
+      }
+
+      custom_mapping.fetch(klass_name, default_factory_name)
+    end
+
     def model_class_factory_name(registry_class)
-      default_factory_name = registry_class::MODEL_CLASS.underscore.tr('/', '_').to_sym
-
-      {
-        Geo::MergeRequestDiffRegistry => :external_merge_request_diff,
-        Geo::PackageFileRegistry => :package_file,
-        Geo::UploadRegistry => :upload,
-        Geo::JobArtifactRegistry => :ci_job_artifact,
-        Geo::CiSecureFileRegistry => :ci_secure_file,
-        Geo::ProjectWikiRepositoryRegistry => :project_wiki_repository,
-        Geo::ProjectRepositoryRegistry => :project
-      }.fetch(registry_class, default_factory_name)
+      factory_name(registry_class::MODEL_CLASS)
     end
 
     def registry_factory_name(registry_class)
-      registry_class.underscore.tr('/', '_').to_sym
+      factory_name(registry_class)
     end
 
     def with_no_geo_database_configured(&block)
diff --git a/ee/spec/support/shared_examples/models/geo_batcher_shared_examples.rb b/ee/spec/support/shared_examples/models/geo_batcher_shared_examples.rb
index db9e95025321bb5c..e83379524cbfc8eb 100644
--- a/ee/spec/support/shared_examples/models/geo_batcher_shared_examples.rb
+++ b/ee/spec/support/shared_examples/models/geo_batcher_shared_examples.rb
@@ -1,80 +1,22 @@
 # frozen_string_literal: true
 
+# Requires the caller to define a method `def batcher(batch_size)` which returns
+# an instance of the described class.
 RSpec.shared_examples 'is a Geo batcher' do
   include EE::GeoHelpers
 
   describe '#next_range!' do
-    let(:batcher) { described_class.new(destination_class, key: key, batch_size: batch_size) }
-    let(:source_foreign_key) { batcher.send(:source_foreign_key) }
-    let(:key) { 'looping_batcher_spec' }
+    let(:source_foreign_key) { batcher(batch_size).send(:source_foreign_key) }
     let(:batch_size) { 2 }
 
-    subject { batcher.next_range! }
+    subject { batcher(batch_size).next_range! }
 
     context 'when there are no records' do
       it { is_expected.to be_nil }
     end
 
-    context 'when there are no records but there are orphaned destination_records' do
-      let!(:destination_records) { create_list(destination_class_factory, 3) }
-
-      context 'when it has never been called before' do
-        it { is_expected.to be_a Range }
-
-        it 'starts from the beginning' do
-          expect(subject.first).to eq(1)
-        end
-
-        it 'ends at a full batch' do
-          expect(subject.last).to eq(destination_records.second.public_send(source_foreign_key))
-        end
-
-        context 'when the batch size is greater than the number of destination_records' do
-          let(:batch_size) { 5 }
-
-          it 'ends at the last ID' do
-            expect(subject.last).to eq(destination_records.last.public_send(source_foreign_key))
-          end
-        end
-      end
-
-      context 'when it was called before' do
-        context 'when the previous batch included the end of the table' do
-          before do
-            described_class.new(destination_class, key: key, batch_size: destination_class.count).next_range!
-          end
-
-          it 'starts from the beginning' do
-            expect(subject).to eq(1..destination_records.second.public_send(source_foreign_key))
-          end
-        end
-
-        context 'when the previous batch did not include the end of the table' do
-          before do
-            described_class.new(destination_class, key: key, batch_size: destination_class.count - 1).next_range!
-          end
-
-          it 'starts after the previous batch' do
-            expect(subject).to eq(destination_records.last.public_send(source_foreign_key)..destination_records.last.public_send(source_foreign_key))
-          end
-        end
-
-        context 'if cache is cleared' do
-          before do
-            described_class.new(destination_class, key: key, batch_size: batch_size).next_range!
-          end
-
-          it 'starts from the beginning' do
-            Rails.cache.clear
-
-            expect(subject).to eq(1..destination_records.second.public_send(source_foreign_key))
-          end
-        end
-      end
-    end
-
     context 'when there are records' do
-      let!(:records) { create_list(source_class.underscore, 3) }
+      let!(:records) { create_list(factory_name(source_class), 3) }
 
       context 'when it has never been called before' do
         it { is_expected.to be_a Range }
@@ -94,12 +36,21 @@
             expect(subject.last).to eq(records.last.id)
           end
         end
+
+        context 'when the destination table has a gap of at least one batch' do
+          let(:batch_size) { 1 }
+          let!(:destination_records) { create(destination_class_factory, source_foreign_key => records.last.id) }
+
+          it 'returns a batch ending at batch size' do
+            expect(subject).to eq(1..source_class.first.id)
+          end
+        end
       end
 
       context 'when it was called before' do
         context 'when the previous batch included the end of the table' do
           before do
-            described_class.new(destination_class, key: key, batch_size: source_class.count).next_range!
+            batcher(source_class.count).next_range!
           end
 
           it 'starts from the beginning' do
@@ -109,7 +60,7 @@
 
         context 'when the previous batch did not include the end of the table' do
           before do
-            described_class.new(destination_class, key: key, batch_size: source_class.count - 1).next_range!
+            batcher(source_class.count - 1).next_range!
           end
 
           it 'starts after the previous batch' do
@@ -119,7 +70,7 @@
 
         context 'if cache is cleared' do
           before do
-            described_class.new(destination_class, key: key, batch_size: batch_size).next_range!
+            batcher(batch_size).next_range!
           end
 
           it 'starts from the beginning' do
@@ -130,45 +81,5 @@
         end
       end
     end
-
-    context 'when there are records and orphaned destination_records with foreign key greater than last record id' do
-      let!(:records) { create_list(source_class.underscore, 3) }
-      let(:orphaned_destination_foreign_key_id) { records.last.id }
-      let!(:destination) { create(destination_class_factory, source_foreign_key => orphaned_destination_foreign_key_id) }
-
-      before do
-        source_class.where(id: orphaned_destination_foreign_key_id).delete_all
-      end
-
-      context 'when it has never been called before' do
-        it { is_expected.to be_a Range }
-
-        it 'starts from the beginning' do
-          expect(subject.first).to eq(1)
-        end
-
-        it 'ends at the last destination foreign key ID' do
-          expect(subject.last).to eq(orphaned_destination_foreign_key_id)
-        end
-      end
-
-      context 'when it was called before' do
-        before do
-          described_class.new(destination_class, key: key, batch_size: batch_size).next_range!
-        end
-
-        it 'starts from the beginning' do
-          expect(subject).to eq(1..orphaned_destination_foreign_key_id)
-        end
-
-        context 'if cache is cleared' do
-          it 'starts from the beginning' do
-            Rails.cache.clear
-
-            expect(subject).to eq(1..orphaned_destination_foreign_key_id)
-          end
-        end
-      end
-    end
   end
 end
-- 
GitLab