
Add GCP bucket connector for fetching package metadata

Problem to solve

The external license database will make license data available to instances via a public GCP bucket. In order to import this data, a way to connect to the bucket and retrieve its contents is necessary.

Proposal

Create a GCP bucket connector which can open a connection to the bucket and stream data to the caller. fog/fog-google is already in use in the code base, so it is probably the best candidate.

Because there is a large amount of data in the bucket, a way to track the last object read is necessary so that already processed data is not re-processed.
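
A minimal sketch of how the sync service could track that position, assuming a hypothetical PackageMetadata::Checkpoint record (not part of this proposal) and the data_after yield signature used in the pseudocode below:

# Hypothetical checkpoint record keyed by purl_type; the column names are
# placeholders used only to illustrate resuming from the last object read.
checkpoint = PackageMetadata::Checkpoint.find_or_initialize_by(purl_type: 'npm')

connector.data_after(checkpoint.sequence, checkpoint.chunk) do |row, sequence, chunk|
  process(row) # hypothetical ingestion step
  checkpoint.update!(sequence: sequence, chunk: chunk)
end

In practice the checkpoint would likely be advanced once per object or per batch rather than per row.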

Implementation plan

1. connector

  • create PackageMetadata::Connector implementing the interface used in the sync service
  • add PackageMetadata::GcpBucketConnector
    • initialized with base_uri, version_format, and purl_type
  • implement PackageMetadata::Connector#fetch_objects_after(sequence_id, chunk_id)
    • list sequence objects in the bucket (these are lexicographically ordered)
    • skip any objects at or before the given sequence and chunk until the next "unread" object is found
    • open a stream to the object and pass a limited number of lines at a time to the CSV reader
    • yield CSV rows to the caller
  • note: if the sequence is not found, processing has to start from the first sequence, so depending on the number of objects in the bucket a search faster than O(n) may be needed (see the binary search sketch after the pseudocode below)

Pseudocode:

require 'csv'

module PackageMetadata
  class GcpBucketConnector < BaseConnector
    def initialize(base_uri:, version_format:, purl_type:)
      @base_uri = base_uri
      @version_format = version_format
      @purl_type = purl_type
    end

    def data_after(seq_id, chunk_id)
      objects_after(seq_id, chunk_id).each do |obj|
        seq, chunk = split(obj.name)

        # Stream the object body into the CSV parser and yield each row along
        # with its position so the caller can checkpoint progress.
        # (Handling of rows that span buffer boundaries is elided.)
        connection.get_object(bucket_name, obj.name) do |buffer|
          CSV.new(buffer).each do |row|
            yield row, seq, chunk
          end
        end
      end
    end

    private

    # Object names are "<sequence>/<chunk>" and are listed in lexicographic
    # order. Return every object after the given position; if the position is
    # not found, start from the first object.
    def objects_after(seq_id, chunk_id)
      all_objects = connection.list_objects(bucket_name).items # pagination elided

      pos = -1
      all_objects.each_with_index do |obj, idx|
        seq, chunk = split(obj.name)
        # comparison assumes zero-padded, lexicographically comparable names
        pos = idx if seq < seq_id || (seq == seq_id && chunk <= chunk_id)
      end

      all_objects[(pos + 1)..]
    end

    def split(name)
      name.split('/')
    end

    def connection
      # project/credential options come from code/instance settings (section 2)
      # rather than from the connector's metadata arguments
      @connection ||= Fog::Storage::Google.new(storage_options)
    end

    def storage_options
      {} # placeholder: resolved from code/instance settings
    end

    def bucket_name
      @base_uri # the public bucket identified by base_uri
    end
  end
end
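
Regarding the note about avoiding an O(n) scan: since object names are listed in lexicographic order, the listing can be bisected instead of scanned linearly. A rough sketch, reusing the hypothetical split helper and connection from the pseudocode above:

# Binary-search the ordered listing for the first object strictly greater than
# the given position; if the position is past the end, nothing is unread.
def objects_after(seq_id, chunk_id)
  all_objects = connection.list_objects(bucket_name).items
  target = [seq_id, chunk_id]

  first_unread = all_objects.bsearch_index do |obj|
    (split(obj.name) <=> target) > 0
  end

  first_unread ? all_objects[first_unread..] : []
end

This only reduces comparisons over an already-fetched listing; if fetching the full listing itself becomes a problem, listing with a name prefix may be worth considering as well.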

2. GCP bucket settings

Because this is a public bucket, configuration can happen within code/instance settings rather than in the database.
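
A minimal sketch of what such in-code configuration could look like, with all names and values as placeholders rather than decided settings:

# Hypothetical hard-coded configuration; bucket name, version format, and
# purl types are placeholders, not final values.
module PackageMetadata
  class SyncConfiguration
    BUCKET_NAME = 'public-license-bucket'.freeze # placeholder
    VERSION_FORMAT = 'v1'.freeze                 # placeholder
    PURL_TYPES = %w[npm maven gem].freeze        # placeholder subset

    def self.connectors
      PURL_TYPES.map do |purl_type|
        GcpBucketConnector.new(base_uri: BUCKET_NAME,
                               version_format: VERSION_FORMAT,
                               purl_type: purl_type)
      end
    end
  end
end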

3. network connection

Because of the potential size of the objects, the connection to the bucket should be gzip-encoded rather than plain text.
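
A rough sketch of the gzip handling in isolation, using plain Net::HTTP rather than fog-google and a placeholder object URL, to show the request header and the manual decompression step:

require 'net/http'
require 'zlib'
require 'stringio'
require 'csv'

# Placeholder URL for a single public object.
uri = URI('https://storage.googleapis.com/PUBLIC_BUCKET/SEQUENCE/CHUNK.csv')

request = Net::HTTP::Get.new(uri)
# Setting Accept-Encoding explicitly disables Net::HTTP's automatic
# decompression, so the gzip body is decompressed by hand below.
request['Accept-Encoding'] = 'gzip'

response = Net::HTTP.start(uri.host, uri.port, use_ssl: true) { |http| http.request(request) }

body =
  if response['Content-Encoding'] == 'gzip'
    Zlib::GzipReader.new(StringIO.new(response.body)).read
  else
    response.body
  end

rows = CSV.parse(body)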

4. data stream

Because of the amount of data being processed, streaming the CSV data in configurable batches is important so as not to create memory pressure on the instance.
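
A minimal sketch of how batching could sit on top of the connector, assuming the data_after yield signature from the pseudocode above, a hypothetical bulk_insert persistence step, and a placeholder batch size:

BATCH_SIZE = 1_000 # placeholder; in practice a configurable setting

# Accumulate yielded rows into fixed-size batches so that only one batch is
# held in memory at a time instead of an entire object.
def process_in_batches(connector, sequence, chunk)
  batch = []

  connector.data_after(sequence, chunk) do |row, _seq, _chunk|
    batch << row
    next if batch.size < BATCH_SIZE

    bulk_insert(batch) # hypothetical persistence step
    batch.clear
  end

  bulk_insert(batch) unless batch.empty?
end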
