# meltano.py
#
# If you want to define a custom DAG, create
# a new file under orchestrate/dags/ and Airflow
# will pick it up automatically.
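#
# This file itself generates one DAG per schedule defined in the Meltano
# project, as listed by `meltano schedule list`.
#
# A minimal custom DAG might look like this (illustrative sketch only; the
# dag_id, schedule, and command are made up):
#
#     from datetime import datetime
#
#     from airflow import DAG
#     from airflow.operators.bash_operator import BashOperator
#
#     dag = DAG(
#         "my_custom_dag",
#         start_date=datetime(2020, 1, 1),
#         schedule_interval="@daily",
#     )
#     task = BashOperator(task_id="say_hello", bash_command="echo hello", dag=dag)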

import os
import logging
import subprocess
import json

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import timedelta

DEFAULT_ARGS = {
    "owner": "airflow",
    "depends_on_past": False,
    "email_on_failure": False,
    "email_on_retry": False,
    "catchup": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
    "concurrency": 1,
}

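# MELTANO_PROJECT_ROOT is presumably set by Meltano when it invokes Airflow;
# when it is absent we fall back to the current working directory.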
project_root = os.getenv("MELTANO_PROJECT_ROOT", os.getcwd())

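# Ask Meltano for the project's schedules as JSON. `.meltano/run/bin` is assumed
# to be the `meltano` executable that Meltano installs inside the project;
# `check=True` raises if the command fails.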
result = subprocess.run(
    [".meltano/run/bin", "schedule", "list", "--format=json"],
    cwd=project_root,
    stdout=subprocess.PIPE,
    universal_newlines=True,
    check=True,
)
schedules = json.loads(result.stdout)
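
# Each schedule entry is expected to look roughly like this (illustrative
# values; only the keys accessed below are assumed to exist):
#
#     {
#         "name": "gitlab-to-postgres",
#         "interval": "@daily",
#         "cron_interval": "0 0 * * *",
#         "start_date": "2020-01-01T00:00:00",
#         "elt_args": ["tap-gitlab", "target-postgres"],
#         "env": {},
#     }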

for schedule in schedules:
    logging.info(f"Considering schedule '{schedule['name']}': {schedule}")

    if not schedule["cron_interval"]:
        logging.info(
            f"No DAG created for schedule '{schedule['name']}' because its interval is set to `@once`."
        )
        continue

    args = DEFAULT_ARGS.copy()
    if schedule["start_date"]:
        args["start_date"] = schedule["start_date"]

    dag_id = f"meltano_{schedule['name']}"

    # from https://airflow.apache.org/docs/stable/scheduler.html#backfill-and-catchup
    #
    # It is crucial to set `catchup` to False so that Airflow only creates a single
    # job at the tail end of the date window we want to extract data for.
    #
    # Because our extractors do not support date-window extraction, it serves no
    # purpose to enqueue date-chunked jobs for the complete extraction window.
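    #
    # Note: `schedule_interval` is set from `interval` as configured in meltano.yml
    # (e.g. `@daily` or a cron expression); the `cron_interval` checked above is
    # presumably the resolved cron expression, which is empty for `@once`.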
    dag = DAG(
        dag_id, catchup=False, default_args=args, schedule_interval=schedule["interval"]
    )

    elt = BashOperator(
        task_id="extract_load",
        bash_command=f"cd {project_root}; .meltano/run/bin elt {' '.join(schedule['elt_args'])}",
        dag=dag,
        env={
            # BashOperator's `env` replaces the child process environment entirely,
            # so inherit the current env first and let schedule-specific vars win
            **os.environ,
            **schedule["env"],
        },
    )

    # register the dag by adding it to the module's globals;
    # Airflow only picks up DAG objects that are accessible at module level
    globals()[dag_id] = dag

    logging.info(f"DAG created for schedule '{schedule['name']}'")