Commit d093f92e authored by Micaël Bergeron's avatar Micaël Bergeron

wip: making tests pass

parent a6b9af81
......@@ -3,6 +3,7 @@ import json
from psycopg2.sql import Identifier, SQL, Placeholder
from enum import Enum
from elt.db import DB
from elt.schema import Schema, Column, DBType
from elt.error import Error
from functools import partial
......@@ -121,36 +122,33 @@ class Job:
}
@classmethod
def save(self, cursor, job, commit=False):
def save(self, job):
job_serial = job.__dict__()
columns, values = (job_serial.keys(), job_serial.values())
insert = SQL(("INSERT INTO {}.{} ({}) "
"VALUES ({}) "))
cursor.execute(
insert.format(
*self.identifier(),
SQL(",").join(map(Identifier, columns)),
SQL(",").join(Placeholder() * len(values)),
),
list(values),
)
with DB.open() as db, db.cursor() as cursor:
cursor.execute(
insert.format(
*self.identifier(),
SQL(",").join(map(Identifier, columns)),
SQL(",").join(Placeholder() * len(values)),
),
list(values),
)
db.commit()
if commit:
cursor.commit()
return True
return job
@classmethod
def for_elt(self, cursor, elt_uri, limit=100):
def for_elt(self, elt_uri, limit=100):
fetch = SQL(("SELECT elt_uri, state, started_at, ended_at, payload FROM {}.{} "
"WHERE elt_uri = %s "
"ORDER BY started_at DESC "
"LIMIT %s ")).format(*self.identifier())
cursor.execute(fetch, (elt_uri, limit))
def as_job(row):
return Job(row[0],
state=State[row[1]],
......@@ -158,4 +156,6 @@ class Job:
ended_at=row[3],
payload=row[4])
return list(map(as_job, cursor.fetchall()))
with DB.open() as db, db.cursor() as cursor:
cursor.execute(fetch, (elt_uri, limit))
return list(map(as_job, cursor.fetchall()))
import os
import pytest
import psycopg2
import psycopg2.sql as sql
from elt.utils import db_open
from elt.db import DB
class NoCommitConnection(psycopg2.extensions.connection):
def commit(self):
pass
@pytest.fixture(scope='session')
def db_args(request):
return {
args = {
'database': "pytest",
'host': os.getenv("PG_ADDRESS"),
'port': os.getenv("PG_PORT", 5432),
'user': os.getenv("PG_USERNAME"),
'password': os.getenv("PG_PASSWORD"),
}
DB.register(**args)
DB.set_connection_class(NoCommitConnection)
@pytest.fixture()
def dbcursor(request, db_args):
connection = psycopg2.connect(**db_args, )
transaction = connection.cursor()
def db(request, db_args):
connection = DB.connect()
def teardown():
connection.close()
request.addfinalizer(teardown)
return transaction
return connection
......@@ -9,20 +9,20 @@ def sample_job(payload={}):
payload=payload)
def test_save(dbcursor):
assert(Job.save(dbcursor, sample_job()))
def test_save(db):
assert(Job.save(sample_job()))
def test_load(dbcursor):
def test_load(db):
for i in range(0, 10):
Job.save(dbcursor, sample_job({'key': i}))
Job.save(sample_job({'key': i}))
jobs = Job.for_elt(dbcursor, 'elt://bizops/sample-elt')
jobs = Job.for_elt('elt://bizops/sample-elt')
[print(x.__dict__()) for x in jobs]
assert(len(jobs) == 10)
def test_transit(dbcursor):
def test_transit(db):
j = sample_job()
transition = j.transit(State.RUNNING)
......@@ -33,4 +33,4 @@ def test_transit(dbcursor):
assert(transition == (State.RUNNING, State.SUCCESS))
j.ended_at = datetime.utcnow()
Job.save(dbcursor, j)
Job.save(j)
......@@ -3,29 +3,6 @@ import psycopg2
import logging
from functools import reduce
from elt.job import describe_schema
from elt.schema import schema_apply
db_config_keys = [
"host",
"port",
"user",
"password",
"database",
]
class db_open:
def __init__(self, **kwargs):
self.config = {k: kwargs[k] for k in db_config_keys}
def __enter__(self, **kwargs):
self.connection = psycopg2.connect(**self.config)
return self.connection
def __exit__(self, type, value, traceback):
self.connection.close()
# from https://github.com/jonathanj/compose/blob/master/compose.py
......@@ -45,13 +22,3 @@ def setup_logging(args):
logging.basicConfig(stream=sys.stdout,
format="[%(levelname)s][%(asctime)s] %(message)s",
level=int(args.log_level))
def setup_db(args):
with db_open(**vars(args)) as db:
schema_apply(db, describe_schema())
def setup_elt(args):
setup_logging(args)
setup_db(args)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment