import os
from datetime import datetime
from typing import Any, List, Mapping, Optional, Tuple

# Accumulated per-response log lines: "timestamp, model, duration, cost".
_MODEL_LOGS: List[str] = []


def log(message: str) -> None:
    """Print *message* with the standard [WORDS2CSV] prefix."""
    print(f"[WORDS2CSV] {message}")


def log_debug(message: str) -> None:
    """Print *message* with a debug prefix, only when WORDS2CSV_DEBUG is set."""
    if os.getenv("WORDS2CSV_DEBUG"):
        print(f"[WORDS2CSV-DEBUG] {message}")


def _extract_token_counts(usage: Any) -> Optional[Tuple[float, float, float]]:
    """Normalize a provider usage object into (prompt, completion, total) tokens.

    Recognized formats:
      * OpenAI-style attributes: ``prompt_tokens`` / ``completion_tokens``
        (optional ``total_tokens``).
      * Gemini usage-metadata attributes: ``prompt_token_count`` /
        ``candidates_token_count`` (optional ``total_token_count``).
      * Mapping (dict) variants of both of the above.

    Returns None when the format is not recognized. When the total is absent,
    it is derived as prompt + completion.
    """
    # OpenAI-style attribute access.
    if hasattr(usage, "prompt_tokens") and hasattr(usage, "completion_tokens"):
        prompt = float(usage.prompt_tokens)
        completion = float(usage.completion_tokens)
        total = float(getattr(usage, "total_tokens", prompt + completion))
        return prompt, completion, total

    # Gemini GenerateContentResponseUsageMetadata-style attribute access.
    if hasattr(usage, "prompt_token_count") and hasattr(usage, "candidates_token_count"):
        prompt = float(usage.prompt_token_count)
        completion = float(usage.candidates_token_count)
        total = float(getattr(usage, "total_token_count", prompt + completion))
        return prompt, completion, total

    # dict-style fallbacks for both providers.
    if isinstance(usage, Mapping):
        if {"prompt_tokens", "completion_tokens"}.issubset(usage.keys()):
            prompt = float(usage["prompt_tokens"])
            completion = float(usage["completion_tokens"])
            total = float(usage.get("total_tokens", prompt + completion))
            return prompt, completion, total
        if {"prompt_token_count", "candidates_token_count"}.issubset(usage.keys()):
            prompt = float(usage["prompt_token_count"])
            completion = float(usage["candidates_token_count"])
            total = float(usage.get("total_token_count", prompt + completion))
            return prompt, completion, total

    return None


def _log_model_response(
    *,
    model_name: str,
    content: str,
    duration: float,
    usage: Optional[Any] = None,
    pricing: Optional[Mapping[str, Mapping[str, float]]] = None,
    default_pricing_key: str = "default",
) -> Optional[float]:
    """Log model usage, cost (if pricing and usage are provided), and response details.

    Args:
        model_name: Name of the model that produced the response.
        content: Raw response text (logged only in debug mode).
        duration: Wall-clock execution time in seconds.
        usage: Provider usage object or mapping; see ``_extract_token_counts``
            for the recognized shapes.
        pricing: Per-model pricing rows ``{"input": ..., "output": ...}``,
            priced per million tokens; must contain ``default_pricing_key``
            as a fallback row.
        default_pricing_key: Key of the fallback pricing row used when
            ``model_name`` has no dedicated entry.

    Returns:
        The calculated cost if pricing and usage are provided (and the usage
        format is recognized), otherwise None.
    """
    cost: Optional[float] = None

    if usage is not None and pricing is not None:
        counts = _extract_token_counts(usage)
        if counts is not None:
            prompt_tokens, completion_tokens, total_tokens = counts
            # Pricing rows are expressed in dollars per million tokens.
            pricing_row = pricing.get(model_name, pricing[default_pricing_key])
            input_cost = (prompt_tokens / 1_000_000) * pricing_row["input"]
            output_cost = (completion_tokens / 1_000_000) * pricing_row["output"]
            cost = input_cost + output_cost
            log(f"Model: {model_name}")
            log(
                f"Token usage: Input={int(prompt_tokens)}, "
                f"Output={int(completion_tokens)}, Total={int(total_tokens)}"
            )
            log(f"Estimated cost: ${cost:.6f}")
            log(f"Execution time: {duration:.3f} seconds")
        else:
            # Usage provided but in an unknown format – still log basic info.
            log(f"Model: {model_name}")
            log(f"Execution time: {duration:.3f} seconds")
            log_debug(f"Unrecognized usage format: {usage!r}")
    else:
        log(f"Model: {model_name}")
        log(f"Execution time: {duration:.3f} seconds")

    log("Model response received")

    # Store latest model log in a simple CSV-like line:
    # timestamp, model name, duration, cost.
    timestamp = datetime.now().isoformat(timespec="seconds")
    duration_s = float(duration)
    cost_value = float(cost) if cost is not None else 0.0
    line = f"{timestamp}, {model_name}, {duration_s:.3f} seconds, ${cost_value:.6f}"
    _MODEL_LOGS.append(line)

    log_debug(f"Response length: {len(content)} characters")
    log_debug(f"Result: {content}")
    log_debug("End of result")

    return cost


def get_latest_model_log() -> Optional[str]:
    """Return full accumulated model logs as a single newline-joined string, or None if empty."""
    if not _MODEL_LOGS:
        return None
    return "\n".join(_MODEL_LOGS)