Skip to content
Snippets Groups Projects
Verified Commit 0826899d authored by Dmitry Gruzd's avatar Dmitry Gruzd :red_circle: Committed by GitLab
Browse files

Zoekt: Fix negative unclaimed storage bytes

Changelog: fixed
EE: true
parent 5aac74bc
No related branches found
No related tags found
2 merge requests!181325Fix ambiguous `created_at` in project.rb,!175561Zoekt: Fix negative unclaimed storage bytes
Showing with 248 additions and 4 deletions
......@@ -789,6 +789,8 @@
- 1
- - search_zoekt_namespace_initial_indexing
- 1
- - search_zoekt_node_with_negative_unclaimed_storage_event
- 1
- - search_zoekt_orphaned_index_event
- 1
- - search_zoekt_orphaned_repo_event
......
# frozen_string_literal: true
module Search
module Zoekt
class NodeWithNegativeUnclaimedStorageEvent < ::Gitlab::EventStore::Event
def schema
{
'type' => 'object',
'properties' => {
'node_ids' => { 'type' => 'array', 'items' => { 'type' => 'integer' } }
},
'required' => %w[node_ids]
}
end
end
end
end
......@@ -4,6 +4,7 @@ module Search
module Zoekt
class Node < ApplicationRecord
self.table_name = 'zoekt_nodes'
include EachBatch
DEFAULT_CONCURRENCY_LIMIT = 20
MAX_CONCURRENCY_LIMIT = 200
......@@ -16,6 +17,10 @@ class Node < ApplicationRecord
TASK_PULL_FREQUENCY_INCREASED = '500ms'
DEBOUNCE_DELAY = 5.seconds
UNCLAIMED_STORAGE_BYTES_FORMULA = <<~SQL
(zoekt_nodes.total_bytes - zoekt_nodes.used_bytes + zoekt_nodes.indexed_bytes - COALESCE(sum(zoekt_indices.reserved_storage_bytes), 0))
SQL
has_many :indices,
foreign_key: :zoekt_node_id, inverse_of: :node, class_name: '::Search::Zoekt::Index'
has_many :enabled_namespaces,
......@@ -44,14 +49,17 @@ class Node < ApplicationRecord
.merge(Repository.searchable)
.where(zoekt_repositories: { project: project })
end
scope :with_unclaimed_storage_bytes, -> do
scope :with_positive_unclaimed_storage_bytes, -> do
sql = <<~SQL
zoekt_nodes.*, (zoekt_nodes.total_bytes - zoekt_nodes.used_bytes + zoekt_nodes.indexed_bytes - COALESCE(sum(zoekt_indices.reserved_storage_bytes), 0)) AS unclaimed_storage_bytes
zoekt_nodes.*, #{UNCLAIMED_STORAGE_BYTES_FORMULA} AS unclaimed_storage_bytes
SQL
left_joins(:indices).group(:id).select(sql)
left_joins(:indices).group(:id).having("#{UNCLAIMED_STORAGE_BYTES_FORMULA} >= 0").select(sql)
end
scope :order_by_unclaimed_space, -> do
with_unclaimed_storage_bytes.order('unclaimed_storage_bytes')
with_positive_unclaimed_storage_bytes.order('unclaimed_storage_bytes')
end
scope :negative_unclaimed_storage_bytes, -> do
left_joins(:indices).group(:id).having("#{UNCLAIMED_STORAGE_BYTES_FORMULA} < 0")
end
def self.find_or_initialize_by_task_request(params)
......
......@@ -18,6 +18,7 @@ class SchedulingService
lost_nodes_check
mark_indices_as_ready
node_assignment
node_with_negative_unclaimed_storage_bytes_check
remove_expired_subscriptions
repo_should_be_marked_as_orphaned_check
repo_to_delete_check
......@@ -273,6 +274,16 @@ def node_assignment
end
# rubocop: enable Metrics/AbcSize
def node_with_negative_unclaimed_storage_bytes_check
execute_every 1.hour, cache_key: :node_with_negative_unclaimed_storage_bytes_check do
Search::Zoekt::Node.negative_unclaimed_storage_bytes.each_batch do |batch|
Gitlab::EventStore.publish(
Search::Zoekt::NodeWithNegativeUnclaimedStorageEvent.new(data: { node_ids: batch.pluck_primary_key })
)
end
end
end
# indices that don't have zoekt_repositories are already in `ready` state
def mark_indices_as_ready
execute_every 10.minutes, cache_key: :mark_indices_as_ready do
......
......@@ -2397,6 +2397,15 @@
:weight: 1
:idempotent: true
:tags: []
- :name: search_zoekt_node_with_negative_unclaimed_storage_event
:worker_name: Search::Zoekt::NodeWithNegativeUnclaimedStorageEventWorker
:feature_category: :global_search
:has_external_dependencies: false
:urgency: :low
:resource_boundary: :unknown
:weight: 1
:idempotent: true
:tags: []
- :name: search_zoekt_orphaned_index_event
:worker_name: Search::Zoekt::OrphanedIndexEventWorker
:feature_category: :global_search
......
# frozen_string_literal: true
module Search
module Zoekt
class NodeWithNegativeUnclaimedStorageEventWorker
include Gitlab::EventStore::Subscriber
include Search::Worker
prepend ::Geo::SkipSecondary
deduplicate :until_executed
idempotent!
BATCH_SIZE = 1_000
def handle_event(event)
node_ids = event.data[:node_ids]
return if node_ids.empty?
Search::Zoekt::Node.negative_unclaimed_storage_bytes.id_in(node_ids).find_each do |node|
unclaimed_storage_bytes = node.unclaimed_storage_bytes
next unless unclaimed_storage_bytes < 0
logger.info(build_structured_payload(
message: 'Processing node with negative unclaimed storage bytes',
'zoekt.node_id': node.id,
'zoekt.unclaimed_storage_bytes': unclaimed_storage_bytes))
total_reserved_bytes = 0
index_ids_to_evict = []
node.indices.find_each do |index|
total_reserved_bytes += index.reserved_storage_bytes if index.reserved_storage_bytes > 0
index_ids_to_evict << index.id
break if total_reserved_bytes >= unclaimed_storage_bytes.abs
end
index_ids_to_evict.each_slice(BATCH_SIZE) do |index_ids|
Gitlab::EventStore.publish(
Search::Zoekt::IndexToEvictEvent.new(data: { index_ids: index_ids })
)
end
end
end
private
def logger
@logger ||= ::Search::Zoekt::Logger.build
end
end
end
end
......@@ -209,6 +209,9 @@ def subscribe_to_zoekt_events(store)
store.subscribe ::Search::Zoekt::AdjustIndicesReservedStorageBytesEventWorker,
to: ::Search::Zoekt::AdjustIndicesReservedStorageBytesEvent
store.subscribe ::Search::Zoekt::NodeWithNegativeUnclaimedStorageEventWorker,
to: ::Search::Zoekt::NodeWithNegativeUnclaimedStorageEvent
end
def subscribe_to_members_added_event(store)
......
......@@ -61,6 +61,7 @@
Search::Zoekt::LostNodeEvent,
Search::Zoekt::IndexWatermarkChangedEvent,
Search::Zoekt::AdjustIndicesReservedStorageBytesEvent,
Search::Zoekt::NodeWithNegativeUnclaimedStorageEvent,
Security::PolicyCreatedEvent,
Security::PolicyUpdatedEvent,
Security::PolicyDeletedEvent,
......
......@@ -108,6 +108,93 @@
end
end
end
describe '.negative_unclaimed_storage_bytes' do
let_it_be(:negative_node) { create(:zoekt_node, :enough_free_space) }
let_it_be(:_negative_index) do
create(:zoekt_index, reserved_storage_bytes: negative_node.total_bytes * 2, node: negative_node)
end
let_it_be(:positive_node) { create(:zoekt_node, :enough_free_space) }
let_it_be(:_positive_index) { create(:zoekt_index, node: positive_node) }
it 'includes only nodes with negative unclaimed storage' do
expect(described_class.negative_unclaimed_storage_bytes).to contain_exactly(node, negative_node)
end
it 'does not include nodes with positive unclaimed storage' do
expect(described_class.negative_unclaimed_storage_bytes).not_to include(positive_node)
end
end
describe '.with_positive_unclaimed_storage_bytes' do
let_it_be(:node_with_positive_storage) { create(:zoekt_node, :enough_free_space) }
let_it_be(:node_with_zero_storage) { create(:zoekt_node, total_bytes: 1000, used_bytes: 1000) }
let_it_be(:node_with_negative_storage) { create(:zoekt_node, :enough_free_space) }
before do
# Scenario with positive unclaimed storage
create(:zoekt_index,
node: node_with_positive_storage,
reserved_storage_bytes: node_with_positive_storage.total_bytes / 2
)
# Scenario with negative unclaimed storage
create(:zoekt_index,
node: node_with_negative_storage,
reserved_storage_bytes: node_with_negative_storage.total_bytes * 2
)
end
it 'returns only nodes with non-negative unclaimed storage bytes' do
positive_nodes = described_class.with_positive_unclaimed_storage_bytes
expect(positive_nodes).to include(node_with_positive_storage)
expect(positive_nodes).to include(node_with_zero_storage)
expect(positive_nodes).not_to include(node_with_negative_storage)
end
it 'adds unclaimed_storage_bytes attribute to returned nodes' do
result = described_class.with_positive_unclaimed_storage_bytes.find(node_with_positive_storage.id)
expect(result).to respond_to(:unclaimed_storage_bytes)
expect(result.unclaimed_storage_bytes).to be >= 0
end
it 'calculates unclaimed_storage_bytes correctly' do
result = described_class.with_positive_unclaimed_storage_bytes.find(node_with_positive_storage.id)
# Manual calculation to verify the scope's calculation
expected_unclaimed_bytes = node_with_positive_storage.total_bytes -
node_with_positive_storage.used_bytes +
node_with_positive_storage.indexed_bytes -
node_with_positive_storage.indices.sum(:reserved_storage_bytes)
expect(result.unclaimed_storage_bytes).to eq(expected_unclaimed_bytes)
end
it 'groups results by node id to handle multiple indices' do
# Create multiple indices for the same node
create(:zoekt_index,
node: node_with_positive_storage,
reserved_storage_bytes: node_with_positive_storage.total_bytes / 4
)
results = described_class.with_positive_unclaimed_storage_bytes
expect(results).to include(node_with_positive_storage)
end
context 'when no indices exist' do
let_it_be(:node_without_indices) { create(:zoekt_node, :enough_free_space) }
it 'includes nodes without indices if they have positive unclaimed storage' do
results = described_class.with_positive_unclaimed_storage_bytes
expect(results).to include(node_without_indices)
end
end
end
end
describe 'validations' do
......
......@@ -825,4 +825,22 @@
expect { execute_task }.to publish_event(Search::Zoekt::AdjustIndicesReservedStorageBytesEvent).with(expected)
end
end
describe '#node_with_negative_unclaimed_storage_bytes_check' do
let(:task) { :node_with_negative_unclaimed_storage_bytes_check }
let_it_be(:negative_node) { create(:zoekt_node, :enough_free_space) }
let_it_be(:_negative_index) do
create(:zoekt_index, reserved_storage_bytes: negative_node.total_bytes * 2, node: negative_node)
end
let_it_be(:positive_node) { create(:zoekt_node, :enough_free_space) }
let_it_be(:_positive_index) { create(:zoekt_index, node: positive_node) }
it 'publishes a NodeWithNegativeUnclaimedStorageEvent for required nodes' do
expected = {
node_ids: [negative_node].map(&:id)
}
expect { execute_task }.to publish_event(Search::Zoekt::NodeWithNegativeUnclaimedStorageEvent).with(expected)
end
end
end
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Search::Zoekt::NodeWithNegativeUnclaimedStorageEventWorker, :zoekt_settings_enabled,
feature_category: :global_search do
let(:event) { Search::Zoekt::NodeWithNegativeUnclaimedStorageEvent.new(data: data) }
let_it_be(:node) { create(:zoekt_node, :enough_free_space) }
let_it_be(:index) { create(:zoekt_index, node: node) }
let_it_be(:negative_node) { create(:zoekt_node, :enough_free_space) }
let_it_be(:negative_index) do
create(:zoekt_index, reserved_storage_bytes: negative_node.total_bytes * 2, node: negative_node)
end
let(:data) do
{ node_ids: [node.id, negative_node.id] }
end
it_behaves_like 'subscribes to event'
it 'has the `until_executed` deduplicate strategy' do
expect(described_class.get_deduplicate_strategy).to eq(:until_executed)
end
it_behaves_like 'an idempotent worker' do
it 'processes nodes with negative unclaimed storage bytes' do
expect do
consume_event(subscriber: described_class, event: event)
end.to publish_event(Search::Zoekt::IndexToEvictEvent).with({ index_ids: [negative_index.id] })
end
end
end
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment