Pass commit data to ProcessCommitWorker

By passing commit data to this worker we remove the need for querying
the Git repository for every job. This in turn reduces the time spent
processing each job.

The migration included migrates jobs from the old format to the new
format. For this to work properly it requires downtime as otherwise
workers may start producing errors until they're using a newer version
of the worker code.
parent e91afc0d
......@@ -48,6 +48,10 @@ class Commit
max_lines: DIFF_HARD_LIMIT_LINES,
}
end
def from_hash(hash, project)
new(Gitlab::Git::Commit.new(hash), project)
end
end
attr_accessor :raw
......
......@@ -135,7 +135,7 @@ class GitPushService < BaseService
@push_commits.each do |commit|
ProcessCommitWorker.
perform_async(project.id, current_user.id, commit.id, default)
perform_async(project.id, current_user.id, commit.to_hash, default)
end
end
......
......@@ -10,9 +10,10 @@ class ProcessCommitWorker
# project_id - The ID of the project this commit belongs to.
# user_id - The ID of the user that pushed the commit.
# commit_sha - The SHA1 of the commit to process.
# commit_hash - Hash containing commit details to use for constructing a
# Commit object without having to use the Git repository.
# default - The data was pushed to the default branch.
def perform(project_id, user_id, commit_sha, default = false)
def perform(project_id, user_id, commit_hash, default = false)
project = Project.find_by(id: project_id)
return unless project
......@@ -21,10 +22,7 @@ class ProcessCommitWorker
return unless user
commit = find_commit(project, commit_sha)
return unless commit
commit = build_commit(project, commit_hash)
author = commit.author || user
process_commit_message(project, commit, user, author, default)
......@@ -59,9 +57,18 @@ class ProcessCommitWorker
update_all(first_mentioned_in_commit_at: commit.committed_date)
end
private
def build_commit(project, hash)
date_suffix = '_date'
# When processing Sidekiq payloads various timestamps are stored as Strings.
# Commit in turn expects Time-like instances upon input, so we have to
# manually parse these values.
hash.each do |key, value|
if key.to_s.end_with?(date_suffix) && value.is_a?(String)
hash[key] = Time.parse(value)
end
end
def find_commit(project, sha)
project.commit(sha)
Commit.from_hash(hash, project)
end
end
---
title: Pass commit data to ProcessCommitWorker to reduce Git overhead
merge_request: 7744
author:
# See http://doc.gitlab.com/ce/development/migration_style_guide.html
# for more information on how to write migrations for GitLab.
class MigrateProcessCommitWorkerJobs < ActiveRecord::Migration
include Gitlab::Database::MigrationHelpers
class Project < ActiveRecord::Base
def self.find_including_path(id)
select("projects.*, CONCAT(namespaces.path, '/', projects.path) AS path_with_namespace").
joins('INNER JOIN namespaces ON namespaces.id = projects.namespace_id').
find_by(id: id)
end
def repository_storage_path
Gitlab.config.repositories.storages[repository_storage]
end
def repository_path
File.join(repository_storage_path, read_attribute(:path_with_namespace) + '.git')
end
def repository
@repository ||= Rugged::Repository.new(repository_path)
end
end
DOWNTIME = true
DOWNTIME_REASON = 'Existing workers will error until they are using a newer version of the code'
disable_ddl_transaction!
def up
Sidekiq.redis do |redis|
new_jobs = []
while job = redis.lpop('queue:process_commit')
payload = JSON.load(job)
project = Project.find_including_path(payload['args'][0])
next unless project
begin
commit = project.repository.lookup(payload['args'][2])
rescue Rugged::OdbError
next
end
hash = {
id: commit.oid,
message: commit.message,
parent_ids: commit.parent_ids,
authored_date: commit.author[:time],
author_name: commit.author[:name],
author_email: commit.author[:email],
committed_date: commit.committer[:time],
committer_email: commit.committer[:email],
committer_name: commit.committer[:name]
}
payload['args'][2] = hash
new_jobs << JSON.dump(payload)
end
redis.multi do |multi|
new_jobs.each do |j|
multi.lpush('queue:process_commit', j)
end
end
end
end
def down
Sidekiq.redis do |redis|
new_jobs = []
while job = redis.lpop('queue:process_commit')
payload = JSON.load(job)
payload['args'][2] = payload['args'][2]['id']
new_jobs << JSON.dump(payload)
end
redis.multi do |multi|
new_jobs.each do |j|
multi.lpush('queue:process_commit', j)
end
end
end
end
end
......@@ -321,6 +321,6 @@ describe Gitlab::CycleAnalytics::Events do
context.update(milestone: milestone)
mr = create_merge_request_closing_issue(context)
ProcessCommitWorker.new.perform(project.id, user.id, mr.commits.last.sha)
ProcessCommitWorker.new.perform(project.id, user.id, mr.commits.last.to_hash)
end
end
require 'spec_helper'
require Rails.root.join('db', 'migrate', '20161124141322_migrate_process_commit_worker_jobs.rb')
describe MigrateProcessCommitWorkerJobs do
let(:project) { create(:project) }
let(:user) { create(:user) }
let(:commit) { project.commit.raw.raw_commit }
describe 'Project' do
describe 'find_including_path' do
it 'returns Project instances' do
expect(described_class::Project.find_including_path(project.id)).
to be_an_instance_of(described_class::Project)
end
it 'selects the full path for every Project' do
migration_project = described_class::Project.
find_including_path(project.id)
expect(migration_project[:path_with_namespace]).
to eq(project.path_with_namespace)
end
end
describe '#repository_storage_path' do
it 'returns the storage path for the repository' do
migration_project = described_class::Project.
find_including_path(project.id)
expect(File.directory?(migration_project.repository_storage_path)).
to eq(true)
end
end
describe '#repository_path' do
it 'returns the path to the repository' do
migration_project = described_class::Project.
find_including_path(project.id)
expect(File.directory?(migration_project.repository_path)).to eq(true)
end
end
describe '#repository' do
it 'returns a Rugged::Repository' do
migration_project = described_class::Project.
find_including_path(project.id)
expect(migration_project.repository).
to be_an_instance_of(Rugged::Repository)
end
end
end
describe '#up', :redis do
let(:migration) { described_class.new }
def job_count
Sidekiq.redis { |r| r.llen('queue:process_commit') }
end
before do
Sidekiq.redis do |redis|
job = JSON.dump(args: [project.id, user.id, commit.oid])
redis.lpush('queue:process_commit', job)
end
end
it 'skips jobs using a project that no longer exists' do
allow(described_class::Project).to receive(:find_including_path).
with(project.id).
and_return(nil)
migration.up
expect(job_count).to eq(0)
end
it 'skips jobs using commits that no longer exist' do
allow_any_instance_of(Rugged::Repository).to receive(:lookup).
with(commit.oid).
and_raise(Rugged::OdbError)
migration.up
expect(job_count).to eq(0)
end
it 'inserts migrated jobs back into the queue' do
migration.up
expect(job_count).to eq(1)
end
context 'a migrated job' do
let(:job) do
migration.up
JSON.load(Sidekiq.redis { |r| r.lpop('queue:process_commit') })
end
let(:commit_hash) do
job['args'][2]
end
it 'includes the project ID' do
expect(job['args'][0]).to eq(project.id)
end
it 'includes the user ID' do
expect(job['args'][1]).to eq(user.id)
end
it 'includes the commit ID' do
expect(commit_hash['id']).to eq(commit.oid)
end
it 'includes the commit message' do
expect(commit_hash['message']).to eq(commit.message)
end
it 'includes the parent IDs' do
expect(commit_hash['parent_ids']).to eq(commit.parent_ids)
end
it 'includes the author date' do
expect(commit_hash['authored_date']).to eq(commit.author[:time].to_s)
end
it 'includes the author name' do
expect(commit_hash['author_name']).to eq(commit.author[:name])
end
it 'includes the author Email' do
expect(commit_hash['author_email']).to eq(commit.author[:email])
end
it 'includes the commit date' do
expect(commit_hash['committed_date']).to eq(commit.committer[:time].to_s)
end
it 'includes the committer name' do
expect(commit_hash['committer_name']).to eq(commit.committer[:name])
end
it 'includes the committer Email' do
expect(commit_hash['committer_email']).to eq(commit.committer[:email])
end
end
end
describe '#down', :redis do
let(:migration) { described_class.new }
def job_count
Sidekiq.redis { |r| r.llen('queue:process_commit') }
end
before do
Sidekiq.redis do |redis|
job = JSON.dump(args: [project.id, user.id, commit.oid])
redis.lpush('queue:process_commit', job)
migration.up
end
end
it 'pushes migrated jobs back into the queue' do
migration.down
expect(job_count).to eq(1)
end
context 'a migrated job' do
let(:job) do
migration.down
JSON.load(Sidekiq.redis { |r| r.lpop('queue:process_commit') })
end
it 'includes the project ID' do
expect(job['args'][0]).to eq(project.id)
end
it 'includes the user ID' do
expect(job['args'][1]).to eq(user.id)
end
it 'includes the commit SHA' do
expect(job['args'][2]).to eq(commit.oid)
end
end
end
end
......@@ -302,4 +302,21 @@ eos
expect(commit.uri_type('this/path/doesnt/exist')).to be_nil
end
end
describe '.from_hash' do
let(:new_commit) { described_class.from_hash(commit.to_hash, project) }
it 'returns a Commit' do
expect(new_commit).to be_an_instance_of(described_class)
end
it 'wraps a Gitlab::Git::Commit' do
expect(new_commit.raw).to be_an_instance_of(Gitlab::Git::Commit)
end
it 'stores the correct commit fields' do
expect(new_commit.id).to eq(commit.id)
expect(new_commit.message).to eq(commit.message)
end
end
end
......@@ -135,6 +135,6 @@ describe 'cycle analytics events' do
merge_merge_requests_closing_issue(issue)
ProcessCommitWorker.new.perform(project.id, user.id, mr.commits.last.sha)
ProcessCommitWorker.new.perform(project.id, user.id, mr.commits.last.to_hash)
end
end
......@@ -263,7 +263,7 @@ describe GitPushService, services: true do
author_email: commit_author.email
)
allow_any_instance_of(ProcessCommitWorker).to receive(:find_commit).
allow_any_instance_of(ProcessCommitWorker).to receive(:build_commit).
and_return(commit)
allow(project.repository).to receive(:commits_between).and_return([commit])
......@@ -321,7 +321,7 @@ describe GitPushService, services: true do
committed_date: commit_time
)
allow_any_instance_of(ProcessCommitWorker).to receive(:find_commit).
allow_any_instance_of(ProcessCommitWorker).to receive(:build_commit).
and_return(commit)
allow(project.repository).to receive(:commits_between).and_return([commit])
......@@ -360,7 +360,7 @@ describe GitPushService, services: true do
allow(project.repository).to receive(:commits_between).
and_return([closing_commit])
allow_any_instance_of(ProcessCommitWorker).to receive(:find_commit).
allow_any_instance_of(ProcessCommitWorker).to receive(:build_commit).
and_return(closing_commit)
project.team << [commit_author, :master]
......
......@@ -55,8 +55,12 @@ RSpec.configure do |config|
config.around(:each, :redis) do |example|
Gitlab::Redis.with(&:flushall)
Sidekiq.redis(&:flushall)
example.run
Gitlab::Redis.with(&:flushall)
Sidekiq.redis(&:flushall)
end
end
......
......@@ -11,31 +11,25 @@ describe ProcessCommitWorker do
it 'does not process the commit when the project does not exist' do
expect(worker).not_to receive(:close_issues)
worker.perform(-1, user.id, commit.id)
worker.perform(-1, user.id, commit.to_hash)
end
it 'does not process the commit when the user does not exist' do
expect(worker).not_to receive(:close_issues)
worker.perform(project.id, -1, commit.id)
end
it 'does not process the commit when the commit no longer exists' do
expect(worker).not_to receive(:close_issues)
worker.perform(project.id, user.id, 'this-should-does-not-exist')
worker.perform(project.id, -1, commit.to_hash)
end
it 'processes the commit message' do
expect(worker).to receive(:process_commit_message).and_call_original
worker.perform(project.id, user.id, commit.id)
worker.perform(project.id, user.id, commit.to_hash)
end
it 'updates the issue metrics' do
expect(worker).to receive(:update_issue_metrics).and_call_original
worker.perform(project.id, user.id, commit.id)
worker.perform(project.id, user.id, commit.to_hash)
end
end
......@@ -106,4 +100,19 @@ describe ProcessCommitWorker do
expect(metric.first_mentioned_in_commit_at).to eq(commit.committed_date)
end
end
describe '#build_commit' do
it 'returns a Commit' do
commit = worker.build_commit(project, id: '123')
expect(commit).to be_an_instance_of(Commit)
end
it 'parses date strings into Time instances' do
commit = worker.
build_commit(project, id: '123', authored_date: Time.now.to_s)
expect(commit.authored_date).to be_an_instance_of(Time)
end
end
end
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment