|
|
|
|
|
|
|
|
""" |
|
|
Database Tools Examples for EvoAgentX |
|
|
|
|
|
This file demonstrates how to use various database toolkits: |
|
|
- MongoDBToolkit: Document database operations |
|
|
- PostgreSQLToolkit: Relational database operations |
|
|
- FaissToolkit: Vector database for semantic search |
|
|
|
|
|
Each toolkit provides comprehensive database management capabilities with automatic |
|
|
storage management and support for complex queries. |
|
|
""" |
|
|
|
|
|
import os |
|
|
import sys |
|
|
import json |
|
|
from pathlib import Path |
|
|
|
|
|
|
|
|
sys.path.append(str(Path(__file__).parent.parent)) |
|
|
|
|
|
from evoagentx.tools import ( |
|
|
MongoDBToolkit, |
|
|
PostgreSQLToolkit |
|
|
) |
|
|
from evoagentx.tools.database_faiss import FaissToolkit |
|
|
|
|
|
|
|
|
def run_mongodb_examples(): |
|
|
"""Run examples using MongoDBToolkit for document database operations.""" |
|
|
print("\n===== MONGODB TOOLKIT EXAMPLES =====\n") |
|
|
|
|
|
try: |
|
|
|
|
|
toolkit = MongoDBToolkit( |
|
|
name="DemoMongoDBToolkit", |
|
|
database_name="demo_db", |
|
|
auto_save=True |
|
|
) |
|
|
|
|
|
print("β MongoDBToolkit initialized with default storage") |
|
|
|
|
|
|
|
|
execute_tool = toolkit.get_tool("mongodb_execute_query") |
|
|
find_tool = toolkit.get_tool("mongodb_find") |
|
|
update_tool = toolkit.get_tool("mongodb_update") |
|
|
delete_tool = toolkit.get_tool("mongodb_delete") |
|
|
info_tool = toolkit.get_tool("mongodb_info") |
|
|
|
|
|
print(f"β Available tools: {[tool.name for tool in toolkit.get_tools()]}") |
|
|
|
|
|
|
|
|
print("\n1. Inserting product data...") |
|
|
products = [ |
|
|
{"id": "P001", "name": "Laptop", "category": "Electronics", "price": 999.99, "stock": 50, "brand": "TechCorp"}, |
|
|
{"id": "P002", "name": "Wireless Mouse", "category": "Electronics", "price": 29.99, "stock": 100, "brand": "TechCorp"}, |
|
|
{"id": "P003", "name": "Desk Chair", "category": "Furniture", "price": 199.99, "stock": 25, "brand": "ComfortCo"}, |
|
|
{"id": "P004", "name": "Coffee Table", "category": "Furniture", "price": 149.99, "stock": 15, "brand": "ComfortCo"}, |
|
|
{"id": "P005", "name": "Smartphone", "category": "Electronics", "price": 799.99, "stock": 75, "brand": "MobileTech"} |
|
|
] |
|
|
|
|
|
insert_result = execute_tool( |
|
|
query=json.dumps(products), |
|
|
query_type="insert", |
|
|
collection_name="products" |
|
|
) |
|
|
|
|
|
if insert_result.get("success"): |
|
|
print(f"β Successfully inserted {len(products)} products") |
|
|
print(f" Documents inserted: {insert_result.get('data', {}).get('inserted_count', 'Unknown')}") |
|
|
else: |
|
|
print(f"β Insert failed: {insert_result.get('error', 'Unknown error')}") |
|
|
return |
|
|
|
|
|
|
|
|
print("\n2. Finding electronics products...") |
|
|
find_result = find_tool( |
|
|
collection_name="products", |
|
|
filter='{"category": "Electronics"}', |
|
|
sort='{"price": -1}', |
|
|
limit=5 |
|
|
) |
|
|
|
|
|
if find_result.get("success"): |
|
|
electronics = find_result.get("data", []) |
|
|
print(f"β Found {len(electronics)} electronics products:") |
|
|
for product in electronics: |
|
|
name = product.get('name', 'Unknown') |
|
|
price = product.get('price', 0) |
|
|
brand = product.get('brand', 'Unknown') |
|
|
print(f" - {name}: ${price} ({brand})") |
|
|
else: |
|
|
print(f"β Find failed: {find_result.get('error', 'Unknown error')}") |
|
|
|
|
|
|
|
|
print("\n3. Updating product prices (10% discount on electronics)...") |
|
|
update_result = update_tool( |
|
|
collection_name="products", |
|
|
filter='{"category": "Electronics"}', |
|
|
update='{"$mul": {"price": 0.9}}', |
|
|
multi=True |
|
|
) |
|
|
|
|
|
if update_result.get("success"): |
|
|
updated_count = update_result.get("data", {}).get("modified_count", 0) |
|
|
print(f"β Updated {updated_count} electronics products with 10% discount") |
|
|
else: |
|
|
print(f"β Update failed: {update_result.get('error', 'Unknown error')}") |
|
|
|
|
|
|
|
|
print("\n4. Running aggregation query (average price by category)...") |
|
|
aggregation_pipeline = [ |
|
|
{"$group": {"_id": "$category", "avg_price": {"$avg": "$price"}, "total_stock": {"$sum": "$stock"}}}, |
|
|
{"$sort": {"avg_price": -1}} |
|
|
] |
|
|
|
|
|
agg_result = execute_tool( |
|
|
query=json.dumps(aggregation_pipeline), |
|
|
query_type="aggregate", |
|
|
collection_name="products" |
|
|
) |
|
|
|
|
|
if agg_result.get("success"): |
|
|
categories = agg_result.get("data", []) |
|
|
print(f"β Category analysis:") |
|
|
for category in categories: |
|
|
cat_name = category.get('_id', 'Unknown') |
|
|
avg_price = category.get('avg_price', 0) |
|
|
total_stock = category.get('total_stock', 0) |
|
|
print(f" - {cat_name}: Avg price ${avg_price:.2f}, Total stock: {total_stock}") |
|
|
else: |
|
|
print(f"β Aggregation failed: {agg_result.get('error', 'Unknown error')}") |
|
|
|
|
|
|
|
|
print("\n5. Testing delete functionality...") |
|
|
delete_result = delete_tool( |
|
|
collection_name="products", |
|
|
filter='{"category": "Furniture"}', |
|
|
multi=True |
|
|
) |
|
|
|
|
|
if delete_result.get("success"): |
|
|
deleted_count = delete_result.get("data", {}).get("deleted_count", 0) |
|
|
print(f"β Deleted {deleted_count} furniture products") |
|
|
else: |
|
|
print(f"β Delete failed: {delete_result.get('error', 'Unknown error')}") |
|
|
|
|
|
|
|
|
print("\n6. Getting database information...") |
|
|
info_result = info_tool() |
|
|
|
|
|
if info_result.get("success"): |
|
|
info = info_result.get("data", {}) |
|
|
print(f"β Database info:") |
|
|
print(f" - Database: {info.get('database_name', 'Unknown')}") |
|
|
|
|
|
|
|
|
collections = info.get('collections', []) |
|
|
if isinstance(collections, (list, tuple)) and collections: |
|
|
print(f" - Collections: {', '.join(collections)}") |
|
|
elif collections: |
|
|
print(f" - Collections: {collections}") |
|
|
else: |
|
|
print(" - Collections: None") |
|
|
|
|
|
print(f" - Total documents: {info.get('total_documents', 'Unknown')}") |
|
|
else: |
|
|
print(f"β Info failed: {info_result.get('error', 'Unknown error')}") |
|
|
|
|
|
print("\nβ MongoDB examples completed successfully!") |
|
|
|
|
|
except Exception as e: |
|
|
print(f"β Error running MongoDB examples: {str(e)}") |
|
|
|
|
|
def run_postgresql_examples(): |
|
|
"""Powerful example using PostgreSQLToolkit for database operations.""" |
|
|
print("\n===== POSTGRESQL TOOL EXAMPLE =====\n") |
|
|
|
|
|
try: |
|
|
|
|
|
toolkit = PostgreSQLToolkit( |
|
|
name="DemoPostgreSQLToolkit", |
|
|
database_name="demo_db", |
|
|
auto_save=True |
|
|
) |
|
|
|
|
|
print("β PostgreSQLToolkit initialized with default storage") |
|
|
|
|
|
|
|
|
execute_tool = toolkit.get_tool("postgresql_execute") |
|
|
find_tool = toolkit.get_tool("postgresql_find") |
|
|
create_tool = toolkit.get_tool("postgresql_create") |
|
|
delete_tool = toolkit.get_tool("postgresql_delete") |
|
|
|
|
|
|
|
|
create_sql = """ |
|
|
CREATE TABLE IF NOT EXISTS users ( |
|
|
id SERIAL PRIMARY KEY, |
|
|
name VARCHAR(100) NOT NULL, |
|
|
email VARCHAR(100) UNIQUE NOT NULL, |
|
|
age INTEGER, |
|
|
department VARCHAR(50) |
|
|
); |
|
|
""" |
|
|
|
|
|
result = create_tool(create_sql) |
|
|
if result["success"]: |
|
|
print("β Created users table") |
|
|
|
|
|
|
|
|
insert_sql = """ |
|
|
INSERT INTO users (name, email, age, department) VALUES |
|
|
('Alice Johnson', 'alice@example.com', 28, 'Engineering'), |
|
|
('Bob Smith', 'bob@example.com', 32, 'Marketing'), |
|
|
('Carol Davis', 'carol@example.com', 25, 'Engineering') |
|
|
""" |
|
|
|
|
|
result = execute_tool(insert_sql) |
|
|
if result["success"]: |
|
|
print("β Inserted users") |
|
|
|
|
|
|
|
|
find_result = find_tool( |
|
|
"users", |
|
|
where="department = 'Engineering'", |
|
|
columns="name, age", |
|
|
sort="age ASC" |
|
|
) |
|
|
|
|
|
if find_result["success"]: |
|
|
engineers = find_result["data"]["data"] |
|
|
print(f"β Found {len(engineers)} engineers:") |
|
|
for user in engineers: |
|
|
|
|
|
name = user.get('name', 'Unknown') |
|
|
age = user.get('age', 'N/A') |
|
|
print(f" - {name} (age: {age})") |
|
|
|
|
|
|
|
|
print("\nποΈ Testing delete functionality...") |
|
|
delete_result = delete_tool( |
|
|
"users", |
|
|
"department = 'Marketing'" |
|
|
) |
|
|
|
|
|
if delete_result["success"]: |
|
|
deleted_count = delete_result["data"].get("rowcount", 0) |
|
|
print(f"β Deleted {deleted_count} marketing users") |
|
|
|
|
|
|
|
|
verify_result = find_tool("users") |
|
|
if verify_result["success"]: |
|
|
remaining = verify_result["data"] |
|
|
print(f"β Remaining users after deletion: {len(remaining)}") |
|
|
|
|
|
print("\nβ PostgreSQLToolkit test completed with default storage") |
|
|
|
|
|
except Exception as e: |
|
|
print(f"Error: {str(e)}") |
|
|
|
|
|
def run_faiss_examples(): |
|
|
"""Run examples using FaissToolkit for vector database operations.""" |
|
|
print("\n===== FAISS TOOLKIT EXAMPLES =====\n") |
|
|
|
|
|
|
|
|
if not os.getenv("OPENAI_API_KEY"): |
|
|
print("β OPENAI_API_KEY not found in environment variables") |
|
|
print("To test FAISS examples, set your OpenAI API key:") |
|
|
print("export OPENAI_API_KEY='your-openai-api-key-here'") |
|
|
print("Get your key from: https://platform.openai.com/api-keys") |
|
|
return |
|
|
|
|
|
try: |
|
|
|
|
|
toolkit = FaissToolkit( |
|
|
name="DemoFaissToolkit", |
|
|
default_corpus_id="demo_corpus" |
|
|
) |
|
|
|
|
|
print("β FaissToolkit initialized with default storage") |
|
|
print(f"β Using OpenAI API key: {os.getenv('OPENAI_API_KEY')[:8]}...") |
|
|
|
|
|
|
|
|
insert_tool = toolkit.get_tool("faiss_insert") |
|
|
query_tool = toolkit.get_tool("faiss_query") |
|
|
list_tool = toolkit.get_tool("faiss_list") |
|
|
stats_tool = toolkit.get_tool("faiss_stats") |
|
|
delete_tool = toolkit.get_tool("faiss_delete") |
|
|
|
|
|
print(f"β Available tools: {[tool.name for tool in toolkit.get_tools()]}") |
|
|
|
|
|
|
|
|
print("\n1. Inserting AI knowledge documents...") |
|
|
ai_documents = [ |
|
|
"Artificial Intelligence (AI) is a branch of computer science that aims to create intelligent machines capable of performing tasks that typically require human intelligence.", |
|
|
"Machine learning is a subset of artificial intelligence that enables computers to learn and improve from experience without being explicitly programmed.", |
|
|
"Deep learning is a specialized form of machine learning that uses neural networks with multiple layers to analyze and learn from data.", |
|
|
"Natural Language Processing (NLP) helps computers understand, interpret, and generate human language in a useful way.", |
|
|
"Computer vision enables machines to interpret and understand visual information from the world, including images and videos.", |
|
|
"Reinforcement learning is a type of machine learning where an agent learns to make decisions by taking actions in an environment to achieve maximum cumulative reward.", |
|
|
"Neural networks are computing systems inspired by biological neural networks, consisting of interconnected nodes that process information.", |
|
|
"Transfer learning allows a model trained on one task to be adapted for a related task, improving efficiency and performance.", |
|
|
"Generative AI models can create new content, such as text, images, music, and code, based on patterns learned from training data.", |
|
|
"Explainable AI focuses on making AI systems' decisions and processes transparent and understandable to humans." |
|
|
] |
|
|
|
|
|
insert_result = insert_tool( |
|
|
documents=ai_documents, |
|
|
metadata={ |
|
|
"source": "ai_knowledge_base", |
|
|
"topic": "artificial_intelligence", |
|
|
"language": "en", |
|
|
"difficulty": "intermediate" |
|
|
} |
|
|
) |
|
|
|
|
|
if insert_result.get("success"): |
|
|
docs_inserted = insert_result.get("data", {}).get("documents_inserted", 0) |
|
|
chunks_created = insert_result.get("data", {}).get("chunks_created", 0) |
|
|
print(f"β Successfully inserted {docs_inserted} documents") |
|
|
print(f" Chunks created: {chunks_created}") |
|
|
else: |
|
|
print(f"β Insert failed: {insert_result.get('error', 'Unknown error')}") |
|
|
return |
|
|
|
|
|
|
|
|
print("\n2. Performing semantic search queries...") |
|
|
|
|
|
search_queries = [ |
|
|
"How do machines learn?", |
|
|
"What is neural network?", |
|
|
"Explain deep learning", |
|
|
"How does AI generate content?", |
|
|
"What is computer vision?" |
|
|
] |
|
|
|
|
|
for i, query in enumerate(search_queries, 1): |
|
|
print(f"\n Query {i}: '{query}'") |
|
|
search_result = query_tool( |
|
|
query=query, |
|
|
top_k=3, |
|
|
similarity_threshold=0.1 |
|
|
) |
|
|
|
|
|
if search_result.get("success"): |
|
|
results = search_result.get("data", {}).get("results", []) |
|
|
print(f" β Found {len(results)} relevant results:") |
|
|
for j, result in enumerate(results, 1): |
|
|
score = result.get('score', 0) |
|
|
content = result.get('content', '')[:80] |
|
|
print(f" {j}. Score: {score:.3f} - {content}...") |
|
|
else: |
|
|
print(f" β Search failed: {search_result.get('error', 'Unknown error')}") |
|
|
|
|
|
|
|
|
print("\n3. Searching with metadata filters...") |
|
|
filtered_search_result = query_tool( |
|
|
query="machine learning algorithms", |
|
|
top_k=5, |
|
|
similarity_threshold=0.1, |
|
|
metadata_filters={"topic": "artificial_intelligence", "difficulty": "intermediate"} |
|
|
) |
|
|
|
|
|
if filtered_search_result.get("success"): |
|
|
results = filtered_search_result.get("data", {}).get("results", []) |
|
|
print(f"β Found {len(results)} results with metadata filters:") |
|
|
for i, result in enumerate(results, 1): |
|
|
score = result.get('score', 0) |
|
|
content = result.get('content', '')[:100] |
|
|
metadata = result.get('metadata', {}) |
|
|
print(f" {i}. Score: {score:.3f} - {content}...") |
|
|
print(f" Metadata: {metadata}") |
|
|
else: |
|
|
print(f"β Filtered search failed: {filtered_search_result.get('error', 'Unknown error')}") |
|
|
|
|
|
|
|
|
print("\n4. Getting database statistics...") |
|
|
stats_result = stats_tool() |
|
|
|
|
|
if stats_result.get("success"): |
|
|
stats = stats_result.get("data", {}) |
|
|
print(f"β Database statistics:") |
|
|
print(f" - Total corpora: {stats.get('total_corpora', 'Unknown')}") |
|
|
print(f" - Corpora: {', '.join(stats.get('corpora', []))}") |
|
|
print(f" - Embedding model: {stats.get('embedding_model', 'Unknown')}") |
|
|
print(f" - Vector store type: {stats.get('vector_store_type', 'Unknown')}") |
|
|
else: |
|
|
print(f"β Stats failed: {stats_result.get('error', 'Unknown error')}") |
|
|
|
|
|
|
|
|
print("\n5. Listing all corpora...") |
|
|
list_result = list_tool() |
|
|
|
|
|
if list_result.get("success"): |
|
|
corpora = list_result.get("data", {}).get("corpora", []) |
|
|
print(f"β Found {len(corpora)} corpora:") |
|
|
for corpus in corpora: |
|
|
corpus_id = corpus.get('corpus_id', 'Unknown') |
|
|
doc_count = corpus.get('document_count', 'Unknown') |
|
|
chunk_count = corpus.get('chunk_count', 'Unknown') |
|
|
print(f" - {corpus_id}: {doc_count} documents, {chunk_count} chunks") |
|
|
else: |
|
|
print(f"β List failed: {list_result.get('error', 'Unknown error')}") |
|
|
|
|
|
|
|
|
print("\n6. Testing delete functionality...") |
|
|
delete_result = delete_tool( |
|
|
metadata_filters={"source": "ai_knowledge_base"} |
|
|
) |
|
|
|
|
|
if delete_result.get("success"): |
|
|
deleted_count = delete_result.get("data", {}).get("deleted_count", 0) |
|
|
print(f"β Deleted {deleted_count} documents with metadata filter") |
|
|
|
|
|
|
|
|
verify_result = query_tool( |
|
|
query="artificial intelligence", |
|
|
top_k=5, |
|
|
similarity_threshold=0.1 |
|
|
) |
|
|
|
|
|
if verify_result.get("success"): |
|
|
remaining = verify_result.get('data', {}).get('total_results', 0) |
|
|
print(f"β Remaining documents after deletion: {remaining}") |
|
|
else: |
|
|
print(f"β Delete failed: {delete_result.get('error', 'Unknown error')}") |
|
|
|
|
|
print("\nβ FAISS examples completed successfully!") |
|
|
|
|
|
except Exception as e: |
|
|
print(f"β Error running FAISS examples: {str(e)}") |
|
|
if "DocumentMetadata" in str(e): |
|
|
print("Note: This appears to be a dependency issue with the RAG engine components") |
|
|
print("The FAISS toolkit may need additional setup or dependencies") |
|
|
|
|
|
|
|
|
def main(): |
|
|
"""Main function to run all database tool examples.""" |
|
|
print("===== DATABASE TOOLS EXAMPLES =====\n") |
|
|
|
|
|
|
|
|
run_mongodb_examples() |
|
|
|
|
|
|
|
|
run_postgresql_examples() |
|
|
|
|
|
|
|
|
run_faiss_examples() |
|
|
|
|
|
print("\n===== ALL DATABASE EXAMPLES COMPLETED =====") |
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
main() |
|
|
|