Skip to content
Snippets Groups Projects

Add cron worker to automatically roll out Zoekt exact code search to paid namespaces

Merged John Mason requested to merge jm-namespace-assignment-service into master
1 unresolved thread
Compare and Show latest version
3 files
+ 228
100
Compare changes
  • Side-by-side
  • Inline
Files
3
@@ -20,13 +20,17 @@ def initialize(enabled_namespaces:, num_replicas:, buffer_factor: 3, batch_size:
end
def plan
# Load nodes with unclaimed storage once, to be shared across all namespaces
nodes = load_nodes_with_unclaimed_storage
namespace_plans = enabled_namespaces.map do |enabled_namespace|
Plan.new(
enabled_namespace: enabled_namespace,
number_of_replicas: number_of_replicas,
buffer_factor: buffer_factor,
batch_size: batch_size,
errors: errors
errors: errors,
nodes: nodes
).generate
end
@@ -36,11 +40,30 @@ def plan
}
end
private
# Builds the in-memory list of candidate nodes used while planning.
#
# Queries ::Search::Zoekt::Node with its indices preloaded, keeps only rows whose
# computed free space (total - used + indexed - reserved) is positive, and orders
# them by `unclaimed_storage_bytes` descending.
#
# @return [Array<Hash>] one hash per node with the keys :id,
#   :unclaimed_storage_bytes, :indices (starts empty) and :node (the record itself)
def load_nodes_with_unclaimed_storage
  ::Search::Zoekt::Node
    .includes(:indices)
    .with_unclaimed_storage_bytes
    .having('zoekt_nodes.total_bytes - zoekt_nodes.used_bytes + zoekt_nodes.indexed_bytes - COALESCE(SUM(zoekt_indices.reserved_storage_bytes), 0) > 0')
    .group(:id)
    .order('unclaimed_storage_bytes DESC')
    .map do |node|
      # Sum over the already-loaded association in Ruby. Passing a column name to
      # `sum` would issue one extra SUM query per node despite `includes(:indices)`.
      # `to_i` treats a nil reservation as 0, matching how SQL SUM ignores NULLs.
      reserved_bytes = node.indices.sum { |index| index.reserved_storage_bytes.to_i }

      {
        id: node.id,
        unclaimed_storage_bytes: node.total_bytes - node.used_bytes + node.indexed_bytes - reserved_bytes,
        indices: [],
        node: node
      }
    end
end
class Plan
attr_reader :enabled_namespace, :namespace, :number_of_replicas, :buffer_factor, :batch_size, :errors,
:replica_plans, :existing_indices, :used_indices
:replica_plans, :nodes
def initialize(enabled_namespace:, number_of_replicas:, buffer_factor:, batch_size:, errors:)
def initialize(enabled_namespace:, number_of_replicas:, buffer_factor:, batch_size:, errors:, nodes:)
@enabled_namespace = enabled_namespace
@namespace = enabled_namespace.namespace
@number_of_replicas = number_of_replicas
@@ -48,8 +71,7 @@ def initialize(enabled_namespace:, number_of_replicas:, buffer_factor:, batch_si
@errors = errors
@replica_plans = []
@batch_size = batch_size
@existing_indices = load_existing_indices
@used_indices = Set.new # Track used indices for the namespace
@nodes = nodes # Shared nodes across namespaces
end
def generate
@@ -66,144 +88,107 @@ def generate
private
# Gathers the indices of every replica of the enabled namespace into one flat list.
#
# Any StandardError raised while loading is recorded through `log_error` with the
# :preload_failed type, and an empty array is returned in its place.
#
# @return [Array] the replicas' indices, or [] when loading failed
def load_existing_indices
  begin
    preloaded_replicas = enabled_namespace.replicas.includes(:indices)
    preloaded_replicas.flat_map { |replica| replica.indices }
  rescue => error
    log_error(nil, nil, :preload_failed, "Failed to preload existing indices: #{error.message}")
    []
  end
end
def simulate_replica_plan
replica_id = find_replica&.id
indices = plan_indices
add_replica_plan(replica_id, indices)
plan_indices
add_replica_plan(replica_id)
rescue StandardError => e
log_error(replica_plans.size, nil, :replica_failed, e.message)
end
def plan_indices
indices = []
fetch_unassigned_projects(enabled_namespace: enabled_namespace,
num_replicas: number_of_replicas).each_batch(of: batch_size) do |batch|
fetch_unassigned_projects.each_batch(of: batch_size) do |batch|
batch.each do |project|
stats = project.statistics
assign_to_index(indices, stats)
assign_to_node(stats)
rescue StandardError => e
log_error(replica_plans.size, indices.size, :assignment_failed, e.message)
log_error(replica_plans.size, nil, :assignment_failed, e.message)
end
end
indices
end
def fetch_unassigned_projects(enabled_namespace:, num_replicas:, last_project_id: nil)
scope = enabled_namespace.namespace.all_projects
def fetch_unassigned_projects(last_project_id: nil)
scope = namespace.all_projects
.with_statistics
.left_joins(zoekt_repositories: { zoekt_index: :replica })
.joins(:statistics)
.group('projects.id, project_statistics.id, repository_size')
.order('repository_size DESC, projects.id ASC') # Prioritize by size, then ID
.order('repository_size DESC, projects.id ASC')
# Filter projects with no repositories or fewer than required replicas
scope = scope.having('COUNT(DISTINCT zoekt_replicas.id) < ? OR COUNT(zoekt_repositories.id) = 0',
num_replicas)
number_of_replicas)
scope = scope.where('projects.id > ?', last_project_id) if last_project_id
scope
end
def assign_to_index(indices, stats)
index = find_or_create_index(indices, stats)
if index
add_project_to_index(index, stats)
indices << index unless indices.include?(index)
def assign_to_node(stats)
# Find the best node that can fit the project
best_node = find_best_node(stats)
if best_node
assign_project_to_index(best_node, stats)
else
log_error(replica_plans.size, indices.size, :assignment_failed,
"Could not assign project #{stats.project_id}")
log_error(replica_plans.size, nil, :node_unavailable,
"No node can accommodate project #{stats.project_id} with size #{scaled_size(stats)}")
end
end
def find_or_create_index(indices, stats)
existing_index = find_existing_index(stats)
if existing_index
formatted_index = format_existing_index(existing_index)
used_indices << existing_index.id # Mark index as used
return formatted_index
def find_best_node(stats)
nodes.sort_by { |node| -node[:unclaimed_storage_bytes] }.find do |node|
node[:unclaimed_storage_bytes] >= scaled_size(stats)
end
create_new_index(indices)
end
def find_existing_index(stats)
existing_indices.find do |index|
next if used_indices.include?(index.id) # Skip indices already assigned to another replica
index.reserved_storage_bytes + scaled_size(stats) <= index.node.unclaimed_storage_bytes
def assign_project_to_index(node, stats)
index = node[:indices].find do |idx|
idx[:required_storage_bytes] + scaled_size(stats) <= idx[:max_storage_bytes]
end
end
def create_new_index(indices)
if indices.size >= MAX_INDICES_PER_REPLICA
log_error(replica_plans.size, indices.size, :index_limit_exceeded, "Max indices reached")
return
unless index
index = simulate_index(node)
node[:indices] << index
end
node = find_best_node
if node.nil?
log_error(replica_plans.size, indices.size, :node_unavailable, "No suitable nodes available")
return
end
indices << simulate_index(node)
indices.last
add_project_to_index(index, stats)
node[:unclaimed_storage_bytes] -= scaled_size(stats)
end
def simulate_index(node)
{
id: nil,
node_id: node.id,
node_id: node[:id],
projects: { project_id_from: nil, project_id_to: nil },
required_storage_bytes: 0,
used_storage_bytes: 0,
reserved_storage_bytes: 0,
max_storage_bytes: node.unclaimed_storage_bytes
}
end
def format_existing_index(index)
{
id: index.id,
node_id: index.zoekt_node_id,
projects: { project_id_from: nil, project_id_to: nil },
required_storage_bytes: 0,
used_storage_bytes: index.used_storage_bytes,
reserved_storage_bytes: index.reserved_storage_bytes,
max_storage_bytes: index.node.unclaimed_storage_bytes
max_storage_bytes: node[:unclaimed_storage_bytes]
}
end
def add_project_to_index(index, stats)
index[:projects][:project_id_from] ||= stats.project_id
index[:projects][:project_id_to] = stats.project_id
index[:required_storage_bytes] += scaled_size(stats)
end
# Initialize project_id_from for the first project in the index
if index[:projects][:project_id_from].nil? || index[:projects][:project_id_from] > stats.project_id
index[:projects][:project_id_from] = stats.project_id
end
def scaled_size(stats)
stats.repository_size * buffer_factor
end
# Update project_id_to to the latest project
index[:projects][:project_id_to] = stats.project_id
def find_best_node
used_node_ids = replica_plans.flat_map { |replica| replica[:indices].map { |index| index[:node_id] } }
Node.order_by_unclaimed_space.where.not(id: used_node_ids).first
# Update the required storage bytes
index[:required_storage_bytes] += scaled_size(stats)
end
# Looks up the existing replica that lines up with the replica plan being built.
#
# Replicas of the enabled namespace are ordered by :created_at and the first
# `replica_plans.size` of them are skipped; the next one (if any) is returned.
#
# @return [Object, nil] the replica at that position, or nil when there is none
def find_replica
  ordered_replicas = enabled_namespace.replicas.order(:created_at)
  already_planned = replica_plans.size
  ordered_replicas.offset(already_planned).first
end
def add_replica_plan(replica_id, indices)
def add_replica_plan(replica_id)
indices = nodes.flat_map { |node| node[:indices] } # Collect all indices from nodes
# Set project_id_to to nil for the last index in the replica
indices.last[:projects][:project_id_to] = nil if indices.any?
replica_plans << { id: replica_id, indices: indices.map { |index| format_index(index) } }
end
@@ -223,6 +208,10 @@ def calculate_namespace_storage
replica_plans.sum { |replica| replica[:indices].sum { |index| index[:required_storage_bytes] } }
end
# Returns the repository size from the given statistics multiplied by `buffer_factor`.
#
# @param stats [#repository_size] statistics object for a project
# @return [Numeric] `stats.repository_size * buffer_factor`
def scaled_size(stats)
  raw_size = stats.repository_size
  raw_size * buffer_factor
end
def log_error(replica_idx, index_idx, type, details)
errors << {
namespace_id: namespace.id,
Loading