Implement A2A Agent Metrics & Analytics #201

@aarora79

Parent Issue: #195

Objective

Implement comprehensive metrics and analytics collection for A2A agents to track performance, usage patterns, and system health.

Metrics Categories

Performance Metrics

  • Agent Response Time: Time to complete each skill (p50, p95, p99 percentiles)
  • Agent Throughput: Requests per minute by agent and skill
  • Error Rate: Percentage of failed requests by agent and error type
  • Discovery Latency: Time to find agents in registry via semantic search
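The percentile figures named above can be derived from raw latency samples. The following is a minimal sketch using only the standard library; the function name `latency_percentiles` and the choice of the "inclusive" interpolation method are assumptions, not part of the issue.

```python
from statistics import quantiles
from typing import Dict, List


def latency_percentiles(samples_ms: List[float]) -> Dict[str, float]:
    """Return p50/p95/p99 for a list of latency samples in milliseconds."""
    if len(samples_ms) < 2:
        # Guard: older Python versions require at least two data points.
        value = samples_ms[0] if samples_ms else 0.0
        return {"p50": value, "p95": value, "p99": value}
    # n=100 yields 99 cut points; index k-1 holds the k-th percentile.
    cuts = quantiles(samples_ms, n=100, method="inclusive")
    return {"p50": cuts[49], "p95": cuts[94], "p99": cuts[98]}
```

Percentiles are generally preferable to a plain mean here because a small number of slow calls can hide behind a healthy-looking average.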

Business Metrics

  • Bookings Per Hour: Total flight bookings completed
  • Revenue Per Agent: Total transaction value by agent
  • Average Booking Value: Mean trip cost
  • Booking Success Rate: Percentage of bookings completed successfully rather than cancelled
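As an illustration of how these business metrics relate to each other, the sketch below derives three of them from a list of booking records. The `Booking` dataclass, the status values "confirmed" and "cancelled", and the function name are all hypothetical; the issue does not define a booking record format.

```python
from dataclasses import dataclass
from typing import Any, Dict, List


@dataclass
class Booking:
    agent_name: str
    amount: float
    status: str  # assumed values: "confirmed" or "cancelled"


def business_summary(bookings: List[Booking]) -> Dict[str, Any]:
    """Compute revenue per agent, average booking value, and success rate."""
    confirmed = [b for b in bookings if b.status == "confirmed"]
    revenue_per_agent: Dict[str, float] = {}
    for b in confirmed:
        revenue_per_agent[b.agent_name] = revenue_per_agent.get(b.agent_name, 0.0) + b.amount
    total_revenue = sum(b.amount for b in confirmed)
    return {
        "revenue_per_agent": revenue_per_agent,
        # Only confirmed bookings count toward the average value.
        "average_booking_value": total_revenue / len(confirmed) if confirmed else 0.0,
        "booking_success_rate": len(confirmed) / len(bookings) if bookings else 0.0,
    }
```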

System Metrics

  • Registry Query Time: Time to search agents
  • Inter-Agent Call Latency: Network latency between agents
  • Database Query Time: SQLite query performance
  • Memory Usage: Per-agent memory consumption
  • CPU Usage: Per-agent CPU utilization

User Metrics

  • User Search Attempts: Flight searches per user
  • Booking Funnel: Users searching → creating plan → confirming booking
  • Agent Usage Pattern: Which agents are called, and how often
  • Session Duration: Time from first request to final booking
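The booking funnel reduces to ratios between stage counts. A minimal sketch, assuming the three counts are already available (the function name and the extra end-to-end rate are illustrative additions):

```python
from typing import Dict


def funnel_conversion(searches: int, plans: int, bookings: int) -> Dict[str, float]:
    """Return stage-to-stage conversion rates as fractions between 0 and 1."""
    def rate(numerator: int, denominator: int) -> float:
        # An empty stage yields 0.0 instead of raising ZeroDivisionError.
        return numerator / denominator if denominator else 0.0

    return {
        "search_to_plan": rate(plans, searches),
        "plan_to_booking": rate(bookings, plans),
        "search_to_booking": rate(bookings, searches),
    }
```

For example, 200 searches, 50 trip plans, and 10 confirmed bookings give a search-to-plan rate of 0.25 and a plan-to-booking rate of 0.2.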

Implementation Architecture

Metrics Collection Framework

from dataclasses import dataclass
from typing import Any, Dict, Optional
from datetime import datetime
import logging
import time

logger = logging.getLogger(__name__)

@dataclass
class AgentMetric:
    """Single metric data point"""
    timestamp: datetime
    agent_name: str
    skill_name: str
    metric_type: str  # "latency", "error", or "booking"
    value: float
    tags: Dict[str, str]  # Additional tags for filtering

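class MetricsCollector:
    """Collect metrics and hand them to a storage backend"""

    def __init__(self, store: "MetricsStore") -> None:
        # Any object exposing store_metric(metric) works, e.g. the
        # MetricsStore defined under "Metrics Storage".
        self._store = store

    def _store_metric(self, metric: AgentMetric) -> None:
        """Persist a single metric via the configured store"""
        self._store.store_metric(metric)
    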
    def record_skill_call(
        self,
        agent_name: str,
        skill_name: str,
        duration_ms: float,
        success: bool,
        error_type: Optional[str] = None
    ) -> None:
        """Record a skill invocation"""
        metric = AgentMetric(
            timestamp=datetime.utcnow(),
            agent_name=agent_name,
            skill_name=skill_name,
            metric_type="latency" if success else "error",
            value=duration_ms,
            tags={
                "success": str(success),
                "error_type": error_type or "none"
            }
        )
        self._store_metric(metric)
        # Lazy %-formatting avoids building the message when INFO is disabled
        logger.info("Recorded metric: %s/%s = %sms", agent_name, skill_name, duration_ms)
    
    def record_booking(
        self,
        agent_name: str,
        booking_id: str,
        amount: float,
        status: str
    ) -> None:
        """Record a booking event"""
        metric = AgentMetric(
            timestamp=datetime.utcnow(),
            agent_name=agent_name,
            skill_name="booking",
            metric_type="booking",
            value=amount,
            tags={"booking_id": booking_id, "status": status}
        )
        self._store_metric(metric)

Metrics Storage

import json
import sqlite3
from contextlib import contextmanager

class MetricsStore:
    """SQLite-based metrics storage"""
    
    def __init__(self, db_path: str):
        self.db_path = db_path
        self._init_schema()
    
    def _init_schema(self) -> None:
        """Create metrics tables"""
        with self._get_connection() as conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS metrics (
                    id INTEGER PRIMARY KEY,
                    timestamp DATETIME,
                    agent_name TEXT,
                    skill_name TEXT,
                    metric_type TEXT,
                    value REAL,
                    tags JSON,
                    created_at DATETIME DEFAULT CURRENT_TIMESTAMP
                )
            """)
            conn.execute("""
                CREATE TABLE IF NOT EXISTS agent_stats (
                    id INTEGER PRIMARY KEY,
                    agent_name TEXT,
                    total_calls INTEGER,
                    success_count INTEGER,
                    error_count INTEGER,
                    avg_latency_ms REAL,
                    last_updated DATETIME
                )
            """)
    
    @contextmanager
    def _get_connection(self):
        conn = sqlite3.connect(self.db_path)
        try:
            yield conn
            conn.commit()
        finally:
            conn.close()
    
    def store_metric(self, metric: AgentMetric) -> None:
        """Store metric in database"""
        with self._get_connection() as conn:
            conn.execute("""
                INSERT INTO metrics (timestamp, agent_name, skill_name, 
                                    metric_type, value, tags)
                VALUES (?, ?, ?, ?, ?, json(?))
            """, (
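                # Store as ISO-8601 text; sqlite3's implicit datetime
                # adapter is deprecated since Python 3.12
                metric.timestamp.isoformat(),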
                metric.agent_name,
                metric.skill_name,
                metric.metric_type,
                metric.value,
                json.dumps(metric.tags)
            ))
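The API endpoints in this issue call a `query_metrics` method that is not shown. The following is one possible sketch, written as a standalone function that takes an open connection. It assumes timestamps are stored as naive-UTC ISO-8601 strings (as produced by `datetime.isoformat()`), so that string comparison matches chronological order; the `now` parameter is a hypothetical addition to make the function testable.

```python
import sqlite3
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional

# Window labels taken from the API's time_range parameter (1h, 24h, 7d, 30d).
TIME_RANGES = {
    "1h": timedelta(hours=1),
    "24h": timedelta(hours=24),
    "7d": timedelta(days=7),
    "30d": timedelta(days=30),
}


def query_metrics(
    conn: sqlite3.Connection,
    agent_name: str,
    time_range: str = "1h",
    metric_type: Optional[str] = None,
    now: Optional[datetime] = None,
) -> List[Dict[str, Any]]:
    """Return an agent's metrics inside the requested time window."""
    if time_range not in TIME_RANGES:
        raise ValueError(f"Unsupported time_range: {time_range!r}")
    # Naive UTC, to match the naive timestamps assumed in the table.
    now = now or datetime.now(timezone.utc).replace(tzinfo=None)
    cutoff = (now - TIME_RANGES[time_range]).isoformat()

    sql = (
        "SELECT timestamp, skill_name, metric_type, value FROM metrics "
        "WHERE agent_name = ? AND timestamp >= ?"
    )
    params: List[Any] = [agent_name, cutoff]
    if metric_type is not None:
        sql += " AND metric_type = ?"
        params.append(metric_type)
    sql += " ORDER BY timestamp"

    columns = ("timestamp", "skill_name", "metric_type", "value")
    return [dict(zip(columns, row)) for row in conn.execute(sql, params)]
```

Values are always bound as parameters rather than interpolated into the SQL string, which matters because `agent_name` arrives from a URL path.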

Metrics API Endpoints

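from fastapi import FastAPI, Query
from typing import Any, Dict, Optional

# Assumes a module-level `metrics_store` (a MetricsStore instance) and a
# `_calculate_summary` helper are defined elsewhere in the application.
app = FastAPI()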

@app.get("/api/metrics/agent/{agent_name}")
async def get_agent_metrics(
    agent_name: str,
    time_range: str = "1h",  # 1h, 24h, 7d, 30d
    metric_type: Optional[str] = None
) -> Dict[str, Any]:
    """Get metrics for specific agent"""
    metrics = metrics_store.query_metrics(
        agent_name=agent_name,
        time_range=time_range,
        metric_type=metric_type
    )
    return {
        "agent": agent_name,
        "time_range": time_range,
        "metrics": metrics,
        "summary": _calculate_summary(metrics)
    }

@app.get("/api/metrics/summary")
async def get_all_metrics_summary() -> Dict[str, Any]:
    """Get system-wide metrics summary"""
    return {
        "total_agents": metrics_store.count_unique_agents(),
        "total_calls": metrics_store.count_total_calls(),
        "avg_latency_ms": metrics_store.get_avg_latency(),
        "error_rate": metrics_store.get_error_rate(),
        "bookings_today": metrics_store.get_booking_count("24h"),
        "top_agents": metrics_store.get_top_agents(limit=5)
    }

@app.get("/api/metrics/booking-funnel")
async def get_booking_funnel() -> Dict[str, Any]:
    """Get booking funnel metrics"""
    return {
        "searches": metrics_store.count_searches(),
        "trip_plans_created": metrics_store.count_trip_plans(),
        "bookings_confirmed": metrics_store.count_confirmed_bookings(),
        "conversion_rates": {
            "search_to_plan": metrics_store.get_conversion_rate("search", "plan"),
            "plan_to_booking": metrics_store.get_conversion_rate("plan", "booking")
        }
    }
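The `_calculate_summary` helper used by the per-agent endpoint is not defined in this issue. One plausible shape, assuming each metric is a dict with `metric_type` and `value` keys and that only "latency" and "error" rows represent skill calls:

```python
from typing import Any, Dict, List


def calculate_summary(metrics: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Summarize skill-call metrics; rows of other types (e.g. bookings) are ignored."""
    latencies = [m["value"] for m in metrics if m["metric_type"] == "latency"]
    error_count = sum(1 for m in metrics if m["metric_type"] == "error")
    total_calls = len(latencies) + error_count
    return {
        "total_calls": total_calls,
        "error_count": error_count,
        "error_rate": error_count / total_calls if total_calls else 0.0,
        # Average over successful calls only, since error rows carry
        # the duration of a failed call.
        "avg_latency_ms": sum(latencies) / len(latencies) if latencies else 0.0,
    }
```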

Prometheus Metrics Export (Optional)

import time

from prometheus_client import Counter, Histogram, Gauge

# Define Prometheus metrics
agent_calls_total = Counter(
    'agent_calls_total',
    'Total agent calls',
    ['agent_name', 'skill_name', 'status']
)

agent_call_duration_ms = Histogram(
    'agent_call_duration_ms',
    'Agent call duration in milliseconds',
    ['agent_name', 'skill_name'],
    buckets=[10, 50, 100, 250, 500, 1000, 2500]
)

active_bookings = Gauge(
    'active_bookings',
    'Number of active bookings',
    ['agent_name']
)

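# Usage
# extract_agent_name / extract_skill_name are application helpers (not shown)
# that map a request URL to label values.
@app.middleware("http")
async def metrics_middleware(request, call_next):
    start_time = time.perf_counter()  # monotonic; unaffected by system clock changes
    response = await call_next(request)
    duration = (time.perf_counter() - start_time) * 1000
    
    agent_name = extract_agent_name(request.url)
    skill_name = extract_skill_name(request.url)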
    
    agent_calls_total.labels(
        agent_name=agent_name,
        skill_name=skill_name,
        status=response.status_code
    ).inc()
    
    agent_call_duration_ms.labels(
        agent_name=agent_name,
        skill_name=skill_name
    ).observe(duration)
    
    return response

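@app.get("/metrics")
async def metrics():
    """Prometheus metrics endpoint"""
    from fastapi import Response
    from prometheus_client import CONTENT_TYPE_LATEST, generate_latest
    # Return the raw exposition text; a bare bytes return value would be
    # JSON-encoded by FastAPI and rejected by the Prometheus scraper.
    return Response(content=generate_latest(), media_type=CONTENT_TYPE_LATEST)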

Analytics Dashboard Integration

Key Dashboard Panels

  1. Agent Performance: Response times, error rates, throughput
  2. Booking Metrics: Daily bookings, average value, success rates
  3. Agent Discovery: Popular agents, search queries, discovery success
  4. System Health: Registry uptime, inter-agent call latency
  5. Booking Funnel: Search → Plan → Confirmation conversion funnel
  6. Revenue: Total revenue, revenue per agent, trends

Time Series Data

  • Hourly Aggregation: Metrics aggregated by hour for trends
  • Daily Aggregation: Daily summaries for dashboards
  • Monthly Aggregation: Monthly reports for business review
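Hourly aggregation can be expressed directly in SQLite. The sketch below groups skill-call rows from the `metrics` table by hour and agent; it assumes ISO-8601 text timestamps, and the query and function names are illustrative. Daily or monthly rollups would follow the same pattern with a coarser `strftime` format.

```python
import sqlite3
from typing import List, Tuple

# Truncate each timestamp to the hour, then aggregate per agent.
HOURLY_SQL = """
    SELECT strftime('%Y-%m-%dT%H:00:00', timestamp) AS hour,
           agent_name,
           COUNT(*) AS calls,
           SUM(metric_type = 'error') AS errors,
           AVG(value) AS avg_duration_ms
    FROM metrics
    WHERE metric_type IN ('latency', 'error')
    GROUP BY hour, agent_name
    ORDER BY hour, agent_name
"""


def hourly_rollup(conn: sqlite3.Connection) -> List[Tuple]:
    """Return (hour, agent_name, calls, errors, avg_duration_ms) rows."""
    return conn.execute(HOURLY_SQL).fetchall()
```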

Data Retention Policy

  • Raw Metrics: 7 days (detailed data)
  • Hourly Aggregates: 90 days
  • Daily Aggregates: 1 year (long-term trends)
  • Monthly Summaries: Indefinite (business intelligence)
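Enforcing the 7-day window for raw metrics could look like the following sketch. It assumes the `metrics` table stores naive-UTC ISO-8601 text timestamps; the function name and the injectable `now` parameter are hypothetical.

```python
import sqlite3
from datetime import datetime, timedelta, timezone
from typing import Optional

RAW_RETENTION = timedelta(days=7)  # from the retention policy above


def prune_raw_metrics(conn: sqlite3.Connection, now: Optional[datetime] = None) -> int:
    """Delete raw metrics older than the retention window; return rows removed."""
    now = now or datetime.now(timezone.utc).replace(tzinfo=None)
    cutoff = (now - RAW_RETENTION).isoformat()
    cursor = conn.execute("DELETE FROM metrics WHERE timestamp < ?", (cutoff,))
    conn.commit()
    return cursor.rowcount
```

A job like this would need to run after the hourly aggregates are written, otherwise detail is lost before it has been rolled up.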

Alerting Rules

Performance Alerts

  • Agent response time > 1 second (p95)
  • Error rate > 5%
  • Agent unavailable (no calls in 5 minutes)

Business Alerts

  • Booking failure rate > 10%
  • Revenue drop > 20% day-over-day
  • Specific agent underperforming
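The numeric thresholds above translate into simple checks over windowed statistics. A minimal sketch follows; the `WindowStats` structure, the alert identifiers, and the function names are assumptions, and the "specific agent underperforming" rule is omitted because the issue gives no threshold for it.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class WindowStats:
    p95_latency_ms: float
    error_rate: float               # fraction between 0.0 and 1.0
    seconds_since_last_call: float


def performance_alerts(stats: WindowStats) -> List[str]:
    """Evaluate the performance alert thresholds listed above."""
    alerts = []
    if stats.p95_latency_ms > 1000:          # p95 above 1 second
        alerts.append("high_latency")
    if stats.error_rate > 0.05:              # error rate above 5%
        alerts.append("high_error_rate")
    if stats.seconds_since_last_call > 300:  # no calls in 5 minutes
        alerts.append("agent_unavailable")
    return alerts


def business_alerts(
    booking_failure_rate: float, revenue_today: float, revenue_yesterday: float
) -> List[str]:
    """Evaluate the business alert thresholds listed above."""
    alerts = []
    if booking_failure_rate > 0.10:
        alerts.append("booking_failures")
    # Day-over-day drop as a fraction of yesterday's revenue.
    if revenue_yesterday > 0 and (revenue_yesterday - revenue_today) / revenue_yesterday > 0.20:
        alerts.append("revenue_drop")
    return alerts
```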

Implementation Steps

Phase 1: Metrics Collection

  • Implement MetricsCollector class
  • Add timing instrumentation to agent calls
  • Record success/failure for each skill
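The timing instrumentation in Phase 1 could be added with a decorator, so that skill code stays free of metrics calls. This is a sketch for synchronous skills only; `timed_skill` is a hypothetical name, and `record` is assumed to be any callable with the same positional signature as `MetricsCollector.record_skill_call`.

```python
import functools
import time
from typing import Any, Callable, Optional

# Mirrors record_skill_call(agent_name, skill_name, duration_ms, success, error_type).
RecordFn = Callable[[str, str, float, bool, Optional[str]], None]


def timed_skill(agent_name: str, skill_name: str, record: RecordFn):
    """Decorator that times a synchronous skill and reports the outcome."""
    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
        @functools.wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            start = time.perf_counter()
            try:
                result = func(*args, **kwargs)
            except Exception as exc:
                elapsed_ms = (time.perf_counter() - start) * 1000
                record(agent_name, skill_name, elapsed_ms, False, type(exc).__name__)
                raise  # instrumentation must not swallow the failure
            elapsed_ms = (time.perf_counter() - start) * 1000
            record(agent_name, skill_name, elapsed_ms, True, None)
            return result
        return wrapper
    return decorator
```

With a collector instance available, a skill would be wrapped as `@timed_skill("some_agent", "some_skill", collector.record_skill_call)`. Async skills would need an `async def` wrapper that awaits the call.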

Phase 2: Metrics Storage

  • Create SQLite schema for metrics
  • Implement MetricsStore for persistence
  • Add query methods for common reports

Phase 3: Metrics API

  • Create REST endpoints for metrics
  • Implement summary statistics
  • Add booking funnel tracking

Phase 4: Dashboard Integration

  • Connect dashboard to metrics API
  • Build performance dashboard
  • Add real-time metrics display

Acceptance Criteria

  • Metrics collected for all agent skill calls
  • Performance metrics stored in SQLite
  • Business metrics (bookings, revenue) tracked
  • REST API endpoints expose metrics
  • Booking funnel metrics calculated
  • Dashboard displays real-time metrics
  • Metrics retention policy documented
  • Alerting rules configured
  • Performance impact < 5% overhead

Metadata

Labels

enhancement: New feature or request
