| from __future__ import annotations |
|
|
| import os |
| import json |
| import base64 |
| import tldextract |
| import tempfile |
| from urllib.parse import urlparse |
| from langchain_tavily import TavilyExtract |
| from youtube_transcript_api import YouTubeTranscriptApi |
| import io |
| import pandas as pd |
| from typing import List, Optional, Dict, Any |
| from datetime import datetime |
| from PIL import Image, ImageStat, ExifTags |
| import google.generativeai as genai |
| from langchain_core.tools import tool |
| from langchain_community.tools.tavily_search import TavilySearchResults |
| from langchain_community.document_loaders import ArxivLoader |
| from langchain_community.document_loaders import WikipediaLoader |
| from PIL import ImageDraw, ImageFont, ImageEnhance, ImageFilter |
| from langchain_community.tools.tavily_search import TavilySearchResults |
| from src.utils.image_processing import * |
| import re |
|
|
def _exif_dict(img: Image.Image) -> dict:
    """Best-effort EXIF extraction mapping tag names to JSON-friendly values.

    Returns an empty dict when the image has no EXIF data or reading fails.
    """
    try:
        raw = img._getexif() or {}
        return {
            ExifTags.TAGS.get(tag_id, str(tag_id)): (
                value if isinstance(value, (int, float, str)) else str(value)
            )
            for tag_id, value in raw.items()
        }
    except Exception:
        # EXIF is optional metadata; never let it break image analysis.
        return {}
|
|
| def _clip(text: str | None, n: int) -> str: |
| """Утилита: безопасно обрезаем длинные сниппеты.""" |
| if not text: |
| return "" |
| text = text.strip() |
| return (text[: n - 1] + "…") if len(text) > n else text |
|
|
|
|
|
|
| def _parse_dt(v) -> Optional[str]: |
| """[ИЗМЕНЕНИЕ] Приводим даты к ISO-строке, если возможно.""" |
| try: |
| if isinstance(v, datetime): |
| return v.isoformat() |
| if isinstance(v, str) and v: |
| |
| return v |
| except Exception: |
| pass |
| return None |
|
|
| def _read_text_best_effort(path: str, max_chars: int) -> tuple[str, str]: |
| |
| try: |
| with open(path, "r", encoding="utf-8") as f: |
| s = f.read() |
| return s[:max_chars], "utf-8" |
| except Exception: |
| with open(path, "r", encoding="latin-1", errors="replace") as f: |
| s = f.read() |
| return s[:max_chars], "latin-1" |
|
|
| |
def preprocess_files(files: List[str]) -> Dict[str, Dict[str, Any]]:
    """Inspect input files and return per-file metadata (type, tool hint, preview)."""
    # Extension -> (detected type, suggested analysis tool).
    ext_map = {
        ".csv": ("table", "analyze_csv_file"),
        ".xlsx": ("excel", "analyze_excel_file"),
        ".xls": ("excel", "analyze_excel_file"),
        ".pdf": ("document", "analyze_pdf_file"),
        ".docx": ("document", "analyze_docx_file"),
        ".doc": ("document", "analyze_docx_file"),
        ".txt": ("text", "analyze_txt_file"),
        ".md": ("text", "analyze_txt_file"),
        ".jpg": ("image", "vision_qa_gemma"),
        ".jpeg": ("image", "vision_qa_gemma"),
        ".png": ("image", "vision_qa_gemma"),
        ".gif": ("image", "vision_qa_gemma"),
        ".bmp": ("image", "vision_qa_gemma"),
        ".mp3": ("audio", "transcribe_audio"),
        ".mp4": ("video", "video_qa_gemma"),
    }

    result: Dict[str, Dict[str, Any]] = {}
    for path in files:
        if not os.path.exists(path):
            print(f"Warning: File {path} not found")
            continue

        ext = os.path.splitext(path)[1].lower()
        size = os.path.getsize(path)
        kind, tool_name = ext_map.get(ext, ("unknown", "analyze_txt_file (fallback)"))

        entry: Dict[str, Any] = {
            "path": path,
            "extension": ext,
            "size": size,
            "type": kind,
            "suggested_tool": tool_name,
            "preview": None,
        }

        # Inline preview only for small plain-text files.
        if ext == ".txt" and size < 1000:
            try:
                with open(path, "r", encoding="utf-8") as f:
                    content = f.read()
                entry["preview"] = content[:200] + "..." if len(content) > 200 else content
            except Exception as e:
                entry["preview"] = f"Error reading file: {e}"

        result[path] = entry

    return result
|
|
| |
|
|
|
|
| |
|
|
@tool
def wiki_search(
    query: str,
    max_results: int = 3,
    language: str = "en",
    content_chars_max: int = 5000,
    snippet_chars: int = 400,
) -> str:
    """
    Search Wikipedia using LangChain's WikipediaLoader.
    Returns a JSON string:
    {
      "query": "...",
      "language": "en",
      "items": [
        {
          "url": "https://en.wikipedia.org/wiki/...",
          "title": "Title",
          "snippet": "First N chars of page content",
          "page_content": "...(clipped to content_chars_max)..."
        }
      ]
    }
    """
    try:
        loader = WikipediaLoader(
            query=query,
            load_max_docs=max_results,
            lang=language,
            doc_content_chars_max=content_chars_max,
        )

        items: List[dict] = []
        visited_urls: set = set()

        for doc in loader.load():
            page_url = doc.metadata.get("source") or ""
            # Skip documents without a URL and de-duplicate by URL.
            if not page_url or page_url in visited_urls:
                continue
            visited_urls.add(page_url)

            body = doc.page_content or ""
            items.append(
                {
                    "url": page_url,
                    "title": doc.metadata.get("title") or "",
                    "snippet": _clip(body, snippet_chars),
                    "page_content": body,
                }
            )

        return json.dumps({"query": query, "language": language, "items": items})
    except Exception as e:
        return json.dumps({"error": str(e), "query": query, "language": language})
|
|
| |
|
|
def _domain(url: str) -> str:
    """Return the registrable domain ('site.tld') for *url*, without subdomains."""
    parts = tldextract.extract(url)
    return ".".join(p for p in (parts.domain, parts.suffix) if p)
|
|
@tool
def web_search(
    query: str,
    max_results: int = 5,
    unique_domains: int = 5,
    snippet_chars: int = 400,
    include_domains: Optional[List[str]] = None,
    exclude_domains: Optional[List[str]] = None,
) -> str:
    """
    Structured web search via Tavily.

    Args:
        query: search query string.
        max_results: maximum number of items returned.
        unique_domains: when > 0, keep at most one result per domain
            (enforces source diversity); 0 disables the filter.
        snippet_chars: maximum snippet length.
        include_domains: if given, only results from these domains are kept.
        exclude_domains: domains whose results are dropped.

    Returns a JSON string:
    {
      "query": "...",
      "provider": "tavily",
      "items": [
        {
          "url": "...",
          "title": "...",
          "snippet": "...",
          "published": "2024-05-01T10:00:00Z",  # if Tavily provided it
          "source": "example.com"               # registrable domain
        }
      ]
    }
    """
    try:
        raw_results = TavilySearchResults(max_results=max_results).invoke(query)

        items: List[dict] = []
        seen_urls: set[str] = set()
        seen_domains: set[str] = set()

        inc = set(include_domains or [])
        exc = set(exclude_domains or [])

        for r in raw_results:
            url = (r.get("url") or "").strip()
            if not url or url in seen_urls:
                continue

            dom = _domain(url)
            if inc and dom not in inc:
                continue
            if dom in exc:
                continue

            # [FIX] the old branch was a no-op (`pass` then append anyway);
            # now duplicate domains are actually skipped when the filter is on.
            if unique_domains > 0 and dom in seen_domains:
                continue
            seen_domains.add(dom)

            content = r.get("content") or r.get("snippet") or ""
            items.append(
                {
                    "url": url,
                    "title": (r.get("title") or "").strip(),
                    "snippet": _clip(content, snippet_chars),
                    "published": r.get("published_date") or r.get("created_at"),
                    "source": dom,
                }
            )
            seen_urls.add(url)

            if len(items) >= max_results:
                break

        return json.dumps({"query": query, "provider": "tavily", "items": items})
    except Exception as e:
        return json.dumps({"error": str(e), "query": query, "provider": "tavily"})
|
|
|
|
| |
|
|
@tool
def arxiv_search(
    query: str,
    max_results: int = 5,
    snippet_chars: int = 400,
) -> str:
    """
    Search arXiv via LangChain's ArxivLoader.

    Returns strict JSON:
    {
      "query": "...",
      "provider": "arxiv",
      "items": [
        {
          "title": "...",
          "authors": ["A. Author", "B. Author"],
          "published": "YYYY-MM-DDTHH:MM:SS",
          "journal_ref": "...",          # only when present in metadata
          "comment": "...",              # only when present in metadata
          "snippet": "first N chars of summary",
          "summary": "... (may be clipped by ArxivLoader defaults)"
        }
      ]
    }
    """
    try:
        docs = ArxivLoader(query=query, load_max_docs=max_results).load()

        items: List[dict] = []
        for d in docs:
            md = d.metadata or {}

            title = md.get("Title") or md.get("title") or ""
            authors = md.get("Authors") or md.get("authors") or []
            if isinstance(authors, str):
                # ArxivLoader typically returns authors as one comma-separated string.
                authors = [a.strip() for a in authors.split(",") if a.strip()]

            summary = d.page_content or ""
            item = {
                "title": title,
                "authors": authors,
                "published": _parse_dt(md.get("Published") or md.get("published")),
                "snippet": _clip(summary, snippet_chars),
                "summary": summary,
            }

            # [FIX] the docstring promised these optional fields but the old
            # code never emitted them; include them when the loader has them.
            journal_ref = md.get("Journal-ref") or md.get("journal_ref")
            if journal_ref:
                item["journal_ref"] = journal_ref
            comment = md.get("Comment") or md.get("comment")
            if comment:
                item["comment"] = comment

            items.append(item)
            if len(items) >= max_results:
                break

        return json.dumps({"query": query, "provider": "arxiv", "items": items})
    except Exception as e:
        return json.dumps({"error": str(e), "query": query, "provider": "arxiv"})
| |
|
|
@tool
def web_extract(
    urls: List[str] | str,
    include_images: bool = False,
    extract_depth: str = "basic",
) -> str:
    """
    Extract text content from web pages using TavilyExtract.

    🔹 Input: {"urls": str | List[str]}
       - Example: web_extract.invoke({"urls": ["https://python.langchain.com/docs/introduction/"]})
    🔹 Output: JSON string with {url, title, text, images?}

    Options:
        include_images (bool) – add image URLs if True
        extract_depth (str) – "basic" (default) or "advanced"
    """
    # Accept a single URL for convenience; normalize to a list.
    url_list = [urls] if isinstance(urls, str) else urls

    # Named `extractor` (not `tool`) to avoid shadowing the decorator.
    extractor = TavilyExtract(
        extract_depth=extract_depth,
        include_images=include_images,
    )
    return json.dumps(extractor.invoke({"urls": url_list}))
|
|
|
|
@tool
def extract_youtube_transcript(url: str, chars: int = 1000) -> str:
    """
    Fetch full YouTube transcript (first *chars* characters).

    Returns "yt_error:id_not_found" when no 11-character video id can be
    extracted from the URL, or "yt_error:<exception>" on API failures.
    """
    # [FIX] also accept youtu.be short links, /embed/ and /shorts/ URLs,
    # not only the "?v=..." query-parameter form.
    video_id_match = re.search(
        r"(?:[?&]v=|youtu\.be/|/embed/|/shorts/)([A-Za-z0-9_\-]{11})", url
    )
    if not video_id_match:
        return "yt_error:id_not_found"
    try:
        transcript = YouTubeTranscriptApi.get_transcript(video_id_match.group(1))
        text = " ".join(piece["text"] for piece in transcript)
        return text[:chars]
    except Exception as exc:
        return f"yt_error:{exc}"
|
|
| |
| |
@tool
def add(a: float, b: float) -> float:
    """Returns the sum of two numbers.
    Example: add(2, 3) -> 5
    """
    total = a + b
    return total
|
|
@tool
def subtract(a: float, b: float) -> float:
    """Returns the difference of two numbers.
    Example: subtract(5, 3) -> 2
    """
    difference = a - b
    return difference
|
|
@tool
def multiply(a: float, b: float) -> float:
    """Returns the product of two numbers.
    Example: multiply(2, 3) -> 6
    """
    product = a * b
    return product
|
|
@tool
def divide(a: float, b: float) -> float:
    """Returns the quotient of two numbers.
    Example: divide(6, 3) -> 2
    Raises ValueError when b is zero.
    """
    if b == 0:
        # Fail loudly rather than propagate ZeroDivisionError to the agent.
        raise ValueError("Cannot divide by zero.")
    quotient = a / b
    return quotient
|
|
@tool
def power(a: float, b: float) -> float:
    """Returns a raised to the power of b.
    Example: power(2, 3) -> 8
    """
    result = a ** b
    return result
|
|
|
|
| |
|
|
@tool
def analyze_csv_file(file_path: str, preview_rows: int = 20) -> str:
    """
    Analyze a CSV file: returns JSON with {kind, path, shape, columns, head, numeric_summary}.
    - preview_rows: number of rows for preview (head)
    """
    if not os.path.exists(file_path):
        return json.dumps({"error": "file not found", "path": file_path})
    try:
        df = pd.read_csv(file_path)
        return json.dumps(
            {
                "kind": "csv",
                "path": file_path,
                "shape": list(df.shape),
                "columns": [str(c) for c in df.columns],
                "head": df.head(preview_rows).to_dict(orient="records"),
                # describe() covers numeric columns only.
                "numeric_summary": df.select_dtypes("number").describe().to_dict(),
            }
        )
    except Exception as e:
        return json.dumps({"error": str(e), "path": file_path})
| |
@tool
def analyze_excel_file(file_path: str, sheet: int | str | None = None, preview_rows: int = 20, list_sheets: bool = True) -> str:
    """
    Analyze an Excel file: {kind, path, sheets?, active_sheet, shape, columns, head}.
    - sheet: sheet index or name (None -> first sheet)
    - list_sheets: include all sheet names
    """
    if not os.path.exists(file_path):
        return json.dumps({"error": "file not found", "path": file_path})
    try:
        workbook = pd.ExcelFile(file_path)
        target = 0 if sheet is None else sheet
        df = pd.read_excel(workbook, sheet_name=target)

        payload: Dict[str, Any] = {
            "kind": "excel",
            "path": file_path,
            "active_sheet": target if isinstance(target, int) else str(target),
            "shape": list(df.shape),
            "columns": [str(c) for c in df.columns],
            "head": df.head(preview_rows).to_dict(orient="records"),
        }
        if list_sheets:
            payload["sheets"] = [str(s) for s in workbook.sheet_names]
        return json.dumps(payload)
    except Exception as e:
        return json.dumps({"error": str(e), "path": file_path})
| |
@tool
def analyze_docx_file(file_path: str, max_chars: int = 20000, join_with: str = "\n") -> str:
    """
    Extract text from DOCX: {kind, path, paragraphs, text[:max_chars]}.
    """
    if not os.path.exists(file_path):
        return json.dumps({"error": "file not found", "path": file_path})
    try:
        # Optional dependency: imported lazily so the module loads without it.
        from docx import Document
    except Exception as e:
        return json.dumps({"error": f"python-docx not installed: {e}"})
    try:
        document = Document(file_path)
        paragraphs = [p.text for p in document.paragraphs if p.text is not None]
        full_text = join_with.join(paragraphs)
        return json.dumps(
            {
                "kind": "docx",
                "path": file_path,
                "paragraphs": len(paragraphs),
                "text": full_text[:max_chars],
                "length": len(full_text),
            }
        )
    except Exception as e:
        return json.dumps({"error": str(e), "path": file_path})
| |
|
|
@tool
def analyze_txt_file(file_path: str, max_chars: int = 20000) -> str:
    """
    Read plain text: {kind, path, encoding_guess, text[:max_chars], length}.
    """
    if not os.path.exists(file_path):
        return json.dumps({"error": "file not found", "path": file_path})
    try:
        content, encoding = _read_text_best_effort(file_path, max_chars=max_chars)
        return json.dumps(
            {
                "kind": "txt",
                "path": file_path,
                "encoding_guess": encoding,
                "text": content,
                # NOTE: length is the on-disk byte size, not the character count.
                "length": os.path.getsize(file_path),
            }
        )
    except Exception as e:
        return json.dumps({"error": str(e), "path": file_path})
| |
@tool
def analyze_pdf_file(file_path: str, max_chars: int = 20000) -> str:
    """
    Extract text & page count from PDF: {kind, path, pages, text[:max_chars]}.
    Uses pdfminer.six for text and page counting.
    """
    if not os.path.exists(file_path):
        return json.dumps({"error": "file not found", "path": file_path})
    try:
        # Optional dependency: imported lazily inside the guarded block.
        from pdfminer.high_level import extract_text
        from pdfminer.pdfpage import PDFPage

        text = extract_text(file_path) or ""
        with open(file_path, "rb") as fh:
            page_count = sum(1 for _ in PDFPage.get_pages(fh))

        return json.dumps(
            {
                "kind": "pdf",
                "path": file_path,
                "pages": page_count,
                "text": text[:max_chars],
                "length": len(text),
            }
        )
    except Exception as e:
        return json.dumps({"error": str(e), "path": file_path})
| |
|
|
| |
|
|
@tool
def analyze_image_file(file_path: str, ocr: bool = False, lang: Optional[str] = None, max_ocr_chars: int = 10000) -> str:
    """
    Analyze image: {kind, path, format, mode, size, mean_brightness, exif?, ocr_text?}.
    - ocr: optional Tesseract OCR (pip install pytesseract + tesseract)
    """
    if not os.path.exists(file_path):
        return json.dumps({"error": "file not found", "path": file_path})
    try:
        img = Image.open(file_path)
        # Brightness = mean of the grayscale ("L") conversion.
        brightness = float(ImageStat.Stat(img.convert("L")).mean[0])

        payload: Dict[str, Any] = {
            "kind": "image",
            "path": file_path,
            "format": img.format,
            "mode": img.mode,
            "size": list(img.size),
            "mean_brightness": brightness,
        }

        exif = _exif_dict(img)
        if exif:
            payload["exif"] = exif

        if ocr:
            try:
                import pytesseract

                kwargs = {"lang": lang} if lang else {}
                ocr_text = pytesseract.image_to_string(img, **kwargs) or ""
                payload["ocr_text"] = ocr_text[:max_ocr_chars]
                payload["ocr_length"] = len(ocr_text)
            except Exception as e:
                # OCR is best-effort; report the failure without aborting.
                payload["ocr_error"] = str(e)

        return json.dumps(payload)
    except Exception as e:
        return json.dumps({"error": str(e), "path": file_path})
| |
|
|
|
|
|
|
| |
|
|
| def _configure(): |
| api_key = os.getenv("GOOGLE_API_KEY") or os.getenv("GENAI_API_KEY") |
| if not api_key: |
| raise RuntimeError("Missing GOOGLE_API_KEY (or GENAI_API_KEY) in environment") |
| genai.configure(api_key=api_key) |
|
|
| def _image_bytes_to_part(img_bytes: bytes, mime: str = "image/png") -> Dict[str, Any]: |
| |
| return {"mime_type": mime, "data": base64.b64encode(img_bytes).decode("utf-8")} |
|
|
def _ensure_png_bytes(img: Image.Image, max_pixels: int = 25_000_000) -> bytes:
    """Encode *img* as optimized PNG bytes, downscaling above *max_pixels*."""
    width, height = img.size
    total = width * height
    if total > max_pixels:
        # Uniform scale factor so the result holds at most max_pixels pixels.
        factor = (max_pixels / total) ** 0.5
        new_size = (max(1, int(width * factor)), max(1, int(height * factor)))
        img = img.resize(new_size, Image.LANCZOS)

    out = io.BytesIO()
    img.save(out, format="PNG", optimize=True)
    return out.getvalue()
|
|
def _load_image_as_png_bytes_from_path(path: str) -> bytes:
    """Open an image file and return it re-encoded as PNG bytes.

    Raises:
        FileNotFoundError: when *path* does not exist.
    """
    if not os.path.exists(path):
        raise FileNotFoundError(f"Image not found: {path}")
    return _ensure_png_bytes(Image.open(path))
|
|
def _load_image_as_png_bytes_from_b64(b64: str) -> bytes:
    """Decode a (strictly validated) base64 string and re-encode it as PNG bytes."""
    decoded = base64.b64decode(b64, validate=True)
    return _ensure_png_bytes(Image.open(io.BytesIO(decoded)))
|
|
| def _clean_json_text(s: str) -> str: |
| |
| s = s.strip() |
| if s.startswith("```"): |
| s = s.strip("`").replace("json", "", 1).strip() |
| |
| start = s.find("{") |
| end = s.rfind("}") |
| if start != -1 and end != -1 and end > start: |
| return s[start:end+1] |
| return s |
|
|
# Prompt prefix for single-image QA (used by vision_qa_gemma): forces a
# strict, concise, image-grounded answer and a one-field JSON response of
# the form {"answer": string}.
_SINGLE_IMAGE_QA_PROMPT = (
    "You will be given ONE image and a user question about it.\n"
    "Answer STRICTLY and CONCISELY based only on the image content.\n"
    "If the image does not contain enough information to answer, reply 'not enough information'.\n"
    "If the answer is numeric, include units if visible.\n"
    "Return ONLY valid JSON with the schema:\n"
    "{\"answer\": string}\n"
)
|
|
def _call_model(parts: List[Any], temperature: float, model_name: str = "gemma-3-27b-it") -> Dict[str, Any]:
    """Call a GenAI model with *parts* and parse the reply as JSON.

    Args:
        parts: content parts (text / inline-image dicts) for generate_content.
        temperature: decoding temperature.
        model_name: GenAI model id. [CHANGE] parameterized with the previous
            hard-coded value as default, so callers are unaffected.

    Returns:
        The parsed JSON dict. On a malformed first reply, a second "repair"
        call asks the model to rewrite its own output as strict
        {"answer": string} JSON.

    Raises:
        Whatever json.loads / the SDK raises when even the repaired output
        is not valid JSON.
    """
    model = genai.GenerativeModel(model_name)
    resp = model.generate_content(parts, generation_config={"temperature": temperature})
    text = (resp.text or "").strip()

    try:
        return json.loads(_clean_json_text(text))
    except Exception:
        # Repair pass; reuse the same model instance instead of constructing
        # a second identical one as the old code did.
        fix_prompt = (
            "Convert the following text into STRICT valid JSON matching schema {\"answer\": string}. "
            "Return ONLY JSON, no extra text:\n" + text
        )
        fix_resp = model.generate_content([{"text": fix_prompt}])
        return json.loads(_clean_json_text((fix_resp.text or "").strip()))
|
|
| |
|
|
@tool
def vision_qa_gemma(
    question: str,
    image_path: Optional[str] = None,
    image_base64: Optional[str] = None,
    temperature: float = 0.2,
) -> str:
    """
    Vision QA with Google GenAI (Gemma/Gemini). Returns JSON: {"answer": "..."}.

    Args:
        question: user question about the image.
        image_path: local file path to the image (PNG/JPG/...).
        image_base64: base64-encoded image (if no path).
        temperature: decoding temperature (default 0.2).

    Exactly ONE of (image_path, image_base64) must be provided.
    """
    import json as _json
    try:
        _configure()

        # XOR check: exactly one image source must be supplied.
        if bool(image_path) == bool(image_base64):
            return _json.dumps({"error": "Provide exactly ONE of image_path or image_base64"})

        if image_path:
            png_bytes = _load_image_as_png_bytes_from_path(image_path)
        else:
            png_bytes = _load_image_as_png_bytes_from_b64(image_base64)

        parts = [
            {"text": _SINGLE_IMAGE_QA_PROMPT + "\nQuestion: " + question.strip()},
            _image_bytes_to_part(png_bytes, "image/png"),
        ]
        data = _call_model(parts, temperature)

        # Coerce the model reply into a plain string answer.
        answer = data.get("answer") if isinstance(data, dict) else None
        if not isinstance(answer, str):
            answer = str(answer) if answer is not None else "not enough information"

        return _json.dumps({"answer": answer})
    except Exception as e:
        return _json.dumps({"error": str(e)})
|
|
|
|
| |
@tool
def draw_on_image(
    image_base64: str, drawing_type: str, params: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Draw shapes (rectangle, circle, line) or text onto an image.
    Args:
        image_base64 (str): Base64 encoded input image
        drawing_type (str): Drawing type
        params (Dict[str, Any]): Drawing parameters
    Returns:
        Dictionary with result image (base64)
    """
    try:
        img = decode_image(image_base64)
        canvas = ImageDraw.Draw(img)
        color = params.get("color", "red")
        stroke = params.get("width", 2)

        if drawing_type == "rectangle":
            box = [params["left"], params["top"], params["right"], params["bottom"]]
            canvas.rectangle(box, outline=color, width=stroke)
        elif drawing_type == "circle":
            cx, cy, radius = params["x"], params["y"], params["radius"]
            # ellipse() expects a bounding box; derive it from center + radius.
            canvas.ellipse(
                (cx - radius, cy - radius, cx + radius, cy + radius),
                outline=color,
                width=stroke,
            )
        elif drawing_type == "line":
            endpoints = (
                params["start_x"],
                params["start_y"],
                params["end_x"],
                params["end_y"],
            )
            canvas.line(endpoints, fill=color, width=stroke)
        elif drawing_type == "text":
            size = params.get("font_size", 20)
            try:
                font = ImageFont.truetype("arial.ttf", size)
            except IOError:
                # Fall back to PIL's built-in bitmap font when Arial is absent.
                font = ImageFont.load_default()
            canvas.text(
                (params["x"], params["y"]),
                params.get("text", "Text"),
                fill=color,
                font=font,
            )
        else:
            return {"error": f"Unknown drawing type: {drawing_type}"}

        result_path = save_image(img)
        return {"result_image": encode_image(result_path)}
    except Exception as e:
        # Missing required params (KeyError) also land here.
        return {"error": str(e)}
| |
@tool
def transform_image(
    image_base64: str, operation: str, params: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
    """
    Apply transformations: resize, rotate, crop, flip, brightness, contrast, blur, sharpen, grayscale.
    Args:
        image_base64 (str): Base64 encoded input image
        operation (str): Transformation operation
        params (Dict[str, Any], optional): Parameters for the operation
    Returns:
        Dictionary with transformed image (base64)
    """
    try:
        img = decode_image(image_base64)
        opts = params or {}

        if operation == "resize":
            # Defaults to half the current dimensions.
            new_w = opts.get("width", img.width // 2)
            new_h = opts.get("height", img.height // 2)
            img = img.resize((new_w, new_h))
        elif operation == "rotate":
            img = img.rotate(opts.get("angle", 90), expand=True)
        elif operation == "crop":
            box = (
                opts.get("left", 0),
                opts.get("top", 0),
                opts.get("right", img.width),
                opts.get("bottom", img.height),
            )
            img = img.crop(box)
        elif operation == "flip":
            horizontal = opts.get("direction", "horizontal") == "horizontal"
            img = img.transpose(
                Image.FLIP_LEFT_RIGHT if horizontal else Image.FLIP_TOP_BOTTOM
            )
        elif operation == "adjust_brightness":
            img = ImageEnhance.Brightness(img).enhance(opts.get("factor", 1.5))
        elif operation == "adjust_contrast":
            img = ImageEnhance.Contrast(img).enhance(opts.get("factor", 1.5))
        elif operation == "blur":
            img = img.filter(ImageFilter.GaussianBlur(opts.get("radius", 2)))
        elif operation == "sharpen":
            img = img.filter(ImageFilter.SHARPEN)
        elif operation == "grayscale":
            img = img.convert("L")
        else:
            return {"error": f"Unknown operation: {operation}"}

        result_path = save_image(img)
        return {"transformed_image": encode_image(result_path)}
    except Exception as e:
        return {"error": str(e)}
|
|
@tool
def save_and_read_file(content: str, filename: Optional[str] = None) -> str:
    """
    Save content to a file and return the path.
    Args:
        content (str): the content to save to the file
        filename (str, optional): the name of the file. If not provided, a random name file will be created.
    """
    temp_dir = tempfile.gettempdir()
    if filename is None:
        # [FIX] close the NamedTemporaryFile handle before re-opening the path
        # below; the old code leaked the open handle and the double-open fails
        # on Windows.
        with tempfile.NamedTemporaryFile(delete=False, dir=temp_dir) as tmp:
            filepath = tmp.name
    else:
        filepath = os.path.join(temp_dir, filename)

    # [FIX] write UTF-8 explicitly instead of the platform default encoding.
    with open(filepath, "w", encoding="utf-8") as f:
        f.write(content)

    return f"File saved to {filepath}. You can read this file to process its contents."
|
|
|
|
| import requests |
|
|
@tool
def download_file_from_url(url: str, filename: Optional[str] = None) -> str:
    """
    Download a file from a URL and save it to a temporary location.
    Args:
        url (str): the URL of the file to download.
        filename (str, optional): the name of the file. If not provided, a random name file will be created.
    """
    import uuid  # [FIX] was referenced without ever being imported -> NameError

    try:
        if not filename:
            # Derive a name from the URL path, falling back to a random one.
            filename = os.path.basename(urlparse(url).path)
            if not filename:
                filename = f"downloaded_{uuid.uuid4().hex[:8]}"

        filepath = os.path.join(tempfile.gettempdir(), filename)

        # [FIX] bound connect/read so a dead server cannot hang the agent.
        response = requests.get(url, stream=True, timeout=(10, 60))
        response.raise_for_status()

        with open(filepath, "wb") as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)

        return f"File downloaded to {filepath}. You can read this file to process its contents."
    except Exception as e:
        return f"Error downloading file: {str(e)}"
|
|
|
|
@tool
def transcribe_audio(audio_file: str) -> str:
    """
    Transcribe an audio file (URL or local path) using AssemblyAI and return the transcript text.
    """
    try:
        # [FIX] the loader was referenced but never imported anywhere in the
        # module, so every call failed with a NameError swallowed below.
        from langchain_community.document_loaders import AssemblyAIAudioTranscriptLoader

        loader = AssemblyAIAudioTranscriptLoader(file_path=audio_file)
        docs = loader.load()
        # The loader returns one Document per input file.
        return docs[0].page_content if docs else "No transcription result."
    except Exception as e:
        return f"transcribe_error:{str(e)} (if you see this, please describe error for fixing)"
|
|