From be91f8a48a2c8bb6519f3dc82e993085360b2288 Mon Sep 17 00:00:00 2001 From: Alexandra Popova Date: Fri, 10 Oct 2025 10:18:34 +0300 Subject: [PATCH 1/9] Check storage-credentials before config in table response --- pyiceberg/catalog/rest/__init__.py | 13 +++++++++-- tests/catalog/test_rest.py | 37 ++++++++++++++++++++++++++++++ 2 files changed, 48 insertions(+), 2 deletions(-) diff --git a/pyiceberg/catalog/rest/__init__.py b/pyiceberg/catalog/rest/__init__.py index f7d8eec960..db18e771f7 100644 --- a/pyiceberg/catalog/rest/__init__.py +++ b/pyiceberg/catalog/rest/__init__.py @@ -156,6 +156,7 @@ class TableResponse(IcebergBaseModel): metadata_location: Optional[str] = Field(alias="metadata-location", default=None) metadata: TableMetadata config: Properties = Field(default_factory=dict) + storage_credentials: Optional[Properties] = Field(alias="storage-credentials", default=None) class CreateTableRequest(IcebergBaseModel): @@ -454,7 +455,11 @@ def _response_to_table(self, identifier_tuple: Tuple[str, ...], table_response: metadata_location=table_response.metadata_location, # type: ignore metadata=table_response.metadata, io=self._load_file_io( - {**table_response.metadata.properties, **table_response.config}, table_response.metadata_location + { + **table_response.metadata.properties, + **(table_response.storage_credentials or table_response.config), + }, + table_response.metadata_location, ), catalog=self, config=table_response.config, @@ -466,7 +471,11 @@ def _response_to_staged_table(self, identifier_tuple: Tuple[str, ...], table_res metadata_location=table_response.metadata_location, # type: ignore metadata=table_response.metadata, io=self._load_file_io( - {**table_response.metadata.properties, **table_response.config}, table_response.metadata_location + { + **table_response.metadata.properties, + **(table_response.storage_credentials or table_response.config), + }, + table_response.metadata_location, ), catalog=self, ) diff --git a/tests/catalog/test_rest.py b/tests/catalog/test_rest.py index 223c6d2f9e..3998632049 100644 --- a/tests/catalog/test_rest.py +++ b/tests/catalog/test_rest.py @@ -18,6 +18,7 @@ import base64 import os from typing import Any, Callable, Dict, cast +from copy import deepcopy from unittest import mock import pytest @@ -858,6 +859,42 @@ def test_load_table_200(rest_mock: Mocker, example_table_metadata_with_snapshot_ assert actual == expected +def test_load_table_prefers_storage_credentials_over_config( + rest_mock: Mocker, example_table_metadata_with_snapshot_v1_rest_json: Dict[str, Any], monkeypatch: pytest.MonkeyPatch +) -> None: + table_resp = deepcopy(example_table_metadata_with_snapshot_v1_rest_json) + table_resp["config"] = {"some.key": "from-config", "only.config": "only-config"} + table_resp["storage-credentials"] = {"some.key": "from-cred", "only.creds": "only-creds"} + + table_resp["metadata"].setdefault("properties", {})["meta.key"] = "from-metadata" + + captured: Dict[str, Any] = {} + + def _capture_io(_self: Any, properties: Dict[str, Any], location: Any) -> Any: # type: ignore + captured["properties"] = dict(properties) + captured["location"] = location + class _DummyIO: + pass + return _DummyIO() + + monkeypatch.setattr(RestCatalog, "_load_file_io", _capture_io, raising=True) + + rest_mock.get( + f"{TEST_URI}v1/namespaces/fokko/tables/table", + json=table_resp, + status_code=200, + request_headers=TEST_HEADERS, + ) + + catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN) + _ = catalog.load_table(("fokko", "table")) + + props = 
cast(Dict[str, Any], captured["properties"]) + assert props["meta.key"] == "from-metadata" + assert props["some.key"] == "from-cred" + assert props["only.creds"] == "only-creds" + + def test_load_table_200_loading_mode( rest_mock: Mocker, example_table_metadata_with_snapshot_v1_rest_json: Dict[str, Any] ) -> None: From c55eecc6ef3bb8c70a3c283127a369649f877837 Mon Sep 17 00:00:00 2001 From: Alexandra Popova Date: Fri, 10 Oct 2025 14:18:21 +0300 Subject: [PATCH 2/9] Check storage-credentials before config in table response --- pyiceberg/catalog/rest/__init__.py | 67 +++++++++++++++--------------- tests/catalog/test_rest.py | 57 ++++++++++++++----------- 2 files changed, 66 insertions(+), 58 deletions(-) diff --git a/pyiceberg/catalog/rest/__init__.py b/pyiceberg/catalog/rest/__init__.py index db18e771f7..d0c5aa5a7d 100644 --- a/pyiceberg/catalog/rest/__init__.py +++ b/pyiceberg/catalog/rest/__init__.py @@ -20,7 +20,6 @@ Any, Dict, List, - Optional, Set, Tuple, Union, @@ -153,18 +152,18 @@ def _retry_hook(retry_state: RetryCallState) -> None: class TableResponse(IcebergBaseModel): - metadata_location: Optional[str] = Field(alias="metadata-location", default=None) + metadata_location: str | None = Field(alias="metadata-location", default=None) metadata: TableMetadata config: Properties = Field(default_factory=dict) - storage_credentials: Optional[Properties] = Field(alias="storage-credentials", default=None) + storage_credentials: Properties | None = Field(alias="storage-credentials", default=None) class CreateTableRequest(IcebergBaseModel): name: str = Field() - location: Optional[str] = Field() + location: str | None = Field() table_schema: Schema = Field(alias="schema") - partition_spec: Optional[PartitionSpec] = Field(alias="partition-spec") - write_order: Optional[SortOrder] = Field(alias="write-order") + partition_spec: PartitionSpec | None = Field(alias="partition-spec") + write_order: SortOrder | None = Field(alias="write-order") stage_create: bool = Field(alias="stage-create", default=False) properties: Dict[str, str] = Field(default_factory=dict) @@ -180,8 +179,8 @@ class RegisterTableRequest(IcebergBaseModel): class ConfigResponse(IcebergBaseModel): - defaults: Optional[Properties] = Field(default_factory=dict) - overrides: Optional[Properties] = Field(default_factory=dict) + defaults: Properties | None = Field(default_factory=dict) + overrides: Properties | None = Field(default_factory=dict) class ListNamespaceResponse(IcebergBaseModel): @@ -295,7 +294,7 @@ def _create_legacy_oauth2_auth_manager(self, session: Session) -> AuthManager: return AuthManagerFactory.create("legacyoauth2", auth_config) - def _check_valid_namespace_identifier(self, identifier: Union[str, Identifier]) -> Identifier: + def _check_valid_namespace_identifier(self, identifier: str | Identifier) -> Identifier: """Check if the identifier has at least one element.""" identifier_tuple = Catalog.identifier_to_tuple(identifier) if len(identifier_tuple) < 1: @@ -378,14 +377,14 @@ def _fetch_config(self) -> None: # Update URI based on overrides self.uri = config[URI] - def _identifier_to_validated_tuple(self, identifier: Union[str, Identifier]) -> Identifier: + def _identifier_to_validated_tuple(self, identifier: str | Identifier) -> Identifier: identifier_tuple = self.identifier_to_tuple(identifier) if len(identifier_tuple) <= 1: raise NoSuchIdentifierError(f"Missing namespace or invalid identifier: {'.'.join(identifier_tuple)}") return identifier_tuple def _split_identifier_for_path( - self, identifier: Union[str, 
Identifier, TableIdentifier], kind: IdentifierKind = IdentifierKind.TABLE + self, identifier: str | Identifier | TableIdentifier, kind: IdentifierKind = IdentifierKind.TABLE ) -> Properties: if isinstance(identifier, TableIdentifier): return {"namespace": NAMESPACE_SEPARATOR.join(identifier.namespace.root), kind.value: identifier.name} @@ -393,7 +392,7 @@ def _split_identifier_for_path( return {"namespace": NAMESPACE_SEPARATOR.join(identifier_tuple[:-1]), kind.value: identifier_tuple[-1]} - def _split_identifier_for_json(self, identifier: Union[str, Identifier]) -> Dict[str, Union[Identifier, str]]: + def _split_identifier_for_json(self, identifier: str | Identifier) -> Dict[str, Identifier | str]: identifier_tuple = self._identifier_to_validated_tuple(identifier) return {"namespace": identifier_tuple[:-1], "name": identifier_tuple[-1]} @@ -497,9 +496,9 @@ def _config_headers(self, session: Session) -> None: def _create_table( self, - identifier: Union[str, Identifier], + identifier: str | Identifier, schema: Union[Schema, "pa.Schema"], - location: Optional[str] = None, + location: str | None = None, partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC, sort_order: SortOrder = UNSORTED_SORT_ORDER, properties: Properties = EMPTY_DICT, @@ -539,9 +538,9 @@ def _create_table( @retry(**_RETRY_ARGS) def create_table( self, - identifier: Union[str, Identifier], + identifier: str | Identifier, schema: Union[Schema, "pa.Schema"], - location: Optional[str] = None, + location: str | None = None, partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC, sort_order: SortOrder = UNSORTED_SORT_ORDER, properties: Properties = EMPTY_DICT, @@ -560,9 +559,9 @@ def create_table( @retry(**_RETRY_ARGS) def create_table_transaction( self, - identifier: Union[str, Identifier], + identifier: str | Identifier, schema: Union[Schema, "pa.Schema"], - location: Optional[str] = None, + location: str | None = None, partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC, sort_order: SortOrder = UNSORTED_SORT_ORDER, properties: Properties = EMPTY_DICT, @@ -580,7 +579,7 @@ def create_table_transaction( return CreateTableTransaction(staged_table) @retry(**_RETRY_ARGS) - def register_table(self, identifier: Union[str, Identifier], metadata_location: str) -> Table: + def register_table(self, identifier: str | Identifier, metadata_location: str) -> Table: """Register a new table using existing metadata. 
Args: @@ -612,7 +611,7 @@ def register_table(self, identifier: Union[str, Identifier], metadata_location: return self._response_to_table(self.identifier_to_tuple(identifier), table_response) @retry(**_RETRY_ARGS) - def list_tables(self, namespace: Union[str, Identifier]) -> List[Identifier]: + def list_tables(self, namespace: str | Identifier) -> List[Identifier]: namespace_tuple = self._check_valid_namespace_identifier(namespace) namespace_concat = NAMESPACE_SEPARATOR.join(namespace_tuple) response = self._session.get(self.url(Endpoints.list_tables, namespace=namespace_concat)) @@ -623,7 +622,7 @@ def list_tables(self, namespace: Union[str, Identifier]) -> List[Identifier]: return [(*table.namespace, table.name) for table in ListTablesResponse.model_validate_json(response.text).identifiers] @retry(**_RETRY_ARGS) - def load_table(self, identifier: Union[str, Identifier]) -> Table: + def load_table(self, identifier: str | Identifier) -> Table: params = {} if mode := self.properties.get(SNAPSHOT_LOADING_MODE): if mode in {"all", "refs"}: @@ -643,7 +642,7 @@ def load_table(self, identifier: Union[str, Identifier]) -> Table: return self._response_to_table(self.identifier_to_tuple(identifier), table_response) @retry(**_RETRY_ARGS) - def drop_table(self, identifier: Union[str, Identifier], purge_requested: bool = False) -> None: + def drop_table(self, identifier: str | Identifier, purge_requested: bool = False) -> None: response = self._session.delete( self.url(Endpoints.drop_table, prefixed=True, **self._split_identifier_for_path(identifier)), params={"purgeRequested": purge_requested}, @@ -654,11 +653,11 @@ def drop_table(self, identifier: Union[str, Identifier], purge_requested: bool = _handle_non_200_response(exc, {404: NoSuchTableError}) @retry(**_RETRY_ARGS) - def purge_table(self, identifier: Union[str, Identifier]) -> None: + def purge_table(self, identifier: str | Identifier) -> None: self.drop_table(identifier=identifier, purge_requested=True) @retry(**_RETRY_ARGS) - def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: Union[str, Identifier]) -> Table: + def rename_table(self, from_identifier: str | Identifier, to_identifier: str | Identifier) -> Table: payload = { "source": self._split_identifier_for_json(from_identifier), "destination": self._split_identifier_for_json(to_identifier), @@ -683,7 +682,7 @@ def _remove_catalog_name_from_table_request_identifier(self, table_request: Comm return table_request @retry(**_RETRY_ARGS) - def list_views(self, namespace: Union[str, Identifier]) -> List[Identifier]: + def list_views(self, namespace: str | Identifier) -> List[Identifier]: namespace_tuple = self._check_valid_namespace_identifier(namespace) namespace_concat = NAMESPACE_SEPARATOR.join(namespace_tuple) response = self._session.get(self.url(Endpoints.list_views, namespace=namespace_concat)) @@ -740,7 +739,7 @@ def commit_table( return CommitTableResponse.model_validate_json(response.text) @retry(**_RETRY_ARGS) - def create_namespace(self, namespace: Union[str, Identifier], properties: Properties = EMPTY_DICT) -> None: + def create_namespace(self, namespace: str | Identifier, properties: Properties = EMPTY_DICT) -> None: namespace_tuple = self._check_valid_namespace_identifier(namespace) payload = {"namespace": namespace_tuple, "properties": properties} response = self._session.post(self.url(Endpoints.create_namespace), json=payload) @@ -750,7 +749,7 @@ def create_namespace(self, namespace: Union[str, Identifier], properties: Proper _handle_non_200_response(exc, 
{409: NamespaceAlreadyExistsError}) @retry(**_RETRY_ARGS) - def drop_namespace(self, namespace: Union[str, Identifier]) -> None: + def drop_namespace(self, namespace: str | Identifier) -> None: namespace_tuple = self._check_valid_namespace_identifier(namespace) namespace = NAMESPACE_SEPARATOR.join(namespace_tuple) response = self._session.delete(self.url(Endpoints.drop_namespace, namespace=namespace)) @@ -760,7 +759,7 @@ def drop_namespace(self, namespace: Union[str, Identifier]) -> None: _handle_non_200_response(exc, {404: NoSuchNamespaceError, 409: NamespaceNotEmptyError}) @retry(**_RETRY_ARGS) - def list_namespaces(self, namespace: Union[str, Identifier] = ()) -> List[Identifier]: + def list_namespaces(self, namespace: str | Identifier = ()) -> List[Identifier]: namespace_tuple = self.identifier_to_tuple(namespace) response = self._session.get( self.url( @@ -777,7 +776,7 @@ def list_namespaces(self, namespace: Union[str, Identifier] = ()) -> List[Identi return ListNamespaceResponse.model_validate_json(response.text).namespaces @retry(**_RETRY_ARGS) - def load_namespace_properties(self, namespace: Union[str, Identifier]) -> Properties: + def load_namespace_properties(self, namespace: str | Identifier) -> Properties: namespace_tuple = self._check_valid_namespace_identifier(namespace) namespace = NAMESPACE_SEPARATOR.join(namespace_tuple) response = self._session.get(self.url(Endpoints.load_namespace_metadata, namespace=namespace)) @@ -790,7 +789,7 @@ def load_namespace_properties(self, namespace: Union[str, Identifier]) -> Proper @retry(**_RETRY_ARGS) def update_namespace_properties( - self, namespace: Union[str, Identifier], removals: Optional[Set[str]] = None, updates: Properties = EMPTY_DICT + self, namespace: str | Identifier, removals: Set[str] | None = None, updates: Properties = EMPTY_DICT ) -> PropertiesUpdateSummary: namespace_tuple = self._check_valid_namespace_identifier(namespace) namespace = NAMESPACE_SEPARATOR.join(namespace_tuple) @@ -808,7 +807,7 @@ def update_namespace_properties( ) @retry(**_RETRY_ARGS) - def namespace_exists(self, namespace: Union[str, Identifier]) -> bool: + def namespace_exists(self, namespace: str | Identifier) -> bool: namespace_tuple = self._check_valid_namespace_identifier(namespace) namespace = NAMESPACE_SEPARATOR.join(namespace_tuple) response = self._session.head(self.url(Endpoints.namespace_exists, namespace=namespace)) @@ -826,7 +825,7 @@ def namespace_exists(self, namespace: Union[str, Identifier]) -> bool: return False @retry(**_RETRY_ARGS) - def table_exists(self, identifier: Union[str, Identifier]) -> bool: + def table_exists(self, identifier: str | Identifier) -> bool: """Check if a table exists. Args: @@ -852,7 +851,7 @@ def table_exists(self, identifier: Union[str, Identifier]) -> bool: return False @retry(**_RETRY_ARGS) - def view_exists(self, identifier: Union[str, Identifier]) -> bool: + def view_exists(self, identifier: str | Identifier) -> bool: """Check if a view exists. 
Args: @@ -877,7 +876,7 @@ def view_exists(self, identifier: Union[str, Identifier]) -> bool: return False @retry(**_RETRY_ARGS) - def drop_view(self, identifier: Union[str]) -> None: + def drop_view(self, identifier: str) -> None: response = self._session.delete( self.url(Endpoints.drop_view, prefixed=True, **self._split_identifier_for_path(identifier, IdentifierKind.VIEW)), ) diff --git a/tests/catalog/test_rest.py b/tests/catalog/test_rest.py index 3998632049..b69fe6d456 100644 --- a/tests/catalog/test_rest.py +++ b/tests/catalog/test_rest.py @@ -18,7 +18,6 @@ import base64 import os from typing import Any, Callable, Dict, cast -from copy import deepcopy from unittest import mock import pytest @@ -40,7 +39,7 @@ ServerError, TableAlreadyExistsError, ) -from pyiceberg.io import load_file_io +from pyiceberg.io import AWS_ACCESS_KEY_ID, load_file_io from pyiceberg.partitioning import PartitionField, PartitionSpec from pyiceberg.schema import Schema from pyiceberg.table import Table @@ -859,41 +858,51 @@ def test_load_table_200(rest_mock: Mocker, example_table_metadata_with_snapshot_ assert actual == expected -def test_load_table_prefers_storage_credentials_over_config( - rest_mock: Mocker, example_table_metadata_with_snapshot_v1_rest_json: Dict[str, Any], monkeypatch: pytest.MonkeyPatch -) -> None: - table_resp = deepcopy(example_table_metadata_with_snapshot_v1_rest_json) - table_resp["config"] = {"some.key": "from-config", "only.config": "only-config"} - table_resp["storage-credentials"] = {"some.key": "from-cred", "only.creds": "only-creds"} +def test_storage_credentials_over_config(rest_mock: Mocker, example_table_metadata_with_snapshot_v1_rest_json: Dict[str, Any]) -> None: + response = { + "metadata-location": example_table_metadata_with_snapshot_v1_rest_json["metadata-location"], + "metadata": example_table_metadata_with_snapshot_v1_rest_json["metadata"], + "config": { + AWS_ACCESS_KEY_ID: "from_config", + }, + "storage-credentials": { + AWS_ACCESS_KEY_ID: "from_storage_credentials", + }, + } - table_resp["metadata"].setdefault("properties", {})["meta.key"] = "from-metadata" + rest_mock.get( + f"{TEST_URI}v1/namespaces/fokko/tables/table", + json=response, + status_code=200, + request_headers=TEST_HEADERS, + ) + + catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN) + table = catalog.load_table(("fokko", "table")) - captured: Dict[str, Any] = {} + assert table.io.properties[AWS_ACCESS_KEY_ID] == "from_storage_credentials" - def _capture_io(_self: Any, properties: Dict[str, Any], location: Any) -> Any: # type: ignore - captured["properties"] = dict(properties) - captured["location"] = location - class _DummyIO: - pass - return _DummyIO() - monkeypatch.setattr(RestCatalog, "_load_file_io", _capture_io, raising=True) +def test_config_when_no_storage_credentials(rest_mock: Mocker, example_table_metadata_with_snapshot_v1_rest_json: Dict[str, Any]) -> None: + response = { + "metadata-location": example_table_metadata_with_snapshot_v1_rest_json["metadata-location"], + "metadata": example_table_metadata_with_snapshot_v1_rest_json["metadata"], + "config": { + AWS_ACCESS_KEY_ID: "from_config", + }, + } rest_mock.get( f"{TEST_URI}v1/namespaces/fokko/tables/table", - json=table_resp, + json=response, status_code=200, request_headers=TEST_HEADERS, ) catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN) - _ = catalog.load_table(("fokko", "table")) - - props = cast(Dict[str, Any], captured["properties"]) - assert props["meta.key"] == "from-metadata" - assert props["some.key"] == 
"from-cred" - assert props["only.creds"] == "only-creds" + table = catalog.load_table(("fokko", "table")) + assert table.io.properties[AWS_ACCESS_KEY_ID] == "from_config" def test_load_table_200_loading_mode( rest_mock: Mocker, example_table_metadata_with_snapshot_v1_rest_json: Dict[str, Any] From e309deaa78206f709d7aa1c7371de5769a10b8fc Mon Sep 17 00:00:00 2001 From: Alexandra Popova Date: Fri, 10 Oct 2025 14:23:00 +0300 Subject: [PATCH 3/9] rollback ruff changes --- pyiceberg/catalog/rest/__init__.py | 69 +++++++++++++++--------------- 1 file changed, 35 insertions(+), 34 deletions(-) diff --git a/pyiceberg/catalog/rest/__init__.py b/pyiceberg/catalog/rest/__init__.py index d0c5aa5a7d..7a306a54ff 100644 --- a/pyiceberg/catalog/rest/__init__.py +++ b/pyiceberg/catalog/rest/__init__.py @@ -20,6 +20,7 @@ Any, Dict, List, + Optional, Set, Tuple, Union, @@ -152,18 +153,18 @@ def _retry_hook(retry_state: RetryCallState) -> None: class TableResponse(IcebergBaseModel): - metadata_location: str | None = Field(alias="metadata-location", default=None) + metadata_location: Optional[str] = Field(alias="metadata-location", default=None) metadata: TableMetadata config: Properties = Field(default_factory=dict) - storage_credentials: Properties | None = Field(alias="storage-credentials", default=None) + storage_credentials: Optional[Properties] = Field(alias="storage-credentials", default=None) class CreateTableRequest(IcebergBaseModel): name: str = Field() - location: str | None = Field() + location: Optional[str] = Field() table_schema: Schema = Field(alias="schema") - partition_spec: PartitionSpec | None = Field(alias="partition-spec") - write_order: SortOrder | None = Field(alias="write-order") + partition_spec: Optional[PartitionSpec] = Field(alias="partition-spec") + write_order: Optional[SortOrder] = Field(alias="write-order") stage_create: bool = Field(alias="stage-create", default=False) properties: Dict[str, str] = Field(default_factory=dict) @@ -179,8 +180,8 @@ class RegisterTableRequest(IcebergBaseModel): class ConfigResponse(IcebergBaseModel): - defaults: Properties | None = Field(default_factory=dict) - overrides: Properties | None = Field(default_factory=dict) + defaults: Optional[Properties] = Field(default_factory=dict) + overrides: Optional[Properties] = Field(default_factory=dict) class ListNamespaceResponse(IcebergBaseModel): @@ -294,7 +295,7 @@ def _create_legacy_oauth2_auth_manager(self, session: Session) -> AuthManager: return AuthManagerFactory.create("legacyoauth2", auth_config) - def _check_valid_namespace_identifier(self, identifier: str | Identifier) -> Identifier: + def _check_valid_namespace_identifier(self, identifier: Union[str, Identifier]) -> Identifier: """Check if the identifier has at least one element.""" identifier_tuple = Catalog.identifier_to_tuple(identifier) if len(identifier_tuple) < 1: @@ -377,14 +378,14 @@ def _fetch_config(self) -> None: # Update URI based on overrides self.uri = config[URI] - def _identifier_to_validated_tuple(self, identifier: str | Identifier) -> Identifier: + def _identifier_to_validated_tuple(self, identifier: Union[str, Identifier]) -> Identifier: identifier_tuple = self.identifier_to_tuple(identifier) if len(identifier_tuple) <= 1: raise NoSuchIdentifierError(f"Missing namespace or invalid identifier: {'.'.join(identifier_tuple)}") return identifier_tuple def _split_identifier_for_path( - self, identifier: str | Identifier | TableIdentifier, kind: IdentifierKind = IdentifierKind.TABLE + self, identifier: Union[str, Identifier, 
TableIdentifier], kind: IdentifierKind = IdentifierKind.TABLE ) -> Properties: if isinstance(identifier, TableIdentifier): return {"namespace": NAMESPACE_SEPARATOR.join(identifier.namespace.root), kind.value: identifier.name} @@ -392,7 +393,7 @@ def _split_identifier_for_path( return {"namespace": NAMESPACE_SEPARATOR.join(identifier_tuple[:-1]), kind.value: identifier_tuple[-1]} - def _split_identifier_for_json(self, identifier: str | Identifier) -> Dict[str, Identifier | str]: + def _split_identifier_for_json(self, identifier: Union[str, Identifier]) -> Dict[str, Union[Identifier, str]]: identifier_tuple = self._identifier_to_validated_tuple(identifier) return {"namespace": identifier_tuple[:-1], "name": identifier_tuple[-1]} @@ -496,9 +497,9 @@ def _config_headers(self, session: Session) -> None: def _create_table( self, - identifier: str | Identifier, + identifier: Union[str, Identifier], schema: Union[Schema, "pa.Schema"], - location: str | None = None, + location: Optional[str] = None, partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC, sort_order: SortOrder = UNSORTED_SORT_ORDER, properties: Properties = EMPTY_DICT, @@ -538,9 +539,9 @@ def _create_table( @retry(**_RETRY_ARGS) def create_table( self, - identifier: str | Identifier, + identifier: Union[str, Identifier], schema: Union[Schema, "pa.Schema"], - location: str | None = None, + location: Optional[str] = None, partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC, sort_order: SortOrder = UNSORTED_SORT_ORDER, properties: Properties = EMPTY_DICT, @@ -559,9 +560,9 @@ def create_table( @retry(**_RETRY_ARGS) def create_table_transaction( self, - identifier: str | Identifier, + identifier: Union[str, Identifier], schema: Union[Schema, "pa.Schema"], - location: str | None = None, + location: Optional[str] = None, partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC, sort_order: SortOrder = UNSORTED_SORT_ORDER, properties: Properties = EMPTY_DICT, @@ -579,7 +580,7 @@ def create_table_transaction( return CreateTableTransaction(staged_table) @retry(**_RETRY_ARGS) - def register_table(self, identifier: str | Identifier, metadata_location: str) -> Table: + def register_table(self, identifier: Union[str, Identifier], metadata_location: str) -> Table: """Register a new table using existing metadata. 
Args: @@ -611,7 +612,7 @@ def register_table(self, identifier: str | Identifier, metadata_location: str) - return self._response_to_table(self.identifier_to_tuple(identifier), table_response) @retry(**_RETRY_ARGS) - def list_tables(self, namespace: str | Identifier) -> List[Identifier]: + def list_tables(self, namespace: Union[str, Identifier]) -> List[Identifier]: namespace_tuple = self._check_valid_namespace_identifier(namespace) namespace_concat = NAMESPACE_SEPARATOR.join(namespace_tuple) response = self._session.get(self.url(Endpoints.list_tables, namespace=namespace_concat)) @@ -622,7 +623,7 @@ def list_tables(self, namespace: str | Identifier) -> List[Identifier]: return [(*table.namespace, table.name) for table in ListTablesResponse.model_validate_json(response.text).identifiers] @retry(**_RETRY_ARGS) - def load_table(self, identifier: str | Identifier) -> Table: + def load_table(self, identifier: Union[str, Identifier]) -> Table: params = {} if mode := self.properties.get(SNAPSHOT_LOADING_MODE): if mode in {"all", "refs"}: @@ -642,7 +643,7 @@ def load_table(self, identifier: str | Identifier) -> Table: return self._response_to_table(self.identifier_to_tuple(identifier), table_response) @retry(**_RETRY_ARGS) - def drop_table(self, identifier: str | Identifier, purge_requested: bool = False) -> None: + def drop_table(self, identifier: Union[str, Identifier], purge_requested: bool = False) -> None: response = self._session.delete( self.url(Endpoints.drop_table, prefixed=True, **self._split_identifier_for_path(identifier)), params={"purgeRequested": purge_requested}, @@ -653,11 +654,11 @@ def drop_table(self, identifier: str | Identifier, purge_requested: bool = False _handle_non_200_response(exc, {404: NoSuchTableError}) @retry(**_RETRY_ARGS) - def purge_table(self, identifier: str | Identifier) -> None: + def purge_table(self, identifier: Union[str, Identifier]) -> None: self.drop_table(identifier=identifier, purge_requested=True) @retry(**_RETRY_ARGS) - def rename_table(self, from_identifier: str | Identifier, to_identifier: str | Identifier) -> Table: + def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: Union[str, Identifier]) -> Table: payload = { "source": self._split_identifier_for_json(from_identifier), "destination": self._split_identifier_for_json(to_identifier), @@ -682,7 +683,7 @@ def _remove_catalog_name_from_table_request_identifier(self, table_request: Comm return table_request @retry(**_RETRY_ARGS) - def list_views(self, namespace: str | Identifier) -> List[Identifier]: + def list_views(self, namespace: Union[str, Identifier]) -> List[Identifier]: namespace_tuple = self._check_valid_namespace_identifier(namespace) namespace_concat = NAMESPACE_SEPARATOR.join(namespace_tuple) response = self._session.get(self.url(Endpoints.list_views, namespace=namespace_concat)) @@ -739,7 +740,7 @@ def commit_table( return CommitTableResponse.model_validate_json(response.text) @retry(**_RETRY_ARGS) - def create_namespace(self, namespace: str | Identifier, properties: Properties = EMPTY_DICT) -> None: + def create_namespace(self, namespace: Union[str, Identifier], properties: Properties = EMPTY_DICT) -> None: namespace_tuple = self._check_valid_namespace_identifier(namespace) payload = {"namespace": namespace_tuple, "properties": properties} response = self._session.post(self.url(Endpoints.create_namespace), json=payload) @@ -749,7 +750,7 @@ def create_namespace(self, namespace: str | Identifier, properties: Properties = _handle_non_200_response(exc, {409: 
NamespaceAlreadyExistsError}) @retry(**_RETRY_ARGS) - def drop_namespace(self, namespace: str | Identifier) -> None: + def drop_namespace(self, namespace: Union[str, Identifier]) -> None: namespace_tuple = self._check_valid_namespace_identifier(namespace) namespace = NAMESPACE_SEPARATOR.join(namespace_tuple) response = self._session.delete(self.url(Endpoints.drop_namespace, namespace=namespace)) @@ -759,7 +760,7 @@ def drop_namespace(self, namespace: str | Identifier) -> None: _handle_non_200_response(exc, {404: NoSuchNamespaceError, 409: NamespaceNotEmptyError}) @retry(**_RETRY_ARGS) - def list_namespaces(self, namespace: str | Identifier = ()) -> List[Identifier]: + def list_namespaces(self, namespace: Union[str, Identifier] = ()) -> List[Identifier]: namespace_tuple = self.identifier_to_tuple(namespace) response = self._session.get( self.url( @@ -776,7 +777,7 @@ def list_namespaces(self, namespace: str | Identifier = ()) -> List[Identifier]: return ListNamespaceResponse.model_validate_json(response.text).namespaces @retry(**_RETRY_ARGS) - def load_namespace_properties(self, namespace: str | Identifier) -> Properties: + def load_namespace_properties(self, namespace: Union[str, Identifier]) -> Properties: namespace_tuple = self._check_valid_namespace_identifier(namespace) namespace = NAMESPACE_SEPARATOR.join(namespace_tuple) response = self._session.get(self.url(Endpoints.load_namespace_metadata, namespace=namespace)) @@ -789,7 +790,7 @@ def load_namespace_properties(self, namespace: str | Identifier) -> Properties: @retry(**_RETRY_ARGS) def update_namespace_properties( - self, namespace: str | Identifier, removals: Set[str] | None = None, updates: Properties = EMPTY_DICT + self, namespace: Union[str, Identifier], removals: Optional[Set[str]] = None, updates: Properties = EMPTY_DICT ) -> PropertiesUpdateSummary: namespace_tuple = self._check_valid_namespace_identifier(namespace) namespace = NAMESPACE_SEPARATOR.join(namespace_tuple) @@ -807,7 +808,7 @@ def update_namespace_properties( ) @retry(**_RETRY_ARGS) - def namespace_exists(self, namespace: str | Identifier) -> bool: + def namespace_exists(self, namespace: Union[str, Identifier]) -> bool: namespace_tuple = self._check_valid_namespace_identifier(namespace) namespace = NAMESPACE_SEPARATOR.join(namespace_tuple) response = self._session.head(self.url(Endpoints.namespace_exists, namespace=namespace)) @@ -825,7 +826,7 @@ def namespace_exists(self, namespace: str | Identifier) -> bool: return False @retry(**_RETRY_ARGS) - def table_exists(self, identifier: str | Identifier) -> bool: + def table_exists(self, identifier: Union[str, Identifier]) -> bool: """Check if a table exists. Args: @@ -851,7 +852,7 @@ def table_exists(self, identifier: str | Identifier) -> bool: return False @retry(**_RETRY_ARGS) - def view_exists(self, identifier: str | Identifier) -> bool: + def view_exists(self, identifier: Union[str, Identifier]) -> bool: """Check if a view exists. Args: @@ -876,7 +877,7 @@ def view_exists(self, identifier: str | Identifier) -> bool: return False @retry(**_RETRY_ARGS) - def drop_view(self, identifier: str) -> None: + def drop_view(self, identifier: Union[str]) -> None: response = self._session.delete( self.url(Endpoints.drop_view, prefixed=True, **self._split_identifier_for_path(identifier, IdentifierKind.VIEW)), ) @@ -890,4 +891,4 @@ def close(self) -> None: This method closes mounted HttpAdapters' pooled connections and any active Proxy pooled connections. 
""" - self._session.close() + self._session.close() \ No newline at end of file From 59dfe20ed0d4ae65a3b94fc03af41c48380c0af1 Mon Sep 17 00:00:00 2001 From: Alexandra Popova Date: Fri, 10 Oct 2025 15:17:43 +0300 Subject: [PATCH 4/9] fix storage credentials type --- pyiceberg/catalog/rest/__init__.py | 25 ++++++++++++++++++++++--- tests/catalog/test_rest.py | 11 ++++++++--- 2 files changed, 30 insertions(+), 6 deletions(-) diff --git a/pyiceberg/catalog/rest/__init__.py b/pyiceberg/catalog/rest/__init__.py index 7a306a54ff..0b76dbe97d 100644 --- a/pyiceberg/catalog/rest/__init__.py +++ b/pyiceberg/catalog/rest/__init__.py @@ -152,11 +152,16 @@ def _retry_hook(retry_state: RetryCallState) -> None: } +class StorageCredential(IcebergBaseModel): + prefix: str = Field() + config: Properties = Field() + + class TableResponse(IcebergBaseModel): metadata_location: Optional[str] = Field(alias="metadata-location", default=None) metadata: TableMetadata config: Properties = Field(default_factory=dict) - storage_credentials: Optional[Properties] = Field(alias="storage-credentials", default=None) + storage_credentials: Optional[List[StorageCredential]] = Field(alias="storage-credentials", default=None) class CreateTableRequest(IcebergBaseModel): @@ -457,7 +462,7 @@ def _response_to_table(self, identifier_tuple: Tuple[str, ...], table_response: io=self._load_file_io( { **table_response.metadata.properties, - **(table_response.storage_credentials or table_response.config), + **self._get_credentials(table_response.storage_credentials, table_response.config), }, table_response.metadata_location, ), @@ -473,13 +478,27 @@ def _response_to_staged_table(self, identifier_tuple: Tuple[str, ...], table_res io=self._load_file_io( { **table_response.metadata.properties, - **(table_response.storage_credentials or table_response.config), + **self._get_credentials(table_response.storage_credentials, table_response.config), }, table_response.metadata_location, ), catalog=self, ) + def _get_credentials(self, storage_credentials: List[StorageCredential], config: Properties) -> Properties: + if storage_credentials: + return self._get_storage_credentials(storage_credentials) + else: + return config + + @staticmethod + def _get_storage_credentials(storage_credentials: List[StorageCredential]) -> Properties: + credentials: List[StorageCredential] = [sc for sc in storage_credentials if sc.prefix.startswith("s3")] + if len(credentials) > 1: + raise ValueError("Multiple S3 storage credentials found") + elif len(credentials) == 1: + return credentials[0].config + def _refresh_token(self) -> None: # Reactive token refresh is atypical - we should proactively refresh tokens in a separate thread # instead of retrying on Auth Exceptions. 
Keeping refresh behavior for the LegacyOAuth2AuthManager diff --git a/tests/catalog/test_rest.py b/tests/catalog/test_rest.py index b69fe6d456..01bae68734 100644 --- a/tests/catalog/test_rest.py +++ b/tests/catalog/test_rest.py @@ -865,9 +865,14 @@ def test_storage_credentials_over_config(rest_mock: Mocker, example_table_metada "config": { AWS_ACCESS_KEY_ID: "from_config", }, - "storage-credentials": { - AWS_ACCESS_KEY_ID: "from_storage_credentials", - }, + "storage-credentials": [ + { + "prefix": "s3://warehouse/database/", + "config": { + AWS_ACCESS_KEY_ID: "from_storage_credentials", + }, + } + ], } rest_mock.get( From d98327cf2b2708338dc95dca8707705571b25d95 Mon Sep 17 00:00:00 2001 From: Alexandra Popova Date: Fri, 10 Oct 2025 15:30:06 +0300 Subject: [PATCH 5/9] fix condition --- pyiceberg/catalog/rest/__init__.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pyiceberg/catalog/rest/__init__.py b/pyiceberg/catalog/rest/__init__.py index 0b76dbe97d..08138ae737 100644 --- a/pyiceberg/catalog/rest/__init__.py +++ b/pyiceberg/catalog/rest/__init__.py @@ -488,8 +488,7 @@ def _response_to_staged_table(self, identifier_tuple: Tuple[str, ...], table_res def _get_credentials(self, storage_credentials: List[StorageCredential], config: Properties) -> Properties: if storage_credentials: return self._get_storage_credentials(storage_credentials) - else: - return config + return config @staticmethod def _get_storage_credentials(storage_credentials: List[StorageCredential]) -> Properties: From a64bcd4cc58bfef964bd815727ccf90b14039139 Mon Sep 17 00:00:00 2001 From: Alexandra Popova Date: Fri, 10 Oct 2025 16:37:56 +0300 Subject: [PATCH 6/9] fix: choosing the most specific prefix for storage credentials --- pyiceberg/catalog/rest/__init__.py | 45 +++++++++++++++++++++--------- 1 file changed, 32 insertions(+), 13 deletions(-) diff --git a/pyiceberg/catalog/rest/__init__.py b/pyiceberg/catalog/rest/__init__.py index 08138ae737..9daba65258 100644 --- a/pyiceberg/catalog/rest/__init__.py +++ b/pyiceberg/catalog/rest/__init__.py @@ -462,7 +462,12 @@ def _response_to_table(self, identifier_tuple: Tuple[str, ...], table_response: io=self._load_file_io( { **table_response.metadata.properties, - **self._get_credentials(table_response.storage_credentials, table_response.config), + **self._get_credentials( + table_response.storage_credentials, + table_response.config, + table_response.metadata_location, + getattr(table_response.metadata, "location", None), + ), }, table_response.metadata_location, ), @@ -478,25 +483,39 @@ def _response_to_staged_table(self, identifier_tuple: Tuple[str, ...], table_res io=self._load_file_io( { **table_response.metadata.properties, - **self._get_credentials(table_response.storage_credentials, table_response.config), + **self._get_credentials( + table_response.storage_credentials, + table_response.config, + table_response.metadata_location, + getattr(table_response.metadata, "location", None), + ), }, table_response.metadata_location, ), catalog=self, ) - def _get_credentials(self, storage_credentials: List[StorageCredential], config: Properties) -> Properties: - if storage_credentials: - return self._get_storage_credentials(storage_credentials) - return config - @staticmethod - def _get_storage_credentials(storage_credentials: List[StorageCredential]) -> Properties: - credentials: List[StorageCredential] = [sc for sc in storage_credentials if sc.prefix.startswith("s3")] - if len(credentials) > 1: - raise ValueError("Multiple S3 storage credentials found") - 
elif len(credentials) == 1:
-            return credentials[0].config
+    def _get_credentials(
+        storage_credentials: Optional[List[StorageCredential]],
+        config: Properties,
+        metadata_location: Optional[str],
+        table_location: Optional[str],
+    ) -> Properties:
+        if not storage_credentials:
+            return config
+
+        target = metadata_location or table_location
+        if not target:
+            return config
+
+        # Choose the most specific (longest) matching prefix
+        matching: List[StorageCredential] = [sc for sc in storage_credentials if target.startswith(sc.prefix)]
+        if not matching:
+            return config
+
+        selected = max(matching, key=lambda sc: len(sc.prefix))
+        return selected.config
 
     def _refresh_token(self) -> None:
         # Reactive token refresh is atypical - we should proactively refresh tokens in a separate thread

From 4a080ef6a7c86cef9a3a0558d8ac490684181b96 Mon Sep 17 00:00:00 2001
From: Alexandra Popova
Date: Fri, 10 Oct 2025 17:58:06 +0300
Subject: [PATCH 7/9] add signing commit

---
 tests/catalog/test_rest.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/catalog/test_rest.py b/tests/catalog/test_rest.py
index 01bae68734..af94e10b13 100644
--- a/tests/catalog/test_rest.py
+++ b/tests/catalog/test_rest.py
@@ -872,7 +872,7 @@ def test_storage_credentials_over_config(rest_mock: Mocker, example_table_metada
                     AWS_ACCESS_KEY_ID: "from_storage_credentials",
                 },
             }
-        ],
+        ]
     }
 
     rest_mock.get(

From 656b07d337148b368b18d5932acdc683d0ac870c Mon Sep 17 00:00:00 2001
From: Alexandra Popova
Date: Fri, 10 Oct 2025 18:02:34 +0300
Subject: [PATCH 8/9] add signing commit

---
 tests/catalog/test_rest.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/catalog/test_rest.py b/tests/catalog/test_rest.py
index af94e10b13..01bae68734 100644
--- a/tests/catalog/test_rest.py
+++ b/tests/catalog/test_rest.py
@@ -872,7 +872,7 @@ def test_storage_credentials_over_config(rest_mock: Mocker, example_table_metada
                     AWS_ACCESS_KEY_ID: "from_storage_credentials",
                 },
             }
-        ]
+        ],
     }
 
     rest_mock.get(

From b16b2af29e1807ebf6b656a9546c4740f3a95436 Mon Sep 17 00:00:00 2001
From: Alexandra Popova
Date: Mon, 13 Oct 2025 11:20:08 +0300
Subject: [PATCH 9/9] fix: linting

---
 pyiceberg/catalog/rest/__init__.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pyiceberg/catalog/rest/__init__.py b/pyiceberg/catalog/rest/__init__.py
index 9daba65258..9027cc63a5 100644
--- a/pyiceberg/catalog/rest/__init__.py
+++ b/pyiceberg/catalog/rest/__init__.py
@@ -928,4 +928,4 @@ def close(self) -> None:
 
         This method closes mounted HttpAdapters' pooled connections and any active Proxy pooled connections.
         """
-        self._session.close()
\ No newline at end of file
+        self._session.close()
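
Note on the behavior the series converges on in patch 6/9: when the REST response carries storage credentials, the one whose prefix is the longest match for the metadata (or table) location wins, and the table config is only used as a fallback. The sketch below restates that selection rule outside the catalog class as a minimal, self-contained approximation — StorageCredentialSketch, select_credentials, and the "s3.access-key-id" key are stand-ins chosen for the example, not the shipped implementation.

# Illustrative sketch only: mirrors the longest-matching-prefix selection from patch 6/9
# with a plain dataclass instead of the pydantic StorageCredential model.
from dataclasses import dataclass
from typing import Dict, List, Optional

Properties = Dict[str, str]


@dataclass
class StorageCredentialSketch:
    prefix: str
    config: Properties


def select_credentials(
    storage_credentials: Optional[List[StorageCredentialSketch]],
    config: Properties,
    metadata_location: Optional[str],
    table_location: Optional[str],
) -> Properties:
    """Prefer the credential with the longest matching prefix; fall back to config."""
    if not storage_credentials:
        return config
    target = metadata_location or table_location
    if not target:
        return config
    matching = [sc for sc in storage_credentials if target.startswith(sc.prefix)]
    if not matching:
        return config
    return max(matching, key=lambda sc: len(sc.prefix)).config


# The narrower s3://warehouse/database/ credential wins over the broader s3:// one;
# a location that matches no prefix falls back to the table config.
creds = [
    StorageCredentialSketch("s3://", {"s3.access-key-id": "broad"}),
    StorageCredentialSketch("s3://warehouse/database/", {"s3.access-key-id": "specific"}),
]
assert select_credentials(
    creds, {"s3.access-key-id": "from_config"}, "s3://warehouse/database/metadata/v1.json", None
)["s3.access-key-id"] == "specific"
assert select_credentials(
    creds, {"s3.access-key-id": "from_config"}, "gs://other-bucket/metadata/v1.json", None
)["s3.access-key-id"] == "from_config"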