Commit 36671df1 authored by Micaël Bergeron's avatar Micaël Bergeron

refactored the MeltanoStream(Reader/Writer)

parent 59161962
from meltano.stream import MeltanoStream
import pandas as pd
import asyncio
from typing import Sequence
from abc import ABC, abstractmethod
from meltano.stream.writer import MeltanoStreamWriter
from meltano.common.entity import MeltanoEntity
class MeltanoExtractor:
def __init__(self, stream: MeltanoStream, service: 'MeltanoService'):
def __init__(self, writer: MeltanoStreamWriter, service: 'MeltanoService'):
self.service = service
self.stream = stream
self.writer = writer
async def extract_all(self):
@abstractmethod
async def entities(self):
"""
Generates a list of MeltanoEntity from the data source.
"""
pass
@abstractmethod
async def extract(self, entity: MeltanoEntity):
"""
Generate DataFrame for a specified entity.
"""
pass
def send_all(self):
writer = self.stream.create_writer(self)
#self.start_extract()
writer.send_all()
#self.end_extract()
def run(self):
loop = asyncio.get_event_loop()
self.writer.send_all(loop, self)
......@@ -2,6 +2,7 @@ import asyncio
import pyarrow as pa
import pandas as pd
import json
import logging
from meltano.common.service import MeltanoService
from meltano.extract.base import MeltanoExtractor
......@@ -14,32 +15,17 @@ def sample_data(i, columns):
# gather the source data in the DataFrame
df = pd.DataFrame(data=d)
# should be constructed from a elt.schema.Schema
schema = pa.schema([
pa.field(column, pa.int32()) for column in columns
])
# convert it to a pyarrow.Table
table = pa.Table.from_pandas(df, schema=schema, preserve_index=False)
# very important, we need to know the source_table
table = table.replace_schema_metadata(metadata={
'meltano': json.dumps({
'entity_name': "com.meltano.marketo:Project",
'jobid': "8857"
})
})
return table
return df
class SampleExtractor(MeltanoExtractor):
async def extract_all(self):
i = 0
while True:
i = i + 1
await asyncio.sleep(10)
async def entities(self):
yield ['a', 'b', 'c']
async def extract(self, entity):
logging.debug(f"Extracting data for {entity}")
for i in range(10000):
await asyncio.sleep(0)
yield sample_data(i, ['a', 'b', 'c'])
......
import asyncio
from abc import ABC, abstractmethod
from meltano.common.service import MeltanoService
from meltano.common.entity import MeltanoEntity
from meltano.stream import MeltanoStream
from meltano.stream.reader import MeltanoStreamReader
class MeltanoLoader(ABC):
def __init__(self, stream: MeltanoStream, service: MeltanoService):
def __init__(self, reader: MeltanoStreamReader, service: MeltanoService):
self.service = service
self.stream = stream
self.reader = reader
def start_load(self):
pass
......@@ -19,8 +21,8 @@ class MeltanoLoader(ABC):
def end_load(self):
pass
def receive(self):
reader = self.stream.create_reader(self)
def run(self):
loop = asyncio.get_event_loop()
self.start_load()
reader.read()
self.reader.read_all(loop, self)
self.end_load()
......@@ -6,21 +6,17 @@ from .reader import MeltanoStreamReader
class MeltanoStream:
"""
Reads data serialized using the `MeltanoSink` writer.
"""
def __init__(self, fd, schema: Schema):
def __init__(self, fd):
"""
fd: file descriptor to use, it should be non-blocking.
"""
self.fd = fd
self.schema = schema
def create_reader(self, loader: 'MeltanoLoader'):
return MeltanoStreamReader(self, loader)
def create_reader(self):
return MeltanoStreamReader(self)
def create_writer(self, extractor: 'MeltanoExtractor'):
def create_writer(self):
"""
Send a DataFrame to the stream.
"""
return MeltanoStreamWriter(self, extractor)
return MeltanoStreamWriter(self)
import select
import asyncio
import logging
import json
import pyarrow as pa
......@@ -8,11 +7,10 @@ from meltano.common.entity import MeltanoEntity
class MeltanoStreamReader:
def __init__(self, stream, loader):
def __init__(self, stream):
self.stream = stream
self.loader = loader
def integrate(self, stream, loop):
def integrate(self, stream, loader, loop):
"""
Read a DataFrame from the stream.
"""
......@@ -25,18 +23,16 @@ class MeltanoStreamReader:
reader = pa.open_stream(stream)
metadata = self.read_metadata(reader)
for batch in reader:
self.loader.load(metadata, batch.to_pandas())
loader.load(metadata, batch.to_pandas())
except Exception as e:
logging.error("Stream cannot be read: {}".format(e))
finally:
loop.stop()
def read(self):
tap = open(self.stream.fd, 'rb')
loop = asyncio.get_event_loop()
loop.add_reader(tap, self.integrate, tap, loop)
loop.run_forever()
def read_all(self, loop, loader: 'MeltanoLoader'):
with open(self.stream.fd, 'rb') as tap:
loop.add_reader(tap, self.integrate, tap, loader, loop)
loop.run_forever()
def read_metadata(self, reader) -> MeltanoEntity:
raw_metadata = reader.schema.metadata[b'meltano']
......
import pyarrow as pa
import json
import asyncio
class MeltanoStreamWriter:
def __init__(self, stream, extractor):
def __init__(self, stream, chunksize=1000):
self.chunksize = chunksize
self.stream = stream
self.extractor = extractor
def send(self, sink, data):
writer = pa.RecordBatchStreamWriter(sink, data.schema)
writer.write_table(data, chunksize=100)
writer.write_table(data, chunksize=self.chunksize)
writer.close()
async def write(self):
with open(self.stream.fd, 'wb') as sink:
async for frame in self.extractor.extract_all():
self.send(sink, self.encode(frame))
async def write(self, sink, extractor):
async for entity in extractor.entities():
async for frame in extractor.extract(entity):
self.send(sink, self.encode(entity, frame))
def encode(self, frame):
def encode(self, entity, frame, **metadata):
page = pa.Table.from_pandas(frame, preserve_index=False)
page = page.replace_schema_metadata(metadata={
'meltano': json.dumps({
'entity_name': "com.meltano.marketo:Project",
'jobid': "8857"
})
'entity': entity,
'metadata': metadata
})
})
return page
def send_all(self):
loop = asyncio.get_event_loop()
loop.run_until_complete(self.write())
def send_all(self, loop, extractor: 'MeltanoExtractor'):
with open(self.stream.fd, 'wb') as sink:
loop.run_until_complete(
self.write(sink, extractor)
)
......@@ -3,7 +3,7 @@ from setuptools import setup, find_packages
setup(
name='meltano-common',
version='0.2.0-dev',
version='0.3.0-dev',
description='Meltano shared module.',
author='Meltano Team & Contributors',
author_email='meltano@gitlab.com',
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment