Commit 49514a76 authored by Thomas La Piana's avatar Thomas La Piana Committed by Taylor A Murphy, PhD

Resolve "Airflow Env Var bug"

parent 895af46e
# This file contains common operators/functions to be used across multiple DAGs
import functools
""" This file contains common operators/functions to be used across multiple DAGs """
import os
from airflow.operators.slack_operator import SlackAPIPostOperator
......@@ -64,3 +63,17 @@ gitlab_defaults = dict(
is_delete_operator_pod=True,
namespace=os.environ["NAMESPACE"],
)
# GitLab default environment variables for worker pods
env = os.environ.copy()
GIT_BRANCH = env["GIT_BRANCH"]
gitlab_pod_env_vars = {
"CI_PROJECT_DIR": "/analytics",
"EXECUTION_DATE": "{{ next_execution_date }}",
"SNOWFLAKE_LOAD_DATABASE": "RAW"
if GIT_BRANCH == "master"
else f"{GIT_BRANCH.upper()}_RAW",
"SNOWFLAKE_TRANSFORM_DATABASE": "ANALYTICS"
if GIT_BRANCH == "master"
else f"{GIT_BRANCH.upper()}_ANALYTICS",
}
......@@ -6,21 +6,14 @@ from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator
from kube_secrets import *
from airflow_utils import slack_failed_task, gitlab_defaults
from airflow_utils import slack_failed_task, gitlab_defaults, gitlab_pod_env_vars
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
# Load the env vars into a dict and set Secrets
env = os.environ.copy()
GIT_BRANCH = env["GIT_BRANCH"]
pod_env_vars = {
"SNOWFLAKE_LOAD_DATABASE": "RAW"
if GIT_BRANCH == "master"
else f"{GIT_BRANCH.upper()}_RAW",
"SNOWFLAKE_TRANSFORM_DATABASE": "ANALYTICS"
if GIT_BRANCH == "master"
else f"{GIT_BRANCH.upper()}_ANALYTICS",
}
pod_env_vars = {**gitlab_pod_env_vars, **{}}
# Default arguments for the DAG
default_args = {
......
......@@ -4,19 +4,14 @@ from datetime import datetime, timedelta
from airflow import DAG
from kube_secrets import *
from airflow_utils import slack_failed_task, gitlab_defaults
from airflow_utils import slack_failed_task, gitlab_defaults, gitlab_pod_env_vars
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
# Load the env vars into a dict and set Secrets
env = os.environ.copy()
GIT_BRANCH = env["GIT_BRANCH"]
pod_env_vars = {
"SNOWFLAKE_LOAD_DATABASE": "RAW" if GIT_BRANCH == "master" else f"{GIT_BRANCH}_RAW",
"SNOWFLAKE_TRANSFORM_DATABASE": "ANALYTICS"
if GIT_BRANCH == "master"
else f"{GIT_BRANCH}_ANALYTICS",
}
pod_env_vars = {**gitlab_pod_env_vars, **{}}
# Default arguments for the DAG
default_args = {
......
......@@ -4,19 +4,14 @@ from datetime import datetime, timedelta
from airflow import DAG
from kube_secrets import *
from airflow_utils import slack_failed_task, gitlab_defaults
from airflow_utils import slack_failed_task, gitlab_defaults, gitlab_pod_env_vars
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
# Load the env vars into a dict and set Secrets
env = os.environ.copy()
GIT_BRANCH = env["GIT_BRANCH"]
pod_env_vars = {
"SNOWFLAKE_LOAD_DATABASE": "RAW" if GIT_BRANCH == "master" else f"{GIT_BRANCH}_RAW",
"SNOWFLAKE_TRANSFORM_DATABASE": "ANALYTICS"
if GIT_BRANCH == "master"
else f"{GIT_BRANCH}_ANALYTICS",
}
pod_env_vars = {**gitlab_pod_env_vars, **{}}
# Default arguments for the DAG
default_args = {
......
......@@ -4,19 +4,14 @@ from datetime import datetime, timedelta
from airflow import DAG
from kube_secrets import *
from airflow_utils import slack_failed_task, gitlab_defaults
from airflow_utils import slack_failed_task, gitlab_defaults, gitlab_pod_env_vars
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
# Load the env vars into a dict and set Secrets
env = os.environ.copy()
GIT_BRANCH = env["GIT_BRANCH"]
pod_env_vars = {
"SNOWFLAKE_LOAD_DATABASE": "RAW" if GIT_BRANCH == "master" else f"{GIT_BRANCH}_RAW",
"SNOWFLAKE_TRANSFORM_DATABASE": "ANALYTICS"
if GIT_BRANCH == "master"
else f"{GIT_BRANCH}_ANALYTICS",
}
pod_env_vars = {**gitlab_pod_env_vars, **{}}
# Default arguments for the DAG
default_args = {
......
import os
from datetime import datetime, timedelta
from datetime import datetime
from airflow import DAG
from kube_secrets import *
from airflow_utils import slack_failed_task, gitlab_defaults
from airflow_utils import slack_failed_task, gitlab_defaults, gitlab_pod_env_vars
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
# Load the env vars into a dict and set Secrets
env = os.environ.copy()
GIT_BRANCH = env["GIT_BRANCH"]
pod_env_vars = {
"SNOWFLAKE_LOAD_DATABASE": "RAW" if GIT_BRANCH == "master" else f"{GIT_BRANCH}_RAW",
"SNOWFLAKE_TRANSFORM_DATABASE": "ANALYTICS"
if GIT_BRANCH == "master"
else f"{GIT_BRANCH}_ANALYTICS",
}
pod_env_vars = {**gitlab_pod_env_vars, **{}}
# Default arguments for the DAG
default_args = {
......
......@@ -4,21 +4,13 @@ from datetime import datetime, timedelta
from airflow import DAG
from kube_secrets import *
from airflow_utils import slack_failed_task, gitlab_defaults
from airflow_utils import slack_failed_task, gitlab_defaults, gitlab_pod_env_vars
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
# Load the env vars into a dict and set Secrets
env = os.environ.copy()
GIT_BRANCH = env["GIT_BRANCH"]
pod_env_vars = {
"SNOWFLAKE_LOAD_DATABASE": "RAW"
if GIT_BRANCH == "master"
else f"{GIT_BRANCH.upper()}_RAW",
"SNOWFLAKE_TRANSFORM_DATABASE": "ANALYTICS"
if GIT_BRANCH == "master"
else f"{GIT_BRANCH.upper()}_ANALYTICS",
"CI_PROJECT_DIR": "/analytics",
}
pod_env_vars = {**gitlab_pod_env_vars, **{}}
# Default arguments for the DAG
default_args = {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment