|
4 | 4 | from random import randint |
5 | 5 | from typing import Callable, List, Tuple |
6 | 6 |
|
| 7 | +import trio |
7 | 8 | from pytest import mark, raises |
8 | 9 |
|
9 | 10 | import firebolt.async_db |
@@ -283,34 +284,73 @@ async def test_parameterized_query_with_special_chars(connection: Connection) -> |
283 | 284 | ], "Invalid data in table after parameterized insert" |
284 | 285 |
|
285 | 286 |
|
286 | | -async def test_executemany_bulk_insert( |
287 | | - connection: Connection, fb_numeric_paramstyle: None |
| 287 | +@mark.parametrize( |
| 288 | + "paramstyle,query,test_data", |
| 289 | + [ |
| 290 | + ( |
| 291 | + "fb_numeric", |
| 292 | + 'INSERT INTO "test_tbl" VALUES ($1, $2)', |
| 293 | + [(1, "alice"), (2, "bob"), (3, "charlie")], |
| 294 | + ), |
| 295 | + ( |
| 296 | + "qmark", |
| 297 | + 'INSERT INTO "test_tbl" VALUES (?, ?)', |
| 298 | + [(4, "david"), (5, "eve"), (6, "frank")], |
| 299 | + ), |
| 300 | + ], |
| 301 | +) |
| 302 | +async def test_executemany_bulk_insert_paramstyles( |
| 303 | + connection: Connection, |
| 304 | + paramstyle: str, |
| 305 | + query: str, |
| 306 | + test_data: List[Tuple], |
| 307 | + create_drop_test_table_setup_teardown_async: Callable, |
288 | 308 | ) -> None: |
289 | | - """executemany with bulk_insert=True inserts data correctly.""" |
| 309 | + """executemany with bulk_insert=True works correctly for both paramstyles.""" |
| 310 | + # Set the paramstyle for this test |
| 311 | + original_paramstyle = firebolt.async_db.paramstyle |
| 312 | + firebolt.async_db.paramstyle = paramstyle |
| 313 | + # Generate a unique label for this test execution |
| 314 | + unique_label = f"test_bulk_insert_async_{paramstyle}_{randint(100000, 999999)}" |
| 315 | + table_name = "test_tbl" |
| 316 | + |
290 | 317 | try: |
291 | | - firebolt.async_db.paramstyle = "fb_numeric" |
| 318 | + c = connection.cursor() |
292 | 319 |
|
293 | | - async with connection.cursor() as c: |
294 | | - await c.execute('DROP TABLE IF EXISTS "test_bulk_insert_async"') |
295 | | - await c.execute( |
296 | | - 'CREATE FACT TABLE "test_bulk_insert_async"(id int, name string) primary index id' |
297 | | - ) |
| 320 | + # Can't do this for fb_numeric yet - FIR-49970 |
| 321 | + if paramstyle != "fb_numeric": |
| 322 | + await c.execute(f"SET query_label = '{unique_label}'") |
298 | 323 |
|
299 | | - await c.executemany( |
300 | | - 'INSERT INTO "test_bulk_insert_async" VALUES ($1, $2)', |
301 | | - [(1, "alice"), (2, "bob"), (3, "charlie")], |
302 | | - bulk_insert=True, |
303 | | - ) |
| 324 | + # Execute bulk insert |
| 325 | + await c.executemany( |
| 326 | + query, |
| 327 | + test_data, |
| 328 | + bulk_insert=True, |
| 329 | + ) |
304 | 330 |
|
305 | | - await c.execute('SELECT * FROM "test_bulk_insert_async" ORDER BY id') |
306 | | - data = await c.fetchall() |
307 | | - assert len(data) == 3 |
308 | | - assert data[0] == [1, "alice"] |
309 | | - assert data[1] == [2, "bob"] |
310 | | - assert data[2] == [3, "charlie"] |
| 331 | + # Verify the data was inserted correctly |
| 332 | + await c.execute(f'SELECT * FROM "{table_name}" ORDER BY id') |
| 333 | + data = await c.fetchall() |
| 334 | + assert len(data) == len(test_data) |
| 335 | + for i, (expected_id, expected_name) in enumerate(test_data): |
| 336 | + assert data[i] == [expected_id, expected_name] |
| 337 | + |
| 338 | + # Verify that only one INSERT query was executed with our unique label |
| 339 | + # Can't do this for fb_numeric yet - FIR-49970 |
| 340 | + if paramstyle != "fb_numeric": |
| 341 | + # Wait a moment to ensure query history is updated |
| 342 | + await trio.sleep(10) |
| 343 | + await c.execute( |
| 344 | + "SELECT COUNT(*) FROM information_schema.engine_query_history " |
| 345 | + f"WHERE query_label = '{unique_label}' AND query_text LIKE 'INSERT INTO%'" |
| 346 | + " AND status = 'ENDED_SUCCESSFULLY'" |
| 347 | + ) |
| 348 | + query_count = (await c.fetchone())[0] |
| 349 | + assert ( |
| 350 | + query_count == 1 |
| 351 | + ), f"Expected 1 INSERT query with label '{unique_label}', but found {query_count}" |
311 | 352 | finally: |
312 | | - async with connection.cursor() as c: |
313 | | - await c.execute('DROP TABLE IF EXISTS "test_bulk_insert_async"') |
| 353 | + firebolt.async_db.paramstyle = original_paramstyle |
314 | 354 |
|
315 | 355 |
|
316 | 356 | async def test_multi_statement_query(connection: Connection) -> None: |
|
0 commit comments