|
23 | 23 | import typing as t |
24 | 24 | from pathlib import Path |
25 | 25 |
|
| 26 | +import orjsonl |
| 27 | +import pandas as pd |
| 28 | +from sqlalchemy_cratedb import insert_bulk |
26 | 29 | from tqdm.contrib.logging import logging_redirect_tqdm |
| 30 | + |
27 | 31 | if t.TYPE_CHECKING: |
28 | 32 | import polars as pl |
29 | 33 |
|
@@ -247,20 +251,25 @@ def _load(self, path_schema: Path, path_data: Path): |
247 | 251 |
|
248 | 252 | # Load data. |
249 | 253 | try: |
250 | | - df: "pl.DataFrame" = self.load_table(path_table_data) |
251 | | - df.write_database(table_name=tablename_restored, connection=self.dburi, if_table_exists="replace") |
| 254 | + df: "pd.DataFrame" = pd.DataFrame.from_records(self.load_table(path_table_data)) |
| 255 | + df.to_sql( |
| 256 | + name=tablename_restored, |
| 257 | + con=self.adapter.engine, |
| 258 | + index=False, |
| 259 | + if_exists="replace", |
| 260 | + method=insert_bulk, |
| 261 | + ) |
252 | 262 | except Exception as ex: |
253 | 263 | error_logger(self.debug)(f"Importing table failed: {tablename}. Reason: {ex}") |
254 | 264 |
|
255 | 265 | logger.info(f"Successfully imported {table_count} system tables") |
256 | | - # df.to_pandas().to_sql(name=tablename, con=self.adapter.engine, if_exists="append", index=False) # noqa: ERA001, E501 |
257 | 266 |
|
258 | | - def load_table(self, path: Path) -> "pl.DataFrame": |
| 267 | + def load_table(self, path: Path) -> t.List: |
259 | 268 | import polars as pl |
260 | 269 |
|
261 | 270 | if path.suffix in [".jsonl"]: |
262 | | - return pl.read_ndjson(path) |
| 271 | + return orjsonl.load(path) |
263 | 272 | elif path.suffix in [".parquet", ".pq"]: |
264 | | - return pl.read_parquet(path) |
| 273 | + return pl.read_parquet(path).to_pandas().to_dict("records") |
265 | 274 | else: |
266 | 275 | raise NotImplementedError(f"Input format not implemented: {path.suffix}") |
0 commit comments