diff --git a/docsrc/Connecting_and_queries.rst b/docsrc/Connecting_and_queries.rst index 446f43b31a..c5913e3deb 100644 --- a/docsrc/Connecting_and_queries.rst +++ b/docsrc/Connecting_and_queries.rst @@ -196,14 +196,14 @@ To get started, follow the steps below: ) as connection: # Create a cursor cursor = connection.cursor() - + # Execute a simple test query cursor.execute("SELECT 1") .. note:: - Firebolt Core is assumed to be running locally on the default port (3473). For instructions - on how to run Firebolt Core locally using Docker, refer to the + Firebolt Core is assumed to be running locally on the default port (3473). For instructions + on how to run Firebolt Core locally using Docker, refer to the `official docs `_. @@ -404,7 +404,7 @@ parameters equal in length to the number of placeholders in the statement. "INSERT INTO test_table2 VALUES ($1, $2, $3)", (2, "world", "2018-01-02"), ) - + # paramstyle only needs to be set once, it will be used for all subsequent queries cursor.execute( @@ -437,6 +437,58 @@ as values in the second argument. cursor.close() +Bulk insert for improved performance +-------------------------------------- + +For inserting large amounts of data more efficiently, you can use the ``bulk_insert`` parameter +with ``executemany()``. This concatenates multiple INSERT statements into a single batch request, +which can significantly improve performance when inserting many rows. + +**Note:** The ``bulk_insert`` parameter only works with INSERT statements and supports both +``fb_numeric`` and ``qmark`` parameter styles. Using it with other statement types will +raise an error. 
+ +**Example with QMARK parameter style (default):** + +:: + + # Using the default qmark parameter style + cursor.executemany( + "INSERT INTO test_table VALUES (?, ?, ?)", + ( + (1, "apple", "2019-01-01"), + (2, "banana", "2020-01-01"), + (3, "carrot", "2021-01-01"), + (4, "donut", "2022-01-01"), + (5, "eggplant", "2023-01-01") + ), + bulk_insert=True # Enable bulk insert for better performance + ) + +**Example with FB_NUMERIC parameter style:** + +:: + + import firebolt.db + # Set paramstyle to "fb_numeric" for server-side parameter substitution + firebolt.db.paramstyle = "fb_numeric" + + cursor.executemany( + "INSERT INTO test_table VALUES ($1, $2, $3)", + ( + (1, "apple", "2019-01-01"), + (2, "banana", "2020-01-01"), + (3, "carrot", "2021-01-01"), + (4, "donut", "2022-01-01"), + (5, "eggplant", "2023-01-01") + ), + bulk_insert=True # Enable bulk insert for better performance + ) + +When ``bulk_insert=True``, the SDK concatenates all INSERT statements into a single batch +and sends them to the server for optimized batch processing. + + Setting session parameters -------------------------------------- @@ -731,7 +783,7 @@ of execute_async is -1, which is the rowcount for queries where it's not applica cursor.execute_async("INSERT INTO my_table VALUES (5, 'egg', '2022-01-01')") token = cursor.async_query_token -Trying to access `async_query_token` before calling `execute_async` will raise an exception. +Trying to access `async_query_token` before calling `execute_async` will raise an exception. .. note:: Multiple-statement queries are not supported for asynchronous queries. However, you can run each statement @@ -746,9 +798,9 @@ Monitoring the query status To check the async query status you need to retrieve the token of the query. The token is a unique identifier for the query and can be used to fetch the query status. You can store this token outside of the current process and use it later to check the query status. 
:ref:`Connection ` object -has two methods to check the query status: :py:meth:`firebolt.db.connection.Connection.is_async_query_running` and -:py:meth:`firebolt.db.connection.Connection.is_async_query_successful`.`is_async_query_running` will return True -if the query is still running, and False otherwise. `is_async_query_successful` will return True if the query +has two methods to check the query status: :py:meth:`firebolt.db.connection.Connection.is_async_query_running` and +:py:meth:`firebolt.db.connection.Connection.is_async_query_successful`.`is_async_query_running` will return True +if the query is still running, and False otherwise. `is_async_query_successful` will return True if the query has finished successfully, None if query is still running and False if the query has failed. :: @@ -779,7 +831,7 @@ will send a cancel request to the server and the query will be stopped. token = cursor.async_query_token connection.cancel_async_query(token) - + # Verify that the query was cancelled running = connection.is_async_query_running(token) print(running) # False diff --git a/src/firebolt/async_db/cursor.py b/src/firebolt/async_db/cursor.py index a03b1eecca..cf572107a3 100644 --- a/src/firebolt/async_db/cursor.py +++ b/src/firebolt/async_db/cursor.py @@ -218,9 +218,11 @@ async def _do_execute( timeout: Optional[float] = None, async_execution: bool = False, streaming: bool = False, + bulk_insert: bool = False, ) -> None: await self._close_rowset_and_reset() self._row_set = StreamingAsyncRowSet() if streaming else InMemoryAsyncRowSet() + # Import paramstyle from module level from firebolt.async_db import paramstyle @@ -230,7 +232,12 @@ async def _do_execute( ) plan = statement_planner.create_execution_plan( - raw_query, parameters, skip_parsing, async_execution, streaming + raw_query, + parameters, + skip_parsing, + async_execution, + streaming, + bulk_insert, ) await self._execute_plan(plan, timeout) self._state = CursorState.DONE @@ -385,6 +392,7 @@ async def 
executemany( query: str, parameters_seq: Sequence[Sequence[ParameterType]], timeout_seconds: Optional[float] = None, + bulk_insert: bool = False, ) -> Union[int, str]: """Prepare and execute a database query. @@ -402,6 +410,9 @@ async def executemany( `SET param=value` statement before it. All parameters are stored in cursor object until it's closed. They can also be removed with `flush_parameters` method call. + Bulk insert: When bulk_insert=True, multiple INSERT queries are + concatenated and sent as a single batch for improved performance. + Only supported for INSERT statements. Args: query (str): SQL query to execute. @@ -410,11 +421,15 @@ async def executemany( query with actual values from each set in a sequence. Resulting queries for each subset are executed sequentially. timeout_seconds (Optional[float]): Query execution timeout in seconds. + bulk_insert (bool): When True, concatenates multiple INSERT queries + into a single batch request. Only supported for INSERT statements. Returns: int: Query row count. 
""" - await self._do_execute(query, parameters_seq, timeout=timeout_seconds) + await self._do_execute( + query, parameters_seq, timeout=timeout_seconds, bulk_insert=bulk_insert + ) return self.rowcount @check_not_closed diff --git a/src/firebolt/common/cursor/statement_planners.py b/src/firebolt/common/cursor/statement_planners.py index a24d5f92b2..162dacf985 100644 --- a/src/firebolt/common/cursor/statement_planners.py +++ b/src/firebolt/common/cursor/statement_planners.py @@ -12,7 +12,11 @@ JSON_LINES_OUTPUT_FORMAT, JSON_OUTPUT_FORMAT, ) -from firebolt.utils.exception import FireboltError, ProgrammingError +from firebolt.utils.exception import ( + ConfigurationError, + FireboltError, + ProgrammingError, +) if TYPE_CHECKING: from firebolt.common.statement_formatter import StatementFormatter @@ -36,7 +40,6 @@ def __init__(self, formatter: StatementFormatter) -> None: """Initialize statement planner with required dependencies.""" self.formatter = formatter - @abstractmethod def create_execution_plan( self, raw_query: str, @@ -44,8 +47,86 @@ def create_execution_plan( skip_parsing: bool = False, async_execution: bool = False, streaming: bool = False, + bulk_insert: bool = False, + ) -> ExecutionPlan: + """Create an execution plan for a given statement and parameters. + + This method serves as a factory for creating an execution plan, which + encapsulates the queries to be executed and the parameters for execution. + It supports standard execution, as well as bulk insert, which is handled + by a separate method. + + Args: + raw_query (str): The raw SQL query to be executed. + parameters (Sequence[Sequence[ParameterType]]): A sequence of parameter + sequences for the query. + skip_parsing (bool): If True, the query will not be parsed, and all + special features (e.g., multi-statement, parameterized queries) will + be disabled. Defaults to False. + async_execution (bool): If True, the query will be executed + asynchronously. Defaults to False. 
def _validate_bulk_insert_query(self, query: str) -> None:
    """Validate that ``query`` is a single INSERT statement usable with bulk_insert.

    Args:
        query (str): The raw SQL template passed to ``executemany``.

    Raises:
        ConfigurationError: If the query does not start with ``INSERT``
            (case-insensitive, leading whitespace ignored).
        ProgrammingError: If a semicolon other than trailing ones appears
            outside quoted text, i.e. the template holds several statements.
    """
    # Local import keeps this method self-contained; the module's own import
    # block is not guaranteed to provide ``re``.
    import re

    query_normalized = query.lstrip().lower()
    if not query_normalized.startswith("insert"):
        raise ConfigurationError(
            "bulk_insert is only supported for INSERT statements"
        )

    def _drop_quoted(match: "re.Match[str]") -> str:
        # Quoted literals/identifiers are blanked so a ';' inside them is not
        # mistaken for a statement separator. Comments are matched only so a
        # quote character inside them cannot open a bogus literal; their text
        # is kept, because a ';' in a comment is still unsafe once the
        # statements are joined on one line.
        token = match.group(0)
        return "" if token[0] in "'\"" else token

    unquoted = re.sub(
        r"'[^']*'|\"[^\"]*\"|--[^\n]*|/\*.*?\*/",
        _drop_quoted,
        query,
        flags=re.DOTALL,
    )
    if ";" in unquoted.strip().rstrip(";"):
        raise ProgrammingError(
            "bulk_insert does not support multi-statement queries"
        )
@staticmethod def _get_output_format(streaming: bool) -> str: @@ -58,18 +139,19 @@ def _get_output_format(streaming: bool) -> str: class FbNumericStatementPlanner(BaseStatementPlanner): """Statement planner for fb_numeric parameter style.""" - def create_execution_plan( + def _create_standard_execution_plan( self, raw_query: str, parameters: Sequence[Sequence[ParameterType]], - skip_parsing: bool = False, - async_execution: bool = False, - streaming: bool = False, + skip_parsing: bool, + async_execution: bool, + streaming: bool, ) -> ExecutionPlan: """Create execution plan for fb_numeric parameter style.""" query_params = self._build_fb_numeric_query_params( parameters, streaming, async_execution ) + return ExecutionPlan( queries=[raw_query], query_params=query_params, @@ -78,11 +160,37 @@ def create_execution_plan( streaming=streaming, ) + def _create_bulk_plan_impl( + self, + raw_query: str, + parameters: Sequence[Sequence[ParameterType]], + async_execution: bool, + ) -> ExecutionPlan: + """Create bulk insert execution plan for fb_numeric parameter style.""" + # Prepare bulk insert query and parameters for fb_numeric + processed_query, processed_params = self._prepare_fb_numeric_bulk_insert( + raw_query, parameters + ) + + # Build query parameters for bulk insert + query_params = self._build_fb_numeric_query_params( + processed_params, streaming=False, async_execution=async_execution + ) + + return ExecutionPlan( + queries=[processed_query], + query_params=query_params, + is_multi_statement=False, + async_execution=async_execution, + streaming=False, + ) + def _build_fb_numeric_query_params( self, parameters: Sequence[Sequence[ParameterType]], streaming: bool, async_execution: bool, + extra_params: Optional[Dict[str, Any]] = None, ) -> Dict[str, Any]: """Build query parameters for fb_numeric style.""" param_list = parameters[0] if parameters else [] @@ -101,19 +209,45 @@ def _build_fb_numeric_query_params( query_params["query_parameters"] = 
def _prepare_fb_numeric_bulk_insert(
    self, query: str, parameters_seq: "Sequence[Sequence[ParameterType]]"
) -> "tuple[str, Sequence[Sequence[ParameterType]]]":
    """Prepare multiple INSERT queries as a single batch for fb_numeric style.

    One copy of ``query`` is produced per parameter set, and the copies are
    joined with ``"; "``. Placeholders in each copy are shifted by the number
    of parameters consumed by the preceding sets, so every statement refers
    to its own slice of the flattened parameter list, e.g.
    ``($1, $2); ($3, $4); ($5, $6)``.

    Args:
        query (str): INSERT template using ``$1``, ``$2``, ... placeholders.
        parameters_seq: One parameter sequence per row to insert.

    Returns:
        A tuple of the combined query string and a one-element list holding
        all parameters flattened in statement order.
    """
    # Local import keeps this method self-contained; the module's own import
    # block is not guaranteed to provide ``re``.
    import re

    # Match whole placeholder tokens. Replacing the substring "$1" would also
    # rewrite the prefix of "$10".."$19", and sequential replacements could
    # re-rewrite a placeholder produced by an earlier replacement.
    placeholder = re.compile(r"\$([1-9][0-9]*)")

    queries = []
    flattened_parameters = []
    param_offset = 0
    for param_set in parameters_seq:
        set_size = len(param_set)

        def renumber(
            match: "re.Match[str]", offset: int = param_offset, size: int = set_size
        ) -> str:
            index = int(match.group(1))
            # Placeholders beyond this set's size are left untouched, as before.
            if index > size:
                return match.group(0)
            return f"${index + offset}"

        queries.append(placeholder.sub(renumber, query))
        flattened_parameters.extend(param_set)
        param_offset += set_size

    return "; ".join(queries), [flattened_parameters]
combined_query = self.formatter.format_bulk_insert(raw_query, parameters) + + # Build query parameters for bulk insert + query_params: Dict[str, Any] = { + "output_format": self._get_output_format(False), + } + if async_execution: + query_params["async"] = True + + return ExecutionPlan( + queries=[combined_query], + query_params=query_params, + is_multi_statement=False, + async_execution=async_execution, + streaming=False, + ) + class StatementPlannerFactory: """Factory for creating statement planner instances based on paramstyle.""" @@ -168,6 +327,7 @@ def create_planner( ProgrammingError: If paramstyle is not supported """ planner_class = cls._PLANNER_CLASSES.get(paramstyle) + if planner_class is None: raise ProgrammingError(f"Unsupported paramstyle: {paramstyle}") diff --git a/src/firebolt/common/statement_formatter.py b/src/firebolt/common/statement_formatter.py index 27a79a031c..b43d7e8d8d 100644 --- a/src/firebolt/common/statement_formatter.py +++ b/src/firebolt/common/statement_formatter.py @@ -218,6 +218,30 @@ def split_format_sql( self.statement_to_set(st) or self.statement_to_sql(st) for st in statements ] + def format_bulk_insert( + self, query: str, parameters_seq: Sequence[Sequence[ParameterType]] + ) -> str: + """ + Format bulk insert operations by creating multiple INSERT statements. 
+ + Args: + query: The base INSERT query template + parameters_seq: Sequence of parameter sets for each INSERT + + Returns: + Combined SQL string with all INSERT statements + """ + statements = parse_sql(query) + if not statements: + raise DataError("Invalid SQL query for bulk insert") + + formatted_queries = [] + for param_set in parameters_seq: + formatted_query = self.format_statement(statements[0], param_set) + formatted_queries.append(formatted_query) + + return "; ".join(formatted_queries) + def create_statement_formatter(version: int) -> StatementFormatter: if version == 1: diff --git a/src/firebolt/db/cursor.py b/src/firebolt/db/cursor.py index 0d6deab171..b0c67dcedb 100644 --- a/src/firebolt/db/cursor.py +++ b/src/firebolt/db/cursor.py @@ -224,9 +224,11 @@ def _do_execute( timeout: Optional[float] = None, async_execution: bool = False, streaming: bool = False, + bulk_insert: bool = False, ) -> None: self._close_rowset_and_reset() self._row_set = StreamingRowSet() if streaming else InMemoryRowSet() + # Import paramstyle from module level from firebolt.db import paramstyle @@ -236,7 +238,12 @@ def _do_execute( ) plan = statement_planner.create_execution_plan( - raw_query, parameters, skip_parsing, async_execution, streaming + raw_query, + parameters, + skip_parsing, + async_execution, + streaming, + bulk_insert, ) self._execute_plan(plan, timeout) self._state = CursorState.DONE @@ -387,6 +394,7 @@ def executemany( query: str, parameters_seq: Sequence[Sequence[ParameterType]], timeout_seconds: Optional[float] = None, + bulk_insert: bool = False, ) -> Union[int, str]: """Prepare and execute a database query. @@ -404,6 +412,9 @@ def executemany( `SET param=value` statement before it. All parameters are stored in cursor object until it's closed. They can also be removed with `flush_parameters` method call. + Bulk insert: When bulk_insert=True, multiple INSERT queries are + concatenated and sent as a single batch for improved performance. 
+ Only supported for INSERT statements. Args: query (str): SQL query to execute. @@ -412,11 +423,15 @@ def executemany( query with actual values from each set in a sequence. Resulting queries for each subset are executed sequentially. timeout_seconds (Optional[float]): Query execution timeout in seconds. + bulk_insert (bool): When True, concatenates multiple INSERT queries + into a single batch request. Only supported for INSERT statements. Returns: int: Query row count. """ - self._do_execute(query, parameters_seq, timeout=timeout_seconds) + self._do_execute( + query, parameters_seq, timeout=timeout_seconds, bulk_insert=bulk_insert + ) return self.rowcount @check_not_closed diff --git a/tests/integration/dbapi/async/V2/test_queries_async.py b/tests/integration/dbapi/async/V2/test_queries_async.py index e2042e4183..27d4186250 100644 --- a/tests/integration/dbapi/async/V2/test_queries_async.py +++ b/tests/integration/dbapi/async/V2/test_queries_async.py @@ -4,8 +4,10 @@ from random import randint from typing import Callable, List, Tuple +import trio from pytest import mark, raises +import firebolt.async_db from firebolt.async_db import Binary, Connection, Cursor, OperationalError from firebolt.async_db.connection import connect from firebolt.client.auth.base import Auth @@ -282,6 +284,75 @@ async def test_parameterized_query_with_special_chars(connection: Connection) -> ], "Invalid data in table after parameterized insert" +@mark.parametrize( + "paramstyle,query,test_data", + [ + ( + "fb_numeric", + 'INSERT INTO "test_tbl" VALUES ($1, $2)', + [(1, "alice"), (2, "bob"), (3, "charlie")], + ), + ( + "qmark", + 'INSERT INTO "test_tbl" VALUES (?, ?)', + [(4, "david"), (5, "eve"), (6, "frank")], + ), + ], +) +async def test_executemany_bulk_insert_paramstyles( + connection: Connection, + paramstyle: str, + query: str, + test_data: List[Tuple], + create_drop_test_table_setup_teardown_async: Callable, +) -> None: + """executemany with bulk_insert=True works correctly for 
both paramstyles.""" + # Set the paramstyle for this test + original_paramstyle = firebolt.async_db.paramstyle + firebolt.async_db.paramstyle = paramstyle + # Generate a unique label for this test execution + unique_label = f"test_bulk_insert_async_{paramstyle}_{randint(100000, 999999)}" + table_name = "test_tbl" + + try: + c = connection.cursor() + + # Can't do this for fb_numeric yet - FIR-49970 + if paramstyle != "fb_numeric": + await c.execute(f"SET query_label = '{unique_label}'") + + # Execute bulk insert + await c.executemany( + query, + test_data, + bulk_insert=True, + ) + + # Verify the data was inserted correctly + await c.execute(f'SELECT * FROM "{table_name}" ORDER BY id') + data = await c.fetchall() + assert len(data) == len(test_data) + for i, (expected_id, expected_name) in enumerate(test_data): + assert data[i] == [expected_id, expected_name] + + # Verify that only one INSERT query was executed with our unique label + # Can't do this for fb_numeric yet - FIR-49970 + if paramstyle != "fb_numeric": + # Wait a moment to ensure query history is updated + await trio.sleep(10) + await c.execute( + "SELECT COUNT(*) FROM information_schema.engine_query_history " + f"WHERE query_label = '{unique_label}' AND query_text LIKE 'INSERT INTO%'" + " AND status = 'ENDED_SUCCESSFULLY'" + ) + query_count = (await c.fetchone())[0] + assert ( + query_count == 1 + ), f"Expected 1 INSERT query with label '{unique_label}', but found {query_count}" + finally: + firebolt.async_db.paramstyle = original_paramstyle + + async def test_multi_statement_query(connection: Connection) -> None: """Query parameters are handled properly""" diff --git a/tests/integration/dbapi/conftest.py b/tests/integration/dbapi/conftest.py index 6ec89708a8..7fc257d10c 100644 --- a/tests/integration/dbapi/conftest.py +++ b/tests/integration/dbapi/conftest.py @@ -12,9 +12,7 @@ LOGGER = getLogger(__name__) -CREATE_TEST_TABLE = ( - 'CREATE DIMENSION TABLE IF NOT EXISTS "test_tbl" (id int, name string)' -) 
+CREATE_TEST_TABLE = 'CREATE TABLE IF NOT EXISTS "test_tbl" (id int, name string)' DROP_TEST_TABLE = 'DROP TABLE IF EXISTS "test_tbl" CASCADE' LONG_SELECT_DEFAULT_V1 = 250000000000 @@ -59,7 +57,7 @@ def create_drop_test_table_setup_teardown(connection: Connection) -> None: @fixture async def create_drop_test_table_setup_teardown_async(connection: Connection) -> None: - with connection.cursor() as c: + async with connection.cursor() as c: await c.execute(CREATE_TEST_TABLE) yield c await c.execute(DROP_TEST_TABLE) diff --git a/tests/integration/dbapi/sync/V2/test_queries.py b/tests/integration/dbapi/sync/V2/test_queries.py index 9b415cef88..55b99d5340 100644 --- a/tests/integration/dbapi/sync/V2/test_queries.py +++ b/tests/integration/dbapi/sync/V2/test_queries.py @@ -1,4 +1,5 @@ import math +import time from datetime import date, datetime from decimal import Decimal from random import randint @@ -7,6 +8,7 @@ from pytest import mark, raises +import firebolt.db from firebolt.client.auth import Auth from firebolt.common._types import ColType from firebolt.common.row_set.types import Column @@ -283,6 +285,75 @@ def test_empty_query(c: Cursor, query: str, params: tuple) -> None: ) +@mark.parametrize( + "paramstyle,query,test_data", + [ + ( + "fb_numeric", + 'INSERT INTO "test_tbl" VALUES ($1, $2)', + [(1, "alice"), (2, "bob"), (3, "charlie")], + ), + ( + "qmark", + 'INSERT INTO "test_tbl" VALUES (?, ?)', + [(4, "david"), (5, "eve"), (6, "frank")], + ), + ], +) +def test_executemany_bulk_insert_paramstyles( + connection: Connection, + paramstyle: str, + query: str, + test_data: List[Tuple], + create_drop_test_table_setup_teardown: Callable, +) -> None: + """executemany with bulk_insert=True works correctly for both paramstyles.""" + # Set the paramstyle for this test + original_paramstyle = firebolt.db.paramstyle + firebolt.db.paramstyle = paramstyle + # Generate a unique label for this test execution + unique_label = f"test_bulk_insert_{paramstyle}_{randint(100000, 
999999)}" + table_name = "test_tbl" + + try: + c = connection.cursor() + + # Can't do this for fb_numeric yet - FIR-49970 + if paramstyle != "fb_numeric": + c.execute(f"SET query_label = '{unique_label}'") + + # Execute bulk insert + c.executemany( + query, + test_data, + bulk_insert=True, + ) + + # Verify the data was inserted correctly + c.execute(f'SELECT * FROM "{table_name}" ORDER BY id') + data = c.fetchall() + assert len(data) == len(test_data) + for i, (expected_id, expected_name) in enumerate(test_data): + assert data[i] == [expected_id, expected_name] + + # Verify that only one INSERT query was executed with our unique label + # Can't do this for fb_numeric yet - FIR-49970 + if paramstyle != "fb_numeric": + # Wait a moment to ensure query history is updated + time.sleep(10) + c.execute( + "SELECT COUNT(*) FROM information_schema.engine_query_history " + f"WHERE query_label = '{unique_label}' AND query_text LIKE 'INSERT INTO%'" + " AND status = 'ENDED_SUCCESSFULLY'" + ) + query_count = c.fetchone()[0] + assert ( + query_count == 1 + ), f"Expected 1 INSERT query with label '{unique_label}', but found {query_count}" + finally: + firebolt.db.paramstyle = original_paramstyle + + def test_multi_statement_query(connection: Connection) -> None: """Query parameters are handled properly""" diff --git a/tests/unit/async_db/test_cursor.py b/tests/unit/async_db/test_cursor.py index d26345d163..f7bb296671 100644 --- a/tests/unit/async_db/test_cursor.py +++ b/tests/unit/async_db/test_cursor.py @@ -1,3 +1,4 @@ +import json import re import time from datetime import date, datetime @@ -1505,3 +1506,160 @@ async def test_unsupported_paramstyle_raises(cursor: Cursor) -> None: await cursor.execute("SELECT 1") finally: db.paramstyle = original_paramstyle + + +async def test_executemany_bulk_insert_qmark_works( + httpx_mock: HTTPXMock, + cursor: Cursor, + query_url: str, +): + """executemany with bulk_insert=True works with qmark paramstyle.""" + + def 
bulk_insert_callback(request): + query = request.content.decode() + # Should contain multiple INSERT statements + assert query.count("INSERT INTO") == 3 + assert "; " in query + + return Response( + status_code=200, + content=json.dumps( + { + "meta": [], + "data": [], + "rows": 0, + "statistics": { + "elapsed": 0.0, + "rows_read": 0, + "bytes_read": 0, + }, + } + ), + headers={}, + ) + + base_url = str(query_url).split("?")[0] + url_pattern = re.compile(re.escape(base_url)) + httpx_mock.add_callback(bulk_insert_callback, url=url_pattern) + + result = await cursor.executemany( + "INSERT INTO test_table VALUES (?, ?)", + [(1, "a"), (2, "b"), (3, "c")], + bulk_insert=True, + ) + assert result == 0 + + +async def test_executemany_bulk_insert_fb_numeric( + httpx_mock: HTTPXMock, + cursor: Cursor, + query_url: str, +): + """executemany with bulk_insert=True and FB_NUMERIC style.""" + import firebolt.async_db as db_module + + original_paramstyle = db_module.paramstyle + + try: + db_module.paramstyle = "fb_numeric" + + def bulk_insert_callback(request): + query = request.content.decode() + assert query.count("INSERT INTO") == 3 + assert "; " in query + + query_params = json.loads(request.url.params.get("query_parameters", "[]")) + assert len(query_params) == 6 + assert query_params[0]["name"] == "$1" + assert query_params[5]["name"] == "$6" + + return Response( + status_code=200, + content=json.dumps( + { + "meta": [], + "data": [], + "rows": 0, + "statistics": { + "elapsed": 0.0, + "rows_read": 0, + "bytes_read": 0, + }, + } + ), + headers={}, + ) + + base_url = str(query_url).split("?")[0] + url_pattern = re.compile(re.escape(base_url)) + httpx_mock.add_callback(bulk_insert_callback, url=url_pattern) + + result = await cursor.executemany( + "INSERT INTO test_table VALUES ($1, $2)", + [(1, "a"), (2, "b"), (3, "c")], + bulk_insert=True, + ) + assert result == 0 + finally: + db_module.paramstyle = original_paramstyle + + +async def 
test_executemany_bulk_insert_non_insert_fails( + cursor: Cursor, fb_numeric_paramstyle +): + """executemany with bulk_insert=True fails for non-INSERT queries.""" + with raises(ConfigurationError, match="bulk_insert is only supported for INSERT"): + await cursor.executemany( + "SELECT * FROM test_table", + [()], + bulk_insert=True, + ) + + with raises(ConfigurationError, match="bulk_insert is only supported for INSERT"): + await cursor.executemany( + "UPDATE test_table SET col = $1", + [(1,)], + bulk_insert=True, + ) + + with raises(ConfigurationError, match="bulk_insert is only supported for INSERT"): + await cursor.executemany( + "DELETE FROM test_table WHERE id = $1", + [(1,)], + bulk_insert=True, + ) + + +async def test_executemany_bulk_insert_multi_statement_fails( + cursor: Cursor, fb_numeric_paramstyle +): + """executemany with bulk_insert=True fails for multi-statement queries.""" + with raises( + ProgrammingError, match="bulk_insert does not support multi-statement queries" + ): + await cursor.executemany( + "INSERT INTO test_table VALUES ($1); SELECT * FROM test_table", + [(1,)], + bulk_insert=True, + ) + + with raises( + ProgrammingError, match="bulk_insert does not support multi-statement queries" + ): + await cursor.executemany( + "INSERT INTO test_table VALUES ($1); INSERT INTO test_table VALUES ($2)", + [(1,), (2,)], + bulk_insert=True, + ) + + +async def test_executemany_bulk_insert_empty_params_fails( + cursor: Cursor, fb_numeric_paramstyle +): + """executemany with bulk_insert=True fails with empty parameters.""" + with raises(ProgrammingError, match="requires at least one parameter set"): + await cursor.executemany( + "INSERT INTO test_table VALUES ($1)", + [], + bulk_insert=True, + ) diff --git a/tests/unit/common/cursor/test_statement_planners.py b/tests/unit/common/cursor/test_statement_planners.py index 368fd51c6d..5cf758becf 100644 --- a/tests/unit/common/cursor/test_statement_planners.py +++ 
b/tests/unit/common/cursor/test_statement_planners.py @@ -83,7 +83,9 @@ def test_fb_numeric_planner_initialization(formatter): def test_fb_numeric_basic_execution_plan(fb_numeric_planner): """Test basic execution plan creation.""" - plan = fb_numeric_planner.create_execution_plan("SELECT $1", [[42]]) + plan = fb_numeric_planner.create_execution_plan( + "SELECT $1", [[42]], bulk_insert=False + ) assert len(plan.queries) == 1 assert plan.queries[0] == "SELECT $1" @@ -96,7 +98,9 @@ def test_fb_numeric_basic_execution_plan(fb_numeric_planner): def test_fb_numeric_execution_plan_with_parameters(fb_numeric_planner): """Test execution plan with parameters.""" parameters = [[42, "test", True]] - plan = fb_numeric_planner.create_execution_plan("SELECT $1, $2, $3", parameters) + plan = fb_numeric_planner.create_execution_plan( + "SELECT $1, $2, $3", parameters, bulk_insert=False + ) assert plan.query_params is not None assert "query_parameters" in plan.query_params @@ -111,7 +115,7 @@ def test_fb_numeric_execution_plan_with_parameters(fb_numeric_planner): def test_fb_numeric_execution_plan_no_parameters(fb_numeric_planner): """Test execution plan without parameters.""" - plan = fb_numeric_planner.create_execution_plan("SELECT 1", []) + plan = fb_numeric_planner.create_execution_plan("SELECT 1", [], bulk_insert=False) assert plan.query_params is not None assert "query_parameters" not in plan.query_params @@ -185,7 +189,7 @@ def test_qmark_planner_initialization(formatter): def test_qmark_basic_execution_plan(qmark_planner): """Test basic execution plan creation.""" - plan = qmark_planner.create_execution_plan("SELECT ?", [[42]]) + plan = qmark_planner.create_execution_plan("SELECT ?", [[42]], bulk_insert=False) assert len(plan.queries) >= 1 # Could be split by formatter assert plan.query_params is not None @@ -257,7 +261,9 @@ def test_qmark_multi_statement_detection(qmark_planner, queries, expected_multi) # Mock the formatter to return queries 
qmark_planner.formatter.split_format_sql = Mock(return_value=queries) - plan = qmark_planner.create_execution_plan("SELECT 1; SELECT 2", [[]]) + plan = qmark_planner.create_execution_plan( + "SELECT 1; SELECT 2", [[]], bulk_insert=False + ) assert plan.is_multi_statement is expected_multi assert len(plan.queries) == len(queries) @@ -304,11 +310,38 @@ def test_statement_planner_factory_unsupported_paramstyle(formatter): StatementPlannerFactory.create_planner("unsupported", formatter) +@pytest.mark.parametrize( + "paramstyle,expected_class", + [ + ("fb_numeric", "FbNumericStatementPlanner"), + ("qmark", "QmarkStatementPlanner"), + ], +) +def test_statement_planner_factory_creates_correct_bulk_planners( + formatter, paramstyle, expected_class +): + """Test that factory creates unified planner types that support both standard and bulk operations.""" + planner = StatementPlannerFactory.create_planner(paramstyle, formatter) + + assert planner.__class__.__name__ == expected_class + assert planner.formatter == formatter + + +def test_statement_planner_factory_unified_planners(formatter): + """Test that planners now support both standard and bulk operations.""" + fb_planner = StatementPlannerFactory.create_planner("fb_numeric", formatter) + qmark_planner = StatementPlannerFactory.create_planner("qmark", formatter) + + # Check that planners are the same whether bulk_insert is True or False + assert isinstance(fb_planner, FbNumericStatementPlanner) + assert isinstance(qmark_planner, QmarkStatementPlanner) + + # Edge cases and error conditions def test_fb_numeric_empty_parameters_list(formatter): """Test fb_numeric with empty parameters list.""" planner = FbNumericStatementPlanner(formatter) - plan = planner.create_execution_plan("SELECT 1", []) + plan = planner.create_execution_plan("SELECT 1", [], bulk_insert=False) assert "query_parameters" not in plan.query_params assert plan.query_params["output_format"] == JSON_OUTPUT_FORMAT @@ -317,7 +350,9 @@ def 
test_fb_numeric_empty_parameters_list(formatter): def test_fb_numeric_none_parameters(formatter): """Test fb_numeric with None in parameters.""" planner = FbNumericStatementPlanner(formatter) - plan = planner.create_execution_plan("SELECT $1, $2", [[None, "test"]]) + plan = planner.create_execution_plan( + "SELECT $1, $2", [[None, "test"]], bulk_insert=False + ) query_params = json.loads(plan.query_params["query_parameters"]) assert query_params[0]["value"] is None @@ -327,7 +362,7 @@ def test_fb_numeric_none_parameters(formatter): def test_qmark_empty_query(formatter): """Test qmark with empty query.""" planner = QmarkStatementPlanner(formatter) - plan = planner.create_execution_plan("", [[]]) + plan = planner.create_execution_plan("", [[]], bulk_insert=False) assert plan.query_params is not None assert "output_format" in plan.query_params diff --git a/tests/unit/db/test_cursor.py b/tests/unit/db/test_cursor.py index e6325b6a5e..ceba19a174 100644 --- a/tests/unit/db/test_cursor.py +++ b/tests/unit/db/test_cursor.py @@ -1391,3 +1391,162 @@ def test_unsupported_paramstyle_raises(cursor): cursor.execute("SELECT 1") finally: db.paramstyle = original_paramstyle + + +def test_executemany_bulk_insert_qmark_works( + httpx_mock: HTTPXMock, + cursor: Cursor, + query_url: str, +): + """executemany with bulk_insert=True works with qmark paramstyle.""" + + def bulk_insert_callback(request): + query = request.content.decode() + # Should contain multiple INSERT statements + assert query.count("INSERT INTO") == 3 + assert "; " in query + + return Response( + status_code=200, + content=json.dumps( + { + "meta": [], + "data": [], + "rows": 0, + "statistics": { + "elapsed": 0.0, + "rows_read": 0, + "bytes_read": 0, + }, + } + ), + headers={}, + ) + + base_url = str(query_url).split("?")[0] + url_pattern = re.compile(re.escape(base_url)) + httpx_mock.add_callback(bulk_insert_callback, url=url_pattern) + + result = cursor.executemany( + "INSERT INTO test_table VALUES (?, ?)", + [(1, 
"a"), (2, "b"), (3, "c")], + bulk_insert=True, + ) + assert result == 0 + + +def test_executemany_bulk_insert_fb_numeric( + httpx_mock: HTTPXMock, + cursor: Cursor, + query_url: str, +): + """executemany with bulk_insert=True and FB_NUMERIC style.""" + import firebolt.db as db_module + + original_paramstyle = db_module.paramstyle + + try: + db_module.paramstyle = "fb_numeric" + + def bulk_insert_callback(request): + query = request.content.decode() + assert query.count("INSERT INTO") == 3 + assert "; " in query + + query_params = json.loads(request.url.params.get("query_parameters", "[]")) + assert len(query_params) == 6 + assert query_params[0]["name"] == "$1" + assert query_params[5]["name"] == "$6" + + return Response( + status_code=200, + content=json.dumps( + { + "meta": [], + "data": [], + "rows": 0, + "statistics": { + "elapsed": 0.0, + "rows_read": 0, + "bytes_read": 0, + }, + } + ), + headers={}, + ) + + base_url = str(query_url).split("?")[0] + url_pattern = re.compile(re.escape(base_url)) + httpx_mock.add_callback(bulk_insert_callback, url=url_pattern) + + result = cursor.executemany( + "INSERT INTO test_table VALUES ($1, $2)", + [(1, "a"), (2, "b"), (3, "c")], + bulk_insert=True, + ) + assert result == 0 + finally: + db_module.paramstyle = original_paramstyle + + +def test_executemany_bulk_insert_non_insert_fails( + cursor: Cursor, fb_numeric_paramstyle +): + """executemany with bulk_insert=True fails for non-INSERT queries.""" + with raises(ConfigurationError, match="bulk_insert is only supported for INSERT"): + cursor.executemany( + "SELECT * FROM test_table", + [()], + bulk_insert=True, + ) + + with raises(ConfigurationError, match="bulk_insert is only supported for INSERT"): + cursor.executemany( + "UPDATE test_table SET col = $1", + [(1,)], + bulk_insert=True, + ) + + with raises(ConfigurationError, match="bulk_insert is only supported for INSERT"): + cursor.executemany( + "DELETE FROM test_table WHERE id = $1", + [(1,)], + bulk_insert=True, + ) + 
+ +def test_executemany_bulk_insert_multi_statement_fails( + cursor: Cursor, fb_numeric_paramstyle +): + """executemany with bulk_insert=True fails for multi-statement queries.""" + with raises( + ProgrammingError, match="bulk_insert does not support multi-statement queries" + ): + cursor.executemany( + "INSERT INTO test_table VALUES ($1); SELECT * FROM test_table", + [(1,)], + bulk_insert=True, + ) + + with raises( + ProgrammingError, match="bulk_insert does not support multi-statement queries" + ): + cursor.executemany( + "INSERT INTO test_table VALUES ($1); INSERT INTO test_table VALUES ($2)", + [(1,), (2,)], + bulk_insert=True, + ) + + +def test_executemany_bulk_insert_empty_params_fails( + cursor: Cursor, fb_numeric_paramstyle +): + """executemany with bulk_insert=True fails with empty parameters.""" + with raises( + ProgrammingError, match="bulk_insert requires at least one parameter set" + ): + cursor.executemany( + "INSERT INTO test_table VALUES ($1)", + [], + bulk_insert=True, + )