
Draft: feat: Implement prometheus counter for Task was destroyed but it is pending error

What does this merge request do and why?

Currently we are seeing a large number of "Task was destroyed but it is pending!" errors raised in Sentry.

From what I can see these are raised in the astream method in _compile_and_run_graph: https://gitlab.com/gitlab-org/modelops/applied-ml/code-suggestions/ai-assist/-/blob/main/duo_workflow_service/workflows/abstract_workflow.py#L265

Actually, this seems to be a warning raised from asyncio due to pending items remaining in the LangGraph queue at garbage collection time.

These warnings appear to be captured at the Error level in Sentry, which causes them to show up in our Sentry alerts.

In an effort to reduce noise in Sentry (#1009 (closed)), this MR implements a Prometheus counter as a first step toward threshold-based Grafana alerts, rather than each individual error being raised in Sentry.

As mentioned in #1009 (closed), the approach to reduce noise is to:

  1. Create prometheus counter for error <---- This MR
  2. Create alert in Grafana
  3. Filter alert from Sentry when alert is in place <---- Also this MR
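Step 3 is typically done in the Sentry SDK's before_send hook, which can drop events before they are sent. A minimal sketch of such a filter (the event dict shapes and the SUPPRESSED_ASYNCIO_MESSAGES name are assumptions for illustration, not the exact code in this MR):

```python
# Hypothetical before_send filter that drops the asyncio warning once the
# Prometheus counter and Grafana alert cover it. Returning None from
# before_send tells the Sentry SDK to discard the event.
SUPPRESSED_ASYNCIO_MESSAGES = ("Task was destroyed but it is pending!",)


def before_send(event, hint):
    # Sentry events may carry the message under "logentry" or top-level "message".
    message = event.get("logentry", {}).get("message", "") or event.get("message", "")
    if any(text in message for text in SUPPRESSED_ASYNCIO_MESSAGES):
        return None  # Drop the event; the Prometheus counter tracks it instead.
    return event
```

The filter is passed to sentry_sdk.init(before_send=...), which this codebase already does, so only the filter body needs to change.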

This is the third such counter to be implemented; the others track checkpointing 500 errors (!2953 (merged)) and model provider 500 errors (!2956 (merged)).

This MR introduces the duo_workflow_asyncio_error_total Prometheus counter to track and alert on asyncio warnings containing the text "Task was destroyed but it is pending!", with scope to be expanded in the future if other asyncio errors are to be suppressed from Sentry.

The counter tracks the following labels:

  • type: the type of asyncio error (only pending_task_destroyed currently, but scope to expand to others in the future)
  • workflow_id: the ID of the workflow

How to set up and validate locally

  1. Run DWS in GDK and make sure that SENTRY_ERROR_TRACKING_ENABLED=true
  2. Apply this code change, which calls the asyncio exception handler with a "Task was destroyed but it is pending!" message:
```shell
git apply !2980.patch
```

```diff
diff --git a/duo_workflow_service/tracking/sentry_error_tracking.py b/duo_workflow_service/tracking/sentry_error_tracking.py
index 12ad5a81..22770f1c 100644
--- a/duo_workflow_service/tracking/sentry_error_tracking.py
+++ b/duo_workflow_service/tracking/sentry_error_tracking.py
@@ -14,16 +14,17 @@ log = structlog.stdlib.get_logger("error_tracking")
 
 
 def setup_error_tracking():
-    if sentry_tracking_available():
-        sentry_sdk.init(
-            dsn=os.environ.get("SENTRY_DSN"),
-            environment=os.environ.get("DUO_WORKFLOW_SERVICE_ENVIRONMENT"),
-            traces_sample_rate=1.0,
-            before_send=before_send,
-            profiles_sample_rate=1.0,
-            integrations=[GRPCIntegration(), AsyncioIntegration()],
-            max_value_length=30 * 1024,
-        )
+    events = []
+    sentry_sdk.init(
+        dsn="",
+        environment=os.environ.get("DUO_WORKFLOW_SERVICE_ENVIRONMENT"),
+        traces_sample_rate=1.0,
+        before_send=before_send,
+        transport=events.append,
+        profiles_sample_rate=1.0,
+        integrations=[GRPCIntegration(), AsyncioIntegration()],
+        max_value_length=30 * 1024,
+    )
 
 
 def sentry_tracking_available():
diff --git a/duo_workflow_service/workflows/abstract_workflow.py b/duo_workflow_service/workflows/abstract_workflow.py
index 23a94b08..9214404b 100644
--- a/duo_workflow_service/workflows/abstract_workflow.py
+++ b/duo_workflow_service/workflows/abstract_workflow.py
@@ -193,6 +193,15 @@ class AbstractWorkflow(ABC):
     def _recursion_limit(self):
         return RECURSION_LIMIT
 
+    @staticmethod
+    def _trigger_test_warning():
+        loop = asyncio.get_running_loop()
+        context = {
+            "message": "Task was destroyed but it is pending!",
+            "task": "test_task",
+        }
+        loop.call_exception_handler(context)
+
     @traceable
     async def _compile_and_run_graph(self, goal: str) -> None:
         graph_config: RunnableConfig = {
@@ -261,6 +270,7 @@ class AbstractWorkflow(ABC):
 
                 compiled_graph = self._compile(goal, tools_registry, checkpointer)
                 graph_input = await self.get_graph_input(goal, status_event)
+                self._trigger_test_warning()
                 async for type, state in compiled_graph.astream(
                     input=graph_input,
                     config=graph_config,
```
  3. Open your local Prometheus server (by default it is on localhost:8083)
  4. Look for duo_workflow_asyncio_error_total, which should show the new counter with a count of 1

Merge request checklist

  • Tests added for new functionality. If not, please raise an issue to follow up.
  • Documentation added/updated, if needed.

Relates to #1009 (closed)

Edited by Tim Morriss
