Skip to content
Snippets Groups Projects
Commit 260b390b authored by John Mason's avatar John Mason
Browse files

Simplify planning service

parent baa9a27a
No related branches found
No related tags found
No related merge requests found
This commit is part of merge request !175666. Comments created here will be created in the context of that merge request.
......@@ -23,6 +23,8 @@ def save!
def apply_replica_changes!
replica_changes.each do |replica_change|
# TODO: delete replica if necessary
replica = replica_change[:replica]
replica.save! unless replica.persisted?
......
......@@ -20,7 +20,6 @@ def initialize(enabled_namespaces:, num_replicas:, buffer_factor: 3, batch_size:
end
def plan
# Load nodes with unclaimed storage once, to be shared across all namespaces
nodes = load_nodes_with_unclaimed_storage
namespace_plans = enabled_namespaces.map do |enabled_namespace|
......@@ -61,21 +60,21 @@ def load_nodes_with_unclaimed_storage
end
class Plan
attr_reader :enabled_namespace, :namespace, :number_of_replicas, :buffer_factor, :batch_size, :max_indices_per_replica,
:errors, :replica_plans, :nodes
attr_reader :enabled_namespace, :namespace, :number_of_replicas, :buffer_factor, :batch_size,
:max_indices_per_replica, :errors, :replica_plans, :nodes
def initialize(
enabled_namespace:, number_of_replicas:, buffer_factor:, batch_size:, errors:,
max_indices_per_replica:, nodes:)
enabled_namespace:, number_of_replicas:, buffer_factor:, batch_size:, max_indices_per_replica:,
errors:, nodes:)
@enabled_namespace = enabled_namespace
@namespace = enabled_namespace.namespace
@number_of_replicas = number_of_replicas
@buffer_factor = buffer_factor
@errors = errors
@replica_plans = []
@batch_size = batch_size
@max_indices_per_replica = max_indices_per_replica
@nodes = nodes # Shared nodes across namespaces
@errors = errors
@replica_plans = []
@nodes = nodes
end
def generate
......@@ -93,41 +92,38 @@ def generate
private
def simulate_replica_plan
replica_id = find_replica&.id
replica_indices = []
# Fetch and assign projects to nodes
fetch_unassigned_projects.each_batch(of: batch_size) do |batch|
fetch_projects.each_batch(of: batch_size) do |batch|
batch.each do |project|
stats = project.statistics
assign_to_node(stats) # Updates node[:indices] directly
if replica_indices.size >= max_indices_per_replica
log_error(replica_plans.size, nil, :index_limit_exceeded,
"Replica reached maximum index limit (#{max_indices_per_replica})")
break
end
assign_to_node(stats, replica_indices)
end
end
# Add the replica plan with indices collected from all nodes
add_replica_plan(replica_id)
add_replica_plan(replica_indices)
end
def fetch_unassigned_projects(last_project_id: nil)
scope = namespace.all_projects
.with_statistics
.left_joins(zoekt_repositories: { zoekt_index: :replica })
.joins(:statistics)
.group('projects.id, project_statistics.id, repository_size')
.order('repository_size DESC, projects.id ASC')
scope = scope.having('COUNT(DISTINCT zoekt_replicas.id) < ? OR COUNT(zoekt_repositories.id) = 0',
number_of_replicas)
scope = scope.where('projects.id > ?', last_project_id) if last_project_id
scope
def fetch_projects
namespace.all_projects
.with_statistics
.joins(:statistics)
.group('projects.id, project_statistics.id, repository_size')
.order('repository_size DESC, projects.id ASC')
end
def assign_to_node(stats)
# Find the best node that can fit the project
def assign_to_node(stats, replica_indices)
best_node = find_best_node(stats)
if best_node
assign_project_to_index(best_node, stats)
assign_project_to_index(best_node, stats, replica_indices)
else
log_error(replica_plans.size, nil, :node_unavailable,
"No node can accommodate project #{stats.project_id} with size #{scaled_size(stats)}")
......@@ -135,83 +131,56 @@ def assign_to_node(stats)
end
def find_best_node(stats)
nodes.sort_by { |node| -node[:unclaimed_storage_bytes] }.find do |node|
node[:unclaimed_storage_bytes] >= scaled_size(stats)
end
nodes
.select { |node| node[:unclaimed_storage_bytes] && node[:unclaimed_storage_bytes] >= scaled_size(stats) }
.max_by { |node| node[:unclaimed_storage_bytes] }
end
def assign_project_to_index(node, stats)
def assign_project_to_index(node, stats, replica_indices)
project_size = scaled_size(stats)
# First, try to fit the project into an existing index
index = node[:indices]
index = replica_indices
.select { |idx| idx[:required_storage_bytes] + project_size <= idx[:max_storage_bytes] }
.min_by { |idx| idx[:max_storage_bytes] - (idx[:required_storage_bytes] + project_size) }
# If no index can fit, create a new one
unless index
if node[:indices].size >= max_indices_per_replica
log_error(nil, nil, :index_limit_exceeded, "Max indices per replica reached on node #{node[:id]}")
if replica_indices.size >= max_indices_per_replica
log_error(nil, nil, :index_limit_exceeded, "Max indices per replica reached")
return
end
index = simulate_index(node)
replica_indices << index
node[:indices] << index
end
# Add the project to the chosen index
add_project_to_index(index, stats)
node[:unclaimed_storage_bytes] -= project_size
end
def simulate_index(node)
{
id: nil,
node_id: node[:id],
projects: { project_id_from: nil, project_id_to: nil },
required_storage_bytes: 0,
used_storage_bytes: 0,
reserved_storage_bytes: 0,
max_storage_bytes: node[:unclaimed_storage_bytes]
}
end
def add_project_to_index(index, stats)
# Initialize project_id_from for the first project in the index
if index[:projects][:project_id_from].nil? || index[:projects][:project_id_from] > stats.project_id
index[:projects][:project_id_from] = stats.project_id
end
# Update project_id_to to the latest project
index[:projects][:project_id_from] ||= stats.project_id
index[:projects][:project_id_to] = stats.project_id
# Update the required storage bytes
index[:required_storage_bytes] += scaled_size(stats)
end
def find_replica
enabled_namespace.replicas.order(:created_at).offset(replica_plans.size).first
end
def add_replica_plan(replica_id)
indices = nodes.flat_map { |node| node[:indices] } # Collect all indices from nodes
# Set project_id_to to nil for the last index in the replica
indices.last[:projects][:project_id_to] = nil if indices.any?
def add_replica_plan(replica_indices)
replica_indices.last[:projects][:project_id_to] = nil if replica_indices.any?
replica_plans << { id: replica_id, indices: indices.map { |index| format_index(index) } }
replica_plans << { indices: replica_indices.map { |index| format_index(index) } }
end
def format_index(index)
{
id: index[:id],
node_id: index[:node_id],
projects: index[:projects],
required_storage_bytes: index[:required_storage_bytes],
used_storage_bytes: index[:used_storage_bytes],
reserved_storage_bytes: index[:reserved_storage_bytes],
max_storage_bytes: index[:max_storage_bytes]
}
index.slice(:node_id, :projects, :required_storage_bytes, :max_storage_bytes)
end
def calculate_namespace_storage
......@@ -223,14 +192,8 @@ def scaled_size(stats)
end
def log_error(replica_idx, index_idx, type, details)
errors << {
namespace_id: namespace.id,
replica_idx: replica_idx,
index_idx: index_idx,
type: type,
class: self.class.name,
details: details
}
errors << { namespace_id: namespace.id, replica_idx: replica_idx, index_idx: index_idx, type: type,
details: details }
end
end
end
......
......@@ -9,7 +9,7 @@
let(:namespace2) { create(:namespace) }
let(:enabled_namespace2) { create(:zoekt_enabled_namespace, namespace: namespace2) }
let(:nodes) { create_list(:zoekt_node, 3, total_bytes: 100.gigabytes, used_bytes: 90.gigabytes) }
let(:nodes) { create_list(:zoekt_node, 3, total_bytes: 100.gigabytes, used_bytes: 50.gigabytes) }
let(:projects_namespace1) do
[
......@@ -24,15 +24,6 @@
]
end
let!(:existing_indices) do
[
create(:zoekt_index, node: nodes.first, reserved_storage_bytes: 1.gigabyte,
used_storage_bytes: 500.megabytes, zoekt_enabled_namespace: enabled_namespace1),
create(:zoekt_index, node: nodes.second, reserved_storage_bytes: 2.gigabytes,
used_storage_bytes: 1.gigabyte, zoekt_enabled_namespace: enabled_namespace2)
]
end
let(:max_indices_per_replica) { 5 }
before do
......@@ -41,12 +32,6 @@
allow(relation_stub).to receive_message_chain(:where, :not).and_return(nodes)
allow(Search::Zoekt::Node).to receive(:order_by_unclaimed_space).and_return(relation_stub)
# Preload existing indices into replicas
allow_next_instance_of(Search::Zoekt::PlanningService::Plan) do |instance|
allow(instance).to receive(:preload_existing_indices)
.and_return(existing_indices)
end
# Ensure projects are created before running tests
projects_namespace1
projects_namespace2
......@@ -66,20 +51,16 @@
let(:buffer_factor) { 1.5 }
it 'returns total required storage bytes across all namespaces' do
total_storage = (projects_namespace1 + projects_namespace2).sum do |p|
p.statistics.repository_size
end
buffered_storage = total_storage * buffer_factor
total_storage = (projects_namespace1 + projects_namespace2).sum { |p| p.statistics.repository_size }
buffered_storage = total_storage * buffer_factor * num_replicas
expect(plan[:total_required_storage_bytes]).to eq(buffered_storage * num_replicas)
expect(plan[:total_required_storage_bytes]).to eq(buffered_storage)
end
it 'returns plans for each enabled namespace' do
expect(plan[:namespaces].size).to eq(2)
expect(plan[:namespaces].map do |n|
n[:enabled_namespace_id]
end).to contain_exactly(enabled_namespace1.id, enabled_namespace2.id)
expect(plan[:namespaces].map { |n| n[:enabled_namespace_id] })
.to contain_exactly(enabled_namespace1.id, enabled_namespace2.id)
end
it 'calculates the namespace-specific required storage bytes' do
......@@ -98,17 +79,8 @@
expect(namespace2_projects).not_to be_empty
end
it 'uses existing indices where possible' do
namespace1_indices = plan[:namespaces][0][:replicas].flat_map { |r| r[:indices] }
namespace2_indices = plan[:namespaces][1][:replicas].flat_map { |r| r[:indices] }
expect(namespace1_indices.any? { |index| index[:id] == existing_indices.first.id }).to be true
expect(namespace2_indices.any? { |index| index[:id] == existing_indices.second.id }).to be true
end
context 'when no nodes are available' do
let(:nodes) { [] }
let(:existing_indices) { [] }
it 'logs an error for unavailable nodes for each namespace' do
plan[:namespaces].each do |namespace_plan|
......@@ -117,12 +89,11 @@
end
end
fcontext 'when max indices per replica is reached' do
context 'when max indices per replica is reached' do
let(:nodes) { create_list(:zoekt_node, 5, total_bytes: 10.gigabytes, used_bytes: 3.gigabytes) }
let(:max_indices_per_replica) { 1 }
it 'logs an error for exceeding max indices for each namespace' do
binding.pry
plan[:namespaces].each do |namespace_plan|
expect(namespace_plan[:errors]).to include(a_hash_including(type: :index_limit_exceeded))
end
......@@ -156,13 +127,16 @@
p.statistics.repository_size
end
buffered_storage = total_storage * buffer_factor
buffered_storage = total_storage * buffer_factor * num_replicas
expect(result[:total_required_storage_bytes]).to eq(buffered_storage * num_replicas)
expect(result[:total_required_storage_bytes]).to eq(buffered_storage)
# Ensure all projects are assigned
assigned_projects = result[:namespaces][0][:replicas].flat_map { |r| r[:indices].flat_map { |i| i[:projects] } }
project_ids = assigned_projects.flat_map { |p| [p[:project_id_from], p[:project_id_to]] }.uniq
lower, upper = assigned_projects.flat_map { |p| [p[:project_id_from], p[:project_id_to]] }.uniq
id_range = upper.blank? ? lower.. : lower..upper
project_ids = namespace1.projects.id_in(id_range).pluck(:id)
expect(project_ids).to match_array(namespace1.projects.pluck(:id))
end
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment