Skip to content
Snippets Groups Projects
Verified Commit 6d2d2149 authored by Dmitry Gruzd's avatar Dmitry Gruzd :red_circle: Committed by GitLab
Browse files

Zoekt: Return tasks in internal API

parent 217e635b
No related branches found
No related tags found
1 merge request!145796Zoekt: Return tasks in internal API
Showing
with 412 additions and 9 deletions
......@@ -37,6 +37,8 @@ def self.find_or_initialize_by_task_request(params)
s.used_bytes = params.fetch("disk.used")
s.total_bytes = params.fetch("disk.all")
s.metadata['name'] = params.fetch("node.name")
s.metadata['task_count'] = params["node.task_count"].to_i if params["node.task_count"].present?
s.metadata['concurrency'] = params["node.concurrency"].to_i if params["node.concurrency"].present?
end
end
......
......@@ -4,9 +4,11 @@ module Search
module Zoekt
class Task < ApplicationRecord
PARTITION_DURATION = 1.day
PROCESSING_BATCH_SIZE = 100
include PartitionedTable
include IgnorableColumns
include EachBatch
self.table_name = 'zoekt_tasks'
self.primary_key = :id
......@@ -17,6 +19,7 @@ class Task < ApplicationRecord
belongs_to :zoekt_repository, inverse_of: :tasks, class_name: '::Search::Zoekt::Repository'
scope :for_partition, ->(partition) { where(partition_id: partition) }
scope :with_project, -> { includes(zoekt_repository: :project) }
enum state: {
pending: 0,
......@@ -49,6 +52,34 @@ class Task < ApplicationRecord
.where(state: :pending)
.exists?
end
def self.each_task(limit:)
return unless block_given?
count = 0
scope = pending.with_project.order(:perform_at, :id)
iterator = Gitlab::Pagination::Keyset::Iterator.new(scope: scope)
iterator.each_batch(of: PROCESSING_BATCH_SIZE) do |tasks|
orphaned_task_ids = []
tasks.each do |task|
unless task.zoekt_repository&.project
orphaned_task_ids << task.id
next
end
yield task
count += 1
break if count >= limit
end
tasks.where(id: orphaned_task_ids).update_all(state: :orphaned) if orphaned_task_ids.any?
break if count >= limit
end
end
end
end
end
# frozen_string_literal: true
module Search
module Zoekt
class TaskPresenterService
DEFAULT_LIMIT = 20
MAX_LIMIT = 100
attr_reader :node, :concurrency_limit
def initialize(node)
@node = node
@concurrency_limit = get_concurrency_limit(node: node)
end
def execute
[].tap do |payload|
break [] if Feature.enabled?(:zoekt_pause_indexing)
node.tasks.each_task(limit: concurrency_limit) do |task|
payload << TaskSerializerService.execute(task)
end
end
end
def self.execute(...)
new(...).execute
end
private
def get_concurrency_limit(node:)
task_count = node.metadata['task_count']
concurrency = node.metadata['concurrency']
return DEFAULT_LIMIT if task_count.nil? || concurrency.nil? || concurrency == 0
(concurrency - task_count).clamp(0, MAX_LIMIT)
end
end
end
end
# frozen_string_literal: true
module Search
module Zoekt
class TaskSerializerService
INDEXING_TIMEOUT_S = 30.minutes.to_i
attr_reader :task
def initialize(task)
@task = task
end
def execute
case task.task_type.to_sym
when :index_repo
{
name: :index,
payload: index_repo_payload
}
when :force_index_repo
{
name: :index,
payload: force_index_repo_payload
}
when :delete_repo
{
name: :delete,
payload: delete_repo_payload
}
else
raise ArgumentError, "Unknown task_type: #{task.task_type.inspect}"
end
end
def self.execute(...)
new(...).execute
end
private
def index_repo_payload
project = task.zoekt_repository.project
repository_storage = project.repository_storage
connection_info = Gitlab::GitalyClient.connection_data(repository_storage)
repository_path = "#{project.repository.disk_path}.git"
address = connection_info['address']
# This code is needed to support relative unix: connection strings. For example, specs
if address.match?(%r{\Aunix:[^/.]})
path = address.split('unix:').last
address = "unix:#{Rails.root.join(path)}"
end
{
GitalyConnectionInfo: {
Address: address,
Token: connection_info['token'],
Storage: repository_storage,
Path: repository_path
},
Callback: { name: 'index', payload: { task_id: task.id } },
RepoId: project.id,
FileSizeLimit: Gitlab::CurrentSettings.elasticsearch_indexed_file_size_limit_kb.kilobytes,
Timeout: "#{INDEXING_TIMEOUT_S}s"
}
end
def force_index_repo_payload
index_repo_payload.merge(Force: true)
end
def delete_repo_payload
{
RepoId: task.zoekt_repository.project_id,
Callback: { name: 'delete', payload: { task_id: task.id } }
}
end
end
end
end
......@@ -4,6 +4,12 @@
"properties": {
"name": {
"type": "string"
},
"task_count": {
"type": "integer"
},
"concurrency": {
"type": "integer"
}
},
"additionalProperties": true
......
---
name: zoekt_send_tasks
feature_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/424124
introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/145796
rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/443300
milestone: '16.11'
group: group::global search
type: ops
default_enabled: false
......@@ -34,7 +34,11 @@ def logger
# We don't want to register (save) the node if the feature flag is disabled
if Feature.disabled?(:zoekt_internal_api_register_nodes, type: :ops) || node.save
{ id: node.id }
{ id: node.id }.tap do |resp|
if Feature.enabled?(:zoekt_send_tasks)
resp[:tasks] = ::Search::Zoekt::TaskPresenterService.execute(node)
end
end
else
unprocessable_entity!
end
......
......@@ -42,7 +42,9 @@
'node.url' => 'http://localhost:6080',
'disk.all' => 994662584320,
'disk.used' => 532673712128,
'disk.free' => 461988872192
'disk.free' => 461988872192,
'node.task_count' => 5,
'node.concurrency' => 10
}
end
......@@ -71,6 +73,8 @@
expect(tasked_node.used_bytes).to eq(params['disk.used'])
expect(tasked_node.total_bytes).to eq(params['disk.all'])
expect(tasked_node.metadata['name']).to eq(params['node.name'])
expect(tasked_node.metadata['task_count']).to eq(params['node.task_count'])
expect(tasked_node.metadata['concurrency']).to eq(params['node.concurrency'])
end
end
......
......@@ -10,6 +10,46 @@
it { is_expected.to belong_to(:zoekt_repository).inverse_of(:tasks) }
end
describe '.with_project' do
it 'eager loads the zoekt_repositories and projects' do
create(:zoekt_task)
task = described_class.with_project.first
recorder = ActiveRecord::QueryRecorder.new { task.zoekt_repository.project }
expect(recorder.count).to be_zero
expect(task.association(:zoekt_repository).loaded?).to eq(true)
end
end
describe '.each_task' do
it 'returns tasks sorted by performed_at' do
task_1 = create(:zoekt_task, perform_at: 1.minute.ago)
task_2 = create(:zoekt_task, perform_at: 3.minutes.ago)
task_3 = create(:zoekt_task, perform_at: 2.minutes.ago)
tasks = []
described_class.each_task(limit: 10) do |task|
tasks << task
end
expect(tasks).to eq([task_2, task_3, task_1])
end
context 'with orphaned task' do
let_it_be(:orphaned_task) { create(:zoekt_task) }
before do
orphaned_task.zoekt_repository.destroy!
end
it 'marks tasks as orphaned' do
expect do
described_class.each_task(limit: 10) { |t| t }
end.to change { orphaned_task.reload.state }.from('pending').to('orphaned')
end
end
end
describe 'sliding_list partitioning' do
let(:partition_manager) { Gitlab::Database::Partitioning::PartitionManager.new(described_class) }
......
......@@ -41,7 +41,7 @@
end
context 'when node does not exist' do
let(:node) { instance_double(::Search::Zoekt::Node, id: nil) }
let(:node) { build(:zoekt_node, id: nil) }
it 'does not save node' do
expect(node).not_to receive(:save)
......@@ -49,12 +49,12 @@
request
expect(response).to have_gitlab_http_status(:ok)
expect(json_response).to eq({ 'id' => nil })
expect(json_response).to eq({ 'id' => nil, 'tasks' => [] })
end
end
context 'when node exists' do
let(:node) { instance_double(::Search::Zoekt::Node, id: 123) }
let(:node) { build(:zoekt_node, id: 123) }
it 'does not save node when node does not exist' do
expect(node).not_to receive(:save)
......@@ -62,14 +62,20 @@
request
expect(response).to have_gitlab_http_status(:ok)
expect(json_response).to eq({ 'id' => node.id })
expect(json_response).to eq({ 'id' => node.id, 'tasks' => [] })
end
end
end
context 'when a task request is received with valid params' do
it 'returns node ID for task request' do
node = instance_double(::Search::Zoekt::Node, id: 123)
let(:node) { build(:zoekt_node, id: 123) }
let(:tasks) { %w[task1 task2] }
before do
allow(::Search::Zoekt::TaskPresenterService).to receive(:execute).and_return(tasks)
end
it 'returns node ID and tasks for task request' do
expect(::Search::Zoekt::Node).to receive(:find_or_initialize_by_task_request)
.with(valid_params).and_return(node)
expect(node).to receive(:save).and_return(true)
......@@ -77,7 +83,25 @@
get api(endpoint), params: valid_params, headers: gitlab_shell_internal_api_request_header
expect(response).to have_gitlab_http_status(:ok)
expect(json_response).to eq({ 'id' => node.id })
expect(json_response).to eq({ 'id' => node.id, 'tasks' => tasks })
end
context 'when zoekt_send_tasks is disabled' do
before do
stub_feature_flags(zoekt_send_tasks: false)
end
it 'does not return tasks' do
expect(::Search::Zoekt::Node).to receive(:find_or_initialize_by_task_request)
.with(valid_params).and_return(node)
expect(node).to receive(:save).and_return(true)
get api(endpoint), params: valid_params, headers: gitlab_shell_internal_api_request_header
expect(::Search::Zoekt::TaskPresenterService).not_to receive(:execute)
expect(response).to have_gitlab_http_status(:ok)
expect(json_response).to eq({ 'id' => node.id })
end
end
end
......
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe ::Search::Zoekt::TaskPresenterService, feature_category: :global_search do
let_it_be(:node) { create(:zoekt_node) }
let_it_be(:task) { create(:zoekt_task, node: node) }
let(:service) { described_class.new(node) }
subject(:execute_task) { service.execute }
describe '.execute' do
it 'passes arguments to new and calls execute' do
expect(described_class).to receive(:new).with(node).and_return(service)
expect(service).to receive(:execute)
described_class.execute(node)
end
end
describe '#execute' do
context 'when zoekt_pause_indexing is true' do
before do
stub_feature_flags(zoekt_pause_indexing: true)
end
it 'does nothing' do
expect(::Search::Zoekt::TaskSerializerService).not_to receive(:execute)
expect(execute_task).to be_empty
end
end
context 'when zoekt_pause_indexing is false' do
before do
stub_feature_flags(zoekt_pause_indexing: false)
end
it 'returns serialized tasks' do
expect(execute_task).to contain_exactly(::Search::Zoekt::TaskSerializerService.execute(task))
end
end
end
describe '.concurrency_limit' do
subject(:concurrency_limit) { service.concurrency_limit }
context 'when node does not have task_count/concurrency set' do
let(:node) { build(:zoekt_node) }
it 'returns the default limit' do
expect(concurrency_limit).to eq(described_class::DEFAULT_LIMIT)
end
end
context 'when node has task_count/concurrency set' do
using RSpec::Parameterized::TableSyntax
where(:task_count, :concurrency, :result) do
1 | 1 | 0
1 | 10 | 9
5 | 10 | 5
1 | 200 | described_class::MAX_LIMIT
end
with_them do
let(:node) { build(:zoekt_node, metadata: { 'task_count' => task_count, 'concurrency' => concurrency }) }
it 'returns correct value' do
expect(concurrency_limit).to eq(result)
end
end
end
end
end
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe ::Search::Zoekt::TaskSerializerService, feature_category: :global_search do
let_it_be(:node) { create(:zoekt_node) }
let_it_be(:task) { create(:zoekt_task, node: node) }
let(:service) { described_class.new(task) }
subject(:execute_task) { service.execute }
describe '.execute' do
it 'passes arguments to new and calls execute' do
expect(described_class).to receive(:new).with(task).and_return(service)
expect(service).to receive(:execute)
described_class.execute(task)
end
end
describe '#execute' do
it 'serializes the task' do
expect(execute_task[:name]).to eq(:index)
expect(execute_task[:payload].keys).to contain_exactly(
:GitalyConnectionInfo,
:Callback,
:RepoId,
:FileSizeLimit,
:Timeout
)
end
context 'when local socket is used' do
let(:connection_data) { { "address" => "unix:gdk-ee/praefect.socket", "token" => nil } }
before do
allow(Gitlab::GitalyClient).to receive(:connection_data).and_return(connection_data)
end
it 'transforms unix socket' do
expected_path = "unix:#{Rails.root.join('gdk-ee/praefect.socket')}"
expect(execute_task[:payload][:GitalyConnectionInfo][:Address]).to eq(expected_path)
end
end
context 'with :force_index_repo task' do
let(:task) { create(:zoekt_task, task_type: :force_index_repo) }
it 'serializes the task' do
expect(execute_task[:name]).to eq(:index)
expect(execute_task[:payload].keys).to contain_exactly(
:GitalyConnectionInfo,
:Callback,
:RepoId,
:FileSizeLimit,
:Timeout,
:Force
)
end
end
context 'with :delete_repo task' do
let(:task) { create(:zoekt_task, task_type: :delete_repo) }
it 'serializes the task' do
expect(execute_task[:name]).to eq(:delete)
expect(execute_task[:payload].keys).to contain_exactly(:RepoId, :Callback)
end
end
context 'with unknown task' do
let(:task) { create(:zoekt_task) }
before do
allow(task).to receive(:task_type).and_return(:unknown)
end
it 'raises an exception' do
expect { execute_task }.to raise_error(ArgumentError)
end
end
end
end
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment