Add worker to populate import source user missing details

What does this MR do and why?

Introduce a new Sidekiq Worker in Direct Transfer to populate the source_name and source_username for the Import::SourceUser records created without them. Additionally, the worker updates the associated placeholder user's name and username.

The new worker retrieves user details in batches of 100 users via the GraphQL API. After fetching the details, it updates the Import::SourceUser record and the placeholder user using the same creation patterns.

In summary, the SourceUsersAttributesWorker is scheduled to run every minute and will only proceed with a request if the number of source users is 100 or more. Once the BulkImport status changes to completed, the worker will be triggered one last time to update any remaining records. During this final run, the number of remaining records should be small, probably less than 100.

For context, for contributions associated with non-members, Import::SourceUser records are created without name and username because the information aren't present in the NDJSON files that are imported. For contributions associated with members, that isn't a problem, because Import::SourceUser records for members are created in the MembersPipeline, which has access to the user name and username.

The worker does not utilize the user_contributions relation introduced in #454522 (closed) because the new relation is still behind a feature flag, making it unavailable in source instances. I created #480245 to roll out the relation, which will later allow us to create a new pipeline that uses the new relation.

Related to: #443557 (closed)

MR acceptance checklist

Please evaluate this MR against the MR acceptance checklist. It helps you analyze changes to reduce risks in quality, performance, reliability, security, and maintainability.

Screenshots or screen recordings

Screenshots are required for UI changes, and strongly recommended for all other merge requests.

Before After

How to set up and validate locally

  1. Enable the feature flags: importer_user_mapping and bulk_import_importer_user_mapping

    Feature.enable(:importer_user_mapping)
    Feature.enable(:bulk_import_importer_user_mapping)
  2. Import a new group using Direct Transfer which contains contributions from non-members

    1. New group,
    2. Import Group
    3. Provide a host user and access token
    4. Select the group to be imported
  3. Wait for the migration to finish

  4. Access the Placeholder Users tab

    1. Go to the top-level namespace page
    2. Access the Members section
    3. Click on the Placeholder tab
  5. Check if all placeholder users have names and usernames resembling the source user.

SQL Plans

Note: The table on production is empty. In Database Lab, the table was populated using the command below, which creates 8000 source users for 4 distinct namespaces, so in total 32000 records

exec

DO $$
DECLARE temprow RECORD;
BEGIN FOR temprow IN
    SELECT id FROM users LIMIT 8000
  LOOP
    INSERT INTO import_source_users (placeholder_user_id,namespace_id,created_at,updated_at,source_username,source_name,source_user_identifier,source_hostname,import_type) VALUES (temprow.id,9970,'2024-08-22 01:05:58.372040','2024-08-22 01:05:58.372040',null,null, cast(temprow.id as varchar), 'github.com', 'github');
    INSERT INTO import_source_users (placeholder_user_id,namespace_id,created_at,updated_at,source_username,source_name,source_user_identifier,source_hostname,import_type) VALUES (temprow.id,2244,'2024-08-22 01:05:58.372040','2024-08-22 01:05:58.372040',null,null, cast(temprow.id as varchar), 'github.com', 'github');
    INSERT INTO import_source_users (placeholder_user_id,namespace_id,created_at,updated_at,source_username,source_name,source_user_identifier,source_hostname,import_type) VALUES (temprow.id,2247,'2024-08-22 01:05:58.372040','2024-08-22 01:05:58.372040',null,null, cast(temprow.id as varchar), 'github.com', 'github');
    INSERT INTO import_source_users (placeholder_user_id,namespace_id,created_at,updated_at,source_username,source_name,source_user_identifier,source_hostname,import_type) VALUES (temprow.id,2252,'2024-08-22 01:05:58.372040','2024-08-22 01:05:58.372040',null,null, cast(temprow.id as varchar), 'github.com', 'github');
  END LOOP;
END; $$

Ruby method

Import::SourceUser.source_users_with_missing_information(namespace: Group.find(9970), source_hostname: 'github.com', import_type: 'github')
  .each_batch(of: 100, order: :desc) do |batch| 
  batch.each do |source_user|
    puts source_user
  end
end
Query plans before index

Raw SQL 1

SELECT
    "import_source_users"."id"
FROM
    "import_source_users"
WHERE
    "import_source_users"."namespace_id" = 9970
    AND "import_source_users"."source_hostname" = 'github.com'
    AND "import_source_users"."import_type" = 'github'
    AND ("import_source_users"."source_name" IS NULL
        OR "import_source_users"."source_username" IS NULL)
ORDER BY
    "import_source_users"."id" DESC
LIMIT 1

Query plan - https://console.postgres.ai/gitlab/gitlab-production-main/sessions/31074/commands/96497

 Limit  (cost=2.77..2.77 rows=1 width=8) (actual time=0.146..0.147 rows=0 loops=1)
   Buffers: shared hit=8
   I/O Timings: read=0.000 write=0.000
   ->  Sort  (cost=2.77..2.77 rows=1 width=8) (actual time=0.145..0.146 rows=0 loops=1)
         Sort Key: import_source_users.id DESC
         Sort Method: quicksort  Memory: 25kB
         Buffers: shared hit=8
         I/O Timings: read=0.000 write=0.000
         ->  Index Scan using index_import_source_users_on_namespace_id_and_status on public.import_source_users  (cost=0.29..2.76 rows=1 width=8) (actual time=0.043..0.043 rows=0 loops=1)
               Index Cond: (import_source_users.namespace_id = 9970)
               Filter: (((import_source_users.source_name IS NULL) OR (import_source_users.source_username IS NULL)) AND (import_source_users.source_hostname = 'github.com'::text) AND (import_source_users.import_type = 'github'::text))
               Rows Removed by Filter: 0
               Buffers: shared hit=5
               I/O Timings: read=0.000 write=0.000

Raw SQL 2

SELECT
    "import_source_users"."id"
FROM
    "import_source_users"
WHERE
    "import_source_users"."namespace_id" = 9970
    AND "import_source_users"."source_hostname" = 'github.com'
    AND "import_source_users"."import_type" = 'gitlab_migration'
    AND ("import_source_users"."source_name" IS NULL
        OR "import_source_users"."source_username" IS NULL)
    AND "import_source_users"."id" <= 8000
ORDER BY
    "import_source_users"."id" DESC
LIMIT 1 OFFSET 100

Query plan - https://postgres.ai/console/gitlab/gitlab-production-main/sessions/31074/commands/96498

 Limit  (cost=2.77..2.78 rows=1 width=8) (actual time=0.059..0.059 rows=0 loops=1)
   Buffers: shared hit=8
   I/O Timings: read=0.000 write=0.000
   ->  Sort  (cost=2.77..2.77 rows=1 width=8) (actual time=0.057..0.058 rows=0 loops=1)
         Sort Key: import_source_users.id DESC
         Sort Method: quicksort  Memory: 25kB
         Buffers: shared hit=8
         I/O Timings: read=0.000 write=0.000
         ->  Index Scan using index_import_source_users_on_namespace_id_and_status on public.import_source_users  (cost=0.29..2.76 rows=1 width=8) (actual time=0.037..0.037 rows=0 loops=1)
               Index Cond: (import_source_users.namespace_id = 9970)
               Filter: (((import_source_users.source_name IS NULL) OR (import_source_users.source_username IS NULL)) AND (import_source_users.id <= 8000) AND (import_source_users.source_hostname = 'github.com'::text) AND (import_source_users.import_type = 'gitlab_migration'::text))
               Rows Removed by Filter: 0
               Buffers: shared hit=5
               I/O Timings: read=0.000 write=0.000
Query plans after index **Raw SQL 1**
SELECT
    "import_source_users"."id"
FROM
    "import_source_users"
WHERE
    "import_source_users"."namespace_id" = 9970
    AND "import_source_users"."source_hostname" = 'github.com'
    AND "import_source_users"."import_type" = 'github'
    AND ("import_source_users"."source_name" IS NULL
        OR "import_source_users"."source_username" IS NULL)
ORDER BY
    "import_source_users"."id" DESC
LIMIT 1

Query plan - https://console.postgres.ai/gitlab/gitlab-production-main/sessions/31201/commands/96907

 Limit  (cost=0.41..3.43 rows=1 width=8) (actual time=0.089..0.090 rows=1 loops=1)
   Buffers: shared hit=7
   I/O Timings: read=0.000 write=0.000
   ->  Index Only Scan Backward using idx_namespace_hostname_import_type_id_source_name_and_username on public.import_source_users  (cost=0.41..3.43 rows=1 width=8) (actual time=0.088..0.088 rows=1 loops=1)
         Index Cond: ((import_source_users.namespace_id = 9970) AND (import_source_users.source_hostname = 'github.com'::text) AND (import_source_users.import_type = 'github'::text))
         Heap Fetches: 1
         Buffers: shared hit=7
         I/O Timings: read=0.000 write=0.000

Raw SQL 2

SELECT
    "import_source_users"."id"
FROM
    "import_source_users"
WHERE
    "import_source_users"."namespace_id" = 9970
    AND "import_source_users"."source_hostname" = 'github.com'
    AND "import_source_users"."import_type" = 'gitlab_migration'
    AND ("import_source_users"."source_name" IS NULL
        OR "import_source_users"."source_username" IS NULL)
    AND "import_source_users"."id" <= 8000
ORDER BY
    "import_source_users"."id" DESC
LIMIT 1 OFFSET 100

Query plan - https://console.postgres.ai/gitlab/gitlab-production-main/sessions/31186/commands/96883

 Limit  (cost=1.94..3.46 rows=1 width=8) (actual time=0.118..0.119 rows=0 loops=1)
   Buffers: shared hit=3 read=3
   I/O Timings: read=0.063 write=0.000
   ->  Index Only Scan Backward using index_namespace_source_hostname_import_type_2 on public.import_source_users  (cost=0.41..1.94 rows=1 width=8) (actual time=0.116..0.117 rows=0 loops=1)
         Index Cond: ((import_source_users.namespace_id = 9970) AND (import_source_users.source_hostname = 'github.com'::text) AND (import_source_users.import_type = 'gitlab_migration'::text) AND (import_source_users.id <= 8000))
         Heap Fetches: 0
         Buffers: shared hit=3 read=3
         I/O Timings: read=0.063 write=0.000
Edited by Rodrigo Tomonari

Merge request reports

Loading