initial implementation

*/__pycache__
*/config/environment.conf
*/config/dbconfig.py
*/config/mailcfg.py
*/data-integration/system/karaf/caches/*
*.log
*.pyc
*.DS_Store
*.idea
*.csv
tmp_log.txt
.env
.vscode/
*.egg-info/
# This file is a template, and might need editing before it works on your project.
# Official language image. Look for the different tagged releases at:
# https://hub.docker.com/r/library/python/tags/
image: python:latest

# Change pip's cache directory to be inside the project directory since we can
# only cache local items.
variables:
  PIP_CACHE_DIR: "$CI_PROJECT_DIR/.cache"

# Pip's cache doesn't store the python packages
# https://pip.pypa.io/en/stable/reference/pip_install/#caching
#
# If you want to also cache the installed packages, you have to install
# them in a virtualenv and cache it as well.
cache:
  paths:
    - .cache/pip
    - venv/

before_script:
  - pip install pipenv
  - pipenv run python --version
  - pipenv run pip install cython
  - pipenv install --dev
  - pipenv run pip install -e .

lint:
  stage: test
  script:
    - pipenv run flake8
  allow_failure: true

test:
  script:
    - pipenv run pytest
  allow_failure: true

.run:
  script:
    - python setup.py bdist_wheel
    # an alternative approach is to install and run:
    - pip install dist/*
    # run the command here
  artifacts:
    paths:
      - dist/*.whl

.pages:
  script:
    - pip install sphinx sphinx-rtd-theme
    - cd doc ; make html
    - mv build/html/ ../public/
  artifacts:
    paths:
      - public
  only:
    - master

publish:
  stage: deploy
  before_script:
    - pip install twine
  script:
    - python setup.py sdist
    - twine upload dist/*
  only:
    - master
  when: manual
FROM registry.gitlab.com/meltano/meltano-load-postgresql
ADD . /app/extractor/fastly
RUN pip install /app/extractor/fastly
CMD meltano extract fastly | meltano load postgresql
[[source]]
url = "https://pypi.python.org/simple"
verify_ssl = true
name = "pypi"

[packages]
SQLAlchemy = "*"
aiohttp = "*"
psycopg2 = "==2.7.4"
meltano-common = ">=0.3.0-dev"

[dev-packages]
flake8 = "*"
autopep8 = "*"
pytest = "*"

[requires]
python_version = "3.6"
billing:
  attributes:
    - alias: invoice_id
      input:
        name: invoice_id
        type: string
      metadata:
        is_pkey: yes
    - alias: customer_id
      input:
        name: customer_id
        type: string
"end_time",
"line_items",
"start_time",
"status_status",
"total_bandwidth",
"total_bandwidth_cost",
"total_bandwidth_units",
"total_commit_shortfall",
"total_cost",
"total_cost_before_discount",
"total_discount",
"total_discount_amount",
"total_extras",
"total_extras_cost",
"total_incurred_cost",
"total_overage",
"total_percentile",
"total_percentile_cost",
"total_requests",
"total_requests_cost"
import asyncio
import sys
import argparse
import logging

from meltano.common.service import MeltanoService
from meltano.common.utils import setup_logging, setup_db
from meltano.common.cli import parser_db_conn, parser_logging
from meltano.schema import Schema
from meltano.stream import MeltanoStream

# Note:
# In the meltano runner, this should be a dynamic import
import meltano.load.postgresql


def parse():
    parser = argparse.ArgumentParser(
        description="Load data from stdin using PostgreSQL")

    parser_db_conn(parser)
    parser_logging(parser)
    parser.add_argument('manifest_file',
                        type=str,
                        help="Manifest file to load.")

    return parser.parse_args()


def main(args):
    service = MeltanoService()
    # schema = service.load_schema(args.schema, args.manifest_file)
    # hardcode the schema at the moment, but this should be discovered at some point
    schema = Schema(args.schema, [])

    stream = MeltanoStream(sys.stdin.fileno(), schema=schema)
    loader = service.create_loader("com.meltano.load.postgresql", stream)

    logging.info("Waiting for data...")
    loader.receive()
    logging.info("Integration complete.")


if __name__ == '__main__':
    args = parse()
    setup_logging(args)
    setup_db(args)
    main(args)
from meltano.common.service import MeltanoService
from .extractor import FastlyExtractor
MeltanoService.register_extractor("com.meltano.extract.fastly", FastlyExtractor)
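
The registration above keys the extractor class to a well-known identifier so the service can later instantiate it by name, mirroring the create_loader("com.meltano.load.postgresql", stream) call used in the entry points. A minimal sketch of that registry pattern, not the actual meltano-common implementation:

# Illustrative registry sketch only; MeltanoService in meltano-common may differ.
class SketchService:
    _extractors = {}

    @classmethod
    def register_extractor(cls, identifier, extractor_class):
        cls._extractors[identifier] = extractor_class

    def create_extractor(self, identifier, *args, **kwargs):
        # look up the registered class and instantiate it
        return self._extractors[identifier](*args, **kwargs)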
import asyncio
import sys
import argparse
import logging

from meltano.common.service import MeltanoService
from meltano.common.utils import setup_logging, setup_db
from meltano.common.cli import parser_db_conn, parser_logging
from meltano.schema import Schema
from meltano.stream import MeltanoStream

from .loader import PostgreSQLLoader


def parse():
    parser = argparse.ArgumentParser(
        description="Load data from stdin using PostgreSQL")

    parser_db_conn(parser)
    parser_logging(parser)
    parser.add_argument('manifest_file',
                        type=str,
                        help="Manifest file to load.")

    return parser.parse_args()


def main(args):
    service = MeltanoService()
    # schema = service.load_schema(args.schema, args.manifest_file)
    # hardcode the schema at the moment, but this should be discovered at some point
    schema = Schema(args.schema, [])

    stream = MeltanoStream(sys.stdin.fileno(), schema=schema)
    loader = service.create_loader("com.meltano.load.postgresql", stream)

    logging.info("Waiting for data...")
    loader.receive()
    logging.info("Integration complete.")


if __name__ == '__main__':
    args = parse()
    setup_logging(args)
    setup_db(args)
    main(args)
import io
import os
import logging
import pandas as pd
import numpy as np
import json
import aiohttp
import asyncio
import itertools

from pandas.io.json import json_normalize
from meltano.schema import DBType
from meltano.extract.base import MeltanoExtractor
from meltano.common.transform import columnify
from meltano.common.service import MeltanoService
from meltano.common.entity import Entity, Attribute, TransientAttribute

URL = "https://api.fastly.com/"

# TODO: refactor to utils
pandas_to_dbtype = {
    np.dtype('object'): DBType.String,
    np.dtype('float64'): DBType.Double,
    np.dtype('float32'): DBType.Double,
    np.dtype('int64'): DBType.Long,
    np.dtype('int32'): DBType.Integer,
    np.dtype('bool'): DBType.Boolean,
    np.dtype('int8'): DBType.Integer,
    np.dtype('datetime64'): DBType.Timestamp
}


# TODO: refactor to utils
def df_to_entity(alias, df):
    """
    Infer an Entity from a DataFrame.
    """
    attrs = []
    for column, dtype in df.dtypes.items():
        converted_type = pandas_to_dbtype[dtype]
        input = TransientAttribute(column, converted_type.value)
        output = TransientAttribute(columnify(column), converted_type.value)
        attrs.append(
            Attribute(column, input, output)
        )

    return Entity(alias, attributes=attrs)


class FastlyExtractor(MeltanoExtractor):
    """
    Extractor for the Fastly Billing API.
    """

    def create_session(self):
        headers = {
            'Fastly-Key': os.getenv("FASTLY_API_TOKEN"),
            'Accept': "application/json"
        }
        session = aiohttp.ClientSession(headers=headers)
        return session

    def url(self, endpoint):
        return "".join((URL, endpoint))

    # TODO: refactor this out in an HTTP extractor
    async def req(self, session, endpoint, payload=None):
        url = self.url(endpoint)
        async with session.get(url) as resp:
            logging.debug("API {}:{}".format(url, resp.status))

            if resp.status != 200:
                # surface non-200 responses as a proper exception
                raise ValueError("{} returned {}".format(url, resp.status))

            return json_normalize(await resp.json())

    # TODO: refactor this out in a discovery component
    async def entities(self):
        """
        Generate Entity objects for entity auto-discovery.
        """
        async with self.create_session() as session:
            billing = await self.req(session, "billing/v2/year/2018/month/06")
            yield df_to_entity("Billing", billing)

    def discover_entities(self):
        async def drain(generator):
            results = []
            async for result in generator:
                results.append(result)
            return results

        loop = asyncio.get_event_loop()
        entities = loop.run_until_complete(
            drain(self.entities())
        )

        return entities

    async def extract(self, entity):
        async with self.create_session() as session:
            # one request per monthly billing report; only 2017 for now
            for year, month in itertools.product(range(2017, 2018),
                                                 range(1, 13)):
                url = "billing/v2/year/{:04}/month/{:02}".format(year, month)
                try:
                    yield await self.req(session, url)
                except Exception as err:
                    logging.error(err)
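
discover_entities above has to drain an async generator from synchronous code. A self-contained sketch of that pattern with dummy data (Python 3.6+ for async generators):

import asyncio

async def numbers():
    for i in range(3):
        yield i

async def drain(generator):
    # collect every item an async generator yields into a plain list
    results = []
    async for result in generator:
        results.append(result)
    return results

loop = asyncio.get_event_loop()
print(loop.run_until_complete(drain(numbers())))  # [0, 1, 2]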
#!/usr/bin/env python

from distutils.core import setup

setup(name='meltano-extract-fastly',
      version='0.1.0-dev0',
      description='Meltano extractor for Fastly.',
      author='Meltano Team & Contributors',
      author_email='meltano@gitlab.com',
      url='https://gitlab.com/meltano/meltano',
      packages=['meltano.extract.fastly'],
      install_requires=[
          "aiohttp",
          "SQLAlchemy",
          "psycopg2>=2.7.4",
          "meltano-common"
      ])
test_table:
  whitelisted:
    - id: integer
    - age: integer
    - name: character varying

billing:
  whitelisted:
    - customer_id: integer
    - line_items: json
import pyarrow as pa
import pandas as pd
import json
import sys
import os

d = {
    'id': range(200),
    'name': ["John", "Steve"] * 100,
    'age': [43, 33] * 100
}

# gather the source data in the DataFrame
df = pd.DataFrame(data=d)

# should be constructed from a meltano.schema.Schema
schema = pa.schema([
    pa.field('name', pa.string()),
    pa.field('age', pa.int32()),
])

# convert it to a pyarrow.Table
table = pa.Table.from_pandas(df, schema=schema, preserve_index=False)

# very important: the loader needs to know the source table
table = table.replace_schema_metadata(metadata={
    'meltano': json.dumps({
        'entity_name': "project",
        'jobid': "8857"
    })
})

# write to stdout
sink = os.fdopen(sys.stdout.fileno(), "wb")
# sink = open("test.arrow", "wb")
# sink = pa.BufferOutputStream()

writer = pa.RecordBatchStreamWriter(sink, table.schema)

# manage chunking: split the table into record batches of `chunk_size` rows
chunk_size = 5
for batch in table.to_batches(max_chunksize=chunk_size):
    writer.write_batch(batch)

writer.close()
sink.close()
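
For reference, the loader side of this stream can recover both the record batches and the meltano metadata. A minimal reader sketch, assuming the stream produced above arrives on stdin:

import sys
import json
import pyarrow as pa

# open the Arrow IPC stream written by the script above
reader = pa.RecordBatchStreamReader(sys.stdin.buffer)
table = reader.read_all()

# schema metadata keys and values are bytes
meta = json.loads(table.schema.metadata[b'meltano'])
print(meta['entity_name'], table.num_rows)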