Commit 6b36dc0f authored by Micaël Bergeron

make the incremental load possible

parent c203be80
import psycopg2
import json
import sqlalchemy.types as types
import logging
from psycopg2.sql import Identifier, SQL, Placeholder
from enum import Enum
......@@ -51,6 +52,10 @@ class Job(SystemModel):
ended_at = Column(types.DateTime)
payload = Column(types.JSON)
def __init__(self, **kwargs):
    """
    Build the instance from keyword arguments, using `State.IDLE` as the
    `state` when the caller does not pass one.
    """
    # Only fills the key when it is absent; an explicitly passed state is kept.
    kwargs.setdefault('state', State.IDLE)
    super().__init__(**kwargs)
def transit(self, state: State) -> (State, State):
transition = (self.state, state)
......@@ -58,6 +63,7 @@ class Job(SystemModel):
raise ImpossibleTransitionError(transition)
self.state = state
logging.debug("Job {} → {}.".format(self, state))
return transition
def __repr__(self):
......
......@@ -79,6 +79,38 @@ def write_to_db_from_csv(db_conn, csv_file, *,
logging.error(err)
def create_tmp_table(db_conn, table_schema, table_name):
    """
    Create an empty temporary table with the columns of
    `table_schema`.`table_name`.

    The temporary table is named `<table_name>_tmp`. It is created with
    `CREATE TEMP TABLE .. AS SELECT * .. LIMIT 0`, so no rows are copied.
    This function does not commit; that is left to the caller.

    :param db_conn: connection used to run the statement
                    (presumably a psycopg2 connection; confirm with callers).
    :param table_schema: schema of the table whose columns are copied.
    :param table_name: name of the table whose columns are copied.
    :return: the unquoted name of the temporary table.
    """
    tmp_table_name = "_".join((table_name, 'tmp'))

    # Identifiers are composed with psycopg2.sql so they are quoted by the
    # driver instead of being interpolated as raw text.
    create_table = psycopg2.sql.SQL("CREATE TEMP TABLE {0} AS SELECT * FROM {1}.{2} LIMIT 0").format(
        psycopg2.sql.Identifier(tmp_table_name),
        psycopg2.sql.Identifier(table_schema),
        psycopg2.sql.Identifier(table_name),
    )

    # The cursor is only needed for this single statement: the context
    # manager closes it on exit rather than leaving it open.
    with db_conn.cursor() as cursor:
        cursor.execute(create_table)
        logging.debug(create_table.as_string(cursor))

    return tmp_table_name
def update_set_stmt(columns):
    """
    [columns] ⇒ '"<column>"= "excluded"."<column>", ..'

    Build the assignment list of an `ON CONFLICT .. DO UPDATE SET` clause,
    where every column is set to the value of the same column in `excluded`.

    Column names are emitted as double-quoted identifiers, with any embedded
    double quote doubled, so names containing `.`, `:`, braces, quotes or
    non-ASCII characters come out intact.

    :param columns: iterable of column names (strings). A name listed more
                    than once is emitted once, at its first position.
    :return: the assignment list as a string; empty when `columns` is empty.
    """
    def quote(name):
        # SQL identifier quoting: wrap in double quotes, double embedded ones.
        return '"{}"'.format(name.replace('"', '""'))

    # dict.fromkeys drops duplicates while keeping first-seen order.
    unique_columns = dict.fromkeys(columns)

    return ', '.join(
        '{0}= "excluded".{0}'.format(quote(col))
        for col in unique_columns
    )
def upsert_to_db_from_csv(db_conn, csv_file, *,
primary_key,
table_schema,
......@@ -99,17 +131,9 @@ def upsert_to_db_from_csv(db_conn, csv_file, *,
schema = psycopg2.sql.Identifier(table_schema)
table = psycopg2.sql.Identifier(table_name)
tmp_table = psycopg2.sql.Identifier(table_name + "_tmp")
# Create temp table
create_table = psycopg2.sql.SQL("CREATE TEMP TABLE {0} AS SELECT * FROM {1}.{2} LIMIT 0").format(
tmp_table,
schema,
table,
)
cursor.execute(create_table)
logging.debug(create_table.as_string(cursor))
db_conn.commit()
tmp_table = psycopg2.sql.Identifier(create_tmp_table(db_conn,
table_schema,
table_name))
# Import into TMP Table
copy_query = psycopg2.sql.SQL("COPY {0}.{1} ({2}) FROM STDIN WITH DELIMITER AS ',' NULL AS 'null' CSV").format(
......@@ -124,15 +148,7 @@ def upsert_to_db_from_csv(db_conn, csv_file, *,
cursor.copy_expert(sql=copy_query, file=file)
db_conn.commit()
# Update primary table
split_header = [col for col in header.split(
',') if col != primary_key]
set_cols = {col: '.'.join(['excluded', col])
for col in split_header}
rep_colon = re.sub(':', '=', json.dumps(set_cols))
rep_brace = re.sub('{|}', '', rep_colon)
set_strings = re.sub('\.', '"."', rep_brace)
update_columns = [col for col in columns.split(',') if col != primary_key]
update_query = psycopg2.sql.SQL("INSERT INTO {0}.{1} ({2}) SELECT {2} FROM {3}.{4} ON CONFLICT ({5}) DO UPDATE SET {6}").format(
schema,
table,
......@@ -142,7 +158,7 @@ def upsert_to_db_from_csv(db_conn, csv_file, *,
psycopg2.sql.Identifier("pg_temp"),
tmp_table,
psycopg2.sql.Identifier(primary_key),
psycopg2.sql.SQL(set_strings),
psycopg2.sql.SQL(update_set_stmt(update_columns)),
)
cursor.execute(update_query)
logging.debug(update_query.as_string(cursor))
......
......@@ -190,7 +190,6 @@ def schema_apply_column(db_cursor,
if column.is_mapping_key:
constraint = "{table}_{column}_mapping_key".format(table=column.table_name,
column=column.column_name)
stmt = "ALTER TABLE {}.{} ADD CONSTRAINT {} UNIQUE ({})"
sql = psycopg2.sql.SQL(stmt).format(
*identifier,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment