Commit ce4580cd authored by Micaël Bergeron's avatar Micaël Bergeron

wip: working on the singer/kettle serializers for Schema

parent 593ad4e5
import argparse
import sys
from enum import Enum
from elt.cli import ActionEnum, OptionEnum, parser_logging
from elt.utils import setup_logging
from elt.schema.serializers.singer import load
from elt.schema.serializers.meltano import dump
def action_convert(args):
    """Convert a singer catalog (read from stdin) to meltano schema format (written to stdout).

    :param args: parsed CLI arguments (currently unused, kept for the CLI action signature).
    """
    # NOTE: removed a stray `pass` that preceded the live body — dead statement.
    schema = load("singer", sys.stdin)
    dump(sys.stdout, schema)
class SchemaType(OptionEnum):
......
......@@ -71,6 +71,11 @@ class Schema:
def add_table(self, column: Column):
    """Record the table that *column* belongs to in the known-tables set."""
    table = Schema.table_key(column)
    self.tables.add(table)
def add_column(self, column: Column):
    """Register *column* in the schema, ensuring its table is tracked too."""
    self.add_table(column)
    # TODO: raise on overwrite?
    key = Schema.column_key(column)
    self.columns[key] = column
def column_diff(self, column: Column) -> Set[SchemaDiff]:
table_key = Schema.table_key(column)
column_key = Schema.column_key(column)
......
import logging
from xml.etree import ElementTree
from elt.schema import Schema, Column, DBType
def loads(schema_name: str, raw: str) -> Schema:
tree = ElementTree.fromstring(raw)
schema = Schema(schema_name)
def schema_column(table_name,
column_name,
data_type,
is_nullable,
is_mapping_key):
return Column(table_schema=schema_name,
table_name=table_name,
column_name=column_name,
data_type=data_type.value,
is_nullable=is_nullable,
is_mapping_key=is_mapping_key)
for table in
import json
import logging
from collections import OrderedDict
from functools import partial
from elt.schema import Schema
from elt.schema import Schema, Column, DBType
# Maps a (<type>, <format>) pair from a JSON-schema property definition
# to the corresponding database column type.
# A format of None means the property definition had no "format" key.
data_type_map = {
    ("string", None): DBType.String,
    ("string", "date-time"): DBType.Timestamp,
    ("boolean", None): DBType.Boolean,
    ("integer", None): DBType.Integer,
    ("number", None): DBType.Double,
    ("object", None): DBType.JSON,
}
def is_nullable(type_def) -> bool:
    """Return True when a JSON-schema type definition admits null.

    :param type_def: a JSON-schema property definition dict (may be empty/None).
    :return: whether the column described by *type_def* is nullable.

    An empty definition means "any valid JSON", which includes null.
    For an ``anyOf`` union, the column is nullable if ANY branch admits
    null — the previous code assumed the nullable branch was always at
    index 1, which raised IndexError on single-branch unions and missed
    a nullable first branch.
    """
    if not type_def:
        return True
    if 'anyOf' in type_def:
        return any("null" in branch.get('type', ())
                   for branch in type_def['anyOf'])
    return "null" in type_def['type']
def data_type(type_def) -> DBType:
    """Resolve a singer/JSON-schema type definition to a DBType.

    :param type_def: a JSON-schema property definition dict (may be empty/None).
    :raises KeyError: if the (type, format) pair is not in `data_type_map`.
    """
    # An empty definition accepts any valid JSON document.
    if not type_def:
        return DBType.JSON
    # For anyOf unions, the first branch carries the format information.
    if 'anyOf' in type_def:
        type_def = type_def['anyOf'][0]
    declared = type_def['type']
    # "type" may be a single name, or a list that also includes "null".
    if isinstance(declared, list):
        _type = next(t for t in declared if t != "null")
    else:
        _type = declared
    _format = type_def.get('format', None)
    return data_type_map[(_type, _format)]
def loads(schema_name: str, raw: str) -> Schema:
......@@ -11,6 +53,7 @@ def loads(schema_name: str, raw: str) -> Schema:
raw_schema = json.loads(raw, object_pairs_hook=OrderedDict)
schema = Schema(schema_name)
def schema_column(table_name,
column_name,
data_type,
......@@ -19,17 +62,18 @@ def loads(schema_name: str, raw: str) -> Schema:
return Column(table_schema=schema_name,
table_name=table_name,
column_name=column_name,
data_type=str(data_type),
data_type=data_type.value,
is_nullable=is_nullable,
is_mapping_key=is_mapping_key)
# streams → tables
for stream in raw_schema['streams'].items():
for stream in raw_schema['streams']:
table_name = stream['stream']
metadata = stream['metadata']
# we can zip both these together to iterate on
# (column_def, column_meta) which can be useful
# (column_def, column_meta) which is a useful
# shortcut (no need to check the breadcrumb)
column_defs = stream['schema']['properties']
*column_meta, table_meta = stream['metadata']
......@@ -38,11 +82,20 @@ def loads(schema_name: str, raw: str) -> Schema:
table_column = partial(schema_column, table_name)
is_mapping_key = lambda col_def: col_def[0] in mapping_keys
for col_def, col_meta in zip(column_defs, column_meta):
for col_def, col_meta in zip(column_defs.items(), column_meta):
column_name = col_def[0]
is_nullable = "null" in col_def[1]['type']
is_mapping_key = col_def[0] in mapping_keys
try:
is_mapping_key = col_def[0] in mapping_keys
schema.add_column(table_column(column_name,
data_type(col_def[1]),
is_nullable(col_def[1]),
is_mapping_key))
except:
logging.error("Cannot parse column definition for {}: {}".format(table_name, col_def[0]))
return schema
schema.add_column(table_column(
def load(schema_name: str, reader) -> Schema:
    """Deserialize a Schema from a readable stream of singer catalog JSON."""
    raw = reader.read()
    return loads(schema_name, raw)
import elt.schema.serializer as serializer
import elt.schema.serializers.meltano as serializer
from functools import partial
from itertools import chain
......
import elt.schema.serializers.singer as serializer
def test_loads():
    """Parsing a one-stream singer catalog yields a single-table Schema."""
    singer_catalog = """
    {
      "streams": [
        {
          "stream": "FieldPermissions",
          "tap_stream_id": "FieldPermissions",
          "schema": {
            "type": "object",
            "additionalProperties": false,
            "properties": {
              "Id": {
                "type": "string"
              },
              "ParentId": {
                "type": [
                  "null",
                  "string"
                ]
              },
              "SobjectType": {
                "type": [
                  "null",
                  "string"
                ]
              },
              "Field": {
                "type": [
                  "null",
                  "string"
                ]
              },
              "PermissionsEdit": {
                "type": [
                  "null",
                  "boolean"
                ]
              },
              "PermissionsRead": {
                "type": [
                  "null",
                  "boolean"
                ]
              },
              "SystemModstamp": {
                "anyOf": [
                  {
                    "type": "string",
                    "format": "date-time"
                  },
                  {
                    "type": [
                      "string",
                      "null"
                    ]
                  }
                ]
              }
            }
          },
          "metadata": [
            {
              "breadcrumb": [
                "properties",
                "Id"
              ],
              "metadata": {
                "inclusion": "automatic",
                "selected-by-default": true
              }
            },
            {
              "breadcrumb": [
                "properties",
                "ParentId"
              ],
              "metadata": {
                "inclusion": "available",
                "selected-by-default": true
              }
            },
            {
              "breadcrumb": [
                "properties",
                "SobjectType"
              ],
              "metadata": {
                "inclusion": "available",
                "selected-by-default": true
              }
            },
            {
              "breadcrumb": [
                "properties",
                "Field"
              ],
              "metadata": {
                "inclusion": "available",
                "selected-by-default": true
              }
            },
            {
              "breadcrumb": [
                "properties",
                "PermissionsEdit"
              ],
              "metadata": {
                "inclusion": "available",
                "selected-by-default": true
              }
            },
            {
              "breadcrumb": [
                "properties",
                "PermissionsRead"
              ],
              "metadata": {
                "inclusion": "available",
                "selected-by-default": true
              }
            },
            {
              "breadcrumb": [
                "properties",
                "SystemModstamp"
              ],
              "metadata": {
                "inclusion": "automatic",
                "selected-by-default": true
              }
            },
            {
              "breadcrumb": [],
              "metadata": {
                "valid-replication-keys": [
                  "SystemModstamp"
                ],
                "table-key-properties": [
                  "Id"
                ]
              }
            }
          ]
        }
      ]
    }
    """
    schema = serializer.loads("singer", singer_catalog)

    # One stream in the catalog → exactly one table in the schema.
    assert len(schema.tables) == 1
    # Mirrors test_load's membership check: tables holds the stream names.
    assert "FieldPermissions" in schema.tables
    # NOTE(review): removed `import pdb; pdb.set_trace()` — debug residue
    # that would hang the test under CI.
def test_load():
    """load() reads a singer catalog from a file object and builds a Schema."""
    # TODO(review): this absolute path only exists on one developer's machine;
    # replace it with a fixture checked into the repository.
    singer_catalog_path = "/home/gitlab/git/gitlab/bizops/meltano/properties.json"
    # `with` ensures the catalog file is closed (was previously leaked).
    with open(singer_catalog_path) as catalog_file:
        schema = serializer.load("singer", catalog_file)
    assert len(schema.tables) > 20
    assert "Usage_Ping_Data__c" in schema.tables
    # NOTE(review): removed `import pdb; pdb.set_trace()` — debug residue
    # that would hang the test under CI.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment