""" Thin Redis wrapper for cross-process caching (Upstash Redis REST API). Falls back gracefully to ``None`` returns when Redis is unavailable, so callers can use in-memory TTLCache as a fallback. Usage:: from src.data.redis_cache import get_redis redis = get_redis() # None if no UPSTASH_REDIS_URL if redis: redis.set_json("weather:current", data, ttl=1800) cached = redis.get_json("weather:current") """ from __future__ import annotations import json import logging import os import threading from typing import Any, Optional log = logging.getLogger(__name__) # --------------------------------------------------------------------------- # Singleton (thread-safe) # --------------------------------------------------------------------------- _instance: Optional["RedisCache"] = None _init_failed: bool = False _lock = threading.Lock() def get_redis() -> Optional["RedisCache"]: """Return the global RedisCache instance, or *None* if not configured.""" global _instance, _init_failed # Fast path (no lock) if _instance is not None: return _instance if _init_failed: return None url = os.environ.get("UPSTASH_REDIS_URL") token = os.environ.get("UPSTASH_REDIS_TOKEN") if not url or not token: log.debug("Redis not configured (UPSTASH_REDIS_URL / UPSTASH_REDIS_TOKEN missing)") return None with _lock: # Double-check after acquiring lock if _instance is not None: return _instance if _init_failed: return None try: _instance = RedisCache(url=url, token=token) log.info("Redis connected: %s", url.split("@")[-1] if "@" in url else url[:40]) return _instance except Exception as exc: _init_failed = True log.error("Redis init failed (will not retry): %s", exc) return None # --------------------------------------------------------------------------- # RedisCache (Upstash REST) # --------------------------------------------------------------------------- class RedisCache: """Minimal Redis cache using the Upstash REST API (no native driver needed).""" def __init__(self, url: str, token: str): self._url = url.rstrip("/") self._headers = {"Authorization": f"Bearer {token}"} # Lazy import — requests is already a project dependency import requests as _req self._req = _req # Connectivity check resp = self._req.get(f"{self._url}/ping", headers=self._headers, timeout=5) resp.raise_for_status() # -- JSON helpers ------------------------------------------------------- def get_json(self, key: str) -> Optional[Any]: """Retrieve and JSON-decode a key. Returns None on miss or error.""" try: resp = self._req.get( f"{self._url}/get/{key}", headers=self._headers, timeout=5, ) resp.raise_for_status() result = resp.json().get("result") if result is None: return None return json.loads(result) except Exception as exc: log.debug("Redis GET %s failed: %s", key, exc) return None def set_json(self, key: str, value: Any, ttl: int = 300) -> bool: """JSON-encode and store *value* with a TTL in seconds.""" try: payload = json.dumps(value, default=str) # Upstash REST API: POST pipeline format resp = self._req.post( f"{self._url}/pipeline", headers={**self._headers, "Content-Type": "application/json"}, json=[["SET", key, payload, "EX", str(ttl)]], timeout=5, ) resp.raise_for_status() return True except Exception as exc: log.debug("Redis SET %s failed: %s", key, exc) return False def delete(self, key: str) -> bool: """Delete a key.""" try: resp = self._req.get( f"{self._url}/del/{key}", headers=self._headers, timeout=5, ) resp.raise_for_status() return True except Exception as exc: log.debug("Redis DEL %s failed: %s", key, exc) return False def exists(self, key: str) -> bool: """Check if a key exists.""" try: resp = self._req.get( f"{self._url}/exists/{key}", headers=self._headers, timeout=5, ) resp.raise_for_status() return resp.json().get("result", 0) == 1 except Exception: return False def ping(self) -> bool: """Health check.""" try: resp = self._req.get( f"{self._url}/ping", headers=self._headers, timeout=5, ) return resp.status_code == 200 except Exception: return False