refactor: lifespan handlers, module-level imports, bounded scope cache
Replace deprecated @app.on_event startup/shutdown handlers with a FastAPI lifespan context manager, move the inline hashlib/time imports in the auth middleware to module top, and back the unbounded _api_scope_cache with a new size- and TTL-bounded BoundedTTLCache utility. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,71 @@
|
||||
"""Bounded, TTL-based in-memory caches with size eviction.
|
||||
|
||||
Provides a small dependency-free cache used by the auth middleware and the
|
||||
per-user authorization layer. Entries expire after a TTL and the cache is
|
||||
bounded by a maximum size to prevent unbounded memory growth from untrusted
|
||||
key cardinality (e.g. one entry per distinct token or per (user, repo) pair).
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import time
|
||||
from collections import OrderedDict
|
||||
from typing import Generic, TypeVar
|
||||
|
||||
K = TypeVar("K")
|
||||
V = TypeVar("V")
|
||||
|
||||
|
||||
class BoundedTTLCache(Generic[K, V]):
|
||||
"""A size-bounded cache whose entries expire after a fixed TTL.
|
||||
|
||||
Eviction is least-recently-inserted (FIFO) once ``max_size`` is reached.
|
||||
Expired entries are removed lazily on access and proactively when the
|
||||
cache is full, so the cache never exceeds ``max_size`` live entries.
|
||||
"""
|
||||
|
||||
def __init__(self, *, ttl_seconds: float, max_size: int = 1024) -> None:
|
||||
"""Initialize the cache with a TTL and maximum entry count."""
|
||||
if ttl_seconds <= 0:
|
||||
raise ValueError("ttl_seconds must be positive")
|
||||
if max_size <= 0:
|
||||
raise ValueError("max_size must be positive")
|
||||
self._ttl = float(ttl_seconds)
|
||||
self._max_size = int(max_size)
|
||||
self._store: OrderedDict[K, tuple[V, float]] = OrderedDict()
|
||||
|
||||
def get(self, key: K) -> V | None:
|
||||
"""Return the cached value for ``key`` or ``None`` if absent/expired."""
|
||||
entry = self._store.get(key)
|
||||
if entry is None:
|
||||
return None
|
||||
value, expiry = entry
|
||||
if time.monotonic() >= expiry:
|
||||
# Lazily evict expired entry.
|
||||
self._store.pop(key, None)
|
||||
return None
|
||||
return value
|
||||
|
||||
def set(self, key: K, value: V) -> None:
|
||||
"""Store ``value`` under ``key`` with the configured TTL."""
|
||||
now = time.monotonic()
|
||||
# Drop the existing entry so reinsertion refreshes ordering.
|
||||
self._store.pop(key, None)
|
||||
self._store[key] = (value, now + self._ttl)
|
||||
self._evict(now)
|
||||
|
||||
def _evict(self, now: float) -> None:
|
||||
"""Remove expired entries, then enforce the size bound (FIFO)."""
|
||||
expired = [key for key, (_, expiry) in self._store.items() if now >= expiry]
|
||||
for key in expired:
|
||||
self._store.pop(key, None)
|
||||
while len(self._store) > self._max_size:
|
||||
self._store.popitem(last=False)
|
||||
|
||||
def clear(self) -> None:
|
||||
"""Remove all entries (primarily for tests)."""
|
||||
self._store.clear()
|
||||
|
||||
def __len__(self) -> int:
|
||||
"""Return the number of stored (not necessarily live) entries."""
|
||||
return len(self._store)
|
||||
Reference in New Issue
Block a user