Commit fe500d16 authored by Micaël Bergeron

refactor MeltanoEntity to Entity

parent 8ca6a67a
class MeltanoEntity:
def __init__(self, metadata):
import io
import pyarrow as pa
from typing import Optional
from meltano.schema import DBType
class TransientAttribute:
    """A named attribute paired with its data type.

    Instances compare equal when both `name` and `data_type` match.
    """

    def __init__(self, name, data_type):
        """Store the attribute's `name` and `data_type` as given."""
        self.name = name
        self.data_type = data_type

    def copy(self):
        """Return a new TransientAttribute with the same name and data type."""
        return TransientAttribute(self.name, self.data_type)

    def __eq__(self, other):
        # Defer to Python's fallback comparison for unrelated types rather
        # than failing on a missing `name`/`data_type` attribute.
        if not isinstance(other, TransientAttribute):
            return NotImplemented
        return self.name == other.name and \
            self.data_type == other.data_type

    def __repr__(self):
        return "{}: {}".format(self.name, self.data_type)
class Attribute:
def __init__(self, alias,
input: Optional[TransientAttribute],
output: Optional[TransientAttribute],
self.alias = alias
self.input = input or TransientAttribute(alias, db_type)
self.output = output or self.input.copy()
self.metadata = metadata
def __repr__(self):
out = repr(self.input)
if self.output != self.input:
out += " as {}".format(self.output)
return out
def name(self):
def data_type(self):
return self.output.data_type
class Entity:
    """A named collection of attributes with free-form metadata.

    `schema` currently holds hard-coded placeholder table and schema names.
    """

    def __init__(self, alias, attributes=None, metadata=None):
        """Create an entity.

        alias      -- the entity's name.
        attributes -- list of attribute objects; a fresh empty list when
                      omitted.
        metadata   -- dict of extra information; a fresh empty dict when
                      omitted.
        """
        self.schema = {
            'table_name': "test_table",
            'schema_name': "test_schema",
        }
        self.alias = alias
        # Build the defaults per instance: a `[]`/`{}` default in the
        # signature would be shared (and mutated) across every Entity.
        self.attributes = [] if attributes is None else attributes
        self.metadata = {} if metadata is None else metadata

    def __repr__(self):
        # One attribute per line, each prefixed by a newline and a tab.
        return "".join("\n\t{}".format(attr) for attr in self.attributes)

    # TODO: refactor this out
    def as_pa_schema(self):
        """Return a pyarrow schema with one field per attribute.

        Each attribute's `data_type` is looked up in a DBType -> pyarrow
        type table; a data type missing from the table raises KeyError.
        """
        pa_types_map = {
            DBType.String: pa.string(),
            DBType.Integer: pa.int32(),
            DBType.Double: pa.float64(),
            DBType.Timestamp: pa.timestamp('s'),
            DBType.Long: pa.int64(),
        }

        return pa.schema([
            pa.field(attr.name, pa_types_map[attr.data_type])
            for attr in self.attributes
        ])
......@@ -5,7 +5,7 @@ import logging
from typing import Sequence
from abc import ABC, abstractmethod
from import MeltanoStreamWriter
from meltano.common.entity import MeltanoEntity
from meltano.common.entity import Entity
class MeltanoExtractor:
......@@ -16,12 +16,12 @@ class MeltanoExtractor:
async def entities(self):
Generates a list of MeltanoEntity from the data source.
Generates a list of Entity from the data source.
async def extract(self, entity: MeltanoEntity):
async def extract(self, entity: Entity):
Generates DataFrames for a specified entity.
......@@ -4,7 +4,9 @@ import pandas as pd
import json
import logging
from itertools import count
from meltano.common.service import MeltanoService
from meltano.common.entity import Entity
from meltano.extract.base import MeltanoExtractor
......@@ -20,13 +22,11 @@ def sample_data(i, columns):
class SampleExtractor(MeltanoExtractor):
async def entities(self):
yield ['a', 'b', 'c']
yield Entity('Sample')
async def extract(self, entity):
# logging.debug(f"Extracting data for {entity}")
for i in range(1000):
await asyncio.sleep(3)
yield sample_data(i, entity)
for i in count():
yield sample_data(i, ['a', 'b', 'c'])
MeltanoService.register_extractor("com.meltano.extract.sample", SampleExtractor)
import asyncio
import pyarrow
from abc import ABC, abstractmethod
from meltano.common.service import MeltanoService
from meltano.common.entity import MeltanoEntity
from meltano.common.entity import Entity
from import MeltanoStreamReader
......@@ -15,9 +16,15 @@ class MeltanoLoader(ABC):
def load(self, entity: MeltanoEntity, data):
def load(self, entity: Entity, data):
def integrate(self, metadata, batch):
#entity = self.service.entities[metadata['entity_id']]
entity = Entity(metadata['entity_name'])
return self.load(entity, batch.to_pandas())
def end_load(self):
......@@ -3,7 +3,7 @@ import logging
import json
import pyarrow as pa
from meltano.common.entity import MeltanoEntity
from meltano.common.entity import Entity
class MeltanoStreamReader:
......@@ -21,9 +21,9 @@ class MeltanoStreamReader:
reader = pa.open_stream(stream)
metadata = self.read_metadata(reader)
for batch in reader:
loader.load(metadata, batch.to_pandas())
metadata = self.read_metadata(reader)
loader.integrate(metadata, batch)
except Exception as e:
logging.error("Stream cannot be read: {}".format(e))
......@@ -34,9 +34,8 @@ class MeltanoStreamReader:
loop.add_reader(tap, self.integrate, tap, loader, loop)
def read_metadata(self, reader) -> MeltanoEntity:
def read_metadata(self, reader) -> Entity:
raw_metadata = reader.schema.metadata[b'meltano']
raw_metadata = json.loads(raw_metadata.decode("utf-8"))
# return self.service.entities[raw_metadata['entity_name']]
return MeltanoEntity(raw_metadata)
return raw_metadata
import pyarrow as pa
import numpy as np
import json
from meltano.common.transform import columnify
class MeltanoStreamWriter:
def __init__(self, stream, chunksize=1000):
def __init__(self, stream, chunksize=10000):
self.chunksize = chunksize = stream
......@@ -11,15 +14,36 @@ class MeltanoStreamWriter:
data = self.encode(entity, frame)
writer = pa.RecordBatchStreamWriter(self._sink, data.schema)
writer.write_table(data, chunksize=self.chunksize)
# TODO: this should be inferred from the Entity at some point
# there also might be some other transformations that could be done
# this is the simplest transformation to make it work.
def normalize_df(self, df):
    """
    Transforms the DataFrame to rename the columns and convert python objects
    to a json representation.

    Column labels are passed through `columnify`. In object-dtype columns,
    every value that is not already a `str` is replaced by its `json.dumps`
    output; columns of any other dtype are left untouched.

    The frame is modified in place and is also returned.
    """
    def to_json(value):
        # Strings are kept verbatim so they are not wrapped in extra quotes.
        return value if isinstance(value, str) else json.dumps(value)

    df.rename(columns=columnify, inplace=True)

    object_dtype = np.dtype('object')
    for col, dtype in df.dtypes.items():
        # Only object columns can hold arbitrary python values.
        if dtype != object_dtype:
            continue

        df[col] = df[col].map(to_json)

    return df
def encode(self, entity, frame, **metadata):
page = pa.Table.from_pandas(frame, preserve_index=False)
page = pa.Table.from_pandas(self.normalize_df(frame),
page = page.replace_schema_metadata(metadata={
'meltano': json.dumps({
'entity': entity,
'metadata': metadata
'entity_name': entity.alias,
'entity_uri': "com.meltano.{}.{}".format("fastly", entity.alias),
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment