Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 35 additions & 15 deletions cognite/client/_basic_api_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,25 +213,45 @@ async def _request(
method: Literal["GET", "PUT", "HEAD"],
/,
full_url: str,
content: str | bytes | Iterable[bytes] | None = None,
content: bytes | AsyncIterator[bytes] | None = None,
headers: dict[str, Any] | None = None,
timeout: float | None = None,
api_subversion: str | None = None,
include_cdf_headers: bool = False,
api_subversion: str | None = None,
) -> httpx.Response:
"""Make a request to something that is outside Cognite Data Fusion"""
"""
Make a request to something that is outside Cognite Data Fusion, with retry enabled.
Requires the caller to handle errors coming from non-2xx response status codes.

Args:
method (Literal['GET', 'PUT', 'HEAD']): HTTP method.
full_url (str): Full URL to make the request to.
content (bytes | AsyncIterator[bytes] | None): Optional body content to send along with the request.
headers (dict[str, Any] | None): Optional headers to include in the request.
timeout (float | None): Override the default timeout for this request.
include_cdf_headers (bool): Whether to include Cognite Data Fusion headers in the request. Defaults to False.
api_subversion (str | None): When include_cdf_headers=True, override the API subversion to use for the request. Has no effect otherwise.

Returns:
httpx.Response: The response from the server.

Raises:
httpx.HTTPStatusError: If the response status code is 4xx or 5xx.
"""
client = self._select_async_http_client(method in {"GET", "PUT", "HEAD"})
if include_cdf_headers:
headers = self._configure_headers(additional_headers=headers, api_subversion=api_subversion)
try:
res = await client(
method, full_url, content=content, headers=headers, timeout=timeout or self._config.timeout
)
except httpx.HTTPStatusError as err:
await self._handle_status_error(err)
self._log_successful_request(res)
return res

self._log_successful_request(res)
return res
except httpx.HTTPStatusError as err:
handler = await FailedRequestHandler.from_status_error(err, stream=False)
handler.log_failed_request()
raise

@asynccontextmanager
async def _stream(
Expand All @@ -241,14 +261,14 @@ async def _stream(
*,
url_path: str | None = None,
full_url: str | None = None,
json: Any = None,
json: dict[str, Any] | None = None,
headers: dict[str, Any] | None = None,
full_headers: dict[str, Any] | None = None,
timeout: float | None = None,
api_subversion: str | None = None,
) -> AsyncIterator[httpx.Response]:
assert url_path or full_url, "Either url_path or full_url must be provided"
full_url = full_url or resolve_url(self, "GET", cast(str, url_path))[1]
full_url = full_url or resolve_url(self, method, cast(str, url_path))[1]
if full_headers is None:
full_headers = self._configure_headers(headers, api_subversion)

Expand All @@ -262,7 +282,7 @@ async def _stream(
yield resp

except httpx.HTTPStatusError as err:
await self._handle_status_error(err, stream=True)
await self._handle_status_error(err, payload=json, stream=True)

async def _get(
self,
Expand Down Expand Up @@ -319,15 +339,15 @@ async def _post(
semaphore=semaphore,
)
except httpx.HTTPStatusError as err:
await self._handle_status_error(err)
await self._handle_status_error(err, payload=json)

self._log_successful_request(res, payload=json)
return res

async def _put(
self,
url_path: str,
content: str | bytes | Iterable[bytes] | None = None,
content: str | bytes | AsyncIterator[bytes] | None = None,
json: dict[str, Any] | None = None,
params: dict[str, Any] | None = None,
headers: dict[str, Any] | None = None,
Expand All @@ -337,10 +357,10 @@ async def _put(
semaphore: asyncio.BoundedSemaphore | None = None,
) -> httpx.Response:
_, full_url = resolve_url(self, "PUT", url_path)

full_headers = self._configure_headers(additional_headers=headers, api_subversion=api_subversion)
if content is None:
content = self._handle_json_dump(json, full_headers)

try:
res = await self._http_client_with_retry(
"PUT",
Expand All @@ -353,7 +373,7 @@ async def _put(
semaphore=semaphore,
)
except httpx.HTTPStatusError as err:
await self._handle_status_error(err)
await self._handle_status_error(err, payload=json)

self._log_successful_request(res, payload=json)
return res
Expand Down Expand Up @@ -381,7 +401,7 @@ def _refresh_auth_header(self, headers: MutableMapping[str, Any]) -> None:
headers[auth_header_name] = auth_header_value

async def _handle_status_error(
self, error: httpx.HTTPStatusError, payload: dict | None = None, stream: bool = False
self, error: httpx.HTTPStatusError, payload: dict[str, Any] | None = None, stream: bool = False
) -> NoReturn:
"""The response had an HTTP status code of 4xx or 5xx"""
handler = await FailedRequestHandler.from_status_error(error, stream=stream)
Expand Down
14 changes: 0 additions & 14 deletions cognite/client/_cognite_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,17 +261,3 @@ def load(cls, config: dict[str, Any] | str) -> AsyncCogniteClient:
"""
loaded = load_resource_to_dict(config)
return cls(config=ClientConfig.load(loaded))


class CogniteClient:
"""Main entrypoint into the Cognite Python SDK.

All Cognite Data Fusion APIs are accessible through this synchronous client.
For the asynchronous client, see :class:`~cognite.client._cognite_client.AsyncCogniteClient`.

Args:
config (ClientConfig | None): The configuration for this client.
"""

def __init__(self, config: ClientConfig | None = None) -> None:
raise NotImplementedError
Empty file.
Empty file.
Loading
Loading