Commit c8375b0b authored by Micaël Bergeron's avatar Micaël Bergeron

refactoring the zuora extractor

parent d093f92e
import psycopg2
import os
db_config_keys = [
"host",
"port",
"user",
"password",
"database",
]
class DB:
db_config = {
'host': os.getenv('PG_ADDRESS', 'localhost'),
'port': os.getenv('PG_PORT', 5432),
'user': os.getenv('PG_USERNAME', os.getenv('USER')),
'password': os.getenv('PG_PASSWORD'),
'database': os.getenv('PG_DATABASE'),
}
connection_class = psycopg2.extensions.connection
_connection = None
@classmethod
def setup(self, open_persistent=False, **kwargs):
self.db_config.update({k: kwargs[k] for k in db_config_keys if k in kwargs})
# self._engine = create_engine(self.engine_uri())
self._connection = self.connect()
@classmethod
def connect(self):
if self._connection is not None:
return self._connection
return psycopg2.connect(**self.db_config,
connection_factory=self.connection_class)
@classmethod
def open(self):
return db_open()
@classmethod
def engine_uri():
return "postgresql://{username}:{password}@{host}:{port}/{database}".format(**self.db_config)
@classmethod
def set_connection_class(self, cls):
self.connection_class = cls
@classmethod
def close(self):
if self.connection is not None:
self.connection.close()
class db_open:
def __enter__(self):
self.connection = DB.connect()
return self.connection
def __exit__(self, ex_type, ex_value, traceback):
if ex_value is None:
self.connection.commit()
else:
self.connection.rollback()
......@@ -59,7 +59,7 @@ class State(Enum):
class Job:
"""
Represents a Job at a certain state (JobState).
Represents a Job at a certain state (State).
"""
schema_name = PG_SCHEMA
table_name = PG_TABLE
......@@ -69,10 +69,12 @@ class Job:
return map(Identifier, (self.schema_name, self.table_name))
def __init__(self, elt_uri,
id=None,
state=State.IDLE,
started_at=None,
ended_at=None,
payload={}):
self.id = id
self.elt_uri = elt_uri
self._state = state
self._started_at = started_at
......@@ -138,7 +140,6 @@ class Job:
),
list(values),
)
db.commit()
return job
......
......@@ -8,7 +8,8 @@ import logging
def write_to_db_from_csv(db_conn, csv_file, *,
primary_key,
table_schema,
table_name):
table_name,
header_transform=lambda x: x):
"""
Write to Postgres DB from a CSV
......@@ -81,7 +82,8 @@ def write_to_db_from_csv(db_conn, csv_file, *,
def upsert_to_db_from_csv(db_conn, csv_file, *,
primary_key,
table_schema,
table_name):
table_name,
header_transform=lambda x: x):
"""
Upsert to Postgres DB from a CSV
......@@ -114,7 +116,7 @@ def upsert_to_db_from_csv(db_conn, csv_file, *,
psycopg2.sql.Identifier("pg_temp"),
tmp_table,
psycopg2.sql.SQL(', ').join(
psycopg2.sql.Identifier(n) for n in header.split(','),
psycopg2.sql.Identifier(header_transform(n)) for n in header.split(','),
),
)
logging.debug(copy_query.as_string(cursor))
......
......@@ -2,16 +2,17 @@ import os
import pytest
import psycopg2
import psycopg2.sql as sql
import logging
from elt.db import DB
class NoCommitConnection(psycopg2.extensions.connection):
def commit(self):
pass
print("db.commit() bypass for pytest")
@pytest.fixture(scope='session')
def db_args(request):
def db_setup(request):
args = {
'database': "pytest",
'host': os.getenv("PG_ADDRESS"),
......@@ -19,15 +20,15 @@ def db_args(request):
'user': os.getenv("PG_USERNAME"),
'password': os.getenv("PG_PASSWORD"),
}
DB.register(**args)
DB.set_connection_class(NoCommitConnection)
DB.setup(**args)
@pytest.fixture()
def db(request, db_args):
def db(request, db_setup):
connection = DB.connect()
def teardown():
connection.close()
connection.rollback()
request.addfinalizer(teardown)
return connection
from elt.db import DB
def test_connect():
db_conn = DB.connect()
assert(db_conn)
......@@ -3,6 +3,7 @@ import psycopg2
import logging
from functools import reduce
from elt.db import DB
# from https://github.com/jonathanj/compose/blob/master/compose.py
......@@ -17,6 +18,11 @@ def compose(*fs):
"""
return reduce(lambda f, g: lambda x: f(g(x)), fs, lambda x: x)
def setup_db(args=None):
if args is None:
DB.setup()
else:
DB.setup(**vars(args))
def setup_logging(args):
logging.basicConfig(stream=sys.stdout,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment