Add OAuth access token archival worker for data retention
## What does this MR do and why?
Introduce a new cron worker to handle periodic archival and cleanup of revoked OAuth access tokens. This worker will run on a scheduled basis to move old revoked tokens to the archive table and delete them from the original table.
- Add OauthAccessTokenArchiveWorker for scheduled cleanup
- Configure worker in all_queues.yml for Sidekiq processing
- Add ops feature flag to control worker execution
- Update cron settings in initializers for scheduling
- Include comprehensive worker specs for testing
This is part 2 of the OAuth token cleanup implementation, focusing on the automated background processing infrastructure.
Related: https://gitlab.com/gitlab-org/gitlab/-/issues/521855
Changelog: performance
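For context, a minimal sketch of the pieces listed above is shown below. The worker class name matches this MR; the flag name, cron schedule, and internal method are illustrative assumptions, not the exact implementation:

```ruby
# Sketch only -- the real implementation is in this MR's diff.
module Authn
  module DataRetention
    class OauthAccessTokenArchiveWorker
      include ApplicationWorker
      include CronjobQueue # rubocop:disable Scalability/CronWorkerContext

      idempotent!
      feature_category :system_access

      def perform
        # Ops feature flag gating execution (flag name is an assumption).
        return unless Feature.enabled?(:oauth_access_token_archive_worker, type: :ops)

        # Batched archive-and-delete of old revoked tokens (see Queries below).
        archive_revoked_tokens
      end
    end
  end
end
```

The cron schedule is registered in the settings initializer roughly as follows (job key and schedule here are placeholders):

```ruby
# config/initializers/1_settings.rb
Settings.cron_jobs['oauth_access_token_archive_worker'] ||= {}
Settings.cron_jobs['oauth_access_token_archive_worker']['cron'] ||= '0 5 * * *'
Settings.cron_jobs['oauth_access_token_archive_worker']['job_class'] = 'Authn::DataRetention::OauthAccessTokenArchiveWorker'
```
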
## References

### Most relevant

- Main issue: https://gitlab.com/gitlab-org/gitlab/-/issues/521855
- Feature flag rollout issue: #571771
- New index: !207474 (merged)

### Additional resources

- Plan: #555382
- Part 1: !197692 (merged)
- Details: https://gitlab.com/gitlab-org/gitlab/-/issues/521855#note_2703882221
- Previous results from BBM: https://gitlab.com/gitlab-org/gitlab/-/issues/521855#note_2768105355
- Query plan: !202767 (comment 2774688387)
## Queries

### Batch Finding Queries
```sql
-- Initial batch query
SELECT "oauth_access_tokens"."id"
FROM "oauth_access_tokens"
WHERE (id > 0)
ORDER BY "oauth_access_tokens"."id" ASC
LIMIT 1;

-- Finding the next batch boundary
SELECT "oauth_access_tokens"."id"
FROM "oauth_access_tokens"
WHERE (id > 0)
  AND "oauth_access_tokens"."id" >= 1
ORDER BY "oauth_access_tokens"."id" ASC
LIMIT 1 OFFSET 10000;

-- Sub-batch start check
SELECT "oauth_access_tokens"."id"
FROM "oauth_access_tokens"
WHERE (id > 0)
  AND "oauth_access_tokens"."id" >= 1
  AND "oauth_access_tokens"."id" < 59183
ORDER BY "oauth_access_tokens"."id" ASC
LIMIT 1;

-- Sub-batch queries (OFFSET 1000 for SUB_BATCH_SIZE)
SELECT "oauth_access_tokens"."id"
FROM "oauth_access_tokens"
WHERE (id > 0)
  AND "oauth_access_tokens"."id" >= 1
  AND "oauth_access_tokens"."id" < 59183
  AND "oauth_access_tokens"."id" >= 1
ORDER BY "oauth_access_tokens"."id" ASC
LIMIT 1 OFFSET 1000;
```
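The `LIMIT 1 OFFSET 10000` / `OFFSET 1000` pattern above is what batched iteration produces: an outer batch of 10,000 ids split into sub-batches of 1,000. A hedged sketch of the loop that generates these queries (constants inferred from the offsets; not the exact MR code):

```ruby
BATCH_SIZE = 10_000     # outer batch, from "LIMIT 1 OFFSET 10000"
SUB_BATCH_SIZE = 1_000  # inner batch, from "LIMIT 1 OFFSET 1000"

# Assuming the model includes the EachBatch concern.
OauthAccessToken.where('id > 0').each_batch(of: BATCH_SIZE) do |batch|
  batch.each_batch(of: SUB_BATCH_SIZE) do |sub_batch|
    archive(sub_batch) # executes the CTE shown below for this id range
  end
end
```
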
### Archive Operation CTE
```sql
WITH deleted AS (
  DELETE FROM oauth_access_tokens
  WHERE id IN (
    SELECT "oauth_access_tokens"."id"
    FROM "oauth_access_tokens"
    WHERE (id > 0)
      AND "oauth_access_tokens"."id" >= 1
      AND "oauth_access_tokens"."id" < 59183
      AND "oauth_access_tokens"."id" >= 1
      AND "oauth_access_tokens"."id" < 5907
      AND "oauth_access_tokens"."revoked_at" <= '2025-08-24 00:00:00'
    LIMIT 1000
  )
  RETURNING *
)
INSERT INTO oauth_access_token_archived_records
  (id, resource_owner_id, application_id, token, refresh_token,
   expires_in, revoked_at, created_at, scopes, organization_id, archived_at)
SELECT
  id, resource_owner_id, application_id, token, refresh_token,
  expires_in, revoked_at, created_at, scopes, organization_id,
  CURRENT_TIMESTAMP AS archived_at
FROM deleted;
```
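In Ruby, the worker would build and execute this CTE per sub-batch roughly as follows. This is a sketch: the helper name and the retention cutoff are assumptions; the column handling mirrors the SQL above:

```ruby
# Columns copied from oauth_access_tokens into the archive table (per the CTE above).
ARCHIVED_COLUMNS = %w[
  id resource_owner_id application_id token refresh_token
  expires_in revoked_at created_at scopes organization_id
].freeze

# Hypothetical helper: archives one sub-batch atomically via the CTE.
def archive(sub_batch)
  cutoff = 1.month.ago # retention window; exact value is an assumption
  scope = sub_batch.where(revoked_at: ..cutoff).limit(1_000)

  ApplicationRecord.connection.execute(<<~SQL)
    WITH deleted AS (
      DELETE FROM oauth_access_tokens
      WHERE id IN (#{scope.select(:id).to_sql})
      RETURNING *
    )
    INSERT INTO oauth_access_token_archived_records
      (#{ARCHIVED_COLUMNS.join(', ')}, archived_at)
    SELECT #{ARCHIVED_COLUMNS.join(', ')}, CURRENT_TIMESTAMP
    FROM deleted
  SQL
end
```

Because the DELETE and INSERT happen in a single statement, each sub-batch either archives fully or not at all, which is what makes the operation safe to interrupt.
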
## Reversibility and Recovery

- The operations performed by the worker are not automatically reversible, since it performs DELETE operations.
- The worker can be stopped at any point; only already-processed batches are affected.
- The CTE approach ensures atomicity per batch: tokens are only deleted if successfully archived.
- Unprocessed tokens remain intact.

The following data recovery options are available.
### Full Recovery (from archive table)
```sql
INSERT INTO oauth_access_tokens
  (id, resource_owner_id, application_id, token, refresh_token,
   expires_in, revoked_at, created_at, scopes, organization_id)
SELECT id, resource_owner_id, application_id, token, refresh_token,
       expires_in, revoked_at, created_at, scopes, organization_id
FROM oauth_access_token_archived_records
WHERE archived_at >= [migration_start_timestamp];
```
### Selective Recovery (specific tokens)
```sql
INSERT INTO oauth_access_tokens
  (id, resource_owner_id, application_id, token, refresh_token,
   expires_in, revoked_at, created_at, scopes, organization_id)
SELECT
  id, resource_owner_id, application_id, token, refresh_token,
  expires_in, revoked_at, created_at, scopes, organization_id
FROM oauth_access_token_archived_records
WHERE id BETWEEN 253900000 AND 253901000
ORDER BY id;
```
## Important notes

The worker performs actions that we have already successfully tested with a BBM (batched background migration). Results are available in the BBM results note linked under References above.
## How to set up and validate locally

- Create some OAuth access tokens with `revoked_at` set to 2 months ago.
- Enqueue and run `Authn::DataRetention::OauthAccessTokenArchiveWorker` and verify the tokens are correctly removed (see the console snippet at the end of this section).

It is possible to seed some tokens with the rake task below:

```shell
bundle exec rake 'db:seed:oauth_tokens[1]'
```
```ruby
# lib/tasks/seed_oauth_tokens.rake
namespace :db do
  namespace :seed do
    desc "Seed millions of OAuth access tokens for testing archival"
    task :oauth_tokens, [:count_millions] => :environment do |_task, args|
      count_millions = (args[:count_millions] || 200).to_i
      total_records = count_millions * 1_000_000
      # total_records = count_millions * 20_000

      puts "Seeding #{count_millions} million OAuth tokens (#{total_records} records)"
      puts "This will take a while..."

      # Get sample data
      organization_ids = Organizations::Organization.limit(10).pluck(:id)
      application_ids = Authn::OauthApplication.limit(10).pluck(:id)
      user_ids = User.limit(100).pluck(:id)

      batch_size = 10_000
      iterations = total_records / batch_size
      start_time = Time.current

      iterations.times do |i|
        values = []

        batch_size.times do
          created_at = rand(2.years.ago..Time.current)
          is_revoked = rand < 0.87 # 87% revoked to match production

          values << {
            organization_id: organization_ids.sample,
            application_id: application_ids.sample,
            resource_owner_id: user_ids.sample,
            token: SecureRandom.hex(32),
            refresh_token: rand < 0.8 ? SecureRandom.hex(32) : nil,
            expires_in: [3600, 7200, 86400].sample,
            scopes: ['read', 'write', 'read write', 'api'].sample,
            created_at: created_at,
            revoked_at: is_revoked ? created_at + rand(1.hour..6.months) : nil
          }
        end

        # Use raw SQL for maximum speed
        sql = <<~SQL
          INSERT INTO oauth_access_tokens
            (organization_id, application_id, resource_owner_id, token,
             refresh_token, expires_in, scopes, created_at, revoked_at)
          VALUES #{values.map { |v|
            "(#{v[:organization_id]}, #{v[:application_id]}, #{v[:resource_owner_id] || 'NULL'},
              '#{v[:token]}', #{v[:refresh_token] ? "'#{v[:refresh_token]}'" : 'NULL'},
              #{v[:expires_in]}, '#{v[:scopes]}', '#{v[:created_at]}',
              #{v[:revoked_at] ? "'#{v[:revoked_at]}'" : 'NULL'})"
          }.join(',')}
        SQL

        ActiveRecord::Base.connection.execute(sql)

        if (i + 1) % 100 == 0
          progress = ((i + 1) * batch_size / total_records.to_f * 100).round(2)
          elapsed = Time.current - start_time
          rate = (i + 1) * batch_size / elapsed
          eta = (total_records - (i + 1) * batch_size) / rate

          puts "Progress: #{(i + 1) * batch_size}/#{total_records} (#{progress}%) - " \
               "Rate: #{rate.round}/sec - ETA: #{(eta / 60).round} minutes"
        end
      end

      puts "\nSeeding complete!"
      puts "Total time: #{((Time.current - start_time) / 60).round(2)} minutes"

      # Show statistics
      stats = ActiveRecord::Base.connection.select_one(<<~SQL)
        SELECT
          COUNT(*) as total,
          COUNT(*) FILTER (WHERE revoked_at IS NOT NULL) as revoked,
          COUNT(*) FILTER (WHERE revoked_at IS NULL) as active,
          COUNT(*) FILTER (WHERE revoked_at < '#{1.month.ago}') as archivable
        FROM oauth_access_tokens
      SQL

      puts "\nDatabase statistics:"
      puts "Total tokens: #{stats['total']}"
      puts "Revoked: #{stats['revoked']} (#{(stats['revoked'].to_f / stats['total'] * 100).round(1)}%)"
      puts "Active: #{stats['active']} (#{(stats['active'].to_f / stats['total'] * 100).round(1)}%)"
      puts "Ready for archival: #{stats['archivable']}"
    end
  end
end
```
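Once seeded, the worker can be exercised from a Rails console. The flag name below is an assumption; use the ops flag introduced in this MR:

```ruby
# Rails console
Feature.enable(:oauth_access_token_archive_worker) # flag name assumed
Authn::DataRetention::OauthAccessTokenArchiveWorker.perform_async

# After Sidekiq processes the job, old revoked tokens should be gone:
OauthAccessToken.where('revoked_at < ?', 1.month.ago).count # expect 0
```
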
## MR acceptance checklist
Evaluate this MR against the MR acceptance checklist. It helps you analyze changes to reduce risks in quality, performance, reliability, security, and maintainability.