Skip to content

Adjust snowplow emitter thread count

Shinya Maeda requested to merge shinya.maeda-main-patch-af1d into main

What does this merge request do and why?

Currently, we initiate 5 threads for the snowplow instrumentator. Given that we run only one thread for the main asyncio loop, which acts as the web worker and handles the subsequent request processing, 5 threads is too many; those resources should be freed for the main web worker.

Here are the default values in AsyncEmitter:

Related to AI Gateway becomes unresponsive by CPU-bound task (#405 - closed).

How to set up and validate locally

  1. Check out this branch
  2. Cherry-pick monitoring-threads branch !596 (merged).
  3. Set the following in .env
# Tracking
AIGW_SNOWPLOW__ENABLED=True
AIGW_SNOWPLOW__ENDPOINT="https://blizzard.local"
AIGW_SNOWPLOW__THREAD_COUNT=7
AIGW_SNOWPLOW__BATCH_SIZE=8

# Instrumentators
AIGW_INSTRUMENTATOR__THREAD_MONITORING_ENABLED=True
AIGW_INSTRUMENTATOR__THREAD_MONITORING_INTERVAL=5
  4. Check the thread log produced by Monitor thread activities (!596 - merged) in modelgateway_debug.log.

For example, here is a log entry where you can see that 7 threads are running for snowplow:

{
    "pid": 21602,
    "threads_count": 10,
    "stacktrace": [
        {
            "thread_id": 140317688837696,
            "lines": [
                {
                    "filename": "/home/shinya/.asdf/installs/python/3.10.13/lib/python3.10/threading.py",
                    "lineno": 973,
                    "name": "_bootstrap",
                    "line": "self._bootstrap_inner()"
                },
                {
                    "filename": "/home/shinya/.asdf/installs/python/3.10.13/lib/python3.10/threading.py",
                    "lineno": 1016,
                    "name": "_bootstrap_inner",
                    "line": "self.run()"
                },
                {
                    "filename": "/home/shinya/.asdf/installs/python/3.10.13/lib/python3.10/threading.py",
                    "lineno": 953,
                    "name": "run",
                    "line": "self._target(*self._args, **self._kwargs)"
                },
                {
                    "filename": "/home/shinya/.asdf/installs/python/3.10.13/lib/python3.10/socketserver.py",
                    "lineno": 232,
                    "name": "serve_forever",
                    "line": "ready = selector.select(poll_interval)"
                },
                {
                    "filename": "/home/shinya/.asdf/installs/python/3.10.13/lib/python3.10/selectors.py",
                    "lineno": 416,
                    "name": "select",
                    "line": "fd_event_list = self._selector.poll(timeout)"
                }
            ]
        },
        {
            "thread_id": 140318435427904,
            "lines": [
                {
                    "filename": "/home/shinya/.asdf/installs/python/3.10.13/lib/python3.10/threading.py",
                    "lineno": 973,
                    "name": "_bootstrap",
                    "line": "self._bootstrap_inner()"
                },
                {
                    "filename": "/home/shinya/.asdf/installs/python/3.10.13/lib/python3.10/threading.py",
                    "lineno": 1016,
                    "name": "_bootstrap_inner",
                    "line": "self.run()"
                },
                {
                    "filename": "/home/shinya/.asdf/installs/python/3.10.13/lib/python3.10/threading.py",
                    "lineno": 953,
                    "name": "run",
                    "line": "self._target(*self._args, **self._kwargs)"
                }
            ]
        },
        {
            "thread_id": 140318938814016,
            "lines": [
                {
                    "filename": "/home/shinya/.asdf/installs/python/3.10.13/lib/python3.10/threading.py",
                    "lineno": 973,
                    "name": "_bootstrap",
                    "line": "self._bootstrap_inner()"
                },
                {
                    "filename": "/home/shinya/.asdf/installs/python/3.10.13/lib/python3.10/threading.py",
                    "lineno": 1016,
                    "name": "_bootstrap_inner",
                    "line": "self.run()"
                },
                {
                    "filename": "/home/shinya/.asdf/installs/python/3.10.13/lib/python3.10/threading.py",
                    "lineno": 953,
                    "name": "run",
                    "line": "self._target(*self._args, **self._kwargs)"
                },
                {
                    "filename": "/home/shinya/ai-assist/.venv/lib/python3.10/site-packages/snowplow_tracker/emitters.py",
                    "lineno": 529,
                    "name": "consume",
                    "line": "evts = self.queue.get()"
                },
                {
                    "filename": "/home/shinya/.asdf/installs/python/3.10.13/lib/python3.10/queue.py",
                    "lineno": 171,
                    "name": "get",
                    "line": "self.not_empty.wait()"
                },
                {
                    "filename": "/home/shinya/.asdf/installs/python/3.10.13/lib/python3.10/threading.py",
                    "lineno": 320,
                    "name": "wait",
                    "line": "waiter.acquire()"
                }
            ]
        },
        {
            "thread_id": 140318947206720,
            "lines": [
                {
                    "filename": "/home/shinya/.asdf/installs/python/3.10.13/lib/python3.10/threading.py",
                    "lineno": 973,
                    "name": "_bootstrap",
                    "line": "self._bootstrap_inner()"
                },
                {
                    "filename": "/home/shinya/.asdf/installs/python/3.10.13/lib/python3.10/threading.py",
                    "lineno": 1016,
                    "name": "_bootstrap_inner",
                    "line": "self.run()"
                },
                {
                    "filename": "/home/shinya/.asdf/installs/python/3.10.13/lib/python3.10/threading.py",
                    "lineno": 953,
                    "name": "run",
                    "line": "self._target(*self._args, **self._kwargs)"
                },
                {
                    "filename": "/home/shinya/ai-assist/.venv/lib/python3.10/site-packages/snowplow_tracker/emitters.py",
                    "lineno": 529,
                    "name": "consume",
                    "line": "evts = self.queue.get()"
                },
                {
                    "filename": "/home/shinya/.asdf/installs/python/3.10.13/lib/python3.10/queue.py",
                    "lineno": 171,
                    "name": "get",
                    "line": "self.not_empty.wait()"
                },
                {
                    "filename": "/home/shinya/.asdf/installs/python/3.10.13/lib/python3.10/threading.py",
                    "lineno": 320,
                    "name": "wait",
                    "line": "waiter.acquire()"
                }
            ]
        },
        {
            "thread_id": 140318955599424,
            "lines": [
                {
                    "filename": "/home/shinya/.asdf/installs/python/3.10.13/lib/python3.10/threading.py",
                    "lineno": 973,
                    "name": "_bootstrap",
                    "line": "self._bootstrap_inner()"
                },
                {
                    "filename": "/home/shinya/.asdf/installs/python/3.10.13/lib/python3.10/threading.py",
                    "lineno": 1016,
                    "name": "_bootstrap_inner",
                    "line": "self.run()"
                },
                {
                    "filename": "/home/shinya/.asdf/installs/python/3.10.13/lib/python3.10/threading.py",
                    "lineno": 953,
                    "name": "run",
                    "line": "self._target(*self._args, **self._kwargs)"
                },
                {
                    "filename": "/home/shinya/ai-assist/.venv/lib/python3.10/site-packages/snowplow_tracker/emitters.py",
                    "lineno": 529,
                    "name": "consume",
                    "line": "evts = self.queue.get()"
                },
                {
                    "filename": "/home/shinya/.asdf/installs/python/3.10.13/lib/python3.10/queue.py",
                    "lineno": 171,
                    "name": "get",
                    "line": "self.not_empty.wait()"
                },
                {
                    "filename": "/home/shinya/.asdf/installs/python/3.10.13/lib/python3.10/threading.py",
                    "lineno": 320,
                    "name": "wait",
                    "line": "waiter.acquire()"
                }
            ]
        },
        {
            "thread_id": 140318963992128,
            "lines": [
                {
                    "filename": "/home/shinya/.asdf/installs/python/3.10.13/lib/python3.10/threading.py",
                    "lineno": 973,
                    "name": "_bootstrap",
                    "line": "self._bootstrap_inner()"
                },
                {
                    "filename": "/home/shinya/.asdf/installs/python/3.10.13/lib/python3.10/threading.py",
                    "lineno": 1016,
                    "name": "_bootstrap_inner",
                    "line": "self.run()"
                },
                {
                    "filename": "/home/shinya/.asdf/installs/python/3.10.13/lib/python3.10/threading.py",
                    "lineno": 953,
                    "name": "run",
                    "line": "self._target(*self._args, **self._kwargs)"
                },
                {
                    "filename": "/home/shinya/ai-assist/.venv/lib/python3.10/site-packages/snowplow_tracker/emitters.py",
                    "lineno": 529,
                    "name": "consume",
                    "line": "evts = self.queue.get()"
                },
                {
                    "filename": "/home/shinya/.asdf/installs/python/3.10.13/lib/python3.10/queue.py",
                    "lineno": 171,
                    "name": "get",
                    "line": "self.not_empty.wait()"
                },
                {
                    "filename": "/home/shinya/.asdf/installs/python/3.10.13/lib/python3.10/threading.py",
                    "lineno": 320,
                    "name": "wait",
                    "line": "waiter.acquire()"
                }
            ]
        },
        {
            "thread_id": 140318972384832,
            "lines": [
                {
                    "filename": "/home/shinya/.asdf/installs/python/3.10.13/lib/python3.10/threading.py",
                    "lineno": 973,
                    "name": "_bootstrap",
                    "line": "self._bootstrap_inner()"
                },
                {
                    "filename": "/home/shinya/.asdf/installs/python/3.10.13/lib/python3.10/threading.py",
                    "lineno": 1016,
                    "name": "_bootstrap_inner",
                    "line": "self.run()"
                },
                {
                    "filename": "/home/shinya/.asdf/installs/python/3.10.13/lib/python3.10/threading.py",
                    "lineno": 953,
                    "name": "run",
                    "line": "self._target(*self._args, **self._kwargs)"
                },
                {
                    "filename": "/home/shinya/ai-assist/.venv/lib/python3.10/site-packages/snowplow_tracker/emitters.py",
                    "lineno": 529,
                    "name": "consume",
                    "line": "evts = self.queue.get()"
                },
                {
                    "filename": "/home/shinya/.asdf/installs/python/3.10.13/lib/python3.10/queue.py",
                    "lineno": 171,
                    "name": "get",
                    "line": "self.not_empty.wait()"
                },
                {
                    "filename": "/home/shinya/.asdf/installs/python/3.10.13/lib/python3.10/threading.py",
                    "lineno": 320,
                    "name": "wait",
                    "line": "waiter.acquire()"
                }
            ]
        },
        {
            "thread_id": 140318980777536,
            "lines": [
                {
                    "filename": "/home/shinya/.asdf/installs/python/3.10.13/lib/python3.10/threading.py",
                    "lineno": 973,
                    "name": "_bootstrap",
                    "line": "self._bootstrap_inner()"
                },
                {
                    "filename": "/home/shinya/.asdf/installs/python/3.10.13/lib/python3.10/threading.py",
                    "lineno": 1016,
                    "name": "_bootstrap_inner",
                    "line": "self.run()"
                },
                {
                    "filename": "/home/shinya/.asdf/installs/python/3.10.13/lib/python3.10/threading.py",
                    "lineno": 953,
                    "name": "run",
                    "line": "self._target(*self._args, **self._kwargs)"
                },
                {
                    "filename": "/home/shinya/ai-assist/.venv/lib/python3.10/site-packages/snowplow_tracker/emitters.py",
                    "lineno": 529,
                    "name": "consume",
                    "line": "evts = self.queue.get()"
                },
                {
                    "filename": "/home/shinya/.asdf/installs/python/3.10.13/lib/python3.10/queue.py",
                    "lineno": 171,
                    "name": "get",
                    "line": "self.not_empty.wait()"
                },
                {
                    "filename": "/home/shinya/.asdf/installs/python/3.10.13/lib/python3.10/threading.py",
                    "lineno": 320,
                    "name": "wait",
                    "line": "waiter.acquire()"
                }
            ]
        },
        {
            "thread_id": 140318989170240,
            "lines": [
                {
                    "filename": "/home/shinya/.asdf/installs/python/3.10.13/lib/python3.10/threading.py",
                    "lineno": 973,
                    "name": "_bootstrap",
                    "line": "self._bootstrap_inner()"
                },
                {
                    "filename": "/home/shinya/.asdf/installs/python/3.10.13/lib/python3.10/threading.py",
                    "lineno": 1016,
                    "name": "_bootstrap_inner",
                    "line": "self.run()"
                },
                {
                    "filename": "/home/shinya/.asdf/installs/python/3.10.13/lib/python3.10/threading.py",
                    "lineno": 953,
                    "name": "run",
                    "line": "self._target(*self._args, **self._kwargs)"
                },
                {
                    "filename": "/home/shinya/ai-assist/.venv/lib/python3.10/site-packages/snowplow_tracker/emitters.py",
                    "lineno": 529,
                    "name": "consume",
                    "line": "evts = self.queue.get()"
                },
                {
                    "filename": "/home/shinya/.asdf/installs/python/3.10.13/lib/python3.10/queue.py",
                    "lineno": 171,
                    "name": "get",
                    "line": "self.not_empty.wait()"
                },
                {
                    "filename": "/home/shinya/.asdf/installs/python/3.10.13/lib/python3.10/threading.py",
                    "lineno": 320,
                    "name": "wait",
                    "line": "waiter.acquire()"
                }
            ]
        },
        {
            "thread_id": 140320031853440,
            "lines": [
                {
                    "filename": "<string>",
                    "lineno": 1,
                    "name": "<module>"
                },
                {
                    "filename": "/home/shinya/ai-assist/ai_gateway/app.py",
                    "lineno": 84,
                    "name": "main",
                    "line": "uvicorn.run("
                },
                {
                    "filename": "/home/shinya/ai-assist/.venv/lib/python3.10/site-packages/uvicorn/main.py",
                    "lineno": 587,
                    "name": "run",
                    "line": "server.run()"
                },
                {
                    "filename": "/home/shinya/ai-assist/.venv/lib/python3.10/site-packages/uvicorn/server.py",
                    "lineno": 61,
                    "name": "run",
                    "line": "return asyncio.run(self.serve(sockets=sockets))"
                },
                {
                    "filename": "/home/shinya/.asdf/installs/python/3.10.13/lib/python3.10/asyncio/runners.py",
                    "lineno": 44,
                    "name": "run",
                    "line": "return loop.run_until_complete(main)"
                },
                {
                    "filename": "/home/shinya/ai-assist/ai_gateway/instrumentators/threads.py",
                    "lineno": 45,
                    "name": "monitor_threads",
                    "line": "for filename, lineno, name, line in traceback.extract_stack(stack):"
                }
            ]
        }
    ],
    "logger": "threads",
    "level": "info",
    "type": "mlops",
    "stage": "main",
    "timestamp": "2024-01-17T05:54:08.643819Z",
    "message": "Thread activities"
}

Merge request checklist

  • Tests added for new functionality. If not, please raise an issue to follow up.
  • Documentation added/updated, if needed.
Edited by Shinya Maeda

Merge request reports