diff --git a/posthog/client.py b/posthog/client.py index 426bb24c..e5fc6537 100644 --- a/posthog/client.py +++ b/posthog/client.py @@ -269,6 +269,10 @@ def __init__( self.project_root = project_root + self.use_ai_ingestion_pipeline = os.environ.get( + "LLMA_INGESTION_PIPELINE", "false" + ).lower() in ("true", "1", "yes") + # personal_api_key: This should be a generated Personal API Key, private self.personal_api_key = personal_api_key if debug: @@ -316,6 +320,7 @@ def __init__( retries=max_retries, timeout=timeout, historical_migration=historical_migration, + use_ai_ingestion_pipeline=self.use_ai_ingestion_pipeline, ) self.consumers.append(consumer) diff --git a/posthog/consumer.py b/posthog/consumer.py index cbb77fd1..25b4e6b1 100644 --- a/posthog/consumer.py +++ b/posthog/consumer.py @@ -37,6 +37,7 @@ def __init__( retries=10, timeout=15, historical_migration=False, + use_ai_ingestion_pipeline=False, ): """Create a consumer thread.""" Thread.__init__(self) @@ -57,6 +58,7 @@ def __init__( self.retries = retries self.timeout = timeout self.historical_migration = historical_migration + self.use_ai_ingestion_pipeline = use_ai_ingestion_pipeline def run(self): """Runs the consumer.""" @@ -136,17 +138,82 @@ def fatal_exception(exc): # retry on all other errors (eg. network) return False + if self.use_ai_ingestion_pipeline: + ai_events = [] + non_ai_events = [] + + for item in batch: + event_name = item.get("event", "") + if event_name.startswith("$ai_"): + ai_events.append(item) + else: + non_ai_events.append(item) + + for ai_event in ai_events: + self._send_ai_event(ai_event, fatal_exception) + + if non_ai_events: + + @backoff.on_exception( + backoff.expo, + Exception, + max_tries=self.retries + 1, + giveup=fatal_exception, + ) + def send_batch_request(): + batch_post( + self.api_key, + self.host, + gzip=self.gzip, + timeout=self.timeout, + batch=non_ai_events, + historical_migration=self.historical_migration, + ) + + send_batch_request() + else: + @backoff.on_exception( + backoff.expo, + Exception, + max_tries=self.retries + 1, + giveup=fatal_exception, + ) + def send_request(): + batch_post( + self.api_key, + self.host, + gzip=self.gzip, + timeout=self.timeout, + batch=batch, + historical_migration=self.historical_migration, + ) + + send_request() + + def _send_ai_event(self, event, fatal_exception): + """Send a single AI event to the /i/v0/ai endpoint""" + from posthog.request import ai_post + from posthog.utils import extract_ai_blob_properties + + # Extract blob properties from the event + properties = event.get("properties", {}) + cleaned_properties, blobs = extract_ai_blob_properties(properties) + @backoff.on_exception( backoff.expo, Exception, max_tries=self.retries + 1, giveup=fatal_exception ) - def send_request(): - batch_post( + def send_ai_request(): + ai_post( self.api_key, self.host, gzip=self.gzip, timeout=self.timeout, - batch=batch, - historical_migration=self.historical_migration, + event_name=event.get("event"), + distinct_id=event.get("distinct_id"), + properties=cleaned_properties, + blobs=blobs, + timestamp=event.get("timestamp"), + uuid=event.get("uuid"), ) - send_request() + send_ai_request() diff --git a/posthog/request.py b/posthog/request.py index 7199f3a8..9b72c1c3 100644 --- a/posthog/request.py +++ b/posthog/request.py @@ -1,13 +1,14 @@ import json import logging import re +import secrets import socket from dataclasses import dataclass from datetime import date, datetime from gzip import GzipFile from io import BytesIO from typing import Any, List, Optional, Tuple, Union - +from uuid import uuid4 import requests from dateutil.tz import tzutc @@ -318,3 +319,166 @@ def default(self, obj: Any): return obj.isoformat() return json.JSONEncoder.default(self, obj) + + +def build_ai_multipart_request( + event_name: str, + distinct_id: str, + properties: dict[str, Any], + blobs: dict[str, Any], + timestamp: Optional[str] = None, + event_uuid: Optional[str] = None, +) -> tuple[bytes, str]: + """ + Build a multipart/form-data request body for AI events. + + Args: + event_name: The event name (e.g., "$ai_generation") + distinct_id: The distinct ID for the event + properties: Event properties (without blob properties) + blobs: Dictionary of blob properties to include as separate parts + timestamp: Optional timestamp for the event + event_uuid: Optional UUID for the event + + Returns: + Tuple of (body_bytes, boundary) for the multipart request + + Format follows the /i/v0/ai endpoint spec: + Part 1: "event" - JSON with {uuid, event, distinct_id, timestamp} + Part 2: "event.properties" - JSON with non-blob properties + Part 3+: "event.properties.$ai_input" etc. - Blob data as JSON + """ + # Generate a random boundary that's unlikely to appear in the data + boundary = "----WebKitFormBoundary" + secrets.token_hex(16) + + # Ensure we have a UUID + if event_uuid is None: + event_uuid = str(uuid4()) + + # Build the event part + event_data = { + "uuid": event_uuid, + "event": event_name, + "distinct_id": distinct_id, + } + if timestamp is not None: + event_data["timestamp"] = timestamp + + # Build multipart body + parts = [] + + # Part 1: event + parts.append(f"--{boundary}\r\n".encode()) + parts.append(b'Content-Disposition: form-data; name="event"\r\n') + parts.append(b"Content-Type: application/json\r\n\r\n") + parts.append(json.dumps(event_data, cls=DatetimeSerializer).encode("utf-8")) + parts.append(b"\r\n") + + # Part 2: event.properties + parts.append(f"--{boundary}\r\n".encode()) + parts.append(b'Content-Disposition: form-data; name="event.properties"\r\n') + parts.append(b"Content-Type: application/json\r\n\r\n") + parts.append(json.dumps(properties, cls=DatetimeSerializer).encode("utf-8")) + parts.append(b"\r\n") + + # Part 3+: blob parts + for blob_name, blob_value in blobs.items(): + parts.append(f"--{boundary}\r\n".encode()) + parts.append( + f'Content-Disposition: form-data; name="event.properties.{blob_name}"\r\n'.encode() + ) + parts.append(b"Content-Type: application/json\r\n\r\n") + parts.append(json.dumps(blob_value, cls=DatetimeSerializer).encode("utf-8")) + parts.append(b"\r\n") + + # Final boundary + parts.append(f"--{boundary}--\r\n".encode()) + + # Combine all parts + body = b"".join(parts) + + return body, boundary + + +def ai_post( + api_key: str, + host: Optional[str] = None, + gzip: bool = False, + timeout: int = 15, + **kwargs, +) -> requests.Response: + """ + Post an AI event to the /i/v0/ai endpoint using multipart/form-data. + + Args: + api_key: The PostHog API key + host: The host to post to + gzip: Whether to gzip compress the request + timeout: Request timeout in seconds + **kwargs: Event parameters including event_name, distinct_id, properties, blobs, etc. + + Returns: + The response from the server + + Raises: + APIError: If the request fails + """ + log = logging.getLogger("posthog") + + # Extract event parameters + event_name = kwargs.get("event_name") + distinct_id = kwargs.get("distinct_id") + properties = kwargs.get("properties", {}) + blobs = kwargs.get("blobs", {}) + timestamp = kwargs.get("timestamp") + event_uuid = kwargs.get("uuid") + + # Build multipart request + body, boundary = build_ai_multipart_request( + event_name=event_name, + distinct_id=distinct_id, + properties=properties, + blobs=blobs, + timestamp=timestamp, + event_uuid=event_uuid, + ) + + # Optionally gzip compress the body if enabled and body is large enough + # Spec recommends compression for requests > 10KB + data = body + headers = { + "Content-Type": f"multipart/form-data; boundary={boundary}", + "Authorization": f"Bearer {api_key}", + "User-Agent": USER_AGENT, + } + + if gzip or len(body) > 10 * 1024: # Compress if gzip enabled or body > 10KB + headers["Content-Encoding"] = "gzip" + buf = BytesIO() + with GzipFile(fileobj=buf, mode="w") as gz: + gz.write(body) + data = buf.getvalue() + log.debug("Compressed AI event from %d bytes to %d bytes", len(body), len(data)) + + url = remove_trailing_slash(host or DEFAULT_HOST) + "/i/v0/ai" + log.debug("Posting AI event to %s", url) + log.debug( + "Event: %s, Distinct ID: %s, Blobs: %s", + event_name, + distinct_id, + list(blobs.keys()), + ) + + res = _session.post(url, data=data, headers=headers, timeout=timeout) + + if res.status_code == 200: + log.debug("AI event uploaded successfully") + return res + + # Handle errors + try: + payload = res.json() + log.debug("Received error response: %s", payload) + raise APIError(res.status_code, payload.get("detail", "Unknown error")) + except (KeyError, ValueError): + raise APIError(res.status_code, res.text) diff --git a/posthog/test/ai/test_sanitization.py b/posthog/test/ai/test_sanitization.py index 24d1f2ea..cddf760a 100644 --- a/posthog/test/ai/test_sanitization.py +++ b/posthog/test/ai/test_sanitization.py @@ -1,3 +1,4 @@ +import re import os import unittest @@ -12,6 +13,7 @@ is_raw_base64, REDACTED_IMAGE_PLACEHOLDER, ) +from posthog.request import build_ai_multipart_request class TestSanitization(unittest.TestCase): @@ -333,6 +335,213 @@ def test_sanitize_handles_single_message(self): class TestAIMultipartRequest(unittest.TestCase): + """Tests for building AI multipart requests.""" + + def test_build_basic_multipart_request(self): + """Test building a basic multipart request with one blob.""" + event_name = "$ai_generation" + distinct_id = "test_user" + properties = {"$ai_model": "gpt-4", "$ai_provider": "openai"} + blobs = {"$ai_input": [{"role": "user", "content": "test message"}]} + timestamp = "2024-01-15T10:30:00Z" + event_uuid = "test-uuid-123" + + body, boundary = build_ai_multipart_request( + event_name=event_name, + distinct_id=distinct_id, + properties=properties, + blobs=blobs, + timestamp=timestamp, + event_uuid=event_uuid, + ) + + # Verify body is bytes + assert isinstance(body, bytes) + assert isinstance(boundary, str) + + # Decode body for inspection + body_str = body.decode("utf-8") + + # Verify boundary format + assert boundary.startswith("----WebKitFormBoundary") + assert ( + len(boundary) == 54 + ) # "----WebKitFormBoundary" (22 chars) + token_hex(16) (32 chars) + + # Verify all parts are present + assert f"--{boundary}" in body_str + assert 'name="event"' in body_str + assert 'name="event.properties"' in body_str + assert 'name="event.properties.$ai_input"' in body_str + + # Verify event part contains expected data + assert '"event": "$ai_generation"' in body_str + assert '"distinct_id": "test_user"' in body_str + assert '"uuid": "test-uuid-123"' in body_str + assert '"timestamp": "2024-01-15T10:30:00Z"' in body_str + + # Verify properties part + assert '"$ai_model": "gpt-4"' in body_str + assert '"$ai_provider": "openai"' in body_str + + # Verify blob part + assert '"role": "user"' in body_str + assert '"content": "test message"' in body_str + + # Verify final boundary + assert f"--{boundary}--" in body_str + + def test_build_multipart_with_multiple_blobs(self): + """Test building a multipart request with multiple blobs.""" + event_name = "$ai_generation" + distinct_id = "test_user" + properties = {"$ai_model": "gpt-4"} + blobs = { + "$ai_input": [{"role": "user", "content": "input"}], + "$ai_output_choices": [{"role": "assistant", "content": "output"}], + } + + body, boundary = build_ai_multipart_request( + event_name=event_name, + distinct_id=distinct_id, + properties=properties, + blobs=blobs, + ) + + body_str = body.decode("utf-8") + + # Verify both blob parts are present + assert 'name="event.properties.$ai_input"' in body_str + assert 'name="event.properties.$ai_output_choices"' in body_str + assert '"content": "input"' in body_str + assert '"content": "output"' in body_str + + def test_build_multipart_no_blobs(self): + """Test building a multipart request with no blobs.""" + event_name = "$ai_generation" + distinct_id = "test_user" + properties = {"$ai_model": "gpt-4"} + blobs = {} + + body, boundary = build_ai_multipart_request( + event_name=event_name, + distinct_id=distinct_id, + properties=properties, + blobs=blobs, + ) + + body_str = body.decode("utf-8") + + # Should still have event and properties parts + assert 'name="event"' in body_str + assert 'name="event.properties"' in body_str + + # Should not have any blob parts + assert 'name="event.properties.$ai_input"' not in body_str + assert 'name="event.properties.$ai_output_choices"' not in body_str + + def test_build_multipart_auto_generates_uuid(self): + """Test that UUID is auto-generated if not provided.""" + event_name = "$ai_generation" + distinct_id = "test_user" + properties = {} + blobs = {} + + body, boundary = build_ai_multipart_request( + event_name=event_name, + distinct_id=distinct_id, + properties=properties, + blobs=blobs, + event_uuid=None, # Don't provide UUID + ) + + body_str = body.decode("utf-8") + + # Should have a UUID in the event part + assert '"uuid":' in body_str + + # Extract and verify it's a valid UUID format (basic check) + uuid_pattern = r'"uuid":\s*"([0-9a-f-]+)"' + match = re.search(uuid_pattern, body_str) + assert match is not None + uuid_str = match.group(1) + assert len(uuid_str) == 36 # Standard UUID string length + + def test_build_multipart_without_timestamp(self): + """Test building request without timestamp.""" + event_name = "$ai_generation" + distinct_id = "test_user" + properties = {} + blobs = {} + + body, boundary = build_ai_multipart_request( + event_name=event_name, + distinct_id=distinct_id, + properties=properties, + blobs=blobs, + timestamp=None, + ) + + body_str = body.decode("utf-8") + + # Should not have timestamp in event part + assert '"timestamp"' not in body_str + + def test_build_multipart_content_types(self): + """Test that all parts have correct Content-Type headers.""" + event_name = "$ai_generation" + distinct_id = "test_user" + properties = {"$ai_model": "gpt-4"} + blobs = {"$ai_input": [{"role": "user", "content": "test"}]} + + body, boundary = build_ai_multipart_request( + event_name=event_name, + distinct_id=distinct_id, + properties=properties, + blobs=blobs, + ) + + body_str = body.decode("utf-8") + + # All parts should have application/json Content-Type + parts = body_str.split(f"--{boundary}") + for part in parts: + if 'name="' in part: # Skip empty parts + assert "Content-Type: application/json" in part + + def test_build_multipart_complex_nested_data(self): + """Test with complex nested JSON structures in blobs.""" + event_name = "$ai_generation" + distinct_id = "test_user" + properties = {"$ai_model": "gpt-4"} + blobs = { + "$ai_input": [ + { + "role": "user", + "content": [ + {"type": "text", "text": "What's in this image?"}, + { + "type": "image_url", + "image_url": {"url": "https://example.com/image.jpg"}, + }, + ], + } + ] + } + + body, boundary = build_ai_multipart_request( + event_name=event_name, + distinct_id=distinct_id, + properties=properties, + blobs=blobs, + ) + + body_str = body.decode("utf-8") + + # Verify nested structure is properly encoded + assert '"type": "text"' in body_str + assert '"type": "image_url"' in body_str + assert "https://example.com/image.jpg" in body_str """Test that _INTERNAL_LLMA_MULTIMODAL environment variable controls sanitization.""" def tearDown(self): diff --git a/posthog/test/test_utils.py b/posthog/test/test_utils.py index 6474c519..fe2821e0 100644 --- a/posthog/test/test_utils.py +++ b/posthog/test/test_utils.py @@ -179,6 +179,75 @@ class TestDataClass: }, } + def test_extract_ai_blob_properties_with_both_blobs(self): + """Test extracting both $ai_input and $ai_output_choices""" + properties = { + "$ai_input": [{"role": "user", "content": "test"}], + "$ai_output_choices": [{"role": "assistant", "content": "response"}], + "$ai_model": "gpt-4", + "$ai_provider": "openai", + } + + cleaned, blobs = utils.extract_ai_blob_properties(properties) + + assert cleaned == { + "$ai_model": "gpt-4", + "$ai_provider": "openai", + } + assert blobs == { + "$ai_input": [{"role": "user", "content": "test"}], + "$ai_output_choices": [{"role": "assistant", "content": "response"}], + } + + def test_extract_ai_blob_properties_with_only_input(self): + """Test extracting only $ai_input""" + properties = { + "$ai_input": [{"role": "user", "content": "test"}], + "$ai_model": "gpt-4", + } + + cleaned, blobs = utils.extract_ai_blob_properties(properties) + + assert cleaned == {"$ai_model": "gpt-4"} + assert blobs == {"$ai_input": [{"role": "user", "content": "test"}]} + + def test_extract_ai_blob_properties_with_none_values(self): + """Test that None blob values are not extracted""" + properties = { + "$ai_input": None, + "$ai_output_choices": [{"role": "assistant", "content": "response"}], + "$ai_model": "gpt-4", + } + + cleaned, blobs = utils.extract_ai_blob_properties(properties) + + assert cleaned == {"$ai_model": "gpt-4"} + assert blobs == { + "$ai_output_choices": [{"role": "assistant", "content": "response"}] + } + + def test_extract_ai_blob_properties_no_blobs(self): + """Test extraction when no blob properties are present""" + properties = { + "$ai_model": "gpt-4", + "$ai_provider": "openai", + "$ai_tokens": 100, + } + + cleaned, blobs = utils.extract_ai_blob_properties(properties) + + assert cleaned == properties + assert blobs == {} + + def test_extract_ai_blob_properties_empty_dict(self): + """Test extraction with empty properties dict""" + properties = {} + + cleaned, blobs = utils.extract_ai_blob_properties(properties) + + assert cleaned == {} + assert blobs == {} + class TestFlagCache(unittest.TestCase): def setUp(self): diff --git a/posthog/utils.py b/posthog/utils.py index 37f4a136..22f0fe6f 100644 --- a/posthog/utils.py +++ b/posthog/utils.py @@ -517,3 +517,41 @@ def system_context() -> dict[str, Any]: "$os": os_name, "$os_version": os_version, } + + +def extract_ai_blob_properties( + properties: dict[str, Any], +) -> tuple[dict[str, Any], dict[str, Any]]: + """ + Extract AI blob properties from event properties for multipart upload. + + Extracts $ai_input and $ai_output_choices from properties and returns + a tuple of (cleaned_properties, blobs_dict). + + Args: + properties: Event properties dictionary + + Returns: + Tuple of (cleaned_properties without blobs, blobs_dict with extracted blobs) + + Examples: + >>> props = {"$ai_input": [...], "$ai_model": "gpt-4", "$ai_output_choices": [...]} + >>> cleaned, blobs = extract_ai_blob_properties(props) + >>> # cleaned = {"$ai_model": "gpt-4"} + >>> # blobs = {"$ai_input": [...], "$ai_output_choices": [...]} + """ + # Properties to extract as blobs + blob_property_names = {"$ai_input", "$ai_output_choices"} + + # Create a copy of properties without the blob properties + cleaned_properties = { + k: v for k, v in properties.items() if k not in blob_property_names + } + + # Extract blob properties that are present and not None + blobs = {} + for prop_name in blob_property_names: + if prop_name in properties and properties[prop_name] is not None: + blobs[prop_name] = properties[prop_name] + + return cleaned_properties, blobs