From 4ac5c55a6009403b5c08692048f056d16dedb072 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Thu, 2 Oct 2025 15:20:57 +0200 Subject: [PATCH 01/12] Move parquet serialization file --- lonboard/_serialization/__init__.py | 0 lonboard/{_serialization.py => _serialization/parquet.py} | 0 2 files changed, 0 insertions(+), 0 deletions(-) create mode 100644 lonboard/_serialization/__init__.py rename lonboard/{_serialization.py => _serialization/parquet.py} (100%) diff --git a/lonboard/_serialization/__init__.py b/lonboard/_serialization/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/lonboard/_serialization.py b/lonboard/_serialization/parquet.py similarity index 100% rename from lonboard/_serialization.py rename to lonboard/_serialization/parquet.py From e1b447c992cdc090e30f56bcdada945be6acec49 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Thu, 2 Oct 2025 15:22:34 +0200 Subject: [PATCH 02/12] Separate serialize_view_state --- lonboard/_serialization/parquet.py | 11 +---------- lonboard/_serialization/view_state.py | 14 ++++++++++++++ lonboard/traits.py | 7 ++----- 3 files changed, 17 insertions(+), 15 deletions(-) create mode 100644 lonboard/_serialization/view_state.py diff --git a/lonboard/_serialization/parquet.py b/lonboard/_serialization/parquet.py index 3ed0d38f..6c43e5d5 100644 --- a/lonboard/_serialization/parquet.py +++ b/lonboard/_serialization/parquet.py @@ -1,8 +1,7 @@ from __future__ import annotations -import math from io import BytesIO -from typing import TYPE_CHECKING, Any, overload +from typing import TYPE_CHECKING, overload import arro3.compute as ac from arro3.core import ( @@ -23,7 +22,6 @@ if TYPE_CHECKING: from lonboard._layer import BaseArrowLayer from lonboard.experimental._layer import TripsLayer - from lonboard.models import ViewState DEFAULT_PARQUET_COMPRESSION = "ZSTD" @@ -156,13 +154,6 @@ def validate_accessor_length_matches_table( raise TraitError("accessor must have same length as table") -def serialize_view_state(data: ViewState | None, obj: Any) -> None | dict[str, Any]: # noqa: ARG001 - if data is None: - return None - - return data._asdict() - - def serialize_timestamp_accessor( timestamps: ChunkedArray, obj: TripsLayer, diff --git a/lonboard/_serialization/view_state.py b/lonboard/_serialization/view_state.py new file mode 100644 index 00000000..7e1193b0 --- /dev/null +++ b/lonboard/_serialization/view_state.py @@ -0,0 +1,14 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING, Any + +if TYPE_CHECKING: + from lonboard.models import ViewState + + +def serialize_view_state(data: ViewState | None, obj: Any) -> None | dict[str, Any]: # noqa: ARG001 + """Serialize ViewState for the frontend.""" + if data is None: + return None + + return data._asdict() diff --git a/lonboard/traits.py b/lonboard/traits.py index a0a61862..8d6303af 100644 --- a/lonboard/traits.py +++ b/lonboard/traits.py @@ -31,11 +31,8 @@ from lonboard._environment import DEFAULT_HEIGHT from lonboard._geoarrow.box_to_polygon import parse_box_encoded_table from lonboard._geoarrow.ops.coord_layout import convert_struct_column_to_interleaved -from lonboard._serialization import ( - ACCESSOR_SERIALIZATION, - TABLE_SERIALIZATION, - serialize_view_state, -) +from lonboard._serialization import ACCESSOR_SERIALIZATION, TABLE_SERIALIZATION +from lonboard._serialization.view_state import serialize_view_state from lonboard._utils import get_geometry_column_index from lonboard._vendor.matplotlib.colors import _to_rgba_no_colorcycle from lonboard.models import 
ViewState From 650b135b84f955bc2b828239589470e6629c72f1 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Thu, 2 Oct 2025 15:25:14 +0200 Subject: [PATCH 03/12] Move to table parquet serialization --- lonboard/_serialization/{ => table}/parquet.py | 1 + 1 file changed, 1 insertion(+) rename lonboard/_serialization/{ => table}/parquet.py (99%) diff --git a/lonboard/_serialization/parquet.py b/lonboard/_serialization/table/parquet.py similarity index 99% rename from lonboard/_serialization/parquet.py rename to lonboard/_serialization/table/parquet.py index 6c43e5d5..25c444a0 100644 --- a/lonboard/_serialization/parquet.py +++ b/lonboard/_serialization/table/parquet.py @@ -1,5 +1,6 @@ from __future__ import annotations +import math from io import BytesIO from typing import TYPE_CHECKING, overload From 0f1cc9e8e30b394cb319ecfaa32030e6fd6147ec Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Thu, 2 Oct 2025 23:49:09 +0200 Subject: [PATCH 04/12] wip --- lonboard/_serialization/accessor.py | 0 lonboard/_serialization/table/__init__.py | 0 lonboard/_serialization/table/arro3.py | 52 +++++ lonboard/_serialization/table/base.py | 140 +++++++++++++ lonboard/_serialization/table/config.py | 2 + lonboard/_serialization/table/parquet.py | 194 ------------------- lonboard/_serialization/table/parquet_old.py | 171 ++++++++++++++++ lonboard/_serialization/table/pyarrow.py | 65 +++++++ lonboard/_serialization/table/util.py | 1 + 9 files changed, 431 insertions(+), 194 deletions(-) create mode 100644 lonboard/_serialization/accessor.py create mode 100644 lonboard/_serialization/table/__init__.py create mode 100644 lonboard/_serialization/table/arro3.py create mode 100644 lonboard/_serialization/table/base.py create mode 100644 lonboard/_serialization/table/config.py delete mode 100644 lonboard/_serialization/table/parquet.py create mode 100644 lonboard/_serialization/table/parquet_old.py create mode 100644 lonboard/_serialization/table/pyarrow.py create mode 100644 lonboard/_serialization/table/util.py diff --git a/lonboard/_serialization/accessor.py b/lonboard/_serialization/accessor.py new file mode 100644 index 00000000..e69de29b diff --git a/lonboard/_serialization/table/__init__.py b/lonboard/_serialization/table/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/lonboard/_serialization/table/arro3.py b/lonboard/_serialization/table/arro3.py new file mode 100644 index 00000000..bbec30eb --- /dev/null +++ b/lonboard/_serialization/table/arro3.py @@ -0,0 +1,52 @@ +from __future__ import annotations + +from io import BytesIO +from typing import TYPE_CHECKING + +from .base import ArrowSerialization +from .config import DEFAULT_PARQUET_COMPRESSION, DEFAULT_PARQUET_COMPRESSION_LEVEL + +if TYPE_CHECKING: + from arro3.core import RecordBatch + + +class Arro3ParquetSerialization(ArrowSerialization): + """Serialize Arrow Tables and Arrays to Parquet using arro3.""" + + def __init__(self) -> None: + super().__init__() + + def _serialize_arrow_batch(self, record_batch: RecordBatch) -> bytes: + from arro3.io import write_parquet + + compression_string = ( + f"{DEFAULT_PARQUET_COMPRESSION}({DEFAULT_PARQUET_COMPRESSION_LEVEL})" + ) + bio = BytesIO() + write_parquet( + record_batch, + bio, + compression=compression_string, + max_row_group_size=record_batch.num_rows, + ) + + return bio.getvalue() + + +class Arro3IPCSerialization(ArrowSerialization): + """Serialize Arrow Tables and Arrays to Arrow IPC using arro3.""" + + def __init__(self) -> None: + super().__init__() + + def 
_serialize_arrow_batch(self, record_batch: RecordBatch) -> bytes: + """Write a single RecordBatch to an Arrow IPC stream in memory and return the bytes.""" + if record_batch.num_rows == 0: + raise ValueError("Batch with 0 rows.") + + from arro3.io import write_ipc_stream + + bio = BytesIO() + write_ipc_stream(record_batch, bio, compression="lz4") + + return bio.getvalue() diff --git a/lonboard/_serialization/table/base.py b/lonboard/_serialization/table/base.py new file mode 100644 index 00000000..91ee3421 --- /dev/null +++ b/lonboard/_serialization/table/base.py @@ -0,0 +1,140 @@ +from __future__ import annotations + +from abc import ABC, abstractmethod +from typing import TYPE_CHECKING, overload + +import arro3.compute as ac +from arro3.core import ( + Array, + ChunkedArray, + DataType, + RecordBatch, + Scalar, + Table, + list_array, + list_flatten, + list_offsets, +) +from traitlets import TraitError + +from lonboard._utils import timestamp_start_offset + +if TYPE_CHECKING: + from arro3.core import Array, RecordBatch + + from lonboard._layer import BaseArrowLayer + from lonboard.experimental._layer import TripsLayer + + +class ArrowSerialization(ABC): + @abstractmethod + def _serialize_arrow_batch(self, record_batch: RecordBatch) -> bytes: + pass + + def _serialize_arrow_table( + self, + table: Table, + *, + max_chunksize: int, + ) -> list[bytes]: + buffers: list[bytes] = [] + assert max_chunksize > 0 + + for record_batch in table.rechunk(max_chunksize=max_chunksize).to_batches(): + if record_batch.num_rows == 0: + raise ValueError("Batch with 0 rows.") + + buffers.append(self._serialize_arrow_batch(record_batch)) + + return buffers + + def _serialize_arrow_column( + self, + array: Array | ChunkedArray, + *, + max_chunksize: int, + ) -> list[bytes]: + """Serialize an Arrow Array or Column to a Parquet file with one column.""" + pyarrow_table = Table.from_pydict({"value": array}) + return self._serialize_arrow_table(pyarrow_table, max_chunksize=max_chunksize) + + def serialize_widget_table( + self, + table: Table, + obj: BaseArrowLayer, + ) -> list[bytes]: + assert isinstance(table, Table), "expected Arrow table" + return self._serialize_arrow_table(table, max_chunksize=obj._rows_per_chunk) # noqa: SLF001 + + @overload + def serialize_widget_accessor( + self, + data: ChunkedArray, + obj: BaseArrowLayer, + ) -> list[bytes]: ... + @overload + def serialize_widget_accessor( + self, + data: str | float | list | tuple | bytes, + obj: BaseArrowLayer, + ) -> str | int | float | list | tuple | bytes: ... + def serialize_widget_accessor( + self, + data: str | float | list | tuple | bytes | ChunkedArray, + obj: BaseArrowLayer, + ): + if data is None: + return None + + # We assume data has already been validated to the right type for this accessor + # Allow any json-serializable type through + if isinstance(data, (str, int, float, list, tuple, bytes)): + return data + + assert isinstance(data, ChunkedArray) + validate_accessor_length_matches_table(data, obj.table) + return self._serialize_arrow_column(data, max_chunksize=obj._rows_per_chunk) # noqa: SLF001 + + def serialize_widget_timestamps( + self, + timestamps: ChunkedArray, + obj: TripsLayer, + ) -> list[bytes]: + """Subtract off min timestamp to fit into f32 integer range. + + Then cast to float32. + """ + # Note: this has some overlap with `timestamp_max_physical_value` in utils. 
+ # Cast to int64 type + timestamps = timestamps.cast(DataType.list(DataType.int64())) + + start_offset_adjustment = Scalar( + timestamp_start_offset(timestamps), + type=DataType.int64(), + ) + + list_offsets_iter = list_offsets(timestamps) + inner_values_iter = list_flatten(timestamps) + + offsetted_chunks = [] + for offsets, inner_values in zip( + list_offsets_iter, + inner_values_iter, + strict=True, + ): + offsetted_values = ac.add(inner_values, start_offset_adjustment) + f32_values = offsetted_values.cast(DataType.int64()).cast( + DataType.float32(), + ) + offsetted_chunks.append(list_array(offsets, f32_values)) + + f32_timestamps_col = ChunkedArray(offsetted_chunks) + return serialize_accessor(f32_timestamps_col, obj) + + +def validate_accessor_length_matches_table( + accessor: Array | ChunkedArray, + table: Table, +) -> None: + if len(accessor) != len(table): + raise TraitError("accessor must have same length as table") diff --git a/lonboard/_serialization/table/config.py b/lonboard/_serialization/table/config.py new file mode 100644 index 00000000..b2ecf9e5 --- /dev/null +++ b/lonboard/_serialization/table/config.py @@ -0,0 +1,2 @@ +DEFAULT_PARQUET_COMPRESSION = "ZSTD" +DEFAULT_PARQUET_COMPRESSION_LEVEL = 7 diff --git a/lonboard/_serialization/table/parquet.py b/lonboard/_serialization/table/parquet.py deleted file mode 100644 index 25c444a0..00000000 --- a/lonboard/_serialization/table/parquet.py +++ /dev/null @@ -1,194 +0,0 @@ -from __future__ import annotations - -import math -from io import BytesIO -from typing import TYPE_CHECKING, overload - -import arro3.compute as ac -from arro3.core import ( - Array, - ChunkedArray, - DataType, - RecordBatch, - Scalar, - Table, - list_array, - list_flatten, - list_offsets, -) -from traitlets import TraitError - -from lonboard._utils import timestamp_start_offset - -if TYPE_CHECKING: - from lonboard._layer import BaseArrowLayer - from lonboard.experimental._layer import TripsLayer - - -DEFAULT_PARQUET_COMPRESSION = "ZSTD" -DEFAULT_PARQUET_COMPRESSION_LEVEL = 7 -DEFAULT_PARQUET_CHUNK_SIZE = 2**16 -# Target chunk size for Arrow (uncompressed) per Parquet chunk -DEFAULT_ARROW_CHUNK_BYTES_SIZE = 5 * 1024 * 1024 # 5MB - -# Maximum number of separate chunks/row groups to allow splitting an input layer into -# Deck.gl can pick from a maximum of 256 layers, and a user could have many layers, so -# we don't want to use too many layers per data file. -DEFAULT_MAX_NUM_CHUNKS = 32 - - -def write_parquet_batch(record_batch: RecordBatch) -> bytes: - """Write a RecordBatch to a Parquet file. - - We still use pyarrow.parquet.ParquetWriter if pyarrow is installed because pyarrow - has better encoding defaults. So Parquet files written by pyarrow are smaller by - default than files written by arro3.io.write_parquet. - """ - # Occasionally it's possible for there to be empty batches in the - # pyarrow table. This will error when writing to parquet. We want to - # give a more informative error. 
- if record_batch.num_rows == 0: - raise ValueError("Batch with 0 rows.") - - try: - import pyarrow as pa - import pyarrow.parquet as pq - - bio = BytesIO() - with pq.ParquetWriter( - bio, - schema=pa.schema(record_batch.schema), - compression=DEFAULT_PARQUET_COMPRESSION, - compression_level=DEFAULT_PARQUET_COMPRESSION_LEVEL, - ) as writer: - writer.write_batch( - pa.record_batch(record_batch), - row_group_size=record_batch.num_rows, - ) - - return bio.getvalue() - - except ImportError: - from arro3.io import write_parquet - - compression_string = ( - f"{DEFAULT_PARQUET_COMPRESSION}({DEFAULT_PARQUET_COMPRESSION_LEVEL})" - ) - bio = BytesIO() - write_parquet( - record_batch, - bio, - compression=compression_string, - max_row_group_size=record_batch.num_rows, - ) - - return bio.getvalue() - - -def serialize_table_to_parquet(table: Table, *, max_chunksize: int) -> list[bytes]: - buffers: list[bytes] = [] - assert max_chunksize > 0 - - for record_batch in table.rechunk(max_chunksize=max_chunksize).to_batches(): - buffers.append(write_parquet_batch(record_batch)) - - return buffers - - -def serialize_pyarrow_column( - data: Array | ChunkedArray, - *, - max_chunksize: int, -) -> list[bytes]: - """Serialize a pyarrow column to a Parquet file with one column.""" - pyarrow_table = Table.from_pydict({"value": data}) - return serialize_table_to_parquet(pyarrow_table, max_chunksize=max_chunksize) - - -@overload -def serialize_accessor( - data: ChunkedArray, - obj: BaseArrowLayer, -) -> list[bytes]: ... -@overload -def serialize_accessor( - data: str | float | list | tuple | bytes, - obj: BaseArrowLayer, -) -> str | int | float | list | tuple | bytes: ... -def serialize_accessor( - data: str | float | list | tuple | bytes | ChunkedArray, - obj: BaseArrowLayer, -): - if data is None: - return None - - # We assume data has already been validated to the right type for this accessor - # Allow any json-serializable type through - if isinstance(data, (str, int, float, list, tuple, bytes)): - return data - - assert isinstance(data, ChunkedArray) - validate_accessor_length_matches_table(data, obj.table) - return serialize_pyarrow_column(data, max_chunksize=obj._rows_per_chunk) # noqa: SLF001 - - -def serialize_table(data: Table, obj: BaseArrowLayer) -> list[bytes]: - assert isinstance(data, Table), "expected Arrow table" - return serialize_table_to_parquet(data, max_chunksize=obj._rows_per_chunk) # noqa: SLF001 - - -def infer_rows_per_chunk(table: Table) -> int: - # At least one chunk - num_chunks = max(round(table.nbytes / DEFAULT_ARROW_CHUNK_BYTES_SIZE), 1) - - # Clamp to the maximum number of chunks - num_chunks = min(num_chunks, DEFAULT_MAX_NUM_CHUNKS) - - return math.ceil(table.num_rows / num_chunks) - - -def validate_accessor_length_matches_table( - accessor: Array | ChunkedArray, - table: Table, -) -> None: - if len(accessor) != len(table): - raise TraitError("accessor must have same length as table") - - -def serialize_timestamp_accessor( - timestamps: ChunkedArray, - obj: TripsLayer, -) -> list[bytes]: - """Subtract off min timestamp to fit into f32 integer range. - - Then cast to float32. - """ - # Note: this has some overlap with `timestamp_max_physical_value` in utils. 
- # Cast to int64 type - timestamps = timestamps.cast(DataType.list(DataType.int64())) - - start_offset_adjustment = Scalar( - timestamp_start_offset(timestamps), - type=DataType.int64(), - ) - - list_offsets_iter = list_offsets(timestamps) - inner_values_iter = list_flatten(timestamps) - - offsetted_chunks = [] - for offsets, inner_values in zip( - list_offsets_iter, - inner_values_iter, - strict=True, - ): - offsetted_values = ac.add(inner_values, start_offset_adjustment) - f32_values = offsetted_values.cast(DataType.int64()).cast(DataType.float32()) - offsetted_chunks.append(list_array(offsets, f32_values)) - - f32_timestamps_col = ChunkedArray(offsetted_chunks) - return serialize_accessor(f32_timestamps_col, obj) - - -ACCESSOR_SERIALIZATION = {"to_json": serialize_accessor} -TIMESTAMP_ACCESSOR_SERIALIZATION = {"to_json": serialize_timestamp_accessor} -TABLE_SERIALIZATION = {"to_json": serialize_table} diff --git a/lonboard/_serialization/table/parquet_old.py b/lonboard/_serialization/table/parquet_old.py new file mode 100644 index 00000000..49709630 --- /dev/null +++ b/lonboard/_serialization/table/parquet_old.py @@ -0,0 +1,171 @@ +from __future__ import annotations + +import math + +from arro3.core import Table + +DEFAULT_PARQUET_CHUNK_SIZE = 2**16 +# Target chunk size for Arrow (uncompressed) per Parquet chunk +DEFAULT_ARROW_CHUNK_BYTES_SIZE = 5 * 1024 * 1024 # 5MB + +# Maximum number of separate chunks/row groups to allow splitting an input layer into +# Deck.gl can pick from a maximum of 256 layers, and a user could have many layers, so +# we don't want to use too many layers per data file. +DEFAULT_MAX_NUM_CHUNKS = 32 + + +# def write_parquet_batch(record_batch: RecordBatch) -> bytes: +# """Write a RecordBatch to a Parquet file. + +# We still use pyarrow.parquet.ParquetWriter if pyarrow is installed because pyarrow +# has better encoding defaults. So Parquet files written by pyarrow are smaller by +# default than files written by arro3.io.write_parquet. +# """ +# # Occasionally it's possible for there to be empty batches in the +# # pyarrow table. This will error when writing to parquet. We want to +# # give a more informative error. 
+# if record_batch.num_rows == 0: +# raise ValueError("Batch with 0 rows.") + +# try: +# import pyarrow as pa +# import pyarrow.parquet as pq + +# bio = BytesIO() +# with pq.ParquetWriter( +# bio, +# schema=pa.schema(record_batch.schema), +# compression=DEFAULT_PARQUET_COMPRESSION, +# compression_level=DEFAULT_PARQUET_COMPRESSION_LEVEL, +# ) as writer: +# writer.write_batch( +# pa.record_batch(record_batch), +# row_group_size=record_batch.num_rows, +# ) + +# return bio.getvalue() + +# except ImportError: +# from arro3.io import write_parquet + +# compression_string = ( +# f"{DEFAULT_PARQUET_COMPRESSION}({DEFAULT_PARQUET_COMPRESSION_LEVEL})" +# ) +# bio = BytesIO() +# write_parquet( +# record_batch, +# bio, +# compression=compression_string, +# max_row_group_size=record_batch.num_rows, +# ) + +# return bio.getvalue() + + +# def serialize_table_to_parquet(table: Table, *, max_chunksize: int) -> list[bytes]: +# buffers: list[bytes] = [] +# assert max_chunksize > 0 + +# for record_batch in table.rechunk(max_chunksize=max_chunksize).to_batches(): +# buffers.append(write_parquet_batch(record_batch)) + +# return buffers + + +# def serialize_pyarrow_column( +# data: Array | ChunkedArray, +# *, +# max_chunksize: int, +# ) -> list[bytes]: +# """Serialize a pyarrow column to a Parquet file with one column.""" +# pyarrow_table = Table.from_pydict({"value": data}) +# return serialize_table_to_parquet(pyarrow_table, max_chunksize=max_chunksize) + + +# @overload +# def serialize_accessor( +# data: ChunkedArray, +# obj: BaseArrowLayer, +# ) -> list[bytes]: ... +# @overload +# def serialize_accessor( +# data: str | float | list | tuple | bytes, +# obj: BaseArrowLayer, +# ) -> str | int | float | list | tuple | bytes: ... +# def serialize_accessor( +# data: str | float | list | tuple | bytes | ChunkedArray, +# obj: BaseArrowLayer, +# ): +# if data is None: +# return None + +# # We assume data has already been validated to the right type for this accessor +# # Allow any json-serializable type through +# if isinstance(data, (str, int, float, list, tuple, bytes)): +# return data + +# assert isinstance(data, ChunkedArray) +# validate_accessor_length_matches_table(data, obj.table) +# return serialize_pyarrow_column(data, max_chunksize=obj._rows_per_chunk) + + +# def serialize_table(data: Table, obj: BaseArrowLayer) -> list[bytes]: +# assert isinstance(data, Table), "expected Arrow table" +# return serialize_table_to_parquet(data, max_chunksize=obj._rows_per_chunk) + + +def infer_rows_per_chunk(table: Table) -> int: + # At least one chunk + num_chunks = max(round(table.nbytes / DEFAULT_ARROW_CHUNK_BYTES_SIZE), 1) + + # Clamp to the maximum number of chunks + num_chunks = min(num_chunks, DEFAULT_MAX_NUM_CHUNKS) + + return math.ceil(table.num_rows / num_chunks) + + +# def validate_accessor_length_matches_table( +# accessor: Array | ChunkedArray, +# table: Table, +# ) -> None: +# if len(accessor) != len(table): +# raise TraitError("accessor must have same length as table") + + +# def serialize_timestamp_accessor( +# timestamps: ChunkedArray, +# obj: TripsLayer, +# ) -> list[bytes]: +# """Subtract off min timestamp to fit into f32 integer range. + +# Then cast to float32. +# """ +# # Note: this has some overlap with `timestamp_max_physical_value` in utils. 
+# # Cast to int64 type +# timestamps = timestamps.cast(DataType.list(DataType.int64())) + +# start_offset_adjustment = Scalar( +# timestamp_start_offset(timestamps), +# type=DataType.int64(), +# ) + +# list_offsets_iter = list_offsets(timestamps) +# inner_values_iter = list_flatten(timestamps) + +# offsetted_chunks = [] +# for offsets, inner_values in zip( +# list_offsets_iter, +# inner_values_iter, +# strict=True, +# ): +# offsetted_values = ac.add(inner_values, start_offset_adjustment) +# f32_values = offsetted_values.cast(DataType.int64()).cast(DataType.float32()) +# offsetted_chunks.append(list_array(offsets, f32_values)) + +# f32_timestamps_col = ChunkedArray(offsetted_chunks) +# return serialize_accessor(f32_timestamps_col, obj) + + +ACCESSOR_SERIALIZATION = {"to_json": serialize_accessor} +TIMESTAMP_ACCESSOR_SERIALIZATION = {"to_json": serialize_timestamp_accessor} +TABLE_SERIALIZATION = {"to_json": serialize_table} diff --git a/lonboard/_serialization/table/pyarrow.py b/lonboard/_serialization/table/pyarrow.py new file mode 100644 index 00000000..c96b0469 --- /dev/null +++ b/lonboard/_serialization/table/pyarrow.py @@ -0,0 +1,65 @@ +from __future__ import annotations + +from io import BytesIO +from typing import TYPE_CHECKING + +from .base import ArrowSerialization +from .config import DEFAULT_PARQUET_COMPRESSION, DEFAULT_PARQUET_COMPRESSION_LEVEL + +if TYPE_CHECKING: + from arro3.core import RecordBatch + + +class PyArrowParquetSerialization(ArrowSerialization): + """Serialize Arrow Tables and Arrays to Parquet using pyarrow.""" + + def __init__(self) -> None: + # Validate that pyarrow is installed + import pyarrow.parquet # noqa: F401 + + super().__init__() + + def _serialize_arrow_batch(self, record_batch: RecordBatch) -> bytes: + import pyarrow as pa + import pyarrow.parquet as pq + + bio = BytesIO() + with pq.ParquetWriter( + bio, + schema=pa.schema(record_batch.schema), + compression=DEFAULT_PARQUET_COMPRESSION, + compression_level=DEFAULT_PARQUET_COMPRESSION_LEVEL, + ) as writer: + writer.write_batch( + pa.record_batch(record_batch), + row_group_size=record_batch.num_rows, + ) + + return bio.getvalue() + + +class PyArrowIPCSerialization(ArrowSerialization): + """Serialize Arrow Tables and Arrays to Arrow IPC using pyarrow.""" + + def __init__(self) -> None: + # Validate that pyarrow is installed + import pyarrow as pa # noqa: F401 + + super().__init__() + + def _serialize_arrow_batch(self, record_batch: RecordBatch) -> bytes: + """Write a single RecordBatch to an Arrow IPC stream in memory and return the bytes.""" + if record_batch.num_rows == 0: + raise ValueError("Batch with 0 rows.") + + import pyarrow as pa + + bio = BytesIO() + with pa.ipc.new_stream( + bio, + record_batch.schema, + options=pa.ipc.IpcWriteOptions(compression="lz4"), + ) as writer: + writer.write_batch(record_batch) + + return bio.getvalue() diff --git a/lonboard/_serialization/table/util.py b/lonboard/_serialization/table/util.py new file mode 100644 index 00000000..8b137891 --- /dev/null +++ b/lonboard/_serialization/table/util.py @@ -0,0 +1 @@ + From 547a54e09fe5242570016edd87631b1e30831fe7 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Thu, 2 Oct 2025 23:49:33 +0200 Subject: [PATCH 05/12] wip --- lonboard/_serialization/table/util.py | 1 - 1 file changed, 1 deletion(-) diff --git a/lonboard/_serialization/table/util.py b/lonboard/_serialization/table/util.py index 8b137891..e69de29b 100644 --- a/lonboard/_serialization/table/util.py +++ b/lonboard/_serialization/table/util.py @@ -1 +0,0 @@ - From 
407d2eb0f292aca024a2731a16385944968ec539 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Wed, 15 Oct 2025 11:55:03 -0400 Subject: [PATCH 06/12] restore multithreaded serialization --- lonboard/_serialization/table/base.py | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/lonboard/_serialization/table/base.py b/lonboard/_serialization/table/base.py index 91ee3421..eb7e55f3 100644 --- a/lonboard/_serialization/table/base.py +++ b/lonboard/_serialization/table/base.py @@ -1,6 +1,7 @@ from __future__ import annotations from abc import ABC, abstractmethod +from concurrent.futures import ThreadPoolExecutor from typing import TYPE_CHECKING, overload import arro3.compute as ac @@ -37,16 +38,14 @@ def _serialize_arrow_table( *, max_chunksize: int, ) -> list[bytes]: - buffers: list[bytes] = [] assert max_chunksize > 0 - for record_batch in table.rechunk(max_chunksize=max_chunksize).to_batches(): - if record_batch.num_rows == 0: - raise ValueError("Batch with 0 rows.") + batches = table.rechunk(max_chunksize=max_chunksize).to_batches() + if any(batch.num_rows == 0 for batch in batches): + raise ValueError("Batch with 0 rows.") - buffers.append(self._serialize_arrow_batch(record_batch)) - - return buffers + with ThreadPoolExecutor() as executor: + return list(executor.map(self._serialize_arrow_batch, batches)) def _serialize_arrow_column( self, From 95e3568fe55177e2fce0eb1f31fb653a4593303a Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Wed, 15 Oct 2025 12:00:40 -0400 Subject: [PATCH 07/12] improved docs --- lonboard/_serialization/table/base.py | 27 ++++++++++++++++++--------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/lonboard/_serialization/table/base.py b/lonboard/_serialization/table/base.py index eb7e55f3..f030625a 100644 --- a/lonboard/_serialization/table/base.py +++ b/lonboard/_serialization/table/base.py @@ -28,9 +28,16 @@ class ArrowSerialization(ABC): + """Base class for serializing Arrow Tables and Arrays. + + Ipywidgets does not easily support streaming of data, and the transport can choke on + large single buffers. Therefore, we split a table into multiple RecordBatches and + serialize them individually. Then we send a list of buffers to the frontend. + """ + @abstractmethod def _serialize_arrow_batch(self, record_batch: RecordBatch) -> bytes: - pass + """Serialize one Arrow RecordBatch to a buffer.""" def _serialize_arrow_table( self, @@ -53,35 +60,37 @@ def _serialize_arrow_column( *, max_chunksize: int, ) -> list[bytes]: - """Serialize an Arrow Array or Column to a Parquet file with one column.""" + """Serialize an Arrow Array or Column as a table with one column named "value".""" pyarrow_table = Table.from_pydict({"value": array}) return self._serialize_arrow_table(pyarrow_table, max_chunksize=max_chunksize) - def serialize_widget_table( + def serialize_table( self, table: Table, obj: BaseArrowLayer, ) -> list[bytes]: + """Serialize an Arrow Table from a widget.""" assert isinstance(table, Table), "expected Arrow table" return self._serialize_arrow_table(table, max_chunksize=obj._rows_per_chunk) # noqa: SLF001 @overload - def serialize_widget_accessor( + def serialize_accessor( self, data: ChunkedArray, obj: BaseArrowLayer, ) -> list[bytes]: ... @overload - def serialize_widget_accessor( + def serialize_accessor( self, data: str | float | list | tuple | bytes, obj: BaseArrowLayer, ) -> str | int | float | list | tuple | bytes: ... 
- def serialize_widget_accessor( + def serialize_accessor( self, data: str | float | list | tuple | bytes | ChunkedArray, obj: BaseArrowLayer, ): + """Serialize an Arrow Array or Column from a widget.""" if data is None: return None @@ -99,9 +108,9 @@ def serialize_widget_timestamps( timestamps: ChunkedArray, obj: TripsLayer, ) -> list[bytes]: - """Subtract off min timestamp to fit into f32 integer range. + """Serialize timestamps for TripsLayer. - Then cast to float32. + Subtract off min timestamp to fit into f32 integer range. Then cast to float32. """ # Note: this has some overlap with `timestamp_max_physical_value` in utils. # Cast to int64 type @@ -128,7 +137,7 @@ def serialize_widget_timestamps( offsetted_chunks.append(list_array(offsets, f32_values)) f32_timestamps_col = ChunkedArray(offsetted_chunks) - return serialize_accessor(f32_timestamps_col, obj) + return self.serialize_accessor(f32_timestamps_col, obj) def validate_accessor_length_matches_table( From 98c9a1524d5f23bd3d0e509cf86e9ca52c12eb0d Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Wed, 15 Oct 2025 13:28:57 -0400 Subject: [PATCH 08/12] Define parquet/ipc deserialization --- package-lock.json | 14 ++++++-- package.json | 2 ++ src/index.tsx | 3 -- src/serialization/index.ts | 34 +++++++++++++++++++ src/serialization/ipc.ts | 28 ++++++++++++++++ src/{ => serialization}/parquet.ts | 54 +++++++++++++++++------------- 6 files changed, 106 insertions(+), 29 deletions(-) create mode 100644 src/serialization/index.ts create mode 100644 src/serialization/ipc.ts rename src/{ => serialization}/parquet.ts (52%) diff --git a/package-lock.json b/package-lock.json index 268c6264..033b6094 100644 --- a/package-lock.json +++ b/package-lock.json @@ -21,6 +21,7 @@ "framer-motion": "^12.23.19", "lodash.debounce": "^4.0.8", "lodash.throttle": "^4.1.1", + "lz4js": "^0.2.0", "maplibre-gl": "^5.9.0", "memoize-one": "^6.0.0", "parquet-wasm": "0.7.1", @@ -39,6 +40,7 @@ "@types/lodash": "^4.17.13", "@types/lodash.debounce": "^4.0.9", "@types/lodash.throttle": "^4.1.9", + "@types/lz4js": "^0.2.1", "@types/react": "^19.1.1", "@types/uuid": "^10.0.0", "autoprefixer": "^10.4.20", @@ -7557,6 +7559,13 @@ "@types/lodash": "*" } }, + "node_modules/@types/lz4js": { + "version": "0.2.1", + "resolved": "https://registry.npmjs.org/@types/lz4js/-/lz4js-0.2.1.tgz", + "integrity": "sha512-aAnbA4uKPNqZqu0XK1QAwKP0Wskb4Oa7ZFqxW5CMIyGgqYQKFgBxTfK3m3KODXoOLv5t14VregzgrEak13uGQA==", + "dev": true, + "license": "MIT" + }, "node_modules/@types/node": { "version": "24.7.2", "resolved": "https://registry.npmjs.org/@types/node/-/node-24.7.2.tgz", @@ -11752,8 +11761,7 @@ "version": "0.2.0", "resolved": "https://registry.npmjs.org/lz4js/-/lz4js-0.2.0.tgz", "integrity": "sha512-gY2Ia9Lm7Ep8qMiuGRhvUq0Q7qUereeldZPP1PMEJxPtEWHJLqw9pgX68oHajBH0nzJK4MaZEA/YNV3jT8u8Bg==", - "license": "ISC", - "optional": true + "license": "ISC" }, "node_modules/lzo-wasm": { "version": "0.0.4", @@ -15144,7 +15152,6 @@ "resolved": "https://registry.npmjs.org/tailwindcss/-/tailwindcss-3.4.18.tgz", "integrity": "sha512-6A2rnmW5xZMdw11LYjhcI5846rt9pbLSabY5XPxo+XWdxwZaFEn47Go4NzFiHu9sNNmr/kXivP1vStfvMaK1GQ==", "license": "MIT", - "peer": true, "dependencies": { "@alloc/quick-lru": "^5.2.0", "arg": "^5.0.2", @@ -15767,6 +15774,7 @@ "integrity": "sha512-j3lYzGC3P+B5Yfy/pfKNgVEg4+UtcIJcVRt2cDjIOmhLourAqPqf8P7acgxeiSgUB7E3p2P8/3gNIgDLpwzs4g==", "dev": true, "license": "MIT", + "peer": true, "dependencies": { "esbuild": "^0.21.3", "postcss": "^8.4.43", diff --git a/package.json b/package.json index 
6f38c717..ca5fd6bb 100644 --- a/package.json +++ b/package.json @@ -19,6 +19,7 @@ "framer-motion": "^12.23.19", "lodash.debounce": "^4.0.8", "lodash.throttle": "^4.1.1", + "lz4js": "^0.2.0", "maplibre-gl": "^5.9.0", "memoize-one": "^6.0.0", "parquet-wasm": "0.7.1", @@ -38,6 +39,7 @@ "@types/lodash": "^4.17.13", "@types/lodash.debounce": "^4.0.9", "@types/lodash.throttle": "^4.1.9", + "@types/lz4js": "^0.2.1", "@types/react": "^19.1.1", "@types/uuid": "^10.0.0", "autoprefixer": "^10.4.20", diff --git a/src/index.tsx b/src/index.tsx index 3fa03e6d..42be11dd 100644 --- a/src/index.tsx +++ b/src/index.tsx @@ -15,7 +15,6 @@ import { type BaseLayerModel, initializeChildModels, } from "./model/index.js"; -import { initParquetWasm } from "./parquet.js"; import DeckFirstRenderer from "./renderers/deck-first.js"; import OverlayRenderer from "./renderers/overlay.js"; import { MapRendererProps } from "./renderers/types.js"; @@ -31,8 +30,6 @@ import * as selectors from "./xstate/selectors"; import "maplibre-gl/dist/maplibre-gl.css"; import "./globals.css"; -await initParquetWasm(); - const DEFAULT_INITIAL_VIEW_STATE = { latitude: 10, longitude: 0, diff --git a/src/serialization/index.ts b/src/serialization/index.ts new file mode 100644 index 00000000..d96b2229 --- /dev/null +++ b/src/serialization/index.ts @@ -0,0 +1,34 @@ +import * as arrow from "apache-arrow"; + +import { parseArrowIPC } from "./ipc"; +import { isParquetBuffer, parseParquet } from "./parquet"; + +/** + * Parse a list of buffers containing either Parquet or Arrow IPC chunks into an + * Arrow JS table + * + * Each buffer in the list is expected to be a fully self-contained Parquet file + * or Arrow IPC file/stream that can parse on its own and consists of one arrow + * Record Batch + */ +export function deserializeArrowTable(dataViews: DataView[]): arrow.Table { + const batches: arrow.RecordBatch[] = []; + for (const chunkBuffer of dataViews) { + let table: arrow.Table; + + if (isParquetBuffer(chunkBuffer)) { + table = parseParquet(chunkBuffer); + } else { + // Assume Arrow IPC + table = parseArrowIPC(chunkBuffer); + } + + if (table.batches.length !== 1) { + console.warn(`Expected one batch in table, got ${table.batches.length}`); + } + + batches.push(...table.batches); + } + + return new arrow.Table(batches); +} diff --git a/src/serialization/ipc.ts b/src/serialization/ipc.ts new file mode 100644 index 00000000..10617f75 --- /dev/null +++ b/src/serialization/ipc.ts @@ -0,0 +1,28 @@ +import * as arrow from "apache-arrow"; +import * as lz4 from "lz4js"; + +const lz4Codec: arrow.Codec = { + encode(data: Uint8Array): Uint8Array { + return lz4.compress(data); + }, + decode(data: Uint8Array): Uint8Array { + return lz4.decompress(data); + }, +}; + +let LZ4_CODEC_SET: boolean = false; + +/** + * Parse an Arrow IPC buffer to an Arrow JS table + */ +export function parseArrowIPC(dataView: DataView): arrow.Table { + if (!LZ4_CODEC_SET) { + arrow.compressionRegistry.set(arrow.CompressionType.LZ4_FRAME, lz4Codec); + LZ4_CODEC_SET = true; + } + + console.time("readArrowIPC"); + const arrowTable = arrow.tableFromIPC(dataView); + console.timeEnd("readArrowIPC"); + return arrowTable; +} diff --git a/src/parquet.ts b/src/serialization/parquet.ts similarity index 52% rename from src/parquet.ts rename to src/serialization/parquet.ts index beda1ade..39a797a7 100644 --- a/src/parquet.ts +++ b/src/serialization/parquet.ts @@ -1,5 +1,6 @@ import * as arrow from "apache-arrow"; import _initParquetWasm, { readParquet } from "parquet-wasm"; +import {} from 
"parquet-wasm"; // NOTE: this version must be synced exactly with the parquet-wasm version in // use. @@ -7,21 +8,49 @@ const PARQUET_WASM_VERSION = "0.7.1"; const PARQUET_WASM_CDN_URL = `https://cdn.jsdelivr.net/npm/parquet-wasm@${PARQUET_WASM_VERSION}/esm/parquet_wasm_bg.wasm`; let WASM_READY: boolean = false; +// We initiate the fetch immediately (but don't await it) so that it can be +// downloaded in the background on app start +const PARQUET_WASM_FETCH = fetch(PARQUET_WASM_CDN_URL); + +const PARQUET_MAGIC = new TextEncoder().encode("PAR1"); + +/** Initialize the parquet-wasm WASM buffer */ export async function initParquetWasm() { if (WASM_READY) { return; } - await _initParquetWasm(PARQUET_WASM_CDN_URL); + const wasm_buffer = await PARQUET_WASM_FETCH; + await _initParquetWasm(wasm_buffer); WASM_READY = true; + return; +} + +export function isParquetBuffer(dataView: DataView): boolean { + if (dataView.byteLength < PARQUET_MAGIC.length) { + return false; + } + + for (let i = 0; i < PARQUET_MAGIC.length; i++) { + if (dataView.getUint8(i) !== PARQUET_MAGIC[i]) { + return false; + } + } + + return true; } /** * Parse a Parquet buffer to an Arrow JS table + * + * It's most convenient for this to be a sync function, so we assume we've + * called `initParquetWasm()` elsewhere */ export function parseParquet(dataView: DataView): arrow.Table { if (!WASM_READY) { - throw new Error("wasm not ready"); + throw new Error( + "parquet-wasm not initialized, initParquetWasm() should have been called first", + ); } console.time("readParquet"); @@ -36,24 +65,3 @@ export function parseParquet(dataView: DataView): arrow.Table { return arrowTable; } - -/** - * Parse a list of buffers containing Parquet chunks into an Arrow JS table - * - * Each buffer in the list is expected to be a fully self-contained Parquet file - * that can parse on its own and consists of one arrow Record Batch - * - * @var {[type]} - */ -export function parseParquetBuffers(dataViews: DataView[]): arrow.Table { - const batches: arrow.RecordBatch[] = []; - for (const chunkBuffer of dataViews) { - const table = parseParquet(chunkBuffer); - if (table.batches.length !== 1) { - console.warn("Expected one batch"); - } - batches.push(...table.batches); - } - - return new arrow.Table(batches); -} From b9cc043462de7fe4ad022359d2c49f4280cf672b Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Wed, 15 Oct 2025 13:52:18 -0400 Subject: [PATCH 09/12] Handle parsing either arrow ipc or parquet buffers --- src/accessor.ts | 52 ++---------------------------------- src/model/base.ts | 5 ---- src/model/extension.ts | 1 - src/model/layer.ts | 6 ++--- src/serialization/parquet.ts | 12 +++------ 5 files changed, 8 insertions(+), 68 deletions(-) diff --git a/src/accessor.ts b/src/accessor.ts index 402291f9..5ebe8f52 100644 --- a/src/accessor.ts +++ b/src/accessor.ts @@ -1,61 +1,13 @@ import * as arrow from "apache-arrow"; -import { useState, useEffect } from "react"; -import { parseParquetBuffers } from "./parquet.js"; +import { deserializeArrowTable } from "./serialization/index.js"; type AccessorRaw = DataView[] | unknown; -export function useTableBufferState( - wasmReady: boolean, - dataRaw: DataView[], -): [arrow.Table | null] { - const [dataTable, setDataTable] = useState(null); - // Only parse the parquet buffer when the data itself or wasmReady has changed - useEffect(() => { - const callback = () => { - if (wasmReady && dataRaw && dataRaw.length > 0) { - console.log( - `table byte lengths: ${dataRaw.map( - (dataView) => dataView.byteLength, - )}`, - 
); - - setDataTable(parseParquetBuffers(dataRaw)); - } - }; - callback(); - }, [wasmReady, dataRaw]); - - return [dataTable]; -} - -export function useAccessorState( - wasmReady: boolean, - accessorRaw: AccessorRaw, -): [arrow.Vector | null] { - const [accessorValue, setAccessorValue] = useState(null); - - // Only parse the parquet buffer when the data itself or wasmReady has changed - useEffect(() => { - const callback = () => { - setAccessorValue( - accessorRaw instanceof Array && accessorRaw?.[0] instanceof DataView - ? wasmReady && accessorRaw?.[0].byteLength > 0 - ? parseParquetBuffers(accessorRaw).getChildAt(0) - : null - : (accessorRaw as arrow.Vector | null), - ); - }; - callback(); - }, [wasmReady, accessorRaw]); - - return [accessorValue]; -} - export function parseAccessor(accessorRaw: AccessorRaw): arrow.Vector | null { return accessorRaw instanceof Array && accessorRaw?.[0] instanceof DataView ? accessorRaw?.[0].byteLength > 0 - ? parseParquetBuffers(accessorRaw).getChildAt(0) + ? deserializeArrowTable(accessorRaw).getChildAt(0) : null : (accessorRaw as arrow.Vector | null); } diff --git a/src/model/base.ts b/src/model/base.ts index 0d9faa08..f2dd2161 100644 --- a/src/model/base.ts +++ b/src/model/base.ts @@ -16,11 +16,6 @@ export abstract class BaseModel { this.callbacks = new Map(); this.callbacks.set("change", updateStateCallback); } - - async loadSubModels() { - return; - } - /** * Initialize an attribute that does not need any transformation from its * serialized representation to its deck.gl representation. diff --git a/src/model/extension.ts b/src/model/extension.ts index 7ae75f7b..ede75be1 100644 --- a/src/model/extension.ts +++ b/src/model/extension.ts @@ -258,6 +258,5 @@ export async function initializeExtension( throw new Error(`no known model for extension type ${extensionType}`); } - await extensionModel.loadSubModels(); return extensionModel; } diff --git a/src/model/layer.ts b/src/model/layer.ts index d7110362..3aeb3cef 100644 --- a/src/model/layer.ts +++ b/src/model/layer.ts @@ -27,8 +27,8 @@ import type { import type { WidgetModel } from "@jupyter-widgets/base"; import * as arrow from "apache-arrow"; -import { parseParquetBuffers } from "../parquet.js"; import { BaseLayerModel } from "./base-layer.js"; +import { deserializeArrowTable } from "../serialization/index.js"; import { isDefined } from "../util.js"; import { PointVector, @@ -64,13 +64,13 @@ export abstract class BaseArrowLayerModel extends BaseLayerModel { * @param {string} pythonName Name of attribute on Python model (usually snake-cased) */ initTable(pythonName: string) { - this.table = parseParquetBuffers(this.model.get(pythonName)); + this.table = deserializeArrowTable(this.model.get(pythonName)); // Remove all existing change callbacks for this attribute this.model.off(`change:${pythonName}`); const callback = () => { - this.table = parseParquetBuffers(this.model.get(pythonName)); + this.table = deserializeArrowTable(this.model.get(pythonName)); }; this.model.on(`change:${pythonName}`, callback); diff --git a/src/serialization/parquet.ts b/src/serialization/parquet.ts index 39a797a7..33a1e83a 100644 --- a/src/serialization/parquet.ts +++ b/src/serialization/parquet.ts @@ -26,6 +26,9 @@ export async function initParquetWasm() { return; } +// For now, simplest to just ensure this is called at least once on module load +await initParquetWasm(); + export function isParquetBuffer(dataView: DataView): boolean { if (dataView.byteLength < PARQUET_MAGIC.length) { return false; @@ -42,17 +45,8 @@ export 
function isParquetBuffer(dataView: DataView): boolean { /** * Parse a Parquet buffer to an Arrow JS table - * - * It's most convenient for this to be a sync function, so we assume we've - * called `initParquetWasm()` elsewhere */ export function parseParquet(dataView: DataView): arrow.Table { - if (!WASM_READY) { - throw new Error( - "parquet-wasm not initialized, initParquetWasm() should have been called first", - ); - } - console.time("readParquet"); // TODO: use arrow-js-ffi for more memory-efficient wasm --> js transfer? From 8cd8c1d0b3a5261c82be31252829a3b21704ff29 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Wed, 15 Oct 2025 14:02:21 -0400 Subject: [PATCH 10/12] define top-level parquet/arrow serialization --- lonboard/_serialization/accessor.py | 0 lonboard/_serialization/table/__init__.py | 60 +++++++ lonboard/_serialization/table/parquet_old.py | 171 ------------------- lonboard/_serialization/table/util.py | 25 +++ 4 files changed, 85 insertions(+), 171 deletions(-) delete mode 100644 lonboard/_serialization/accessor.py delete mode 100644 lonboard/_serialization/table/parquet_old.py diff --git a/lonboard/_serialization/accessor.py b/lonboard/_serialization/accessor.py deleted file mode 100644 index e69de29b..00000000 diff --git a/lonboard/_serialization/table/__init__.py b/lonboard/_serialization/table/__init__.py index e69de29b..7998e493 100644 --- a/lonboard/_serialization/table/__init__.py +++ b/lonboard/_serialization/table/__init__.py @@ -0,0 +1,60 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + +from lonboard._serialization.table.arro3 import ( + Arro3IPCSerialization, + Arro3ParquetSerialization, +) +from lonboard._serialization.table.base import ArrowSerialization +from lonboard._serialization.table.pyarrow import ( + PyArrowIPCSerialization, + PyArrowParquetSerialization, +) + +if TYPE_CHECKING: + from arro3.core import RecordBatch + + +class ParquetSerialization(ArrowSerialization): + """Serialize Arrow Tables and Arrays to Parquet. + + Uses `pyarrow` if installed, otherwise falls back to `arro3.io`. + """ + + _impl: ArrowSerialization + + def __init__(self) -> None: + try: + import pyarrow.parquet + except ImportError: + self._impl = Arro3ParquetSerialization() + else: + self._impl = PyArrowParquetSerialization() + + super().__init__() + + def _serialize_arrow_batch(self, record_batch: RecordBatch) -> bytes: + return self._impl._serialize_arrow_batch(record_batch) # noqa SLF001 + + +class IPCSerialization(ArrowSerialization): + """Serialize Arrow Tables and Arrays to Arrow IPC. + + Uses `pyarrow` if installed, otherwise falls back to `arro3.io`. 
+ """ + + _impl: ArrowSerialization + + def __init__(self) -> None: + try: + import pyarrow as pa + except ImportError: + self._impl = Arro3IPCSerialization() + else: + self._impl = PyArrowIPCSerialization() + + super().__init__() + + def _serialize_arrow_batch(self, record_batch: RecordBatch) -> bytes: + return self._impl._serialize_arrow_batch(record_batch) # noqa SLF001 diff --git a/lonboard/_serialization/table/parquet_old.py b/lonboard/_serialization/table/parquet_old.py deleted file mode 100644 index 49709630..00000000 --- a/lonboard/_serialization/table/parquet_old.py +++ /dev/null @@ -1,171 +0,0 @@ -from __future__ import annotations - -import math - -from arro3.core import Table - -DEFAULT_PARQUET_CHUNK_SIZE = 2**16 -# Target chunk size for Arrow (uncompressed) per Parquet chunk -DEFAULT_ARROW_CHUNK_BYTES_SIZE = 5 * 1024 * 1024 # 5MB - -# Maximum number of separate chunks/row groups to allow splitting an input layer into -# Deck.gl can pick from a maximum of 256 layers, and a user could have many layers, so -# we don't want to use too many layers per data file. -DEFAULT_MAX_NUM_CHUNKS = 32 - - -# def write_parquet_batch(record_batch: RecordBatch) -> bytes: -# """Write a RecordBatch to a Parquet file. - -# We still use pyarrow.parquet.ParquetWriter if pyarrow is installed because pyarrow -# has better encoding defaults. So Parquet files written by pyarrow are smaller by -# default than files written by arro3.io.write_parquet. -# """ -# # Occasionally it's possible for there to be empty batches in the -# # pyarrow table. This will error when writing to parquet. We want to -# # give a more informative error. -# if record_batch.num_rows == 0: -# raise ValueError("Batch with 0 rows.") - -# try: -# import pyarrow as pa -# import pyarrow.parquet as pq - -# bio = BytesIO() -# with pq.ParquetWriter( -# bio, -# schema=pa.schema(record_batch.schema), -# compression=DEFAULT_PARQUET_COMPRESSION, -# compression_level=DEFAULT_PARQUET_COMPRESSION_LEVEL, -# ) as writer: -# writer.write_batch( -# pa.record_batch(record_batch), -# row_group_size=record_batch.num_rows, -# ) - -# return bio.getvalue() - -# except ImportError: -# from arro3.io import write_parquet - -# compression_string = ( -# f"{DEFAULT_PARQUET_COMPRESSION}({DEFAULT_PARQUET_COMPRESSION_LEVEL})" -# ) -# bio = BytesIO() -# write_parquet( -# record_batch, -# bio, -# compression=compression_string, -# max_row_group_size=record_batch.num_rows, -# ) - -# return bio.getvalue() - - -# def serialize_table_to_parquet(table: Table, *, max_chunksize: int) -> list[bytes]: -# buffers: list[bytes] = [] -# assert max_chunksize > 0 - -# for record_batch in table.rechunk(max_chunksize=max_chunksize).to_batches(): -# buffers.append(write_parquet_batch(record_batch)) - -# return buffers - - -# def serialize_pyarrow_column( -# data: Array | ChunkedArray, -# *, -# max_chunksize: int, -# ) -> list[bytes]: -# """Serialize a pyarrow column to a Parquet file with one column.""" -# pyarrow_table = Table.from_pydict({"value": data}) -# return serialize_table_to_parquet(pyarrow_table, max_chunksize=max_chunksize) - - -# @overload -# def serialize_accessor( -# data: ChunkedArray, -# obj: BaseArrowLayer, -# ) -> list[bytes]: ... -# @overload -# def serialize_accessor( -# data: str | float | list | tuple | bytes, -# obj: BaseArrowLayer, -# ) -> str | int | float | list | tuple | bytes: ... 
-# def serialize_accessor( -# data: str | float | list | tuple | bytes | ChunkedArray, -# obj: BaseArrowLayer, -# ): -# if data is None: -# return None - -# # We assume data has already been validated to the right type for this accessor -# # Allow any json-serializable type through -# if isinstance(data, (str, int, float, list, tuple, bytes)): -# return data - -# assert isinstance(data, ChunkedArray) -# validate_accessor_length_matches_table(data, obj.table) -# return serialize_pyarrow_column(data, max_chunksize=obj._rows_per_chunk) - - -# def serialize_table(data: Table, obj: BaseArrowLayer) -> list[bytes]: -# assert isinstance(data, Table), "expected Arrow table" -# return serialize_table_to_parquet(data, max_chunksize=obj._rows_per_chunk) - - -def infer_rows_per_chunk(table: Table) -> int: - # At least one chunk - num_chunks = max(round(table.nbytes / DEFAULT_ARROW_CHUNK_BYTES_SIZE), 1) - - # Clamp to the maximum number of chunks - num_chunks = min(num_chunks, DEFAULT_MAX_NUM_CHUNKS) - - return math.ceil(table.num_rows / num_chunks) - - -# def validate_accessor_length_matches_table( -# accessor: Array | ChunkedArray, -# table: Table, -# ) -> None: -# if len(accessor) != len(table): -# raise TraitError("accessor must have same length as table") - - -# def serialize_timestamp_accessor( -# timestamps: ChunkedArray, -# obj: TripsLayer, -# ) -> list[bytes]: -# """Subtract off min timestamp to fit into f32 integer range. - -# Then cast to float32. -# """ -# # Note: this has some overlap with `timestamp_max_physical_value` in utils. -# # Cast to int64 type -# timestamps = timestamps.cast(DataType.list(DataType.int64())) - -# start_offset_adjustment = Scalar( -# timestamp_start_offset(timestamps), -# type=DataType.int64(), -# ) - -# list_offsets_iter = list_offsets(timestamps) -# inner_values_iter = list_flatten(timestamps) - -# offsetted_chunks = [] -# for offsets, inner_values in zip( -# list_offsets_iter, -# inner_values_iter, -# strict=True, -# ): -# offsetted_values = ac.add(inner_values, start_offset_adjustment) -# f32_values = offsetted_values.cast(DataType.int64()).cast(DataType.float32()) -# offsetted_chunks.append(list_array(offsets, f32_values)) - -# f32_timestamps_col = ChunkedArray(offsetted_chunks) -# return serialize_accessor(f32_timestamps_col, obj) - - -ACCESSOR_SERIALIZATION = {"to_json": serialize_accessor} -TIMESTAMP_ACCESSOR_SERIALIZATION = {"to_json": serialize_timestamp_accessor} -TABLE_SERIALIZATION = {"to_json": serialize_table} diff --git a/lonboard/_serialization/table/util.py b/lonboard/_serialization/table/util.py index e69de29b..e8266439 100644 --- a/lonboard/_serialization/table/util.py +++ b/lonboard/_serialization/table/util.py @@ -0,0 +1,25 @@ +from __future__ import annotations + +import math +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from arro3.core import Table + +# Target chunk size for Arrow (uncompressed) per Parquet chunk +DEFAULT_ARROW_CHUNK_BYTES_SIZE = 5 * 1024 * 1024 # 5MB + +# Maximum number of separate chunks/row groups to allow splitting an input layer into +# Deck.gl can pick from a maximum of 256 layers, and a user could have many layers, so +# we don't want to use too many layers per data file. 
+DEFAULT_MAX_NUM_CHUNKS = 32 + + +def infer_rows_per_chunk(table: Table) -> int: + # At least one chunk + num_chunks = max(round(table.nbytes / DEFAULT_ARROW_CHUNK_BYTES_SIZE), 1) + + # Clamp to the maximum number of chunks + num_chunks = min(num_chunks, DEFAULT_MAX_NUM_CHUNKS) + + return math.ceil(table.num_rows / num_chunks) From 411ba1430f217751003c2eb4d9003e3dc6547a9c Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Wed, 15 Oct 2025 14:17:35 -0400 Subject: [PATCH 11/12] Manage choice of serialization method via config --- lonboard/_serialization/__init__.py | 58 +++++++++++++++++++++++++++ lonboard/_serialization/table/base.py | 2 +- lonboard/config.py | 3 ++ lonboard/experimental/traits.py | 4 +- 4 files changed, 64 insertions(+), 3 deletions(-) create mode 100644 lonboard/config.py diff --git a/lonboard/_serialization/__init__.py b/lonboard/_serialization/__init__.py index e69de29b..3ebce342 100644 --- a/lonboard/_serialization/__init__.py +++ b/lonboard/_serialization/__init__.py @@ -0,0 +1,58 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + +from lonboard import config +from lonboard._serialization.table import ( + ArrowSerialization, + IPCSerialization, + ParquetSerialization, +) + +if TYPE_CHECKING: + from arro3.core import ChunkedArray, Table + + from lonboard._layer import BaseArrowLayer + from lonboard.experimental._layer import TripsLayer + + +def _choose_serialization() -> ArrowSerialization: + """Handle choice of serialization method. + + NOTE: we handle this choice **inside of `serialize_` functions** so that the choice + can be changed at runtime. We don't want a specific serialization class to be + attached to layer instances, because then it wouldn't update if the config changes. + """ + if config.USE_PARQUET: + return ParquetSerialization() + + return IPCSerialization() + + +def serialize_accessor( + data: str | float | list | tuple | bytes | ChunkedArray, + obj: BaseArrowLayer, +) -> str | int | float | list | tuple | bytes | list[bytes]: + """Serialize an Arrow Array or Column from a widget.""" + return _choose_serialization().serialize_accessor(data, obj) + + +def serialize_timestamps( + timestamps: ChunkedArray, + obj: TripsLayer, +) -> list[bytes]: + """Serialize timestamps for TripsLayer.""" + return _choose_serialization().serialize_timestamps(timestamps, obj) + + +def serialize_table( + table: Table, + obj: BaseArrowLayer, +) -> list[bytes]: + """Serialize an Arrow Table from a widget.""" + return _choose_serialization().serialize_table(table, obj) + + +ACCESSOR_SERIALIZATION = {"to_json": serialize_accessor} +TIMESTAMP_SERIALIZATION = {"to_json": serialize_timestamps} +TABLE_SERIALIZATION = {"to_json": serialize_table} diff --git a/lonboard/_serialization/table/base.py b/lonboard/_serialization/table/base.py index f030625a..17e005ce 100644 --- a/lonboard/_serialization/table/base.py +++ b/lonboard/_serialization/table/base.py @@ -103,7 +103,7 @@ def serialize_accessor( validate_accessor_length_matches_table(data, obj.table) return self._serialize_arrow_column(data, max_chunksize=obj._rows_per_chunk) # noqa: SLF001 - def serialize_widget_timestamps( + def serialize_timestamps( self, timestamps: ChunkedArray, obj: TripsLayer, diff --git a/lonboard/config.py b/lonboard/config.py new file mode 100644 index 00000000..5f539353 --- /dev/null +++ b/lonboard/config.py @@ -0,0 +1,3 @@ +# TODO: real config system + +USE_PARQUET: bool = False diff --git a/lonboard/experimental/traits.py b/lonboard/experimental/traits.py index 526340e1..f97dc085 
100644 --- a/lonboard/experimental/traits.py +++ b/lonboard/experimental/traits.py @@ -19,7 +19,7 @@ ) from lonboard._constants import MAX_INTEGER_FLOAT32, MIN_INTEGER_FLOAT32 -from lonboard._serialization import TIMESTAMP_ACCESSOR_SERIALIZATION +from lonboard._serialization import TIMESTAMP_SERIALIZATION from lonboard._utils import get_geometry_column_index from lonboard.traits import FixedErrorTraitType @@ -56,7 +56,7 @@ def __init__( **kwargs: Any, ) -> None: super().__init__(*args, **kwargs) - self.tag(sync=True, **TIMESTAMP_ACCESSOR_SERIALIZATION) + self.tag(sync=True, **TIMESTAMP_SERIALIZATION) def _reduce_precision( self, From 850f35ea4082e8b0f5afc06e16670105edca503e Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Thu, 16 Oct 2025 11:55:12 -0400 Subject: [PATCH 12/12] cleanup --- lonboard/_layer.py | 2 +- lonboard/_serialization/table/arro3.py | 5 +---- lonboard/_serialization/table/pyarrow.py | 9 +++------ 3 files changed, 5 insertions(+), 11 deletions(-) diff --git a/lonboard/_layer.py b/lonboard/_layer.py index cc3453c2..32114914 100644 --- a/lonboard/_layer.py +++ b/lonboard/_layer.py @@ -31,7 +31,7 @@ from lonboard._geoarrow.ops.coord_layout import make_geometry_interleaved from lonboard._geoarrow.parse_wkb import parse_serialized_table from lonboard._geoarrow.row_index import add_positional_row_index -from lonboard._serialization import infer_rows_per_chunk +from lonboard._serialization.table.util import infer_rows_per_chunk from lonboard._utils import auto_downcast as _auto_downcast from lonboard._utils import get_geometry_column_index, remove_extension_kwargs from lonboard.traits import ( diff --git a/lonboard/_serialization/table/arro3.py b/lonboard/_serialization/table/arro3.py index bbec30eb..d0ac37ff 100644 --- a/lonboard/_serialization/table/arro3.py +++ b/lonboard/_serialization/table/arro3.py @@ -41,12 +41,9 @@ def __init__(self) -> None: def _serialize_arrow_batch(self, record_batch: RecordBatch) -> bytes: """Write a single RecordBatch to an Arrow IPC stream in memory and return the bytes.""" - if record_batch.num_rows == 0: - raise ValueError("Batch with 0 rows.") - from arro3.io import write_ipc_stream bio = BytesIO() - write_ipc_stream(record_batch, bio, compression="lz4") + write_ipc_stream(record_batch, bio, compression=None) return bio.getvalue() diff --git a/lonboard/_serialization/table/pyarrow.py b/lonboard/_serialization/table/pyarrow.py index c96b0469..587b8d19 100644 --- a/lonboard/_serialization/table/pyarrow.py +++ b/lonboard/_serialization/table/pyarrow.py @@ -49,17 +49,14 @@ def __init__(self) -> None: def _serialize_arrow_batch(self, record_batch: RecordBatch) -> bytes: """Write a single RecordBatch to an Arrow IPC stream in memory and return the bytes.""" - if record_batch.num_rows == 0: - raise ValueError("Batch with 0 rows.") - import pyarrow as pa bio = BytesIO() with pa.ipc.new_stream( bio, - record_batch.schema, - options=pa.ipc.IpcWriteOptions(compression="lz4"), + schema=pa.schema(record_batch.schema), + options=pa.ipc.IpcWriteOptions(compression=None), ) as writer: - writer.write_batch(record_batch) + writer.write_batch(pa.record_batch(record_batch)) return bio.getvalue()
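
Usage note (illustrative only, not part of the patch series): the sketch below exercises the runtime dispatch added in patch 11 ("Manage choice of serialization method via config"). It assumes the patches above are applied and calls the private `_choose_serialization` helper directly, purely to show how the `lonboard.config.USE_PARQUET` flag selects a backend.

    from lonboard import config
    from lonboard._serialization import _choose_serialization
    from lonboard._serialization.table import IPCSerialization, ParquetSerialization

    # Default: Arrow IPC buffers are sent to the frontend (config.USE_PARQUET = False).
    config.USE_PARQUET = False
    assert isinstance(_choose_serialization(), IPCSerialization)

    # Switch to Parquet serialization at runtime; no layer needs to be rebuilt,
    # because the backend is chosen inside the serialize_* functions on each call.
    config.USE_PARQUET = True
    assert isinstance(_choose_serialization(), ParquetSerialization)

    # Restore the default.
    config.USE_PARQUET = False

Because the serialization backend is chosen per call rather than stored on layer instances, flipping config.USE_PARQUET takes effect on the next widget sync of any existing layer, which is the behavior described in the docstring of _choose_serialization.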