diff --git a/sdk/cosmos/azure-cosmos/CHANGELOG.md b/sdk/cosmos/azure-cosmos/CHANGELOG.md index da6df4c3f593..8d3d914a9ed5 100644 --- a/sdk/cosmos/azure-cosmos/CHANGELOG.md +++ b/sdk/cosmos/azure-cosmos/CHANGELOG.md @@ -4,6 +4,7 @@ #### Features Added * Added merge support. See [PR 42924](https://github.com/Azure/azure-sdk-for-python/pull/42924). +* Added support for priority-based throttling at the client level for sync and async clients. See [PR 43917](https://github.com/Azure/azure-sdk-for-python/pull/43917). #### Breaking Changes diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_client_connection.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_client_connection.py index dd9479280a34..4e08365e2c47 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_client_connection.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_client_connection.py @@ -132,6 +132,9 @@ def __init__( # pylint: disable=too-many-statements The connection policy for the client. :param documents.ConsistencyLevel consistency_level: The default consistency policy for client operations. + :keyword Literal["High", "Low"] priority: Priority based execution allows users to set a priority for the + client. Once the user has reached their provisioned throughput, low priority requests are throttled + before high priority requests start getting throttled. Feature must first be enabled at the account level. """ self.client_id = str(uuid.uuid4()) self.url_connection = url_connection @@ -165,6 +168,10 @@ def __init__( # pylint: disable=too-many-statements if throughput_bucket: self.default_headers[http_constants.HttpHeaders.ThroughputBucket] = throughput_bucket + priority = kwargs.pop('priority', None) + if priority: + self.default_headers[http_constants.HttpHeaders.PriorityLevel] = priority + # Keeps the latest response headers from the server. 
self.last_response_headers: CaseInsensitiveDict = CaseInsensitiveDict() diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_client_connection_async.py b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_client_connection_async.py index 80f031f1c292..67d5d4efa3e9 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_client_connection_async.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_client_connection_async.py @@ -134,6 +134,9 @@ def __init__( # pylint: disable=too-many-statements The connection policy for the client. :param documents.ConsistencyLevel consistency_level: The default consistency policy for client operations. + :keyword Literal["High", "Low"] priority: Priority based execution allows users to set a priority for the + client. Once the user has reached their provisioned throughput, low priority requests are throttled + before high priority requests start getting throttled. Feature must first be enabled at the account level. """ self.client_id = str(uuid.uuid4()) self.url_connection = url_connection @@ -167,6 +170,10 @@ def __init__( # pylint: disable=too-many-statements if throughput_bucket: self.default_headers[http_constants.HttpHeaders.ThroughputBucket] = throughput_bucket + priority = kwargs.pop('priority', None) + if priority: + self.default_headers[http_constants.HttpHeaders.PriorityLevel] = priority + if consistency_level is not None: self.default_headers[http_constants.HttpHeaders.ConsistencyLevel] = consistency_level diff --git a/sdk/cosmos/azure-cosmos/samples/examples.py b/sdk/cosmos/azure-cosmos/samples/examples.py index 675e279c9007..7cd6d0b56c84 100644 --- a/sdk/cosmos/azure-cosmos/samples/examples.py +++ b/sdk/cosmos/azure-cosmos/samples/examples.py @@ -126,6 +126,65 @@ # [END priority_level option] +# [START client_level_priority] +# Priority can also be set at the client level, which will apply to all requests made by that client. 
+# This is useful when you want all operations from a particular client to have the same priority. +# The client-level priority is set during client initialization with the `priority` parameter. + +# Create a client with Low priority for all requests +low_priority_client = CosmosClient(url, key, priority="Low") +low_priority_database = low_priority_client.get_database_client(database_name) +low_priority_container = low_priority_database.get_container_client(container_name) + +# Add some items to query +for i in range(1, 4): + low_priority_container.upsert_item( + dict(id="low_priority_item{}".format(i), productName="Widget", productModel="Model {}".format(i)) + ) + +# All requests from this client will have Low priority by default +for queried_item in low_priority_container.query_items( + query='SELECT * FROM products p WHERE p.productName = "Widget"', + enable_cross_partition_query=True +): + print(json.dumps(queried_item, indent=True)) + +# [END client_level_priority] + +# [START request_priority_precedence] +# Request-level priority takes precedence over client-level priority. +# This allows you to override the default priority for specific operations. 
+ +# Create a client with Low priority +client_with_default_priority = CosmosClient(url, key, priority="Low") +database_with_priority = client_with_default_priority.get_database_client(database_name) +container_with_priority = database_with_priority.get_container_client(container_name) + +# Add items with different priority levels to the container +container_with_priority.upsert_item( + dict(id="urgent_item1", productName="Widget", priority="High", productModel="High Priority Model") +) +container_with_priority.upsert_item( + dict(id="normal_item1", productName="Widget", priority="Low", productModel="Low Priority Model") +) + +# This query will use High priority, overriding the client's Low priority setting +for important_item in container_with_priority.query_items( + query='SELECT * FROM products p WHERE p.priority = "High"', + enable_cross_partition_query=True, + priority="High" # Request-level priority overrides client-level priority +): + print(json.dumps(important_item, indent=True)) + +# This query will use the client's default Low priority +for normal_item in container_with_priority.query_items( + query='SELECT * FROM products p WHERE p.priority = "Low"', + enable_cross_partition_query=True +): + print(json.dumps(normal_item, indent=True)) + +# [END request_priority_precedence] + # Delete items from the container. # The Cosmos DB SQL API does not support 'DELETE' queries, # so deletes must be done with the delete_item method diff --git a/sdk/cosmos/azure-cosmos/samples/examples_async.py b/sdk/cosmos/azure-cosmos/samples/examples_async.py index 87751c2b4295..e0c7c996e1f1 100644 --- a/sdk/cosmos/azure-cosmos/samples/examples_async.py +++ b/sdk/cosmos/azure-cosmos/samples/examples_async.py @@ -122,12 +122,69 @@ async def examples_async(): # then Azure Cosmos DB will throttle low priority requests to allow high priority requests to execute. # Can be used for Read, Write, and Query operations. This is specified with the `priority` keyword. 
# the value can either be low or high. + async for queried_item in container.query_items( query='SELECT * FROM products p WHERE p.productModel <> "DISCONTINUED"', priority="High" ): print(json.dumps(queried_item, indent=True)) # [END priority_level option] + # [START client_level_priority] + # Priority can also be set at the client level, which will apply to all requests made by that client. + # This is useful when you want all operations from a particular client to have the same priority. + # The client-level priority is set during client initialization with the `priority` parameter. + + # Create a client with Low priority for all requests + async with CosmosClient(url, key, priority="Low") as low_priority_client: + low_priority_database = low_priority_client.get_database_client(database_name) + low_priority_container = low_priority_database.get_container_client(container_name) + + # Add some items to query + for i in range(1, 4): + await low_priority_container.upsert_item( + dict(id="low_priority_item{}".format(i), productName="Widget", productModel="Model {}".format(i)) + ) + + # All requests from this client will have Low priority by default + async for queried_item in low_priority_container.query_items( + query='SELECT * FROM products p WHERE p.productName = "Widget"' + ): + print(json.dumps(queried_item, indent=True)) + + # [END client_level_priority] + + # [START request_priority_precedence] + # Request-level priority takes precedence over client-level priority. + # This allows you to override the default priority for specific operations. 
+ + # Create a client with Low priority + async with CosmosClient(url, key, priority="Low") as client_with_default_priority: + database_with_priority = client_with_default_priority.get_database_client(database_name) + container_with_priority = database_with_priority.get_container_client(container_name) + + # Add items with different priority levels to the container + await container_with_priority.upsert_item( + dict(id="urgent_item1", productName="Widget", priority="High", productModel="High Priority Model") + ) + await container_with_priority.upsert_item( + dict(id="normal_item1", productName="Widget", priority="Low", productModel="Low Priority Model") + ) + + # This query will use High priority, overriding the client's Low priority setting + async for important_item in container_with_priority.query_items( + query='SELECT * FROM products p WHERE p.priority = "High"', + priority="High" # Request-level priority overrides client-level priority + ): + print(json.dumps(important_item, indent=True)) + + # This query will use the client's default Low priority + async for normal_item in container_with_priority.query_items( + query='SELECT * FROM products p WHERE p.priority = "Low"' + ): + print(json.dumps(normal_item, indent=True)) + + # [END request_priority_precedence] + # Delete items from the container. 
# The Cosmos DB SQL API does not support 'DELETE' queries, # so deletes must be done with the delete_item method diff --git a/sdk/cosmos/azure-cosmos/tests/test_headers.py b/sdk/cosmos/azure-cosmos/tests/test_headers.py index 8d0673f0f446..7aeb6afed067 100644 --- a/sdk/cosmos/azure-cosmos/tests/test_headers.py +++ b/sdk/cosmos/azure-cosmos/tests/test_headers.py @@ -15,6 +15,9 @@ client_throughput_bucket_number = 2 request_throughput_bucket_number = 3 +client_priority = "Low" +request_priority = "High" + def client_raw_response_hook(response): assert (response.http_request.headers[http_constants.HttpHeaders.ThroughputBucket] == str(client_throughput_bucket_number)) @@ -23,6 +26,14 @@ def request_raw_response_hook(response): assert (response.http_request.headers[http_constants.HttpHeaders.ThroughputBucket] == str(request_throughput_bucket_number)) +def client_priority_raw_response_hook(response): + assert (response.http_request.headers[http_constants.HttpHeaders.PriorityLevel] + == client_priority) + +def request_priority_raw_response_hook(response): + assert (response.http_request.headers[http_constants.HttpHeaders.PriorityLevel] + == request_priority) + def partition_merge_support_response_hook(raw_response): header = raw_response.http_request.headers assert http_constants.HttpHeaders.SDKSupportedCapabilities in header @@ -290,5 +301,24 @@ def test_partition_merge_support_header(self): # base method to set the header(GetHeaders). 
self.container.read(raw_response_hook=partition_merge_support_response_hook) + def test_client_level_priority(self): + # Test that priority level set at client level is used for all requests + cosmos_client.CosmosClient(self.host, self.masterKey, + priority=client_priority, + raw_response_hook=client_priority_raw_response_hook) + + def test_request_precedence_priority(self): + # Test that request-level priority takes precedence over client-level priority + client = cosmos_client.CosmosClient(self.host, self.masterKey, + priority=client_priority) + created_db = client.get_database_client(self.configs.TEST_DATABASE_ID) + created_container = created_db.get_container_client(self.configs.TEST_MULTI_PARTITION_CONTAINER_ID) + + # Create an item with request-level priority that overrides client-level priority + created_container.create_item( + body={'id': '1' + str(uuid.uuid4()), 'pk': 'mypk'}, + priority=request_priority, + raw_response_hook=request_priority_raw_response_hook) + if __name__ == "__main__": unittest.main() diff --git a/sdk/cosmos/azure-cosmos/tests/test_headers_async.py b/sdk/cosmos/azure-cosmos/tests/test_headers_async.py index dfbc28f9a8d4..d0453f5bf3fc 100644 --- a/sdk/cosmos/azure-cosmos/tests/test_headers_async.py +++ b/sdk/cosmos/azure-cosmos/tests/test_headers_async.py @@ -16,6 +16,9 @@ client_throughput_bucket_number = 2 request_throughput_bucket_number = 3 +client_priority = "Low" +request_priority = "High" + async def client_raw_response_hook(response): assert (response.http_request.headers[http_constants.HttpHeaders.ThroughputBucket] == str(client_throughput_bucket_number)) @@ -24,6 +27,14 @@ async def request_raw_response_hook(response): assert (response.http_request.headers[http_constants.HttpHeaders.ThroughputBucket] == str(request_throughput_bucket_number)) +async def client_priority_raw_response_hook(response): + assert (response.http_request.headers[http_constants.HttpHeaders.PriorityLevel] + == client_priority) + +async def 
request_priority_raw_response_hook(response): + assert (response.http_request.headers[http_constants.HttpHeaders.PriorityLevel] + == request_priority) + class ClientIDVerificationError(Exception): """Custom exception for client ID verification errors.""" @@ -236,5 +247,28 @@ async def test_partition_merge_support_header(self): # base method to set the header(GetHeaders). await self.container.read(raw_response_hook=partition_merge_support_response_hook) + async def test_client_level_priority_async(self): + # Test that priority level set at client level is used for all requests + async with CosmosClient(self.host, self.masterKey, + priority=client_priority, + raw_response_hook=client_priority_raw_response_hook) as client: + # Make a request to trigger the hook + database = client.get_database_client(self.configs.TEST_DATABASE_ID) + container = database.get_container_client(self.configs.TEST_MULTI_PARTITION_CONTAINER_ID) + created_item = await container.create_item(body={'id': '1' + str(uuid.uuid4()), 'pk': 'mypk'}) + + async def test_request_precedence_priority_async(self): + # Test that request-level priority takes precedence over client-level priority + async with CosmosClient(self.host, self.masterKey, + priority=client_priority) as client: + database = client.get_database_client(self.configs.TEST_DATABASE_ID) + created_container = database.get_container_client(self.configs.TEST_MULTI_PARTITION_CONTAINER_ID) + + # Create an item with request-level priority that overrides client-level priority + await created_container.create_item( + body={'id': '1' + str(uuid.uuid4()), 'pk': 'mypk'}, + priority=request_priority, + raw_response_hook=request_priority_raw_response_hook) + if __name__ == "__main__": unittest.main() diff --git a/sdk/cosmos/azure-cosmos/tests/test_vector_policy.py b/sdk/cosmos/azure-cosmos/tests/test_vector_policy.py index 5b63e8b76ef3..214896e12a60 100644 --- a/sdk/cosmos/azure-cosmos/tests/test_vector_policy.py +++ 
b/sdk/cosmos/azure-cosmos/tests/test_vector_policy.py @@ -60,8 +60,8 @@ def test_create_vector_embedding_container(self): indexing_policy = { "vectorIndexes": [ {"path": "/vector1", "type": "flat"}, - {"path": "/vector2", "type": "quantizedFlat", "quantizationByteSize": 8}, - {"path": "/vector3", "type": "diskANN", "quantizationByteSize": 8, "vectorIndexShardKey": ["/city"], "indexingSearchListSize": 50} + {"path": "/vector2", "type": "quantizedFlat", "quantizerType": "product", "quantizationByteSize": 8}, + {"path": "/vector3", "type": "diskANN", "quantizerType": "product", "quantizationByteSize": 8, "vectorIndexShardKey": ["/city"], "indexingSearchListSize": 50} ] } vector_embedding_policy = { @@ -101,7 +101,7 @@ def test_create_vector_embedding_container(self): # Pass a vector indexing policy with hierarchical vectorIndexShardKey value indexing_policy = { "vectorIndexes": [ - {"path": "/vector2", "type": "diskANN", 'quantizationByteSize': 64, 'indexingSearchListSize': 100, "vectorIndexShardKey": ["/country/city"]}] + {"path": "/vector2", "type": "diskANN", "quantizerType": "product", 'quantizationByteSize': 64, 'indexingSearchListSize': 100, "vectorIndexShardKey": ["/country/city"]}] } container_id = "vector_container" + str(uuid.uuid4()) created_container = self.test_db.create_container( @@ -149,6 +149,7 @@ def test_replace_vector_indexing_policy(self): { "path": "/vector1", "type": "diskANN", + "quantizerType": "product", "quantizationByteSize": 128, "indexingSearchListSize": 100 } @@ -179,6 +180,7 @@ def test_replace_vector_indexing_policy(self): { "path": "/vector1", "type": "diskANN", + "quantizerType": "product", "quantizationByteSize": 128, "indexingSearchListSize": 100 }] @@ -407,17 +409,6 @@ def test_fail_replace_vector_indexing_policy(self): assert e.status_code == 400 assert ("The Vector Indexing Policy's path::/vector1 not matching in Embedding's path." 
in e.http_error_message) - # don't provide vector indexing policy - try: - self.test_db.replace_container( - created_container, - PartitionKey(path="/id"), - vector_embedding_policy=vector_embedding_policy) - pytest.fail("Container replace should have failed for missing indexing policy.") - except exceptions.CosmosHttpResponseError as e: - assert e.status_code == 400 - assert ("The Vector Indexing Policy cannot be changed in Collection Replace." - in e.http_error_message) # using a new indexing policy new_indexing_policy = { "vectorIndexes": [ @@ -451,7 +442,7 @@ def test_fail_replace_vector_indexing_policy(self): pytest.fail("Container replace should have failed for new embedding policy.") except exceptions.CosmosHttpResponseError as e: assert e.status_code == 400 - assert ("The Vector Embedding Policy cannot be changed in Collection Replace" + assert ("Paths in existing embedding policy cannot be modified in Collection Replace" in e.http_error_message) self.test_db.delete_container(container_id) diff --git a/sdk/cosmos/azure-cosmos/tests/test_vector_policy_async.py b/sdk/cosmos/azure-cosmos/tests/test_vector_policy_async.py index 8e680a946020..61c623b79270 100644 --- a/sdk/cosmos/azure-cosmos/tests/test_vector_policy_async.py +++ b/sdk/cosmos/azure-cosmos/tests/test_vector_policy_async.py @@ -51,9 +51,9 @@ async def test_create_vector_embedding_container_async(self): "vectorIndexes": [ {"path": "/vector1", "type": "flat"}, - {"path": "/vector2", "type": "quantizedFlat", "quantizationByteSize": 64, "vectorIndexShardKey": ["/city"]}, + {"path": "/vector2", "type": "quantizedFlat", "quantizerType": "product", "quantizationByteSize": 64, "vectorIndexShardKey": ["/city"]}, - {"path": "/vector3", "type": "diskANN", "quantizationByteSize": 8, "indexingSearchListSize": 50} + {"path": "/vector3", "type": "diskANN", "quantizerType": "product", "quantizationByteSize": 8, "indexingSearchListSize": 50} ] } vector_embedding_policy = { @@ -124,6 +124,7 @@ async def 
test_replace_vector_indexing_policy_async(self): { "path": "/vector1", "type": "diskANN", + "quantizerType": "product", "quantizationByteSize": 128, "indexingSearchListSize": 100 } @@ -154,6 +155,7 @@ async def test_replace_vector_indexing_policy_async(self): { "path": "/vector1", "type": "diskANN", + "quantizerType": "product", "quantizationByteSize": 128, "indexingSearchListSize": 100 }] @@ -338,17 +340,6 @@ async def test_fail_replace_vector_indexing_policy_async(self): assert e.status_code == 400 assert ("The Vector Indexing Policy's path::/vector1 not matching in Embedding's path." in e.http_error_message) - # don't provide vector indexing policy - try: - await self.test_db.replace_container( - created_container, - PartitionKey(path="/id"), - vector_embedding_policy=vector_embedding_policy) - pytest.fail("Container replace should have failed for missing indexing policy.") - except exceptions.CosmosHttpResponseError as e: - assert e.status_code == 400 - assert ("The Vector Indexing Policy cannot be changed in Collection Replace." 
- in e.http_error_message) # using a new indexing policy new_indexing_policy = { "vectorIndexes": [ @@ -382,7 +373,7 @@ async def test_fail_replace_vector_indexing_policy_async(self): pytest.fail("Container replace should have failed for new embedding policy.") except exceptions.CosmosHttpResponseError as e: assert e.status_code == 400 - assert ("The Vector Embedding Policy cannot be changed in Collection Replace" + assert ("Paths in existing embedding policy cannot be modified in Collection Replace" in e.http_error_message) await self.test_db.delete_container(container_id) diff --git a/sdk/cosmos/test-resources.bicep b/sdk/cosmos/test-resources.bicep index cfaee9a3ff0f..c99344adfeb5 100644 --- a/sdk/cosmos/test-resources.bicep +++ b/sdk/cosmos/test-resources.bicep @@ -39,7 +39,7 @@ var multiRegionConfiguration = [ isZoneRedundant: false } { - locationName: 'West US' + locationName: 'West Central US' provisioningState: 'Succeeded' failoverPriority: 1 isZoneRedundant: false