feat: Implement a gRPC Message Size Interceptor
## What does this merge request do and why?

### Current implementation

#### Client size limits
At the moment each client enforces a message size limit:
- Size enforcement in the Go executor.
- Size enforcement in the GitLab Language Server.
- Size enforcement in Workhorse.
#### Server size limit

- The default gRPC limits in Duo Workflow Service are 4 MiB for both incoming and outgoing messages.
- By default, the gRPC server handles oversized messages by doing the following:
  - Raises a non-OK status code (e.g. `RESOURCE_EXHAUSTED`)
  - Sends an error message to the client (e.g. `Received message larger than max (X vs. 4194304)`)
  - Closes the gRPC connection
  - Raises an error in the application
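The default limit check above can be sketched as a plain size comparison. The names below are illustrative (the real check lives inside gRPC's C core, not in Python); in Python the limits are configured via the `grpc.max_receive_message_length` and `grpc.max_send_message_length` channel options.

```python
# Illustrative sketch of the 4 MiB default limit check. The constant and
# function names here are hypothetical, not gRPC's actual internals.
GRPC_DEFAULT_MAX_RECV_MESSAGE_LENGTH = 4 * 1024 * 1024  # 4194304 bytes


def exceeds_default_limit(serialized_message: bytes) -> bool:
    """Return True if a serialized message would be rejected by default."""
    return len(serialized_message) > GRPC_DEFAULT_MAX_RECV_MESSAGE_LENGTH


# A 5 MiB payload trips the limit; a 1 KiB payload does not.
print(exceeds_default_limit(b"x" * (5 * 1024 * 1024)))  # True
print(exceeds_default_limit(b"x" * 1024))  # False
```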
- From my local testing, I can see the following errors raised when the client sends oversized messages:
  - `RuntimeError: async generator raised StopAsyncIteration` appears to be raised by an oversized `startRequest` message in `server.py`: the oversized message is rejected and the connection closed by gRPC, which propagates through to the gRPC async iterator since it has no more messages to process.
  - `asyncio.exceptions.CancelledError` appears to be raised by oversized messages during workflow execution: `asyncio` raises it while awaiting `workflow_task` in `server.py`, because `workflow_task` is forcibly cancelled by gRPC when the connection closes. At the same time, `action` from `send_events()` is stopped, and the resulting `StopAsyncIteration` error is handled at L290.
#### Example `StopAsyncIteration` error

```json
{
  "event": "Finished ExecuteWorkflow RPC",
  "logger": "grpc",
  "level": "info",
  "correlation_id": "555fc472-1be7-4a1b-858d-aa7862f1b8c7",
  "gitlab_global_user_id": "777",
  "workflow_id": "undefined",
  "duration_s": 0.0025215420027961954,
  "request_arrived_at": "2025-09-12T09:17:29.700856+00:00",
  "cpu_s": 0.0015339999999999243,
  "grpc_type": "BIDI_STREAM",
  "grpc_service_name": "DuoWorkflow",
  "grpc_method_name": "ExecuteWorkflow",
  "servicer_context_code": "OK",
  "gitlab_host_name": null,
  "gitlab_realm": "self-managed",
  "gitlab_instance_id": null,
  "gitlab_authentication_type": "oidc",
  "gitlab_version": "18.4.0-pre",
  "user_agent": "grpc-python/1.73.1 grpc-c/48.0.0 (osx; chttp2)",
  "exception_message": "async generator raised StopAsyncIteration",
  "exception_class": "RuntimeError",
  "exception_backtrace": "Traceback (most recent call last):\n File \"/Users/tim/gitlab/gdk/gdk/gitlab-ai-gateway/duo_workflow_service/server.py\", line 156, in ExecuteWorkflow\n start_workflow_request: contract_pb2.ClientEvent = await anext(\n ^^^^^^^^^^^^\nStopAsyncIteration\n\nThe above exception was the direct cause of the following exception:\n\nTraceback (most recent call last):\n File \"/Users/tim/gitlab/gdk/gdk/gitlab-ai-gateway/duo_workflow_service/interceptors/monitoring_interceptor.py\", line 155, in monitoring\n yield\n File \"/Users/tim/gitlab/gdk/gdk/gitlab-ai-gateway/duo_workflow_service/interceptors/monitoring_interceptor.py\", line 128, in stream_behavior\n async for behavior_response in behavior(\n File \"/Users/tim/gitlab/gdk/gdk/gitlab-ai-gateway/.venv/lib/python3.12/site-packages/dependency_injector/wiring.py\", line 1077, in _patched\n async for obj in fn(*args, **kwargs):\nRuntimeError: async generator raised StopAsyncIteration\n",
  "workflow_definition": null,
  "timestamp": "2025-09-12T09:17:29.703442Z"
}
```
#### Example `CancelledError`

```json
{
  "event": "Client-side streaming has been closed.",
  "logger": "server",
  "level": "info",
  "correlation_id": "6f4d536f-faba-4f83-9536-357aff0a2b54",
  "gitlab_global_user_id": "777",
  "workflow_id": "test-normal-message",
  "timestamp": "2025-09-12T09:17:29.754286Z"
}
```
Followed by:
```json
{
  "event": "",
  "logger": "exceptions",
  "level": "error",
  "correlation_id": "6f4d536f-faba-4f83-9536-357aff0a2b54",
  "gitlab_global_user_id": "777",
  "workflow_id": "test-normal-message",
  "status_code": null,
  "exception_class": "CancelledError",
  "additional_details": {
    "workflow_id": "test-normal-message",
    "source": "duo_workflow_service.workflows.chat.workflow"
  },
  "timestamp": "2025-09-12T09:17:29.754544Z",
  "exception": "Traceback (most recent call last):\n File \"/Users/tim/gitlab/gdk/gdk/gitlab-ai-gateway/duo_workflow_service/workflows/abstract_workflow.py\", line 224, in _compile_and_run_graph\n await fetch_workflow_and_container_data(\n File \"/Users/tim/gitlab/gdk/gdk/gitlab-ai-gateway/duo_workflow_service/gitlab/gitlab_api.py\", line 148, in fetch_workflow_and_container_data\n response = await client.graphql(query, variables)\n ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n File \"/Users/tim/gitlab/gdk/gdk/gitlab-ai-gateway/duo_workflow_service/gitlab/executor_http_client.py\", line 83, in graphql\n response = await asyncio.wait_for(\n ^^^^^^^^^^^^^^^^^^^^^^^\n File \"/Users/tim/.local/share/mise/installs/python/3.12.11/lib/python3.12/asyncio/tasks.py\", line 520, in wait_for\n return await fut\n ^^^^^^^^^\n File \"/Users/tim/gitlab/gdk/gdk/gitlab-ai-gateway/duo_workflow_service/executor/action.py\", line 57, in _execute_action\n actionResponse = await _execute_action_and_get_action_response(metadata, action)\n ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n File \"/Users/tim/gitlab/gdk/gdk/gitlab-ai-gateway/duo_workflow_service/executor/action.py\", line 38, in _execute_action_and_get_action_response\n event: contract_pb2.ClientEvent = await inbox.get()\n ^^^^^^^^^^^^^^^^^\n File \"/Users/tim/.local/share/mise/installs/python/3.12.11/lib/python3.12/asyncio/queues.py\", line 158, in get\n await getter\nasyncio.exceptions.CancelledError"
}
```
### Current limitations

The current implementation, relying on the default handling, logs either `StopAsyncIteration` or `CancelledError` with no indication of what caused these errors. As both are generic asyncio errors, they can have many different causes.

Clients receive a more helpful `RESOURCE_EXHAUSTED` error with a message explaining that the message was oversized, but because clients can be local, we can't rely on these for observability of the DWS server.
### Proposed implementation

This MR implements an interceptor + wrapper to inspect message sizes for both incoming and outgoing gRPC messages and emit useful log messages using the structured logging of DWS, including `correlation_id` and `workflow_id`.

The newly created `MessageSizeInterceptor` reads message content to calculate its size and compares this to `MAX_MESSAGE_SIZE`. This replaces the default handling and allows us to insert custom handling logic for oversized messages.

In this first implementation, if a message violates the limit, an error is logged (using structured logging) and `grpc.aio.ServicerContext.abort()` is called, which imitates the behaviour of the default handling.
Within this interceptor, there are 4 kinds of wrapper:

- `UNARY`: a unary-to-unary connection (e.g. `GenerateToken`; similar to an HTTP request)
- `SERVER_STREAMING`: a unary (client) to streaming (server) connection
- `CLIENT_STREAMING`: a streaming (client) to unary (server) connection
- `BIDI_STREAMING`: a bi-directional (streaming-to-streaming) connection (e.g. `ExecuteWorkflow`; similar to a WebSocket connection)
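The streaming wrappers can be sketched in pure Python as an async generator that re-yields each message after measuring it. This is a simplified illustration, not the actual `MessageSizeInterceptor` code: here message size is `len()` of a byte string and the violation handler is a stub, whereas the real implementation measures the serialized protobuf and logs then aborts via `grpc.aio.ServicerContext.abort()`.

```python
import asyncio
from typing import AsyncIterator, Awaitable, Callable

MAX_MESSAGE_SIZE = 4 * 1024 * 1024  # mirrors the 4 MiB gRPC default


async def wrap_with_size_check(
    messages: AsyncIterator[bytes],
    on_violation: Callable[[int], Awaitable[None]],
    max_size: int = MAX_MESSAGE_SIZE,
):
    """Re-yield each message, invoking on_violation for oversized ones."""
    async for msg in messages:
        size = len(msg)  # stand-in for the protobuf serialized size
        if size > max_size:
            await on_violation(size)  # real code: log + context.abort()
        yield msg


async def demo():
    violations = []

    async def record(size: int):
        violations.append(size)

    async def incoming():
        yield b"x" * 100                 # within the limit
        yield b"x" * (5 * 1024 * 1024)   # oversized

    async for _ in wrap_with_size_check(incoming(), record):
        pass
    return violations


print(asyncio.run(demo()))  # [5242880]
```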
There are also changes to `server.py` to catch `StopAsyncIteration` exceptions, which are raised if a `startRequest` with a message size larger than `MAX_MESSAGE_SIZE` is received, or if the message is empty.
#### Example error message from `MessageSizeInterceptor`

```json
{
  "event": "Error with incoming message size (5242923 bytes, 5.0 MB) exceeds 4MiB limit",
  "logger": "grpc",
  "level": "error",
  "correlation_id": "67a6e6a4-5bb8-40a9-902e-bae379a89a1d",
  "gitlab_global_user_id": "777",
  "workflow_id": "undefined",
  "direction": "incoming",
  "method": "/DuoWorkflow/ExecuteWorkflow",
  "message_size_bytes": 5242923,
  "max_size_bytes": 4194304,
  "grpc_type": "BIDI_STREAM",
  "size_mb": 5.0,
  "max_size_mb": 4.0,
  "timestamp": "2025-09-12T10:51:51.653876Z"
}
```
### Future enhancements
We could improve this in the future by:
- Adding a UIChatLog for message size errors
- Adding a unique error code
## How to set up and validate locally

You can use this script to send a 5 MiB `startRequest`, then observe the logs to see the new logging message:
```python
import os

import grpc
from dotenv import load_dotenv
from gitlab_cloud_connector import (
    CloudConnectorUser,
    GitLabUnitPrimitive,
    TokenAuthority,
    UserClaims,
)

from contract import contract_pb2, contract_pb2_grpc


def main():
    load_dotenv()
    os.environ.setdefault("CLOUD_CONNECTOR_SERVICE_NAME", "gitlab-duo-workflow-service")

    # Set up client
    port = int(os.environ.get("PORT", "50052"))
    channel = grpc.insecure_channel(f"localhost:{port}")
    stub = contract_pb2_grpc.DuoWorkflowStub(channel)

    # Create auth token
    ta = TokenAuthority(os.environ.get("DUO_WORKFLOW_SELF_SIGNED_JWT__SIGNING_KEY"))
    current_user = CloudConnectorUser(authenticated=True, claims=UserClaims())
    token, _ = ta.encode(
        "777",
        "self-managed",
        current_user,
        "test-instance",
        [GitLabUnitPrimitive.DUO_WORKFLOW_EXECUTE_WORKFLOW],
    )
    metadata = [("authorization", f"Bearer {token}")]

    # Create 5MB message
    large_content = "x" * (5 * 1024 * 1024)
    message = contract_pb2.ClientEvent(
        startRequest=contract_pb2.StartWorkflowRequest(
            clientVersion="1",
            workflowDefinition="chat",
            workflowID="test",
            goal=large_content,
        )
    )

    try:
        print("Sending 5MB message...")

        def message_generator():
            yield message

        list(stub.ExecuteWorkflow(message_generator(), metadata=metadata))
        print("FAIL: Message was accepted")
    except grpc.RpcError as e:
        if e.code() == grpc.StatusCode.RESOURCE_EXHAUSTED:
            print("PASS: Message rejected with RESOURCE_EXHAUSTED")
        else:
            print(f"FAIL: Wrong error: {e.code()}")


if __name__ == "__main__":
    main()
```
## Merge request checklist

- [ ] Tests added for new functionality. If not, please raise an issue to follow up.
- [ ] Documentation added/updated, if needed.
- [ ] If this change requires executor implementation: verified that issues/MRs exist for both the Go executor and the Node executor, or confirmed that the changes are backward-compatible and don't break existing executor functionality.
Closes #1367 (closed)