Skip to content
Snippets Groups Projects
Commit cb4270d6 authored by John Mason's avatar John Mason
Browse files

WIP commit

parent 3d22b7bc
No related branches found
No related tags found
No related merge requests found
This commit is part of merge request !175666. Comments created here will be created in the context of that merge request.
......@@ -55,13 +55,6 @@ def self.destroy_namespaces_with_expired_subscriptions!
end
end
def fetch_unassigned_projects(batch_size)
scope = namespace.all_projects.with_statistics
last_id = metadata[:last_processed_project_id]
scope = scope.where("projects.id > ?", last_id) if last_id
scope.order(repository_size: :desc).limit(batch_size)
end
def self.update_last_used_storage_bytes!
find_each(&:update_last_used_storage_bytes!)
end
......@@ -76,6 +69,16 @@ def update_last_used_storage_bytes!
update_column(:metadata, metadata.merge(last_used_storage_bytes: size))
end
def fetch_unassigned_projects(batch_size)
# Fetch all projects in the namespace that are not associated with any zoekt index
namespace.all_projects
.with_statistics
.left_joins(:zoekt_repositories) # Join with zoekt_repository, allowing NULL matches
.where(zoekt_repositories: { id: nil }) # Filter for projects without a zoekt repository
.joins(:statistics).order(repository_size: :desc) # Prioritize by repository size
.limit(batch_size) # Limit the results to the batch size
end
private
def only_root_namespaces_can_be_indexed
......
......@@ -7,7 +7,7 @@ class NamespaceAssignmentServiceV2
PRE_READY_LIMIT = 1
def self.execute(namespace_id:, batch_size:)
namespace = ZoektEnabledNamespace.find(namespace_id)
namespace = EnabledNamespace.find(namespace_id)
new(namespace, batch_size).execute
end
......@@ -15,12 +15,13 @@ def initialize(zoekt_enabled_namespace, batch_size)
@zoekt_enabled_namespace = zoekt_enabled_namespace
@batch_size = batch_size
@logger = logger
reset_index_tracking!
@indices = []
reset_tracking!
initialize_metadata_offset!
end
def execute
projects = zoekt_enabled_namespace.fetch_projects
projects = zoekt_enabled_namespace.fetch_unassigned_projects(batch_size)
if projects.empty?
logger.info("All projects for namespace #{zoekt_enabled_namespace.id} are already assigned.")
mark_namespace_as_complete
......@@ -29,17 +30,19 @@ def execute
projects.each do |project|
process_project(project)
break if early_return?
break if stop_processing?
end
update_metadata_offset
finalize_indices
end
attr_accessor :indices
private
attr_reader :zoekt_enabled_namespace, :batch_size
attr_accessor :set_new_index, :index_bytes, :indices, :previous_project_id, :early_return_flag
attr_reader :zoekt_enabled_namespace, :batch_size, :logger
attr_accessor :set_new_index, :index_bytes, :previous_project_id, :stop_processing_flag
def initialize_metadata_offset!
zoekt_enabled_namespace.metadata ||= {}
......@@ -50,37 +53,37 @@ def process_project(project)
project_stats = project.statistics
if project_stats.nil?
mark_early_return!
stop_processing!
return
end
initialize_index_if_needed(project)
initialize_index_if_needed
current_index = indices.last
index = indices.last
required_bytes = calculate_required_bytes(index_bytes, project_stats)
if index_has_space?(index, required_bytes)
update_index_state(index, required_bytes)
if index_has_space?(current_index, required_bytes)
update_index_state(current_index, required_bytes)
else
handle_insufficient_space(project, index)
handle_insufficient_space(project, current_index)
end
end
def initialize_index_if_needed(project)
def initialize_index_if_needed
return unless set_new_index
if indices.size >= MAX_INDICES_PER_REPLICA
mark_early_return!
stop_processing!
return
end
node = find_suitable_node
if node.nil? || node_pre_ready_limit_reached?(node)
mark_early_return!
stop_processing!
return
end
initialize_new_index(node, project)
initialize_new_index(node)
end
def find_suitable_node
......@@ -91,35 +94,36 @@ def node_pre_ready_limit_reached?(node)
Search::Zoekt::Index.for_node(node).pre_ready.count > PRE_READY_LIMIT
end
def initialize_new_index(node, project)
index = Index.new(reserved_storage_bytes: node.unclaimed_storage_bytes * Node::WATERMARK_LIMIT_LOW, node: node)
index.metadata[:project_id_from] = project.id
def initialize_new_index(node)
index = Index.new(
reserved_storage_bytes: node.unclaimed_storage_bytes * Node::WATERMARK_LIMIT_LOW,
node: node
)
index.metadata[:project_id_from] = previous_project_id
indices << index
reset_index_tracking!(index)
reset_tracking!(index)
end
def update_index_state(index, required_bytes)
index.state = required_bytes == 0 ? :ready : :pending
indices[-1] = index if indices.last.zoekt_node_id == index.zoekt_node_id
self.set_new_index = false
self.index_bytes = required_bytes
end
def handle_insufficient_space(project, index)
def handle_insufficient_space(project, current_index)
if set_new_index
mark_early_return!
stop_processing!
return
end
finalize_current_index(index)
reset_index_tracking!
initialize_index_if_needed(project)
finalize_current_index(current_index)
reset_tracking!
initialize_index_if_needed
end
def finalize_current_index(index)
index.metadata[:project_id_to] = previous_project_id
indices[-1] = index unless indices.empty?
end
def calculate_required_bytes(current_bytes, project_stats)
......@@ -131,10 +135,10 @@ def index_has_space?(index, required_bytes)
end
def finalize_indices
indices.each do |zoekt_index|
zoekt_index.replica = Replica.for_enabled_namespace!(zoekt_enabled_namespace)
zoekt_index.zoekt_enabled_namespace = zoekt_enabled_namespace
zoekt_index.namespace_id = zoekt_enabled_namespace.root_namespace_id
indices.each do |index|
index.replica = Replica.for_enabled_namespace!(zoekt_enabled_namespace)
index.zoekt_enabled_namespace = zoekt_enabled_namespace
index.namespace_id = zoekt_enabled_namespace.root_namespace_id
end
end
......@@ -150,20 +154,19 @@ def mark_namespace_as_complete
logger.info("Namespace #{zoekt_enabled_namespace.id} has been fully assigned.")
end
def reset_index_tracking!(index = nil)
def reset_tracking!(index = nil)
self.set_new_index = true
self.index_bytes = 0
self.previous_project_id = index&.metadata&.[](:project_id_from)
self.early_return_flag = false
self.stop_processing_flag = false
end
def mark_early_return!
self.early_return_flag = true
self.indices = []
def stop_processing!
self.stop_processing_flag = true
end
def early_return?
early_return_flag
def stop_processing?
stop_processing_flag
end
def logger
......
......@@ -24,11 +24,13 @@
before do
zoekt_enabled_namespace.metadata[:last_processed_project_id] = project3.id
zoekt_enabled_namespace.metadata[:complete] = true
zoekt_enabled_namespace.save!
end
it 'logs and exits without further action' do
expect(service).not_to receive(:process_project)
expect { execute_service }.to change { zoekt_enabled_namespace.metadata[:complete] }.from(true).to(true)
expect { execute_service }.not_to change { zoekt_enabled_namespace.metadata[:complete] }.from(true)
expect(zoekt_enabled_namespace.metadata[:last_processed_project_id]).to eq(project3.id)
end
end
......@@ -59,13 +61,14 @@
it 'creates a new index for the project' do
expect { execute_service }.not_to raise_error
expect(service.indices.size).to eq(2)
expect(service.indices.size).to eq(1)
end
end
context 'when the service encounters an error' do
before do
allow(service).to receive(:fetch_projects).and_raise(StandardError, 'Simulated error')
allow(zoekt_enabled_namespace).to receive(:fetch_unassigned_projects).and_raise(StandardError,
'Simulated error')
end
it 'does not update metadata and allows retry' do
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment