Verified commit 349caf32 authored by Douwe Maan

Remove Airflow webserver but keep scheduler

parent 6734e441
Pipeline #105068206 passed with stages in 14 minutes and 19 seconds
@@ -23,8 +23,5 @@ WORKDIR /project
# meltano ui
EXPOSE 5000
-# airflow ui
-EXPOSE 5010
ENTRYPOINT ["meltano"]
CMD ["ui"]
\ No newline at end of file
@@ -76,7 +76,6 @@ In this section, we will be going over how you can deploy a Meltano Docker image
- **Memory Limits (MiB)**: Soft limit 1024
- **Port mappings**:
  - 5000/tcp (meltano)
-  - 5010/tcp (airflow)
1. Click the `Update` button to finish setting up your container definition
1. Click `Edit` next to the _Task definition_ heading
......
@@ -98,25 +98,14 @@ For more information on how to get these from your GitLab application, check out

## Service Listen Configuration

-By default, the API and Airflow services listen with the following host/port combinations.
+By default, the API service listens on the following host/port combination.

API: `http://0.0.0.0:5000`
-Airflow: `http://0.0.0.0:5010`

-To change the host/port configuration on which the API or Airflow server listen, update your `.env` in your project directory with the following configuration:
-
-:::warning Airflow integration
-Meltano expects Airflow to be available on the **same host** it is currently exposed from.
-Meltano doesn't support connecting to a remote Airflow instance.
-:::
+To change the host/port configuration on which the API server listens, update your `.env` in your project directory with the following configuration:

```bash
# Meltano API configuration
export MELTANO_API_HOSTNAME="0.0.0.0"
export MELTANO_API_PORT="5000"

-# Airflow configuration
-export AIRFLOW__WEBSERVER__WEB_SERVER_HOST="0.0.0.0"
-export AIRFLOW__WEBSERVER__WEB_SERVER_PORT="5010"
```
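
As a minimal sketch of what this configuration amounts to (assuming the variables are read straight from the environment at startup; this is an illustration, not Meltano's actual startup code):

```python
import os

# Hypothetical illustration: resolve the bind host/port from the
# environment, falling back to the documented defaults when unset.
hostname = os.getenv("MELTANO_API_HOSTNAME", "0.0.0.0")
port = int(os.getenv("MELTANO_API_PORT", "5000"))

print(f"The API service would listen on http://{hostname}:{port}")
```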
@@ -101,7 +101,6 @@ In this section, we will be going over how you can deploy a Meltano Docker image
- **Memory Limits (MiB)**: Soft limit 1024
- **Port mappings**:
  - 5000/tcp (meltano)
-  - 5010/tcp (airflow)
1. Click the `Update` button to finish setting up your container definition
1. Click `Edit` next to the _Task definition_ heading
......
@@ -4,7 +4,7 @@ If you're using SaaS tools to manage support, sales, marketing, revenue and othe
When a new pipeline schedule is created using the [UI](/docs/getting-started.html#create-a-pipeline-schedule) or [CLI](/docs/command-line-interface.html#schedule), a [DAG](https://airflow.apache.org/concepts.html#dags) is automatically created in Airflow as well, which represents "a collection of all the tasks you want to run, organized in a way that reflects their relationships and dependencies".

-Airflow is automatically installed when the Meltano UI is launched for the first time, and automatically runs in the background while Meltano UI is running.
+Airflow is automatically installed when the Meltano UI is launched for the first time, and the scheduler automatically runs in the background while Meltano UI is running.
### Installing Airflow
@@ -32,11 +32,15 @@ Example:
meltano schedule carbon__sqlite tap-carbon-intensity target-sqlite @daily
```

-Now that you've scheduled your first DAG, you can load the "Pipeline" page in the UI and see it show up. You can also refresh the "Orchestrate" page and see your DAG show up in the Airflow interface.
+Now that you've scheduled your first DAG, you can load the "Pipeline" page in the UI and see it show up.
### Using Airflow directly
-You are free to interact with Airflow directly through the "Orchestrate" page in the Meltano UI.
+You are free to interact with Airflow directly through its own UI. You can start the webserver like this:
+
+```bash
+meltano invoke airflow webserver -D
+```
By default, you'll only see Meltano's pipeline DAGs here, which are created automatically using the dynamic DAG generator included with every Meltano project, located at `orchestrate/dags/meltano.py`.
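
For a sense of what such a generator does, here is a hypothetical, minimal sketch (the names `carbon__sqlite`, `tap-carbon-intensity`, and `target-sqlite` come from the scheduling example above; the real `orchestrate/dags/meltano.py` reads schedules from the Meltano project rather than hardcoding them):

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

# Illustrative stand-in for the schedules the real generator would
# load from the Meltano project.
schedules = [
    {
        "name": "carbon__sqlite",
        "extractor": "tap-carbon-intensity",
        "loader": "target-sqlite",
        "interval": "@daily",
    }
]

for schedule in schedules:
    dag_id = f"meltano_{schedule['name']}"

    dag = DAG(
        dag_id,
        start_date=datetime(2019, 1, 1),
        schedule_interval=schedule["interval"],
        catchup=False,
    )

    BashOperator(
        task_id="extract_load",
        bash_command=f"meltano elt {schedule['extractor']} {schedule['loader']}",
        dag=dag,
    )

    # Airflow discovers DAGs by scanning module-level globals,
    # so each generated DAG is exposed under its own name.
    globals()[dag_id] = dag
```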
@@ -60,7 +64,7 @@ Manually trigger a task to run:
meltano invoke airflow run --raw meltano extract_load $(date -I)
```

-Start the Airflow UI, if you're not already running Meltano UI: (will start in a separate browser)
+Start the Airflow UI (the webserver will run in the background as a daemon):
```bash
meltano invoke airflow webserver -D
......
@@ -26,41 +26,42 @@ class AirflowWorker(threading.Thread):
            project, config_service=self.config_service
        )
        self._plugin = None
-        self._webserver = None
-        self._scheduler = None
+        self.pid_file = PIDFile(self.project.run_dir("airflow", "scheduler.pid"))

    def kill_stale_workers(self):
-        stale_workers = []
-        workers_pid_files = map(self.pid_file, ("webserver", "scheduler"))
-        for pid_file in workers_pid_files:
-            try:
-                stale_workers.append(pid_file.process)
-            except UnknownProcessError:
-                pass
-        def on_terminate(process):
-            logging.info(f"Process {process} ended with exit code {process.returncode}")
+        process = None
+        try:
+            process = self.pid_file.process
+        except UnknownProcessError:
+            pass

-        for process in stale_workers:
-            logging.debug(f"Process {process} is stale, terminating it.")
+        if process is not None:
+            logging.debug(
+                f"Process {process} is running, possibly stale, terminating it."
+            )
            process.terminate()

-        gone, alive = psutil.wait_procs(stale_workers, timeout=5, callback=on_terminate)
+            def on_terminate(process):
+                logging.info(
+                    f"Process {process} ended with exit code {process.returncode}"
+                )
+            _gone, alive = psutil.wait_procs(
+                [process], timeout=5, callback=on_terminate
+            )

-        # kill the rest
-        for process in alive:
-            process.kill()
+            # kill the rest
+            for process in alive:
+                process.kill()

-        for pid_file in workers_pid_files:
-            try:
-                pid_file.unlink()
-            except:
-                pass
+        try:
+            self.pid_file.unlink()
+        except:
+            pass

    def start_all(self):
        _, Session = project_engine(self.project)
-        logs_dir = self.project.run_dir("airflow", "logs")
+        logs_path = self.project.run_dir("airflow", "logs", "scheduler.log")

        try:
            session = Session()
@@ -68,25 +69,14 @@ class AirflowWorker(threading.Thread):
                self.project, self._plugin, prepare_with_session=session
            )
-            # fmt: off
-            with logs_dir.joinpath("webserver.log").open("w") as webserver, \
-                logs_dir.joinpath("scheduler.log").open("w") as scheduler:
-                self._webserver = invoker.invoke("webserver", "-w", "1", stdout=webserver)
-                self._scheduler = invoker.invoke("scheduler", stdout=scheduler)
-                self.pid_file("webserver").write_pid(self._webserver.pid)
-                self.pid_file("scheduler").write_pid(self._scheduler.pid)
-            # fmt: on
-            # Time padding for server initialization so UI iframe displays as expected
-            # (iteration potential on approach but following UIAvailableWorker sleep approach)
-            time.sleep(2)
+            with logs_path.open("w") as logs_file:
+                scheduler = invoker.invoke(
+                    "scheduler", "--pid", str(self.pid_file), stdout=logs_file
+                )
+                self.pid_file.write_pid(scheduler.pid)
        finally:
            session.close()

-    def pid_file(self, name) -> PIDFile:
-        return PIDFile(self.project.run_dir("airflow", f"{name}.pid"))

    def run(self):
        try:
            self._plugin = self.config_service.find_plugin("airflow")
......
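
For context on the `PIDFile` contract the worker relies on above (`write_pid`, a `.process` property that raises `UnknownProcessError`, `unlink`, and `str()` yielding the path passed to `--pid`), here is a hypothetical minimal sketch; Meltano's actual implementation differs:

```python
from pathlib import Path

import psutil


class UnknownProcessError(Exception):
    """Raised when the PID file does not point at a running process."""


class PIDFile:
    def __init__(self, path):
        self.path = Path(path)

    def __str__(self):
        # The worker passes str(self.pid_file) to `airflow scheduler --pid`.
        return str(self.path)

    def write_pid(self, pid: int):
        self.path.parent.mkdir(parents=True, exist_ok=True)
        self.path.write_text(str(pid))

    @property
    def process(self) -> psutil.Process:
        try:
            return psutil.Process(int(self.path.read_text()))
        except (FileNotFoundError, ValueError, psutil.NoSuchProcess):
            raise UnknownProcessError(f"no running process behind {self.path}")

    def unlink(self):
        self.path.unlink()
```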
@@ -75,8 +75,11 @@ def start(ctx, reload, bind_port, bind):
    # TODO: remove when running on Python 3.8
    asyncio.get_child_watcher()

-    workers = [MeltanoCompilerWorker(project)]
+    # init workers and conditionally append to it in the rest of this function body
+    workers = []
+    if not truthy(os.getenv("MELTANO_DISABLE_AIRFLOW", False)):
+        workers.append(AirflowWorker(project))
+    workers.append(MeltanoCompilerWorker(project))

    # we need to whitelist the loaders here because not
    # all the loaders support dbt in the first place
......
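
`truthy` here normalizes the string value of `MELTANO_DISABLE_AIRFLOW`; a plausible sketch of such a helper (the exact set of spellings Meltano accepts may differ):

```python
def truthy(value) -> bool:
    """Interpret common string spellings of a boolean as True."""
    return str(value).strip().lower() in ("1", "true", "t", "yes", "y", "on")
```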
@@ -451,9 +451,3 @@ orchestrators:
  - name: core.dags_are_paused_at_creation
    env: AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION
    value: False
-  - name: webserver.web_server_port
-    value: 5010
-    env: AIRFLOW__WEBSERVER__WEB_SERVER_PORT
-  - name: webserver.web_server_host
-    value: '0.0.0.0'
-    env: AIRFLOW__WEBSERVER__WEB_SERVER_HOST
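
The `env` key in these settings maps a plugin setting onto an environment variable for the invoked process; as a rough sketch of that expansion (illustrative only, not Meltano's actual settings code):

```python
# Illustrative expansion of `env`-mapped settings into process
# environment variables, using the remaining setting from the diff above.
settings = [
    {
        "name": "core.dags_are_paused_at_creation",
        "env": "AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION",
        "value": False,
    }
]

env = {s["env"]: str(s["value"]) for s in settings if "env" in s}
# -> {"AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION": "False"}
```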
@@ -8,7 +8,6 @@ module.exports = {
  configureWebpack: {
    plugins: [
      new webpack.EnvironmentPlugin({
-        AIRFLOW_URL: 'http://localhost:5010',
        MELTANO_APP_URL: 'http://localhost:5000',
        DBT_DOCS_URL: 'http://localhost:5000/-/dbt/'
      }),
......