diff --git a/docker-compose.yml b/docker-compose.yml index b820e42d7..854071235 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -55,7 +55,7 @@ services: build: context: . dockerfile: Dockerfile - entrypoint: [] # Override image entrypoint for local dev + entrypoint: [] # Override image entrypoint for local dev env_file: - path: .env required: false @@ -79,6 +79,11 @@ services: # Mount source code for hot reloading - .:/app - /app/.venv + + # Mount local adcp library directly into venv (replaces installed package) + # - ../adcp-client-python/src/adcp:/app/.venv/lib/python3.12/site-packages/adcp:ro - Uncomment this if you want to use locally installed adcp-python-client for e2e testing purposes + + # Optional: Mount audit logs - ./audit_logs:/app/audit_logs # Shared cache volumes for faster builds - adcp_global_pip_cache:/root/.cache/pip @@ -94,7 +99,7 @@ services: build: context: . dockerfile: Dockerfile - entrypoint: [] # Override image entrypoint for local dev + entrypoint: [] # Override image entrypoint for local dev env_file: - path: .env required: false @@ -114,6 +119,9 @@ services: # Mount source code for hot reloading - .:/app - /app/.venv + # Mount local adcp library directly into venv (replaces installed package) + # - ../adcp-client-python/src/adcp:/app/.venv/lib/python3.12/site-packages/adcp:ro - Uncomment this if you want to use locally installed adcp-python-client for e2e testing purposes + - ./audit_logs:/app/audit_logs # Shared cache volumes for faster builds - adcp_global_pip_cache:/root/.cache/pip diff --git a/pyproject.toml b/pyproject.toml index dfb6c2555..a520a3659 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -6,7 +6,7 @@ readme = "README.md" requires-python = ">=3.12" dependencies = [ - "adcp==2.16.0", # AdCP client - 2.16.0 extends type ergonomics to response types + "adcp==2.17.0", # Official AdCP Python client for external agent communication and adagents.json validation "fastmcp>=2.13.0", # Required for context.get_http_request() support "google-generativeai>=0.5.4", "google-cloud-iam>=2.19.1", diff --git a/run_all_tests.sh b/run_all_tests.sh index 03b5e1c0a..36b2fdc50 100755 --- a/run_all_tests.sh +++ b/run_all_tests.sh @@ -85,9 +85,20 @@ setup_docker_stack() { local TEST_PROJECT_NAME="adcp-test-$$" # $$ = process ID, ensures uniqueness export COMPOSE_PROJECT_NAME="$TEST_PROJECT_NAME" + # Create temporary override file to expose postgres port for tests + # (docker-compose.yml doesn't expose it by default for security) + TEST_COMPOSE_OVERRIDE="/tmp/docker-compose.test-override-$$.yml" + cat > "$TEST_COMPOSE_OVERRIDE" << 'OVERRIDE_EOF' +services: + postgres: + ports: + - "${POSTGRES_PORT:-5435}:5432" +OVERRIDE_EOF + export TEST_COMPOSE_OVERRIDE + # Clean up ONLY this test project's containers/volumes (not your local dev!) echo "Cleaning up any existing TEST containers (project: $TEST_PROJECT_NAME)..." - docker-compose -p "$TEST_PROJECT_NAME" down -v 2>/dev/null || true + docker-compose -f docker-compose.yml -f "$TEST_COMPOSE_OVERRIDE" -p "$TEST_PROJECT_NAME" down -v 2>/dev/null || true # DO NOT run docker volume prune - that affects ALL Docker volumes! # If ports are still in use, find new ones @@ -135,15 +146,15 @@ print(' '.join(map(str, ports))) # Build and start services echo "Building Docker images (this may take 2-3 minutes on first run)..." - if ! docker-compose -p "$TEST_PROJECT_NAME" build --progress=plain 2>&1 | grep -E "(Step|#|Building|exporting)" | tail -20; then + if ! 
docker-compose -f docker-compose.yml -f "$TEST_COMPOSE_OVERRIDE" -p "$TEST_PROJECT_NAME" build --progress=plain 2>&1 | grep -E "(Step|#|Building|exporting)" | tail -20; then echo -e "${RED}❌ Docker build failed${NC}" exit 1 fi echo "Starting Docker services..." - if ! docker-compose -p "$TEST_PROJECT_NAME" up -d; then + if ! docker-compose -f docker-compose.yml -f "$TEST_COMPOSE_OVERRIDE" -p "$TEST_PROJECT_NAME" up -d; then echo -e "${RED}❌ Docker services failed to start${NC}" - docker-compose -p "$TEST_PROJECT_NAME" logs + docker-compose -f docker-compose.yml -f "$TEST_COMPOSE_OVERRIDE" -p "$TEST_PROJECT_NAME" logs exit 1 fi @@ -157,12 +168,12 @@ print(' '.join(map(str, ports))) if [ $elapsed -gt $max_wait ]; then echo -e "${RED}❌ Services failed to start within ${max_wait}s${NC}" - docker-compose logs + docker-compose -f docker-compose.yml -f "$TEST_COMPOSE_OVERRIDE" -p "$TEST_PROJECT_NAME" logs exit 1 fi # Check PostgreSQL - if docker-compose -p "$TEST_PROJECT_NAME" exec -T postgres pg_isready -U adcp_user >/dev/null 2>&1; then + if docker-compose -f docker-compose.yml -f "$TEST_COMPOSE_OVERRIDE" -p "$TEST_PROJECT_NAME" exec -T postgres pg_isready -U adcp_user >/dev/null 2>&1; then echo -e "${GREEN}✓ PostgreSQL is ready (${elapsed}s)${NC}" break fi @@ -173,7 +184,7 @@ print(' '.join(map(str, ports))) # Run migrations echo "Running database migrations..." # Use docker-compose exec to run migrations inside the container - if ! docker-compose -p "$TEST_PROJECT_NAME" exec -T postgres psql -U adcp_user -d postgres -c "CREATE DATABASE adcp_test" 2>/dev/null; then + if ! docker-compose -f docker-compose.yml -f "$TEST_COMPOSE_OVERRIDE" -p "$TEST_PROJECT_NAME" exec -T postgres psql -U adcp_user -d postgres -c "CREATE DATABASE adcp_test" 2>/dev/null; then echo "Database adcp_test already exists, continuing..." fi @@ -190,7 +201,10 @@ print(' '.join(map(str, ports))) # Docker teardown function teardown_docker_stack() { echo -e "${BLUE}🐳 Stopping TEST Docker stack (project: $COMPOSE_PROJECT_NAME)...${NC}" - docker-compose -p "$COMPOSE_PROJECT_NAME" down -v 2>/dev/null || true + docker-compose -f docker-compose.yml -f "$TEST_COMPOSE_OVERRIDE" -p "$COMPOSE_PROJECT_NAME" down -v 2>/dev/null || true + + # Clean up temporary override file + rm -f "$TEST_COMPOSE_OVERRIDE" 2>/dev/null || true # Prune dangling volumes created by tests (only removes unused volumes) echo "Cleaning up dangling Docker volumes..." diff --git a/src/a2a_server/adcp_a2a_server.py b/src/a2a_server/adcp_a2a_server.py index b18fd4acb..45e53771a 100644 --- a/src/a2a_server/adcp_a2a_server.py +++ b/src/a2a_server/adcp_a2a_server.py @@ -97,6 +97,8 @@ from src.core.tools import ( update_performance_index_raw as core_update_performance_index_tool, ) +from adcp import create_a2a_webhook_payload +from adcp.types import GeneratedTaskStatus from src.services.protocol_webhook_service import get_protocol_webhook_service @@ -392,7 +394,14 @@ async def _send_protocol_webhook( result: dict[str, Any] | None = None, error: str | None = None, ): - """Send protocol-level push notification if configured.""" + """Send protocol-level push notification if configured. + + Per AdCP A2A spec (https://docs.adcontextprotocol.org/docs/protocols/a2a-guide#push-notifications-a2a-specific): + - Final states (completed, failed, canceled): Send full Task object with artifacts + - Intermediate states (working, input-required, submitted): Send TaskStatusUpdateEvent + + Uses create_a2a_webhook_payload from adcp library to automatically select correct type. 
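+
+        Illustrative call (values hypothetical; the keyword arguments mirror the
+        call made below):
+
+            payload = create_a2a_webhook_payload(
+                task_id="task_123",
+                status=GeneratedTaskStatus.completed,  # final state -> full Task
+                context_id="ctx_456",
+                result={"media_buy_id": "mb_789"},
+            )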
+        """
        try:
            # Check if task has push notification config in metadata
            if not task.metadata or "push_notification_config" not in task.metadata:
@@ -401,22 +410,20 @@ async def _send_protocol_webhook(
            webhook_config = task.metadata["push_notification_config"]
            push_notification_service = get_protocol_webhook_service()

-            # build push notification config from step request data
            from uuid import uuid4

-            cfg_dict = webhook_config.get("push_notification_config") or {}
-            url = cfg_dict.get("url")
+            url = webhook_config.get("url")
            if not url:
                logger.info("[red]No push notification URL present; skipping webhook[/red]")
                return

-            authentication = cfg_dict.get("authentication") or {}
+            authentication = webhook_config.get("authentication") or {}
            schemes = authentication.get("schemes") or []
            auth_type = schemes[0] if isinstance(schemes, list) and schemes else None
            auth_token = authentication.get("credentials")

            push_notification_config = DBPushNotificationConfig(
-                id=cfg_dict.get("id") or f"pnc_{uuid4().hex[:16]}",
+                id=webhook_config.get("id") or f"pnc_{uuid4().hex[:16]}",
                tenant_id="",
                principal_id="",
                url=url,
@@ -425,13 +432,39 @@ async def _send_protocol_webhook(
                is_active=True,
            )

+            # Convert status string to GeneratedTaskStatus enum
+            try:
+                status_enum = GeneratedTaskStatus(status)
+            except ValueError:
+                # Fallback for unknown status values
+                logger.warning(f"Unknown status '{status}', defaulting to 'working'")
+                status_enum = GeneratedTaskStatus.working
+
+            # Build result data for the webhook payload (copied so the caller's dict is not mutated)
+            # Include error information in result if status is failed
+            result_data: dict[str, Any] = dict(result) if result else {}
+            if error and status == "failed":
+                result_data["error"] = error
+
+            # Use create_a2a_webhook_payload to get the correct payload type:
+            # - Task for final states (completed, failed, canceled)
+            # - TaskStatusUpdateEvent for intermediate states (working, input-required, submitted)
+            payload = create_a2a_webhook_payload(
+                task_id=task.id,
+                status=status_enum,
+                context_id=task.context_id or "",
+                result=result_data,
+            )
+
+            skills_requested = task.metadata.get("skills_requested") or []
+            metadata = {
+                "task_type": skills_requested[0] if skills_requested else "unknown",
+            }
+
            await push_notification_service.send_notification(
                push_notification_config=push_notification_config,
-                task_id=task.id,
-                task_type="task",
-                status=status,
-                result=result,
-                error=error,
+                payload=payload,
+                metadata=metadata,
            )
        except Exception as e:
            # Don't fail the task if webhook fails
@@ -560,11 +592,12 @@ async def on_message_send(
        # Extract push notification config from protocol layer (A2A MessageSendConfiguration)
        push_notification_config = None
        if hasattr(params, "configuration") and params.configuration:
-            if hasattr(params.configuration, "pushNotificationConfig"):
-                push_notification_config = params.configuration.pushNotificationConfig
-                logger.info(
-                    f"Protocol-level push notification config provided for task {task_id}: {push_notification_config.url}"
-                )
+            if hasattr(params.configuration, "push_notification_config"):
+                push_notification_config = params.configuration.push_notification_config
+                if push_notification_config:
+                    logger.info(
+                        f"Protocol-level push notification config provided for task {task_id}: {push_notification_config.url}"
+                    )

        # Prepare task metadata with both invocation types
        task_metadata: dict[str, Any] = {
@@ -637,7 +670,10 @@ async def on_message_send(
            logger.info(f"Processing explicit skill: {skill_name} with parameters: {parameters}")

            try:
-                result = await self._handle_explicit_skill(skill_name, parameters, 
auth_token) + result = await self._handle_explicit_skill( + skill_name, parameters, auth_token, + push_notification_config=task_metadata.get("push_notification_config") + ) results.append({"skill": skill_name, "result": result, "success": True}) except ServerError: # ServerError should bubble up immediately (JSON-RPC error) @@ -646,6 +682,20 @@ async def on_message_send( logger.error(f"Error in explicit skill {skill_name}: {e}") results.append({"skill": skill_name, "error": str(e), "success": False}) + # Check for submitted status (manual approval required) - return early without artifacts + # Per AdCP spec, async operations should return Task with status=submitted and no artifacts + for res in results: + if res["success"] and isinstance(res["result"], dict): + result_status = res["result"].get("status") + if result_status == "submitted": + task.status = TaskStatus(state=TaskState.submitted) + task.artifacts = None # No artifacts for pending tasks + logger.info(f"Task {task_id} requires manual approval, returning status=submitted with no artifacts") + # Send protocol-level webhook notification + await self._send_protocol_webhook(task, status="submitted") + self.tasks[task_id] = task + return task + # Create artifacts for all skill results with human-readable text for i, res in enumerate(results): artifact_data = res["result"] if res["success"] else {"error": res["error"]} @@ -924,11 +974,17 @@ async def on_message_send( task_state = TaskState.submitted task_status_str = "submitted" + # Check for explicit status field (e.g., create_media_buy returns this) + result_status = part.data.get("status") + if result_status == "submitted": + task_state = TaskState.submitted + task_status_str = "submitted" + # Mark task with appropriate status task.status = TaskStatus(state=task_state) # Send protocol-level webhook notification if configured - await self._send_protocol_webhook(task, status=task_status_str, result=result_data) + await self._send_protocol_webhook(task, status=task_status_str) except ServerError: # Re-raise ServerError as-is (will be caught by JSON-RPC handler) @@ -960,7 +1016,16 @@ async def on_message_send( # Send protocol-level webhook notification for failure if configured task.status = TaskStatus(state=TaskState.failed) - await self._send_protocol_webhook(task, status="failed", error=str(e)) + # Attach error to task artifacts + task.artifacts = [ + Artifact( + artifact_id="error_1", + name="processing_error", + parts=[Part(root=DataPart(data={"error": str(e), "error_type": type(e).__name__}))], + ) + ] + + await self._send_protocol_webhook(task, status="failed") # Raise ServerError instead of creating failed task raise ServerError(InternalError(message=f"Message processing failed: {str(e)}")) @@ -1338,7 +1403,13 @@ async def on_delete_task_push_notification_config( logger.error(f"Error deleting push notification config: {e}") raise ServerError(InternalError(message=f"Failed to delete push notification config: {str(e)}")) - async def _handle_explicit_skill(self, skill_name: str, parameters: dict, auth_token: str | None) -> dict: + async def _handle_explicit_skill( + self, + skill_name: str, + parameters: dict, + auth_token: str | None, + push_notification_config: dict | None = None, + ) -> dict: """Handle explicit AdCP skill invocations. Maps skill names to appropriate handlers and validates parameters. 
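
A minimal sketch of the injection rule this new parameter enables (the actual
check is added in the hunk below; all values here are hypothetical):

    skill_name = "create_media_buy"
    parameters = {"buyer_ref": "br_1"}
    push_notification_config = {"url": "https://buyer.example/webhook"}

    if push_notification_config and skill_name in ("create_media_buy", "sync_creatives"):
        parameters = {**parameters, "push_notification_config": push_notification_config}
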
@@ -1347,6 +1418,7 @@ async def _handle_explicit_skill(self, skill_name: str, parameters: dict, auth_t skill_name: The AdCP skill name (e.g., "get_products") parameters: Dictionary of skill-specific parameters auth_token: Bearer token for authentication (optional for discovery endpoints) + push_notification_config: Push notification config from A2A protocol layer Returns: Dictionary containing the skill result @@ -1354,6 +1426,9 @@ async def _handle_explicit_skill(self, skill_name: str, parameters: dict, auth_t Raises: ValueError: For unknown skills or invalid parameters """ + # Inject push_notification_config into parameters for skills that need it + if push_notification_config and skill_name in ("create_media_buy", "sync_creatives"): + parameters = {**parameters, "push_notification_config": push_notification_config} logger.info(f"Handling explicit skill: {skill_name} with parameters: {list(parameters.keys())}") # Validate auth_token for non-discovery skills @@ -1584,6 +1659,11 @@ async def _handle_create_media_buy_skill(self, parameters: dict, auth_token: str async def _handle_sync_creatives_skill(self, parameters: dict, auth_token: str) -> dict: """Handle explicit sync_creatives skill invocation (AdCP spec endpoint).""" try: + # DEBUG: Log incoming parameters + logger.info(f"[A2A sync_creatives] Received parameters keys: {list(parameters.keys())}") + logger.info(f"[A2A sync_creatives] assignments param: {parameters.get('assignments')}") + logger.info(f"[A2A sync_creatives] creatives count: {len(parameters.get('creatives', []))}") + # Create ToolContext from A2A auth info tool_context = self._create_tool_context_from_a2a( auth_token=auth_token, diff --git a/src/adapters/mock_ad_server.py b/src/adapters/mock_ad_server.py index e3328ad52..2ccad7f59 100644 --- a/src/adapters/mock_ad_server.py +++ b/src/adapters/mock_ad_server.py @@ -171,12 +171,15 @@ def _create_workflow_step(self, step_type: str, status: str, request_data: dict) # Create a context for async operations if needed context = ctx_manager.create_context(tenant_id=tenant["tenant_id"], principal_id=self.principal.principal_id) + # Add protocol field for webhook payload creation (mock adapter defaults to MCP) + request_data_with_protocol = {**request_data, "protocol": "mcp"} + # Create workflow step step = ctx_manager.create_workflow_step( context_id=context.context_id, step_type=step_type, tool_name=step_type.replace("mock_", ""), - request_data=request_data, + request_data=request_data_with_protocol, status=status, owner="mock_adapter", ) diff --git a/src/admin/blueprints/creatives.py b/src/admin/blueprints/creatives.py index 1f9f2a8b8..a3e34efdc 100644 --- a/src/admin/blueprints/creatives.py +++ b/src/admin/blueprints/creatives.py @@ -15,6 +15,11 @@ from src.core.database.models import ( PushNotificationConfig as DBPushNotificationConfig, ) +from a2a.types import Task, TaskStatusUpdateEvent +from adcp import create_a2a_webhook_payload, create_mcp_webhook_payload +from adcp.types import McpWebhookPayload, SyncCreativesSuccessResponse, CreativeAction, SyncCreativeResult +from adcp.types.generated_poc.core.context import ContextObject +from adcp.webhooks import GeneratedTaskStatus from src.services.protocol_webhook_service import get_protocol_webhook_service # TODO: Missing module - these functions need to be implemented @@ -67,84 +72,36 @@ def _cleanup_completed_tasks(): for task_id in completed_tasks: del _ai_review_tasks[task_id] logger.debug(f"Cleaned up completed AI review task: {task_id}") - """Call webhook to notify about 
creative status change with retry logic. - Args: - webhook_url: URL to POST notification to - creative_id: Creative ID - status: New status (approved, rejected, pending) - creative_data: Optional creative data to include - tenant_id: Optional tenant ID for signature verification - Returns: - bool: True if webhook delivered successfully, False otherwise - """ - from src.core.webhook_delivery import WebhookDelivery, deliver_webhook_with_retry +def _compute_media_buy_status_from_flight_dates(media_buy) -> str: + """Compute status based on flight dates: 'active' if within window, else 'scheduled'.""" + now = datetime.now(UTC) - try: - # Build payload - payload = { - "object_type": "creative", - "object_id": creative_id, - "status": status, - "timestamp": datetime.now(UTC).isoformat(), - } + start_time = None + if media_buy.start_time: + raw_start = media_buy.start_time + start_time = raw_start.replace(tzinfo=UTC) if raw_start.tzinfo is None else raw_start.astimezone(UTC) + elif media_buy.start_date: + start_time = datetime.combine(media_buy.start_date, datetime.min.time()).replace(tzinfo=UTC) - if creative_data: - payload["creative_data"] = creative_data - - headers = {"Content-Type": "application/json"} - - # Get signing secret from tenant - signing_secret = None - if tenant_id: - try: - with get_db_session() as db_session: - stmt = select(Tenant).filter_by(tenant_id=tenant_id) - tenant = db_session.scalars(stmt).first() - if tenant and hasattr(tenant, "admin_token") and tenant.admin_token: - signing_secret = tenant.admin_token - except Exception as e: - logger.warning(f"Could not fetch tenant for signature: {e}") - - # Create delivery configuration - delivery = WebhookDelivery( - webhook_url=webhook_url, - payload=payload, - headers=headers, - max_retries=3, - timeout=10, - signing_secret=signing_secret, - event_type="creative.status_changed", - tenant_id=tenant_id, - object_id=creative_id, - ) + end_time = None + if media_buy.end_time: + raw_end = media_buy.end_time + end_time = raw_end.replace(tzinfo=UTC) if raw_end.tzinfo is None else raw_end.astimezone(UTC) + elif media_buy.end_date: + end_time = datetime.combine(media_buy.end_date, datetime.max.time()).replace(tzinfo=UTC) - # Deliver with retry - success, result = deliver_webhook_with_retry(delivery) + # If start time passed and end time not passed, set to active + if start_time and end_time and now >= start_time and now <= end_time: + return "active" - if success: - logger.info( - f"Successfully delivered webhook for creative {creative_id} status={status} " - f"(attempts={result['attempts']}, delivery_id={result['delivery_id']})" - ) - else: - logger.error( - f"Failed to deliver webhook for creative {creative_id} after {result['attempts']} attempts: " - f"{result.get('error', 'Unknown error')} (delivery_id={result['delivery_id']})" - ) - - return success - - except Exception as e: - logger.error(f"Error setting up webhook delivery for creative {creative_id}: {e}", exc_info=True) - return False + return "scheduled" async def _call_webhook_for_creative_status( db_session, creative_id, - result, tenant_id: str | None = None, ): """Send protocol-level push notification for creative status update. @@ -152,10 +109,6 @@ async def _call_webhook_for_creative_status( Checks if all creatives in the sync_creatives task have been reviewed. Only fires the webhook when ALL creatives have been reviewed (approved or rejected). 
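+
+    For example, if a sync_creatives task registered three creatives, approving
+    the first two does not fire the webhook; it fires once, after the third
+    review (approved or rejected) lands.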
- This implements the semantic that sync_creatives task status should be: - - 'submitted' when any creatives are pending review - - 'completed' when all creatives have been reviewed - Returns: bool: True if webhook delivered successfully, False otherwise (or if no config found) """ @@ -209,18 +162,28 @@ async def _call_webhook_for_creative_status( logger.info(f"All {len(all_creatives)} creatives in task {step.step_id} have been reviewed; firing webhook") # Build SyncCreativesResponse with all creative results - complete_result = { - "creatives": [ - { - "creative_id": c.creative_id, - "name": c.name, - "format": c.format, - "status": c.status, - "action": "updated", # They were reviewed/updated - } - for c in all_creatives - ] - } + + creatives: list[SyncCreativeResult] = [ + SyncCreativeResult( + creative_id=c.creative_id, + platform_id="", # we need to populate this. Currently not storing any internal id of our own per creative + action=CreativeAction.failed if c.status != "approved" else CreativeAction.created, + errors=[c.data.get("rejection_reason")] if c.data and c.data.get("rejection_reason") else [] + ) + for c in all_creatives + ] + + # Convert context dict to ContextObject if present + context_data = step.request_data.get("context") + context_obj: ContextObject | None = None + if context_data and isinstance(context_data, dict): + context_obj = ContextObject.model_construct(**context_data) + + complete_result = SyncCreativesSuccessResponse( + creatives=creatives, + dry_run=False, + context=context_obj + ) # build push notification config from step request data # this is because we don't store push notification config in the database when creating the creative @@ -262,13 +225,37 @@ async def _call_webhook_for_creative_status( logger.info("error: None") logger.info(f"push_notification_config: {push_notification_config}") + # Determine protocol type from workflow step request_data + protocol = step.request_data.get("protocol", "mcp") # Default to MCP for backward compatibility + + # Create appropriate webhook payload based on protocol + # Convert result to dict for webhook payload functions + result_dict = complete_result.model_dump(mode="json") + + payload: Task | TaskStatusUpdateEvent | McpWebhookPayload + if protocol == "a2a": + payload = create_a2a_webhook_payload( + task_id=step.step_id, + status=GeneratedTaskStatus.completed, + result=result_dict, + context_id=step.context_id + ) + else: + # TODO: Fix in adcp python client - create_mcp_webhook_payload should return + # McpWebhookPayload instead of dict[str, Any] for proper type safety + mcp_payload_dict = create_mcp_webhook_payload(step.step_id, GeneratedTaskStatus.completed, result_dict) + payload = McpWebhookPayload.model_construct(**mcp_payload_dict) + + metadata = { + "task_type": step.tool_name + # TODO: @yusuf - check if we were passing principal_id and tenant to this previously + # TODO: @yusuf - check if we want to make metadata typed + } + await service.send_notification( push_notification_config=push_notification_config, - task_id=step.step_id, - task_type=step.tool_name, - status="completed", - result=complete_result, - error=None, + payload=payload, + metadata=metadata ) logger.info( @@ -480,25 +467,10 @@ def approve_creative(tenant_id, creative_id, **kwargs): db_session.commit() - # need to make sure this is complient with AdCP standard - result = { - "creatives": [ - { - "creative_id": creative.creative_id, - "name": creative.name, - "format": creative.format, - "status": "approved", - "approved_by": approved_by, 
- "approved_at": creative.approved_at.isoformat(), - } - ] - } - asyncio.run( _call_webhook_for_creative_status( db_session=db_session, creative_id=creative_id, - result=result, tenant_id=tenant_id, ) ) @@ -566,7 +538,7 @@ def approve_creative(tenant_id, creative_id, **kwargs): logger.info(f"[CREATIVE APPROVAL] Media buy {media_buy_id} status: {media_buy.status}") # Only check if media buy is waiting for creatives - if media_buy.status == "pending_creatives": + if media_buy.status == "pending_creatives" or media_buy.status == "draft": # Get all creative assignments for this media buy stmt_all_assignments = select(CreativeAssignment).filter_by(media_buy_id=media_buy_id) all_assignments = db_session.scalars(stmt_all_assignments).all() @@ -596,13 +568,14 @@ def approve_creative(tenant_id, creative_id, **kwargs): success, error_msg = execute_approved_media_buy(media_buy_id, tenant_id) if success: - # Update media buy status - media_buy.status = "scheduled" + # Update media buy status based on flight dates + new_status = _compute_media_buy_status_from_flight_dates(media_buy) + media_buy.status = new_status media_buy.approved_at = datetime.now(UTC) media_buy.approved_by = "system" db_session.commit() - logger.info(f"[CREATIVE APPROVAL] Media buy {media_buy_id} successfully created in adapter") + logger.info(f"[CREATIVE APPROVAL] Media buy {media_buy_id} successfully created in adapter, status={new_status}") else: logger.error(f"[CREATIVE APPROVAL] Adapter creation failed for {media_buy_id}: {error_msg}") # Leave status as pending_creatives so admin can retry @@ -693,24 +666,9 @@ def reject_creative(tenant_id, creative_id, **kwargs): db_session.commit() - # need to make sure this is complient with AdCP standard - result = { - "creatives": [ - { - "creative_id": creative.creative_id, - "name": creative.name, - "format": creative.format, - "status": "rejected", - "rejected_by": rejected_by, - "rejection_reason": rejection_reason, - "rejected_at": creative.data["rejected_at"], - } - ] - } - asyncio.run( _call_webhook_for_creative_status( - db_session=db_session, creative_id=creative_id, result=result, tenant_id=tenant_id + db_session=db_session, creative_id=creative_id, tenant_id=tenant_id ) ) @@ -869,7 +827,7 @@ async def _ai_review_creative_async( } asyncio.run( _call_webhook_for_creative_status( - db_session=session, creative_id=creative_id, result=result, tenant_id=tenant_id + db_session=session, creative_id=creative_id, tenant_id=tenant_id ) ) logger.info(f"[AI Review Async] Webhook called for {creative_id}") diff --git a/src/admin/blueprints/operations.py b/src/admin/blueprints/operations.py index 639d2bc66..47b272b43 100644 --- a/src/admin/blueprints/operations.py +++ b/src/admin/blueprints/operations.py @@ -9,6 +9,9 @@ from src.admin.utils import require_auth, require_tenant_access from src.core.database.models import MediaBuy, MediaPackage, PushNotificationConfig from src.services.protocol_webhook_service import get_protocol_webhook_service +from a2a.types import TaskState +from adcp.types import CreateMediaBuySuccessResponse, GeneratedTaskStatus as AdcpTaskStatus, Package +from adcp import create_a2a_webhook_payload, create_mcp_webhook_payload logger = logging.getLogger(__name__) @@ -311,6 +314,14 @@ def approve_media_buy(tenant_id, media_buy_id, **kwargs): flash("No pending approval found for this media buy", "warning") return redirect(url_for("operations.media_buy_detail", tenant_id=tenant_id, media_buy_id=media_buy_id)) + # Extract step data to dict to avoid detached instance errors 
after commit/nested sessions + step_data = { + "step_id": step.step_id, + "context_id": step.context_id, + "tool_name": step.tool_name, + "request_data": step.request_data or {}, + } + # Get user info for audit from flask import session as flask_session @@ -320,6 +331,17 @@ def approve_media_buy(tenant_id, media_buy_id, **kwargs): stmt_buy = select(MediaBuy).filter_by(media_buy_id=media_buy_id, tenant_id=tenant_id) media_buy = db_session.scalars(stmt_buy).first() + # Extract media_buy data to dict to avoid detached instance errors after commit + media_buy_data = None + if media_buy: + raw_request = media_buy.raw_request or {} + push_config = raw_request.get("push_notification_config") or {} + media_buy_data = { + "principal_id": media_buy.principal_id, + "buyer_ref": media_buy.buyer_ref, + "push_notification_url": push_config.get("url"), + } + if action == "approve": step.status = "approved" step.updated_at = datetime.now(UTC) @@ -423,41 +445,62 @@ def approve_media_buy(tenant_id, media_buy_id, **kwargs): logger.info(f"[APPROVAL] Adapter creation succeeded for {media_buy_id}") # Send webhook notification to buyer - stmt_webhook = ( - select(PushNotificationConfig) - .filter_by( - tenant_id=tenant_id, - principal_id=media_buy.principal_id, - url=media_buy.raw_request.get("push_notification_config").get("url"), - is_active=True, + webhook_config = None + if media_buy_data and media_buy_data["push_notification_url"]: + stmt_webhook = ( + select(PushNotificationConfig) + .filter_by( + tenant_id=tenant_id, + principal_id=media_buy_data["principal_id"], + url=media_buy_data["push_notification_url"], + is_active=True, + ) + .order_by(PushNotificationConfig.created_at.desc()) ) - .order_by(PushNotificationConfig.created_at.desc()) - ) - webhook_config = db_session.scalars(stmt_webhook).first() + webhook_config = db_session.scalars(stmt_webhook).first() - if webhook_config: + if webhook_config and media_buy_data: all_packages = db_session.scalars( select(MediaPackage).filter_by(media_buy_id=media_buy_id) ).all() - # Why this is not compliant with AdCP spec (particularly why status is missing in the protocol)? 
- webhook_payload = { - "media_buy_id": media_buy_id, - "buyer_ref": media_buy.buyer_ref, - "status": media_buy.status, - "packages": [{"package_id": x.package_id, "status": "approved"} for x in all_packages], + create_media_buy_approved_result = CreateMediaBuySuccessResponse( + media_buy_id=media_buy_id, + buyer_ref=media_buy_data["buyer_ref"], + packages=[Package(package_id=x.package_id) for x in all_packages], + context={}, # TODO: @yusuf - please fix this, like we've fixed in the creative approval + ) + metadata = { + "task_type": step_data["tool_name"], + # TODO: @yusuf - check if we were passing principal_id and tenant to this previously + # TODO: @yusuf - check if we want to make metadata typed } + # Determine protocol type from workflow step request_data + protocol = step_data["request_data"].get("protocol", "mcp") # Default to MCP for backward compatibility + + # Create appropriate webhook payload based on protocol + if protocol == "a2a": + create_media_buy_approved_payload = create_a2a_webhook_payload( + task_id=step_data["step_id"], + status=AdcpTaskStatus.completed, + result=create_media_buy_approved_result, + context_id=step_data["context_id"] + ) + else: + create_media_buy_approved_payload = create_mcp_webhook_payload( + task_id=step_data["step_id"], + result=create_media_buy_approved_result, + status=AdcpTaskStatus.completed + ) + try: service = get_protocol_webhook_service() asyncio.run( service.send_notification( push_notification_config=webhook_config, - task_id=step.step_id, - task_type=step.tool_name, - status="completed", - result=webhook_payload, - error=None, + payload=create_media_buy_approved_payload, + metadata=metadata ) ) logger.info(f"Sent webhook notification for approved media buy {media_buy_id}") @@ -492,39 +535,60 @@ def approve_media_buy(tenant_id, media_buy_id, **kwargs): db_session.commit() # Send webhook notification to buyer - stmt_webhook = ( - select(PushNotificationConfig) - .filter_by( - tenant_id=tenant_id, - principal_id=media_buy.principal_id, - url=media_buy.raw_request.get("push_notification_config").get("url"), - is_active=True, + webhook_config = None + if media_buy_data and media_buy_data["push_notification_url"]: + stmt_webhook = ( + select(PushNotificationConfig) + .filter_by( + tenant_id=tenant_id, + principal_id=media_buy_data["principal_id"], + url=media_buy_data["push_notification_url"], + is_active=True, + ) + .order_by(PushNotificationConfig.created_at.desc()) ) - .order_by(PushNotificationConfig.created_at.desc()) - ) - webhook_config = db_session.scalars(stmt_webhook).first() + webhook_config = db_session.scalars(stmt_webhook).first() - if webhook_config: + if webhook_config and media_buy_data: all_packages = db_session.scalars(select(MediaPackage).filter_by(media_buy_id=media_buy_id)).all() - # Why this is not compliant with AdCP spec (particularly why status is missing in the protocol)? 
- webhook_payload = { - "media_buy_id": media_buy_id, - "buyer_ref": media_buy.buyer_ref, - "status": media_buy.status, - "packages": [{"package_id": x.package_id, "status": "rejected"} for x in all_packages], + create_media_buy_rejected_result = CreateMediaBuySuccessResponse( + media_buy_id=media_buy_id, + buyer_ref=media_buy_data["buyer_ref"], + packages=[Package(package_id=x.package_id) for x in all_packages], + context={}, # TODO: @yusuf - please fix this, like we've fixed in the creative approval + ) + metadata = { + "task_type": step_data["tool_name"], + # TODO: @yusuf - check if we were passing principal_id and tenant to this previously + # TODO: @yusuf - check if we want to make metadata typed } + # Determine protocol type from workflow step request_data + protocol = step_data["request_data"].get("protocol", "mcp") # Default to MCP for backward compatibility + + # Create appropriate webhook payload based on protocol + if protocol == "a2a": + create_media_buy_rejected_payload = create_a2a_webhook_payload( + task_id=step_data["step_id"], + status=AdcpTaskStatus.rejected, + result=create_media_buy_rejected_result, + context_id=step_data["context_id"] + ) + else: + create_media_buy_rejected_payload = create_mcp_webhook_payload( + task_id=step_data["step_id"], + result=create_media_buy_rejected_result, + status=AdcpTaskStatus.rejected + ) + try: service = get_protocol_webhook_service() asyncio.run( service.send_notification( push_notification_config=webhook_config, - task_id=step.step_id, - task_type=step.tool_name, - status="rejected", - result=webhook_payload, - error=None, + payload=create_media_buy_rejected_payload, + metadata=metadata ) ) logger.info(f"Sent webhook notification for rejected media buy {media_buy_id}") @@ -548,29 +612,19 @@ def trigger_delivery_webhook(tenant_id, media_buy_id, **kwargs): """Trigger a delivery report webhook for a media buy manually.""" from flask import flash, redirect, url_for - from src.core.database.database_session import get_db_session from src.services.delivery_webhook_scheduler import get_delivery_webhook_scheduler try: - with get_db_session() as db_session: - # Get media buy - stmt = select(MediaBuy).filter_by(tenant_id=tenant_id, media_buy_id=media_buy_id) - media_buy = db_session.scalars(stmt).first() - - if not media_buy: - flash("Media buy not found", "error") - return redirect(url_for("operations.media_buy_detail", tenant_id=tenant_id, media_buy_id=media_buy_id)) - - # Trigger webhook using scheduler - scheduler = get_delivery_webhook_scheduler() - success = asyncio.run(scheduler.trigger_report_for_media_buy(media_buy, db_session)) + # Trigger webhook using scheduler - pass IDs to avoid detached instance errors + scheduler = get_delivery_webhook_scheduler() + success = asyncio.run(scheduler.trigger_report_for_media_buy_by_id(media_buy_id, tenant_id)) - if success: - flash("Delivery webhook triggered successfully", "success") - else: - flash("Failed to trigger delivery webhook. Check logs or configuration.", "warning") + if success: + flash("Delivery webhook triggered successfully", "success") + else: + flash("Failed to trigger delivery webhook. 
Check logs or configuration.", "warning") - return redirect(url_for("operations.media_buy_detail", tenant_id=tenant_id, media_buy_id=media_buy_id)) + return redirect(url_for("operations.media_buy_detail", tenant_id=tenant_id, media_buy_id=media_buy_id)) except Exception as e: logger.error(f"Error triggering delivery webhook for {media_buy_id}: {e}", exc_info=True) diff --git a/src/core/context_manager.py b/src/core/context_manager.py index 59ca99867..5bda638ba 100644 --- a/src/core/context_manager.py +++ b/src/core/context_manager.py @@ -9,6 +9,11 @@ from rich.console import Console from sqlalchemy import select +from a2a.types import Task, TaskStatusUpdateEvent +from adcp import create_a2a_webhook_payload, create_mcp_webhook_payload +from adcp.types import McpWebhookPayload +from adcp.webhooks import GeneratedTaskStatus + from src.core.database.database_session import DatabaseManager from src.core.database.models import Context, ObjectWorkflowMapping, WorkflowStep from src.services.protocol_webhook_service import get_protocol_webhook_service @@ -328,6 +333,7 @@ def mark_human_needed( request_data={ "reason": reason, "details": clarification_details, + "protocol": "mcp", # Default to MCP for internal system actions }, initial_comment=reason, ) @@ -650,6 +656,34 @@ def _send_push_notifications(self, step: WorkflowStep, new_status: str, session: f"[cyan]📤 Sending webhook to {push_notification_config.url} for {mapping.object_type} {mapping.object_id}[/cyan]" ) + # Build webhook payload based on protocol type + task_type_str = step.tool_name or mapping.action or "unknown" + protocol = (step.request_data or {}).get("protocol", "mcp") # Default to MCP + try: + status_enum = GeneratedTaskStatus(new_status) + except ValueError: + status_enum = GeneratedTaskStatus.unknown + + payload: Task | TaskStatusUpdateEvent | McpWebhookPayload + if protocol == "a2a": + payload = create_a2a_webhook_payload( + task_id=step.step_id, + status=status_enum, + context_id=step.context_id, + result=step.response_data or {}, + ) + else: + # TODO: Fix in adcp python client - create_mcp_webhook_payload should return + # McpWebhookPayload instead of dict[str, Any] for proper type safety + mcp_payload_dict = create_mcp_webhook_payload(step.step_id, status_enum, step.response_data) + payload = McpWebhookPayload.model_construct(**mcp_payload_dict) + + metadata: dict[str, Any] = { + "task_type": task_type_str, + "tenant_id": derived_tenant_id, + "principal_id": derived_principal_id, + } + try: # If we're already in an event loop, schedule the send; otherwise run it directly try: @@ -657,11 +691,8 @@ def _send_push_notifications(self, step: WorkflowStep, new_status: str, session: task = loop.create_task( service.send_notification( push_notification_config=push_notification_config, - task_id=step.step_id, - task_type=step.tool_name or mapping.action or "unknown", - status=new_status, - result=step.response_data, - error=step.error_message, + payload=payload, + metadata=metadata, ) ) @@ -680,11 +711,8 @@ def _log_task_result( asyncio.run( service.send_notification( push_notification_config=push_notification_config, - task_id=step.step_id, - task_type=step.tool_name or mapping.action or "unknown", - status=new_status, - result=step.response_data, - error=step.error_message, + payload=payload, + metadata=metadata, ) ) console.print( diff --git a/src/core/database/models.py b/src/core/database/models.py index e1366a010..7d9f371f8 100644 --- a/src/core/database/models.py +++ b/src/core/database/models.py @@ -1916,7 +1916,7 @@ class 
WebhookDeliveryLog(Base): String, ForeignKey("media_buys.media_buy_id", ondelete="CASCADE"), nullable=False ) webhook_url: Mapped[str] = mapped_column(String, nullable=False) - task_type: Mapped[str] = mapped_column(String, nullable=False) # "delivery_report" + task_type: Mapped[str] = mapped_column(String, nullable=False) # "media_buy_delivery" # AdCP webhook metadata sequence_number: Mapped[int] = mapped_column(Integer, nullable=False, server_default="1") diff --git a/src/core/tools/creatives.py b/src/core/tools/creatives.py index d0efdd52c..685734b44 100644 --- a/src/core/tools/creatives.py +++ b/src/core/tools/creatives.py @@ -1261,6 +1261,7 @@ def _sync_creatives_impl( # Track assignments per creative for response population assignments_by_creative: dict[str, list[str]] = {} # creative_id -> [package_ids] assignment_errors_by_creative: dict[str, dict[str, str]] = {} # creative_id -> {package_id: error} + media_buys_with_new_assignments: dict[str, Any] = {} # media_buy_id -> MediaBuy object # Note: assignments should be a dict, but handle both dict and None if assignments and isinstance(assignments, dict): @@ -1422,6 +1423,10 @@ def normalize_url(url: str | None) -> str | None: f"package={actual_package_id}, media_buy={media_buy_id}" ) + # Track media buy for potential status update (for any assignment, new or existing) + if media_buy_id and db_media_buy and media_buy_id not in media_buys_with_new_assignments: + media_buys_with_new_assignments[media_buy_id] = db_media_buy + assignment_list.append( CreativeAssignment( assignment_id=assignment.assignment_id, @@ -1436,6 +1441,14 @@ def normalize_url(url: str | None) -> str | None: if actual_package_id is not None: assignments_by_creative[creative_id].append(actual_package_id) + # Update media buy status if needed (draft -> pending_creatives) + for mb_id, mb_obj in media_buys_with_new_assignments.items(): + if mb_obj.status == "draft" and mb_obj.approved_at is not None: + mb_obj.status = "pending_creatives" + logger.info( + f"[SYNC_CREATIVES] Media buy {mb_id} transitioned from draft to pending_creatives" + ) + session.commit() # Update creative results with assignment information (per AdCP spec) @@ -1496,6 +1509,14 @@ def normalize_url(url: str | None) -> str | None: if push_notification_config: request_data_for_workflow["push_notification_config"] = push_notification_config + # Store context if provided (for echoing back in webhook) + if context: + request_data_for_workflow["context"] = context + + # Store protocol type for webhook payload creation + # ToolContext = A2A, Context (FastMCP) = MCP + request_data_for_workflow["protocol"] = "a2a" if isinstance(ctx, ToolContext) else "mcp" + step = ctx_manager.create_workflow_step( context_id=persistent_ctx.context_id, step_type="creative_approval", diff --git a/src/core/tools/media_buy_create.py b/src/core/tools/media_buy_create.py index 71964862e..86bf144ab 100644 --- a/src/core/tools/media_buy_create.py +++ b/src/core/tools/media_buy_create.py @@ -16,7 +16,7 @@ from urllib.parse import urlparse from adcp import BrandManifest, PushNotificationConfig -from adcp.types import MediaBuyStatus +from adcp.types import GeneratedTaskStatus as AdcpTaskStatus, MediaBuyStatus from adcp.types.generated_poc.core.context import ContextObject from adcp.types.generated_poc.core.creative_asset import CreativeAsset from adcp.types.generated_poc.core.targeting import TargetingOverlay @@ -185,7 +185,7 @@ def _extract_creative_url_and_dimensions( if creative_data.get("assets") and format_spec and 
format_spec.assets_required: # Use format spec to find the correct asset_id for image/video/url assets for asset_req in format_spec.assets_required: - asset_type = asset_req.asset_type.lower() + asset_type = str(asset_req.asset_type).lower() if asset_type in ["image", "video", "url"]: asset_id = asset_req.asset_id if asset_id in creative_data["assets"]: @@ -200,7 +200,7 @@ def _extract_creative_url_and_dimensions( if creative_data.get("assets") and format_spec and format_spec.assets_required: # Use format spec to find the correct asset_id for image/video assets for asset_req in format_spec.assets_required: - asset_type = asset_req.asset_type.lower() + asset_type = str(asset_req.asset_type).lower() if asset_type in ["image", "video"]: asset_id = asset_req.asset_id if asset_id in creative_data["assets"]: @@ -226,6 +226,48 @@ def _extract_creative_url_and_dimensions( if width and height: break + # Fallback: If format spec didn't work, iterate through all assets + if not url or not width or not height: + if creative_data.get("assets"): + for asset_id, asset_obj in creative_data["assets"].items(): + if isinstance(asset_obj, dict): + # Extract URL if not found yet + if not url and asset_obj.get("url"): + url = asset_obj["url"] + + # Extract dimensions if not found yet + if (not width or not height): + raw_width = asset_obj.get("width") + raw_height = asset_obj.get("height") + if raw_width is not None and not width: + try: + width = int(raw_width) + except (ValueError, TypeError): + pass + if raw_height is not None and not height: + try: + height = int(raw_height) + except (ValueError, TypeError): + pass + + # Stop if we found everything + if url and width and height: + break + + # Last resort: Check top-level fields + if not url: + url = creative_data.get("url") + if not width: + try: + width = int(creative_data["width"]) if creative_data.get("width") else None + except (ValueError, TypeError): + pass + if not height: + try: + height = int(creative_data["height"]) if creative_data.get("height") else None + except (ValueError, TypeError): + pass + return url, width, height @@ -1220,7 +1262,7 @@ async def _create_media_buy_impl( push_notification_config: dict[str, Any] | None = None, context: dict[str, Any] | None = None, # Optional application level context per adcp spec ctx: Context | ToolContext | None = None, -) -> CreateMediaBuySuccess | CreateMediaBuyError: +) -> tuple[CreateMediaBuySuccess | CreateMediaBuyError, AdcpTaskStatus]: """Create a media buy with the specified parameters. 
Args: @@ -1316,9 +1358,12 @@ async def _create_media_buy_impl( if not principal: error_msg = f"Principal {principal_id} not found" # Cannot create context or workflow step without valid principal - return CreateMediaBuyError( - errors=[Error(code="authentication_error", message=error_msg, details=None)], - context=to_context_object(req.context), + return ( + CreateMediaBuyError( + errors=[Error(code="authentication_error", message=error_msg, details=None)], + context=to_context_object(req.context), + ), + AdcpTaskStatus.failed, ) # Context management and workflow step creation - create workflow step FIRST @@ -1338,13 +1383,20 @@ async def _create_media_buy_impl( persistent_ctx = ctx_manager.create_context(tenant_id=tenant["tenant_id"], principal_id=principal_id) # Create workflow step for tracking this operation + # Prepare request data with protocol detection + request_data_for_workflow = req.model_dump(mode="json") + + # Store protocol type for webhook payload creation + # ToolContext = A2A, Context (FastMCP) = MCP + request_data_for_workflow["protocol"] = "a2a" if isinstance(ctx, ToolContext) else "mcp" + step = ctx_manager.create_workflow_step( context_id=persistent_ctx.context_id, step_type="media_buy_creation", owner="system", status="in_progress", tool_name="create_media_buy", - request_data=req.model_dump(mode="json"), + request_data=request_data_for_workflow, ) # Register push notification config if provided (MCP/A2A protocol support) @@ -1784,10 +1836,13 @@ async def _create_media_buy_impl( # Update workflow step as failed ctx_manager.update_workflow_step(step.step_id, status="failed", error_message=str(e)) - # Return error response (protocol layer will add status="failed") - return CreateMediaBuyError( - errors=[Error(code="validation_error", message=str(e), details=None)], - context=to_context_object(req.context), + # Return error response with failed status + return ( + CreateMediaBuyError( + errors=[Error(code="validation_error", message=str(e), details=None)], + context=to_context_object(req.context), + ), + AdcpTaskStatus.failed, ) # Principal already validated earlier (before context creation) to avoid foreign key errors @@ -2255,13 +2310,16 @@ async def _create_media_buy_impl( # The workflow_step_id in packages indicates approval is required # buyer_ref is required by schema, but mypy needs explicit check response_buyer_ref = req.buyer_ref if req.buyer_ref else "unknown" - return CreateMediaBuySuccess( - buyer_ref=response_buyer_ref, - media_buy_id=media_buy_id, - creative_deadline=None, - packages=cast(list[Any], pending_packages), - workflow_step_id=step.step_id, # Client can track approval via this ID - context=to_context_object(req.context), + return ( + CreateMediaBuySuccess( + buyer_ref=response_buyer_ref, + media_buy_id=media_buy_id, + creative_deadline=None, + packages=cast(list[Any], pending_packages), + workflow_step_id=step.step_id, # Client can track approval via this ID + context=to_context_object(req.context), + ), + AdcpTaskStatus.submitted, ) # Get products for the media buy to check product-level auto-creation settings @@ -2323,9 +2381,12 @@ async def _create_media_buy_impl( f" • {err}" for err in config_errors ) ctx_manager.update_workflow_step(step.step_id, status="failed", error_message=error_detail) - return CreateMediaBuyError( - errors=[Error(code="invalid_configuration", message=err, details=None) for err in config_errors], - context=to_context_object(req.context), + return ( + CreateMediaBuyError( + errors=[Error(code="invalid_configuration", 
message=err, details=None) for err in config_errors], + context=to_context_object(req.context), + ), + AdcpTaskStatus.failed, ) product_auto_create = all( @@ -2406,12 +2467,15 @@ async def _create_media_buy_impl( except Exception as e: logger.warning(f"⚠️ Failed to send configuration approval Slack notification: {e}") - return CreateMediaBuySuccess( - buyer_ref=req.buyer_ref if req.buyer_ref else "unknown", - media_buy_id=media_buy_id, - packages=cast(list[Any], response_packages), - workflow_step_id=step.step_id, - context=to_context_object(req.context), + return ( + CreateMediaBuySuccess( + buyer_ref=req.buyer_ref if req.buyer_ref else "unknown", + media_buy_id=media_buy_id, + packages=cast(list[Any], response_packages), + workflow_step_id=step.step_id, + context=to_context_object(req.context), + ), + AdcpTaskStatus.submitted, ) # Continue with synchronized media buy creation @@ -2638,9 +2702,12 @@ def _has_supported_key(url: str | None, fid: str, keys: set = product_format_key if not req.start_time or not req.end_time: error_msg = "start_time and end_time are required but were not properly set" ctx_manager.update_workflow_step(step.step_id, status="failed", error_message=error_msg) - return CreateMediaBuyError( - errors=[Error(code="invalid_datetime", message=error_msg, details=None)], - context=to_context_object(req.context), + return ( + CreateMediaBuyError( + errors=[Error(code="invalid_datetime", message=error_msg, details=None)], + context=to_context_object(req.context), + ), + AdcpTaskStatus.failed, ) # PRE-VALIDATE: Check all creatives have required fields BEFORE calling adapter @@ -2669,7 +2736,7 @@ def _has_supported_key(url: str | None, fid: str, keys: set = product_format_key error_msg = response.errors[0].message if response.errors else "Unknown error" error_code = response.errors[0].code if response.errors else "UNKNOWN" logger.error(f"[ADAPTER] Adapter returned error response: {error_code} - {error_msg}") - return response + return (response, AdcpTaskStatus.failed) # At this point, response is CreateMediaBuySuccess - safe to access success-specific fields # Type narrowing: media_buy_id must be present in successful response @@ -3426,7 +3493,7 @@ def serialize_for_json(value): }, ) - return modified_response + return (modified_response, AdcpTaskStatus.completed) except Exception as e: # Update workflow step as failed on any error during execution @@ -3564,13 +3631,10 @@ async def create_media_buy( brand_manifest.model_dump(mode="json") if isinstance(brand_manifest, BrandManifest) else brand_manifest ) packages_dicts = [p.model_dump(mode="json") for p in packages] - targeting_overlay_dict = targeting_overlay.model_dump(mode="json") if targeting_overlay else None - creatives_dicts = [c.model_dump(mode="json") for c in creatives] if creatives else None reporting_webhook_dict = reporting_webhook.model_dump(mode="json") if reporting_webhook else None - push_config_dict = push_notification_config.model_dump(mode="json") if push_notification_config else None context_dict = context.model_dump(mode="json") if context else None - response = await _create_media_buy_impl( + response, status = await _create_media_buy_impl( buyer_ref=buyer_ref, brand_manifest=brand_manifest_val, po_number=po_number, @@ -3581,7 +3645,10 @@ async def create_media_buy( context=context_dict, ctx=ctx, ) - return ToolResult(content=str(response), structured_content=response.model_dump()) + return ToolResult( + content=str(response), + structured_content={"status": status.value, **response.model_dump()}, + ) 
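+    # A hypothetical structured_content produced by the line above, for a buy
+    # pending manual approval (field values illustrative):
+    #   {"status": "submitted", "media_buy_id": "mb_123", "buyer_ref": "br_456",
+    #    "packages": [], "workflow_step_id": "step_789", "context": None}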
async def create_media_buy_raw( @@ -3636,9 +3703,9 @@ async def create_media_buy_raw( ctx: FastMCP context (automatically provided) (automatically provided) Returns: - CreateMediaBuyResponse with media buy details + Dict with status and CreateMediaBuyResponse data """ - return await _create_media_buy_impl( + response, status = await _create_media_buy_impl( buyer_ref=buyer_ref, brand_manifest=brand_manifest, po_number=po_number, @@ -3649,6 +3716,7 @@ async def create_media_buy_raw( context=context, ctx=ctx, ) + return {"status": status.value, **response.model_dump()} # Unified update tools diff --git a/src/core/tools/media_buy_update.py b/src/core/tools/media_buy_update.py index 1ba471e3f..cf52532ef 100644 --- a/src/core/tools/media_buy_update.py +++ b/src/core/tools/media_buy_update.py @@ -271,6 +271,13 @@ def _update_media_buy_impl( if persistent_ctx is None: raise ValueError("Failed to create or get persistent context") + # Prepare request data with protocol detection + request_data_for_workflow = req.model_dump(mode="json") # Convert dates to strings + + # Store protocol type for webhook payload creation + # ToolContext = A2A, Context (FastMCP) = MCP + request_data_for_workflow["protocol"] = "a2a" if isinstance(ctx, ToolContext) else "mcp" + # Create workflow step for this tool call step = ctx_manager.create_workflow_step( context_id=persistent_ctx.context_id, # Now safe to access @@ -278,7 +285,7 @@ def _update_media_buy_impl( owner="principal", status="in_progress", tool_name="update_media_buy", - request_data=req.model_dump(mode="json"), # Convert dates to strings + request_data=request_data_for_workflow, ) principal = get_principal_object(principal_id) # Now guaranteed to be str @@ -770,6 +777,16 @@ def normalize_url(url: str | None) -> str | None: ) session.add(assignment) + # If media buy was approved (approved_at set) but is in draft status + # (meaning it was approved without creatives), transition to pending_creatives + # Check whenever creative_ids are being set (not just when new ones added) + if pkg_update.creative_ids and media_buy_obj.status == "draft" and media_buy_obj.approved_at is not None: + media_buy_obj.status = "pending_creatives" + logger.info( + f"[UPDATE] Media buy {actual_media_buy_id} transitioned from draft to pending_creatives " + f"(creative_ids: {pkg_update.creative_ids})" + ) + session.commit() # Store results for affected_packages response @@ -956,6 +973,7 @@ def normalize_url(url: str | None) -> str | None: return response_data updated_assignments = [] + new_assignments_created = [] for ca in pkg_update.creative_assignments: # Schema validates and coerces dict inputs to LibraryCreativeAssignment @@ -997,6 +1015,17 @@ def normalize_url(url: str | None) -> str | None: ) session.add(new_assignment) updated_assignments.append(creative_id) + new_assignments_created.append(creative_id) + + # If media buy was approved (approved_at set) but is in draft status + # (meaning it was approved without creatives), transition to pending_creatives + # Check whenever creative_assignments are being set (not just when new ones created) + if pkg_update.creative_assignments and media_buy_obj.status == "draft" and media_buy_obj.approved_at is not None: + media_buy_obj.status = "pending_creatives" + logger.info( + f"[UPDATE] Media buy {actual_media_buy_id} transitioned from draft to pending_creatives " + f"(creative_assignments processed: {updated_assignments})" + ) session.commit() diff --git a/src/services/delivery_webhook_scheduler.py 
b/src/services/delivery_webhook_scheduler.py index 0232f8da8..bc72dd0d0 100644 --- a/src/services/delivery_webhook_scheduler.py +++ b/src/services/delivery_webhook_scheduler.py @@ -21,6 +21,8 @@ from src.core.tool_context import ToolContext from src.core.tools.media_buy_delivery import _get_media_buy_delivery_impl from src.services.protocol_webhook_service import get_protocol_webhook_service +from adcp import create_mcp_webhook_payload, create_a2a_webhook_payload +from adcp.types import GeneratedTaskStatus as AdcpTaskStatus, McpWebhookPayload logger = logging.getLogger(__name__) @@ -116,29 +118,39 @@ async def _send_reports(self) -> None: except Exception as e: logger.error(f"Error in daily delivery report batch: {e}", exc_info=True) - async def trigger_report_for_media_buy(self, media_buy: Any, session: Any) -> bool: - """Manually trigger a delivery report for a single media buy. + async def trigger_report_for_media_buy_by_id(self, media_buy_id: str, tenant_id: str) -> bool: + """Manually trigger a delivery report for a single media buy by ID. + + This method manages its own database session to avoid detached instance errors. Args: - media_buy: MediaBuy database model - session: Database session + media_buy_id: The media buy ID + tenant_id: The tenant ID Returns: bool: True if report was triggered successfully, False otherwise """ try: - raw_request = media_buy.raw_request or {} - reporting_webhook = raw_request.get("reporting_webhook") + with get_db_session() as session: + stmt = select(MediaBuy).filter_by(media_buy_id=media_buy_id, tenant_id=tenant_id) + media_buy = session.scalars(stmt).first() + + if not media_buy: + logger.warning(f"Cannot trigger report: Media buy {media_buy_id} not found") + return False + + raw_request = media_buy.raw_request or {} + reporting_webhook = raw_request.get("reporting_webhook") - if not reporting_webhook: - logger.warning(f"Cannot trigger report: No reporting_webhook configured for {media_buy.media_buy_id}") - return False + if not reporting_webhook: + logger.warning(f"Cannot trigger report: No reporting_webhook configured for {media_buy_id}") + return False - # Force sending even if already sent today (for testing) - await self._send_report_for_media_buy(media_buy, reporting_webhook, session, force=True) - return True + # Force sending even if already sent today (for testing) + await self._send_report_for_media_buy(media_buy, reporting_webhook, session, force=True) + return True except Exception as e: - logger.error(f"Error manually triggering report for {media_buy.media_buy_id}: {e}", exc_info=True) + logger.error(f"Error manually triggering report for {media_buy_id}: {e}", exc_info=True) return False async def _send_report_for_media_buy( @@ -241,18 +253,14 @@ async def _send_report_for_media_buy( next_day = datetime.now(UTC).date() + timedelta(days=1) next_expected_at = datetime.combine(next_day, datetime.min.time(), tzinfo=UTC).isoformat() - # Add webhook-specific metadata to the response - response_dict = delivery_response.model_dump() - response_dict.update( - { - "notification_type": "scheduled", - "sequence_number": sequence_number, - "next_expected_at": next_expected_at, - "frequency": raw_freq, - "partial_data": False, # TODO: Check for reporting_delayed status in media_buy_deliveries - "unavailable_count": 0, # TODO: Count reporting_delayed/failed deliveries - } - ) + # Convert delivery response to dict and add webhook-specific metadata + # Note: GetMediaBuyDeliveryResponse doesn't have these webhook fields, + # so we add them as extra data 
in the result dict
+            media_buy_delivery_result: dict[str, Any] = delivery_response.model_dump(mode="json")
+            media_buy_delivery_result["notification_type"] = "scheduled"
+            media_buy_delivery_result["next_expected_at"] = next_expected_at
+            media_buy_delivery_result["partial_data"] = False  # TODO: Check for reporting_delayed status in media_buy_deliveries
+            media_buy_delivery_result["unavailable_count"] = 0  # TODO: Count reporting_delayed/failed deliveries

             # Extract webhook URL and authentication
             webhook_url = reporting_webhook.get("url")
@@ -277,16 +285,15 @@ async def _send_report_for_media_buy(
                 DBPushNotificationConfig.url == webhook_url,
                 DBPushNotificationConfig.is_active,
             )
-            push_config = session.scalars(config_stmt).first()
+            push_notification_config = session.scalars(config_stmt).first()

             # Extract webhook config data before session closes
-            if push_config:
+            if push_notification_config:
                 # Detach from session and extract data
-                session.expunge(push_config)
-                webhook_config = push_config
+                session.expunge(push_notification_config)
             else:
                 # Create a detached temporary config (not attached to session)
-                webhook_config = DBPushNotificationConfig(
+                push_notification_config = DBPushNotificationConfig(
                     id=f"temp_{media_buy.media_buy_id}",
                     tenant_id=media_buy.tenant_id,
                     principal_id=media_buy.principal_id,
@@ -296,17 +303,28 @@
                     is_active=True,
                 )

+            metadata = {
+                "task_type": "media_buy_delivery",
+                "tenant_id": media_buy.tenant_id,
+                "principal_id": media_buy.principal_id,
+                "media_buy_id": media_buy.media_buy_id,
+            }
+
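+            # Illustrative shape of the dict returned by create_mcp_webhook_payload
+            # (an assumption for documentation; it mirrors the MCP webhook format
+            # asserted in tests/e2e/test_delivery_webhooks_e2e.py):
+            #   {"task_id": "<media_buy_id>", "status": "completed",
+            #    "timestamp": "<ISO-8601>", "result": {...delivery data...}}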
+            # TODO: Fix in adcp python client - create_mcp_webhook_payload should return
+            # McpWebhookPayload instead of dict[str, Any] for proper type safety
+            mcp_payload_dict = create_mcp_webhook_payload(
+                task_id=media_buy.media_buy_id,  # TODO: @yusuf - confirm the media buy ID is the correct task ID for delivery reports
+                result=media_buy_delivery_result,
+                status=AdcpTaskStatus.completed,
+            )
+            media_buy_delivery_payload = McpWebhookPayload.model_construct(**mcp_payload_dict)
+
             # Send webhook notification OUTSIDE the session context
             # This ensures the session is closed before async webhook call
             await self.webhook_service.send_notification(
-                task_type="media_buy_delivery",
-                task_id=media_buy.media_buy_id,
-                status="completed",
-                push_notification_config=webhook_config,
-                result=response_dict,  # Use modified dict with webhook metadata
-                tenant_id=media_buy.tenant_id,
-                principal_id=media_buy.principal_id,
-                media_buy_id=media_buy.media_buy_id,
+                push_notification_config=push_notification_config,
+                payload=media_buy_delivery_payload,
+                metadata=metadata,
             )

             logger.info(f"Sent delivery report webhook for media buy {media_buy.media_buy_id}")
diff --git a/src/services/protocol_webhook_service.py b/src/services/protocol_webhook_service.py
index 714ae8c66..eef9b2561 100644
--- a/src/services/protocol_webhook_service.py
+++ b/src/services/protocol_webhook_service.py
@@ -19,9 +19,12 @@
 import logging
 import time
 from datetime import UTC, datetime
-from typing import Any
+from typing import Any, cast
 from urllib.parse import urlparse, urlunparse
 from uuid import uuid4

+from a2a.types import Task, TaskStatusUpdateEvent
+from adcp.types import McpWebhookPayload
+from adcp import get_adcp_signed_headers_for_webhook, extract_webhook_result_data

 import requests
@@ -31,7 +34,6 @@
 logger = logging.getLogger(__name__)

-
 def _normalize_localhost_for_docker(url: str) -> str:
     """Replace localhost host with host.docker.internal while preserving userinfo and port."""
     try:
@@ -67,52 +69,29 @@ def __init__(self):

     async def send_notification(
         self,
-        task_type: str,
-        task_id: str,
-        status: str,
         push_notification_config: PushNotificationConfig,
-        result: dict[str, Any] | None = None,
-        error: str | None = None,
-        tenant_id: str | None = None,
-        principal_id: str | None = None,
-        media_buy_id: str | None = None,
+        payload: Task | TaskStatusUpdateEvent | McpWebhookPayload,
+        metadata: dict[str, Any],
     ) -> bool:
         """
         Send a protocol-level push notification to the configured webhook.

         Args:
             push_notification_config: Push notification configuration from protocol layer
-            task_type: Type of task ("sync_creatives", "media_buy", "delivery_report", etc.)
-            task_id: Task/operation ID
-            status: Status of operation ("working", "completed", "failed")
-            result: Result data
-            error: Error message if failed
-            tenant_id: Tenant ID for audit logging
-            principal_id: Principal ID for audit logging
-            media_buy_id: Media buy ID (for delivery_report tasks)
-
+            payload: For A2A, a Task or TaskStatusUpdateEvent; for MCP, an McpWebhookPayload.
+                Build it with create_a2a_webhook_payload or create_mcp_webhook_payload from the
+                official adcp Python client for the task and status at hand.
+            metadata: App-specific metadata such as task_type, tenant_id, and principal_id.

         Returns:
             True if notification sent successfully, False otherwise
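+
+        Example (illustrative; the config and metadata values below are placeholders):
+
+            payload_dict = create_mcp_webhook_payload(
+                task_id="task_123",
+                result={"media_buy_deliveries": []},
+                status=GeneratedTaskStatus.completed,
+            )
+            await service.send_notification(
+                push_notification_config=config,
+                payload=McpWebhookPayload.model_construct(**payload_dict),
+                metadata={"task_type": "media_buy_delivery", "tenant_id": "t1", "principal_id": "p1"},
+            )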
        """
         if not push_notification_config or not push_notification_config.url:
+            # TODO: @yusuf - Double check that Task, TaskStatusUpdateEvent and McpWebhookPayload all render usefully in this log message
+            logger.debug(f"No webhook URL configured in push notification config, skipping notification. Payload: {payload}")
             return False

         url = _normalize_localhost_for_docker(push_notification_config.url)

-        # Build notification payload (AdCP standard format)
-        payload: dict[str, Any] = {
-            "task_id": task_id,
-            "task_type": task_type,
-            "status": status,
-            "timestamp": datetime.now(UTC).isoformat(),
-        }
-
-        if result:
-            payload["result"] = result
-        if error:
-            payload["error"] = error
-
         # Prepare headers
         headers = {"Content-Type": "application/json", "User-Agent": "AdCP-Sales-Agent/1.0"}
@@ -123,88 +102,73 @@ async def send_notification(
                 push_notification_config.authentication_type
                 if hasattr(push_notification_config, "authentication_type")
                 else None
-            ),
-            "is_active": push_notification_config.is_active if hasattr(push_notification_config, "is_active") else None,
+            )
             # DO NOT log authentication_token - security risk
         }
         logger.info(f"push_notification_config (sanitized): {safe_config}")

+        # Serialize payload to dict for signing and sending; all three payload
+        # types are pydantic models, so they serialize the same way
+        payload_dict: dict[str, Any]
+        if isinstance(payload, (Task, TaskStatusUpdateEvent, McpWebhookPayload)):
+            payload_dict = payload.model_dump(mode="json", exclude_none=True)
+        else:
+            payload_dict = payload
+
         # Apply authentication based on schemes
         if (
             push_notification_config.authentication_type == "HMAC-SHA256"
             and push_notification_config.authentication_token
         ):
             # Sign payload with HMAC-SHA256
-            timestamp = str(int(time.time()))
-            payload_str = json.dumps(payload, sort_keys=False, separators=(",", ":"))
-            message = f"{timestamp}.{payload_str}"
-            signature = hmac.new(
-                push_notification_config.authentication_token.encode("utf-8"), message.encode("utf-8"), hashlib.sha256
-            ).hexdigest()
-
-            headers["X-AdCP-Signature"] = f"sha256={signature}"
-            headers["X-AdCP-Timestamp"] = timestamp
+            timestamp = str(int(time.time()))
+            get_adcp_signed_headers_for_webhook(headers, push_notification_config.authentication_token, timestamp, payload_dict)
         elif push_notification_config.authentication_type == "Bearer" and push_notification_config.authentication_token:
             # Use Bearer token authentication
             headers["Authorization"] = f"Bearer {push_notification_config.authentication_token}"
-
-        # Calculate payload size for metrics
-        payload_size_bytes = len(json.dumps(payload).encode("utf-8"))
-
-        notification_type_from_request = result.get("notification_type") if result is not None else None
-        sequence_number_from_result = result.get("sequence_number") if result is not None else None
+
         # Send notification with retry logic and logging
         return await self._send_with_retry_and_logging(
             url=url,
-            payload=payload,
+            payload=payload_dict,
             headers=headers,
-            task_id=task_id,
-            task_type=task_type,
-            tenant_id=tenant_id,
-            principal_id=principal_id,
-            media_buy_id=media_buy_id,
-            notification_type=notification_type_from_request,
-            sequence_number=sequence_number_from_result if isinstance(sequence_number_from_result, int) else 1,
-            payload_size_bytes=payload_size_bytes,
+            metadata=metadata,
         )
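+    # NOTE: HMAC-SHA256 signing is now delegated to get_adcp_signed_headers_for_webhook
+    # from the adcp client. For reference, the previous inline implementation (removed
+    # above) signed f"{timestamp}.{json.dumps(payload)}" and set the X-AdCP-Signature
+    # and X-AdCP-Timestamp headers; receivers should verify against whichever scheme
+    # the library version in use emits.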
    async def _send_with_retry_and_logging(
         self,
         url: str,
-        payload: dict,
+        payload: dict[str, Any],
         headers: dict,
-        task_id: str,
-        task_type: str,
-        tenant_id: str | None = None,
-        principal_id: str | None = None,
-        media_buy_id: str | None = None,
-        notification_type: str | None = None,
-        sequence_number: int = 1,
-        payload_size_bytes: int = 0,
+        metadata: dict[str, Any],
         max_attempts: int = 3,
     ) -> bool:
-        """Send webhook with exponential backoff retry logic, logging, and audit trail.
+        """Send webhook with exponential backoff retry logic, logging, and audit trail."""
+        # Calculate payload size for metrics
+        payload_size_bytes = len(json.dumps(payload).encode("utf-8"))

-        Args:
-            url: Webhook URL
-            payload: JSON payload
-            headers: HTTP headers
-            task_id: Task ID for logging
-            task_type: Task type ("delivery_report", etc.)
-            tenant_id: Tenant ID for logging
-            principal_id: Principal ID for logging
-            media_buy_id: Media buy ID (for delivery_report tasks)
-            notification_type: Notification type for AdCP
-            sequence_number: Sequence number for this webhook
-            payload_size_bytes: Size of payload in bytes
-            max_attempts: Maximum retry attempts (default: 3)
+        task_type = metadata.get("task_type")
+        tenant_id = metadata.get("tenant_id")
+        principal_id = metadata.get("principal_id")
+        media_buy_id = metadata.get("media_buy_id")
+
+        # TODO: Fix type annotation discrepancy in adcp library - extract_webhook_result_data
+        # returns dict at runtime but is typed as AdcpAsyncResponseData | None
+        result = cast(dict[str, Any] | None, extract_webhook_result_data(payload))
+        # After serialization, payload is always a dict - extract task_id accordingly
+        # A2A Task uses 'id', TaskStatusUpdateEvent uses 'task_id', MCP uses 'task_id'
+        task_id = payload.get("id") or payload.get("task_id") or ""
+
+        # Scheduling metadata is present only for media buy delivery reports
+        notification_type_from_result = result.get("notification_type") if result is not None else None
+        sequence_number_from_result = result.get("sequence_number") if result is not None else None
+        notification_type = notification_type_from_result
+        sequence_number = sequence_number_from_result if isinstance(sequence_number_from_result, int) else 1

-        Returns:
-            True if successful, False if all attempts failed
-        """
         # Create webhook delivery log entry
         log_id = str(uuid4())
         start_time = time.time()
diff --git a/tests/e2e/test_a2a_webhook_payload_types.py b/tests/e2e/test_a2a_webhook_payload_types.py
new file mode 100644
index 000000000..73e975a96
--- /dev/null
+++ b/tests/e2e/test_a2a_webhook_payload_types.py
@@ -0,0 +1,666 @@
+#!/usr/bin/env python3
+"""
+E2E tests for A2A webhook payload type compliance.
+
+Per AdCP A2A spec (https://docs.adcontextprotocol.org/docs/protocols/a2a-guide#push-notifications-a2a-specific):
+- Final states (completed, failed, canceled): Send full Task object with artifacts
+- Intermediate states (working, input-required, submitted): Send TaskStatusUpdateEvent
+
+This test validates that our A2A server sends the correct payload type based on status.
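+
+Illustrative payload shapes (assumed for documentation; the field names mirror the
+assertions in these tests, the values are placeholders):
+- Task (final state): {"id": "task_123", "status": {"state": "completed"},
+  "artifacts": [{"parts": [...]}]}
+- TaskStatusUpdateEvent (intermediate state): {"taskId": "task_123",
+  "status": {"state": "submitted"}}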
+""" + +import json +import socket +import uuid +from http.server import BaseHTTPRequestHandler, HTTPServer +from threading import Thread +from time import sleep +from typing import Any + +import httpx +import psycopg2 +import pytest + + +class WebhookPayloadCapture(BaseHTTPRequestHandler): + """Simple webhook receiver that captures all payloads with their types.""" + + received_payloads: list[dict[str, Any]] = [] + + def do_POST(self): + """Handle POST requests (webhook notifications).""" + content_length = int(self.headers.get("Content-Length", 0)) + body = self.rfile.read(content_length) + + try: + payload = json.loads(body.decode("utf-8")) + + # Determine payload type based on A2A spec: + # - Task has 'id' field + # - TaskStatusUpdateEvent has 'taskId' field + payload_type = "unknown" + if "taskId" in payload: + payload_type = "TaskStatusUpdateEvent" + elif "id" in payload: + payload_type = "Task" + + # Extract status + status = None + if "status" in payload: + status_obj = payload["status"] + if isinstance(status_obj, dict): + status = status_obj.get("state") + else: + status = str(status_obj) + + self.received_payloads.append({ + "payload": payload, + "payload_type": payload_type, + "status": status, + "path": self.path, + }) + + self.send_response(200) + self.send_header("Content-Type", "application/json") + self.end_headers() + self.wfile.write(b'{"status": "received"}') + except Exception as e: + self.send_response(500) + self.send_header("Content-Type", "application/json") + self.end_headers() + self.wfile.write(json.dumps({"error": str(e)}).encode()) + + def log_message(self, format, *args): + """Silence HTTP server logs during tests.""" + pass + + +@pytest.fixture +def webhook_capture_server(): + """Start a local HTTP server to capture webhook payloads.""" + # Clear any previous captures + WebhookPayloadCapture.received_payloads.clear() + + # Find an available port + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.bind(("0.0.0.0", 0)) + port = s.getsockname()[1] + s.close() + + # Start server on all interfaces so it's reachable from Docker container + server = HTTPServer(("0.0.0.0", port), WebhookPayloadCapture) + thread = Thread(target=server.serve_forever, daemon=True) + thread.start() + + # Use localhost in URL - the MCP server's protocol_webhook_service + # rewrites it to host.docker.internal + webhook_url = f"http://localhost:{port}/webhook" + + yield { + "url": webhook_url, + "server": server, + "received": WebhookPayloadCapture.received_payloads, + } + + server.shutdown() + WebhookPayloadCapture.received_payloads.clear() + + +class TestA2AWebhookPayloadTypes: + """Test A2A webhook payload type compliance with AdCP spec.""" + + def setup_auto_approval(self, live_server): + """Configure adapter for auto-approval to get completed webhooks.""" + try: + conn = psycopg2.connect(live_server["postgres"]) + cursor = conn.cursor() + + cursor.execute("SELECT tenant_id FROM tenants WHERE subdomain = 'ci-test'") + tenant_row = cursor.fetchone() + if tenant_row: + tenant_id = tenant_row[0] + cursor.execute( + """ + INSERT INTO adapter_config (tenant_id, adapter_type, mock_manual_approval_required) + VALUES (%s, 'mock', false) + ON CONFLICT (tenant_id) + DO UPDATE SET mock_manual_approval_required = false, adapter_type = 'mock' + """, + (tenant_id,), + ) + conn.commit() + print(f"Updated adapter config for tenant {tenant_id}: auto-approval enabled") + + cursor.close() + conn.close() + except Exception as e: + print(f"Failed to update adapter config: {e}") + + def 
setup_manual_approval(self, live_server): + """Configure adapter for manual approval to get submitted webhooks.""" + try: + conn = psycopg2.connect(live_server["postgres"]) + cursor = conn.cursor() + + cursor.execute("SELECT tenant_id FROM tenants WHERE subdomain = 'ci-test'") + tenant_row = cursor.fetchone() + if tenant_row: + tenant_id = tenant_row[0] + cursor.execute( + """ + INSERT INTO adapter_config (tenant_id, adapter_type, mock_manual_approval_required) + VALUES (%s, 'mock', true) + ON CONFLICT (tenant_id) + DO UPDATE SET mock_manual_approval_required = true, adapter_type = 'mock' + """, + (tenant_id,), + ) + conn.commit() + print(f"Updated adapter config for tenant {tenant_id}: manual approval required") + + cursor.close() + conn.close() + except Exception as e: + print(f"Failed to update adapter config: {e}") + + @pytest.mark.asyncio + async def test_completed_status_sends_task_payload( + self, + docker_services_e2e, + live_server, + test_auth_token, + webhook_capture_server, + ): + """ + Test that completed status sends a Task payload (not TaskStatusUpdateEvent). + + Per AdCP spec: + - Completed is a final state + - Final states should send Task object with artifacts + """ + # Enable auto-approval so create_media_buy completes immediately + self.setup_auto_approval(live_server) + + a2a_url = f"{live_server['a2a']}/a2a" + context_id = str(uuid.uuid4()) + + # Send A2A create_media_buy message with push notification config + message = { + "jsonrpc": "2.0", + "id": str(uuid.uuid4()), + "method": "message/send", + "params": { + "message": { + "messageId": str(uuid.uuid4()), + "contextId": context_id, + "role": "user", # Required by A2A spec + "parts": [ + { + "data": { + "skill": "create_media_buy", + "parameters": { + "product_ids": ["video_premium"], + "total_budget": 5000.0, + "start_time": "2025-03-01T00:00:00Z", + "end_time": "2025-03-31T23:59:59Z", + "brand_manifest": {"name": "Webhook Type Test Brand"}, + "context": {"e2e": "webhook_completed_test"}, + }, + } + } + ], + }, + "configuration": { + "pushNotificationConfig": { + "url": webhook_capture_server["url"], + "authentication": { + "schemes": ["Bearer"], + "credentials": "test-webhook-token", + }, + } + }, + }, + } + + headers = { + "Authorization": f"Bearer {test_auth_token}", + "Content-Type": "application/json", + "x-adcp-tenant": "ci-test", + } + + async with httpx.AsyncClient(timeout=30.0) as client: + response = await client.post(a2a_url, json=message, headers=headers) + + # Request should succeed + assert response.status_code == 200, f"A2A request failed: {response.text}" + result = response.json() + assert "error" not in result, f"A2A error: {result.get('error')}" + + # Wait for webhook to be delivered + timeout_seconds = 15 + poll_interval = 0.5 + elapsed = 0 + + while elapsed < timeout_seconds and not webhook_capture_server["received"]: + sleep(poll_interval) + elapsed += poll_interval + + # Verify webhook was received + received = webhook_capture_server["received"] + assert received, "Expected at least one webhook delivery" + + # Find the completed status webhook + completed_webhooks = [w for w in received if w["status"] == "completed"] + + if completed_webhooks: + webhook = completed_webhooks[0] + # Per AdCP spec: completed status should send Task (has 'id' field) + assert webhook["payload_type"] == "Task", ( + f"Completed status should send Task payload, not {webhook['payload_type']}. 
" + f"Payload has 'id': {'id' in webhook['payload']}, 'taskId': {'taskId' in webhook['payload']}" + ) + + # Verify Task structure + payload = webhook["payload"] + assert "id" in payload, "Task payload must have 'id' field" + assert "status" in payload, "Task payload must have 'status' field" + + # Per AdCP spec: completed status MUST have result in artifacts[0].parts[] + assert "artifacts" in payload, "Completed Task must have 'artifacts' field" + assert len(payload["artifacts"]) > 0, "Completed Task must have at least one artifact" + artifact = payload["artifacts"][0] + assert "parts" in artifact, "Artifact must have 'parts' field" + assert len(artifact["parts"]) > 0, "Artifact must have at least one part" + + @pytest.mark.asyncio + async def test_submitted_status_sends_task_status_update_event( + self, + docker_services_e2e, + live_server, + test_auth_token, + webhook_capture_server, + ): + """ + Test that submitted status sends a TaskStatusUpdateEvent payload. + + Per AdCP spec: + - Submitted is an intermediate state + - Intermediate states should send TaskStatusUpdateEvent + """ + # Enable manual approval so create_media_buy returns submitted state + self.setup_manual_approval(live_server) + + a2a_url = f"{live_server['a2a']}/a2a" + context_id = str(uuid.uuid4()) + + # Send A2A create_media_buy message that triggers approval workflow + message = { + "jsonrpc": "2.0", + "id": str(uuid.uuid4()), + "method": "message/send", + "params": { + "message": { + "messageId": str(uuid.uuid4()), + "contextId": context_id, + "role": "user", # Required by A2A spec + "parts": [ + { + "data": { + "skill": "create_media_buy", + "parameters": { + "product_ids": ["video_premium"], + "total_budget": 50000.0, + "start_time": "2025-04-01T00:00:00Z", + "end_time": "2025-04-30T23:59:59Z", + "brand_manifest": {"name": "Webhook Submitted Test Brand"}, + "context": {"e2e": "webhook_submitted_test"}, + }, + } + } + ], + }, + "configuration": { + "pushNotificationConfig": { + "url": webhook_capture_server["url"], + "authentication": { + "schemes": ["Bearer"], + "credentials": "test-webhook-token", + }, + } + }, + }, + } + + headers = { + "Authorization": f"Bearer {test_auth_token}", + "Content-Type": "application/json", + "x-adcp-tenant": "ci-test", + } + + async with httpx.AsyncClient(timeout=30.0) as client: + response = await client.post(a2a_url, json=message, headers=headers) + + # Request should succeed (returns submitted status for async operations) + assert response.status_code == 200, f"A2A request failed: {response.text}" + + # Wait for webhook to be delivered + timeout_seconds = 15 + poll_interval = 0.5 + elapsed = 0 + + while elapsed < timeout_seconds and not webhook_capture_server["received"]: + sleep(poll_interval) + elapsed += poll_interval + + received = webhook_capture_server["received"] + + # Check for submitted webhooks + submitted_webhooks = [w for w in received if w["status"] == "submitted"] + + if submitted_webhooks: + webhook = submitted_webhooks[0] + # Per AdCP spec: submitted status should send TaskStatusUpdateEvent (has 'taskId' field) + assert webhook["payload_type"] == "TaskStatusUpdateEvent", ( + f"Submitted status should send TaskStatusUpdateEvent payload, not {webhook['payload_type']}. 
" + f"Payload has 'id': {'id' in webhook['payload']}, 'taskId': {'taskId' in webhook['payload']}" + ) + + # Verify TaskStatusUpdateEvent structure + payload = webhook["payload"] + assert "taskId" in payload, "TaskStatusUpdateEvent payload must have 'taskId' field" + assert "status" in payload, "TaskStatusUpdateEvent payload must have 'status' field" + assert "state" in payload["status"], "TaskStatusUpdateEvent.status must have 'state' field" + + @pytest.mark.asyncio + async def test_webhook_payload_type_matches_status( + self, + docker_services_e2e, + live_server, + test_auth_token, + webhook_capture_server, + ): + """ + Test that all received webhooks use correct payload type for their status. + + Per AdCP spec: + - Final states (completed, failed, canceled): Task + - Intermediate states (working, input-required, submitted): TaskStatusUpdateEvent + """ + # Enable auto-approval + self.setup_auto_approval(live_server) + + a2a_url = f"{live_server['a2a']}/a2a" + context_id = str(uuid.uuid4()) + + # Send create_media_buy request + message = { + "jsonrpc": "2.0", + "id": str(uuid.uuid4()), + "method": "message/send", + "params": { + "message": { + "messageId": str(uuid.uuid4()), + "contextId": context_id, + "role": "user", # Required by A2A spec + "parts": [ + { + "data": { + "skill": "create_media_buy", + "parameters": { + "product_ids": ["video_premium"], + "total_budget": 8000.0, + "start_time": "2025-05-01T00:00:00Z", + "end_time": "2025-05-31T23:59:59Z", + "brand_manifest": {"name": "Payload Validation Test"}, + }, + } + } + ], + }, + "configuration": { + "pushNotificationConfig": { + "url": webhook_capture_server["url"], + } + }, + }, + } + + headers = { + "Authorization": f"Bearer {test_auth_token}", + "Content-Type": "application/json", + "x-adcp-tenant": "ci-test", + } + + async with httpx.AsyncClient(timeout=30.0) as client: + await client.post(a2a_url, json=message, headers=headers) + + # Wait for webhooks + timeout_seconds = 15 + elapsed = 0 + + while elapsed < timeout_seconds and not webhook_capture_server["received"]: + sleep(0.5) + elapsed += 0.5 + + received = webhook_capture_server["received"] + + # Define expected payload types per status + final_states = {"completed", "failed", "canceled"} + intermediate_states = {"working", "input-required", "submitted"} + + # Validate each received webhook + for webhook in received: + status = webhook["status"] + payload_type = webhook["payload_type"] + + if status in final_states: + assert payload_type == "Task", ( + f"Final state '{status}' should use Task payload, got {payload_type}" + ) + elif status in intermediate_states: + assert payload_type == "TaskStatusUpdateEvent", ( + f"Intermediate state '{status}' should use TaskStatusUpdateEvent payload, got {payload_type}" + ) + # Unknown states are logged but not asserted + + +class TestWebhookPayloadStructure: + """Test webhook payload structure compliance.""" + + def setup_auto_approval(self, live_server): + """Configure adapter for auto-approval.""" + try: + conn = psycopg2.connect(live_server["postgres"]) + cursor = conn.cursor() + + cursor.execute("SELECT tenant_id FROM tenants WHERE subdomain = 'ci-test'") + tenant_row = cursor.fetchone() + if tenant_row: + tenant_id = tenant_row[0] + cursor.execute( + """ + INSERT INTO adapter_config (tenant_id, adapter_type, mock_manual_approval_required) + VALUES (%s, 'mock', false) + ON CONFLICT (tenant_id) + DO UPDATE SET mock_manual_approval_required = false, adapter_type = 'mock' + """, + (tenant_id,), + ) + conn.commit() + + cursor.close() + 
conn.close() + except Exception as e: + print(f"Failed to update adapter config: {e}") + + @pytest.mark.asyncio + async def test_task_payload_has_required_fields( + self, + docker_services_e2e, + live_server, + test_auth_token, + webhook_capture_server, + ): + """Test that Task payload has all required A2A fields.""" + self.setup_auto_approval(live_server) + + a2a_url = f"{live_server['a2a']}/a2a" + + message = { + "jsonrpc": "2.0", + "id": str(uuid.uuid4()), + "method": "message/send", + "params": { + "message": { + "messageId": str(uuid.uuid4()), + "contextId": str(uuid.uuid4()), + "role": "user", # Required by A2A spec + "parts": [ + { + "data": { + "skill": "create_media_buy", + "parameters": { + "product_ids": ["video_premium"], + "total_budget": 3000.0, + "start_time": "2025-06-01T00:00:00Z", + "end_time": "2025-06-30T23:59:59Z", + }, + } + } + ], + }, + "configuration": { + "pushNotificationConfig": {"url": webhook_capture_server["url"]} + }, + }, + } + + headers = { + "Authorization": f"Bearer {test_auth_token}", + "Content-Type": "application/json", + "x-adcp-tenant": "ci-test", + } + + async with httpx.AsyncClient(timeout=30.0) as client: + await client.post(a2a_url, json=message, headers=headers) + + # Wait for webhook + timeout_seconds = 15 + elapsed = 0 + while elapsed < timeout_seconds and not webhook_capture_server["received"]: + sleep(0.5) + elapsed += 0.5 + + received = webhook_capture_server["received"] + task_webhooks = [w for w in received if w["payload_type"] == "Task"] + + for webhook in task_webhooks: + payload = webhook["payload"] + + # Required Task fields per A2A spec + assert "id" in payload, "Task must have 'id' field" + assert "status" in payload, "Task must have 'status' field" + + status = payload["status"] + assert "state" in status, "Task.status must have 'state' field" + + # Per AdCP spec: completed/failed MUST have result in artifacts[0].parts[] + if status["state"] in ("completed", "failed"): + assert "artifacts" in payload, f"Task with status '{status['state']}' must have 'artifacts'" + assert isinstance(payload["artifacts"], list), "artifacts must be a list" + assert len(payload["artifacts"]) > 0, "artifacts must have at least one item" + assert "parts" in payload["artifacts"][0], "artifact must have 'parts'" + assert len(payload["artifacts"][0]["parts"]) > 0, "artifact.parts must have at least one part" + + @pytest.mark.asyncio + async def test_task_status_update_event_has_required_fields( + self, + docker_services_e2e, + live_server, + test_auth_token, + webhook_capture_server, + ): + """Test that TaskStatusUpdateEvent payload has all required A2A fields.""" + # Enable manual approval to get submitted status + try: + conn = psycopg2.connect(live_server["postgres"]) + cursor = conn.cursor() + cursor.execute("SELECT tenant_id FROM tenants WHERE subdomain = 'ci-test'") + tenant_row = cursor.fetchone() + if tenant_row: + tenant_id = tenant_row[0] + cursor.execute( + """ + INSERT INTO adapter_config (tenant_id, adapter_type, mock_manual_approval_required) + VALUES (%s, 'mock', true) + ON CONFLICT (tenant_id) + DO UPDATE SET mock_manual_approval_required = true, adapter_type = 'mock' + """, + (tenant_id,), + ) + conn.commit() + cursor.close() + conn.close() + except Exception as e: + print(f"Failed to update adapter config: {e}") + + a2a_url = f"{live_server['a2a']}/a2a" + + # Trigger an async operation that sends intermediate status + message = { + "jsonrpc": "2.0", + "id": str(uuid.uuid4()), + "method": "message/send", + "params": { + "message": { + 
"messageId": str(uuid.uuid4()), + "contextId": str(uuid.uuid4()), + "role": "user", # Required by A2A spec + "parts": [ + { + "data": { + "skill": "create_media_buy", + "parameters": { + "product_ids": ["video_premium"], + "total_budget": 10000.0, + "start_time": "2025-07-01T00:00:00Z", + "end_time": "2025-07-31T23:59:59Z", + }, + } + } + ], + }, + "configuration": { + "pushNotificationConfig": {"url": webhook_capture_server["url"]} + }, + }, + } + + headers = { + "Authorization": f"Bearer {test_auth_token}", + "Content-Type": "application/json", + "x-adcp-tenant": "ci-test", + } + + async with httpx.AsyncClient(timeout=30.0) as client: + await client.post(a2a_url, json=message, headers=headers) + + # Wait for webhook + timeout_seconds = 15 + elapsed = 0 + while elapsed < timeout_seconds and not webhook_capture_server["received"]: + sleep(0.5) + elapsed += 0.5 + + received = webhook_capture_server["received"] + event_webhooks = [w for w in received if w["payload_type"] == "TaskStatusUpdateEvent"] + + for webhook in event_webhooks: + payload = webhook["payload"] + + # Required TaskStatusUpdateEvent fields per A2A spec + assert "taskId" in payload, "TaskStatusUpdateEvent must have 'taskId' field" + assert "status" in payload, "TaskStatusUpdateEvent must have 'status' field" + + status = payload["status"] + assert "state" in status, "TaskStatusUpdateEvent.status must have 'state' field" diff --git a/tests/e2e/test_delivery_webhooks_e2e.py b/tests/e2e/test_delivery_webhooks_e2e.py index f5e4b11da..36ccc8ecd 100644 --- a/tests/e2e/test_delivery_webhooks_e2e.py +++ b/tests/e2e/test_delivery_webhooks_e2e.py @@ -294,18 +294,19 @@ async def test_daily_delivery_webhook_end_to_end( if received: webhook_payload = received[0] - # # Verify webhook payload structure - # # The scheduler uses task_type="media_buy_delivery" - assert webhook_payload.get("task_type") == "media_buy_delivery" - assert webhook_payload.get("status") == "completed" + # Verify webhook payload structure (MCP webhook format) + assert webhook_payload.get("status") == "completed", f"Expected status 'completed', got {webhook_payload.get('status')}" + assert webhook_payload.get("task_id") == media_buy_id, f"Expected task_id '{media_buy_id}', got {webhook_payload.get('task_id')}" + assert "timestamp" in webhook_payload, "Missing timestamp in webhook payload" result = webhook_payload.get("result") or {} + # Verify delivery data media_buy_deliveries = result.get("media_buy_deliveries") - assert len(media_buy_deliveries) > 0 + assert media_buy_deliveries is not None, "Missing media_buy_deliveries in result" + assert len(media_buy_deliveries) > 0, "Expected at least one media_buy_delivery" assert media_buy_deliveries[0]["media_buy_id"] == media_buy_id - # # Verify scheduling metadata - assert "next_expected_at" in result - assert result.get("frequency") == "daily" - assert "sequence_number" in result + # Verify scheduling metadata + assert result.get("notification_type") == "scheduled", f"Expected notification_type 'scheduled', got {result.get('notification_type')}" + assert "next_expected_at" in result, "Missing next_expected_at in result" diff --git a/tests/integration/link_validator.py b/tests/integration/link_validator.py index 340a3428e..6d36c2ba7 100644 --- a/tests/integration/link_validator.py +++ b/tests/integration/link_validator.py @@ -162,15 +162,29 @@ def normalize_url(self, url: str, current_page: str) -> str: parsed = urlparse(url) return parsed.path - def validate_link(self, url: str) -> tuple[bool, int, str]: + def 
validate_link(self, url: str, visited: set[str] | None = None) -> tuple[bool, int, str]: """Validate a single link by making a HEAD request. Args: url: Normalized URL path to validate + visited: Set of already visited URLs (for cycle detection) Returns: Tuple of (is_valid, status_code, error_message) """ + # Initialize visited set on first call + if visited is None: + visited = set() + + # Detect redirect cycles + if url in visited: + # Cycle detected - consider it valid since the route exists and is accessible + # (the cycle is a routing issue, not a broken link) + return True, 302, "" + + # Mark URL as visited + visited.add(url) + try: # Don't follow redirects automatically - check where they go first response = self.client.head(url, follow_redirects=False) @@ -187,9 +201,9 @@ def validate_link(self, url: str) -> tuple[bool, int, str]: # External redirects (OAuth, etc.) are valid - route exists and is working if location.startswith(("http://", "https://", "//")): return True, response.status_code, "" - # Internal redirects - follow them to validate + # Internal redirects - follow them to validate (with cycle detection) response.close() - return self.validate_link(location) + return self.validate_link(location, visited) # Consider 200, 304 as valid # 304: Not Modified (common for cached resources) diff --git a/tests/integration/test_delivery_webhooks_force.py b/tests/integration/test_delivery_webhooks_force.py index c7b1d0f1c..3fbd0a5fd 100644 --- a/tests/integration/test_delivery_webhooks_force.py +++ b/tests/integration/test_delivery_webhooks_force.py @@ -125,9 +125,10 @@ async def fake_send_notification(*args, **kwargs): # Verify call args args, kwargs = mock_send.await_args - result = kwargs.get("result") - assert result is not None - assert result["media_buy_deliveries"][0]["media_buy_id"] == media_buy_id + payload = kwargs.get("payload") + assert payload is not None + # Extract result from the McpWebhookPayload + assert payload.result["media_buy_deliveries"][0]["media_buy_id"] == media_buy_id @pytest.mark.requires_db @@ -149,7 +150,7 @@ async def test_trigger_report_for_media_buy_public_method(integration_db): media_buy = session.scalars(select(MediaBuy).filter_by(media_buy_id=media_buy_id)).first() # 2. Call public method - result = await scheduler.trigger_report_for_media_buy(media_buy, session) + result = await scheduler.trigger_report_for_media_buy_by_id(media_buy_id, tenant_id) # 3. 
Verify result and call assert result is True @@ -190,6 +191,6 @@ async def test_trigger_report_fails_gracefully_no_webhook(integration_db): scheduler = DeliveryWebhookScheduler() # Call public method - result = await scheduler.trigger_report_for_media_buy(media_buy, session) + result = await scheduler.trigger_report_for_media_buy_by_id(media_buy.media_buy_id, tenant_id) assert result is False diff --git a/tests/integration/test_delivery_webhooks_integration.py b/tests/integration/test_delivery_webhooks_integration.py index e9fcf56a1..9c747d5fe 100644 --- a/tests/integration/test_delivery_webhooks_integration.py +++ b/tests/integration/test_delivery_webhooks_integration.py @@ -205,27 +205,31 @@ async def fake_send_notification(*args, **kwargs): args, kwargs = mock_send_notification.await_args - task_type = kwargs.get("task_type") - task_id = kwargs.get("task_id") - status = kwargs.get("status") + # Extract from kwargs + metadata = kwargs.get("metadata") + payload = kwargs.get("payload") push_notification_config = kwargs.get("push_notification_config") - result = kwargs.get("result") - error = kwargs.get("error") - tenant_id = kwargs.get("tenant_id") - principal_id = kwargs.get("principal_id") + + # Extract from metadata + task_type = metadata.get("task_type") + extracted_tenant_id = metadata.get("tenant_id") + extracted_principal_id = metadata.get("principal_id") + extracted_media_buy_id = metadata.get("media_buy_id") + + # Extract from payload + task_id = payload.task_id + status = payload.status + result = payload.result # Webhook should have been sent exactly once assert mock_send_notification.await_count == 1 assert task_type == "media_buy_delivery" - assert error is None - assert tenant_id == tenant_id - assert principal_id == principal_id - assert media_buy_id == media_buy_id + assert extracted_tenant_id == tenant_id + assert extracted_principal_id == principal_id + assert extracted_media_buy_id == media_buy_id assert result is not None assert result.get("notification_type") == "scheduled" - assert result.get("sequence_number") == 1 assert result.get("next_expected_at") is not None - assert result.get("frequency") == "daily" assert result.get("partial_data") is False assert result.get("unavailable_count") == 0 assert result.get("reporting_period") is not None @@ -295,7 +299,8 @@ async def fake_send_notification(*args, **kwargs): # Check payload of the delivery args, kwargs = mock_send_notification.await_args - result = kwargs.get("result") + payload = kwargs.get("payload") + result = payload.result errors = result.get("errors") assert errors is None @@ -324,7 +329,8 @@ async def fake_send_notification(*args, **kwargs): # Should send a webhook (since status=active in DB) but with empty deliveries (since dynamic status=ready) if mock_send.call_count > 0: args, kwargs = mock_send.call_args - result = kwargs.get("result") + payload = kwargs.get("payload") + result = payload.result assert len(result.get("media_buy_deliveries", [])) == 0 @@ -353,7 +359,8 @@ async def fake_send_notification(*args, **kwargs): # With current implementation, dynamic status="completed" -> filtered out of active list -> empty deliveries args, kwargs = mock_send.call_args - result = kwargs.get("result") + payload = kwargs.get("payload") + result = payload.result # Just verify result structure is valid assert result is not None diff --git a/tests/integration/test_duplicate_product_validation.py b/tests/integration/test_duplicate_product_validation.py index 8e1829c4f..5aefdad43 100644 --- 
a/tests/integration/test_duplicate_product_validation.py +++ b/tests/integration/test_duplicate_product_validation.py @@ -83,7 +83,7 @@ async def test_duplicate_product_in_packages_rejected(self, integration_db): end_time = start_time + timedelta(days=7) # Should return error response about duplicate products - result = await _create_media_buy_impl( + result, _ = await _create_media_buy_impl( buyer_ref="test_media_buy_duplicate", brand_manifest={"name": "Test Brand"}, packages=packages, @@ -173,7 +173,7 @@ async def test_multiple_duplicate_products_all_listed(self, integration_db): end_time = start_time + timedelta(days=7) # Should return error response listing both duplicate products - result = await _create_media_buy_impl( + result, _ = await _create_media_buy_impl( buyer_ref="test_media_buy_multiple_duplicates", brand_manifest={"name": "Test Brand"}, packages=packages, diff --git a/tests/integration/test_gam_pricing_models_integration.py b/tests/integration/test_gam_pricing_models_integration.py index 4a8927600..44aa6a58a 100644 --- a/tests/integration/test_gam_pricing_models_integration.py +++ b/tests/integration/test_gam_pricing_models_integration.py @@ -396,7 +396,7 @@ async def test_gam_cpm_guaranteed_creates_standard_line_item(setup_gam_tenant_wi testing_context={"dry_run": True, "test_session_id": "test_session"}, ) - response = await _create_media_buy_impl( + response, _ = await _create_media_buy_impl( buyer_ref=request.buyer_ref, brand_manifest=request.brand_manifest, packages=request.packages, @@ -453,7 +453,7 @@ async def test_gam_cpc_creates_price_priority_line_item_with_clicks_goal(setup_g testing_context={"dry_run": True, "test_session_id": "test_session"}, ) - response = await _create_media_buy_impl( + response, _ = await _create_media_buy_impl( buyer_ref=request.buyer_ref, brand_manifest=request.brand_manifest, packages=request.packages, @@ -511,7 +511,7 @@ async def test_gam_vcpm_creates_standard_line_item_with_viewable_impressions(set testing_context={"dry_run": True, "test_session_id": "test_session"}, ) - response = await _create_media_buy_impl( + response, _ = await _create_media_buy_impl( buyer_ref=request.buyer_ref, brand_manifest=request.brand_manifest, packages=request.packages, @@ -570,7 +570,7 @@ async def test_gam_flat_rate_calculates_cpd_correctly(setup_gam_tenant_with_all_ testing_context={"dry_run": True, "test_session_id": "test_session"}, ) - response = await _create_media_buy_impl( + response, _ = await _create_media_buy_impl( buyer_ref=request.buyer_ref, brand_manifest=request.brand_manifest, packages=request.packages, @@ -640,7 +640,7 @@ async def test_gam_multi_package_mixed_pricing_models(setup_gam_tenant_with_all_ testing_context={"dry_run": True, "test_session_id": "test_session"}, ) - response = await _create_media_buy_impl( + response, _ = await _create_media_buy_impl( buyer_ref=request.buyer_ref, brand_manifest=request.brand_manifest, packages=request.packages, diff --git a/tests/integration/test_gam_pricing_restriction.py b/tests/integration/test_gam_pricing_restriction.py index 162f4d6ea..775c43e56 100644 --- a/tests/integration/test_gam_pricing_restriction.py +++ b/tests/integration/test_gam_pricing_restriction.py @@ -299,7 +299,7 @@ async def test_gam_rejects_cpcv_pricing_model(setup_gam_tenant_with_non_cpm_prod from src.core.tools.media_buy_create import _create_media_buy_impl # GAM adapter rejects unsupported pricing models by returning CreateMediaBuyError - response = await _create_media_buy_impl( + response, _ = await 
_create_media_buy_impl( buyer_ref=request.buyer_ref, brand_manifest=request.brand_manifest, packages=request.packages, @@ -349,7 +349,7 @@ async def test_gam_accepts_cpm_pricing_model(setup_gam_tenant_with_non_cpm_produ ) # This should succeed - response = await _create_media_buy_impl( + response, _ = await _create_media_buy_impl( buyer_ref=request.buyer_ref, brand_manifest=request.brand_manifest, packages=request.packages, @@ -402,7 +402,7 @@ async def test_gam_rejects_cpp_from_multi_pricing_product(setup_gam_tenant_with_ from src.core.schemas import CreateMediaBuyError # GAM adapter rejects unsupported pricing models by returning CreateMediaBuyError - response = await _create_media_buy_impl( + response, _ = await _create_media_buy_impl( buyer_ref=request.buyer_ref, brand_manifest=request.brand_manifest, packages=request.packages, @@ -452,7 +452,7 @@ async def test_gam_accepts_cpm_from_multi_pricing_product(setup_gam_tenant_with_ ) # This should succeed - buyer chose CPM from multi-option product - response = await _create_media_buy_impl( + response, _ = await _create_media_buy_impl( buyer_ref=request.buyer_ref, brand_manifest=request.brand_manifest, packages=request.packages, diff --git a/tests/integration/test_pricing_models_integration.py b/tests/integration/test_pricing_models_integration.py index 3f106913c..ce1a8dc7b 100644 --- a/tests/integration/test_pricing_models_integration.py +++ b/tests/integration/test_pricing_models_integration.py @@ -297,7 +297,7 @@ async def test_create_media_buy_with_cpm_fixed_pricing(setup_tenant_with_pricing testing_context={"dry_run": True, "test_session_id": "test_session"}, ) - response = await _create_media_buy_impl( + response, _ = await _create_media_buy_impl( buyer_ref=request.buyer_ref, brand_manifest=request.brand_manifest, packages=request.packages, @@ -343,7 +343,7 @@ async def test_create_media_buy_with_cpm_auction_pricing(setup_tenant_with_prici testing_context={"dry_run": True, "test_session_id": "test_session"}, ) - response = await _create_media_buy_impl( + response, _ = await _create_media_buy_impl( buyer_ref=request.buyer_ref, brand_manifest=request.brand_manifest, packages=request.packages, @@ -390,7 +390,7 @@ async def test_create_media_buy_auction_bid_below_floor_fails(setup_tenant_with_ ) # AdCP 2.4 spec: Errors are returned in response.errors, not raised as exceptions - response = await _create_media_buy_impl( + response, _ = await _create_media_buy_impl( buyer_ref=request.buyer_ref, brand_manifest=request.brand_manifest, packages=request.packages, @@ -433,7 +433,7 @@ async def test_create_media_buy_with_cpcv_pricing(setup_tenant_with_pricing_prod testing_context={"dry_run": True, "test_session_id": "test_session"}, ) - response = await _create_media_buy_impl( + response, _ = await _create_media_buy_impl( buyer_ref=request.buyer_ref, brand_manifest=request.brand_manifest, packages=request.packages, @@ -479,7 +479,7 @@ async def test_create_media_buy_below_min_spend_fails(setup_tenant_with_pricing_ ) # AdCP 2.4 spec: Errors are returned in response.errors, not raised as exceptions - response = await _create_media_buy_impl( + response, _ = await _create_media_buy_impl( buyer_ref=request.buyer_ref, brand_manifest=request.brand_manifest, packages=request.packages, @@ -522,7 +522,7 @@ async def test_create_media_buy_multi_pricing_choose_cpp(setup_tenant_with_prici testing_context={"dry_run": True, "test_session_id": "test_session"}, ) - response = await _create_media_buy_impl( + response, _ = await _create_media_buy_impl( 
buyer_ref=request.buyer_ref, brand_manifest=request.brand_manifest, packages=request.packages, @@ -568,7 +568,7 @@ async def test_create_media_buy_invalid_pricing_model_fails(setup_tenant_with_pr ) # AdCP 2.4 spec: Errors are returned in response.errors, not raised as exceptions - response = await _create_media_buy_impl( + response, _ = await _create_media_buy_impl( buyer_ref=request.buyer_ref, brand_manifest=request.brand_manifest, packages=request.packages, diff --git a/tests/integration_v2/conftest.py b/tests/integration_v2/conftest.py index 7726c3cc5..233d4efe6 100644 --- a/tests/integration_v2/conftest.py +++ b/tests/integration_v2/conftest.py @@ -192,6 +192,7 @@ def sample_tenant(integration_db): auth_setup_mode=False, # Disable setup mode for production-ready auth # Required: Access control configuration authorized_emails=["test@example.com"], + human_review_required=False, # Auto-approve media buys for testing (no manual approval) ) session.add(tenant) diff --git a/tests/integration_v2/test_a2a_skill_invocation.py b/tests/integration_v2/test_a2a_skill_invocation.py index 9d5c927e9..8df09ce2c 100644 --- a/tests/integration_v2/test_a2a_skill_invocation.py +++ b/tests/integration_v2/test_a2a_skill_invocation.py @@ -10,7 +10,7 @@ from unittest.mock import MagicMock, patch import pytest -from a2a.types import DataPart, Message, MessageSendParams, Part, Role, Task, TaskStatus +from a2a.types import DataPart, Message, MessageSendParams, Part, Role, Task, TaskState, TaskStatus from src.a2a_server.adcp_a2a_server import AdCPRequestHandler from tests.utils.a2a_helpers import create_a2a_message_with_skill, create_a2a_text_message @@ -406,6 +406,62 @@ async def test_explicit_skill_create_media_buy( assert "packages" in artifact_data assert isinstance(artifact_data["packages"], list) + @pytest.mark.asyncio + async def test_explicit_skill_create_media_buy_manual_approval( + self, handler, sample_tenant, sample_principal, sample_products, validator + ): + """Test create_media_buy returns status=submitted when manual approval required.""" + # Update tenant to require manual approval + from src.core.database.database_session import get_db_session + from src.core.database.models import Tenant + + with get_db_session() as session: + tenant = session.get(Tenant, sample_tenant["tenant_id"]) + tenant.human_review_required = True + session.commit() + + # Mock authentication token + handler._get_auth_token = MagicMock(return_value=sample_principal["access_token"]) + + # Mock tenant detection + with (patch("src.a2a_server.adcp_a2a_server.get_principal_from_token") as mock_get_principal,): + mock_get_principal.return_value = sample_principal["principal_id"] + from src.a2a_server import adcp_a2a_server + + adcp_a2a_server._request_headers.set({"host": f"{sample_tenant['subdomain']}.example.com"}) + + # Create explicit skill invocation message + from datetime import UTC, datetime, timedelta + + start_date = datetime.now(UTC) + timedelta(days=1) + end_date = start_date + timedelta(days=30) + + skill_params = { + "brand_manifest": {"name": "Test Campaign"}, + "packages": [ + { + "buyer_ref": f"pkg_{sample_products[0]}", + "product_id": sample_products[0], + "budget": 10000.0, + "pricing_option_id": "cpm_usd_fixed", + } + ], + "budget": {"total": 10000.0, "currency": "USD"}, + "start_time": start_date.isoformat(), + "end_time": end_date.isoformat(), + } + message = create_a2a_message_with_skill("create_media_buy", skill_params) + params = MessageSendParams(message=message) + + # Process the message + result = 
await handler.on_message_send(params) + + # Verify the result has status=submitted (manual approval required) + assert isinstance(result, Task) + assert result.status.state == TaskState.submitted + # Per A2A spec, tasks requiring approval should not have artifacts until approved + assert result.artifacts is None + @pytest.mark.asyncio async def test_hybrid_invocation(self, handler, sample_tenant, sample_principal, sample_products, validator): """Test hybrid invocation with both text and skill.""" diff --git a/tests/integration_v2/test_create_media_buy_v24.py b/tests/integration_v2/test_create_media_buy_v24.py index 4a503bcc8..af2854037 100644 --- a/tests/integration_v2/test_create_media_buy_v24.py +++ b/tests/integration_v2/test_create_media_buy_v24.py @@ -239,7 +239,7 @@ async def test_create_media_buy_with_package_budget_mcp(self, setup_test_tenant) # Call _impl with individual parameters (not a request object) # This exercises the FULL serialization path including response_packages construction # NOTE: budget is at package level per AdCP v2.4 spec (not a top-level parameter) - response = await _create_media_buy_impl( + response, _ = await _create_media_buy_impl( buyer_ref="test_buyer_v24", # REQUIRED per AdCP v2.2.0 brand_manifest={"name": "Nike Air Jordan 2025 basketball shoes"}, packages=[p.model_dump() for p in packages], @@ -305,7 +305,7 @@ async def test_create_media_buy_with_targeting_overlay_mcp(self, setup_test_tena context = MagicMock() context.headers = {"x-adcp-auth": "test_token_v24"} - response = await _create_media_buy_impl( + response, _ = await _create_media_buy_impl( buyer_ref="test_buyer_v24_targeting", # REQUIRED per AdCP v2.2.0 brand_manifest={"name": "Adidas UltraBoost 2025 running shoes"}, packages=[p.model_dump() for p in packages], @@ -381,7 +381,7 @@ async def test_create_media_buy_multiple_packages_with_budgets_mcp(self, setup_t # Total budget is sum of all package budgets total_budget_value = sum(pkg.budget for pkg in packages) - response = await _create_media_buy_impl( + response, _ = await _create_media_buy_impl( buyer_ref="test_buyer_v24_multi", # REQUIRED per AdCP v2.2.0 brand_manifest={"name": "Puma RS-X 2025 training shoes"}, packages=[p.model_dump() for p in packages], @@ -425,7 +425,7 @@ async def test_create_media_buy_with_package_budget_a2a(self, setup_test_tenant) context = MagicMock() context.headers = {"x-adcp-auth": "test_token_v24"} - response = await _create_media_buy_impl( + response, _ = await _create_media_buy_impl( buyer_ref="test_buyer_v24_a2a", # REQUIRED per AdCP v2.2.0 brand_manifest={"name": "Reebok Nano 2025 cross-training shoes"}, packages=[p.model_dump() for p in packages], @@ -467,7 +467,7 @@ async def test_create_media_buy_with_minimal_package(self, setup_test_tenant): # Standard AdCP format with explicit package # pricing_option_id format: {model}_{currency}_{fixed|auction} - response = await _create_media_buy_impl( + response, _ = await _create_media_buy_impl( buyer_ref="test_buyer_v24_standard", brand_manifest={"name": "Under Armour HOVR 2025 running shoes"}, packages=[ diff --git a/tests/integration_v2/test_creative_lifecycle_mcp.py b/tests/integration_v2/test_creative_lifecycle_mcp.py index 3b6084462..e377d4f3a 100644 --- a/tests/integration_v2/test_creative_lifecycle_mcp.py +++ b/tests/integration_v2/test_creative_lifecycle_mcp.py @@ -999,10 +999,8 @@ def test_validate_creatives_missing_required_fields(self, mock_context): # Missing URL - only has dimensions "width": 300, "height": 250, - }, - "click_url": { - "url": 
"https://example.com/landing" } + # Removed click_url so fallback logic has no URL to find } }, ) @@ -1182,8 +1180,8 @@ async def test_create_media_buy_with_creative_ids(self, mock_context, sample_cre # Verify response (domain response doesn't have status field) # Note: media_buy_id may be transformed by naming template (e.g., "buy_PO-TEST-123") - assert response.media_buy_id # Just verify it exists - actual_media_buy_id = response.media_buy_id + assert response["media_buy_id"] # Just verify it exists + actual_media_buy_id = response["media_buy_id"] # Protocol envelope adds status field - domain response just has media_buy_id # Verify creative assignments were created in database diff --git a/tests/integration_v2/test_error_paths.py b/tests/integration_v2/test_error_paths.py index 7ab797faa..ac32a3c50 100644 --- a/tests/integration_v2/test_error_paths.py +++ b/tests/integration_v2/test_error_paths.py @@ -32,7 +32,7 @@ from src.core.database.models import Principal as ModelPrincipal from src.core.database.models import Product as ModelProduct from src.core.database.models import Tenant as ModelTenant -from src.core.schemas import CreateMediaBuyError, CreateMediaBuyResponse, Error +from src.core.schemas import CreateMediaBuyError, CreateMediaBuyResponse, CreateMediaBuySuccess, Error from src.core.tool_context import ToolContext from src.core.tools import create_media_buy_raw, list_creatives_raw, sync_creatives_raw from tests.helpers.adcp_factories import create_test_package_request_dict @@ -160,7 +160,7 @@ async def test_missing_principal_returns_authentication_error(self, test_tenant_ future_end = future_start + timedelta(days=7) # This should return error response, not raise NameError - response = await create_media_buy_raw( + response_dict = await create_media_buy_raw( po_number="error_test_po", brand_manifest={"name": "Test campaign"}, buyer_ref="test_buyer", @@ -179,9 +179,19 @@ async def test_missing_principal_returns_authentication_error(self, test_tenant_ ctx=context, ) + # Validate and convert dict to Pydantic model + # This will raise ValidationError if the dict doesn't match the schema + status = response_dict.pop("status") # Extract status field (not part of the model) + + # Determine which model to construct based on presence of errors + if "errors" in response_dict and response_dict.get("errors"): + # Will raise ValidationError if response_dict doesn't match CreateMediaBuyError schema + response = CreateMediaBuyError.model_validate(response_dict) + else: + # Will raise ValidationError if response_dict doesn't match CreateMediaBuySuccess schema + response = CreateMediaBuySuccess.model_validate(response_dict) + # Verify response structure - error cases return CreateMediaBuyError - assert isinstance(response, (CreateMediaBuyResponse, CreateMediaBuyError)) - # CreateMediaBuyError is a discriminated union member of CreateMediaBuyResponse assert isinstance(response, CreateMediaBuyError) assert response.errors is not None assert len(response.errors) > 0 @@ -212,7 +222,7 @@ async def test_start_time_in_past_returns_validation_error(self, test_tenant_wit past_end = past_start + timedelta(days=7) # This should return error response for past start time - response = await create_media_buy_raw( + response_dict = await create_media_buy_raw( po_number="error_test_po", brand_manifest={"name": "Test campaign"}, buyer_ref="test_buyer", @@ -231,8 +241,15 @@ async def test_start_time_in_past_returns_validation_error(self, test_tenant_wit ctx=context, ) + # Validate and convert dict to Pydantic 
model + status = response_dict.pop("status") + + if "errors" in response_dict and response_dict.get("errors"): + response = CreateMediaBuyError.model_validate(response_dict) + else: + response = CreateMediaBuySuccess.model_validate(response_dict) + # Verify response structure - error cases return CreateMediaBuyError - assert isinstance(response, (CreateMediaBuyResponse, CreateMediaBuyError)) assert isinstance(response, CreateMediaBuyError) assert response.errors is not None assert len(response.errors) > 0 @@ -258,7 +275,7 @@ async def test_end_time_before_start_returns_validation_error(self, test_tenant_ start = datetime.now(UTC) + timedelta(days=7) end = start - timedelta(days=1) # Before start! - response = await create_media_buy_raw( + response_dict = await create_media_buy_raw( po_number="error_test_po", brand_manifest={"name": "Test campaign"}, buyer_ref="test_buyer", @@ -276,8 +293,15 @@ async def test_end_time_before_start_returns_validation_error(self, test_tenant_ ctx=context, ) + # Validate and convert dict to Pydantic model + status = response_dict.pop("status") + + if "errors" in response_dict and response_dict.get("errors"): + response = CreateMediaBuyError.model_validate(response_dict) + else: + response = CreateMediaBuySuccess.model_validate(response_dict) + # Verify response structure - error cases return CreateMediaBuyError - assert isinstance(response, (CreateMediaBuyResponse, CreateMediaBuyError)) assert isinstance(response, CreateMediaBuyError) assert response.errors is not None assert len(response.errors) > 0 @@ -341,7 +365,7 @@ async def test_missing_packages_returns_validation_error(self, test_tenant_with_ future_start = datetime.now(UTC) + timedelta(days=1) future_end = future_start + timedelta(days=7) - response = await create_media_buy_raw( + response_dict = await create_media_buy_raw( po_number="error_test_po", brand_manifest={"name": "Test campaign"}, buyer_ref="test_buyer", @@ -352,8 +376,15 @@ async def test_missing_packages_returns_validation_error(self, test_tenant_with_ ctx=context, ) + # Validate and convert dict to Pydantic model + status = response_dict.pop("status") + + if "errors" in response_dict and response_dict.get("errors"): + response = CreateMediaBuyError.model_validate(response_dict) + else: + response = CreateMediaBuySuccess.model_validate(response_dict) + # Verify response structure - error cases return CreateMediaBuyError - assert isinstance(response, (CreateMediaBuyResponse, CreateMediaBuyError)) assert isinstance(response, CreateMediaBuyError) assert response.errors is not None assert len(response.errors) > 0 diff --git a/tests/integration_v2/test_minimum_spend_validation.py b/tests/integration_v2/test_minimum_spend_validation.py index 496f6ca62..ca9959bf1 100644 --- a/tests/integration_v2/test_minimum_spend_validation.py +++ b/tests/integration_v2/test_minimum_spend_validation.py @@ -304,7 +304,7 @@ async def test_currency_minimum_spend_enforced(self, setup_test_data): end_time = start_time + timedelta(days=7) # Should fail validation and return errors in response - response = await _create_media_buy_impl( + response, _ = await _create_media_buy_impl( buyer_ref="minspend_test_1", brand_manifest={"name": "Test Campaign"}, packages=[ @@ -339,7 +339,7 @@ async def test_product_override_enforced(self, setup_test_data): # Try to create media buy below product override ($5000) # Should fail validation and return errors in response - response = await _create_media_buy_impl( + response, _ = await _create_media_buy_impl( 
buyer_ref="minspend_test_2", brand_manifest={"name": "Test Campaign"}, packages=[ @@ -374,7 +374,7 @@ async def test_lower_override_allows_smaller_spend(self, setup_test_data): # Create media buy above product minimum ($500) but below currency limit ($1000) # Should succeed because product override is lower - response = await _create_media_buy_impl( + response, _ = await _create_media_buy_impl( buyer_ref="minspend_test_3", brand_manifest={"name": "Test Campaign"}, packages=[ @@ -405,7 +405,7 @@ async def test_minimum_spend_met_success(self, setup_test_data): end_time = start_time + timedelta(days=7) # Create media buy above minimum - should succeed - response = await _create_media_buy_impl( + response, _ = await _create_media_buy_impl( buyer_ref="minspend_test_4", brand_manifest={"name": "Test Campaign"}, packages=[ @@ -470,7 +470,7 @@ async def test_different_currency_different_minimum(self, setup_test_data): # $800 should fail (below $1000 USD minimum) # Should fail validation and return errors in response - response = await _create_media_buy_impl( + response, _ = await _create_media_buy_impl( buyer_ref="minspend_test_6", brand_manifest={"name": "Test Campaign"}, packages=[ @@ -515,7 +515,7 @@ async def test_no_minimum_when_not_set(self, setup_test_data): end_time = start_time + timedelta(days=7) # Create media buy with low budget in GBP (should succeed - no minimum) - response = await _create_media_buy_impl( + response, _ = await _create_media_buy_impl( buyer_ref="minspend_test_7", brand_manifest={"name": "Test Campaign"}, packages=[ diff --git a/uv.lock b/uv.lock index ddc2731ac..1b9763b41 100644 --- a/uv.lock +++ b/uv.lock @@ -62,7 +62,7 @@ http-server = [ [[package]] name = "adcp" -version = "2.16.0" +version = "2.17.0" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "a2a-sdk" }, @@ -72,9 +72,9 @@ dependencies = [ { name = "pydantic" }, { name = "typing-extensions" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/ef/7b/93cf41e1c83895f600073265a2760b47bb8800304fff3c2161f62dd57b3d/adcp-2.16.0.tar.gz", hash = "sha256:d96e1b51436fdeba937ea0e989463d2c1aaea7c5b7bdc694883cfe2f460e722a", size = 184525, upload-time = "2025-12-21T03:50:58.176Z" } +sdist = { url = "https://files.pythonhosted.org/packages/0c/83/caecbf598d00f3e531da08f132979bf38643c890e2f0207f7fec4bfd925e/adcp-2.17.0.tar.gz", hash = "sha256:031cf1ff8addf88470b5bf29048da2ea27c551f559cddd487f9424bfa36520e6", size = 186056, upload-time = "2025-12-30T10:54:18.916Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/73/75/b1bc441f82e5dcffd23fc79138527998dc7827414d2a43e941de4e3c6628/adcp-2.16.0-py3-none-any.whl", hash = "sha256:6aaa5953c583cf6fabe2e4a6954283ca1d604e0f7c5b3514085b6bfdeb41871c", size = 217014, upload-time = "2025-12-21T03:50:56.693Z" }, + { url = "https://files.pythonhosted.org/packages/6d/fb/197e4c14be174e14e9240f6bf47596e3c81c13ce1c54f41d4c5738e36b2e/adcp-2.17.0-py3-none-any.whl", hash = "sha256:ca000717cf573552fe0d178a1dd148bcdcd1c3d432c56b22000e1d8251478a9f", size = 217879, upload-time = "2025-12-30T10:54:17.508Z" }, ] [[package]] @@ -168,7 +168,7 @@ dev = [ requires-dist = [ { name = "a2a-cli", specifier = ">=0.2.0" }, { name = "a2a-sdk", extras = ["http-server"], specifier = ">=0.3.19" }, - { name = "adcp", specifier = "==2.16.0" }, + { name = "adcp", specifier = "==2.17.0" }, { name = "aiohttp", specifier = ">=3.9.0" }, { name = "alembic", specifier = ">=1.13.0" }, { name = "allure-pytest", marker = "extra == 'ui-tests'", specifier = "==2.13.5" },