# openaitestclip / handler.py
# finhdev's picture
# Update handler.py
# 270502a verified
# handler.py
import io, base64, time, torch
from PIL import Image
from transformers import CLIPModel, CLIPProcessor
class EndpointHandler:
    """Zero-shot image classifier backed by a CLIP checkpoint.

    Expected request (the payload may optionally be wrapped in ``"inputs"``)::

        {"image": "<base64-encoded image>",
         "candidate_labels": ["prompt 1", "prompt 2", ...]}

    L2-normalised text embeddings are cached per handler instance so that
    repeated prompts are only encoded once.
    """

    # Upper bound on cached prompt embeddings. Prompts come from clients, so
    # an unbounded dict would grow for as long as the process lives.
    MAX_CACHE_SIZE = 4096

    def __init__(self, path=""):
        """Load model and processor from *path* and move the model to a device.

        Args:
            path: Checkpoint location passed to ``from_pretrained``.
        """
        self.model = CLIPModel.from_pretrained(path)
        self.processor = CLIPProcessor.from_pretrained(path)
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        self.model.to(self.device).eval()
        # prompt -> normalised text embedding; insertion-ordered, oldest first
        self.cache: dict[str, torch.Tensor] = {}

    # -------------------------------------------------------
    def __call__(self, data):
        """Score one image against the candidate labels.

        Args:
            data: Request dict. The payload is ``data["inputs"]`` when that
                key exists, otherwise ``data`` itself. It must provide
                ``"image"`` (base64 text) and ``"candidate_labels"`` (a
                non-empty sequence of strings, or a single string).

        Returns:
            A list of ``{"label": str, "score": float}`` dicts sorted by
            descending score; scores are a softmax over the labels.

        Raises:
            KeyError: If ``"image"`` or ``"candidate_labels"`` is missing.
            ValueError: If ``"candidate_labels"`` is empty.
        """
        timings = {}  # stage name -> elapsed milliseconds
        started = time.perf_counter()

        payload = data.get("inputs", data)
        img_b64 = payload["image"]
        prompts = payload["candidate_labels"]
        # A bare string would otherwise be iterated character by character.
        prompts = [prompts] if isinstance(prompts, str) else list(prompts)
        if not prompts:
            # Fail early with a clear message instead of an opaque
            # torch.stack error on an empty list further down.
            raise ValueError(
                "candidate_labels must be a non-empty list of strings"
            )

        # ---- text embeddings (cached) ----
        t = time.perf_counter()
        txt_feat = self._text_features(prompts)
        timings["encode_text"] = self._elapsed_ms(t)

        # ---- image decoding & preprocessing ----
        t = time.perf_counter()
        img = Image.open(io.BytesIO(base64.b64decode(img_b64))).convert("RGB")
        img_in = self.processor(images=img, return_tensors="pt").to(self.device)
        timings["decode_resize"] = self._elapsed_ms(t)

        # ---- image embedding ----
        t = time.perf_counter()
        # Mixed precision only when the model actually runs on CUDA; entering
        # the CUDA-specific autocast context on a CPU-only host is pointless.
        use_amp = self.device == "cuda"
        with torch.no_grad(), torch.autocast(
            device_type=self.device, enabled=use_amp
        ):
            img_feat = self.model.get_image_features(**img_in)
            img_feat = img_feat / img_feat.norm(dim=-1, keepdim=True)
        # Bring both operands to float32 so the matmul below sees one dtype.
        img_feat = img_feat.float()
        txt_feat = txt_feat.float()
        timings["encode_image"] = self._elapsed_ms(t)

        # ---- similarity & softmax ----
        t = time.perf_counter()
        # NOTE(review): the scale of 100 is hard-coded; any scale stored in
        # the checkpoint is not consulted - confirm this is intended.
        probs = (100 * img_feat @ txt_feat.T).softmax(dim=-1)[0].tolist()
        timings["similarity_softmax"] = self._elapsed_ms(t)

        # ---- log timings ----
        total = (time.perf_counter() - started) * 1000
        print(
            f"[CLIP timings] total={total:.1f} ms | "
            + " | ".join(f"{k}={v:.1f}" for k, v in timings.items()),
            flush=True,
        )

        # ---- build response ----
        ranked = sorted(zip(prompts, probs), key=lambda x: x[1], reverse=True)
        return [{"label": p, "score": float(s)} for p, s in ranked]

    def _elapsed_ms(self, start):
        """Return milliseconds elapsed since *start* (a ``perf_counter`` value).

        CUDA kernels are queued asynchronously, so on GPU the queue is
        drained first; otherwise a stage's cost is billed to a later stage.
        """
        if self.device == "cuda":
            torch.cuda.synchronize()
        return (time.perf_counter() - start) * 1000

    def _text_features(self, prompts):
        """Return stacked normalised text embeddings, one row per prompt."""
        # dict.fromkeys drops duplicates while preserving order, so a prompt
        # repeated within one request is encoded only once.
        missing = [p for p in dict.fromkeys(prompts) if p not in self.cache]
        fresh = {}
        if missing:
            tok = self.processor(
                text=missing, return_tensors="pt", padding=True
            ).to(self.device)
            with torch.no_grad():
                emb = self.model.get_text_features(**tok)
                emb = emb / emb.norm(dim=-1, keepdim=True)
            # clone() detaches each row from the batch tensor, so evicting a
            # cache entry really frees its memory.
            fresh = {p: e.clone() for p, e in zip(missing, emb)}
        # Assemble the result before touching the cache: eviction can then
        # never remove an embedding this request still needs.
        feats = torch.stack(
            [fresh[p] if p in fresh else self.cache[p] for p in prompts]
        )
        if fresh:
            self._remember(fresh)
        return feats

    def _remember(self, fresh):
        """Add *fresh* embeddings to the cache, evicting the oldest on overflow."""
        self.cache.update(fresh)
        while len(self.cache) > self.MAX_CACHE_SIZE:
            del self.cache[next(iter(self.cache))]
# import io, base64, torch
# from PIL import Image
# from transformers import CLIPModel, CLIPProcessor
# class EndpointHandler:
# """
# CLIP ViT‑L/14 zero‑shot classifier.
# Expects JSON: {
# "inputs": {
# "image": "<base64>",
# "candidate_labels": ["prompt‑1", "prompt‑2", ...]
# }
# }
# """
# def __init__(self, path=""):
# self.model = CLIPModel.from_pretrained(path)
# self.processor = CLIPProcessor.from_pretrained(path)
# self.device = "cuda" if torch.cuda.is_available() else "cpu"
# self.model.to(self.device).eval()
# self.cache: dict[str, torch.Tensor] = {} # prompt -> emb
# def __call__(self, data):
# payload = data.get("inputs", data)
# img_b64 = payload["image"]
# prompts = payload.get("candidate_labels", [])
# if not prompts:
# return {"error": "candidate_labels list is empty"}
# # --- text embeddings with per‑process cache ----------
# missing = [p for p in prompts if p not in self.cache]
# if missing:
# tok = self.processor(text=missing, return_tensors="pt",
# padding=True).to(self.device)
# with torch.no_grad():
# emb = self.model.get_text_features(**tok)
# emb = emb / emb.norm(dim=-1, keepdim=True)
# for p, e in zip(missing, emb):
# self.cache[p] = e
# txt_feat = torch.stack([self.cache[p] for p in prompts])
# # --- image embedding ---------------------------------
# img = Image.open(io.BytesIO(base64.b64decode(img_b64))).convert("RGB")
# img_in = self.processor(images=img, return_tensors="pt").to(self.device)
# with torch.no_grad(), torch.cuda.amp.autocast():
# img_feat = self.model.get_image_features(**img_in)
# img_feat = img_feat / img_feat.norm(dim=-1, keepdim=True)
# # txt_feat = txt_feat / txt_feat.norm(dim=-1, keepdim=True)
# img_feat = img_feat.float() # ← add these two lines
# txt_feat = txt_feat.float() # ←
# probs = (100 * img_feat @ txt_feat.T).softmax(dim=-1)[0].tolist()
# return [
# {"label": p, "score": float(s)}
# for p, s in sorted(zip(prompts, probs), key=lambda x: x[1], reverse=True)
# ]