From 3264d80fa912f4076e41e9005800d261a8b8c886 Mon Sep 17 00:00:00 2001
From: John Mason <jmason@gitlab.com>
Date: Thu, 12 Dec 2024 09:26:25 -0500
Subject: [PATCH 1/6] Add zoekt provisioning service

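Introduce a service chain for rolling out Zoekt replicas and indices:
SelectionService picks enabled namespaces that are missing indices and
nodes with unclaimed storage, PlanningService simulates how each
namespace's projects would be spread across replicas and indices, and
ProvisioningService persists the resulting plan as Replica and Index
records. RolloutService ties the three together and is run every 10
minutes by the new Search::Zoekt::RolloutWorker cron job, behind the
`zoekt_rollout_worker` ops feature flag (disabled by default).

Rough sketch of driving the chain from a Rails console, assuming Zoekt
is licensed and indexing is enabled (dry run is the default, so nothing
is written):

    result = Search::Zoekt::RolloutService.execute(dry_run: true)
    result.success? # => true when eligible namespaces and nodes exist
    result.message  # => e.g. "Skipping execution of changes because of dry run"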
---
 app/models/namespaces/project_namespace.rb    |   2 +
 config/initializers/1_settings.rb             |   3 +
 ee/app/models/search/zoekt/node.rb            |   2 +-
 .../services/search/zoekt/planning_service.rb | 243 +++++++++++++++
 .../search/zoekt/provisioning_service.rb      | 118 ++++++++
 .../services/search/zoekt/rollout_service.rb  |  78 +++++
 .../search/zoekt/selection_service.rb         |  48 +++
 ee/app/workers/all_queues.yml                 |  10 +
 ee/app/workers/search/zoekt/rollout_worker.rb |  57 ++++
 .../ops/zoekt_rollout_worker.yml              |   9 +
 ee/spec/models/search/zoekt/node_spec.rb      |   4 +-
 .../search/zoekt/planning_service_spec.rb     | 148 ++++++++++
 .../search/zoekt/provisioning_service_spec.rb | 278 ++++++++++++++++++
 .../search/zoekt/rollout_service_spec.rb      | 151 ++++++++++
 .../search/zoekt/selection_service_spec.rb    | 109 +++++++
 .../namespaces/project_namespace_spec.rb      |  28 ++
 16 files changed, 1285 insertions(+), 3 deletions(-)
 create mode 100644 ee/app/services/search/zoekt/planning_service.rb
 create mode 100644 ee/app/services/search/zoekt/provisioning_service.rb
 create mode 100644 ee/app/services/search/zoekt/rollout_service.rb
 create mode 100644 ee/app/services/search/zoekt/selection_service.rb
 create mode 100644 ee/app/workers/search/zoekt/rollout_worker.rb
 create mode 100644 ee/config/feature_flags/ops/zoekt_rollout_worker.yml
 create mode 100644 ee/spec/services/search/zoekt/planning_service_spec.rb
 create mode 100644 ee/spec/services/search/zoekt/provisioning_service_spec.rb
 create mode 100644 ee/spec/services/search/zoekt/rollout_service_spec.rb
 create mode 100644 ee/spec/services/search/zoekt/selection_service_spec.rb

diff --git a/app/models/namespaces/project_namespace.rb b/app/models/namespaces/project_namespace.rb
index d60800af67067349..e1be5ccbe84b7542 100644
--- a/app/models/namespaces/project_namespace.rb
+++ b/app/models/namespaces/project_namespace.rb
@@ -4,6 +4,8 @@ module Namespaces
   class ProjectNamespace < Namespace
     self.allow_legacy_sti_class = true
 
+    scope :with_project_statistics, -> { includes(project: :statistics) }
+
     # These aliases are added to make it easier to sync parent/parent_id attribute with
     # project.namespace/project.namespace_id attribute.
     #
diff --git a/config/initializers/1_settings.rb b/config/initializers/1_settings.rb
index 14a37ce32223e7bb..8a02b72906abd3a7 100644
--- a/config/initializers/1_settings.rb
+++ b/config/initializers/1_settings.rb
@@ -842,6 +842,9 @@
   Settings.cron_jobs['search_zoekt_scheduling_worker'] ||= {}
   Settings.cron_jobs['search_zoekt_scheduling_worker']['cron'] ||= '*/1 * * * *'
   Settings.cron_jobs['search_zoekt_scheduling_worker']['job_class'] ||= 'Search::Zoekt::SchedulingWorker'
+  Settings.cron_jobs['search_zoekt_rollout_worker'] ||= {}
+  Settings.cron_jobs['search_zoekt_rollout_worker']['cron'] ||= '*/10 * * * *'
+  Settings.cron_jobs['search_zoekt_rollout_worker']['job_class'] ||= 'Search::Zoekt::RolloutWorker'
   Settings.cron_jobs['search_elastic_metrics_update_cron_worker'] ||= {}
   Settings.cron_jobs['search_elastic_metrics_update_cron_worker']['cron'] ||= '*/1 * * * *'
   Settings.cron_jobs['search_elastic_metrics_update_cron_worker']['job_class'] ||= 'Search::Elastic::MetricsUpdateCronWorker'
diff --git a/ee/app/models/search/zoekt/node.rb b/ee/app/models/search/zoekt/node.rb
index 1e70d8209478ea31..22e8b8cb45c86435 100644
--- a/ee/app/models/search/zoekt/node.rb
+++ b/ee/app/models/search/zoekt/node.rb
@@ -53,7 +53,7 @@ class Node < ApplicationRecord
         sql = <<~SQL
           zoekt_nodes.*, #{UNCLAIMED_STORAGE_BYTES_FORMULA} AS unclaimed_storage_bytes
         SQL
-        left_joins(:indices).group(:id).having("#{UNCLAIMED_STORAGE_BYTES_FORMULA} >= 0").select(sql)
+        left_joins(:indices).group(:id).having("#{UNCLAIMED_STORAGE_BYTES_FORMULA} > 0").select(sql)
       end
       scope :order_by_unclaimed_space, -> do
         with_positive_unclaimed_storage_bytes.order('unclaimed_storage_bytes')
diff --git a/ee/app/services/search/zoekt/planning_service.rb b/ee/app/services/search/zoekt/planning_service.rb
new file mode 100644
index 0000000000000000..d556e41369bd4354
--- /dev/null
+++ b/ee/app/services/search/zoekt/planning_service.rb
@@ -0,0 +1,243 @@
+# frozen_string_literal: true
+
+module Search
+  module Zoekt
+    class PlanningService
+      attr_reader :enabled_namespaces, :nodes, :options
+
+      def self.plan(**kwargs)
+        new(**kwargs).plan
+      end
+
+      def initialize(
+        enabled_namespaces:, nodes:, **options)
+        @enabled_namespaces = Array(enabled_namespaces)
+        @nodes = format_nodes(nodes)
+        @options = options
+      end
+
+      def plan
+        all_plans = enabled_namespaces.map do |enabled_namespace|
+          Plan.new(
+            enabled_namespace: enabled_namespace,
+            nodes: nodes,
+            **options
+          ).generate
+        end
+
+        namespace_plans = []
+        failed_plans = []
+
+        all_plans.each do |p|
+          if p[:errors].present?
+            failed_plans << p
+          else
+            namespace_plans << p
+          end
+        end
+
+        successful_plans = all_plans.filter { |p| p[:errors].blank? }
+
+        {
+          total_required_storage_bytes: namespace_plans.sum { |plan| plan[:namespace_required_storage_bytes] },
+          namespaces: successful_plans,
+          nodes: build_node_change_plan,
+          failures: failed_plans
+        }
+      end
+
+      private
+
+      def format_nodes(zoekt_nodes)
+        Array(zoekt_nodes).map do |node|
+          {
+            id: node.id,
+            unclaimed_storage_bytes: node.unclaimed_storage_bytes,
+            unclaimed_storage_bytes_before: node.unclaimed_storage_bytes,
+            indices: [],
+            namespace_ids: [],
+            node: node
+          }
+        end
+      end
+
+      # Builds a presentation-only summary of the planned changes per node
+      def build_node_change_plan
+        nodes.filter { |hsh| hsh[:indices].present? }.map do |hsh|
+          {
+            id: hsh[:id],
+            unclaimed_storage_bytes_before: hsh[:unclaimed_storage_bytes_before],
+            unclaimed_storage_bytes_after: hsh[:unclaimed_storage_bytes],
+            claimed_storage_bytes: hsh[:indices].sum { |index| index[:required_storage_bytes] },
+            namespace_ids: hsh[:namespace_ids].sort,
+            indices: hsh[:indices].sort_by { |index| index[:namespace_id] }
+          }
+        end
+      end
+
+      class Plan
+        attr_reader :enabled_namespace, :namespace, :num_replicas, :buffer_factor,
+          :max_indices_per_replica, :errors, :replica_plans, :nodes
+
+        def initialize(
+          enabled_namespace:, num_replicas:, nodes:,
+          buffer_factor: 3, max_indices_per_replica: 5)
+          @enabled_namespace = enabled_namespace
+          @namespace = enabled_namespace.namespace
+          @num_replicas = num_replicas
+          @buffer_factor = buffer_factor
+          @max_indices_per_replica = max_indices_per_replica
+          @errors = []
+          @replica_plans = []
+          @nodes = nodes
+          @used_node_ids = Set.new
+        end
+
+        def generate
+          if fetch_project_namespaces.exists?
+            num_replicas.times { simulate_replica_plan }
+          else
+            create_empty_replica
+          end
+
+          {
+            namespace_id: namespace.id,
+            enabled_namespace_id: enabled_namespace.id,
+            namespace_required_storage_bytes: calculate_namespace_storage,
+            replicas: replica_plans,
+            errors: errors.uniq
+          }
+        end
+
+        private
+
+        attr_reader :project_namespace_id
+
+        def simulate_replica_plan
+          replica_indices = []
+
+          fetch_project_namespaces.find_each do |project_namespace|
+            project = project_namespace.project
+            stats = project.statistics
+
+            if replica_indices.size >= max_indices_per_replica
+              log_error(replica_plans.size, :index_limit_exceeded,
+                "Replica reached maximum index limit (#{max_indices_per_replica})")
+              break
+            end
+
+            @project_namespace_id = project_namespace.id
+            assign_to_node(stats, replica_indices)
+          end
+
+          add_replica_plan(replica_indices)
+        end
+
+        def create_empty_replica
+          candidate_nodes = nodes.reject { |node| @used_node_ids.include?(node[:id]) }
+          best_node = candidate_nodes.max_by { |node| node[:unclaimed_storage_bytes] }
+
+          if best_node
+            replica_indices = [simulate_index(best_node)]
+            add_replica_plan(replica_indices)
+            @used_node_ids.add(best_node[:id])
+          else
+            log_error(nil, :node_unavailable, "No nodes available to create an empty replica")
+          end
+        end
+
+        def fetch_project_namespaces
+          scope = ::Namespace.by_root_id(namespace.id).project_namespaces
+
+          if scope.respond_to?(:with_project_statistics)
+            scope.with_project_statistics
+          else # TODO: remove this condition before merging
+            scope.includes(project: :statistics) # rubocop:disable CodeReuse/ActiveRecord -- will remove
+          end
+        end
+
+        def assign_to_node(stats, replica_indices)
+          best_node = find_best_node(stats)
+
+          if best_node
+            assign_project_to_index(best_node, stats, replica_indices)
+          else
+            log_error(replica_plans.size, :node_unavailable,
+              "No node can accommodate project #{stats.project_id} with size #{scaled_size(stats)}")
+          end
+        end
+
+        def find_best_node(stats)
+          nodes
+            .reject { |node| @used_node_ids.include?(node[:id]) }
+            .select { |node| node[:unclaimed_storage_bytes] && node[:unclaimed_storage_bytes] >= scaled_size(stats) }
+            .max_by { |node| node[:unclaimed_storage_bytes] }
+        end
+
+        def assign_project_to_index(node, stats, replica_indices)
+          project_size = scaled_size(stats)
+
+          index = replica_indices
+                    .select { |idx| idx[:required_storage_bytes] + project_size <= idx[:max_storage_bytes] }
+                    .min_by { |idx| idx[:max_storage_bytes] - (idx[:required_storage_bytes] + project_size) }
+
+          unless index
+            if replica_indices.size >= max_indices_per_replica
+              log_error(nil, :index_limit_exceeded, "Max indices per replica reached")
+              return
+            end
+
+            index = simulate_index(node)
+            replica_indices << index
+
+            node[:indices] ||= []
+            node[:indices] << index
+            node[:namespace_ids] << namespace.id unless node[:namespace_ids].include?(namespace.id)
+            @used_node_ids.add(node[:id])
+          end
+
+          add_project_to_index(index, stats)
+          node[:unclaimed_storage_bytes] -= project_size
+        end
+
+        def simulate_index(node)
+          {
+            node_id: node[:id],
+            namespace_id: namespace.id,
+            projects: { project_namespace_id_from: nil, project_namespace_id_to: nil },
+            required_storage_bytes: 0,
+            max_storage_bytes: node[:unclaimed_storage_bytes]
+          }
+        end
+
+        def add_project_to_index(index, stats)
+          index[:projects][:project_namespace_id_from] ||= project_namespace_id
+          index[:projects][:project_namespace_id_to] = project_namespace_id
+          index[:required_storage_bytes] += scaled_size(stats)
+        end
+
+        def add_replica_plan(replica_indices)
+          replica_indices.last[:projects][:project_namespace_id_to] = nil if replica_indices.any?
+
+          replica_plans << { indices: replica_indices.map { |index| format_index(index) } }
+        end
+
+        def format_index(index)
+          index.slice(:node_id, :projects, :required_storage_bytes, :max_storage_bytes)
+        end
+
+        def calculate_namespace_storage
+          replica_plans.sum { |replica| replica[:indices].sum { |index| index[:required_storage_bytes] } }
+        end
+
+        def scaled_size(stats)
+          stats.repository_size * buffer_factor
+        end
+
+        def log_error(replica_idx, type, details)
+          @errors << { namespace_id: namespace.id, replica_idx: replica_idx, type: type, details: details }
+        end
+      end
+    end
+  end
+end
diff --git a/ee/app/services/search/zoekt/provisioning_service.rb b/ee/app/services/search/zoekt/provisioning_service.rb
new file mode 100644
index 0000000000000000..a87ec5996c6d47c3
--- /dev/null
+++ b/ee/app/services/search/zoekt/provisioning_service.rb
@@ -0,0 +1,118 @@
+# frozen_string_literal: true
+
+module Search
+  module Zoekt
+    class ProvisioningService
+      def self.execute(plan)
+        new(plan).execute
+      end
+
+      attr_reader :plan, :errors
+
+      def initialize(plan)
+        @plan = plan
+        @errors = []
+      end
+
+      def execute
+        ApplicationRecord.transaction do
+          plan[:namespaces].each do |namespace_plan|
+            next if namespace_plan[:errors].present?
+
+            process_namespace(namespace_plan)
+          end
+        end
+        { errors: @errors }
+      rescue StandardError => e
+        log_error(:transaction_failed, e.message, e.backtrace)
+        { errors: @errors }
+      end
+
+      private
+
+      def process_namespace(namespace_plan)
+        namespace_id = namespace_plan.fetch(:namespace_id)
+        enabled_namespace_id = namespace_plan.fetch(:enabled_namespace_id)
+
+        enabled_namespace = Search::Zoekt::EnabledNamespace.for_root_namespace_id(namespace_id).first
+        if enabled_namespace.nil?
+          log_error(:missing_enabled_namespace, "Enabled namespace not found for namespace ID: #{namespace_id}")
+          return
+        end
+
+        # Remove any pre-existing replicas for this namespace since we are provisioning new ones.
+        enabled_namespace.replicas.delete_all
+
+        if Index.for_root_namespace_id(namespace_id).exists?
+          log_error(:index_already_exists, "Indices already exist for namespace ID: #{namespace_id}")
+          return
+        end
+
+        namespace_plan[:replicas].each do |replica_plan|
+          process_replica(
+            namespace_id: namespace_id,
+            enabled_namespace_id: enabled_namespace_id,
+            replica_plan: replica_plan
+          )
+        end
+      end
+
+      def process_replica(namespace_id:, enabled_namespace_id:, replica_plan:)
+        replica = Replica.create!(namespace_id: namespace_id, zoekt_enabled_namespace_id: enabled_namespace_id)
+
+        replica_plan[:indices].each do |index_plan|
+          process_index(replica, index_plan)
+        end
+      end
+
+      def process_index(replica, index_plan)
+        node = Node.find(index_plan[:node_id])
+        required_storage_bytes = index_plan[:required_storage_bytes]
+
+        if required_storage_bytes > node.unclaimed_storage_bytes
+          log_error(
+            :node_capacity_exceeded,
+            "Node #{node.id} has #{node.unclaimed_storage_bytes} unclaimed storage bytes and " \
+              "cannot fit #{required_storage_bytes} bytes."
+          )
+          return
+        end
+
+        required_storage_bytes = 1.kilobyte if required_storage_bytes == 0
+        Index.create!(
+          replica: replica,
+          zoekt_enabled_namespace_id: replica.zoekt_enabled_namespace_id,
+          namespace_id: replica.namespace_id,
+          zoekt_node_id: node.id,
+          reserved_storage_bytes: required_storage_bytes,
+
+          # Workaround: drop project_namespace_id_to when it is nil since the JSON schema validator does not allow it.
+          metadata: index_plan[:projects].compact
+        )
+
+        update_node_storage(node, required_storage_bytes)
+      end
+
+      def update_node_storage(node, used_bytes)
+        node.update!(used_bytes: node.used_bytes + used_bytes)
+      end
+
+      def log_error(message, details, trace = [])
+        err = {
+          class: self.class.name,
+          message: message,
+          details: details,
+          trace: trace.slice(0, 5)
+        }
+
+        @errors << err
+
+        logger.error(**err)
+      end
+
+      def logger
+        @logger ||= Search::Zoekt::Logger.build
+      end
+    end
+  end
+end
diff --git a/ee/app/services/search/zoekt/rollout_service.rb b/ee/app/services/search/zoekt/rollout_service.rb
new file mode 100644
index 0000000000000000..a489bd0d7ccaffae
--- /dev/null
+++ b/ee/app/services/search/zoekt/rollout_service.rb
@@ -0,0 +1,78 @@
+# frozen_string_literal: true
+
+module Search
+  module Zoekt
+    class RolloutService
+      DEFAULT_OPTIONS = {
+        num_replicas: 1,
+        max_indices_per_replica: 5,
+        dry_run: true,
+        batch_size: 128
+      }.freeze
+
+      Result = Struct.new(:success?, :message)
+
+      def self.execute(**kwargs)
+        new(**kwargs).execute
+      end
+
+      attr_reader :num_replicas, :max_indices_per_replica, :batch_size, :dry_run
+
+      def initialize(**kwargs)
+        options = DEFAULT_OPTIONS.merge(kwargs)
+
+        @num_replicas = options.fetch(:num_replicas)
+        @max_indices_per_replica = options.fetch(:max_indices_per_replica)
+        @batch_size = options.fetch(:batch_size)
+        @dry_run = options.fetch(:dry_run)
+
+        @logger = options[:logger] if options.has_key?(:logger)
+      end
+
+      def execute
+        logger.info "Selecting resources"
+        resource_pool = ::Search::Zoekt::SelectionService.execute(
+          max_batch_size: batch_size
+        )
+
+        return failed_result("No enabled namespaces found", logger) if resource_pool.enabled_namespaces.empty?
+
+        return failed_result("No available nodes found", logger) if resource_pool.nodes.empty?
+
+        logger.info "Planning"
+        plan = ::Search::Zoekt::PlanningService.plan(
+          enabled_namespaces: resource_pool.enabled_namespaces,
+          nodes: resource_pool.nodes,
+          num_replicas: num_replicas,
+          max_indices_per_replica: max_indices_per_replica
+        )
+        logger.info plan.to_json
+
+        return successful_result("Skipping execution of changes because of dry run", logger) if dry_run
+
+        logger.info "Executing changes"
+        changes = ::Search::Zoekt::ProvisioningService.execute(plan)
+
+        return failed_result("Change had an error: #{changes[:errors]}", logger) if changes[:errors].present?
+
+        successful_result("Rollout execution completed successfully", logger)
+      end
+
+      private
+
+      def logger
+        @logger ||= ::Search::Zoekt::Logger.build
+      end
+
+      def failed_result(message, logger)
+        logger.info message
+        Result.new(false, message)
+      end
+
+      def successful_result(message, logger)
+        logger.info message
+        Result.new(true, message)
+      end
+    end
+  end
+end
diff --git a/ee/app/services/search/zoekt/selection_service.rb b/ee/app/services/search/zoekt/selection_service.rb
new file mode 100644
index 0000000000000000..bdaec9e7531274d1
--- /dev/null
+++ b/ee/app/services/search/zoekt/selection_service.rb
@@ -0,0 +1,48 @@
+# frozen_string_literal: true
+
+module Search
+  module Zoekt
+    class SelectionService
+      attr_reader :max_batch_size
+
+      ResourcePool = Struct.new(:enabled_namespaces, :nodes)
+
+      def self.execute(**kwargs)
+        new(**kwargs).execute
+      end
+
+      def initialize(max_batch_size: 128)
+        @max_batch_size = max_batch_size
+      end
+
+      def execute
+        namespaces = fetch_enabled_namespace_for_indexing
+        nodes = fetch_available_nodes
+
+        ResourcePool.new(namespaces, nodes)
+      end
+
+      private
+
+      def log_info(str)
+        logger&.info(str)
+      end
+
+      def fetch_enabled_namespace_for_indexing(project_count_limit: 20_000)
+        [].tap do |batch|
+          ::Search::Zoekt::EnabledNamespace.with_missing_indices.find_each do |ns|
+            next if ::Namespace.by_root_id(ns.root_namespace_id).project_namespaces.count > project_count_limit
+
+            batch << ns
+
+            break if batch.count >= max_batch_size
+          end
+        end
+      end
+
+      def fetch_available_nodes
+        ::Search::Zoekt::Node.with_positive_unclaimed_storage_bytes
+      end
+    end
+  end
+end
diff --git a/ee/app/workers/all_queues.yml b/ee/app/workers/all_queues.yml
index ef5da1c2e788c3a2..da58423ecefd4491 100644
--- a/ee/app/workers/all_queues.yml
+++ b/ee/app/workers/all_queues.yml
@@ -743,6 +743,16 @@
   :idempotent: true
   :tags: []
   :queue_namespace: :cronjob
+- :name: cronjob:search_zoekt_rollout
+  :worker_name: Search::Zoekt::RolloutWorker
+  :feature_category: :global_search
+  :has_external_dependencies: false
+  :urgency: :low
+  :resource_boundary: :unknown
+  :weight: 1
+  :idempotent: true
+  :tags: []
+  :queue_namespace: :cronjob
 - :name: cronjob:search_zoekt_scheduling
   :worker_name: Search::Zoekt::SchedulingWorker
   :feature_category: :global_search
diff --git a/ee/app/workers/search/zoekt/rollout_worker.rb b/ee/app/workers/search/zoekt/rollout_worker.rb
new file mode 100644
index 0000000000000000..fcdc3c2754a27830
--- /dev/null
+++ b/ee/app/workers/search/zoekt/rollout_worker.rb
@@ -0,0 +1,57 @@
+# frozen_string_literal: true
+
+module Search
+  module Zoekt
+    class RolloutWorker
+      include ApplicationWorker
+      include Search::Worker
+      include CronjobQueue # rubocop:disable Scalability/CronWorkerContext -- there is no relevant metadata
+      prepend ::Geo::SkipSecondary
+
+      deduplicate :until_executed
+      data_consistency :sticky
+      idempotent!
+      urgency :low
+
+      defer_on_database_health_signal :gitlab_main,
+        [:zoekt_nodes, :zoekt_enabled_namespaces, :zoekt_replicas, :zoekt_indices, :zoekt_repositories, :zoekt_tasks],
+        10.minutes
+
+      MAX_RETRIES = 5
+      INITIAL_BACKOFF = 5.minutes
+
+      def perform(retry_count = 0)
+        return false if Gitlab::CurrentSettings.zoekt_indexing_paused?
+        return false unless Search::Zoekt.licensed_and_indexing_enabled?
+        return false unless Feature.enabled?(:zoekt_rollout_worker, Feature.current_request)
+
+        result = ::Search::Zoekt::RolloutService.execute
+
+        if result.success?
+          log_info message: "RolloutWorker succeeded: #{result.message}"
+          self.class.perform_async # Immediately schedule another job
+        else
+          log_info message: "RolloutWorker did not do any work: #{result.message}"
+
+          if retry_count < MAX_RETRIES
+            backoff_time = INITIAL_BACKOFF * (2**retry_count)
+
+            self.class.perform_at(backoff_time.from_now, retry_count + 1)
+          else
+            log_info message: "RolloutWorker exceeded max backoff interval. Last message: #{result.message}"
+          end
+        end
+      end
+
+      private
+
+      def logger
+        @logger ||= ::Search::Zoekt::Logger.build
+      end
+
+      def log_info(**payload)
+        logger.info(build_structured_payload(**payload))
+      end
+    end
+  end
+end
diff --git a/ee/config/feature_flags/ops/zoekt_rollout_worker.yml b/ee/config/feature_flags/ops/zoekt_rollout_worker.yml
new file mode 100644
index 0000000000000000..ca3775f2083993f6
--- /dev/null
+++ b/ee/config/feature_flags/ops/zoekt_rollout_worker.yml
@@ -0,0 +1,9 @@
+---
+name: zoekt_rollout_worker
+feature_issue_url:
+introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/175666
+rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/519660
+milestone: '17.9'
+group: group::global search
+type: ops
+default_enabled: false
diff --git a/ee/spec/models/search/zoekt/node_spec.rb b/ee/spec/models/search/zoekt/node_spec.rb
index fbffc1501bbf030f..60da4981c1c4cd42 100644
--- a/ee/spec/models/search/zoekt/node_spec.rb
+++ b/ee/spec/models/search/zoekt/node_spec.rb
@@ -150,7 +150,7 @@
         positive_nodes = described_class.with_positive_unclaimed_storage_bytes
 
         expect(positive_nodes).to include(node_with_positive_storage)
-        expect(positive_nodes).to include(node_with_zero_storage)
+        expect(positive_nodes).not_to include(node_with_zero_storage)
         expect(positive_nodes).not_to include(node_with_negative_storage)
       end
 
@@ -158,7 +158,7 @@
         result = described_class.with_positive_unclaimed_storage_bytes.find(node_with_positive_storage.id)
 
         expect(result).to respond_to(:unclaimed_storage_bytes)
-        expect(result.unclaimed_storage_bytes).to be >= 0
+        expect(result.unclaimed_storage_bytes).to be > 0
       end
 
       it 'calculates unclaimed_storage_bytes correctly' do
diff --git a/ee/spec/services/search/zoekt/planning_service_spec.rb b/ee/spec/services/search/zoekt/planning_service_spec.rb
new file mode 100644
index 0000000000000000..3441f7dc7de8f78e
--- /dev/null
+++ b/ee/spec/services/search/zoekt/planning_service_spec.rb
@@ -0,0 +1,148 @@
+# frozen_string_literal: true
+
+require 'spec_helper'
+
+RSpec.describe Search::Zoekt::PlanningService, feature_category: :global_search do
+  let_it_be(:group1) { create(:group) }
+  let_it_be(:enabled_namespace1) { create(:zoekt_enabled_namespace, namespace: group1) }
+  let_it_be(:group2) { create(:group) }
+  let_it_be(:enabled_namespace2) { create(:zoekt_enabled_namespace, namespace: group2) }
+  let_it_be_with_reload(:nodes) { create_list(:zoekt_node, 5, total_bytes: 100.gigabytes, used_bytes: 90.gigabytes) }
+  let_it_be(:projects_namespace1) do
+    [
+      create(:project, namespace: group1, statistics: create(:project_statistics, repository_size: 1.gigabyte)),
+      create(:project, namespace: group1, statistics: create(:project_statistics, repository_size: 2.gigabytes))
+    ]
+  end
+
+  let_it_be(:projects_namespace2) do
+    [create(:project, namespace: group2, statistics: create(:project_statistics, repository_size: 2.gigabytes))]
+  end
+
+  let(:max_indices_per_replica) { 5 }
+
+  describe '.plan' do
+    subject(:plan) do
+      described_class.plan(
+        enabled_namespaces: [enabled_namespace1, enabled_namespace2],
+        nodes: nodes,
+        num_replicas: num_replicas,
+        buffer_factor: buffer_factor,
+        max_indices_per_replica: max_indices_per_replica
+      )
+    end
+
+    let(:num_replicas) { 2 }
+    let(:buffer_factor) { 1.5 }
+
+    it 'returns total required storage bytes across all namespaces' do
+      total_storage = (projects_namespace1 + projects_namespace2).sum { |p| p.statistics.repository_size }
+      buffered_storage = total_storage * buffer_factor * num_replicas
+      expect(plan[:total_required_storage_bytes]).to eq(buffered_storage)
+    end
+
+    it 'returns plans for each enabled namespace' do
+      expect(plan[:namespaces].size).to eq(2)
+      expect(plan[:namespaces].pluck(:enabled_namespace_id))
+        .to contain_exactly(enabled_namespace1.id, enabled_namespace2.id)
+    end
+
+    it 'calculates the namespace-specific required storage bytes' do
+      namespace1_storage = projects_namespace1.sum { |p| p.statistics.repository_size * buffer_factor }
+      namespace2_storage = projects_namespace2.sum { |p| p.statistics.repository_size * buffer_factor }
+
+      expect(plan[:namespaces][0][:namespace_required_storage_bytes]).to eq(namespace1_storage * num_replicas)
+      expect(plan[:namespaces][1][:namespace_required_storage_bytes]).to eq(namespace2_storage * num_replicas)
+    end
+
+    it 'assigns projects to indices for each namespace without reusing nodes' do
+      namespace1_used_nodes = []
+      plan[:namespaces][0][:replicas].each do |replica|
+        replica[:indices].each do |index|
+          expect(namespace1_used_nodes).not_to include(index[:node_id])
+          namespace1_used_nodes << index[:node_id]
+        end
+      end
+    end
+
+    context 'when max indices per replica is reached' do
+      let(:max_indices_per_replica) { 1 }
+
+      it 'logs an error for each namespace that cannot fit into a single index' do
+        plan[:failures].each do |namespace_plan|
+          expect(namespace_plan[:errors]).to include(a_hash_including(type: :index_limit_exceeded))
+        end
+      end
+    end
+
+    context 'when a namespace has to be spread across multiple indices' do
+      let(:buffer_factor) { 2.5 }
+      let(:num_replicas) { 1 }
+
+      before do
+        nodes.each { |node| node.update!(total_bytes: 10.gigabytes, used_bytes: 3.gigabytes) }
+      end
+
+      it 'creates multiple indices for a namespace' do
+        namespace1_plan = plan[:namespaces].find { |n| n[:enabled_namespace_id] == enabled_namespace1.id }
+        indices_plan = namespace1_plan[:replicas].flat_map { |replica| replica[:indices] }
+
+        expect(indices_plan.size).to eq(2)
+        expect(indices_plan.pluck(:node_id).uniq.size).to eq(2)
+        projects = indices_plan.first[:projects]
+        p_ns = ::Namespace.by_root_id(group1.id).project_namespaces.order(:id)
+        expect(projects).to eq({ project_namespace_id_from: p_ns[0].id, project_namespace_id_to: p_ns[0].id })
+        projects = indices_plan.last[:projects]
+        expect(projects).to eq({ project_namespace_id_from: p_ns[1].id, project_namespace_id_to: nil })
+
+        namespace2_plan = plan[:namespaces].find { |n| n[:enabled_namespace_id] == enabled_namespace2.id }
+        indices_plan = namespace2_plan[:replicas].flat_map { |replica| replica[:indices] }
+        expect(indices_plan.size).to eq(1)
+        projects = indices_plan.first[:projects]
+        p_ns = ::Namespace.by_root_id(group2.id).project_namespaces.order(:id)
+        expect(projects).to eq({ project_namespace_id_from: p_ns[0].id, project_namespace_id_to: nil })
+      end
+    end
+  end
+
+  context 'when there are more projects than the batch size' do
+    let(:batch_size) { 2 }
+    let(:num_replicas) { 2 }
+    let(:buffer_factor) { 1.5 }
+
+    before do
+      # Create more projects than the batch size
+      (1..6).each do |i|
+        create(:project, namespace: group1, statistics: create(:project_statistics, repository_size: i.megabytes))
+      end
+    end
+
+    it 'processes all projects in batches without skipping any' do
+      # Run the planning service across all of the namespace's projects
+      result = described_class.plan(
+        enabled_namespaces: [enabled_namespace1],
+        nodes: nodes,
+        num_replicas: num_replicas,
+        buffer_factor: buffer_factor
+      )
+
+      # Total storage should account for all projects
+      total_storage = group1.projects.sum do |p|
+        p.statistics.repository_size
+      end
+
+      buffered_storage = total_storage * buffer_factor * num_replicas
+
+      expect(result[:total_required_storage_bytes]).to eq(buffered_storage)
+
+      # Ensure all projects are assigned
+      assigned_projects = result[:namespaces][0][:replicas].flat_map { |r| r[:indices].flat_map { |i| i[:projects] } }
+      lower, upper = assigned_projects.pluck(:project_namespace_id_from, :project_namespace_id_to).flatten.uniq
+      id_range = upper.blank? ? (lower..) : (lower..upper)
+
+      project_ids = group1.projects.by_project_namespace(id_range).pluck(:id)
+
+      expect(project_ids).to match_array(group1.projects.pluck(:id))
+    end
+  end
+end
diff --git a/ee/spec/services/search/zoekt/provisioning_service_spec.rb b/ee/spec/services/search/zoekt/provisioning_service_spec.rb
new file mode 100644
index 0000000000000000..37f25f69bd199e6a
--- /dev/null
+++ b/ee/spec/services/search/zoekt/provisioning_service_spec.rb
@@ -0,0 +1,278 @@
+# frozen_string_literal: true
+
+require 'spec_helper'
+
+RSpec.describe Search::Zoekt::ProvisioningService, feature_category: :global_search do
+  let_it_be(:namespace) { create(:group) }
+  let_it_be(:enabled_namespace) { create(:zoekt_enabled_namespace, namespace: namespace) }
+  let_it_be(:namespace2) { create(:group) }
+  let_it_be(:enabled_namespace2) { create(:zoekt_enabled_namespace, namespace: namespace2) }
+  let_it_be(:nodes) { create_list(:zoekt_node, 5, total_bytes: 100.gigabytes, used_bytes: 90.gigabytes) }
+
+  let(:plan) do
+    {
+      namespaces: [
+        {
+          namespace_id: namespace.id,
+          enabled_namespace_id: enabled_namespace.id,
+          replicas: [
+            {
+              indices: [
+                {
+                  node_id: nodes.first.id,
+                  required_storage_bytes: 3.gigabytes,
+                  max_storage_bytes: 90.gigabytes,
+                  projects: { project_namespace_id_from: 1, project_namespace_id_to: 5 }
+                },
+                {
+                  node_id: nodes.second.id,
+                  required_storage_bytes: 2.gigabytes,
+                  max_storage_bytes: 80.gigabytes,
+                  projects: { project_namespace_id_from: 6, project_namespace_id_to: nil }
+                }
+              ]
+            },
+            {
+              indices: [
+                {
+                  node_id: nodes.third.id,
+                  required_storage_bytes: 3.gigabytes,
+                  max_storage_bytes: 90.gigabytes,
+                  projects: { project_namespace_id_from: 1, project_namespace_id_to: 5 }
+                },
+                {
+                  node_id: nodes.fourth.id,
+                  required_storage_bytes: 2.gigabytes,
+                  max_storage_bytes: 80.gigabytes,
+                  projects: { project_namespace_id_from: 6, project_namespace_id_to: nil }
+                }
+              ]
+            }
+          ],
+          errors: [],
+          namespace_required_storage_bytes: 10.gigabytes
+        },
+        {
+          namespace_id: namespace2.id,
+          enabled_namespace_id: enabled_namespace2.id,
+          replicas: [
+            {
+              indices: [
+                {
+                  node_id: nodes.first.id,
+                  required_storage_bytes: 2.gigabytes,
+                  max_storage_bytes: 90.gigabytes,
+                  projects: { project_namespace_id_from: 1, project_namespace_id_to: 3 }
+                },
+                {
+                  node_id: nodes.second.id,
+                  required_storage_bytes: 1.gigabyte,
+                  max_storage_bytes: 80.gigabytes,
+                  projects: { project_namespace_id_from: 4, project_namespace_id_to: nil }
+                }
+              ]
+            },
+            {
+              indices: [
+                {
+                  node_id: nodes.third.id,
+                  required_storage_bytes: 2.gigabytes,
+                  max_storage_bytes: 90.gigabytes,
+                  projects: { project_namespace_id_from: 1, project_namespace_id_to: 3 }
+                },
+                {
+                  node_id: nodes.fourth.id,
+                  required_storage_bytes: 1.gigabyte,
+                  max_storage_bytes: 80.gigabytes,
+                  projects: { project_namespace_id_from: 4, project_namespace_id_to: nil }
+                }
+              ]
+            }
+          ],
+          errors: [],
+          namespace_required_storage_bytes: 6.gigabytes
+        }
+      ],
+      total_required_storage_bytes: 16.gigabytes,
+      failures: []
+    }
+  end
+
+  subject(:provisioning_result) { described_class.execute(plan) }
+
+  describe '.provision' do
+    context 'when the plan is valid' do
+      it 'provisions all replicas and indices' do
+        result = provisioning_result
+        # Ensure there are no errors
+        expect(result[:errors]).to be_empty
+        expect(enabled_namespace.replicas.count).to eq(2)
+        expect(enabled_namespace.indices.count).to eq(4)
+        expect(enabled_namespace2.replicas.count).to eq(2)
+        expect(enabled_namespace2.indices.count).to eq(4)
+
+        metadata = enabled_namespace.replicas.first.indices.find_by_zoekt_node_id(nodes.first).metadata
+        expect(metadata).to eq({ 'project_namespace_id_to' => 5, 'project_namespace_id_from' => 1 })
+        metadata2 = enabled_namespace.replicas.first.indices.find_by_zoekt_node_id(nodes.second).metadata
+        expect(metadata2).to eq({ 'project_namespace_id_from' => 6 })
+        metadata3 = enabled_namespace.replicas.second.indices.find_by_zoekt_node_id(nodes.third).metadata
+        expect(metadata3).to eq({ 'project_namespace_id_to' => 5, 'project_namespace_id_from' => 1 })
+        metadata4 = enabled_namespace.replicas.second.indices.find_by_zoekt_node_id(nodes.fourth).metadata
+        expect(metadata4).to eq({ 'project_namespace_id_from' => 6 })
+
+        metadata5 = enabled_namespace2.replicas.first.indices.find_by_zoekt_node_id(nodes.first).metadata
+        expect(metadata5).to eq({ 'project_namespace_id_to' => 3, 'project_namespace_id_from' => 1 })
+        metadata6 = enabled_namespace2.replicas.first.indices.find_by_zoekt_node_id(nodes.second).metadata
+        expect(metadata6).to eq({ 'project_namespace_id_from' => 4 })
+        metadata7 = enabled_namespace2.replicas.second.indices.find_by_zoekt_node_id(nodes.third).metadata
+        expect(metadata7).to eq({ 'project_namespace_id_to' => 3, 'project_namespace_id_from' => 1 })
+        metadata8 = enabled_namespace2.replicas.second.indices.find_by_zoekt_node_id(nodes.fourth).metadata
+        expect(metadata8).to eq({ 'project_namespace_id_from' => 4 })
+      end
+    end
+
+    context 'when there is not enough space in node' do
+      before do
+        nodes.second.update!(used_bytes: 99.gigabytes) # Simulate node being near full
+      end
+
+      it 'logs an error and does not provision indices on that node' do
+        result = provisioning_result
+
+        expect(result[:errors]).to include(a_hash_including(message: :node_capacity_exceeded))
+      end
+    end
+
+    context 'when there is an error initializing a replica' do
+      it 'logs an error and does not create anything' do
+        allow(::Search::Zoekt::Replica).to receive(:new).and_raise(StandardError, 'Replica initialization failed')
+
+        result = provisioning_result
+
+        expect(result[:errors]).to include(a_hash_including(details: 'Replica initialization failed'))
+        expect(Search::Zoekt::Replica.count).to be_zero
+        expect(Search::Zoekt::Index.count).to be_zero
+      end
+    end
+
+    context 'when there is an error initializing an index' do
+      it 'logs an error and does not create anything' do
+        allow(::Search::Zoekt::Index).to receive(:new).once.and_raise(StandardError, 'Index initialization failed')
+
+        result = provisioning_result
+
+        expect(result[:errors]).to include(a_hash_including(details: 'Index initialization failed'))
+        expect(Search::Zoekt::Replica.count).to be_zero
+        expect(Search::Zoekt::Index.count).to be_zero
+      end
+    end
+
+    context 'when a namespace has errors in its plan' do
+      let(:plan) do
+        {
+          namespaces: [
+            {
+              namespace_id: namespace.id,
+              enabled_namespace_id: enabled_namespace.id,
+              replicas: [
+                {
+                  indices: [
+                    {
+                      node_id: nodes.first.id,
+                      required_storage_bytes: 3.gigabytes,
+                      max_storage_bytes: 90.gigabytes,
+                      projects: { project_namespace_id_from: 1, project_namespace_id_to: 5 }
+                    },
+                    {
+                      node_id: nodes.second.id,
+                      required_storage_bytes: 2.gigabytes,
+                      max_storage_bytes: 80.gigabytes,
+                      projects: { project_namespace_id_from: 6, project_namespace_id_to: nil }
+                    }
+                  ]
+                },
+                {
+                  indices: [
+                    {
+                      node_id: nodes.third.id,
+                      required_storage_bytes: 3.gigabytes,
+                      max_storage_bytes: 90.gigabytes,
+                      projects: { project_namespace_id_from: 1, project_namespace_id_to: 5 }
+                    },
+                    {
+                      node_id: nodes.fourth.id,
+                      required_storage_bytes: 2.gigabytes,
+                      max_storage_bytes: 80.gigabytes,
+                      projects: { project_namespace_id_from: 6, project_namespace_id_to: nil }
+                    }
+                  ]
+                }
+              ],
+              errors: [{ namespace_id: namespace.id, replica_idx: nil, type: :error_type, details: 'Detail' }],
+              namespace_required_storage_bytes: 10.gigabytes
+            },
+            {
+              namespace_id: namespace2.id,
+              enabled_namespace_id: enabled_namespace2.id,
+              replicas: [
+                {
+                  indices: [
+                    {
+                      node_id: nodes.first.id,
+                      required_storage_bytes: 2.gigabytes,
+                      max_storage_bytes: 90.gigabytes,
+                      projects: { project_namespace_id_from: 1, project_namespace_id_to: 3 }
+                    },
+                    {
+                      node_id: nodes.second.id,
+                      required_storage_bytes: 1.gigabyte,
+                      max_storage_bytes: 80.gigabytes,
+                      projects: { project_namespace_id_from: 4, project_namespace_id_to: nil }
+                    }
+                  ]
+                },
+                {
+                  indices: [
+                    {
+                      node_id: nodes.third.id,
+                      required_storage_bytes: 2.gigabytes,
+                      max_storage_bytes: 90.gigabytes,
+                      projects: { project_namespace_id_from: 1, project_namespace_id_to: 3 }
+                    },
+                    {
+                      node_id: nodes.fourth.id,
+                      required_storage_bytes: 1.gigabyte,
+                      max_storage_bytes: 80.gigabytes,
+                      projects: { project_namespace_id_from: 4, project_namespace_id_to: nil }
+                    }
+                  ]
+                }
+              ],
+              errors: [],
+              namespace_required_storage_bytes: 6.gigabytes
+            }
+          ],
+          total_required_storage_bytes: 8.gigabytes
+        }
+      end
+
+      it 'skips that namespace and continues with the rest' do
+        result = provisioning_result
+        # Ensure there are no errors
+        expect(result[:errors]).to be_empty
+        expect(enabled_namespace.replicas).to be_empty
+        expect(enabled_namespace.indices).to be_empty
+        expect(enabled_namespace2.replicas.count).to eq(2)
+        expect(enabled_namespace2.indices.count).to eq(4)
+        metadata = enabled_namespace2.replicas.first.indices.find_by_zoekt_node_id(nodes.first).metadata
+        expect(metadata).to eq({ 'project_namespace_id_to' => 3, 'project_namespace_id_from' => 1 })
+        metadata2 = enabled_namespace2.replicas.first.indices.find_by_zoekt_node_id(nodes.second).metadata
+        expect(metadata2).to eq({ 'project_namespace_id_from' => 4 })
+        metadata3 = enabled_namespace2.replicas.second.indices.find_by_zoekt_node_id(nodes.third).metadata
+        expect(metadata3).to eq({ 'project_namespace_id_to' => 3, 'project_namespace_id_from' => 1 })
+        metadata4 = enabled_namespace2.replicas.second.indices.find_by_zoekt_node_id(nodes.fourth).metadata
+        expect(metadata4).to eq({ 'project_namespace_id_from' => 4 })
+      end
+    end
+  end
+end
diff --git a/ee/spec/services/search/zoekt/rollout_service_spec.rb b/ee/spec/services/search/zoekt/rollout_service_spec.rb
new file mode 100644
index 0000000000000000..ca4e7ab97edf7b13
--- /dev/null
+++ b/ee/spec/services/search/zoekt/rollout_service_spec.rb
@@ -0,0 +1,151 @@
+# frozen_string_literal: true
+
+require 'spec_helper'
+
+RSpec.describe ::Search::Zoekt::RolloutService, feature_category: :global_search do
+  let(:logger) { instance_double(Logger, info: nil) }
+  let(:resource_pool) do
+    instance_double(::Search::Zoekt::SelectionService::ResourcePool,
+      enabled_namespaces: enabled_namespaces,
+      nodes: nodes)
+  end
+
+  let(:plan) { instance_double(::Search::Zoekt::PlanningService::Plan, to_json: '{"plan": "data"}') }
+  let(:default_options) do
+    {
+      num_replicas: 1,
+      max_indices_per_replica: 5,
+      dry_run: true,
+      batch_size: 128,
+      logger: logger
+    }
+  end
+
+  let(:selection_service) do
+    ::Search::Zoekt::SelectionService
+  end
+
+  let(:planning_service) do
+    ::Search::Zoekt::PlanningService
+  end
+
+  let(:provisioning_service) do
+    ::Search::Zoekt::ProvisioningService
+  end
+
+  let(:enabled_namespaces) { ['namespace1'] }
+  let(:nodes) { ['node1'] }
+
+  subject(:service) { described_class.new(**options) }
+
+  describe '#execute' do
+    subject(:result) { service.execute }
+
+    let(:options) { {} }
+
+    before do
+      allow(selection_service).to receive(:execute)
+        .with(max_batch_size: default_options[:batch_size])
+        .and_return(resource_pool)
+    end
+
+    context 'when no enabled namespaces are found' do
+      let(:enabled_namespaces) { [] }
+
+      it 'returns a failed result with appropriate message' do
+        expect(result.success?).to be false
+        expect(result.message).to eq("No enabled namespaces found")
+      end
+    end
+
+    context 'when no available nodes are found' do
+      let(:nodes) { [] }
+
+      it 'returns a failed result with appropriate message' do
+        expect(result.success?).to be false
+        expect(result.message).to eq("No available nodes found")
+      end
+    end
+
+    context 'when dry_run is true' do
+      let(:options) { { dry_run: true, logger: logger } }
+
+      before do
+        allow(planning_service).to receive(:plan)
+          .with(
+            enabled_namespaces: enabled_namespaces,
+            nodes: nodes,
+            num_replicas: default_options[:num_replicas],
+            max_indices_per_replica: default_options[:max_indices_per_replica]
+          )
+          .and_return(plan)
+      end
+
+      it 'returns a successful result indicating a skipped execution' do
+        expect(result.success?).to be true
+        expect(result.message).to eq("Skipping execution of changes because of dry run")
+      end
+    end
+
+    context 'when dry_run is false and provisioning returns errors' do
+      let(:options) { { dry_run: false, logger: logger } }
+      let(:changes) { { errors: 'some error occurred' } }
+
+      before do
+        allow(planning_service).to receive(:plan)
+          .with(
+            enabled_namespaces: enabled_namespaces,
+            nodes: nodes,
+            num_replicas: default_options[:num_replicas],
+            max_indices_per_replica: default_options[:max_indices_per_replica]
+          )
+          .and_return(plan)
+
+        allow(provisioning_service).to receive(:execute)
+          .with(plan)
+          .and_return(changes)
+      end
+
+      it 'returns a failed result with the provisioning error message' do
+        expect(result.success?).to be false
+        expect(result.message).to eq("Change had an error: some error occurred")
+      end
+    end
+
+    context 'when dry_run is false and provisioning succeeds' do
+      let(:options) { { dry_run: false, logger: logger } }
+      let(:changes) { { errors: nil } }
+
+      before do
+        allow(planning_service).to receive(:plan)
+          .with(
+            enabled_namespaces: enabled_namespaces,
+            nodes: nodes,
+            num_replicas: default_options[:num_replicas],
+            max_indices_per_replica: default_options[:max_indices_per_replica]
+          )
+          .and_return(plan)
+
+        allow(provisioning_service).to receive(:execute)
+          .with(plan)
+          .and_return(changes)
+      end
+
+      it 'returns a successful result indicating completion' do
+        expect(result.success?).to be true
+        expect(result.message).to eq("Rollout execution completed successfully")
+      end
+    end
+  end
+
+  describe '.execute' do
+    let(:options) { { dry_run: true, logger: logger } }
+
+    it 'delegates to an instance of RolloutService' do
+      instance = instance_double(described_class)
+      expect(described_class).to receive(:new).with(**options).and_return(instance)
+      expect(instance).to receive(:execute)
+      described_class.execute(**options)
+    end
+  end
+end
diff --git a/ee/spec/services/search/zoekt/selection_service_spec.rb b/ee/spec/services/search/zoekt/selection_service_spec.rb
new file mode 100644
index 0000000000000000..2eee2e4e83ade7dd
--- /dev/null
+++ b/ee/spec/services/search/zoekt/selection_service_spec.rb
@@ -0,0 +1,109 @@
+# frozen_string_literal: true
+
+require 'spec_helper'
+
+RSpec.describe Search::Zoekt::SelectionService, feature_category: :global_search do
+  describe '.execute' do
+    subject(:resource_pool) { described_class.execute }
+
+    let_it_be(:ns_1) { create(:group) }
+    let_it_be(:ns_2) { create(:group) }
+
+    context 'with basic resource pool structure' do
+      it 'returns a resource pool responding to :enabled_namespaces and :nodes' do
+        expect(resource_pool).to respond_to(:enabled_namespaces)
+        expect(resource_pool).to respond_to(:nodes)
+      end
+    end
+
+    context 'with enabled namespaces selection' do
+      let_it_be(:eligible_namespace) { create(:zoekt_enabled_namespace, namespace: ns_1) }
+      let_it_be(:ineligible_namespace) { create(:zoekt_enabled_namespace, namespace: ns_2) }
+
+      before do
+        # For the eligible namespace, the project count will be low (default).
+        # For the ineligible namespace, stub its associated namespace so that
+        # project_namespaces.count returns 25_000 (above the 20,000 threshold).
+        allow(::Namespace).to receive(:by_root_id)
+          .with(eligible_namespace.root_namespace_id)
+          .and_return(::Namespace.where(id: eligible_namespace.root_namespace_id))
+
+        allow(::Namespace).to receive(:by_root_id)
+          .with(ineligible_namespace.root_namespace_id)
+          .and_return(::Namespace.where(id: ineligible_namespace.root_namespace_id))
+
+        allow(ineligible_namespace.namespace)
+          .to receive_message_chain(:project_namespaces, :count)
+          .and_return(25_000)
+
+        allow(::Namespace).to receive(:by_root_id)
+          .with(ineligible_namespace.root_namespace_id)
+          .and_return(ineligible_namespace.namespace.root_ancestor)
+      end
+
+      it 'includes only namespaces with a project count within the limit' do
+        expect(resource_pool.enabled_namespaces).to include(eligible_namespace)
+        expect(resource_pool.enabled_namespaces).not_to include(ineligible_namespace)
+      end
+    end
+
+    context 'with max batch size enforcement' do
+      let(:max_batch_size) { 2 }
+
+      subject(:resource_pool) { described_class.new(max_batch_size: max_batch_size).execute }
+
+      before do
+        # Create more eligible namespaces than the max batch size.
+        create_list(:zoekt_enabled_namespace, 3)
+      end
+
+      it 'limits the number of selected namespaces to the max batch size' do
+        expect(resource_pool.enabled_namespaces.size).to eq(max_batch_size)
+      end
+    end
+
+    context 'with available nodes selection' do
+      let_it_be(:eligible_node) { create(:zoekt_node, total_bytes: 100.gigabytes, used_bytes: 50.gigabytes) }
+      # Node with no unclaimed storage.
+      let_it_be(:ineligible_node) do
+        create(:zoekt_node, total_bytes: 100.gigabytes, used_bytes: 100.gigabytes)
+      end
+
+      it 'returns only nodes with positive unclaimed storage bytes' do
+        expect(resource_pool.nodes).to include(eligible_node)
+        expect(resource_pool.nodes).not_to include(ineligible_node)
+      end
+    end
+
+    context 'when no eligible namespaces exist' do
+      before do
+        # Create namespaces but stub each so that project_namespaces.count returns 30_000.
+        create_list(:zoekt_enabled_namespace, 2).each do |ns|
+          allow(ns.namespace.root_ancestor)
+            .to receive_message_chain(:project_namespaces, :count)
+            .and_return(30_000)
+          allow(::Namespace).to receive(:by_root_id)
+            .with(ns.root_namespace_id)
+            .and_return(ns.namespace.root_ancestor)
+        end
+      end
+
+      it 'returns an empty array for namespaces' do
+        expect(resource_pool.enabled_namespaces).to be_empty
+      end
+    end
+
+    context 'when no eligible nodes exist' do
+      before do
+        # Stub the node scope to return an empty array.
+        allow(::Search::Zoekt::Node)
+          .to receive(:with_positive_unclaimed_storage_bytes)
+          .and_return([])
+      end
+
+      it 'returns an empty array for nodes' do
+        expect(resource_pool.nodes).to eq([])
+      end
+    end
+  end
+end
diff --git a/spec/models/namespaces/project_namespace_spec.rb b/spec/models/namespaces/project_namespace_spec.rb
index 2620f287ae0d72d1..7ed858da9a7d720e 100644
--- a/spec/models/namespaces/project_namespace_spec.rb
+++ b/spec/models/namespaces/project_namespace_spec.rb
@@ -17,6 +17,34 @@
     end
   end
 
+  describe 'scopes' do
+    let_it_be(:namespace) { create(:namespace) }
+    let_it_be(:project) do
+      create(:project,
+        namespace: namespace,
+        statistics: build(
+          :project_statistics,
+          namespace: namespace,
+          repository_size: 101,
+          wiki_size: 505,
+          lfs_objects_size: 202,
+          build_artifacts_size: 303,
+          pipeline_artifacts_size: 707,
+          packages_size: 404,
+          snippets_size: 605,
+          uploads_size: 808
+        )
+      )
+    end
+
+    describe '.with_project_statistics' do
+      it 'includes the project statistics' do
+        project_namespaces = described_class.with_project_statistics
+        expect(project_namespaces.first.project.association(:statistics).loaded?).to be_truthy
+      end
+    end
+  end
+
   describe 'validations' do
     it { is_expected.not_to validate_presence_of :owner }
   end
-- 
GitLab


From 003a494cb903259e49bfef279ba50b4fbd02fa1b Mon Sep 17 00:00:00 2001
From: John Mason <jmason@gitlab.com>
Date: Thu, 20 Feb 2025 17:29:48 -0500
Subject: [PATCH 2/6] Update scope of project statistics

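Move the with_project_statistics scope from Namespaces::ProjectNamespace
up to Namespace (preloading projects: :statistics) so it is available on
any namespace relation, and drop the temporary respond_to? fallback in
PlanningService. Fetching project namespaces for a root namespace now
reads:

    ::Namespace.by_root_id(namespace.id).project_namespaces.with_project_statistics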
---
 app/models/namespace.rb                          | 1 +
 app/models/namespaces/project_namespace.rb       | 2 --
 ee/app/services/search/zoekt/planning_service.rb | 8 +-------
 3 files changed, 2 insertions(+), 9 deletions(-)

diff --git a/app/models/namespace.rb b/app/models/namespace.rb
index 48a58ee912424009..e31c54a664db0eed 100644
--- a/app/models/namespace.rb
+++ b/app/models/namespace.rb
@@ -230,6 +230,7 @@ class Namespace < ApplicationRecord
   scope :by_name, ->(name) { where('name LIKE ?', "#{sanitize_sql_like(name)}%") }
   scope :ordered_by_name, -> { order(:name) }
   scope :top_level, -> { by_parent(nil) }
+  scope :with_project_statistics, -> { includes(projects: :statistics) }
 
   scope :with_statistics, -> do
     namespace_statistic_columns = STATISTICS_COLUMNS.map { |column| sum_project_statistics_column(column) }
diff --git a/app/models/namespaces/project_namespace.rb b/app/models/namespaces/project_namespace.rb
index e1be5ccbe84b7542..d60800af67067349 100644
--- a/app/models/namespaces/project_namespace.rb
+++ b/app/models/namespaces/project_namespace.rb
@@ -4,8 +4,6 @@ module Namespaces
   class ProjectNamespace < Namespace
     self.allow_legacy_sti_class = true
 
-    scope :with_project_statistics, -> { includes(project: :statistics) }
-
     # These aliases are added to make it easier to sync parent/parent_id attribute with
     # project.namespace/project.namespace_id attribute.
     #
diff --git a/ee/app/services/search/zoekt/planning_service.rb b/ee/app/services/search/zoekt/planning_service.rb
index d556e41369bd4354..890431e13d159178 100644
--- a/ee/app/services/search/zoekt/planning_service.rb
+++ b/ee/app/services/search/zoekt/planning_service.rb
@@ -147,13 +147,7 @@ def create_empty_replica
         end
 
         def fetch_project_namespaces
-          scope = ::Namespace.by_root_id(namespace.id).project_namespaces
-
-          if scope.respond_to?(:with_project_statistics)
-            scope.with_project_statistics
-          else # TODO: remove this condition before merging
-            scope.includes(project: :statistics) # rubocop:disable CodeReuse/ActiveRecord -- will remove
-          end
+          ::Namespace.by_root_id(namespace.id).project_namespaces.with_project_statistics
         end
 
         def assign_to_node(stats, replica_indices)
-- 
GitLab
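
Moving the scope up to Namespace (and switching to the plural has_many :projects
association) means any namespace relation can chain it, which is why the
respond_to? fallback in fetch_project_namespaces could be dropped. A minimal
sketch of how the relocated scope is used, assuming a GitLab Rails console;
root_namespace_id below is a placeholder for the root namespace id the planner
already holds:

    # The scope on Namespace preloads the has_many :projects association
    # together with each project's statistics.
    Namespace.with_project_statistics.find_each do |namespace|
      namespace.projects.each do |project|
        project.statistics&.repository_size # preloaded, no extra query per project
      end
    end

    # The planner chains it onto its own relation the same way
    # (root_namespace_id is a placeholder here):
    ::Namespace.by_root_id(root_namespace_id).project_namespaces.with_project_statistics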


From f88300d312753871876737467a3c247b1a944f3a Mon Sep 17 00:00:00 2001
From: John Mason <jmason@gitlab.com>
Date: Fri, 21 Feb 2025 10:38:50 -0500
Subject: [PATCH 3/6] Add lock to rollout worker

---
 ee/app/workers/search/zoekt/rollout_worker.rb | 25 +++++++++++--------
 1 file changed, 14 insertions(+), 11 deletions(-)

diff --git a/ee/app/workers/search/zoekt/rollout_worker.rb b/ee/app/workers/search/zoekt/rollout_worker.rb
index fcdc3c2754a27830..d397bd461cb039d7 100644
--- a/ee/app/workers/search/zoekt/rollout_worker.rb
+++ b/ee/app/workers/search/zoekt/rollout_worker.rb
@@ -6,6 +6,7 @@ class RolloutWorker
       include ApplicationWorker
       include Search::Worker
       include CronjobQueue # rubocop:disable Scalability/CronWorkerContext -- there is no relevant metadata
+      include Gitlab::ExclusiveLeaseHelpers
       prepend ::Geo::SkipSecondary
 
       deduplicate :until_executed
@@ -25,20 +26,22 @@ def perform(retry_count = 0)
         return false unless Search::Zoekt.licensed_and_indexing_enabled?
         return false unless Feature.enabled?(:zoekt_rollout_worker, Feature.current_request)
 
-        result = ::Search::Zoekt::RolloutService.execute
+        in_lock(self.class.name.underscore, ttl: 10.minutes, retries: 10, sleep_sec: 1) do
+          result = ::Search::Zoekt::RolloutService.execute
 
-        if result.success?
-          log_info message: "RolloutWorker succeeded: #{result.message}"
-          self.class.perform_async # Immediately schedule another job
-        else
-          log_info message: "RolloutWorker did not do any work: #{result.message}"
+          if result.success?
+            log_info message: "RolloutWorker succeeded: #{result.message}"
+            self.class.perform_async # Immediately schedule another job
+          else
+            log_info message: "RolloutWorker did not do any work: #{result.message}"
 
-          if retry_count < MAX_RETRIES
-            backoff_time = INITIAL_BACKOFF * (2**retry_count)
+            if retry_count < MAX_RETRIES
+              backoff_time = INITIAL_BACKOFF * (2**retry_count)
 
-            self.class.perform_at(backoff_time.from_now, retry_count + 1)
-          else
-            log_info message: "RolloutWorker exceeded max back off interval. Last message: #{result.message}"
+              self.class.perform_at(backoff_time.from_now, retry_count + 1)
+            else
+              log_info message: "RolloutWorker exceeded max back off interval. Last message: #{result.message}"
+            end
           end
         end
       end
-- 
GitLab
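
The worker is cron-scheduled and also re-enqueues itself with perform_async after
successful work, so two jobs can overlap; wrapping the body in in_lock serializes
them on a single exclusive lease. With retries: 10 and sleep_sec: 1, a contending
job waits roughly ten seconds for the lease before giving up, and the 10-minute
TTL keeps a crashed holder from blocking the lock indefinitely. The exponential
re-enqueue schedule for the no-work case is unchanged; below is a stand-alone
sketch of that schedule, using illustrative constants since the worker's real
INITIAL_BACKOFF and MAX_RETRIES values are defined outside this hunk:

    # Stand-alone sketch of the backoff schedule; values are illustrative only.
    require 'active_support/all'

    INITIAL_BACKOFF = 1.minute
    MAX_RETRIES = 5

    (0..MAX_RETRIES).each do |retry_count|
      backoff_time = INITIAL_BACKOFF * (2**retry_count)
      puts "attempt #{retry_count}: re-enqueue in #{backoff_time.inspect}"
    end
    # Doubles each attempt: 1, 2, 4, 8, 16, 32 minutes with these values.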


From b9dbf3da5f09836943ad80bc6591b3fc0e68fa50 Mon Sep 17 00:00:00 2001
From: John Mason <jmason@gitlab.com>
Date: Fri, 21 Feb 2025 10:44:18 -0500
Subject: [PATCH 4/6] Add memoization to fetch project namespaces

---
 ee/app/services/search/zoekt/planning_service.rb | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/ee/app/services/search/zoekt/planning_service.rb b/ee/app/services/search/zoekt/planning_service.rb
index 890431e13d159178..2a03af229dddf0e0 100644
--- a/ee/app/services/search/zoekt/planning_service.rb
+++ b/ee/app/services/search/zoekt/planning_service.rb
@@ -76,6 +76,8 @@ def build_node_change_plan
       end
 
       class Plan
+        include Gitlab::Utils::StrongMemoize
+
         attr_reader :enabled_namespace, :namespace, :num_replicas, :buffer_factor,
           :max_indices_per_replica, :errors, :replica_plans, :nodes
 
@@ -149,6 +151,7 @@ def create_empty_replica
         def fetch_project_namespaces
           ::Namespace.by_root_id(namespace.id).project_namespaces.with_project_statistics
         end
+        strong_memoize_attr :fetch_project_namespaces
 
         def assign_to_node(stats, replica_indices)
           best_node = find_best_node(stats)
-- 
GitLab
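
strong_memoize_attr wraps fetch_project_namespaces so the namespace query runs at
most once per Plan instance and later calls reuse the first result; unlike a bare
@var ||=, falsy results are cached too. A stand-alone sketch of the behaviour this
adds, with the GitLab helper emulated in plain Ruby and a stand-in for the
ActiveRecord query:

    # Stand-alone sketch; the real code uses Gitlab::Utils::StrongMemoize and
    # strong_memoize_attr, emulated here so the example runs outside GitLab.
    class PlanSketch
      def fetch_project_namespaces
        return @fetch_project_namespaces if defined?(@fetch_project_namespaces)

        @fetch_project_namespaces = expensive_namespace_query
      end

      private

      # Stand-in for the ActiveRecord query in the real service.
      def expensive_namespace_query
        puts 'querying project namespaces...'
        [:project_namespace_a, :project_namespace_b]
      end
    end

    plan = PlanSketch.new
    plan.fetch_project_namespaces # runs the query once
    plan.fetch_project_namespaces # returns the cached result, no second query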


From ea3e3c20b1823fe64d48e7cd844fe39f6fbddaed Mon Sep 17 00:00:00 2001
From: John Mason <jmason@gitlab.com>
Date: Fri, 21 Feb 2025 11:09:29 -0500
Subject: [PATCH 5/6] Optimize total required storage bytes calculation

---
 ee/app/services/search/zoekt/planning_service.rb | 15 ++-------------
 1 file changed, 2 insertions(+), 13 deletions(-)

diff --git a/ee/app/services/search/zoekt/planning_service.rb b/ee/app/services/search/zoekt/planning_service.rb
index 2a03af229dddf0e0..b0b74395d71967a1 100644
--- a/ee/app/services/search/zoekt/planning_service.rb
+++ b/ee/app/services/search/zoekt/planning_service.rb
@@ -25,21 +25,10 @@ def plan
           ).generate
         end
 
-        namespace_plans = []
-        failed_plans = []
-
-        all_plans.each do |p|
-          if p[:errors].present?
-            failed_plans << p
-          else
-            namespace_plans << p
-          end
-        end
-
-        successful_plans = all_plans.filter { |p| !p[:errors].present? }
+        failed_plans, successful_plans = all_plans.partition { |plan| plan[:errors].present? }
 
         {
-          total_required_storage_bytes: namespace_plans.sum { |plan| plan[:namespace_required_storage_bytes] },
+          total_required_storage_bytes: successful_plans.sum { |plan| plan[:namespace_required_storage_bytes] },
           namespaces: successful_plans,
           nodes: build_node_change_plan,
           failures: failed_plans
-- 
GitLab
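
Enumerable#partition walks all_plans once and returns two arrays, with the
elements for which the block is truthy first, so the manual accumulator loop and
the extra filter pass collapse into a single expression; the storage total is now
summed over the same successful_plans array that is returned under namespaces:.
A stand-alone sketch with made-up plan hashes:

    # `present?` comes from ActiveSupport, as in the service.
    require 'active_support/all'

    all_plans = [
      { namespace_required_storage_bytes: 100, errors: [] },
      { namespace_required_storage_bytes: 50,  errors: ['node out of space'] },
      { namespace_required_storage_bytes: 200, errors: [] }
    ]

    failed_plans, successful_plans = all_plans.partition { |plan| plan[:errors].present? }

    failed_plans.length                                                     # => 1
    successful_plans.sum { |plan| plan[:namespace_required_storage_bytes] } # => 300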


From b3d3c37aa104d1e91148984229cfb8049fe988b5 Mon Sep 17 00:00:00 2001
From: John Mason <jmason@gitlab.com>
Date: Fri, 21 Feb 2025 12:06:03 -0500
Subject: [PATCH 6/6] Move with_project_statistics scope spec to namespace_spec

---
 spec/models/namespace_spec.rb                 | 28 +++++++++++++++++++
 .../namespaces/project_namespace_spec.rb      | 28 -------------------
 2 files changed, 28 insertions(+), 28 deletions(-)

diff --git a/spec/models/namespace_spec.rb b/spec/models/namespace_spec.rb
index fbd89f83113e1640..dcd3ffcd5c96caa5 100644
--- a/spec/models/namespace_spec.rb
+++ b/spec/models/namespace_spec.rb
@@ -600,6 +600,34 @@
         end
       end
     end
+
+    describe '.with_project_statistics' do
+      let_it_be(:namespace) { create(:namespace) }
+      let_it_be(:project) do
+        create(:project,
+          namespace: namespace,
+          statistics: build(
+            :project_statistics,
+            namespace: namespace,
+            repository_size: 101,
+            wiki_size: 505,
+            lfs_objects_size: 202,
+            build_artifacts_size: 303,
+            pipeline_artifacts_size: 707,
+            packages_size: 404,
+            snippets_size: 605,
+            uploads_size: 808
+          )
+        )
+      end
+
+      it 'includes the project statistics' do
+        namespaces = described_class.with_project_statistics
+        expect(
+          namespaces.where(id: namespace.id).first.projects.first.association(:statistics).loaded?
+        ).to be_truthy
+      end
+    end
   end
 
   describe 'delegate' do
diff --git a/spec/models/namespaces/project_namespace_spec.rb b/spec/models/namespaces/project_namespace_spec.rb
index 7ed858da9a7d720e..2620f287ae0d72d1 100644
--- a/spec/models/namespaces/project_namespace_spec.rb
+++ b/spec/models/namespaces/project_namespace_spec.rb
@@ -17,34 +17,6 @@
     end
   end
 
-  describe 'scopes' do
-    let_it_be(:namespace) { create(:namespace) }
-    let_it_be(:project) do
-      create(:project,
-        namespace: namespace,
-        statistics: build(
-          :project_statistics,
-          namespace: namespace,
-          repository_size: 101,
-          wiki_size: 505,
-          lfs_objects_size: 202,
-          build_artifacts_size: 303,
-          pipeline_artifacts_size: 707,
-          packages_size: 404,
-          snippets_size: 605,
-          uploads_size: 808
-        )
-      )
-    end
-
-    describe '.with_project_statistics' do
-      it 'includes the project statistics' do
-        project_namespaces = described_class.with_project_statistics
-        expect(project_namespaces.first.project.association(:statistics).loaded?).to be_truthy
-      end
-    end
-  end
-
   describe 'validations' do
     it { is_expected.not_to validate_presence_of :owner }
   end
-- 
GitLab