Skip to content

Commit 01c1563

Browse files
committed
Adding patch to opensearch backend.
1 parent e0bd94f commit 01c1563

File tree

2 files changed

+227
-27
lines changed

2 files changed

+227
-27
lines changed

stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py

Lines changed: 14 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
ElasticsearchSettings as SyncElasticsearchSettings,
2525
)
2626
from stac_fastapi.types.errors import ConflictError, NotFoundError
27+
from stac_fastapi.types.links import resolve_links
2728
from stac_fastapi.types.stac import (
2829
Collection,
2930
Item,
@@ -764,11 +765,8 @@ async def merge_patch_item(
764765
base_url: (str): The base URL used for constructing URLs for the item.
765766
refresh (bool, optional): Refresh the index after performing the operation. Defaults to True.
766767
767-
Raises:
768-
ConflictError: If the item already exists in the database.
769-
770768
Returns:
771-
None
769+
patched item.
772770
"""
773771
operations = merge_to_operations(item)
774772

@@ -797,11 +795,8 @@ async def json_patch_item(
797795
base_url (str): The base URL used for constructing URLs for the item.
798796
refresh (bool, optional): Refresh the index after performing the operation. Defaults to True.
799797
800-
Raises:
801-
ConflictError: If the item already exists in the database.
802-
803798
Returns:
804-
None
799+
patched item.
805800
"""
806801
new_item_id = None
807802
new_collection_id = None
@@ -1000,21 +995,18 @@ async def merge_patch_collection(
1000995
collection: PartialCollection,
1001996
base_url: str,
1002997
refresh: bool = True,
1003-
) -> Item:
998+
) -> Collection:
1004999
"""Database logic for merge patching a collection following RF7396.
10051000
10061001
Args:
1007-
collection_id(str): Collection that item belongs to.
1008-
item_id(str): Id of item to be patched.
1009-
item (PartialItem): The partial item to be updated.
1010-
base_url: (str): The base URL used for constructing URLs for the item.
1002+
collection_id(str): Id of collection to be patched.
1003+
collection (PartialCollection): The partial collection to be updated.
1004+
base_url: (str): The base URL used for constructing links.
10111005
refresh (bool, optional): Refresh the index after performing the operation. Defaults to True.
10121006
1013-
Raises:
1014-
ConflictError: If the item already exists in the database.
10151007
10161008
Returns:
1017-
None
1009+
patched collection.
10181010
"""
10191011
operations = merge_to_operations(collection)
10201012

@@ -1031,21 +1023,17 @@ async def json_patch_collection(
10311023
operations: List[PatchOperation],
10321024
base_url: str,
10331025
refresh: bool = True,
1034-
) -> Item:
1035-
"""Database logic for json patching an item following RF6902.
1026+
) -> Collection:
1027+
"""Database logic for json patching a collection following RF6902.
10361028
10371029
Args:
1038-
collection_id(str): Collection that item belongs to.
1039-
item_id(str): Id of item to be patched.
1030+
collection_id(str): Id of collection to be patched.
10401031
operations (list): List of operations to run.
1041-
base_url (str): The base URL used for constructing URLs for the item.
1032+
base_url (str): The base URL used for constructing links.
10421033
refresh (bool, optional): Refresh the index after performing the operation. Defaults to True.
10431034
1044-
Raises:
1045-
ConflictError: If the item already exists in the database.
1046-
10471035
Returns:
1048-
None
1036+
patched collection.
10491037
"""
10501038
new_collection_id = None
10511039
script_operations = []
@@ -1075,6 +1063,7 @@ async def json_patch_collection(
10751063

10761064
if new_collection_id:
10771065
collection["id"] = new_collection_id
1066+
collection["links"] = resolve_links([], base_url)
10781067
await self.update_collection(
10791068
collection_id=collection_id, collection=collection, refresh=False
10801069
)

stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py

Lines changed: 213 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
"""Database logic."""
2+
23
import asyncio
34
import logging
45
import os
@@ -14,13 +15,25 @@
1415

1516
from stac_fastapi.core import serializers
1617
from stac_fastapi.core.extensions import filter
17-
from stac_fastapi.core.utilities import MAX_LIMIT, bbox2polygon
18+
from stac_fastapi.core.utilities import (
19+
MAX_LIMIT,
20+
bbox2polygon,
21+
merge_to_operations,
22+
operations_to_script,
23+
)
1824
from stac_fastapi.opensearch.config import (
1925
AsyncOpensearchSettings as AsyncSearchSettings,
2026
)
2127
from stac_fastapi.opensearch.config import OpensearchSettings as SyncSearchSettings
2228
from stac_fastapi.types.errors import ConflictError, NotFoundError
23-
from stac_fastapi.types.stac import Collection, Item
29+
from stac_fastapi.types.links import resolve_links
30+
from stac_fastapi.types.stac import (
31+
Collection,
32+
Item,
33+
PartialCollection,
34+
PartialItem,
35+
PatchOperation,
36+
)
2437

2538
logger = logging.getLogger(__name__)
2639

@@ -767,6 +780,123 @@ async def create_item(self, item: Item, refresh: bool = False):
767780
f"Item {item_id} in collection {collection_id} already exists"
768781
)
769782

783+
async def merge_patch_item(
784+
self,
785+
collection_id: str,
786+
item_id: str,
787+
item: PartialItem,
788+
base_url: str,
789+
refresh: bool = True,
790+
) -> Item:
791+
"""Database logic for merge patching an item following RF7396.
792+
793+
Args:
794+
collection_id(str): Collection that item belongs to.
795+
item_id(str): Id of item to be patched.
796+
item (PartialItem): The partial item to be updated.
797+
base_url: (str): The base URL used for constructing URLs for the item.
798+
refresh (bool, optional): Refresh the index after performing the operation. Defaults to True.
799+
800+
Returns:
801+
patched item.
802+
"""
803+
operations = merge_to_operations(item)
804+
805+
return await self.json_patch_item(
806+
collection_id=collection_id,
807+
item_id=item_id,
808+
operations=operations,
809+
base_url=base_url,
810+
refresh=refresh,
811+
)
812+
813+
async def json_patch_item(
814+
self,
815+
collection_id: str,
816+
item_id: str,
817+
operations: List[PatchOperation],
818+
base_url: str,
819+
refresh: bool = True,
820+
) -> Item:
821+
"""Database logic for json patching an item following RF6902.
822+
823+
Args:
824+
collection_id(str): Collection that item belongs to.
825+
item_id(str): Id of item to be patched.
826+
operations (list): List of operations to run.
827+
base_url (str): The base URL used for constructing URLs for the item.
828+
refresh (bool, optional): Refresh the index after performing the operation. Defaults to True.
829+
830+
Returns:
831+
patched item.
832+
"""
833+
new_item_id = None
834+
new_collection_id = None
835+
script_operations = []
836+
837+
for operation in operations:
838+
if operation["op"] in ["add", "replace"]:
839+
if (
840+
operation["path"] == "collection"
841+
and collection_id != operation["value"]
842+
):
843+
await self.check_collection_exists(collection_id=operation["value"])
844+
new_collection_id = operation["value"]
845+
846+
if operation["path"] == "id" and item_id != operation["value"]:
847+
new_item_id = operation["value"]
848+
849+
else:
850+
script_operations.append(operation)
851+
852+
script = operations_to_script(script_operations)
853+
854+
if not new_collection_id and not new_item_id:
855+
await self.client.update(
856+
index=index_by_collection_id(collection_id),
857+
id=mk_item_id(item_id, collection_id),
858+
script=script,
859+
refresh=refresh,
860+
)
861+
862+
if new_collection_id:
863+
await self.client.reindex(
864+
body={
865+
"dest": {"index": f"{ITEMS_INDEX_PREFIX}{operation['value']}"},
866+
"source": {
867+
"index": f"{ITEMS_INDEX_PREFIX}{collection_id}",
868+
"query": {"term": {"id": {"value": item_id}}},
869+
},
870+
"script": {
871+
"lang": "painless",
872+
"source": (
873+
f"""ctx._id = ctx._id.replace('{collection_id}', '{operation["value"]}');"""
874+
f"""ctx._source.collection = '{operation["value"]}';"""
875+
+ script
876+
),
877+
},
878+
},
879+
wait_for_completion=True,
880+
refresh=False,
881+
)
882+
883+
item = await self.get_one_item(collection_id, item_id)
884+
885+
if new_item_id:
886+
item["id"] = new_item_id
887+
item = await self.prep_create_item(item=item, base_url=base_url)
888+
await self.create_item(item=item, refresh=False)
889+
890+
if new_item_id or new_collection_id:
891+
892+
await self.delete_item(
893+
item_id=item_id,
894+
collection_id=collection_id,
895+
refresh=refresh,
896+
)
897+
898+
return item
899+
770900
async def delete_item(
771901
self, item_id: str, collection_id: str, refresh: bool = False
772902
):
@@ -891,6 +1021,87 @@ async def update_collection(
8911021
refresh=refresh,
8921022
)
8931023

1024+
async def merge_patch_collection(
1025+
self,
1026+
collection_id: str,
1027+
collection: PartialCollection,
1028+
base_url: str,
1029+
refresh: bool = True,
1030+
) -> Collection:
1031+
"""Database logic for merge patching a collection following RF7396.
1032+
1033+
Args:
1034+
collection_id(str): Id of collection to be patched.
1035+
collection (PartialCollection): The partial collection to be updated.
1036+
base_url: (str): The base URL used for constructing links.
1037+
refresh (bool, optional): Refresh the index after performing the operation. Defaults to True.
1038+
1039+
1040+
Returns:
1041+
patched collection.
1042+
"""
1043+
operations = merge_to_operations(collection)
1044+
1045+
return await self.json_patch_collection(
1046+
collection_id=collection_id,
1047+
operations=operations,
1048+
base_url=base_url,
1049+
refresh=refresh,
1050+
)
1051+
1052+
async def json_patch_collection(
1053+
self,
1054+
collection_id: str,
1055+
operations: List[PatchOperation],
1056+
base_url: str,
1057+
refresh: bool = True,
1058+
) -> Collection:
1059+
"""Database logic for json patching a collection following RF6902.
1060+
1061+
Args:
1062+
collection_id(str): Id of collection to be patched.
1063+
operations (list): List of operations to run.
1064+
base_url (str): The base URL used for constructing links.
1065+
refresh (bool, optional): Refresh the index after performing the operation. Defaults to True.
1066+
1067+
Returns:
1068+
patched collection.
1069+
"""
1070+
new_collection_id = None
1071+
script_operations = []
1072+
1073+
for operation in operations:
1074+
if (
1075+
operation["op"] in ["add", "replace"]
1076+
and operation["path"] == "collection"
1077+
and collection_id != operation["value"]
1078+
):
1079+
new_collection_id = operation["value"]
1080+
1081+
else:
1082+
script_operations.append(operation)
1083+
1084+
script = operations_to_script(script_operations)
1085+
1086+
if not new_collection_id:
1087+
await self.client.update(
1088+
index=COLLECTIONS_INDEX,
1089+
id=collection_id,
1090+
script=script,
1091+
refresh=refresh,
1092+
)
1093+
1094+
collection = await self.find_collection(collection_id)
1095+
1096+
if new_collection_id:
1097+
collection["id"] = new_collection_id
1098+
collection["links"] = resolve_links([], base_url)
1099+
await self.update_collection(
1100+
collection_id=collection_id, collection=collection, refresh=False
1101+
)
1102+
1103+
return collection
1104+
8941105
async def delete_collection(self, collection_id: str, refresh: bool = False):
8951106
"""Delete a collection from the database.
8961107

0 commit comments

Comments
 (0)