use entity registration

parent decb5c2e
FROM registry.gitlab.com/meltano/meltano-cli:latest
ADD . /app/loader/postgresql
RUN pip install /app/loader/postgresql
......@@ -6,22 +6,26 @@ from abc import ABC, abstractmethod
from meltano.load.base import MeltanoLoader
from meltano.common.db import DB
from meltano.common.service import MeltanoService
from meltano.common.entity import MeltanoEntity
from meltano.common.entity import Entity
from meltano.common.process import integrate_csv_file # TODO: remove me
from meltano.stream import MeltanoStream
class PostgreSQLLoader(MeltanoLoader):
def load(self, entity, data):
def load(self, source_name, entity, data):
logging.info("Received entity {} with data {}.".format(entity, data))
# this is a hack to use CSV COPY From
memcsv = io.StringIO()
data.to_csv(memcsv, index=False)
is_pkey = lambda attr: attr.metadata.get('is_pkey', False)
primary_key = next(ifilter(is_pkey, entity.attributes), None)
with DB.open() as db:
integrate_csv_file(db, memcsv,
primary_key='id',
table_name=entity.schema['table_name'],
table_schema=entity.schema['schema_name'],
table_schema=source_name,
table_name=entity.alias,
primary_key=primary_key.name,
update_action="NOTHING")
......@@ -11,6 +11,6 @@ setup(name='meltano-load-postgresql',
install_requires=[
"SQLAlchemy",
"psycopg2>=2.7.4",
"meltano-common==0.2.0-dev"
"meltano-common"
]
)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment