File size: 5,113 Bytes
938949f 0a569be 938949f 0a569be 938949f 0a569be 938949f 0a569be 938949f 0a569be 938949f | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 | """
Thin Redis wrapper for cross-process caching (Upstash Redis REST API).
Falls back gracefully to ``None`` returns when Redis is unavailable,
so callers can use in-memory TTLCache as a fallback.
Usage::
from src.data.redis_cache import get_redis
redis = get_redis() # None if no UPSTASH_REDIS_URL
if redis:
redis.set_json("weather:current", data, ttl=1800)
cached = redis.get_json("weather:current")
"""
from __future__ import annotations
import json
import logging
import os
import threading
from typing import Any, Optional
log = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Singleton (thread-safe)
# ---------------------------------------------------------------------------
_instance: Optional["RedisCache"] = None
_init_failed: bool = False
_lock = threading.Lock()
def get_redis() -> Optional["RedisCache"]:
"""Return the global RedisCache instance, or *None* if not configured."""
global _instance, _init_failed
# Fast path (no lock)
if _instance is not None:
return _instance
if _init_failed:
return None
url = os.environ.get("UPSTASH_REDIS_URL")
token = os.environ.get("UPSTASH_REDIS_TOKEN")
if not url or not token:
log.debug("Redis not configured (UPSTASH_REDIS_URL / UPSTASH_REDIS_TOKEN missing)")
return None
with _lock:
# Double-check after acquiring lock
if _instance is not None:
return _instance
if _init_failed:
return None
try:
_instance = RedisCache(url=url, token=token)
log.info("Redis connected: %s", url.split("@")[-1] if "@" in url else url[:40])
return _instance
except Exception as exc:
_init_failed = True
log.error("Redis init failed (will not retry): %s", exc)
return None
# ---------------------------------------------------------------------------
# RedisCache (Upstash REST)
# ---------------------------------------------------------------------------
class RedisCache:
"""Minimal Redis cache using the Upstash REST API (no native driver needed)."""
def __init__(self, url: str, token: str):
self._url = url.rstrip("/")
self._headers = {"Authorization": f"Bearer {token}"}
# Lazy import — requests is already a project dependency
import requests as _req
self._req = _req
# Connectivity check
resp = self._req.get(f"{self._url}/ping", headers=self._headers, timeout=5)
resp.raise_for_status()
# -- JSON helpers -------------------------------------------------------
def get_json(self, key: str) -> Optional[Any]:
"""Retrieve and JSON-decode a key. Returns None on miss or error."""
try:
resp = self._req.get(
f"{self._url}/get/{key}",
headers=self._headers,
timeout=5,
)
resp.raise_for_status()
result = resp.json().get("result")
if result is None:
return None
return json.loads(result)
except Exception as exc:
log.debug("Redis GET %s failed: %s", key, exc)
return None
def set_json(self, key: str, value: Any, ttl: int = 300) -> bool:
"""JSON-encode and store *value* with a TTL in seconds."""
try:
payload = json.dumps(value, default=str)
# Upstash REST API: POST pipeline format
resp = self._req.post(
f"{self._url}/pipeline",
headers={**self._headers, "Content-Type": "application/json"},
json=[["SET", key, payload, "EX", str(ttl)]],
timeout=5,
)
resp.raise_for_status()
return True
except Exception as exc:
log.debug("Redis SET %s failed: %s", key, exc)
return False
def delete(self, key: str) -> bool:
"""Delete a key."""
try:
resp = self._req.get(
f"{self._url}/del/{key}",
headers=self._headers,
timeout=5,
)
resp.raise_for_status()
return True
except Exception as exc:
log.debug("Redis DEL %s failed: %s", key, exc)
return False
def exists(self, key: str) -> bool:
"""Check if a key exists."""
try:
resp = self._req.get(
f"{self._url}/exists/{key}",
headers=self._headers,
timeout=5,
)
resp.raise_for_status()
return resp.json().get("result", 0) == 1
except Exception:
return False
def ping(self) -> bool:
"""Health check."""
try:
resp = self._req.get(
f"{self._url}/ping",
headers=self._headers,
timeout=5,
)
return resp.status_code == 200
except Exception:
return False
|