Target Stream Maps
Closes #63
This MR adds the stream map capability for Targets.
Why development experience doesn't change
It's worth reiterating that the Tap or Target developer does not need to know or care if stream maps are being used. This is because of where exactly the stream map operations exist in the pipeline.
Here's a crude logical flow diagram:
Tap.sync_all()
->Stream.get_records()
->(map)
->(emit records)
->Target.listen()
->(map)
->Sink.__init__()
->Sink.process_record()
For Taps: The map operation occurs after all other operations are performed by the developer's custom code, immediately before writing the RECORD and SCHEMA messages.
For Targets: The map operation occurs before any Sink methods are called from the developer's custom code. From the Sink's perspective, the only record it receives is the transformed record and the only schema it receives is the transformed schema. The same applies to the stream name or alias: the sink sees only the transformed version.
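The ordering described above can be sketched in plain Python. This is only an illustration of where the map step sits, not the SDK's implementation: get_records, tap_sync, target_listen, ListSink, drop_email, and add_source are all hypothetical names invented for this sketch.

```python
from typing import Callable, Dict, Iterable, List

Record = Dict[str, object]
Mapper = Callable[[Record], Record]


def get_records() -> Iterable[Record]:
    """Stands in for the developer's Stream.get_records(); it knows nothing about maps."""
    yield {"id": 1, "email": "a@example.com"}
    yield {"id": 2, "email": "b@example.com"}


def tap_sync(mapper: Mapper) -> List[Record]:
    """Tap side: the map runs after developer code, just before records are emitted."""
    return [mapper(record) for record in get_records()]


class ListSink:
    """Stands in for the developer's Sink; it only ever sees mapped records."""

    def __init__(self) -> None:
        self.records: List[Record] = []

    def process_record(self, record: Record) -> None:
        self.records.append(record)


def target_listen(messages: Iterable[Record], mapper: Mapper, sink: ListSink) -> None:
    """Target side: the map runs before any Sink method is called."""
    for record in messages:
        sink.process_record(mapper(record))


def drop_email(record: Record) -> Record:
    """Hypothetical tap-side map: remove the email property."""
    return {key: value for key, value in record.items() if key != "email"}


def add_source(record: Record) -> Record:
    """Hypothetical target-side map: add a constant property."""
    return {**record, "source": "demo"}


sink = ListSink()
target_listen(tap_sync(drop_email), add_source, sink)
```

Neither get_records nor ListSink.process_record references a mapper, which is the point: the developer-written pieces stay the same whether or not maps are configured.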
Single Code Path, aka Mapping is always on
For simplicity, all taps and targets now have a mapper class and one StreamMap object per stream, regardless of whether the user has specified custom stream maps. If no map is applied, a default SameRecordTransform simply passes the record along. This means we don't have to care at runtime whether custom maps are configured; if they are not, the code just uses "the boring mapper", which does nothing and passes records along as-is.
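A minimal sketch of the "always on" idea, assuming a simplified config shape in which a property set to None means "drop it". PassThroughMap stands in for the default SameRecordTransform; DropFieldsMap, build_stream_map, and emit are hypothetical names for illustration only and do not reflect the SDK's actual classes.

```python
from typing import Dict, Iterable, Optional

Record = Dict[str, object]


class PassThroughMap:
    """Stand-in for the default transform: returns the record unchanged."""

    def transform(self, record: Record) -> Record:
        return record


class DropFieldsMap:
    """Hypothetical custom map that removes the listed properties."""

    def __init__(self, drop: Iterable[str]) -> None:
        self.drop = set(drop)

    def transform(self, record: Record) -> Record:
        return {key: value for key, value in record.items() if key not in self.drop}


def build_stream_map(config: Optional[Dict[str, object]]):
    """Always return a map object, so callers never branch on 'is mapping enabled?'."""
    if not config:
        return PassThroughMap()
    # Assumption for this sketch: a property mapped to None is dropped.
    dropped = [name for name, expression in config.items() if expression is None]
    return DropFieldsMap(dropped)


def emit(record: Record, stream_map) -> Record:
    """Single code path: every record goes through some map, custom or default."""
    return stream_map.transform(record)


unmapped = emit({"id": 1, "email": "a@example.com"}, build_stream_map(None))
mapped = emit({"id": 1, "email": "a@example.com"}, build_stream_map({"email": None}))
```

The design choice is that emit never checks whether maps are configured; the decision is made once, when the map object is built.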
Regarding State
One unexpected outcome of applying stream maps is that (unlike RECORD and SCHEMA messages) the STATE messages will not be modified in any way by the stream map operations. Rather, the STATE messages will remain true to the developer's original stream names, original schema definitions, and original record values. This respects the spec guidelines that the tap "owns" whatever is written into STATE and that the target's only role is to pass the state along for serialization (via its STDOUT pipe) after all preceding records are handled.
Because of the logical ordering of events, this does not create any problems for taps, but it does make it all the more important that targets do not make any assumptions about STATE messages or parse/modify state directly.
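To illustrate, here is a rough sketch of a target loop that aliases stream names on RECORD messages while treating the STATE value as opaque. The function handle_messages and the aliases argument are hypothetical; only the general Singer message shape (a "type" field, plus "stream" and "record" for RECORD and "value" for STATE) is assumed.

```python
import json
from typing import Dict, List, Tuple


def handle_messages(
    lines: List[str], aliases: Dict[str, str]
) -> Tuple[List[dict], List[dict]]:
    """Apply stream aliases to RECORD messages; pass STATE values through untouched."""
    sink_records: List[dict] = []
    emitted_states: List[dict] = []
    for line in lines:
        message = json.loads(line)
        if message["type"] == "RECORD":
            # The sink only ever sees the mapped (aliased) stream name.
            stream = aliases.get(message["stream"], message["stream"])
            sink_records.append({"stream": stream, "record": message["record"]})
        elif message["type"] == "STATE":
            # The value is opaque to the target: no lookups, no rewriting.
            emitted_states.append(message["value"])
    return sink_records, emitted_states


lines = [
    json.dumps({"type": "RECORD", "stream": "users", "record": {"id": 1}}),
    json.dumps({"type": "STATE", "value": {"bookmarks": {"users": {"id": 1}}}}),
]
records, states = handle_messages(lines, aliases={"users": "customers"})
```

After this runs, the record carries the alias "customers", while the state still refers to the tap's original stream name "users".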
Regarding modification of primary keys
The following rules apply when modifying one of the stream's primary key properties:
- Dropping a primary key property will result in an error, unless the user applies the __key_properties__ operation to override or nullify the list of default keys.
- Users may override the default key properties of a stream by applying the __key_properties__ operation. Setting it to null will ignore any keys set upstream, while setting it to a list of strings will override those key names. (This has been added to the stream_maps.md documentation.)
- The above override behavior should have the same outcome as if the user manually nulled out key_properties in the catalog.json file.
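The rules above could be expressed roughly as follows. This is a sketch of the decision logic only: resolve_key_properties, its arguments, and the _UNSET sentinel are hypothetical and are not taken from the SDK. The sentinel is there to distinguish "no __key_properties__ given" from "__key_properties__ set to null".

```python
from typing import List, Optional, Sequence, Set

# Sentinel meaning "the user did not supply __key_properties__ at all".
_UNSET = object()


def resolve_key_properties(
    upstream_keys: Sequence[str],
    dropped_properties: Set[str],
    override: object = _UNSET,
) -> Optional[List[str]]:
    """Return the key properties a mapped stream should declare."""
    if override is None:
        # Explicit null: ignore whatever keys were set upstream.
        return None
    if override is not _UNSET:
        # Explicit list of strings: replace the upstream key names.
        return list(override)  # type: ignore[arg-type]
    missing = [key for key in upstream_keys if key in dropped_properties]
    if missing:
        # A primary key was dropped and no override was given.
        raise ValueError(f"Primary key properties were dropped: {missing}")
    return list(upstream_keys)


kept = resolve_key_properties(["id"], dropped_properties=set())
nulled = resolve_key_properties(["id"], dropped_properties={"id"}, override=None)
replaced = resolve_key_properties(["id"], dropped_properties={"id"}, override=["email"])
```

Calling resolve_key_properties(["id"], dropped_properties={"id"}) with no override raises ValueError, which corresponds to the first rule.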
Regarding modification of replication keys
We do not yet have custom handling for replication key modification. However, given the flow diagram shown above, the replication key is only truly required (1) in operations related to the stream's get_records() method and (2) in creating and emitting the STATE message. As discussed above, the STATE message is the only part of this flow not touched by the stream map transforms, so this should just continue working as usual, despite the surrounding changes to SCHEMA and RECORD messages.
While it certainly is a best practice to store and pass the replication key value along with the other stream properties, the downstream target should (in theory at least) not be impacted by the replication key's absence.
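As a rough illustration of why dropping the replication key in a map should not break state tracking, the sketch below reads the bookmark from the unmapped record and only then applies the map. The function sync_with_bookmark, the drop_updated_at map, and the shape of the returned state dictionary are all hypothetical and simplified for this example.

```python
from typing import Callable, Dict, Iterable, List, Optional, Tuple

Record = Dict[str, object]


def sync_with_bookmark(
    records: Iterable[Record],
    replication_key: str,
    mapper: Callable[[Record], Record],
) -> Tuple[List[Record], Dict[str, object]]:
    """Track the bookmark on original records, then map them for emission."""
    bookmark: Optional[str] = None
    emitted: List[Record] = []
    for record in records:
        # The replication key is read before the map runs.
        value = str(record[replication_key])
        bookmark = value if bookmark is None else max(bookmark, value)
        emitted.append(mapper(record))
    state = {"replication_key": replication_key, "replication_key_value": bookmark}
    return emitted, state


def drop_updated_at(record: Record) -> Record:
    """Hypothetical map that removes the replication key from emitted records."""
    return {key: value for key, value in record.items() if key != "updated_at"}


source = [
    {"id": 1, "updated_at": "2021-01-01"},
    {"id": 2, "updated_at": "2021-03-01"},
]
emitted, state = sync_with_bookmark(source, "updated_at", drop_updated_at)
```

The emitted records no longer contain updated_at, yet the state still records the latest value seen, because the bookmark was taken before the map was applied.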