"""
Thin Redis wrapper for cross-process caching (Upstash Redis REST API).

Falls back gracefully to ``None`` returns when Redis is unavailable,
so callers can use in-memory TTLCache as a fallback.

Usage::

    from src.data.redis_cache import get_redis

    # None when UPSTASH_REDIS_URL / UPSTASH_REDIS_TOKEN is unset or init failed
    redis = get_redis()
    if redis:
        redis.set_json("weather:current", data, ttl=1800)
        cached = redis.get_json("weather:current")
"""
|
|
| from __future__ import annotations |
|
|
| import json |
| import logging |
| import os |
| import threading |
| from typing import Any, Optional |
|
|
# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Process-wide singleton state read and written by get_redis().
# Holds the RedisCache once construction has succeeded; None until then.
_instance: Optional["RedisCache"] = None
# Set to True by get_redis() after a construction attempt raised; once set,
# get_redis() returns None without trying again.
_init_failed: bool = False
# Held by get_redis() while it constructs the RedisCache.
_lock = threading.Lock()
|
|
|
|
def get_redis() -> Optional["RedisCache"]:
    """Return the process-wide RedisCache, creating it on first use.

    Returns:
        The cached instance, or ``None`` when either ``UPSTASH_REDIS_URL`` or
        ``UPSTASH_REDIS_TOKEN`` is unset/empty, or when an earlier (or the
        current) construction attempt raised. A failed attempt is remembered
        and never retried; missing configuration is re-checked on every call.
    """
    global _instance, _init_failed

    # Fast path without taking the lock: the outcome is already decided.
    # (_instance is still None whenever _init_failed is the deciding flag.)
    if _instance is not None or _init_failed:
        return _instance

    endpoint = os.environ.get("UPSTASH_REDIS_URL")
    secret = os.environ.get("UPSTASH_REDIS_TOKEN")
    if not (endpoint and secret):
        log.debug("Redis not configured (UPSTASH_REDIS_URL / UPSTASH_REDIS_TOKEN missing)")
        return None

    with _lock:
        # Re-check under the lock: another thread may have finished (or
        # failed) construction while this one was waiting.
        if _instance is not None or _init_failed:
            return _instance
        try:
            _instance = RedisCache(url=endpoint, token=secret)
            # Log only what follows an "@" if present, else the first 40 chars.
            shown = endpoint.split("@")[-1] if "@" in endpoint else endpoint[:40]
            log.info("Redis connected: %s", shown)
        except Exception as exc:
            _init_failed = True
            log.error("Redis init failed (will not retry): %s", exc)
            return None
        return _instance
|
|
|
|
| |
| |
| |
|
|
class RedisCache:
    """Minimal Redis cache using the Upstash REST API (no native driver needed).

    Commands that carry a key are sent as a JSON array in the POST body
    instead of being spliced into the URL path, so keys containing ``/``,
    ``?``, ``#``, ``%`` or spaces reach Redis unchanged.

    All public methods except the constructor are best-effort: they catch
    every exception and report failure through their return value.
    """

    # Per-request timeout, in seconds, passed to every HTTP call.
    _TIMEOUT = 5

    def __init__(self, url: str, token: str):
        """Store connection settings and verify them with a ``/ping`` request.

        Args:
            url: Base URL of the REST endpoint; trailing slashes are removed.
            token: Bearer token sent in the ``Authorization`` header.

        Raises:
            ImportError: if ``requests`` is not installed.
            Exception: whatever ``requests`` raises when the ping request
                fails or returns a non-2xx status.
        """
        self._url = url.rstrip("/")
        self._headers = {"Authorization": f"Bearer {token}"}

        # Imported here rather than at module level, so ``requests`` is only
        # needed once a cache is actually constructed.
        import requests as _req
        self._req = _req

        resp = self._req.get(
            f"{self._url}/ping",
            headers=self._headers,
            timeout=self._TIMEOUT,
        )
        resp.raise_for_status()

    def _command(self, *args: Any) -> Any:
        """Send one Redis command as a JSON array and return its ``result``.

        Raises on transport errors, on a non-2xx status, and when the
        response body carries an ``error`` field.
        """
        resp = self._req.post(
            self._url,
            headers={**self._headers, "Content-Type": "application/json"},
            json=list(args),
            timeout=self._TIMEOUT,
        )
        resp.raise_for_status()
        body = resp.json()
        if isinstance(body, dict) and "error" in body:
            raise RuntimeError(body["error"])
        return body.get("result")

    def get_json(self, key: str) -> Optional[Any]:
        """Retrieve and JSON-decode a key. Returns None on miss or error."""
        try:
            raw = self._command("GET", key)
            if raw is None:
                return None
            return json.loads(raw)
        except Exception as exc:
            log.debug("Redis GET %s failed: %s", key, exc)
            return None

    def set_json(self, key: str, value: Any, ttl: int = 300) -> bool:
        """JSON-encode and store *value* with a TTL in seconds.

        Values that ``json`` cannot encode natively are stringified via
        ``default=str``. Returns True when the SET was accepted, False on any
        error (including a command error reported by the server).
        """
        try:
            payload = json.dumps(value, default=str)
            # A single command (not /pipeline) so that a rejected SET shows
            # up as a failure instead of hiding inside a 200 response.
            self._command("SET", key, payload, "EX", str(ttl))
            return True
        except Exception as exc:
            log.debug("Redis SET %s failed: %s", key, exc)
            return False

    def delete(self, key: str) -> bool:
        """Delete a key. Returns True when the request succeeded."""
        try:
            self._command("DEL", key)
            return True
        except Exception as exc:
            log.debug("Redis DEL %s failed: %s", key, exc)
            return False

    def exists(self, key: str) -> bool:
        """Check if a key exists. Returns False on error."""
        try:
            return self._command("EXISTS", key) == 1
        except Exception as exc:
            log.debug("Redis EXISTS %s failed: %s", key, exc)
            return False

    def ping(self) -> bool:
        """Health check: True when ``/ping`` answers with HTTP 200."""
        try:
            resp = self._req.get(
                f"{self._url}/ping",
                headers=self._headers,
                timeout=self._TIMEOUT,
            )
            return resp.status_code == 200
        except Exception:
            return False
|
|