Skip to content
Snippets Groups Projects
Commit 42fff2b1 authored by Siddharth Dungarwal's avatar Siddharth Dungarwal :two:
Browse files

Merge branch '351381-backfill-blobs-and-wiki-blobs' into 'master'

Add migration for backfilling traversal_ids in blobs and wiki blobs

See merge request !107730



Merged-by: default avatarSiddharth Dungarwal <sdungarwal@gitlab.com>
Approved-by: default avatarIan Anderson <ianderson@gitlab.com>
Reviewed-by: Terri Chu's avatarTerri Chu <tchu@gitlab.com>
Reviewed-by: default avatarIan Anderson <ianderson@gitlab.com>
Co-authored-by: default avatarIan Anderson <ianderson@gitlab.com>
Co-authored-by: Siddharth Dungarwal's avatarSiddharth Dungarwal <sd5869@gmail.com>
parents 7591675a d1bce549
No related branches found
No related tags found
No related merge requests found
Pipeline #745077570 canceled
# frozen_string_literal: true
class BackfillTraversalIdsToBlobsAndWikiBlobs < Elastic::Migration
include Elastic::MigrationHelper
batch_size 100_000
ELASTIC_TIMEOUT = '5m'
BLOB_AND_WIKI_BLOB = %w[blob wiki_blob].freeze
batched!
throttle_delay 1.minute
retry_on_failure
def migrate
task_id = migration_state[:task_id]
if task_id
task_status = helper.task_status(task_id: task_id)
if task_status['failures'].present?
set_migration_state(task_id: nil)
log_raise "Failed to update projects : #{task_status['failures']}"
end
if task_status['completed']
log "Updating traversal_ids in original index is completed for task_id: #{task_id}"
set_migration_state(task_id: nil)
else
log "Updating traversal_ids in original index is still in progress for task_id: #{task_id}"
end
return
end
if completed?
log "Migration Completed: There are no projects left to add traversal_ids"
return
end
log "Searching for the projects with missing traversal_ids"
project_ids = projects_with_missing_traversal_ids
log "Found #{project_ids.size} projects with missing traversal_ids"
begin
update_by_query(Project.find(project_ids[0])) unless project_ids.empty?
rescue ActiveRecord::RecordNotFound
log "Project not found: #{project_ids[0]}"
end
end
def completed?
helper.refresh_index(index_name: helper.target_name)
log "Running the count_items_missing_traversal_ids query"
total_remaining = count_items_missing_traversal_ids
log "Checking to see if migration is completed based on index counts remaining: #{total_remaining}"
total_remaining == 0
end
private
def update_by_query(project)
log "Launching update query for project #{project.id}"
response = client.update_by_query(
index: helper.target_name,
body: {
query: {
bool: {
filter: [
{ term: { project_id: project.id.to_s } },
{ terms: { type: BLOB_AND_WIKI_BLOB } }
]
}
},
script: {
lang: "painless",
source: "ctx._source.traversal_ids = '#{project.namespace_ancestry}'"
}
},
wait_for_completion: false,
timeout: ELASTIC_TIMEOUT,
conflicts: 'proceed'
)
if response['failures'].present?
set_migration_state(task_id: nil)
log_raise "Failed to update project with project_id: #{project.id} - #{response['failures']}"
end
task_id = response['task']
log "Adding traversal_ids to original index is started with task_id: #{task_id}"
set_migration_state(
task_id: task_id
)
rescue StandardError => e
set_migration_state(task_id: nil)
raise e
end
def count_items_missing_traversal_ids
client.count(
index: helper.target_name,
body: {
query: {
bool: {
must_not: { exists: { field: "traversal_ids" } },
must: { terms: { type: BLOB_AND_WIKI_BLOB } }
}
}
}
)['count']
end
def projects_with_missing_traversal_ids
results = client.search(
index: helper.target_name,
body: {
size: 0,
query: {
bool: {
must_not: { exists: { field: "traversal_ids" } },
must: { terms: { type: BLOB_AND_WIKI_BLOB } }
}
},
aggs: {
project_ids: {
terms: { size: batch_size, field: "project_id" }
}
}
}
)
project_ids_hist = results.dig('aggregations', 'project_ids', 'buckets') || []
# rubocop: disable CodeReuse/ActiveRecord
project_ids_hist.pluck("key")
# rubocop: enable CodeReuse/ActiveRecord
end
end
# frozen_string_literal: true
require 'spec_helper'
require_relative 'migration_shared_examples'
require File.expand_path('ee/elastic/migrate/20221221110300_backfill_traversal_ids_to_blobs_and_wiki_blobs.rb')
RSpec.describe BackfillTraversalIdsToBlobsAndWikiBlobs, :elastic_clean, :sidekiq_inline,
feature_category: :global_search do
let(:version) { 20221221110300 }
let(:old_version_without_traversal_ids) { 20221213090600 }
let(:helper) { Gitlab::Elastic::Helper.new }
let(:index_name) { Project.__elasticsearch__.index_name }
let(:migration) { described_class.new(version) }
before do
stub_ee_application_setting(elasticsearch_search: true, elasticsearch_indexing: true)
allow(migration).to receive(:helper).and_return(helper)
end
describe 'migration_options' do
it 'has migration options set', :aggregate_failures do
expect(migration.batched?).to be_truthy
expect(migration.retry_on_failure?).to be_truthy
expect(migration.throttle_delay).to eq(1.minute)
expect(migration.batch_size).to eq(100_000)
end
end
describe '.migrate' do
context 'with traversal_ids in all projects' do
it 'does not execute update_by_query' do
migration.migrate
expect(migration.completed?).to be_truthy
expect(helper.client).not_to receive(:update_by_query)
end
end
context 'when task in progress' do
let(:client) { instance_double('Elasticsearch::Transport::Client') }
before do
allow(migration).to receive(:completed?).and_return(false)
allow(migration).to receive(:client).and_return(client)
allow(helper).to receive(:task_status).and_return('completed' => false)
migration.set_migration_state(task_id: 'task_1')
end
it 'does nothing if task is not completed' do
expect(client).not_to receive(:update_by_query)
migration.migrate
end
end
context 'when migration fails' do
let(:client) { instance_double('Elasticsearch::Transport::Client') }
let_it_be(:project) { create(:project, :repository) }
before do
allow(migration).to receive(:client).and_return(client)
allow(migration).to receive(:completed?).and_return(false)
allow(migration).to receive(:projects_with_missing_traversal_ids).and_return([project.id])
end
context 'when es responds with errors' do
before do
allow(client).to receive(:update_by_query).and_return('task' => 'task_1')
end
context 'when a task throws an error' do
before do
allow(helper).to receive(:task_status).and_return('failures' => ['failed'])
migration.migrate
end
it 'resets task_id' do
expect { migration.migrate }.to raise_error(/Failed to update projects/)
expect(migration.migration_state).to match(task_id: nil)
end
end
context 'when update_by_query throws an error' do
before do
allow(client).to receive(:update_by_query).and_return('failures' => ['failed'])
end
it 'resets task_id' do
expect { migration.migrate }.to raise_error(/Failed to update project with project_id/)
expect(migration.migration_state).to match(task_id: nil)
end
end
end
end
end
describe 'integration test' do
let(:projects) { create_list(:project, 2, :repository) }
before do
set_elasticsearch_migration_to(old_version_without_traversal_ids, including: false)
ensure_elasticsearch_index!
projects.each do |project|
project.repository.index_commits_and_blobs # ensure objects are indexed
end
set_elasticsearch_migration_to(version, including: false)
ensure_elasticsearch_index!
end
it 'index all the documents in multiple iterations' do
migration.set_migration_state(task_id: nil)
expect(migration.completed?).to be_falsey
migration.migrate
expect(migration.migration_state).to match(task_id: anything)
# the migration might not complete after the initial task is created
# so make sure it actually completes
100.times do |_| # Max 1s waiting
migration.migrate
break if migration.migration_state[:task_id].nil?
sleep 0.01
end
migration.migrate
expect(migration.migration_state).to match(task_id: anything)
# the migration might not complete after the initial task is created
# so make sure it actually completes
100.times do |_| # Max 1s waiting
migration.migrate
break if migration.migration_state[:task_id].nil?
sleep 0.01
end
expect(migration.completed?).to be_truthy
end
context 'with project not found exception' do
before do
allow(Project).to receive(:find).and_raise(ActiveRecord::RecordNotFound)
end
it 'log failure when project is not found' do
migration.set_migration_state(task_id: nil)
expect(migration.completed?).to be_falsey
expect(migration).to receive(:log).with(/Running the count_items_missing_traversal_ids query/).once
expect(migration).to receive(:log).with(/Checking to see if migration is completed/).once
expect(migration).to receive(:log).with(/Searching for the projects with missing traversal_ids/).once
expect(migration).to receive(:log).with(/projects with missing traversal_ids/).once
expect(migration).to receive(:log).with(/Project not found/).once
migration.migrate
end
end
end
end
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment