Allow stream-level and property-level transformations to be defined in `meltano.yml` (Stream Maps)
Original Issue Description
Inspired by https://www.dropbase.io/, PipelineWise transformations (https://transferwise.github.io/pipelinewise/user_guide/transformations.html), and our existing extractor extras, I'm imagining something like:
```yaml
extractors:
- name: tap-example
  # ...
  transform:
    <entity>:
      <function>: <args>
      <attribute>:
        <function>: <args>
```
Since these transformations would act on an extractor's output (a stream of Singer `SCHEMA` and `RECORD` messages), we could relatively easily support functions for:
- renaming (prefixing, suffixing) entity (stream, table) or attribute (property, column) names
- dropping entities or attributes, in cases where a tap doesn't support discovery mode and entity selection
- adding attributes with predefined or dynamic values, like PipelineWise's metadata columns: https://transferwise.github.io/pipelinewise/user_guide/metadata_columns.html
- filtering records based on one or more attribute values, keeping only those that do (or don't!) match (`drop_if`, `drop_unless`?)
- replacing text in attribute values
- replacing empty strings with nulls
- replacing nulls with a string
- changing attribute types and casting values, which can go beyond overriding the JSON schema using the `schema` extra
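To make the proposed behavior concrete, here is a minimal Python sketch of a few of the functions listed above (dropping attributes, adding attributes, and `drop_if` filtering) applied to a parsed Singer `RECORD` message. The function names and the `rules` structure are illustrative only, mirroring the proposal rather than any implemented Meltano API:

```python
import json

def transform_record(message, rules):
    """Apply hypothetical per-entity transform rules to a parsed RECORD message dict."""
    stream_rules = rules.get(message["stream"], {})
    record = dict(message["record"])

    # drop_if: discard records whose attribute matches a given value
    for attr, value in stream_rules.get("drop_if", {}).items():
        if record.get(attr) == value:
            return None

    # add_attribute: add attributes with predefined values (metadata columns)
    record.update(stream_rules.get("add_attribute", {}))

    # drop: remove attributes entirely
    for attr in stream_rules.get("drop", []):
        record.pop(attr, None)

    return {**message, "record": record}

rules = {
    "customers": {
        "drop_if": {"status": "deleted"},
        "add_attribute": {"_sdc_source": "tap-example"},
        "drop": ["ssn"],
    }
}

msg = {"type": "RECORD", "stream": "customers",
       "record": {"id": 1, "ssn": "123-45-6789", "status": "active"}}
out = transform_record(msg, rules)
print(json.dumps(out))
```

A corresponding `SCHEMA` transform would apply the same renames and drops to the stream's JSON schema so that the two message types stay consistent.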
Functions could take arguments of any type: a simple string, an array of values, or an object with additional keys. If a function doesn't take any arguments (like `drop`), it could just take a `true` boolean.
Since functions could have object arguments, Meltano would not immediately be able to distinguish between `{entity: {attribute: {function: scalar_value}}}` and `{entity: {function: {nested_key: nested_value}}}`, where the key nested under an entity identifier could be either an attribute identifier or a function name. We don't have this issue with the `metadata` and `schema` extras, because metadata values cannot (so far) be objects, and because schema info can only be specified for attributes, not entities as a whole. Perhaps we can add a special `_`, `_self`, or `_entity` key at the attribute level to nest entity-level transformation functions under, in cases where they need object values.
Background (Updated 2021-12-15)
There would be a large advantage to enabling transformations like those from `pipelinewise-transform-field` and the Meltano SDK's Inline Stream Maps to be defined natively in `meltano.yml` config.
This opens up a large number of use cases defined on the SDK docs site:
Stream-Level Mapping Applications
- Stream aliasing: streams can be aliased to provide custom naming downstream.
- Stream filtering: streams records can be filtered based on any user-defined logic.
- Stream duplication: streams can be split or duplicated and then sent as multiple distinct streams to the downstream target.
Property-Level Mapping Applications
- Property-level aliasing: properties can be renamed in the resulting stream.
- Property-level transformations: properties can be transformed inline.
- Property-level exclusions: properties can be removed from the resulting stream.
- Property-level additions: new properties can be created based on inline user-defined expressions.
As well as fixes for these common issues:
- Applying selection rules to taps that don't support selection.
- Resolving issues caused by taps that use selection rules to filter `RECORD` messages but not the `SCHEMA` messages used to create target tables.
- Resolving compatibility issues from taps that send data types that the chosen target cannot understand.
- Resolving compatibility issues from taps that send `ACTIVATE_VERSION` messages to targets that don't understand them.
- The need for "record flattening" when neither the tap nor the target supports this feature natively.
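The second issue above (filtering `RECORD` but not `SCHEMA` messages, leaving empty tables in the target) illustrates why a mapper sitting between tap and target is useful: it can drop both message types consistently. A minimal sketch, where `selected_streams` is a hypothetical representation of the user's selection rules:

```python
def filter_messages(messages, selected_streams):
    """Yield only messages belonging to selected streams."""
    for msg in messages:
        if msg["type"] in ("SCHEMA", "RECORD") and msg["stream"] not in selected_streams:
            continue  # drop SCHEMA and RECORD alike, so no empty tables appear
        yield msg

tap_output = [
    {"type": "SCHEMA", "stream": "users", "schema": {}},
    {"type": "RECORD", "stream": "users", "record": {"id": 1}},
    {"type": "SCHEMA", "stream": "audit_log", "schema": {}},
    {"type": "STATE", "value": {}},
]

# audit_log's SCHEMA message is dropped along with its records, so the
# target never creates an empty audit_log table.
kept = list(filter_messages(tap_output, selected_streams={"users"}))
```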
Proposal: Map transforms as properties of extractors and loaders (Updated 2021-12-15)
After running the following...
```bash
meltano add mapper meltano-map-transform
meltano add mapper pipelinewise-transform-field
```
... you would be able to provide a config such as:
```yaml
mappers:
- name: meltano-map-transform
  pip_url: meltano-map-transform
  config: # Optionally, a default config.
    # ...
- name: pipelinewise-transform-field
  pip_url: pipelinewise-transform-field
- name: no-activate-version # A fictional mapper that removes ACTIVATE_VERSION messages
  pip_url: no-activate-version
  config: # Optionally, a default config.
    # ...
extractors:
- name: tap-gitlab
  # ...
  mappings:
  - name: pii-hasher # The name of the map transform to apply.
    mapper: meltano-map-transform # The mapper plugin to use.
    config: # What will be sent to the transformer in a config.json file.
      stream_maps:
        customers:
          id_hashed: md5(record['id'])
          id: None
loaders:
- name: target-salesforce
  # ...
  mappings:
  - name: add-global-guid # The name of the map transform to apply.
    mapper: meltano-map-transform # The mapper plugin to use.
    config: # What will be sent to the transformer in a config.json file.
      stream_maps:
        customers:
          guid: md5(record['id'])
- name: target-csv
  # ...
  mappings:
  - name: flatten-records # The name of the map transform to apply.
    mapper: meltano-map-transform # The mapper plugin to use.
    default: true # Transform prepended automatically if default=true.
    config: # What will be sent to the transformer in a config.json file.
      flatten_records: true
  - name: compat-fix # The name of the map transform to apply.
    mapper: no-activate-version # The mapper plugin to use.
    # config is omitted if the generic config is sufficient
```
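If each mapping's `config` block is written verbatim to the mapper's `config.json`, as the comments above suggest, the `pii-hasher` mapping would produce a file roughly like the following (the exact serialization is illustrative):

```json
{
  "stream_maps": {
    "customers": {
      "id_hashed": "md5(record['id'])",
      "id": "None"
    }
  }
}
```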
One or more map transforms may be placed between tap and target:

```bash
meltano run tap-gitlab pii-hasher target-salesforce
meltano run tap-gitlab pii-hasher flatten-records target-csv
```
Since `default=true` for `flatten-records` and `compat-fix` on `target-csv`, these are all equivalent:

```bash
meltano run tap-gitlab flatten-records target-csv
meltano run tap-gitlab target-csv
```
Note: For the example config above, the command `meltano run tap-gitlab flatten-records target-salesforce` would fail, because `flatten-records` is only defined for `target-csv` and not for `tap-gitlab` or `target-salesforce`.