-
Notifications
You must be signed in to change notification settings - Fork 0
API Reference
Creator: Mahesh Vaijainthymala Krishnamoorthy (Mahesh Vaikri)
MAPLE provides a comprehensive, type-safe API for multi-agent communication. This reference covers all public APIs, data structures, and patterns you need to build sophisticated agent systems.
Current Version: 1.0.0
API Stability: Stable
Test Results: 32/32 tests passed (100%)
# Essential imports for most MAPLE applications
from maple import (
Agent, Message, Priority, Config,
Result, ResourceRequest, ResourceRange,
SecurityConfig, PerformanceConfig
)
# Advanced imports for specialized use cases
from maple.core.types import TypeValidator, Size, Duration
from maple.error import CircuitBreaker, retry, RetryOptions
from maple.resources import ResourceManager, ResourceNegotiator
from maple.security import LinkManager
from maple.communication import Stream, StreamOptions
The core agent class for creating autonomous agents.
# Signature listing for Agent (method bodies are omitted in this reference).
# start() and stop() return None; every other operation reports its outcome
# through a Result whose error side is a Dict[str, Any].
# Timeouts are passed as strings such as "30s"; receive() defaults to None.
# NOTE(review): examples elsewhere on this page also use agent.handler,
# agent.topic_handler, agent.stream_handler, agent.establish_link,
# agent.send_with_link, agent.receive_filtered, agent.create_stream,
# agent.set_serializer and agent.metrics, none of which are listed here.
class Agent:
def __init__(self, config: Config) -> None
def start(self) -> None
def stop(self) -> None
def send(self, message: Message) -> Result[str, Dict[str, Any]]
def request(self, message: Message, timeout: str = "30s") -> Result[Message, Dict[str, Any]]
def receive(self, timeout: Optional[str] = None) -> Result[Message, Dict[str, Any]]
# Returns one Result per entry in a dict keyed by string
# (presumably the recipient id - confirm).
def broadcast(self, recipients: List[str], message: Message) -> Dict[str, Result[str, Dict[str, Any]]]
def publish(self, topic: str, message: Message) -> Result[str, Dict[str, Any]]
def subscribe(self, topic: str) -> Result[None, Dict[str, Any]]
@dataclass
class Config:
    """Agent configuration passed to ``Agent(config)``.

    ``agent_id`` and ``broker_url`` are required; the remaining sections
    are optional and default to ``None``.
    """

    agent_id: str
    broker_url: str
    security: Optional[SecurityConfig] = None
    performance: Optional[PerformanceConfig] = None
    # NOTE(review): MetricsConfig and TracingConfig are not defined or
    # imported anywhere on this page - confirm where they live.
    metrics: Optional[MetricsConfig] = None
    tracing: Optional[TracingConfig] = None
# Example: configure an agent with JWT credentials and mandatory links.
config = Config(
    agent_id="my_agent",
    broker_url="localhost:8080",
    security=SecurityConfig(
        auth_type="jwt",
        credentials="my_token",
        require_links=True,
    ),
)
agent = Agent(config)
# Decorator-based handler registration
@agent.handler("TASK_REQUEST")
def handle_task(message: Message) -> Optional[Message]:
    """Handle incoming task requests.

    Passes ``message.payload`` to ``process_task`` and returns a
    ``TASK_RESPONSE`` message carrying the result.
    """
    # Process the task
    result = process_task(message.payload)
    # Return response message
    return Message(
        message_type="TASK_RESPONSE",
        payload={"result": result, "status": "completed"},
    )
# Topic handler for publish-subscribe
@agent.topic_handler("sensor_data")
def handle_sensor_data(message: Message) -> None:
    """Handle sensor data publications by processing the message payload."""
    process_sensor_reading(message.payload)
# Stream handler for real-time data
@agent.stream_handler("live_feed")
def handle_live_data(message: Message) -> None:
    """Handle streaming data by pushing the payload to the dashboard."""
    update_dashboard(message.payload)
# Resource-aware sending: describe the resources the work needs and attach
# that description to the message metadata under the "resources" key.
resource_request = ResourceRequest(
    compute=ResourceRange(min=2, preferred=4),
    memory=ResourceRange(min="1GB", preferred="2GB"),
    priority="HIGH",
)
message = Message(
    message_type="HEAVY_COMPUTATION",
    payload={"data": large_dataset},
    metadata={"resources": resource_request.to_dict()},
)
result = agent.send(message)
# Secure communication with links: only send once the link is established.
link_result = agent.establish_link("target_agent")
if link_result.is_ok():
    link_id = link_result.unwrap()
    # NOTE(review): link_id is not used below; send_with_link is called with
    # the agent name. Confirm whether it expects the link id instead.
    secure_result = agent.send_with_link(message, "target_agent")
# Filtered message receiving
def is_high_priority(msg):
    """Return True when ``msg.priority`` equals ``Priority.HIGH``."""
    return msg.priority == Priority.HIGH


high_priority_msg = agent.receive_filtered(
    filter=is_high_priority,
    timeout="10s",
)
# Signature listing for Message (method bodies are omitted in this reference).
# Only message_type is required by the constructor: priority defaults to
# Priority.MEDIUM and every other field defaults to None.
# NOTE(review): examples on this page also use Message.builder(),
# Message.error(), Message.ack() and a correlation id, none of which appear
# in this listing.
class Message:
def __init__(
self,
message_type: str,
receiver: Optional[str] = None,
priority: Priority = Priority.MEDIUM,
payload: Optional[Dict[str, Any]] = None,
metadata: Optional[Dict[str, Any]] = None,
message_id: Optional[str] = None,
sender: Optional[str] = None,
timestamp: Optional[datetime] = None
) -> None
# Conversion methods: dict/JSON output, with classmethod constructors for the
# reverse direction.
def to_dict(self) -> Dict[str, Any]
def to_json(self) -> str
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'Message'
@classmethod
def from_json(cls, json_str: str) -> 'Message'
# Utility methods: with_receiver / with_link return a Message;
# get_link_id returns None or a string.
def with_receiver(self, receiver: str) -> 'Message'
def with_link(self, link_id: str) -> 'Message'
def get_link_id(self) -> Optional[str]
# Fluent builder interface (parenthesised chain instead of backslashes)
message = (
    Message.builder()
    .message_type("DATA_REQUEST")
    .receiver("data_service")
    .priority(Priority.HIGH)
    .payload({
        "query": "temperature > 25",
        "limit": 100
    })
    .correlation_id("req-123")
    .build()
)
# Error message addressed back to the sending agent
error_msg = Message.error(
    error_type="VALIDATION_ERROR",
    message="Invalid data format",
    details={"field": "temperature", "expected": "number"},
    receiver="sender_agent",
)

# Acknowledgment message tied to the original request's correlation id
ack_msg = Message.ack(correlation_id="req-123")
# Resource request message: the requirements are written as plain dicts
# under the payload's "resources" key.
resource_msg = Message(
    message_type="RESOURCE_REQUEST",
    payload={
        "resources": {
            "compute": {"min": 2, "preferred": 4},
            "memory": {"min": "1GB", "preferred": "2GB"},
        },
    },
)
MAPLE's powerful error handling mechanism.
# Signature listing for Result (method bodies are omitted in this reference).
# T is the type passed to ok(), E the type passed to err().
# NOTE(review): U and F are used as additional type variables but are not
# declared in this listing, and the `class Result[T, E]` header is only valid
# Python from 3.12 onward if taken literally.
class Result[T, E]:
# Constructors
@classmethod
def ok(cls, value: T) -> 'Result[T, E]'
@classmethod
def err(cls, error: E) -> 'Result[T, E]'
# State checks
def is_ok(self) -> bool
def is_err(self) -> bool
# Extraction: unwrap() yields T, unwrap_err() yields E, unwrap_or() takes a
# default of type T.
def unwrap(self) -> T
def unwrap_or(self, default: T) -> T
def unwrap_err(self) -> E
# Combinators: map/and_then take a function of T, map_err/or_else a function
# of E.
def map(self, f: Callable[[T], U]) -> 'Result[U, E]'
def map_err(self, f: Callable[[E], F]) -> 'Result[T, F]'
def and_then(self, f: Callable[[T], 'Result[U, E]']) -> 'Result[U, E]'
def or_else(self, f: Callable[[E], 'Result[T, F]']) -> 'Result[T, F]'
# Basic usage
def divide(a: int, b: int) -> Result[float, str]:
    """Divide ``a`` by ``b`` without raising.

    Returns ``Result.err("Division by zero")`` when ``b`` is 0, otherwise
    ``Result.ok(a / b)``.
    """
    if b == 0:
        return Result.err("Division by zero")
    return Result.ok(a / b)


result = divide(10, 2)
if result.is_ok():
    print(f"Result: {result.unwrap()}")
else:
    print(f"Error: {result.unwrap_err()}")
# Chaining operations
def process_data(data: str) -> Result[int, str]:
    """Parse, validate and compute in sequence, returning the chained Result.

    Each step is handed to ``and_then``. The functions are passed directly
    rather than wrapped in single-argument lambdas, matching the style of the
    ``reliable_operation`` example on this page.
    """
    return (
        parse_number(data)
        .and_then(validate_range)
        .and_then(compute_result)
    )
# Error mapping: wrap the error value in a descriptive string
result = risky_operation().map_err(
    lambda e: f"Operation failed: {e}"
)

# Fallback values: unwrap_or is called with default_value as the fallback
value = computation().unwrap_or(default_value)
# Listing for the ResourceRequest dataclass (method bodies omitted).
# Every resource field is optional and defaults to None; priority is a plain
# string defaulting to "MEDIUM".
@dataclass
class ResourceRequest:
compute: Optional[ResourceRange] = None
memory: Optional[ResourceRange] = None
bandwidth: Optional[ResourceRange] = None
time: Optional[TimeConstraint] = None
priority: str = "MEDIUM"
# Dict conversion in both directions
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'ResourceRequest'
def to_dict(self) -> Dict[str, Any]
# Helper methods
# NOTE(review): these are spelled ResourceRequest.Range and
# ResourceRequest.TimeConstraint, while the fields above and every other
# example on this page use the top-level names ResourceRange and
# TimeConstraint - confirm which spelling the library exposes.
ResourceRequest.Range(min=1, preferred=2, max=4)
ResourceRequest.TimeConstraint(deadline="30s", timeout="60s")
# Signature listing for ResourceManager (method bodies omitted).
# allocate() does not return the allocation directly: it returns a Result
# wrapping a ResourceAllocation, which is later handed back via release().
class ResourceManager:
def register_resource(self, resource_type: str, amount: Any) -> None
def get_available_resources(self) -> Dict[str, Any]
def allocate(self, request: ResourceRequest) -> Result[ResourceAllocation, Dict[str, Any]]
def release(self, allocation: ResourceAllocation) -> None
# Usage: register capacity, allocate against a request, always release.
manager = ResourceManager()
manager.register_resource("cpu_cores", 8)
manager.register_resource("memory", "16GB")

request = ResourceRequest(
    compute=ResourceRange(min=2, preferred=4),
    memory=ResourceRange(min="1GB", preferred="2GB"),
)
allocation_result = manager.allocate(request)
if allocation_result.is_ok():
    allocation = allocation_result.unwrap()
    try:
        ...  # Use resources...
    finally:
        # Release even if the work above raises, so capacity is not leaked.
        manager.release(allocation)
# Signature listing for ResourceNegotiator (method bodies omitted).
# NOTE(review): the usage example constructs ResourceNegotiator(agent), but
# no constructor is shown in this listing.
class ResourceNegotiator:
# Takes a ResourceRequest and an agent_id; timeout is a string defaulting
# to "30s". Both sides of the returned Result are dicts.
def request_resources(
self,
request: ResourceRequest,
agent_id: str,
timeout: str = "30s"
) -> Result[Dict[str, Any], Dict[str, Any]]
# Takes an incoming message plus an evaluator callable that maps a
# ResourceRequest to a Result; returns a Message.
def handle_request(
self,
message: Message,
evaluator: Callable[[ResourceRequest], Result[Dict[str, Any], Dict[str, Any]]]
) -> Message
# Agent-to-agent resource negotiation
negotiator = ResourceNegotiator(agent)

# Request resources from another agent, passing "compute_agent" as the
# agent id and a 30 second timeout string.
resource_result = negotiator.request_resources(
    ResourceRequest(compute=ResourceRange(min=4)),
    "compute_agent",
    timeout="30s",
)
@dataclass
class SecurityConfig:
    """Security settings for an agent.

    ``auth_type`` and ``credentials`` are required. Link enforcement flags
    default to ``False`` and the optional sections default to ``None``.
    """

    auth_type: str
    credentials: str
    permissions: Optional[List[Dict[str, Any]]] = None
    require_links: bool = False
    strict_link_policy: bool = False
    link_config: Optional[LinkConfig] = None
@dataclass
class LinkConfig:
    """Secure-link settings; every field has a default."""

    enabled: bool = True
    # presumably seconds, matching establish_link(lifetime_seconds=3600) - confirm
    default_lifetime: int = 3600
    auto_establish: bool = True
    # unit is not stated here; presumably seconds - confirm
    rekey_interval: int = 3600
# Establish secure link
link_result = agent.establish_link("target_agent", lifetime_seconds=3600)
if link_result.is_ok():
    link_id = link_result.unwrap()
    # Send message over secure link: tag the message with the link id first.
    secure_message = message.with_link(link_id)
    agent.send(secure_message)
# Link validation in message handlers
@agent.handler("SECURE_DATA")
def handle_secure_data(message: Message) -> Optional[Message]:
    """Reject messages that carry no link id, otherwise process them.

    Returns a ``MISSING_LINK`` error message when ``message.get_link_id()``
    is falsy.
    """
    link_id = message.get_link_id()
    if not link_id:
        return Message.error("MISSING_LINK", "Secure link required")
    # Process secure data...
    # NOTE(review): response_message is a placeholder; it is not defined in
    # this example and must be built by the processing step above.
    return response_message
# Signature listing for Stream (method bodies omitted).
# NOTE(review): receive() takes its timeout as an Optional[float], whereas
# Agent.receive() and Agent.request() on this page take a string such as
# "30s" - confirm the difference is intended.
class Stream:
def __init__(self, agent: Agent, name: str, options: Optional[StreamOptions] = None)
# Alternate constructor with the same parameters as __init__
@classmethod
def connect(cls, agent: Agent, name: str, options: Optional[StreamOptions] = None) -> 'Stream'
# All three operations return a Result with a dict error side
def send(self, data: Any) -> Result[None, Dict[str, Any]]
def receive(self, timeout: Optional[float] = None) -> Result[Any, Dict[str, Any]]
def close(self) -> Result[None, Dict[str, Any]]
# Create and use streams
stream_result = agent.create_stream("live_data")
if stream_result.is_ok():
    stream = stream_result.unwrap()
    try:
        # Send data
        stream.send({"temperature": 25.5, "timestamp": now()})
        # Receive data
        data_result = stream.receive(timeout=5.0)
        if data_result.is_ok():
            data = data_result.unwrap()
            process_stream_data(data)
    finally:
        # Close when done, even if sending or processing raised.
        stream.close()
from maple.error import CircuitBreaker
# Create circuit breaker
circuit = CircuitBreaker(
    failure_threshold=5,    # Open after 5 failures
    reset_timeout=30.0,     # Test recovery after 30s
    half_open_max_calls=1,  # 1 test call in half-open state
)

# Use circuit breaker: pass the callable itself, no wrapper lambda is needed.
result = circuit.execute(risky_network_call)
if result.is_err():
    error = result.unwrap_err()
    if error.get('errorType') == 'CIRCUIT_OPEN':
        # Circuit is protecting us from failing service
        use_fallback_service()
from maple.error import retry, RetryOptions, exponential_backoff
# Configure retry options: 3 attempts with exponential backoff, retrying only
# the listed error types.
options = RetryOptions(
    max_attempts=3,
    backoff=exponential_backoff(initial=0.1, factor=2.0, jitter=0.1),
    retryable_errors=['NETWORK_ERROR', 'TIMEOUT'],
)

# Use retry
result = retry(lambda: unreliable_operation(), options)
from maple.error import ErrorType, Severity, Error
# Standard error types (members of ErrorType shown on this page)
ErrorType.NETWORK_ERROR
ErrorType.TIMEOUT
ErrorType.VALIDATION_ERROR
ErrorType.RESOURCE_ERROR
ErrorType.AUTHENTICATION_ERROR
# Create structured errors
# error_type is given as a plain string here, so the value is not taken from
# the ErrorType members above.
error = Error(
error_type="CUSTOM_ERROR",
message="Something went wrong",
details={"context": "additional info"},
severity=Severity.HIGH,
recoverable=True,
suggestion={"action": "retry with different parameters"}
)
from maple.core.types import (
Boolean, Integer, Float, String, Timestamp, UUID, Byte,
Array, Map, Set, Option,
Priority, AgentID, MessageID, Size, Duration
)
# Type validation: check raw_data against a field-name -> type schema
validated_data = TypeValidator.validate(
    raw_data,
    {
        'id': UUID,
        'name': String,
        'values': Array(Integer),
        'metadata': Map(String, String),
        'created_at': Timestamp,
    },
)

# Size and Duration parsing
memory_bytes = Size.parse("4GB")        # Returns 4294967296
timeout_seconds = Duration.parse("5m")  # Returns 300.0
# Define custom message types
class TaskRequest:
    """Custom message type that validates a task-request dictionary."""

    @staticmethod
    def validate(value: Any) -> Dict[str, Any]:
        """Validate ``value`` field by field and return the validated dict.

        Raises:
            TypeError: if ``value`` is not a dictionary.
        """
        if not isinstance(value, dict):
            raise TypeError("TaskRequest must be a dictionary")
        # 'parameters' and 'priority' fall back to {} and 'MEDIUM' when the
        # key is absent; 'task_id' and 'task_type' are validated as None then.
        return {
            'task_id': UUID.validate(value.get('task_id')),
            'task_type': String.validate(value.get('task_type')),
            'parameters': Map(String, Any).validate(value.get('parameters', {})),
            'priority': Priority.validate(value.get('priority', 'MEDIUM')),
        }
# Use in message validation: TaskRequest is placed in a schema next to the
# library types Duration and ResourceRequest.
message_schema = {
    'task': TaskRequest,
    'timeout': Duration,
    'resources': ResourceRequest,
}
@dataclass
class PerformanceConfig:
    """Performance tuning options; every field has a default."""

    connection_pool_size: int = 10
    max_concurrent_requests: int = 50
    serialization_format: str = "json"
    batch_size: int = 10
    # duration written as a string, like the other timeouts on this page
    batch_timeout: str = "100ms"
    enable_compression: bool = False
    async_mode: bool = True
# Built-in metrics, read as attributes of agent.metrics
agent.metrics.message_count # Total messages processed
agent.metrics.average_latency # Average message latency
agent.metrics.error_rate # Error rate percentage
agent.metrics.resource_utilization # Resource usage stats
# Custom metrics: a counter, a latency sample and a gauge, each named by a
# string. `duration` and `count` are placeholders not defined in this example.
agent.metrics.increment_counter("custom_events")
agent.metrics.record_latency("operation_time", duration)
agent.metrics.set_gauge("active_connections", count)
# Export metrics by passing agent.metrics to a PrometheusExporter
from maple.monitoring import PrometheusExporter
exporter = PrometheusExporter()
exporter.export(agent.metrics)
from maple.broker.broker import MessageBroker
class CustomBroker(MessageBroker):
    """Skeleton for a custom broker; both methods are left as placeholders."""

    def send(self, message: Message) -> str:
        """Send ``message``; a real implementation must return a ``str``."""
        # Custom sending logic
        pass

    def subscribe(self, agent_id: str, handler: Callable) -> None:
        """Register ``handler`` for ``agent_id``."""
        # Custom subscription logic
        pass


# Use custom broker
config = Config(
    agent_id="my_agent",
    broker_url="custom://localhost:8080",
)
custom_broker = CustomBroker(config)
# NOTE(review): the Agent listing on this page shows only
# Agent.__init__(self, config); confirm that a `broker` keyword is accepted.
agent = Agent(config, broker=custom_broker)
from maple.core.serialization import Serializer
class ProtobufSerializer(Serializer):
    """Skeleton for a custom serializer; both methods are placeholders."""

    def serialize(self, message: Message) -> bytes:
        """Convert ``message`` to its wire representation."""
        # Convert message to protobuf
        pass

    def deserialize(self, data: bytes) -> Message:
        """Rebuild a ``Message`` from ``data``."""
        # Convert protobuf to message
        pass


# Register custom serializer
agent.set_serializer(ProtobufSerializer())
# Always use Result[T, E] for operations that can fail
def reliable_operation() -> Result[Data, Error]:
    """Chain the fallible steps and map the error through ``enhance_error``."""
    return (
        safe_call()
        .and_then(validate_data)
        .and_then(process_data)
        .map_err(enhance_error)
    )
# Always specify resource requirements
@agent.handler("COMPUTE_INTENSIVE")
def handle_computation(message: Message) -> Optional[Message]:
    """Run a computation only while holding a resource allocation.

    Returns an ``INSUFFICIENT_RESOURCES`` error message when resources are
    not available, otherwise a ``COMPUTATION_RESULT`` message.
    """
    # Check if resources are available
    if not has_sufficient_resources(message):
        return Message.error("INSUFFICIENT_RESOURCES",
                             "Need more CPU/memory")

    # ResourceManager.allocate() is documented on this page as returning a
    # Result, not a context manager, so unwrap it explicitly instead of using
    # `with`. `requirements` is a placeholder for the task's ResourceRequest.
    allocation_result = resource_manager.allocate(requirements)
    if allocation_result.is_err():
        return Message.error("INSUFFICIENT_RESOURCES",
                             "Need more CPU/memory")
    allocation = allocation_result.unwrap()

    # Process with resource awareness; always hand the allocation back.
    try:
        result = perform_computation(message.payload)
    finally:
        resource_manager.release(allocation)

    return Message(
        message_type="COMPUTATION_RESULT",
        payload={"result": result},
    )
# Use secure links for sensitive data
@agent.handler("SENSITIVE_DATA")
def handle_sensitive(message: Message) -> Optional[Message]:
    """Refuse sensitive data that carries no link id.

    Returns a ``SECURITY_ERROR`` message when ``message.get_link_id()`` is
    falsy; the processing step itself is left as a placeholder.
    """
    if not message.get_link_id():
        return Message.error("SECURITY_ERROR",
                             "Secure link required for sensitive data")
    # Process sensitive data securely
    pass
- Core API: Stable - Breaking changes only in major versions
- Message API: Stable - Backward compatible evolution
- Resource API: Stable - May add new resource types
- Security API: Stable - Security enhancements may be added
- Error API: Stable - New error types may be added
import maple

print(maple.__version__)  # "1.0.0"

# Check API compatibility: version_info is compared against the tuple (1, 0).
if maple.version_info >= (1, 0):
    # Use stable APIs
    pass
This API provides everything you need to create sophisticated, high-performance agent networks. MAPLE's 32/32 perfect test score and 33x performance advantage ensure your applications will be both reliable and fast.
[Quick Start →](Quick-Start-Tutorial) | [Examples →](Examples) | [Architecture →](Architecture-Overview)
MAPLE API - Where Performance Meets Reliability
Copyright (C) 2025 Mahesh Vaijainthymala Krishnamoorthy (Mahesh Vaikri) License AGPL 3.0