Skip to content

Add support for SQL Taps and Targets

AJ Steers requested to merge 74-database-type-streams into main

Not handled here:

  • LOG_BASED replication: #306 (closed)
  • Flattening: !236 (merged)
  • ACTIVATE_VERSION: !227 (closed)
  • Robust handling of column ALTERs: TODO
  • Handling MERGE UPSERTS: TODO
  • Deduping on primary keys during insert or upsert: TODO

Latest Spec as of 2021-10-29:

SQLConnector class

The new SQLConnector class handles the following:

- connecting to the source
- generating SQLAlchemy connection and engine objects
- discovering schema catalog entries
- performing type conversions to/from JSONSchema types
- dialect-specific functions, such as escaping and fully qualified names

Most developers should only need to override get_sqlalchemy_url. We also have the option for developers override create_sqlalchemy_engine() and/or create_sqlalchemy_connection() for purposes of performance tuning on the base engine and connection config.

Discovery

Discovery is handled by the method discover_catalog_entries(tap_config: dict) -> Dict[str, List[dict]]. This should not need to be overridden unless there are bugs and/or gaps in the DB API implementation.

Type conversions from SQL to JSON Schema

In the short term, some developers may also need to extend to_jsonschema_type() for any type conversions which are not standard or not yet handled by the SDK. That effort should shrink as more contributions come back to the SDK for generic and robust type conversion for an increasing number of SQL data types.

SQLStream class

The SQLStream leverages a SQLConnector for all core functionality.

  • connector_class - reference to the custom SQLConnector class

Stream performance tuning - option 1: get_records()

To improve performance, developers may optionally override get_records() if they can provide better performance vs the generic SQLAlchemy interfaces.

The base built-in implementation for get_records() is:

    def get_records(self, partition: Optional[dict]) -> Iterable[Dict[str, Any]]:
        for row in self.connector.connection.execute(
            sqlalchemy.text(f"SELECT * FROM {self.fully_qualified_name}")
        ):
            yield dict(row)

Stream performance tuning - option 2: BATCH (#9 (closed))

The long-term vision for performance tuning is to implement batch handlers as part of #9 (closed).

SQLTap class

The SQLTap leverages the SQLConnector from your stream class for all core functionality.

  • default_stream_class - reference to the custom SQLStream class. That tap will use its connector to discovery available streams.

Target and Sink implementation

This MR now incorporates the target implementation from !200 (merged). Essentially, both taps and targets rely on the SQLConnector class (which in turn build on sqlalchemy and DBAPI 2.0). By combining both taps and targets, the "end-to-end" and "round-trip" tests are more robust, and the overall implementation of SQLConnector class is better and more fully implemented.

Future iterations should add more tests and should expand the "standard" tests suite.

Important remaining todos:

  • Decide on connection pool strategy (currently each table's stream gets its own engine and connection).
  • Decide on stream_name and tap_stream_id naming conventions.
  • Address execution_options(stream_results=True) feedback regarding streaming instead of holding records in-mem
  • JSON Schema to SQL Type conversions - flexibility of allowing developers to override those. (Per @edgarrmondragon comment.)
  • Incremental not yet supported.
  • Property selection from catalog metadata (handled in base SDK code, can be further optimized in future.)
Edited by AJ Steers

Merge request reports