google_analytics_four.py

import os
from datetime import datetime, timedelta
from airflow import DAG
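# Legacy Airflow 1.x import path; on Airflow 2 the KubernetesPodOperator is
# provided by the apache-airflow-providers-cncf-kubernetes package instead.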
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from kubernetes_helpers import get_affinity, get_toleration
from yaml import safe_load, YAMLError
from airflow_utils import (
    clone_and_setup_extraction_cmd,
    DATA_IMAGE,
    DBT_IMAGE,
    dbt_install_deps_nosha_cmd,
    gitlab_defaults,
    gitlab_pod_env_vars,
    slack_failed_task,
    REPO_BASE_PATH,
)

from kube_secrets import (
    GCP_MKTG_GOOG_ANALYTICS4_5E6DC7D6_CREDENTIALS,
    GIT_DATA_TESTS_CONFIG,
    GIT_DATA_TESTS_PRIVATE_KEY,
    MCD_DEFAULT_API_ID,
    MCD_DEFAULT_API_TOKEN,
    SALT_EMAIL,
    SALT_IP,
    SALT_NAME,
    SALT_PASSWORD,
    SALT,
    SNOWFLAKE_ACCOUNT,
    SNOWFLAKE_LOAD_PASSWORD,
    SNOWFLAKE_LOAD_ROLE,
    SNOWFLAKE_LOAD_USER,
    SNOWFLAKE_LOAD_WAREHOUSE,
    SNOWFLAKE_PASSWORD,
    SNOWFLAKE_STATIC_DATABASE,
    SNOWFLAKE_TRANSFORM_ROLE,
    SNOWFLAKE_TRANSFORM_SCHEMA,
    SNOWFLAKE_TRANSFORM_WAREHOUSE,
    SNOWFLAKE_USER,
)

dbt_secrets = [
    GIT_DATA_TESTS_CONFIG,
    GIT_DATA_TESTS_PRIVATE_KEY,
    MCD_DEFAULT_API_ID,
    MCD_DEFAULT_API_TOKEN,
    SALT_EMAIL,
    SALT_IP,
    SALT_NAME,
    SALT_PASSWORD,
    SALT,
    SNOWFLAKE_ACCOUNT,
    SNOWFLAKE_LOAD_PASSWORD,
    SNOWFLAKE_LOAD_ROLE,
    SNOWFLAKE_LOAD_USER,
    SNOWFLAKE_LOAD_WAREHOUSE,
    SNOWFLAKE_PASSWORD,
    SNOWFLAKE_STATIC_DATABASE,
    SNOWFLAKE_TRANSFORM_ROLE,
    SNOWFLAKE_TRANSFORM_SCHEMA,
    SNOWFLAKE_TRANSFORM_WAREHOUSE,
    SNOWFLAKE_USER,
]
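
# The list above is mounted into the dbt pod: Snowflake connection settings,
# hashing salts, and what appear to be Monte Carlo API credentials (MCD_*).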

env = os.environ.copy()

GIT_BRANCH = env["GIT_BRANCH"]
pod_env_vars = gitlab_pod_env_vars

default_args = {
    "depends_on_past": False,
    "on_failure_callback": slack_failed_task,
    "owner": "airflow",
    "retries": 1,
    "retry_delay": timedelta(minutes=1),
    "sla": timedelta(hours=24),
    "sla_miss_callback": slack_failed_task,
    "start_date": datetime(2023, 12, 15),
}

dag = DAG(
    "el_google_analytics_four",
    default_args=default_args,
    schedule_interval="0 8 * * *",  # daily at 08:00 UTC
    concurrency=1,  # at most one task instance runs at a time in this DAG
    catchup=True,  # backfill any runs missed since start_date
)

external_table_run_cmd = f"""
    {dbt_install_deps_nosha_cmd} &&
    dbt run-operation stage_external_sources \
        --args "select: source google_analytics_4_bigquery" --profiles-dir profile; ret=$?;
    exit $ret
"""
dbt_task_name = "dbt-google-analytics-four-external-table-refresh"
dbt_external_table_run = KubernetesPodOperator(
    **gitlab_defaults,
    image=DBT_IMAGE,
    task_id=dbt_task_name,
    trigger_rule="all_done",
    name=dbt_task_name,
    secrets=dbt_secrets,
    env_vars=gitlab_pod_env_vars,
    arguments=[external_table_run_cmd],
    affinity=get_affinity("dbt"),
    tolerations=get_toleration("dbt"),
    dag=dag,
)

spec_file = "bigquery/src/google_analytics_four/bigquery_export.yml"
spec_path = f"{REPO_BASE_PATH}/extract/{spec_file}"

# Parse the export spec; re-raise on malformed YAML so the DAG fails to import
# rather than hitting a NameError on the undefined result below.
with open(spec_path, "r") as yaml_file:
    try:
        stream = safe_load(yaml_file)
    except YAMLError as exc:
        print(exc)
        raise
for export in stream["exports"]:
    export_name = export["name"]
    export_date_str = "{{ yesterday_ds_nodash }}"

    extract_command = f"""
    {clone_and_setup_extraction_cmd} &&
    python bigquery/src/bigquery_export.py \
        --config_path={spec_file} \
        --export_name={export_name}
    """

    extract_operator = KubernetesPodOperator(
        **gitlab_defaults,
        image=DATA_IMAGE,
        task_id=export_name,
        name=export_name,
        secrets=[GCP_MKTG_GOOG_ANALYTICS4_5E6DC7D6_CREDENTIALS],
        env_vars={
            **pod_env_vars,
            "EXPORT_DATE": export_date_str,
            "GIT_BRANCH": GIT_BRANCH,
        },
        affinity=get_affinity("extraction"),
        tolerations=get_toleration("extraction"),
        arguments=[extract_command],
        dag=dag,
    )

    extract_operator >> dbt_external_table_run
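
# Every extract task fans in to the single dbt refresh task; since that task
# runs with trigger_rule="all_done", the external tables are refreshed even if
# some of the extracts fail.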