Commit 593ad4e5 authored by Micaël Bergeron

wip: making the singer parser

parent 750c00b5
import argparse
from enum import Enum
from elt.cli import ActionEnum, OptionEnum, parser_logging
from elt.utils import setup_logging
def action_convert(args):
    """Handle the ``convert_schema`` action.

    Placeholder: nothing is implemented yet, so the parsed arguments are
    ignored and ``None`` is returned.
    """
    return None
class SchemaType(OptionEnum):
    """Schema types that can be selected on the command line."""

    # Each value is the literal string accepted by the `-s` / `-d` options.
    MELTANO = "meltano"
    SINGER = "singer"
class Actions(ActionEnum):
    """Actions this command can run."""

    # (name used on the command line, function called with the parsed args)
    CONVERT_SCHEMA = ("convert_schema", action_convert)
def parse():
    """Build the schema-conversion argument parser and parse the command line.

    Arguments understood by the parser:
        -s      input schema type (required), one of ``SchemaType``.
        -d      output schema type, defaults to ``SchemaType.MELTANO``.
        action  action to run; may be omitted, in which case
                ``Actions.CONVERT_SCHEMA`` is used.

    Returns:
        The ``argparse.Namespace`` produced by ``parse_args()``.
    """
    def action_from_str(name):
        # argparse only turns ArgumentTypeError/TypeError/ValueError into a
        # usage error; the KeyError raised by the enum lookup for an unknown
        # name would otherwise surface as a raw traceback.
        try:
            return Actions.from_str(name)
        except KeyError:
            raise argparse.ArgumentTypeError(
                "invalid action: {!r}".format(name)
            ) from None

    parser = argparse.ArgumentParser(
        description="Convert a schema between the supported schema types "
                    "(Meltano, Singer)."
    )

    parser_logging(parser)

    parser.add_argument('-s',
                        dest="source",
                        type=SchemaType,
                        choices=list(SchemaType),
                        required=True,
                        help="Specifies input schema type.")

    parser.add_argument('-d',
                        dest="destination",
                        type=SchemaType,
                        choices=list(SchemaType),
                        default=SchemaType.MELTANO,
                        help="Specifies output schema type.")

    # nargs='?' is what makes `default` effective: without it a positional
    # argument is always required and its default is never used.
    parser.add_argument('action',
                        nargs='?',
                        type=action_from_str,
                        choices=list(Actions),
                        default=Actions.CONVERT_SCHEMA,
                        help=("convert_schema: convert a schema into an external schema type."))

    return parser.parse_args()
def main(args):
    """Run the action selected on the command line.

    ``args.action`` is called with the full ``args`` object; its return
    value is discarded.
    """
    selected_action = args.action
    selected_action(args)
if __name__ == "__main__":
    # Script entry point: parse the command line, hand the parsed options
    # to setup_logging, then run the selected action.
    args = parse()
    setup_logging(args)
    main(args)
......@@ -19,6 +19,18 @@ class OptionEnum(Enum):
return hash(self.value)
class ActionEnum(Enum):
    """Enum whose member values are ``(name, callable)`` pairs.

    ``str(member)`` gives the first element of the value and calling a
    member invokes the second element with the given arguments object.
    """

    @classmethod
    def from_str(cls, name):
        """Return the member whose name is ``name`` upper-cased.

        Raises ``KeyError`` when no member has that name.
        """
        member_name = name.upper()
        return cls[member_name]

    def __str__(self):
        label = self.value[0]
        return label

    def __call__(self, args):
        handler = self.value[1]
        return handler(args)
class ExportOutput(OptionEnum):
    """Available export output options."""

    # Values are the literal option strings.
    DB = "db"
    FILE = "file"
......
import json
from collections import OrderedDict
from functools import partial
from elt.schema import Schema
def loads(schema_name: str, raw: str) -> Schema:
# we must use an OrderedDict to benefit from the
# ordering of properties, see the `zip` below
raw_schema = json.loads(raw, object_pairs_hook=OrderedDict)
schema = Schema(schema_name)
def schema_column(table_name,
column_name,
data_type,
is_nullable,
is_mapping_key):
return Column(table_schema=schema_name,
table_name=table_name,
column_name=column_name,
data_type=str(data_type),
is_nullable=is_nullable,
is_mapping_key=is_mapping_key)
# streams → tables
for stream in raw_schema['streams'].items():
table_name = stream['stream']
metadata = stream['metadata']
# we can zip both these together to iterate on
# (column_def, column_meta) which can be useful
column_defs = stream['schema']['properties']
*column_meta, table_meta = stream['metadata']
mapping_keys = set(table_meta['metadata']['table-key-properties'])
table_column = partial(schema_column, table_name)
is_mapping_key = lambda col_def: col_def[0] in mapping_keys
for col_def, col_meta in zip(column_defs, column_meta):
column_name = col_def[0]
is_nullable = "null" in col_def[1]['type']
is_mapping_key = col_def[0] in mapping_keys
schema.add_column(table_column(
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment