Skip to content

Commit 0497631

Browse files
bambrizsimorenoh
andauthored
[Cosmos] Session container fixes new branch (#41678)
* session container fixes * async changes * sync changes * mypy/pylint * Update _session.py * mark query plan as fetched for query * adjust logic after merging * Update _base.py * Update _base.py * tests - missing sync mwr * sync mwr tests, test fixes * Update test_session_async.py * Update test_backwards_compatibility.py * Update test_backwards_compatibility.py * Update test_backwards_compatibility_async.py * Update test_backwards_compatibility_async.py * Update test_backwards_compatibility.py * Update execution_dispatcher.py * merge leftovers * slip * Update test_backwards_compatibility.py * Update test_backwards_compatibility.py * Update test_query_hybrid_search_async.py * further changes, changelog * add tests * typehint * address comments * Update _session.py * Update _base.py * ci tests * merging main * Update _cosmos_client_connection.py * tests * change session token logic * Update _session.py * small fixes * Update _session.py * Update _session.py * updates * update tests that checked for compound tokens - this will no longer be the case * Update test_config.py * Update execution_dispatcher.py * remove query logic * remove partition split testing * oylint * Update dev_requirements.txt * delete duplicate * update tests for partition split * change timeout of split partition key tests reduces timeout from 25 minutes to 7 minutes * update test * Test fixes Fixes tests to accomodate the extra readfeed that may happen during a read item operation. This also includes other general test fixes. * update tests * update tests Updates tests and fixes a key error issue in session.py * add logger to session token * Update _session.py * Update _session.py * Update test_session.py * async changes * small test fixes - align sync and async * small updates * Update documents.py * test updates * Update _session.py * async tests * pylint * tests * logic fixes, test updates * leftover changes * match session logic exactly * propagate headers to request params for retry logic * Update CHANGELOG.md * revert session token dropping * last test updates * Update _session.py * update session token on errors * reformat internal exceptions * change string for constant * address comments --------- Co-authored-by: Simon Moreno <30335873+simorenoh@users.noreply.github.com>
1 parent 0d79413 commit 0497631

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+1266
-479
lines changed

sdk/cosmos/azure-cosmos/CHANGELOG.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
#### Breaking Changes
99

1010
#### Bugs Fixed
11+
* Fixed session container session token logic. The SDK will now only send the relevant partition-local session tokens for read document requests and write requests when multi-region writes are enabled, as opposed to the entire compound session token for the container for every document request. See [PR 41678](https://github.com/Azure/azure-sdk-for-python/pull/41678).
12+
* Write requests for single-write region accounts will no longer send session tokens when using session consistency. See [PR 41678](https://github.com/Azure/azure-sdk-for-python/pull/41678).
1113
* Fixed bug where container cache was not being properly updated resulting in unnecessary extra requests. See [PR 42143](https://github.com/Azure/azure-sdk-for-python/pull/42143).
1214

1315
#### Other Changes
@@ -16,7 +18,7 @@
1618
### 4.14.0b1 (2025-07-14)
1719

1820
#### Features Added
19-
* Added option to enable automatic retries for write operations. See [PR 41272](https://github.com/Azure/azure-sdk-for-python/pull/41272).
21+
* Added option to enable automatic retries for write document operations. See [PR 41272](https://github.com/Azure/azure-sdk-for-python/pull/41272).
2022

2123
#### Breaking Changes
2224
* Adds cross region retries when no preferred locations are set. This is only a breaking change for customers using bounded staleness consistency. See [PR 39714](https://github.com/Azure/azure-sdk-for-python/pull/39714)

sdk/cosmos/azure-cosmos/azure/cosmos/_base.py

Lines changed: 75 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,9 @@
4545
if TYPE_CHECKING:
4646
from ._cosmos_client_connection import CosmosClientConnection
4747
from .aio._cosmos_client_connection_async import CosmosClientConnection as AsyncClientConnection
48+
from ._request_object import RequestObject
4849

50+
# pylint: disable=protected-access
4951

5052
_COMMON_OPTIONS = {
5153
'initial_headers': 'initialHeaders',
@@ -174,37 +176,9 @@ def GetHeaders( # pylint: disable=too-many-statements,too-many-branches
174176
if options.get("indexingDirective"):
175177
headers[http_constants.HttpHeaders.IndexingDirective] = options["indexingDirective"]
176178

177-
consistency_level = None
178-
179-
# get default client consistency level
180-
default_client_consistency_level = headers.get(http_constants.HttpHeaders.ConsistencyLevel)
181-
182-
# set consistency level. check if set via options, this will override the default
179+
# set request consistency level - if session consistency, the client should be setting this on its own
183180
if options.get("consistencyLevel"):
184-
consistency_level = options["consistencyLevel"]
185-
# TODO: move this line outside of if-else cause to remove the code duplication
186-
headers[http_constants.HttpHeaders.ConsistencyLevel] = consistency_level
187-
elif default_client_consistency_level is not None:
188-
consistency_level = default_client_consistency_level
189-
headers[http_constants.HttpHeaders.ConsistencyLevel] = consistency_level
190-
191-
# figure out if consistency level for this request is session
192-
is_session_consistency = consistency_level == documents.ConsistencyLevel.Session
193-
194-
# set session token if required
195-
if is_session_consistency is True and not IsMasterResource(resource_type):
196-
# if there is a token set via option, then use it to override default
197-
if options.get("sessionToken"):
198-
headers[http_constants.HttpHeaders.SessionToken] = options["sessionToken"]
199-
else:
200-
# check if the client's default consistency is session (and request consistency level is same),
201-
# then update from session container
202-
if default_client_consistency_level == documents.ConsistencyLevel.Session and \
203-
cosmos_client_connection.session:
204-
# populate session token from the client's session container
205-
headers[http_constants.HttpHeaders.SessionToken] = cosmos_client_connection.session.get_session_token(
206-
path
207-
)
181+
headers[http_constants.HttpHeaders.ConsistencyLevel] = options["consistencyLevel"]
208182

209183
if options.get("enableScanInQuery"):
210184
headers[http_constants.HttpHeaders.EnableScanInQuery] = options["enableScanInQuery"]
@@ -349,6 +323,77 @@ def GetHeaders( # pylint: disable=too-many-statements,too-many-branches
349323

350324
return headers
351325

326+
def _is_session_token_request(
327+
cosmos_client_connection: Union["CosmosClientConnection", "AsyncClientConnection"],
328+
headers: dict,
329+
request_object: "RequestObject") -> bool:
330+
consistency_level = headers.get(http_constants.HttpHeaders.ConsistencyLevel)
331+
# Figure out if consistency level for this request is session
332+
is_session_consistency = consistency_level == documents.ConsistencyLevel.Session
333+
334+
# Verify that it is not a metadata request, and that it is either a read request, batch request, or an account
335+
# configured to use multiple write regions. Batch requests are special-cased because they can contain both read and
336+
# write operations, and we want to use session consistency for the read operations.
337+
return (is_session_consistency is True and cosmos_client_connection.session is not None
338+
and not IsMasterResource(request_object.resource_type)
339+
and (documents._OperationType.IsReadOnlyOperation(request_object.operation_type)
340+
or request_object.operation_type == "Batch"
341+
or cosmos_client_connection._global_endpoint_manager.can_use_multiple_write_locations(request_object)))
342+
343+
344+
def set_session_token_header(
345+
cosmos_client_connection: Union["CosmosClientConnection", "AsyncClientConnection"],
346+
headers: dict,
347+
path: str,
348+
request_object: "RequestObject",
349+
options: Mapping[str, Any],
350+
partition_key_range_id: Optional[str] = None) -> None:
351+
# set session token if required
352+
if _is_session_token_request(cosmos_client_connection, headers, request_object):
353+
# if there is a token set via option, then use it to override default
354+
if options.get("sessionToken"):
355+
headers[http_constants.HttpHeaders.SessionToken] = options["sessionToken"]
356+
else:
357+
# check if the client's default consistency is session (and request consistency level is same),
358+
# then update from session container
359+
if headers[http_constants.HttpHeaders.ConsistencyLevel] == documents.ConsistencyLevel.Session and \
360+
cosmos_client_connection.session:
361+
# populate session token from the client's session container
362+
session_token = (
363+
cosmos_client_connection.session.get_session_token(path,
364+
options.get('partitionKey'),
365+
cosmos_client_connection._container_properties_cache,
366+
cosmos_client_connection._routing_map_provider,
367+
partition_key_range_id))
368+
if session_token != "":
369+
headers[http_constants.HttpHeaders.SessionToken] = session_token
370+
371+
async def set_session_token_header_async(
372+
cosmos_client_connection: Union["CosmosClientConnection", "AsyncClientConnection"],
373+
headers: dict,
374+
path: str,
375+
request_object: "RequestObject",
376+
options: Mapping[str, Any],
377+
partition_key_range_id: Optional[str] = None) -> None:
378+
# set session token if required
379+
if _is_session_token_request(cosmos_client_connection, headers, request_object):
380+
# if there is a token set via option, then use it to override default
381+
if options.get("sessionToken"):
382+
headers[http_constants.HttpHeaders.SessionToken] = options["sessionToken"]
383+
else:
384+
# check if the client's default consistency is session (and request consistency level is same),
385+
# then update from session container
386+
if headers[http_constants.HttpHeaders.ConsistencyLevel] == documents.ConsistencyLevel.Session and \
387+
cosmos_client_connection.session:
388+
# populate session token from the client's session container
389+
session_token = \
390+
await cosmos_client_connection.session.get_session_token_async(path,
391+
options.get('partitionKey'),
392+
cosmos_client_connection._container_properties_cache,
393+
cosmos_client_connection._routing_map_provider,
394+
partition_key_range_id)
395+
if session_token != "":
396+
headers[http_constants.HttpHeaders.SessionToken] = session_token
352397

353398
def GetResourceIdOrFullNameFromLink(resource_link: str) -> str:
354399
"""Gets resource id or full name from resource link.

0 commit comments

Comments
 (0)