Commit e7ee84aa authored by Kamil Trzciński's avatar Kamil Trzciński 🔴 Committed by Sean McGivern

Add support for DAG

This implements the support for `needs:` keyword
as part of GitLab CI. That makes some of the jobs
to be run out of order.
parent 2cf9769d
......@@ -38,6 +38,7 @@ module Ci
has_one :deployment, as: :deployable, class_name: 'Deployment'
has_many :trace_sections, class_name: 'Ci::BuildTraceSection'
has_many :trace_chunks, class_name: 'Ci::BuildTraceChunk', foreign_key: :build_id
has_many :needs, class_name: 'Ci::BuildNeed', foreign_key: :build_id, inverse_of: :build
has_many :job_artifacts, class_name: 'Ci::JobArtifact', foreign_key: :job_id, dependent: :destroy, inverse_of: :job # rubocop:disable Cop/ActiveRecordDependent
has_many :job_variables, class_name: 'Ci::JobVariable', foreign_key: :job_id
......@@ -50,6 +51,7 @@ module Ci
accepts_nested_attributes_for :runner_session
accepts_nested_attributes_for :job_variables
accepts_nested_attributes_for :needs
delegate :url, to: :runner_session, prefix: true, allow_nil: true
delegate :terminal_specification, to: :runner_session, allow_nil: true
......@@ -713,11 +715,21 @@ module Ci
depended_jobs = depends_on_builds
return depended_jobs unless options[:dependencies].present?
# find all jobs that are dependent on
if options[:dependencies].present?
depended_jobs = depended_jobs.select do |job|
options[:dependencies].include?(job.name)
end
end
depended_jobs.select do |job|
options[:dependencies].include?(job.name)
# find all jobs that are needed by this one
if options[:needs].present?
depended_jobs = depended_jobs.select do |job|
options[:needs].include?(job.name)
end
end
depended_jobs
end
def empty_dependencies?
......
# frozen_string_literal: true
module Ci
class BuildNeed < ApplicationRecord
extend Gitlab::Ci::Model
belongs_to :build, class_name: "Ci::Build", foreign_key: :build_id, inverse_of: :needs
validates :build, presence: true
validates :name, presence: true, length: { maximum: 128 }
scope :scoped_build, -> { where('ci_builds.id=ci_build_needs.build_id') }
end
end
......@@ -611,8 +611,8 @@ module Ci
end
# rubocop: disable CodeReuse/ServiceClass
def process!
Ci::ProcessPipelineService.new(project, user).execute(self)
def process!(trigger_build_name = nil)
Ci::ProcessPipelineService.new(project, user).execute(self, trigger_build_name)
end
# rubocop: enable CodeReuse/ServiceClass
......
......@@ -43,6 +43,12 @@ class CommitStatus < ApplicationRecord
scope :after_stage, -> (index) { where('stage_idx > ?', index) }
scope :processables, -> { where(type: %w[Ci::Build Ci::Bridge]) }
scope :with_needs, -> (names = nil) do
needs = Ci::BuildNeed.scoped_build.select(1)
needs = needs.where(name: names) if names
where('EXISTS (?)', needs).preload(:needs)
end
# We use `CommitStatusEnums.failure_reasons` here so that EE can more easily
# extend this `Hash` with new values.
enum_with_nil failure_reason: ::CommitStatusEnums.failure_reasons
......@@ -116,7 +122,7 @@ class CommitStatus < ApplicationRecord
commit_status.run_after_commit do
if pipeline_id
if complete? || manual?
PipelineProcessWorker.perform_async(pipeline_id)
BuildProcessWorker.perform_async(id)
else
PipelineUpdateWorker.perform_async(pipeline_id)
end
......
......@@ -29,6 +29,7 @@ module Ci
def degenerate!
self.class.transaction do
self.update!(options: nil, yaml_variables: nil)
self.needs.all.delete_all
self.metadata&.destroy
end
end
......
......@@ -4,19 +4,23 @@ module Ci
class ProcessPipelineService < BaseService
attr_reader :pipeline
def execute(pipeline)
def execute(pipeline, trigger_build_name = nil)
@pipeline = pipeline
update_retried
new_builds =
success =
stage_indexes_of_created_processables.flat_map do |index|
process_stage(index)
end
end.any?
# we evaluate dependent needs,
# only when the another job has finished
success = process_builds_with_needs(trigger_build_name) || success
@pipeline.update_status
new_builds.any?
success
end
private
......@@ -36,12 +40,40 @@ module Ci
end
end
def process_builds_with_needs(trigger_build_name)
return false unless trigger_build_name
return false unless Feature.enabled?(:ci_dag_support, project)
created_processables
.with_needs(trigger_build_name)
.find_each
.map(&method(:process_build_with_needs))
.any?
end
def process_build_with_needs(build)
current_status = status_for_build_needs(build.needs.map(&:name))
return unless HasStatus::COMPLETED_STATUSES.include?(current_status)
Gitlab::OptimisticLocking.retry_lock(build) do |subject|
Ci::ProcessBuildService.new(project, @user)
.execute(subject, current_status)
end
end
# rubocop: disable CodeReuse/ActiveRecord
def status_for_prior_stages(index)
pipeline.builds.where('stage_idx < ?', index).latest.status || 'success'
end
# rubocop: enable CodeReuse/ActiveRecord
# rubocop: disable CodeReuse/ActiveRecord
def status_for_build_needs(needs)
pipeline.builds.where(name: needs).latest.status || 'success'
end
# rubocop: enable CodeReuse/ActiveRecord
# rubocop: disable CodeReuse/ActiveRecord
def stage_indexes_of_created_processables
created_processables.order(:stage_idx).pluck(Arel.sql('DISTINCT stage_idx'))
......
......@@ -5,7 +5,7 @@ module Ci
CLONE_ACCESSORS = %i[pipeline project ref tag options name
allow_failure stage stage_id stage_idx trigger_request
yaml_variables when environment coverage_regex
description tag_list protected].freeze
description tag_list protected needs].freeze
def execute(build)
reprocess!(build).tap do |new_build|
......
......@@ -88,6 +88,7 @@
- pipeline_processing:ci_build_prepare
- pipeline_processing:build_queue
- pipeline_processing:build_success
- pipeline_processing:build_process
- pipeline_processing:pipeline_process
- pipeline_processing:pipeline_success
- pipeline_processing:pipeline_update
......
# frozen_string_literal: true
class BuildProcessWorker
include ApplicationWorker
include PipelineQueue
queue_namespace :pipeline_processing
# rubocop: disable CodeReuse/ActiveRecord
def perform(build_id)
CommitStatus.find_by(id: build_id).try do |build|
build.pipeline.process!(build.name)
end
end
# rubocop: enable CodeReuse/ActiveRecord
end
---
title: "Support creating DAGs in CI config through the `needs` key"
merge_request: 31328
author:
type: added
# frozen_string_literal: true
# See http://doc.gitlab.com/ce/development/migration_style_guide.html
# for more information on how to write migrations for GitLab.
class AddBuildNeed < ActiveRecord::Migration[5.2]
include Gitlab::Database::MigrationHelpers
DOWNTIME = false
def change
create_table :ci_build_needs, id: :serial do |t|
t.integer :build_id, null: false
t.text :name, null: false
t.index [:build_id, :name], unique: true
t.foreign_key :ci_builds, column: :build_id, on_delete: :cascade
end
end
end
......@@ -10,7 +10,7 @@
#
# It's strongly recommended that you check this file into your version control system.
ActiveRecord::Schema.define(version: 2019_07_29_090456) do
ActiveRecord::Schema.define(version: 2019_07_31_084415) do
# These are extensions that must be enabled in order to support this database
enable_extension "pg_trgm"
......@@ -454,6 +454,12 @@ ActiveRecord::Schema.define(version: 2019_07_29_090456) do
t.index ["namespace_id"], name: "index_chat_teams_on_namespace_id", unique: true
end
create_table "ci_build_needs", id: :serial, force: :cascade do |t|
t.integer "build_id", null: false
t.text "name", null: false
t.index ["build_id", "name"], name: "index_ci_build_needs_on_build_id_and_name", unique: true
end
create_table "ci_build_trace_chunks", force: :cascade do |t|
t.integer "build_id", null: false
t.integer "chunk_index", null: false
......@@ -3635,6 +3641,7 @@ ActiveRecord::Schema.define(version: 2019_07_29_090456) do
add_foreign_key "boards", "namespaces", column: "group_id", on_delete: :cascade
add_foreign_key "boards", "projects", name: "fk_f15266b5f9", on_delete: :cascade
add_foreign_key "chat_teams", "namespaces", on_delete: :cascade
add_foreign_key "ci_build_needs", "ci_builds", column: "build_id", on_delete: :cascade
add_foreign_key "ci_build_trace_chunks", "ci_builds", column: "build_id", on_delete: :cascade
add_foreign_key "ci_build_trace_section_names", "projects", on_delete: :cascade
add_foreign_key "ci_build_trace_sections", "ci_build_trace_section_names", column: "section_name_id", name: "fk_264e112c66", on_delete: :cascade
......
......@@ -13,7 +13,7 @@ module Gitlab
ALLOWED_KEYS = %i[tags script only except type image services
allow_failure type stage when start_in artifacts cache
dependencies before_script after_script variables
dependencies needs before_script after_script variables
environment coverage retry parallel extends].freeze
validations do
......@@ -34,11 +34,22 @@ module Gitlab
message: 'should be on_success, on_failure, ' \
'always, manual or delayed' }
validates :dependencies, array_of_strings: true
validates :needs, array_of_strings: true
validates :extends, array_of_strings_or_string: true
end
validates :start_in, duration: { limit: '1 day' }, if: :delayed?
validates :start_in, absence: true, unless: :delayed?
validate do
next unless dependencies.present?
next unless needs.present?
missing_needs = dependencies - needs
if missing_needs.any?
errors.add(:dependencies, "the #{missing_needs.join(", ")} should be part of needs")
end
end
end
entry :before_script, Entry::Script,
......@@ -95,10 +106,10 @@ module Gitlab
helpers :before_script, :script, :stage, :type, :after_script,
:cache, :image, :services, :only, :except, :variables,
:artifacts, :environment, :coverage, :retry,
:parallel
:parallel, :needs
attributes :script, :tags, :allow_failure, :when, :dependencies,
:retry, :parallel, :extends, :start_in
:needs, :retry, :parallel, :extends, :start_in
def self.matching?(name, config)
!name.to_s.start_with?('.') &&
......@@ -178,7 +189,8 @@ module Gitlab
parallel: parallel_defined? ? parallel_value.to_i : nil,
artifacts: artifacts_value,
after_script: after_script_value,
ignore: ignored? }
ignore: ignored?,
needs: needs_defined? ? needs_value : nil }
end
end
end
......
......@@ -4,61 +4,63 @@ module Gitlab
module Ci
class Config
class Normalizer
include Gitlab::Utils::StrongMemoize
def initialize(jobs_config)
@jobs_config = jobs_config
end
def normalize_jobs
extract_parallelized_jobs!
return @jobs_config if @parallelized_jobs.empty?
return @jobs_config if parallelized_jobs.empty?
expand_parallelize_jobs do |job_name, config|
if config[:dependencies]
config[:dependencies] = expand_names(config[:dependencies])
end
parallelized_config = parallelize_jobs
parallelize_dependencies(parallelized_config)
if config[:needs]
config[:needs] = expand_names(config[:needs])
end
config
end
end
private
def extract_parallelized_jobs!
@parallelized_jobs = {}
def expand_names(job_names)
return unless job_names
@jobs_config.each do |job_name, config|
if config[:parallel]
@parallelized_jobs[job_name] = self.class.parallelize_job_names(job_name, config[:parallel])
end
job_names.flat_map do |job_name|
parallelized_jobs[job_name.to_sym] || job_name
end
@parallelized_jobs
end
def parallelize_jobs
@jobs_config.each_with_object({}) do |(job_name, config), hash|
if @parallelized_jobs.key?(job_name)
@parallelized_jobs[job_name].each { |name, index| hash[name.to_sym] = config.merge(name: name, instance: index) }
else
hash[job_name] = config
end
def parallelized_jobs
strong_memoize(:parallelized_jobs) do
@jobs_config.each_with_object({}) do |(job_name, config), hash|
next unless config[:parallel]
hash
hash[job_name] = self.class.parallelize_job_names(job_name, config[:parallel])
end
end
end
def parallelize_dependencies(parallelized_config)
parallelized_job_names = @parallelized_jobs.keys.map(&:to_s)
parallelized_config.each_with_object({}) do |(job_name, config), hash|
if config[:dependencies] && (intersection = config[:dependencies] & parallelized_job_names).any?
parallelized_deps = intersection.flat_map { |dep| @parallelized_jobs[dep.to_sym].map(&:first) }
deps = config[:dependencies] - intersection + parallelized_deps
hash[job_name] = config.merge(dependencies: deps)
def expand_parallelize_jobs
@jobs_config.each_with_object({}) do |(job_name, config), hash|
if parallelized_jobs.key?(job_name)
parallelized_jobs[job_name].each_with_index do |name, index|
hash[name.to_sym] =
yield(name, config.merge(name: name, instance: index + 1))
end
else
hash[job_name] = config
hash[job_name] = yield(job_name, config)
end
hash
end
end
def self.parallelize_job_names(name, total)
Array.new(total) { |index| ["#{name} #{index + 1}/#{total}", index + 1] }
Array.new(total) { |index| "#{name} #{index + 1}/#{total}" }
end
end
end
......
......@@ -40,6 +40,7 @@ module Gitlab
environment: job[:environment_name],
coverage_regex: job[:coverage],
yaml_variables: yaml_variables(name),
needs_attributes: job[:needs]&.map { |need| { name: need } },
options: {
image: job[:image],
services: job[:services],
......@@ -108,6 +109,7 @@ module Gitlab
validate_job_stage!(name, job)
validate_job_dependencies!(name, job)
validate_job_needs!(name, job)
validate_job_environment!(name, job)
end
end
......@@ -152,6 +154,22 @@ module Gitlab
end
end
def validate_job_needs!(name, job)
return unless job[:needs]
stage_index = @stages.index(job[:stage])
job[:needs].each do |need|
raise ValidationError, "#{name} job: undefined need: #{need}" unless @jobs[need.to_sym]
needs_stage_index = @stages.index(@jobs[need.to_sym][:stage])
unless needs_stage_index.present? && needs_stage_index < stage_index
raise ValidationError, "#{name} job: need #{need} is not defined in prior stages"
end
end
end
def validate_job_environment!(name, job)
return unless job[:environment]
return unless job[:environment].is_a?(Hash)
......
# frozen_string_literal: true
FactoryBot.define do
factory :ci_build_need, class: Ci::BuildNeed do
build factory: :ci_build
sequence(:name) { |n| "build_#{n}" }
end
end
......@@ -86,6 +86,22 @@ describe Gitlab::Ci::Config::Entry::Job do
it { expect(entry).to be_valid }
end
end
context 'when has needs' do
let(:config) do
{ script: 'echo', needs: ['another-job'] }
end
it { expect(entry).to be_valid }
context 'when has dependencies' do
let(:config) do
{ script: 'echo', dependencies: ['another-job'], needs: ['another-job'] }
end
it { expect(entry).to be_valid }
end
end
end
context 'when entry value is not correct' do
......@@ -223,6 +239,43 @@ describe Gitlab::Ci::Config::Entry::Job do
expect(entry.errors).to include 'job start in must be blank'
end
end
context 'when has dependencies' do
context 'that are not a array of strings' do
let(:config) do
{ script: 'echo', dependencies: 'build-job' }
end
it 'returns error about invalid type' do
expect(entry).not_to be_valid
expect(entry.errors).to include 'job dependencies should be an array of strings'
end
end
end
context 'when has needs' do
context 'that are not a array of strings' do
let(:config) do
{ script: 'echo', needs: 'build-job' }
end
it 'returns error about invalid type' do
expect(entry).not_to be_valid
expect(entry.errors).to include 'job needs should be an array of strings'
end
end
context 'when have dependencies that are not subset of needs' do
let(:config) do
{ script: 'echo', dependencies: ['another-job'], needs: ['build-job'] }
end
it 'returns error about invalid data' do
expect(entry).not_to be_valid
expect(entry.errors).to include 'job dependencies the another-job should be part of needs'
end
end
end
end
end
......
......@@ -49,37 +49,44 @@ describe Gitlab::Ci::Config::Normalizer do
end
end
context 'when jobs depend on parallelized jobs' do
let(:config) { { job_name => job_config, other_job: { script: 'echo 1', dependencies: [job_name.to_s] } } }
it 'parallelizes dependencies' do
job_names = ["rspec 1/5", "rspec 2/5", "rspec 3/5", "rspec 4/5", "rspec 5/5"]
expect(subject[:other_job][:dependencies]).to include(*job_names)
%i[dependencies needs].each do |context|
context "when job has #{context} on parallelized jobs" do
let(:config) do
{
job_name => job_config,
other_job: { script: 'echo 1', context => [job_name.to_s] }
}
end
it "parallelizes #{context}" do
job_names = ["rspec 1/5", "rspec 2/5", "rspec 3/5", "rspec 4/5", "rspec 5/5"]
expect(subject[:other_job][context]).to include(*job_names)
end
it "does not include original job name in #{context}" do
expect(subject[:other_job][context]).not_to include(job_name)
end
end
it 'does not include original job name in dependencies' do
expect(subject[:other_job][:dependencies]).not_to include(job_name)
end
end
context "when there are #{context} which are both parallelized and not" do
let(:config) do
{
job_name => job_config,
other_job: { script: 'echo 1' },
final_job: { script: 'echo 1', context => [job_name.to_s, "other_job"] }
}
end
context 'when there are dependencies which are both parallelized and not' do
let(:config) do
{
job_name => job_config,
other_job: { script: 'echo 1' },
final_job: { script: 'echo 1', dependencies: [job_name.to_s, "other_job"] }
}
end
it 'parallelizes dependencies' do
job_names = ["rspec 1/5", "rspec 2/5", "rspec 3/5", "rspec 4/5", "rspec 5/5"]
it "parallelizes #{context}" do
job_names = ["rspec 1/5", "rspec 2/5", "rspec 3/5", "rspec 4/5", "rspec 5/5"]
expect(subject[:final_job][:dependencies]).to include(*job_names)
end
expect(subject[:final_job][context]).to include(*job_names)
end
it 'includes the regular job in dependencies' do
expect(subject[:final_job][:dependencies]).to include('other_job')
it "includes the regular job in #{context}" do
expect(subject[:final_job][context]).to include('other_job')
end
end
end
end
......
......@@ -1112,6 +1112,86 @@ module Gitlab
end
end
describe "Needs" do
let(:needs) { }
let(:dependencies) { }
let(:config) do
{
build1: { stage: 'build', script: 'test' },
build2: { stage: 'build', script: 'test' },
test1: { stage: 'test', script: 'test', needs: needs, dependencies: dependencies },
test2: { stage: 'test', script: 'test' },
deploy: { stage: 'test', script: 'test' }
}
end
subject { Gitlab::Ci::YamlProcessor.new(YAML.dump(config)) }
context 'no needs' do
it { expect { subject }.not_to raise_error }
end
context 'needs to builds' do
let(:needs) { %w(build1 build2) }
it "does create jobs with valid specification" do
expect(subject.builds.size).to eq(5)
expect(subject.builds[0]).to eq(
stage: "build",
stage_idx: 0,
name: "build1",
options: {
script: ["test"]
},
when: "on_success",
allow_failure: false,
yaml_variables: []
)
expect(subject.builds[2]).to eq(
stage: "test",
stage_idx: 1,
name: "test1",
options: {
script: ["test"]
},
needs_attributes: [
{ name: "build1" },
{ name: "build2" }
],
when: "on_success",
allow_failure: false,
yaml_variables: []
)