File size: 5,113 Bytes
938949f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
0a569be
938949f
 
 
 
 
0a569be
938949f
 
 
 
0a569be
 
938949f
 
 
 
 
 
 
 
 
 
 
0a569be
 
938949f
 
 
 
 
0a569be
 
938949f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
"""
Thin Redis wrapper for cross-process caching (Upstash Redis REST API).

Falls back gracefully to ``None`` returns when Redis is unavailable,
so callers can use in-memory TTLCache as a fallback.

Usage::

    from src.data.redis_cache import get_redis

    redis = get_redis()            # None if no UPSTASH_REDIS_URL
    if redis:
        redis.set_json("weather:current", data, ttl=1800)
        cached = redis.get_json("weather:current")
"""

from __future__ import annotations

import json
import logging
import os
import threading
from typing import Any, Optional

log = logging.getLogger(__name__)

# ---------------------------------------------------------------------------
# Singleton (thread-safe)
# ---------------------------------------------------------------------------

# Process-wide RedisCache shared by every get_redis() caller; stays None until
# a construction attempt succeeds.
_instance: Optional["RedisCache"] = None
# Set to True by get_redis() when constructing RedisCache raises, so later
# calls return None immediately instead of retrying.
_init_failed: bool = False
# Held by get_redis() around the construction attempt so concurrent callers
# do not each build their own client.
_lock = threading.Lock()


def get_redis() -> Optional["RedisCache"]:
    """Return the shared RedisCache, building it on first use.

    Returns:
        The module-wide ``RedisCache`` instance, or ``None`` when either
        ``UPSTASH_REDIS_URL`` or ``UPSTASH_REDIS_TOKEN`` is unset/empty, or
        when an earlier construction attempt failed (failures are remembered
        and never retried).
    """
    global _instance, _init_failed

    # Unlocked shortcut: an earlier call already settled the outcome.
    if _instance is not None:
        return _instance
    if _init_failed:
        return None

    endpoint = os.environ.get("UPSTASH_REDIS_URL")
    secret = os.environ.get("UPSTASH_REDIS_TOKEN")
    if not (endpoint and secret):
        log.debug("Redis not configured (UPSTASH_REDIS_URL / UPSTASH_REDIS_TOKEN missing)")
        return None

    with _lock:
        # Another thread may have finished (or failed) while we waited.
        if _instance is not None:
            return _instance
        if _init_failed:
            return None
        try:
            _instance = RedisCache(url=endpoint, token=secret)
            # Log only the part after any "@", otherwise a 40-char prefix.
            if "@" in endpoint:
                shown = endpoint.split("@")[-1]
            else:
                shown = endpoint[:40]
            log.info("Redis connected: %s", shown)
            return _instance
        except Exception as exc:
            # Latch the failure so later calls skip the network round trip.
            _init_failed = True
            log.error("Redis init failed (will not retry): %s", exc)
            return None


# ---------------------------------------------------------------------------
# RedisCache (Upstash REST)
# ---------------------------------------------------------------------------

class RedisCache:
    """Minimal Redis cache using the Upstash REST API (no native driver needed).

    All public methods are best-effort: network, HTTP and decoding errors are
    logged at DEBUG level and reported through the return value (``None`` or
    ``False``) rather than raised. Only the constructor raises.
    """

    # Per-request timeout, in seconds, applied to every HTTP call.
    _TIMEOUT = 5

    def __init__(self, url: str, token: str):
        """Store the endpoint/credentials and verify the service is reachable.

        Args:
            url: Base URL of the REST endpoint; trailing slashes are stripped.
            token: Bearer token sent in the ``Authorization`` header.

        Raises:
            Exception: Whatever ``requests`` raises when the initial ``/ping``
                request fails or answers with an HTTP error status.
        """
        self._url = url.rstrip("/")
        self._headers = {"Authorization": f"Bearer {token}"}
        # Lazy import — requests is already a project dependency
        import requests as _req
        self._req = _req
        # Connectivity check
        resp = self._req.get(
            f"{self._url}/ping", headers=self._headers, timeout=self._TIMEOUT
        )
        resp.raise_for_status()

    # -- internal helpers ---------------------------------------------------

    def _command_url(self, command: str, key: str) -> str:
        """Build ``<base>/<command>/<key>`` with the key percent-encoded.

        The key becomes a single path segment, so characters that carry URL
        meaning ("/", "?", "#", "%", spaces, ...) must be escaped or they
        would alter the command the server sees. ":" is left as-is because it
        is legal inside a path segment.
        """
        from urllib.parse import quote

        encoded_key = quote(key, safe=":")
        return f"{self._url}/{command}/{encoded_key}"

    # -- JSON helpers -------------------------------------------------------

    def get_json(self, key: str) -> Optional[Any]:
        """Retrieve and JSON-decode a key.

        Returns:
            The decoded value, or ``None`` on a miss, a request failure, or a
            stored payload that is not valid JSON.
        """
        try:
            resp = self._req.get(
                self._command_url("get", key),
                headers=self._headers,
                timeout=self._TIMEOUT,
            )
            resp.raise_for_status()
            result = resp.json().get("result")
            if result is None:
                return None
            return json.loads(result)
        except Exception as exc:
            log.debug("Redis GET %s failed: %s", key, exc)
            return None

    def set_json(self, key: str, value: Any, ttl: int = 300) -> bool:
        """JSON-encode and store *value* with a TTL in seconds.

        Values that ``json`` cannot serialise natively are stringified via
        ``default=str``.

        Returns:
            ``True`` if the SET was accepted, ``False`` on a request failure
            or when the server reports an error for the command.
        """
        try:
            payload = json.dumps(value, default=str)
            # Upstash REST API: POST pipeline format
            resp = self._req.post(
                f"{self._url}/pipeline",
                headers={**self._headers, "Content-Type": "application/json"},
                json=[["SET", key, payload, "EX", str(ttl)]],
                timeout=self._TIMEOUT,
            )
            resp.raise_for_status()
            # A pipeline reply can be HTTP 200 while an individual command
            # reports an error in the body, so check the per-command result.
            body = resp.json()
            outcome = body[0] if isinstance(body, list) and body else body
            if isinstance(outcome, dict) and "error" in outcome:
                log.debug("Redis SET %s failed: %s", key, outcome["error"])
                return False
            return True
        except Exception as exc:
            log.debug("Redis SET %s failed: %s", key, exc)
            return False

    def delete(self, key: str) -> bool:
        """Delete a key.

        Returns:
            ``True`` if the request succeeded (whether or not the key
            existed), ``False`` on a request failure.
        """
        try:
            resp = self._req.get(
                self._command_url("del", key),
                headers=self._headers,
                timeout=self._TIMEOUT,
            )
            resp.raise_for_status()
            return True
        except Exception as exc:
            log.debug("Redis DEL %s failed: %s", key, exc)
            return False

    def exists(self, key: str) -> bool:
        """Check if a key exists.

        Returns:
            ``True`` only when the server reports the key as present;
            ``False`` when it is absent or the request fails.
        """
        try:
            resp = self._req.get(
                self._command_url("exists", key),
                headers=self._headers,
                timeout=self._TIMEOUT,
            )
            resp.raise_for_status()
            return resp.json().get("result", 0) == 1
        except Exception as exc:
            log.debug("Redis EXISTS %s failed: %s", key, exc)
            return False

    def ping(self) -> bool:
        """Health check: ``True`` when ``/ping`` answers with HTTP 200."""
        try:
            resp = self._req.get(
                f"{self._url}/ping",
                headers=self._headers,
                timeout=self._TIMEOUT,
            )
            return resp.status_code == 200
        except Exception as exc:
            log.debug("Redis PING failed: %s", exc)
            return False