|
1 | 1 | """PII redaction for MCPCat logs.""" |
2 | 2 |
|
3 | | -from typing import Any, TYPE_CHECKING |
4 | | - |
5 | | -from datafog import DataFog |
6 | | - |
| 3 | +from typing import Any, TYPE_CHECKING, Callable, Set |
7 | 4 | if TYPE_CHECKING: |
8 | 5 | from mcpcat.types import Event, UnredactedEvent |
9 | 6 |
|
10 | 7 |
|
11 | | -def defaultRedactor(text: str) -> str: |
12 | | - """Default redactor function for sensitive information.""" |
13 | | - # Basic implementation - can be enhanced with actual redaction logic |
14 | | - return text |
# Field names whose values must never be passed through the redaction
# function. They hold system-level identifiers and metadata that have to
# reach the analytics service unchanged. Protection is only applied when one
# of these names is a top-level key of the object being redacted; everything
# nested beneath such a key is protected as well.
PROTECTED_FIELDS: Set[str] = {
    'session_id',
    'id',
    'project_id',
    'server',
    'identify_actor_given_id',
    'identify_actor_name',
    'identify_data',
    'resource_name',
    'event_type',
    'actor_id',
}
| 23 | + |
| 24 | + |
def redact_strings_in_object(
    obj: Any,
    redact_fn: Callable[[str], str],
    path: str = '',
    is_protected: bool = False
) -> Any:
    """Recursively apply ``redact_fn`` to every unprotected string in ``obj``.

    Strings are passed through ``redact_fn``. Lists, tuples and dicts are
    rebuilt with their contents processed recursively. Every other value
    (numbers, booleans, ``None``, arbitrary objects) is returned unchanged.
    The input containers are never mutated.

    A key of the root dict (the dict visited while ``path`` is empty) that
    appears in ``PROTECTED_FIELDS`` is protected, and that protection is
    inherited by everything nested beneath it. Protected strings are
    returned verbatim.

    Dict entries whose value is ``None`` are omitted from the rebuilt dict,
    whereas ``None`` items inside lists and tuples are kept.

    Args:
        obj: The value to redact strings from.
        redact_fn: The redaction function applied to each unprotected string.
        path: Location of ``obj`` inside the root object; an empty path
            marks the root, which is where protected field names are checked.
        is_protected: Whether ``obj`` lies inside a protected field.

    Returns:
        A new structure mirroring ``obj`` with unprotected strings redacted.
    """
    if obj is None:
        return obj

    if isinstance(obj, str):
        # A protected field (or anything nested inside one) is left verbatim.
        return obj if is_protected else redact_fn(obj)

    # Tuples are traversed like lists; otherwise strings stored in a tuple
    # would fall through to the "other types" branch and escape redaction.
    if isinstance(obj, (list, tuple)):
        items = [
            redact_strings_in_object(item, redact_fn, f"{path}[{index}]", is_protected)
            for index, item in enumerate(obj)
        ]
        if isinstance(obj, list):
            return items
        # namedtuple classes expose ``_make`` to rebuild from an iterable,
        # which keeps the record type; plain tuples are rebuilt directly.
        make = getattr(type(obj), "_make", None)
        return make(items) if make is not None else tuple(items)

    if isinstance(obj, dict):
        redacted_obj = {}

        for key, value in obj.items():
            # Entries holding None are dropped from the output entirely.
            if value is None:
                continue

            field_path = f"{path}.{key}" if path else key
            # Protected names only count at the root; deeper levels merely
            # inherit the protection decided above them.
            is_field_protected = is_protected or (path == '' and key in PROTECTED_FIELDS)
            redacted_obj[key] = redact_strings_in_object(
                value, redact_fn, field_path, is_field_protected
            )

        return redacted_obj

    # Numbers, booleans and any other type are returned as-is.
    return obj
15 | 79 |
|
16 | 80 |
|
17 | | -def redact_event_sync(event: "UnredactedEvent") -> "Event": |
18 | | - """Synchronous version to redact sensitive information from an event.""" |
19 | | - return event |
def redact_event(event: "UnredactedEvent", redact_fn: Callable[[str], str]) -> "Event":
    """Apply the customer's redaction function to the string fields of an event.

    This is the entry point for redacting an event. It delegates to
    ``redact_strings_in_object``, starting at the root (empty path, not
    protected) so that top-level protected field names are honored.

    NOTE(review): the traversal only descends into built-in containers such
    as lists and dicts. If ``UnredactedEvent`` instances are not dicts,
    confirm that callers convert them first; otherwise the event would be
    returned without any redaction.

    Args:
        event: The event to redact.
        redact_fn: The customer's redaction function.

    Returns:
        Whatever ``redact_strings_in_object`` produces for ``event``.
    """
    return redact_strings_in_object(event, redact_fn, '', False)
0 commit comments