Verified Commit c62511d3 authored by Micaël Bergeron's avatar Micaël Bergeron

Merge remote-tracking branch 'fastly/dev' into 0.3.0-dev

parents 36e02674 2c1a9194
# This file is a template, and might need editing before it works on your project.
# Official language image. Look for the different tagged releases at:
image: python:latest
# Change pip's cache directory to be inside the project directory since we can
# only cache local items.
# Pip's cache doesn't store the python packages
# If you want to also cache the installed packages, you have to install
# them in a virtualenv and cache it as well.
- .cache/pip
url = ""
url = ""
verify_ssl = true
name = "pypi"
......@@ -9,3 +9,21 @@ It currently exposes the `meltano` module with the following features:
- `meltano.utils`: Miscellaneous utilities
Issues are tracked at
# Meltano Fastly extractor
## Configuration
FASTLY_API_TOKEN: the API token from Fastly. It should have `Billing` access and `global:read` scope.
## Usage
> Note: the `meltano` package will be needed to interact with this package.
$ pip install meltano
$ pip install -e .
$ FASTLY_API_TOKEN=<your_api_token> meltano extract fastly | meltano load json
ADD . /app/extractor/fastly
RUN pip install /app/extractor/fastly
CMD meltano extract fastly | meltano load postgresql
<<<<<<< HEAD
ADD . /app/loader/postgresql
RUN pip install /app/loader/postgresql
ADD . /app/extractor/fastly
RUN pip install /app/extractor/fastly
CMD meltano extract fastly | meltano load postgresql
>>>>>>> fastly/dev
This diff is collapsed.
# Package init for the Fastly extractor: registers FastlyExtractor with the
# MeltanoService registry under its canonical id so that
# `meltano extract fastly` can resolve it at runtime.
from meltano.common.service import MeltanoService
from .extractor import FastlyExtractor

MeltanoService.register_extractor("com.meltano.extract.fastly", FastlyExtractor)
import asyncio
import sys
import argparse
import logging
from meltano.common.service import MeltanoService
from meltano.common.utils import setup_logging, setup_db
from meltano.common.cli import parser_db_conn, parser_logging
from meltano.schema import Schema
from import MeltanoStream
from .loader import PostgreSQLLoader
def parse():
    """Parse command-line arguments for the PostgreSQL loader CLI.

    Returns:
        argparse.Namespace carrying (at least) the DB-connection options
        added by `parser_db_conn` and the positional `manifest_file`.
    """
    parser = argparse.ArgumentParser(
        description="Load data from stdin using PostgreSQL")

    # NOTE(review): the argument definitions were truncated in the scrape;
    # reconstructed from the imports (parser_db_conn / parser_logging) and
    # the orphaned `help=` line — confirm against upstream.
    parser_db_conn(parser)
    parser_logging(parser)
    parser.add_argument('manifest_file',
                        type=str,
                        help=("Manifest file to load."))

    return parser.parse_args()
def main(args):
service = MeltanoService()
# schema = service.load_schema(args.schema, args.manifest_file)
schema = Schema(args.schema, [])
# hardcode the schema at the moment, but this should be discovered at some point
stream = MeltanoStream(sys.stdin.fileno(), schema=schema)
loader = service.create_loader("com.meltano.load.postgresql", stream)"Waiting for data...")
loader.receive()"Integration complete.")
if __name__ == '__main__':
    args = parse()
    # NOTE(review): the remainder of the entry point was truncated in the
    # scrape — `args` was parsed but never used. The setup helpers imported
    # above (setup_logging / setup_db) were probably also invoked here;
    # confirm against upstream.
    main(args)
import io
import os
import logging
import pandas as pd
import numpy as np
import json
import aiohttp
import asyncio
import itertools

# NOTE(review): the module path was lost in the scrape ("from  import
# json_normalize", a SyntaxError); in pandas of this era json_normalize
# lived in pandas.io.json — confirm against upstream.
from pandas.io.json import json_normalize

from meltano.schema import DBType
from meltano.extract.base import MeltanoExtractor
from meltano.common.transform import columnify
from meltano.common.service import MeltanoService
from meltano.common.entity import Entity, Attribute, TransientAttribute

# Base URL for all Fastly API requests.
URL = ""
# TODO: refactor to utils
# Mapping from numpy/pandas dtypes to Meltano DBType values, used to infer
# column types when building an Entity from a DataFrame.
# (The closing brace was truncated in the scrape; restored here.)
pandas_to_dbtype = {
    np.dtype('object'): DBType.String,
    np.dtype('float64'): DBType.Double,
    np.dtype('float32'): DBType.Double,
    np.dtype('int64'): DBType.Long,
    np.dtype('int32'): DBType.Integer,
    np.dtype('bool'): DBType.Boolean,
    np.dtype('int8'): DBType.Integer,
    np.dtype('datetime64'): DBType.Timestamp,
}
# TODO: refactor to utils
def df_to_entity(alias, df):
    """
    Infer an Entity from a DataFrame.

    Args:
        alias: name to give the resulting Entity.
        df: pandas.DataFrame whose column dtypes drive the attribute types.

    Returns:
        Entity with one Attribute per DataFrame column.

    Raises:
        KeyError: if a column dtype has no entry in `pandas_to_dbtype`.
    """
    attrs = []
    for column, dtype in df.dtypes.items():
        converted_type = pandas_to_dbtype[dtype]
        # `input`/`output` renamed: `input` shadows the builtin.
        raw_attr = TransientAttribute(column, converted_type.value)
        column_attr = TransientAttribute(columnify(column), converted_type.value)
        # Bug fix: the constructed Attribute was discarded, so the Entity
        # always ended up with an empty attribute list — collect it.
        attrs.append(Attribute(column, raw_attr, column_attr))
    return Entity(alias, attributes=attrs)
class FastlyExtractor(MeltanoExtractor):
    """
    Extractor for the Fastly Billing API.

    NOTE(review): the scrape truncated several structural lines of this
    class (docstring quotes, a closing brace, a `try` header, one call
    argument). Reconstructions below are marked inline — confirm each
    against the upstream source.
    """
    source_name = "fastly"

    def create_session(self):
        """Build an aiohttp session carrying the Fastly auth headers.

        Assumes FASTLY_API_TOKEN is set in the environment; if it is not,
        the header is sent as None — TODO confirm intended behavior.
        """
        headers = {
            'Fastly-Key': os.getenv("FASTLY_API_TOKEN"),
            'Accept': "application/json",
        }
        session = aiohttp.ClientSession(headers=headers)
        return session

    def url(self, endpoint):
        """Join `endpoint` onto the API base URL."""
        return "".join((URL, endpoint))

    # TODO: refactor this out in a HTTP loader
    async def req(self, session, endpoint, payload=None):
        """GET `endpoint` and return the JSON body flattened via json_normalize.

        `payload` is not used by the body; kept for interface compatibility
        (default changed from the mutable `{}` to None — behavior identical
        since it is never read).

        Raises:
            aiohttp.ClientResponseError: on a non-200 response.
        """
        url = self.url(endpoint)
        async with session.get(url) as resp:
            logging.debug("API {}:{}".format(url, resp.status))
            if resp.status != 200:
                # Bug fix: the original `raise resp.status` raised a bare
                # int, which is itself a TypeError in Python 3 — raise a
                # proper HTTP error instead.
                resp.raise_for_status()
            return json_normalize(await resp.json())

    # TODO: refactor this out in a discovery component
    async def entities(self):
        """
        Generates Entity objects for entity auto-discovery.
        """
        async with self.create_session() as session:
            billing = await self.req(session, "billing/v2/year/2018/month/06")
            # import pdb; pdb.set_trace()
            yield df_to_entity("Billing", billing)

    def discover_entities(self):
        """Drain the async `entities()` generator synchronously."""
        async def drain(generator):
            results = []
            async for result in generator:
                # NOTE(review): the accumulation statement was truncated in
                # the scrape (only a leftover pdb breakpoint survived, now
                # removed); reconstructed — confirm against upstream.
                results.append(result)
            return results

        loop = asyncio.get_event_loop()
        # NOTE(review): the run_until_complete argument was truncated;
        # draining `self.entities()` is the only coherent reading.
        entities = loop.run_until_complete(drain(self.entities()))
        return entities

    async def extract(self, entity):
        """Yield one normalized DataFrame per (year, month) of billing data.

        NOTE(review): range(2017, 2018) covers only 2017 and range(1, 12)
        skips December — possible off-by-one, left unchanged pending intent.
        The try/except structure was truncated in the scrape (only the
        `except Exception as err:` line survived); reconstructed minimally.
        """
        async with self.create_session() as session:
            for year, month in itertools.product(range(2017, 2018),
                                                 range(1, 12)):
                url = "billing/v2/year/{:04}/month/{:02}".format(year, month)
                try:
                    yield await self.req(session, url)
                except Exception as err:
                    logging.error("extraction failed for %s: %s", url, err)
......@@ -25,6 +25,7 @@ setup(
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment