From 88c978f824990694daae459a85df7dd7cd3bc82c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Thu, 8 Jan 2026 13:02:12 +0800 Subject: [PATCH 01/24] Remove all redundant change in pr 6969 --- paimon-python/pypaimon/common/file_io.py | 4 +- paimon-python/pypaimon/filesystem/local.py | 36 +++ .../pypaimon/write/file_store_commit.py | 232 +++++------------- 3 files changed, 103 insertions(+), 169 deletions(-) create mode 100644 paimon-python/pypaimon/filesystem/local.py diff --git a/paimon-python/pypaimon/common/file_io.py b/paimon-python/pypaimon/common/file_io.py index 2ec1909306e4..7249ff47e563 100644 --- a/paimon-python/pypaimon/common/file_io.py +++ b/paimon-python/pypaimon/common/file_io.py @@ -31,6 +31,7 @@ from pypaimon.common.options import Options from pypaimon.common.options.config import OssOptions, S3Options from pypaimon.common.uri_reader import UriReaderFactory +from pypaimon.filesystem.local import PaimonLocalFileSystem from pypaimon.schema.data_types import DataField, AtomicType, PyarrowFieldParser from pypaimon.table.row.blob import BlobData, BlobDescriptor, Blob from pypaimon.table.row.generic_row import GenericRow @@ -183,9 +184,8 @@ def _initialize_hdfs_fs(self, scheme: str, netloc: Optional[str]) -> FileSystem: ) def _initialize_local_fs(self) -> FileSystem: - from pyarrow.fs import LocalFileSystem - return LocalFileSystem() + return PaimonLocalFileSystem() def new_input_stream(self, path: str): path_str = self.to_filesystem_path(path) diff --git a/paimon-python/pypaimon/filesystem/local.py b/paimon-python/pypaimon/filesystem/local.py new file mode 100644 index 000000000000..5b22676e049c --- /dev/null +++ b/paimon-python/pypaimon/filesystem/local.py @@ -0,0 +1,36 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +import threading +import pyarrow +from pyarrow._fs import LocalFileSystem + +class PaimonLocalFileSystem(LocalFileSystem): + rename_lock = threading.Lock() + + def move(self, src, dst): + with PaimonLocalFileSystem.rename_lock: + file_info = self.get_file_info([dst])[0] + result = file_info.type != pyarrow.fs.FileType.NotFound + if (result is True): + raise Exception("Target file already exists") + + super(PaimonLocalFileSystem, self).move(src, dst) + + + + diff --git a/paimon-python/pypaimon/write/file_store_commit.py b/paimon-python/pypaimon/write/file_store_commit.py index e55e25f7c830..8f01b2a2cd0b 100644 --- a/paimon-python/pypaimon/write/file_store_commit.py +++ b/paimon-python/pypaimon/write/file_store_commit.py @@ -39,32 +39,6 @@ logger = logging.getLogger(__name__) - -class CommitResult: - """Base class for commit results.""" - - def is_success(self) -> bool: - """Returns True if commit was successful.""" - raise NotImplementedError - - -class SuccessResult(CommitResult): - """Result indicating successful commit.""" - - def is_success(self) -> bool: - return True - - -class RetryResult(CommitResult): - - def __init__(self, latest_snapshot, exception: Optional[Exception] = None): - self.latest_snapshot = latest_snapshot - self.exception = exception - - def is_success(self) -> bool: - return False - - class FileStoreCommit: """ Core commit logic for file store operations. @@ -146,7 +120,6 @@ def _try_commit(self, commit_kind, commit_entries, commit_identifier): import threading retry_count = 0 - retry_result = None start_time_ms = int(time.time() * 1000) thread_id = threading.current_thread().name while True: @@ -156,22 +129,19 @@ def _try_commit(self, commit_kind, commit_entries, commit_identifier): commit_entries = self._generate_overwrite_entries() result = self._try_commit_once( - retry_result=retry_result, commit_kind=commit_kind, commit_entries=commit_entries, commit_identifier=commit_identifier, latest_snapshot=latest_snapshot ) - if result.is_success(): - logger.warning( + if result: + logger.info( f"Thread {thread_id}: commit success {latest_snapshot.id + 1 if latest_snapshot else 1} " f"after {retry_count} retries" ) break - retry_result = result - elapsed_ms = int(time.time() * 1000) - start_time_ms if elapsed_ms > self.commit_timeout or retry_count >= self.commit_max_retries: error_msg = ( @@ -179,35 +149,14 @@ def _try_commit(self, commit_kind, commit_entries, commit_identifier): f"after {elapsed_ms} millis with {retry_count} retries, " f"there maybe exist commit conflicts between multiple jobs." 
) - if retry_result.exception: - raise RuntimeError(error_msg) from retry_result.exception - else: - raise RuntimeError(error_msg) + raise RuntimeError(error_msg) self._commit_retry_wait(retry_count) retry_count += 1 - def _try_commit_once(self, retry_result: Optional[RetryResult], commit_kind: str, + def _try_commit_once(self, commit_kind: str, commit_entries: List[ManifestEntry], commit_identifier: int, - latest_snapshot: Optional[Snapshot]) -> CommitResult: - start_time_ms = int(time.time() * 1000) - - if retry_result is not None and latest_snapshot is not None: - start_check_snapshot_id = 1 # Snapshot.FIRST_SNAPSHOT_ID - if retry_result.latest_snapshot is not None: - start_check_snapshot_id = retry_result.latest_snapshot.id + 1 - - for snapshot_id in range(start_check_snapshot_id, latest_snapshot.id + 2): - snapshot = self.snapshot_manager.get_snapshot_by_id(snapshot_id) - if (snapshot and snapshot.commit_user == self.commit_user and - snapshot.commit_identifier == commit_identifier and - snapshot.commit_kind == commit_kind): - logger.info( - f"Commit already completed (snapshot {snapshot_id}), " - f"user: {self.commit_user}, identifier: {commit_identifier}" - ) - return SuccessResult() - + latest_snapshot: Optional[Snapshot]) -> bool: unique_id = uuid.uuid4() base_manifest_list = f"manifest-list-{unique_id}-0" delta_manifest_list = f"manifest-list-{unique_id}-1" @@ -242,103 +191,76 @@ def _try_commit_once(self, retry_result: Optional[RetryResult], commit_kind: str else: deleted_file_count += 1 delta_record_count -= entry.file.row_count - - try: - self.manifest_file_manager.write(new_manifest_file, commit_entries) - - # TODO: implement noConflictsOrFail logic - partition_columns = list(zip(*(entry.partition.values for entry in commit_entries))) - partition_min_stats = [min(col) for col in partition_columns] - partition_max_stats = [max(col) for col in partition_columns] - partition_null_counts = [sum(value == 0 for value in col) for col in partition_columns] - if not all(count == 0 for count in partition_null_counts): - raise RuntimeError("Partition value should not be null") - - manifest_file_path = f"{self.manifest_file_manager.manifest_path}/{new_manifest_file}" - file_size = self.table.file_io.get_file_size(manifest_file_path) - - new_manifest_file_meta = ManifestFileMeta( - file_name=new_manifest_file, - file_size=file_size, - num_added_files=added_file_count, - num_deleted_files=deleted_file_count, - partition_stats=SimpleStats( - min_values=GenericRow( - values=partition_min_stats, - fields=self.table.partition_keys_fields - ), - max_values=GenericRow( - values=partition_max_stats, - fields=self.table.partition_keys_fields - ), - null_counts=partition_null_counts, + self.manifest_file_manager.write(new_manifest_file, commit_entries) + # TODO: implement noConflictsOrFail logic + partition_columns = list(zip(*(entry.partition.values for entry in commit_entries))) + partition_min_stats = [min(col) for col in partition_columns] + partition_max_stats = [max(col) for col in partition_columns] + partition_null_counts = [sum(value == 0 for value in col) for col in partition_columns] + if not all(count == 0 for count in partition_null_counts): + raise RuntimeError("Partition value should not be null") + manifest_file_path = f"{self.manifest_file_manager.manifest_path}/{new_manifest_file}" + new_manifest_file_meta = ManifestFileMeta( + file_name=new_manifest_file, + file_size=self.table.file_io.get_file_size(manifest_file_path), + num_added_files=added_file_count, + 
num_deleted_files=deleted_file_count, + partition_stats=SimpleStats( + min_values=GenericRow( + values=partition_min_stats, + fields=self.table.partition_keys_fields ), - schema_id=self.table.table_schema.id, - ) + max_values=GenericRow( + values=partition_max_stats, + fields=self.table.partition_keys_fields + ), + null_counts=partition_null_counts, + ), + schema_id=self.table.table_schema.id, + ) + self.manifest_list_manager.write(delta_manifest_list, [new_manifest_file_meta]) + + # process existing_manifest + total_record_count = 0 + if latest_snapshot: + existing_manifest_files = self.manifest_list_manager.read_all(latest_snapshot) + previous_record_count = latest_snapshot.total_record_count + if previous_record_count: + total_record_count += previous_record_count + else: + existing_manifest_files = [] + self.manifest_list_manager.write(base_manifest_list, existing_manifest_files) + + total_record_count += delta_record_count + snapshot_data = Snapshot( + version=3, + id=new_snapshot_id, + schema_id=self.table.table_schema.id, + base_manifest_list=base_manifest_list, + delta_manifest_list=delta_manifest_list, + total_record_count=total_record_count, + delta_record_count=delta_record_count, + commit_user=self.commit_user, + commit_identifier=commit_identifier, + commit_kind=commit_kind, + time_millis=int(time.time() * 1000), + next_row_id=next_row_id, + ) - self.manifest_list_manager.write(delta_manifest_list, [new_manifest_file_meta]) - - # process existing_manifest - total_record_count = 0 - if latest_snapshot: - existing_manifest_files = self.manifest_list_manager.read_all(latest_snapshot) - previous_record_count = latest_snapshot.total_record_count - if previous_record_count: - total_record_count += previous_record_count - else: - existing_manifest_files = [] - - self.manifest_list_manager.write(base_manifest_list, existing_manifest_files) - total_record_count += delta_record_count - snapshot_data = Snapshot( - version=3, - id=new_snapshot_id, - schema_id=self.table.table_schema.id, - base_manifest_list=base_manifest_list, - delta_manifest_list=delta_manifest_list, - total_record_count=total_record_count, - delta_record_count=delta_record_count, - commit_user=self.commit_user, - commit_identifier=commit_identifier, - commit_kind=commit_kind, - time_millis=int(time.time() * 1000), - next_row_id=next_row_id, - ) - # Generate partition statistics for the commit - statistics = self._generate_partition_statistics(commit_entries) - except Exception as e: - self._cleanup_preparation_failure(new_manifest_file, delta_manifest_list, - base_manifest_list) - logger.warning(f"Exception occurs when preparing snapshot: {e}", exc_info=True) - raise RuntimeError(f"Failed to prepare snapshot: {e}") + # Generate partition statistics for the commit + statistics = self._generate_partition_statistics(commit_entries) # Use SnapshotCommit for atomic commit try: with self.snapshot_commit: success = self.snapshot_commit.commit(snapshot_data, self.table.current_branch(), statistics) if not success: - # Commit failed, clean up temporary files and retry - commit_time_sec = (int(time.time() * 1000) - start_time_ms) / 1000 - logger.warning( - f"Atomic commit failed for snapshot #{new_snapshot_id} " - f"by user {self.commit_user} " - f"with identifier {commit_identifier} and kind {commit_kind} after {commit_time_sec}s. " - f"Clean up and try again." 
- ) - self._cleanup_preparation_failure(new_manifest_file, delta_manifest_list, - base_manifest_list) - return RetryResult(latest_snapshot, None) - except Exception as e: + logger.warning(f"Atomic commit failed for snapshot #{new_snapshot_id} failed") + return success + except Exception: # Commit exception, not sure about the situation and should not clean up the files logger.warning("Retry commit for exception") - return RetryResult(latest_snapshot, e) - - logger.warning( - f"Successfully commit snapshot {new_snapshot_id} to table {self.table.identifier} " - f"for snapshot-{new_snapshot_id} by user {self.commit_user} " - + f"with identifier {commit_identifier} and kind {commit_kind}." - ) - return SuccessResult() + return False def _generate_overwrite_entries(self): """Generate commit entries for OVERWRITE mode based on latest snapshot.""" @@ -377,30 +299,6 @@ def _commit_retry_wait(self, retry_count: int): ) time.sleep(total_wait_ms / 1000.0) - def _cleanup_preparation_failure(self, manifest_file: Optional[str], - delta_manifest_list: Optional[str], - base_manifest_list: Optional[str]): - try: - manifest_path = self.manifest_list_manager.manifest_path - - if delta_manifest_list: - manifest_files = self.manifest_list_manager.read(delta_manifest_list) - for manifest_meta in manifest_files: - manifest_file_path = f"{self.manifest_file_manager.manifest_path}/{manifest_meta.file_name}" - self.table.file_io.delete_quietly(manifest_file_path) - delta_path = f"{manifest_path}/{delta_manifest_list}" - self.table.file_io.delete_quietly(delta_path) - - if base_manifest_list: - base_path = f"{manifest_path}/{base_manifest_list}" - self.table.file_io.delete_quietly(base_path) - - if manifest_file: - manifest_file_path = f"{self.manifest_file_manager.manifest_path}/{manifest_file}" - self.table.file_io.delete_quietly(manifest_file_path) - except Exception as e: - logger.warning(f"Failed to clean up temporary files during preparation failure: {e}", exc_info=True) - def abort(self, commit_messages: List[CommitMessage]): """Abort commit and delete files. 
Uses external_path if available to ensure proper scheme handling.""" for message in commit_messages: From 4ee2c354fe7da8bc9c34f9f1dc3d0f5a478c9235 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Thu, 8 Jan 2026 13:03:58 +0800 Subject: [PATCH 02/24] Fix minus --- paimon-python/pypaimon/common/file_io.py | 12 +----------- 1 file changed, 1 insertion(+), 11 deletions(-) diff --git a/paimon-python/pypaimon/common/file_io.py b/paimon-python/pypaimon/common/file_io.py index 7249ff47e563..fe8cfb48ee4e 100644 --- a/paimon-python/pypaimon/common/file_io.py +++ b/paimon-python/pypaimon/common/file_io.py @@ -40,8 +40,6 @@ class FileIO: - rename_lock = threading.Lock() - def __init__(self, path: str, catalog_options: Options): self.properties = catalog_options self.logger = logging.getLogger(__name__) @@ -255,15 +253,7 @@ def rename(self, src: str, dst: str) -> bool: self.mkdirs(str(dst_parent)) src_str = self.to_filesystem_path(src) - if isinstance(self.filesystem, LocalFileSystem): - if self.exists(dst): - return False - with FileIO.rename_lock: - if self.exists(dst): - return False - self.filesystem.move(src_str, dst_str) - else: - self.filesystem.move(src_str, dst_str) + self.filesystem.move(src_str, dst_str) return True except Exception as e: self.logger.warning(f"Failed to rename {src} to {dst}: {e}") From eea795079072e1d449ce8ac243c2498ed7bf220c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Thu, 8 Jan 2026 13:05:03 +0800 Subject: [PATCH 03/24] Fix minus --- paimon-python/pypaimon/filesystem/local.py | 1 + 1 file changed, 1 insertion(+) diff --git a/paimon-python/pypaimon/filesystem/local.py b/paimon-python/pypaimon/filesystem/local.py index 5b22676e049c..7ee78d4c0f18 100644 --- a/paimon-python/pypaimon/filesystem/local.py +++ b/paimon-python/pypaimon/filesystem/local.py @@ -20,6 +20,7 @@ from pyarrow._fs import LocalFileSystem class PaimonLocalFileSystem(LocalFileSystem): + rename_lock = threading.Lock() def move(self, src, dst): From 55326fd0740170ce9c02e70f75d80cd86603cb07 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Thu, 8 Jan 2026 13:24:43 +0800 Subject: [PATCH 04/24] Fix typo --- paimon-python/pypaimon/common/file_io.py | 3 +-- paimon-python/pypaimon/filesystem/local.py | 7 ++----- paimon-python/pypaimon/write/file_store_commit.py | 1 + 3 files changed, 4 insertions(+), 7 deletions(-) diff --git a/paimon-python/pypaimon/common/file_io.py b/paimon-python/pypaimon/common/file_io.py index fe8cfb48ee4e..556d9e9ae798 100644 --- a/paimon-python/pypaimon/common/file_io.py +++ b/paimon-python/pypaimon/common/file_io.py @@ -18,7 +18,6 @@ import logging import os import subprocess -import threading import uuid from pathlib import Path from typing import Any, Dict, List, Optional @@ -26,7 +25,7 @@ import pyarrow from packaging.version import parse -from pyarrow._fs import FileSystem, LocalFileSystem +from pyarrow._fs import FileSystem from pypaimon.common.options import Options from pypaimon.common.options.config import OssOptions, S3Options diff --git a/paimon-python/pypaimon/filesystem/local.py b/paimon-python/pypaimon/filesystem/local.py index 7ee78d4c0f18..7c46c154db58 100644 --- a/paimon-python/pypaimon/filesystem/local.py +++ b/paimon-python/pypaimon/filesystem/local.py @@ -19,6 +19,7 @@ import pyarrow from pyarrow._fs import LocalFileSystem + class PaimonLocalFileSystem(LocalFileSystem): rename_lock = threading.Lock() @@ -30,8 +31,4 @@ def move(self, src, dst): if (result is True): raise Exception("Target file already exists") - 
super(PaimonLocalFileSystem, self).move(src, dst) - - - - + super(PaimonLocalFileSystem, self).move(src, dst) \ No newline at end of file diff --git a/paimon-python/pypaimon/write/file_store_commit.py b/paimon-python/pypaimon/write/file_store_commit.py index 8f01b2a2cd0b..bbbc019aeccc 100644 --- a/paimon-python/pypaimon/write/file_store_commit.py +++ b/paimon-python/pypaimon/write/file_store_commit.py @@ -39,6 +39,7 @@ logger = logging.getLogger(__name__) + class FileStoreCommit: """ Core commit logic for file store operations. From 7bc6c54a26c5b6ab2292cee7d50fae9f12e276ad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Thu, 8 Jan 2026 13:39:02 +0800 Subject: [PATCH 05/24] Fix type --- paimon-python/pypaimon/filesystem/local.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paimon-python/pypaimon/filesystem/local.py b/paimon-python/pypaimon/filesystem/local.py index 7c46c154db58..c845f8547cce 100644 --- a/paimon-python/pypaimon/filesystem/local.py +++ b/paimon-python/pypaimon/filesystem/local.py @@ -31,4 +31,4 @@ def move(self, src, dst): if (result is True): raise Exception("Target file already exists") - super(PaimonLocalFileSystem, self).move(src, dst) \ No newline at end of file + super(PaimonLocalFileSystem, self).move(src, dst) From 7589628cc901a23c807902a8816553acba0c199f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Thu, 8 Jan 2026 14:21:33 +0800 Subject: [PATCH 06/24] Fix comment --- .../read/scanner/full_starting_scanner.py | 6 +- .../pypaimon/write/file_store_commit.py | 104 +++++++++++------- 2 files changed, 70 insertions(+), 40 deletions(-) diff --git a/paimon-python/pypaimon/read/scanner/full_starting_scanner.py b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py index 3765baffa67f..fecd7a3b8d8c 100755 --- a/paimon-python/pypaimon/read/scanner/full_starting_scanner.py +++ b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py @@ -79,7 +79,7 @@ def schema_fields_func(schema_id: int): ) def scan(self) -> Plan: - file_entries = self.plan_files() + file_entries = self.plan_files(None) if not file_entries: return Plan([]) @@ -104,8 +104,8 @@ def scan(self) -> Plan: splits = self._apply_push_down_limit(splits) return Plan(splits) - def plan_files(self) -> List[ManifestEntry]: - latest_snapshot = self.snapshot_manager.get_latest_snapshot() + def plan_files(self, latestSnapshot) -> List[ManifestEntry]: + latest_snapshot = self.snapshot_manager.get_latest_snapshot() if latestSnapshot is None else latestSnapshot if not latest_snapshot: return [] manifest_files = self.manifest_list_manager.read_all(latest_snapshot) diff --git a/paimon-python/pypaimon/write/file_store_commit.py b/paimon-python/pypaimon/write/file_store_commit.py index bbbc019aeccc..91269d4a6904 100644 --- a/paimon-python/pypaimon/write/file_store_commit.py +++ b/paimon-python/pypaimon/write/file_store_commit.py @@ -85,8 +85,8 @@ def commit(self, commit_messages: List[CommitMessage], commit_identifier: int): )) self._try_commit(commit_kind="APPEND", - commit_entries=commit_entries, - commit_identifier=commit_identifier) + commit_identifier=commit_identifier, + commit_entries_plan=lambda snapshot : commit_entries) def overwrite(self, overwrite_partition, commit_messages: List[CommitMessage], commit_identifier: int): """Commit the given commit messages in overwrite mode.""" @@ -113,11 +113,11 @@ def overwrite(self, overwrite_partition, commit_messages: List[CommitMessage], c self._try_commit( commit_kind="OVERWRITE", - commit_entries=None, # 
Will be generated in _try_commit based on latest snapshot - commit_identifier=commit_identifier + commit_identifier=commit_identifier, + commit_entries_plan= lambda snapshot : self._generate_overwrite_entries(snapshot) ) - def _try_commit(self, commit_kind, commit_entries, commit_identifier): + def _try_commit(self, commit_kind, commit_identifier, commit_entries_plan): import threading retry_count = 0 @@ -125,9 +125,7 @@ def _try_commit(self, commit_kind, commit_entries, commit_identifier): thread_id = threading.current_thread().name while True: latest_snapshot = self.snapshot_manager.get_latest_snapshot() - - if commit_kind == "OVERWRITE": - commit_entries = self._generate_overwrite_entries() + commit_entries = commit_entries_plan(latest_snapshot) result = self._try_commit_once( commit_kind=commit_kind, @@ -192,34 +190,40 @@ def _try_commit_once(self, commit_kind: str, else: deleted_file_count += 1 delta_record_count -= entry.file.row_count - self.manifest_file_manager.write(new_manifest_file, commit_entries) - # TODO: implement noConflictsOrFail logic - partition_columns = list(zip(*(entry.partition.values for entry in commit_entries))) - partition_min_stats = [min(col) for col in partition_columns] - partition_max_stats = [max(col) for col in partition_columns] - partition_null_counts = [sum(value == 0 for value in col) for col in partition_columns] - if not all(count == 0 for count in partition_null_counts): - raise RuntimeError("Partition value should not be null") - manifest_file_path = f"{self.manifest_file_manager.manifest_path}/{new_manifest_file}" - new_manifest_file_meta = ManifestFileMeta( - file_name=new_manifest_file, - file_size=self.table.file_io.get_file_size(manifest_file_path), - num_added_files=added_file_count, - num_deleted_files=deleted_file_count, - partition_stats=SimpleStats( - min_values=GenericRow( - values=partition_min_stats, - fields=self.table.partition_keys_fields - ), - max_values=GenericRow( - values=partition_max_stats, - fields=self.table.partition_keys_fields + try: + self.manifest_file_manager.write(new_manifest_file, commit_entries) + # TODO: implement noConflictsOrFail logic + partition_columns = list(zip(*(entry.partition.values for entry in commit_entries))) + partition_min_stats = [min(col) for col in partition_columns] + partition_max_stats = [max(col) for col in partition_columns] + partition_null_counts = [sum(value == 0 for value in col) for col in partition_columns] + if not all(count == 0 for count in partition_null_counts): + raise RuntimeError("Partition value should not be null") + manifest_file_path = f"{self.manifest_file_manager.manifest_path}/{new_manifest_file}" + new_manifest_file_meta = ManifestFileMeta( + file_name=new_manifest_file, + file_size=self.table.file_io.get_file_size(manifest_file_path), + num_added_files=added_file_count, + num_deleted_files=deleted_file_count, + partition_stats=SimpleStats( + min_values=GenericRow( + values=partition_min_stats, + fields=self.table.partition_keys_fields + ), + max_values=GenericRow( + values=partition_max_stats, + fields=self.table.partition_keys_fields + ), + null_counts=partition_null_counts, ), - null_counts=partition_null_counts, - ), - schema_id=self.table.table_schema.id, - ) - self.manifest_list_manager.write(delta_manifest_list, [new_manifest_file_meta]) + schema_id=self.table.table_schema.id, + ) + self.manifest_list_manager.write(delta_manifest_list, [new_manifest_file_meta]) + except Exception as e: + self._cleanup_preparation_failure(new_manifest_file, delta_manifest_list, 
+ base_manifest_list) + logger.warning(f"Exception occurs when preparing snapshot: {e}", exc_info=True) + raise RuntimeError(f"Failed to prepare snapshot: {e}") # process existing_manifest total_record_count = 0 @@ -257,16 +261,42 @@ def _try_commit_once(self, commit_kind: str, success = self.snapshot_commit.commit(snapshot_data, self.table.current_branch(), statistics) if not success: logger.warning(f"Atomic commit failed for snapshot #{new_snapshot_id} failed") + self._cleanup_preparation_failure(new_manifest_file, delta_manifest_list, + base_manifest_list) return success except Exception: # Commit exception, not sure about the situation and should not clean up the files logger.warning("Retry commit for exception") return False - def _generate_overwrite_entries(self): + def _cleanup_preparation_failure(self, manifest_file: Optional[str], + delta_manifest_list: Optional[str], + base_manifest_list: Optional[str]): + try: + manifest_path = self.manifest_list_manager.manifest_path + + if delta_manifest_list: + manifest_files = self.manifest_list_manager.read(delta_manifest_list) + for manifest_meta in manifest_files: + manifest_file_path = f"{self.manifest_file_manager.manifest_path}/{manifest_meta.file_name}" + self.table.file_io.delete_quietly(manifest_file_path) + delta_path = f"{manifest_path}/{delta_manifest_list}" + self.table.file_io.delete_quietly(delta_path) + + if base_manifest_list: + base_path = f"{manifest_path}/{base_manifest_list}" + self.table.file_io.delete_quietly(base_path) + + if manifest_file: + manifest_file_path = f"{self.manifest_file_manager.manifest_path}/{manifest_file}" + self.table.file_io.delete_quietly(manifest_file_path) + except Exception as e: + logger.warning(f"Failed to clean up temporary files during preparation failure: {e}", exc_info=True) + + def _generate_overwrite_entries(self, latestSnapshot): """Generate commit entries for OVERWRITE mode based on latest snapshot.""" entries = [] - current_entries = FullStartingScanner(self.table, self._overwrite_partition_filter, None).plan_files() + current_entries = FullStartingScanner(self.table, self._overwrite_partition_filter, None).plan_files(latestSnapshot) for entry in current_entries: entry.kind = 1 # DELETE entries.append(entry) From 400278a322eb6cc67f92c9982c4e2e81bbd812f7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Thu, 8 Jan 2026 14:27:17 +0800 Subject: [PATCH 07/24] Fix minus --- .../pypaimon/write/file_store_commit.py | 60 +++++++++---------- 1 file changed, 30 insertions(+), 30 deletions(-) diff --git a/paimon-python/pypaimon/write/file_store_commit.py b/paimon-python/pypaimon/write/file_store_commit.py index 91269d4a6904..c19667e9b39c 100644 --- a/paimon-python/pypaimon/write/file_store_commit.py +++ b/paimon-python/pypaimon/write/file_store_commit.py @@ -219,42 +219,42 @@ def _try_commit_once(self, commit_kind: str, schema_id=self.table.table_schema.id, ) self.manifest_list_manager.write(delta_manifest_list, [new_manifest_file_meta]) + + # process existing_manifest + total_record_count = 0 + if latest_snapshot: + existing_manifest_files = self.manifest_list_manager.read_all(latest_snapshot) + previous_record_count = latest_snapshot.total_record_count + if previous_record_count: + total_record_count += previous_record_count + else: + existing_manifest_files = [] + self.manifest_list_manager.write(base_manifest_list, existing_manifest_files) + + total_record_count += delta_record_count + snapshot_data = Snapshot( + version=3, + id=new_snapshot_id, + 
schema_id=self.table.table_schema.id, + base_manifest_list=base_manifest_list, + delta_manifest_list=delta_manifest_list, + total_record_count=total_record_count, + delta_record_count=delta_record_count, + commit_user=self.commit_user, + commit_identifier=commit_identifier, + commit_kind=commit_kind, + time_millis=int(time.time() * 1000), + next_row_id=next_row_id, + ) + + # Generate partition statistics for the commit + statistics = self._generate_partition_statistics(commit_entries) except Exception as e: self._cleanup_preparation_failure(new_manifest_file, delta_manifest_list, base_manifest_list) logger.warning(f"Exception occurs when preparing snapshot: {e}", exc_info=True) raise RuntimeError(f"Failed to prepare snapshot: {e}") - # process existing_manifest - total_record_count = 0 - if latest_snapshot: - existing_manifest_files = self.manifest_list_manager.read_all(latest_snapshot) - previous_record_count = latest_snapshot.total_record_count - if previous_record_count: - total_record_count += previous_record_count - else: - existing_manifest_files = [] - self.manifest_list_manager.write(base_manifest_list, existing_manifest_files) - - total_record_count += delta_record_count - snapshot_data = Snapshot( - version=3, - id=new_snapshot_id, - schema_id=self.table.table_schema.id, - base_manifest_list=base_manifest_list, - delta_manifest_list=delta_manifest_list, - total_record_count=total_record_count, - delta_record_count=delta_record_count, - commit_user=self.commit_user, - commit_identifier=commit_identifier, - commit_kind=commit_kind, - time_millis=int(time.time() * 1000), - next_row_id=next_row_id, - ) - - # Generate partition statistics for the commit - statistics = self._generate_partition_statistics(commit_entries) - # Use SnapshotCommit for atomic commit try: with self.snapshot_commit: From 08b5777d8f2fbaf652671d0c82c861878e3e0cbc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Thu, 8 Jan 2026 14:28:42 +0800 Subject: [PATCH 08/24] Fix minus --- .../pypaimon/write/file_store_commit.py | 38 +++++++++---------- 1 file changed, 19 insertions(+), 19 deletions(-) diff --git a/paimon-python/pypaimon/write/file_store_commit.py b/paimon-python/pypaimon/write/file_store_commit.py index c19667e9b39c..aaeb5105ae91 100644 --- a/paimon-python/pypaimon/write/file_store_commit.py +++ b/paimon-python/pypaimon/write/file_store_commit.py @@ -269,6 +269,25 @@ def _try_commit_once(self, commit_kind: str, logger.warning("Retry commit for exception") return False + def _generate_overwrite_entries(self, latestSnapshot): + """Generate commit entries for OVERWRITE mode based on latest snapshot.""" + entries = [] + current_entries = FullStartingScanner(self.table, self._overwrite_partition_filter, None).plan_files(latestSnapshot) + for entry in current_entries: + entry.kind = 1 # DELETE + entries.append(entry) + for msg in self._overwrite_commit_messages: + partition = GenericRow(list(msg.partition), self.table.partition_keys_fields) + for file in msg.new_files: + entries.append(ManifestEntry( + kind=0, # ADD + partition=partition, + bucket=msg.bucket, + total_buckets=self.table.total_buckets, + file=file + )) + return entries + def _cleanup_preparation_failure(self, manifest_file: Optional[str], delta_manifest_list: Optional[str], base_manifest_list: Optional[str]): @@ -293,25 +312,6 @@ def _cleanup_preparation_failure(self, manifest_file: Optional[str], except Exception as e: logger.warning(f"Failed to clean up temporary files during preparation failure: {e}", exc_info=True) - def 
_generate_overwrite_entries(self, latestSnapshot): - """Generate commit entries for OVERWRITE mode based on latest snapshot.""" - entries = [] - current_entries = FullStartingScanner(self.table, self._overwrite_partition_filter, None).plan_files(latestSnapshot) - for entry in current_entries: - entry.kind = 1 # DELETE - entries.append(entry) - for msg in self._overwrite_commit_messages: - partition = GenericRow(list(msg.partition), self.table.partition_keys_fields) - for file in msg.new_files: - entries.append(ManifestEntry( - kind=0, # ADD - partition=partition, - bucket=msg.bucket, - total_buckets=self.table.total_buckets, - file=file - )) - return entries - def _commit_retry_wait(self, retry_count: int): import threading thread_id = threading.get_ident() From ec6717bbbd34479b8cba9b676e32c0b4ff634357 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Thu, 8 Jan 2026 14:30:02 +0800 Subject: [PATCH 09/24] Fix minus --- paimon-python/pypaimon/write/file_store_commit.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paimon-python/pypaimon/write/file_store_commit.py b/paimon-python/pypaimon/write/file_store_commit.py index aaeb5105ae91..0a648d5997dd 100644 --- a/paimon-python/pypaimon/write/file_store_commit.py +++ b/paimon-python/pypaimon/write/file_store_commit.py @@ -247,7 +247,7 @@ def _try_commit_once(self, commit_kind: str, next_row_id=next_row_id, ) - # Generate partition statistics for the commit + # Generate partition statistics for the commit statistics = self._generate_partition_statistics(commit_entries) except Exception as e: self._cleanup_preparation_failure(new_manifest_file, delta_manifest_list, From d763adbd74a053109a263dc749d4b1fc631cb293 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Thu, 8 Jan 2026 14:30:20 +0800 Subject: [PATCH 10/24] Fix minus --- paimon-python/pypaimon/write/file_store_commit.py | 1 - 1 file changed, 1 deletion(-) diff --git a/paimon-python/pypaimon/write/file_store_commit.py b/paimon-python/pypaimon/write/file_store_commit.py index 0a648d5997dd..0de547b19048 100644 --- a/paimon-python/pypaimon/write/file_store_commit.py +++ b/paimon-python/pypaimon/write/file_store_commit.py @@ -246,7 +246,6 @@ def _try_commit_once(self, commit_kind: str, time_millis=int(time.time() * 1000), next_row_id=next_row_id, ) - # Generate partition statistics for the commit statistics = self._generate_partition_statistics(commit_entries) except Exception as e: From 1c72096ed24632ef36f8d9bd6ee7d5a9f2b6e370 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Thu, 8 Jan 2026 14:31:05 +0800 Subject: [PATCH 11/24] Fix minus --- .../pypaimon/write/file_store_commit.py | 36 +++++++++---------- 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/paimon-python/pypaimon/write/file_store_commit.py b/paimon-python/pypaimon/write/file_store_commit.py index 0de547b19048..9698f556f165 100644 --- a/paimon-python/pypaimon/write/file_store_commit.py +++ b/paimon-python/pypaimon/write/file_store_commit.py @@ -287,6 +287,24 @@ def _generate_overwrite_entries(self, latestSnapshot): )) return entries + def _commit_retry_wait(self, retry_count: int): + import threading + thread_id = threading.get_ident() + + retry_wait_ms = min( + self.commit_min_retry_wait * (2 ** retry_count), + self.commit_max_retry_wait + ) + + jitter_ms = random.randint(0, max(1, int(retry_wait_ms * 0.2))) + total_wait_ms = retry_wait_ms + jitter_ms + + logger.debug( + f"Thread {thread_id}: Waiting {total_wait_ms}ms before retry (base: 
{retry_wait_ms}ms, " + f"jitter: {jitter_ms}ms)" + ) + time.sleep(total_wait_ms / 1000.0) + def _cleanup_preparation_failure(self, manifest_file: Optional[str], delta_manifest_list: Optional[str], base_manifest_list: Optional[str]): @@ -311,24 +329,6 @@ def _cleanup_preparation_failure(self, manifest_file: Optional[str], except Exception as e: logger.warning(f"Failed to clean up temporary files during preparation failure: {e}", exc_info=True) - def _commit_retry_wait(self, retry_count: int): - import threading - thread_id = threading.get_ident() - - retry_wait_ms = min( - self.commit_min_retry_wait * (2 ** retry_count), - self.commit_max_retry_wait - ) - - jitter_ms = random.randint(0, max(1, int(retry_wait_ms * 0.2))) - total_wait_ms = retry_wait_ms + jitter_ms - - logger.debug( - f"Thread {thread_id}: Waiting {total_wait_ms}ms before retry (base: {retry_wait_ms}ms, " - f"jitter: {jitter_ms}ms)" - ) - time.sleep(total_wait_ms / 1000.0) - def abort(self, commit_messages: List[CommitMessage]): """Abort commit and delete files. Uses external_path if available to ensure proper scheme handling.""" for message in commit_messages: From 4a330818a3b5698c9894550bbbbfdf3ce9f04422 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Thu, 8 Jan 2026 14:36:41 +0800 Subject: [PATCH 12/24] Fix comment --- paimon-python/pypaimon/write/file_store_commit.py | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/paimon-python/pypaimon/write/file_store_commit.py b/paimon-python/pypaimon/write/file_store_commit.py index 9698f556f165..92394bb40ac2 100644 --- a/paimon-python/pypaimon/write/file_store_commit.py +++ b/paimon-python/pypaimon/write/file_store_commit.py @@ -249,8 +249,7 @@ def _try_commit_once(self, commit_kind: str, # Generate partition statistics for the commit statistics = self._generate_partition_statistics(commit_entries) except Exception as e: - self._cleanup_preparation_failure(new_manifest_file, delta_manifest_list, - base_manifest_list) + self._cleanup_preparation_failure(delta_manifest_list, base_manifest_list) logger.warning(f"Exception occurs when preparing snapshot: {e}", exc_info=True) raise RuntimeError(f"Failed to prepare snapshot: {e}") @@ -260,8 +259,7 @@ def _try_commit_once(self, commit_kind: str, success = self.snapshot_commit.commit(snapshot_data, self.table.current_branch(), statistics) if not success: logger.warning(f"Atomic commit failed for snapshot #{new_snapshot_id} failed") - self._cleanup_preparation_failure(new_manifest_file, delta_manifest_list, - base_manifest_list) + self._cleanup_preparation_failure(delta_manifest_list, base_manifest_list) return success except Exception: # Commit exception, not sure about the situation and should not clean up the files @@ -305,7 +303,7 @@ def _commit_retry_wait(self, retry_count: int): ) time.sleep(total_wait_ms / 1000.0) - def _cleanup_preparation_failure(self, manifest_file: Optional[str], + def _cleanup_preparation_failure(self, delta_manifest_list: Optional[str], base_manifest_list: Optional[str]): try: @@ -322,10 +320,6 @@ def _cleanup_preparation_failure(self, manifest_file: Optional[str], if base_manifest_list: base_path = f"{manifest_path}/{base_manifest_list}" self.table.file_io.delete_quietly(base_path) - - if manifest_file: - manifest_file_path = f"{self.manifest_file_manager.manifest_path}/{manifest_file}" - self.table.file_io.delete_quietly(manifest_file_path) except Exception as e: logger.warning(f"Failed to clean up temporary files during preparation failure: {e}", 
exc_info=True) From dd7156736ab41dfd0a3802b899fd73537e2ce14a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Thu, 8 Jan 2026 15:11:02 +0800 Subject: [PATCH 13/24] Fix type --- paimon-python/pypaimon/write/file_store_commit.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/paimon-python/pypaimon/write/file_store_commit.py b/paimon-python/pypaimon/write/file_store_commit.py index 92394bb40ac2..9e4047d70ecc 100644 --- a/paimon-python/pypaimon/write/file_store_commit.py +++ b/paimon-python/pypaimon/write/file_store_commit.py @@ -86,7 +86,7 @@ def commit(self, commit_messages: List[CommitMessage], commit_identifier: int): self._try_commit(commit_kind="APPEND", commit_identifier=commit_identifier, - commit_entries_plan=lambda snapshot : commit_entries) + commit_entries_plan=lambda snapshot: commit_entries) def overwrite(self, overwrite_partition, commit_messages: List[CommitMessage], commit_identifier: int): """Commit the given commit messages in overwrite mode.""" @@ -114,7 +114,7 @@ def overwrite(self, overwrite_partition, commit_messages: List[CommitMessage], c self._try_commit( commit_kind="OVERWRITE", commit_identifier=commit_identifier, - commit_entries_plan= lambda snapshot : self._generate_overwrite_entries(snapshot) + commit_entries_plan=lambda snapshot: self._generate_overwrite_entries(snapshot) ) def _try_commit(self, commit_kind, commit_identifier, commit_entries_plan): @@ -269,7 +269,8 @@ def _try_commit_once(self, commit_kind: str, def _generate_overwrite_entries(self, latestSnapshot): """Generate commit entries for OVERWRITE mode based on latest snapshot.""" entries = [] - current_entries = FullStartingScanner(self.table, self._overwrite_partition_filter, None).plan_files(latestSnapshot) + current_entries = (FullStartingScanner(self.table, self._overwrite_partition_filter, None) + .plan_files(latestSnapshot)) for entry in current_entries: entry.kind = 1 # DELETE entries.append(entry) From 45151f754dd6cbdd4a6cdbb937e610a9fc9331de Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Thu, 8 Jan 2026 15:34:49 +0800 Subject: [PATCH 14/24] Fix comment --- .../pypaimon/read/scanner/full_starting_scanner.py | 6 +++--- paimon-python/pypaimon/write/file_store_commit.py | 5 +++-- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/paimon-python/pypaimon/read/scanner/full_starting_scanner.py b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py index fecd7a3b8d8c..3765baffa67f 100755 --- a/paimon-python/pypaimon/read/scanner/full_starting_scanner.py +++ b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py @@ -79,7 +79,7 @@ def schema_fields_func(schema_id: int): ) def scan(self) -> Plan: - file_entries = self.plan_files(None) + file_entries = self.plan_files() if not file_entries: return Plan([]) @@ -104,8 +104,8 @@ def scan(self) -> Plan: splits = self._apply_push_down_limit(splits) return Plan(splits) - def plan_files(self, latestSnapshot) -> List[ManifestEntry]: - latest_snapshot = self.snapshot_manager.get_latest_snapshot() if latestSnapshot is None else latestSnapshot + def plan_files(self) -> List[ManifestEntry]: + latest_snapshot = self.snapshot_manager.get_latest_snapshot() if not latest_snapshot: return [] manifest_files = self.manifest_list_manager.read_all(latest_snapshot) diff --git a/paimon-python/pypaimon/write/file_store_commit.py b/paimon-python/pypaimon/write/file_store_commit.py index 9e4047d70ecc..83f8b58c71fe 100644 --- a/paimon-python/pypaimon/write/file_store_commit.py +++ 
b/paimon-python/pypaimon/write/file_store_commit.py @@ -269,8 +269,9 @@ def _try_commit_once(self, commit_kind: str, def _generate_overwrite_entries(self, latestSnapshot): """Generate commit entries for OVERWRITE mode based on latest snapshot.""" entries = [] - current_entries = (FullStartingScanner(self.table, self._overwrite_partition_filter, None) - .plan_files(latestSnapshot)) + current_entries = [] if latestSnapshot is None \ + else (FullStartingScanner(self.table, self._overwrite_partition_filter, None). + read_manifest_entries(self.manifest_list_manager.read_all(latestSnapshot))) for entry in current_entries: entry.kind = 1 # DELETE entries.append(entry) From cf016c2f8e6813b30e5a5cc1efa11e6b3a2126e4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Thu, 8 Jan 2026 17:18:18 +0800 Subject: [PATCH 15/24] Fix comment --- .../pypaimon/tests/reader_append_only_test.py | 55 +++++++++++++++ .../pypaimon/write/file_store_commit.py | 68 +++++++++++++++++-- 2 files changed, 116 insertions(+), 7 deletions(-) diff --git a/paimon-python/pypaimon/tests/reader_append_only_test.py b/paimon-python/pypaimon/tests/reader_append_only_test.py index b47f5d1f67ff..ba66fa85acea 100644 --- a/paimon-python/pypaimon/tests/reader_append_only_test.py +++ b/paimon-python/pypaimon/tests/reader_append_only_test.py @@ -21,6 +21,7 @@ import tempfile import time import unittest +from typing import Optional import numpy as np import pandas as pd @@ -28,7 +29,11 @@ from pypaimon import CatalogFactory, Schema from pypaimon.common.options.core_options import CoreOptions +from pypaimon.manifest.schema.manifest_entry import ManifestEntry +from pypaimon.snapshot.snapshot import BATCH_COMMIT_IDENTIFIER from pypaimon.snapshot.snapshot_manager import SnapshotManager +from pypaimon.table.row.generic_row import GenericRow +from pypaimon.write.file_store_commit import RetryResult class AoReaderTest(unittest.TestCase): @@ -153,6 +158,56 @@ def test_append_only_multi_write_once_commit(self): actual = self._read_test_table(read_builder).sort_by('user_id') self.assertEqual(actual, self.expected) + def test_commit_retry_filter(self): + schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt']) + self.catalog.create_table('default.test_append_only_multi_once_commit', schema, False) + table = self.catalog.get_table('default.test_append_only_multi_once_commit') + write_builder = table.new_batch_write_builder() + + table_write = write_builder.new_write() + table_commit = write_builder.new_commit() + data1 = { + 'user_id': [1, 2, 3, 4], + 'item_id': [1001, 1002, 1003, 1004], + 'behavior': ['a', 'b', 'c', None], + 'dt': ['p1', 'p1', 'p2', 'p1'], + } + pa_table1 = pa.Table.from_pydict(data1, schema=self.pa_schema) + data2 = { + 'user_id': [5, 6, 7, 8], + 'item_id': [1005, 1006, 1007, 1008], + 'behavior': ['e', 'f', 'g', 'h'], + 'dt': ['p2', 'p1', 'p2', 'p2'], + } + pa_table2 = pa.Table.from_pydict(data2, schema=self.pa_schema) + + table_write.write_arrow(pa_table1) + table_write.write_arrow(pa_table2) + + messages = table_write.prepare_commit() + table_commit.commit(messages) + table_write.close() + + snapshot_manager = SnapshotManager(table) + latest_snapshot = snapshot_manager.get_latest_snapshot() + commit_entries = [] + for msg in messages: + partition = GenericRow(list(msg.partition), table.partition_keys_fields) + for file in msg.new_files: + commit_entries.append(ManifestEntry( + kind=0, + partition=partition, + bucket=msg.bucket, + total_buckets=table.total_buckets, + file=file + )) + # mock retry + 
table_commit.file_store_commit._try_commit_once(RetryResult(None), "APPEND", commit_entries, BATCH_COMMIT_IDENTIFIER, latest_snapshot) + table_commit.close() + read_builder = table.new_read_builder() + actual = self._read_test_table(read_builder).sort_by('user_id') + self.assertEqual(actual, self.expected) + def test_over_1000_cols_read(self): num_rows = 1 num_cols = 10 diff --git a/paimon-python/pypaimon/write/file_store_commit.py b/paimon-python/pypaimon/write/file_store_commit.py index 83f8b58c71fe..ed148e060444 100644 --- a/paimon-python/pypaimon/write/file_store_commit.py +++ b/paimon-python/pypaimon/write/file_store_commit.py @@ -40,6 +40,30 @@ logger = logging.getLogger(__name__) +class CommitResult: + """Base class for commit results.""" + + def is_success(self) -> bool: + """Returns True if commit was successful.""" + raise NotImplementedError + + +class SuccessResult(CommitResult): + """Result indicating successful commit.""" + + def is_success(self) -> bool: + return True + + +class RetryResult(CommitResult): + + def __init__(self, latest_snapshot, exception: Optional[Exception] = None): + self.latest_snapshot = latest_snapshot + self.exception = exception + + def is_success(self) -> bool: + return False + class FileStoreCommit: """ Core commit logic for file store operations. @@ -121,6 +145,7 @@ def _try_commit(self, commit_kind, commit_identifier, commit_entries_plan): import threading retry_count = 0 + retry_result = None start_time_ms = int(time.time() * 1000) thread_id = threading.current_thread().name while True: @@ -128,19 +153,22 @@ def _try_commit(self, commit_kind, commit_identifier, commit_entries_plan): commit_entries = commit_entries_plan(latest_snapshot) result = self._try_commit_once( + retry_result=retry_result, commit_kind=commit_kind, commit_entries=commit_entries, commit_identifier=commit_identifier, latest_snapshot=latest_snapshot ) - if result: + if result.is_success(): logger.info( f"Thread {thread_id}: commit success {latest_snapshot.id + 1 if latest_snapshot else 1} " f"after {retry_count} retries" ) break + retry_result = result + elapsed_ms = int(time.time() * 1000) - start_time_ms if elapsed_ms > self.commit_timeout or retry_count >= self.commit_max_retries: error_msg = ( @@ -148,14 +176,33 @@ def _try_commit(self, commit_kind, commit_identifier, commit_entries_plan): f"after {elapsed_ms} millis with {retry_count} retries, " f"there maybe exist commit conflicts between multiple jobs." 
) - raise RuntimeError(error_msg) + if retry_result.exception: + raise RuntimeError(error_msg) from retry_result.exception + else: + raise RuntimeError(error_msg) self._commit_retry_wait(retry_count) retry_count += 1 - def _try_commit_once(self, commit_kind: str, + def _try_commit_once(self, retry_result: Optional[RetryResult], commit_kind: str, commit_entries: List[ManifestEntry], commit_identifier: int, - latest_snapshot: Optional[Snapshot]) -> bool: + latest_snapshot: Optional[Snapshot]) -> CommitResult: + if retry_result is not None and latest_snapshot is not None: + start_check_snapshot_id = 1 # Snapshot.FIRST_SNAPSHOT_ID + if retry_result.latest_snapshot is not None: + start_check_snapshot_id = retry_result.latest_snapshot.id + 1 + + for snapshot_id in range(start_check_snapshot_id, latest_snapshot.id + 2): + snapshot = self.snapshot_manager.get_snapshot_by_id(snapshot_id) + if (snapshot and snapshot.commit_user == self.commit_user and + snapshot.commit_identifier == commit_identifier and + snapshot.commit_kind == commit_kind): + logger.info( + f"Commit already completed (snapshot {snapshot_id}), " + f"user: {self.commit_user}, identifier: {commit_identifier}" + ) + return SuccessResult() + unique_id = uuid.uuid4() base_manifest_list = f"manifest-list-{unique_id}-0" delta_manifest_list = f"manifest-list-{unique_id}-1" @@ -260,11 +307,18 @@ def _try_commit_once(self, commit_kind: str, if not success: logger.warning(f"Atomic commit failed for snapshot #{new_snapshot_id} failed") self._cleanup_preparation_failure(delta_manifest_list, base_manifest_list) - return success - except Exception: + return RetryResult(latest_snapshot, None) + except Exception as e: # Commit exception, not sure about the situation and should not clean up the files logger.warning("Retry commit for exception") - return False + return RetryResult(latest_snapshot, e) + + logger.warning( + f"Successfully commit snapshot {new_snapshot_id} to table {self.table.identifier} " + f"for snapshot-{new_snapshot_id} by user {self.commit_user} " + + f"with identifier {commit_identifier} and kind {commit_kind}." 
+ ) + return SuccessResult() def _generate_overwrite_entries(self, latestSnapshot): """Generate commit entries for OVERWRITE mode based on latest snapshot.""" From 6d89f40bfe112e13e9145e64894be0e7ee58a3f6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Thu, 8 Jan 2026 17:21:28 +0800 Subject: [PATCH 16/24] Fix comment --- paimon-python/pypaimon/tests/reader_append_only_test.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/paimon-python/pypaimon/tests/reader_append_only_test.py b/paimon-python/pypaimon/tests/reader_append_only_test.py index ba66fa85acea..b5998cbe3709 100644 --- a/paimon-python/pypaimon/tests/reader_append_only_test.py +++ b/paimon-python/pypaimon/tests/reader_append_only_test.py @@ -202,7 +202,8 @@ def test_commit_retry_filter(self): file=file )) # mock retry - table_commit.file_store_commit._try_commit_once(RetryResult(None), "APPEND", commit_entries, BATCH_COMMIT_IDENTIFIER, latest_snapshot) + success = table_commit.file_store_commit._try_commit_once(RetryResult(None), "APPEND", commit_entries, BATCH_COMMIT_IDENTIFIER, latest_snapshot) + self.assertTrue(success.is_success()) table_commit.close() read_builder = table.new_read_builder() actual = self._read_test_table(read_builder).sort_by('user_id') From aedf9767e5317dc3d35951cc162e7cea13fb3bfc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Thu, 8 Jan 2026 18:04:31 +0800 Subject: [PATCH 17/24] Fix comment --- .../pypaimon/write/file_store_commit.py | 48 ++++++++++--------- 1 file changed, 25 insertions(+), 23 deletions(-) diff --git a/paimon-python/pypaimon/write/file_store_commit.py b/paimon-python/pypaimon/write/file_store_commit.py index ed148e060444..f08bcedb8244 100644 --- a/paimon-python/pypaimon/write/file_store_commit.py +++ b/paimon-python/pypaimon/write/file_store_commit.py @@ -132,13 +132,10 @@ def overwrite(self, overwrite_partition, commit_messages: List[CommitMessage], c raise RuntimeError(f"Trying to overwrite partition {overwrite_partition}, but the changes " f"in {msg.partition} does not belong to this partition") - self._overwrite_partition_filter = partition_filter - self._overwrite_commit_messages = commit_messages - self._try_commit( commit_kind="OVERWRITE", commit_identifier=commit_identifier, - commit_entries_plan=lambda snapshot: self._generate_overwrite_entries(snapshot) + commit_entries_plan=lambda snapshot: self._generate_overwrite_entries(snapshot, partition_filter, commit_messages) ) def _try_commit(self, commit_kind, commit_identifier, commit_entries_plan): @@ -187,22 +184,9 @@ def _try_commit(self, commit_kind, commit_identifier, commit_entries_plan): def _try_commit_once(self, retry_result: Optional[RetryResult], commit_kind: str, commit_entries: List[ManifestEntry], commit_identifier: int, latest_snapshot: Optional[Snapshot]) -> CommitResult: - if retry_result is not None and latest_snapshot is not None: - start_check_snapshot_id = 1 # Snapshot.FIRST_SNAPSHOT_ID - if retry_result.latest_snapshot is not None: - start_check_snapshot_id = retry_result.latest_snapshot.id + 1 - - for snapshot_id in range(start_check_snapshot_id, latest_snapshot.id + 2): - snapshot = self.snapshot_manager.get_snapshot_by_id(snapshot_id) - if (snapshot and snapshot.commit_user == self.commit_user and - snapshot.commit_identifier == commit_identifier and - snapshot.commit_kind == commit_kind): - logger.info( - f"Commit already completed (snapshot {snapshot_id}), " - f"user: {self.commit_user}, identifier: {commit_identifier}" - ) - return SuccessResult() 
-
+        if self._duplicate_commit(retry_result, latest_snapshot, commit_identifier, commit_kind):
+            return SuccessResult()
+
         unique_id = uuid.uuid4()
         base_manifest_list = f"manifest-list-{unique_id}-0"
         delta_manifest_list = f"manifest-list-{unique_id}-1"
@@ -320,16 +304,34 @@ def _try_commit_once(self, retry_result: Optional[RetryResult], commit_kind: str
         )
         return SuccessResult()
 
-    def _generate_overwrite_entries(self, latestSnapshot):
+    def _duplicate_commit(self, retry_result, latest_snapshot, commit_identifier, commit_kind) -> bool:
+        if retry_result is not None and latest_snapshot is not None:
+            start_check_snapshot_id = 1  # Snapshot.FIRST_SNAPSHOT_ID
+            if retry_result.latest_snapshot is not None:
+                start_check_snapshot_id = retry_result.latest_snapshot.id + 1
+
+            for snapshot_id in range(start_check_snapshot_id, latest_snapshot.id + 2):
+                snapshot = self.snapshot_manager.get_snapshot_by_id(snapshot_id)
+                if (snapshot and snapshot.commit_user == self.commit_user and
+                        snapshot.commit_identifier == commit_identifier and
+                        snapshot.commit_kind == commit_kind):
+                    logger.info(
+                        f"Commit already completed (snapshot {snapshot_id}), "
+                        f"user: {self.commit_user}, identifier: {commit_identifier}"
+                    )
+                    return True
+        return False
+
+    def _generate_overwrite_entries(self, latestSnapshot, partition_filter, commit_messages):
         """Generate commit entries for OVERWRITE mode based on latest snapshot."""
         entries = []
         current_entries = [] if latestSnapshot is None \
-            else (FullStartingScanner(self.table, self._overwrite_partition_filter, None).
+            else (FullStartingScanner(self.table, partition_filter, None).
                   read_manifest_entries(self.manifest_list_manager.read_all(latestSnapshot)))
         for entry in current_entries:
             entry.kind = 1  # DELETE
             entries.append(entry)
-        for msg in self._overwrite_commit_messages:
+        for msg in commit_messages:
             partition = GenericRow(list(msg.partition), self.table.partition_keys_fields)
             for file in msg.new_files:
                 entries.append(ManifestEntry(

From d363b0146321fc9604648c45e31a841c70d5d86d Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?=
Date: Thu, 8 Jan 2026 18:22:32 +0800
Subject: [PATCH 18/24] Fix comment

---
 paimon-python/pypaimon/write/file_store_commit.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/paimon-python/pypaimon/write/file_store_commit.py b/paimon-python/pypaimon/write/file_store_commit.py
index f08bcedb8244..57b4420b3cc9 100644
--- a/paimon-python/pypaimon/write/file_store_commit.py
+++ b/paimon-python/pypaimon/write/file_store_commit.py
@@ -310,7 +310,7 @@ def _duplicate_commit(self, retry_result, latest_snapshot, commit_identifier, co
             if retry_result.latest_snapshot is not None:
                 start_check_snapshot_id = retry_result.latest_snapshot.id + 1
 
-            for snapshot_id in range(start_check_snapshot_id, latest_snapshot.id + 2):
+            for snapshot_id in range(start_check_snapshot_id, latest_snapshot.id + 1):
                 snapshot = self.snapshot_manager.get_snapshot_by_id(snapshot_id)
                 if (snapshot and snapshot.commit_user == self.commit_user and
                         snapshot.commit_identifier == commit_identifier and

From c1cbe9576f278efb46df784184f58b9fde9427ec Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?=
Date: Thu, 8 Jan 2026 18:39:12 +0800
Subject: [PATCH 19/24] Fix typo

---
 paimon-python/pypaimon/tests/reader_append_only_test.py | 4 ++--
 paimon-python/pypaimon/write/file_store_commit.py       | 4 +++-
 2 files changed, 5 insertions(+), 3 deletions(-)

diff --git a/paimon-python/pypaimon/tests/reader_append_only_test.py b/paimon-python/pypaimon/tests/reader_append_only_test.py
index b5998cbe3709..3de622a78dae 100644
--- a/paimon-python/pypaimon/tests/reader_append_only_test.py
+++ b/paimon-python/pypaimon/tests/reader_append_only_test.py
@@ -21,7 +21,6 @@
 import tempfile
 import time
 import unittest
-from typing import Optional
 
 import numpy as np
 import pandas as pd
@@ -202,7 +201,8 @@ def test_commit_retry_filter(self):
                 file=file
             ))
         # mock retry
-        success = table_commit.file_store_commit._try_commit_once(RetryResult(None), "APPEND", commit_entries, BATCH_COMMIT_IDENTIFIER, latest_snapshot)
+        success = table_commit.file_store_commit._try_commit_once(RetryResult(None), "APPEND",
+                                                                  commit_entries, BATCH_COMMIT_IDENTIFIER, latest_snapshot)
         self.assertTrue(success.is_success())
         table_commit.close()
         read_builder = table.new_read_builder()
diff --git a/paimon-python/pypaimon/write/file_store_commit.py b/paimon-python/pypaimon/write/file_store_commit.py
index 57b4420b3cc9..6a0ba66eaa6f 100644
--- a/paimon-python/pypaimon/write/file_store_commit.py
+++ b/paimon-python/pypaimon/write/file_store_commit.py
@@ -64,6 +64,7 @@ def __init__(self, latest_snapshot, exception: Optional[Exception] = None):
     def is_success(self) -> bool:
         return False
 
+
 class FileStoreCommit:
     """
     Core commit logic for file store operations.
@@ -135,7 +136,8 @@ def overwrite(self, overwrite_partition, commit_messages: List[CommitMessage], c
         self._try_commit(
             commit_kind="OVERWRITE",
             commit_identifier=commit_identifier,
-            commit_entries_plan=lambda snapshot: self._generate_overwrite_entries(snapshot, partition_filter, commit_messages)
+            commit_entries_plan=lambda snapshot: self._generate_overwrite_entries(
+                snapshot, partition_filter, commit_messages)
         )
 
     def _try_commit(self, commit_kind, commit_identifier, commit_entries_plan):

From 84b39022f15bbde763f73773a0b1461070600bd3 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?=
Date: Thu, 8 Jan 2026 18:47:38 +0800
Subject: [PATCH 20/24] Fix minus

---
 paimon-python/pypaimon/write/file_store_commit.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/paimon-python/pypaimon/write/file_store_commit.py b/paimon-python/pypaimon/write/file_store_commit.py
index 6a0ba66eaa6f..4e62471d8c47 100644
--- a/paimon-python/pypaimon/write/file_store_commit.py
+++ b/paimon-python/pypaimon/write/file_store_commit.py
@@ -306,7 +306,7 @@ def _try_commit_once(self, retry_result: Optional[RetryResult], commit_kind: str
         )
         return SuccessResult()
 
-    def _duplicate_commit(self, retry_result, latest_snapshot, commit_identifier, commit_kind) -> bool:
+    def _is_duplicate_commit(self, retry_result, latest_snapshot, commit_identifier, commit_kind) -> bool:
         if retry_result is not None and latest_snapshot is not None:
             start_check_snapshot_id = 1  # Snapshot.FIRST_SNAPSHOT_ID
             if retry_result.latest_snapshot is not None:
                 start_check_snapshot_id = retry_result.latest_snapshot.id + 1

From af9395189bfe4b43d6f5e0c2c9e259468afd682e Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?=
Date: Thu, 8 Jan 2026 18:48:11 +0800
Subject: [PATCH 21/24] Fix minus

---
 paimon-python/pypaimon/write/file_store_commit.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/paimon-python/pypaimon/write/file_store_commit.py b/paimon-python/pypaimon/write/file_store_commit.py
index 4e62471d8c47..6c65c5173ab2 100644
--- a/paimon-python/pypaimon/write/file_store_commit.py
+++ b/paimon-python/pypaimon/write/file_store_commit.py
@@ -186,7 +186,7 @@ def _try_commit(self, commit_kind, commit_identifier, commit_entries_plan):
 
     def _try_commit_once(self, retry_result: Optional[RetryResult], commit_kind: str, commit_entries: List[ManifestEntry],
                          commit_identifier: int, latest_snapshot: Optional[Snapshot]) -> CommitResult:
-        if self._duplicate_commit(retry_result, latest_snapshot, commit_identifier, commit_kind):
+        if self._is_duplicate_commit(retry_result, latest_snapshot, commit_identifier, commit_kind):
             return SuccessResult()
 
         unique_id = uuid.uuid4()

From b08322b029859a7b0adab69637f9186736cae669 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?=
Date: Thu, 8 Jan 2026 19:15:14 +0800
Subject: [PATCH 22/24] Fix minus

---
 paimon-python/pypaimon/tests/reader_append_only_test.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/paimon-python/pypaimon/tests/reader_append_only_test.py b/paimon-python/pypaimon/tests/reader_append_only_test.py
index 3de622a78dae..aa4ae7c47249 100644
--- a/paimon-python/pypaimon/tests/reader_append_only_test.py
+++ b/paimon-python/pypaimon/tests/reader_append_only_test.py
@@ -201,8 +201,8 @@ def test_commit_retry_filter(self):
                 file=file
             ))
         # mock retry
-        success = table_commit.file_store_commit._try_commit_once(RetryResult(None), "APPEND",
-                                                                  commit_entries, BATCH_COMMIT_IDENTIFIER, latest_snapshot)
+        success = (table_commit.file_store_commit.
+                   _try_commit_once(RetryResult(None), "APPEND", commit_entries, BATCH_COMMIT_IDENTIFIER, latest_snapshot))
         self.assertTrue(success.is_success())
         table_commit.close()
         read_builder = table.new_read_builder()

From 2137e1bf4165409627ec2ec1a3b28d31943e1fca Mon Sep 17 00:00:00 2001
From: JingsongLi
Date: Thu, 8 Jan 2026 21:41:28 +0800
Subject: [PATCH 23/24] fix

---
 paimon-python/pypaimon/tests/reader_append_only_test.py | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/paimon-python/pypaimon/tests/reader_append_only_test.py b/paimon-python/pypaimon/tests/reader_append_only_test.py
index aa4ae7c47249..1fe1ed6a7bb6 100644
--- a/paimon-python/pypaimon/tests/reader_append_only_test.py
+++ b/paimon-python/pypaimon/tests/reader_append_only_test.py
@@ -201,8 +201,12 @@ def test_commit_retry_filter(self):
                 file=file
             ))
         # mock retry
-        success = (table_commit.file_store_commit.
-                   _try_commit_once(RetryResult(None), "APPEND", commit_entries, BATCH_COMMIT_IDENTIFIER, latest_snapshot))
+        success = table_commit.file_store_commit._try_commit_once(
+            RetryResult(None),
+            "APPEND",
+            commit_entries,
+            BATCH_COMMIT_IDENTIFIER,
+            latest_snapshot)
         self.assertTrue(success.is_success())
         table_commit.close()
         read_builder = table.new_read_builder()

From 37a87d01654a9c00b9fa92d9366eed77dffe2fa7 Mon Sep 17 00:00:00 2001
From: JingsongLi
Date: Thu, 8 Jan 2026 22:06:53 +0800
Subject: [PATCH 24/24] fix

---
 paimon-python/pypaimon/tests/reader_append_only_test.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/paimon-python/pypaimon/tests/reader_append_only_test.py b/paimon-python/pypaimon/tests/reader_append_only_test.py
index 1fe1ed6a7bb6..d65658ef5c55 100644
--- a/paimon-python/pypaimon/tests/reader_append_only_test.py
+++ b/paimon-python/pypaimon/tests/reader_append_only_test.py
@@ -159,8 +159,8 @@ def test_append_only_multi_write_once_commit(self):
 
     def test_commit_retry_filter(self):
         schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'])
-        self.catalog.create_table('default.test_append_only_multi_once_commit', schema, False)
-        table = self.catalog.get_table('default.test_append_only_multi_once_commit')
+        self.catalog.create_table('default.test_commit_retry_filter', schema, False)
+        table = self.catalog.get_table('default.test_commit_retry_filter')
         write_builder = table.new_batch_write_builder()
         table_write = write_builder.new_write()