Skip to content

Commit 14b8444

Browse files
authored
Merge branch 'main' into feat-remove-parameters
2 parents 776443e + 49e7bfc commit 14b8444

File tree

13 files changed

+1411
-223
lines changed

13 files changed

+1411
-223
lines changed

docsrc/Connecting_and_queries.rst

Lines changed: 61 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -196,14 +196,14 @@ To get started, follow the steps below:
196196
) as connection:
197197
# Create a cursor
198198
cursor = connection.cursor()
199-
199+
200200
# Execute a simple test query
201201
cursor.execute("SELECT 1")
202202

203203
.. note::
204204

205-
Firebolt Core is assumed to be running locally on the default port (3473). For instructions
206-
on how to run Firebolt Core locally using Docker, refer to the
205+
Firebolt Core is assumed to be running locally on the default port (3473). For instructions
206+
on how to run Firebolt Core locally using Docker, refer to the
207207
`official docs <https://docs.firebolt.io/firebolt-core/firebolt-core-get-started>`_.
208208

209209

@@ -404,7 +404,7 @@ parameters equal in length to the number of placeholders in the statement.
404404
"INSERT INTO test_table2 VALUES ($1, $2, $3)",
405405
(2, "world", "2018-01-02"),
406406
)
407-
407+
408408
# paramstyle only needs to be set once, it will be used for all subsequent queries
409409

410410
cursor.execute(
@@ -437,6 +437,58 @@ as values in the second argument.
437437
cursor.close()
438438

439439

440+
Bulk insert for improved performance
441+
--------------------------------------
442+
443+
For inserting large amounts of data more efficiently, you can use the ``bulk_insert`` parameter
444+
with ``executemany()``. This concatenates multiple INSERT statements into a single batch request,
445+
which can significantly improve performance when inserting many rows.
446+
447+
**Note:** The ``bulk_insert`` parameter only works with INSERT statements and supports both
448+
``fb_numeric`` and ``qmark`` parameter styles. Using it with other statement types will
449+
raise an error.
450+
451+
**Example with QMARK parameter style (default):**
452+
453+
::
454+
455+
# Using the default qmark parameter style
456+
cursor.executemany(
457+
"INSERT INTO test_table VALUES (?, ?, ?)",
458+
(
459+
(1, "apple", "2019-01-01"),
460+
(2, "banana", "2020-01-01"),
461+
(3, "carrot", "2021-01-01"),
462+
(4, "donut", "2022-01-01"),
463+
(5, "eggplant", "2023-01-01")
464+
),
465+
bulk_insert=True # Enable bulk insert for better performance
466+
)
467+
468+
**Example with FB_NUMERIC parameter style:**
469+
470+
::
471+
472+
import firebolt.db
473+
# Set paramstyle to "fb_numeric" for server-side parameter substitution
474+
firebolt.db.paramstyle = "fb_numeric"
475+
476+
cursor.executemany(
477+
"INSERT INTO test_table VALUES ($1, $2, $3)",
478+
(
479+
(1, "apple", "2019-01-01"),
480+
(2, "banana", "2020-01-01"),
481+
(3, "carrot", "2021-01-01"),
482+
(4, "donut", "2022-01-01"),
483+
(5, "eggplant", "2023-01-01")
484+
),
485+
bulk_insert=True # Enable bulk insert for better performance
486+
)
487+
488+
When ``bulk_insert=True``, the SDK concatenates all INSERT statements into a single batch
489+
and sends them to the server for optimized batch processing.
490+
491+
440492
Setting session parameters
441493
--------------------------------------
442494

@@ -731,7 +783,7 @@ of execute_async is -1, which is the rowcount for queries where it's not applica
731783
cursor.execute_async("INSERT INTO my_table VALUES (5, 'egg', '2022-01-01')")
732784
token = cursor.async_query_token
733785

734-
Trying to access `async_query_token` before calling `execute_async` will raise an exception.
786+
Trying to access `async_query_token` before calling `execute_async` will raise an exception.
735787

736788
.. note::
737789
Multiple-statement queries are not supported for asynchronous queries. However, you can run each statement
@@ -746,9 +798,9 @@ Monitoring the query status
746798
To check the async query status you need to retrieve the token of the query. The token is a unique
747799
identifier for the query and can be used to fetch the query status. You can store this token
748800
outside of the current process and use it later to check the query status. :ref:`Connection <firebolt.db:Connection>` object
749-
has two methods to check the query status: :py:meth:`firebolt.db.connection.Connection.is_async_query_running` and
750-
:py:meth:`firebolt.db.connection.Connection.is_async_query_successful`.`is_async_query_running` will return True
751-
if the query is still running, and False otherwise. `is_async_query_successful` will return True if the query
801+
has two methods to check the query status: :py:meth:`firebolt.db.connection.Connection.is_async_query_running` and
802+
:py:meth:`firebolt.db.connection.Connection.is_async_query_successful`.`is_async_query_running` will return True
803+
if the query is still running, and False otherwise. `is_async_query_successful` will return True if the query
752804
has finished successfully, None if query is still running and False if the query has failed.
753805

754806
::
@@ -779,7 +831,7 @@ will send a cancel request to the server and the query will be stopped.
779831

780832
token = cursor.async_query_token
781833
connection.cancel_async_query(token)
782-
834+
783835
# Verify that the query was cancelled
784836
running = connection.is_async_query_running(token)
785837
print(running) # False

src/firebolt/async_db/cursor.py

Lines changed: 65 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
UPDATE_ENDPOINT_HEADER,
2727
UPDATE_PARAMETERS_HEADER,
2828
CursorState,
29-
ParameterStyle,
3029
)
3130
from firebolt.common.cursor.base_cursor import (
3231
BaseCursor,
@@ -40,6 +39,10 @@
4039
check_not_closed,
4140
check_query_executed,
4241
)
42+
from firebolt.common.cursor.statement_planners import (
43+
ExecutionPlan,
44+
StatementPlannerFactory,
45+
)
4346
from firebolt.common.row_set.asynchronous.base import BaseAsyncRowSet
4447
from firebolt.common.row_set.asynchronous.in_memory import InMemoryAsyncRowSet
4548
from firebolt.common.row_set.asynchronous.streaming import StreamingAsyncRowSet
@@ -221,124 +224,98 @@ async def _do_execute(
221224
timeout: Optional[float] = None,
222225
async_execution: bool = False,
223226
streaming: bool = False,
227+
bulk_insert: bool = False,
224228
) -> None:
225229
await self._close_rowset_and_reset()
226230
self._row_set = StreamingAsyncRowSet() if streaming else InMemoryAsyncRowSet()
231+
227232
# Import paramstyle from module level
228233
from firebolt.async_db import paramstyle
229234

230235
try:
231-
parameter_style = ParameterStyle(paramstyle)
232-
except ValueError:
233-
raise ProgrammingError(f"Unsupported paramstyle: {paramstyle}")
234-
try:
235-
if parameter_style == ParameterStyle.FB_NUMERIC:
236-
await self._execute_fb_numeric(
237-
raw_query, parameters, timeout, async_execution, streaming
238-
)
239-
else:
240-
queries: List[Union[SetParameter, str]] = (
241-
[raw_query]
242-
if skip_parsing
243-
else self._formatter.split_format_sql(raw_query, parameters)
244-
)
245-
timeout_controller = TimeoutController(timeout)
246-
if len(queries) > 1 and async_execution:
247-
raise FireboltError(
248-
"Server side async does not support multi-statement queries"
249-
)
250-
for query in queries:
251-
await self._execute_single_query(
252-
query, timeout_controller, async_execution, streaming
253-
)
236+
statement_planner = StatementPlannerFactory.create_planner(
237+
paramstyle, self._formatter
238+
)
239+
240+
plan = statement_planner.create_execution_plan(
241+
raw_query,
242+
parameters,
243+
skip_parsing,
244+
async_execution,
245+
streaming,
246+
bulk_insert,
247+
)
248+
await self._execute_plan(plan, timeout)
254249
self._state = CursorState.DONE
255250
except Exception:
256251
self._state = CursorState.ERROR
257252
raise
258253

259-
async def _execute_fb_numeric(
254+
async def _execute_plan(
260255
self,
261-
query: str,
262-
parameters: Sequence[Sequence[ParameterType]],
256+
plan: ExecutionPlan,
263257
timeout: Optional[float],
264-
async_execution: bool,
265-
streaming: bool,
266258
) -> None:
267-
Cursor._log_query(query)
259+
"""Execute an execution plan."""
268260
timeout_controller = TimeoutController(timeout)
269-
timeout_controller.raise_if_timeout()
270-
query_params = self._build_fb_numeric_query_params(
271-
parameters, streaming, async_execution
272-
)
273-
resp = await self._api_request(
274-
query,
275-
query_params,
276-
timeout=timeout_controller.remaining(),
277-
)
278-
await self._raise_if_error(resp)
279-
if async_execution:
280-
await resp.aread()
281-
self._parse_async_response(resp)
282-
else:
283-
await self._parse_response_headers(resp.headers)
284-
await self._append_row_set_from_response(resp)
261+
262+
for query in plan.queries:
263+
if isinstance(query, SetParameter):
264+
if plan.async_execution:
265+
raise FireboltError(
266+
"Server side async does not support set statements, "
267+
"please use execute to set this parameter"
268+
)
269+
await self._validate_set_parameter(
270+
query, timeout_controller.remaining()
271+
)
272+
else:
273+
# Regular query execution
274+
await self._execute_single_query(
275+
query,
276+
plan.query_params,
277+
timeout_controller,
278+
plan.async_execution,
279+
plan.streaming,
280+
)
285281

286282
async def _execute_single_query(
287283
self,
288-
query: Union[SetParameter, str],
284+
query: str,
285+
query_params: Optional[Dict[str, Any]],
289286
timeout_controller: TimeoutController,
290287
async_execution: bool,
291288
streaming: bool,
292289
) -> None:
290+
"""Execute a single query."""
293291
start_time = time.time()
294292
Cursor._log_query(query)
295293
timeout_controller.raise_if_timeout()
296294

297-
if isinstance(query, SetParameter):
298-
if async_execution:
299-
raise FireboltError(
300-
"Server side async does not support set statements, "
301-
"please use execute to set this parameter"
302-
)
303-
await self._validate_set_parameter(query, timeout_controller.remaining())
304-
else:
305-
await self._handle_query_execution(
306-
query, timeout_controller, async_execution, streaming
307-
)
295+
final_params = query_params or {}
308296

309-
if not async_execution:
310-
logger.info(
311-
f"Query fetched {self.rowcount} rows in"
312-
f" {time.time() - start_time} seconds."
313-
)
314-
else:
315-
logger.info("Query submitted for async execution.")
316-
317-
async def _handle_query_execution(
318-
self,
319-
query: str,
320-
timeout_controller: TimeoutController,
321-
async_execution: bool,
322-
streaming: bool,
323-
) -> None:
324-
query_params: Dict[str, Any] = {
325-
"output_format": self._get_output_format(streaming)
326-
}
327-
if async_execution:
328-
query_params["async"] = True
329297
resp = await self._api_request(
330298
query,
331-
query_params,
299+
final_params,
332300
timeout=timeout_controller.remaining(),
333301
)
334302
await self._raise_if_error(resp)
303+
335304
if async_execution:
336305
await resp.aread()
337306
self._parse_async_response(resp)
338307
else:
339308
await self._parse_response_headers(resp.headers)
340309
await self._append_row_set_from_response(resp)
341310

311+
if not async_execution:
312+
logger.info(
313+
f"Query fetched {self.rowcount} rows in"
314+
f" {time.time() - start_time} seconds."
315+
)
316+
else:
317+
logger.info("Query submitted for async execution.")
318+
342319
async def use_database(self, database: str, cache: bool = True) -> None:
343320
"""Switch the current database context with caching."""
344321
if cache:
@@ -421,6 +398,7 @@ async def executemany(
421398
query: str,
422399
parameters_seq: Sequence[Sequence[ParameterType]],
423400
timeout_seconds: Optional[float] = None,
401+
bulk_insert: bool = False,
424402
) -> Union[int, str]:
425403
"""Prepare and execute a database query.
426404
@@ -438,6 +416,9 @@ async def executemany(
438416
`SET param=value` statement before it. All parameters are stored in
439417
cursor object until it's closed. They can also be removed with
440418
`flush_parameters` method call.
419+
Bulk insert: When bulk_insert=True, multiple INSERT queries are
420+
concatenated and sent as a single batch for improved performance.
421+
Only supported for INSERT statements.
441422
442423
Args:
443424
query (str): SQL query to execute.
@@ -446,11 +427,15 @@ async def executemany(
446427
query with actual values from each set in a sequence. Resulting queries
447428
for each subset are executed sequentially.
448429
timeout_seconds (Optional[float]): Query execution timeout in seconds.
430+
bulk_insert (bool): When True, concatenates multiple INSERT queries
431+
into a single batch request. Only supported for INSERT statements.
449432
450433
Returns:
451434
int: Query row count.
452435
"""
453-
await self._do_execute(query, parameters_seq, timeout=timeout_seconds)
436+
await self._do_execute(
437+
query, parameters_seq, timeout=timeout_seconds, bulk_insert=bulk_insert
438+
)
454439
return self.rowcount
455440

456441
@check_not_closed

0 commit comments

Comments
 (0)