diff --git a/paimon-python/pypaimon/common/file_io.py b/paimon-python/pypaimon/common/file_io.py
index 2ec1909306e4..556d9e9ae798 100644
--- a/paimon-python/pypaimon/common/file_io.py
+++ b/paimon-python/pypaimon/common/file_io.py
@@ -18,7 +18,6 @@
 import logging
 import os
 import subprocess
-import threading
 import uuid
 from pathlib import Path
 from typing import Any, Dict, List, Optional
@@ -26,11 +25,12 @@
 import pyarrow
 from packaging.version import parse
-from pyarrow._fs import FileSystem, LocalFileSystem
+from pyarrow._fs import FileSystem
 
 from pypaimon.common.options import Options
 from pypaimon.common.options.config import OssOptions, S3Options
 from pypaimon.common.uri_reader import UriReaderFactory
+from pypaimon.filesystem.local import PaimonLocalFileSystem
 from pypaimon.schema.data_types import DataField, AtomicType, PyarrowFieldParser
 from pypaimon.table.row.blob import BlobData, BlobDescriptor, Blob
 from pypaimon.table.row.generic_row import GenericRow
@@ -39,8 +39,6 @@
 
 
 class FileIO:
-    rename_lock = threading.Lock()
-
     def __init__(self, path: str, catalog_options: Options):
         self.properties = catalog_options
         self.logger = logging.getLogger(__name__)
@@ -183,9 +181,8 @@ def _initialize_hdfs_fs(self, scheme: str, netloc: Optional[str]) -> FileSystem:
         )
 
     def _initialize_local_fs(self) -> FileSystem:
-        from pyarrow.fs import LocalFileSystem
-
-        return LocalFileSystem()
+        return PaimonLocalFileSystem()
 
     def new_input_stream(self, path: str):
         path_str = self.to_filesystem_path(path)
@@ -255,15 +252,7 @@ def rename(self, src: str, dst: str) -> bool:
                 self.mkdirs(str(dst_parent))
 
             src_str = self.to_filesystem_path(src)
-            if isinstance(self.filesystem, LocalFileSystem):
-                if self.exists(dst):
-                    return False
-                with FileIO.rename_lock:
-                    if self.exists(dst):
-                        return False
-                    self.filesystem.move(src_str, dst_str)
-            else:
-                self.filesystem.move(src_str, dst_str)
+            self.filesystem.move(src_str, dst_str)
             return True
         except Exception as e:
             self.logger.warning(f"Failed to rename {src} to {dst}: {e}")
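
Note on the rename() change above: the local-filesystem special case moves out of FileIO and into PaimonLocalFileSystem (added below), whose move() raises when the destination already exists. The caller-visible contract of rename() is unchanged: it still returns False instead of clobbering the target, because the raised exception is swallowed by the existing except block. A minimal sketch of the expected behavior, assuming a FileIO backed by a local warehouse directory (paths are illustrative, not from this PR):

    # Sketch only: 'file_io' is a FileIO constructed over a local path.
    assert file_io.rename("/tmp/wh/a", "/tmp/wh/dst")       # dst absent: returns True
    # dst now exists, so PaimonLocalFileSystem.move() raises FileExistsError;
    # rename() logs a warning and reports failure instead of overwriting.
    assert not file_io.rename("/tmp/wh/b", "/tmp/wh/dst")   # returns False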
diff --git a/paimon-python/pypaimon/filesystem/local.py b/paimon-python/pypaimon/filesystem/local.py
new file mode 100644
index 000000000000..c845f8547cce
--- /dev/null
+++ b/paimon-python/pypaimon/filesystem/local.py
@@ -0,0 +1,34 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import threading
+
+import pyarrow.fs
+from pyarrow._fs import LocalFileSystem
+
+
+class PaimonLocalFileSystem(LocalFileSystem):
+    # A LocalFileSystem whose move() refuses to overwrite an existing
+    # destination; the process-wide lock makes the check-and-move atomic.
+    rename_lock = threading.Lock()
+
+    def move(self, src, dst):
+        with PaimonLocalFileSystem.rename_lock:
+            file_info = self.get_file_info([dst])[0]
+            if file_info.type != pyarrow.fs.FileType.NotFound:
+                raise FileExistsError(f"Target file already exists: {dst}")
+            super().move(src, dst)
diff --git a/paimon-python/pypaimon/tests/reader_append_only_test.py b/paimon-python/pypaimon/tests/reader_append_only_test.py
index b47f5d1f67ff..d65658ef5c55 100644
--- a/paimon-python/pypaimon/tests/reader_append_only_test.py
+++ b/paimon-python/pypaimon/tests/reader_append_only_test.py
@@ -28,7 +28,11 @@
 from pypaimon import CatalogFactory, Schema
 from pypaimon.common.options.core_options import CoreOptions
+from pypaimon.manifest.schema.manifest_entry import ManifestEntry
+from pypaimon.snapshot.snapshot import BATCH_COMMIT_IDENTIFIER
 from pypaimon.snapshot.snapshot_manager import SnapshotManager
+from pypaimon.table.row.generic_row import GenericRow
+from pypaimon.write.file_store_commit import RetryResult
 
 
 class AoReaderTest(unittest.TestCase):
@@ -153,6 +157,62 @@ def test_append_only_multi_write_once_commit(self):
         actual = self._read_test_table(read_builder).sort_by('user_id')
         self.assertEqual(actual, self.expected)
 
+    def test_commit_retry_filter(self):
+        schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'])
+        self.catalog.create_table('default.test_commit_retry_filter', schema, False)
+        table = self.catalog.get_table('default.test_commit_retry_filter')
+        write_builder = table.new_batch_write_builder()
+
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+        data1 = {
+            'user_id': [1, 2, 3, 4],
+            'item_id': [1001, 1002, 1003, 1004],
+            'behavior': ['a', 'b', 'c', None],
+            'dt': ['p1', 'p1', 'p2', 'p1'],
+        }
+        pa_table1 = pa.Table.from_pydict(data1, schema=self.pa_schema)
+        data2 = {
+            'user_id': [5, 6, 7, 8],
+            'item_id': [1005, 1006, 1007, 1008],
+            'behavior': ['e', 'f', 'g', 'h'],
+            'dt': ['p2', 'p1', 'p2', 'p2'],
+        }
+        pa_table2 = pa.Table.from_pydict(data2, schema=self.pa_schema)
+
+        table_write.write_arrow(pa_table1)
+        table_write.write_arrow(pa_table2)
+
+        messages = table_write.prepare_commit()
+        table_commit.commit(messages)
+        table_write.close()
+
+        snapshot_manager = SnapshotManager(table)
+        latest_snapshot = snapshot_manager.get_latest_snapshot()
+        commit_entries = []
+        for msg in messages:
+            partition = GenericRow(list(msg.partition), table.partition_keys_fields)
+            for file in msg.new_files:
+                commit_entries.append(ManifestEntry(
+                    kind=0,
+                    partition=partition,
+                    bucket=msg.bucket,
+                    total_buckets=table.total_buckets,
+                    file=file
+                ))
+        # Replay the same commit to simulate a retry: the duplicate-commit
+        # filter should recognize it and succeed without a new snapshot.
+        result = table_commit.file_store_commit._try_commit_once(
+            RetryResult(None),
+            "APPEND",
+            commit_entries,
+            BATCH_COMMIT_IDENTIFIER,
+            latest_snapshot)
+        self.assertTrue(result.is_success())
+        table_commit.close()
+
+        read_builder = table.new_read_builder()
+        actual = self._read_test_table(read_builder).sort_by('user_id')
+        self.assertEqual(actual, self.expected)
+
     def test_over_1000_cols_read(self):
         num_rows = 1
         num_cols = 10
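
The test above exercises the retry path end to end. The snippet below is a separate, minimal sketch of the per-process guarantee PaimonLocalFileSystem is meant to provide: when two threads race to rename different sources onto the same destination, exactly one move() succeeds and the loser gets an exception instead of silently overwriting. All paths and helper names here are illustrative, not part of this PR:

    import os
    import tempfile
    import threading

    from pypaimon.filesystem.local import PaimonLocalFileSystem

    fs = PaimonLocalFileSystem()
    tmp = tempfile.mkdtemp()
    for name in ("src-a", "src-b"):
        with open(os.path.join(tmp, name), "w") as f:
            f.write(name)

    winners = []

    def racer(src):
        try:
            fs.move(os.path.join(tmp, src), os.path.join(tmp, "dst"))
            winners.append(src)
        except Exception:
            pass  # lost the race: the destination already existed

    threads = [threading.Thread(target=racer, args=(n,)) for n in ("src-a", "src-b")]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    assert len(winners) == 1  # exactly one rename wins; nothing is overwritten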
diff --git a/paimon-python/pypaimon/write/file_store_commit.py b/paimon-python/pypaimon/write/file_store_commit.py
index e55e25f7c830..6c65c5173ab2 100644
--- a/paimon-python/pypaimon/write/file_store_commit.py
+++ b/paimon-python/pypaimon/write/file_store_commit.py
@@ -110,8 +110,8 @@ def commit(self, commit_messages: List[CommitMessage], commit_identifier: int):
             ))
 
         self._try_commit(commit_kind="APPEND",
-                         commit_entries=commit_entries,
-                         commit_identifier=commit_identifier)
+                         commit_identifier=commit_identifier,
+                         commit_entries_plan=lambda snapshot: commit_entries)
 
     def overwrite(self, overwrite_partition, commit_messages: List[CommitMessage], commit_identifier: int):
         """Commit the given commit messages in overwrite mode."""
@@ -133,16 +133,14 @@ def overwrite(self, overwrite_partition, commit_messages: List[CommitMessage], commit_identifier: int):
                 raise RuntimeError(f"Trying to overwrite partition {overwrite_partition}, but the changes "
                                    f"in {msg.partition} does not belong to this partition")
 
-        self._overwrite_partition_filter = partition_filter
-        self._overwrite_commit_messages = commit_messages
-
         self._try_commit(
             commit_kind="OVERWRITE",
-            commit_entries=None,  # Will be generated in _try_commit based on latest snapshot
-            commit_identifier=commit_identifier
+            commit_identifier=commit_identifier,
+            commit_entries_plan=lambda snapshot: self._generate_overwrite_entries(
+                snapshot, partition_filter, commit_messages)
         )
 
-    def _try_commit(self, commit_kind, commit_entries, commit_identifier):
+    def _try_commit(self, commit_kind, commit_identifier, commit_entries_plan):
         import threading
 
         retry_count = 0
@@ -151,9 +149,7 @@ def _try_commit(self, commit_kind, commit_identifier, commit_entries_plan):
         thread_id = threading.current_thread().name
         while True:
             latest_snapshot = self.snapshot_manager.get_latest_snapshot()
-
-            if commit_kind == "OVERWRITE":
-                commit_entries = self._generate_overwrite_entries()
+            commit_entries = commit_entries_plan(latest_snapshot)
 
             result = self._try_commit_once(
                 retry_result=retry_result,
@@ -164,7 +160,7 @@
             )
 
             if result.is_success():
-                logger.warning(
+                logger.info(
                     f"Thread {thread_id}: commit success {latest_snapshot.id + 1 if latest_snapshot else 1} "
                     f"after {retry_count} retries"
                 )
@@ -190,24 +186,9 @@
     def _try_commit_once(self, retry_result: Optional[RetryResult], commit_kind: str,
                          commit_entries: List[ManifestEntry], commit_identifier: int,
                          latest_snapshot: Optional[Snapshot]) -> CommitResult:
-        start_time_ms = int(time.time() * 1000)
-
-        if retry_result is not None and latest_snapshot is not None:
-            start_check_snapshot_id = 1  # Snapshot.FIRST_SNAPSHOT_ID
-            if retry_result.latest_snapshot is not None:
-                start_check_snapshot_id = retry_result.latest_snapshot.id + 1
-
-            for snapshot_id in range(start_check_snapshot_id, latest_snapshot.id + 2):
-                snapshot = self.snapshot_manager.get_snapshot_by_id(snapshot_id)
-                if (snapshot and snapshot.commit_user == self.commit_user and
-                        snapshot.commit_identifier == commit_identifier and
-                        snapshot.commit_kind == commit_kind):
-                    logger.info(
-                        f"Commit already completed (snapshot {snapshot_id}), "
-                        f"user: {self.commit_user}, identifier: {commit_identifier}"
-                    )
-                    return SuccessResult()
-
+        if self._is_duplicate_commit(retry_result, latest_snapshot, commit_identifier, commit_kind):
+            return SuccessResult()
+
         unique_id = uuid.uuid4()
         base_manifest_list = f"manifest-list-{unique_id}-0"
         delta_manifest_list = f"manifest-list-{unique_id}-1"
@@ -242,10 +223,8 @@
                 else:
                     deleted_file_count += 1
                     delta_record_count -= entry.file.row_count
-
         try:
             self.manifest_file_manager.write(new_manifest_file, commit_entries)
-            # TODO: implement noConflictsOrFail logic
 
             partition_columns = list(zip(*(entry.partition.values for entry in commit_entries)))
             partition_min_stats = [min(col) for col in partition_columns]
@@ -253,13 +232,10 @@
             partition_null_counts = [sum(value == 0 for value in col) for col in partition_columns]
             if not all(count == 0 for count in partition_null_counts):
                 raise RuntimeError("Partition value should not be null")
-
             manifest_file_path = f"{self.manifest_file_manager.manifest_path}/{new_manifest_file}"
-            file_size = self.table.file_io.get_file_size(manifest_file_path)
-
             new_manifest_file_meta = ManifestFileMeta(
                 file_name=new_manifest_file,
-                file_size=file_size,
+                file_size=self.table.file_io.get_file_size(manifest_file_path),
                 num_added_files=added_file_count,
                 num_deleted_files=deleted_file_count,
                 partition_stats=SimpleStats(
@@ -275,7 +251,6 @@
                 ),
                 schema_id=self.table.table_schema.id,
             )
-
             self.manifest_list_manager.write(delta_manifest_list, [new_manifest_file_meta])
 
             # process existing_manifest
@@ -287,8 +262,8 @@
                     total_record_count += previous_record_count
             else:
                 existing_manifest_files = []
-
             self.manifest_list_manager.write(base_manifest_list, existing_manifest_files)
+
             total_record_count += delta_record_count
 
             snapshot_data = Snapshot(
                 version=3,
@@ -307,8 +282,7 @@
             # Generate partition statistics for the commit
             statistics = self._generate_partition_statistics(commit_entries)
         except Exception as e:
-            self._cleanup_preparation_failure(new_manifest_file, delta_manifest_list,
-                                              base_manifest_list)
+            self._cleanup_preparation_failure(delta_manifest_list, base_manifest_list)
             logger.warning(f"Exception occurs when preparing snapshot: {e}", exc_info=True)
             raise RuntimeError(f"Failed to prepare snapshot: {e}")
 
@@ -317,16 +291,8 @@
             with self.snapshot_commit:
                 success = self.snapshot_commit.commit(snapshot_data, self.table.current_branch(), statistics)
                 if not success:
-                    # Commit failed, clean up temporary files and retry
-                    commit_time_sec = (int(time.time() * 1000) - start_time_ms) / 1000
-                    logger.warning(
-                        f"Atomic commit failed for snapshot #{new_snapshot_id} "
-                        f"by user {self.commit_user} "
-                        f"with identifier {commit_identifier} and kind {commit_kind} after {commit_time_sec}s. "
-                        f"Clean up and try again."
-                    )
-                    self._cleanup_preparation_failure(new_manifest_file, delta_manifest_list,
-                                                      base_manifest_list)
+                    logger.warning(f"Atomic commit failed for snapshot #{new_snapshot_id}, "
+                                   f"clean up and try again")
+                    self._cleanup_preparation_failure(delta_manifest_list, base_manifest_list)
                     return RetryResult(latest_snapshot, None)
         except Exception as e:
             # Commit exception, not sure about the situation and should not clean up the files
@@ -340,14 +306,34 @@
         )
         return SuccessResult()
 
-    def _generate_overwrite_entries(self):
+    def _is_duplicate_commit(self, retry_result, latest_snapshot, commit_identifier, commit_kind) -> bool:
+        if retry_result is not None and latest_snapshot is not None:
+            start_check_snapshot_id = 1  # Snapshot.FIRST_SNAPSHOT_ID
+            if retry_result.latest_snapshot is not None:
+                start_check_snapshot_id = retry_result.latest_snapshot.id + 1
+
+            for snapshot_id in range(start_check_snapshot_id, latest_snapshot.id + 1):
+                snapshot = self.snapshot_manager.get_snapshot_by_id(snapshot_id)
+                if (snapshot and snapshot.commit_user == self.commit_user and
+                        snapshot.commit_identifier == commit_identifier and
+                        snapshot.commit_kind == commit_kind):
+                    logger.info(
+                        f"Commit already completed (snapshot {snapshot_id}), "
+                        f"user: {self.commit_user}, identifier: {commit_identifier}"
+                    )
+                    return True
+        return False
+
+    def _generate_overwrite_entries(self, latest_snapshot, partition_filter, commit_messages):
         """Generate commit entries for OVERWRITE mode based on latest snapshot."""
         entries = []
-        current_entries = FullStartingScanner(self.table, self._overwrite_partition_filter, None).plan_files()
+        current_entries = [] if latest_snapshot is None else (
+            FullStartingScanner(self.table, partition_filter, None)
+            .read_manifest_entries(self.manifest_list_manager.read_all(latest_snapshot)))
         for entry in current_entries:
             entry.kind = 1  # DELETE
             entries.append(entry)
-        for msg in self._overwrite_commit_messages:
+        for msg in commit_messages:
             partition = GenericRow(list(msg.partition), self.table.partition_keys_fields)
             for file in msg.new_files:
                 entries.append(ManifestEntry(
@@ -377,7 +363,7 @@ def _commit_retry_wait(self, retry_count: int):
         )
         time.sleep(total_wait_ms / 1000.0)
 
-    def _cleanup_preparation_failure(self, manifest_file: Optional[str],
+    def _cleanup_preparation_failure(self,
                                      delta_manifest_list: Optional[str],
                                      base_manifest_list: Optional[str]):
         try:
@@ -394,10 +380,6 @@ def _cleanup_preparation_failure(self,
             if base_manifest_list:
                 base_path = f"{manifest_path}/{base_manifest_list}"
                 self.table.file_io.delete_quietly(base_path)
-
-            if manifest_file:
-                manifest_file_path = f"{self.manifest_file_manager.manifest_path}/{manifest_file}"
-                self.table.file_io.delete_quietly(manifest_file_path)
         except Exception as e:
             logger.warning(f"Failed to clean up temporary files during preparation failure: {e}",
                            exc_info=True)
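
For reviewers: the commit_entries_plan refactor replaces the stored _overwrite_partition_filter / _overwrite_commit_messages state with a callable that _try_commit re-evaluates against the freshly read latest snapshot on every retry, so OVERWRITE deletions always reflect files committed by concurrent writers in the meantime. A stripped-down sketch of the pattern with stand-in types (these are not the real pypaimon classes):

    from typing import Callable, List, Optional

    class Snap:  # stand-in for Snapshot
        def __init__(self, id: int, files: List[str]):
            self.id = id
            self.files = files

    EntriesPlan = Callable[[Optional[Snap]], List[str]]

    def try_commit(plan: EntriesPlan,
                   get_latest: Callable[[], Optional[Snap]],
                   commit_once: Callable[[Optional[Snap], List[str]], bool]) -> None:
        while True:
            latest = get_latest()
            entries = plan(latest)  # re-planned on every attempt
            if commit_once(latest, entries):
                return

    # APPEND: the entries are fixed, so the plan ignores the snapshot.
    append_plan: EntriesPlan = lambda snapshot: ["ADD:f1", "ADD:f2"]

    # OVERWRITE: delete whatever the latest snapshot currently contains,
    # then add the new files.
    overwrite_plan: EntriesPlan = lambda snapshot: (
        ["DELETE:" + f for f in (snapshot.files if snapshot else [])] + ["ADD:f1"])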