19 changes: 4 additions & 15 deletions paimon-python/pypaimon/common/file_io.py
@@ -18,19 +18,19 @@
import logging
import os
import subprocess
import threading
import uuid
from pathlib import Path
from typing import Any, Dict, List, Optional
from urllib.parse import splitport, urlparse

import pyarrow
from packaging.version import parse
from pyarrow._fs import FileSystem, LocalFileSystem
from pyarrow._fs import FileSystem

from pypaimon.common.options import Options
from pypaimon.common.options.config import OssOptions, S3Options
from pypaimon.common.uri_reader import UriReaderFactory
from pypaimon.filesystem.local import PaimonLocalFileSystem
from pypaimon.schema.data_types import DataField, AtomicType, PyarrowFieldParser
from pypaimon.table.row.blob import BlobData, BlobDescriptor, Blob
from pypaimon.table.row.generic_row import GenericRow
@@ -39,8 +39,6 @@


class FileIO:
rename_lock = threading.Lock()

def __init__(self, path: str, catalog_options: Options):
self.properties = catalog_options
self.logger = logging.getLogger(__name__)
@@ -183,9 +181,8 @@ def _initialize_hdfs_fs(self, scheme: str, netloc: Optional[str]) -> FileSystem:
)

def _initialize_local_fs(self) -> FileSystem:
from pyarrow.fs import LocalFileSystem

return LocalFileSystem()
return PaimonLocalFileSystem()

def new_input_stream(self, path: str):
path_str = self.to_filesystem_path(path)
@@ -255,15 +252,7 @@ def rename(self, src: str, dst: str) -> bool:
self.mkdirs(str(dst_parent))

src_str = self.to_filesystem_path(src)
if isinstance(self.filesystem, LocalFileSystem):
if self.exists(dst):
return False
with FileIO.rename_lock:
if self.exists(dst):
return False
self.filesystem.move(src_str, dst_str)
else:
self.filesystem.move(src_str, dst_str)
self.filesystem.move(src_str, dst_str)
return True
except Exception as e:
self.logger.warning(f"Failed to rename {src} to {dst}: {e}")
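
For context on why the locking moved out of FileIO: the exists-then-move sequence that rename() previously ran for local filesystems is a classic check-then-act race when two writers target the same destination. A minimal sketch of the hazard and of the fixed shape that now lives inside the filesystem (`racy_rename`/`safe_rename` are hypothetical helper names, not pypaimon API):

```python
import threading

def racy_rename(fs, exists, src, dst):
    # Racy: another thread can pass the same check before either move runs.
    if exists(dst):
        return False
    fs.move(src, dst)
    return True

_rename_lock = threading.Lock()

def safe_rename(fs, exists, src, dst):
    # Fixed shape: the check and the move are atomic with respect to other
    # renamers, which is what PaimonLocalFileSystem.move now guarantees.
    with _rename_lock:
        if exists(dst):
            return False
        fs.move(src, dst)
    return True
```
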
34 changes: 34 additions & 0 deletions paimon-python/pypaimon/filesystem/local.py
@@ -0,0 +1,34 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import threading

from pyarrow.fs import FileType, LocalFileSystem


class PaimonLocalFileSystem(LocalFileSystem):

    # Class-level lock so the existence check and the move below are atomic
    # with respect to every other rename in this process.
    rename_lock = threading.Lock()

    def move(self, src, dst):
        with PaimonLocalFileSystem.rename_lock:
            # get_file_info takes a list of paths and returns one FileInfo per path.
            file_info = self.get_file_info([dst])[0]
            if file_info.type != FileType.NotFound:
                raise Exception("Target file already exists")
            super().move(src, dst)
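
A quick usage sketch of the new behavior (the tempfile setup and file names are illustrative, not part of the PR):

```python
import os
import tempfile

from pypaimon.filesystem.local import PaimonLocalFileSystem

fs = PaimonLocalFileSystem()
tmp = tempfile.mkdtemp()
src, dst = os.path.join(tmp, "a.txt"), os.path.join(tmp, "b.txt")

open(src, "w").close()
fs.move(src, dst)       # succeeds: dst does not exist yet

open(src, "w").close()
try:
    fs.move(src, dst)   # raises: dst already exists now
except Exception as exc:
    print(exc)          # Target file already exists
```
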
60 changes: 60 additions & 0 deletions paimon-python/pypaimon/tests/reader_append_only_test.py
@@ -28,7 +28,11 @@

from pypaimon import CatalogFactory, Schema
from pypaimon.common.options.core_options import CoreOptions
from pypaimon.manifest.schema.manifest_entry import ManifestEntry
from pypaimon.snapshot.snapshot import BATCH_COMMIT_IDENTIFIER
from pypaimon.snapshot.snapshot_manager import SnapshotManager
from pypaimon.table.row.generic_row import GenericRow
from pypaimon.write.file_store_commit import RetryResult


class AoReaderTest(unittest.TestCase):
@@ -153,6 +157,62 @@ def test_append_only_multi_write_once_commit(self):
        actual = self._read_test_table(read_builder).sort_by('user_id')
        self.assertEqual(actual, self.expected)

    def test_commit_retry_filter(self):
        schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'])
        self.catalog.create_table('default.test_commit_retry_filter', schema, False)
        table = self.catalog.get_table('default.test_commit_retry_filter')
        write_builder = table.new_batch_write_builder()

        table_write = write_builder.new_write()
        table_commit = write_builder.new_commit()
        data1 = {
            'user_id': [1, 2, 3, 4],
            'item_id': [1001, 1002, 1003, 1004],
            'behavior': ['a', 'b', 'c', None],
            'dt': ['p1', 'p1', 'p2', 'p1'],
        }
        pa_table1 = pa.Table.from_pydict(data1, schema=self.pa_schema)
        data2 = {
            'user_id': [5, 6, 7, 8],
            'item_id': [1005, 1006, 1007, 1008],
            'behavior': ['e', 'f', 'g', 'h'],
            'dt': ['p2', 'p1', 'p2', 'p2'],
        }
        pa_table2 = pa.Table.from_pydict(data2, schema=self.pa_schema)

        table_write.write_arrow(pa_table1)
        table_write.write_arrow(pa_table2)

        messages = table_write.prepare_commit()
        table_commit.commit(messages)
        table_write.close()

        snapshot_manager = SnapshotManager(table)
        latest_snapshot = snapshot_manager.get_latest_snapshot()
        commit_entries = []
        for msg in messages:
            partition = GenericRow(list(msg.partition), table.partition_keys_fields)
            for file in msg.new_files:
                commit_entries.append(ManifestEntry(
                    kind=0,
                    partition=partition,
                    bucket=msg.bucket,
                    total_buckets=table.total_buckets,
                    file=file
                ))
        # Simulate a retry of an already-committed batch: the duplicate-commit
        # filter should find the earlier snapshot and succeed without re-committing.
        success = table_commit.file_store_commit._try_commit_once(
            RetryResult(None),
            "APPEND",
            commit_entries,
            BATCH_COMMIT_IDENTIFIER,
            latest_snapshot)
        self.assertTrue(success.is_success())
        table_commit.close()
        read_builder = table.new_read_builder()
        actual = self._read_test_table(read_builder).sort_by('user_id')
        self.assertEqual(actual, self.expected)

    def test_over_1000_cols_read(self):
        num_rows = 1
        num_cols = 10
98 changes: 40 additions & 58 deletions paimon-python/pypaimon/write/file_store_commit.py
@@ -110,8 +110,8 @@ def commit(self, commit_messages: List[CommitMessage], commit_identifier: int):
))

self._try_commit(commit_kind="APPEND",
commit_entries=commit_entries,
commit_identifier=commit_identifier)
commit_identifier=commit_identifier,
commit_entries_plan=lambda snapshot: commit_entries)

def overwrite(self, overwrite_partition, commit_messages: List[CommitMessage], commit_identifier: int):
"""Commit the given commit messages in overwrite mode."""
@@ -133,16 +133,14 @@ def overwrite(self, overwrite_partition, commit_messages: List[CommitMessage], c
raise RuntimeError(f"Trying to overwrite partition {overwrite_partition}, but the changes "
f"in {msg.partition} does not belong to this partition")

self._overwrite_partition_filter = partition_filter
self._overwrite_commit_messages = commit_messages

self._try_commit(
commit_kind="OVERWRITE",
commit_entries=None, # Will be generated in _try_commit based on latest snapshot
commit_identifier=commit_identifier
commit_identifier=commit_identifier,
commit_entries_plan=lambda snapshot: self._generate_overwrite_entries(
snapshot, partition_filter, commit_messages)
)

def _try_commit(self, commit_kind, commit_entries, commit_identifier):
def _try_commit(self, commit_kind, commit_identifier, commit_entries_plan):
import threading

retry_count = 0
@@ -151,9 +149,7 @@ def _try_commit(self, commit_kind, commit_entries, commit_identifier):
thread_id = threading.current_thread().name
while True:
latest_snapshot = self.snapshot_manager.get_latest_snapshot()

if commit_kind == "OVERWRITE":
commit_entries = self._generate_overwrite_entries()
commit_entries = commit_entries_plan(latest_snapshot)

result = self._try_commit_once(
retry_result=retry_result,
Expand All @@ -164,7 +160,7 @@ def _try_commit(self, commit_kind, commit_entries, commit_identifier):
)

if result.is_success():
logger.warning(
logger.info(
f"Thread {thread_id}: commit success {latest_snapshot.id + 1 if latest_snapshot else 1} "
f"after {retry_count} retries"
)
@@ -190,24 +186,9 @@ def _try_commit(self, commit_kind, commit_entries, commit_identifier):
def _try_commit_once(self, retry_result: Optional[RetryResult], commit_kind: str,
commit_entries: List[ManifestEntry], commit_identifier: int,
latest_snapshot: Optional[Snapshot]) -> CommitResult:
start_time_ms = int(time.time() * 1000)

if retry_result is not None and latest_snapshot is not None:

Contributor: Keep this as it is, and add test to verify this.

Contributor Author: Sure

start_check_snapshot_id = 1 # Snapshot.FIRST_SNAPSHOT_ID
if retry_result.latest_snapshot is not None:
start_check_snapshot_id = retry_result.latest_snapshot.id + 1

for snapshot_id in range(start_check_snapshot_id, latest_snapshot.id + 2):
snapshot = self.snapshot_manager.get_snapshot_by_id(snapshot_id)
if (snapshot and snapshot.commit_user == self.commit_user and
snapshot.commit_identifier == commit_identifier and
snapshot.commit_kind == commit_kind):
logger.info(
f"Commit already completed (snapshot {snapshot_id}), "
f"user: {self.commit_user}, identifier: {commit_identifier}"
)
return SuccessResult()

if self._is_duplicate_commit(retry_result, latest_snapshot, commit_identifier, commit_kind):
return SuccessResult()

unique_id = uuid.uuid4()
base_manifest_list = f"manifest-list-{unique_id}-0"
delta_manifest_list = f"manifest-list-{unique_id}-1"
@@ -242,24 +223,19 @@ def _try_commit_once(self, retry_result: Optional[RetryResult], commit_kind: str
else:
deleted_file_count += 1
delta_record_count -= entry.file.row_count

try:
self.manifest_file_manager.write(new_manifest_file, commit_entries)

# TODO: implement noConflictsOrFail logic
partition_columns = list(zip(*(entry.partition.values for entry in commit_entries)))
partition_min_stats = [min(col) for col in partition_columns]
partition_max_stats = [max(col) for col in partition_columns]
partition_null_counts = [sum(value == 0 for value in col) for col in partition_columns]
if not all(count == 0 for count in partition_null_counts):
raise RuntimeError("Partition value should not be null")

manifest_file_path = f"{self.manifest_file_manager.manifest_path}/{new_manifest_file}"
file_size = self.table.file_io.get_file_size(manifest_file_path)

new_manifest_file_meta = ManifestFileMeta(
file_name=new_manifest_file,
file_size=file_size,
file_size=self.table.file_io.get_file_size(manifest_file_path),
num_added_files=added_file_count,
num_deleted_files=deleted_file_count,
partition_stats=SimpleStats(
@@ -275,7 +251,6 @@ def _try_commit_once(self, retry_result: Optional[RetryResult], commit_kind: str
),
schema_id=self.table.table_schema.id,
)

self.manifest_list_manager.write(delta_manifest_list, [new_manifest_file_meta])

# process existing_manifest
@@ -287,8 +262,8 @@ def _try_commit_once(self, retry_result: Optional[RetryResult], commit_kind: str
total_record_count += previous_record_count
else:
existing_manifest_files = []

self.manifest_list_manager.write(base_manifest_list, existing_manifest_files)

total_record_count += delta_record_count
snapshot_data = Snapshot(
version=3,
@@ -307,8 +282,7 @@ def _try_commit_once(self, retry_result: Optional[RetryResult], commit_kind: str
# Generate partition statistics for the commit
statistics = self._generate_partition_statistics(commit_entries)
except Exception as e:
self._cleanup_preparation_failure(new_manifest_file, delta_manifest_list,
base_manifest_list)
self._cleanup_preparation_failure(delta_manifest_list, base_manifest_list)
logger.warning(f"Exception occurs when preparing snapshot: {e}", exc_info=True)
raise RuntimeError(f"Failed to prepare snapshot: {e}")

@@ -317,16 +291,8 @@ def _try_commit_once(self, retry_result: Optional[RetryResult], commit_kind: str
with self.snapshot_commit:
success = self.snapshot_commit.commit(snapshot_data, self.table.current_branch(), statistics)
if not success:
# Commit failed, clean up temporary files and retry
commit_time_sec = (int(time.time() * 1000) - start_time_ms) / 1000
logger.warning(
f"Atomic commit failed for snapshot #{new_snapshot_id} "
f"by user {self.commit_user} "
f"with identifier {commit_identifier} and kind {commit_kind} after {commit_time_sec}s. "
f"Clean up and try again."
)
self._cleanup_preparation_failure(new_manifest_file, delta_manifest_list,
base_manifest_list)
logger.warning(f"Atomic commit failed for snapshot #{new_snapshot_id} failed")
self._cleanup_preparation_failure(delta_manifest_list, base_manifest_list)
return RetryResult(latest_snapshot, None)
except Exception as e:
# Commit exception, not sure about the situation and should not clean up the files
@@ -340,14 +306,34 @@ def _try_commit_once(self, retry_result: Optional[RetryResult], commit_kind: str
)
return SuccessResult()

def _generate_overwrite_entries(self):
def _is_duplicate_commit(self, retry_result, latest_snapshot, commit_identifier, commit_kind) -> bool:
if retry_result is not None and latest_snapshot is not None:
start_check_snapshot_id = 1 # Snapshot.FIRST_SNAPSHOT_ID
if retry_result.latest_snapshot is not None:
start_check_snapshot_id = retry_result.latest_snapshot.id + 1

for snapshot_id in range(start_check_snapshot_id, latest_snapshot.id + 1):
snapshot = self.snapshot_manager.get_snapshot_by_id(snapshot_id)
if (snapshot and snapshot.commit_user == self.commit_user and
snapshot.commit_identifier == commit_identifier and
snapshot.commit_kind == commit_kind):
logger.info(
f"Commit already completed (snapshot {snapshot_id}), "
f"user: {self.commit_user}, identifier: {commit_identifier}"
)
return True
return False
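
In effect, a retry only re-attempts the commit if no snapshot written since the last observed one already carries this writer's (user, identifier, kind) triple. A toy illustration of that scan (hypothetical data, not the real Snapshot class):

```python
# Toy snapshot store: id -> (commit_user, commit_identifier, commit_kind).
snapshots = {
    5: ("user-a", 7, "APPEND"),
    6: ("user-b", 9, "APPEND"),
}

def is_duplicate(start_id, latest_id, user, identifier, kind):
    # Scan every snapshot that may have landed since the last attempt.
    for sid in range(start_id, latest_id + 1):
        if snapshots.get(sid) == (user, identifier, kind):
            return True
    return False

assert is_duplicate(5, 6, "user-a", 7, "APPEND")      # our commit already landed
assert not is_duplicate(5, 6, "user-a", 8, "APPEND")  # genuinely new commit
```
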

def _generate_overwrite_entries(self, latest_snapshot, partition_filter, commit_messages):
"""Generate commit entries for OVERWRITE mode based on the latest snapshot."""
entries = []
current_entries = FullStartingScanner(self.table, self._overwrite_partition_filter, None).plan_files()
current_entries = [] if latest_snapshot is None \
else FullStartingScanner(self.table, partition_filter, None).read_manifest_entries(
self.manifest_list_manager.read_all(latest_snapshot))
for entry in current_entries:
entry.kind = 1 # DELETE
entries.append(entry)
for msg in self._overwrite_commit_messages:
for msg in commit_messages:
partition = GenericRow(list(msg.partition), self.table.partition_keys_fields)
for file in msg.new_files:
entries.append(ManifestEntry(
@@ -377,7 +363,7 @@ def _commit_retry_wait(self, retry_count: int):
)
time.sleep(total_wait_ms / 1000.0)

def _cleanup_preparation_failure(self, manifest_file: Optional[str],
def _cleanup_preparation_failure(self,
delta_manifest_list: Optional[str],
base_manifest_list: Optional[str]):
try:
@@ -394,10 +380,6 @@ def _cleanup_preparation_failure(self, manifest_file: Optional[str],
if base_manifest_list:
base_path = f"{manifest_path}/{base_manifest_list}"
self.table.file_io.delete_quietly(base_path)

if manifest_file:
manifest_file_path = f"{self.manifest_file_manager.manifest_path}/{manifest_file}"
self.table.file_io.delete_quietly(manifest_file_path)
except Exception as e:
logger.warning(f"Failed to clean up temporary files during preparation failure: {e}", exc_info=True)

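
Taken together, the central refactor in this file is that _try_commit no longer receives a fixed entry list but a plan that is re-evaluated against whichever snapshot is current on each attempt, so OVERWRITE deletions always target files that exist at commit time, while APPEND plans simply ignore the snapshot. A simplified sketch of that loop (hypothetical names and types, assuming a boolean-returning single attempt):

```python
from typing import Callable, List, Optional

def try_commit(get_latest_snapshot: Callable[[], Optional[object]],
               entries_plan: Callable[[Optional[object]], List[object]],
               try_commit_once: Callable[[List[object], Optional[object]], bool]) -> None:
    # Each attempt re-reads the latest snapshot and re-derives its entries.
    while True:
        latest = get_latest_snapshot()
        entries = entries_plan(latest)
        if try_commit_once(entries, latest):
            return
```
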