Skip to content

Fix irregular chunks can't be parsed as event

Shinya Maeda requested to merge fix-events-dupmling into master

This is a high priority MR for Switch to Chat Agent V2 (gitlab-org#13533 - closed). Please prioritize the review and merge.

What does this MR do and why?

This fixes Still A1002 is happening (#491610 - closed).

In some cases, multiple events could be contained in a single streamed chunk. In this case, we need to split the events and feed it to a parser.

This fix is similar to Anthropic client's SSE parser. Technically speaking, our v2/chat/agent endpoint is not SSE compliant, so we parse chunks as JSON data with a new line as delimiter.

MR acceptance checklist

Please evaluate this MR against the MR acceptance checklist. It helps you analyze changes to reduce risks in quality, performance, reliability, security, and maintainability.

How to set up and validate locally

To test multiple events in a single chunk, apply the following patch in AI Gateway:

diff --git a/ai_gateway/api/v2/chat/agent.py b/ai_gateway/api/v2/chat/agent.py
index 5c068db0..1d24c0bd 100644
--- a/ai_gateway/api/v2/chat/agent.py
+++ b/ai_gateway/api/v2/chat/agent.py
@@ -53,8 +53,18 @@ async def chat(
     internal_event_client: InternalEventsClient = Depends(get_internal_event_client),
 ):
     async def _stream_handler(stream_events: AsyncIterator[TypeAgentEvent]):
+        buffer = []
         async for event in stream_events:
-            yield f"{event.dump_as_response()}\n"
+            buffer.append(event)
+
+            if len(buffer) > 1:
+                res = "".join([f"{e.dump_as_response()}\n" for e in buffer])
+                yield res
+                buffer = []
+        
+        if len(buffer) > 0:
+            yield f"{buffer[0]}\n"
+
 
     scratchpad = [
         AgentStep(

To test a single event spread in multiple chunks, apply the following patch in AI Gateway:

diff --git a/ai_gateway/api/v2/chat/agent.py b/ai_gateway/api/v2/chat/agent.py
index 5c068db0..a769e756 100644
--- a/ai_gateway/api/v2/chat/agent.py
+++ b/ai_gateway/api/v2/chat/agent.py
@@ -54,7 +54,13 @@ async def chat(
 ):
     async def _stream_handler(stream_events: AsyncIterator[TypeAgentEvent]):
         async for event in stream_events:
-            yield f"{event.dump_as_response()}\n"
+            # yield f"{event.dump_as_response()}\n"
+            res = event.dump_as_response()
+            mid_pos = len(res) // 2
+            yield res[:mid_pos]
+            yield res[mid_pos:]
+            yield "\n"
+
 
     scratchpad = [
         AgentStep(
Edited by Shinya Maeda

Merge request reports

Loading