Verified Commit 062d39ca authored by Micaël Bergeron's avatar Micaël Bergeron

Merge remote-tracking branch 'cli/docker-gitlab-ci' into monorepo

parents da8c6877 588e6f4a
This diff is collapsed.
......@@ -2,24 +2,28 @@ import click
import importlib
import sys
import logging
import functools
from meltano.common.service import MeltanoService
from meltano.common.manifest_reader import ManifestReader
from meltano.common.manifest_writer import ManifestWriter
from meltano.common.manifest import Manifest
from meltano.common.db import DB
from meltano.common.utils import setup_db, pop_all
from meltano.schema import schema_apply
from meltano.stream import MeltanoStream
service = MeltanoService()
# print(service.auto_discover())
def load_extractor(name):
def build_extractor(name):
# this should register the module
try:
importlib.import_module("meltano.extract.{}".format(name))
except ImportError as e:
logging.error("Cannot find the loader {0}, you might need to install it (meltano-load-{0})".format(name))
logging.error("Cannot find the extractor {0}, you might need to install it (meltano-extract-{0})".format(name))
schema = None
stream = MeltanoStream(sys.stdout.fileno())
......@@ -27,6 +31,65 @@ def load_extractor(name):
return extractor
def build_loader(name):
# this should register the module
try:
importlib.import_module("meltano.load.{}".format(name))
except ImportError as e:
logging.error("Cannot find the loader {0}, you might need to install it (meltano-load-{0})".format(name))
schema = None
stream = MeltanoStream(sys.stdin.fileno())
loader = service.create_loader("com.meltano.load.{}".format(name), stream.create_reader())
return loader
def build_manifest(source, manifest_path):
with open(manifest_path, 'r') as file:
return ManifestReader(source).load(file)
def register_manifest(manifest):
for id in service.register_manifest(manifest):
logging.debug("Registered entity {}".format(id))
def db_options(func):
@click.option('-S', '--schema',
required=True)
@click.option('-H', '--host',
envvar='PG_ADDRESS',
default='localhost',
help="Database schema to use.")
@click.option('-p', '--port',
type=int,
envvar='PG_PORT',
default=5432)
@click.option('-d', '-db', 'database',
envvar='PG_DATABASE',
default=lambda: os.getenv('USER', ''),
help="Database to import the data to.")
@click.option('-T', '--table', 'table_name',
help="Table to import the data to.")
@click.option('-u', '--user',
envvar='PG_USERNAME',
default=lambda: os.getenv('USER', ''),
help="Specifies the user to connect to the database with.")
@click.password_option(envvar='PG_PASSWORD')
@functools.wraps(func)
def wrapper(*args, **kwargs):
import pdb; pdb.set_trace()
config = pop_all(("schema", "host", "port", "database", "table_name", "user", "password"), kwargs)
DB.setup(**config)
return func(*args, **kwargs)
return wrapper
@click.group()
def root():
pass
......@@ -35,32 +98,31 @@ def root():
@root.command()
@click.argument('manifest')
def describe(manifest):
reader = ManifestReader('describe')
with open(manifest, 'r') as file:
manifest = reader.load(file)
print(manifest)
manifest = build_manifest('describe', manifest)
print(manifest)
@root.command()
@click.argument('extractor')
@click.argument('output')
def discover(extractor, output):
extractor_name = extractor
extractor = load_extractor(extractor_name)
import pdb; pdb.set_trace()
entities = extractor.discover_entities()
manifest = Manifest('test', entities=entities)
@click.argument('source_name')
def discover(extractor, source_name):
extractor = build_extractor(extractor)
manifest = Manifest(source_name,
entities=extractor.discover_entities())
with open(output + ".yaml", 'w') as out:
with open(source_name + ".yaml", 'w') as out:
writer = ManifestWriter(out)
writer.write(manifest)
@root.command()
@click.argument('extractor')
def extract(extractor):
extractor = load_extractor(extractor)
@click.argument('manifest')
def extract(extractor, manifest):
source, _ = manifest.split(".")
register_manifest(build_manifest(source, manifest))
extractor = build_extractor(extractor)
logging.info("Extracting data...")
extractor.run()
......@@ -69,22 +131,29 @@ def extract(extractor):
@root.command()
@click.argument('loader')
def load(loader):
# this should register the module
try:
importlib.import_module("meltano.load.{}".format(loader))
except ImportError as e:
logging.error("Cannot find the loader {0}, you might need to install it (meltano-load-{0})".format(loader))
@click.argument('manifest')
def load(loader, manifest):
source, _ = manifest.split(".")
schema = None
stream = MeltanoStream(sys.stdin.fileno())
loader = service.create_loader("com.meltano.load.{}".format(loader), stream.create_reader())
register_manifest(build_manifest(source, manifest))
loader = build_loader(loader)
logging.info("Waiting for data...")
loader.run()
logging.info("Integration complete.")
@root.command()
@db_options
@click.argument('manifest')
def apply_schema(manifest):
source, _ = manifest.split(".")
manifest = build_manifest(source, manifest)
with DB.open() as db:
schema_apply(db, manifest.as_schema())
def main():
logging.basicConfig(level=logging.DEBUG)
root()
version: '1.0'
person:
attributes:
- alias: product_id
input:
name: "Product ID"
type: string
output:
name: product_id
type: string
metadata:
is_pkey: False
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment