
Commit eb14f95

nit
1 parent: cfc047f

File tree

3 files changed: +70 −232 lines


src/databricks/sql/client.py

Lines changed: 9 additions & 73 deletions
@@ -1391,7 +1391,7 @@ def _fill_results_buffer(self):
         self.results = results
         self.has_more_rows = has_more_rows
 
-    def _convert_columnar_table(self, table):
+    def _convert_columnar_table(self, table: ColumnTable):
         column_names = [c[0] for c in self.description]
         ResultRow = Row(*column_names)
         result = []
@@ -1402,45 +1402,16 @@ def _convert_columnar_table(self, table):
             result.append(ResultRow(*curr_row))
 
         return result
-
-    def print_mem(self):
-        import os
-        import psutil
-
-        process = psutil.Process(os.getpid())
-        mem_info = process.memory_info()
-        total_mem_mb = mem_info.rss / 1024 / 1024
-        cpu_percent = process.cpu_percent(interval=0.1)
-        print(f"Total memory usage: {total_mem_mb:.2f} MB")
-        print(f"CPU percent: {cpu_percent:.2f}%")
-        # total_size_bytes = table.get_total_buffer_size()
-        # total_size_mb = total_size_bytes / (1024 * 1024)
-
-        # print(f"Total PyArrow table size: {total_size_bytes} bytes ({total_size_mb:.2f} MB)")
-
+
     def _convert_arrow_table(self, table: "pyarrow.Table"):
-        import sys
-        from pympler import asizeof
-
-        self.print_mem()
-        print(f"Memory size table: {table.nbytes / (1024 ** 2):.2f} MB")
-        # Convert to MB for easier reading
+
         column_names = [c[0] for c in self.description]
         ResultRow = Row(*column_names)
 
         if self.connection.disable_pandas is True:
-            start_time = time.time()
             columns_as_lists = [col.to_pylist() for col in table.itercolumns()]
-            self.print_mem()
-            print(f"Memory size columns_as_lists: {sum(sys.getsizeof(col) for col in columns_as_lists) / (1024 ** 2):.2f} MB")
-            res = [ResultRow(*row) for row in zip(*columns_as_lists)]
-            self.print_mem()
-            end_time = time.time()
-            print(f"Time taken to convert arrow table to list: {end_time - start_time} seconds")
-            print(f"Memory size res: {sum(sys.getsizeof(row) for row in res) / (1024 ** 2):.2f} MB")
-            return res
+            return [ResultRow(*row) for row in zip(*columns_as_lists)]
 
-        start_time = time.time()
         # Need to use nullable types, as otherwise type can change when there are missing values.
         # See https://arrow.apache.org/docs/python/pandas.html#nullable-types
         # NOTE: This api is epxerimental https://pandas.pydata.org/pandas-docs/stable/user_guide/integer_na.html
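Note: the surviving no-pandas branch builds rows straight from the Arrow columns. A minimal standalone sketch of that pattern, assuming pyarrow is installed and using collections.namedtuple as a hypothetical stand-in for the driver's Row class:

# Sketch of the retained no-pandas path: one Python list per column,
# then zip the columns back into row tuples. namedtuple stands in for Row.
from collections import namedtuple

import pyarrow as pa

table = pa.table({"id": [1, 2, 3], "name": ["a", "b", None]})

ResultRow = namedtuple("ResultRow", [field.name for field in table.schema])
columns_as_lists = [col.to_pylist() for col in table.itercolumns()]
rows = [ResultRow(*row) for row in zip(*columns_as_lists)]

print(rows[0])  # ResultRow(id=1, name='a')

Zipping the per-column lists transposes the columnar data into row tuples without going through pandas.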
@@ -1461,31 +1432,20 @@ def _convert_arrow_table(self, table: "pyarrow.Table"):
 
         # Need to rename columns, as the to_pandas function cannot handle duplicate column names
         table_renamed = table.rename_columns([str(c) for c in range(table.num_columns)])
-        print(f"Memory size table_renamed: {table_renamed.nbytes / (1024 ** 2):.2f} MB")
         df = table_renamed.to_pandas(
             types_mapper=dtype_mapping.get,
             date_as_object=True,
             timestamp_as_object=True,
             self_destruct=True,
         )
-        print(f"Memory size df: {df.memory_usage(deep=True).sum() / (1024 ** 2):.2f} MB")
-        self.print_mem()
-        # del table_renamed
 
         res = df.to_numpy(na_value=None, dtype="object")
-        print(f"Memory size res: {res.nbytes / (1024 ** 2):.2f} MB")
-        self.print_mem()
-        # del df
-        tmp_res = [ResultRow(*v) for v in res]
-        self.print_mem()
-        end_time = time.time()
-        print(f"Time taken to convert arrow table to list: {end_time - start_time} seconds")
-        return tmp_res
+        return [ResultRow(*v) for v in res]
 
     @property
     def rownumber(self):
         return self._next_row_index
-
+
     def fetchmany_arrow(self, size: int) -> "pyarrow.Table":
         """
         Fetch the next set of rows of a query result, returning a PyArrow table.
@@ -1497,26 +1457,18 @@ def fetchmany_arrow(self, size: int) -> "pyarrow.Table":
         results = self.results.next_n_rows(size)
         n_remaining_rows = size - results.num_rows
         self._next_row_index += results.num_rows
-        # partial_result_chunks = [results]
-
-        TOTAL_SIZE = results.num_rows
         while (
             n_remaining_rows > 0
             and not self.has_been_closed_server_side
             and self.has_more_rows
         ):
-            # print(f"TOTAL DATA ROWS {TOTAL_SIZE}")
             self._fill_results_buffer()
             partial_results = self.results.next_n_rows(n_remaining_rows)
             results.append(partial_results)
-            # partial_result_chunks.append(partial_results)
             n_remaining_rows -= partial_results.num_rows
             self._next_row_index += partial_results.num_rows
-            TOTAL_SIZE += partial_results.num_rows
 
         return results.to_arrow_table()
-
-
 
     def fetchmany_columnar(self, size: int):
         """
@@ -1537,39 +1489,23 @@ def fetchmany_columnar(self, size: int):
         ):
             self._fill_results_buffer()
             partial_results = self.results.next_n_rows(n_remaining_rows)
-            results = merge_columnar(results, partial_results)
+            results.append(partial_results)
             n_remaining_rows -= partial_results.num_rows
             self._next_row_index += partial_results.num_rows
 
         return results
-
+
     def fetchall_arrow(self) -> "pyarrow.Table":
         """Fetch all (remaining) rows of a query result, returning them as a PyArrow table."""
         results = self.results.remaining_rows()
         self._next_row_index += results.num_rows
 
-        # partial_result_chunks = [results]
-        # print("Server side has more rows", self.has_more_rows)
-        TOTAL_SIZE = results.num_rows
-
         while not self.has_been_closed_server_side and self.has_more_rows:
-            # print(f"TOTAL DATA ROWS {TOTAL_SIZE}")
             self._fill_results_buffer()
             partial_results = self.results.remaining_rows()
             results.append(partial_results)
             self._next_row_index += partial_results.num_rows
-            TOTAL_SIZE += partial_results.num_rows
-
-        # results = concat_chunked_tables(partial_result_chunks)
-
-        # If PyArrow is installed and we have a ColumnTable result, convert it to PyArrow Table
-        # Valid only for metadata commands result set
-        # if isinstance(results, ColumnTable) and pyarrow:
-        #     data = {
-        #         name: col
-        #         for name, col in zip(results.column_names, results.column_table)
-        #     }
-        #     return pyarrow.Table.from_pydict(data)
+
         return results.to_arrow_table()
 
     def fetchall_columnar(self):
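The fetch loops above share one accumulate-until-exhausted shape: take what the local buffer holds, refill from the server while has_more_rows is set, and collapse the collected chunks at the end. A minimal sketch of the fetchall_arrow variant, with hypothetical stand-ins: a plain list of pyarrow tables models the pages _fill_results_buffer would pull, and pyarrow.concat_tables models results.to_arrow_table():

import pyarrow as pa

def fetchall_arrow_sketch(chunks):
    # First page plays the role of self.results.remaining_rows().
    results = [chunks.pop(0)]
    # Mirrors `while not self.has_been_closed_server_side and self.has_more_rows`.
    while chunks:
        # Stands in for _fill_results_buffer() followed by remaining_rows().
        results.append(chunks.pop(0))
    # Stands in for results.to_arrow_table().
    return pa.concat_tables(results)

# Usage: three one-row pages collapse into a single 3-row table.
pages = [pa.table({"x": [1]}), pa.table({"x": [2]}), pa.table({"x": [3]})]
print(fetchall_arrow_sketch(pages).num_rows)  # 3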

src/databricks/sql/cloudfetch/downloader.py

Lines changed: 0 additions & 67 deletions
@@ -99,10 +99,7 @@ def run(self) -> DownloadedFile:
             verify=self._ssl_options.tls_verify,
             headers=self.link.httpHeaders
         ) as response:
-            # print_text = [
 
-            # ]
-
             response.raise_for_status()
 
             # Save (and decompress if needed) the downloaded file
@@ -127,75 +124,11 @@ def run(self) -> DownloadedFile:
                 )
             )
 
-            # print_text.append(
-            #     f"Downloaded file startRowOffset - {self.link.startRowOffset} - rowCount - {self.link.rowCount}"
-            # )
-
-            # for text in print_text:
-            #     print(text)
-
             return DownloadedFile(
                 decompressed_data,
                 self.link.startRowOffset,
                 self.link.rowCount,
             )
-        # session = requests.Session()
-        # session.mount("http://", HTTPAdapter(max_retries=retryPolicy))
-        # session.mount("https://", HTTPAdapter(max_retries=retryPolicy))
-
-        # try:
-        #     print_text = [
-
-        #     ]
-        #     start_time = time.time()
-        #     # Get the file via HTTP request
-        #     response = session.get(
-        #         self.link.fileLink,
-        #         timeout=self.settings.download_timeout,
-        #         verify=self._ssl_options.tls_verify,
-        #         headers=self.link.httpHeaders
-        #         # TODO: Pass cert from `self._ssl_options`
-        #     )
-        #     response.raise_for_status()
-        #     end_time = time.time()
-        #     print_text.append(f"Downloaded file in {end_time - start_time} seconds")
-        #     # Save (and decompress if needed) the downloaded file
-        #     compressed_data = response.content
-        #     decompressed_data = (
-        #         ResultSetDownloadHandler._decompress_data(compressed_data)
-        #         if self.settings.is_lz4_compressed
-        #         else compressed_data
-        #     )
-
-        #     # The size of the downloaded file should match the size specified from TSparkArrowResultLink
-        #     if len(decompressed_data) != self.link.bytesNum:
-        #         logger.debug(
-        #             "ResultSetDownloadHandler: downloaded file size {} does not match the expected value {}".format(
-        #                 len(decompressed_data), self.link.bytesNum
-        #             )
-        #         )
-
-        #     logger.debug(
-        #         "ResultSetDownloadHandler: successfully downloaded file, offset {}, row count {}".format(
-        #             self.link.startRowOffset, self.link.rowCount
-        #         )
-        #     )
-
-        #     print_text.append(
-        #         f"Downloaded file startRowOffset - {self.link.startRowOffset} - rowCount - {self.link.rowCount}"
-        #     )
-
-        #     for text in print_text:
-        #         print(text)
-
-        #     return DownloadedFile(
-        #         decompressed_data,
-        #         self.link.startRowOffset,
-        #         self.link.rowCount,
-        #     )
-        # finally:
-        #     if session:
-        #         session.close()
 
     @staticmethod
     def _validate_link(link: TSparkArrowResultLink, expiry_buffer_secs: int):
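With the commented-out session-based variant gone, run() reduces to: fetch the result file, optionally LZ4-decompress it, sanity-check the size against the link metadata, and return a DownloadedFile. A hedged sketch of that flow, assuming requests and lz4 are available; link is a hypothetical stand-in for TSparkArrowResultLink, and lz4.frame.decompress only approximates ResultSetDownloadHandler._decompress_data:

import logging

import lz4.frame
import requests

logger = logging.getLogger(__name__)

def download_result_file(link, timeout, is_lz4_compressed):
    # Fetch the presigned result-file URL, as run() does above.
    with requests.get(link.fileLink, timeout=timeout, headers=link.httpHeaders) as response:
        response.raise_for_status()
        data = response.content
    if is_lz4_compressed:
        # Approximation of ResultSetDownloadHandler._decompress_data.
        data = lz4.frame.decompress(data)
    # The downloaded size should match what the link metadata promised.
    if len(data) != link.bytesNum:
        logger.debug("downloaded size %d != expected %d", len(data), link.bytesNum)
    return data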
