Spaces:
Running
Running
| import uvicorn | |
| from typing import Annotated | |
| from dotenv import load_dotenv | |
| from cuga.backend.memory.agentic_memory.backend.mem0_backend import Mem0MemoryBackend | |
| from cuga.backend.memory.agentic_memory.config import get_config | |
| from cuga.backend.memory.agentic_memory.utils.logging import Logging | |
| from cuga.backend.memory.agentic_memory.schema import Fact, Message, RecordedFact, Run, Namespace | |
| from fastapi import APIRouter, FastAPI, HTTPException, Path, Body, Query, BackgroundTasks | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from mem0 import Memory | |
| from mem0.configs.base import MemoryConfig | |
| from pydantic.json_schema import SkipJsonSchema | |
| load_dotenv(override=True) | |
| memory = Memory(config=MemoryConfig.model_validate(get_config())) | |
| logger = Logging.get_logger() | |
| app = FastAPI(debug=True) | |
| app.openapi_version = '3.0.3' | |
| # Add CORS middleware to handle preflight OPTIONS requests | |
| app.add_middleware( | |
| CORSMiddleware, | |
| allow_origins=["*"], # In production, replace with specific origins | |
| allow_credentials=True, | |
| allow_methods=["*"], # Allow all HTTP methods including OPTIONS | |
| allow_headers=["*"], # Allow all headers | |
| ) | |
| router_v1 = APIRouter(prefix="/v1") | |
| memory_backend = Mem0MemoryBackend() | |
| async def ready(): | |
| """Attempt to query if the memory service is available""" | |
| if memory_backend.ready(): | |
| return {"status": "ok"} | |
| raise HTTPException(status_code=503, detail="Memory service is unavailable") | |
| def create_namespace( | |
| namespace_id: Annotated[ | |
| str | SkipJsonSchema[None], Body(description='The namespace ID to create', pattern=r'^[a-zA-Z0-9_]+$') | |
| ] = None, | |
| user_id: Annotated[ | |
| str | SkipJsonSchema[None], | |
| Body(description='The user that created the namespace.', pattern=r'^[a-zA-Z0-9_]+$'), | |
| ] = None, | |
| agent_id: Annotated[ | |
| str | SkipJsonSchema[None], | |
| Body(description='The agent associated with the namespace.', pattern=r'^[a-zA-Z0-9_]+$'), | |
| ] = None, | |
| app_id: Annotated[ | |
| str | SkipJsonSchema[None], | |
| Body(description='The application associated with the namespace.', pattern=r'^[a-zA-Z0-9_]+$'), | |
| ] = None, | |
| ) -> Namespace: | |
| """Create a new namespace for facts to exist in.""" | |
| try: | |
| namespace = memory_backend.create_namespace(namespace_id, user_id, agent_id, app_id) | |
| return namespace | |
| except RuntimeError: | |
| raise HTTPException(status_code=409, detail=f'Namespace `{namespace_id}` already exists.') | |
| def get_namespace_details( | |
| namespace_id: Annotated[ | |
| str, Path(description='The namespace which contains facts relevant to the user.') | |
| ], | |
| ) -> Namespace: | |
| """Get the details of a specific namespace.""" | |
| try: | |
| namespace = memory_backend.get_namespace_details(namespace_id) | |
| except LookupError: | |
| raise HTTPException(status_code=404, detail=f"Namespace {namespace_id} not found.") | |
| return namespace | |
| def search_namespaces( | |
| user_id: Annotated[ | |
| str | SkipJsonSchema[None], Query(description='The user that created the namespace.') | |
| ] = None, | |
| agent_id: Annotated[ | |
| str | SkipJsonSchema[None], Query(description='The agent associated with the namespace.') | |
| ] = None, | |
| app_id: Annotated[ | |
| str | SkipJsonSchema[None], Query(description='The application associated with the namespace.') | |
| ] = None, | |
| limit: Annotated[int, Query(description='The number of results to return.')] = 10, | |
| ) -> list[Namespace]: | |
| """Find namespaces matching the filters.""" | |
| if user_id is None and agent_id is None and app_id is None: | |
| return memory_backend.all_namespaces() | |
| else: | |
| return list(memory_backend.search_namespaces(user_id, agent_id, app_id, limit)) | |
| def delete_namespace( | |
| namespace_id: Annotated[ | |
| str, Path(description='The namespace which contains facts relevant to the user.') | |
| ], | |
| ): | |
| """Delete a namespace that facts exist in.""" | |
| memory_backend.delete_namespace(namespace_id=namespace_id) | |
| def create_and_store_fact( | |
| namespace_id: Annotated[ | |
| str, Path(description='The namespace which contains facts relevant to the user.') | |
| ], | |
| fact: Annotated[ | |
| Fact, | |
| Body( | |
| description='A fact about the user, their personal preferences, upcoming plans, ' | |
| 'professional details, and other miscellaneous information.' | |
| ), | |
| ], | |
| ) -> str: | |
| """Based on something the user previously said, create and store in memory a new fact about the user, | |
| their personal preferences, upcoming plans, professional details, and other miscellaneous information. | |
| """ | |
| return memory_backend.create_and_store_fact(namespace_id=namespace_id, fact=fact) | |
| def search_for_facts( | |
| namespace_id: Annotated[ | |
| str, Path(description='The namespace which contains facts relevant to the user.') | |
| ], | |
| query: Annotated[ | |
| str | SkipJsonSchema[None], | |
| Body(description='A question written in natural language about the user that needs an answer.'), | |
| ] = None, | |
| filters: Annotated[ | |
| dict | SkipJsonSchema[None], Body(description='A list of facts relevant to the user.') | |
| ] = None, | |
| limit: Annotated[int, Query(description='The maximum number of facts to return.')] = 10, | |
| ) -> list[RecordedFact]: | |
| """Based on a query, find in memory a fact about the user, their personal preferences, upcoming plans, | |
| professional details, and other miscellaneous information. | |
| """ | |
| return memory_backend.search_for_facts( | |
| namespace_id=namespace_id, query=query, filters=filters, limit=limit | |
| ) | |
| def delete_fact_by_id( | |
| namespace_id: Annotated[ | |
| str, Path(description='The namespace which contains facts relevant to the user.') | |
| ], | |
| fact_id: Annotated[str, Path(description='The ID of the fact to delete.')], | |
| ): | |
| """Remove a fact from memory by its ID.""" | |
| memory_backend.delete_fact_by_id(namespace_id=namespace_id, fact_id=fact_id) | |
| def extract_facts_from_messages( | |
| namespace_id: Annotated[ | |
| str, Path(description='The namespace which contains facts relevant to the user.') | |
| ], | |
| messages: Annotated[ | |
| list[Message], Body(description='A list of messages between a user and a chatbot.', embed=True) | |
| ], | |
| ) -> str: | |
| """Takes a list of messages between a user and a chatbot, extracting and storing facts about the user, | |
| their personal preferences, upcoming plans, professional details, and other miscellaneous information. | |
| """ | |
| memory_backend.extract_facts_from_messages(namespace_id=namespace_id, messages=messages) | |
| return f'{len(messages)} messages received for namespace {namespace_id}' | |
| def create_run( | |
| namespace_id: Annotated[ | |
| str, Path(description='The namespace which contains facts relevant to the user.') | |
| ], | |
| run_id: Annotated[ | |
| str | SkipJsonSchema[None], Body(description='Optional ID to create the run with.', embed=True) | |
| ], | |
| ) -> Run: | |
| """Add a new run into memory. Runs are a series of steps executed in an agentic workflow.""" | |
| return memory_backend.create_run(namespace_id=namespace_id, run_id=run_id) | |
| def get_run( | |
| namespace_id: Annotated[ | |
| str, Path(description='The namespace which contains facts relevant to the user.') | |
| ], | |
| run_id: Annotated[str, Path(description='The run which contains the steps for an agentic workflow.')], | |
| ) -> Run: | |
| """Get a run.""" | |
| return memory_backend.get_run(namespace_id=namespace_id, run_id=run_id) | |
| def delete_run( | |
| namespace_id: Annotated[ | |
| str, Path(description='The namespace which contains facts relevant to the user.') | |
| ], | |
| run_id: Annotated[str, Path(description='The run which contains the steps for an agentic workflow.')], | |
| ): | |
| """Delete a run from memory.""" | |
| return memory_backend.delete_run(namespace_id=namespace_id, run_id=run_id) | |
| def add_step( | |
| namespace_id: Annotated[ | |
| str, Path(description='The namespace which contains facts relevant to the user.') | |
| ], | |
| run_id: Annotated[str, Path(description='The run which contains the steps for an agentic workflow.')], | |
| step: Annotated[dict, Body(description='The step, an arbitrary JSON object.')], | |
| prompt: Annotated[str, Body(description='The prompt used by an LLM to parse a step.')], | |
| ) -> str: | |
| """Add a new step into a run.""" | |
| return memory_backend.add_step(namespace_id, run_id, step, prompt) | |
| def search_runs( | |
| namespace_id: Annotated[ | |
| str, Path(description='The namespace which contains facts relevant to the user.') | |
| ], | |
| query: Annotated[ | |
| str | SkipJsonSchema[None], | |
| Body(description='A question written in natural language about the user that needs an answer.'), | |
| ] = None, | |
| filters: Annotated[ | |
| dict[str, str] | SkipJsonSchema[None], | |
| Body(description='A set of filters to apply against the metadata of each step.'), | |
| ] = None, | |
| ) -> Run: | |
| return memory_backend.search_runs(namespace_id=namespace_id, query=query, filters=filters) | |
| def end_run( | |
| namespace_id: Annotated[ | |
| str, Path(description='The namespace which contains facts relevant to the user.') | |
| ], | |
| run_id: Annotated[str, Path(description='The run which contains the steps for an agentic workflow.')], | |
| background_tasks: BackgroundTasks, | |
| ) -> None: | |
| """End a run and execute any background tasks.""" | |
| memory_backend.end_run(namespace_id, run_id) | |
| background_tasks.add_task(memory_backend.analyze_run, namespace_id=namespace_id, run_id=run_id) | |
| app.include_router(router_v1) | |
| def main(): | |
| """Main entry point for the agentic-memory server.""" | |
| # Set debug logging if FastAPI debug mode is enabled | |
| log_level = "debug" if app.debug else "info" | |
| uvicorn.run(app, host='0.0.0.0', port=8888, log_level=log_level) | |
| if __name__ == '__main__': | |
| main() | |