Skip to content
Snippets Groups Projects

Add migration for backfilling traversal_ids in blobs and wiki blobs

Merged Siddharth Dungarwal requested to merge 351381-backfill-blobs-and-wiki-blobs into master
Compare and Show latest version
2 files
+ 139
39
Compare changes
  • Side-by-side
  • Inline
Files
2
@@ -3,7 +3,7 @@
class BackfillTraversalIdsToBlobsAndWikiBlobs < Elastic::Migration
include Elastic::MigrationHelper
BATCH_SIZE = 1000
batch_size 100_000
ELASTIC_TIMEOUT = '5m'
BLOB_AND_WIKI_BLOB = %w[blob wiki_blob].freeze
batched!
@@ -11,6 +11,26 @@ class BackfillTraversalIdsToBlobsAndWikiBlobs < Elastic::Migration
retry_on_failure
def migrate
task_id = migration_state[:task_id]
if task_id
task_status = helper.task_status(task_id: task_id)
if task_status['failures'].present?
set_migration_state(task_id: nil)
log_raise "Failed to update projects : #{task_status['failures']}"
end
if task_status['completed']
log "Updating traversal_ids in original index is completed for task_id:#{task_id}"
set_migration_state(task_id: nil)
else
log "Updating traversal_ids in original index is still in progress for task_id:#{task_id}"
end
return
end
if completed?
log "Migration Completed: There are no projects left to add traversal_ids"
return
@@ -19,25 +39,27 @@ def migrate
log "Searching for the projects with missing traversal_ids"
project_ids = projects_with_missing_traversal_ids
log "Found #{project_ids.size} projects with missing traversal_ids"
project_ids.each do |project_id|
update_by_query(Project.find(project_id))
begin
update_by_query(Project.find(project_ids[0])) unless project_ids.empty?
rescue ActiveRecord::RecordNotFound
log "Project not found: #{project_ids[0]}"
end
end
def completed?
helper.refresh_index
helper.refresh_index(index_name: helper.target_name)
log "Running the count_items_missing_traversal_ids query"
total_remaining = count_items_missing_traversal_ids
log "Checking to see if migration is completed based on index counts remaining: #{total_remaining}"
total_remaining == 0
end
private
def update_by_query(project)
client.update_by_query(
log "Launching update query for project #{project.id}"
response = client.update_by_query(
index: helper.target_name,
body: {
query: {
@@ -53,10 +75,25 @@ def update_by_query(project)
source: "ctx._source.traversal_ids = '#{project.namespace_ancestry}'"
}
},
wait_for_completion: true,
wait_for_completion: false,
timeout: ELASTIC_TIMEOUT,
conflicts: 'proceed'
)
if response['failures'].present?
set_migration_state(task_id: nil)
log_raise "Failed to update project with project_id: #{project.id} - #{response['failures']}"
end
task_id = response['task']
log "Adding traversal_ids to original index is started with task_id:#{task_id}"
set_migration_state(
task_id: task_id
)
rescue StandardError => e
set_migration_state(task_id: nil)
raise e
end
def count_items_missing_traversal_ids
@@ -86,7 +123,7 @@ def projects_with_missing_traversal_ids
},
aggs: {
project_ids: {
terms: { size: BATCH_SIZE, field: "project_id" }
terms: { size: batch_size, field: "project_id" }
}
}
}
Loading