Support for Parent-Child streams and Metrics

This update will provide a means of specifying a parent_stream_class on an individual Stream class. If provided, that child stream will then be seeded with each parent record as its partition. This allows deeply nested stream scenarios with very little orchestration effort on the developers' side.

Implementation details:

  1. The SDK automatically builds and invokes a DAG based upon parent-child relationships described by Stream.parent_stream_class relationships.
  2. Child streams cannot be invoked directly; instead they will be invoked from their parents.
  3. Two methods are available to optionally override the partition context for fine-grained control:
    1. Stream.get_child_context(record: dict) -> dict allows developers to override the context sent on to the child stream.
    2. Stream.get_records() now allows a (record, child_context) tuple to be returned from the generator, instead of only allowing the return of a single record. This method is useful for sending large binary objects or custom handles which are needed by the child class but should not be included in the record itself.
  4. Streams will automatically support iterating without emitting records - as needed for parent streams which are not selected themselves but they have selected children (or grandchildren/descendants).
  5. By default, the streams will use partition handling to track an individual state for each child stream and parent item combination. (I.e. 2 child streams and 100 parent items = 200 distinct bookmarks.)
    1. This behavior can be overridden using the new stream property Stream.state_partitioning_keys -> Optional[List[str]]. Overriding this property allows you to use a subset of keys when creating state partitions.
      • For example, the granularity for GitHub issue comments is repo_id : issue_id, meaning one partition for every distinct issue in every repo. We can override this behavior (allocating 10-100x fewer bookmarks) by setting state_partitioning_keys to simply ['repo_id']. In this way, we only require one bookmark per repo, versus one bookmark for every single issue.
      • To only track one bookmark for all substreams, you can set state_partitioning_keys to an empty list([]).
  6. Since the context sent to methods like get_records() and get_url_params() may be a parent context, which also might be different from how stream bookmarks are partitioned, the partition argument has been renamed in these methods to simply context. The renamed context param be any of the following:
    1. For normal streams, None, assuming no partitioning is defined and no parent-child relationships exist. (Unchanged.)
    2. A single partition, if Stream.partitions is defined. (Unchanged.)
    3. For child streams, the output from the parents get_child_context() method, or equivalently, the second item in a tuple result from the parent's get_records() implementation.
    4. For child streams, the parent record itself - if no other behavior has been defined.
      • Note: if the parent does not define get_child_context(), then the child must override state_partition_keys. This is to prevent the entire parent record from being used as the state partition keys.

Support for Singer Metrics:

  • This MR also adds support for Singer Metrics as first logged in #91 (closed).
  • Specifically http_request_duration and record_count are now supported and enabled by default.
  • More info on this implementation is in: /docs/implementation/metrics.md

Still to-be-completed:

  1. Define the replication keys relationship between child and parent.
    • Proposed ignore_parent_replication_key - if set to true on a child, we assume that client's replication key increments are not tracked by the parent, meaning a pessimistic scan of all parent items is required.
  2. Progress markers for partitioned state entries are not always cleared at the beginning or end of a sync operation.
  3. Updated docs:

Closes #97 (closed)

How to test during the preview

The easiest way to test is to run the following from your own Tap repo:

  1. Run poetry remove singer-sdk within your existing sdk-based repo or simply comment-out the singer-sdk line in your repo's pyproject.toml.
  2. According to your preference:
    • Depend on this branch: poetry add git+https://gitlab.com/meltano/singer-sdk.git#97-hierarchical-streams
    • Clone and make a local dev dependency: poetry add --dev ../singer-sdk/ (assumes singer-sdk is a sibling directory).
Edited by AJ Steers

Merge request reports

Loading