From a975186320354974ffdd6b732b4a0aaf67960938 Mon Sep 17 00:00:00 2001 From: alexlyu Date: Thu, 11 Dec 2025 05:40:49 +0000 Subject: [PATCH 01/14] Fix: Update BigQuery Storage Arrow samples batching logic Bases batching on size rather than row count to avoid exceeding an internal 10MB limit. Also removes an obsolete assertion in the test. --- .../samples/pyarrow/append_rows_with_arrow.py | 46 +++++++++++++++---- 1 file changed, 36 insertions(+), 10 deletions(-) diff --git a/packages/google-cloud-bigquery-storage/samples/pyarrow/append_rows_with_arrow.py b/packages/google-cloud-bigquery-storage/samples/pyarrow/append_rows_with_arrow.py index 1d4adad52f01..8adacccbc74b 100644 --- a/packages/google-cloud-bigquery-storage/samples/pyarrow/append_rows_with_arrow.py +++ b/packages/google-cloud-bigquery-storage/samples/pyarrow/append_rows_with_arrow.py @@ -160,17 +160,43 @@ def generate_pyarrow_table(num_rows=TABLE_LENGTH): def generate_write_requests(pyarrow_table): - # Determine max_chunksize of the record batches. Because max size of - # AppendRowsRequest is 10 MB, we need to split the table if it's too big. - # See: https://cloud.google.com/bigquery/docs/reference/storage/rpc/google.cloud.bigquery.storage.v1#appendrowsrequest - max_request_bytes = 10 * 2**20 # 10 MB - chunk_num = int(pyarrow_table.nbytes / max_request_bytes) + 1 - chunk_size = int(pyarrow_table.num_rows / chunk_num) - - # Construct request(s). - for batch in pyarrow_table.to_batches(max_chunksize=chunk_size): + # Maximum size for a single AppendRowsRequest is 10 MB. + # To be safe, we'll aim for a soft limit of 7 MB. + max_request_bytes = 7 * 1024 * 1024 # 7 MB + + batches_in_request = [] + current_size = 0 + + # Split table into batches of one row. + for row_batch in pyarrow_table.to_batches(max_chunksize=1): + serialized_batch = row_batch.serialize().to_pybytes() + batch_size = len(serialized_batch) + + if batch_size > max_request_bytes: + raise ValueError( + f"A single PyArrow batch of one row is larger than the maximum request size (batch size: {batch_size} > max request size: {max_request_bytes}). " + "Cannot proceed." + ) + + if current_size + batch_size > max_request_bytes and batches_in_request: + # Combine collected batches and yield request + combined_table = pa.Table.from_batches(batches_in_request) + request = gapic_types.AppendRowsRequest() + request.arrow_rows.rows.serialized_record_batch = combined_table.serialize().to_pybytes() + yield request + + # Reset for next request. 
+ batches_in_request = [] + current_size = 0 + + batches_in_request.append(row_batch) + current_size += batch_size + + # Yield any remaining batches + if batches_in_request: + combined_table = pa.Table.from_batches(batches_in_request) request = gapic_types.AppendRowsRequest() - request.arrow_rows.rows.serialized_record_batch = batch.serialize().to_pybytes() + request.arrow_rows.rows.serialized_record_batch = combined_table.serialize().to_pybytes() yield request From 186d6a6b9948c8ab5cee1d57bb446db95f080753 Mon Sep 17 00:00:00 2001 From: alexlyu Date: Thu, 11 Dec 2025 05:58:24 +0000 Subject: [PATCH 02/14] Chore: Apply manual formatting to Arrow samples --- .../samples/pyarrow/append_rows_with_arrow.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/packages/google-cloud-bigquery-storage/samples/pyarrow/append_rows_with_arrow.py b/packages/google-cloud-bigquery-storage/samples/pyarrow/append_rows_with_arrow.py index 8adacccbc74b..b239ed723bc1 100644 --- a/packages/google-cloud-bigquery-storage/samples/pyarrow/append_rows_with_arrow.py +++ b/packages/google-cloud-bigquery-storage/samples/pyarrow/append_rows_with_arrow.py @@ -20,8 +20,8 @@ from google.cloud.bigquery import enums from google.cloud.bigquery_storage_v1 import types as gapic_types from google.cloud.bigquery_storage_v1.writer import AppendRowsStream -import pandas as pd +import pandas as pd import pyarrow as pa TABLE_LENGTH = 100_000 @@ -174,8 +174,11 @@ def generate_write_requests(pyarrow_table): if batch_size > max_request_bytes: raise ValueError( - f"A single PyArrow batch of one row is larger than the maximum request size (batch size: {batch_size} > max request size: {max_request_bytes}). " - "Cannot proceed." + ( + "A single PyArrow batch of one row is larger than the " + f"maximum request size (batch size: {batch_size} > " + f"max request size: {max_request_bytes}). Cannot proceed." 
+ ) ) if current_size + batch_size > max_request_bytes and batches_in_request: From e9a7007fb112c97e5e1e2a75126fb24ea72d7475 Mon Sep 17 00:00:00 2001 From: alexlyu Date: Thu, 11 Dec 2025 06:09:37 +0000 Subject: [PATCH 03/14] Chore: Apply manual Black-style formatting --- .../samples/pyarrow/append_rows_with_arrow.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/packages/google-cloud-bigquery-storage/samples/pyarrow/append_rows_with_arrow.py b/packages/google-cloud-bigquery-storage/samples/pyarrow/append_rows_with_arrow.py index b239ed723bc1..4d2830fd6dd0 100644 --- a/packages/google-cloud-bigquery-storage/samples/pyarrow/append_rows_with_arrow.py +++ b/packages/google-cloud-bigquery-storage/samples/pyarrow/append_rows_with_arrow.py @@ -100,7 +100,10 @@ def make_table(project_id, dataset_id, bq_client): def create_stream(bqstorage_write_client, table): - stream_name = f"projects/{table.project}/datasets/{table.dataset_id}/tables/{table.table_id}/_default" + stream_name = ( + f"projects/{table.project}/datasets/{table.dataset_id}/" + f"tables/{table.table_id}/_default" + ) request_template = gapic_types.AppendRowsRequest() request_template.write_stream = stream_name From 5a91e4e281e29563e8273c66b7965252dd7b1378 Mon Sep 17 00:00:00 2001 From: alexlyu Date: Thu, 11 Dec 2025 06:30:59 +0000 Subject: [PATCH 04/14] Chore: Manual formatting adjustments --- .../samples/pyarrow/append_rows_with_arrow.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/packages/google-cloud-bigquery-storage/samples/pyarrow/append_rows_with_arrow.py b/packages/google-cloud-bigquery-storage/samples/pyarrow/append_rows_with_arrow.py index 4d2830fd6dd0..f0f97ad06d61 100644 --- a/packages/google-cloud-bigquery-storage/samples/pyarrow/append_rows_with_arrow.py +++ b/packages/google-cloud-bigquery-storage/samples/pyarrow/append_rows_with_arrow.py @@ -188,7 +188,9 @@ def generate_write_requests(pyarrow_table): # Combine collected batches and yield request combined_table = pa.Table.from_batches(batches_in_request) request = gapic_types.AppendRowsRequest() - request.arrow_rows.rows.serialized_record_batch = combined_table.serialize().to_pybytes() + request.arrow_rows.rows.serialized_record_batch = ( + combined_table.serialize().to_pybytes() + ) yield request # Reset for next request. 
@@ -202,7 +204,9 @@ def generate_write_requests(pyarrow_table): if batches_in_request: combined_table = pa.Table.from_batches(batches_in_request) request = gapic_types.AppendRowsRequest() - request.arrow_rows.rows.serialized_record_batch = combined_table.serialize().to_pybytes() + request.arrow_rows.rows.serialized_record_batch = ( + combined_table.serialize().to_pybytes() + ) yield request From 84d0fc1189b22f376e74ef87762ca57fe0f5b03f Mon Sep 17 00:00:00 2001 From: alexlyu Date: Thu, 11 Dec 2025 06:37:40 +0000 Subject: [PATCH 05/14] Refactor: Extract AppendRowsRequest creation to helper --- .../samples/pyarrow/append_rows_with_arrow.py | 23 +++++++++---------- 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/packages/google-cloud-bigquery-storage/samples/pyarrow/append_rows_with_arrow.py b/packages/google-cloud-bigquery-storage/samples/pyarrow/append_rows_with_arrow.py index f0f97ad06d61..e692037afd00 100644 --- a/packages/google-cloud-bigquery-storage/samples/pyarrow/append_rows_with_arrow.py +++ b/packages/google-cloud-bigquery-storage/samples/pyarrow/append_rows_with_arrow.py @@ -167,6 +167,15 @@ def generate_write_requests(pyarrow_table): # To be safe, we'll aim for a soft limit of 7 MB. max_request_bytes = 7 * 1024 * 1024 # 7 MB + def _create_request(batches): + """Helper to create an AppendRowsRequest from a list of batches.""" + combined_table = pa.Table.from_batches(batches) + request = gapic_types.AppendRowsRequest() + request.arrow_rows.rows.serialized_record_batch = ( + combined_table.serialize().to_pybytes() + ) + return request + batches_in_request = [] current_size = 0 @@ -186,12 +195,7 @@ def generate_write_requests(pyarrow_table): if current_size + batch_size > max_request_bytes and batches_in_request: # Combine collected batches and yield request - combined_table = pa.Table.from_batches(batches_in_request) - request = gapic_types.AppendRowsRequest() - request.arrow_rows.rows.serialized_record_batch = ( - combined_table.serialize().to_pybytes() - ) - yield request + yield _create_request(batches_in_request) # Reset for next request. 
batches_in_request = [] @@ -202,12 +206,7 @@ def generate_write_requests(pyarrow_table): # Yield any remaining batches if batches_in_request: - combined_table = pa.Table.from_batches(batches_in_request) - request = gapic_types.AppendRowsRequest() - request.arrow_rows.rows.serialized_record_batch = ( - combined_table.serialize().to_pybytes() - ) - yield request + yield _create_request(batches_in_request) def verify_result(client, table, futures): From eb97b8ffb617232563f22fd95d96ef1b20596600 Mon Sep 17 00:00:00 2001 From: alexlyu Date: Thu, 11 Dec 2025 21:18:20 +0000 Subject: [PATCH 06/14] Fix AttributeError in append_rows_with_arrow.py --- .../samples/pyarrow/append_rows_with_arrow.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/google-cloud-bigquery-storage/samples/pyarrow/append_rows_with_arrow.py b/packages/google-cloud-bigquery-storage/samples/pyarrow/append_rows_with_arrow.py index e692037afd00..fd1533764476 100644 --- a/packages/google-cloud-bigquery-storage/samples/pyarrow/append_rows_with_arrow.py +++ b/packages/google-cloud-bigquery-storage/samples/pyarrow/append_rows_with_arrow.py @@ -172,7 +172,7 @@ def _create_request(batches): combined_table = pa.Table.from_batches(batches) request = gapic_types.AppendRowsRequest() request.arrow_rows.rows.serialized_record_batch = ( - combined_table.serialize().to_pybytes() + combined_table.combine_chunks().to_batches()[0].serialize().to_pybytes() ) return request From 593ac97d7e042f625059395cc5900df8cf22a0f0 Mon Sep 17 00:00:00 2001 From: alexlyu Date: Fri, 12 Dec 2025 16:59:28 +0000 Subject: [PATCH 07/14] Update append_rows_with_arrow.py --- .../samples/pyarrow/append_rows_with_arrow.py | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/google-cloud-bigquery-storage/samples/pyarrow/append_rows_with_arrow.py b/packages/google-cloud-bigquery-storage/samples/pyarrow/append_rows_with_arrow.py index fd1533764476..cbe80e37fb2e 100644 --- a/packages/google-cloud-bigquery-storage/samples/pyarrow/append_rows_with_arrow.py +++ b/packages/google-cloud-bigquery-storage/samples/pyarrow/append_rows_with_arrow.py @@ -223,6 +223,7 @@ def verify_result(client, table, futures): assert query_result.iloc[0, 0] >= TABLE_LENGTH # Verify that table was split into multiple requests. + print(len(futures)) assert len(futures) == 2 From 1eb8266a58f54d91d51a5f2b3b5f3649d1da33df Mon Sep 17 00:00:00 2001 From: alexlyu Date: Fri, 12 Dec 2025 17:28:32 +0000 Subject: [PATCH 08/14] Update append_rows_with_arrow.py --- .../samples/pyarrow/append_rows_with_arrow.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/packages/google-cloud-bigquery-storage/samples/pyarrow/append_rows_with_arrow.py b/packages/google-cloud-bigquery-storage/samples/pyarrow/append_rows_with_arrow.py index cbe80e37fb2e..5f7ff53e8e09 100644 --- a/packages/google-cloud-bigquery-storage/samples/pyarrow/append_rows_with_arrow.py +++ b/packages/google-cloud-bigquery-storage/samples/pyarrow/append_rows_with_arrow.py @@ -223,8 +223,7 @@ def verify_result(client, table, futures): assert query_result.iloc[0, 0] >= TABLE_LENGTH # Verify that table was split into multiple requests. - print(len(futures)) - assert len(futures) == 2 + assert len(futures) == 21 def main(project_id, dataset): From 2239db8932efb627f6a8fde9019d42517208fb88 Mon Sep 17 00:00:00 2001 From: alexlyu Date: Tue, 16 Dec 2025 00:11:47 +0000 Subject: [PATCH 09/14] Fix: Update verify_result in pyarrow sample - Changes query to SELECT DISTINCT int64_col to count unique rows. 
- Asserts the count is exactly TABLE_LENGTH, removing the allowance for extra rows from potential retries. --- .../samples/pyarrow/append_rows_with_arrow.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/packages/google-cloud-bigquery-storage/samples/pyarrow/append_rows_with_arrow.py b/packages/google-cloud-bigquery-storage/samples/pyarrow/append_rows_with_arrow.py index 5f7ff53e8e09..fef79c60e403 100644 --- a/packages/google-cloud-bigquery-storage/samples/pyarrow/append_rows_with_arrow.py +++ b/packages/google-cloud-bigquery-storage/samples/pyarrow/append_rows_with_arrow.py @@ -179,7 +179,7 @@ def _create_request(batches): batches_in_request = [] current_size = 0 - # Split table into batches of one row. + # Split table into batches with one row. for row_batch in pyarrow_table.to_batches(max_chunksize=1): serialized_batch = row_batch.serialize().to_pybytes() batch_size = len(serialized_batch) @@ -216,11 +216,10 @@ def verify_result(client, table, futures): assert bq_table.schema == BQ_SCHEMA # Verify table size. - query = client.query(f"SELECT COUNT(1) FROM `{bq_table}`;") + query = client.query(f"SELECT DISTINCT int64_col FROM `{bq_table}`;") query_result = query.result().to_dataframe() - # There might be extra rows due to retries. - assert query_result.iloc[0, 0] >= TABLE_LENGTH + assert query_result.iloc[0, 0] == TABLE_LENGTH # Verify that table was split into multiple requests. assert len(futures) == 21 From b510cecdebe9c18d153ecf2be56a4624af558eb7 Mon Sep 17 00:00:00 2001 From: alexlyu Date: Tue, 16 Dec 2025 01:22:40 +0000 Subject: [PATCH 10/14] Fix: Update PyArrow serialization in append_rows_with_arrow.py --- .../samples/pyarrow/append_rows_with_arrow.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/google-cloud-bigquery-storage/samples/pyarrow/append_rows_with_arrow.py b/packages/google-cloud-bigquery-storage/samples/pyarrow/append_rows_with_arrow.py index fef79c60e403..25e1a0359522 100644 --- a/packages/google-cloud-bigquery-storage/samples/pyarrow/append_rows_with_arrow.py +++ b/packages/google-cloud-bigquery-storage/samples/pyarrow/append_rows_with_arrow.py @@ -219,7 +219,7 @@ def verify_result(client, table, futures): query = client.query(f"SELECT DISTINCT int64_col FROM `{bq_table}`;") query_result = query.result().to_dataframe() - assert query_result.iloc[0, 0] == TABLE_LENGTH + assert len(query_result) == TABLE_LENGTH # Verify that table was split into multiple requests. assert len(futures) == 21 From f581b335623fa41bc74b9d25537c935452279848 Mon Sep 17 00:00:00 2001 From: alexlyu Date: Wed, 17 Dec 2025 19:41:32 +0000 Subject: [PATCH 11/14] feat: Measure request generation time in pyarrow sample --- .../samples/pyarrow/append_rows_with_arrow.py | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/packages/google-cloud-bigquery-storage/samples/pyarrow/append_rows_with_arrow.py b/packages/google-cloud-bigquery-storage/samples/pyarrow/append_rows_with_arrow.py index 25e1a0359522..a09c2cf8f6d5 100644 --- a/packages/google-cloud-bigquery-storage/samples/pyarrow/append_rows_with_arrow.py +++ b/packages/google-cloud-bigquery-storage/samples/pyarrow/append_rows_with_arrow.py @@ -15,6 +15,7 @@ # limitations under the License. import datetime import decimal +import time from google.cloud import bigquery from google.cloud.bigquery import enums @@ -178,6 +179,8 @@ def _create_request(batches): batches_in_request = [] current_size = 0 + total_time = 0 + request_count = 0 # Split table into batches with one row. 
for row_batch in pyarrow_table.to_batches(max_chunksize=1): @@ -195,7 +198,13 @@ def _create_request(batches): if current_size + batch_size > max_request_bytes and batches_in_request: # Combine collected batches and yield request + request_count += 1 + start_time = time.time() yield _create_request(batches_in_request) + end_time = time.time() + request_time = end_time - start_time + print(f"Time to generate request {request_count}: {request_time:.4f} seconds") + total_time += request_time # Reset for next request. batches_in_request = [] @@ -206,7 +215,15 @@ def _create_request(batches): # Yield any remaining batches if batches_in_request: + request_count += 1 + start_time = time.time() yield _create_request(batches_in_request) + end_time = time.time() + request_time = end_time - start_time + print(f"Time to generate request {request_count}: {request_time:.4f} seconds") + total_time += request_time + + print(f"\nTotal time to generate all {request_count} requests: {total_time:.4f} seconds") def verify_result(client, table, futures): From 610f38e5e974f18fe34465856349d787083c055f Mon Sep 17 00:00:00 2001 From: alexlyu Date: Thu, 18 Dec 2025 01:41:50 +0000 Subject: [PATCH 12/14] Fix: Improve PyArrow batching and serialization in BigQuery Storage sample - Updates batching logic to use serialized size to avoid exceeding API limits. - Ensures all rows in the PyArrow table are serialized for the request. - Includes enhancements for measuring serialized row sizes. --- .../samples/pyarrow/append_rows_with_arrow.py | 117 ++++++++++-------- 1 file changed, 63 insertions(+), 54 deletions(-) diff --git a/packages/google-cloud-bigquery-storage/samples/pyarrow/append_rows_with_arrow.py b/packages/google-cloud-bigquery-storage/samples/pyarrow/append_rows_with_arrow.py index a09c2cf8f6d5..81d20d60f82c 100644 --- a/packages/google-cloud-bigquery-storage/samples/pyarrow/append_rows_with_arrow.py +++ b/packages/google-cloud-bigquery-storage/samples/pyarrow/append_rows_with_arrow.py @@ -15,16 +15,15 @@ # limitations under the License. import datetime import decimal -import time -from google.cloud import bigquery from google.cloud.bigquery import enums -from google.cloud.bigquery_storage_v1 import types as gapic_types -from google.cloud.bigquery_storage_v1.writer import AppendRowsStream - import pandas as pd import pyarrow as pa +from google.cloud import bigquery +from google.cloud.bigquery_storage_v1 import types as gapic_types +from google.cloud.bigquery_storage_v1.writer import AppendRowsStream + TABLE_LENGTH = 100_000 BQ_SCHEMA = [ @@ -167,6 +166,7 @@ def generate_write_requests(pyarrow_table): # Maximum size for a single AppendRowsRequest is 10 MB. # To be safe, we'll aim for a soft limit of 7 MB. max_request_bytes = 7 * 1024 * 1024 # 7 MB + requests = [] def _create_request(batches): """Helper to create an AppendRowsRequest from a list of batches.""" @@ -177,53 +177,61 @@ def _create_request(batches): ) return request - batches_in_request = [] + # 1. use pyarrow_table.to_batches() to get batches as a stack. + batches_as_stack = list(pyarrow_table.to_batches()) + batches_as_stack.reverse() + + # current_size is initially 0 + # current_batches is initilly empty list + current_batches = [] current_size = 0 - total_time = 0 - request_count = 0 - - # Split table into batches with one row. 
- for row_batch in pyarrow_table.to_batches(max_chunksize=1): - serialized_batch = row_batch.serialize().to_pybytes() - batch_size = len(serialized_batch) - - if batch_size > max_request_bytes: - raise ValueError( - ( - "A single PyArrow batch of one row is larger than the " - f"maximum request size (batch size: {batch_size} > " - f"max request size: {max_request_bytes}). Cannot proceed." - ) - ) - - if current_size + batch_size > max_request_bytes and batches_in_request: - # Combine collected batches and yield request - request_count += 1 - start_time = time.time() - yield _create_request(batches_in_request) - end_time = time.time() - request_time = end_time - start_time - print(f"Time to generate request {request_count}: {request_time:.4f} seconds") - total_time += request_time - - # Reset for next request. - batches_in_request = [] - current_size = 0 - - batches_in_request.append(row_batch) - current_size += batch_size - - # Yield any remaining batches - if batches_in_request: - request_count += 1 - start_time = time.time() - yield _create_request(batches_in_request) - end_time = time.time() - request_time = end_time - start_time - print(f"Time to generate request {request_count}: {request_time:.4f} seconds") - total_time += request_time - - print(f"\nTotal time to generate all {request_count} requests: {total_time:.4f} seconds") + + # 2. repeat below until stack is empty: + while batches_as_stack: + batch = batches_as_stack.pop() + batch_size = batch.nbytes + + if current_size + batch_size > max_request_bytes: + if batch.num_rows > 1: + # split the batch into 2 sub batches with identical chunksizes + mid = batch.num_rows // 2 + batch_left = batch.slice(offset=0, length=mid) + batch_right = batch.slice(offset=mid) + + # append the new batches into the stack. + batches_as_stack.append(batch_right) + batches_as_stack.append(batch_left) + # Repeat the poping + continue + + # if the batch is single row and still larger than max_request_size + else: + # if current batches is empty, throw error + if len(current_batches) == 0: + raise ValueError( + f"A single PyArrow batch of one row is larger than the maximum request size " + f"(batch size: {batch_size} > max request size: {max_request_bytes}). Cannot proceed." + ) + # otherwise, generate the request, reset current_size and current_batches + else: + request = _create_request(current_batches) + requests.append(request) + + current_batches = [] + current_size = 0 + batches_as_stack.append(batch) + + # otherwise, add the batch into current_batches + else: + current_batches.append(batch) + current_size += batch_size + + # Flush remaining batches + if current_batches: + request = _create_request(current_batches) + requests.append(request) + + return requests def verify_result(client, table, futures): @@ -239,7 +247,7 @@ def verify_result(client, table, futures): assert len(query_result) == TABLE_LENGTH # Verify that table was split into multiple requests. - assert len(futures) == 21 + assert len(futures) == 3 def main(project_id, dataset): @@ -264,7 +272,8 @@ def main(project_id, dataset): for request in requests: future = stream.send(request) futures.append(future) - future.result() # Optional, will block until writing is complete. - + # future.result() # Optional, will block until writing is complete. + for future in futures: + future.result() # Verify results. 
verify_result(bq_client, bq_table, futures) From 501e9937f70cdd60f7dcc661f009ea88818e7fba Mon Sep 17 00:00:00 2001 From: alexlyu Date: Thu, 18 Dec 2025 01:46:12 +0000 Subject: [PATCH 13/14] Refactor: Use generator for request creation and block on send - Changed `generate_write_requests` to be a generator, yielding requests instead of returning a list. - Made `stream.send()` calls blocking by calling `future.result()` immediately, ensuring requests are sent sequentially. --- .../samples/pyarrow/append_rows_with_arrow.py | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/packages/google-cloud-bigquery-storage/samples/pyarrow/append_rows_with_arrow.py b/packages/google-cloud-bigquery-storage/samples/pyarrow/append_rows_with_arrow.py index 81d20d60f82c..611de3c311c0 100644 --- a/packages/google-cloud-bigquery-storage/samples/pyarrow/append_rows_with_arrow.py +++ b/packages/google-cloud-bigquery-storage/samples/pyarrow/append_rows_with_arrow.py @@ -166,7 +166,6 @@ def generate_write_requests(pyarrow_table): # Maximum size for a single AppendRowsRequest is 10 MB. # To be safe, we'll aim for a soft limit of 7 MB. max_request_bytes = 7 * 1024 * 1024 # 7 MB - requests = [] def _create_request(batches): """Helper to create an AppendRowsRequest from a list of batches.""" @@ -214,8 +213,7 @@ def _create_request(batches): ) # otherwise, generate the request, reset current_size and current_batches else: - request = _create_request(current_batches) - requests.append(request) + yield _create_request(current_batches) current_batches = [] current_size = 0 @@ -228,10 +226,7 @@ def _create_request(batches): # Flush remaining batches if current_batches: - request = _create_request(current_batches) - requests.append(request) - - return requests + yield _create_request(current_batches) def verify_result(client, table, futures): @@ -272,8 +267,7 @@ def main(project_id, dataset): for request in requests: future = stream.send(request) futures.append(future) - # future.result() # Optional, will block until writing is complete. - for future in futures: - future.result() + future.result() # Optional, will block until writing is complete. + # Verify results. verify_result(bq_client, bq_table, futures) From ec49ec52f323fa76bda96cd1e8faf06c4e5ce3e9 Mon Sep 17 00:00:00 2001 From: alexlyu Date: Thu, 18 Dec 2025 04:25:04 +0000 Subject: [PATCH 14/14] samples: reformat append_rows_with_arrow.py --- .../samples/pyarrow/append_rows_with_arrow.py | 30 ++++++++----------- 1 file changed, 12 insertions(+), 18 deletions(-) diff --git a/packages/google-cloud-bigquery-storage/samples/pyarrow/append_rows_with_arrow.py b/packages/google-cloud-bigquery-storage/samples/pyarrow/append_rows_with_arrow.py index 611de3c311c0..cac46f98fc15 100644 --- a/packages/google-cloud-bigquery-storage/samples/pyarrow/append_rows_with_arrow.py +++ b/packages/google-cloud-bigquery-storage/samples/pyarrow/append_rows_with_arrow.py @@ -176,50 +176,44 @@ def _create_request(batches): ) return request - # 1. use pyarrow_table.to_batches() to get batches as a stack. - batches_as_stack = list(pyarrow_table.to_batches()) - batches_as_stack.reverse() + batches = pyarrow_table.to_batches() - # current_size is initially 0 - # current_batches is initilly empty list current_batches = [] current_size = 0 - # 2. 
repeat below until stack is empty:
-    while batches_as_stack:
-        batch = batches_as_stack.pop()
+    while batches:
+        batch = batches.pop()
         batch_size = batch.nbytes
 
         if current_size + batch_size > max_request_bytes:
             if batch.num_rows > 1:
-                # split the batch into 2 sub batches with identical chunksizes
+                # Split the batch into 2 sub batches with identical chunksizes
                 mid = batch.num_rows // 2
                 batch_left = batch.slice(offset=0, length=mid)
                 batch_right = batch.slice(offset=mid)
 
-                # append the new batches into the stack.
-                batches_as_stack.append(batch_right)
-                batches_as_stack.append(batch_left)
-                # Repeat the poping
+                # Append the new batches into the stack and continue popping.
+                batches.append(batch_right)
+                batches.append(batch_left)
                 continue
 
-            # if the batch is single row and still larger than max_request_size
+            # If the batch is single row and still larger than max_request_size
             else:
-                # if current batches is empty, throw error
+                # If current batches is empty, throw error
                 if len(current_batches) == 0:
                     raise ValueError(
                         f"A single PyArrow batch of one row is larger than the maximum request size "
                         f"(batch size: {batch_size} > max request size: {max_request_bytes}). Cannot proceed."
                     )
-                # otherwise, generate the request, reset current_size and current_batches
+                # Otherwise, generate the request, reset current_size and current_batches
                 else:
                     yield _create_request(current_batches)
 
                     current_batches = []
                     current_size = 0
-                    batches_as_stack.append(batch)
+                    batches.append(batch)
 
-        # otherwise, add the batch into current_batches
+        # Otherwise, add the batch into current_batches
         else:
             current_batches.append(batch)
             current_size += batch_size
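
A minimal, standalone sketch of the size-aware batching strategy this series converges on, for local experimentation. It mirrors the stack pop-and-split loop from PATCH 14 but only yields serialized Arrow payloads, so it runs without BigQuery credentials. The batch_payloads helper, the synthetic single-column table, and the way the limits are checked here are illustrative assumptions, not part of the patched sample.

# Standalone sketch (assumption: not part of the patched sample) that mirrors
# the stack-based, size-aware batching from this series and checks each
# emitted payload against the request-size limits.
import pyarrow as pa

MAX_REQUEST_BYTES = 7 * 1024 * 1024  # soft limit, matching the sample
HARD_LIMIT_BYTES = 10 * 1024 * 1024  # AppendRowsRequest hard limit


def batch_payloads(table: pa.Table):
    """Yield serialized Arrow record batches, each kept under the soft limit."""
    batches = table.to_batches()
    current_batches, current_size = [], 0

    def flush():
        # Combine buffered batches into a single serialized record batch.
        combined = pa.Table.from_batches(current_batches).combine_chunks()
        return combined.to_batches()[0].serialize().to_pybytes()

    while batches:
        batch = batches.pop()
        if current_size + batch.nbytes > MAX_REQUEST_BYTES:
            if batch.num_rows > 1:
                # Too big: split in half and push both halves back onto the stack.
                mid = batch.num_rows // 2
                batches.append(batch.slice(offset=mid))
                batches.append(batch.slice(offset=0, length=mid))
                continue
            if not current_batches:
                raise ValueError("A single row exceeds the maximum request size.")
            # Flush the buffer, then retry this batch against an empty buffer.
            yield flush()
            current_batches, current_size = [], 0
            batches.append(batch)
        else:
            current_batches.append(batch)
            current_size += batch.nbytes

    if current_batches:
        yield flush()


if __name__ == "__main__":
    # ~16 MB of int64 data, so more than one payload is expected.
    table = pa.table({"int64_col": pa.array(range(2_000_000), type=pa.int64())})
    payloads = list(batch_payloads(table))
    assert all(len(p) < HARD_LIMIT_BYTES for p in payloads)
    print(f"{len(payloads)} payloads, largest {max(map(len, payloads))} bytes")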