Skip to content

Commit cec90b2

Browse files
committed
More refactor
1 parent eb14f95 commit cec90b2

File tree

8 files changed

+142
-175
lines changed

8 files changed

+142
-175
lines changed

src/databricks/sql/client.py

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
import requests
1010
import json
1111
import os
12-
import decimal
1312
from uuid import UUID
1413

1514
from databricks.sql import __version__
@@ -31,8 +30,6 @@
3130
transform_paramstyle,
3231
ColumnTable,
3332
ColumnQueue,
34-
concat_chunked_tables,
35-
merge_columnar,
3633
)
3734
from databricks.sql.parameters.native import (
3835
DbsqlParameterBase,
@@ -1402,7 +1399,7 @@ def _convert_columnar_table(self, table: ColumnTable):
14021399
result.append(ResultRow(*curr_row))
14031400

14041401
return result
1405-
1402+
14061403
def _convert_arrow_table(self, table: "pyarrow.Table"):
14071404

14081405
column_names = [c[0] for c in self.description]
@@ -1411,7 +1408,7 @@ def _convert_arrow_table(self, table: "pyarrow.Table"):
14111408
if self.connection.disable_pandas is True:
14121409
columns_as_lists = [col.to_pylist() for col in table.itercolumns()]
14131410
return [ResultRow(*row) for row in zip(*columns_as_lists)]
1414-
1411+
14151412
# Need to use nullable types, as otherwise type can change when there are missing values.
14161413
# See https://arrow.apache.org/docs/python/pandas.html#nullable-types
14171414
# NOTE: This api is epxerimental https://pandas.pydata.org/pandas-docs/stable/user_guide/integer_na.html
@@ -1445,7 +1442,7 @@ def _convert_arrow_table(self, table: "pyarrow.Table"):
14451442
@property
14461443
def rownumber(self):
14471444
return self._next_row_index
1448-
1445+
14491446
def fetchmany_arrow(self, size: int) -> "pyarrow.Table":
14501447
"""
14511448
Fetch the next set of rows of a query result, returning a PyArrow table.
@@ -1494,18 +1491,18 @@ def fetchmany_columnar(self, size: int):
14941491
self._next_row_index += partial_results.num_rows
14951492

14961493
return results
1497-
1494+
14981495
def fetchall_arrow(self) -> "pyarrow.Table":
14991496
"""Fetch all (remaining) rows of a query result, returning them as a PyArrow table."""
15001497
results = self.results.remaining_rows()
15011498
self._next_row_index += results.num_rows
1502-
1499+
15031500
while not self.has_been_closed_server_side and self.has_more_rows:
15041501
self._fill_results_buffer()
15051502
partial_results = self.results.remaining_rows()
15061503
results.append(partial_results)
15071504
self._next_row_index += partial_results.num_rows
1508-
1505+
15091506
return results.to_arrow_table()
15101507

15111508
def fetchall_columnar(self):
@@ -1516,7 +1513,7 @@ def fetchall_columnar(self):
15161513
while not self.has_been_closed_server_side and self.has_more_rows:
15171514
self._fill_results_buffer()
15181515
partial_results = self.results.remaining_rows()
1519-
results = merge_columnar(results, partial_results)
1516+
results.append(partial_results)
15201517
self._next_row_index += partial_results.num_rows
15211518

15221519
return results

src/databricks/sql/cloudfetch/download_manager.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,8 +84,7 @@ def _schedule_downloads(self):
8484
"""
8585
While download queue has a capacity, peek pending links and submit them to thread pool.
8686
"""
87-
# print("Schedule_downloads")
88-
# logger.debug("ResultFileDownloadManager: schedule downloads")
87+
logger.debug("ResultFileDownloadManager: schedule downloads")
8988
while (len(self._download_tasks) < self._max_download_threads) and (
9089
len(self._pending_links) > 0
9190
):

src/databricks/sql/cloudfetch/downloader.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -91,17 +91,17 @@ def run(self) -> DownloadedFile:
9191
ResultSetDownloadHandler._validate_link(
9292
self.link, self.settings.link_expiry_buffer_secs
9393
)
94-
94+
9595
with self._http_client.execute(
9696
method=HttpMethod.GET,
9797
url=self.link.fileLink,
9898
timeout=self.settings.download_timeout,
9999
verify=self._ssl_options.tls_verify,
100100
headers=self.link.httpHeaders
101+
# TODO: Pass cert from `self._ssl_options`
101102
) as response:
102-
103103
response.raise_for_status()
104-
104+
105105
# Save (and decompress if needed) the downloaded file
106106
compressed_data = response.content
107107
decompressed_data = (

src/databricks/sql/common/http.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from typing import Generator
99
import logging
1010
import time
11+
1112
logger = logging.getLogger(__name__)
1213

1314

@@ -70,10 +71,7 @@ def execute(
7071
logger.info("Executing HTTP request: %s with url: %s", method.value, url)
7172
response = None
7273
try:
73-
start_time = time.time()
7474
response = self.session.request(method.value, url, **kwargs)
75-
end_time = time.time()
76-
# print(f"Downloaded file in {end_time - start_time} seconds")
7775
yield response
7876
except Exception as e:
7977
logger.error("Error executing HTTP request in DatabricksHttpClient: %s", e)

src/databricks/sql/thrift_backend.py

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,6 @@
3636
RequestErrorInfo,
3737
NoRetryReason,
3838
ResultSetQueueFactory,
39-
convert_arrow_based_set_to_arrow_table,
40-
convert_decimals_in_arrow_table,
41-
convert_column_based_set_to_arrow_table,
4239
)
4340
from databricks.sql.types import SSLOptions
4441

@@ -633,23 +630,6 @@ def _poll_for_status(self, op_handle):
633630
)
634631
return self.make_request(self._client.GetOperationStatus, req)
635632

636-
def _create_arrow_table(self, t_row_set, lz4_compressed, schema_bytes, description):
637-
if t_row_set.columns is not None:
638-
(
639-
arrow_table,
640-
num_rows,
641-
) = convert_column_based_set_to_arrow_table(t_row_set.columns, description)
642-
elif t_row_set.arrowBatches is not None:
643-
(arrow_table, num_rows,) = convert_arrow_based_set_to_arrow_table(
644-
t_row_set.arrowBatches, lz4_compressed, schema_bytes
645-
)
646-
else:
647-
raise OperationalError(
648-
"Unsupported TRowSet instance {}".format(t_row_set),
649-
session_id_hex=self._session_id_hex,
650-
)
651-
return convert_decimals_in_arrow_table(arrow_table, description), num_rows
652-
653633
def _get_metadata_resp(self, op_handle):
654634
req = ttypes.TGetResultSetMetadataReq(operationHandle=op_handle)
655635
return self.make_request(self._client.GetResultSetMetadata, req)

0 commit comments

Comments (0)