Skip to content

Commit 7f67cfc

Browse files
multi frame decompression of lz4
Signed-off-by: varun-edachali-dbx <varun.edachali@databricks.com>
1 parent b5578ca commit 7f67cfc

File tree

1 file changed

+21
-1
lines changed

1 file changed

+21
-1
lines changed

src/databricks/sql/backend/sea/queue.py

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,25 @@
3636
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
3737

3838

39+
def decompress_multi_frame_lz4(attachment: bytes) -> bytes:
    """Decompress an attachment made of one or more concatenated LZ4 frames.

    A single ``LZ4FrameDecompressor`` is fed the attachment; whenever it
    reports bytes left over after the end of a frame (``unused_data``), those
    bytes are fed back in as the start of the next frame.

    Args:
        attachment: The LZ4-compressed payload.

    Returns:
        The concatenated decompressed output of every frame.

    Raises:
        RuntimeError: If a non-empty attachment ends in the middle of a frame.
        Exception: Any error raised by the LZ4 decompressor is logged and
            re-raised unchanged.
    """
    try:
        decompressor = lz4.frame.LZ4FrameDecompressor()

        # Collect per-frame output and join once at the end, instead of
        # re-copying the accumulated bytes with ``+=`` for every frame.
        chunks = [decompressor.decompress(attachment)]

        # the attachment may be a concatenation of multiple LZ4 frames
        while decompressor.unused_data:
            chunks.append(decompressor.decompress(decompressor.unused_data))

        # ``eof`` is only set once the end of a frame has been reached. If it
        # is unset after all input was consumed, the last frame was cut short;
        # fail loudly rather than hand back partial data. An empty attachment
        # is left alone and yields empty output.
        if attachment and not decompressor.eof:
            raise RuntimeError(
                "LZ4 frame incomplete: attachment ended before end of frame"
            )

    except Exception as e:
        logger.error(f"LZ4 decompression failed: {e}")
        # Bare raise keeps the original exception and traceback intact.
        raise

    arrow_file = b"".join(chunks)
    logger.debug(f"LZ4 decompressed {len(arrow_file)} bytes from attachment")

    return arrow_file
56+
57+
3958
class SeaResultSetQueueFactory(ABC):
4059
@staticmethod
4160
def build_queue(
@@ -70,13 +89,14 @@ def build_queue(
7089
elif manifest.format == ResultFormat.ARROW_STREAM.value:
7190
if result_data.attachment is not None:
7291
arrow_file = (
73-
lz4.frame.decompress(result_data.attachment)
92+
decompress_multi_frame_lz4(result_data.attachment)
7493
if lz4_compressed
7594
else result_data.attachment
7695
)
7796
arrow_table = create_arrow_table_from_arrow_file(
7897
arrow_file, description
7998
)
99+
logger.debug(f"Created arrow table with {arrow_table.num_rows} rows")
80100
return ArrowQueue(arrow_table, manifest.total_row_count)
81101

82102
# EXTERNAL_LINKS disposition

0 commit comments

Comments
 (0)