Skip to content
Snippets Groups Projects

Add cron worker to automatically rollout zoekt exact code search to paid namespaces

Merged John Mason requested to merge jm-namespace-assignment-service into master
1 unresolved thread
Compare and
4 files
+ 614
0
Compare changes
  • Side-by-side
  • Inline
Files
4
# frozen_string_literal: true
module Search
module Zoekt
class PlanningService
PRE_READY_LIMIT = 1
attr_reader :enabled_namespaces, :nodes, :options
def self.plan(**kwargs)
new(**kwargs).plan
end
def initialize(
enabled_namespaces:, nodes: nil, **options)
@enabled_namespaces = Array(enabled_namespaces)
@nodes = nodes || load_nodes_with_unclaimed_storage
@options = options
end
def plan
namespace_plans = enabled_namespaces.map do |enabled_namespace|
Plan.new(
enabled_namespace: enabled_namespace,
nodes: nodes,
**options
).generate
end
{
total_required_storage_bytes: namespace_plans.sum { |plan| plan[:namespace_required_storage_bytes] },
namespaces: namespace_plans
}
end
private
def load_nodes_with_unclaimed_storage
::Search::Zoekt::Node
.with_positive_unclaimed_storage_bytes
.select { |node| Search::Zoekt::Index.for_node(node).pre_ready.count <= PRE_READY_LIMIT }
.map do |node|
{
id: node.id,
unclaimed_storage_bytes: node.unclaimed_storage_bytes,
indices: [],
node: node
}
end
end
class Plan
attr_reader :enabled_namespace, :namespace, :num_replicas, :buffer_factor, :batch_size,
:max_indices_per_replica, :errors, :replica_plans, :nodes
def initialize(
enabled_namespace:, num_replicas:, nodes:,
buffer_factor: 3, batch_size: 500, max_indices_per_replica: 5)
@enabled_namespace = enabled_namespace
@namespace = enabled_namespace.namespace
@num_replicas = num_replicas
@buffer_factor = buffer_factor
@batch_size = batch_size
@max_indices_per_replica = max_indices_per_replica
@errors = []
@replica_plans = []
@nodes = nodes
@used_node_ids = Set.new
end
def generate
num_replicas.times { simulate_replica_plan }
{
namespace_id: namespace.id,
enabled_namespace_id: enabled_namespace.id,
namespace_required_storage_bytes: calculate_namespace_storage,
replicas: replica_plans,
errors: errors
}
end
private
def simulate_replica_plan
replica_indices = []
fetch_projects.each_batch(of: batch_size) do |batch|
batch.each do |project|
stats = project.statistics
if replica_indices.size >= max_indices_per_replica
log_error(replica_plans.size, :index_limit_exceeded,
"Replica reached maximum index limit (#{max_indices_per_replica})")
break
end
assign_to_node(stats, replica_indices)
end
end
add_replica_plan(replica_indices)
end
def fetch_projects
namespace.all_projects
.with_statistics
.joins(:statistics)
.group('projects.id, project_statistics.id, repository_size')
.order('repository_size DESC, projects.id ASC')
end
def assign_to_node(stats, replica_indices)
best_node = find_best_node(stats)
if best_node
assign_project_to_index(best_node, stats, replica_indices)
else
log_error(replica_plans.size, :node_unavailable,
"No node can accommodate project #{stats.project_id} with size #{scaled_size(stats)}")
end
end
def find_best_node(stats)
nodes
.reject { |node| @used_node_ids.include?(node[:id]) }
.select { |node| node[:unclaimed_storage_bytes] && node[:unclaimed_storage_bytes] >= scaled_size(stats) }
.max_by { |node| node[:unclaimed_storage_bytes] }
end
def assign_project_to_index(node, stats, replica_indices)
project_size = scaled_size(stats)
index = replica_indices
.select { |idx| idx[:required_storage_bytes] + project_size <= idx[:max_storage_bytes] }
.min_by { |idx| idx[:max_storage_bytes] - (idx[:required_storage_bytes] + project_size) }
unless index
if replica_indices.size >= max_indices_per_replica
log_error(nil, :index_limit_exceeded, "Max indices per replica reached")
return
end
index = simulate_index(node)
replica_indices << index
node[:indices] << index
@used_node_ids.add(node[:id])
end
add_project_to_index(index, stats)
node[:unclaimed_storage_bytes] -= project_size
end
def simulate_index(node)
{
node_id: node[:id],
projects: { project_id_from: nil, project_id_to: nil },
required_storage_bytes: 0,
max_storage_bytes: node[:unclaimed_storage_bytes]
}
end
def add_project_to_index(index, stats)
index[:projects][:project_id_from] ||= stats.project_id
index[:projects][:project_id_to] = stats.project_id
index[:required_storage_bytes] += scaled_size(stats)
end
def add_replica_plan(replica_indices)
replica_indices.last[:projects][:project_id_to] = nil if replica_indices.any?
replica_plans << { indices: replica_indices.map { |index| format_index(index) } }
end
def format_index(index)
index.slice(:node_id, :projects, :required_storage_bytes, :max_storage_bytes)
end
def calculate_namespace_storage
replica_plans.sum { |replica| replica[:indices].sum { |index| index[:required_storage_bytes] } }
end
def scaled_size(stats)
stats.repository_size * buffer_factor
end
def log_error(replica_idx, type, details)
@errors << { namespace_id: namespace.id, replica_idx: replica_idx, type: type, details: details }
end
end
end
end
end
Loading