Add OAuth access token archival worker for data retention

What does this MR do and why?

Introduce a new cron worker that periodically archives and cleans up revoked OAuth access tokens: each scheduled run moves old revoked tokens to the archive table and deletes them from the source table. A sketch of the worker's shape follows the list below.

  • Add OauthAccessTokenArchiveWorker for scheduled cleanup
  • Configure worker in all_queues.yml for Sidekiq processing
  • Add ops feature flag to control worker execution
  • Update cron settings in initializers for scheduling
  • Include comprehensive worker specs for testing
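
For orientation, here is a minimal sketch of the worker's shape. The feature flag name and feature category below are assumptions for illustration; the authoritative version is in the diff.

module Authn
  module DataRetention
    class OauthAccessTokenArchiveWorker
      include ApplicationWorker
      include CronjobQueue

      idempotent!
      data_consistency :sticky
      feature_category :system_access

      def perform
        # Ops feature flag gating execution; the flag name here is an assumption.
        return unless Feature.enabled?(:oauth_access_token_archivation, type: :ops)

        # Walk revoked tokens in id-ordered batches and archive each
        # sub-batch atomically via the CTE shown under "Queries" below.
      end
    end
  end
end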

This is part 2 of the OAuth token cleanup implementation, focusing on the automated background processing infrastructure.

Related: https://gitlab.com/gitlab-org/gitlab/-/issues/521855
Changelog: performance

References

Most relevant

Main issue: https://gitlab.com/gitlab-org/gitlab/-/issues/521855
Feature flag rollout issue: #571771
New index: !207474 (merged)

Additional resources

Plan: #555382
Part 1: !197692 (merged)
Details: https://gitlab.com/gitlab-org/gitlab/-/issues/521855#note_2703882221
Previous results from BBM: https://gitlab.com/gitlab-org/gitlab/-/issues/521855#note_2768105355
Query plan: !202767 (comment 2774688387)

Queries

Batch Finding Queries

-- Initial batch query
SELECT "oauth_access_tokens"."id" 
FROM "oauth_access_tokens" 
WHERE (id > 0)
ORDER BY "oauth_access_tokens"."id" ASC 
LIMIT 1;

-- Finding next batch boundary 
SELECT "oauth_access_tokens"."id" 
FROM "oauth_access_tokens" 
WHERE (id > 0)
  AND "oauth_access_tokens"."id" >= 1
ORDER BY "oauth_access_tokens"."id" ASC 
LIMIT 1 OFFSET 10000;

-- Sub-batch start check
SELECT "oauth_access_tokens"."id" 
FROM "oauth_access_tokens" 
WHERE (id > 0)
  AND "oauth_access_tokens"."id" >= 1 
  AND "oauth_access_tokens"."id" < 59183 
ORDER BY "oauth_access_tokens"."id" ASC 
LIMIT 1;

-- Sub-batch queries (OFFSET 1000 for SUB_BATCH_SIZE)
SELECT "oauth_access_tokens"."id" 
FROM "oauth_access_tokens" 
WHERE (id > 0)
  AND "oauth_access_tokens"."id" >= 1 
  AND "oauth_access_tokens"."id" < 59183 
  AND "oauth_access_tokens"."id" >= 1 
ORDER BY "oauth_access_tokens"."id" ASC 
LIMIT 1 OFFSET 1000;
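
These four query shapes correspond to nested id-ordered batching, roughly as in the sketch below. The constants mirror the LIMIT/OFFSET values above; archive_sub_batch is a hypothetical helper standing in for the CTE in the next section.

BATCH_SIZE = 10_000
SUB_BATCH_SIZE = 1_000
cutoff = 1.month.ago # retention cutoff for revoked tokens

# Nested each_batch iteration; the boundary lookups above come from each_batch.
OauthAccessToken.where('id > 0').each_batch(of: BATCH_SIZE) do |batch|
  batch.each_batch(of: SUB_BATCH_SIZE) do |sub_batch|
    archive_sub_batch(sub_batch, cutoff) # hypothetical helper, see CTE below
  end
end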

Archive Operation CTE

WITH deleted AS (
  DELETE FROM oauth_access_tokens
  WHERE id IN (
    SELECT "oauth_access_tokens"."id" 
    FROM "oauth_access_tokens" 
    WHERE (id > 0)
      AND "oauth_access_tokens"."id" >= 1 
      AND "oauth_access_tokens"."id" < 59183 
      AND "oauth_access_tokens"."id" >= 1 
      AND "oauth_access_tokens"."id" < 5907
      AND "oauth_access_tokens"."revoked_at" <= '2025-08-24 00:00:00'
      LIMIT 1000
  )
  RETURNING *
)
INSERT INTO oauth_access_token_archived_records
  (id, resource_owner_id, application_id, token, refresh_token,
   expires_in, revoked_at, created_at, scopes, organization_id, archived_at)
SELECT
  id, resource_owner_id, application_id, token, refresh_token,
  expires_in, revoked_at, created_at, scopes, organization_id,
  CURRENT_TIMESTAMP as archived_at
FROM deleted;
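
A sketch of how the worker might issue this statement per sub-batch (sub_batch and cutoff as in the batching sketch above; the real implementation is in the diff):

# Run the archive CTE for one sub-batch of ids.
sql = <<~SQL
  WITH deleted AS (
    DELETE FROM oauth_access_tokens
    WHERE id IN (#{sub_batch.where('revoked_at <= ?', cutoff).select(:id).to_sql})
    RETURNING *
  )
  INSERT INTO oauth_access_token_archived_records
    (id, resource_owner_id, application_id, token, refresh_token,
     expires_in, revoked_at, created_at, scopes, organization_id, archived_at)
  SELECT id, resource_owner_id, application_id, token, refresh_token,
         expires_in, revoked_at, created_at, scopes, organization_id,
         CURRENT_TIMESTAMP
  FROM deleted
SQL

OauthAccessToken.connection.execute(sql)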

Reversibility and Recovery

  • The operations performed by the worker are not automatically reversible, since it performs DELETE operations.
    • The worker can be stopped at any point, with only already-processed batches affected.
    • The CTE approach ensures atomicity per batch: tokens are only deleted if successfully archived.
    • Unprocessed tokens remain intact.

The following data recovery options are available.

Full Recovery (from archive table)

INSERT INTO oauth_access_tokens
  (id, resource_owner_id, application_id, token, refresh_token,
   expires_in, revoked_at, created_at, scopes, organization_id)
SELECT
  id, resource_owner_id, application_id, token, refresh_token,
  expires_in, revoked_at, created_at, scopes, organization_id
FROM oauth_access_token_archived_records
WHERE archived_at >= [migration_start_timestamp];
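
Since rows are restored with their original ids, the table's id sequence may also need to be advanced afterwards; a sketch using PostgreSQL's setval:

# Advance the id sequence past the restored rows (no-op if already higher).
ActiveRecord::Base.connection.execute(<<~SQL)
  SELECT setval(
    pg_get_serial_sequence('oauth_access_tokens', 'id'),
    (SELECT MAX(id) FROM oauth_access_tokens)
  )
SQL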

Selective Recovery (specific tokens)

INSERT INTO oauth_access_tokens
  (id, resource_owner_id, application_id, token, refresh_token,
   expires_in, revoked_at, created_at, scopes, organization_id)
SELECT 
  id, resource_owner_id, application_id, token, refresh_token,
  expires_in, revoked_at, created_at, scopes, organization_id
FROM oauth_access_token_archived_records
WHERE id BETWEEN 253900000 AND 253901000
ORDER BY id;

Important notes

The worker performs actions that we have already successfully tested with a BBM;
the results are linked under References above ("Previous results from BBM").

How to set up and validate locally

  • Create some OAuth access tokens with revoked_at from 2 months ago
  • Enqueue and run Authn::DataRetention::OauthAccessTokenArchiveWorker and verify that the tokens are removed (see the console sketch below)
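
A Rails console sketch; attribute names mirror the seeding task's columns, and the values are illustrative. Enable the worker's ops feature flag first (its name is in the diff), and adjust the lookups to your local data.

# Seed a few revoked tokens older than the cutoff, run the worker inline,
# then verify they were moved to the archive table.
app = Authn::OauthApplication.first
org = Organizations::Organization.first

2.times do
  OauthAccessToken.create!(
    application_id: app.id,
    organization_id: org.id,
    resource_owner_id: User.first.id,
    token: SecureRandom.hex(32),
    scopes: 'api',
    created_at: 3.months.ago,
    revoked_at: 2.months.ago
  )
end

Authn::DataRetention::OauthAccessTokenArchiveWorker.new.perform

OauthAccessToken.where('revoked_at <= ?', 1.month.ago).count # expect 0 once archived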

It is possible to seed tokens with the following rake task:

bundle exec rake 'db:seed:oauth_tokens[1]'
# lib/tasks/seed_oauth_tokens.rake
namespace :db do
  namespace :seed do
    desc "Seed millions of OAuth access tokens for testing archival"
    task :oauth_tokens, [:count_millions] => :environment do |_task, args|
      count_millions = (args[:count_millions] || 200).to_i
      total_records = count_millions * 1_000_000
      # total_records = count_millions * 20_000

      puts "Seeding #{count_millions} million OAuth tokens (#{total_records} records)"
      puts "This will take a while..."

      # Get sample data
      organization_ids = Organizations::Organization.limit(10).pluck(:id)
      application_ids = Authn::OauthApplication.limit(10).pluck(:id)
      user_ids = User.limit(100).pluck(:id)

      batch_size = 10_000
      iterations = total_records / batch_size

      start_time = Time.current

      iterations.times do |i|
        values = []

        batch_size.times do
          created_at = rand(2.years.ago..Time.current)
          is_revoked = rand < 0.87  # 87% revoked to match the production ratio

          values << {
            organization_id: organization_ids.sample,
            application_id: application_ids.sample,
            resource_owner_id: user_ids.sample,
            token: SecureRandom.hex(32),
            refresh_token: rand < 0.8 ? SecureRandom.hex(32) : nil,
            expires_in: [3600, 7200, 86400].sample,
            scopes: ['read', 'write', 'read write', 'api'].sample,
            created_at: created_at,
            revoked_at: is_revoked ? created_at + rand(1.hour..6.months) : nil
          }
        end

        # Use raw SQL for maximum speed
        sql = <<~SQL
          INSERT INTO oauth_access_tokens
            (organization_id, application_id, resource_owner_id, token,
             refresh_token, expires_in, scopes, created_at, revoked_at)
          VALUES #{values.map { |v|
            "(#{v[:organization_id]}, #{v[:application_id]}, #{v[:resource_owner_id] || 'NULL'},
             '#{v[:token]}', #{v[:refresh_token] ? "'#{v[:refresh_token]}'" : 'NULL'},
             #{v[:expires_in]}, '#{v[:scopes]}', '#{v[:created_at]}',
             #{v[:revoked_at] ? "'#{v[:revoked_at]}'" : 'NULL'})"
          }.join(',')}
        SQL

        ActiveRecord::Base.connection.execute(sql)

        if (i + 1) % 100 == 0
          progress = ((i + 1) * batch_size / total_records.to_f * 100).round(2)
          elapsed = Time.current - start_time
          rate = (i + 1) * batch_size / elapsed
          eta = (total_records - (i + 1) * batch_size) / rate

          puts "Progress: #{(i + 1) * batch_size}/#{total_records} (#{progress}%) - " \
               "Rate: #{rate.round}/sec - ETA: #{(eta / 60).round} minutes"
        end
      end
      
      puts "\nSeeding complete!"
      puts "Total time: #{((Time.current - start_time) / 60).round(2)} minutes"
      
      # Show statistics
      stats = ActiveRecord::Base.connection.select_one(<<~SQL)
        SELECT
          COUNT(*) as total,
          COUNT(*) FILTER (WHERE revoked_at IS NOT NULL) as revoked,
          COUNT(*) FILTER (WHERE revoked_at IS NULL) as active,
          COUNT(*) FILTER (WHERE revoked_at < '#{1.month.ago}') as archivable
        FROM oauth_access_tokens
      SQL
      
      puts "\nDatabase statistics:"
      puts "Total tokens: #{stats['total']}"
      puts "Revoked: #{stats['revoked']} (#{(stats['revoked'].to_f / stats['total'] * 100).round(1)}%)"
      puts "Active: #{stats['active']} (#{(stats['active'].to_f / stats['total'] * 100).round(1)}%)"
      puts "Ready for archival: #{stats['archivable']}"
    end
  end
end

MR acceptance checklist

Evaluate this MR against the MR acceptance checklist. It helps you analyze changes to reduce risks in quality, performance, reliability, security, and maintainability.
