Skip to content
Snippets Groups Projects

Add cron worker to automatically rollout zoekt exact code search to paid namespaces

Merged John Mason requested to merge jm-namespace-assignment-service into master
1 unresolved thread
4 files changed: +601 −0
Compare changes
  • Side-by-side
  • Inline
Files
4
# frozen_string_literal: true
module Search
module Zoekt
class PlanningService
attr_reader :enabled_namespaces, :nodes, :options
# Convenience entry point: builds a service instance from the given
# keyword arguments and immediately returns its computed plan.
def self.plan(**kwargs)
  service = new(**kwargs)
  service.plan
end
# enabled_namespaces - one record or a collection; always normalized to an Array.
# pre_ready_limit    - max pre-ready indices a node may have and still be eligible
#                      (only used when nodes are loaded here, not injected).
# nodes              - optional pre-built node descriptors; loaded lazily if absent.
# options            - extra keyword args forwarded to each Plan.
def initialize(
  enabled_namespaces:, pre_ready_limit: 1, nodes: nil, **options)
  @enabled_namespaces = Array(enabled_namespaces)
  @options = options
  @nodes = nodes || load_nodes_with_unclaimed_storage(pre_ready_limit: pre_ready_limit)
end
# Generates a plan for every enabled namespace and splits the results into
# successes and failures (a plan fails when it recorded any errors).
#
# Returns a Hash with:
#   :total_required_storage_bytes - sum over successful plans only.
#   :namespaces                   - plans that produced no errors.
#   :failures                     - plans that recorded at least one error.
def plan
  all_plans = enabled_namespaces.map do |enabled_namespace|
    Plan.new(
      enabled_namespace: enabled_namespace,
      nodes: nodes,
      **options
    ).generate
  end

  # Single pass instead of the previous manual each-loop plus a second,
  # redundant filter that rebuilt the same "successful" collection.
  failed_plans, successful_plans = all_plans.partition { |p| p[:errors].present? }

  {
    total_required_storage_bytes: successful_plans.sum { |plan| plan[:namespace_required_storage_bytes] },
    namespaces: successful_plans,
    failures: failed_plans
  }
end
private
# Loads candidate nodes that have free (unclaimed) storage, skipping nodes
# that already carry more than pre_ready_limit pre-ready indices, and maps
# each into the plain-Hash descriptor used throughout planning.
def load_nodes_with_unclaimed_storage(pre_ready_limit:)
  eligible_nodes = ::Search::Zoekt::Node
    .with_positive_unclaimed_storage_bytes
    .select { |node| Search::Zoekt::Index.for_node(node).pre_ready.count <= pre_ready_limit }

  eligible_nodes.map do |node|
    {
      id: node.id,
      unclaimed_storage_bytes: node.unclaimed_storage_bytes,
      indices: [],
      node: node
    }
  end
end
class Plan
attr_reader :enabled_namespace, :namespace, :num_replicas, :buffer_factor, :batch_size,
:max_indices_per_replica, :errors, :replica_plans, :nodes
# Builds a planner for a single enabled namespace.
#
# enabled_namespace       - the enabled-namespace record being planned.
# num_replicas            - number of replicas to simulate.
# nodes                   - candidate node descriptors (Hashes with :id, storage, ...).
# buffer_factor           - multiplier applied to repository sizes for headroom.
# batch_size              - project batch size used while iterating.
# max_indices_per_replica - hard cap on indices within one replica.
def initialize(
  enabled_namespace:, num_replicas:, nodes:,
  buffer_factor: 3, batch_size: 500, max_indices_per_replica: 5)
  @enabled_namespace = enabled_namespace
  @namespace = enabled_namespace.namespace
  @nodes = nodes
  @num_replicas = num_replicas

  # Tunables.
  @buffer_factor = buffer_factor
  @batch_size = batch_size
  @max_indices_per_replica = max_indices_per_replica

  # Accumulators filled in while the plan is generated.
  @errors = []
  @replica_plans = []
  @used_node_ids = Set.new
end
# Runs the simulation and returns the namespace's plan Hash.
#
# A namespace with projects gets num_replicas simulated replicas; an empty
# namespace still reserves capacity via a single empty replica.
def generate
  has_projects = fetch_projects.exists?

  if has_projects
    num_replicas.times { simulate_replica_plan }
  else
    create_empty_replica
  end

  {
    namespace_id: namespace.id,
    enabled_namespace_id: enabled_namespace.id,
    namespace_required_storage_bytes: calculate_namespace_storage,
    replicas: replica_plans,
    errors: errors.uniq
  }
end
private
# Plans one replica: walks every project in the namespace (in batches of
# batch_size) and assigns each project to a node/index, then records the
# resulting set of indices as a replica plan.
#
# NOTE(review): the `break` below only exits the inner `batch.each`, not
# `each_batch` — after the index limit is hit, later batches are still
# fetched and the same error is logged once per remaining batch (the
# duplicates are collapsed later by `errors.uniq` in #generate). Confirm
# whether aborting the whole batch iteration was the intent.
def simulate_replica_plan
  replica_indices = []
  fetch_projects.each_batch(of: batch_size) do |batch|
    batch.each do |project|
      stats = project.statistics
      # Stop filling this replica once it already holds the maximum
      # number of indices.
      if replica_indices.size >= max_indices_per_replica
        log_error(replica_plans.size, :index_limit_exceeded,
          "Replica reached maximum index limit (#{max_indices_per_replica})")
        break
      end
      assign_to_node(stats, replica_indices)
    end
  end
  add_replica_plan(replica_indices)
end
# Reserves capacity for a namespace with no projects: picks the unused node
# with the most unclaimed storage and gives the replica one empty index on
# it, or logs an error if every node is already claimed.
def create_empty_replica
  available = nodes.reject { |node| @used_node_ids.include?(node[:id]) }
  chosen = available.max_by { |node| node[:unclaimed_storage_bytes] }

  unless chosen
    log_error(nil, :node_unavailable, "No nodes available to create an empty replica")
    return
  end

  add_replica_plan([simulate_index(chosen)])
  @used_node_ids.add(chosen[:id])
end
# Every project under the namespace, with statistics available on each record.
def fetch_projects
  projects = namespace.all_projects
  projects.with_statistics
end
# Routes one project (via its statistics) onto the best available node,
# logging a node_unavailable error when no node has enough free space.
def assign_to_node(stats, replica_indices)
  node = find_best_node(stats)
  return assign_project_to_index(node, stats, replica_indices) if node

  log_error(replica_plans.size, :node_unavailable,
    "No node can accommodate project #{stats.project_id} with size #{scaled_size(stats)}")
end
# Worst-fit node selection: among nodes not yet used by this plan that can
# hold the project's scaled size, pick the one with the most free space.
# Returns nil when nothing fits.
def find_best_node(stats)
  needed = scaled_size(stats)

  candidates = nodes.reject { |node| @used_node_ids.include?(node[:id]) }
  fitting = candidates.select do |node|
    capacity = node[:unclaimed_storage_bytes]
    capacity && capacity >= needed
  end

  fitting.max_by { |node| node[:unclaimed_storage_bytes] }
end
# Places a project's scaled size into an existing index on this replica when
# one has room, or opens a new index on +node+ otherwise.
#
# Existing-index selection is best-fit: among indices with enough headroom,
# the one that would be left with the least slack wins.
def assign_project_to_index(node, stats, replica_indices)
  project_size = scaled_size(stats)
  index = replica_indices
    .select { |idx| idx[:required_storage_bytes] + project_size <= idx[:max_storage_bytes] }
    .min_by { |idx| idx[:max_storage_bytes] - (idx[:required_storage_bytes] + project_size) }
  unless index
    # No existing index fits; refuse to open another one past the cap.
    if replica_indices.size >= max_indices_per_replica
      log_error(nil, :index_limit_exceeded, "Max indices per replica reached")
      return
    end
    # New index capacity is a snapshot of the node's unclaimed storage at
    # this moment (see #simulate_index).
    index = simulate_index(node)
    replica_indices << index
    node[:indices] << index
    # Mark the node as claimed so later lookups skip it.
    @used_node_ids.add(node[:id])
  end
  add_project_to_index(index, stats)
  # Consume the node's free space by the scaled project size.
  node[:unclaimed_storage_bytes] -= project_size
end
# Builds a fresh, empty simulated index on +node+, capped at the node's
# current unclaimed storage.
def simulate_index(node)
  index = { node_id: node[:id] }
  index[:projects] = { project_id_from: nil, project_id_to: nil }
  index[:required_storage_bytes] = 0
  index[:max_storage_bytes] = node[:unclaimed_storage_bytes]
  index
end
# Records a project in +index+: fixes the lower bound of the project-id range
# on first use, always advances the upper bound, and accrues the scaled size.
def add_project_to_index(index, stats)
  range = index[:projects]
  range[:project_id_from] ||= stats.project_id
  range[:project_id_to] = stats.project_id

  index[:required_storage_bytes] += scaled_size(stats)
end
# Records a finished replica. The last index's upper project-id bound is
# cleared, leaving it open-ended (NOTE(review): presumably so future
# projects can still route to it — confirm against the consumer).
def add_replica_plan(replica_indices)
  last_index = replica_indices.last
  last_index[:projects][:project_id_to] = nil if last_index

  replica_plans << { indices: replica_indices.map { |index| format_index(index) } }
end
# Exposes only the externally relevant fields of a simulated index
# (drops internal bookkeeping such as the :node reference).
def format_index(index)
  %i[node_id projects required_storage_bytes max_storage_bytes].each_with_object({}) do |key, out|
    out[key] = index[key] if index.key?(key)
  end
end
# Total bytes required across every index of every planned replica.
def calculate_namespace_storage
  replica_plans.reduce(0) do |total, replica|
    total + replica[:indices].sum { |index| index[:required_storage_bytes] }
  end
end
# The project's repository size inflated by the planning buffer factor,
# so capacity decisions include headroom for growth.
def scaled_size(stats)
  raw_bytes = stats.repository_size
  raw_bytes * buffer_factor
end
# Appends a structured planning error (deduplicated later via errors.uniq).
# replica_idx may be nil when the error is not tied to a specific replica.
def log_error(replica_idx, type, details)
  @errors << {
    namespace_id: namespace.id,
    replica_idx: replica_idx,
    type: type,
    details: details
  }
end
end
end
end
end
Loading