Skip to content

Target Stream Maps

AJ Steers requested to merge 63-target-stream-maps into main

Closes #63 (closed)


This MR adds the stream map capability for Targets.

Why development experience doesn't change

It's worth reiterating that the Tap or Target developer does not need to know or care if stream maps are being used. This is because of where exactly the stream map operations exist in the pipeline.

Here's a crude logical flow diagram:

Tap.sync_all() -> Stream.get_records() -> (map) -> (emit records) -> Target.listen() -> (map) -> Sink.__init__() -> Sink.process_record()

For Taps: The map operation occurs after all other operations are performed by the developer's custom code, immediately before writing the RECORD and SCHEMA messages.

For Targets: The map operation occurs before any Sink methods are called from the developer's custom code. From the Sink's perspective, the only record they receive is the transformed record and the only schema they will receive is the transformed schema. Same also for the stream name or alias. Only the transformed version will be seen by the sink.

Single Code Path, aka Mapping is always on

For simplicity, all taps and targets have a mapper class now and a StreamMap object per each stream, regardless of whether they actually have custom stream maps specified by the user. If no map is applied, we have a default SameRecordTransform which just passes along the record. This means we don't have to care at runtime whether custom maps are configured; if not configured it just uses "the boring mapper", which is to do nothing and pass along as-is.

Regarding State

One unexpected outcome of applying stream maps, is that (unlike RECORD and SCHEMA messages) the STATE messages will not be modified in any way by the stream map operations. Rather, the STATE messages will remain true to the developer's original stream names, original schema definitions, and original record values. This respects the spec guidelines that the tap "owns" whatever is written into STATE and the target's only role is to pass the state for serialization (via its STDOUT pipe) after all preceding records are handled.

Because of the logical ordering of events, this does not create any problems for taps, but it does mean that it is all-the-more important that target should not try to make any assumptions about STATE messages or parse/modify state directly.

Regarding modification of primary keys

The following rules apply when modifying one of the stream's primary key properties:

  • Dropping a primary key property will result in an error, unless the __key_properties__ operation is applied by the user to override or nullify the list of default keys.
  • Users may override the default key properties of a stream by applying the __key_properties__ operation. Setting to null will ignore any keys set upstream, while setting to a list of strings will override those key names. (This has been added to the stream_maps.md documentation.)
  • The above override behavior should have the same outcome as if the user manually nulled out key_properties in the catalog.json file.

Regarding modification of replication keys

We do not yet have custom handling for replication key modification. However, given the flow diagram shown above, the replication key is only truly required (1) in operations related to the stream's get_records() method and (2) in creating and emitting the STATE file. As discussed above, the STATE message is the only part of this flow not touched by the stream map transforms, so this should just continue working as usual, despite the surrounding changes to SCHEMA and RECORD messages.

While it certainly is a best practice to store and pass the replication key value along with the other stream properties, the downstream target should (in theory at least) not be impacted by the replication key's absence.

Edited by AJ Steers

Merge request reports