Verified commit f0cbe507, authored by Micaël Bergeron

Merge branch '0.3.0-dev' of gitlab.com:mbergeron/meltano-common into 0.3.0-dev

parents ee84bc04 bef4a636
from meltano.schema import Schema, Column, DBType
class Manifest:
    """Description of a data source: a version, a source name and entities.

    The constructor stores its arguments as-is on ``version``,
    ``source_name`` and ``entities``.
    """

    def __init__(self, version, source_name, entities=None):
        """Create a manifest.

        Args:
            version: Manifest version, stored unchanged.
            source_name: Name of the source; used as the table schema and
                schema name by ``as_schema``.
            entities: List of entity objects. When omitted, a new empty
                list is created for this instance.
        """
        self.version = version
        self.source_name = source_name
        # A literal ``[]`` default would be evaluated once and shared by
        # every Manifest built without entities; allocate per instance.
        self.entities = [] if entities is None else entities

    def as_schema(self):
        """Return a ``Schema`` with one ``Column`` per entity attribute.

        Each column uses ``source_name`` as its table schema and the
        entity alias as its table name. ``is_pkey`` (default ``False``)
        and ``is_nullable`` (default ``True``) are read from the
        attribute's metadata mapping.
        """
        columns = [
            Column(table_schema=self.source_name,
                   table_name=entity.alias,
                   column_name=attr.name,
                   data_type=attr.data_type,
                   is_mapping_key=attr.metadata.get('is_pkey', False),
                   is_nullable=attr.metadata.get('is_nullable', True))
            for entity in self.entities
            for attr in entity.attributes
        ]
        return Schema(self.source_name, columns)
import yaml
from meltano.common.entity import Entity, Attribute, TransientAttribute
from .manifest import Manifest
class ManifestReader:
    """Parse a YAML manifest document into a ``Manifest``.

    The document is a mapping with a ``version`` key; every other
    top-level key is an entity name whose value holds an ``attributes``
    list.
    """

    def __init__(self, source_name):
        """Remember the source name given to every ``Manifest`` produced."""
        self.source_name = source_name

    def load(self, file):
        """Read ``file`` completely and parse its content with ``loads``."""
        return self.loads(file.read())

    def loads(self, raw):
        """Parse the YAML text ``raw`` and return a ``Manifest``.

        Raises:
            KeyError: if the document has no ``version`` key.
        """
        # NOTE(review): the original called ``yaml.load(raw)`` with no
        # Loader, which can construct arbitrary Python objects from tagged
        # input and is rejected by PyYAML >= 6. A manifest is plain data,
        # so the safe loader is used instead.
        raw_manifest = yaml.safe_load(raw)
        version = raw_manifest.pop('version')
        # Everything left after removing ``version`` is an entity.
        entities = [self.parse_entity(entity_name, entity_def)
                    for entity_name, entity_def in raw_manifest.items()]
        return Manifest(version, self.source_name, entities)

    def parse_entity(self, entity_name, entity_def) -> Entity:
        """Build an ``Entity`` from its name and its raw definition."""
        attributes = [self.parse_attribute(attr)
                      for attr in entity_def['attributes']]
        return Entity(entity_name, attributes=attributes)

    def parse_attribute(self, attribute_def) -> Attribute:
        """Build an ``Attribute`` from one raw attribute mapping.

        ``input`` and ``output`` are optional and parsed to ``None`` when
        absent. ``metadata`` is optional and defaults to an empty dict,
        because ``ManifestWriter`` leaves the key out when the metadata is
        empty.
        """
        source = self.parse_transient_attribute(attribute_def.get('input'))
        target = self.parse_transient_attribute(attribute_def.get('output'))
        metadata = attribute_def.get('metadata') or {}
        return Attribute(attribute_def['alias'], source, target,
                         metadata=metadata)

    def parse_transient_attribute(self, transient_attribute_def) -> TransientAttribute:
        """Build a ``TransientAttribute`` from a ``name``/``type`` mapping.

        Returns ``None`` when the definition itself is ``None``.
        """
        if transient_attribute_def is None:
            return None
        return TransientAttribute(transient_attribute_def['name'],
                                  transient_attribute_def['type'])
import yaml
import yamlordereddictloader
from collections import OrderedDict
from .manifest import Manifest
class ManifestWriter:
    """Serialize a ``Manifest`` as YAML into a file-like object."""

    def __init__(self, file):
        """Keep the writable ``file`` that ``write`` dumps into."""
        self.file = file

    def write(self, manifest: Manifest):
        """Dump ``manifest`` to ``self.file`` as block-style YAML.

        The ``version`` key is emitted first, followed by one key per
        entity alias in the order of ``manifest.entities``.
        """
        entities = [
            (entity.alias, self.raw_entity(entity))
            for entity in manifest.entities
        ]
        # An OrderedDict keeps ``version`` ahead of the entity keys.
        raw_manifest = OrderedDict([
            ('version', manifest.version),
            *entities
        ])
        yaml.dump(raw_manifest, self.file,
                  Dumper=yamlordereddictloader.SafeDumper,
                  default_flow_style=False)

    def raw_entity(self, entity):
        """Return the plain-dict form of ``entity`` (its attribute list)."""
        return {
            'attributes': [
                self.raw_attribute(attr)
                for attr in entity.attributes
            ]
        }

    def raw_attribute(self, attribute):
        """Return the plain-dict form of ``attribute``.

        ``output`` is written only when it is set and differs from
        ``input``; ``metadata`` is written only when it is non-empty.
        """
        raw_attribute = {
            'alias': attribute.alias,
            'input': self.raw_transient_attribute(attribute.input),
        }
        # ``ManifestReader`` yields ``output=None`` when the key is absent
        # (which is what this writer produces for output == input), so a
        # missing output must be skipped rather than dereferenced.
        if attribute.output is not None and attribute.input != attribute.output:
            raw_attribute['output'] = self.raw_transient_attribute(attribute.output)
        if attribute.metadata:
            raw_attribute['metadata'] = attribute.metadata
        return raw_attribute

    def raw_transient_attribute(self, transient_attribute):
        """Return the ``name``/``type`` dict of a transient attribute."""
        return {
            'name': transient_attribute.name,
            'type': transient_attribute.data_type,
        }
......@@ -20,6 +20,7 @@ setup(
"pandas",
"pyarrow",
"pyyaml",
"yamlordereddictloader",
"requests",
"attrs"
]
......
import io
from meltano.common.manifest import *
from meltano.common.manifest_reader import *
from meltano.common.manifest_writer import *
# Manifest fixture shared by the tests below: one entity (``person``) with a
# single attribute carrying input, output and metadata sections. The YAML
# nesting is significant: without it the attribute list would not belong to
# the entity and input/output/metadata would not belong to the attribute.
SAMPLE = """
version: '1.0'
person:
  attributes:
    - alias: product_id
      input:
        name: Product ID
        type: string
      output:
        name: product_id
        type: string
      metadata:
        is_pkey: False
"""
def test_reader():
    """Parsing SAMPLE yields version '1.0' and exactly one entity."""
    manifest = ManifestReader("sample").loads(SAMPLE)

    assert manifest.version == "1.0"
    assert len(manifest.entities) == 1
def test_writer():
    """Writing a parsed manifest and reading it back keeps its content."""
    manifest = ManifestReader("sample").loads(SAMPLE)

    buf = io.StringIO()
    writer = ManifestWriter(buf)
    writer.write(manifest)

    round_trip = ManifestReader("sample").loads(buf.getvalue())

    assert manifest.version == round_trip.version
    # zip() stops at the shorter sequence, so check the count explicitly;
    # otherwise entities dropped by the writer would go unnoticed.
    assert len(manifest.entities) == len(round_trip.entities)
    for a, b in zip(manifest.entities, round_trip.entities):
        assert a.alias == b.alias
......@@ -4,7 +4,7 @@ import psycopg2
import psycopg2.sql as sql
import logging
from meltano.db import DB, Session
from meltano.common.db import DB, Session
from sqlalchemy import MetaData
logging.basicConfig(level=logging.INFO)
......
from meltano.db import DB
from meltano.common.db import DB
def test_connect():
......
import pytest
from meltano.db import DB
from meltano.job import Job, State
from meltano.common.job import Job, State
from datetime import datetime
def sample_job(payload={}):
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment