|
15 | 15 | from functools import wraps |
16 | 16 | from typing import Any, Callable, Dict, List, Optional, Union |
17 | 17 |
|
| 18 | +import polars as pl |
18 | 19 | from pydantic import BaseModel, ConfigDict |
19 | 20 | from typing_extensions import Annotated, Literal |
20 | 21 |
|
@@ -48,7 +49,8 @@ class MCPResultSet(BaseModel): |
48 | 49 |
|
49 | 50 | table_schema: Optional[List[Dict[str, Any]]] |
50 | 51 | rows: Union[List[Dict[str, Any]], str] |
51 | | - row_count: int |
| 52 | + returned_result_count: int |
| 53 | + total_result_count: int |
52 | 54 |
|
53 | 55 | MCPTransport = Literal["http", "stdio"] |
54 | 56 |
|
@@ -135,6 +137,24 @@ def http_app(self, **kwargs): |
135 | 137 | """Create a Starlette ASGI app for the MCP server.""" |
136 | 138 | return self.mcp.http_app(**kwargs) |
137 | 139 |
|
| 140 | + def _handle_result_set(self, pl_df: pl.DataFrame, effective_limit: Optional[int], table_format: TableFormat) -> MCPResultSet: |
| 141 | + """Handle the result set from a logical plan.""" |
| 142 | + original_result_count = len(pl_df) |
| 143 | + if effective_limit and original_result_count > effective_limit: |
| 144 | + pl_df = pl_df.limit(effective_limit) |
| 145 | + rows_list = pl_df.to_dicts() |
| 146 | + schema_fields = [{"name": name, "type": str(dtype)} for name, dtype in pl_df.schema.items()] |
| 147 | + result_set = MCPResultSet( |
| 148 | + table_schema=schema_fields, |
| 149 | + rows=rows_list, |
| 150 | + returned_result_count=len(rows_list), |
| 151 | + total_result_count=original_result_count, |
| 152 | + ) |
| 153 | + if table_format == "markdown": |
| 154 | + result_set.rows = _render_markdown_preview(rows_list) |
| 155 | + result_set.table_schema = None |
| 156 | + return result_set |
| 157 | + |
138 | 158 | def _build_parameterized_tool(self, tool: ParameterizedToolDefinition): |
139 | 159 | """Build a keyword-argument tool function with per-field schema for FastMCP. |
140 | 160 |
|
@@ -162,24 +182,13 @@ async def tool_fn_wrapper(*args, **kwargs) -> MCPResultSet: |
162 | 182 | bound_plan = bind_parameters(tool._parameterized_view, payload, tool.params) |
163 | 183 | async with self._collect_semaphore: |
164 | 184 | pl_df, metrics = await asyncio.to_thread( |
165 | | - lambda: self.session_state.execution.collect(bound_plan, n=effective_limit) |
| 185 | + lambda: self.session_state.execution.collect(bound_plan) |
166 | 186 | ) |
167 | 187 | logger.info(f"Completed query for {tool.name}") |
168 | 188 | logger.info(metrics.get_summary()) |
169 | 189 | logger.debug(f"Query Details: {params_obj.model_dump_json()}") |
170 | 190 |
|
171 | | - rows_list = pl_df.to_dicts() |
172 | | - schema_fields = [{"name": name, "type": str(dtype)} for name, dtype in pl_df.schema.items()] |
173 | | - result_set = MCPResultSet( |
174 | | - table_schema=schema_fields, |
175 | | - rows=rows_list, |
176 | | - row_count=len(rows_list), |
177 | | - ) |
178 | | - if table_format == "markdown": |
179 | | - result_set.rows = _render_markdown_preview(rows_list) |
180 | | - result_set.table_schema = None |
181 | | - |
182 | | - return result_set |
| 191 | + return self._handle_result_set(pl_df, effective_limit, table_format) |
183 | 192 | except Exception as e: |
184 | 193 | from fastmcp.exceptions import ToolError |
185 | 194 | raise ToolError(f"Fenic server failed to execute tool {tool.name}. Underlying error: {e}") from e |
@@ -263,19 +272,13 @@ async def wrapper(*args, **kwargs) -> MCPResultSet: |
263 | 272 | # collections with a semaphore to protect the backend executor. |
264 | 273 | async with self._collect_semaphore: |
265 | 274 | pl_df, metrics = await asyncio.to_thread( |
266 | | - lambda: self.session_state.execution.collect(bound_plan, n=effective_limit) |
| 275 | + lambda: self.session_state.execution.collect(bound_plan) |
267 | 276 | ) |
268 | 277 | logger.info(f"Completed query for {tool.name}") |
269 | 278 | logger.info(metrics.get_summary()) |
270 | 279 | logger.debug(f"Query Details: {args if args else kwargs}") |
271 | | - rows_list = pl_df.to_dicts() |
272 | | - schema_fields = [{"name": name, "type": str(dtype)} for name, dtype in pl_df.schema.items()] |
273 | | - out = MCPResultSet(table_schema=schema_fields, rows=rows_list, row_count=len(rows_list)) |
274 | | - if table_format == "markdown": |
275 | | - out.rows = _render_markdown_preview(rows_list) |
276 | | - out.table_schema = None |
277 | | - |
278 | | - return out |
| 280 | + |
| 281 | + return self._handle_result_set(pl_df, effective_limit, table_format) |
279 | 282 | except Exception as e: |
280 | 283 | from fastmcp.exceptions import ToolError |
281 | 284 | raise ToolError(f"Fenic server failed to execute tool {tool.name}. Underlying error: {e}") from e |
|
0 commit comments