3 changes: 1 addition & 2 deletions MIGRATION_GUIDE.md
@@ -16,6 +16,7 @@ Changes are grouped as follows:
### Optional
- **Async Support**: The SDK now provides full async support. The main client is now `AsyncCogniteClient`, but the synchronous `CogniteClient` is still available for backward compatibility. An important implementation detail is that it simply wraps `AsyncCogniteClient`.
- All helper/utility methods on data classes now have an async variant. A few examples: `Asset` has `children` and `subtree` and now also `children_async` and `subtree_async`, `Function` now has `call` and `call_async`, and `TimeSeries` now has `latest` and `latest_async`.
- Instantiating a client is slightly simpler: either `cluster` or `base_url` may be passed. When passing `cluster`, it is expected to be of the form 'https://{cluster}.cognitedata.com' (see the sketch after this list).
- The context manager `FileMultipartUploadSession`, returned by the Files API methods `multipart_upload_content` or `multipart_upload_content_session`, now also supports async: you can enter it with `async with` and upload parts using `await upload_part_async`.
- The SDK now ships with a new mock for the async client, `AsyncCogniteClientMock`. Both it and the existing `CogniteClientMock` are greatly improved: they provide better type safety and checking of call signatures, and `spec_set=True` is now enforced for all APIs (even the mocked client itself) through the use of `create_autospec` and bottom-up construction of nested APIs.
- With the move to an async client, concurrency now works in Pyodide, e.g. JupyterLite in the browser. This also means that user interfaces like Streamlit won't freeze while resources from CDF are being fetched!
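
A minimal sketch of the new instantiation and async helpers (hedged: the exact `ClientConfig` parameter name `cluster` and all credential values below are illustrative assumptions, not taken from this diff):

    import asyncio

    from cognite.client import AsyncCogniteClient, ClientConfig
    from cognite.client.credentials import OAuthClientCredentials

    config = ClientConfig(
        client_name="my-app",
        project="my-project",
        cluster="westeurope-1",  # assumed to expand to "https://westeurope-1.cognitedata.com"
        credentials=OAuthClientCredentials(
            token_url="https://login.example.com/oauth2/token",  # placeholder values
            client_id="my-client-id",
            client_secret="my-secret",
            scopes=["https://westeurope-1.cognitedata.com/.default"],
        ),
    )

    async def main() -> None:
        client = AsyncCogniteClient(config)
        asset = await client.assets.retrieve(external_id="pump-123")
        if asset is not None:
            children = await asset.children_async()  # async variant of the `children` helper
            print(len(children))

    asyncio.run(main())
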
@@ -38,7 +39,6 @@ Changes are grouped as follows:
- The `__iter__` method has been removed from all APIs. Use `__call__` instead: `for ts in client.time_series()`. This makes it seamless to pass one or more parameters (see the sketch after this list).
- All references to `legacy_name` on time series data classes and API have been removed.
- The helper methods on `client.iam`, `compare_capabilities` and `verify_capabilities` no longer support the `ignore_allscope_meaning` parameter.
- The Files API no longer accepts file handles opened in text mode.
- The method `load_yaml` on the data class `Query` has been removed. Use `load` instead.
- The Templates API has been completely removed from the SDK (the API service has already been shut off).
- The separate beta `CogniteClient` has been removed. Note: APIs currently in alpha/beta are (already) implemented directly on the main client and throw warnings on use.
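
For example, code that previously iterated an API object directly would change roughly like this (a sketch; the filter parameter is illustrative):

    from cognite.client import CogniteClient

    client = CogniteClient()

    # Before (no longer supported):
    # for ts in client.time_series:
    #     ...

    # After: call the API, optionally passing parameters such as a filter:
    for ts in client.time_series(external_id_prefix="pump_"):
        print(ts.external_id)
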
@@ -59,7 +59,6 @@ Changes are grouped as follows:
- Parameter `partitions` has been removed from all `__call__` methods except for the Raw Rows API (which has special handling for it). It was previously being ignored, with the added side effect of ignoring `chunk_size`, stemming from a very early API design oversight.
- The method `retrieve` on the Workflow Versions API no longer accepts `workflow_external_id` and `version` as separate arguments. Pass a single or a sequence of `WorkflowVersionId` (tuples also accepted).
- When loading a `ViewProperty` or `ViewPropertyApply`, the resource dictionary must contain the `"connectionType"` key or an error is raised.
- The Files API now expects `pathlib.Path` by default, but keeps `str` support for now.
- The specific exceptions `CogniteDuplicatedError` and `CogniteNotFoundError` should now always be used when appropriate (previously, certain API endpoints always used `CogniteAPIError`).
- `ModelFailedException` has changed name to `CogniteModelFailedError`.
- The class `Transformation`, which used to have an async `run` method, now names it `run_async` to unify the overall interface. The same applies to its `cancel` and `jobs` methods, and to `update` and `wait` on `TransformationJob` (see the sketch after this list).
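
A rough sketch of the renamed methods and the new Workflow Versions `retrieve` signature (hedged: the identifiers, the `wait=False` keyword, and the `WorkflowVersionId` import path are assumptions for illustration):

    from cognite.client.data_classes import WorkflowVersionId

    # Workflow Versions API: pass a WorkflowVersionId instead of separate arguments
    version = await client.workflows.versions.retrieve(WorkflowVersionId("my-workflow", "v1"))

    # Transformations: the async methods now carry the _async suffix
    transformation = await client.transformations.retrieve(external_id="my-transformation")
    if transformation is not None:
        job = await transformation.run_async(wait=False)  # previously: await transformation.run(wait=False)
        await job.wait_async()                             # previously: await job.wait()
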
10 changes: 6 additions & 4 deletions cognite/client/_api/data_modeling/instances.py
@@ -1343,10 +1343,11 @@ async def aggregate(
Get the average run time in minutes for pumps grouped by release year:

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes.data_modeling import ViewId, aggregations as aggs
>>> from cognite.client.data_classes.aggregations import Average
>>> from cognite.client.data_classes.data_modeling import ViewId
>>> client = CogniteClient()
>>> # async_client = AsyncCogniteClient() # another option
>>> avg_run_time = aggs.Avg("runTimeMinutes")
>>> avg_run_time = Average("runTimeMinutes")
>>> view_id = ViewId("mySpace", "PumpView", "v1")
>>> res = client.data_modeling.instances.aggregate(view_id, avg_run_time, group_by="releaseYear")

@@ -1450,10 +1451,11 @@ async def histogram(
Find the number of people born per decade:

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes.data_modeling import aggregations as aggs, ViewId
>>> from cognite.client.data_classes.aggregations import Histogram
>>> from cognite.client.data_classes.data_modeling import ViewId
>>> client = CogniteClient()
>>> # async_client = AsyncCogniteClient() # another option
>>> birth_by_decade = aggs.Histogram("birthYear", interval=10.0)
>>> birth_by_decade = Histogram("birthYear", interval=10.0)
>>> view_id = ViewId("mySpace", "PersonView", "v1")
>>> res = client.data_modeling.instances.histogram(view_id, birth_by_decade)
"""
25 changes: 15 additions & 10 deletions cognite/client/_api/entity_matching.py
@@ -22,6 +22,7 @@


class EntityMatchingAPI(APIClient):
# TODO: The API class should specify the resource path, not the data class:
_RESOURCE_PATH = EntityMatchingModel._RESOURCE_PATH

async def retrieve(self, id: int | None = None, external_id: str | None = None) -> EntityMatchingModel | None:
@@ -272,8 +273,8 @@ async def predict(
targets (Sequence[dict] | None): entities to match to, does not need an 'id' field. Tolerant to passing more than is needed or used. If omitted, will use data from fit.
num_matches (int): number of matches to return for each item.
score_threshold (float | None): only return matches with a score above this threshold
id (int | None): ids of the model to use.
external_id (str | None): external ids of the model to use.
id (int | None): id of the model to use.
external_id (str | None): external id of the model to use.

Returns:
EntityMatchingPredictionResult: object which can be used to wait for and retrieve results.
@@ -293,17 +294,23 @@
... id=1
... )
"""

model = await self.retrieve(id=id, external_id=external_id)
# TODO: Change assert to proper error
assert model
return await model.predict_async( # could call predict directly but this is friendlier
model = await self._get_model_or_raise(id, external_id)
# TODO: The data class should call the API class 'predict' method, not the other way around:
return await model.predict_async(
sources=EntityMatchingModel._dump_entities(sources),
targets=EntityMatchingModel._dump_entities(targets),
num_matches=num_matches,
score_threshold=score_threshold,
)

async def _get_model_or_raise(self, id: int | None, external_id: str | None) -> EntityMatchingModel:
if id is external_id is None:
raise ValueError("Either id or external_id must be provided.")
model = await self.retrieve(id=id, external_id=external_id)
if model is None:
raise ValueError("No model found with the given identifier(s).")
return model

async def refit(
self,
true_matches: Sequence[dict | tuple[int | str, int | str]],
@@ -332,7 +339,5 @@ async def refit(
>>> true_matches = [(1, 101)]
>>> model = client.entity_matching.refit(true_matches=true_matches, id=1)
"""
model = await self.retrieve(id=id, external_id=external_id)
# TODO: Change assert to proper error
assert model
model = await self._get_model_or_raise(id, external_id)
return await model.refit_async(true_matches=true_matches)
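
A brief usage sketch of the behavior change above (not part of this diff): omitting both identifiers from `predict` or `refit` now raises a `ValueError` rather than failing an assert.

    try:
        await client.entity_matching.predict(sources=[{"name": "a"}], targets=[{"name": "b"}])
    except ValueError as err:
        print(err)  # "Either id or external_id must be provided."
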
36 changes: 19 additions & 17 deletions cognite/client/_basic_api_client.py
@@ -19,6 +19,7 @@
from cognite.client.exceptions import (
CogniteAPIError,
CogniteDuplicatedError,
CogniteHTTPStatusError,
CogniteNotFoundError,
CogniteProjectAccessError,
)
@@ -30,6 +31,7 @@
if TYPE_CHECKING:
from cognite.client import AsyncCogniteClient
from cognite.client.config import ClientConfig
from cognite.client.data_classes._response import CogniteSDKResponse


logger = logging.getLogger(__name__)
@@ -45,15 +47,15 @@ class FailedRequestHandler:
headers: dict[str, str] | httpx.Headers
response_headers: dict[str, str] | httpx.Headers
extra: dict[str, Any]
cause: httpx.HTTPStatusError
cause: CogniteHTTPStatusError
stream: bool

def __post_init__(self) -> None:
self.headers = BasicAsyncAPIClient._sanitize_headers(self.headers)
self.response_headers = BasicAsyncAPIClient._sanitize_headers(self.response_headers)

@classmethod
async def from_status_error(cls, err: httpx.HTTPStatusError, stream: bool) -> Self:
async def from_status_error(cls, err: CogniteHTTPStatusError, stream: bool) -> Self:
response = err.response
error, missing, duplicated = {}, None, None

@@ -134,7 +136,7 @@ async def _raise_no_project_access_error(
x_request_id=self.x_request_id,
maybe_projects=maybe_projects,
cluster=cluster,
) from None # we don't surface the underlying httpx.HTTPStatusError
) from None # we don't surface the underlying CogniteHTTPStatusError

def _raise_api_error(self, err_type: type[CogniteAPIError], cluster: str | None, project: str) -> NoReturn:
raise err_type(
@@ -230,7 +232,7 @@ async def _request(
timeout: float | None = None,
include_cdf_headers: bool = False,
api_subversion: str | None = None,
) -> httpx.Response:
) -> CogniteSDKResponse:
"""
Make a request to something that is outside Cognite Data Fusion, with retry enabled.
Requires the caller to handle errors coming from non-2xx response status codes.
@@ -245,10 +247,10 @@
api_subversion (str | None): When include_cdf_headers=True, override the API subversion to use for the request. Has no effect otherwise.

Returns:
httpx.Response: The response from the server.
CogniteSDKResponse: The response from the server.

Raises:
httpx.HTTPStatusError: If the response status code is 4xx or 5xx.
CogniteHTTPStatusError: If the response status code is 4xx or 5xx.
"""
http_client = self._select_async_http_client(method in {"GET", "PUT", "HEAD"})
if include_cdf_headers:
@@ -260,7 +262,7 @@
self._log_successful_request(res)
return res

except httpx.HTTPStatusError as err:
except CogniteHTTPStatusError as err:
handler = await FailedRequestHandler.from_status_error(err, stream=False)
handler.log_failed_request()
raise
@@ -278,7 +280,7 @@
full_headers: dict[str, Any] | None = None,
timeout: float | None = None,
api_subversion: str | None = None,
) -> AsyncIterator[httpx.Response]:
) -> AsyncIterator[CogniteSDKResponse]:
assert url_path or full_url, "Either url_path or full_url must be provided"
full_url = full_url or resolve_url(self, method, cast(str, url_path))[1]
if full_headers is None:
@@ -293,7 +295,7 @@
self._log_successful_request(resp, payload=json, stream=True)
yield resp

except httpx.HTTPStatusError as err:
except CogniteHTTPStatusError as err:
await self._handle_status_error(err, payload=json, stream=True)

async def _get(
Expand All @@ -304,7 +306,7 @@ async def _get(
follow_redirects: bool = False,
api_subversion: str | None = None,
semaphore: asyncio.BoundedSemaphore | None = None,
) -> httpx.Response:
) -> CogniteSDKResponse:
_, full_url = resolve_url(self, "GET", url_path)
full_headers = self._configure_headers(additional_headers=headers, api_subversion=api_subversion)
try:
@@ -317,7 +319,7 @@
timeout=self._config.timeout,
semaphore=semaphore,
)
except httpx.HTTPStatusError as err:
except CogniteHTTPStatusError as err:
await self._handle_status_error(err)

self._log_successful_request(res)
@@ -332,7 +334,7 @@
follow_redirects: bool = False,
api_subversion: str | None = None,
semaphore: asyncio.BoundedSemaphore | None = None,
) -> httpx.Response:
) -> CogniteSDKResponse:
is_retryable, full_url = resolve_url(self, "POST", url_path)
full_headers = self._configure_headers(additional_headers=headers, api_subversion=api_subversion)
# We want to control json dumping, so we pass it along to httpx.Client.post as 'content'
@@ -350,7 +352,7 @@
timeout=self._config.timeout,
semaphore=semaphore,
)
except httpx.HTTPStatusError as err:
except CogniteHTTPStatusError as err:
await self._handle_status_error(err, payload=json)

self._log_successful_request(res, payload=json)
@@ -367,7 +369,7 @@
api_subversion: str | None = None,
timeout: float | None = None,
semaphore: asyncio.BoundedSemaphore | None = None,
) -> httpx.Response:
) -> CogniteSDKResponse:
_, full_url = resolve_url(self, "PUT", url_path)

full_headers = self._configure_headers(additional_headers=headers, api_subversion=api_subversion)
@@ -384,7 +386,7 @@
timeout=timeout or self._config.timeout,
semaphore=semaphore,
)
except httpx.HTTPStatusError as err:
except CogniteHTTPStatusError as err:
await self._handle_status_error(err, payload=json)

self._log_successful_request(res, payload=json)
@@ -417,15 +419,15 @@ def _refresh_auth_header(self, headers: MutableMapping[str, Any]) -> None:
headers[auth_header_name] = auth_header_value

async def _handle_status_error(
self, error: httpx.HTTPStatusError, payload: dict[str, Any] | None = None, stream: bool = False
self, error: CogniteHTTPStatusError, payload: dict[str, Any] | None = None, stream: bool = False
) -> NoReturn:
"""The response had an HTTP status code of 4xx or 5xx"""
handler = await FailedRequestHandler.from_status_error(error, stream=stream)
handler.log_failed_request(payload)
await handler.raise_api_error(self._cognite_client)

def _log_successful_request(
self, res: httpx.Response, payload: dict[str, Any] | None = None, stream: bool = False
self, res: CogniteSDKResponse, payload: dict[str, Any] | None = None, stream: bool = False
) -> None:
extra: dict[str, Any] = {
"headers": self._sanitize_headers(res.request.headers),
8 changes: 4 additions & 4 deletions cognite/client/_cognite_client.py
@@ -36,7 +36,7 @@
from cognite.client.utils._auxiliary import load_resource_to_dict

if TYPE_CHECKING:
import httpx
from cognite.client.data_classes._response import CogniteSDKResponse


class AsyncCogniteClient:
@@ -94,7 +94,7 @@ def __init__(self, config: ClientConfig | None = None) -> None:

async def get(
self, url: str, params: dict[str, Any] | None = None, headers: dict[str, Any] | None = None
) -> httpx.Response:
) -> CogniteSDKResponse:
"""Perform a GET request to an arbitrary path in the API."""
return await self._api_client._get(url, params=params, headers=headers)

@@ -104,7 +104,7 @@ async def post(
json: dict[str, Any] | None = None,
params: dict[str, Any] | None = None,
headers: dict[str, Any] | None = None,
) -> httpx.Response:
) -> CogniteSDKResponse:
"""Perform a POST request to an arbitrary path in the API."""
return await self._api_client._post(url, json=json, params=params, headers=headers)

@@ -114,7 +114,7 @@ async def put(
json: dict[str, Any] | None = None,
params: dict[str, Any] | None = None,
headers: dict[str, Any] | None = None,
) -> httpx.Response:
) -> CogniteSDKResponse:
"""Perform a PUT request to an arbitrary path in the API."""
return await self._api_client._put(url, json=json, params=params, headers=headers)

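
A usage sketch for the arbitrary-path helpers above (hedged: the project path is a placeholder, and the attributes of `CogniteSDKResponse` are not shown in this diff):

    res = await client.post(
        "/api/v1/projects/my-project/timeseries/list",
        json={"limit": 1},
    )
    # `res` is a CogniteSDKResponse wrapping the HTTP response.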