From 3264d80fa912f4076e41e9005800d261a8b8c886 Mon Sep 17 00:00:00 2001 From: John Mason <jmason@gitlab.com> Date: Thu, 12 Dec 2024 09:26:25 -0500 Subject: [PATCH 1/6] Add zoekt provisioning service --- app/models/namespaces/project_namespace.rb | 2 + config/initializers/1_settings.rb | 3 + ee/app/models/search/zoekt/node.rb | 2 +- .../services/search/zoekt/planning_service.rb | 243 +++++++++++++++ .../search/zoekt/provisioning_service.rb | 118 ++++++++ .../services/search/zoekt/rollout_service.rb | 78 +++++ .../search/zoekt/selection_service.rb | 48 +++ ee/app/workers/all_queues.yml | 10 + ee/app/workers/search/zoekt/rollout_worker.rb | 57 ++++ .../ops/zoekt_rollout_worker.yml | 9 + ee/spec/models/search/zoekt/node_spec.rb | 4 +- .../search/zoekt/planning_service_spec.rb | 148 ++++++++++ .../search/zoekt/provisioning_service_spec.rb | 278 ++++++++++++++++++ .../search/zoekt/rollout_service_spec.rb | 151 ++++++++++ .../search/zoekt/selection_service_spec.rb | 109 +++++++ .../namespaces/project_namespace_spec.rb | 28 ++ 16 files changed, 1285 insertions(+), 3 deletions(-) create mode 100644 ee/app/services/search/zoekt/planning_service.rb create mode 100644 ee/app/services/search/zoekt/provisioning_service.rb create mode 100644 ee/app/services/search/zoekt/rollout_service.rb create mode 100644 ee/app/services/search/zoekt/selection_service.rb create mode 100644 ee/app/workers/search/zoekt/rollout_worker.rb create mode 100644 ee/config/feature_flags/ops/zoekt_rollout_worker.yml create mode 100644 ee/spec/services/search/zoekt/planning_service_spec.rb create mode 100644 ee/spec/services/search/zoekt/provisioning_service_spec.rb create mode 100644 ee/spec/services/search/zoekt/rollout_service_spec.rb create mode 100644 ee/spec/services/search/zoekt/selection_service_spec.rb diff --git a/app/models/namespaces/project_namespace.rb b/app/models/namespaces/project_namespace.rb index d60800af67067349..e1be5ccbe84b7542 100644 --- a/app/models/namespaces/project_namespace.rb 
+++ b/app/models/namespaces/project_namespace.rb @@ -4,6 +4,8 @@ module Namespaces class ProjectNamespace < Namespace self.allow_legacy_sti_class = true + scope :with_project_statistics, -> { includes(project: :statistics) } + # These aliases are added to make it easier to sync parent/parent_id attribute with # project.namespace/project.namespace_id attribute. # diff --git a/config/initializers/1_settings.rb b/config/initializers/1_settings.rb index 14a37ce32223e7bb..8a02b72906abd3a7 100644 --- a/config/initializers/1_settings.rb +++ b/config/initializers/1_settings.rb @@ -842,6 +842,9 @@ Settings.cron_jobs['search_zoekt_scheduling_worker'] ||= {} Settings.cron_jobs['search_zoekt_scheduling_worker']['cron'] ||= '*/1 * * * *' Settings.cron_jobs['search_zoekt_scheduling_worker']['job_class'] ||= 'Search::Zoekt::SchedulingWorker' + Settings.cron_jobs['search_zoekt_rollout_worker'] ||= {} + Settings.cron_jobs['search_zoekt_rollout_worker']['cron'] ||= '*/10 * * * *' + Settings.cron_jobs['search_zoekt_rollout_worker']['job_class'] ||= 'Search::Zoekt::RolloutWorker' Settings.cron_jobs['search_elastic_metrics_update_cron_worker'] ||= {} Settings.cron_jobs['search_elastic_metrics_update_cron_worker']['cron'] ||= '*/1 * * * *' Settings.cron_jobs['search_elastic_metrics_update_cron_worker']['job_class'] ||= 'Search::Elastic::MetricsUpdateCronWorker' diff --git a/ee/app/models/search/zoekt/node.rb b/ee/app/models/search/zoekt/node.rb index 1e70d8209478ea31..22e8b8cb45c86435 100644 --- a/ee/app/models/search/zoekt/node.rb +++ b/ee/app/models/search/zoekt/node.rb @@ -53,7 +53,7 @@ class Node < ApplicationRecord sql = <<~SQL zoekt_nodes.*, #{UNCLAIMED_STORAGE_BYTES_FORMULA} AS unclaimed_storage_bytes SQL - left_joins(:indices).group(:id).having("#{UNCLAIMED_STORAGE_BYTES_FORMULA} >= 0").select(sql) + left_joins(:indices).group(:id).having("#{UNCLAIMED_STORAGE_BYTES_FORMULA} > 0").select(sql) end scope :order_by_unclaimed_space, -> do 
# frozen_string_literal: true

module Search
  module Zoekt
    # Computes, purely in memory, how the projects of each enabled namespace could be
    # laid out as replicas and indices across the given Zoekt nodes. Nothing is
    # persisted by this class.
    #
    # The hash returned by #plan has four keys:
    #   :total_required_storage_bytes - summed requirement of the namespaces planned without errors
    #   :namespaces                   - per-namespace plans that recorded no errors
    #   :nodes                        - summary of every node that received at least one index
    #   :failures                     - per-namespace plans that recorded at least one error
    class PlanningService
      attr_reader :enabled_namespaces, :nodes, :options

      # Convenience entry point: builds the service and returns its plan.
      def self.plan(**kwargs)
        new(**kwargs).plan
      end

      # @param enabled_namespaces one enabled namespace or a collection of them
      # @param nodes one node or a collection; each must respond to #id and #unclaimed_storage_bytes
      # @param options forwarded untouched to Plan.new (num_replicas:, buffer_factor:,
      #   max_indices_per_replica:)
      def initialize(enabled_namespaces:, nodes:, **options)
        @enabled_namespaces = Array(enabled_namespaces)
        @nodes = format_nodes(nodes)
        @options = options
      end

      # Plans every enabled namespace in order. The node hashes are shared between
      # the per-namespace plans, so capacity consumed by one namespace is visible
      # to the namespaces planned after it.
      def plan
        all_plans = enabled_namespaces.map do |enabled_namespace|
          Plan.new(enabled_namespace: enabled_namespace, nodes: nodes, **options).generate
        end

        failed_plans, successful_plans = all_plans.partition { |namespace_plan| namespace_plan[:errors].any? }

        {
          total_required_storage_bytes: successful_plans.sum do |namespace_plan|
            namespace_plan[:namespace_required_storage_bytes]
          end,
          namespaces: successful_plans,
          nodes: build_node_change_plan,
          failures: failed_plans
        }
      end

      private

      # Wraps each node in a mutable hash used as the planner's working state.
      # :unclaimed_storage_bytes is decremented while planning;
      # :unclaimed_storage_bytes_before keeps the starting value for reporting.
      def format_nodes(zoekt_nodes)
        Array(zoekt_nodes).map do |node|
          {
            id: node.id,
            unclaimed_storage_bytes: node.unclaimed_storage_bytes,
            unclaimed_storage_bytes_before: node.unclaimed_storage_bytes,
            indices: [],
            namespace_ids: [],
            node: node
          }
        end
      end

      # Presentation only: summarises the nodes that were given at least one index.
      def build_node_change_plan
        nodes.select { |node_state| node_state[:indices].any? }.map do |node_state|
          {
            id: node_state[:id],
            unclaimed_storage_bytes_before: node_state[:unclaimed_storage_bytes_before],
            unclaimed_storage_bytes_after: node_state[:unclaimed_storage_bytes],
            claimed_storage_bytes: node_state[:indices].sum { |index| index[:required_storage_bytes] },
            namespace_ids: node_state[:namespace_ids].sort,
            indices: node_state[:indices].sort_by { |index| index[:namespace_id] }
          }
        end
      end

      # Plans the replicas of a single enabled namespace against the shared node state.
      class Plan
        attr_reader :enabled_namespace, :namespace, :num_replicas, :buffer_factor,
          :max_indices_per_replica, :errors, :replica_plans, :nodes

        # @param enabled_namespace must respond to #id and #namespace
        # @param num_replicas number of replicas to simulate when the namespace has projects
        # @param nodes node state hashes as built by PlanningService#format_nodes (mutated in place)
        # @param buffer_factor multiplier applied to each project's repository size
        # @param max_indices_per_replica upper bound on indices inside one replica
        def initialize(
          enabled_namespace:, num_replicas:, nodes:,
          buffer_factor: 3, max_indices_per_replica: 5)
          @enabled_namespace = enabled_namespace
          @namespace = enabled_namespace.namespace
          @num_replicas = num_replicas
          @buffer_factor = buffer_factor
          @max_indices_per_replica = max_indices_per_replica
          @errors = []
          @replica_plans = []
          @nodes = nodes
          # A node hosts at most one index of this namespace, across all of its replicas.
          @used_node_ids = Set.new
        end

        # Returns the plan hash for this namespace. A namespace without any project
        # namespaces gets a single replica holding one empty index.
        def generate
          if fetch_project_namespaces.exists?
            num_replicas.times { simulate_replica_plan }
          else
            create_empty_replica
          end

          {
            namespace_id: namespace.id,
            enabled_namespace_id: enabled_namespace.id,
            namespace_required_storage_bytes: calculate_namespace_storage,
            replicas: replica_plans,
            errors: errors.uniq
          }
        end

        private

        # Id of the project namespace currently being placed; read by #add_project_to_index.
        attr_reader :project_namespace_id

        # Walks the namespace's projects and packs them into indices for one replica.
        def simulate_replica_plan
          replica_indices = []

          fetch_project_namespaces.find_each do |project_namespace|
            project = project_namespace.project
            stats = project.statistics

            if replica_indices.size >= max_indices_per_replica
              log_error(replica_plans.size, :index_limit_exceeded,
                "Replica reached maximum index limit (#{max_indices_per_replica})")
              break
            end

            @project_namespace_id = project_namespace.id
            assign_to_node(stats, replica_indices)
          end

          add_replica_plan(replica_indices)
        end

        # Reserves an empty index on the unused node with the most unclaimed space.
        def create_empty_replica
          candidate_nodes = nodes.reject { |node| @used_node_ids.include?(node[:id]) }
          best_node = candidate_nodes.max_by { |node| node[:unclaimed_storage_bytes] }

          if best_node
            replica_indices = [simulate_index(best_node)]
            add_replica_plan(replica_indices)
            @used_node_ids.add(best_node[:id])
          else
            log_error(nil, :node_unavailable, "No nodes available to create an empty replica")
          end
        end

        def fetch_project_namespaces
          scope = ::Namespace.by_root_id(namespace.id).project_namespaces

          if scope.respond_to?(:with_project_statistics)
            scope.with_project_statistics
          else # TODO: remove this condition before merging
            scope.includes(project: :statistics) # rubocop:disable CodeReuse/ActiveRecord -- will remove
          end
        end

        def assign_to_node(stats, replica_indices)
          best_node = find_best_node(stats)

          if best_node
            assign_project_to_index(best_node, stats, replica_indices)
          else
            log_error(replica_plans.size, :node_unavailable,
              "No node can accommodate project #{stats.project_id} with size #{scaled_size(stats)}")
          end
        end

        # Largest not-yet-used node that can still hold the buffered project size.
        def find_best_node(stats)
          nodes
            .reject { |node| @used_node_ids.include?(node[:id]) }
            .select { |node| node[:unclaimed_storage_bytes] && node[:unclaimed_storage_bytes] >= scaled_size(stats) }
            .max_by { |node| node[:unclaimed_storage_bytes] }
        end

        # Places the project in the tightest-fitting index already open in this
        # replica, or opens a new index on +node+ when none has room.
        #
        # The consumed bytes are debited from the node that actually hosts the
        # chosen index. Previously the candidate +node+ was always debited, even
        # when the project went into an index living on a different node, so the
        # hosting node looked emptier than it was and an unrelated node lost capacity.
        def assign_project_to_index(node, stats, replica_indices)
          project_size = scaled_size(stats)
          index = best_fitting_index(replica_indices, project_size)

          if index
            # Fall back to the candidate only if the host cannot be resolved.
            host_node = nodes.find { |candidate| candidate[:id] == index[:node_id] } || node
          else
            if replica_indices.size >= max_indices_per_replica
              log_error(nil, :index_limit_exceeded, "Max indices per replica reached")
              return
            end

            index = open_index_on(node, replica_indices)
            host_node = node
          end

          add_project_to_index(index, stats)
          host_node[:unclaimed_storage_bytes] -= project_size
        end

        # Existing index with the least space left over after adding +project_size+, if any.
        # NOTE(review): any open index may be chosen, not only the last one; confirm the
        # consumers of project_namespace_id_from/to tolerate overlapping ranges.
        def best_fitting_index(replica_indices, project_size)
          replica_indices
            .select { |idx| idx[:required_storage_bytes] + project_size <= idx[:max_storage_bytes] }
            .min_by { |idx| idx[:max_storage_bytes] - (idx[:required_storage_bytes] + project_size) }
        end

        # Creates an empty index on +node+, registers it with the replica and the
        # node state, and marks the node as used by this namespace.
        def open_index_on(node, replica_indices)
          index = simulate_index(node)
          replica_indices << index

          node[:indices] ||= []
          node[:indices] << index
          node[:namespace_ids] << namespace.id unless node[:namespace_ids].include?(namespace.id)
          @used_node_ids.add(node[:id])

          index
        end

        # The index may grow up to whatever the node had unclaimed when it was opened.
        def simulate_index(node)
          {
            node_id: node[:id],
            namespace_id: namespace.id,
            projects: { project_namespace_id_from: nil, project_namespace_id_to: nil },
            required_storage_bytes: 0,
            max_storage_bytes: node[:unclaimed_storage_bytes]
          }
        end

        def add_project_to_index(index, stats)
          index[:projects][:project_namespace_id_from] ||= project_namespace_id
          index[:projects][:project_namespace_id_to] = project_namespace_id
          index[:required_storage_bytes] += scaled_size(stats)
        end

        # The last index of a replica is left open-ended (project_namespace_id_to: nil).
        def add_replica_plan(replica_indices)
          replica_indices.last[:projects][:project_namespace_id_to] = nil if replica_indices.any?

          replica_plans << { indices: replica_indices.map { |index| format_index(index) } }
        end

        def format_index(index)
          index.slice(:node_id, :projects, :required_storage_bytes, :max_storage_bytes)
        end

        def calculate_namespace_storage
          replica_plans.sum { |replica| replica[:indices].sum { |index| index[:required_storage_bytes] } }
        end

        def scaled_size(stats)
          stats.repository_size * buffer_factor
        end

        def log_error(replica_idx, type, details)
          @errors << { namespace_id: namespace.id, replica_idx: replica_idx, type: type, details: details }
        end
      end
    end
  end
end
# frozen_string_literal: true

module Search
  module Zoekt
    # Turns a plan (the hash shape produced by PlanningService#plan) into Replica
    # and Index records. Every namespace is processed inside one surrounding
    # transaction. Problems are collected in #errors and returned as
    # `{ errors: [...] }` rather than raised to the caller.
    class ProvisioningService
      def self.execute(plan)
        new(plan).execute
      end

      attr_reader :plan, :errors

      def initialize(plan)
        @plan = plan
        @errors = []
      end

      # Processes each namespace plan that carries no planning errors.
      # Any StandardError raised inside the transaction is recorded as
      # :transaction_failed and the collected errors are returned.
      def execute
        ApplicationRecord.transaction do
          plan[:namespaces].each do |namespace_plan|
            process_namespace(namespace_plan) unless namespace_plan[:errors].present?
          end
        end

        outcome
      rescue StandardError => e
        log_error(:transaction_failed, e.message, e.backtrace)
        outcome
      end

      private

      def outcome
        { errors: @errors }
      end

      def process_namespace(namespace_plan)
        root_id = namespace_plan.fetch(:namespace_id)
        enabled_id = namespace_plan.fetch(:enabled_namespace_id)

        # Remove any pre-existing replicas for this namespace since we are provisioning new ones.
        enabled_namespace = Search::Zoekt::EnabledNamespace.for_root_namespace_id(root_id).first
        unless enabled_namespace
          log_error(:missing_enabled_namespace, "Enabled namespace not found for namespace ID: #{root_id}")
          return
        end

        enabled_namespace.replicas.delete_all

        # Bail out (after the replicas were deleted above) when indices are still present.
        if Index.for_root_namespace_id(root_id).exists?
          log_error(:index_already_exists, "Indices already exists for namespace ID: #{root_id}")
          return
        end

        namespace_plan[:replicas].each do |replica_plan|
          process_replica(namespace_id: root_id, enabled_namespace_id: enabled_id, replica_plan: replica_plan)
        end
      end

      def process_replica(namespace_id:, enabled_namespace_id:, replica_plan:)
        replica = Replica.create!(namespace_id: namespace_id, zoekt_enabled_namespace_id: enabled_namespace_id)

        replica_plan[:indices].each { |index_plan| process_index(replica, index_plan) }
      end

      # Creates one index on the planned node. When the node can no longer fit the
      # planned size, an error is recorded and the index is skipped; no exception
      # is raised, so the surrounding transaction carries on.
      def process_index(replica, index_plan)
        node = Node.find(index_plan[:node_id])
        bytes_needed = index_plan[:required_storage_bytes]

        if bytes_needed > node.unclaimed_storage_bytes
          log_error(
            :node_capacity_exceeded,
            "Node #{node.id} has #{node.unclaimed_storage_bytes} unclaimed storage bytes and " \
              "cannot fit #{bytes_needed} bytes."
          )
          return
        end

        # Empty indices still reserve 1 kilobyte.
        bytes_to_reserve = bytes_needed == 0 ? 1.kilobyte : bytes_needed

        Index.create!(
          replica: replica,
          zoekt_enabled_namespace_id: replica.zoekt_enabled_namespace_id,
          namespace_id: replica.namespace_id,
          zoekt_node_id: node.id,
          reserved_storage_bytes: bytes_to_reserve,

          # Workaround: we remove nil project_namespace_id_to since it is not a valid property in json validator.
          metadata: index_plan[:projects].compact
        )

        update_node_storage(node, bytes_to_reserve)
      end

      def update_node_storage(node, used_bytes)
        node.update!(used_bytes: node.used_bytes + used_bytes)
      end

      # Records the error on the service and writes it to the Zoekt logger.
      # Only the first five backtrace frames are kept.
      def log_error(message, details, trace = [])
        entry = {
          class: self.class.name,
          message: message,
          details: details,
          trace: trace.slice(0, 5)
        }

        @errors << entry
        logger.error(**entry)
      end

      def logger
        @logger ||= Search::Zoekt::Logger.build
      end
    end
  end
end
# frozen_string_literal: true

module Search
  module Zoekt
    # Runs one rollout pass: select resources, build a plan, and (unless this is
    # a dry run) hand the plan to the provisioning service.
    #
    # Returns a Result whose `success?` is false when nothing could be selected
    # or when provisioning reported errors. Note that `dry_run` defaults to true.
    class RolloutService
      DEFAULT_OPTIONS = {
        num_replicas: 1,
        max_indices_per_replica: 5,
        dry_run: true,
        batch_size: 128
      }.freeze

      Result = Struct.new(:success?, :message)

      def self.execute(**kwargs)
        new(**kwargs).execute
      end

      attr_reader :num_replicas, :max_indices_per_replica, :batch_size, :dry_run

      # Accepts any of the DEFAULT_OPTIONS keys plus an optional :logger.
      def initialize(**kwargs)
        settings = DEFAULT_OPTIONS.merge(kwargs)

        @num_replicas, @max_indices_per_replica, @batch_size, @dry_run =
          settings.fetch_values(:num_replicas, :max_indices_per_replica, :batch_size, :dry_run)

        @logger = settings[:logger] if settings.key?(:logger)
      end

      def execute
        logger.info "Selecting resources"
        pool = ::Search::Zoekt::SelectionService.execute(max_batch_size: batch_size)

        return failed_result("No enabled namespaces found", logger) if pool.enabled_namespaces.empty?
        return failed_result("No available nodes found", logger) if pool.nodes.empty?

        logger.info "Planning"
        rollout_plan = ::Search::Zoekt::PlanningService.plan(
          enabled_namespaces: pool.enabled_namespaces,
          nodes: pool.nodes,
          num_replicas: num_replicas,
          max_indices_per_replica: max_indices_per_replica
        )
        logger.info rollout_plan.to_json

        return successful_result("Skipping execution of changes because of dry run", logger) if dry_run

        logger.info "Executing changes"
        outcome = ::Search::Zoekt::ProvisioningService.execute(rollout_plan)

        if outcome[:errors].present?
          failed_result("Change had an error: #{outcome[:errors]}", logger)
        else
          successful_result("Rollout execution completed successfully", logger)
        end
      end

      private

      def logger
        @logger ||= ::Search::Zoekt::Logger.build
      end

      def failed_result(message, logger)
        build_result(false, message, logger)
      end

      def successful_result(message, logger)
        build_result(true, message, logger)
      end

      # Logs the message at info level and wraps it in a Result.
      def build_result(success, message, log)
        log.info message
        Result.new(success, message)
      end
    end
  end
end
# frozen_string_literal: true

module Search
  module Zoekt
    # Picks the inputs for one rollout pass: a batch of enabled namespaces that
    # are missing indices and the nodes that still have unclaimed storage.
    class SelectionService
      attr_reader :max_batch_size

      ResourcePool = Struct.new(:enabled_namespaces, :nodes)

      def self.execute(**kwargs)
        new(**kwargs).execute
      end

      # @param max_batch_size selection stops once this many namespaces were collected
      def initialize(max_batch_size: 128)
        @max_batch_size = max_batch_size
      end

      # @return [ResourcePool] the selected enabled namespaces and available nodes
      def execute
        ResourcePool.new(fetch_enabled_namespace_for_indexing, fetch_available_nodes)
      end

      private

      # This class had no `logger`, so #log_info raised NameError when called.
      # Built the same way as in the sibling Zoekt services.
      def logger
        @logger ||= ::Search::Zoekt::Logger.build
      end

      def log_info(str)
        logger.info(str)
      end

      # Collects enabled namespaces with missing indices, skipping any whose
      # root namespace has more than +project_count_limit+ project namespaces.
      def fetch_enabled_namespace_for_indexing(project_count_limit: 20_000)
        selected = []

        ::Search::Zoekt::EnabledNamespace.with_missing_indices.find_each do |enabled_namespace|
          project_count = ::Namespace.by_root_id(enabled_namespace.root_namespace_id).project_namespaces.count
          next if project_count > project_count_limit

          selected << enabled_namespace

          break if selected.size >= max_batch_size
        end

        selected
      end

      def fetch_available_nodes
        ::Search::Zoekt::Node.with_positive_unclaimed_storage_bytes
      end
    end
  end
end
# frozen_string_literal: true

module Search
  module Zoekt
    # Cron worker that drives the Zoekt rollout. After a successful pass it
    # enqueues itself again immediately. After a pass that did no work it
    # retries with a doubling delay (5, 10, 20, 40, 80 minutes) and then stops.
    class RolloutWorker
      include ApplicationWorker
      include Search::Worker
      include CronjobQueue # rubocop:disable Scalability/CronWorkerContext -- there is no relevant metadata
      prepend ::Geo::SkipSecondary

      deduplicate :until_executed
      data_consistency :sticky
      idempotent!
      urgency :low

      defer_on_database_health_signal :gitlab_main,
        [:zoekt_nodes, :zoekt_enabled_namespaces, :zoekt_replicas, :zoekt_indices, :zoekt_repositories, :zoekt_tasks],
        10.minutes

      MAX_RETRIES = 5
      INITIAL_BACKOFF = 5.minutes

      # @param retry_count number of consecutive passes that did no work before this one
      # @return false when indexing is paused, unlicensed/disabled, or the ops flag is off
      def perform(retry_count = 0)
        return false if Gitlab::CurrentSettings.zoekt_indexing_paused?
        return false unless Search::Zoekt.licensed_and_indexing_enabled?
        return false unless Feature.enabled?(:zoekt_rollout_worker, Feature.current_request)

        # RolloutService defaults to dry_run: true, and a dry run reports success.
        # Called with defaults, the worker would re-enqueue itself immediately,
        # forever, without ever provisioning anything. The worker exists to apply
        # changes, so it opts out of dry-run mode explicitly.
        result = ::Search::Zoekt::RolloutService.execute(dry_run: false)

        if result.success?
          log_info message: "RolloutWorker succeeded: #{result.message}"
          self.class.perform_async # Immediately schedule another job
        else
          log_info message: "RolloutWorker did not do any work: #{result.message}"
          retry_with_backoff(retry_count, result)
        end
      end

      private

      # Schedules the next attempt with exponential backoff, or gives up once
      # MAX_RETRIES attempts have been made.
      def retry_with_backoff(retry_count, result)
        if retry_count < MAX_RETRIES
          backoff_time = INITIAL_BACKOFF * (2**retry_count)

          self.class.perform_at(backoff_time.from_now, retry_count + 1)
        else
          log_info message: "RolloutWorker exceeded max back off interval. Last message: #{result.message}"
        end
      end

      def logger
        @logger ||= ::Search::Zoekt::Logger.build
      end

      def log_info(**payload)
        logger.info(build_structured_payload(**payload))
      end
    end
  end
end
frozen_string_literal: true + +require 'spec_helper' + +RSpec.describe Search::Zoekt::PlanningService, feature_category: :global_search do + let_it_be(:group1) { create(:group) } + let_it_be(:enabled_namespace1) { create(:zoekt_enabled_namespace, namespace: group1) } + let_it_be(:group2) { create(:group) } + let_it_be(:enabled_namespace2) { create(:zoekt_enabled_namespace, namespace: group2) } + let_it_be_with_reload(:nodes) { create_list(:zoekt_node, 5, total_bytes: 100.gigabytes, used_bytes: 90.gigabytes) } + let_it_be(:projects_namespace1) do + [ + create(:project, namespace: group1, statistics: create(:project_statistics, repository_size: 1.gigabyte)), + create(:project, namespace: group1, statistics: create(:project_statistics, repository_size: 2.gigabytes)) + ] + end + + let_it_be(:projects_namespace2) do + [create(:project, namespace: group2, statistics: create(:project_statistics, repository_size: 2.gigabytes))] + end + + let(:max_indices_per_replica) { 5 } + + describe '.plan' do + subject(:plan) do + described_class.plan( + enabled_namespaces: [enabled_namespace1, enabled_namespace2], + nodes: nodes, + num_replicas: num_replicas, + buffer_factor: buffer_factor, + max_indices_per_replica: max_indices_per_replica + ) + end + + let(:num_replicas) { 2 } + let(:buffer_factor) { 1.5 } + + it 'returns total required storage bytes across all namespaces' do + total_storage = (projects_namespace1 + projects_namespace2).sum { |p| p.statistics.repository_size } + buffered_storage = total_storage * buffer_factor * num_replicas + expect(plan[:total_required_storage_bytes]).to eq(buffered_storage) + end + + it 'returns plans for each enabled namespace' do + expect(plan[:namespaces].size).to eq(2) + expect(plan[:namespaces].pluck(:enabled_namespace_id)) + .to contain_exactly(enabled_namespace1.id, enabled_namespace2.id) + end + + it 'calculates the namespace-specific required storage bytes' do + namespace1_storage = projects_namespace1.sum { |p| 
p.statistics.repository_size * buffer_factor } + namespace2_storage = projects_namespace2.sum { |p| p.statistics.repository_size * buffer_factor } + + expect(plan[:namespaces][0][:namespace_required_storage_bytes]).to eq(namespace1_storage * num_replicas) + expect(plan[:namespaces][1][:namespace_required_storage_bytes]).to eq(namespace2_storage * num_replicas) + end + + it 'assigns projects to indices for each namespace without reusing nodes' do + namespace1_used_nodes = [] + plan[:namespaces][0][:replicas].each do |replica| + replica[:indices].each do |index| + expect(namespace1_used_nodes).not_to include(index[:node_id]) + namespace1_used_nodes << index[:node_id] + end + end + end + + context 'when max indices per replica is reached' do + let(:max_indices_per_replica) { 1 } + + it 'logs an error for the namespace which can not be fit into 1 index' do + plan[:failures].each do |namespace_plan| + expect(namespace_plan[:errors]).to include(a_hash_including(type: :index_limit_exceeded)) + end + end + end + + context 'when a namespace has to be spread across multiple indices' do + let(:buffer_factor) { 2.5 } + let(:num_replicas) { 1 } + + before do + nodes.map { |node| node.update!(total_bytes: 10.gigabytes, used_bytes: 3.gigabytes) } + end + + it 'creates multiple indices for a namespace' do + namespace1_plan = plan[:namespaces].find { |n| n[:enabled_namespace_id] == enabled_namespace1.id } + indices_plan = namespace1_plan[:replicas].flat_map { |replica| replica[:indices] } + + expect(indices_plan.size).to eq(2) + expect(indices_plan.pluck(:node_id).uniq.size).to eq(2) + projects = indices_plan.first[:projects] + p_ns = ::Namespace.by_root_id(group1.id).project_namespaces.order(:id) + expect(projects).to eq({ project_namespace_id_from: p_ns[0].id, project_namespace_id_to: p_ns[0].id }) + projects = indices_plan.last[:projects] + expect(projects).to eq({ project_namespace_id_from: p_ns[1].id, project_namespace_id_to: nil }) + + namespace2_plan = plan[:namespaces].find 
{ |n| n[:enabled_namespace_id] == enabled_namespace2.id } + indices_plan = namespace2_plan[:replicas].flat_map { |replica| replica[:indices] } + expect(indices_plan.size).to eq(1) + projects = indices_plan.first[:projects] + p_ns = ::Namespace.by_root_id(group2.id).project_namespaces.order(:id) + expect(projects).to eq({ project_namespace_id_from: p_ns[0].id, project_namespace_id_to: nil }) + end + end + end + + context 'when there are more projects than the batch size' do + let(:batch_size) { 2 } + let(:num_replicas) { 2 } + let(:buffer_factor) { 1.5 } + + before do + # Create more projects than the batch size + (1..6).each do |i| + create(:project, namespace: group1, statistics: create(:project_statistics, repository_size: i.megabytes)) + end + end + + it 'processes all projects in batches without skipping any' do + # Run the planning service with a specific batch size + result = described_class.plan( + enabled_namespaces: [enabled_namespace1], + nodes: nodes, + num_replicas: num_replicas, + buffer_factor: buffer_factor + ) + + # Total storage should account for all projects + total_storage = group1.projects.sum do |p| + p.statistics.repository_size + end + + buffered_storage = total_storage * buffer_factor * num_replicas + + expect(result[:total_required_storage_bytes]).to eq(buffered_storage) + + # Ensure all projects are assigned + assigned_projects = result[:namespaces][0][:replicas].flat_map { |r| r[:indices].flat_map { |i| i[:projects] } } + lower, upper = assigned_projects.pluck(:project_namespace_id_from, :project_namespace_id_to).flatten.uniq + id_range = upper.blank? ? lower.. 
: lower..upper + + project_ids = group1.projects.by_project_namespace(id_range).pluck(:id) + + expect(project_ids).to match_array(group1.projects.pluck(:id)) + end + end +end diff --git a/ee/spec/services/search/zoekt/provisioning_service_spec.rb b/ee/spec/services/search/zoekt/provisioning_service_spec.rb new file mode 100644 index 0000000000000000..37f25f69bd199e6a --- /dev/null +++ b/ee/spec/services/search/zoekt/provisioning_service_spec.rb @@ -0,0 +1,278 @@ +# frozen_string_literal: true + +require 'spec_helper' + +RSpec.describe Search::Zoekt::ProvisioningService, feature_category: :global_search do + let_it_be(:namespace) { create(:group) } + let_it_be(:enabled_namespace) { create(:zoekt_enabled_namespace, namespace: namespace) } + let_it_be(:namespace2) { create(:group) } + let_it_be(:enabled_namespace2) { create(:zoekt_enabled_namespace, namespace: namespace2) } + let_it_be(:nodes) { create_list(:zoekt_node, 5, total_bytes: 100.gigabytes, used_bytes: 90.gigabytes) } + + let(:plan) do + { + namespaces: [ + { + namespace_id: namespace.id, + enabled_namespace_id: enabled_namespace.id, + replicas: [ + { + indices: [ + { + node_id: nodes.first.id, + required_storage_bytes: 3.gigabytes, + max_storage_bytes: 90.gigabytes, + projects: { project_namespace_id_from: 1, project_namespace_id_to: 5 } + }, + { + node_id: nodes.second.id, + required_storage_bytes: 2.gigabytes, + max_storage_bytes: 80.gigabytes, + projects: { project_namespace_id_from: 6, project_namespace_id_to: nil } + } + ] + }, + { + indices: [ + { + node_id: nodes.third.id, + required_storage_bytes: 3.gigabytes, + max_storage_bytes: 90.gigabytes, + projects: { project_namespace_id_from: 1, project_namespace_id_to: 5 } + }, + { + node_id: nodes.fourth.id, + required_storage_bytes: 2.gigabytes, + max_storage_bytes: 80.gigabytes, + projects: { project_namespace_id_from: 6, project_namespace_id_to: nil } + } + ] + } + ], + errors: [], + namespace_required_storage_bytes: 10.gigabytes + }, + { + 
namespace_id: namespace2.id, + enabled_namespace_id: enabled_namespace2.id, + replicas: [ + { + indices: [ + { + node_id: nodes.first.id, + required_storage_bytes: 2.gigabytes, + max_storage_bytes: 90.gigabytes, + projects: { project_namespace_id_from: 1, project_namespace_id_to: 3 } + }, + { + node_id: nodes.second.id, + required_storage_bytes: 1.gigabyte, + max_storage_bytes: 80.gigabytes, + projects: { project_namespace_id_from: 4, project_namespace_id_to: nil } + } + ] + }, + { + indices: [ + { + node_id: nodes.third.id, + required_storage_bytes: 2.gigabytes, + max_storage_bytes: 90.gigabytes, + projects: { project_namespace_id_from: 1, project_namespace_id_to: 3 } + }, + { + node_id: nodes.fourth.id, + required_storage_bytes: 1.gigabyte, + max_storage_bytes: 80.gigabytes, + projects: { project_namespace_id_from: 4, project_namespace_id_to: nil } + } + ] + } + ], + errors: [], + namespace_required_storage_bytes: 6.gigabytes + } + ], + total_required_storage_bytes: 16.gigabytes, + failures: [] + } + end + + subject(:provisioning_result) { described_class.execute(plan) } + + describe '.provision' do + context 'when the plan is valid' do + it 'provisions all replicas and indices' do + result = provisioning_result + # Ensure there are no errors + expect(result[:errors]).to be_empty + expect(enabled_namespace.replicas.count).to eq(2) + expect(enabled_namespace.indices.count).to eq(4) + expect(enabled_namespace2.replicas.count).to eq(2) + expect(enabled_namespace2.indices.count).to eq(4) + + metadata = enabled_namespace.replicas.first.indices.find_by_zoekt_node_id(nodes.first).metadata + expect(metadata).to eq({ 'project_namespace_id_to' => 5, 'project_namespace_id_from' => 1 }) + metadata2 = enabled_namespace.replicas.first.indices.find_by_zoekt_node_id(nodes.second).metadata + expect(metadata2).to eq({ 'project_namespace_id_from' => 6 }) + metadata3 = enabled_namespace.replicas.second.indices.find_by_zoekt_node_id(nodes.third).metadata + expect(metadata3).to eq({ 
'project_namespace_id_to' => 5, 'project_namespace_id_from' => 1 }) + metadata4 = enabled_namespace.replicas.second.indices.find_by_zoekt_node_id(nodes.fourth).metadata + expect(metadata4).to eq({ 'project_namespace_id_from' => 6 }) + + metadata5 = enabled_namespace2.replicas.first.indices.find_by_zoekt_node_id(nodes.first).metadata + expect(metadata5).to eq({ 'project_namespace_id_to' => 3, 'project_namespace_id_from' => 1 }) + metadata6 = enabled_namespace2.replicas.first.indices.find_by_zoekt_node_id(nodes.second).metadata + expect(metadata6).to eq({ 'project_namespace_id_from' => 4 }) + metadata7 = enabled_namespace2.replicas.second.indices.find_by_zoekt_node_id(nodes.third).metadata + expect(metadata7).to eq({ 'project_namespace_id_to' => 3, 'project_namespace_id_from' => 1 }) + metadata8 = enabled_namespace2.replicas.second.indices.find_by_zoekt_node_id(nodes.fourth).metadata + expect(metadata8).to eq({ 'project_namespace_id_from' => 4 }) + end + end + + context 'when there is not enough space in node' do + before do + nodes.second.update!(used_bytes: 99.gigabytes) # Simulate node being near full + end + + it 'logs an error and does not provision indices on that node' do + result = provisioning_result + + expect(result[:errors]).to include(a_hash_including(message: :node_capacity_exceeded)) + end + end + + context 'when there is an error initializing a replica' do + it 'logs an error and does not creates anything' do + allow(::Search::Zoekt::Replica).to receive(:new).and_raise(StandardError, 'Replica initialization failed') + + result = provisioning_result + + expect(result[:errors]).to include(a_hash_including(details: 'Replica initialization failed')) + expect(Search::Zoekt::Replica.count).to be_zero + expect(Search::Zoekt::Index.count).to be_zero + end + end + + context 'when there is an error initializing an index' do + it 'logs an error and does not creates anything' do + allow(::Search::Zoekt::Index).to receive(:new).once.and_raise(StandardError, 'Index 
initialization failed') + + result = provisioning_result + + expect(result[:errors]).to include(a_hash_including(details: 'Index initialization failed')) + expect(Search::Zoekt::Replica.count).to be_zero + expect(Search::Zoekt::Index.count).to be_zero + end + end + + context 'when a namespace has errors in its plan' do + let(:plan) do + { + namespaces: [ + { + namespace_id: namespace.id, + enabled_namespace_id: enabled_namespace.id, + replicas: [ + { + indices: [ + { + node_id: nodes.first.id, + required_storage_bytes: 3.gigabytes, + max_storage_bytes: 90.gigabytes, + projects: { project_namespace_id_from: 1, project_namespace_id_to: 5 } + }, + { + node_id: nodes.second.id, + required_storage_bytes: 2.gigabytes, + max_storage_bytes: 80.gigabytes, + projects: { project_namespace_id_from: 6, project_namespace_id_to: nil } + } + ] + }, + { + indices: [ + { + node_id: nodes.third.id, + required_storage_bytes: 3.gigabytes, + max_storage_bytes: 90.gigabytes, + projects: { project_namespace_id_from: 1, project_namespace_id_to: 5 } + }, + { + node_id: nodes.fourth.id, + required_storage_bytes: 2.gigabytes, + max_storage_bytes: 80.gigabytes, + projects: { project_namespace_id_from: 6, project_namespace_id_to: nil } + } + ] + } + ], + errors: [{ namespace_id: namespace.id, replica_idx: nil, type: :error_type, details: 'Detail' }], + namespace_required_storage_bytes: 10.gigabytes + }, + { + namespace_id: namespace2.id, + enabled_namespace_id: enabled_namespace2.id, + replicas: [ + { + indices: [ + { + node_id: nodes.first.id, + required_storage_bytes: 2.gigabytes, + max_storage_bytes: 90.gigabytes, + projects: { project_namespace_id_from: 1, project_namespace_id_to: 3 } + }, + { + node_id: nodes.second.id, + required_storage_bytes: 1.gigabyte, + max_storage_bytes: 80.gigabytes, + projects: { project_namespace_id_from: 4, project_namespace_id_to: nil } + } + ] + }, + { + indices: [ + { + node_id: nodes.third.id, + required_storage_bytes: 2.gigabytes, + max_storage_bytes: 
90.gigabytes, + projects: { project_namespace_id_from: 1, project_namespace_id_to: 3 } + }, + { + node_id: nodes.fourth.id, + required_storage_bytes: 1.gigabyte, + max_storage_bytes: 80.gigabytes, + projects: { project_namespace_id_from: 4, project_namespace_id_to: nil } + } + ] + } + ], + errors: [], + namespace_required_storage_bytes: 6.gigabytes + } + ], + total_required_storage_bytes: 8.gigabytes + } + end + + it 'skips that namespace and continues with the rest' do + result = provisioning_result + # Ensure there are no errors + expect(result[:errors]).to be_empty + expect(enabled_namespace.replicas).to be_empty + expect(enabled_namespace.indices).to be_empty + expect(enabled_namespace2.replicas.count).to eq(2) + expect(enabled_namespace2.indices.count).to eq(4) + metadata = enabled_namespace2.replicas.first.indices.find_by_zoekt_node_id(nodes.first).metadata + expect(metadata).to eq({ 'project_namespace_id_to' => 3, 'project_namespace_id_from' => 1 }) + metadata2 = enabled_namespace2.replicas.first.indices.find_by_zoekt_node_id(nodes.second).metadata + expect(metadata2).to eq({ 'project_namespace_id_from' => 4 }) + metadata3 = enabled_namespace2.replicas.second.indices.find_by_zoekt_node_id(nodes.third).metadata + expect(metadata3).to eq({ 'project_namespace_id_to' => 3, 'project_namespace_id_from' => 1 }) + metadata4 = enabled_namespace2.replicas.second.indices.find_by_zoekt_node_id(nodes.fourth).metadata + expect(metadata4).to eq({ 'project_namespace_id_from' => 4 }) + end + end + end +end diff --git a/ee/spec/services/search/zoekt/rollout_service_spec.rb b/ee/spec/services/search/zoekt/rollout_service_spec.rb new file mode 100644 index 0000000000000000..ca4e7ab97edf7b13 --- /dev/null +++ b/ee/spec/services/search/zoekt/rollout_service_spec.rb @@ -0,0 +1,151 @@ +# frozen_string_literal: true + +require 'spec_helper' + +RSpec.describe ::Search::Zoekt::RolloutService, feature_category: :global_search do + let(:logger) { instance_double(Logger, info: nil) } + 
let(:resource_pool) do + instance_double(::Search::Zoekt::SelectionService::ResourcePool, + enabled_namespaces: enabled_namespaces, + nodes: nodes) + end + + let(:plan) { instance_double(::Search::Zoekt::PlanningService::Plan, to_json: '{"plan": "data"}') } + let(:default_options) do + { + num_replicas: 1, + max_indices_per_replica: 5, + dry_run: true, + batch_size: 128, + logger: logger + } + end + + let(:selection_service) do + ::Search::Zoekt::SelectionService + end + + let(:planning_service) do + ::Search::Zoekt::PlanningService + end + + let(:provisioning_service) do + ::Search::Zoekt::ProvisioningService + end + + let(:enabled_namespaces) { ['namespace1'] } + let(:nodes) { ['node1'] } + + subject(:service) { described_class.new(**options) } + + describe '#execute' do + subject(:result) { service.execute } + + let(:options) { {} } + + before do + allow(selection_service).to receive(:execute) + .with(max_batch_size: default_options[:batch_size]) + .and_return(resource_pool) + end + + context 'when no enabled namespaces are found' do + let(:enabled_namespaces) { [] } + + it 'returns a failed result with appropriate message' do + expect(result.success?).to be false + expect(result.message).to eq("No enabled namespaces found") + end + end + + context 'when no available nodes are found' do + let(:nodes) { [] } + + it 'returns a failed result with appropriate message' do + expect(result.success?).to be false + expect(result.message).to eq("No available nodes found") + end + end + + context 'when dry_run is true' do + let(:options) { { dry_run: true, logger: logger } } + + before do + allow(planning_service).to receive(:plan) + .with( + enabled_namespaces: enabled_namespaces, + nodes: nodes, + num_replicas: default_options[:num_replicas], + max_indices_per_replica: default_options[:max_indices_per_replica] + ) + .and_return(plan) + end + + it 'returns a successful result indicating a skipped execution' do + expect(result.success?).to be true + 
expect(result.message).to eq("Skipping execution of changes because of dry run") + end + end + + context 'when dry_run is false and provisioning returns errors' do + let(:options) { { dry_run: false, logger: logger } } + let(:changes) { { errors: 'some error occurred' } } + + before do + allow(planning_service).to receive(:plan) + .with( + enabled_namespaces: enabled_namespaces, + nodes: nodes, + num_replicas: default_options[:num_replicas], + max_indices_per_replica: default_options[:max_indices_per_replica] + ) + .and_return(plan) + + allow(provisioning_service).to receive(:execute) + .with(plan) + .and_return(changes) + end + + it 'returns a failed result with the provisioning error message' do + expect(result.success?).to be false + expect(result.message).to eq("Change had an error: some error occurred") + end + end + + context 'when dry_run is false and provisioning succeeds' do + let(:options) { { dry_run: false, logger: logger } } + let(:changes) { { errors: nil } } + + before do + allow(planning_service).to receive(:plan) + .with( + enabled_namespaces: enabled_namespaces, + nodes: nodes, + num_replicas: default_options[:num_replicas], + max_indices_per_replica: default_options[:max_indices_per_replica] + ) + .and_return(plan) + + allow(provisioning_service).to receive(:execute) + .with(plan) + .and_return(changes) + end + + it 'returns a successful result indicating completion' do + expect(result.success?).to be true + expect(result.message).to eq("Rollout execution completed successfully") + end + end + end + + describe '.execute' do + let(:options) { { dry_run: true, logger: logger } } + + it 'delegates to an instance of RolloutService' do + instance = instance_double(described_class) + expect(described_class).to receive(:new).with(**options).and_return(instance) + expect(instance).to receive(:execute) + described_class.execute(**options) + end + end +end diff --git a/ee/spec/services/search/zoekt/selection_service_spec.rb 
b/ee/spec/services/search/zoekt/selection_service_spec.rb new file mode 100644 index 0000000000000000..2eee2e4e83ade7dd --- /dev/null +++ b/ee/spec/services/search/zoekt/selection_service_spec.rb @@ -0,0 +1,109 @@ +# frozen_string_literal: true + +require 'spec_helper' + +RSpec.describe Search::Zoekt::SelectionService, feature_category: :global_search do + describe '.execute' do + subject(:resource_pool) { described_class.execute } + + let_it_be(:ns_1) { create(:group) } + let_it_be(:ns_2) { create(:group) } + + context 'with basic resource pool structure' do + it 'returns a resource pool responding to :enabled_namespaces and :nodes' do + expect(resource_pool).to respond_to(:enabled_namespaces) + expect(resource_pool).to respond_to(:nodes) + end + end + + context 'with enabled namespaces selection' do + let_it_be(:eligible_namespace) { create(:zoekt_enabled_namespace, namespace: ns_1) } + let_it_be(:ineligible_namespace) { create(:zoekt_enabled_namespace, namespace: ns_2) } + + before do + # For the eligible namespace, the project count will be low (default). + # For the ineligible namespace, stub its associated namespace so that + # project_namespaces.count returns 25_000 (above the 20,000 threshold). 
+ allow(::Namespace).to receive(:by_root_id) + .with(eligible_namespace.root_namespace_id) + .and_return(::Namespace.where(id: eligible_namespace.root_namespace_id)) + + allow(::Namespace).to receive(:by_root_id) + .with(ineligible_namespace.root_namespace_id) + .and_return(::Namespace.where(id: ineligible_namespace.root_namespace_id)) + + allow(ineligible_namespace.namespace) + .to receive_message_chain(:project_namespaces, :count) + .and_return(25_000) + + allow(::Namespace).to receive(:by_root_id) + .with(ineligible_namespace.root_namespace_id) + .and_return(ineligible_namespace.namespace.root_ancestor) + end + + it 'includes only namespaces with a project count within the limit' do + expect(resource_pool.enabled_namespaces).to include(eligible_namespace) + expect(resource_pool.enabled_namespaces).not_to include(ineligible_namespace) + end + end + + context 'with max batch size enforcement' do + let(:max_batch_size) { 2 } + + subject(:resource_pool) { described_class.new(max_batch_size: max_batch_size).execute } + + before do + # Create more eligible namespaces than the max batch size. + create_list(:zoekt_enabled_namespace, 3) + end + + it 'limits the number of selected namespaces to the max batch size' do + expect(resource_pool.enabled_namespaces.size).to eq(max_batch_size) + end + end + + context 'with available nodes selection' do + let_it_be(:eligible_node) { create(:zoekt_node, total_bytes: 100.gigabytes, used_bytes: 50.gigabytes) } + # Node with no unclaimed storage. + let_it_be(:ineligible_node) do + create(:zoekt_node, total_bytes: 100.gigabytes, used_bytes: 100.gigabytes) + end + + it 'returns only nodes with positive unclaimed storage bytes' do + expect(resource_pool.nodes).to include(eligible_node) + expect(resource_pool.nodes).not_to include(ineligible_node) + end + end + + context 'when no eligible namespaces exist' do + before do + # Create namespaces but stub each so that project_namespaces.count returns 30_000. 
+ create_list(:zoekt_enabled_namespace, 2).each do |ns| + allow(ns.namespace.root_ancestor) + .to receive_message_chain(:project_namespaces, :count) + .and_return(30_000) + allow(::Namespace).to receive(:by_root_id) + .with(ns.root_namespace_id) + .and_return(ns.namespace.root_ancestor) + end + end + + it 'returns an empty array for namespaces' do + expect(resource_pool.enabled_namespaces).to be_empty + end + end + + context 'when no eligible nodes exist' do + before do + # Stub the node scope to return an empty array. + allow(::Search::Zoekt::Node) + .to receive(:with_positive_unclaimed_storage_bytes) + .and_return([]) + end + + it 'returns an empty array for nodes' do + expect(resource_pool.nodes).to eq([]) + end + end + end +end diff --git a/spec/models/namespaces/project_namespace_spec.rb b/spec/models/namespaces/project_namespace_spec.rb index 2620f287ae0d72d1..7ed858da9a7d720e 100644 --- a/spec/models/namespaces/project_namespace_spec.rb +++ b/spec/models/namespaces/project_namespace_spec.rb @@ -17,6 +17,34 @@ end end + describe 'scopes' do + let_it_be(:namespace) { create(:namespace) } + let_it_be(:project) do + create(:project, + namespace: namespace, + statistics: build( + :project_statistics, + namespace: namespace, + repository_size: 101, + wiki_size: 505, + lfs_objects_size: 202, + build_artifacts_size: 303, + pipeline_artifacts_size: 707, + packages_size: 404, + snippets_size: 605, + uploads_size: 808 + ) + ) + end + + describe '.with_project_statistics' do + it 'includes the project statistics' do + project_namespaces = described_class.with_project_statistics + expect(project_namespaces.first.project.association(:statistics).loaded?).to be_truthy + end + end + end + describe 'validations' do it { is_expected.not_to validate_presence_of :owner } end -- GitLab From 003a494cb903259e49bfef279ba50b4fbd02fa1b Mon Sep 17 00:00:00 2001 From: John Mason <jmason@gitlab.com> Date: Thu, 20 Feb 2025 17:29:48 -0500 Subject: [PATCH 2/6] Update scope of project 
statistics --- app/models/namespace.rb | 1 + app/models/namespaces/project_namespace.rb | 2 -- ee/app/services/search/zoekt/planning_service.rb | 8 +------- 3 files changed, 2 insertions(+), 9 deletions(-) diff --git a/app/models/namespace.rb b/app/models/namespace.rb index 48a58ee912424009..e31c54a664db0eed 100644 --- a/app/models/namespace.rb +++ b/app/models/namespace.rb @@ -230,6 +230,7 @@ class Namespace < ApplicationRecord scope :by_name, ->(name) { where('name LIKE ?', "#{sanitize_sql_like(name)}%") } scope :ordered_by_name, -> { order(:name) } scope :top_level, -> { by_parent(nil) } + scope :with_project_statistics, -> { includes(projects: :statistics) } scope :with_statistics, -> do namespace_statistic_columns = STATISTICS_COLUMNS.map { |column| sum_project_statistics_column(column) } diff --git a/app/models/namespaces/project_namespace.rb b/app/models/namespaces/project_namespace.rb index e1be5ccbe84b7542..d60800af67067349 100644 --- a/app/models/namespaces/project_namespace.rb +++ b/app/models/namespaces/project_namespace.rb @@ -4,8 +4,6 @@ module Namespaces class ProjectNamespace < Namespace self.allow_legacy_sti_class = true - scope :with_project_statistics, -> { includes(project: :statistics) } - # These aliases are added to make it easier to sync parent/parent_id attribute with # project.namespace/project.namespace_id attribute. 
# diff --git a/ee/app/services/search/zoekt/planning_service.rb b/ee/app/services/search/zoekt/planning_service.rb index d556e41369bd4354..890431e13d159178 100644 --- a/ee/app/services/search/zoekt/planning_service.rb +++ b/ee/app/services/search/zoekt/planning_service.rb @@ -147,13 +147,7 @@ def create_empty_replica end def fetch_project_namespaces - scope = ::Namespace.by_root_id(namespace.id).project_namespaces - - if scope.respond_to?(:with_project_statistics) - scope.with_project_statistics - else # TODO: remove this condition before merging - scope.includes(project: :statistics) # rubocop:disable CodeReuse/ActiveRecord -- will remove - end + ::Namespace.by_root_id(namespace.id).project_namespaces.with_project_statistics end def assign_to_node(stats, replica_indices) -- GitLab From f88300d312753871876737467a3c247b1a944f3a Mon Sep 17 00:00:00 2001 From: John Mason <jmason@gitlab.com> Date: Fri, 21 Feb 2025 10:38:50 -0500 Subject: [PATCH 3/6] Add lock to rollout worker --- ee/app/workers/search/zoekt/rollout_worker.rb | 25 +++++++++++-------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/ee/app/workers/search/zoekt/rollout_worker.rb b/ee/app/workers/search/zoekt/rollout_worker.rb index fcdc3c2754a27830..d397bd461cb039d7 100644 --- a/ee/app/workers/search/zoekt/rollout_worker.rb +++ b/ee/app/workers/search/zoekt/rollout_worker.rb @@ -6,6 +6,7 @@ class RolloutWorker include ApplicationWorker include Search::Worker include CronjobQueue # rubocop:disable Scalability/CronWorkerContext -- there is no relevant metadata + include Gitlab::ExclusiveLeaseHelpers prepend ::Geo::SkipSecondary deduplicate :until_executed @@ -25,20 +26,22 @@ def perform(retry_count = 0) return false unless Search::Zoekt.licensed_and_indexing_enabled? 
return false unless Feature.enabled?(:zoekt_rollout_worker, Feature.current_request) - result = ::Search::Zoekt::RolloutService.execute + in_lock(self.class.name.underscore, ttl: 10.minutes, retries: 10, sleep_sec: 1) do + result = ::Search::Zoekt::RolloutService.execute - if result.success? - log_info message: "RolloutWorker succeeded: #{result.message}" - self.class.perform_async # Immediately schedule another job - else - log_info message: "RolloutWorker did not do any work: #{result.message}" + if result.success? + log_info message: "RolloutWorker succeeded: #{result.message}" + self.class.perform_async # Immediately schedule another job + else + log_info message: "RolloutWorker did not do any work: #{result.message}" - if retry_count < MAX_RETRIES - backoff_time = INITIAL_BACKOFF * (2**retry_count) + if retry_count < MAX_RETRIES + backoff_time = INITIAL_BACKOFF * (2**retry_count) - self.class.perform_at(backoff_time.from_now, retry_count + 1) - else - log_info message: "RolloutWorker exceeded max back off interval. Last message: #{result.message}" + self.class.perform_at(backoff_time.from_now, retry_count + 1) + else + log_info message: "RolloutWorker exceeded max back off interval. 
Last message: #{result.message}" + end end end end -- GitLab From b9dbf3da5f09836943ad80bc6591b3fc0e68fa50 Mon Sep 17 00:00:00 2001 From: John Mason <jmason@gitlab.com> Date: Fri, 21 Feb 2025 10:44:18 -0500 Subject: [PATCH 4/6] Add memoization to fetch project namespaces --- ee/app/services/search/zoekt/planning_service.rb | 3 +++ 1 file changed, 3 insertions(+) diff --git a/ee/app/services/search/zoekt/planning_service.rb b/ee/app/services/search/zoekt/planning_service.rb index 890431e13d159178..2a03af229dddf0e0 100644 --- a/ee/app/services/search/zoekt/planning_service.rb +++ b/ee/app/services/search/zoekt/planning_service.rb @@ -76,6 +76,8 @@ def build_node_change_plan end class Plan + include Gitlab::Utils::StrongMemoize + attr_reader :enabled_namespace, :namespace, :num_replicas, :buffer_factor, :max_indices_per_replica, :errors, :replica_plans, :nodes @@ -149,6 +151,7 @@ def create_empty_replica def fetch_project_namespaces ::Namespace.by_root_id(namespace.id).project_namespaces.with_project_statistics end + strong_memoize_attr :fetch_project_namespaces def assign_to_node(stats, replica_indices) best_node = find_best_node(stats) -- GitLab From ea3e3c20b1823fe64d48e7cd844fe39f6fbddaed Mon Sep 17 00:00:00 2001 From: John Mason <jmason@gitlab.com> Date: Fri, 21 Feb 2025 11:09:29 -0500 Subject: [PATCH 5/6] Optimize total required storage bytes calculation --- ee/app/services/search/zoekt/planning_service.rb | 15 ++------------- 1 file changed, 2 insertions(+), 13 deletions(-) diff --git a/ee/app/services/search/zoekt/planning_service.rb b/ee/app/services/search/zoekt/planning_service.rb index 2a03af229dddf0e0..b0b74395d71967a1 100644 --- a/ee/app/services/search/zoekt/planning_service.rb +++ b/ee/app/services/search/zoekt/planning_service.rb @@ -25,21 +25,10 @@ def plan ).generate end - namespace_plans = [] - failed_plans = [] - - all_plans.each do |p| - if p[:errors].present? 
- failed_plans << p - else - namespace_plans << p - end - end - - successful_plans = all_plans.filter { |p| !p[:errors].present? } + failed_plans, successful_plans = all_plans.partition { |plan| plan[:errors].present? } { - total_required_storage_bytes: namespace_plans.sum { |plan| plan[:namespace_required_storage_bytes] }, + total_required_storage_bytes: successful_plans.sum { |plan| plan[:namespace_required_storage_bytes] }, namespaces: successful_plans, nodes: build_node_change_plan, failures: failed_plans -- GitLab From b3d3c37aa104d1e91148984229cfb8049fe988b5 Mon Sep 17 00:00:00 2001 From: John Mason <jmason@gitlab.com> Date: Fri, 21 Feb 2025 12:06:03 -0500 Subject: [PATCH 6/6] Fix updated spec --- spec/models/namespace_spec.rb | 28 +++++++++++++++++++ .../namespaces/project_namespace_spec.rb | 28 ------------------- 2 files changed, 28 insertions(+), 28 deletions(-) diff --git a/spec/models/namespace_spec.rb b/spec/models/namespace_spec.rb index fbd89f83113e1640..dcd3ffcd5c96caa5 100644 --- a/spec/models/namespace_spec.rb +++ b/spec/models/namespace_spec.rb @@ -600,6 +600,34 @@ end end end + + describe '.with_project_statistics' do + let_it_be(:namespace) { create(:namespace) } + let_it_be(:project) do + create(:project, + namespace: namespace, + statistics: build( + :project_statistics, + namespace: namespace, + repository_size: 101, + wiki_size: 505, + lfs_objects_size: 202, + build_artifacts_size: 303, + pipeline_artifacts_size: 707, + packages_size: 404, + snippets_size: 605, + uploads_size: 808 + ) + ) + end + + it 'includes the project statistics' do + namespaces = described_class.with_project_statistics + expect( + namespaces.where(id: namespace.id).first.projects.first.association(:statistics).loaded? 
+ ).to be_truthy + end + end end describe 'delegate' do diff --git a/spec/models/namespaces/project_namespace_spec.rb b/spec/models/namespaces/project_namespace_spec.rb index 7ed858da9a7d720e..2620f287ae0d72d1 100644 --- a/spec/models/namespaces/project_namespace_spec.rb +++ b/spec/models/namespaces/project_namespace_spec.rb @@ -17,34 +17,6 @@ end end - describe 'scopes' do - let_it_be(:namespace) { create(:namespace) } - let_it_be(:project) do - create(:project, - namespace: namespace, - statistics: build( - :project_statistics, - namespace: namespace, - repository_size: 101, - wiki_size: 505, - lfs_objects_size: 202, - build_artifacts_size: 303, - pipeline_artifacts_size: 707, - packages_size: 404, - snippets_size: 605, - uploads_size: 808 - ) - ) - end - - describe '.with_project_statistics' do - it 'includes the project statistics' do - project_namespaces = described_class.with_project_statistics - expect(project_namespaces.first.project.association(:statistics).loaded?).to be_truthy - end - end - end - describe 'validations' do it { is_expected.not_to validate_presence_of :owner } end -- GitLab