# handler.py
import io, base64, time, torch
from PIL import Image
from transformers import CLIPModel, CLIPProcessor
class EndpointHandler:
    """
    CLIP ViT-L/14 zero-shot classifier.
    Expects JSON: {
        "inputs": {
            "image": "<base64>",
            "candidate_labels": ["prompt-1", "prompt-2", ...]
        }
    }
    """

    def __init__(self, path=""):
        self.model = CLIPModel.from_pretrained(path)
        self.processor = CLIPProcessor.from_pretrained(path)
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        self.model.to(self.device).eval()
        self.cache: dict[str, torch.Tensor] = {}  # prompt -> normalized text embedding
    # -------------------------------------------------------
    def __call__(self, data):
        T = {}  # per-stage timings (ms)
        t0 = time.perf_counter()
        payload = data.get("inputs", data)
        img_b64 = payload["image"]
        prompts = payload.get("candidate_labels", [])
        if not prompts:
            return {"error": "candidate_labels list is empty"}
        # ── text embeddings (per-process cache) ────────────────
        t = time.perf_counter()
        missing = [p for p in prompts if p not in self.cache]
        if missing:
            tok = self.processor(text=missing, return_tensors="pt",
                                 padding=True).to(self.device)
            with torch.no_grad():
                emb = self.model.get_text_features(**tok)
            emb = emb / emb.norm(dim=-1, keepdim=True)
            for p, e in zip(missing, emb):
                self.cache[p] = e
        txt_feat = torch.stack([self.cache[p] for p in prompts])
        T["encode_text"] = (time.perf_counter() - t) * 1000  # ms
        # ── image preprocessing ────────────────
        t = time.perf_counter()
        img = Image.open(io.BytesIO(base64.b64decode(img_b64))).convert("RGB")
        img_in = self.processor(images=img, return_tensors="pt").to(self.device)
        T["decode_resize"] = (time.perf_counter() - t) * 1000
        # ── image embedding ────────────────
        t = time.perf_counter()
        # enable autocast only on CUDA; torch.cuda.amp.autocast is deprecated
        # and warns when CUDA is unavailable
        with torch.no_grad(), torch.autocast(self.device, enabled=self.device == "cuda"):
            img_feat = self.model.get_image_features(**img_in)
            img_feat = img_feat / img_feat.norm(dim=-1, keepdim=True)
        # autocast may produce fp16; cast both sides to fp32 so the matmul
        # below sees matching dtypes
        img_feat = img_feat.float()
        txt_feat = txt_feat.float()
        T["encode_image"] = (time.perf_counter() - t) * 1000
        # ── similarity & softmax ────────────────
        t = time.perf_counter()
        # 100 approximates CLIP's learned logit scale (exp of logit_scale)
        probs = (100 * img_feat @ txt_feat.T).softmax(dim=-1)[0].tolist()
        T["similarity_softmax"] = (time.perf_counter() - t) * 1000
        # ── log timings ────────────────
        total = (time.perf_counter() - t0) * 1000
        print(f"[CLIP timings] total={total:.1f} ms | " +
              " | ".join(f"{k}={v:.1f}" for k, v in T.items()),
              flush=True)
        # ── build response ────────────────
        return [
            {"label": p, "score": float(s)}
            for p, s in sorted(zip(prompts, probs), key=lambda x: x[1], reverse=True)
        ]
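
# --- Local smoke test --------------------------------------------------------
# A minimal sketch for exercising the handler outside the endpoint runtime.
# The checkpoint name and image path below are illustrative assumptions, not
# part of the deployed configuration; any CLIP checkpoint and image will do.
if __name__ == "__main__":
    with open("cat.jpg", "rb") as f:  # hypothetical test image
        img_b64 = base64.b64encode(f.read()).decode("utf-8")
    handler = EndpointHandler("openai/clip-vit-large-patch14")  # assumed checkpoint
    result = handler({"inputs": {
        "image": img_b64,
        "candidate_labels": ["a photo of a cat", "a photo of a dog"],
    }})
    print(result)  # sorted list of {"label": ..., "score": ...} dicts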