Verified Commit 9b32aa4b authored by Adam Hegyi, committed by GitLab

Merge branch '441485-worker-for-ingesting-cs-data' into 'master'

Worker for ingesting code suggestion events

See merge request gitlab-org/gitlab!148401



Merged-by: Adam Hegyi <ahegyi@gitlab.com>
Approved-by: charlie ablett <cablett@gitlab.com>
Reviewed-by: Pavel Shutsin <pshutsin@gitlab.com>
Parents: 0cf1c9f0, 705eec23
@@ -915,6 +915,9 @@
Settings.cron_jobs['click_house_rebuild_materialized_view_cron_worker'] ||= {}
Settings.cron_jobs['click_house_rebuild_materialized_view_cron_worker']['cron'] ||= "*/10 * * * *"
Settings.cron_jobs['click_house_rebuild_materialized_view_cron_worker']['job_class'] = 'ClickHouse::RebuildMaterializedViewCronWorker'
Settings.cron_jobs['click_house_code_suggestion_events_cron_worker'] ||= {}
Settings.cron_jobs['click_house_code_suggestion_events_cron_worker']['cron'] ||= "*/5 * * * *"
Settings.cron_jobs['click_house_code_suggestion_events_cron_worker']['job_class'] = 'ClickHouse::CodeSuggestionEventsCronWorker'
Settings.cron_jobs['vertex_ai_refresh_access_token_worker'] ||= {}
Settings.cron_jobs['vertex_ai_refresh_access_token_worker']['cron'] ||= '*/50 * * * *'
Settings.cron_jobs['vertex_ai_refresh_access_token_worker']['job_class'] = 'Llm::VertexAiAccessTokenRefreshWorker'
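
The new entry schedules the worker every five minutes. A quick way to sanity-check a cron expression (a throwaway snippet using the fugit gem, which the sidekiq-cron scheduler relies on; not part of this change):

require 'fugit'

cron = Fugit.parse('*/5 * * * *') # the new worker's schedule
cron.next_time                    # => the next 5-minute mark, as an EtOrbi::EoTime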
@@ -120,6 +120,15 @@
  :weight: 1
  :idempotent: true
  :tags: []
- :name: cronjob:click_house_code_suggestion_events_cron
  :worker_name: ClickHouse::CodeSuggestionEventsCronWorker
  :feature_category: :value_stream_management
  :has_external_dependencies: true
  :urgency: :low
  :resource_boundary: :unknown
  :weight: 1
  :idempotent: true
  :tags: []
- :name: cronjob:click_house_event_authors_consistency_cron
  :worker_name: ClickHouse::EventAuthorsConsistencyCronWorker
  :feature_category: :value_stream_management
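These attributes are not edited by hand; all_queues.yml is kept in sync with the declarations in the worker class itself. The relevant lines from the new worker (full file below):

idempotent!                               # -> :idempotent: true
worker_has_external_dependencies!         # -> :has_external_dependencies: true
queue_namespace :cronjob                  # -> the cronjob: queue name prefix
feature_category :value_stream_management # -> :feature_category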
# frozen_string_literal: true

module ClickHouse
  class CodeSuggestionEventsCronWorker
    include ApplicationWorker
    include ClickHouseWorker

    idempotent!
    queue_namespace :cronjob
    data_consistency :delayed
    worker_has_external_dependencies! # the worker interacts with a ClickHouse database
    feature_category :value_stream_management

    MAX_RUNTIME = 200.seconds
    BATCH_SIZE = 1000

    CSV_MAPPING = {
      user_id: :user_id,
      timestamp: :timestamp,
      event: :event
    }.freeze

    # async_insert=1 lets ClickHouse buffer and batch concurrent inserts
    # server-side; wait_for_async_insert=1 blocks until that buffer is flushed,
    # so a successful response means the rows are actually persisted.
    INSERT_QUERY = <<~SQL.squish
      INSERT INTO code_suggestion_usages (user_id, timestamp, event)
      SETTINGS async_insert=1, wait_for_async_insert=1 FORMAT CSV
    SQL

    def perform
      return unless enabled?

      connection.ping # ensure CH is available

      status, inserted_rows = loop_with_runtime_limit(MAX_RUNTIME) do
        insert_rows(build_rows)
      end

      log_extra_metadata_on_done(:result, {
        status: status,
        inserted_rows: inserted_rows
      })
    end

    private

    def loop_with_runtime_limit(limit)
      status = :processed
      total_inserted_rows = 0
      runtime_limiter = Gitlab::Metrics::RuntimeLimiter.new(limit)

      loop do
        if runtime_limiter.over_time?
          status = :over_time
          break
        end

        inserted_rows = yield
        total_inserted_rows += inserted_rows

        break if inserted_rows == 0
      end

      [status, total_inserted_rows]
    end

    def enabled?
      Gitlab::ClickHouse.globally_enabled_for_analytics? &&
        Feature.enabled?(:code_suggestion_events_in_click_house)
    end

    def build_rows
      # Using LPOP which is not crash-safe. There is a small chance for data loss
      # if ClickHouse is down or the worker crashes before the INSERT.
      Gitlab::Redis::SharedState.with do |redis|
        Array.wrap(redis.lpop(ClickHouse::WriteBuffer::BUFFER_KEY, BATCH_SIZE)).filter_map do |hash|
          build_row(Gitlab::Json.parse(hash, symbolize_names: true))
        end
      end
    end

    # Drops events that are missing any of the mapped fields and converts the
    # timestamp into a Unix epoch float, which is what the CSV insert expects.
    def build_row(hash)
      return unless CSV_MAPPING.keys.all? { |key| hash[key] }

      hash[:timestamp] = DateTime.parse(hash[:timestamp]).to_f
      hash
    end

    def insert_rows(rows)
      CsvBuilder::Gzip.new(rows, CSV_MAPPING).render do |tempfile, rows_written|
        if rows_written == 0
          0
        else
          connection.insert_csv(INSERT_QUERY, File.open(tempfile.path))
          rows.size
        end
      end
    end

    def connection
      @connection ||= ClickHouse::Connection.new(:main)
    end
  end
end
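
For context, the worker drains a Redis list that Gitlab::Tracking::AiTracking.track_event fills via ClickHouse::WriteBuffer. A minimal sketch of that producer side; the module body and key name here are assumed for illustration, and the LIFO drain order asserted in the spec below suggests events are LPUSHed onto the head of the list:

# Hypothetical sketch; the real implementation lives in ClickHouse::WriteBuffer.
module ClickHouse
  module WriteBuffer
    BUFFER_KEY = 'clickhouse_write_buffer' # key name assumed for illustration

    def self.add(event_hash)
      Gitlab::Redis::SharedState.with do |redis|
        # LPUSH pairs with the worker's LPOP, so newest events drain first.
        redis.lpush(BUFFER_KEY, Gitlab::Json.dump(event_hash))
      end
    end
  end
end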
---
name: code_suggestion_events_in_click_house
feature_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/441485
introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/148401
rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/454093
milestone: '16.11'
group: group::optimize
type: beta
default_enabled: false
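
With the flag defined above, both gates in the worker's enabled? check must be open before anything is ingested. A console sketch for trying it end-to-end on a test instance where ClickHouse is already configured for analytics (use_clickhouse_for_analytics):

Feature.enable(:code_suggestion_events_in_click_house)

ClickHouse::CodeSuggestionEventsCronWorker.new.perform
# => { status: :processed, inserted_rows: n } (also logged via log_extra_metadata_on_done)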
# frozen_string_literal: true

require 'spec_helper'

RSpec.describe ClickHouse::CodeSuggestionEventsCronWorker, feature_category: :value_stream_management do
  let(:job) { described_class.new }

  context 'when ClickHouse is disabled for analytics' do
    before do
      stub_application_setting(use_clickhouse_for_analytics: false)
    end

    it 'does nothing' do
      expect(Gitlab::Metrics::RuntimeLimiter).not_to receive(:new)

      job.perform
    end
  end

  context 'when code_suggestion_events_in_click_house feature flag is disabled' do
    before do
      stub_application_setting(use_clickhouse_for_analytics: true)
      stub_feature_flags(code_suggestion_events_in_click_house: false)
    end

    it 'does nothing' do
      expect(Gitlab::Metrics::RuntimeLimiter).not_to receive(:new)

      job.perform
    end
  end

  context 'when ClickHouse is enabled', :click_house, :clean_gitlab_redis_shared_state do
    let(:connection) { ClickHouse::Connection.new(:main) }

    # FINAL makes ClickHouse merge table parts at read time, so the query sees
    # fully merged rows.
    subject(:inserted_records) { connection.select('SELECT * FROM code_suggestion_usages FINAL ORDER BY user_id ASC') }

    before do
      stub_application_setting(use_clickhouse_for_analytics: true)
    end

    it 'does not insert anything' do
      job.perform

      expect(inserted_records).to be_empty
    end

    context 'when data is present' do
      before do
        Gitlab::Tracking::AiTracking.track_event('code_suggestions_requested', {}) # garbage
        Gitlab::Tracking::AiTracking.track_event('code_suggestions_requested', { user_id: 1 })
        Gitlab::Tracking::AiTracking.track_event('code_suggestions_requested', {}) # garbage
        Gitlab::Tracking::AiTracking.track_event('code_suggestions_requested', { user_id: 2 })
        Gitlab::Tracking::AiTracking.track_event('code_suggestions_requested', { user_id: 3 })
      end

      it 'inserts all rows' do
        status = job.perform

        expect(status).to eq({ status: :processed, inserted_rows: 3 })

        event = Gitlab::Tracking::AiTracking::EVENTS['code_suggestions_requested']
        expect(inserted_records).to match([
          hash_including('user_id' => 1, 'event' => event),
          hash_including('user_id' => 2, 'event' => event),
          hash_including('user_id' => 3, 'event' => event)
        ])
      end

      context 'when looping twice' do
        it 'inserts all rows' do
          stub_const("#{described_class.name}::BATCH_SIZE", 2)

          status = job.perform

          expect(status).to eq({ status: :processed, inserted_rows: 3 })
        end
      end

      context 'when pinging ClickHouse fails' do
        it 'does not take anything from redis' do
          allow_next_instance_of(ClickHouse::Connection) do |connection|
            expect(connection).to receive(:ping).and_raise(Errno::ECONNREFUSED)
          end

          expect { job.perform }.to raise_error(Errno::ECONNREFUSED)

          Gitlab::Redis::SharedState.with do |redis|
            buffer = redis.rpop(ClickHouse::WriteBuffer::BUFFER_KEY, 100)
            expect(buffer.size).to eq(5)
          end
        end
      end

      context 'when time limit is up' do
        it 'returns over_time status' do
          stub_const("#{described_class.name}::BATCH_SIZE", 1)

          allow_next_instance_of(Gitlab::Metrics::RuntimeLimiter) do |limiter|
            allow(limiter).to receive(:over_time?).and_return(false, false, true)
          end

          status = job.perform

          expect(status).to eq({ status: :over_time, inserted_rows: 2 })
          expect(inserted_records).to match([
            hash_including('user_id' => 2),
            hash_including('user_id' => 3)
          ])
        end
      end
    end
  end
end
@@ -15,6 +15,14 @@ def execute(query)
  ClickHouse::Client.execute(query, database, configuration)
end

def insert_csv(query, file)
  ClickHouse::Client.insert_csv(query, file, database, configuration)
end

# Cheap availability probe; raises if ClickHouse is unreachable.
def ping
  execute('SELECT 1')
end

def database_name
  configuration.databases[database]&.database
end
......