Verified Commit 36e02674 authored by Micaël Bergeron

Merge remote-tracking branch 'load-postgresql/0.2.0-dev' into 0.3.0-dev

parents e92c194c 9d2edb43
......@@ -13,9 +13,6 @@ variables:
#
# If you want to also cache the installed packages, you have to install
# them in a virtualenv and cache it as well.
image: python:3.6
cache:
paths:
- .cache/pip
......@@ -63,7 +60,6 @@ test:
publish:
stage: deploy
before_script:
- echo "$PYPIRC" > ~/.pypirc
- pip install twine
script:
- python setup.py sdist
......
# Build on the Meltano CLI base image so this loader runs inside the framework.
FROM registry.gitlab.com/meltano/meltano-cli:latest
# Copy the loader package into the image and install it with pip.
ADD . /app/loader/postgresql
RUN pip install /app/loader/postgresql
File added
File added
from meltano.common.service import MeltanoService
from .loader import PostgreSQLLoader

# Importing this package registers the PostgreSQL loader under its canonical
# id, so MeltanoService.create_loader("com.meltano.load.postgresql", ...)
# can resolve it. Registration is a deliberate import-time side effect.
MeltanoService.register_loader("com.meltano.load.postgresql", PostgreSQLLoader)
import asyncio
import sys
import argparse
import logging
from meltano.common.service import MeltanoService
from meltano.common.utils import setup_logging, setup_db
from meltano.common.cli import parser_db_conn, parser_logging
from meltano.schema import Schema
from meltano.stream import MeltanoStream
from .loader import PostgreSQLLoader
def parse():
    """Build the CLI parser and return the parsed process arguments."""
    arg_parser = argparse.ArgumentParser(
        description="Load data from stdin using PostgreSQL"
    )

    # Attach the shared option groups (DB connection + logging verbosity).
    parser_db_conn(arg_parser)
    parser_logging(arg_parser)

    arg_parser.add_argument('manifest_file',
                            type=str,
                            help="Manifest file to load.")

    return arg_parser.parse_args()
def main(args):
    """Wire up the PostgreSQL loader to stdin and drain the incoming stream."""
    service = MeltanoService()

    # TODO: the schema should be discovered (e.g. from the manifest file);
    # for now it is hardcoded as an empty attribute list.
    # schema = service.load_schema(args.schema, args.manifest_file)
    schema = Schema(args.schema, [])

    stream = MeltanoStream(sys.stdin.fileno(), schema=schema)
    loader = service.create_loader("com.meltano.load.postgresql", stream)

    logging.info("Waiting for data...")
    loader.receive()
    logging.info("Integration complete.")
if __name__ == '__main__':
    # CLI entry point: parse arguments, then configure logging and the DB
    # connection from the shared option groups before running the loader.
    args = parse()
    setup_logging(args)
    setup_db(args)
    main(args)
import logging
import pandas
import io
from abc import ABC, abstractmethod
from meltano.load.base import MeltanoLoader
from meltano.common.db import DB
from meltano.common.service import MeltanoService
from meltano.common.entity import Entity
from meltano.common.process import integrate_csv_file # TODO: remove me
from meltano.stream import MeltanoStream
class PostgreSQLLoader(MeltanoLoader):
    """Loader that writes incoming entity data into PostgreSQL via CSV COPY."""

    def load(self, source_name, entity, data):
        """Upsert a DataFrame of entity rows into `<source_name>.<entity.alias>`.

        :param source_name: target table schema in PostgreSQL.
        :param entity: entity descriptor; its attributes' metadata flag the
                       primary key via `is_pkey`.
        :param data: pandas DataFrame of rows to load.
        :raises ValueError: when the entity declares no primary-key attribute.
        """
        logging.info("Received entity {} with data {}.".format(entity, data))

        # Serialize the DataFrame to an in-memory CSV so the integration
        # helper can use PostgreSQL's CSV COPY FROM code path.
        memcsv = io.StringIO()
        data.to_csv(memcsv, index=False)

        # BUG FIX: `ifilter` is the Python 2 itertools name and is undefined
        # under Python 3 — this raised NameError on every call. The builtin
        # `filter` is the lazy Python 3 equivalent.
        is_pkey = lambda attr: attr.metadata.get('is_pkey', False)
        primary_key = next(filter(is_pkey, entity.attributes), None)

        if primary_key is None:
            # Fail with a clear message instead of the AttributeError
            # (`None.name`) the original code would have raised below.
            raise ValueError(
                "Entity {} has no primary key attribute.".format(entity.alias)
            )

        with DB.open() as db:
            integrate_csv_file(db, memcsv,
                               table_schema=source_name,
                               table_name=entity.alias,
                               primary_key=primary_key.name,
                               update_action="NOTHING")
......@@ -2,9 +2,9 @@
from setuptools import setup, find_packages
setup(
name='meltano-common',
version='0.3.0-dev3',
description='Meltano shared module.',
name='meltano',
version='0.2.0-dev',
description='Meltano framework.',
author='Meltano Team & Contributors',
author_email='meltano@gitlab.com',
url='https://gitlab.com/meltano/meltano',
......
File added
File added
test_table:
  whitelisted:
    - id: integer
    - age: integer
    - name: character varying
import pyarrow as pa
import pandas as pd
import json
import sys
import os
# Emit a test dataset as an Arrow record-batch stream on stdout.
d = {
    'id': range(200),
    'name': ["John", "Steve"] * 100,
    'age': [43, 33] * 100
}

# gather the source data in the DataFrame
df = pd.DataFrame(data=d)

# should be constructed from a elt.schema.Schema
# NOTE(review): 'id' is absent from the schema, so presumably only the
# 'name' and 'age' columns are meant to reach the stream — confirm.
schema = pa.schema([
    pa.field('name', pa.string()),
    pa.field('age', pa.int32()),
])

# convert it to a pyarrow.Table
table = pa.Table.from_pandas(df, schema=schema, preserve_index=False)

# very important, we need to know the source_table
table = table.replace_schema_metadata(metadata={
    'meltano': json.dumps({
        'entity_name': "project",
        'jobid': "8857"
    })
})

# write to stdout as a binary Arrow stream
sink = os.fdopen(sys.stdout.fileno(), "wb")
writer = pa.RecordBatchStreamWriter(sink, table.schema)

# manage chunking
# BUG FIX: `chunk_size` was defined but never used — `to_batches()` was
# called without it, so the whole table went out as a single batch.
# Pass it as `max_chunksize` so the stream is actually chunked.
chunk_size = 5
for batch in table.to_batches(max_chunksize=chunk_size):
    writer.write_batch(batch)

writer.close()
sink.close()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment