|
21 | 21 | InactiveAPIKeyError, |
22 | 22 | InvalidAPIKeyError, |
23 | 23 | ) |
| 24 | +from .daily_usage import DailyUsageRepository |
24 | 25 | from .db import APIKeyRecord, APIKeyRepository, UsageLimitExceededError |
25 | 26 |
|
26 | 27 |
|
|
37 | 38 | FIRESTORE_COLLECTION = os.getenv("FIRESTORE_COLLECTION", "apiKeys") |
38 | 39 | API_KEY_CACHE_TTL = int(os.getenv("API_KEY_CACHE_TTL", "30")) |
39 | 40 | API_KEY_CACHE_MAX_ITEMS = int(os.getenv("API_KEY_CACHE_MAX_ITEMS", "1024")) |
| 41 | +FREE_LIMIT_DEFAULT_PRO = 1500 |
| 42 | +FREE_LIMIT_DEFAULT_FLASH = 1500 |
| 43 | + |
| 44 | + |
| 45 | +def _parse_free_limit(env_var: str, default: int) -> int: |
| 46 | + """Parse a non-negative integer from the environment with logging.""" |
| 47 | + value = os.getenv(env_var) |
| 48 | + if value is None or value == "": |
| 49 | + return default |
| 50 | + try: |
| 51 | + parsed = int(value) |
| 52 | + except ValueError: |
| 53 | + logger.warning( |
| 54 | + "Invalid value '%s' for %s; falling back to %d", |
| 55 | + value, |
| 56 | + env_var, |
| 57 | + default, |
| 58 | + ) |
| 59 | + return default |
| 60 | + if parsed < 0: |
| 61 | + logger.warning( |
| 62 | + "Negative value '%s' for %s; treating as 0", |
| 63 | + value, |
| 64 | + env_var, |
| 65 | + ) |
| 66 | + return 0 |
| 67 | + return parsed |
| 68 | + |
| 69 | + |
| 70 | +MODEL_TO_USAGE_BUCKET: dict[str, str] = { |
| 71 | + "gemini-2.5-pro": "gemini-2.5-pro", |
| 72 | + "gemini-2.5-flash": "gemini-2.5-flash-family", |
| 73 | + "gemini-2.5-flash-lite": "gemini-2.5-flash-family", |
| 74 | +} |
| 75 | + |
| 76 | +BUCKET_FREE_LIMITS: dict[str, int] = { |
| 77 | + "gemini-2.5-pro": _parse_free_limit( |
| 78 | + "GEMINI_GROUNDING_FREE_LIMIT_PRO", |
| 79 | + FREE_LIMIT_DEFAULT_PRO, |
| 80 | + ), |
| 81 | + "gemini-2.5-flash-family": _parse_free_limit( |
| 82 | + "GEMINI_GROUNDING_FREE_LIMIT_FLASH", |
| 83 | + FREE_LIMIT_DEFAULT_FLASH, |
| 84 | + ), |
| 85 | +} |
| 86 | + |
| 87 | + |
| 88 | +def _resolve_usage_bucket(model: str) -> tuple[str, int]: |
| 89 | + """Return the usage bucket and free allowance for the given model.""" |
| 90 | + bucket = MODEL_TO_USAGE_BUCKET.get(model, model) |
| 91 | + return bucket, BUCKET_FREE_LIMITS.get(bucket, 0) |
| 92 | + |
40 | 93 |
|
41 | 94 | RETRYABLE_EXCEPTIONS: tuple[type[Exception], ...] = ( |
42 | 95 | google_exceptions.ResourceExhausted, |
@@ -189,6 +242,13 @@ async def startup_event() -> None: |
189 | 242 | cache_ttl_seconds=API_KEY_CACHE_TTL, |
190 | 243 | cache_max_items=API_KEY_CACHE_MAX_ITEMS, |
191 | 244 | ) |
| 245 | + app.state.daily_usage_repository = DailyUsageRepository( |
| 246 | + firestore_client, |
| 247 | + collection_name=os.getenv( |
| 248 | + "DAILY_USAGE_COLLECTION", |
| 249 | + "dailyUsageCounters", |
| 250 | + ), |
| 251 | + ) |
192 | 252 |
|
193 | 253 |
|
194 | 254 | async def shutdown_event() -> None: |
@@ -235,6 +295,18 @@ def get_authenticator() -> APIKeyAuthenticator: |
235 | 295 | return authenticator |
236 | 296 |
|
237 | 297 |
|
| 298 | +def get_daily_usage_repository() -> DailyUsageRepository: |
| 299 | + """Return the daily usage repository stored on the app state.""" |
| 300 | + repository: DailyUsageRepository | None = getattr( |
| 301 | + app.state, |
| 302 | + "daily_usage_repository", |
| 303 | + None, |
| 304 | + ) |
| 305 | + if repository is None: |
| 306 | + raise RuntimeError("Daily usage repository has not been initialised") |
| 307 | + return repository |
| 308 | + |
| 309 | + |
238 | 310 | async def _authenticate_request( |
239 | 311 | api_key_header: str, |
240 | 312 | authenticator: APIKeyAuthenticator, |
@@ -269,37 +341,6 @@ async def _authenticate_request( |
269 | 341 | ) from exc |
270 | 342 |
|
271 | 343 |
|
272 | | -async def require_api_key( |
273 | | - api_key_header: Annotated[str, Header(alias="X-API-Key")], |
274 | | - authenticator: Annotated[APIKeyAuthenticator, Depends(get_authenticator)], |
275 | | -) -> APIKeyRecord: |
276 | | - """Validate the user's API key and reserve a usage slot. |
277 | | -
|
278 | | - Parameters |
279 | | - ---------- |
280 | | - api_key_header : str |
281 | | - API key supplied in the ``X-API-Key`` header. |
282 | | - authenticator : APIKeyAuthenticator |
283 | | - Authenticator responsible for validating and reserving usage. |
284 | | -
|
285 | | - Returns |
286 | | - ------- |
287 | | - APIKeyRecord |
288 | | - Updated API key record that includes the latest usage counter. |
289 | | -
|
290 | | - Raises |
291 | | - ------ |
292 | | - HTTPException |
293 | | - Raised when the API key is invalid, inactive, or has exhausted its |
294 | | - quota. |
295 | | - """ |
296 | | - return await _authenticate_request( |
297 | | - api_key_header, |
298 | | - authenticator, |
299 | | - consume_usage=True, |
300 | | - ) |
301 | | - |
302 | | - |
303 | 344 | async def require_api_key_without_consumption( |
304 | 345 | api_key_header: Annotated[str, Header(alias="X-API-Key")], |
305 | 346 | authenticator: Annotated[APIKeyAuthenticator, Depends(get_authenticator)], |
@@ -405,7 +446,9 @@ async def call_gemini_with_retry(request: RequestBody) -> types.GenerateContentR |
405 | 446 | ) |
406 | 447 | except RETRYABLE_EXCEPTIONS as exc: |
407 | 448 | if attempt >= MAX_GEMINI_ATTEMPTS: |
408 | | - logger.exception("Gemini request failed after retries") |
| 449 | + logger.exception( |
| 450 | + "Gemini request failed after %d retries", MAX_GEMINI_ATTEMPTS |
| 451 | + ) |
409 | 452 | raise HTTPException( |
410 | 453 | status_code=status.HTTP_502_BAD_GATEWAY, |
411 | 454 | detail="Gemini is currently unavailable", |
@@ -444,25 +487,93 @@ async def health() -> dict[str, str]: |
444 | 487 | @router.post("/v1/grounding_with_search") |
445 | 488 | async def search( |
446 | 489 | request: RequestBody, |
447 | | - _: Annotated[APIKeyRecord, Depends(require_api_key)], |
| 490 | + record: Annotated[ |
| 491 | + APIKeyRecord, |
| 492 | + Depends(require_api_key_without_consumption), |
| 493 | + ], |
| 494 | + authenticator: Annotated[APIKeyAuthenticator, Depends(get_authenticator)], |
| 495 | + daily_usage: Annotated[ |
| 496 | + DailyUsageRepository, |
| 497 | + Depends(get_daily_usage_repository), |
| 498 | + ], |
448 | 499 | ) -> dict[str, object]: |
449 | 500 | """Proxy Gemini grounding requests with quota enforcement. |
450 | 501 |
|
451 | 502 | Parameters |
452 | 503 | ---------- |
453 | 504 | request : RequestBody |
454 | 505 | Payload describing the Gemini call. |
455 | | - _ : APIKeyRecord |
456 | | - API key record produced by ``require_api_key``. The underscore keeps |
457 | | - the dependency explicit without exposing it to callers. |
| 506 | + record : APIKeyRecord |
| 507 | + API key record produced by ``require_api_key``. |
| 508 | + authenticator : APIKeyAuthenticator |
| 509 | + Authenticator dependency used to roll back usage reservations on error. |
458 | 510 |
|
459 | 511 | Returns |
460 | 512 | ------- |
461 | | - google.genai.types.GenerateContentResponse |
462 | | - Response returned by the Gemini model. |
| 513 | + dict of str to object |
| 514 | + JSON serialisable response returned by the Gemini model. |
463 | 515 | """ |
464 | | - response = await call_gemini_with_retry(request) |
465 | | - logger.info("Gemini request completed for model %s", request.model) |
| 516 | + bucket, free_limit = _resolve_usage_bucket(request.model) |
| 517 | + consumed_api_quota = False |
| 518 | + reservation = await daily_usage.reserve(bucket, free_limit) |
| 519 | + |
| 520 | + if not reservation.consumed_free: |
| 521 | + try: |
| 522 | + updated_record = await authenticator.consume_usage(record.lookup_hash) |
| 523 | + except UsageLimitExceededError as exc: |
| 524 | + await daily_usage.release(reservation) |
| 525 | + raise HTTPException( |
| 526 | + status_code=status.HTTP_403_FORBIDDEN, |
| 527 | + detail="API key usage limit exceeded", |
| 528 | + ) from exc |
| 529 | + except InvalidAPIKeyError as exc: |
| 530 | + await daily_usage.release(reservation) |
| 531 | + raise HTTPException( |
| 532 | + status_code=status.HTTP_401_UNAUTHORIZED, |
| 533 | + detail="Invalid API key provided", |
| 534 | + ) from exc |
| 535 | + except InactiveAPIKeyError as exc: |
| 536 | + await daily_usage.release(reservation) |
| 537 | + raise HTTPException( |
| 538 | + status_code=status.HTTP_403_FORBIDDEN, |
| 539 | + detail="API key is inactive", |
| 540 | + ) from exc |
| 541 | + except ExpiredAPIKeyError as exc: |
| 542 | + await daily_usage.release(reservation) |
| 543 | + raise HTTPException( |
| 544 | + status_code=status.HTTP_403_FORBIDDEN, |
| 545 | + detail="API key has expired", |
| 546 | + ) from exc |
| 547 | + |
| 548 | + record = updated_record |
| 549 | + consumed_api_quota = True |
| 550 | + |
| 551 | + try: |
| 552 | + response = await call_gemini_with_retry(request) |
| 553 | + except Exception: |
| 554 | + try: |
| 555 | + await daily_usage.release(reservation) |
| 556 | + except Exception: # pragma: no cover - defensive logging for rollbacks |
| 557 | + logger.exception( |
| 558 | + "Failed to roll back daily usage for bucket %s", |
| 559 | + bucket, |
| 560 | + ) |
| 561 | + |
| 562 | + if consumed_api_quota: |
| 563 | + try: |
| 564 | + await authenticator.release_usage(record.lookup_hash) |
| 565 | + except Exception: # pragma: no cover - defensive logging for rollbacks |
| 566 | + logger.exception( |
| 567 | + "Failed to roll back usage for API key %s", record.lookup_hash |
| 568 | + ) |
| 569 | + raise |
| 570 | + |
| 571 | + logger.info( |
| 572 | + "Gemini request completed for model %s (bucket=%s, consumed_free=%s)", |
| 573 | + request.model, |
| 574 | + bucket, |
| 575 | + reservation.consumed_free if reservation else False, |
| 576 | + ) |
466 | 577 | return response.to_json_dict() |
467 | 578 |
|
468 | 579 |
|
|
0 commit comments