Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ exclude = (?x)(
| ^tests/test_sse_connection_manager\.py$
| ^prefab_pb2.*\.pyi?$
| ^examples/
| ^tests/test_api_client\.py$
)

# Strict typing options
Expand Down
209 changes: 175 additions & 34 deletions prefab_cloud_python/_requests.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import importlib
from socket import socket
from typing import Optional
import re
from collections import OrderedDict
from dataclasses import dataclass
import time

from ._internal_logging import (
InternalLogger,
Expand Down Expand Up @@ -72,51 +74,190 @@ def __next__(self):
return host


# --- Simple LRU Cache Implementation ---


@dataclass
class CacheEntry:
data: bytes
etag: str
expires_at: float
url: str # The full URL from the successful response


class LRUCache:
def __init__(self, max_size: int):
self.max_size = max_size
self.cache = OrderedDict()

def get(self, key):
try:
value = self.cache.pop(key)
self.cache[key] = value # Mark as recently used.
return value
except KeyError:
return None

def set(self, key, value):
if key in self.cache:
self.cache.pop(key)
elif len(self.cache) >= self.max_size:
self.cache.popitem(last=False)
self.cache[key] = value

def clear(self):
self.cache.clear()

def __len__(self):
return len(self.cache)


class ApiClient:
def __init__(self, options):
"""
:param options: An object with attributes such as:
- prefab_api_urls: list of API host URLs (e.g. ["https://a.example.com", "https://b.example.com"])
- version: version string
"""
self.hosts = options.prefab_api_urls
self.session = requests.Session()
self.session.mount("https://", NoRetryAdapter())
self.session.mount("http://", NoRetryAdapter())
self.session.headers.update({VersionHeader: f"prefab-cloud-python-{Version}"})
self.session.mount("https://", requests.adapters.HTTPAdapter())
self.session.mount("http://", requests.adapters.HTTPAdapter())
self.session.headers.update(
{
"X-PrefabCloud-Client-Version": f"prefab-cloud-python-{getattr(options, 'version', 'development')}"
}
)
# Initialize a cache (here with a maximum of 2 entries).
self.cache = LRUCache(max_size=2)

def get_host(self, attempt_number, host_list):
return host_list[attempt_number % len(host_list)]

def _get_attempt_number(self) -> int:
"""
Retrieve the current attempt number from tenacity's statistics if available,
otherwise default to 1.
"""
stats = getattr(self.resilient_request, "statistics", None)
if stats is None:
return 1
return stats.get("attempt_number", 1)

def _build_url(self, path, hosts: list[str] = None) -> str:
"""
Build the full URL using host-selection logic.
"""
attempt_number = self._get_attempt_number()
host = self.get_host(attempt_number - 1, hosts or self.hosts)
return f"{host.rstrip('/')}/{path.lstrip('/')}"

def _get_cached_response(self, url: str) -> Response:
"""
If a valid cache entry exists for the given URL, return a synthetic Response.
"""
now = time.time()
entry = self.cache.get(url)
if entry is not None and entry.expires_at > now:
resp = Response()
resp._content = entry.data
resp.status_code = 200
resp.headers = {"ETag": entry.etag, "X-Cache": "HIT"}
resp.url = entry.url
return resp
return None

def _apply_cache_headers(self, url: str, kwargs: dict) -> dict:
"""
If a stale cache entry exists, add its ETag as an 'If-None-Match' header.
"""
entry = self.cache.get(url)
headers = kwargs.get("headers", {}).copy()
if entry is not None and entry.etag:
headers["If-None-Match"] = entry.etag
kwargs["headers"] = headers
return kwargs

def _update_cache(self, url: str, response: Response) -> None:
"""
If the response is cacheable (status 200, and Cache-Control does not include 'no-store'),
update the cache. If Cache-Control includes 'no-cache', mark the cache entry as immediately expired,
so that subsequent requests always trigger revalidation.
"""
cache_control = response.headers.get("Cache-Control", "")
if "no-store" in cache_control.lower():
return

etag = response.headers.get("ETag")
max_age = 0
m = re.search(r"max-age=(\d+)", cache_control)
if m:
max_age = int(m.group(1))

# If 'no-cache' is present, then even though we may store the response,
# we treat it as expired immediately so that every subsequent request is revalidated.
if "no-cache" in cache_control.lower():
expires_at = time.time() # Immediately expired.
else:
expires_at = time.time() + max_age if max_age > 0 else 0

if (etag is not None or max_age > 0) and expires_at > time.time():
self.cache.set(
url,
CacheEntry(
data=response.content,
etag=etag,
expires_at=expires_at,
url=response.url,
),
)
response.headers["X-Cache"] = "MISS"

def _send_request(self, method: str, url: str, **kwargs) -> Response:
"""
Hook method to perform the actual HTTP request.
"""
return self.session.request(method, url, **kwargs)

@retry(
stop=stop_after_delay(8),
wait=wait_exponential(multiplier=1, min=0.05, max=2),
retry=retry_if_exception_type((RequestException, ConnectionError, OSError)),
)
def resilient_request(
self, path, method="GET", hosts: Optional[list[str]] = None, **kwargs
self,
path,
method="GET",
allow_cache: bool = False,
hosts: list[str] = None,
**kwargs,
) -> Response:
# Get the current attempt number from tenacity's context
attempt_number = self.resilient_request.statistics["attempt_number"]
host = self.get_host(
attempt_number - 1, hosts or self.hosts
) # Subtract 1 because attempt_number starts at 1
url = f"{host.rstrip('/')}/{path.lstrip('/')}"
"""
Makes a resilient (retrying) request.

try:
logger.info(f"Attempt {attempt_number}: Requesting {url}")
response = self.session.request(method, url, **kwargs)
response.raise_for_status()
logger.info(f"Attempt {attempt_number}: Successful request to {url}")
return response
except (RequestException, ConnectionError) as e:
logger.warning(
f"Attempt {attempt_number}: Request to {url} failed: {str(e)}. Will retry"
)
raise
except OSError as e:
if isinstance(e, socket.gaierror):
logger.warning(
f"Attempt {attempt_number}: DNS resolution failed for {url}: {str(e)}. Will retry"
)
raise
else:
logger.error(
f"Attempt {attempt_number}: Non-retryable error occurred: {str(e)}"
)
raise
If allow_cache is True and the request method is GET, caching logic is applied.
This includes:
- Checking the cache and returning a synthetic response if valid.
- Adding an 'If-None-Match' header when a stale entry exists.
- Handling a 304 (Not Modified) response by returning the cached entry.
- Caching a 200 response if Cache-Control permits.
"""
url = self._build_url(path, hosts)
if method.upper() == "GET" and allow_cache:
cached = self._get_cached_response(url)
if cached:
return cached
kwargs = self._apply_cache_headers(url, kwargs)
response = self._send_request(method, url, **kwargs)
if method.upper() == "GET" and allow_cache:
if response.status_code == 304:
cached = self.cache.get(url)
if cached:
resp = Response()
resp._content = cached.data
resp.status_code = 200
resp.headers = {"ETag": cached.etag, "X-Cache": "HIT"}
resp.url = cached.url
return resp
self._update_cache(url, response)
return response
6 changes: 5 additions & 1 deletion prefab_cloud_python/config_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,12 @@ def load_initial_data(self):

def load_checkpoint_from_api_cdn(self):
try:
hwm = self.config_loader.highwater_mark
response = self.api_client.resilient_request(
"/api/v1/configs/0", auth=("authuser", self.options.api_key), timeout=4
"/api/v1/configs/" + str(hwm),
auth=("authuser", self.options.api_key),
timeout=4,
allow_cache=True,
)
if response.ok:
configs = Prefab.Configs.FromString(response.content)
Expand Down
Loading
Loading