Commit 90827cbb authored by John Mason

Merge branch '351381-backfill-blobs-and-wiki-blobs' into 'master'

Add migration for backfilling traversal_ids in blobs and wiki blobs

See merge request !107730



Merged-by: John Mason <9717668-john-mason@users.noreply.gitlab.com>
Approved-by: Ian Anderson <ianderson@gitlab.com>
Approved-by: John Mason <9717668-john-mason@users.noreply.gitlab.com>
Reviewed-by: Terri Chu <tchu@gitlab.com>
Reviewed-by: Ian Anderson <ianderson@gitlab.com>
Co-authored-by: Terri Chu <tchu@gitlab.com>
Co-authored-by: Siddharth Dungarwal <sd5869@gmail.com>
parents 49ce21bf 81aa23cb
Pipeline #751072421 passed
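
For context: the migration below copies each project's namespace ancestry into the traversal_ids field of its blob and wiki_blob documents, using asynchronous Elasticsearch update_by_query tasks. A minimal sketch of the value being written, assuming Project#namespace_ancestry returns the ancestor namespace IDs joined with dashes (the concrete format is defined by the Project model and is not part of this change; the IDs are made up):

# Illustrative values only; project 42 and its ancestry are hypothetical.
project = Project.find(42)
project.namespace.traversal_ids  # => [1, 9, 42]   root group down to the project's namespace
project.namespace_ancestry       # => "1-9-42-"    assumed dash-delimited string form
# The migration interpolates this string into a painless script so every blob and
# wiki_blob document belonging to the project gets ctx._source.traversal_ids set.
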
# frozen_string_literal: true

class BackfillTraversalIdsToBlobsAndWikiBlobs < Elastic::Migration
  include Elastic::MigrationHelper

  batch_size 100_000

  ELASTIC_TIMEOUT = '5m'
  BLOB_AND_WIKI_BLOB = %w[blob wiki_blob].freeze

  batched!
  throttle_delay 45.seconds
  retry_on_failure
  def migrate
    task_id = migration_state[:task_id]

    # A previous update_by_query task is still registered: check on it instead
    # of launching a new one.
    if task_id
      task_status = helper.task_status(task_id: task_id)

      if task_status['failures'].present?
        set_migration_state(task_id: nil)
        log_raise "Failed to update projects: #{task_status['failures']}"
      end

      if task_status['completed'].present?
        log "Updating traversal_ids in original index is completed for task_id: #{task_id}"
        set_migration_state(task_id: nil)
      else
        log "Updating traversal_ids in original index is still in progress for task_id: #{task_id}"
      end

      return
    end

    if completed?
      log "Migration Completed: There are no projects left to add traversal_ids"
      return
    end

    log "Searching for the projects with missing traversal_ids"
    project_ids = projects_with_missing_traversal_ids
    log "Found #{project_ids.size} projects with missing traversal_ids"

    project_ids.each do |project_id|
      update_by_query(Project.find(project_id))
    rescue ActiveRecord::RecordNotFound
      log "Project not found: #{project_id}"
    end
  end
  def completed?
    helper.refresh_index(index_name: helper.target_name)

    log "Running the count_items_missing_traversal_ids query"
    total_remaining = count_items_missing_traversal_ids
    log "Checking to see if migration is completed based on index counts remaining: #{total_remaining}"

    total_remaining == 0
  end
  private

  def update_by_query(project)
    log "Launching update query for project #{project.id}"

    response = client.update_by_query(
      index: helper.target_name,
      body: {
        query: {
          bool: {
            filter: [
              { term: { project_id: project.id } },
              { terms: { type: BLOB_AND_WIKI_BLOB } }
            ]
          }
        },
        script: {
          lang: "painless",
          source: "ctx._source.traversal_ids = '#{project.namespace_ancestry}'"
        }
      },
      wait_for_completion: false,
      timeout: ELASTIC_TIMEOUT,
      conflicts: 'proceed'
    )

    if response['failures'].present?
      set_migration_state(task_id: nil)
      log_raise "Failed to update project with project_id: #{project.id} - #{response['failures']}"
    end

    task_id = response['task']
    log "Adding traversal_ids to original index is started with task_id: #{task_id}"
    set_migration_state(task_id: task_id)
  rescue StandardError => e
    set_migration_state(task_id: nil)
    raise e
  end
  def count_items_missing_traversal_ids
    client.count(
      index: helper.target_name,
      body: {
        query: query_missing_traversal_ids
      }
    )['count']
  end

  def projects_with_missing_traversal_ids
    results = client.search(
      index: helper.target_name,
      body: {
        size: 0,
        query: query_missing_traversal_ids,
        aggs: {
          project_ids: {
            terms: { size: batch_size, field: "project_id" }
          }
        }
      }
    )

    project_ids_hist = results.dig('aggregations', 'project_ids', 'buckets') || []
    project_ids_hist.pluck("key") # rubocop: disable CodeReuse/ActiveRecord
  end
  def query_missing_traversal_ids
    {
      bool: {
        must_not: { exists: { field: "traversal_ids" } },
        must: { terms: { type: BLOB_AND_WIKI_BLOB } }
      }
    }
  end
end
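
How the batched migration advances across re-invocations (an illustrative console walkthrough, not part of the change; the task id and version shown are examples):

# Illustrative only: the migration framework re-runs #migrate on a schedule.
migration = BackfillTraversalIdsToBlobsAndWikiBlobs.new(20221221110300)

migration.migrate          # no task registered: finds projects missing traversal_ids and
                           # launches update_by_query with wait_for_completion: false
migration.migration_state  # => { task_id: "oTUltX4IQMOUUVeiohTt8A:12345" } (example id)

migration.migrate          # task registered: polls helper.task_status and clears task_id
                           # once Elasticsearch reports the task as completed or failed
migration.completed?       # => true once no blob/wiki_blob documents lack traversal_ids
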
# frozen_string_literal: true

require 'spec_helper'
require_relative 'migration_shared_examples'
require File.expand_path('ee/elastic/migrate/20221221110300_backfill_traversal_ids_to_blobs_and_wiki_blobs.rb')

RSpec.describe BackfillTraversalIdsToBlobsAndWikiBlobs, :elastic_clean, :sidekiq_inline,
  feature_category: :global_search do
  let(:version) { 20221221110300 }
  let(:old_version_without_traversal_ids) { 20221213090600 }
  let(:helper) { Gitlab::Elastic::Helper.new }
  let(:index_name) { Project.__elasticsearch__.index_name }
  let(:migration) { described_class.new(version) }

  let_it_be_with_reload(:projects) { create_list(:project, 3, :repository) }

  before do
    stub_ee_application_setting(elasticsearch_search: true, elasticsearch_indexing: true)
    set_elasticsearch_migration_to(version, including: false)

    allow(migration).to receive(:helper).and_return(helper)
  end
  describe 'migration_options' do
    it 'has migration options set', :aggregate_failures do
      expect(migration.batched?).to be_truthy
      expect(migration.retry_on_failure?).to be_truthy
      expect(migration.throttle_delay).to eq(45.seconds)
      expect(migration.batch_size).to eq(100_000)
    end
  end
  describe '.migrate' do
    context 'with traversal_ids in all projects' do
      it 'does not execute update_by_query' do
        projects = create_list(:project, 2, :repository)
        projects.each { |p| p.repository.index_commits_and_blobs }
        ensure_elasticsearch_index!

        # Set the negative expectation before migrating so the call is actually verified.
        expect(helper.client).not_to receive(:update_by_query)

        migration.migrate
        expect(migration.completed?).to be_truthy
      end
    end
    context 'when task in progress' do
      let(:client) { instance_double('Elasticsearch::Transport::Client') }

      before do
        allow(migration).to receive(:completed?).and_return(false)
        allow(migration).to receive(:client).and_return(client)
        allow(helper).to receive(:task_status).and_return('completed' => false)
        migration.set_migration_state(task_id: 'task_1')
      end

      it 'does nothing if task is not completed' do
        expect(client).not_to receive(:update_by_query)

        migration.migrate
      end
    end
    context 'with project not found exception' do
      let(:client) { instance_double('Elasticsearch::Transport::Client') }

      before do
        allow(migration).to receive(:client).and_return(client)
        allow(migration).to receive(:projects_with_missing_traversal_ids).and_return([0])
        allow(migration).to receive(:completed?).and_return(false)
      end

      it 'logs a failure when the project is not found' do
        migration.migrate

        expect(migration).to receive(:log).with(/Searching for the projects with missing traversal_ids/).once
        expect(migration).to receive(:log).with(/projects with missing traversal_ids/).once
        expect(migration).to receive(:log).with(/Project not found/).once

        migration.migrate
      end
    end
    context 'when migration fails' do
      let(:client) { instance_double('Elasticsearch::Transport::Client') }

      before do
        allow(client).to receive(:update_by_query).and_return(update_by_query_response)
        allow(helper).to receive(:task_status).with(task_id: 'task_1').and_return(task_status_response)

        allow(migration).to receive(:projects_with_missing_traversal_ids).and_return(projects.map(&:id))
        allow(migration).to receive(:completed?).and_return(false)
        allow(migration).to receive(:client).and_return(client)
      end

      context 'when Elasticsearch responds with errors' do
        context 'when a task throws an error' do
          let(:task_status_response) { { 'failures' => ['failed'] } }
          let(:update_by_query_response) { { 'task' => 'task_1' } }

          it 'resets task_id' do
            migration.set_migration_state(task_id: 'task_1') # simulate a task in progress

            expect { migration.migrate }.to raise_error(/Failed to update projects/)
            expect(migration.migration_state).to match(task_id: nil)
          end
        end

        context 'when update_by_query throws an error' do
          let(:task_status_response) { {} }
          let(:update_by_query_response) { { 'failures' => ['failed'] } }

          it 'sets task_id to nil' do
            migration.set_migration_state(task_id: nil) # simulate a new task being created

            expect { migration.migrate }.to raise_error(/Failed to update project with project_id/)
            expect(migration.migration_state).to match(task_id: nil)
          end
        end
      end
    end
  end
  describe 'integration test' do
    before do
      set_elasticsearch_migration_to(old_version_without_traversal_ids, including: false)

      projects.each do |project|
        project.repository.index_commits_and_blobs # ensure objects are indexed
      end

      ensure_elasticsearch_index!

      set_elasticsearch_migration_to(version, including: false)
    end

    it 'updates all documents in a single batch' do
      expect(migration.completed?).to be_falsey

      migration.migrate
      expect(migration.migration_state).to match(task_id: anything)

      # the migration might not complete after the initial task is created
      # so make sure it actually completes
      100.times do |_| # Max 1s waiting
        migration.migrate
        break if migration.migration_state[:task_id].nil?

        sleep 0.01
      end

      expect(migration.completed?).to be_truthy
    end
    context 'with more than one batch' do
      before do
        allow(migration).to receive(:batch_size).and_return(2)
      end

      it 'tracks all blobs and wiki_blobs across two batches' do
        # First batch
        # the migration might not complete after the initial task is created
        # so make sure it actually completes
        50.times do |_| # Max 0.5s waiting
          migration.migrate
          break if migration.migration_state[:task_id].nil?

          sleep 0.01
        end

        expect(migration.completed?).to be_falsey

        # Second batch
        # the migration might not complete after the initial task is created
        # so make sure it actually completes
        100.times do |_| # Max 1s waiting
          migration.migrate
          break if migration.completed?

          sleep 0.01
        end

        migration.migrate
        expect(migration.completed?).to be_truthy
        expect(migration.migration_state).to match(task_id: nil)
      end
    end
  end
end