Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
163 changes: 163 additions & 0 deletions AUTH_RACE_CONDITION_FIX.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
# Authentication Race Condition Fix

## Problem Summary

Some users were experiencing an issue where AgentOps would show a session URL in the console, but no data would actually reach the backend. The session appeared to be created locally, but spans/events were not being transmitted successfully.

## Root Cause Analysis

The issue was caused by an **authentication race condition** in the session initialization flow:

1. **Asynchronous Authentication**: When `agentops.init()` is called, authentication happens asynchronously in a background thread to avoid blocking the main application.

2. **Immediate Session Creation**: The session/trace is created immediately with a temporary project ID, and the session URL is logged to the console right away.

3. **Export Failures**: If spans are exported before authentication completes, they fail because:
- No JWT token is available yet for authorization
- The project ID is still set to "temporary"
- The exporter would fail silently or with minimal logging

4. **Silent Failures**: The default log level was initially set to `CRITICAL`, causing most error messages to be suppressed, making the issue difficult to diagnose.

## Timeline of Events

```
1. User calls agentops.init(api_key="...")
2. Session URL is logged immediately (e.g., "Session Replay: https://app.agentops.ai/sessions?trace_id=...")
3. Authentication starts in background thread
4. User's code starts generating spans/events
5. Spans try to export but fail (no JWT yet)
6. Authentication completes 1-3 seconds later
7. New spans work, but initial spans are lost
```

## The Fix

### 1. Added Authentication Synchronization

Added an event-based mechanism to track authentication completion:

```python
# In Client class
_auth_completed = threading.Event() # Signals when auth is done

def wait_for_auth(self, timeout: float = 5.0) -> bool:
"""Wait for authentication to complete."""
return self._auth_completed.wait(timeout)
```

### 2. New Configuration Options

Added two new configuration parameters:

- `wait_for_auth` (default: `True`): Whether to wait for authentication before allowing span exports
- `auth_timeout` (default: `5.0`): Maximum seconds to wait for authentication

These can be configured via:
- Parameters to `agentops.init()`
- Environment variables: `AGENTOPS_WAIT_FOR_AUTH`, `AGENTOPS_AUTH_TIMEOUT`

### 3. Improved Exporter Logic

The `AuthenticatedOTLPExporter` now:
- Checks if JWT is available before attempting export
- Returns `FAILURE` (for retry) instead of attempting unauthorized requests
- Provides better logging for debugging

### 4. Better Error Visibility

- Improved logging throughout the authentication flow
- More descriptive error messages when exports fail
- Debug logs show authentication progress

## Usage Examples

### Default Behavior (Recommended)

```python
import agentops

# Will wait up to 5 seconds for auth to complete
agentops.init(api_key="your-api-key")
# Spans created here will be exported successfully
```

### Custom Timeout

```python
# Wait up to 10 seconds for auth
agentops.init(
api_key="your-api-key",
auth_timeout=10.0
)
```

### Disable Waiting (Previous Behavior)

```python
# Don't wait for auth (may lose initial spans)
agentops.init(
api_key="your-api-key",
wait_for_auth=False
)
```

### Environment Variables

```bash
export AGENTOPS_WAIT_FOR_AUTH=true
export AGENTOPS_AUTH_TIMEOUT=10.0
python your_script.py
```

## Backward Compatibility

The fix is **backward compatible**:
- By default, `wait_for_auth=True` ensures spans are not lost
- Users can opt-out by setting `wait_for_auth=False` to restore previous behavior
- The wait is capped by timeout to prevent indefinite blocking
- If authentication fails, the system continues without blocking

## Testing the Fix

Run the test script to verify the fix:

```bash
python test_auth_fix.py
```

This will test:
1. With wait_for_auth enabled (default)
2. With wait_for_auth disabled
3. Without an API key

## Troubleshooting

If you're still experiencing issues:

1. **Enable Debug Logging**:
```python
agentops.init(api_key="...", log_level="DEBUG")
```

2. **Check Network Connectivity**:
- Verify you can reach `https://api.agentops.ai`
- Check for proxy/firewall issues

3. **Verify API Key**:
- Ensure your API key is valid
- Check for typos or extra spaces

4. **Increase Timeout**:
```python
agentops.init(api_key="...", auth_timeout=10.0)
```

## Impact

This fix resolves the issue where:
- Users see a session URL but no data in the dashboard
- Initial spans/events are lost during session startup
- Authentication failures are silent and hard to debug

The solution ensures reliable data transmission while maintaining non-blocking initialization for better user experience.
12 changes: 12 additions & 0 deletions agentops/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ def init(
fail_safe: Optional[bool] = None,
log_session_replay_url: Optional[bool] = None,
exporter_endpoint: Optional[str] = None,
wait_for_auth: Optional[bool] = None,
auth_timeout: Optional[float] = None,
**kwargs,
):
"""
Expand Down Expand Up @@ -121,6 +123,10 @@ def init(
log_session_replay_url (bool): Whether to log session replay URLs to the console. Defaults to True.
exporter_endpoint (str, optional): Endpoint for the exporter. If none is provided, key will
be read from the AGENTOPS_EXPORTER_ENDPOINT environment variable.
wait_for_auth (bool, optional): Whether to wait for authentication to complete before allowing
span exports. Defaults to True. Can be set via AGENTOPS_WAIT_FOR_AUTH environment variable.
auth_timeout (float, optional): Maximum time in seconds to wait for authentication to complete.
Defaults to 5.0. Can be set via AGENTOPS_AUTH_TIMEOUT environment variable.
**kwargs: Additional configuration parameters to be passed to the client.
"""
global _client
Expand Down Expand Up @@ -163,6 +169,8 @@ def init(
"fail_safe": fail_safe,
"log_session_replay_url": log_session_replay_url,
"exporter_endpoint": exporter_endpoint,
"wait_for_auth": wait_for_auth,
"auth_timeout": auth_timeout,
**kwargs,
}

Expand Down Expand Up @@ -193,6 +201,8 @@ def configure(**kwargs):
- exporter: Custom span exporter for OpenTelemetry trace data
- processor: Custom span processor for OpenTelemetry trace data
- exporter_endpoint: Endpoint for the exporter
- wait_for_auth: Whether to wait for authentication to complete
- auth_timeout: Maximum time to wait for authentication
"""
global _client

Expand All @@ -213,6 +223,8 @@ def configure(**kwargs):
"exporter",
"processor",
"exporter_endpoint",
"wait_for_auth",
"auth_timeout",
}

# Check for invalid parameters
Expand Down
40 changes: 38 additions & 2 deletions agentops/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,17 +52,20 @@ class Client:
_project_id: Optional[str] = None
_auth_lock = threading.Lock()
_auth_task: Optional[asyncio.Task] = None
_auth_completed = threading.Event() # Add event to signal auth completion

def __new__(cls, *args: Any, **kwargs: Any) -> "Client":
if cls.__instance is None:
cls.__instance = super(Client, cls).__new__(cls)
cls.__instance = super().__new__(cls)
cls.__instance._initialized = False
# Initialize instance variables that should only be set once per instance
cls.__instance._init_trace_context = None
cls.__instance._legacy_session_for_init_trace = None
cls.__instance._auth_token = None
cls.__instance._project_id = None
cls.__instance._auth_lock = threading.Lock()
cls.__instance._auth_task = None
cls.__instance._auth_completed = threading.Event()
return cls.__instance

def __init__(self):
Expand Down Expand Up @@ -108,11 +111,15 @@ async def _fetch_auth_async(self, api_key: str) -> Optional[dict]:
tracer.update_config(tracing_config)

logger.debug("Successfully fetched authentication token asynchronously")
self._auth_completed.set() # Signal that auth is complete
return response
else:
logger.debug("Authentication failed - will continue without authentication")
self._auth_completed.set() # Signal completion even on failure
return None
except Exception:
except Exception as e:
logger.debug(f"Authentication exception: {e}")
self._auth_completed.set() # Signal completion even on exception
return None

def _start_auth_task(self, api_key: str):
Expand Down Expand Up @@ -144,11 +151,31 @@ def run_async_auth():
auth_thread = threading.Thread(target=run_async_auth, daemon=True)
auth_thread.start()

def wait_for_auth(self, timeout: float = 5.0) -> bool:
"""
Wait for authentication to complete.

Args:
timeout: Maximum time to wait in seconds

Returns:
True if authentication completed within timeout, False otherwise
"""
if not self.config.api_key:
# No API key, auth won't happen
return True

return self._auth_completed.wait(timeout)

def init(self, **kwargs: Any) -> None: # Return type updated to None
# Recreate the Config object to parse environment variables at the time of initialization
# This allows re-init with new env vars if needed, though true singletons usually init once.
self.config = Config()
self.configure(**kwargs)

# Use config values for wait_for_auth, allowing override from kwargs
wait_for_auth_completion = kwargs.get("wait_for_auth", self.config.wait_for_auth)
wait_timeout = kwargs.get("auth_timeout", self.config.auth_timeout)

# Only treat as re-initialization if a different non-None API key is explicitly provided
provided_api_key = kwargs.get("api_key")
Expand Down Expand Up @@ -198,8 +225,17 @@ def jwt_provider():
# Start authentication task only if we have an API key
if self.config.api_key:
self._start_auth_task(self.config.api_key)

# Optionally wait for authentication to complete
if wait_for_auth_completion:
logger.debug(f"Waiting up to {wait_timeout}s for authentication to complete...")
if self.wait_for_auth(wait_timeout):
logger.debug("Authentication completed successfully")
else:
logger.warning(f"Authentication did not complete within {wait_timeout}s timeout")
else:
logger.debug("No API key available - skipping authentication task")
self._auth_completed.set() # Set immediately since no auth will happen

global _atexit_registered
if not _atexit_registered:
Expand Down
32 changes: 23 additions & 9 deletions agentops/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,18 +123,24 @@ class Config:
exporter_endpoint: Optional[str] = field(
default_factory=lambda: os.getenv("AGENTOPS_EXPORTER_ENDPOINT", "https://otlp.agentops.ai/v1/traces"),
metadata={
"description": "Endpoint for the span exporter. When not provided, the default AgentOps endpoint will be used."
"description": "Endpoint for the exporter. If none is provided, key will be read from the AGENTOPS_EXPORTER_ENDPOINT environment variable."
},
)

exporter: Optional[SpanExporter] = field(
default_factory=lambda: None, metadata={"description": "Custom span exporter for OpenTelemetry trace data"}

wait_for_auth: bool = field(
default_factory=lambda: get_env_bool("AGENTOPS_WAIT_FOR_AUTH", True),
metadata={"description": "Whether to wait for authentication to complete before starting spans"},
)

processor: Optional[SpanProcessor] = field(
default_factory=lambda: None, metadata={"description": "Custom span processor for OpenTelemetry trace data"}

auth_timeout: float = field(
default_factory=lambda: float(os.getenv("AGENTOPS_AUTH_TIMEOUT", "5.0")),
metadata={"description": "Maximum time in seconds to wait for authentication to complete"},
)

# Optional custom exporter and processor for OpenTelemetry
exporter: Optional[SpanExporter] = field(default=None, init=False)
processor: Optional[SpanProcessor] = field(default=None, init=False)

def configure(
self,
api_key: Optional[str] = None,
Expand All @@ -157,6 +163,8 @@ def configure(
exporter: Optional[SpanExporter] = None,
processor: Optional[SpanProcessor] = None,
exporter_endpoint: Optional[str] = None,
wait_for_auth: Optional[bool] = None,
auth_timeout: Optional[float] = None,
):
"""Configure settings from kwargs, validating where necessary"""
if api_key is not None:
Expand Down Expand Up @@ -237,6 +245,12 @@ def configure(

if exporter_endpoint is not None:
self.exporter_endpoint = exporter_endpoint

if wait_for_auth is not None:
self.wait_for_auth = wait_for_auth

if auth_timeout is not None:
self.auth_timeout = auth_timeout
# else:
# self.exporter_endpoint = self.endpoint

Expand All @@ -260,9 +274,9 @@ def dict(self):
"fail_safe": self.fail_safe,
"prefetch_jwt_token": self.prefetch_jwt_token,
"log_session_replay_url": self.log_session_replay_url,
"exporter": self.exporter,
"processor": self.processor,
"exporter_endpoint": self.exporter_endpoint,
"wait_for_auth": self.wait_for_auth,
"auth_timeout": self.auth_timeout,
}

def json(self):
Expand Down
Loading
Loading