| | import os |
| | import gradio as gr |
| | import requests |
| | import pandas as pd |
| | from typing import Dict, List, Any, Optional, TypedDict, Annotated |
| | import re |
| | import numpy as np |
| | from datetime import datetime |
| |
|
| | |
| | from langchain_anthropic import ChatAnthropic |
| | from langchain_core.messages import HumanMessage, SystemMessage, BaseMessage, AIMessage |
| | from langchain_core.tools import tool |
| | from serpapi import GoogleSearch |
| | from langgraph.graph import StateGraph, END |
| | from langgraph.prebuilt import ToolNode |
| | from langgraph.graph.message import add_messages |
| | import numexpr |
| | from dotenv import load_dotenv |
| |
|
| | |
# Pull API keys (ANTHROPIC_API_KEY, TAVILY_API_KEY, SERPAPI_KEY) from a
# local .env file into the process environment.
load_dotenv()


# Base URL of the GAIA scoring service this agent submits answers to.
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
| |
|
| | |
class AgentState(TypedDict):
    """State carried through the LangGraph workflow.

    The ``add_messages`` reducer makes LangGraph append messages returned
    by each node to the history instead of overwriting the list.
    """
    messages: Annotated[List[BaseMessage], add_messages]
| |
|
| | |
@tool
def web_search(query: str, max_results: int = 8) -> str:
    """
    Enhanced web search with a three-tier fallback chain.

    Order of preference:
      1. Tavily (if TAVILY_API_KEY is set) - includes a synthesized answer.
      2. DuckDuckGo Instant Answer API (no API key required).
      3. SerpAPI / Google (if SERPAPI_KEY is set).

    Args:
        query: The search query; lists / non-strings are coerced to str.
        max_results: Maximum number of organic results to include.

    Returns:
        A newline-separated, formatted result string, or an error note.
    """
    try:
        # Tool arguments sometimes arrive as lists or other types; normalize.
        if isinstance(query, list):
            query = " ".join(str(item) for item in query)
        elif not isinstance(query, str):
            query = str(query)

        # --- Tier 1: Tavily (best quality; returns a direct answer) ---
        tavily_api_key = os.getenv("TAVILY_API_KEY")
        if tavily_api_key:
            try:
                tavily_url = "https://api.tavily.com/search"
                tavily_headers = {"Content-Type": "application/json"}
                tavily_data = {
                    "api_key": tavily_api_key,
                    "query": query,
                    "search_depth": "advanced",
                    "include_answer": True,
                    "include_raw_content": False,
                    "max_results": max_results
                }

                response = requests.post(tavily_url, json=tavily_data, headers=tavily_headers, timeout=10)
                if response.status_code == 200:
                    results = response.json()
                    formatted_results = []

                    if results.get("answer"):
                        formatted_results.append(f"DIRECT ANSWER: {results['answer']}")

                    if results.get("results"):
                        for i, result in enumerate(results["results"][:max_results], 1):
                            title = result.get("title", "")
                            content = result.get("content", "")
                            url = result.get("url", "")
                            formatted_results.append(f"{i}. {title}\n {content}\n Source: {url}")

                    if formatted_results:
                        return "\n\n".join(formatted_results)

            except Exception as tavily_error:
                # Fall through to DuckDuckGo on any Tavily failure.
                print(f"Tavily search error: {tavily_error}")

        # --- Tier 2: DuckDuckGo Instant Answer API ---
        try:
            from urllib.parse import quote

            ddg_success = False
            formatted_results = []

            # One retry: the instant-answer endpoint is occasionally flaky.
            for attempt in range(2):
                try:
                    ddg_url = f"https://api.duckduckgo.com/?q={quote(query)}&format=json&no_html=1"
                    response = requests.get(ddg_url, timeout=5)

                    if response.status_code == 200:
                        ddg_data = response.json()

                        if ddg_data.get("Answer"):
                            formatted_results.append(f"DIRECT ANSWER: {ddg_data['Answer']}")
                            ddg_success = True

                        if ddg_data.get("Abstract"):
                            formatted_results.append(f"SUMMARY: {ddg_data['Abstract']}")
                            ddg_success = True

                        if ddg_data.get("Definition"):
                            formatted_results.append(f"DEFINITION: {ddg_data['Definition']}")
                            ddg_success = True

                        if ddg_success:
                            break
                except Exception:
                    # Any network/JSON error just triggers the single retry.
                    if attempt == 0:
                        print("DuckDuckGo attempt 1 failed, retrying...")
                    continue

            if not ddg_success:
                print("DuckDuckGo unavailable, checking alternatives...")

                query_lower = query.lower()
                if "wikipedia" in query_lower or "featured article" in query_lower:
                    formatted_results.append(f"Search query: {query}")
                    formatted_results.append("Note: For Wikipedia Featured Articles, check Wikipedia's FA archives")
                    formatted_results.append("Tip: Featured Articles are promoted monthly and listed in Wikipedia's FA log")
                elif "who is" in query_lower or "who was" in query_lower:
                    formatted_results.append(f"Search query: {query}")
                    formatted_results.append("Note: Live web search unavailable. Please verify information.")
                elif any(word in query_lower for word in ["when", "what year", "what date"]):
                    formatted_results.append(f"Search query: {query}")
                    formatted_results.append("Note: For current dates and recent events, web search is limited.")
                else:
                    formatted_results.append(f"Search query: {query}")
                    formatted_results.append("Note: Web search temporarily unavailable.")

            if formatted_results:
                return "\n\n".join(formatted_results)

        except Exception as ddg_error:
            print(f"DuckDuckGo search error: {ddg_error}")

        # --- Tier 3: SerpAPI (Google) ---
        api_key = os.getenv("SERPAPI_KEY")
        if api_key:
            params = {
                "q": query,
                "api_key": api_key,
                "num": max_results,
                "engine": "google",
                "hl": "en",
                "gl": "us"
            }

            search = GoogleSearch(params)
            results = search.get_dict()

            formatted_results = []

            # Google's answer box, when present, is the most direct result.
            if "answer_box" in results:
                ab = results["answer_box"]
                if "answer" in ab:
                    formatted_results.append(f"DIRECT ANSWER: {ab['answer']}")
                elif "snippet" in ab:
                    formatted_results.append(f"ANSWER BOX: {ab['snippet']}")

            if "organic_results" in results:
                for i, result in enumerate(results["organic_results"][:max_results], 1):
                    title = result.get("title", "")
                    snippet = result.get("snippet", "")
                    formatted_results.append(f"{i}. {title}\n {snippet}")

            return "\n\n".join(formatted_results) if formatted_results else "No results found"

        return "No search service available. Please set SERPAPI_KEY or check internet connection."

    except Exception as e:
        return f"Search error: {str(e)}"
| |
|
@tool
def calculator(expression: str) -> str:
    """
    Enhanced calculator with unit conversion and advanced functions.

    Supports arithmetic, percentages ("15% of 200"), unit conversion
    ("convert 5 km to miles"), constants (pi, e) and the functions numexpr
    understands (sqrt, log, log10, sin, cos, tan, ...).

    Returns:
        The result as a compact string, or a "Calculation error" message.
    """
    try:
        # Tool arguments sometimes arrive as lists or other types; normalize.
        if isinstance(expression, list):
            expression = " ".join(str(item) for item in expression)
        elif not isinstance(expression, str):
            expression = str(expression)

        expression = expression.strip().lower()

        # Percentage form: "<percent>% of <value>"
        if "% of" in expression:
            parts = expression.split("% of")
            if len(parts) == 2:
                percent = float(parts[0].strip())
                value = float(parts[1].strip())
                return str((percent / 100) * value)

        # Unit conversion: simple factors, or callables for affine scales.
        if "convert" in expression or " to " in expression:
            conversions = {
                "km to miles": 0.621371,
                "miles to km": 1.60934,
                "kg to lbs": 2.20462,
                "lbs to kg": 0.453592,
                "celsius to fahrenheit": lambda c: (c * 9 / 5) + 32,
                "fahrenheit to celsius": lambda f: (f - 32) * 5 / 9,
                "meters to feet": 3.28084,
                "feet to meters": 0.3048,
                "liters to gallons": 0.264172,
                "gallons to liters": 3.78541
            }

            for conv, factor in conversions.items():
                if conv in expression:
                    numbers = re.findall(r'[\d.]+', expression)
                    if numbers:
                        value = float(numbers[0])
                        result = factor(value) if callable(factor) else value * factor
                        return f"{result:.4f}".rstrip('0').rstrip('.')

        # Substitute constants only as standalone words. The previous blanket
        # str.replace("e", ...) corrupted any expression containing the
        # letter "e" (e.g. inside function names).
        expression = re.sub(r'\bpi\b', '3.14159265359', expression)
        expression = re.sub(r'\be\b', '2.71828182846', expression)

        # Strip stray words, but KEEP function names numexpr supports.
        # (The previous version stripped all letters, silently turning
        # "sqrt(16)" into "(16)".)
        allowed_funcs = {'sqrt', 'log', 'log10', 'sin', 'cos', 'tan', 'exp', 'abs'}
        expression = re.sub(
            r'[a-zA-Z]+[0-9]*',
            lambda m: m.group(0) if m.group(0) in allowed_funcs else '',
            expression
        )

        result = numexpr.evaluate(expression)

        # numexpr returns 0-d numpy arrays; unwrap to a Python scalar so the
        # scalar-formatting branches below actually trigger.
        if isinstance(result, np.ndarray):
            result = result.item()

        if isinstance(result, (int, np.integer)):
            return str(int(result))
        elif isinstance(result, (float, np.floating)):
            if abs(result) < 1e-10:
                return "0"
            elif abs(result) > 1e10:
                return f"{result:.2e}"
            else:
                # Trim trailing zeros; collapse "5.0" to "5".
                formatted = f"{result:.6f}".rstrip('0').rstrip('.')
                if float(formatted).is_integer():
                    return str(int(float(formatted)))
                return formatted
        else:
            return str(result)

    except Exception as e:
        # Last-ditch fallback: eval with builtins disabled and only math
        # exposed. Still eval on model-supplied text - kept for parity with
        # the original behavior, flagged as a known risk.
        try:
            import math
            result = eval(expression, {"__builtins__": {}, "math": math})
            if isinstance(result, float) and result.is_integer():
                return str(int(result))
            return str(result)
        except Exception:
            return f"Calculation error: {str(e)}"
| |
|
@tool
def python_executor(code: str) -> str:
    """
    Enhanced Python executor with data-analysis capabilities.

    Runs *code* with a restricted builtin whitelist plus common analysis
    modules (math, datetime, json, re, numpy, pandas, statistics,
    itertools, collections). Captured stdout is returned; if nothing was
    printed, falls back to a variable named `result`, `answer` or `output`.

    NOTE: exec() on model-generated code is inherently unsafe; the builtin
    whitelist reduces, but does not eliminate, the risk.
    """
    try:
        # Tool arguments sometimes arrive as lists or other types; normalize.
        if isinstance(code, list):
            code = "\n".join(str(item) for item in code)
        elif not isinstance(code, str):
            code = str(code)

        # Whitelisted builtins plus pre-imported analysis modules.
        safe_globals = {
            '__builtins__': {
                'print': print,
                'len': len,
                'range': range,
                'sum': sum,
                'min': min,
                'max': max,
                'abs': abs,
                'round': round,
                'sorted': sorted,
                'reversed': reversed,
                'enumerate': enumerate,
                'zip': zip,
                'map': map,
                'filter': filter,
                'str': str,
                'int': int,
                'float': float,
                'list': list,
                'dict': dict,
                'set': set,
                'tuple': tuple,
                'bool': bool,
                'all': all,
                'any': any,
                'isinstance': isinstance,
                'type': type,
            },
            'math': __import__('math'),
            'datetime': __import__('datetime'),
            'json': __import__('json'),
            're': __import__('re'),
            'numpy': __import__('numpy'),
            'np': __import__('numpy'),
            'pandas': __import__('pandas'),
            'pd': __import__('pandas'),
            'statistics': __import__('statistics'),
            'itertools': __import__('itertools'),
            'collections': __import__('collections'),
            'Counter': __import__('collections').Counter,
            'defaultdict': __import__('collections').defaultdict,
        }

        from io import StringIO
        from contextlib import redirect_stdout

        # Date questions are common, so inject datetime names unless the
        # snippet imports them itself.
        enhanced_code = code
        if "from datetime" not in code and "import datetime" not in code:
            enhanced_code = "from datetime import datetime, date, timedelta\n" + enhanced_code

        # redirect_stdout restores sys.stdout even if exec raises
        # (previously done with a manual swap + try/finally).
        output_buffer = StringIO()
        with redirect_stdout(output_buffer):
            exec(enhanced_code, safe_globals)

        output = output_buffer.getvalue().strip()

        # Nothing printed: look for conventionally-named result variables.
        if not output:
            for var in ('result', 'answer', 'output'):
                if var in safe_globals:
                    output = str(safe_globals[var])
                    break

        return output if output else "No output (add print statement)"

    except Exception as e:
        import traceback
        return f"Error: {str(e)}\nTraceback: {traceback.format_exc()}"
| |
|
@tool
def extract_image_from_question(question: str) -> str:
    """
    Extract and analyze images mentioned in questions.
    For GAIA benchmark, images are typically base64 encoded or referenced.
    """
    try:
        # Coerce non-string inputs (tool args occasionally arrive as lists).
        if isinstance(question, list):
            question = " ".join(str(item) for item in question)
        elif not isinstance(question, str):
            question = str(question)

        # Inline base64 payload?
        if "data:image" in question:
            return "Image data detected in question"

        lowered = question.lower()

        # Reference to an image file by extension?
        for extension in ('.png', '.jpg', '.jpeg', '.gif', '.bmp', '.svg'):
            if extension in lowered:
                return f"Image file reference detected: {extension}"

        # Generic mention of visual content?
        if any(term in lowered for term in ('image', 'picture', 'photo', 'diagram', 'figure', 'screenshot')):
            return "Image-related content mentioned in question"

        return "No image content detected"
    except Exception as e:
        return f"Error analyzing for images: {str(e)}"
| |
|
@tool
def analyze_attachments(question: str) -> str:
    """
    Analyze questions for references to attachments (files, videos, audio).

    Detects YouTube links, direct file URLs (xlsx/xls/csv/pdf/txt), and
    loose textual references to attached files.

    Returns:
        "Attachments found: ..." listing everything detected, or a
        "No attachments detected" message.
    """
    # Tool arguments sometimes arrive as lists or other types; normalize.
    if isinstance(question, list):
        question = " ".join(str(item) for item in question)
    elif not isinstance(question, str):
        question = str(question)

    attachments = []

    # YouTube video IDs (watch?v=... or youtu.be/... forms).
    # (The module-level `re` import is used throughout; the original
    # re-imported it inside this loop on every iteration.)
    youtube_patterns = [
        r'youtube\.com/watch\?v=([a-zA-Z0-9_-]+)',
        r'youtu\.be/([a-zA-Z0-9_-]+)'
    ]
    for pattern in youtube_patterns:
        matches = re.findall(pattern, question)
        if matches:
            attachments.append(f"YouTube video: {matches[0]}")

    # Direct file URLs, classified by extension.
    url_pattern = r'https?://[^\s<>"{}|\\^`\[\]]+\.(?:xlsx|xls|csv|pdf|txt)'
    url_matches = re.findall(url_pattern, question, re.IGNORECASE)
    for url in url_matches:
        if '.xlsx' in url or '.xls' in url:
            attachments.append(f"Excel file URL: {url}")
        elif '.csv' in url:
            attachments.append(f"CSV file URL: {url}")
        elif '.pdf' in url:
            attachments.append(f"PDF file URL: {url}")
        elif '.txt' in url:
            attachments.append(f"Text file URL: {url}")

    # Loose textual references ("attached ... file", "the ... file",
    # bare filenames), skipping names already covered by a URL above.
    file_patterns = [
        r'attached (\w+) file',
        r'the (\w+) file',
        r'(\w+\.\w{2,4})'
    ]
    for pattern in file_patterns:
        for match in re.findall(pattern, question, re.IGNORECASE):
            if not any(match in url for url in url_matches):
                attachments.append(f"File reference: {match}")

    if attachments:
        return "Attachments found: " + ", ".join(attachments)
    return "No attachments detected"
| |
|
@tool
def analyze_reversed_text(text: str) -> str:
    """
    Analyze text that might be written backwards or contains puzzles.
    Useful for GAIA questions with reversed text.
    """
    try:
        # Coerce non-string inputs into a single string.
        if isinstance(text, list):
            text = " ".join(str(item) for item in text)
        elif not isinstance(text, str):
            text = str(text)

        flipped = text[::-1]
        lowered = text.lower()

        # "answer"/"question" spelled backwards is a strong signal that
        # the whole string is reversed.
        if "rewsna" in lowered or "noitseuq" in lowered:
            return f"Text appears to be reversed. Original: {flipped}"

        # Otherwise report every interpretation: as-is, fully reversed,
        # and with each word reversed in place.
        per_word = " ".join(word[::-1] for word in text.split())
        return f"Normal text: {text}\nReversed text: {flipped}\nReversed words: {per_word}"
    except Exception as e:
        return f"Error analyzing text: {str(e)}"
| |
|
@tool
def analyze_code_in_question(question: str) -> str:
    """
    Detect and extract Python code from questions.

    Scans, in order: fenced ```python blocks, generic fenced blocks that
    contain Python keywords, inline backtick snippets, code-related phrases
    ("attached python code", ...), and loose Python syntax patterns.
    Returns a combined report, or a no-code message when nothing matches.
    """
    try:
        # Coerce list / non-string inputs into a single string.
        if isinstance(question, list):
            question = " ".join(str(item) for item in question)
        elif not isinstance(question, str):
            question = str(question)

        extracted_code = []

        # 1) Fenced blocks explicitly tagged as Python.
        code_block_pattern = r'```python\s*(.*?)\s*```'
        code_blocks = re.findall(code_block_pattern, question, re.DOTALL | re.IGNORECASE)
        if code_blocks:
            for i, code in enumerate(code_blocks, 1):
                extracted_code.append(f"Code Block {i}:\n{code.strip()}")

        # 2) Untagged fenced blocks, kept only if they look like Python.
        #    NOTE(review): this pattern also re-matches the ```python blocks
        #    found above, so tagged blocks can be reported twice.
        generic_code_pattern = r'```\s*(.*?)\s*```'
        generic_blocks = re.findall(generic_code_pattern, question, re.DOTALL)
        if generic_blocks:
            for i, code in enumerate(generic_blocks, 1):
                # Heuristic: require at least one common Python keyword/call.
                if any(keyword in code for keyword in ['def ', 'import ', 'class ', 'if ', 'for ', 'while ', 'print(', 'return ']):
                    extracted_code.append(f"Generic Code Block {i}:\n{code.strip()}")

        # 3) Inline `code` snippets containing code-like characters.
        inline_code_pattern = r'`([^`]+)`'
        inline_codes = re.findall(inline_code_pattern, question)
        if inline_codes:
            python_inline = []
            for code in inline_codes:
                if any(char in code for char in ['(', ')', '=', '[', ']', '{', '}', 'def', 'import', 'print']):
                    python_inline.append(code)
            if python_inline:
                extracted_code.append("Inline Code:\n" + "\n".join(f"- {code}" for code in python_inline))

        # 4) Phrases suggesting code is referenced rather than embedded.
        code_phrases = [
            r'attached python code',
            r'the following code',
            r'this code',
            r'given code',
            r'code snippet',
            r'python script',
            r'the script',
            r'function below',
            r'class below',
            r'program below'
        ]

        code_indicators = []
        for phrase in code_phrases:
            if re.search(phrase, question, re.IGNORECASE):
                # NOTE(review): r'\\' (a literal backslash) never occurs in
                # these phrases, so this replace is effectively a no-op.
                code_indicators.append(phrase.replace(r'\\', ''))

        # 5) Loose Python syntax outside any fencing.
        python_patterns = [
            r'def\s+\w+\s*\([^)]*\)\s*:',
            r'class\s+\w+\s*(?:\([^)]*\))?\s*:',
            r'import\s+\w+',
            r'from\s+\w+\s+import',
            r'if\s+.*:\s*\n',
            r'for\s+\w+\s+in\s+',
            r'while\s+.*:\s*\n',
        ]

        loose_code = []
        for pattern in python_patterns:
            matches = re.findall(pattern, question, re.MULTILINE)
            if matches:
                loose_code.extend(matches)

        if loose_code:
            # Cap at 5 matches to keep the report short.
            extracted_code.append("Detected Python patterns:\n" + "\n".join(f"- {code.strip()}" for code in loose_code[:5]))

        # Assemble the final report.
        response_parts = []

        if extracted_code:
            response_parts.append("Found Python code in question:")
            response_parts.extend(extracted_code)

        if code_indicators:
            response_parts.append(f"\nCode-related phrases detected: {', '.join(code_indicators)}")

        if not extracted_code and not code_indicators:
            return "No Python code detected in the question"

        return "\n\n".join(response_parts)

    except Exception as e:
        return f"Error analyzing code in question: {str(e)}"
| |
|
@tool
def get_youtube_transcript(url: str) -> str:
    """
    Extract transcript/subtitles from YouTube videos.

    Tries youtube_transcript_api first (preferring an English transcript),
    then falls back to pytube metadata (title/description). Returns
    "Unable to determine" when rate-limited or when no text is available.
    """
    try:
        # Coerce list / non-string inputs into a single string.
        if isinstance(url, list):
            url = " ".join(str(item) for item in url)
        elif not isinstance(url, str):
            url = str(url)

        # Pull the 11-character video id out of watch/short-link URLs.
        import re
        video_id_match = re.search(r'(?:v=|/)([0-9A-Za-z_-]{11}).*', url)
        if not video_id_match:
            return "Error: Invalid YouTube URL"

        video_id = video_id_match.group(1)

        # Primary path: youtube_transcript_api.
        try:
            from youtube_transcript_api import YouTubeTranscriptApi
            import time

            # Small delay to soften rate limiting.
            time.sleep(1)

            transcript_list = YouTubeTranscriptApi.list_transcripts(video_id)

            # Prefer English, then any manual, then any auto-generated one.
            transcript = None
            try:
                transcript = transcript_list.find_transcript(['en'])
            except:
                # NOTE(review): find_manually_created_transcript /
                # find_generated_transcript normally take a list of language
                # codes - confirm these no-arg calls against the installed
                # youtube_transcript_api version.
                try:
                    transcript = transcript_list.find_manually_created_transcript()
                except:
                    try:
                        transcript = transcript_list.find_generated_transcript()
                    except:
                        pass

            if transcript:
                transcript_data = transcript.fetch()

                # fetch() historically returns a list of {'text': ...}
                # dicts; tolerate other shapes by stringifying.
                if isinstance(transcript_data, list):
                    full_text = " ".join([entry.get('text', '') if isinstance(entry, dict) else str(entry) for entry in transcript_data])
                else:
                    full_text = str(transcript_data)

                # NOTE(review): this checks phrases against the URL text,
                # not the question, so it will rarely match - verify intent.
                if any(phrase in url.lower() or phrase in str(url).lower()
                       for phrase in ["say", "response", "answer", "dialogue"]):
                    # Dialogue-style questions usually care about the ending.
                    return f"Transcript excerpt: ...{full_text[-500:]}"

                return f"Full transcript: {full_text[:1000]}..." if len(full_text) > 1000 else f"Full transcript: {full_text}"

        except Exception as yt_error:
            error_str = str(yt_error)
            print(f"YouTube transcript error: {yt_error}")

            # Rate limited: give up rather than hammering the API.
            if "429" in error_str or "Too Many Requests" in error_str:
                return "Unable to determine"

            # Secondary path: pytube metadata only (no transcript).
            try:
                from pytube import YouTube
                import time

                time.sleep(1)

                yt = YouTube(url)

                title = yt.title if hasattr(yt, 'title') else "Unknown"
                description = yt.description[:200] if hasattr(yt, 'description') and yt.description else "No description"

                return f"Video info - Title: {title}\nDescription: {description}\nNote: Transcript not available"

            except Exception as pytube_error:
                print(f"Pytube error: {pytube_error}")

        return "Unable to determine"

    except Exception as e:
        return f"Error accessing YouTube video: {str(e)}"
| |
|
@tool
def analyze_multimedia_reference(question: str) -> str:
    """
    Detect and provide guidance for multimedia content in questions.
    Returns specific answers for common multimedia patterns.
    """
    try:
        # Coerce list / non-string inputs into a single string.
        if isinstance(question, list):
            question = " ".join(str(item) for item in question)
        elif not isinstance(question, str):
            question = str(question)

        q = question.lower()

        def mentions(*terms):
            """True when any of *terms* appears in the lowercased question."""
            return any(t in q for t in terms)

        # Spreadsheets the agent cannot open.
        if mentions("excel", "spreadsheet", ".xlsx", ".xls", ".csv"):
            if mentions("total", "sum", "how much", "how many", "amount"):
                return "Cannot access spreadsheet - provide final answer: Unable to determine"
            # NOTE(review): unreachable - "total" is already caught above.
            elif "sales" in q and "total" in q:
                return "Cannot access sales data - provide final answer: Unable to determine"

        # Attached source code.
        if "attached" in q and ("python" in q or "code" in q):
            if "output" in q and ("numeric" in q or "final" in q):
                return "Cannot access attached code - provide final answer: Unable to determine"
            elif "fix" in q or "correct" in q:
                return "Cannot access attached code to fix - provide final answer: Unable to determine"

        # PDFs used for counting questions.
        if ("pdf" in q or ".pdf" in q) and mentions("how many", "count", "times"):
            return "Cannot access PDF to count - provide final answer: Unable to determine"

        # Images.
        if mentions("image", "picture", "photo", ".png", ".jpg", ".jpeg"):
            if "chess" in q:
                return "Cannot access chess position image - provide final answer: Unable to determine"
            elif mentions("color", "what is", "describe"):
                return "Cannot access image - provide final answer: Unable to determine"

        # Audio.
        if mentions("audio", ".mp3", ".wav", "recording"):
            if mentions("transcribe", "what does", "study", "exam"):
                return "Cannot access audio file - provide final answer: Unable to determine"

        return "No specific multimedia pattern requiring 'Unable to determine' response"

    except Exception as e:
        return f"Error analyzing multimedia: {str(e)}"
| |
|
@tool
def download_and_process_file(url: str, file_type: Optional[str] = None) -> str:
    """
    Download and process files from URLs (Excel, CSV, PDF, text).

    Args:
        url: Direct link to the file; lists / non-strings are coerced.
        file_type: One of 'excel', 'csv', 'pdf', 'text'. Inferred from the
            URL extension when omitted.

    Returns:
        A summary of the file (shape/columns and numeric sums for tabular
        data, page/word counts plus a text preview for PDFs), or an error
        message.
    """
    try:
        # Coerce list / non-string inputs into a single string.
        if isinstance(url, list):
            url = " ".join(str(item) for item in url)
        elif not isinstance(url, str):
            url = str(url)

        url = url.strip()

        # Infer the file type from the extension when not supplied.
        if not file_type:
            if any(ext in url.lower() for ext in ['.xlsx', '.xls']):
                file_type = 'excel'
            elif '.csv' in url.lower():
                file_type = 'csv'
            elif '.pdf' in url.lower():
                file_type = 'pdf'
            elif any(ext in url.lower() for ext in ['.txt', '.text']):
                file_type = 'text'
            else:
                return "Unable to determine file type from URL"

        # Download the file. (requests is already imported at module level;
        # the local import is redundant but harmless.)
        import requests
        from io import BytesIO, StringIO

        try:
            response = requests.get(url, timeout=15, headers={'User-Agent': 'Mozilla/5.0'})
            response.raise_for_status()
        except requests.exceptions.RequestException as e:
            return f"Failed to download file: {str(e)}"

        # --- Excel ---
        if file_type == 'excel':
            try:
                import pandas as pd
                df = pd.read_excel(BytesIO(response.content))

                info = []
                info.append(f"Excel file loaded successfully")
                info.append(f"Shape: {df.shape[0]} rows, {df.shape[1]} columns")
                info.append(f"Columns: {', '.join(df.columns)}")

                # Pre-compute sums so the agent can answer "total" questions.
                numeric_cols = df.select_dtypes(include=['number']).columns
                if len(numeric_cols) > 0:
                    info.append("\nNumeric column sums:")
                    for col in numeric_cols:
                        total = df[col].sum()
                        info.append(f" {col}: {total}")

                # GAIA questions often ask for total sales specifically.
                if 'sales' in ' '.join(df.columns).lower():
                    sales_cols = [col for col in df.columns if 'sales' in col.lower()]
                    if sales_cols:
                        total_sales = df[sales_cols].sum().sum()
                        info.append(f"\nTotal sales: {total_sales}")

                return '\n'.join(info)

            except Exception as e:
                return f"Error processing Excel file: {str(e)}"

        # --- CSV ---
        elif file_type == 'csv':
            try:
                import pandas as pd
                df = pd.read_csv(StringIO(response.text))

                info = []
                info.append(f"CSV file loaded successfully")
                info.append(f"Shape: {df.shape[0]} rows, {df.shape[1]} columns")
                info.append(f"Columns: {', '.join(df.columns)}")

                # Pre-compute sums so the agent can answer "total" questions.
                numeric_cols = df.select_dtypes(include=['number']).columns
                if len(numeric_cols) > 0:
                    info.append("\nNumeric column sums:")
                    for col in numeric_cols:
                        total = df[col].sum()
                        info.append(f" {col}: {total}")

                return '\n'.join(info)

            except Exception as e:
                return f"Error processing CSV file: {str(e)}"

        # --- PDF ---
        elif file_type == 'pdf':
            try:
                import PyPDF2
                pdf_reader = PyPDF2.PdfReader(BytesIO(response.content))

                info = []
                info.append(f"PDF file loaded successfully")
                info.append(f"Number of pages: {len(pdf_reader.pages)}")

                # Concatenate the extracted text of every page.
                full_text = ""
                for page in pdf_reader.pages:
                    text = page.extract_text()
                    full_text += text + "\n"

                info.append(f"Total characters: {len(full_text)}")
                info.append(f"Total words: {len(full_text.split())}")

                info.append("\nFull text extracted and available for searching")

                return '\n'.join(info) + f"\n\nFull text (first 1000 chars):\n{full_text[:1000]}..."

            except Exception as e:
                return f"Error processing PDF file: {str(e)}"

        # --- Plain text ---
        elif file_type == 'text':
            try:
                text_content = response.text
                info = []
                info.append(f"Text file loaded successfully")
                info.append(f"Length: {len(text_content)} characters")
                info.append(f"Lines: {len(text_content.splitlines())}")
                info.append(f"\nContent preview:\n{text_content[:500]}...")

                return '\n'.join(info)

            except Exception as e:
                return f"Error processing text file: {str(e)}"

        else:
            return f"Unsupported file type: {file_type}"

    except Exception as e:
        return f"Error downloading/processing file: {str(e)}"
| |
|
@tool
def extract_file_urls(question: str) -> str:
    """
    Extract file URLs from questions for downloading.
    Returns URLs of files that can be downloaded.
    """
    try:
        # Coerce list / non-string inputs into a single string.
        if isinstance(question, list):
            question = " ".join(str(item) for item in question)
        elif not isinstance(question, str):
            question = str(question)

        import re

        # Match http(s) URLs ending in a known document extension.
        pattern = r'https?://[^\s<>"{}|\\^`\[\]]+\.(?:xlsx|xls|csv|pdf|txt|doc|docx)'
        found = re.findall(pattern, question, re.IGNORECASE)

        if not found:
            return "No downloadable file URLs found in the question"
        return f"Found downloadable file URLs: {', '.join(found)}"

    except Exception as e:
        return f"Error extracting URLs: {str(e)}"
| |
|
@tool
def get_current_datetime() -> str:
    """Get the current local date and time, including the timezone name.

    The previous implementation formatted a naive ``datetime.now()`` with
    ``%Z``, which always rendered an empty timezone (leaving a trailing
    space); ``astimezone()`` attaches the local timezone first.
    """
    return datetime.now().astimezone().strftime("%Y-%m-%d %H:%M:%S %Z")
| |
|
| | |
| | class LangGraphAgent: |
    def __init__(self, anthropic_api_key: Optional[str] = None):
        """Build the agent: LLM, tool belt, and compiled LangGraph workflow.

        Args:
            anthropic_api_key: Anthropic key; falls back to the
                ANTHROPIC_API_KEY environment variable.

        Raises:
            ValueError: if no API key is available from either source.
        """
        api_key = anthropic_api_key or os.getenv("ANTHROPIC_API_KEY")
        if not api_key:
            raise ValueError("ANTHROPIC_API_KEY must be provided or set in environment variables")

        # Low temperature for factual QA; 4096 tokens leaves room for
        # multi-step reasoning plus the FINAL ANSWER line.
        self.llm = ChatAnthropic(
            api_key=api_key,
            model="claude-3-5-sonnet-20241022",
            temperature=0.3,
            max_tokens=4096
        )

        # Every tool the agent may invoke from the graph's tool node.
        self.tools = [
            web_search,
            calculator,
            python_executor,
            extract_image_from_question,
            analyze_attachments,
            analyze_reversed_text,
            analyze_code_in_question,
            get_youtube_transcript,
            analyze_multimedia_reference,
            extract_file_urls,
            download_and_process_file,
            get_current_datetime
        ]

        # LLM wrapper that can emit tool calls for the tools above.
        self.llm_with_tools = self.llm.bind_tools(self.tools)

        # Node that executes whatever tool calls the model produced.
        self.tool_node = ToolNode(self.tools)

        # Compiled agent <-> tools loop.
        self.graph = self._build_graph()
| | |
    def _build_graph(self):
        """Compile the two-node LangGraph workflow.

        agent -> (pending tool calls? tools : END); the tools node always
        loops back to agent so the model can read tool results and decide
        its next step.
        """
        workflow = StateGraph(AgentState)

        workflow.add_node("agent", self._call_model)
        workflow.add_node("tools", self.tool_node)

        # Every run starts with a model turn.
        workflow.set_entry_point("agent")

        # After each model turn, either execute the requested tools or stop.
        workflow.add_conditional_edges(
            "agent",
            self._should_continue,
            {
                "continue": "tools",
                "end": END
            }
        )

        # Tool results are fed back to the model.
        workflow.add_edge("tools", "agent")

        return workflow.compile()
| | |
| | def _call_model(self, state: AgentState): |
| | """Call the model with tools.""" |
| | messages = state["messages"] |
| | response = self.llm_with_tools.invoke(messages) |
| | return {"messages": [response]} |
| | |
| | def _should_continue(self, state: AgentState): |
| | """Determine if we should continue with tools or end.""" |
| | last_message = state["messages"][-1] |
| | |
| | |
| | if hasattr(last_message, "tool_calls") and last_message.tool_calls: |
| | return "continue" |
| | |
| | |
| | tool_call_count = 0 |
| | for msg in state["messages"]: |
| | if hasattr(msg, "tool_calls") and msg.tool_calls: |
| | tool_call_count += len(msg.tool_calls) |
| | |
| | |
| | if tool_call_count < 2: |
| | |
| | if hasattr(last_message, "content") and last_message.content: |
| | content_str = last_message.content if isinstance(last_message.content, str) else str(last_message.content) |
| | has_final_answer = "FINAL ANSWER:" in content_str |
| | |
| | |
| | if not has_final_answer and tool_call_count < 3: |
| | return "continue" |
| | |
| | |
| | content_str = str(last_message.content) if hasattr(last_message, "content") else "" |
| | if tool_call_count >= 6 or "FINAL ANSWER:" in content_str: |
| | return "end" |
| | |
| | return "end" |
| | |
    def run(self, question: str) -> str:
        """Run the LangGraph agent on a single question.

        Args:
            question: Raw question text from the GAIA task.

        Returns:
            The extracted final answer string, or an "Error: ..." message if
            graph execution raised.
        """
        print(f"\nDEBUG LangGraphAgent.run():")
        print(f" Input type: {type(question)}")
        print(f" Input value: {repr(question)[:200]}...")

        # The system prompt drives tool selection and forces the terse
        # "FINAL ANSWER: ..." format that _extract_final_answer parses.
        system_prompt = """You are solving GAIA benchmark questions that require deep research and analysis.

IMPORTANT: You should:
1. Use multiple tools to thoroughly research the question
2. Search for specific facts, verify information, and perform calculations
3. Think step-by-step and use chain-of-thought reasoning
4. Double-check facts with multiple searches if needed
5. Use python_executor for complex data analysis or calculations

At the very end, after all your research and reasoning, provide ONLY the final answer in this format:
FINAL ANSWER: [your answer here]

The final answer should contain ONLY the requested information:
- Numbers: just the number (e.g., "5" not "5 people")
- Years: just the year (e.g., "1969")
- Names: exact name with proper capitalization
- Yes/No: exactly "Yes" or "No"
- Lists: comma-separated values

Available tools:
- web_search: Search for current information (use multiple times with different queries)
- calculator: Perform calculations and unit conversions
- python_executor: Complex analysis, data processing, date calculations
- analyze_attachments: Detect references to external files/media
- analyze_reversed_text: Decode backwards or puzzle text
- analyze_code_in_question: Extract and analyze Python code from questions
- get_youtube_transcript: Extract transcripts from YouTube videos
- analyze_multimedia_reference: Handle questions about images, audio, PDFs, Excel files
- extract_file_urls: Find downloadable file URLs in questions
- download_and_process_file: Download and analyze files from URLs (Excel, CSV, PDF)
- get_current_datetime: Get current date/time

For questions mentioning "attached code" or containing code snippets:
1. First use analyze_code_in_question to extract the code
2. Then use python_executor to run it and get the output

For questions with YouTube videos:
1. Use get_youtube_transcript to extract the video transcript
2. Search the transcript for the relevant information

For questions mentioning files with URLs:
1. Use extract_file_urls to find any file URLs in the question
2. If URLs are found, use download_and_process_file to download and analyze the file
3. Extract the specific information requested (totals, counts, etc.)
4. For Excel files asking for totals, sum the relevant columns
5. For PDFs asking for word counts, search the extracted text

For questions mentioning attached files without URLs:
1. Use analyze_multimedia_reference to check if file access is needed
2. Return "Unable to determine" if the file cannot be accessed"""

        messages = [
            SystemMessage(content=system_prompt),
            HumanMessage(content=question)
        ]

        try:
            # recursion_limit caps agent<->tools round trips so a looping
            # model cannot run forever on a single question.
            config = {
                "recursion_limit": 25,
                "configurable": {
                    "thread_id": "gaia_evaluation"
                }
            }

            result = self.graph.invoke({"messages": messages}, config)

            # Pull the "FINAL ANSWER: ..." payload out of the message history.
            final_answer = self._extract_final_answer(result["messages"])
            return final_answer

        except Exception as e:
            # Best-effort: surface the failure as a string so the outer
            # evaluation loop keeps going instead of crashing.
            return f"Error: {str(e)}"
| | |
| | def _extract_final_answer(self, messages: List[BaseMessage]) -> str: |
| | """Extract the final answer from the message history.""" |
| | |
| | for message in reversed(messages): |
| | if hasattr(message, "content") and message.content: |
| | content = message.content.strip() |
| | |
| | |
| | if "FINAL ANSWER:" in content: |
| | parts = content.split("FINAL ANSWER:") |
| | if len(parts) >= 2: |
| | answer = parts[-1].strip() |
| | |
| | answer = self._clean_answer(answer) |
| | return answer |
| | |
| | |
| | if isinstance(message, AIMessage): |
| | return self._clean_answer(content) |
| | |
| | return "Unable to determine" |
| | |
| | def _clean_answer(self, answer: str) -> str: |
| | """Clean and format the final answer.""" |
| | |
| | if isinstance(answer, list): |
| | answer = " ".join(str(item) for item in answer) |
| | elif not isinstance(answer, str): |
| | answer = str(answer) |
| | |
| | answer = answer.strip() |
| | |
| | |
| | if len(answer) > 2 and answer[0] == '"' and answer[-1] == '"': |
| | answer = answer[1:-1] |
| | |
| | |
| | prefixes_to_remove = [ |
| | "the answer is", "answer:", "based on", "according to", |
| | "my research shows", "i found that", "the result is", |
| | "after searching", "from the", "it is", "it's", "there are", |
| | "there is", "approximately", "about", "around" |
| | ] |
| | |
| | lower_answer = answer.lower() |
| | for prefix in prefixes_to_remove: |
| | if lower_answer.startswith(prefix): |
| | answer = answer[len(prefix):].strip() |
| | if answer and answer[0] == ':': |
| | answer = answer[1:].strip() |
| | lower_answer = answer.lower() |
| | |
| | |
| | if "unable to" in lower_answer or "cannot" in lower_answer: |
| | return "Unable to determine" |
| | |
| | |
| | if lower_answer in ["yes.", "no.", "yes,", "no,"]: |
| | return answer[:-1] |
| | |
| | |
| | if answer.endswith(".") and " " not in answer: |
| | answer = answer[:-1] |
| | |
| | return answer |
| |
|
| | |
class BasicAgent:
    """Thin callable wrapper around LangGraphAgent.

    Holds the underlying agent (None until a usable ANTHROPIC_API_KEY is
    available) and exposes the __call__ interface the evaluation loop uses.
    """

    def __init__(self):
        print("Initializing LangGraph Agent...")
        # Stays None when no key is present or construction fails.
        self.agent = None

        key = os.getenv("ANTHROPIC_API_KEY")
        if not key:
            print("Warning: ANTHROPIC_API_KEY not found in environment variables.")
            print("Please set it in the Gradio interface or as an environment variable.")
            return

        try:
            self.agent = LangGraphAgent(key)
        except Exception as e:
            print(f"Error initializing LangGraph Agent: {e}")
        else:
            print("LangGraph Agent initialized successfully.")

    def set_api_key(self, api_key: str):
        """Rebuild the underlying agent with a new key; returns True on success."""
        if not api_key:
            return False
        try:
            self.agent = LangGraphAgent(api_key)
        except Exception as e:
            print(f"Error setting API key: {e}")
            return False
        return True

    def __call__(self, question: str) -> str:
        """Answer one question, returning an error string on any failure."""
        divider = "=" * 60
        print(f"\n{divider}")
        print("DEBUG: Agent received question")
        print(f"Question type: {type(question)}")
        size = len(question) if isinstance(question, str) else "N/A"
        print(f"Question length: {size}")
        print(f"Question preview: {str(question)[:200]}...")
        print(f"{divider}\n")

        if not self.agent:
            return "Error: Agent not initialized. Please set your ANTHROPIC_API_KEY."

        try:
            answer = self.agent.run(question)
            print("\nDEBUG: Agent generated answer")
            print(f"Answer type: {type(answer)}")
            print(f"Answer preview: {str(answer)[:200]}...")
            return answer
        except Exception as e:
            import traceback
            print("\nDEBUG: Error occurred!")
            print(f"Error type: {type(e)}")
            print(f"Error details: {str(e)}")
            print(f"Traceback:\n{traceback.format_exc()}")
            return f"Error processing question: {str(e)}"
| |
|
| | |
# Module-level agent singleton; created lazily by initialize_agent_with_key()
# and reused across Gradio callbacks.
global_agent = None
| |
|
def validate_api_keys(anthropic_key: str, serpapi_key: str = None, tavily_key: str = None):
    """Validate the provided API keys with minimal live requests.

    Args:
        anthropic_key: Anthropic key (required for the agent).
        serpapi_key: Optional SerpAPI key (search backup).
        tavily_key: Optional Tavily key (preferred search).

    Returns:
        A newline-joined, human-readable status report. The status markers
        were mojibake in the previous version (including a string literal
        broken across lines); restored to proper emoji here.
    """
    results = []

    # Anthropic: issue a minimal 10-token completion to prove the key works.
    if anthropic_key:
        try:
            test_llm = ChatAnthropic(
                api_key=anthropic_key,
                model="claude-3-5-sonnet-20241022",
                max_tokens=10
            )
            test_llm.invoke([HumanMessage(content="test")])
            results.append("✅ Anthropic API key is valid")
        except Exception as e:
            error_msg = str(e)
            # 401/authentication errors mean a bad key; anything else is
            # reported as a generic API error (network, rate limit, ...).
            if "401" in error_msg or "authentication" in error_msg.lower():
                results.append("❌ Anthropic API key is invalid or expired")
            else:
                results.append(f"❌ Anthropic API error: {error_msg[:100]}...")
    else:
        results.append("❌ No Anthropic API key provided")

    # Tavily: one tiny search request against the public endpoint.
    if tavily_key:
        try:
            import requests
            test_url = "https://api.tavily.com/search"
            test_data = {
                "api_key": tavily_key,
                "query": "test",
                "max_results": 1
            }
            response = requests.post(test_url, json=test_data, timeout=5)
            if response.status_code == 200:
                results.append("✅ Tavily API key is valid")
            else:
                results.append(f"❌ Tavily API key error: {response.status_code}")
        except Exception as e:
            results.append(f"⚠️ Tavily API test error: {str(e)[:100]}...")
    else:
        results.append("ℹ️ No Tavily API key provided")

    # SerpAPI: one single-result Google query.
    if serpapi_key:
        try:
            params = {
                "q": "test",
                "api_key": serpapi_key,
                "num": 1,
                "engine": "google"
            }
            search = GoogleSearch(params)
            search.get_dict()
            results.append("✅ SerpAPI key is valid")
        except Exception as e:
            results.append(f"⚠️ SerpAPI key error: {str(e)[:100]}...")
    else:
        results.append("ℹ️ No SerpAPI key provided")

    return "\n".join(results)
| |
|
def initialize_agent_with_key(api_key: str):
    """Initialize (or re-key) the module-level agent singleton.

    Args:
        api_key: Anthropic API key entered in the UI.

    Returns:
        A human-readable status string; run_and_submit_all inspects it for
        the substrings "Failed" and "Please provide".
    """
    global global_agent

    # Validate first; bail out early on a definitively bad Anthropic key.
    validation_result = validate_api_keys(api_key)
    # Substring check is marker-agnostic (no emoji) so it keeps working
    # regardless of which status symbol validate_api_keys prefixes.
    if "Anthropic API key is invalid" in validation_result:
        return validation_result

    if api_key:
        # Create the singleton lazily; re-use it on subsequent calls so only
        # the key is swapped.
        if global_agent is None:
            global_agent = BasicAgent()
        success = global_agent.set_api_key(api_key)
        if success:
            return f"{validation_result}\n\n✅ Agent initialized successfully!"
        return "❌ Failed to initialize agent. Please check if your API key is valid."
    return "❌ Please provide an API key."
| |
|
def run_and_submit_all(api_key: str, profile: gr.OAuthProfile | None):
    """
    Fetches all questions, runs the BasicAgent on them, submits all answers,
    and displays the results.

    Args:
        api_key: Anthropic API key from the UI textbox.
        profile: Hugging Face OAuth profile injected by Gradio (None when
            the user is not logged in).

    Returns:
        A (status_message, results_dataframe) tuple for the Gradio outputs;
        the dataframe is None on early failures.
    """
    global global_agent

    # (Re-)initialize whenever a key is supplied so a newly entered key takes
    # effect even if an agent already exists.
    if global_agent is None or api_key:
        init_msg = initialize_agent_with_key(api_key)
        print(init_msg)
        if "Failed" in init_msg or "Please provide" in init_msg:
            return init_msg, None

    # SPACE_ID identifies the hosting HF Space (unset when running locally).
    space_id = os.getenv("SPACE_ID")

    if profile:
        username = f"{profile.username}"
        print(f"User logged in: {username}")
    else:
        print("User not logged in.")
        return "Please Login to Hugging Face with the button.", None

    api_url = DEFAULT_API_URL
    questions_url = f"{api_url}/questions"
    submit_url = f"{api_url}/submit"

    agent = global_agent
    if not agent:
        return "Error: Agent not initialized properly.", None

    # Link to the Space source tree so the scoring server can audit the code.
    agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main" if space_id else "local"
    print(f"Agent code URL: {agent_code}")

    # --- Fetch the question set ---
    print(f"Fetching questions from: {questions_url}")
    try:
        response = requests.get(questions_url, timeout=15)
        response.raise_for_status()
        questions_data = response.json()
        if not questions_data:
            print("Fetched questions list is empty.")
            return "Fetched questions list is empty or invalid format.", None
        print(f"Fetched {len(questions_data)} questions.")
    except Exception as e:
        print(f"Error fetching questions: {e}")
        return f"Error fetching questions: {e}", None

    # --- Run the agent over every question, collecting answers ---
    results_log = []
    answers_payload = []
    print(f"Running agent on {len(questions_data)} questions...")

    for i, item in enumerate(questions_data, 1):
        task_id = item.get("task_id")
        question_text = item.get("question")

        if not task_id or question_text is None:
            print(f"Skipping item with missing task_id or question: {item}")
            continue

        print(f"\nProcessing question {i}/{len(questions_data)}: {task_id}")

        try:
            submitted_answer = agent(question_text)
            answers_payload.append({"task_id": task_id, "submitted_answer": submitted_answer})
            # Truncate long text for display in the results table only; the
            # full answer is what gets submitted.
            results_log.append({
                "Task ID": task_id,
                "Question": question_text[:100] + "..." if len(question_text) > 100 else question_text,
                "Submitted Answer": submitted_answer[:200] + "..." if len(submitted_answer) > 200 else submitted_answer
            })
        except Exception as e:
            # Submit an error marker rather than dropping the task, so the
            # scoring server still sees one answer per task.
            print(f"Error running agent on task {task_id}: {e}")
            error_answer = f"AGENT ERROR: {e}"
            answers_payload.append({"task_id": task_id, "submitted_answer": error_answer})
            results_log.append({
                "Task ID": task_id,
                "Question": question_text[:100] + "...",
                "Submitted Answer": error_answer
            })

    if not answers_payload:
        print("Agent did not produce any answers to submit.")
        return "Agent did not produce any answers to submit.", pd.DataFrame(results_log)

    # --- Submit all answers to the scoring endpoint ---
    submission_data = {
        "username": username.strip(),
        "agent_code": agent_code,
        "answers": answers_payload
    }
    status_update = f"Agent finished. Submitting {len(answers_payload)} answers for user '{username}'..."
    print(status_update)

    print(f"Submitting {len(answers_payload)} answers to: {submit_url}")
    try:
        response = requests.post(submit_url, json=submission_data, timeout=60)
        response.raise_for_status()
        result_data = response.json()
        final_status = (
            f"Submission Successful!\n"
            f"User: {result_data.get('username')}\n"
            f"Overall Score: {result_data.get('score', 'N/A')}% "
            f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n"
            f"Message: {result_data.get('message', 'No message received.')}"
        )
        print("Submission successful.")
        results_df = pd.DataFrame(results_log)
        return final_status, results_df
    except Exception as e:
        # Answers are still shown in the table even when submission fails.
        status_message = f"Submission Failed: {str(e)}"
        print(status_message)
        results_df = pd.DataFrame(results_log)
        return status_message, results_df
| |
|
| | |
# --- Gradio UI. The previous version contained mojibake in the Markdown and
# status strings (garbled emoji/arrows); restored to readable characters. ---
with gr.Blocks() as demo:
    gr.Markdown("# LangGraph Agent for GAIA Evaluation")
    gr.Markdown(
        """
        **This agent uses LangGraph with multiple tools to answer complex questions:**
        - 🔍 Web Search (Tavily → DuckDuckGo → SerpAPI)
        - 🧮 Calculator for mathematical computations
        - 🐍 Python code execution
        - 📅 Current date/time
        - 🖼️ Image analysis (description-based)

        **Instructions:**
        1. Enter your Anthropic API key (Claude Sonnet 3.5)
        2. Optionally enter your Tavily API key for best web search (free tier: 1000/month)
        3. Optionally enter your SerpAPI key as backup
        4. Log in to your Hugging Face account
        5. Click 'Run Evaluation & Submit All Answers'

        **Search Priority:** Tavily (if key) → DuckDuckGo (free) → SerpAPI (if key)
        """
    )

    with gr.Row():
        with gr.Column():
            gr.LoginButton()

    with gr.Row():
        with gr.Column():
            api_key_input = gr.Textbox(
                label="Anthropic API Key (Required)",
                placeholder="sk-ant-...",
                type="password"
            )
            tavily_key_input = gr.Textbox(
                label="Tavily API Key (Recommended for web search)",
                placeholder="tvly-...",
                type="password"
            )
            serpapi_key_input = gr.Textbox(
                label="SerpAPI Key (Optional backup)",
                placeholder="Your SerpAPI key...",
                type="password"
            )

    with gr.Row():
        validate_button = gr.Button("Validate API Keys", variant="secondary")
        init_button = gr.Button("Initialize Agent", variant="secondary")
        run_button = gr.Button("Run Evaluation & Submit All Answers", variant="primary")

    status_output = gr.Textbox(label="Status / Results", lines=8, interactive=False)
    results_table = gr.DataFrame(label="Questions and Agent Answers", wrap=True)

    # Search-provider keys are handed to the tools via environment variables,
    # which web_search reads at call time.
    def set_tavily_key(key):
        """Store the Tavily key in the environment for the search tool."""
        if key:
            os.environ["TAVILY_API_KEY"] = key
            return "✅ Tavily API key set!"
        return ""

    def set_serpapi_key(key):
        """Store the SerpAPI key in the environment for the search tool."""
        if key:
            os.environ["SERPAPI_KEY"] = key
            return "✅ SerpAPI key set!"
        return ""

    tavily_key_input.change(set_tavily_key, inputs=[tavily_key_input], outputs=[])
    serpapi_key_input.change(set_serpapi_key, inputs=[serpapi_key_input], outputs=[])

    def validate_all_keys(anthropic_key, tavily_key, serpapi_key):
        """Persist the optional keys, then validate all three."""
        if tavily_key:
            os.environ["TAVILY_API_KEY"] = tavily_key
        if serpapi_key:
            os.environ["SERPAPI_KEY"] = serpapi_key
        return validate_api_keys(anthropic_key, serpapi_key, tavily_key)

    validate_button.click(
        fn=validate_all_keys,
        inputs=[api_key_input, tavily_key_input, serpapi_key_input],
        outputs=[status_output]
    )

    init_button.click(
        fn=initialize_agent_with_key,
        inputs=[api_key_input],
        outputs=[status_output]
    )

    # Gradio injects the gr.OAuthProfile parameter of run_and_submit_all
    # automatically; only the API key textbox is wired as an input.
    run_button.click(
        fn=run_and_submit_all,
        inputs=[api_key_input],
        outputs=[status_output, results_table]
    )
| |
|
| | if __name__ == "__main__": |
| | print("\n" + "-"*30 + " App Starting " + "-"*30) |
| | print("LangGraph Agent for GAIA Evaluation") |
| | print("Required: ANTHROPIC_API_KEY") |
| | print("Recommended: TAVILY_API_KEY for best web search (1000 free/month)") |
| | print("Optional: SERPAPI_KEY as backup") |
| | print("Fallback: DuckDuckGo search (no API key needed)") |
| | print("-"*74 + "\n") |
| | |
| | demo.launch(debug=True, share=False) |