remove PEP420 namespaces (not needed in a single package)

parent b329e4ce
......@@ -21,8 +21,8 @@ service = MeltanoService()
def build_extractor(name):
# this should register the module
try:
# this should register the module
importlib.import_module("meltano.extract.{}".format(name))
except ImportError as e:
logging.error("Cannot find the extractor {0}, you might need to install it (meltano-extract-{0})".format(name))
......
......@@ -7,6 +7,7 @@ class MeltanoService:
loaders = dict()
extractors = dict()
@classmethod
def register_loader(cls, loader_id, loader_class):
"""
......@@ -15,6 +16,7 @@ class MeltanoService:
# TODO test if loader_class > MeltanoLoader
cls.loaders[loader_id] = loader_class
@classmethod
def register_extractor(cls, extractor_id, extractor_class):
"""
......@@ -23,15 +25,19 @@ class MeltanoService:
# TODO test if extractor_class > MeltanoExtractor
cls.extractors[extractor_id] = extractor_class
def __init__(self):
self._entities = {}
def create_loader(self, loader_id, stream):
return MeltanoService.loaders[loader_id](stream, self)
def create_extractor(self, extractor_id, stream):
return MeltanoService.extractors[extractor_id](stream, self)
def register_entity(self, entity_id, entity: 'Entity'):
"""
entity_id: "urn:com.meltano:entity:{source}:{entity}"
......@@ -42,6 +48,7 @@ class MeltanoService:
self._entities[entity_id] = entity
return entity_id
def register_manifest(cls, manifest: 'Manifest'):
"""
manifest: The manifest to register.
......@@ -53,15 +60,18 @@ class MeltanoService:
return [cls.register_entity(build_id(entity), entity) \
for entity in manifest.entities]
def get_entity(self, entity_id):
return self._entities[entity_id]
def auto_discover(self):
packages = pkg_resources.AvailableDistributions() # scan sys.path
addons = filter(lambda pkg: re.search(r'meltano-(load|extract)-\w+'))
return list(map(importlib.import_module, addons))
def load_schema(self, schema_name, schema_file):
serializer = MeltanoSerializer(schema_name)
with open(schema_file, 'r') as f:
......
......@@ -11,12 +11,14 @@ from meltano.common.entity import Entity
class MeltanoExtractor:
source_name = None
def __init__(self, writer: MeltanoStreamWriter, service: 'MeltanoService',
source_name=None):
self.source_name = source_name or self.__class__.source_name
self.service = service
self.writer = writer
@abstractmethod
async def entities(self):
"""
......@@ -24,6 +26,7 @@ class MeltanoExtractor:
"""
pass
@abstractmethod
async def extract(self, entity: Entity):
"""
......@@ -31,10 +34,12 @@ class MeltanoExtractor:
"""
pass
async def extract_entity(self, entity):
async for frame in self.extract(entity):
self.writer.write(self.source_name, entity, frame)
async def extract_all(self, loop, entities):
tasks = []
try:
......@@ -50,8 +55,10 @@ class MeltanoExtractor:
logging.info("Shutting down")
self.writer.close()
def run(self):
try:
# TODO: add error handling
loop = asyncio.get_event_loop()
loop.run_until_complete(
self.extract_all(loop, self.entities)
......
......@@ -18,6 +18,7 @@ from meltano.common.entity import Entity, Attribute, TransientAttribute
URL = "https://api.fastly.com/"
# TODO: refactor to utils
pandas_to_dbtype = {
np.dtype('object'): DBType.String,
......@@ -30,6 +31,7 @@ pandas_to_dbtype = {
np.dtype('datetime64'): DBType.Timestamp
}
# TODO: refactor to utils
def df_to_entity(alias, df):
"""
......@@ -55,6 +57,7 @@ class FastlyExtractor(MeltanoExtractor):
source_name = "fastly"
def create_session(self):
headers = {
'Fastly-Key': os.getenv("FASTLY_API_TOKEN"),
......@@ -63,9 +66,11 @@ class FastlyExtractor(MeltanoExtractor):
session = aiohttp.ClientSession(headers=headers)
return session
def url(self, endpoint):
return "".join((URL, endpoint))
# TODO: refactor this out in a HTTP loader
async def req(self, session, endpoint, payload={}):
url = self.url(endpoint)
......@@ -76,6 +81,7 @@ class FastlyExtractor(MeltanoExtractor):
return json_normalize(await resp.json())
# TODO: refactor this out in a discovery component
async def entities(self):
"""
......@@ -85,6 +91,7 @@ class FastlyExtractor(MeltanoExtractor):
billing = await self.req(session, "billing/v2/year/2018/month/06")
yield df_to_entity("Billing", billing)
def discover_entities(self):
async def drain(generator):
results = []
......@@ -99,6 +106,7 @@ class FastlyExtractor(MeltanoExtractor):
return entities
async def extract(self, entity):
async with self.create_session() as session:
for year, month in itertools.product(range(2017, 2018),
......
......@@ -24,8 +24,10 @@ class SampleExtractor(MeltanoExtractor):
async def entities(self):
yield Entity('Sample')
async def extract(self, entity):
for i in count():
await asyncio.sleep(1)
yield sample_data(i, ['a', 'b', 'c'])
......
......@@ -10,6 +10,7 @@ class MeltanoStreamReader:
def __init__(self, stream):
self.stream = stream
def integrate(self, stream, loader, loop):
"""
Read a DataFrame from the stream.
......@@ -29,11 +30,14 @@ class MeltanoStreamReader:
finally:
loop.stop()
def read_all(self, loop, loader: 'MeltanoLoader'):
with open(self.stream.fd, 'rb') as tap:
# TODO add error handling
loop.add_reader(tap, self.integrate, tap, loader, loop)
loop.run_forever()
def read_metadata(self, reader) -> Entity:
raw_metadata = reader.schema.metadata[b'meltano']
raw_metadata = json.loads(raw_metadata.decode("utf-8"))
......
......@@ -10,6 +10,7 @@ class MeltanoStreamWriter:
self.chunksize = chunksize
self.stream = stream
def write(self, source_name, entity, frame):
data = self.encode(source_name, entity, frame)
......@@ -17,6 +18,7 @@ class MeltanoStreamWriter:
writer.write_table(data)
writer.close()
# TODO: this should be inferred from the Entity at some point
# there also might be some other transformations that could be done
# this is the simplest transformation to make it work.
......@@ -35,6 +37,7 @@ class MeltanoStreamWriter:
return df
def encode(self, source_name, entity, frame, **metadata):
page = pa.Table.from_pandas(self.normalize_df(frame),
schema=entity.as_pa_schema(),
......@@ -49,8 +52,11 @@ class MeltanoStreamWriter:
return page
# TODO: use a context manager
def open(self):
self._sink = open(self.stream.fd, 'wb')
def close(self):
self._sink.close()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment