70 changes: 61 additions & 9 deletions docsrc/Connecting_and_queries.rst
@@ -196,14 +196,14 @@ To get started, follow the steps below:
) as connection:
# Create a cursor
cursor = connection.cursor()

# Execute a simple test query
cursor.execute("SELECT 1")

.. note::

Firebolt Core is assumed to be running locally on the default port (3473). For instructions
on how to run Firebolt Core locally using Docker, refer to the
`official docs <https://docs.firebolt.io/firebolt-core/firebolt-core-get-started>`_.


@@ -404,7 +404,7 @@ parameters equal in length to the number of placeholders in the statement.
"INSERT INTO test_table2 VALUES ($1, $2, $3)",
(2, "world", "2018-01-02"),
)

# paramstyle only needs to be set once; it is then used for all subsequent queries

cursor.execute(
@@ -437,6 +437,58 @@ as values in the second argument.
cursor.close()


Bulk insert for improved performance
--------------------------------------

For inserting large amounts of data more efficiently, you can use the ``bulk_insert`` parameter
with ``executemany()``. This concatenates multiple INSERT statements into a single batch request,
which can significantly improve performance when inserting many rows.

**Note:** The ``bulk_insert`` parameter only works with INSERT statements and supports both
``fb_numeric`` and ``qmark`` parameter styles. Using it with other statement types will
raise an error.

**Example with QMARK parameter style (default):**

::

# Using the default qmark parameter style
cursor.executemany(
"INSERT INTO test_table VALUES (?, ?, ?)",
(
(1, "apple", "2019-01-01"),
(2, "banana", "2020-01-01"),
(3, "carrot", "2021-01-01"),
(4, "donut", "2022-01-01"),
(5, "eggplant", "2023-01-01")
),
bulk_insert=True # Enable bulk insert for better performance
)

**Example with FB_NUMERIC parameter style:**

::

import firebolt.db
# Set paramstyle to "fb_numeric" for server-side parameter substitution
firebolt.db.paramstyle = "fb_numeric"

cursor.executemany(
"INSERT INTO test_table VALUES ($1, $2, $3)",
(
(1, "apple", "2019-01-01"),
(2, "banana", "2020-01-01"),
(3, "carrot", "2021-01-01"),
(4, "donut", "2022-01-01"),
(5, "eggplant", "2023-01-01")
),
bulk_insert=True # Enable bulk insert for better performance
)

When ``bulk_insert=True``, the SDK concatenates all INSERT statements into a single batch
and sends them to the server for optimized batch processing.
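
As a rough illustration of what "concatenating INSERT statements" means, the sketch below builds one semicolon-separated batch from a ``qmark``-style template. It is not the SDK's implementation: ``format_literal`` and ``build_bulk_insert`` are hypothetical helpers, the quoting rules are simplified, and the real driver's parsing, escaping, and batching may differ.

```python
from typing import Sequence, Union

Param = Union[int, float, str, None]


def format_literal(value: Param) -> str:
    """Render a Python value as a SQL literal (simplified, illustration only)."""
    if value is None:
        return "NULL"
    if isinstance(value, bool):
        return "TRUE" if value else "FALSE"
    if isinstance(value, (int, float)):
        return str(value)
    # Double any single quotes so the text stays inside its string literal
    return "'" + str(value).replace("'", "''") + "'"


def build_bulk_insert(query: str, parameters_seq: Sequence[Sequence[Param]]) -> str:
    """Substitute each parameter set into the template and join the results."""
    if not query.lstrip().upper().startswith("INSERT"):
        raise ValueError("bulk insert is only supported for INSERT statements")
    statements = []
    for params in parameters_seq:
        # Naive split: assumes "?" never appears inside a string literal
        parts = query.split("?")
        if len(parts) - 1 != len(params):
            raise ValueError("parameter count does not match placeholder count")
        # Interleave the template fragments with the formatted values
        rendered = parts[0]
        for value, tail in zip(params, parts[1:]):
            rendered += format_literal(value) + tail
        statements.append(rendered)
    return "; ".join(statements)
```

Calling ``build_bulk_insert`` with the template and rows from the qmark example above yields a single string containing five INSERT statements, which is the kind of payload a single batch request would carry.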


Setting session parameters
--------------------------------------

@@ -731,7 +783,7 @@ of execute_async is -1, which is the rowcount for queries where it's not applica
cursor.execute_async("INSERT INTO my_table VALUES (5, 'egg', '2022-01-01')")
token = cursor.async_query_token

Trying to access `async_query_token` before calling `execute_async` will raise an exception.

.. note::
Multiple-statement queries are not supported for asynchronous queries. However, you can run each statement
@@ -746,9 +798,9 @@ Monitoring the query status
To check the async query status you need to retrieve the token of the query. The token is a unique
identifier for the query and can be used to fetch the query status. You can store this token
outside of the current process and use it later to check the query status. :ref:`Connection <firebolt.db:Connection>` object
has two methods to check the query status: :py:meth:`firebolt.db.connection.Connection.is_async_query_running` and
:py:meth:`firebolt.db.connection.Connection.is_async_query_successful`. `is_async_query_running` returns True
if the query is still running, and False otherwise. `is_async_query_successful` returns True if the query
has finished successfully, None if the query is still running, and False if the query has failed.
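
One way to combine the two status methods is a small polling helper. The sketch below is only an illustration: ``wait_for_async_query``, ``poll_interval``, and ``timeout`` are hypothetical names that are not part of the SDK, and the helper assumes only that ``connection`` exposes ``is_async_query_running`` and ``is_async_query_successful`` as described above.

```python
import time
from typing import Any, Optional


def wait_for_async_query(
    connection: Any,
    token: str,
    poll_interval: float = 1.0,
    timeout: Optional[float] = None,
) -> bool:
    """Block until the async query stops running; return True on success."""
    deadline = None if timeout is None else time.monotonic() + timeout
    while connection.is_async_query_running(token):
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError(f"query {token} still running after {timeout}s")
        time.sleep(poll_interval)
    # Once the query has stopped, the success check is expected to be True or False
    return bool(connection.is_async_query_successful(token))
```

Because the token is a plain identifier, the same helper could be called from a different process than the one that submitted the query, provided that process has its own open connection.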

::
@@ -779,7 +831,7 @@ will send a cancel request to the server and the query will be stopped.

token = cursor.async_query_token
connection.cancel_async_query(token)

# Verify that the query was cancelled
running = connection.is_async_query_running(token)
print(running) # False
19 changes: 17 additions & 2 deletions src/firebolt/async_db/cursor.py
@@ -218,9 +218,11 @@ async def _do_execute(
timeout: Optional[float] = None,
async_execution: bool = False,
streaming: bool = False,
bulk_insert: bool = False,
) -> None:
await self._close_rowset_and_reset()
self._row_set = StreamingAsyncRowSet() if streaming else InMemoryAsyncRowSet()

# Import paramstyle from module level
from firebolt.async_db import paramstyle

@@ -230,7 +232,12 @@ async def _do_execute(
)

plan = statement_planner.create_execution_plan(
raw_query, parameters, skip_parsing, async_execution, streaming
raw_query,
parameters,
skip_parsing,
async_execution,
streaming,
bulk_insert,
)
await self._execute_plan(plan, timeout)
self._state = CursorState.DONE
@@ -385,6 +392,7 @@ async def executemany(
query: str,
parameters_seq: Sequence[Sequence[ParameterType]],
timeout_seconds: Optional[float] = None,
bulk_insert: bool = False,
) -> Union[int, str]:
"""Prepare and execute a database query.

@@ -402,6 +410,9 @@ async def executemany(
`SET param=value` statement before it. All parameters are stored in
cursor object until it's closed. They can also be removed with
`flush_parameters` method call.
Bulk insert: When bulk_insert=True, multiple INSERT queries are
concatenated and sent as a single batch for improved performance.
Only supported for INSERT statements.

Args:
query (str): SQL query to execute.
@@ -410,11 +421,15 @@ async def executemany(
query with actual values from each set in a sequence. Resulting queries
for each subset are executed sequentially.
timeout_seconds (Optional[float]): Query execution timeout in seconds.
bulk_insert (bool): When True, concatenates multiple INSERT queries
into a single batch request. Only supported for INSERT statements.

Returns:
int: Query row count.
"""
await self._do_execute(query, parameters_seq, timeout=timeout_seconds)
await self._do_execute(
query, parameters_seq, timeout=timeout_seconds, bulk_insert=bulk_insert
)
return self.rowcount

@check_not_closed