Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,36 @@ df = client.api_to_dataframe(data)
print(df)
```

### Fluent authentication configuration

The builder offers a fluent interface to configure authentication strategies. Any
auth provider implementing the `AuthProvider` interface can be applied via
`with_auth(...)` right before sending requests.

```python
from datetime import datetime, timedelta

from api_to_dataframe import ClientBuilder
from api_to_dataframe.models.auth import ApiKeyAuth, BearerTokenAuth


client = ClientBuilder(endpoint="https://api.example.com").with_auth(
ApiKeyAuth("X-Api-Key", "static-key")
)


def fetch_token():
"""Return a short-lived token and its expiry timestamp."""
return "dynamic-token", datetime.utcnow() + timedelta(minutes=5)


secure_client = ClientBuilder(endpoint="https://secure.example.com").with_auth(
BearerTokenAuth(fetch_token, refresh_margin=60)
)

payload = secure_client.get_api_data()
```

## Important notes:
* **Opcionals Parameters:** The params timeout, retry_strategy and headers are opcionals.
* **Default Params Value:** By default the quantity of retries is 3 and the time between retries is 1 second, but you can define manually.
Expand Down
22 changes: 20 additions & 2 deletions src/api_to_dataframe/controller/client_builder.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
from typing import Optional

from api_to_dataframe.models.auth import AuthProvider
from api_to_dataframe.models.retainer import retry_strategies, Strategies
from api_to_dataframe.models.get_data import GetData
from api_to_dataframe.utils.logger import logger
Expand Down Expand Up @@ -53,9 +56,10 @@ def __init__( # pylint: disable=too-many-positional-arguments,too-many-argument
self.endpoint = endpoint
self.retry_strategy = retry_strategy
self.connection_timeout = connection_timeout
self.headers = headers
self.headers = dict(headers)
self.retries = retries
self.delay = initial_delay
self._auth_provider: Optional[AuthProvider] = None

@retry_strategies
def get_api_data(self):
Expand All @@ -69,14 +73,21 @@ def get_api_data(self):
Returns:
dict: The JSON response from the API as a dictionary.
"""
headers = self._compose_headers()

response = GetData.get_response(
endpoint=self.endpoint,
headers=self.headers,
headers=headers,
connection_timeout=self.connection_timeout,
)

return response.json()

def with_auth(self, auth_provider: AuthProvider):
"""Attach an authentication provider used to enrich request headers."""
self._auth_provider = auth_provider
return self

@staticmethod
def api_to_dataframe(response: dict):
"""
Expand All @@ -93,3 +104,10 @@ def api_to_dataframe(response: dict):
DataFrame: A pandas DataFrame containing the data from the API response.
"""
return GetData.to_dataframe(response)

def _compose_headers(self) -> dict:
"""Compose request headers including optional authentication details."""
composed_headers = dict(self.headers)
if self._auth_provider is not None:
composed_headers = self._auth_provider.apply(composed_headers)
return composed_headers
119 changes: 119 additions & 0 deletions src/api_to_dataframe/models/auth.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
"""Authentication providers to enrich API requests with authorization headers."""
from __future__ import annotations

from abc import ABC, abstractmethod
from datetime import datetime, timedelta
from typing import Callable, Dict, Optional, Tuple

AuthHeaders = Dict[str, str]
TokenWithExpiry = Tuple[str, Optional[datetime]]


class AuthProvider(ABC):
"""Represents a strategy capable of injecting authentication information."""

@abstractmethod
def apply(self, headers: Optional[AuthHeaders] = None) -> AuthHeaders:
"""Return request headers containing the required authentication details."""


class ApiKeyAuth(AuthProvider):
"""Static API key authentication added to a configurable header."""

def __init__(self, header_name: str, api_key: str):
"""Store the header name and API key used for authenticated requests."""
self.header_name = header_name
self.api_key = api_key

def apply(self, headers: Optional[AuthHeaders] = None) -> AuthHeaders:
"""Return headers with the API key injected into the configured header."""
composed_headers = dict(headers or {})
composed_headers[self.header_name] = self.api_key
return composed_headers


class BearerTokenAuth(AuthProvider):
"""Bearer token authentication supporting automatic token refresh."""

def __init__(
self,
token_supplier: Callable[[], TokenWithExpiry],
*,
header_name: str = "Authorization",
scheme: str = "Bearer",
refresh_margin: float = 0,
clock: Callable[[], datetime] = datetime.utcnow,
):
"""Configure the bearer token strategy and how tokens are supplied."""
self._token_supplier = token_supplier
self.header_name = header_name
self.scheme = scheme
self._token: Optional[str] = None
self._expires_at: Optional[datetime] = None
self._refresh_margin = timedelta(seconds=refresh_margin)
self._clock = clock

def _ensure_token(self) -> None:
"""Fetch or refresh the bearer token when it is missing or expired."""
if self._token is None or self._should_refresh():
self._token, self._expires_at = self._token_supplier()

def _should_refresh(self) -> bool:
"""Determine whether a new token is required based on the expiry time."""
if self._expires_at is None:
return False
return self._expires_at <= self._clock() + self._refresh_margin

def apply(self, headers: Optional[AuthHeaders] = None) -> AuthHeaders:
"""Return headers containing a valid bearer token with optional refresh."""
self._ensure_token()
composed_headers = dict(headers or {})
composed_headers[self.header_name] = f"{self.scheme} {self._token}"
return composed_headers


class OAuth2ClientCredentials(AuthProvider):
"""OAuth2 Client Credentials authentication with token renewal support."""

def __init__(
self,
client_id: str,
client_secret: str,
token_fetcher: Callable[[str, str, Optional[str]], TokenWithExpiry],
*,
scope: Optional[str] = None,
header_name: str = "Authorization",
refresh_margin: float = 30,
clock: Callable[[], datetime] = datetime.utcnow,
):
"""Store client credentials and the callable responsible for new tokens."""
self.client_id = client_id
self.client_secret = client_secret
self.scope = scope
self._token_fetcher = token_fetcher
self.header_name = header_name
self._refresh_margin = timedelta(seconds=refresh_margin)
self._clock = clock
self._token: Optional[str] = None
self._expires_at: Optional[datetime] = None

def _ensure_token(self) -> None:
"""Obtain a valid OAuth2 access token using the configured fetcher."""
if self._token is None or self._should_refresh():
self._token, self._expires_at = self._token_fetcher(
self.client_id, self.client_secret, self.scope
)

def _should_refresh(self) -> bool:
"""Determine whether the current OAuth2 token needs to be refreshed."""
if self._expires_at is None:
return False
return self._expires_at <= self._clock() + self._refresh_margin

def apply(self, headers: Optional[AuthHeaders] = None) -> AuthHeaders:
"""Return headers augmented with a fresh OAuth2 bearer token."""
self._ensure_token()
composed_headers = dict(headers or {})
composed_headers[self.header_name] = f"Bearer {self._token}"
return composed_headers

56 changes: 45 additions & 11 deletions tests/test_controller_client_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import responses

from api_to_dataframe import ClientBuilder, RetryStrategies
from api_to_dataframe.models.auth import ApiKeyAuth


@pytest.fixture()
Expand All @@ -13,14 +14,6 @@ def client_setup():
return new_client


@pytest.fixture()
def response_setup():
new_client = ClientBuilder(
endpoint="https://economia.awesomeapi.com.br/last/USD-BRL"
)
return new_client.get_api_data()


def test_constructor_raises():
with pytest.raises(ValueError):
ClientBuilder(endpoint="")
Expand Down Expand Up @@ -87,15 +80,39 @@ def test_constructor_with_retry_strategy():
assert client.delay == 2


@responses.activate
def test_response_to_json(client_setup): # pylint: disable=redefined-outer-name
"""Test JSON response retrieval using a mocked HTTP endpoint."""
expected_payload = {"rate": 5.2}
responses.add(
responses.GET,
client_setup.endpoint,
json=expected_payload,
status=200,
)

new_client = client_setup
response = new_client.get_api_data() # pylint: disable=protected-access
assert isinstance(response, dict)

assert response == expected_payload


def test_to_dataframe(response_setup): # pylint: disable=redefined-outer-name
df = ClientBuilder.api_to_dataframe(response_setup)
@responses.activate
def test_to_dataframe(client_setup): # pylint: disable=redefined-outer-name
"""Test DataFrame conversion using mocked API data."""
expected_payload = {"id": [1, 2], "value": ["a", "b"]}
responses.add(
responses.GET,
client_setup.endpoint,
json=expected_payload,
status=200,
)

response_data = client_setup.get_api_data()
df = ClientBuilder.api_to_dataframe(response_data)

assert isinstance(df, pd.DataFrame)
assert not df.empty


@responses.activate
Expand All @@ -118,3 +135,20 @@ def test_get_api_data_with_mocked_response():
assert response == expected_data
assert len(responses.calls) == 1
assert responses.calls[0].request.url == endpoint


@responses.activate
def test_client_builder_with_auth_headers():
"""Ensure ClientBuilder augments request headers using the auth provider."""
endpoint = "https://api.test.com/data"
responses.add(responses.GET, endpoint, json={"status": "ok"}, status=200)

client = ClientBuilder(endpoint=endpoint, headers={"Accept": "application/json"})
client.with_auth(ApiKeyAuth("X-Api-Key", "secret"))

payload = client.get_api_data()

sent_headers = responses.calls[0].request.headers
assert payload == {"status": "ok"}
assert sent_headers["X-Api-Key"] == "secret"
assert sent_headers["Accept"] == "application/json"
Loading