Commit d2168b66 authored by Micaël Bergeron

making the writer concurrent

parent 36671df1
import pandas as pd
import asyncio
import logging
from typing import Sequence
from abc import ABC, abstractmethod
......@@ -22,10 +23,34 @@ class MeltanoExtractor:
@abstractmethod
async def extract(self, entity: MeltanoEntity):
    """
    Generates DataFrames for a specified entity.

    Callers consume the result with ``async for``, so concrete
    implementations are expected to be async generators.
    """
async def extract_entity(self, entity):
    """
    Pull every frame produced for ``entity`` and hand each one to the writer.
    """
    frames = self.extract(entity)
    async for chunk in frames:
        self.writer.write(entity, chunk)
async def extract_all(self, loop, entities):
    """
    Extract every entity concurrently, streaming the results to the writer.

    One task running ``extract_entity`` is scheduled on ``loop`` for each
    entity produced by ``entities()``; the writer is opened before the
    first task starts and closed once all of them have finished.

    :param loop: event loop used to create the per-entity tasks.
    :param entities: callable returning an async iterable of entities.
    :raises: whatever the first failing extraction task raised; the
        remaining tasks are cancelled before the writer is closed.
    """
    # Open outside the ``try``: if open() fails there is nothing to close,
    # and the original error must not be masked by a failing close().
    self.writer.open()

    tasks = []
    try:
        # Appended one by one (not a comprehension) so that tasks already
        # started are still tracked if ``entities()`` fails midway.
        async for entity in entities():
            tasks.append(
                loop.create_task(self.extract_entity(entity))
            )

        # TODO: add timeout
        await asyncio.gather(*tasks)
    finally:
        logging.info("Shutting down")
        try:
            # Stop anything still running so no task writes to a closed sink.
            pending = [task for task in tasks if not task.done()]
            for task in pending:
                task.cancel()
            if pending:
                await asyncio.gather(*pending, return_exceptions=True)
        finally:
            self.writer.close()
def run(self):
    """
    Run the full extraction to completion on the current event loop.

    Drives ``extract_all`` over ``self.entities`` and closes the loop
    afterwards, whether or not the extraction succeeded.
    """
    # Looked up once, before the ``try``, so ``finally`` always has a loop
    # to close. The earlier ``writer.send_all`` call is gone: extract_all
    # now owns the writer lifecycle, and calling both would extract twice.
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(
            self.extract_all(loop, self.entities)
        )
    finally:
        loop.close()
......@@ -23,10 +23,10 @@ class SampleExtractor(MeltanoExtractor):
yield ['a', 'b', 'c']
# Sample async generator: yields the values built by sample_data(), awaiting
# asyncio.sleep() before each one.
# NOTE(review): this span appears to interleave the pre- and post-change bodies
# from the diff (two logging lines, two loops) — confirm which loop is current.
async def extract(self, entity):
logging.debug(f"Extracting data for {entity}")
# First loop: 10000 items built from the fixed list ['a', 'b', 'c'];
# sleep(0) only hands control back to the event loop between items.
for i in range(10000):
await asyncio.sleep(0)
yield sample_data(i, ['a', 'b', 'c'])
# logging.debug(f"Extracting data for {entity}")
# Second loop: 1000 items built from `entity`, pausing 3 seconds before each.
for i in range(1000):
await asyncio.sleep(3)
yield sample_data(i, entity)
MeltanoService.register_extractor("com.meltano.extract.sample", SampleExtractor)
......@@ -7,16 +7,13 @@ class MeltanoStreamWriter:
self.chunksize = chunksize
self.stream = stream
def send(self, sink, data):
    """
    Stream one encoded table to ``sink`` as a record-batch stream.

    :param sink: writable target handed to the record-batch stream writer.
    :param data: table to write; its schema is used for the stream.
    """
    writer = pa.RecordBatchStreamWriter(sink, data.schema)
    try:
        writer.write_table(data, chunksize=self.chunksize)
    finally:
        # Always finish the stream, even when writing the table fails.
        writer.close()

def write(self, entity, frame):
    """
    Encode ``frame`` for ``entity`` and stream it to the sink set by ``open()``.
    """
    self.send(self._sink, self.encode(entity, frame))
async def write(self, sink, extractor):
    """
    Drain ``extractor`` into ``sink``: every frame of every entity is
    encoded and sent, one entity after the other.
    """
    # NOTE(review): shares its name with the synchronous write(entity, frame);
    # within one class body the later definition wins. This looks like the
    # pre-change version from the diff — confirm which one should remain.
    async for entity in extractor.entities():
        frames = extractor.extract(entity)
        async for frame in frames:
            encoded = self.encode(entity, frame)
            self.send(sink, encoded)
def encode(self, entity, frame, **metadata):
page = pa.Table.from_pandas(frame, preserve_index=False)
page = page.replace_schema_metadata(metadata={
......@@ -28,8 +25,8 @@ class MeltanoStreamWriter:
return page
def send_all(self, loop, extractor: 'MeltanoExtractor'):
    """
    Open the stream for binary writing and run ``self.write(sink, extractor)``
    to completion on ``loop``; the sink is closed when the run ends.
    """
    with open(self.stream.fd, 'wb') as out:
        pending = self.write(out, extractor)
        loop.run_until_complete(pending)
def open(self):
    """
    Open ``self.stream.fd`` for binary writing and keep the handle as
    ``self._sink`` for subsequent writes.
    """
    target = self.stream.fd
    self._sink = open(target, 'wb')
def close(self):
    """
    Close the sink created by ``open()``.

    Does nothing when no sink was ever opened, so it is safe to call from
    cleanup code that runs even if ``open()`` failed.
    """
    sink = getattr(self, '_sink', None)
    if sink is not None:
        sink.close()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment