Commit afd6101b authored by Micaël Bergeron's avatar Micaël Bergeron Committed by Jacob Schatz

Zendesk elt

parents
import psycopg2
import getpass
import os
import datetime
from enum import Enum
from argparse import ArgumentParser
class OptionEnum(Enum):
    """Enum base whose members behave like their (string) values.

    Members stringify to, compare equal to, and hash like ``self.value``,
    which makes them friendly to argparse choices and plain-string code.
    """

    def __str__(self):
        return self.value

    def __eq__(self, other):
        # Members compare equal to their raw value (e.g. "db" == ExportOutput.DB).
        return self.value == other

    def __hash__(self):
        # Defining __eq__ disables inherited hashing; restore it via the value
        # so members stay usable as dict/set keys.
        return hash(self.value)
class ExportOutput(OptionEnum):
    """Supported destinations for exported data."""

    DB = "db"
    FILE = "file"
class Password:
    """Wraps a password string for use as an argparse `type`.

    When constructed with the DEFAULT sentinel, the real password is
    resolved from the PG_PASSWORD environment variable, falling back to
    an interactive prompt.
    """

    # Sentinel shown as the argparse default; triggers the env/prompt lookup.
    DEFAULT = 'PG_PASSWORD environment variable.'

    def __init__(self, value):
        if value == self.DEFAULT:
            # Empty/missing env var falls through to the interactive prompt.
            value = os.getenv('PG_PASSWORD', None) or getpass.getpass()
        self.value = value

    def __str__(self):
        return self.value
class DateWindow:
    """A [start, end) datetime window derived from parsed CLI arguments.

    When ``args.days`` is given, the window spans from N days ago at
    00:00:00 UTC up to tomorrow at 00:00:00 UTC; otherwise the explicit
    ``args.start`` / ``args.end`` ISO dates are used (None means today).
    """

    @staticmethod
    def parse_date(value):
        """Parse a YYYY-MM-DD string; return today's date when value is None.

        BUG FIX: this was declared ``@classmethod`` with a single ``value``
        parameter, so ``DateWindow.parse_date(x)`` raised TypeError (the
        implicit cls consumed the slot). It also called
        ``datetime.strptime("%Y-%m-%d", value)`` — missing the
        ``datetime.datetime`` path and with the arguments swapped.
        """
        if value is None:
            return datetime.date.today()
        return datetime.datetime.strptime(value, "%Y-%m-%d")

    def __init__(self, args, formatter=datetime.datetime.isoformat):
        """
        :param args: argparse namespace with .days, .start and .end
        :param formatter: callable applied to each endpoint by formatted_range()
        """
        self.formatter = formatter
        if args.days is not None:
            today = datetime.date.today()
            days = lambda d: datetime.timedelta(days=d)
            # Tomorrow at 00:00:00 UTC
            self.end = datetime.datetime.combine(today + days(1),
                                                 datetime.time())
            # N days ago at 00:00:00 UTC
            self.start = datetime.datetime.combine(today - days(args.days),
                                                   datetime.time())
        else:
            self.start = DateWindow.parse_date(args.start)
            self.end = DateWindow.parse_date(args.end)

    def range(self):
        """Return the (start, end) pair."""
        return (self.start, self.end)

    def formatted_range(self):
        """Return start/end lazily mapped through the formatter."""
        return map(self.formatter, self.range())
def parser_db_conn(parser: ArgumentParser, required=True):
    """Register the PostgreSQL connection options on `parser`.

    Defaults fall back to the matching PG_* environment variables, and
    (for database/user) to the current OS user.

    :param parser: the ArgumentParser to extend in place
    :param required: whether --schema must be supplied
    """
    env = os.getenv
    current_user = env('USER')

    parser.add_argument('-S', '--schema', required=required,
                        help="Database schema to use.")
    parser.add_argument('-T', '--table', dest='table_name',
                        help="Table to import the data to.")
    parser.add_argument('-d', '--db', dest='database',
                        default=env('PG_DATABASE', current_user),
                        help="Database to import the data to.")
    parser.add_argument('-H', '--host',
                        default=env('PG_ADDRESS', 'localhost'),
                        help="Database host address.")
    parser.add_argument('-p', '--port',
                        default=env('PG_PORT', 5432),
                        help="Database port.")
    parser.add_argument('-u', '--user',
                        default=env('PG_USERNAME', current_user),
                        help="Specifies the user to connect to the database with.")
    parser.add_argument('-W', '--password',
                        type=Password,
                        default=Password.DEFAULT,
                        help='Specify password')
def parser_date_window(parser: ArgumentParser):
    """Register the --days / -b (start) / -e (end) window options on `parser`."""
    parser.add_argument(
        '--days', type=int,
        help="Specify the number of preceding days from the current time to get incremental records for. Only used for lead records.")
    parser.add_argument(
        '-b', dest="start",
        help="The start date in the isoformat of 2018-01-01. This will be formatted properly downstream.")
    parser.add_argument(
        '-e', dest="end",
        help="The end date in the isoformat of 2018-02-01. This will be formatted properly downstream.")
def parser_output(parser: ArgumentParser):
    """Register the output-destination options (-o, --nodelete, -F) on `parser`."""
    parser.add_argument(
        '-o', dest="output",
        default=ExportOutput.DB,
        type=ExportOutput,
        choices=list(ExportOutput),
        help="Specifies the output store for the extracted data.")
    parser.add_argument(
        '--nodelete', action='store_true',
        help="If argument is provided, the CSV file generated will not be deleted.")
    parser.add_argument(
        '-F', '--output-file', dest="output_file",
        help="Specifies the output to write the output to.")
import psycopg2
import psycopg2.sql
import re
import json
def write_to_db_from_csv(db_conn, csv_file, *,
                         table_schema,
                         table_name):
    """
    Write to Postgres DB from a CSV using COPY.

    :param db_conn: psycopg2 database connection
    :param csv_file: path of the CSV; its first line must be a header
                     row naming the destination columns
    :param table_schema: schema of the destination table
    :param table_name: destination table name
    :return: None; errors are printed and swallowed (best-effort import)
    """
    with open(csv_file, 'r') as file:
        cursor = None
        try:
            # Get header row, remove new lines, lowercase
            header = next(file).rstrip().lower()
            schema = psycopg2.sql.Identifier(table_schema)
            table = psycopg2.sql.Identifier(table_name)
            cursor = db_conn.cursor()
            copy_query = psycopg2.sql.SQL(
                "COPY {0}.{1} ({2}) FROM STDIN WITH DELIMITER AS ',' NULL AS 'null' CSV"
            ).format(
                schema,
                table,
                psycopg2.sql.SQL(', ').join(
                    psycopg2.sql.Identifier(n) for n in header.split(',')
                )
            )
            print(copy_query.as_string(cursor))
            print("Copying file")
            cursor.copy_expert(sql=copy_query, file=file)
            db_conn.commit()
        except psycopg2.Error as err:
            # BUG FIX: roll back the failed transaction so the connection
            # remains usable; keep the original swallow-and-print behavior.
            db_conn.rollback()
            print(err)
        finally:
            # BUG FIX: the cursor leaked when COPY raised in the original.
            if cursor is not None:
                cursor.close()
def upsert_to_db_from_csv(db_conn, csv_file, *,
                          primary_key,
                          table_schema,
                          table_name):
    """
    Upsert to Postgres DB from a CSV.

    The CSV is COPY'd into a temporary table, then merged into the
    destination with INSERT ... ON CONFLICT (primary_key) DO UPDATE.

    :param db_conn: psycopg2 database connection
    :param csv_file: path of the CSV; first line must be the header row
    :param primary_key: column used as the ON CONFLICT target
    :param table_schema: schema of the destination table
    :param table_name: destination table name
    :return: None; errors are printed and swallowed (best-effort import)
    """
    with open(csv_file, 'r') as file:
        cursor = None
        try:
            # Get header row, remove new lines, lowercase
            header = next(file).rstrip().lower()
            columns = header.split(',')
            cursor = db_conn.cursor()
            schema = psycopg2.sql.Identifier(table_schema)
            table = psycopg2.sql.Identifier(table_name)
            tmp_table = psycopg2.sql.Identifier(table_name + "_tmp")
            pg_temp = psycopg2.sql.Identifier("pg_temp")
            column_list = psycopg2.sql.SQL(', ').join(
                psycopg2.sql.Identifier(n) for n in columns
            )

            # Create a temp table mirroring the destination's shape.
            create_table = psycopg2.sql.SQL(
                "CREATE TEMP TABLE {0} AS SELECT * FROM {1}.{2} LIMIT 0"
            ).format(tmp_table, schema, table)
            cursor.execute(create_table)
            print(create_table.as_string(cursor))
            db_conn.commit()

            # Import into the temp table.
            # BUG FIX: the original had a trailing comma after an
            # unparenthesized generator expression inside join(...),
            # which is a SyntaxError on modern Python.
            copy_query = psycopg2.sql.SQL(
                "COPY {0}.{1} ({2}) FROM STDIN WITH DELIMITER AS ',' NULL AS 'null' CSV"
            ).format(pg_temp, tmp_table, column_list)
            print(copy_query.as_string(cursor))
            print("Copying File")
            cursor.copy_expert(sql=copy_query, file=file)
            db_conn.commit()

            # Merge into the primary table. Build the SET clause with SQL
            # composition instead of the original json/regex string hack,
            # so every identifier is properly quoted.
            set_clause = psycopg2.sql.SQL(', ').join(
                psycopg2.sql.SQL("{0} = {1}.{2}").format(
                    psycopg2.sql.Identifier(col),
                    psycopg2.sql.Identifier("excluded"),
                    psycopg2.sql.Identifier(col),
                )
                for col in columns if col != primary_key
            )
            update_query = psycopg2.sql.SQL(
                "INSERT INTO {0}.{1} ({2}) SELECT {2} FROM {3}.{4} "
                "ON CONFLICT ({5}) DO UPDATE SET {6}"
            ).format(
                schema,
                table,
                column_list,
                pg_temp,
                tmp_table,
                psycopg2.sql.Identifier(primary_key),
                set_clause,
            )
            cursor.execute(update_query)
            print(update_query.as_string(cursor))
            db_conn.commit()

            # Drop the temporary table.
            drop_query = psycopg2.sql.SQL("DROP TABLE {0}.{1}").format(
                pg_temp,
                tmp_table,
            )
            print(drop_query.as_string(cursor))
            cursor.execute(drop_query)
            db_conn.commit()
        except psycopg2.Error as err:
            # Roll back so the connection stays usable; keep best-effort print.
            db_conn.rollback()
            print(err)
        finally:
            # BUG FIX: the cursor leaked on error in the original.
            if cursor is not None:
                cursor.close()
import psycopg2
import psycopg2.sql
import psycopg2.extras
from typing import Sequence, Callable, Set
from enum import Enum
from collections import OrderedDict, namedtuple
class SchemaException(Exception):
    """Root of the schema-error hierarchy."""
class InapplicableChangeException(SchemaException):
    """Raised when a schema change cannot be applied automatically."""
class AggregateException(SchemaException):
    """Bundle several schema errors into a single exception."""

    def __init__(self, exceptions: Sequence[SchemaException]):
        """
        :param exceptions: iterable of collected sub-exceptions
        """
        # BUG FIX: materialize the iterable — callers pass a `map` object,
        # which would be exhausted after a single traversal.
        self.exceptions = list(exceptions)
        # Pass the payload to Exception so str()/args are informative.
        super().__init__(self.exceptions)
class ExceptionAggregator:
    """Runs callables, collecting recognized failures instead of raising,
    then raises them all at once via raise_aggregate().
    """

    def __init__(self, errors: Sequence[type] = ()):
        """
        :param errors: exception classes to collect rather than propagate.

        BUG FIX: the original signature was ``errors=Sequence[Exception]``,
        which made the typing object the *default value* instead of an
        annotation.
        """
        self.success = []
        self.failures = []
        self.errors = errors

    def recognize_exception(self, e: Exception) -> bool:
        # Exact type match (deliberately not isinstance), as the original did.
        return type(e) in self.errors

    def call(self, callable: Callable, *args, **kwargs):  # noqa: shadows builtin, kept for interface compatibility
        """Invoke `callable`; record success or a recognized failure.

        Unrecognized exceptions propagate. Returns the callable's result
        on success, None when a recognized failure was recorded.
        """
        params = (args, kwargs)
        try:
            ret = callable(*args, **kwargs)
        except Exception as e:
            if self.recognize_exception(e):
                self.failures.append((e, params))
            else:
                raise
        else:
            self.success.append(params)
            return ret

    def raise_aggregate(self) -> None:
        """Raise an AggregateException if any failures were recorded.

        BUG FIX: the original stored a one-shot ``map`` object in the
        aggregate (exhausted after a single traversal) and carried a
        misleading ``-> AggregateException`` return annotation.
        """
        if self.failures:
            exceptions = [failure[0] for failure in self.failures]
            raise AggregateException(exceptions)
class DBType(str, Enum):
    """Postgres column types used by the schema tooling.

    The str mixin makes members compare equal to their SQL type names.
    """

    Date = 'date'
    String = 'character varying'
    Double = 'real'
    Integer = 'integer'
    Long = 'bigint'
    Boolean = 'boolean'
    Timestamp = 'timestamp without time zone'
    JSON = 'json'
    # Array variants: the scalar type name suffixed with '[]'.
    # (At class-body time the scalar names are still plain strings.)
    ArrayOfInteger = Integer + '[]'
    ArrayOfLong = Long + '[]'
    ArrayOfString = String + '[]'
class SchemaDiff(Enum):
    """Outcome of comparing an incoming column against the DB schema."""

    COLUMN_OK = 1        # column exists with a matching definition
    COLUMN_CHANGED = 2   # column exists but type/nullability differ
    COLUMN_MISSING = 3   # table exists, column does not
    TABLE_MISSING = 4    # the table itself is absent
# One column of an information_schema-style description.
# `is_mapping_key` marks columns that receive a UNIQUE constraint
# (used as the ON CONFLICT target for upserts).
Column = namedtuple('Column', (
    'table_schema',
    'table_name',
    'column_name',
    'data_type',
    'is_nullable',
    'is_mapping_key',
))
class Schema:
    """In-memory description of a database schema: a set of table names
    plus an ordered mapping of (table, column) -> Column.
    """

    @staticmethod
    def table_key(column: "Column"):
        """Key identifying the table a column belongs to."""
        return column.table_name

    @staticmethod
    def column_key(column: "Column"):
        """Key identifying a column within the schema."""
        return (column.table_name, column.column_name)

    def __init__(self, name, columns: "Sequence[Column]" = ()):
        """
        :param name: schema name
        :param columns: iterable of Column records to index

        BUG FIX: the default was a mutable ``[]``; replaced with an
        immutable tuple. ``table_key``/``column_key`` were plain
        functions in the class body (only callable via the class);
        they are now proper ``@staticmethod``s.
        """
        self.name = name
        self.tables = set()
        self.columns = OrderedDict()
        for column in columns:
            self.tables.add(Schema.table_key(column))
            self.columns[Schema.column_key(column)] = column

    def add_table(self, column: "Column"):
        """Mark the column's table as existing in this schema."""
        self.tables.add(Schema.table_key(column))

    def column_diff(self, column: "Column") -> "Set[SchemaDiff]":
        """Compare `column` against this schema.

        :return: a set of SchemaDiff flags describing what would need
                 to change to accommodate the column.
        """
        table_key = Schema.table_key(column)
        column_key = Schema.column_key(column)
        if table_key not in self.tables:
            return {SchemaDiff.TABLE_MISSING, SchemaDiff.COLUMN_MISSING}
        if column_key not in self.columns:
            return {SchemaDiff.COLUMN_MISSING}
        db_col = self.columns[column_key]
        if column.data_type != db_col.data_type \
           or column.is_nullable != db_col.is_nullable:
            return {SchemaDiff.COLUMN_CHANGED}
        return {SchemaDiff.COLUMN_OK}
def db_schema(db_conn, schema_name) -> Schema:
    """Introspect the live database and build a Schema from it.

    :param db_conn: psycopg2 database connection
    :param schema_name: name of the schema to load from information_schema
    :return: Schema populated with the visible columns
    """
    cursor = db_conn.cursor()
    cursor.execute("""
    SELECT table_schema, table_name, column_name, data_type, is_nullable = 'YES', NULL as is_mapping_key
    FROM information_schema.columns
    WHERE table_schema = %s
    ORDER BY ordinal_position;
    """, (schema_name,))
    rows = cursor.fetchall()
    return Schema(schema_name, [Column._make(row) for row in rows])
def schema_apply(db_conn, target_schema: Schema):
    """
    Apply `target_schema` on top of the data warehouse schema.

    Inapplicable per-column changes are collected and re-raised together
    as an AggregateException; the transaction is committed only when no
    column failed.

    :param db_conn: psycopg2 database connection.
    :param target_schema: Schema to apply.
    """
    current = db_schema(db_conn, target_schema.name)
    results = ExceptionAggregator(errors=[InapplicableChangeException])
    cursor = db_conn.cursor()

    for _, column in target_schema.columns.items():
        results.call(schema_apply_column, cursor, current, column)

    # Raises before the commit when any column failed to apply.
    results.raise_aggregate()
    db_conn.commit()
def schema_apply_column(db_cursor, schema: Schema, column: Column) -> Set[SchemaDiff]:
    """
    Apply the schema to the current database connection
    adapting tables as it goes. Currently only supports
    adding new columns.

    :param db_cursor: database cursor used to execute the DDL
    :param schema: in-memory Schema diffed against (updated in place when
                   a table is created)
    :param column: the column to apply
    :return: the set of SchemaDiff flags detected for this column
    :raises InapplicableChangeException: when the column exists but its
            type/nullability changed (no automatic migration).
    """
    diff = schema.column_diff(column)
    # Qualified (schema, table) identifier pair reused by every statement.
    identifier = (
        psycopg2.sql.Identifier(column.table_schema),
        psycopg2.sql.Identifier(column.table_name),
    )
    if SchemaDiff.COLUMN_OK in diff:
        print("[{}]: {}".format(column.column_name, diff))
    if SchemaDiff.COLUMN_CHANGED in diff:
        # Changed columns cannot be migrated automatically.
        raise InapplicableChangeException(diff)
    if SchemaDiff.TABLE_MISSING in diff:
        # Create the table with only a surrogate key; real columns are
        # added by the COLUMN_MISSING branch below (the diff contains both
        # flags in this case).
        stmt = "CREATE TABLE {}.{} (__row_id SERIAL PRIMARY KEY)"
        sql = psycopg2.sql.SQL(stmt).format(*identifier)
        db_cursor.execute(sql)
        schema.add_table(column)
    if SchemaDiff.COLUMN_MISSING in diff:
        # NOTE(review): data_type is %-interpolated into the statement
        # before SQL composition, so it is NOT escaped — assumes it comes
        # from a trusted source (the API schema), not user input.
        stmt = "ALTER TABLE {}.{} ADD COLUMN {} %s"
        if not column.is_nullable:
            stmt += " NOT NULL"
        sql = psycopg2.sql.SQL(stmt % column.data_type).format(
            *identifier,
            psycopg2.sql.Identifier(column.column_name),
        )
        db_cursor.execute(sql)
        if column.is_mapping_key:
            # Mapping keys get a UNIQUE constraint, which backs the
            # ON CONFLICT upsert target used elsewhere.
            constraint = "mapping_key_{}".format(column.column_name)
            stmt = "ALTER TABLE {}.{} ADD CONSTRAINT {} UNIQUE ({})"
            sql = psycopg2.sql.SQL(stmt).format(
                *identifier,
                psycopg2.sql.Identifier(constraint),
                psycopg2.sql.Identifier(column.column_name),
            )
            db_cursor.execute(sql)
    return diff
import psycopg2
from functools import reduce
# Keyword arguments accepted by psycopg2.connect that db_open forwards.
db_config_keys = ["host", "port", "user", "password", "database"]
class db_open:
    """Context manager yielding a psycopg2 connection.

    Only the db_config_keys subset of the given keyword arguments is
    forwarded to psycopg2.connect; extra kwargs are ignored.
    """

    def __init__(self, **kwargs):
        # Raises KeyError if a required connection key is missing.
        self.config = {k: kwargs[k] for k in db_config_keys}

    def __enter__(self):
        # BUG FIX: __enter__ declared a spurious **kwargs parameter that
        # the context-manager protocol can never supply.
        self.connection = psycopg2.connect(**self.config)
        return self.connection

    def __exit__(self, exc_type, exc_value, traceback):
        # Always close; returning None lets exceptions propagate.
        self.connection.close()
# from https://github.com/jonathanj/compose/blob/master/compose.py
def compose(*fs):
    """
    Create a function composition.

    :type *fs: ``iterable`` of 1-argument ``callable``s
    :param *fs: 1-argument functions to compose; applied from last to
        first, so ``compose(f, g)(x) == f(g(x))``.
    :return: I{callable} taking 1 argument; the identity when no
        functions are given.
    """
    def composed(x):
        result = x
        for fn in reversed(fs):
            result = fn(result)
        return result
    return composed
#!/usr/bin/env python
# Packaging script for the shared BizOps ELT helper modules ('elt' package).
# NOTE(review): distutils is deprecated (removed in Python 3.12) —
# consider migrating to setuptools.
from distutils.core import setup
setup(name='bizops-elt-common',
      version='1.0',
      description='BizOps shared modules.',
      author='Micael Bergeron',
      author_email='mbergeron@gitlab.com',
      url='https://gitlab.com/bizops/bizops',
      packages=['elt'],
      )
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment