Skip to content

Implement Agent-to-Agent Communication Workflow #198

@aarora79

Description

@aarora79

Parent Issue: #195

Quick Links

Objective

Demonstrate complete Agent-to-Agent (A2A) communication workflow with Travel Assistant discovering and collaborating with Flight Booking Agent.

Architecture

Agent Discovery Flow

  1. Travel Assistant queries A2A Registry for agents with tags: ["booking", "flight"]
  2. FAISS semantic search returns Flight Booking Agent metadata
  3. Travel Assistant extracts Flight Booking Agent endpoint and authentication info
  4. Travel Assistant establishes HTTP connection to Flight Booking Agent

Communication Protocol

  • Discovery: REST API to registry /api/v0.1/agents?query=flight+booking
  • Interaction: Direct REST calls between agents (A2A protocol)
  • Authentication: Optional JWT tokens (use agent API keys)
  • Content-Type: application/json

Implementation Steps

Phase 1: Service Discovery Client

Create HTTP client in Travel Assistant to query registry:

async def discover_agent(
    query: str,
    tags: List[str]
) -> Dict[str, Any]:
    """Discover agent via semantic search"""
    response = await registry_client.search_agents(
        query=query,
        tags=tags
    )
    return response[0] if response else None

Phase 2: Agent-to-Agent Communication

Create REST client in Travel Assistant:

async def call_booking_agent(
    booking_request: Dict[str, Any],
    agent_endpoint: str
) -> Dict[str, Any]:
    """Call Flight Booking Agent REST API"""
    async with aiohttp.ClientSession() as session:
        async with session.post(
            f"{agent_endpoint}/api/reserve-flight",
            json=booking_request
        ) as response:
            return await response.json()

Phase 3: Error Handling & Retry Logic

MAX_RETRIES = 3
RETRY_DELAY = 1

async def call_with_retry(
    func: Callable,
    *args,
    **kwargs
) -> Dict[str, Any]:
    """Call agent with exponential backoff retry"""
    for attempt in range(MAX_RETRIES):
        try:
            return await func(*args, **kwargs)
        except Exception as e:
            if attempt == MAX_RETRIES - 1:
                raise
            await asyncio.sleep(RETRY_DELAY * (2 ** attempt))

Workflow Diagram

User Request
    ↓
[Travel Assistant Agent]
    ├─→ Query Registry: search_agents("flight booking")
    ├─→ Get Flight Booking Agent endpoint & metadata
    ├─→ Call check_availability()
    ├─→ Call reserve_flight()
    ├─→ Call confirm_booking()
    └─→ Return trip plan to user
         ↓
    [Flight Booking Agent]
         ├─→ Check seat availability
         ├─→ Reserve seat
         ├─→ Process payment
         └─→ Return confirmation

Request/Response Examples

Travel Assistant → Flight Booking Agent: Reserve Flight

Request:

{
  "flight_id": 1,
  "passengers": [
    {
      "name": "John Smith",
      "email": "john@example.com"
    }
  ],
  "payment_method": "credit_card"
}

Response:

{
  "success": true,
  "booking_number": "BK001",
  "confirmation_code": "CONF-12345",
  "total_price": 250.00,
  "status": "confirmed"
}

Integration Points

  1. Registry API (Port 5000)

    • GET /api/v0.1/agents?query=flight+booking
    • Returns: Agent metadata with endpoint, auth, skills
  2. Travel Assistant Agent (Port 8001)

    • Creates trip plan by coordinating with Flight Booking Agent
    • Implements retry logic for resilience
  3. Flight Booking Agent (Port 8002)

    • Exposes REST endpoints for reservation operations
    • Manages inventory and payment processing

Testing Strategy

Unit Tests

  • Test discovery client successfully parses agent metadata
  • Test retry logic with simulated failures
  • Test request/response serialization

Integration Tests

  • Deploy both agents in Docker
  • Test full workflow: discovery → availability check → booking
  • Verify error scenarios and recovery

End-to-End Tests

  • User requests trip plan
  • System discovers Flight Booking Agent
  • Makes booking request
  • Returns confirmation to user

Acceptance Criteria

  • Travel Assistant discovers Flight Booking Agent via semantic search
  • Agent-to-agent REST communication working
  • Request/response handling validated
  • Error handling with retry logic implemented
  • Timeout handling (default 30 seconds)
  • All interactions logged with trace IDs
  • End-to-end workflow tested successfully
  • Documentation of A2A communication patterns

Metadata

Metadata

Assignees

Labels

enhancementNew feature or request

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions