Adjust snowplow emitter thread count
What does this merge request do and why?
Currently, we initiates 5 threads for snowplow instrumentator. Given that we only initiate one thread for the main asyncio loop as the web worker and subsequent request processes, 5 threads is too much and should free its resource for the main web worker.
Here is the default values in AsyncEmitter
:
-
thread_count
is1
https://github.com/snowplow/snowplow-python-tracker/blob/master/snowplow_tracker/emitters.py#L445 -
batch_size
is10
https://github.com/snowplow/snowplow-python-tracker/blob/master/snowplow_tracker/emitters.py#L463
Related to AI Gateway becomes unresponsive by CPU-bound task (#405 - closed).
How to set up and validate locally
- Checkout this branch
- Cherry-pick
monitoring-threads
branch !596 (merged). - Set the following in .env
# Tracking
AIGW_SNOWPLOW__ENABLED=True
AIGW_SNOWPLOW__ENDPOINT="https://blizzard.local"
AIGW_SNOWPLOW__THREAD_COUNT=7
AIGW_SNOWPLOW__BATCH_SIZE=8
# Instrumentators
AIGW_INSTRUMENTATOR__THREAD_MONITORING_ENABLED=True
AIGW_INSTRUMENTATOR__THREAD_MONITORING_INTERVAL=5
- Check
threads
log by Monitor thread activities (!596 - merged) inmodelgateway_debug.log
.
For example, here is the log where you can see that 7 threads running for snowplow:
{
"pid": 21602,
"threads_count": 10,
"stacktrace": [
{
"thread_id": 140317688837696,
"lines": [
{
"filename": "/home/shinya/.asdf/installs/python/3.10.13/lib/python3.10/threading.py",
"lineno": 973,
"name": "_bootstrap",
"line": "self._bootstrap_inner()"
},
{
"filename": "/home/shinya/.asdf/installs/python/3.10.13/lib/python3.10/threading.py",
"lineno": 1016,
"name": "_bootstrap_inner",
"line": "self.run()"
},
{
"filename": "/home/shinya/.asdf/installs/python/3.10.13/lib/python3.10/threading.py",
"lineno": 953,
"name": "run",
"line": "self._target(*self._args, **self._kwargs)"
},
{
"filename": "/home/shinya/.asdf/installs/python/3.10.13/lib/python3.10/socketserver.py",
"lineno": 232,
"name": "serve_forever",
"line": "ready = selector.select(poll_interval)"
},
{
"filename": "/home/shinya/.asdf/installs/python/3.10.13/lib/python3.10/selectors.py",
"lineno": 416,
"name": "select",
"line": "fd_event_list = self._selector.poll(timeout)"
}
]
},
{
"thread_id": 140318435427904,
"lines": [
{
"filename": "/home/shinya/.asdf/installs/python/3.10.13/lib/python3.10/threading.py",
"lineno": 973,
"name": "_bootstrap",
"line": "self._bootstrap_inner()"
},
{
"filename": "/home/shinya/.asdf/installs/python/3.10.13/lib/python3.10/threading.py",
"lineno": 1016,
"name": "_bootstrap_inner",
"line": "self.run()"
},
{
"filename": "/home/shinya/.asdf/installs/python/3.10.13/lib/python3.10/threading.py",
"lineno": 953,
"name": "run",
"line": "self._target(*self._args, **self._kwargs)"
}
]
},
{
"thread_id": 140318938814016,
"lines": [
{
"filename": "/home/shinya/.asdf/installs/python/3.10.13/lib/python3.10/threading.py",
"lineno": 973,
"name": "_bootstrap",
"line": "self._bootstrap_inner()"
},
{
"filename": "/home/shinya/.asdf/installs/python/3.10.13/lib/python3.10/threading.py",
"lineno": 1016,
"name": "_bootstrap_inner",
"line": "self.run()"
},
{
"filename": "/home/shinya/.asdf/installs/python/3.10.13/lib/python3.10/threading.py",
"lineno": 953,
"name": "run",
"line": "self._target(*self._args, **self._kwargs)"
},
{
"filename": "/home/shinya/ai-assist/.venv/lib/python3.10/site-packages/snowplow_tracker/emitters.py",
"lineno": 529,
"name": "consume",
"line": "evts = self.queue.get()"
},
{
"filename": "/home/shinya/.asdf/installs/python/3.10.13/lib/python3.10/queue.py",
"lineno": 171,
"name": "get",
"line": "self.not_empty.wait()"
},
{
"filename": "/home/shinya/.asdf/installs/python/3.10.13/lib/python3.10/threading.py",
"lineno": 320,
"name": "wait",
"line": "waiter.acquire()"
}
]
},
{
"thread_id": 140318947206720,
"lines": [
{
"filename": "/home/shinya/.asdf/installs/python/3.10.13/lib/python3.10/threading.py",
"lineno": 973,
"name": "_bootstrap",
"line": "self._bootstrap_inner()"
},
{
"filename": "/home/shinya/.asdf/installs/python/3.10.13/lib/python3.10/threading.py",
"lineno": 1016,
"name": "_bootstrap_inner",
"line": "self.run()"
},
{
"filename": "/home/shinya/.asdf/installs/python/3.10.13/lib/python3.10/threading.py",
"lineno": 953,
"name": "run",
"line": "self._target(*self._args, **self._kwargs)"
},
{
"filename": "/home/shinya/ai-assist/.venv/lib/python3.10/site-packages/snowplow_tracker/emitters.py",
"lineno": 529,
"name": "consume",
"line": "evts = self.queue.get()"
},
{
"filename": "/home/shinya/.asdf/installs/python/3.10.13/lib/python3.10/queue.py",
"lineno": 171,
"name": "get",
"line": "self.not_empty.wait()"
},
{
"filename": "/home/shinya/.asdf/installs/python/3.10.13/lib/python3.10/threading.py",
"lineno": 320,
"name": "wait",
"line": "waiter.acquire()"
}
]
},
{
"thread_id": 140318955599424,
"lines": [
{
"filename": "/home/shinya/.asdf/installs/python/3.10.13/lib/python3.10/threading.py",
"lineno": 973,
"name": "_bootstrap",
"line": "self._bootstrap_inner()"
},
{
"filename": "/home/shinya/.asdf/installs/python/3.10.13/lib/python3.10/threading.py",
"lineno": 1016,
"name": "_bootstrap_inner",
"line": "self.run()"
},
{
"filename": "/home/shinya/.asdf/installs/python/3.10.13/lib/python3.10/threading.py",
"lineno": 953,
"name": "run",
"line": "self._target(*self._args, **self._kwargs)"
},
{
"filename": "/home/shinya/ai-assist/.venv/lib/python3.10/site-packages/snowplow_tracker/emitters.py",
"lineno": 529,
"name": "consume",
"line": "evts = self.queue.get()"
},
{
"filename": "/home/shinya/.asdf/installs/python/3.10.13/lib/python3.10/queue.py",
"lineno": 171,
"name": "get",
"line": "self.not_empty.wait()"
},
{
"filename": "/home/shinya/.asdf/installs/python/3.10.13/lib/python3.10/threading.py",
"lineno": 320,
"name": "wait",
"line": "waiter.acquire()"
}
]
},
{
"thread_id": 140318963992128,
"lines": [
{
"filename": "/home/shinya/.asdf/installs/python/3.10.13/lib/python3.10/threading.py",
"lineno": 973,
"name": "_bootstrap",
"line": "self._bootstrap_inner()"
},
{
"filename": "/home/shinya/.asdf/installs/python/3.10.13/lib/python3.10/threading.py",
"lineno": 1016,
"name": "_bootstrap_inner",
"line": "self.run()"
},
{
"filename": "/home/shinya/.asdf/installs/python/3.10.13/lib/python3.10/threading.py",
"lineno": 953,
"name": "run",
"line": "self._target(*self._args, **self._kwargs)"
},
{
"filename": "/home/shinya/ai-assist/.venv/lib/python3.10/site-packages/snowplow_tracker/emitters.py",
"lineno": 529,
"name": "consume",
"line": "evts = self.queue.get()"
},
{
"filename": "/home/shinya/.asdf/installs/python/3.10.13/lib/python3.10/queue.py",
"lineno": 171,
"name": "get",
"line": "self.not_empty.wait()"
},
{
"filename": "/home/shinya/.asdf/installs/python/3.10.13/lib/python3.10/threading.py",
"lineno": 320,
"name": "wait",
"line": "waiter.acquire()"
}
]
},
{
"thread_id": 140318972384832,
"lines": [
{
"filename": "/home/shinya/.asdf/installs/python/3.10.13/lib/python3.10/threading.py",
"lineno": 973,
"name": "_bootstrap",
"line": "self._bootstrap_inner()"
},
{
"filename": "/home/shinya/.asdf/installs/python/3.10.13/lib/python3.10/threading.py",
"lineno": 1016,
"name": "_bootstrap_inner",
"line": "self.run()"
},
{
"filename": "/home/shinya/.asdf/installs/python/3.10.13/lib/python3.10/threading.py",
"lineno": 953,
"name": "run",
"line": "self._target(*self._args, **self._kwargs)"
},
{
"filename": "/home/shinya/ai-assist/.venv/lib/python3.10/site-packages/snowplow_tracker/emitters.py",
"lineno": 529,
"name": "consume",
"line": "evts = self.queue.get()"
},
{
"filename": "/home/shinya/.asdf/installs/python/3.10.13/lib/python3.10/queue.py",
"lineno": 171,
"name": "get",
"line": "self.not_empty.wait()"
},
{
"filename": "/home/shinya/.asdf/installs/python/3.10.13/lib/python3.10/threading.py",
"lineno": 320,
"name": "wait",
"line": "waiter.acquire()"
}
]
},
{
"thread_id": 140318980777536,
"lines": [
{
"filename": "/home/shinya/.asdf/installs/python/3.10.13/lib/python3.10/threading.py",
"lineno": 973,
"name": "_bootstrap",
"line": "self._bootstrap_inner()"
},
{
"filename": "/home/shinya/.asdf/installs/python/3.10.13/lib/python3.10/threading.py",
"lineno": 1016,
"name": "_bootstrap_inner",
"line": "self.run()"
},
{
"filename": "/home/shinya/.asdf/installs/python/3.10.13/lib/python3.10/threading.py",
"lineno": 953,
"name": "run",
"line": "self._target(*self._args, **self._kwargs)"
},
{
"filename": "/home/shinya/ai-assist/.venv/lib/python3.10/site-packages/snowplow_tracker/emitters.py",
"lineno": 529,
"name": "consume",
"line": "evts = self.queue.get()"
},
{
"filename": "/home/shinya/.asdf/installs/python/3.10.13/lib/python3.10/queue.py",
"lineno": 171,
"name": "get",
"line": "self.not_empty.wait()"
},
{
"filename": "/home/shinya/.asdf/installs/python/3.10.13/lib/python3.10/threading.py",
"lineno": 320,
"name": "wait",
"line": "waiter.acquire()"
}
]
},
{
"thread_id": 140318989170240,
"lines": [
{
"filename": "/home/shinya/.asdf/installs/python/3.10.13/lib/python3.10/threading.py",
"lineno": 973,
"name": "_bootstrap",
"line": "self._bootstrap_inner()"
},
{
"filename": "/home/shinya/.asdf/installs/python/3.10.13/lib/python3.10/threading.py",
"lineno": 1016,
"name": "_bootstrap_inner",
"line": "self.run()"
},
{
"filename": "/home/shinya/.asdf/installs/python/3.10.13/lib/python3.10/threading.py",
"lineno": 953,
"name": "run",
"line": "self._target(*self._args, **self._kwargs)"
},
{
"filename": "/home/shinya/ai-assist/.venv/lib/python3.10/site-packages/snowplow_tracker/emitters.py",
"lineno": 529,
"name": "consume",
"line": "evts = self.queue.get()"
},
{
"filename": "/home/shinya/.asdf/installs/python/3.10.13/lib/python3.10/queue.py",
"lineno": 171,
"name": "get",
"line": "self.not_empty.wait()"
},
{
"filename": "/home/shinya/.asdf/installs/python/3.10.13/lib/python3.10/threading.py",
"lineno": 320,
"name": "wait",
"line": "waiter.acquire()"
}
]
},
{
"thread_id": 140320031853440,
"lines": [
{
"filename": "<string>",
"lineno": 1,
"name": "<module>"
},
{
"filename": "/home/shinya/ai-assist/ai_gateway/app.py",
"lineno": 84,
"name": "main",
"line": "uvicorn.run("
},
{
"filename": "/home/shinya/ai-assist/.venv/lib/python3.10/site-packages/uvicorn/main.py",
"lineno": 587,
"name": "run",
"line": "server.run()"
},
{
"filename": "/home/shinya/ai-assist/.venv/lib/python3.10/site-packages/uvicorn/server.py",
"lineno": 61,
"name": "run",
"line": "return asyncio.run(self.serve(sockets=sockets))"
},
{
"filename": "/home/shinya/.asdf/installs/python/3.10.13/lib/python3.10/asyncio/runners.py",
"lineno": 44,
"name": "run",
"line": "return loop.run_until_complete(main)"
},
{
"filename": "/home/shinya/ai-assist/ai_gateway/instrumentators/threads.py",
"lineno": 45,
"name": "monitor_threads",
"line": "for filename, lineno, name, line in traceback.extract_stack(stack):"
}
]
}
],
"logger": "threads",
"level": "info",
"type": "mlops",
"stage": "main",
"timestamp": "2024-01-17T05:54:08.643819Z",
"message": "Thread activities"
}
Merge request checklist
-
Tests added for new functionality. If not, please raise an issue to follow up. -
Documentation added/updated, if needed.
Edited by Shinya Maeda