Commit fd19ef68 authored by Shinya Maeda's avatar Shinya Maeda

Efficient merge train locks

Efficient merge train locks with Sequential Process helper.
parent 9ef2efde
Pipeline #70433183 passed with stages
in 69 minutes and 1 second
......@@ -21,6 +21,11 @@ class MergeTrain < ApplicationRecord
all_in_train(merge_request).first
end
# Returns the first entry of the merge train restricted to the given ids.
#
# The train is looked up through the merge request identified by the FIRST
# element of +merge_request_ids+; the result is then narrowed down to the
# records whose id is included in +merge_request_ids+.
#
# @param merge_request_ids [Array] ids of merge requests; must respond to `first`
#   (passing nil raises NoMethodError).
# @return the first matching record of the train scope, as decided by `all_in_train`
def first_in_train_from(merge_request_ids)
  anchor = MergeRequest.find(merge_request_ids.first)
  candidates = all_in_train(anchor).where(id: merge_request_ids)

  candidates.first
end
# Counts the entries of the merge train that +merge_request+ belongs to.
#
# @param merge_request the merge request used to resolve the train via `all_in_train`
# @return whatever `count` returns for that train scope
def total_count_in_train(merge_request)
  train = all_in_train(merge_request)
  train.count
end
......
......@@ -9,13 +9,35 @@ module MergeTrains
# Refreshes the merge train that +merge_request+ is part of.
#
# Nothing happens (nil is returned) when the merge request is not on a train.
# Otherwise the refresh strategy is chosen by the
# `merge_trains_efficient_refresh` feature flag (enabled by default):
# `efficient_refresh` when it is on, `legacy_refresh` when it is off.
#
# Locking is the responsibility of the chosen strategy, so no lock is taken here.
#
# @param merge_request the merge request whose train should be refreshed
def execute(merge_request)
  return unless merge_request.on_train?

  if Feature.enabled?(:merge_trains_efficient_refresh, default_enabled: true)
    efficient_refresh(merge_request)
  else
    legacy_refresh(merge_request)
  end
end
private
# Refreshes the train through Gitlab::BatchPopQueueing instead of a blocking lock.
#
# The id of +merge_request+ is always pushed to the queue keyed by
# `queue_id(merge_request)`. When the queue hands the batch to the block, the
# first merge request on the train among the batched ids is refreshed
# (which, via `unsafe_refresh`, also covers the ones following it).
#
# If the batch finished and more ids were enqueued meanwhile, a single
# AutoMergeProcessWorker job is scheduled for the first of those merge requests.
#
# NOTE(review): `MergeTrain.first_in_train_from` is used without a nil check in
# both places; confirm it cannot return nil for ids that already left the train.
def efficient_refresh(merge_request)
  batch_queue = Gitlab::BatchPopQueueing.new('merge_trains', queue_id(merge_request))

  outcome = batch_queue.safe_execute([merge_request.id], lock_timeout: 15.minutes) do |queued_ids|
    head_of_batch = MergeTrain.first_in_train_from(queued_ids)
    unsafe_refresh(head_of_batch)
  end

  # Only the call that actually processed the batch schedules the follow-up.
  return unless outcome[:status] == :finished && outcome[:new_items].present?

  next_in_line = MergeTrain.first_in_train_from(outcome[:new_items])
  AutoMergeProcessWorker.perform_async(next_in_line.id)
end
# Refreshes the train while holding the per-train lock.
#
# The lock key is built from the target project id and the target branch of
# +merge_request+; the refresh itself runs inside `in_lock` with that key.
def legacy_refresh(merge_request)
  lock_key = "merge_train:#{merge_request.target_project_id}-#{merge_request.target_branch}"

  in_lock(lock_key) { unsafe_refresh(merge_request) }
end
def unsafe_refresh(merge_request)
following_merge_requests_from(merge_request).each do |merge_request|
MergeTrains::RefreshMergeRequestService
......@@ -27,5 +49,9 @@ module MergeTrains
# Builds the list of merge requests to refresh, starting at +merge_request+.
#
# @param merge_request a merge request that has a `merge_train`
# @return [Array] +merge_request+ followed by everything `merge_train.all_next` yields
def following_merge_requests_from(merge_request)
  followers = merge_request.merge_train.all_next.to_a
  followers.unshift(merge_request)
end
# Identifier of the batch queue for the train of +merge_request+:
# the target project id and the target branch joined by a colon.
def queue_id(merge_request)
  format('%s:%s', merge_request.target_project_id, merge_request.target_branch)
end
end
end
---
title: Fix race condition where a merge train sometimes could not process a merge request
merge_request: 14386
author:
type: fixed
......@@ -67,6 +67,36 @@ describe MergeTrain do
end
end
# Specs for MergeTrain.first_in_train_from, which picks the first merge request
# on the train out of an explicit list of merge request ids.
describe '.first_in_train_from' do
subject { described_class.first_in_train_from(merge_request_ids) }
# The method calls `first` on its argument, so nil is not accepted.
context 'when arguments is null' do
let(:merge_request_ids) { nil }
it 'raises an error' do
expect { subject }.to raise_error(NoMethodError)
end
end
context 'when there are two merge requests on the same merge train' do
let(:merge_request_ids) { [merge_request_1.id, merge_request_2.id] }
# `let!` so that both merge requests are put on the train before each example runs.
let!(:merge_request_1) { create_merge_request_on_train }
let!(:merge_request_2) { create_merge_request_on_train(source_branch: 'improve/awesome') }
it 'returns the first merge request on the merge train from the given ids' do
is_expected.to eq(merge_request_1)
end
# Only ids that were passed in are considered, even if an earlier
# merge request exists on the same train.
context "when specifies merge request 2's id only" do
let(:merge_request_ids) { [merge_request_2.id] }
it 'returns the first merge request on the merge train from the given ids' do
is_expected.to eq(merge_request_2)
end
end
end
end
describe '.total_count_in_train' do
subject { described_class.total_count_in_train(merge_request) }
......
......@@ -5,7 +5,7 @@ require 'spec_helper'
describe MergeTrains::RefreshMergeRequestsService do
include ExclusiveLeaseHelpers
set(:project) { create(:project) }
let(:project) { create(:project) }
set(:maintainer_1) { create(:user) }
set(:maintainer_2) { create(:user) }
let(:service) { described_class.new(project, maintainer_1) }
......@@ -61,17 +61,45 @@ describe MergeTrains::RefreshMergeRequestsService do
end
end
context 'when the exlusive lock has already been taken' do
let(:lease_key) do
"merge_train:#{merge_request_1.target_project_id}-#{merge_request_1.target_branch}"
# Another process already holds the BatchPopQueueing lease for this train:
# the service must not refresh anything and must only enqueue the id.
context 'when the other thread has already been processing the merge train' do
# Same format as Gitlab::BatchPopQueueing#lock_key
# ("batch_pop_queueing:lock:<namespace>:<queue_id>") with the 'merge_trains'
# namespace and the "<target_project_id>:<target_branch>" queue id used by the service.
# NOTE(review): `merge_request` is not defined in the visible part of this spec;
# confirm it refers to the same train as merge_request_1.
let(:lock_key) { "batch_pop_queueing:lock:merge_trains:#{merge_request.target_project_id}:#{merge_request.target_branch}" }
before do
stub_exclusive_lease_taken(lock_key)
end
it 'does not refresh' do
expect(refresh_service_1).not_to receive(:execute).with(merge_request_1)
subject
end
it 'enqueues the merge request id to BatchPopQueueing' do
expect_next_instance_of(Gitlab::BatchPopQueueing) do |queuing|
# `anything` stands for the queue expiry passed as the second argument.
expect(queuing).to receive(:enqueue).with([merge_request_1.id], anything).and_call_original
end
subject
end
end
context 'when merge_trains_efficient_refresh is disabled' do
before do
stub_exclusive_lease_taken(lease_key)
stub_feature_flags(merge_trains_efficient_refresh: false)
end
it 'raises FailedToObtainLockError' do
expect { subject }.to raise_error(Gitlab::ExclusiveLeaseHelpers::FailedToObtainLockError)
context 'when the exclusive lock has already been taken' do
let(:lease_key) do
"merge_train:#{merge_request_1.target_project_id}-#{merge_request_1.target_branch}"
end
before do
stub_exclusive_lease_taken(lease_key)
end
it 'raises FailedToObtainLockError' do
expect { subject }.to raise_error(Gitlab::ExclusiveLeaseHelpers::FailedToObtainLockError)
end
end
end
end
......@@ -85,6 +113,20 @@ describe MergeTrains::RefreshMergeRequestsService do
subject
end
# Simulates a refresh request for merge request 1 arriving while the refresh of
# merge request 2 is still running.
context 'when merge request 1 was tried to be refreshed while the system is refreshing merge request 2' do
before do
# Replace the refresh of merge request 2 with a nested service call for
# merge request 1, i.e. a second `execute` issued from inside the running one.
# Presumably the nested call cannot take the lease and only enqueues the id.
allow_any_instance_of(described_class).to receive(:unsafe_refresh).with(merge_request_2) do
service.execute(merge_request_1)
end
end
it 'refreshes the merge request 1 later with AutoMergeProcessWorker' do
# The id enqueued during processing must be picked up by exactly one worker job.
expect(AutoMergeProcessWorker).to receive(:perform_async).with(merge_request_1.id).once
subject
end
end
end
end
end
# frozen_string_literal: true
module Gitlab
  ##
  # A queueing system for processing expensive tasks one at a time, which pops
  # the queued items as a batch so that the total processing time can be reduced.
  #
  # In a usual queueing system the first item starts being processed immediately
  # and the following items are popped from the queue one by one afterwards.
  # This queue also processes the first item immediately, but every item that
  # was enqueued in the meantime is popped as a single batch. This is especially
  # useful when you want to drop redundant items from the queue in order to
  # process the important ones only, which makes it more efficient than a
  # traditional queue for that use case.
  #
  # Caveats:
  # - The order of the items is not guaranteed because they are stored with `sadd` (Redis Sets).
  # - For the same reason duplicate items are collapsed into one.
  # - At most MAX_COUNTS_OF_POP_ALL items are popped per batch.
  #
  # Example:
  # ```
  # class TheWorker
  #   def perform
  #     result = Gitlab::BatchPopQueueing.new('feature', 'queue').safe_execute([item]) do |items_in_queue|
  #       item = extract_the_most_important_item_from(items_in_queue)
  #       expensive_process(item)
  #     end
  #
  #     if result[:status] == :finished && result[:new_items].present?
  #       item = extract_the_most_important_item_from(result[:new_items])
  #       TheWorker.perform_async(item.id)
  #     end
  #   end
  # end
  # ```
  #
  class BatchPopQueueing
    attr_reader :namespace, :queue_id

    # Extra lifetime given to the queue key on top of the lock timeout.
    EXTRA_QUEUE_EXPIRE_WINDOW = 1.hour
    # Upper bound of items taken from the queue by a single `pop_all`.
    MAX_COUNTS_OF_POP_ALL = 1000

    # Initialize queue
    #
    # @param [String] namespace The namespace of the exclusive lock and queue key. Typically, it's a feature name.
    # @param [String] queue_id The identifier of the queue.
    # @raise [ArgumentError] when +namespace+ or +queue_id+ is empty
    # @raise [NoMethodError] when +namespace+ or +queue_id+ does not respond to `empty?` (e.g. nil)
    def initialize(namespace, queue_id)
      if namespace.empty? || queue_id.empty?
        raise ArgumentError, 'namespace and queue_id must not be empty'
      end

      @namespace = namespace
      @queue_id = queue_id
    end

    ##
    # Execute the given block in an exclusive lock.
    # If another thread is already working on the block,
    # it enqueues the items without processing the block.
    #
    # @param [Array<String>] new_items New items to be added to the queue.
    # @param lock_timeout The timeout of the exclusive lock, as a duration such as `10.minutes`.
    #   Generally, this value should be longer than the maximum processing time of the given block.
    # @yieldparam [Array<String>] items All items popped from the queue, including +new_items+.
    # @return [Hash]
    #   - status => One of `:enqueued` or `:finished`.
    #   - new_items => Items that were newly enqueued while the given block was being processed
    #     (only present when the status is `:finished`).
    #
    # NOTE: If an exception is raised in the block, the popped items will not be recovered.
    #       We should NOT re-enqueue the items in this case because it could end up in an infinite loop.
    #
    # NOTE(review): `peek_all` runs before the lease is cancelled, so items enqueued
    #               in between are neither processed here nor reported in `new_items`.
    def safe_execute(new_items, lock_timeout: 10.minutes, &block)
      # Items are enqueued unconditionally; whoever holds the lock decides what to do with them.
      enqueue(new_items, lock_timeout + EXTRA_QUEUE_EXPIRE_WINDOW)

      lease = Gitlab::ExclusiveLease.new(lock_key, timeout: lock_timeout)
      uuid = lease.try_obtain

      return { status: :enqueued } unless uuid

      begin
        all_args = pop_all

        yield all_args if block_given?

        { status: :finished, new_items: peek_all }
      ensure
        # Always release the lock, even when the block raised.
        Gitlab::ExclusiveLease.cancel(lock_key, uuid)
      end
    end

    private

    # Key of the exclusive lease guarding this queue.
    def lock_key
      @lock_key ||= "batch_pop_queueing:lock:#{namespace}:#{queue_id}"
    end

    # Key of the Redis Set holding the queued items.
    def queue_key
      @queue_key ||= "batch_pop_queueing:queue:#{namespace}:#{queue_id}"
    end

    # Adds +items+ to the queue and (re)sets the expiry of the queue key.
    def enqueue(items, expire_time)
      Gitlab::Redis::Queues.with do |redis|
        redis.sadd(queue_key, items)
        redis.expire(queue_key, expire_time.to_i)
      end
    end

    # Removes and returns up to MAX_COUNTS_OF_POP_ALL items from the queue.
    def pop_all
      Gitlab::Redis::Queues.with do |redis|
        redis.spop(queue_key, MAX_COUNTS_OF_POP_ALL)
      end
    end

    # Returns the items currently in the queue without removing them.
    def peek_all
      Gitlab::Redis::Queues.with do |redis|
        redis.smembers(queue_key)
      end
    end
  end
end
# frozen_string_literal: true
require 'spec_helper'
# Specs for Gitlab::BatchPopQueueing: argument validation of the constructor and
# the enqueue / lock / batch-pop behaviour of #safe_execute.
describe Gitlab::BatchPopQueueing do
  include ExclusiveLeaseHelpers
  using RSpec::Parameterized::TableSyntax

  describe '#initialize' do
    # Anything that does not respond to `empty?` fails with NoMethodError,
    # empty values fail with ArgumentError.
    where(:namespace, :queue_id, :expect_error, :error_type) do
      'feature' | '1' | false | nil
      :feature  | '1' | false | nil
      nil       | '1' | true  | NoMethodError
      'feature' | nil | true  | NoMethodError
      ''        | '1' | true  | ArgumentError
      'feature' | ''  | true  | ArgumentError
      'feature' | 1   | true  | NoMethodError
    end

    with_them do
      it do
        if expect_error
          expect { described_class.new(namespace, queue_id) }.to raise_error(error_type)
        else
          expect { described_class.new(namespace, queue_id) }.not_to raise_error
        end
      end
    end
  end

  describe '#safe_execute', :clean_gitlab_redis_queues do
    subject { queue.safe_execute(new_items, lock_timeout: lock_timeout) }

    let(:queue) { described_class.new(namespace, queue_id) }
    let(:namespace) { 'feature' }
    let(:queue_id) { '1' }
    let(:lock_timeout) { 10.minutes }
    let(:new_items) { %w[A B] }
    # The key builders are private, hence `send`.
    let(:lock_key) { queue.send(:lock_key) }
    let(:queue_key) { queue.send(:queue_key) }

    it 'enqueues new items always' do
      Gitlab::Redis::Queues.with do |redis|
        expect(redis).to receive(:sadd).with(queue_key, new_items)
        expect(redis).to receive(:expire).with(queue_key, (lock_timeout + described_class::EXTRA_QUEUE_EXPIRE_WINDOW).to_i)
      end

      subject
    end

    it 'yields the new items with exclusive lease' do
      uuid = 'test'
      expect_to_obtain_exclusive_lease(lock_key, uuid, timeout: lock_timeout)
      expect_to_cancel_exclusive_lease(lock_key, uuid)

      # Order is not guaranteed because the queue is a Redis Set.
      expect { |b| queue.safe_execute(new_items, lock_timeout: lock_timeout, &b) }
        .to yield_with_args(match_array(new_items))
    end

    it 'returns the result and no items in the queue' do
      expect(subject[:status]).to eq(:finished)
      expect(subject[:new_items]).to be_empty

      Gitlab::Redis::Queues.with do |redis|
        # The queue is a Redis Set, so its size is read with `scard`.
        expect(redis.scard(queue_key)).to be(0)
      end
    end

    context 'when new items are enqueued during the process' do
      it 'returns the result with newly added items' do
        # The nested call cannot obtain the lease, so 'C' is only enqueued.
        result = queue.safe_execute(new_items) do
          queue.safe_execute(['C'])
        end

        expect(result[:status]).to eq(:finished)
        expect(result[:new_items]).to eq(['C'])

        Gitlab::Redis::Queues.with do |redis|
          expect(redis.scard(queue_key)).to be(1)
        end
      end
    end

    context 'when interger items are enqueued' do
      let(:new_items) { [1, 2, 3] }

      it 'yields as String values' do
        expect { |b| queue.safe_execute(new_items, lock_timeout: lock_timeout, &b) }
          .to yield_with_args(%w[1 2 3])
      end
    end

    context 'when the queue key does not exist in Redis' do
      before do
        # Skip enqueueing so that the queue key is never created.
        allow(queue).to receive(:enqueue) { }
      end

      it 'yields empty array' do
        expect { |b| queue.safe_execute(new_items, lock_timeout: lock_timeout, &b) }
          .to yield_with_args([])
      end
    end

    context 'when the other process has already been working on the queue' do
      before do
        stub_exclusive_lease_taken(lock_key, timeout: lock_timeout)
      end

      it 'does not yield the block' do
        expect { |b| queue.safe_execute(new_items, lock_timeout: lock_timeout, &b) }
          .not_to yield_control
      end

      it 'returns the result' do
        expect(subject[:status]).to eq(:enqueued)
      end
    end

    context 'when a duplicate item is enqueued' do
      it 'yields the items with the duplicates removed' do
        expect { |b| queue.safe_execute(%w[1 1 2 2], &b) }
          .to yield_with_args(match_array(%w[1 2]))
      end
    end

    context 'when there are two queues' do
      it 'enqueues items to each queue' do
        queue_1 = described_class.new(namespace, '1')
        queue_2 = described_class.new(namespace, '2')

        result_2 = nil

        # Both leases are held while 'C' and 'D' are added, so each item must
        # end up only in the `new_items` of its own queue.
        result_1 = queue_1.safe_execute(['A']) do |_|
          result_2 = queue_2.safe_execute(['B']) do |_|
            queue_1.safe_execute(['C'])
            queue_2.safe_execute(['D'])
          end
        end

        expect(result_1[:status]).to eq(:finished)
        expect(result_1[:new_items]).to eq(['C'])
        expect(result_2[:status]).to eq(:finished)
        expect(result_2[:new_items]).to eq(['D'])
      end
    end
  end
end
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment