14 changes: 12 additions & 2 deletions docker-compose.yml
@@ -55,7 +55,7 @@ services:
build:
context: .
dockerfile: Dockerfile
entrypoint: [] # Override image entrypoint for local dev
entrypoint: [] # Override image entrypoint for local dev
env_file:
- path: .env
required: false
@@ -79,6 +79,11 @@ services:
# Mount source code for hot reloading
- .:/app
- /app/.venv

# Mount local adcp library directly into venv (replaces installed package)
# Uncomment to use a local adcp-client-python checkout for e2e testing:
# - ../adcp-client-python/src/adcp:/app/.venv/lib/python3.12/site-packages/adcp:ro

# Optional: Mount audit logs
- ./audit_logs:/app/audit_logs
# Shared cache volumes for faster builds
- adcp_global_pip_cache:/root/.cache/pip
@@ -94,7 +99,7 @@ services:
build:
context: .
dockerfile: Dockerfile
entrypoint: [] # Override image entrypoint for local dev
entrypoint: [] # Override image entrypoint for local dev
env_file:
- path: .env
required: false
@@ -114,6 +119,11 @@ services:
# Mount source code for hot reloading
- .:/app
- /app/.venv
# Mount local adcp library directly into venv (replaces installed package)
# Uncomment to use a local adcp-client-python checkout for e2e testing:
# - ../adcp-client-python/src/adcp:/app/.venv/lib/python3.12/site-packages/adcp:ro

# Mount OAuth credentials file
# - ./client_secret.json:/app/client_secret.json:ro
- ./audit_logs:/app/audit_logs
# Shared cache volumes for faster builds
- adcp_global_pip_cache:/root/.cache/pip
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -6,7 +6,7 @@ readme = "README.md"
requires-python = ">=3.12"

dependencies = [
"adcp==2.16.0", # AdCP client - 2.16.0 extends type ergonomics to response types
"adcp==2.17.0", # Official AdCP Python client for external agent communication and adagents.json validation
"fastmcp>=2.13.0", # Required for context.get_http_request() support
"google-generativeai>=0.5.4",
"google-cloud-iam>=2.19.1",
30 changes: 22 additions & 8 deletions run_all_tests.sh
@@ -85,9 +85,20 @@ setup_docker_stack() {
local TEST_PROJECT_NAME="adcp-test-$$" # $$ = process ID, ensures uniqueness
export COMPOSE_PROJECT_NAME="$TEST_PROJECT_NAME"

# Create temporary override file to expose postgres port for tests
# (docker-compose.yml doesn't expose it by default for security)
TEST_COMPOSE_OVERRIDE="/tmp/docker-compose.test-override-$$.yml"
cat > "$TEST_COMPOSE_OVERRIDE" << 'OVERRIDE_EOF'
services:
  postgres:
    ports:
      - "${POSTGRES_PORT:-5435}:5432"
OVERRIDE_EOF
export TEST_COMPOSE_OVERRIDE

# Clean up ONLY this test project's containers/volumes (not your local dev!)
echo "Cleaning up any existing TEST containers (project: $TEST_PROJECT_NAME)..."
docker-compose -p "$TEST_PROJECT_NAME" down -v 2>/dev/null || true
docker-compose -f docker-compose.yml -f "$TEST_COMPOSE_OVERRIDE" -p "$TEST_PROJECT_NAME" down -v 2>/dev/null || true
# DO NOT run docker volume prune - that affects ALL Docker volumes!

# If ports are still in use, find new ones
@@ -135,15 +146,15 @@ print(' '.join(map(str, ports)))

# Build and start services
echo "Building Docker images (this may take 2-3 minutes on first run)..."
if ! docker-compose -p "$TEST_PROJECT_NAME" build --progress=plain 2>&1 | grep -E "(Step|#|Building|exporting)" | tail -20; then
if ! docker-compose -f docker-compose.yml -f "$TEST_COMPOSE_OVERRIDE" -p "$TEST_PROJECT_NAME" build --progress=plain 2>&1 | grep -E "(Step|#|Building|exporting)" | tail -20; then
echo -e "${RED}❌ Docker build failed${NC}"
exit 1
fi

echo "Starting Docker services..."
if ! docker-compose -p "$TEST_PROJECT_NAME" up -d; then
if ! docker-compose -f docker-compose.yml -f "$TEST_COMPOSE_OVERRIDE" -p "$TEST_PROJECT_NAME" up -d; then
echo -e "${RED}❌ Docker services failed to start${NC}"
docker-compose -p "$TEST_PROJECT_NAME" logs
docker-compose -f docker-compose.yml -f "$TEST_COMPOSE_OVERRIDE" -p "$TEST_PROJECT_NAME" logs
exit 1
fi

@@ -157,12 +168,12 @@ print(' '.join(map(str, ports)))

if [ $elapsed -gt $max_wait ]; then
echo -e "${RED}❌ Services failed to start within ${max_wait}s${NC}"
docker-compose logs
docker-compose -f docker-compose.yml -f "$TEST_COMPOSE_OVERRIDE" -p "$TEST_PROJECT_NAME" logs
exit 1
fi

# Check PostgreSQL
if docker-compose -p "$TEST_PROJECT_NAME" exec -T postgres pg_isready -U adcp_user >/dev/null 2>&1; then
if docker-compose -f docker-compose.yml -f "$TEST_COMPOSE_OVERRIDE" -p "$TEST_PROJECT_NAME" exec -T postgres pg_isready -U adcp_user >/dev/null 2>&1; then
echo -e "${GREEN}✓ PostgreSQL is ready (${elapsed}s)${NC}"
break
fi
@@ -173,7 +184,7 @@ print(' '.join(map(str, ports)))
# Run migrations
echo "Running database migrations..."
# Use docker-compose exec to run migrations inside the container
if ! docker-compose -p "$TEST_PROJECT_NAME" exec -T postgres psql -U adcp_user -d postgres -c "CREATE DATABASE adcp_test" 2>/dev/null; then
if ! docker-compose -f docker-compose.yml -f "$TEST_COMPOSE_OVERRIDE" -p "$TEST_PROJECT_NAME" exec -T postgres psql -U adcp_user -d postgres -c "CREATE DATABASE adcp_test" 2>/dev/null; then
echo "Database adcp_test already exists, continuing..."
fi

@@ -190,7 +201,10 @@ print(' '.join(map(str, ports)))
# Docker teardown function
teardown_docker_stack() {
echo -e "${BLUE}🐳 Stopping TEST Docker stack (project: $COMPOSE_PROJECT_NAME)...${NC}"
docker-compose -p "$COMPOSE_PROJECT_NAME" down -v 2>/dev/null || true
docker-compose -f docker-compose.yml -f "$TEST_COMPOSE_OVERRIDE" -p "$COMPOSE_PROJECT_NAME" down -v 2>/dev/null || true

# Clean up temporary override file
rm -f "$TEST_COMPOSE_OVERRIDE" 2>/dev/null || true

# Prune dangling volumes created by tests (only removes unused volumes)
echo "Cleaning up dangling Docker volumes..."
79 changes: 65 additions & 14 deletions src/a2a_server/adcp_a2a_server.py
@@ -425,13 +425,16 @@ async def _send_protocol_webhook(
is_active=True,
)

task.status = TaskStatus(state=TaskState(status))

metadata = {
"task_type": task.metadata['skills_requested'][0] if len(task.metadata['skills_requested']) > 0 else 'unknown',
}

await push_notification_service.send_notification(
push_notification_config=push_notification_config,
task_id=task.id,
task_type="task",
status=status,
result=result,
error=error,
payload=task,
metadata=metadata
)
except Exception as e:
# Don't fail the task if webhook fails
@@ -560,11 +563,12 @@ async def on_message_send(
# Extract push notification config from protocol layer (A2A MessageSendConfiguration)
push_notification_config = None
if hasattr(params, "configuration") and params.configuration:
if hasattr(params.configuration, "pushNotificationConfig"):
push_notification_config = params.configuration.pushNotificationConfig
logger.info(
f"Protocol-level push notification config provided for task {task_id}: {push_notification_config.url}"
)
if hasattr(params.configuration, "push_notification_config"):
push_notification_config = params.configuration.push_notification_config
if push_notification_config:
logger.info(
f"Protocol-level push notification config provided for task {task_id}: {push_notification_config.url}"
)

# Prepare task metadata with both invocation types
task_metadata: dict[str, Any] = {
@@ -637,7 +641,10 @@ async def on_message_send(
logger.info(f"Processing explicit skill: {skill_name} with parameters: {parameters}")

try:
result = await self._handle_explicit_skill(skill_name, parameters, auth_token)
result = await self._handle_explicit_skill(
skill_name, parameters, auth_token,
push_notification_config=task_metadata.get("push_notification_config")
)
results.append({"skill": skill_name, "result": result, "success": True})
except ServerError:
# ServerError should bubble up immediately (JSON-RPC error)
@@ -646,6 +653,20 @@ async def on_message_send(
logger.error(f"Error in explicit skill {skill_name}: {e}")
results.append({"skill": skill_name, "error": str(e), "success": False})

# Check for submitted status (manual approval required) - return early without artifacts
# Per AdCP spec, async operations should return Task with status=submitted and no artifacts
for res in results:
if res["success"] and isinstance(res["result"], dict):
result_status = res["result"].get("status")
if result_status == "submitted":
task.status = TaskStatus(state=TaskState.submitted)
task.artifacts = None # No artifacts for pending tasks
logger.info(f"Task {task_id} requires manual approval, returning status=submitted with no artifacts")
# Send protocol-level webhook notification
await self._send_protocol_webhook(task, status="submitted")
self.tasks[task_id] = task
return task

# Create artifacts for all skill results with human-readable text
for i, res in enumerate(results):
artifact_data = res["result"] if res["success"] else {"error": res["error"]}
Expand Down Expand Up @@ -924,11 +945,17 @@ async def on_message_send(
task_state = TaskState.submitted
task_status_str = "submitted"

# Check for explicit status field (e.g., create_media_buy returns this)
result_status = part.data.get("status")
if result_status == "submitted":
task_state = TaskState.submitted
task_status_str = "submitted"

# Mark task with appropriate status
task.status = TaskStatus(state=task_state)

# Send protocol-level webhook notification if configured
await self._send_protocol_webhook(task, status=task_status_str, result=result_data)
await self._send_protocol_webhook(task, status=task_status_str)

except ServerError:
# Re-raise ServerError as-is (will be caught by JSON-RPC handler)
Expand Down Expand Up @@ -960,7 +987,16 @@ async def on_message_send(

# Send protocol-level webhook notification for failure if configured
task.status = TaskStatus(state=TaskState.failed)
await self._send_protocol_webhook(task, status="failed", error=str(e))
# Attach error to task artifacts
task.artifacts = [
Artifact(
artifact_id="error_1",
name="processing_error",
parts=[Part(root=DataPart(data={"error": str(e), "error_type": type(e).__name__}))],
)
]

await self._send_protocol_webhook(task, status="failed")

# Raise ServerError instead of creating failed task
raise ServerError(InternalError(message=f"Message processing failed: {str(e)}"))
Expand Down Expand Up @@ -1338,7 +1374,13 @@ async def on_delete_task_push_notification_config(
logger.error(f"Error deleting push notification config: {e}")
raise ServerError(InternalError(message=f"Failed to delete push notification config: {str(e)}"))

async def _handle_explicit_skill(self, skill_name: str, parameters: dict, auth_token: str | None) -> dict:
async def _handle_explicit_skill(
self,
skill_name: str,
parameters: dict,
auth_token: str | None,
push_notification_config: dict | None = None,
) -> dict:
"""Handle explicit AdCP skill invocations.

Maps skill names to appropriate handlers and validates parameters.
@@ -1347,13 +1389,17 @@ async def _handle_explicit_skill(self, skill_name: str, parameters: dict, auth_t
skill_name: The AdCP skill name (e.g., "get_products")
parameters: Dictionary of skill-specific parameters
auth_token: Bearer token for authentication (optional for discovery endpoints)
push_notification_config: Push notification config from A2A protocol layer

Returns:
Dictionary containing the skill result

Raises:
ValueError: For unknown skills or invalid parameters
"""
# Inject push_notification_config into parameters for skills that need it
if push_notification_config and skill_name in ("create_media_buy", "sync_creatives"):
parameters = {**parameters, "push_notification_config": push_notification_config}
logger.info(f"Handling explicit skill: {skill_name} with parameters: {list(parameters.keys())}")

# Validate auth_token for non-discovery skills
@@ -1584,6 +1630,11 @@ async def _handle_create_media_buy_skill(self, parameters: dict, auth_token: str
async def _handle_sync_creatives_skill(self, parameters: dict, auth_token: str) -> dict:
"""Handle explicit sync_creatives skill invocation (AdCP spec endpoint)."""
try:
# DEBUG: Log incoming parameters
logger.info(f"[A2A sync_creatives] Received parameters keys: {list(parameters.keys())}")
logger.info(f"[A2A sync_creatives] assignments param: {parameters.get('assignments')}")
logger.info(f"[A2A sync_creatives] creatives count: {len(parameters.get('creatives', []))}")

# Create ToolContext from A2A auth info
tool_context = self._create_tool_context_from_a2a(
auth_token=auth_token,
5 changes: 4 additions & 1 deletion src/adapters/mock_ad_server.py
@@ -171,12 +171,15 @@ def _create_workflow_step(self, step_type: str, status: str, request_data: dict)
# Create a context for async operations if needed
context = ctx_manager.create_context(tenant_id=tenant["tenant_id"], principal_id=self.principal.principal_id)

# Add protocol field for webhook payload creation (mock adapter defaults to MCP)
request_data_with_protocol = {**request_data, "protocol": "mcp"}

# Create workflow step
step = ctx_manager.create_workflow_step(
context_id=context.context_id,
step_type=step_type,
tool_name=step_type.replace("mock_", ""),
request_data=request_data,
request_data=request_data_with_protocol,
status=status,
owner="mock_adapter",
)