Parent Issue: #195
Objective
Implement comprehensive metrics and analytics collection for A2A agents to track performance, usage patterns, and system health.
Quick Links
- Strands Multi-Agent Documentation: https://strandsagents.com/latest/documentation/docs/user-guide/concepts/multi-agent/agent-to-agent/
- AWS Bedrock AgentCore A2A Runtime: https://docs.aws.amazon.com/bedrock-agentcore/latest/devguide/runtime-a2a.html
Metrics Categories
Performance Metrics
- Agent Response Time: Time to complete each skill (p50, p95, p99 percentiles; see the sketch after this list)
- Agent Throughput: Requests per minute by agent and skill
- Error Rate: Percentage of failed requests by agent and error type
- Discovery Latency: Time to find agents in registry via semantic search
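As a rough illustration of the latency percentiles and error rate above, a minimal pure-Python sketch (nearest-rank percentiles; the helper names are illustrative, not part of the design):

from typing import Dict, List

def latency_percentiles(samples_ms: List[float]) -> Dict[str, float]:
    """Compute p50/p95/p99 from raw latency samples (nearest-rank method)."""
    if not samples_ms:
        return {"p50": 0.0, "p95": 0.0, "p99": 0.0}
    ordered = sorted(samples_ms)
    def pct(p: float) -> float:
        idx = min(len(ordered) - 1, round(p * (len(ordered) - 1)))
        return ordered[idx]
    return {"p50": pct(0.50), "p95": pct(0.95), "p99": pct(0.99)}

def error_rate_pct(success_count: int, error_count: int) -> float:
    """Error rate as a percentage of all calls."""
    total = success_count + error_count
    return (error_count / total) * 100 if total else 0.0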
Business Metrics
- Bookings Per Hour: Number of flight bookings completed each hour
- Revenue Per Agent: Total transaction value by agent
- Average Booking Value: Mean trip cost
- Booking Success Rate: Percentage of bookings completed successfully vs. cancelled
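A sketch of how these business metrics could be read back from the metrics table defined under Metrics Storage below, assuming bookings are written by MetricsCollector.record_booking and that completed bookings carry status "confirmed" (both assumptions of this sketch):

import sqlite3

def booking_business_metrics(db_path: str, since_iso: str) -> dict:
    """Bookings, average value, and success rate since `since_iso` (ISO timestamp)."""
    conn = sqlite3.connect(db_path)
    total, avg_value, confirmed = conn.execute("""
        SELECT COUNT(*),
               AVG(value),
               SUM(CASE WHEN json_extract(tags, '$.status') = 'confirmed' THEN 1 ELSE 0 END)
        FROM metrics
        WHERE metric_type = 'booking' AND timestamp >= ?
    """, (since_iso,)).fetchone()
    conn.close()
    return {
        "bookings": total,
        "average_booking_value": avg_value or 0.0,
        "booking_success_rate_pct": ((confirmed or 0) / total) * 100 if total else 0.0,
    }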
System Metrics
- Registry Query Time: Time to search agents
- Inter-Agent Call Latency: Network latency between agents
- Database Query Time: SQLite query performance
- Memory Usage: Per-agent memory consumption
- CPU Usage: Per-agent CPU utilization
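Per-agent memory and CPU can be sampled from inside each agent process; a minimal sketch using psutil (an extra dependency, shown only as one possible approach):

import psutil

def sample_process_stats() -> dict:
    """Sample memory (MB) and CPU (%) for the current agent process."""
    proc = psutil.Process()  # the calling agent's own process
    return {
        "memory_mb": proc.memory_info().rss / (1024 * 1024),
        # cpu_percent() measures since the previous call; the very first call returns 0.0
        "cpu_percent": proc.cpu_percent(interval=None),
    }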
User Metrics
- User Search Attempts: Flight searches per user
- Booking Funnel: Users searching → creating plan → confirming booking
- Agent Usage Patterns: Which agents are called, and how often
- Session Duration: Time from first request to final booking
Implementation Architecture
Metrics Collection Framework
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, Optional
import logging

logger = logging.getLogger(__name__)

@dataclass
class AgentMetric:
    """Single metric data point."""
    timestamp: datetime
    agent_name: str
    skill_name: str
    metric_type: str  # "latency", "error", "success"
    value: float
    tags: Dict[str, str]  # Additional tags for filtering

class MetricsCollector:
    """Collect and store metrics."""

    def record_skill_call(
        self,
        agent_name: str,
        skill_name: str,
        duration_ms: float,
        success: bool,
        error_type: Optional[str] = None,
    ) -> None:
        """Record a skill invocation."""
        metric = AgentMetric(
            timestamp=datetime.utcnow(),
            agent_name=agent_name,
            skill_name=skill_name,
            metric_type="latency" if success else "error",
            value=duration_ms,
            tags={
                "success": str(success),
                "error_type": error_type or "none",
            },
        )
        self._store_metric(metric)
        logger.info(f"Recorded metric: {agent_name}/{skill_name} = {duration_ms}ms")

    def record_booking(
        self,
        agent_name: str,
        booking_id: str,
        amount: float,
        status: str,
    ) -> None:
        """Record a booking event."""
        metric = AgentMetric(
            timestamp=datetime.utcnow(),
            agent_name=agent_name,
            skill_name="booking",
            metric_type="booking",
            value=amount,
            tags={"booking_id": booking_id, "status": status},
        )
        self._store_metric(metric)

    def _store_metric(self, metric: AgentMetric) -> None:
        """Persist the metric (placeholder; delegates to the MetricsStore below)."""
        ...
Metrics Storage
import json
import sqlite3
from contextlib import contextmanager

class MetricsStore:
    """SQLite-based metrics storage."""

    def __init__(self, db_path: str):
        self.db_path = db_path
        self._init_schema()

    def _init_schema(self) -> None:
        """Create metrics tables."""
        with self._get_connection() as conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS metrics (
                    id INTEGER PRIMARY KEY,
                    timestamp DATETIME,
                    agent_name TEXT,
                    skill_name TEXT,
                    metric_type TEXT,
                    value REAL,
                    tags JSON,
                    created_at DATETIME DEFAULT CURRENT_TIMESTAMP
                )
            """)
            conn.execute("""
                CREATE TABLE IF NOT EXISTS agent_stats (
                    id INTEGER PRIMARY KEY,
                    agent_name TEXT,
                    total_calls INTEGER,
                    success_count INTEGER,
                    error_count INTEGER,
                    avg_latency_ms REAL,
                    last_updated DATETIME
                )
            """)

    @contextmanager
    def _get_connection(self):
        conn = sqlite3.connect(self.db_path)
        try:
            yield conn
            conn.commit()
        finally:
            conn.close()

    def store_metric(self, metric: AgentMetric) -> None:
        """Store metric in database."""
        with self._get_connection() as conn:
            conn.execute("""
                INSERT INTO metrics (timestamp, agent_name, skill_name,
                                     metric_type, value, tags)
                VALUES (?, ?, ?, ?, ?, json(?))
            """, (
                metric.timestamp.isoformat(),  # store as ISO-8601 text
                metric.agent_name,
                metric.skill_name,
                metric.metric_type,
                metric.value,
                json.dumps(metric.tags),
            ))
Metrics API Endpoints
from typing import Any, Dict, Optional

from fastapi import FastAPI

app = FastAPI()
metrics_store = MetricsStore("metrics.db")  # shared store instance (path is illustrative)

@app.get("/api/metrics/agent/{agent_name}")
async def get_agent_metrics(
    agent_name: str,
    time_range: str = "1h",  # 1h, 24h, 7d, 30d
    metric_type: Optional[str] = None,
) -> Dict[str, Any]:
    """Get metrics for specific agent."""
    metrics = metrics_store.query_metrics(
        agent_name=agent_name,
        time_range=time_range,
        metric_type=metric_type,
    )
    return {
        "agent": agent_name,
        "time_range": time_range,
        "metrics": metrics,
        "summary": _calculate_summary(metrics),  # helper sketched under Implementation Steps, Phase 3
    }

@app.get("/api/metrics/summary")
async def get_all_metrics_summary() -> Dict[str, Any]:
    """Get system-wide metrics summary."""
    return {
        "total_agents": metrics_store.count_unique_agents(),
        "total_calls": metrics_store.count_total_calls(),
        "avg_latency_ms": metrics_store.get_avg_latency(),
        "error_rate": metrics_store.get_error_rate(),
        "bookings_today": metrics_store.get_booking_count("24h"),
        "top_agents": metrics_store.get_top_agents(limit=5),
    }

@app.get("/api/metrics/booking-funnel")
async def get_booking_funnel() -> Dict[str, Any]:
    """Get booking funnel metrics."""
    return {
        "searches": metrics_store.count_searches(),
        "trip_plans_created": metrics_store.count_trip_plans(),
        "bookings_confirmed": metrics_store.count_confirmed_bookings(),
        "conversion_rates": {
            "search_to_plan": metrics_store.get_conversion_rate("search", "plan"),
            "plan_to_booking": metrics_store.get_conversion_rate("plan", "booking"),
        },
    }
Prometheus Metrics Export (Optional)
import time

from fastapi import Response
from prometheus_client import CONTENT_TYPE_LATEST, Counter, Gauge, Histogram, generate_latest

# Define Prometheus metrics
agent_calls_total = Counter(
    'agent_calls_total',
    'Total agent calls',
    ['agent_name', 'skill_name', 'status']
)
agent_call_duration_ms = Histogram(
    'agent_call_duration_ms',
    'Agent call duration in milliseconds',
    ['agent_name', 'skill_name'],
    buckets=[10, 50, 100, 250, 500, 1000, 2500]
)
active_bookings = Gauge(
    'active_bookings',
    'Number of active bookings',
    ['agent_name']
)

# Usage
@app.middleware("http")
async def metrics_middleware(request, call_next):
    start_time = time.time()
    response = await call_next(request)
    duration = (time.time() - start_time) * 1000

    # extract_agent_name / extract_skill_name parse the agent and skill
    # out of the request URL (implementations not shown here).
    agent_name = extract_agent_name(request.url)
    skill_name = extract_skill_name(request.url)

    agent_calls_total.labels(
        agent_name=agent_name,
        skill_name=skill_name,
        status=str(response.status_code)
    ).inc()
    agent_call_duration_ms.labels(
        agent_name=agent_name,
        skill_name=skill_name
    ).observe(duration)
    return response

@app.get("/metrics")
async def metrics():
    """Prometheus metrics endpoint (plain-text exposition format)."""
    return Response(generate_latest(), media_type=CONTENT_TYPE_LATEST)
Analytics Dashboard Integration
Key Dashboard Panels
- Agent Performance: Response times, error rates, throughput
- Booking Metrics: Daily bookings, average value, success rates
- Agent Discovery: Popular agents, search queries, discovery success
- System Health: Registry uptime, inter-agent call latency
- Booking Funnel: Search → Plan → Confirmation conversion funnel
- Revenue: Total revenue, revenue per agent, trends
Time Series Data
- Hourly Aggregation: Metrics aggregated by hour for trends
- Daily Aggregation: Daily summaries for dashboards
- Monthly Aggregation: Monthly reports for business review
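A sketch of the hourly roll-up over the raw metrics table from the Metrics Storage section above; the metrics_hourly table and its columns are illustrative:

import sqlite3

def rollup_hourly(db_path: str) -> None:
    """Aggregate raw metrics into hourly buckets for trend dashboards."""
    conn = sqlite3.connect(db_path)
    conn.execute("""
        CREATE TABLE IF NOT EXISTS metrics_hourly (
            hour TEXT, agent_name TEXT, skill_name TEXT,
            call_count INTEGER, avg_latency_ms REAL, error_count INTEGER,
            PRIMARY KEY (hour, agent_name, skill_name)
        )
    """)
    conn.execute("""
        INSERT OR REPLACE INTO metrics_hourly
        SELECT strftime('%Y-%m-%dT%H:00', timestamp) AS hour,
               agent_name, skill_name,
               COUNT(*),
               AVG(CASE WHEN metric_type = 'latency' THEN value END),
               SUM(CASE WHEN metric_type = 'error' THEN 1 ELSE 0 END)
        FROM metrics
        GROUP BY hour, agent_name, skill_name
    """)
    conn.commit()
    conn.close()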
Data Retention Policy
- Raw Metrics: 7 days (detailed data)
- Hourly Aggregates: 90 days
- Daily Aggregates: 1 year (long-term trends)
- Monthly Summaries: Indefinite (business intelligence)
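A sketch of enforcing this policy with a periodic cleanup job; the aggregate table and column names are illustrative and assume roll-up tables like the one sketched above:

import sqlite3
from datetime import datetime, timedelta

# Retention windows from the policy above (monthly summaries are kept indefinitely).
RETENTION = {
    "metrics": ("timestamp", timedelta(days=7)),      # raw metrics
    "metrics_hourly": ("hour", timedelta(days=90)),   # hourly aggregates
    "metrics_daily": ("day", timedelta(days=365)),    # daily aggregates
}

def purge_expired_metrics(db_path: str) -> None:
    """Run periodically (e.g. once a day) to drop rows past their retention window."""
    conn = sqlite3.connect(db_path)
    now = datetime.utcnow()
    for table, (column, window) in RETENTION.items():
        cutoff = (now - window).isoformat()
        conn.execute(f"DELETE FROM {table} WHERE {column} < ?", (cutoff,))
    conn.commit()
    conn.close()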
Alerting Rules
Performance Alerts
- Agent response time > 1 second (p95)
- Error rate > 5%
- Agent unavailable (no calls in 5 minutes)
Business Alerts
- Booking failure rate > 10%
- Revenue drop > 20% day-over-day
- Any single agent underperforming relative to its recent baseline
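A sketch of evaluating these rules on a schedule against the SQLite store, using the error-rate rule as the example; the notification hook is intentionally left out:

import sqlite3
from datetime import datetime, timedelta

def check_error_rate_alert(db_path: str, threshold_pct: float = 5.0) -> list:
    """Return agents whose error rate over the last hour exceeds the threshold."""
    since = (datetime.utcnow() - timedelta(hours=1)).isoformat()
    conn = sqlite3.connect(db_path)
    rows = conn.execute("""
        SELECT agent_name,
               100.0 * SUM(CASE WHEN metric_type = 'error' THEN 1 ELSE 0 END) / COUNT(*)
        FROM metrics
        WHERE timestamp >= ? AND metric_type IN ('latency', 'error')
        GROUP BY agent_name
    """, (since,)).fetchall()
    conn.close()
    return [
        {"agent": agent, "error_rate_pct": rate, "rule": f"error rate > {threshold_pct}%"}
        for agent, rate in rows if rate > threshold_pct
    ]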
Implementation Steps
Phase 1: Metrics Collection
- Implement MetricsCollector class
- Add timing instrumentation to agent calls (see the decorator sketch after this phase)
- Record success/failure for each skill
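A sketch of the Phase 1 instrumentation: a decorator that wraps a skill handler, times it, and records success or failure through the MetricsCollector defined above (the module-level collector and the example skill are illustrative):

import functools
import time

collector = MetricsCollector()  # shared collector instance (illustrative)

def instrumented_skill(agent_name: str, skill_name: str):
    """Wrap a skill handler so every call is timed and recorded."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            start = time.perf_counter()
            try:
                result = func(*args, **kwargs)
            except Exception as exc:
                collector.record_skill_call(
                    agent_name, skill_name,
                    duration_ms=(time.perf_counter() - start) * 1000,
                    success=False,
                    error_type=type(exc).__name__,
                )
                raise
            collector.record_skill_call(
                agent_name, skill_name,
                duration_ms=(time.perf_counter() - start) * 1000,
                success=True,
            )
            return result
        return wrapper
    return decorator

# Usage on a hypothetical skill handler:
# @instrumented_skill("flight_booking_agent", "search_flights")
# def search_flights(origin: str, destination: str): ...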
Phase 2: Metrics Storage
- Create SQLite schema for metrics
- Implement MetricsStore for persistence
- Add query methods for common reports
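The API endpoints above call metrics_store.query_metrics; a sketch of that query method attached to MetricsStore (the time_range parsing and the returned row shape are assumptions of this sketch):

from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional

TIME_RANGES = {"1h": timedelta(hours=1), "24h": timedelta(hours=24),
               "7d": timedelta(days=7), "30d": timedelta(days=30)}

def query_metrics(self, agent_name: str, time_range: str = "1h",
                  metric_type: Optional[str] = None) -> List[Dict[str, Any]]:
    """Return raw metric rows for an agent within the requested window."""
    since = (datetime.utcnow() - TIME_RANGES.get(time_range, timedelta(hours=1))).isoformat()
    sql = ("SELECT timestamp, skill_name, metric_type, value, tags "
           "FROM metrics WHERE agent_name = ? AND timestamp >= ?")
    params = [agent_name, since]
    if metric_type:
        sql += " AND metric_type = ?"
        params.append(metric_type)
    with self._get_connection() as conn:
        rows = conn.execute(sql, params).fetchall()
    return [
        {"timestamp": ts, "skill": skill, "type": mtype, "value": value, "tags": tags}
        for ts, skill, mtype, value, tags in rows
    ]

MetricsStore.query_metrics = query_metrics  # attach to the store class defined earlier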
Phase 3: Metrics API
- Create REST endpoints for metrics
- Implement summary statistics (see the _calculate_summary sketch after this phase)
- Add booking funnel tracking
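A sketch of the _calculate_summary helper used by the per-agent endpoint above; it assumes rows shaped like the query_metrics sketch under Phase 2:

import statistics
from typing import Any, Dict, List

def _calculate_summary(metrics: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Summarise raw metric rows into call counts, error rate, and latency percentiles."""
    latencies = [m["value"] for m in metrics if m["type"] == "latency"]
    errors = sum(1 for m in metrics if m["type"] == "error")
    total = len(metrics)
    summary: Dict[str, Any] = {
        "total_calls": total,
        "error_rate_pct": (errors / total) * 100 if total else 0.0,
        "avg_latency_ms": statistics.fmean(latencies) if latencies else 0.0,
    }
    if len(latencies) >= 2:
        cuts = statistics.quantiles(latencies, n=100)  # 99 cut points
        summary.update({"p50_ms": statistics.median(latencies),
                        "p95_ms": cuts[94], "p99_ms": cuts[98]})
    return summary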
Phase 4: Dashboard Integration
- Connect dashboard to metrics API
- Build performance dashboard
- Add real-time metrics display
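For the real-time display, the dashboard only needs to poll the summary endpoint; a minimal polling sketch (base URL, interval, and the use of the requests library are illustrative):

import time
import requests

def poll_summary(base_url: str = "http://localhost:8000", interval_s: int = 5) -> None:
    """Poll the system-wide summary endpoint and hand it to the dashboard layer."""
    while True:
        resp = requests.get(f"{base_url}/api/metrics/summary", timeout=5)
        resp.raise_for_status()
        print(resp.json())  # a real dashboard would push this into its UI instead of printing
        time.sleep(interval_s)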
Acceptance Criteria
- Metrics collected for all agent skill calls
- Performance metrics stored in SQLite
- Business metrics (bookings, revenue) tracked
- REST API endpoints expose metrics
- Booking funnel metrics calculated
- Dashboard displays real-time metrics
- Metrics retention policy documented
- Alerting rules configured
- Performance impact < 5% overhead