Skip to content

Commit ae6a5c2

Browse files
andrewmathew1Andrew Mathewsimorenoh
authored
Added Priority Based Execution for Sync and Async Clients (#43917)
* Added priority level for clients * added to changelog and docstring * changed item to queried_item for pylint * made suggested changes * fixed sample bug * Update sdk/cosmos/azure-cosmos/CHANGELOG.md Co-authored-by: Simon Moreno <30335873+simorenoh@users.noreply.github.com> * Changed test pipeline from West US to West Central US * added changes to vector index policy tests * bugfixes for vector policy tests --------- Co-authored-by: Andrew Mathew <andrewmathew@microsoft.com> Co-authored-by: Simon Moreno <30335873+simorenoh@users.noreply.github.com>
1 parent 738735b commit ae6a5c2

File tree

10 files changed

+207
-30
lines changed

10 files changed

+207
-30
lines changed

sdk/cosmos/azure-cosmos/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
#### Features Added
66
* Added merge support. See [PR 42924](https://github.com/Azure/azure-sdk-for-python/pull/42924).
7+
* Added support for priority-based throttling at the client level for sync and async clients. See [PR 43917](https://github.com/Azure/azure-sdk-for-python/pull/43917)
78

89
#### Breaking Changes
910

sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_client_connection.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,9 @@ def __init__( # pylint: disable=too-many-statements
132132
The connection policy for the client.
133133
:param documents.ConsistencyLevel consistency_level:
134134
The default consistency policy for client operations.
135+
:keyword Literal["High", "Low"] priority: Priority based execution allows users to set a priority for the
136+
client. Once the user has reached their provisioned throughput, low priority requests are throttled
137+
before high priority requests start getting throttled. Feature must first be enabled at the account level.
135138
"""
136139
self.client_id = str(uuid.uuid4())
137140
self.url_connection = url_connection
@@ -165,6 +168,10 @@ def __init__( # pylint: disable=too-many-statements
165168
if throughput_bucket:
166169
self.default_headers[http_constants.HttpHeaders.ThroughputBucket] = throughput_bucket
167170

171+
priority = kwargs.pop('priority', None)
172+
if priority:
173+
self.default_headers[http_constants.HttpHeaders.PriorityLevel] = priority
174+
168175
# Keeps the latest response headers from the server.
169176
self.last_response_headers: CaseInsensitiveDict = CaseInsensitiveDict()
170177

sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_client_connection_async.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,9 @@ def __init__( # pylint: disable=too-many-statements
134134
The connection policy for the client.
135135
:param documents.ConsistencyLevel consistency_level:
136136
The default consistency policy for client operations.
137+
:keyword Literal["High", "Low"] priority: Priority based execution allows users to set a priority for the
138+
client. Once the user has reached their provisioned throughput, low priority requests are throttled
139+
before high priority requests start getting throttled. Feature must first be enabled at the account level.
137140
"""
138141
self.client_id = str(uuid.uuid4())
139142
self.url_connection = url_connection
@@ -167,6 +170,10 @@ def __init__( # pylint: disable=too-many-statements
167170
if throughput_bucket:
168171
self.default_headers[http_constants.HttpHeaders.ThroughputBucket] = throughput_bucket
169172

173+
priority = kwargs.pop('priority', None)
174+
if priority:
175+
self.default_headers[http_constants.HttpHeaders.PriorityLevel] = priority
176+
170177
if consistency_level is not None:
171178
self.default_headers[http_constants.HttpHeaders.ConsistencyLevel] = consistency_level
172179

sdk/cosmos/azure-cosmos/samples/examples.py

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,65 @@
126126

127127
# [END priority_level option]
128128

129+
# [START client_level_priority]
130+
# Priority can also be set at the client level, which will apply to all requests made by that client.
131+
# This is useful when you want all operations from a particular client to have the same priority.
132+
# The client-level priority is set during client initialization with the `priority` parameter.
133+
134+
# Create a client with Low priority for all requests
135+
low_priority_client = CosmosClient(url, key, priority="Low")
136+
low_priority_database = low_priority_client.get_database_client(database_name)
137+
low_priority_container = low_priority_database.get_container_client(container_name)
138+
139+
# Add some items to query
140+
for i in range(1, 4):
141+
low_priority_container.upsert_item(
142+
dict(id="low_priority_item{}".format(i), productName="Widget", productModel="Model {}".format(i))
143+
)
144+
145+
# All requests from this client will have Low priority by default
146+
for queried_item in low_priority_container.query_items(
147+
query='SELECT * FROM products p WHERE p.productName = "Widget"',
148+
enable_cross_partition_query=True
149+
):
150+
print(json.dumps(queried_item, indent=True))
151+
152+
# [END client_level_priority]
153+
154+
# [START request_priority_precedence]
155+
# Request-level priority takes precedence over client-level priority.
156+
# This allows you to override the default priority for specific operations.
157+
158+
# Create a client with Low priority
159+
client_with_default_priority = CosmosClient(url, key, priority="Low")
160+
database_with_priority = client_with_default_priority.get_database_client(database_name)
161+
container_with_priority = database_with_priority.get_container_client(container_name)
162+
163+
# Add items with different priority levels to the container
164+
container_with_priority.upsert_item(
165+
dict(id="urgent_item1", productName="Widget", priority="High", productModel="High Priority Model")
166+
)
167+
container_with_priority.upsert_item(
168+
dict(id="normal_item1", productName="Widget", priority="Low", productModel="Low Priority Model")
169+
)
170+
171+
# This query will use High priority, overriding the client's Low priority setting
172+
for important_item in container_with_priority.query_items(
173+
query='SELECT * FROM products p WHERE p.priority = "High"',
174+
enable_cross_partition_query=True,
175+
priority="High" # Request-level priority overrides client-level priority
176+
):
177+
print(json.dumps(important_item, indent=True))
178+
179+
# This query will use the client's default Low priority
180+
for normal_item in container_with_priority.query_items(
181+
query='SELECT * FROM products p WHERE p.priority = "Low"',
182+
enable_cross_partition_query=True
183+
):
184+
print(json.dumps(normal_item, indent=True))
185+
186+
# [END request_priority_precedence]
187+
129188
# Delete items from the container.
130189
# The Cosmos DB SQL API does not support 'DELETE' queries,
131190
# so deletes must be done with the delete_item method

sdk/cosmos/azure-cosmos/samples/examples_async.py

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,12 +122,69 @@ async def examples_async():
122122
# then Azure Cosmos DB will throttle low priority requests to allow high priority requests to execute.
123123
# Can be used for Read, Write, and Query operations. This is specified with the `priority` keyword.
124124
# the value can either be low or high.
125+
125126
async for queried_item in container.query_items(
126127
query='SELECT * FROM products p WHERE p.productModel <> "DISCONTINUED"', priority="High"
127128
):
128129
print(json.dumps(queried_item, indent=True))
129130
# [END priority_level option]
130131

132+
# [START client_level_priority]
133+
# Priority can also be set at the client level, which will apply to all requests made by that client.
134+
# This is useful when you want all operations from a particular client to have the same priority.
135+
# The client-level priority is set during client initialization with the `priority` parameter.
136+
137+
# Create a client with Low priority for all requests
138+
async with CosmosClient(url, key, priority="Low") as low_priority_client:
139+
low_priority_database = low_priority_client.get_database_client(database_name)
140+
low_priority_container = low_priority_database.get_container_client(container_name)
141+
142+
# Add some items to query
143+
for i in range(1, 4):
144+
await low_priority_container.upsert_item(
145+
dict(id="low_priority_item{}".format(i), productName="Widget", productModel="Model {}".format(i))
146+
)
147+
148+
# All requests from this client will have Low priority by default
149+
async for queried_item in low_priority_container.query_items(
150+
query='SELECT * FROM products p WHERE p.productName = "Widget"'
151+
):
152+
print(json.dumps(queried_item, indent=True))
153+
154+
# [END client_level_priority]
155+
156+
# [START request_priority_precedence]
157+
# Request-level priority takes precedence over client-level priority.
158+
# This allows you to override the default priority for specific operations.
159+
160+
# Create a client with Low priority
161+
async with CosmosClient(url, key, priority="Low") as client_with_default_priority:
162+
database_with_priority = client_with_default_priority.get_database_client(database_name)
163+
container_with_priority = database_with_priority.get_container_client(container_name)
164+
165+
# Add items with different priority levels to the container
166+
await container_with_priority.upsert_item(
167+
dict(id="urgent_item1", productName="Widget", priority="Low", productModel="Low Priority Model")
168+
)
169+
await container_with_priority.upsert_item(
170+
dict(id="normal_item1", productName="Widget", priority="High", productModel="High Priority Model")
171+
)
172+
173+
# This query will use High priority, overriding the client's Low priority setting
174+
async for important_item in container_with_priority.query_items(
175+
query='SELECT * FROM products p WHERE p.priority = "High"',
176+
priority="High" # Request-level priority overrides client-level priority
177+
):
178+
print(json.dumps(important_item, indent=True))
179+
180+
# This query will use the client's default Low priority
181+
async for normal_item in container_with_priority.query_items(
182+
query='SELECT * FROM products p WHERE p.priority = "Low"'
183+
):
184+
print(json.dumps(normal_item, indent=True))
185+
186+
# [END request_priority_precedence]
187+
131188
# Delete items from the container.
132189
# The Cosmos DB SQL API does not support 'DELETE' queries,
133190
# so deletes must be done with the delete_item method

sdk/cosmos/azure-cosmos/tests/test_headers.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@
1515

1616
client_throughput_bucket_number = 2
1717
request_throughput_bucket_number = 3
18+
client_priority = "Low"
19+
request_priority = "High"
20+
1821
def client_raw_response_hook(response):
1922
assert (response.http_request.headers[http_constants.HttpHeaders.ThroughputBucket]
2023
== str(client_throughput_bucket_number))
@@ -23,6 +26,14 @@ def request_raw_response_hook(response):
2326
assert (response.http_request.headers[http_constants.HttpHeaders.ThroughputBucket]
2427
== str(request_throughput_bucket_number))
2528

29+
def client_priority_raw_response_hook(response):
30+
assert (response.http_request.headers[http_constants.HttpHeaders.PriorityLevel]
31+
== client_priority)
32+
33+
def request_priority_raw_response_hook(response):
34+
assert (response.http_request.headers[http_constants.HttpHeaders.PriorityLevel]
35+
== request_priority)
36+
2637
def partition_merge_support_response_hook(raw_response):
2738
header = raw_response.http_request.headers
2839
assert http_constants.HttpHeaders.SDKSupportedCapabilities in header
@@ -290,5 +301,24 @@ def test_partition_merge_support_header(self):
290301
# base method to set the header(GetHeaders).
291302
self.container.read(raw_response_hook=partition_merge_support_response_hook)
292303

304+
def test_client_level_priority(self):
305+
# Test that priority level set at client level is used for all requests
306+
cosmos_client.CosmosClient(self.host, self.masterKey,
307+
priority=client_priority,
308+
raw_response_hook=client_priority_raw_response_hook)
309+
310+
def test_request_precedence_priority(self):
311+
# Test that request-level priority takes precedence over client-level priority
312+
client = cosmos_client.CosmosClient(self.host, self.masterKey,
313+
priority=client_priority)
314+
created_db = client.get_database_client(self.configs.TEST_DATABASE_ID)
315+
created_container = created_db.get_container_client(self.configs.TEST_MULTI_PARTITION_CONTAINER_ID)
316+
317+
# Create an item with request-level priority that overrides client-level priority
318+
created_container.create_item(
319+
body={'id': '1' + str(uuid.uuid4()), 'pk': 'mypk'},
320+
priority=request_priority,
321+
raw_response_hook=request_priority_raw_response_hook)
322+
293323
if __name__ == "__main__":
294324
unittest.main()

sdk/cosmos/azure-cosmos/tests/test_headers_async.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@
1616

1717
client_throughput_bucket_number = 2
1818
request_throughput_bucket_number = 3
19+
client_priority = "Low"
20+
request_priority = "High"
21+
1922
async def client_raw_response_hook(response):
2023
assert (response.http_request.headers[http_constants.HttpHeaders.ThroughputBucket]
2124
== str(client_throughput_bucket_number))
@@ -24,6 +27,14 @@ async def request_raw_response_hook(response):
2427
assert (response.http_request.headers[http_constants.HttpHeaders.ThroughputBucket]
2528
== str(request_throughput_bucket_number))
2629

30+
async def client_priority_raw_response_hook(response):
31+
assert (response.http_request.headers[http_constants.HttpHeaders.PriorityLevel]
32+
== client_priority)
33+
34+
async def request_priority_raw_response_hook(response):
35+
assert (response.http_request.headers[http_constants.HttpHeaders.PriorityLevel]
36+
== request_priority)
37+
2738

2839
class ClientIDVerificationError(Exception):
2940
"""Custom exception for client ID verification errors."""
@@ -236,5 +247,28 @@ async def test_partition_merge_support_header(self):
236247
# base method to set the header(GetHeaders).
237248
await self.container.read(raw_response_hook=partition_merge_support_response_hook)
238249

250+
async def test_client_level_priority_async(self):
251+
# Test that priority level set at client level is used for all requests
252+
async with CosmosClient(self.host, self.masterKey,
253+
priority=client_priority,
254+
raw_response_hook=client_priority_raw_response_hook) as client:
255+
# Make a request to trigger the hook
256+
database = client.get_database_client(self.configs.TEST_DATABASE_ID)
257+
container = database.get_container_client(self.configs.TEST_MULTI_PARTITION_CONTAINER_ID)
258+
created_item = await container.create_item(body={'id': '1' + str(uuid.uuid4()), 'pk': 'mypk'})
259+
260+
async def test_request_precedence_priority_async(self):
261+
# Test that request-level priority takes precedence over client-level priority
262+
async with CosmosClient(self.host, self.masterKey,
263+
priority=client_priority) as client:
264+
database = client.get_database_client(self.configs.TEST_DATABASE_ID)
265+
created_container = database.get_container_client(self.configs.TEST_MULTI_PARTITION_CONTAINER_ID)
266+
267+
# Create an item with request-level priority that overrides client-level priority
268+
await created_container.create_item(
269+
body={'id': '1' + str(uuid.uuid4()), 'pk': 'mypk'},
270+
priority=request_priority,
271+
raw_response_hook=request_priority_raw_response_hook)
272+
239273
if __name__ == "__main__":
240274
unittest.main()

sdk/cosmos/azure-cosmos/tests/test_vector_policy.py

Lines changed: 6 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,8 @@ def test_create_vector_embedding_container(self):
6060
indexing_policy = {
6161
"vectorIndexes": [
6262
{"path": "/vector1", "type": "flat"},
63-
{"path": "/vector2", "type": "quantizedFlat", "quantizationByteSize": 8},
64-
{"path": "/vector3", "type": "diskANN", "quantizationByteSize": 8, "vectorIndexShardKey": ["/city"], "indexingSearchListSize": 50}
63+
{"path": "/vector2", "type": "quantizedFlat", "quantizerType": "product", "quantizationByteSize": 8},
64+
{"path": "/vector3", "type": "diskANN", "quantizerType": "product", "quantizationByteSize": 8, "vectorIndexShardKey": ["/city"], "indexingSearchListSize": 50}
6565
]
6666
}
6767
vector_embedding_policy = {
@@ -101,7 +101,7 @@ def test_create_vector_embedding_container(self):
101101
# Pass a vector indexing policy with hierarchical vectorIndexShardKey value
102102
indexing_policy = {
103103
"vectorIndexes": [
104-
{"path": "/vector2", "type": "diskANN", 'quantizationByteSize': 64, 'indexingSearchListSize': 100, "vectorIndexShardKey": ["/country/city"]}]
104+
{"path": "/vector2", "type": "diskANN", "quantizerType": "product", 'quantizationByteSize': 64, 'indexingSearchListSize': 100, "vectorIndexShardKey": ["/country/city"]}]
105105
}
106106
container_id = "vector_container" + str(uuid.uuid4())
107107
created_container = self.test_db.create_container(
@@ -149,6 +149,7 @@ def test_replace_vector_indexing_policy(self):
149149
{
150150
"path": "/vector1",
151151
"type": "diskANN",
152+
"quantizerType": "product",
152153
"quantizationByteSize": 128,
153154
"indexingSearchListSize": 100
154155
}
@@ -179,6 +180,7 @@ def test_replace_vector_indexing_policy(self):
179180
{
180181
"path": "/vector1",
181182
"type": "diskANN",
183+
"quantizerType": "product",
182184
"quantizationByteSize": 128,
183185
"indexingSearchListSize": 100
184186
}]
@@ -407,17 +409,6 @@ def test_fail_replace_vector_indexing_policy(self):
407409
assert e.status_code == 400
408410
assert ("The Vector Indexing Policy's path::/vector1 not matching in Embedding's path."
409411
in e.http_error_message)
410-
# don't provide vector indexing policy
411-
try:
412-
self.test_db.replace_container(
413-
created_container,
414-
PartitionKey(path="/id"),
415-
vector_embedding_policy=vector_embedding_policy)
416-
pytest.fail("Container replace should have failed for missing indexing policy.")
417-
except exceptions.CosmosHttpResponseError as e:
418-
assert e.status_code == 400
419-
assert ("The Vector Indexing Policy cannot be changed in Collection Replace."
420-
in e.http_error_message)
421412
# using a new indexing policy
422413
new_indexing_policy = {
423414
"vectorIndexes": [
@@ -451,7 +442,7 @@ def test_fail_replace_vector_indexing_policy(self):
451442
pytest.fail("Container replace should have failed for new embedding policy.")
452443
except exceptions.CosmosHttpResponseError as e:
453444
assert e.status_code == 400
454-
assert ("The Vector Embedding Policy cannot be changed in Collection Replace"
445+
assert ("Paths in existing embedding policy cannot be modified in Collection Replace"
455446
in e.http_error_message)
456447
self.test_db.delete_container(container_id)
457448

0 commit comments

Comments
 (0)