|
|
""" |
|
|
MongoDB database connection and logging utilities, including admin media click logging. |
|
|
""" |
|
|
import os |
|
|
import logging |
|
|
from datetime import datetime, timedelta |
|
|
from typing import Optional, Dict, Any, Union, List |
|
|
from bson import ObjectId |
|
|
from pymongo import MongoClient |
|
|
from pymongo.errors import ConnectionFailure, ServerSelectionTimeoutError |
|
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)


# Lazily created client/database handles for the main connection.
# They stay None until get_mongodb_client() / get_database() populate them,
# and are reset to None again by close_connection().
_client: Optional[MongoClient] = None
_db = None


# Lazily created client/database handles for the admin connection
# (the one log_media_click() writes the media_clicks collection through).
# They stay None until get_admin_client() / get_admin_database() populate
# them, and are reset to None again by close_connection().
_admin_client: Optional[MongoClient] = None
_admin_db = None
|
|
|
|
|
def get_mongodb_client() -> Optional[MongoClient]:
    """Return the shared MongoDB client, creating it on first use.

    The connection string is read from the ``MONGODB_URI`` environment
    variable. A new client is only cached in the module-level ``_client``
    after a successful ``ping``; when the ping fails the client is closed
    (so it is not leaked) and nothing is cached, which means the next call
    retries the connection.

    Returns:
        The cached ``MongoClient``, or ``None`` when ``MONGODB_URI`` is not
        set or the server could not be reached within the 5 second timeouts.

    Raises:
        Any exception other than ``ConnectionFailure`` /
        ``ServerSelectionTimeoutError`` raised while building or pinging the
        client is propagated unchanged (after the client has been closed).
    """
    global _client
    if _client is not None:
        return _client

    mongodb_uri = os.getenv("MONGODB_URI")
    if not mongodb_uri:
        logger.warning("MONGODB_URI environment variable not set. MongoDB features will be disabled.")
        return None

    candidate = None
    try:
        candidate = MongoClient(
            mongodb_uri,
            serverSelectionTimeoutMS=5000,
            connectTimeoutMS=5000,
        )
        # Force a round trip so connection problems surface here.
        candidate.admin.command("ping")
    except (ConnectionFailure, ServerSelectionTimeoutError) as exc:
        logger.error("Failed to connect to MongoDB: %s", str(exc))
        if candidate is not None:
            # Release the unusable client instead of just dropping the reference.
            candidate.close()
        return None
    except Exception:
        # Unexpected failure: do not cache or leak the unverified client.
        if candidate is not None:
            candidate.close()
        raise

    _client = candidate
    logger.info("MongoDB connection established successfully")
    return _client
|
|
|
|
|
def get_database():
    """Return the main database handle, resolving it on first use.

    The database name comes from the ``MONGODB_DB_NAME`` environment variable
    and falls back to ``"colorization_db"``. The handle is cached in the
    module-level ``_db``.

    Returns:
        The cached database object, or ``None`` when no MongoDB client is
        available.
    """
    global _db
    if _db is not None:
        return _db

    mongo = get_mongodb_client()
    if not mongo:
        logger.warning("MongoDB client not available")
        return _db

    _db = mongo[os.getenv("MONGODB_DB_NAME", "colorization_db")]
    return _db
|
|
|
|
|
|
|
|
def get_admin_client() -> Optional[MongoClient]:
    """Return the shared admin MongoDB client (for media_clicks collection).

    The connection string is read from the ``MONGODB_ADMIN`` environment
    variable. A new client is only cached in the module-level
    ``_admin_client`` after a successful ``ping``; when the ping fails the
    client is closed (so it is not leaked) and nothing is cached, which
    means the next call retries the connection.

    Returns:
        The cached ``MongoClient``, or ``None`` when ``MONGODB_ADMIN`` is not
        set or the server could not be reached within the 5 second timeouts.

    Raises:
        Any exception other than ``ConnectionFailure`` /
        ``ServerSelectionTimeoutError`` raised while building or pinging the
        client is propagated unchanged (after the client has been closed).
    """
    global _admin_client
    if _admin_client is not None:
        return _admin_client

    mongodb_uri = os.getenv("MONGODB_ADMIN")
    if not mongodb_uri:
        logger.warning("MONGODB_ADMIN environment variable not set. Admin MongoDB features will be disabled.")
        return None

    candidate = None
    try:
        candidate = MongoClient(
            mongodb_uri,
            serverSelectionTimeoutMS=5000,
            connectTimeoutMS=5000,
        )
        # Force a round trip so connection problems surface here.
        candidate.admin.command("ping")
    except (ConnectionFailure, ServerSelectionTimeoutError) as exc:
        logger.error("Failed to connect to Admin MongoDB: %s", str(exc))
        if candidate is not None:
            # Release the unusable client instead of just dropping the reference.
            candidate.close()
        return None
    except Exception:
        # Unexpected failure: do not cache or leak the unverified client.
        if candidate is not None:
            candidate.close()
        raise

    _admin_client = candidate
    logger.info("Admin MongoDB connection established successfully")
    return _admin_client
|
|
|
|
|
|
|
|
def get_admin_database():
    """Return the admin database handle, resolving it on first use.

    The database name comes from the ``MONGODB_ADMIN_DB_NAME`` environment
    variable and falls back to ``"adminPanel"``. The handle is cached in the
    module-level ``_admin_db``.

    Returns:
        The cached database object, or ``None`` when no admin MongoDB client
        is available.
    """
    global _admin_db
    if _admin_db is not None:
        return _admin_db

    admin_client = get_admin_client()
    if not admin_client:
        logger.warning("Admin MongoDB client not available")
        return _admin_db

    admin_db_name = os.getenv("MONGODB_ADMIN_DB_NAME", "adminPanel")
    _admin_db = admin_client[admin_db_name]
    logger.info("Using admin database: %s", admin_db_name)
    return _admin_db
|
|
|
|
|
|
|
|
def _normalize_object_id(raw_value: Optional[Union[str, int, ObjectId]]) -> ObjectId:
    """Turn a user id of several possible shapes into an ObjectId.

    Resolution order:

    1. An ``ObjectId`` is returned as is.
    2. ``None`` or a blank string yields a newly generated ``ObjectId``.
    3. An ``int``, or a string of digits with optional leading ``-`` signs,
       is converted to the hexadecimal form of its absolute value,
       zero-padded to 24 characters (only the last 24 are kept). Because
       this check runs before the hex check, an all-digit 24-character
       string is treated as a number, not as an ObjectId hex string.
    4. Any other string that is a valid ObjectId representation is used
       directly (after stripping whitespace).
    5. Everything else yields a newly generated ``ObjectId``.

    Only cases 1, 3 and 4 are repeatable for the same input.
    """
    if isinstance(raw_value, ObjectId):
        return raw_value

    # Stripped text for string inputs; None for every other type.
    text = raw_value.strip() if isinstance(raw_value, str) else None

    if raw_value is None or text == "":
        return ObjectId()

    looks_numeric = isinstance(raw_value, int) or (
        text is not None and text.lstrip("-").isdigit()
    )
    if looks_numeric:
        try:
            magnitude = abs(int(str(raw_value).strip()))
            hex_digits = f"{magnitude:x}".zfill(24)[-24:]
            if ObjectId.is_valid(hex_digits):
                return ObjectId(hex_digits)
        except Exception as exc:
            # e.g. "--5" passes the digit check but int() rejects it.
            logger.debug("Numeric user id normalization failed: %s", str(exc))

    if text is not None and ObjectId.is_valid(text):
        return ObjectId(text)

    return ObjectId()
|
|
|
|
|
|
|
|
def _objectid_from_any(value: str) -> ObjectId:
    """Map an arbitrary string onto an ObjectId.

    A value that already is a valid ObjectId representation is used directly.
    Otherwise the hex form of the value's UTF-8 bytes is zero-padded on the
    left to 24 characters, and the last 24 characters form the ObjectId.
    If that conversion raises, the failure is logged at debug level and a
    newly generated ObjectId is returned instead.
    """
    if ObjectId.is_valid(value):
        return ObjectId(value)

    try:
        utf8_hex = value.encode("utf-8").hex()
        tail = utf8_hex.zfill(24)[-24:]
        if ObjectId.is_valid(tail):
            return ObjectId(tail)
    except Exception as exc:
        logger.debug("Category id normalization failed: %s", str(exc))

    return ObjectId()
|
|
|
|
|
|
|
|
def _resolve_category_id(
    category_id: Optional[str],
    endpoint_path: Optional[str],
    default_category_id: Optional[str],
) -> ObjectId:
    """Choose the category id for a click and convert it to an ObjectId.

    Precedence:

    1. ``category_id`` when it is non-empty (a blank string counts as empty).
    2. The ``DEFAULT_CATEGORY_COLORIZATION`` environment variable, when the
       first path segment of ``endpoint_path`` (lower-cased) is one of
       ``colorization``, ``upload`` or ``colorize`` and the variable is set.
    3. ``default_category_id``.
    4. The ``DEFAULT_CATEGORY_FALLBACK`` environment variable, or its
       built-in default.

    The chosen value is passed through ``_objectid_from_any``.
    """
    explicit = category_id
    if isinstance(explicit, str) and not explicit.strip():
        explicit = None

    # Every known endpoint currently shares one environment-provided default.
    colorization_default = os.getenv("DEFAULT_CATEGORY_COLORIZATION")
    per_endpoint_default = dict.fromkeys(
        ("colorization", "upload", "colorize"), colorization_default
    )

    first_segment = None
    if endpoint_path:
        first_segment = endpoint_path.strip("/").split("/")[0].lower() or None

    selected = explicit
    if not selected and first_segment:
        selected = per_endpoint_default.get(first_segment) or selected

    if not selected:
        selected = default_category_id or os.getenv(
            "DEFAULT_CATEGORY_FALLBACK", "69368fcd2e46bd68ae1889b2"
        )

    return _objectid_from_any(selected)
|
|
|
|
|
|
|
|
def _get_today_midnight_utc() -> datetime: |
|
|
"""Get today's date at midnight UTC (timezone-naive for MongoDB compatibility).""" |
|
|
now = datetime.utcnow() |
|
|
return datetime(now.year, now.month, now.day) |
|
|
|
|
|
|
|
|
def _update_daily_count(collection, user_object_id: ObjectId, today: datetime) -> None:
    """Make sure ``ai_edit_daily_count`` has an entry for today.

    The array holds ``{"date": <midnight datetime>, "count": <int>}`` entries.
    If today is already present nothing is changed. Otherwise every missing
    day between the latest recorded day and today is appended with count 0,
    followed by today with count 1.

    Args:
        collection: Collection holding one document per ``userId``; must
            provide ``find_one`` and ``update_one``.
        user_object_id: Value matched against the ``userId`` field.
        today: Reference day; its time-of-day part is ignored.

    If no document exists for the user, one is created (upsert) containing
    only today's entry.
    """
    today_start = today.replace(hour=0, minute=0, second=0, microsecond=0)
    todays_entry = {"date": today_start, "count": 1}
    user_filter = {"userId": user_object_id}

    user_doc = collection.find_one(user_filter)
    if not user_doc:
        logger.warning("User document not found when updating daily count, initializing")
        # $setOnInsert only takes effect on an upsert, so upsert must be on
        # for this "initializing" write to do anything.
        collection.update_one(
            user_filter,
            {"$setOnInsert": {"ai_edit_daily_count": [todays_entry]}},
            upsert=True,
        )
        return

    stored = user_doc.get("ai_edit_daily_count")
    # Tolerate a null or otherwise malformed field by treating it as empty.
    history = stored if isinstance(stored, list) else []

    # Days already recorded, normalized to midnight; malformed entries are skipped.
    recorded_days = {
        entry["date"].replace(hour=0, minute=0, second=0, microsecond=0)
        for entry in history
        if isinstance(entry, dict) and isinstance(entry.get("date"), datetime)
    }

    if today_start in recorded_days:
        logger.debug("Today's date already exists in daily count, leaving unchanged: %s", today_start.isoformat())
        return

    # Fill the gap after the latest recorded day with zero-count entries.
    dates_to_add = []
    if recorded_days:
        one_day = timedelta(days=1)
        gap_day = max(recorded_days) + one_day
        while gap_day < today_start:
            dates_to_add.append({"date": gap_day, "count": 0})
            gap_day += one_day
    dates_to_add.append(todays_entry)

    if "ai_edit_daily_count" in user_doc and not isinstance(stored, list):
        # $push needs an array; replace a malformed value outright.
        update = {"$set": {"ai_edit_daily_count": dates_to_add}}
    else:
        # $push creates the array when the field is absent, so no separate
        # initializing write is needed.
        update = {"$push": {"ai_edit_daily_count": {"$each": dates_to_add}}}

    collection.update_one(user_filter, update)
    logger.debug("Added %d daily count entries (including today with count 1)", len(dates_to_add))
|
|
|
|
|
def log_api_call(
    endpoint: str,
    method: str,
    status_code: int = 200,
    request_data: Optional[Dict[str, Any]] = None,
    response_data: Optional[Dict[str, Any]] = None,
    error: Optional[str] = None,
    user_id: Optional[str] = None,
    ip_address: Optional[str] = None
) -> bool:
    """
    Record one API call in the ``api_calls`` collection.

    Args:
        endpoint: API endpoint path
        method: HTTP method (GET, POST, etc.)
        status_code: HTTP status code
        request_data: Request data/parameters (stored as ``{}`` when empty)
        response_data: Response data (stored as ``{}`` when empty)
        error: Error message if any
        user_id: User ID if authenticated
        ip_address: Client IP address

    Returns:
        True if the document was inserted, False when the database is
        unavailable or any exception occurred (the exception is logged,
        never raised).
    """
    try:
        database = get_database()
        if database is None:
            logger.warning("MongoDB not available, skipping API log")
            return False

        document = {
            "endpoint": endpoint,
            "method": method,
            "status_code": status_code,
            "timestamp": datetime.utcnow(),
            "request_data": request_data if request_data else {},
            "response_data": response_data if response_data else {},
            "error": error,
            "user_id": user_id,
            "ip_address": ip_address,
        }

        outcome = database["api_calls"].insert_one(document)
        logger.info("API call logged to MongoDB: %s", outcome.inserted_id)
        return True
    except Exception as exc:
        logger.error("Failed to log API call to MongoDB: %s", str(exc))
        return False
|
|
|
|
|
def log_image_upload(
    image_id: str,
    filename: str,
    file_size: int,
    content_type: str,
    user_id: Optional[str] = None,
    ip_address: Optional[str] = None
) -> bool:
    """
    Record one image upload in the ``image_uploads`` collection.

    Args:
        image_id: Unique image identifier
        filename: Original filename
        file_size: File size in bytes
        content_type: MIME type
        user_id: User ID if authenticated
        ip_address: Client IP address

    Returns:
        True if the document was inserted, False when the database is
        unavailable or any exception occurred (the exception is logged,
        never raised).
    """
    try:
        database = get_database()
        if database is None:
            logger.warning("MongoDB not available, skipping upload log")
            return False

        document = {
            "image_id": image_id,
            "filename": filename,
            "file_size": file_size,
            "content_type": content_type,
            "uploaded_at": datetime.utcnow(),
            "user_id": user_id,
            "ip_address": ip_address,
        }

        outcome = database["image_uploads"].insert_one(document)
        logger.info("Image upload logged to MongoDB: %s", outcome.inserted_id)
        return True
    except Exception as exc:
        logger.error("Failed to log image upload to MongoDB: %s", str(exc))
        return False
|
|
|
|
|
def log_colorization(
    result_id: Optional[str] = None,
    image_id: Optional[str] = None,
    prompt: Optional[str] = None,
    model_type: Optional[str] = None,
    processing_time: Optional[float] = None,
    user_id: Optional[str] = None,
    ip_address: Optional[str] = None,
    status: str = "success",
    error: Optional[str] = None
) -> bool:
    """
    Record one colorization request in the ``colorizations`` collection.

    Both successful and failed requests are recorded.

    Args:
        result_id: Unique result identifier; a random UUID string is
            generated when it is missing (e.g. for failed requests)
        image_id: Original image identifier
        prompt: Text prompt used (if any)
        model_type: Model type used (fastai, pytorch, sdxl, etc.)
        processing_time: Time taken to process in seconds
        user_id: User ID if authenticated
        ip_address: Client IP address
        status: Status of the request ("success" or "failed"); only stored
            when non-empty
        error: Error message if status is "failed"; only stored when
            non-empty

    Returns:
        True if the document was inserted, False when the database is
        unavailable or any exception occurred (the exception is logged,
        never raised).
    """
    try:
        database = get_database()
        if database is None:
            logger.warning("MongoDB not available, skipping colorization log")
            return False

        if not result_id:
            import uuid

            result_id = str(uuid.uuid4())

        document = {
            "result_id": result_id,
            "image_id": image_id,
            "prompt": prompt,
            "model_type": model_type,
            "processing_time": processing_time,
            "created_at": datetime.utcnow(),
            "user_id": user_id,
            "ip_address": ip_address,
        }

        # Optional fields are only written when they carry a value.
        for field_name, field_value in (("status", status), ("error", error)):
            if field_value:
                document[field_name] = field_value

        outcome = database["colorizations"].insert_one(document)
        logger.info("Colorization logged to MongoDB (status: %s): %s", status, outcome.inserted_id)
        return True
    except Exception as exc:
        logger.error("Failed to log colorization to MongoDB: %s", str(exc))
        return False
|
|
|
|
|
|
|
|
def log_media_click(
    user_id: Optional[Union[str, int, ObjectId]],
    category_id: Optional[str],
    *,
    endpoint_path: Optional[str] = None,
    default_category_id: Optional[str] = None,
) -> bool:
    """Log media clicks into the admin MongoDB (media_clicks collection).

    Only logs if user_id is provided. If user_id is None or empty, returns
    False without logging.

    For the user's document this:

    * increments ``ai_edit_complete`` and refreshes ``updatedAt`` /
      ``ai_edit_last_date``;
    * increments ``click_count`` of the matching ``categories`` entry, or
      appends a new entry with ``click_count`` 1 when the category (or the
      whole user document) does not exist yet;
    * ensures today is present in ``ai_edit_daily_count``.

    Args:
        user_id: User identifier; normalized with ``_normalize_object_id``.
        category_id: Explicit category id, if any.
        endpoint_path: Request path used to pick an endpoint default category.
        default_category_id: Fallback category id.

    Returns:
        True when the click was recorded, False when it was skipped, the
        admin database is unavailable, or any exception occurred (the
        exception is logged, never raised).
    """
    if not user_id or (isinstance(user_id, str) and not user_id.strip()):
        logger.debug("Skipping media click log - user_id not provided")
        return False

    try:
        db = get_admin_database()
        if db is None:
            logger.warning("Admin MongoDB not available, skipping media click log")
            return False

        collection = db["media_clicks"]

        # Best-effort removal of a legacy index; failures (typically "index
        # not found") are deliberately ignored.
        # NOTE(review): this runs on every call; consider a one-time migration.
        try:
            collection.drop_index("user_id_1_header_1_media_id_1")
        except Exception as exc:
            logger.debug("Legacy index drop skipped: %s", str(exc))

        user_object_id = _normalize_object_id(user_id)
        category_object_id = _resolve_category_id(category_id, endpoint_path, default_category_id)
        now = datetime.utcnow()
        today = _get_today_midnight_utc()

        logger.info("Media click - user_id input: %s, normalized: %s, category_id input: %s, normalized: %s",
                    user_id, str(user_object_id), category_id, str(category_object_id))

        # Fast path: the user already has an entry for this category.
        update_existing = collection.update_one(
            {"userId": user_object_id, "categories.categoryId": category_object_id},
            {
                "$inc": {
                    "categories.$.click_count": 1,
                    "ai_edit_complete": 1,
                },
                "$set": {
                    "categories.$.lastClickedAt": now,
                    "updatedAt": now,
                    "ai_edit_last_date": now,
                },
            },
        )

        if update_existing.matched_count == 0:
            # Either the user has no entry for this category or no document
            # at all; one upsert covers both, since $setOnInsert is ignored
            # when a document matches.
            # ai_edit_complete must not appear in $setOnInsert: the same
            # path in $setOnInsert and $inc is a conflicting update, and
            # $inc creates a missing field with the increment as its value.
            collection.update_one(
                {"userId": user_object_id},
                {
                    "$setOnInsert": {
                        "createdAt": now,
                        "ai_edit_daily_count": [],
                    },
                    "$inc": {"ai_edit_complete": 1},
                    "$set": {
                        "updatedAt": now,
                        "ai_edit_last_date": now,
                    },
                    "$push": {
                        "categories": {
                            "categoryId": category_object_id,
                            "click_count": 1,
                            "lastClickedAt": now,
                        }
                    },
                },
                upsert=True,
            )

        _update_daily_count(collection, user_object_id, today)

        logger.info("Media click logged for user %s", str(user_object_id))
        return True
    except Exception as exc:
        logger.error("Failed to log media click to admin MongoDB: %s", str(exc))
        return False
|
|
|
|
|
def close_connection():
    """Close the main and admin MongoDB clients, if any are open.

    Each open client is closed and its cached client and database handles
    are reset to ``None``, so a later ``get_*`` call reconnects from scratch.
    """
    global _client, _db, _admin_client, _admin_db

    if _client:
        _client.close()
        _client, _db = None, None
        logger.info("MongoDB connection closed")

    if _admin_client:
        _admin_client.close()
        _admin_client, _admin_db = None, None
        logger.info("Admin MongoDB connection closed")
|
|
|
|
|
|