Skip to content

feat(tracking): add internal events framework to ai gateway

What does this merge request do and why?

As part of #491 (closed), we will be adding internal events to the AI gateway.

This will help us standardise the event structure discussed in https://gitlab.com/gitlab-org/analytics-section/analytics-instrumentation/proposals/-/merge_requests/15

How to use this for tracking
internal_events_client.track_event(
    'event_name',
    {
        'label': 'label',
        'value': 2,
        'property': 'property',
        'random-key': 'value'
    },
    standard_context,
    'category'
)

How to set up and validate locally

patch
diff --git a/ai_gateway/api/v2/code/completions.py b/ai_gateway/api/v2/code/completions.py
index 2604cda..3391c0d 100644
--- a/ai_gateway/api/v2/code/completions.py
+++ b/ai_gateway/api/v2/code/completions.py
@@ -1,3 +1,4 @@
+from datetime import datetime
 from time import time
 from typing import Annotated, AsyncIterator, Union
 
@@ -34,6 +35,7 @@ from ai_gateway.async_dependency_resolver import (
     get_code_suggestions_generations_anthropic_factory_provider,
     get_code_suggestions_generations_litellm_factory_provider,
     get_code_suggestions_generations_vertex_provider,
+    get_internal_events_client,
     get_snowplow_instrumentator,
 )
 from ai_gateway.auth.self_signed_jwt import SELF_SIGNED_TOKEN_ISSUER
@@ -48,15 +50,20 @@ from ai_gateway.code_suggestions.processing.ops import lang_from_filename
 from ai_gateway.gitlab_features import GitLabFeatureCategory, GitLabUnitPrimitive
 from ai_gateway.instrumentators.base import TelemetryInstrumentator
 from ai_gateway.models import KindAnthropicModel, KindModelProvider
-from ai_gateway.tracking import SnowplowEvent, SnowplowEventContext
+from ai_gateway.tracking import (
+    InternalEventAdditionalProperties,
+    SnowplowEvent,
+    SnowplowEventContext,
+    StandardContext,
+)
 from ai_gateway.tracking.errors import log_exception
 from ai_gateway.tracking.instrumentator import SnowplowInstrumentator
+from ai_gateway.tracking.internal_events import InternalEventsClient
 
 __all__ = [
     "router",
 ]
 
-
 log = structlog.stdlib.get_logger("codesuggestions")
 
 router = APIRouter()
@@ -91,6 +98,7 @@ async def completions(
     snowplow_instrumentator: SnowplowInstrumentator = Depends(
         get_snowplow_instrumentator
     ),
+    internal_events: InternalEventsClient = Depends(get_internal_events_client),
 ):
     if not current_user.can(GitLabUnitPrimitive.CODE_SUGGESTIONS):
         raise HTTPException(
@@ -99,8 +107,34 @@ async def completions(
         )
 
     try:
-        snowplow_instrumentator.watch(
-            _suggestion_requested_snowplow_event(request, payload)
+        # snowplow_instrumentator.watch(
+        #     _suggestion_requested_snowplow_event(request, payload)
+        # )
+
+        standard_context = StandardContext(
+            project_id=123,
+            namespace_id=456,
+            is_gitlab_team_member=True,
+            feature_enabled_by_namespace_ids=[123, 456],
+            environment="production",
+            source="api",
+            plan="premium",
+            context_generated_at=datetime.now().isoformat(),
+            realm=request.headers.get(X_GITLAB_REALM_HEADER, "saas"),
+            instance_id=request.headers.get(X_GITLAB_INSTANCE_ID_HEADER, ""),
+            global_user_id=request.headers.get(X_GITLAB_GLOBAL_USER_ID_HEADER, ""),
+            host_name=request.headers.get(X_GITLAB_HOST_NAME_HEADER, ""),
+        )
+
+        additional_properties = InternalEventAdditionalProperties(
+            label="completion_event", property="property_value", value=1, key="value"
+        )
+
+        internal_events.track_event(
+            event_name="test_event",
+            additional_properties=additional_properties,
+            context=standard_context,
+            category="code_completions",
         )
     except Exception as e:
         log_exception(e)
@@ -114,42 +148,42 @@ async def completions(
         stream=payload.stream,
     )
 
-    kwargs = {}
-    if payload.model_provider == KindModelProvider.ANTHROPIC:
-        code_completions = completions_anthropic_factory()
-
-        # We support the prompt version 2 only with the Anthropic models
-        if payload.prompt_version == 2:
-            kwargs.update({"raw_prompt": payload.prompt})
-    elif payload.model_provider == KindModelProvider.LITELLM:
-        code_completions = completions_litellm_factory(
-            model__name=payload.model_name,
-            model__endpoint=payload.model_endpoint,
-        )
-    else:
-        code_completions = completions_legacy_factory()
-        if payload.choices_count > 0:
-            kwargs.update({"candidate_count": payload.choices_count})
-
-        if payload.context:
-            kwargs.update({"code_context": [ctx.content for ctx in payload.context]})
-
-    suggestions = await _execute_code_completion(payload, code_completions, **kwargs)
-
-    if isinstance(suggestions[0], AsyncIterator):
-        return await _handle_stream(suggestions[0])
-
-    return SuggestionsResponse(
-        id="id",
-        created=int(time()),
-        model=SuggestionsResponse.Model(
-            engine=suggestions[0].model.engine,
-            name=suggestions[0].model.name,
-            lang=suggestions[0].lang,
-        ),
-        experiments=suggestions[0].metadata.experiments,
-        choices=_completion_suggestion_choices(suggestions),
-    )
+    # kwargs = {}
+    # if payload.model_provider == KindModelProvider.ANTHROPIC:
+    #     code_completions = completions_anthropic_factory()
+
+    #     # We support the prompt version 2 only with the Anthropic models
+    #     if payload.prompt_version == 2:
+    #         kwargs.update({"raw_prompt": payload.prompt})
+    # elif payload.model_provider == KindModelProvider.LITELLM:
+    #     code_completions = completions_litellm_factory(
+    #         model__name=payload.model_name,
+    #         model__endpoint=payload.model_endpoint,
+    #     )
+    # else:
+    #     code_completions = completions_legacy_factory()
+    #     if payload.choices_count > 0:
+    #         kwargs.update({"candidate_count": payload.choices_count})
+
+    #     if payload.context:
+    #         kwargs.update({"code_context": [ctx.content for ctx in payload.context]})
+
+    # suggestions = await _execute_code_completion(payload, code_completions, **kwargs)
+
+    # if isinstance(suggestions[0], AsyncIterator):
+    #     return await _handle_stream(suggestions[0])
+
+    # return SuggestionsResponse(
+    #     id="id",
+    #     created=int(time()),
+    #     model=SuggestionsResponse.Model(
+    #         engine=suggestions[0].model.engine,
+    #         name=suggestions[0].model.name,
+    #         lang=suggestions[0].lang,
+    #     ),
+    #     experiments=suggestions[0].metadata.experiments,
+    #     choices=_completion_suggestion_choices(suggestions),
+    # )
 
 
 @router.post("/code/generations")
diff --git a/ai_gateway/tracking/container.py b/ai_gateway/tracking/container.py
index c9c6356..10d7828 100644
--- a/ai_gateway/tracking/container.py
+++ b/ai_gateway/tracking/container.py
@@ -17,7 +17,7 @@ def _init_snowplow_client(
     enabled: bool, configuration: SnowplowClientConfiguration
 ) -> SnowplowClient | SnowplowClientStub:
     if not enabled:
-        return SnowplowClientStub()
+        return SnowplowClient(configuration)
 
     return SnowplowClient(configuration)
 
diff --git a/ai_gateway/tracking/snowplow.py b/ai_gateway/tracking/snowplow.py
index 85907eb..fbda57e 100644
--- a/ai_gateway/tracking/snowplow.py
+++ b/ai_gateway/tracking/snowplow.py
@@ -71,9 +71,8 @@ class SnowplowClient(Client):
 
     def __init__(self, configuration: SnowplowClientConfiguration) -> None:
         emitter = AsyncEmitter(
-            batch_size=configuration.batch_size,
-            thread_count=configuration.thread_count,
-            endpoint=configuration.endpoint,
+            endpoint="http://localhost:9091",
+            batch_size=1,
         )
 
         self.tracker = Tracker(
  • Copy the cURL command below and run it.
cURL
curl --location --request POST 'http://localhost:5052/v2/code/completions' \
--header 'accept: application/json' \
--header 'Content-Type: application/json' \
--data-raw '{
   "current_file": {
      "file_name": "app.py",
      "language_identifier": "python",
      "content_above_cursor": "<|fim_prefix|>def hello_world():<|fim_suffix|><|fim_middle|>",
      "content_below_cursor": ""
   },
   "model_provider": "litellm",
   "model_endpoint": "http://127.0.0.1:4000",
   "model_name": "codegemma",
   "telemetry": [],
   "prompt_version": 2,
   "prompt": ""
   }'
  • You should be able to see the event in Snowplow Micro.

Screenshot_2024-07-15_at_11.09.58_AM

Merge request checklist

  • Tests added for new functionality. If not, please raise an issue to follow up.
  • Documentation added/updated, if needed.

Merge request reports

Loading