Sami Marreed
feat: docker-v1 with optimized frontend
0646b18
import uvicorn
from typing import Annotated
from dotenv import load_dotenv
from cuga.backend.memory.agentic_memory.backend.mem0_backend import Mem0MemoryBackend
from cuga.backend.memory.agentic_memory.config import get_config
from cuga.backend.memory.agentic_memory.utils.logging import Logging
from cuga.backend.memory.agentic_memory.schema import Fact, Message, RecordedFact, Run, Namespace
from fastapi import APIRouter, FastAPI, HTTPException, Path, Body, Query, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from mem0 import Memory
from mem0.configs.base import MemoryConfig
from pydantic.json_schema import SkipJsonSchema
load_dotenv(override=True)
memory = Memory(config=MemoryConfig.model_validate(get_config()))
logger = Logging.get_logger()
app = FastAPI(debug=True)
app.openapi_version = '3.0.3'
# Add CORS middleware to handle preflight OPTIONS requests
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # In production, replace with specific origins
allow_credentials=True,
allow_methods=["*"], # Allow all HTTP methods including OPTIONS
allow_headers=["*"], # Allow all headers
)
router_v1 = APIRouter(prefix="/v1")
memory_backend = Mem0MemoryBackend()
@router_v1.get("/health/live")
@router_v1.get("/health/ready")
async def ready():
"""Attempt to query if the memory service is available"""
if memory_backend.ready():
return {"status": "ok"}
raise HTTPException(status_code=503, detail="Memory service is unavailable")
@router_v1.post("/namespaces", status_code=201, response_description='The ID of the created namespace.')
def create_namespace(
namespace_id: Annotated[
str | SkipJsonSchema[None], Body(description='The namespace ID to create', pattern=r'^[a-zA-Z0-9_]+$')
] = None,
user_id: Annotated[
str | SkipJsonSchema[None],
Body(description='The user that created the namespace.', pattern=r'^[a-zA-Z0-9_]+$'),
] = None,
agent_id: Annotated[
str | SkipJsonSchema[None],
Body(description='The agent associated with the namespace.', pattern=r'^[a-zA-Z0-9_]+$'),
] = None,
app_id: Annotated[
str | SkipJsonSchema[None],
Body(description='The application associated with the namespace.', pattern=r'^[a-zA-Z0-9_]+$'),
] = None,
) -> Namespace:
"""Create a new namespace for facts to exist in."""
try:
namespace = memory_backend.create_namespace(namespace_id, user_id, agent_id, app_id)
return namespace
except RuntimeError:
raise HTTPException(status_code=409, detail=f'Namespace `{namespace_id}` already exists.')
@router_v1.get("/namespaces/{namespace_id}", response_description='The ID of the created namespace.')
def get_namespace_details(
namespace_id: Annotated[
str, Path(description='The namespace which contains facts relevant to the user.')
],
) -> Namespace:
"""Get the details of a specific namespace."""
try:
namespace = memory_backend.get_namespace_details(namespace_id)
except LookupError:
raise HTTPException(status_code=404, detail=f"Namespace {namespace_id} not found.")
return namespace
@router_v1.get("/namespaces", response_description='A list of namespaces matching the filters.')
def search_namespaces(
user_id: Annotated[
str | SkipJsonSchema[None], Query(description='The user that created the namespace.')
] = None,
agent_id: Annotated[
str | SkipJsonSchema[None], Query(description='The agent associated with the namespace.')
] = None,
app_id: Annotated[
str | SkipJsonSchema[None], Query(description='The application associated with the namespace.')
] = None,
limit: Annotated[int, Query(description='The number of results to return.')] = 10,
) -> list[Namespace]:
"""Find namespaces matching the filters."""
if user_id is None and agent_id is None and app_id is None:
return memory_backend.all_namespaces()
else:
return list(memory_backend.search_namespaces(user_id, agent_id, app_id, limit))
@router_v1.delete("/namespaces/{namespace_id}", status_code=204)
def delete_namespace(
namespace_id: Annotated[
str, Path(description='The namespace which contains facts relevant to the user.')
],
):
"""Delete a namespace that facts exist in."""
memory_backend.delete_namespace(namespace_id=namespace_id)
@router_v1.put(
"/namespaces/{namespace_id}/facts", status_code=201, response_description='The ID of the created fact.'
)
def create_and_store_fact(
namespace_id: Annotated[
str, Path(description='The namespace which contains facts relevant to the user.')
],
fact: Annotated[
Fact,
Body(
description='A fact about the user, their personal preferences, upcoming plans, '
'professional details, and other miscellaneous information.'
),
],
) -> str:
"""Based on something the user previously said, create and store in memory a new fact about the user,
their personal preferences, upcoming plans, professional details, and other miscellaneous information.
"""
return memory_backend.create_and_store_fact(namespace_id=namespace_id, fact=fact)
@router_v1.post(
"/namespaces/{namespace_id}/facts",
response_description='A list of facts that may be relevant to the user.',
)
def search_for_facts(
namespace_id: Annotated[
str, Path(description='The namespace which contains facts relevant to the user.')
],
query: Annotated[
str | SkipJsonSchema[None],
Body(description='A question written in natural language about the user that needs an answer.'),
] = None,
filters: Annotated[
dict | SkipJsonSchema[None], Body(description='A list of facts relevant to the user.')
] = None,
limit: Annotated[int, Query(description='The maximum number of facts to return.')] = 10,
) -> list[RecordedFact]:
"""Based on a query, find in memory a fact about the user, their personal preferences, upcoming plans,
professional details, and other miscellaneous information.
"""
return memory_backend.search_for_facts(
namespace_id=namespace_id, query=query, filters=filters, limit=limit
)
@router_v1.delete("/namespaces/{namespace_id}/facts/{fact_id}", status_code=204)
def delete_fact_by_id(
namespace_id: Annotated[
str, Path(description='The namespace which contains facts relevant to the user.')
],
fact_id: Annotated[str, Path(description='The ID of the fact to delete.')],
):
"""Remove a fact from memory by its ID."""
memory_backend.delete_fact_by_id(namespace_id=namespace_id, fact_id=fact_id)
@router_v1.post(
"/namespaces/{namespace_id}/messages",
response_description='The number of messages received for extraction.',
)
def extract_facts_from_messages(
namespace_id: Annotated[
str, Path(description='The namespace which contains facts relevant to the user.')
],
messages: Annotated[
list[Message], Body(description='A list of messages between a user and a chatbot.', embed=True)
],
) -> str:
"""Takes a list of messages between a user and a chatbot, extracting and storing facts about the user,
their personal preferences, upcoming plans, professional details, and other miscellaneous information.
"""
memory_backend.extract_facts_from_messages(namespace_id=namespace_id, messages=messages)
return f'{len(messages)} messages received for namespace {namespace_id}'
@router_v1.post(
"/namespaces/{namespace_id}/runs",
)
def create_run(
namespace_id: Annotated[
str, Path(description='The namespace which contains facts relevant to the user.')
],
run_id: Annotated[
str | SkipJsonSchema[None], Body(description='Optional ID to create the run with.', embed=True)
],
) -> Run:
"""Add a new run into memory. Runs are a series of steps executed in an agentic workflow."""
return memory_backend.create_run(namespace_id=namespace_id, run_id=run_id)
@router_v1.get(
"/namespaces/{namespace_id}/runs/{run_id}",
)
def get_run(
namespace_id: Annotated[
str, Path(description='The namespace which contains facts relevant to the user.')
],
run_id: Annotated[str, Path(description='The run which contains the steps for an agentic workflow.')],
) -> Run:
"""Get a run."""
return memory_backend.get_run(namespace_id=namespace_id, run_id=run_id)
@router_v1.delete("/namespaces/{namespace_id}/runs/{run_id}", status_code=204)
def delete_run(
namespace_id: Annotated[
str, Path(description='The namespace which contains facts relevant to the user.')
],
run_id: Annotated[str, Path(description='The run which contains the steps for an agentic workflow.')],
):
"""Delete a run from memory."""
return memory_backend.delete_run(namespace_id=namespace_id, run_id=run_id)
@router_v1.post("/namespaces/{namespace_id}/runs/{run_id}/steps", response_description='The number of runs.')
def add_step(
namespace_id: Annotated[
str, Path(description='The namespace which contains facts relevant to the user.')
],
run_id: Annotated[str, Path(description='The run which contains the steps for an agentic workflow.')],
step: Annotated[dict, Body(description='The step, an arbitrary JSON object.')],
prompt: Annotated[str, Body(description='The prompt used by an LLM to parse a step.')],
) -> str:
"""Add a new step into a run."""
return memory_backend.add_step(namespace_id, run_id, step, prompt)
@router_v1.post("/namespaces/{namespace_id}/runs/search")
def search_runs(
namespace_id: Annotated[
str, Path(description='The namespace which contains facts relevant to the user.')
],
query: Annotated[
str | SkipJsonSchema[None],
Body(description='A question written in natural language about the user that needs an answer.'),
] = None,
filters: Annotated[
dict[str, str] | SkipJsonSchema[None],
Body(description='A set of filters to apply against the metadata of each step.'),
] = None,
) -> Run:
return memory_backend.search_runs(namespace_id=namespace_id, query=query, filters=filters)
@router_v1.post("/namespaces/{namespace_id}/runs/{run_id}/end")
def end_run(
namespace_id: Annotated[
str, Path(description='The namespace which contains facts relevant to the user.')
],
run_id: Annotated[str, Path(description='The run which contains the steps for an agentic workflow.')],
background_tasks: BackgroundTasks,
) -> None:
"""End a run and execute any background tasks."""
memory_backend.end_run(namespace_id, run_id)
background_tasks.add_task(memory_backend.analyze_run, namespace_id=namespace_id, run_id=run_id)
app.include_router(router_v1)
def main():
"""Main entry point for the agentic-memory server."""
# Set debug logging if FastAPI debug mode is enabled
log_level = "debug" if app.debug else "info"
uvicorn.run(app, host='0.0.0.0', port=8888, log_level=log_level)
if __name__ == '__main__':
main()