From a24fa4c694d2049185e64eacf35855f4c11ff37f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=C3=A5kon=20V=2E=20Treider?= Date: Tue, 9 Dec 2025 11:13:20 +0100 Subject: [PATCH 01/18] bump version to 8.0.0a2 --- cognite/client/_version.py | 2 +- pyproject.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/cognite/client/_version.py b/cognite/client/_version.py index e6819477ea..f4602acee4 100644 --- a/cognite/client/_version.py +++ b/cognite/client/_version.py @@ -1,5 +1,5 @@ from __future__ import annotations -__version__ = "8.0.0a1" # x-release-please-version +__version__ = "8.0.0a2" # x-release-please-version __api_subversion__ = "20230101" diff --git a/pyproject.toml b/pyproject.toml index 02e95f4064..83410410d1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "cognite-sdk" -version = "8.0.0a1" +version = "8.0.0a2" description = "Cognite Python SDK" readme = "README.md" From 612430f8a91bbed358c73d78c8737c898d7bb0c4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=C3=A5kon=20V=2E=20Treider?= Date: Tue, 9 Dec 2025 11:34:46 +0100 Subject: [PATCH 02/18] update MIGRATION_GUIDE.md --- MIGRATION_GUIDE.md | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/MIGRATION_GUIDE.md b/MIGRATION_GUIDE.md index ad510d35fa..22e192aa0d 100644 --- a/MIGRATION_GUIDE.md +++ b/MIGRATION_GUIDE.md @@ -16,6 +16,7 @@ Changes are grouped as follows: ### Optional - **Async Support**: The SDK now provides full async support. The main client is now `AsyncCogniteClient`, but the synchronous `CogniteClient` is still available for backward compatibility. An important implementation detail is that it just wraps `AsyncCogniteClient`. - All helper/utility methods on data classes now have an async variant. A few examples: Class `Asset` has `children` and now also `children_async`, `subtree` and `subtree_async`, class `Function` now has `call` and `call_async`, class `TimeSeries` now has `latest` and `latest_async` etc. +- Instantiating a client has gotten a tiny bit simpler by allowing either `cluster` or `base_url` to be passed. When passing `cluster`, it is expected to be of the form 'https://{cluster}.cognitedata.com'. - The context manager `FileMultipartUploadSession`, returned by a call to one of the Files API methods `multipart_upload_content` or `multipart_upload_content_session`, now also supports async; you can enter with `async with`, and upload parts using `await upload_part_async`. - The SDK now ships with a new mock for the async client, namely `AsyncCogniteClientMock`. Both it and the previous `CogniteClientMock` are greatly improved and provide better type safety and checking of call signatures; `spec_set=True` is now enforced for all APIs (even the mocked client itself), through the use of `create_autospec` and bottom-up construction of nested APIs. - With the move to an async client, concurrency now works in Pyodide e.g. Jupyter-Lite in the browser. This also means that user interfaces like Streamlit won't freeze while resources from CDF are being fetched! @@ -38,7 +39,6 @@ Changes are grouped as follows: - The `__iter__` method has been removed from all APIs. Use `__call__` instead: `for ts in client.time_series()`. This makes it seamless to pass one or more parameters. - All references to `legacy_name` on time series data classes and API have been removed. - The helper methods on `client.iam`, `compare_capabilities` and `verify_capabilities` no longer support the `ignore_allscope_meaning` parameter. 
-- The Files API no longer accepts file handles opened in text mode. - The method `load_yaml` on the data class `Query` has been removed. Use `load` instead. - The Templates API has been completely removed from the SDK (the API service has already been shut off) - The separate beta `CogniteClient` has been removed. Note: APIs currently in alpha/beta are (already) implemented directly on the main client and throw warnings on use. @@ -59,7 +59,6 @@ Changes are grouped as follows: - Parameter `partitions` has been removed from all `__call__` methods except for the Raw Rows API (which has special handling for it). It was previously being ignored with the added side effect of ignoring `chunk_size` stemming from a very early API design oversight. - The method `retrieve` on the Workflow Versions API no longer accepts `workflow_external_id` and `version` as separate arguments. Pass a single or a sequence of `WorkflowVersionId` (tuples also accepted). - When loading a `ViewProperty` or `ViewPropertyApply`, the resource dictionary must contain the `"connectionType"` key or an error is raised. -- The Files API now expects `pathlib.Path` by default, but keeps the `str` support for now. - The specific exceptions `CogniteDuplicatedError` and `CogniteNotFoundError` should now always be used when appropriate (previously certain API endpoints always used `CogniteAPIError`) - `ModelFailedException` has changed name to `CogniteModelFailedError`. - For `class Transformation`, which used to have an async `run` method, this is now named `run_async` to unify the overall interface. The same applies to the `cancel` and `jobs` methods for the same class, and `update` and `wait` on `TransformationJob`. From 1ff4297adba609e294b21be1341241c6045005bc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=C3=A5kon=20V=2E=20Treider?= Date: Tue, 9 Dec 2025 11:37:35 +0100 Subject: [PATCH 03/18] fix bug with credentials refresh, move to JIT before sending req --- cognite/client/_http_client.py | 9 +++++---- tests/tests_unit/test_api_client.py | 20 +++++++++++++------- 2 files changed, 18 insertions(+), 11 deletions(-) diff --git a/cognite/client/_http_client.py b/cognite/client/_http_client.py index f6244cd775..5d80c7e35f 100644 --- a/cognite/client/_http_client.py +++ b/cognite/client/_http_client.py @@ -259,6 +259,9 @@ async def _with_retry( while True: try: async with semaphore: + # Ensure our credentials are not about to expire right before making the request: + if headers is not None: + self.refresh_auth_header(headers) response = await coro_factory() if accepts_json: # Cache .json() return value in order to avoid redecoding JSON if called multiple times @@ -288,7 +291,5 @@ async def _with_retry( # base class for all exceptions that can be raised during a request, so we use it here as a fallback. raise CogniteRequestError from err - retry_tracker.back_off(url) - # During a backoff loop, our credentials might expire, so we check and maybe refresh: - if headers is not None: - self.refresh_auth_header(headers) # TODO: Refactoring needed to make this "prettier" + # If we got here, it means we have decided to retry the request! 
+ retry_tracker.back_off() diff --git a/tests/tests_unit/test_api_client.py b/tests/tests_unit/test_api_client.py index 45588a2d96..d9244c84ad 100644 --- a/tests/tests_unit/test_api_client.py +++ b/tests/tests_unit/test_api_client.py @@ -1735,8 +1735,10 @@ async def test_connection_pool_is_shared_between_clients(self) -> None: async def test_worker_in_backoff_loop_gets_new_token(httpx_mock: HTTPXMock) -> None: + # Right before sending a request, we verify that our token is not about to expire. url = "https://foo.cognitedata.com/api/v1/projects/c/assets/byids" httpx_mock.add_response(method="POST", url=url, status_code=429, json={"error": "Backoff plz"}) + httpx_mock.add_response(method="POST", url=url, status_code=429, json={"error": "Backoff plz"}) httpx_mock.add_response( method="POST", url=url, @@ -1748,16 +1750,20 @@ async def test_worker_in_backoff_loop_gets_new_token(httpx_mock: HTTPXMock) -> N def token_callable() -> str: nonlocal call_count - if call_count < 1: - call_count += 1 - return "outdated-token" - return "valid-token" + call_count += 1 + return f"valid-token-{call_count}" client = CogniteClient(ClientConfig(client_name="a", cluster="foo", credentials=Token(token_callable), project="c")) + get_wrapped_async_client(client).assets._http_client_with_retry.config.backoff_factor = 0.0 # speed up test retries + assert get_or_raise(client.assets.retrieve(id=1)).id == 123 - assert call_count > 0 - assert httpx_mock.get_requests()[0].headers["Authorization"] == "Bearer outdated-token" - assert httpx_mock.get_requests()[1].headers["Authorization"] == "Bearer valid-token" + assert call_count == 4 + requests = httpx_mock.get_requests() + # First request should be 'valid-token-2' (not -1) because the first check-in with the Credentials class on + # "get or maybe refresh token" happens in BasicAsyncAPIClient._configure_headers. + assert requests[0].headers["Authorization"] == "Bearer valid-token-2" + assert requests[1].headers["Authorization"] == "Bearer valid-token-3" + assert requests[2].headers["Authorization"] == "Bearer valid-token-4" @pytest.mark.parametrize("limit, expected_error", ((-2, ValueError), (0, ValueError), ("10", TypeError))) From 14a6364769a08092cb5e316613cdeb9d7a8e7db0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=C3=A5kon=20V=2E=20Treider?= Date: Tue, 9 Dec 2025 12:01:27 +0100 Subject: [PATCH 04/18] update docstr on as_apply with warning on read-only props --- .../data_classes/data_modeling/instances.py | 22 ++++++++++++------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/cognite/client/data_classes/data_modeling/instances.py b/cognite/client/data_classes/data_modeling/instances.py index ea5413dac2..574b1c84ed 100644 --- a/cognite/client/data_classes/data_modeling/instances.py +++ b/cognite/client/data_classes/data_modeling/instances.py @@ -718,13 +718,16 @@ def __init__( def as_apply(self) -> NodeApply: """ - This is a convenience function for converting the read to a write node. + This is a convenience method for converting from the read version of the ``Node`` to the + write version (``NodeApply``). - It makes the simplifying assumption that all properties are from the same view. Note that this - is not true in general. + Warning: + Properties can be read-only and then the converted write node will fail on ingestion. + Examples are auto-increment properties, or system-controlled ones like ``path`` or ``root`` + (CogniteAsset), or ``isUploaded`` (CogniteFile). 
Returns: - NodeApply: A write node, NodeApply + NodeApply: A write node, NodeApply, with all properties (even read-only) copied over. """ return NodeApply( @@ -902,13 +905,16 @@ def __init__( def as_apply(self) -> EdgeApply: """ - This is a convenience function for converting the read to a write edge. + This is a convenience method for converting from the read version of the ``Edge`` to the + write version (``EdgeApply``). - It makes the simplifying assumption that all properties are from the same view. Note that this - is not true in general. + Warning: + Properties can be read-only (e.g. if using auto-increment) and then the converted write + edge will fail on ingestion. Returns: - EdgeApply: A write edge, EdgeApply + EdgeApply: A write edge, EdgeApply, with all properties (even read-only) copied over. + """ return EdgeApply( space=self.space, From 1fca832e5d711f70420a5b539ab7306bfcc13c02 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=C3=A5kon=20V=2E=20Treider?= Date: Tue, 9 Dec 2025 15:18:18 +0100 Subject: [PATCH 05/18] use Generics to allow DM Query classes to use MutableMapping for with_/select --- .../data_classes/data_modeling/query.py | 29 ++++++++++--------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/cognite/client/data_classes/data_modeling/query.py b/cognite/client/data_classes/data_modeling/query.py index d400d91171..909e32947f 100644 --- a/cognite/client/data_classes/data_modeling/query.py +++ b/cognite/client/data_classes/data_modeling/query.py @@ -2,9 +2,9 @@ from abc import ABC from collections import UserDict -from collections.abc import Mapping, Sequence +from collections.abc import Mapping, MutableMapping, Sequence from dataclasses import dataclass, field -from typing import TYPE_CHECKING, Any, Literal +from typing import TYPE_CHECKING, Any, Generic, Literal, TypeVar from typing_extensions import Never, Self, assert_never @@ -125,12 +125,16 @@ def _load(cls, resource: dict[str, Any], cognite_client: AsyncCogniteClient | No ) +_T_ResultSetExpression = TypeVar("_T_ResultSetExpression", bound="ResultSetExpressionBase") +_T_Select = TypeVar("_T_Select", bound=SelectBase) + + @dataclass -class QueryBase(CogniteObject, ABC): +class QueryBase(CogniteObject, ABC, Generic[_T_ResultSetExpression, _T_Select]): """Abstract base class for normal queries and sync queries""" - with_: Mapping[str, ResultSetExpressionBase] - select: Mapping[str, SelectBase] + with_: MutableMapping[str, _T_ResultSetExpression] + select: MutableMapping[str, _T_Select] parameters: Mapping[str, PropertyValue] = field(default_factory=dict) cursors: Mapping[str, str | None] = field(default_factory=dict) @@ -163,7 +167,10 @@ def dump(self, camel_case: bool = True) -> dict[str, Any]: @classmethod def _load_base( - cls, resource: dict[str, Any], result_set_class: type[ResultSetExpressionBase], select_class: type[SelectBase] + cls, + resource: dict[str, Any], + result_set_class: type[_T_ResultSetExpression], + select_class: type[_T_Select], ) -> Self: parameters = dict(resource["parameters"].items()) if "parameters" in resource else {} cursors = dict(resource["cursors"].items()) if "cursors" in resource else {} @@ -176,7 +183,7 @@ def _load_base( @dataclass -class Query(QueryBase): +class Query(QueryBase["ResultSetExpression", Select]): r"""Query allows you to do advanced queries on the data model. Args: @@ -186,16 +193,13 @@ class Query(QueryBase): cursors (Mapping[str, str | None] | None): A dictionary of cursors to use in the query. These allow for pagination. 
""" - with_: Mapping[str, ResultSetExpression] - select: Mapping[str, Select] - @classmethod def _load(cls, resource: dict[str, Any], cognite_client: AsyncCogniteClient | None = None) -> Self: return cls._load_base(resource, ResultSetExpression, Select) @dataclass -class QuerySync(QueryBase): +class QuerySync(QueryBase["ResultSetExpressionSync", SelectSync]): r"""Sync allows you to do subscribe to changes in instances. Args: @@ -205,9 +209,6 @@ class QuerySync(QueryBase): cursors (Mapping[str, str | None] | None): A dictionary of cursors to use in the query. These allow for pagination. """ - with_: Mapping[str, ResultSetExpressionSync] - select: Mapping[str, SelectSync] - @classmethod def _load(cls, resource: dict[str, Any], cognite_client: AsyncCogniteClient | None = None) -> Self: return cls._load_base(resource, ResultSetExpressionSync, SelectSync) From 221ae7f16f64e3e8efc328e778f0602a3624d67f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=C3=A5kon=20V=2E=20Treider?= Date: Tue, 9 Dec 2025 16:14:16 +0100 Subject: [PATCH 06/18] fix broken imports --- cognite/client/data_classes/data_modeling/__init__.py | 2 ++ cognite/client/data_classes/datapoints.py | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/cognite/client/data_classes/data_modeling/__init__.py b/cognite/client/data_classes/data_modeling/__init__.py index 22f5f8854b..daf9abb86e 100644 --- a/cognite/client/data_classes/data_modeling/__init__.py +++ b/cognite/client/data_classes/data_modeling/__init__.py @@ -2,6 +2,7 @@ from cognite.client.data_classes import aggregations, filters from cognite.client.data_classes.aggregations import AggregatedValue, Aggregation +from cognite.client.data_classes.data_modeling import query from cognite.client.data_classes.data_modeling.containers import ( BTreeIndex, BTreeIndexApply, @@ -255,4 +256,5 @@ "ViewList", "aggregations", "filters", + "query", ] diff --git a/cognite/client/data_classes/datapoints.py b/cognite/client/data_classes/datapoints.py index 06d3799053..a2461daeb5 100644 --- a/cognite/client/data_classes/datapoints.py +++ b/cognite/client/data_classes/datapoints.py @@ -21,6 +21,7 @@ from cognite.client.data_classes.datapoint_aggregates import ( _INT_AGGREGATES_CAMEL, ALL_SORTED_DP_AGGS, + Aggregate, ) from cognite.client.utils import _json_extended as _json from cognite.client.utils._auxiliary import find_duplicates @@ -54,7 +55,6 @@ from cognite.client import AsyncCogniteClient from cognite.client._api.datapoint_tasks import BaseTaskOrchestrator - from cognite.client.data_classes.datapoint_aggregates import Aggregate NumpyDatetime64NSArray: TypeAlias = npt.NDArray[np.datetime64] NumpyUInt32Array: TypeAlias = npt.NDArray[np.uint32] From ee70480e29008acddde3c397b59237cf44ba3768 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=C3=A5kon=20V=2E=20Treider?= Date: Tue, 9 Dec 2025 16:36:34 +0100 Subject: [PATCH 07/18] add Average aggregation alias. 
cleanup docstr examples --- cognite/client/_api/data_modeling/instances.py | 10 ++++++---- .../client/_sync_api/data_modeling/instances.py | 12 +++++++----- cognite/client/data_classes/aggregations.py | 14 ++++++++++---- .../test_api/test_data_modeling/test_instances.py | 8 ++++---- 4 files changed, 27 insertions(+), 17 deletions(-) diff --git a/cognite/client/_api/data_modeling/instances.py b/cognite/client/_api/data_modeling/instances.py index 550c5f2c07..a41461bf94 100644 --- a/cognite/client/_api/data_modeling/instances.py +++ b/cognite/client/_api/data_modeling/instances.py @@ -1343,10 +1343,11 @@ async def aggregate( Get the average run time in minutes for pumps grouped by release year: >>> from cognite.client import CogniteClient - >>> from cognite.client.data_classes.data_modeling import ViewId, aggregations as aggs + >>> from cognite.client.data_classes.aggregations import Average + >>> from cognite.client.data_classes.data_modeling import ViewId >>> client = CogniteClient() >>> # async_client = AsyncCogniteClient() # another option - >>> avg_run_time = aggs.Avg("runTimeMinutes") + >>> avg_run_time = Average("runTimeMinutes") >>> view_id = ViewId("mySpace", "PumpView", "v1") >>> res = client.data_modeling.instances.aggregate(view_id, avg_run_time, group_by="releaseYear") @@ -1450,10 +1451,11 @@ async def histogram( Find the number of people born per decade: >>> from cognite.client import CogniteClient - >>> from cognite.client.data_classes.data_modeling import aggregations as aggs, ViewId + >>> from cognite.client.data_classes.aggregations import Histogram + >>> from cognite.client.data_classes.data_modeling import ViewId >>> client = CogniteClient() >>> # async_client = AsyncCogniteClient() # another option - >>> birth_by_decade = aggs.Histogram("birthYear", interval=10.0) + >>> birth_by_decade = Histogram("birthYear", interval=10.0) >>> view_id = ViewId("mySpace", "PersonView", "v1") >>> res = client.data_modeling.instances.histogram(view_id, birth_by_decade) """ diff --git a/cognite/client/_sync_api/data_modeling/instances.py b/cognite/client/_sync_api/data_modeling/instances.py index dd3349742f..c6147416cb 100644 --- a/cognite/client/_sync_api/data_modeling/instances.py +++ b/cognite/client/_sync_api/data_modeling/instances.py @@ -1,6 +1,6 @@ """ =============================================================================== -88242160659a9c1d60527e2489a46239 +0a2523efb8b74a7a5ff6a1141b11bf09 This file is auto-generated from the Async API modules, - do not edit manually! 
=============================================================================== """ @@ -906,10 +906,11 @@ def aggregate( Get the average run time in minutes for pumps grouped by release year: >>> from cognite.client import CogniteClient - >>> from cognite.client.data_classes.data_modeling import ViewId, aggregations as aggs + >>> from cognite.client.data_classes.aggregations import Average + >>> from cognite.client.data_classes.data_modeling import ViewId >>> client = CogniteClient() >>> # async_client = AsyncCogniteClient() # another option - >>> avg_run_time = aggs.Avg("runTimeMinutes") + >>> avg_run_time = Average("runTimeMinutes") >>> view_id = ViewId("mySpace", "PumpView", "v1") >>> res = client.data_modeling.instances.aggregate(view_id, avg_run_time, group_by="releaseYear") """ @@ -990,10 +991,11 @@ def histogram( Find the number of people born per decade: >>> from cognite.client import CogniteClient - >>> from cognite.client.data_classes.data_modeling import aggregations as aggs, ViewId + >>> from cognite.client.data_classes.aggregations import Histogram + >>> from cognite.client.data_classes.data_modeling import ViewId >>> client = CogniteClient() >>> # async_client = AsyncCogniteClient() # another option - >>> birth_by_decade = aggs.Histogram("birthYear", interval=10.0) + >>> birth_by_decade = Histogram("birthYear", interval=10.0) >>> view_id = ViewId("mySpace", "PersonView", "v1") >>> res = client.data_modeling.instances.histogram(view_id, birth_by_decade) """ diff --git a/cognite/client/data_classes/aggregations.py b/cognite/client/data_classes/aggregations.py index 8164e02966..b9e0f48db2 100644 --- a/cognite/client/data_classes/aggregations.py +++ b/cognite/client/data_classes/aggregations.py @@ -33,7 +33,7 @@ class Aggregation(CogniteObject, ABC): @classmethod def _load(cls, resource: dict[str, Any], cognite_client: AsyncCogniteClient | None = None) -> Aggregation: if "avg" in resource: - return Avg(property=resource["avg"]["property"]) + return Average(property=resource["avg"]["property"]) elif "count" in resource: return Count(property=resource["count"]["property"]) elif "max" in resource: @@ -59,10 +59,13 @@ class MetricAggregation(Aggregation, ABC): ... 
@final @dataclass -class Avg(MetricAggregation): +class Average(MetricAggregation): _aggregation_name = "avg" +Avg = Average # Backwards-compatible alias + + @final @dataclass class Count(MetricAggregation): @@ -113,7 +116,7 @@ def _load(cls, resource: dict[str, Any], cognite_client: AsyncCogniteClient | No match aggregate: case "avg": - return AvgValue(resource["property"], resource.get("value")) + return AverageValue(resource["property"], resource.get("value")) case "count": return CountValue(resource["property"], resource.get("value")) case "max": @@ -150,10 +153,13 @@ def dump(self, camel_case: bool = True) -> dict[str, Any]: @final @dataclass -class AvgValue(AggregatedNumberedValue): +class AverageValue(AggregatedNumberedValue): _aggregate: ClassVar[str] = "avg" +AvgValue = AverageValue # Backwards-compatible alias + + @final @dataclass class CountValue(AggregatedNumberedValue): diff --git a/tests/tests_integration/test_api/test_data_modeling/test_instances.py b/tests/tests_integration/test_api/test_data_modeling/test_instances.py index fc318931c5..b139c19370 100644 --- a/tests/tests_integration/test_api/test_data_modeling/test_instances.py +++ b/tests/tests_integration/test_api/test_data_modeling/test_instances.py @@ -942,7 +942,7 @@ def test_aggregate_histogram_across_nodes(self, cognite_client: CogniteClient, p def test_aggregate_with_grouping(self, cognite_client: CogniteClient, movie_view: View) -> None: view_id = movie_view.as_id() - avg_agg = aggregations.Avg("runTimeMinutes") + avg_agg = aggregations.Average("runTimeMinutes") max_agg = aggregations.Max("runTimeMinutes") result = cognite_client.data_modeling.instances.aggregate( @@ -954,7 +954,7 @@ def test_aggregate_with_grouping(self, cognite_client: CogniteClient, movie_view def test_aggregate_multiple(self, cognite_client: CogniteClient, movie_view: View) -> None: view_id = movie_view.as_id() - avg_agg = aggregations.Avg("runTimeMinutes") + avg_agg = aggregations.Average("runTimeMinutes") max_agg = aggregations.Max("runTimeMinutes") result = cognite_client.data_modeling.instances.aggregate( @@ -965,7 +965,7 @@ def test_aggregate_multiple(self, cognite_client: CogniteClient, movie_view: Vie assert result[0].property == "runTimeMinutes" assert result[1].property == "runTimeMinutes" max_value = next((item for item in result if isinstance(item, aggregations.MaxValue)), None) - avg_value = next((item for item in result if isinstance(item, aggregations.AvgValue)), None) + avg_value = next((item for item in result if isinstance(item, aggregations.AverageValue)), None) assert max_value is not None assert avg_value is not None assert isinstance(max_value.value, float) @@ -1145,7 +1145,7 @@ def test_aggregate_in_units( aggregated = cognite_client.data_modeling.instances.aggregate( view=unit_view.as_id(), - aggregates=[aggregations.Avg("pressure")], + aggregates=[aggregations.Average("pressure")], target_units=target_units, filter=is_node, ) From 2e4640ee67fa7de6e3bf681e516754fc3738c51c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=C3=A5kon=20V=2E=20Treider?= Date: Mon, 22 Dec 2025 13:11:18 +0100 Subject: [PATCH 08/18] add CogniteSDKResponse to avoid returning httpx.Response --- cognite/client/data_classes/_response.py | 159 +++++++++++++++++++++++ 1 file changed, 159 insertions(+) create mode 100644 cognite/client/data_classes/_response.py diff --git a/cognite/client/data_classes/_response.py b/cognite/client/data_classes/_response.py new file mode 100644 index 0000000000..0e86582bed --- /dev/null +++ 
b/cognite/client/data_classes/_response.py @@ -0,0 +1,159 @@ +from __future__ import annotations + +import typing +from collections.abc import AsyncIterator, Iterator +from datetime import timedelta +from typing import Any, Final + +# We import the specific types used by httpx for accurate type hinting +from httpx import URL, Cookies, Headers, HTTPStatusError +from httpx import Request as HttpxRequest +from httpx import Response as HttpxResponse + + +class CogniteSDKResponse: # Sadly, CogniteResponse is taken + """ + A wrapper class for httpx.Response to isolate the SDK from the + underlying HTTP library's public interface. + """ + + # Every single response is wrapped in this class, so squeeze out every bit of perf: + __slots__ = ("_json_cache", "_response") + + def __init__(self, response: HttpxResponse) -> None: + self._response: Final[HttpxResponse] = response + + @property + def httpx_response(self) -> HttpxResponse: + """ + Direct access to the Response object from the underlying http library (currently httpx). + + Disclaimer: Usage is neither backwards- nor forwards-compatible. + """ + return self._response + + @property + def status_code(self) -> int: + return self._response.status_code + + @property + def reason_phrase(self) -> str: + return self._response.reason_phrase + + @property + def http_version(self) -> str: + return self._response.http_version + + @property + def url(self) -> URL: + return self._response.url + + @property + def headers(self) -> Headers: + return self._response.headers + + @property + def cookies(self) -> Cookies: + # Note: httpx.Response.cookies returns httpx.Cookies, which is a wrapper around SimpleCookie + return self._response.cookies + + @property + def history(self) -> list[CogniteSDKResponse]: + return [CogniteSDKResponse(r) for r in self._response.history] + + @property + def request(self) -> HttpxRequest: + return self._response.request + + @property + def content(self) -> bytes: + return self._response.content + + @property + def text(self) -> str: + return self._response.text + + @property + def encoding(self) -> str | None: + return self._response.encoding + + @encoding.setter + def encoding(self, value: str) -> None: + self._response.encoding = value + + @property + def is_success(self) -> bool: + return self._response.is_success + + @property + def is_error(self) -> bool: + return self._response.is_error + + @property + def elapsed(self) -> timedelta: + return self._response.elapsed + + @property + def next_request(self) -> HttpxRequest | None: + return self._response.next_request + + @property + def stream(self) -> typing.Any: + # 'stream' typing is tricky; we leave it at Any as users should consume from the stream + # using the provided methods like iter_bytes, iter_lines, iter_text, etc. + return self._response.stream + + @property + def num_bytes_downloaded(self) -> int: + return self._response.num_bytes_downloaded + + def json(self) -> Any: + try: + return self._json_cache + except AttributeError: + self._json_cache = self._response.json() + return self._json_cache + + def raise_for_status(self) -> CogniteSDKResponse: + """ + Raises a CogniteHTTPStatusError if the response status code is 4xx or 5xx. + If successful (2xx), returns the response object (self) for chaining. 
+ """ + from cognite.client.exceptions import CogniteHTTPStatusError + + try: + self._response.raise_for_status() + return self + except HTTPStatusError: + raise CogniteHTTPStatusError(self.status_code, request=self.request, response=self) from None + + def iter_bytes(self, chunk_size: int | None = None) -> Iterator[bytes]: + return self._response.iter_bytes(chunk_size) + + def read(self) -> bytes: + return self._response.read() + + def close(self) -> None: + self._response.close() + + async def aiter_bytes(self, chunk_size: int | None = None) -> AsyncIterator[bytes]: + async for chunk in self._response.aiter_bytes(chunk_size): + yield chunk + + async def aiter_text(self, chunk_size: int | None = None) -> AsyncIterator[str]: + async for text in self._response.aiter_text(chunk_size): + yield text + + async def aiter_lines(self) -> AsyncIterator[str]: + async for line in self._response.aiter_lines(): + yield line + + async def aiter_raw(self, chunk_size: int | None = None) -> AsyncIterator[bytes]: + async for chunk in self._response.aiter_raw(chunk_size): + yield chunk + + async def aread(self) -> bytes: + return await self._response.aread() + + async def aclose(self) -> None: + await self._response.aclose() From 7cc94ea6b9ab6388e2c790034263e326a370cd78 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=C3=A5kon=20V=2E=20Treider?= Date: Mon, 22 Dec 2025 21:36:19 +0100 Subject: [PATCH 09/18] change http client to return CogniteSDKResponse --- cognite/client/_basic_api_client.py | 15 ++++++++------- cognite/client/_cognite_client.py | 8 ++++---- cognite/client/_http_client.py | 10 +++++----- cognite/client/_sync_cognite_client.py | 9 ++++----- cognite/client/utils/_auxiliary.py | 5 ++--- .../sync_client_codegen/sync_client_template.txt | 7 ++++--- .../test_api/test_simulators/utils.py | 5 ++--- 7 files changed, 29 insertions(+), 30 deletions(-) diff --git a/cognite/client/_basic_api_client.py b/cognite/client/_basic_api_client.py index 0a97ecd33a..7af0d1f2bb 100644 --- a/cognite/client/_basic_api_client.py +++ b/cognite/client/_basic_api_client.py @@ -30,6 +30,7 @@ if TYPE_CHECKING: from cognite.client import AsyncCogniteClient from cognite.client.config import ClientConfig + from cognite.client.data_classes._response import CogniteSDKResponse logger = logging.getLogger(__name__) @@ -230,7 +231,7 @@ async def _request( timeout: float | None = None, include_cdf_headers: bool = False, api_subversion: str | None = None, - ) -> httpx.Response: + ) -> CogniteSDKResponse: """ Make a request to something that is outside Cognite Data Fusion, with retry enabled. Requires the caller to handle errors coming from non-2xx response status codes. @@ -245,7 +246,7 @@ async def _request( api_subversion (str | None): When include_cdf_headers=True, override the API subversion to use for the request. Has no effect otherwise. Returns: - httpx.Response: The response from the server. + CogniteSDKResponse: The response from the server. Raises: httpx.HTTPStatusError: If the response status code is 4xx or 5xx. 
@@ -278,7 +279,7 @@ async def _stream( full_headers: dict[str, Any] | None = None, timeout: float | None = None, api_subversion: str | None = None, - ) -> AsyncIterator[httpx.Response]: + ) -> AsyncIterator[CogniteSDKResponse]: assert url_path or full_url, "Either url_path or full_url must be provided" full_url = full_url or resolve_url(self, method, cast(str, url_path))[1] if full_headers is None: @@ -304,7 +305,7 @@ async def _get( follow_redirects: bool = False, api_subversion: str | None = None, semaphore: asyncio.BoundedSemaphore | None = None, - ) -> httpx.Response: + ) -> CogniteSDKResponse: _, full_url = resolve_url(self, "GET", url_path) full_headers = self._configure_headers(additional_headers=headers, api_subversion=api_subversion) try: @@ -332,7 +333,7 @@ async def _post( follow_redirects: bool = False, api_subversion: str | None = None, semaphore: asyncio.BoundedSemaphore | None = None, - ) -> httpx.Response: + ) -> CogniteSDKResponse: is_retryable, full_url = resolve_url(self, "POST", url_path) full_headers = self._configure_headers(additional_headers=headers, api_subversion=api_subversion) # We want to control json dumping, so we pass it along to httpx.Client.post as 'content' @@ -367,7 +368,7 @@ async def _put( api_subversion: str | None = None, timeout: float | None = None, semaphore: asyncio.BoundedSemaphore | None = None, - ) -> httpx.Response: + ) -> CogniteSDKResponse: _, full_url = resolve_url(self, "PUT", url_path) full_headers = self._configure_headers(additional_headers=headers, api_subversion=api_subversion) @@ -425,7 +426,7 @@ async def _handle_status_error( await handler.raise_api_error(self._cognite_client) def _log_successful_request( - self, res: httpx.Response, payload: dict[str, Any] | None = None, stream: bool = False + self, res: CogniteSDKResponse, payload: dict[str, Any] | None = None, stream: bool = False ) -> None: extra: dict[str, Any] = { "headers": self._sanitize_headers(res.request.headers), diff --git a/cognite/client/_cognite_client.py b/cognite/client/_cognite_client.py index 2bff1aab83..a47c809c96 100644 --- a/cognite/client/_cognite_client.py +++ b/cognite/client/_cognite_client.py @@ -36,7 +36,7 @@ from cognite.client.utils._auxiliary import load_resource_to_dict if TYPE_CHECKING: - import httpx + from cognite.client.data_classes._response import CogniteSDKResponse class AsyncCogniteClient: @@ -94,7 +94,7 @@ def __init__(self, config: ClientConfig | None = None) -> None: async def get( self, url: str, params: dict[str, Any] | None = None, headers: dict[str, Any] | None = None - ) -> httpx.Response: + ) -> CogniteSDKResponse: """Perform a GET request to an arbitrary path in the API.""" return await self._api_client._get(url, params=params, headers=headers) @@ -104,7 +104,7 @@ async def post( json: dict[str, Any] | None = None, params: dict[str, Any] | None = None, headers: dict[str, Any] | None = None, - ) -> httpx.Response: + ) -> CogniteSDKResponse: """Perform a POST request to an arbitrary path in the API.""" return await self._api_client._post(url, json=json, params=params, headers=headers) @@ -114,7 +114,7 @@ async def put( json: dict[str, Any] | None = None, params: dict[str, Any] | None = None, headers: dict[str, Any] | None = None, - ) -> httpx.Response: + ) -> CogniteSDKResponse: """Perform a PUT request to an arbitrary path in the API.""" return await self._api_client._put(url, json=json, params=params, headers=headers) diff --git a/cognite/client/_http_client.py b/cognite/client/_http_client.py index 5d80c7e35f..e37a6bf79d 100644 
--- a/cognite/client/_http_client.py +++ b/cognite/client/_http_client.py @@ -199,7 +199,7 @@ async def request( follow_redirects: bool = False, timeout: float | None = None, semaphore: asyncio.BoundedSemaphore | None = None, - ) -> httpx.Response: + ) -> CogniteSDKResponse: def coro_factory() -> HTTPResponseCoro: return self.httpx_async_client.request( method, @@ -225,7 +225,7 @@ async def stream( json: Any = None, headers: MutableMapping[str, str] | None = None, timeout: float | None = None, - ) -> AsyncIterator[httpx.Response]: + ) -> AsyncIterator[CogniteSDKResponse]: # This method is basically a clone of httpx.AsyncClient.stream() so that we may add our own retry logic. def coro_factory() -> HTTPResponseCoro: request = self.httpx_async_client.build_request( @@ -233,7 +233,7 @@ def coro_factory() -> HTTPResponseCoro: ) return self.httpx_async_client.send(request, stream=True) - response: httpx.Response | None = None + response: CogniteSDKResponse | None = None try: yield (response := await self._with_retry(coro_factory, url=url, headers=headers)) finally: @@ -247,7 +247,7 @@ async def _with_retry( url: str, headers: MutableMapping[str, str] | None, semaphore: asyncio.BoundedSemaphore | None = None, - ) -> httpx.Response: + ) -> CogniteSDKResponse: if semaphore is None: # By default, we run with a semaphore decided by user settings of 'max_workers' in 'global_config'. # Since the user can run any number of SDK tasks concurrently, this needs to be global: @@ -266,7 +266,7 @@ async def _with_retry( if accepts_json: # Cache .json() return value in order to avoid redecoding JSON if called multiple times response.json = functools.cache(response.json) # type: ignore [method-assign] - return response.raise_for_status() + return CogniteSDKResponse(response.raise_for_status()) except httpx.HTTPStatusError as err: response = err.response diff --git a/cognite/client/_sync_cognite_client.py b/cognite/client/_sync_cognite_client.py index ccb39d6fe5..aa1bc178d5 100644 --- a/cognite/client/_sync_cognite_client.py +++ b/cognite/client/_sync_cognite_client.py @@ -8,8 +8,6 @@ from typing import TYPE_CHECKING, Any -import httpx - from cognite.client import AsyncCogniteClient from cognite.client._sync_api.agents.agents import SyncAgentsAPI from cognite.client._sync_api.ai import SyncAIAPI @@ -45,6 +43,7 @@ if TYPE_CHECKING: from cognite.client import ClientConfig + from cognite.client.data_classes._response import CogniteSDKResponse class CogniteClient: @@ -92,7 +91,7 @@ def __init__(self, config: ClientConfig | None = None) -> None: def get( self, url: str, params: dict[str, Any] | None = None, headers: dict[str, Any] | None = None - ) -> httpx.Response: + ) -> CogniteSDKResponse: """Perform a GET request to an arbitrary path in the API.""" return run_sync(self.__async_client.get(url, params=params, headers=headers)) @@ -102,7 +101,7 @@ def post( json: dict[str, Any] | None = None, params: dict[str, Any] | None = None, headers: dict[str, Any] | None = None, - ) -> httpx.Response: + ) -> CogniteSDKResponse: """Perform a POST request to an arbitrary path in the API.""" return run_sync(self.__async_client.post(url, json=json, params=params, headers=headers)) @@ -112,7 +111,7 @@ def put( json: dict[str, Any] | None = None, params: dict[str, Any] | None = None, headers: dict[str, Any] | None = None, - ) -> httpx.Response: + ) -> CogniteSDKResponse: """Perform a PUT request to an arbitrary path in the API.""" return run_sync(self.__async_client.put(url, json=json, params=params, headers=headers)) diff --git 
a/cognite/client/utils/_auxiliary.py b/cognite/client/utils/_auxiliary.py index 0043f760ff..086a4e1ccc 100644 --- a/cognite/client/utils/_auxiliary.py +++ b/cognite/client/utils/_auxiliary.py @@ -23,9 +23,8 @@ from cognite.client.utils.useful_types import SequenceNotStr if TYPE_CHECKING: - import httpx - from cognite.client.data_classes._base import T_CogniteResource + from cognite.client.data_classes._response import CogniteSDKResponse T = TypeVar("T") K = TypeVar("K") @@ -200,7 +199,7 @@ def flatten_dict(d: dict[str, Any], parent_keys: tuple[str, ...], sep: str = "." return dict(items) -def unpack_items(res: httpx.Response) -> list[Any]: +def unpack_items(res: CogniteSDKResponse) -> list[Any]: return res.json()["items"] diff --git a/scripts/sync_client_codegen/sync_client_template.txt b/scripts/sync_client_codegen/sync_client_template.txt index 60b69f5edd..a35bf5d0de 100644 --- a/scripts/sync_client_codegen/sync_client_template.txt +++ b/scripts/sync_client_codegen/sync_client_template.txt @@ -16,6 +16,7 @@ from cognite.client.utils._async_helpers import run_sync if TYPE_CHECKING: from cognite.client import ClientConfig + from cognite.client.data_classes._response import CogniteSDKResponse class CogniteClient: @@ -36,7 +37,7 @@ class CogniteClient: def get( self, url: str, params: dict[str, Any] | None = None, headers: dict[str, Any] | None = None - ) -> httpx.Response: + ) -> CogniteSDKResponse: """Perform a GET request to an arbitrary path in the API.""" return run_sync(self.__async_client.get(url, params=params, headers=headers)) @@ -46,7 +47,7 @@ class CogniteClient: json: dict[str, Any] | None = None, params: dict[str, Any] | None = None, headers: dict[str, Any] | None = None, - ) -> httpx.Response: + ) -> CogniteSDKResponse: """Perform a POST request to an arbitrary path in the API.""" return run_sync(self.__async_client.post(url, json=json, params=params, headers=headers)) @@ -56,7 +57,7 @@ class CogniteClient: json: dict[str, Any] | None = None, params: dict[str, Any] | None = None, headers: dict[str, Any] | None = None, - ) -> httpx.Response: + ) -> CogniteSDKResponse: """Perform a PUT request to an arbitrary path in the API.""" return run_sync(self.__async_client.put(url, json=json, params=params, headers=headers)) diff --git a/tests/tests_integration/test_api/test_simulators/utils.py b/tests/tests_integration/test_api/test_simulators/utils.py index 482e9b477a..d85e17164b 100644 --- a/tests/tests_integration/test_api/test_simulators/utils.py +++ b/tests/tests_integration/test_api/test_simulators/utils.py @@ -1,10 +1,9 @@ from __future__ import annotations -import httpx - from cognite.client import AsyncCogniteClient +from cognite.client.data_classes._response import CogniteSDKResponse -async def update_logs(async_client: AsyncCogniteClient, log_id: int, payload: list[dict]) -> httpx.Response: +async def update_logs(async_client: AsyncCogniteClient, log_id: int, payload: list[dict]) -> CogniteSDKResponse: items = {"items": [{"id": log_id, "update": {"data": {"add": payload}}}]} return await async_client.simulators._post("/simulators/logs/update", json=items) From 774a4bbaced42804edefb4dbbc8f385a70b11b5b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=C3=A5kon=20V=2E=20Treider?= Date: Mon, 22 Dec 2025 13:12:03 +0100 Subject: [PATCH 10/18] add CogniteHTTPStatusError to avoid raising httpx.HTTPStatusError --- cognite/client/exceptions.py | 35 ++++++++++++++++++++++++++++++----- 1 file changed, 30 insertions(+), 5 deletions(-) diff --git a/cognite/client/exceptions.py 
b/cognite/client/exceptions.py index 61f4892752..d9f4e33ad2 100644 --- a/cognite/client/exceptions.py +++ b/cognite/client/exceptions.py @@ -3,6 +3,8 @@ from dataclasses import dataclass from typing import TYPE_CHECKING, Any +from httpx import Request as HttpxRequest + from cognite.client._constants import _RUNNING_IN_BROWSER from cognite.client.utils import _json_extended as _json from cognite.client.utils._async_helpers import async_timed_cache @@ -11,6 +13,7 @@ if TYPE_CHECKING: from cognite.client import AsyncCogniteClient from cognite.client.data_classes import AssetHierarchy + from cognite.client.data_classes._response import CogniteSDKResponse class CogniteException(Exception): @@ -101,6 +104,32 @@ class CogniteRequestError(CogniteException): pass +class CogniteHTTPStatusError(CogniteRequestError): + """Raised when an HTTP status code indicates an error (4xx or 5xx).""" + + def __init__(self, status_code: int, *, request: HttpxRequest, response: CogniteSDKResponse) -> None: + super().__init__(f"HTTP {status_code} Error") + self.status_code = status_code + self.request = request + self.response = response + + @staticmethod + def get_error_type(status_code: int) -> str: + # Shamelessly stolen from httpx (we don't use their message directly as it is way too long) + return { + 1: "Informational response", + 3: "Redirect response", + 4: "Client error", + 5: "Server error", + }.get(status_code // 100, "Invalid status code") + + def __str__(self) -> str: + return ( + f"{self.get_error_type(self.status_code)} '{self.status_code} {self.response.reason_phrase}' " + f"for url '{self.request.url}'" + ) + + class CogniteConnectionError(CogniteRequestError): pass @@ -115,11 +144,7 @@ class CogniteReadTimeout(CogniteRequestError): class CogniteFileUploadError(CogniteException): - def __init__( - self, - message: str, - code: int, - ) -> None: + def __init__(self, message: str, code: int) -> None: self.message = message self.code = code From e7d16880f448f4d461bef574f5e14d1a591bd7f6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=C3=A5kon=20V=2E=20Treider?= Date: Mon, 22 Dec 2025 16:11:51 +0100 Subject: [PATCH 11/18] raise CogniteHTTPStatusError from http client (not httpx) --- cognite/client/_http_client.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/cognite/client/_http_client.py b/cognite/client/_http_client.py index e37a6bf79d..349f9e023d 100644 --- a/cognite/client/_http_client.py +++ b/cognite/client/_http_client.py @@ -21,6 +21,7 @@ import httpx from cognite.client.config import global_config +from cognite.client.data_classes._response import CogniteSDKResponse from cognite.client.exceptions import ( CogniteConnectionError, CogniteConnectionRefused, @@ -263,6 +264,7 @@ async def _with_retry( if headers is not None: self.refresh_auth_header(headers) response = await coro_factory() + if accepts_json: # Cache .json() return value in order to avoid redecoding JSON if called multiple times response.json = functools.cache(response.json) # type: ignore [method-assign] @@ -271,8 +273,12 @@ async def _with_retry( except httpx.HTTPStatusError as err: response = err.response is_auto_retryable = response.headers.get("cdf-is-auto-retryable", False) - if not retry_tracker.should_retry_status_code(response.status_code, is_auto_retryable): - raise + if not retry_tracker.should_retry_status_code(err, is_auto_retryable): + raise CogniteHTTPStatusError( + response.status_code, + request=err.request, + response=CogniteSDKResponse(response), + ) from None except httpx.ConnectError as 
err: if not retry_tracker.should_retry_connect_error(err): From 5bfd5585f7cec0fd5f5a83406ce1f102382c0a70 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=C3=A5kon=20V=2E=20Treider?= Date: Mon, 22 Dec 2025 16:14:15 +0100 Subject: [PATCH 12/18] update API classes to handle CogniteHTTPStatusError --- cognite/client/_basic_api_client.py | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/cognite/client/_basic_api_client.py b/cognite/client/_basic_api_client.py index 7af0d1f2bb..f2a8be543f 100644 --- a/cognite/client/_basic_api_client.py +++ b/cognite/client/_basic_api_client.py @@ -19,6 +19,7 @@ from cognite.client.exceptions import ( CogniteAPIError, CogniteDuplicatedError, + CogniteHTTPStatusError, CogniteNotFoundError, CogniteProjectAccessError, ) @@ -46,7 +47,7 @@ class FailedRequestHandler: headers: dict[str, str] | httpx.Headers response_headers: dict[str, str] | httpx.Headers extra: dict[str, Any] - cause: httpx.HTTPStatusError + cause: CogniteHTTPStatusError stream: bool def __post_init__(self) -> None: @@ -54,7 +55,7 @@ def __post_init__(self) -> None: self.response_headers = BasicAsyncAPIClient._sanitize_headers(self.response_headers) @classmethod - async def from_status_error(cls, err: httpx.HTTPStatusError, stream: bool) -> Self: + async def from_status_error(cls, err: CogniteHTTPStatusError, stream: bool) -> Self: response = err.response error, missing, duplicated = {}, None, None @@ -135,7 +136,7 @@ async def _raise_no_project_access_error( x_request_id=self.x_request_id, maybe_projects=maybe_projects, cluster=cluster, - ) from None # we don't surface the underlying httpx.HTTPStatusError + ) from None # we don't surface the underlying CogniteHTTPStatusError def _raise_api_error(self, err_type: type[CogniteAPIError], cluster: str | None, project: str) -> NoReturn: raise err_type( @@ -249,7 +250,7 @@ async def _request( CogniteSDKResponse: The response from the server. Raises: - httpx.HTTPStatusError: If the response status code is 4xx or 5xx. + CogniteHTTPStatusError: If the response status code is 4xx or 5xx. 
""" http_client = self._select_async_http_client(method in {"GET", "PUT", "HEAD"}) if include_cdf_headers: @@ -261,7 +262,7 @@ async def _request( self._log_successful_request(res) return res - except httpx.HTTPStatusError as err: + except CogniteHTTPStatusError as err: handler = await FailedRequestHandler.from_status_error(err, stream=False) handler.log_failed_request() raise @@ -294,7 +295,7 @@ async def _stream( self._log_successful_request(resp, payload=json, stream=True) yield resp - except httpx.HTTPStatusError as err: + except CogniteHTTPStatusError as err: await self._handle_status_error(err, payload=json, stream=True) async def _get( @@ -318,7 +319,7 @@ async def _get( timeout=self._config.timeout, semaphore=semaphore, ) - except httpx.HTTPStatusError as err: + except CogniteHTTPStatusError as err: await self._handle_status_error(err) self._log_successful_request(res) @@ -351,7 +352,7 @@ async def _post( timeout=self._config.timeout, semaphore=semaphore, ) - except httpx.HTTPStatusError as err: + except CogniteHTTPStatusError as err: await self._handle_status_error(err, payload=json) self._log_successful_request(res, payload=json) @@ -385,7 +386,7 @@ async def _put( timeout=timeout or self._config.timeout, semaphore=semaphore, ) - except httpx.HTTPStatusError as err: + except CogniteHTTPStatusError as err: await self._handle_status_error(err, payload=json) self._log_successful_request(res, payload=json) @@ -418,7 +419,7 @@ def _refresh_auth_header(self, headers: MutableMapping[str, Any]) -> None: headers[auth_header_name] = auth_header_value async def _handle_status_error( - self, error: httpx.HTTPStatusError, payload: dict[str, Any] | None = None, stream: bool = False + self, error: CogniteHTTPStatusError, payload: dict[str, Any] | None = None, stream: bool = False ) -> NoReturn: """The response had an HTTP status code of 4xx or 5xx""" handler = await FailedRequestHandler.from_status_error(error, stream=stream) From 0152e3e92c91445bae40dace8a41f2f062c71a6b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=C3=A5kon=20V=2E=20Treider?= Date: Mon, 22 Dec 2025 16:09:11 +0100 Subject: [PATCH 13/18] overhaul RetryTracker and add logging on retries --- cognite/client/_http_client.py | 48 ++++++++++++----- tests/tests_unit/test_http_client.py | 79 +++++++++++++++++++--------- 2 files changed, 87 insertions(+), 40 deletions(-) diff --git a/cognite/client/_http_client.py b/cognite/client/_http_client.py index 349f9e023d..606da0b716 100644 --- a/cognite/client/_http_client.py +++ b/cognite/client/_http_client.py @@ -25,6 +25,7 @@ from cognite.client.exceptions import ( CogniteConnectionError, CogniteConnectionRefused, + CogniteHTTPStatusError, CogniteReadTimeout, CogniteRequestError, ) @@ -128,24 +129,41 @@ def max_retries_connect(self) -> int: class RetryTracker: - def __init__(self, config: AsyncHTTPClientWithRetryConfig) -> None: + def __init__(self, url: str, config: AsyncHTTPClientWithRetryConfig) -> None: + self.url = url self.config = config self.status = self.read = self.connect = 0 - self.last_failed_reason = "" + self.retry_causes: list[str] = [] + self.total_time_in_backoff = 0.0 @property def total(self) -> int: return self.status + self.read + self.connect + @property + def last_failed_reason(self) -> str | None: + if self.retry_causes: + return self.retry_causes[-1] + return None + def get_backoff_time(self) -> float: - backoff_time = self.config.backoff_factor * 2**self.total - return random.random() * min(backoff_time, self.config.max_backoff_seconds) + """ + Computes the randomized 
delay (backoff time) before the next retry attempt. + + This method implements the **Exponential Backoff with Full Jitter** strategy, + with one addition, a small "initial floor" to avoid immediate retries. + """ + base = self.config.backoff_factor * 2**self.total + cap = min(base, self.config.max_backoff_seconds) + return 0.1 + random.uniform(0, cap) - def back_off(self, url: str) -> None: + def back_off(self) -> None: backoff_time = self.get_backoff_time() + self.total_time_in_backoff += backoff_time + # We put logging here, since this is always called before retrying logger.debug( - f"Retrying failed request, attempt #{self.total}, backoff time: {backoff_time=:.4f} sec, " - f"reason: {self.last_failed_reason!r}, url: {url}" + f"Retrying failed request, attempt #{self.total}, backoff wait: {backoff_time:.4f} sec " + f"(total: {self.total_time_in_backoff:.4f} sec), reason: {self.last_failed_reason!r}, url: {self.url}" ) time.sleep(backoff_time) @@ -155,23 +173,25 @@ def should_retry_total(self) -> bool: # one of [status, read, connect] += 1. Said differently, do last retry when 'total = max': return self.total <= self.config.max_retries_total - def should_retry_status_code(self, status_code: int, is_auto_retryable: bool = False) -> bool: + def should_retry_status_code(self, err: httpx.HTTPStatusError, is_auto_retryable: bool = False) -> bool: self.status += 1 - self.last_failed_reason = f"{status_code=}" + status_code = err.response.status_code + error_type = CogniteHTTPStatusError.get_error_type(status_code) + self.retry_causes.append(f"HTTPStatusError({status_code} - {error_type}: {err.response.reason_phrase})") return ( self.should_retry_total and self.status <= self.config.max_retries_status and (is_auto_retryable or status_code in self.config.status_codes_to_retry) ) - def should_retry_connect_error(self, error: httpx.RequestError) -> bool: + def should_retry_connect_error(self, error: httpx.TransportError | httpx.DecodingError) -> bool: self.connect += 1 - self.last_failed_reason = type(error).__name__ + self.retry_causes.append(f"{type(error).__name__}({error!s})") return self.should_retry_total and self.connect <= self.config.max_retries_connect - def should_retry_timeout(self, error: httpx.RequestError) -> bool: + def should_retry_timeout(self, error: httpx.TimeoutException) -> bool: self.read += 1 - self.last_failed_reason = type(error).__name__ + self.retry_causes.append(f"{type(error).__name__}({error!s})") return self.should_retry_total and self.read <= self.config.max_retries_read @@ -255,7 +275,7 @@ async def _with_retry( semaphore = get_global_semaphore() is_auto_retryable = False - retry_tracker = RetryTracker(self.config) + retry_tracker = RetryTracker(url, self.config) accepts_json = (headers or {}).get("accept") == "application/json" while True: try: diff --git a/tests/tests_unit/test_http_client.py b/tests/tests_unit/test_http_client.py index 85b009161f..3947027522 100644 --- a/tests/tests_unit/test_http_client.py +++ b/tests/tests_unit/test_http_client.py @@ -19,64 +19,91 @@ def default_config() -> AsyncHTTPClientWithRetryConfig: ) +URL = "https://example.com" + + +def make_http_status_error(status_code: int) -> httpx.HTTPStatusError: + request = httpx.Request("GET", URL) + response = httpx.Response(status_code=status_code, request=request) + return httpx.HTTPStatusError(f"Error {status_code}", request=request, response=response) + + +@pytest.fixture +def timeout_error() -> httpx.TimeoutException: + return httpx.ReadTimeout("read timeout") + + +@pytest.fixture +def 
connect_error() -> httpx.ConnectError: + return httpx.ConnectError("connection error") + + @pytest.fixture -def example_error() -> httpx.RequestError: - return httpx.RequestError("nice error") +def status_error_429() -> httpx.HTTPStatusError: + return make_http_status_error(429) class TestRetryTracker: - def test_total_retries_exceeded(self, default_config: AsyncHTTPClientWithRetryConfig) -> None: + def test_total_retries_exceeded( + self, default_config: AsyncHTTPClientWithRetryConfig, status_error_429: httpx.HTTPStatusError + ) -> None: default_config._max_retries_total = 10 - rt = RetryTracker(default_config) + rt = RetryTracker(URL, default_config) rt.status = 4 rt.connect = 4 rt.read = 4 assert rt.total == 12 assert rt.should_retry_total is False - assert rt.should_retry_status_code(429) is False + assert rt.should_retry_status_code(status_error_429) is False - def test_status_retries_exceeded(self, default_config: AsyncHTTPClientWithRetryConfig) -> None: + def test_status_retries_exceeded( + self, default_config: AsyncHTTPClientWithRetryConfig, status_error_429: httpx.HTTPStatusError + ) -> None: default_config._max_retries_status = 1 - rt = RetryTracker(default_config) + rt = RetryTracker(URL, default_config) assert rt.should_retry_total is True - assert rt.should_retry_status_code(429) is True - assert rt.should_retry_status_code(429) is False - assert rt.last_failed_reason == "status_code=429" + assert rt.should_retry_status_code(status_error_429) is True + assert rt.should_retry_status_code(status_error_429) is False + assert rt.last_failed_reason is not None + assert "429" in rt.last_failed_reason def test_read_retries_exceeded( - self, default_config: AsyncHTTPClientWithRetryConfig, example_error: httpx.RequestError + self, default_config: AsyncHTTPClientWithRetryConfig, timeout_error: httpx.TimeoutException ) -> None: default_config._max_retries_read = 1 - rt = RetryTracker(default_config) - assert rt.should_retry_timeout(example_error) is True - assert rt.should_retry_timeout(example_error) is False - assert rt.last_failed_reason == "RequestError" + rt = RetryTracker(URL, default_config) + assert rt.should_retry_timeout(timeout_error) is True + assert rt.should_retry_timeout(timeout_error) is False + assert rt.last_failed_reason is not None + assert "ReadTimeout" in rt.last_failed_reason def test_connect_retries_exceeded( - self, default_config: AsyncHTTPClientWithRetryConfig, example_error: httpx.RequestError + self, default_config: AsyncHTTPClientWithRetryConfig, connect_error: httpx.ConnectError ) -> None: default_config._max_retries_connect = 1 - rt = RetryTracker(default_config) - assert rt.should_retry_connect_error(example_error) is True - assert rt.should_retry_connect_error(example_error) is False + rt = RetryTracker(URL, default_config) + assert rt.should_retry_connect_error(connect_error) is True + assert rt.should_retry_connect_error(connect_error) is False + assert rt.last_failed_reason is not None + assert "ConnectError" in rt.last_failed_reason def test_correct_retry_of_status(self, default_config: AsyncHTTPClientWithRetryConfig) -> None: - rt = RetryTracker(default_config) - assert rt.should_retry_status_code(500) is False + rt = RetryTracker(URL, default_config) + assert rt.should_retry_status_code(make_http_status_error(500)) is False rt.config.status_codes_to_retry.add(500) - assert rt.should_retry_status_code(500) is True + assert rt.should_retry_status_code(make_http_status_error(500)) is True def test_get_backoff_time(self, default_config: 
AsyncHTTPClientWithRetryConfig) -> None: - rt = RetryTracker(default_config) + rt = RetryTracker(URL, default_config) for i in range(10): rt.read += 1 assert 0 <= rt.get_backoff_time() <= default_config.max_backoff_seconds def test_is_auto_retryable(self, default_config: AsyncHTTPClientWithRetryConfig) -> None: default_config._max_retries_status = 1 - rt = RetryTracker(default_config) + rt = RetryTracker(URL, default_config) # 409 is not in the list of status codes to retry, but we set is_auto_retryable=True, which should override it - assert rt.should_retry_status_code(409, is_auto_retryable=True) is True - assert rt.should_retry_status_code(409, is_auto_retryable=False) is False + assert rt.should_retry_status_code(make_http_status_error(409), is_auto_retryable=True) is True + assert rt.should_retry_status_code(make_http_status_error(409), is_auto_retryable=False) is False From 7656bfdc1b00c7ebb1072b63fb25a0af24b16bae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=C3=A5kon=20V=2E=20Treider?= Date: Tue, 23 Dec 2025 08:18:59 +0100 Subject: [PATCH 14/18] update documentation on debug logging of request retries --- cognite/client/config.py | 3 ++- docs/source/settings.rst | 13 +++++++++---- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/cognite/client/config.py b/cognite/client/config.py index d6c5fd87a0..65f7c5ce47 100644 --- a/cognite/client/config.py +++ b/cognite/client/config.py @@ -140,7 +140,8 @@ class ClientConfig: headers (dict[str, str] | None): Additional headers to add to all requests. timeout (int | None): Timeout on requests sent to the api. Defaults to 60 seconds. file_transfer_timeout (int | None): Timeout on file upload/download requests. Defaults to 600 seconds. - debug (bool): Configures logger to log extra request details to stderr. + debug (bool): Enables debug logging to stderr. This includes full request/response details and logs regarding retry + attempts (e.g., on 429 throttling or 5xx errors). """ def __init__( diff --git a/docs/source/settings.rst b/docs/source/settings.rst index da0aa75b8e..e88065686c 100644 --- a/docs/source/settings.rst +++ b/docs/source/settings.rst @@ -56,9 +56,12 @@ You can increase the max connection pool size by setting the :code:`max_connecti Debug logging ------------- -If you need to know the details of the http requests the SDK sends under the hood, you can enable debug logging. One way -is to pass :code:`debug=True` argument to :ref:`ClientConfig `. Alternatively, you can toggle debug -logging on and off by setting the :code:`debug` attribute on the :ref:`ClientConfig ` object. +If you need to inspect the details of the HTTP requests and responses, or monitor the SDK's retry behavior (e.g. during throttling), +you can enable debug logging. + +One way is to pass the :code:`debug=True` argument to :ref:`ClientConfig ` when you instantiate your client. +Alternatively, you can toggle debug logging on or off dynamically by setting the :code:`debug` attribute on the +:ref:`ClientConfig ` object. .. code:: python @@ -73,10 +76,12 @@ logging on and off by setting the :code:`debug` attribute on the :ref:`ClientCon debug=True, ) ) - print(client.config.debug) # True, all http request details will be logged + print(client.config.debug) # True, requests, responses, and retries are logged to stderr client.config.debug = False # disable debug logging client.config.debug = True # enable debug logging again +Note: Large outgoing or incoming payloads will be truncated to 1000 characters in the logs to avoid overwhelming the log output. 
+ HTTP Request logging -------------------- Internally this library uses the ``httpx`` library to perform network calls to the Cognite API service endpoints. For authentication and From c023037184f7e90c2011ae2b087cff065277f81d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=C3=A5kon=20V=2E=20Treider?= Date: Tue, 23 Dec 2025 09:11:26 +0100 Subject: [PATCH 15/18] fix CogniteObject._load docstr and make abstract --- cognite/client/data_classes/_base.py | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/cognite/client/data_classes/_base.py b/cognite/client/data_classes/_base.py index 56dad2ef9f..cf6cd55050 100644 --- a/cognite/client/data_classes/_base.py +++ b/cognite/client/data_classes/_base.py @@ -147,16 +147,13 @@ def load(cls, resource: dict | str, cognite_client: AsyncCogniteClient | None = return cls._load(loaded, cognite_client=cognite_client) @classmethod + @abstractmethod def _load(cls, resource: dict[str, Any], cognite_client: AsyncCogniteClient | None = None) -> Self: """ - This is the internal load method that is called by the public load method. - It has a default implementation that can be overridden by subclasses. - - The typical use case for overriding this method is to handle nested resources, - or to handle resources that have required fields as the default implementation assumes - all fields are optional. + This is the internal load method that is called by the public load method or directly from + within the SDK when loading resources from the API. - Note that the base class takes care of loading from YAML/JSON strings and error handling. + Subclasses must implement this method to handle their specific resource loading logic. Args: resource (dict[str, Any]): The resource to load. From 493b3359d3005c0c57071553da11a57d8c2eff63 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=C3=A5kon=20V=2E=20Treider?= Date: Tue, 23 Dec 2025 10:53:47 +0100 Subject: [PATCH 16/18] fix missing _load implementations --- cognite/client/data_classes/sequences.py | 6 ------ cognite/client/data_classes/three_d.py | 15 +++++++++++---- 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/cognite/client/data_classes/sequences.py b/cognite/client/data_classes/sequences.py index 72b58e777f..aa0678155a 100644 --- a/cognite/client/data_classes/sequences.py +++ b/cognite/client/data_classes/sequences.py @@ -66,12 +66,6 @@ def __init__( self.value_type = value_type self.metadata = metadata - @classmethod - def _load(cls, resource: dict, cognite_client: AsyncCogniteClient | None = None) -> Self: - # Snake case is supported for backwards compatibility - resource = convert_all_keys_to_camel_case(resource) - return super()._load(resource, cognite_client) - class SequenceColumn(SequenceColumnCore): """This represents a column in a sequence. It is used for reading only. 
diff --git a/cognite/client/data_classes/three_d.py b/cognite/client/data_classes/three_d.py index c416a3198f..fabe4635a0 100644 --- a/cognite/client/data_classes/three_d.py +++ b/cognite/client/data_classes/three_d.py @@ -281,10 +281,17 @@ def __init__( @classmethod def _load(cls, resource: dict, cognite_client: AsyncCogniteClient | None = None) -> Self: - instance = super()._load(resource, cognite_client) - if isinstance(instance.camera, dict): - instance.camera = RevisionCameraProperties._load(instance.camera) - return instance + if camera := resource.get("camera"): + camera = RevisionCameraProperties._load(camera) + return cls( + file_id=resource.get("fileId"), + published=resource.get("published"), + rotation=resource.get("rotation"), + scale=resource.get("scale"), + translation=resource.get("translation"), + camera=RevisionCameraProperties._load(resource["camera"]) if "camera" in resource else None, + metadata=resource.get("metadata"), + ) def dump(self, camel_case: bool = True) -> dict[str, Any]: result = super().dump(camel_case) From f4ee4c326a74a0e9604d0af8ace5d365ab114fc8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=C3=A5kon=20V=2E=20Treider?= Date: Sat, 27 Dec 2025 23:45:32 +0100 Subject: [PATCH 17/18] add missing error handling in EntityMatchingAPI --- cognite/client/_api/entity_matching.py | 25 +++++++++++++++---------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/cognite/client/_api/entity_matching.py b/cognite/client/_api/entity_matching.py index ed2adb5714..ce4e516933 100644 --- a/cognite/client/_api/entity_matching.py +++ b/cognite/client/_api/entity_matching.py @@ -22,6 +22,7 @@ class EntityMatchingAPI(APIClient): + # TODO: The API class should specify the resource path, not the data class: _RESOURCE_PATH = EntityMatchingModel._RESOURCE_PATH async def retrieve(self, id: int | None = None, external_id: str | None = None) -> EntityMatchingModel | None: @@ -272,8 +273,8 @@ async def predict( targets (Sequence[dict] | None): entities to match to, does not need an 'id' field. Tolerant to passing more than is needed or used. If omitted, will use data from fit. num_matches (int): number of matches to return for each item. score_threshold (float | None): only return matches with a score above this threshold - id (int | None): ids of the model to use. - external_id (str | None): external ids of the model to use. + id (int | None): id of the model to use. + external_id (str | None): external id of the model to use. Returns: EntityMatchingPredictionResult: object which can be used to wait for and retrieve results. @@ -293,17 +294,23 @@ async def predict( ... id=1 ... 
) """ - - model = await self.retrieve(id=id, external_id=external_id) - # TODO: Change assert to proper error - assert model - return await model.predict_async( # could call predict directly but this is friendlier + model = await self._get_model_or_raise(id, external_id) + # TODO: The data class should call the API class 'predict' method, not the other way around: + return await model.predict_async( sources=EntityMatchingModel._dump_entities(sources), targets=EntityMatchingModel._dump_entities(targets), num_matches=num_matches, score_threshold=score_threshold, ) + async def _get_model_or_raise(self, id: int | None, external_id: str | None) -> EntityMatchingModel: + if id is external_id is None: + raise ValueError("Either id or external_id must be provided.") + model = await self.retrieve(id=id, external_id=external_id) + if model is None: + raise ValueError("No model found with the given identifier(s).") + return model + async def refit( self, true_matches: Sequence[dict | tuple[int | str, int | str]], @@ -332,7 +339,5 @@ async def refit( >>> true_matches = [(1, 101)] >>> model = client.entity_matching.refit(true_matches=true_matches, id=1) """ - model = await self.retrieve(id=id, external_id=external_id) - # TODO: Change assert to proper error - assert model + model = await self._get_model_or_raise(id, external_id) return await model.refit_async(true_matches=true_matches) From 0fbae1126334ed6330dded8d3a1e372d8b382aeb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=C3=A5kon=20V=2E=20Treider?= Date: Tue, 30 Dec 2025 18:35:59 +0100 Subject: [PATCH 18/18] use asyncio.sleep (not time.sleep) in request retry loop --- cognite/client/_http_client.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/cognite/client/_http_client.py b/cognite/client/_http_client.py index 606da0b716..affcd259ca 100644 --- a/cognite/client/_http_client.py +++ b/cognite/client/_http_client.py @@ -4,7 +4,6 @@ import functools import logging import random -import time from collections.abc import ( AsyncIterable, AsyncIterator, @@ -157,7 +156,7 @@ def get_backoff_time(self) -> float: cap = min(base, self.config.max_backoff_seconds) return 0.1 + random.uniform(0, cap) - def back_off(self) -> None: + async def back_off(self) -> None: backoff_time = self.get_backoff_time() self.total_time_in_backoff += backoff_time # We put logging here, since this is always called before retrying @@ -165,7 +164,7 @@ def back_off(self) -> None: f"Retrying failed request, attempt #{self.total}, backoff wait: {backoff_time:.4f} sec " f"(total: {self.total_time_in_backoff:.4f} sec), reason: {self.last_failed_reason!r}, url: {self.url}" ) - time.sleep(backoff_time) + await asyncio.sleep(backoff_time) @property def should_retry_total(self) -> bool: @@ -318,4 +317,4 @@ async def _with_retry( raise CogniteRequestError from err # If we got here, it means we have decided to retry the request! - retry_tracker.back_off() + await retry_tracker.back_off()
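
With the final patch (18/18) in place, the retry flow waits between attempts using the
"Exponential Backoff with Full Jitter" strategy described earlier in the series (an exponential
term capped at max_backoff_seconds, drawn with full jitter, plus a small 0.1 second floor), and
the wait is now performed with asyncio.sleep so the event loop stays free to service other
in-flight requests instead of being blocked by time.sleep. A minimal, self-contained sketch of
such a loop follows; fetch_with_retry and do_request are illustrative names only (not the SDK's
actual API), and the bare "except Exception" stands in for the SDK's more careful classification
of retryable errors:

    import asyncio
    import random

    async def fetch_with_retry(do_request, max_retries=5, backoff_factor=0.5, max_backoff=30.0):
        """Retry a zero-argument async callable using exponential backoff with full jitter."""
        attempt = 0
        while True:
            try:
                return await do_request()
            except Exception:
                attempt += 1
                if attempt > max_retries:
                    raise
                # Full jitter: pick uniformly from [0, capped exponential), plus a small
                # 0.1 second floor so a retry never fires immediately after the failure.
                cap = min(backoff_factor * 2**attempt, max_backoff)
                # asyncio.sleep (not time.sleep) yields control to the event loop while waiting:
                await asyncio.sleep(0.1 + random.uniform(0, cap))

    # Example usage (do_request is any zero-argument async callable that may raise):
    #     result = await fetch_with_retry(lambda: some_async_http_call())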