Skip to content
Snippets Groups Projects
Commit e413e8c8 authored by James Fargher's avatar James Fargher
Browse files

Merge branch 'kassio/bulkimports-handle-network-errors' into 'master'

BulkImports: Handle network errors

See merge request !68582
parents 35f1082c 0c8948dd
No related branches found
No related tags found
1 merge request: !68582 — BulkImports: Handle network errors
Pipeline #379882384 failed
Pipeline: CNG-mirror

#379888024

    ......@@ -50,6 +50,8 @@ def pipeline_class
    event :start do
    transition created: :started
    # To avoid errors when re-starting a pipeline in case of network errors
    transition started: :started
    end
    event :finish do
    ......
    ......@@ -16,7 +16,7 @@ class PipelineWorker # rubocop:disable Scalability/IdempotentWorker
    def perform(pipeline_tracker_id, stage, entity_id)
    pipeline_tracker = ::BulkImports::Tracker
    .with_status(:created)
    .with_status(:created, :started)
    .find_by_id(pipeline_tracker_id)
    if pipeline_tracker.present?
    ......@@ -59,18 +59,35 @@ def run(pipeline_tracker)
    pipeline_tracker.pipeline_class.new(context).run
    pipeline_tracker.finish!
    rescue BulkImports::NetworkError => e
    if e.retriable?(pipeline_tracker)
    logger.error(
    worker: self.class.name,
    entity_id: pipeline_tracker.entity.id,
    pipeline_name: pipeline_tracker.pipeline_name,
    message: "Retrying error: #{e.message}"
    )
    reenqueue(pipeline_tracker, delay: e.retry_delay)
    else
    fail_tracker(pipeline_tracker, e)
    end
    rescue StandardError => e
    fail_tracker(pipeline_tracker, e)
    end
    def fail_tracker(pipeline_tracker, exception)
    pipeline_tracker.update!(status_event: 'fail_op', jid: jid)
    logger.error(
    worker: self.class.name,
    entity_id: pipeline_tracker.entity.id,
    pipeline_name: pipeline_tracker.pipeline_name,
    message: e.message
    message: exception.message
    )
    Gitlab::ErrorTracking.track_exception(
    e,
    exception,
    entity_id: pipeline_tracker.entity.id,
    pipeline_name: pipeline_tracker.pipeline_name
    )
    ......@@ -88,8 +105,13 @@ def job_timeout?(pipeline_tracker)
    (Time.zone.now - pipeline_tracker.entity.created_at) > Pipeline::NDJSON_EXPORT_TIMEOUT
    end
    def reenqueue(pipeline_tracker)
    self.class.perform_in(NDJSON_PIPELINE_PERFORM_DELAY, pipeline_tracker.id, pipeline_tracker.stage, pipeline_tracker.entity.id)
    def reenqueue(pipeline_tracker, delay: NDJSON_PIPELINE_PERFORM_DELAY)
    self.class.perform_in(
    delay,
    pipeline_tracker.id,
    pipeline_tracker.stage,
    pipeline_tracker.entity.id
    )
    end
    end
    end
    ......@@ -17,6 +17,8 @@ def execute(document:, operation_name: nil, variables: {}, context: {})
    )
    ::Gitlab::Json.parse(response.body)
    rescue *Gitlab::HTTP::HTTP_ERRORS => e
    raise ::BulkImports::NetworkError, e
    end
    end
    private_constant :HTTP
    ......
    ......@@ -113,11 +113,11 @@ def request_headers
    def with_error_handling
    response = yield
    raise(::BulkImports::Error, "Error #{response.code}") unless response.success?
    raise ::BulkImports::NetworkError.new(response: response) unless response.success?
    response
    rescue *Gitlab::HTTP::HTTP_ERRORS => e
    raise(::BulkImports::Error, e)
    raise ::BulkImports::NetworkError, e
    end
    def api_url
    ......
    # frozen_string_literal: true

    module BulkImports
      # Raised when an HTTP request to the source instance fails at the
      # network level (e.g. a timeout) or returns a retriable HTTP status
      # code. Wraps either the originally raised exception (available via
      # Exception#cause when raised as `raise NetworkError, original`) or
      # the raw HTTP response that triggered it.
      class NetworkError < Error
        COUNTER_KEY = 'bulk_imports/%{entity_id}/%{stage}/%{tracker_id}/network_error/%{error}'

        RETRIABLE_EXCEPTIONS = Gitlab::HTTP::HTTP_TIMEOUT_ERRORS
        RETRIABLE_HTTP_CODES = [429].freeze

        DEFAULT_RETRY_DELAY_SECONDS = 60
        MAX_RETRIABLE_COUNT = 3

        # @param message [String, nil] human-readable error message
        # @param response [Object, nil] the failed HTTP response (must respond
        #   to #code and #headers — presumably an HTTParty response; confirm)
        # @raise [ArgumentError] when neither a message nor a response is given
        def initialize(message = nil, response: nil)
          raise ArgumentError, 'message or response required' if message.blank? && response.blank?

          super(message)

          @response = response
        end

        # Whether the failed pipeline run may be re-enqueued for the given
        # tracker. Retriable errors are allowed up to MAX_RETRIABLE_COUNT
        # attempts, counted per (entity, stage, tracker, error) via Redis.
        def retriable?(tracker)
          if retriable_exception? || retriable_http_code?
            increment(tracker) <= MAX_RETRIABLE_COUNT
          else
            false
          end
        end

        # Delay to wait before retrying, as an ActiveSupport::Duration.
        # For 429 (rate-limited) responses this honors the Retry-After
        # header when it is a positive delta-seconds value; otherwise it
        # falls back to DEFAULT_RETRY_DELAY_SECONDS.
        def retry_delay
          delay =
            if response&.code == 429
              # Retry-After may also be an HTTP-date (RFC 7231), whose
              # String#to_i is 0 — fall back to the default delay rather
              # than retrying immediately.
              seconds = response.headers.fetch('Retry-After', DEFAULT_RETRY_DELAY_SECONDS).to_i
              seconds > 0 ? seconds : DEFAULT_RETRY_DELAY_SECONDS
            else
              DEFAULT_RETRY_DELAY_SECONDS
            end

          delay.seconds
        end

        private

        attr_reader :response

        def retriable_exception?
          # `cause` is the original exception when this error wraps one;
          # it is nil when the error was built from an HTTP response.
          RETRIABLE_EXCEPTIONS.include?(cause&.class)
        end

        def retriable_http_code?
          RETRIABLE_HTTP_CODES.include?(response&.code)
        end

        def increment(tracker)
          # NOTE(review): with no wrapped exception (HTTP-code errors),
          # `cause.class.name` evaluates to "NilClass", so all response-code
          # errors share one counter bucket per tracker. Kept as-is to avoid
          # resetting in-flight counters by changing the cache key.
          key = COUNTER_KEY % {
            stage: tracker.stage,
            tracker_id: tracker.id,
            entity_id: tracker.entity.id,
            error: cause.class.name
          }

          Gitlab::Cache::Import::Caching.increment(key)
        end
      end
    end
    ......@@ -84,8 +84,10 @@ def self.increment(raw_key, timeout: TIMEOUT)
    key = cache_key_for(raw_key)
    Redis::Cache.with do |redis|
    redis.incr(key)
    value = redis.incr(key)
    redis.expire(key, timeout)
    value
    end
    end
    ......
    ......@@ -32,7 +32,7 @@
    it 'raises BulkImports::Error' do
    allow(Gitlab::HTTP).to receive(method).and_raise(Errno::ECONNREFUSED)
    expect { subject.public_send(method, resource) }.to raise_exception(BulkImports::Error)
    expect { subject.public_send(method, resource) }.to raise_exception(BulkImports::NetworkError)
    end
    end
    ......@@ -42,7 +42,7 @@
    allow(Gitlab::HTTP).to receive(method).and_return(response_double)
    expect { subject.public_send(method, resource) }.to raise_exception(BulkImports::Error)
    expect { subject.public_send(method, resource) }.to raise_exception(BulkImports::NetworkError)
    end
    end
    end
    ......@@ -180,7 +180,11 @@ def stub_http_get(path, query, response)
    let(:version) { '13.0.0' }
    it 'raises an error' do
    expect { subject.get(resource) }.to raise_error(::BulkImports::Error, "Unsupported GitLab Version. Minimum Supported Gitlab Version #{BulkImport::MINIMUM_GITLAB_MAJOR_VERSION}.")
    expect { subject.get(resource) }
    .to raise_error(
    ::BulkImports::Error,
    "Unsupported GitLab Version. Minimum Supported Gitlab Version #{BulkImport::MINIMUM_GITLAB_MAJOR_VERSION}."
    )
    end
    end
    ......
    # frozen_string_literal: true
    require 'spec_helper'
    # Specs for BulkImports::NetworkError: constructor argument validation,
    # the retry-counting logic in #retriable?, and Retry-After handling in
    # #retry_delay. Uses :clean_gitlab_redis_cache so the Redis-backed retry
    # counter starts fresh for each example.
    RSpec.describe BulkImports::NetworkError, :clean_gitlab_redis_cache do
    # Minimal stand-in for a BulkImports::Tracker: #retriable? only reads
    # id, stage and entity.id to build its counter cache key.
    let(:tracker) { double(id: 1, stage: 2, entity: double(id: 3)) }
    describe '.new' do
    it 'requires either a message or a HTTP response' do
    expect { described_class.new }
    .to raise_error(ArgumentError, 'message or response required')
    end
    end
    describe '#retriable?' do
    it 'returns true for MAX_RETRIABLE_COUNT times when cause if one of RETRIABLE_EXCEPTIONS' do
    # raise-then-rescue sets up a real exception so that re-raising
    # NetworkError below records it as Exception#cause, which is what
    # #retriable? inspects via retriable_exception?.
    raise described_class::RETRIABLE_EXCEPTIONS.sample
    rescue StandardError => cause
    begin
    raise described_class, cause
    rescue StandardError => exception
    # The counter increments on every call, so exactly
    # MAX_RETRIABLE_COUNT calls succeed before it flips to false.
    described_class::MAX_RETRIABLE_COUNT.times do
    expect(exception.retriable?(tracker)).to eq(true)
    end
    expect(exception.retriable?(tracker)).to eq(false)
    end
    end
    it 'returns true for MAX_RETRIABLE_COUNT times when response is one of RETRIABLE_CODES' do
    # No cause here: retriability comes from the 429 response code.
    exception = described_class.new(response: double(code: 429))
    described_class::MAX_RETRIABLE_COUNT.times do
    expect(exception.retriable?(tracker)).to eq(true)
    end
    expect(exception.retriable?(tracker)).to eq(false)
    end
    it 'returns false for other exceptions' do
    # A plain StandardError cause is neither a timeout nor a retriable
    # HTTP code, so #retriable? is false from the first call.
    raise StandardError
    rescue StandardError => cause
    begin
    raise described_class, cause
    rescue StandardError => exception
    expect(exception.retriable?(tracker)).to eq(false)
    end
    end
    end
    describe '#retry_delay' do
    it 'returns the default value when there is not a rate limit error' do
    exception = described_class.new('foo')
    expect(exception.retry_delay).to eq(described_class::DEFAULT_RETRY_DELAY_SECONDS.seconds)
    end
    context 'when the exception is a rate limit error' do
    it 'returns the "Retry-After"' do
    exception = described_class.new(response: double(code: 429, headers: { 'Retry-After' => 20 }))
    expect(exception.retry_delay).to eq(20.seconds)
    end
    it 'returns the default value when there is no "Retry-After" header' do
    # Hash#fetch falls back to DEFAULT_RETRY_DELAY_SECONDS when the
    # header is absent.
    exception = described_class.new(response: double(code: 429, headers: {}))
    expect(exception.retry_delay).to eq(described_class::DEFAULT_RETRY_DELAY_SECONDS.seconds)
    end
    end
    end
    end
    ......@@ -58,6 +58,16 @@
    end
    end
    describe '.increment' do
    it 'increment a key and returns the current value' do
    expect(described_class.increment('foo')).to eq(1)
    value = Gitlab::Redis::Cache.with { |r| r.get(described_class.cache_key_for('foo')) }
    expect(value.to_i).to eq(1)
    end
    end
    describe '.set_add' do
    it 'adds a value to a set' do
    described_class.set_add('foo', 10)
    ......
    ......@@ -27,42 +27,59 @@ def self.ndjson_pipeline?
    .and_return([[0, pipeline_class]])
    end
    it 'runs the given pipeline successfully' do
    pipeline_tracker = create(
    :bulk_import_tracker,
    entity: entity,
    pipeline_name: 'FakePipeline'
    )
    expect_next_instance_of(Gitlab::Import::Logger) do |logger|
    expect(logger)
    .to receive(:info)
    .with(
    worker: described_class.name,
    pipeline_name: 'FakePipeline',
    entity_id: entity.id
    )
    end
    shared_examples 'successfully runs the pipeline' do
    it 'runs the given pipeline successfully' do
    expect_next_instance_of(Gitlab::Import::Logger) do |logger|
    expect(logger)
    .to receive(:info)
    .with(
    worker: described_class.name,
    pipeline_name: 'FakePipeline',
    entity_id: entity.id
    )
    end
    expect(BulkImports::EntityWorker)
    .to receive(:perform_async)
    .with(entity.id, pipeline_tracker.stage)
    expect(BulkImports::EntityWorker)
    .to receive(:perform_async)
    .with(entity.id, pipeline_tracker.stage)
    expect(subject).to receive(:jid).and_return('jid')
    expect(subject).to receive(:jid).and_return('jid')
    subject.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id)
    subject.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id)
    pipeline_tracker.reload
    pipeline_tracker.reload
    expect(pipeline_tracker.status_name).to eq(:finished)
    expect(pipeline_tracker.jid).to eq('jid')
    end
    end
    expect(pipeline_tracker.status_name).to eq(:finished)
    expect(pipeline_tracker.jid).to eq('jid')
    it_behaves_like 'successfully runs the pipeline' do
    let(:pipeline_tracker) do
    create(
    :bulk_import_tracker,
    entity: entity,
    pipeline_name: 'FakePipeline'
    )
    end
    end
    it_behaves_like 'successfully runs the pipeline' do
    let(:pipeline_tracker) do
    create(
    :bulk_import_tracker,
    :started,
    entity: entity,
    pipeline_name: 'FakePipeline'
    )
    end
    end
    context 'when the pipeline cannot be found' do
    it 'logs the error' do
    pipeline_tracker = create(
    :bulk_import_tracker,
    :started,
    :finished,
    entity: entity,
    pipeline_name: 'FakePipeline'
    )
    ......@@ -126,6 +143,39 @@ def self.ndjson_pipeline?
    expect(pipeline_tracker.status_name).to eq(:failed)
    expect(pipeline_tracker.jid).to eq('jid')
    end
    context 'when it is a network error' do
    it 'reenqueue on retriable network errors' do
    pipeline_tracker = create(
    :bulk_import_tracker,
    entity: entity,
    pipeline_name: 'FakePipeline'
    )
    exception = BulkImports::NetworkError.new(
    response: double(code: 429, headers: {})
    )
    expect_next_instance_of(pipeline_class) do |pipeline|
    expect(pipeline)
    .to receive(:run)
    .and_raise(exception)
    end
    expect(subject).to receive(:jid).and_return('jid')
    expect(described_class)
    .to receive(:perform_in)
    .with(
    60.seconds,
    pipeline_tracker.id,
    pipeline_tracker.stage,
    pipeline_tracker.entity.id
    )
    subject.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id)
    end
    end
    end
    context 'when ndjson pipeline' do
    ......
    0% Loading or .
    You are about to add 0 people to the discussion. Proceed with caution.
    Finish editing this message first!
Please register or sign in to comment