| """ | |
| MongoDB database connection and logging utilities, including admin media click logging. | |
| """ | |
| import os | |
| import logging | |
| from datetime import datetime, timedelta | |
| from typing import Optional, Dict, Any, Union, List | |
| from bson import ObjectId | |
| from pymongo import MongoClient | |
| from pymongo.errors import ConnectionFailure, ServerSelectionTimeoutError | |
| logger = logging.getLogger(__name__) | |
| # MongoDB connection | |
| _client: Optional[MongoClient] = None | |
| _db = None | |
| # Admin MongoDB connection (media_clicks) | |
| _admin_client: Optional[MongoClient] = None | |
| _admin_db = None | |
| def get_mongodb_client() -> Optional[MongoClient]: | |
| """Get or create MongoDB client""" | |
| global _client | |
| if _client is None: | |
| mongodb_uri = os.getenv("MONGODB_URI") | |
| if not mongodb_uri: | |
| logger.warning("MONGODB_URI environment variable not set. MongoDB features will be disabled.") | |
| return None | |
| try: | |
| _client = MongoClient( | |
| mongodb_uri, | |
| serverSelectionTimeoutMS=5000, | |
| connectTimeoutMS=5000 | |
| ) | |
| # Test connection | |
| _client.admin.command('ping') | |
| logger.info("MongoDB connection established successfully") | |
| except (ConnectionFailure, ServerSelectionTimeoutError) as e: | |
| logger.error("Failed to connect to MongoDB: %s", str(e)) | |
| _client = None | |
| return _client | |
| def get_database(): | |
| """Get database instance""" | |
| global _db | |
| if _db is None: | |
| client = get_mongodb_client() | |
| if client: | |
| db_name = os.getenv("MONGODB_DB_NAME", "colorization_db") | |
| _db = client[db_name] | |
| else: | |
| logger.warning("MongoDB client not available") | |
| return _db | |
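
# Illustrative usage only (not executed on import): callers are expected to
# guard against MongoDB being unavailable, since get_database() returns None
# when MONGODB_URI is unset or the connection fails. The query below is
# hypothetical.
#
#     db = get_database()
#     if db is not None:
#         db["api_calls"].find_one({"endpoint": "/colorize"})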


def get_admin_client() -> Optional[MongoClient]:
    """Get or create admin MongoDB client (for media_clicks collection)."""
    global _admin_client
    if _admin_client is None:
        mongodb_uri = os.getenv("MONGODB_ADMIN")
        if not mongodb_uri:
            logger.warning("MONGODB_ADMIN environment variable not set. Admin MongoDB features will be disabled.")
            return None
        try:
            _admin_client = MongoClient(
                mongodb_uri,
                serverSelectionTimeoutMS=5000,
                connectTimeoutMS=5000,
            )
            _admin_client.admin.command("ping")
            logger.info("Admin MongoDB connection established successfully")
        except (ConnectionFailure, ServerSelectionTimeoutError) as exc:
            logger.error("Failed to connect to Admin MongoDB: %s", str(exc))
            _admin_client = None
    return _admin_client


def get_admin_database():
    """Get admin database instance."""
    global _admin_db
    if _admin_db is None:
        client = get_admin_client()
        if client:
            db_name = os.getenv("MONGODB_ADMIN_DB_NAME", "adminPanel")
            _admin_db = client[db_name]
            logger.info("Using admin database: %s", db_name)
        else:
            logger.warning("Admin MongoDB client not available")
    return _admin_db


def _normalize_object_id(raw_value: Optional[Union[str, int, ObjectId]]) -> ObjectId:
    """Normalize user id inputs into a deterministic ObjectId."""
    if isinstance(raw_value, ObjectId):
        return raw_value
    if raw_value is None:
        return ObjectId()
    # Handle empty strings as None
    if isinstance(raw_value, str) and not raw_value.strip():
        return ObjectId()
    try:
        if isinstance(raw_value, int) or (isinstance(raw_value, str) and raw_value.strip().lstrip("-").isdigit()):
            int_value = int(str(raw_value).strip())
            hex_str = format(abs(int_value), "x").zfill(24)[-24:]
            if ObjectId.is_valid(hex_str):
                return ObjectId(hex_str)
    except Exception as exc:  # pragma: no cover - defensive
        logger.debug("Numeric user id normalization failed: %s", str(exc))
    if isinstance(raw_value, str):
        candidate = raw_value.strip()
        if ObjectId.is_valid(candidate):
            return ObjectId(candidate)
    return ObjectId()
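
# Illustrative behaviour (example values, not project fixtures):
#   _normalize_object_id(ObjectId("5f0c1d2e3a4b5c6d7e8f9a0b"))  # returned unchanged
#   _normalize_object_id(42)             # ObjectId("00000000000000000000002a"), deterministic
#   _normalize_object_id("not-a-valid-id")  # falls through to a fresh random ObjectId()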


def _objectid_from_any(value: str) -> ObjectId:
    """Convert arbitrary string into an ObjectId deterministically when possible."""
    if ObjectId.is_valid(value):
        return ObjectId(value)
    try:
        hex_str = value.encode("utf-8").hex().zfill(24)[-24:]
        if ObjectId.is_valid(hex_str):
            return ObjectId(hex_str)
    except Exception as exc:  # pragma: no cover - defensive
        logger.debug("Category id normalization failed: %s", str(exc))
    return ObjectId()
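
# Illustrative behaviour (example value): a short slug is padded from the hex
# of its UTF-8 bytes, so the same input always maps to the same ObjectId.
#   _objectid_from_any("category")  # hex("category") = "63617465676f7279"
#                                   # -> ObjectId("0000000063617465676f7279")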


def _resolve_category_id(
    category_id: Optional[str],
    endpoint_path: Optional[str],
    default_category_id: Optional[str],
) -> ObjectId:
    """Pick category id from explicit value, endpoint default, or fallback."""
    # Handle empty strings as None
    if isinstance(category_id, str) and not category_id.strip():
        category_id = None
    endpoint_map = {
        "colorization": os.getenv("DEFAULT_CATEGORY_COLORIZATION"),
        "upload": os.getenv("DEFAULT_CATEGORY_COLORIZATION"),
        "colorize": os.getenv("DEFAULT_CATEGORY_COLORIZATION"),
    }
    normalized_endpoint = None
    if endpoint_path:
        normalized_endpoint = endpoint_path.strip("/").split("/")[0].lower() or None
    chosen = category_id
    if not chosen and normalized_endpoint and endpoint_map.get(normalized_endpoint):
        chosen = endpoint_map[normalized_endpoint]
    if not chosen:
        chosen = default_category_id or os.getenv("DEFAULT_CATEGORY_FALLBACK", "69368fcd2e46bd68ae1889b2")
    return _objectid_from_any(chosen)
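
# Resolution order, for reference:
#   1. explicit category_id argument (non-empty)
#   2. endpoint default via DEFAULT_CATEGORY_COLORIZATION for paths whose first
#      segment is "colorization", "upload", or "colorize"
#   3. default_category_id argument
#   4. DEFAULT_CATEGORY_FALLBACK env var, else the hard-coded fallback id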


def _get_today_midnight_utc() -> datetime:
    """Get today's date at midnight UTC (timezone-naive for MongoDB compatibility)."""
    now = datetime.utcnow()
    return datetime(now.year, now.month, now.day)


def _update_daily_count(collection, user_object_id: ObjectId, today: datetime) -> None:
    """Update ai_edit_daily_count array: add today with count 1 if missing, fill gaps with count 0."""
    # Ensure today is at midnight
    today_start = today.replace(hour=0, minute=0, second=0, microsecond=0)
    logger.info("Updating daily count for user %s, today: %s", str(user_object_id), today_start.isoformat())

    user_doc = collection.find_one({"userId": user_object_id})
    if not user_doc:
        # User document doesn't exist yet - this shouldn't happen because the
        # caller creates it first, but handle it gracefully by upserting a
        # document with today's entry ($setOnInsert only applies on insert, so
        # the upsert flag is required for this to take effect).
        logger.warning("User document not found when updating daily count, initializing")
        collection.update_one(
            {"userId": user_object_id},
            {
                "$setOnInsert": {
                    "ai_edit_daily_count": [{
                        "date": today_start,
                        "count": 1
                    }]
                }
            },
            upsert=True
        )
        return

    # Get existing daily counts
    existing_counts = user_doc.get("ai_edit_daily_count", [])
    logger.debug("Existing daily counts: %d entries", len(existing_counts))

    # Check if today's date already exists
    today_exists = False
    for entry in existing_counts:
        entry_date = entry.get("date")
        if isinstance(entry_date, datetime):
            # Normalize to midnight for comparison
            normalized_date = entry_date.replace(hour=0, minute=0, second=0, microsecond=0)
            logger.debug("Comparing entry date %s with today %s", normalized_date.isoformat(), today_start.isoformat())
            if normalized_date == today_start:
                today_exists = True
                break

    # If today already exists, leave it unchanged
    if today_exists:
        logger.info("Today's date already exists in daily count, leaving unchanged: %s", today_start.isoformat())
        return

    # Today doesn't exist - add it and fill any missing dates
    logger.info("Today's date does not exist in daily count, adding new entry")

    # Find the latest date in existing counts
    last_date = None
    if existing_counts:
        dates = []
        for entry in existing_counts:
            entry_date = entry.get("date")
            if isinstance(entry_date, datetime):
                # Normalize to midnight for comparison
                dates.append(entry_date.replace(hour=0, minute=0, second=0, microsecond=0))
        if dates:
            last_date = max(dates)
            logger.debug("Last date found: %s", last_date.isoformat())
        else:
            logger.debug("No valid dates found in existing counts")
    else:
        logger.debug("No existing counts, will add today as first entry")

    # Generate missing dates between last_date and today
    dates_to_add = []
    if last_date:
        # Normalize last_date to midnight
        last_date = last_date.replace(hour=0, minute=0, second=0, microsecond=0)
        current_date = last_date + timedelta(days=1)
        # Fill missing dates with count 0
        while current_date < today_start:
            dates_to_add.append({
                "date": current_date,
                "count": 0
            })
            current_date += timedelta(days=1)
        logger.debug("Filled %d gap dates between %s and %s", len(dates_to_add), last_date.isoformat(), today_start.isoformat())

    # Add today's entry with count 1
    dates_to_add.append({
        "date": today_start,
        "count": 1
    })
    logger.info("Adding %d new date entries (including today with count 1)", len(dates_to_add))

    # Merge existing entries with the new ones, sort by date (oldest first),
    # and keep only the most recent 32 dates (drop the oldest beyond 32).
    all_entries = list(existing_counts) + dates_to_add

    def _entry_sort_key(entry: Dict[str, Any]) -> datetime:
        dt = entry.get("date")
        if isinstance(dt, datetime):
            return dt.replace(hour=0, minute=0, second=0, microsecond=0)
        return datetime.min

    all_entries.sort(key=_entry_sort_key)
    if len(all_entries) > 32:
        removed = len(all_entries) - 32
        all_entries = all_entries[-32:]
        logger.debug("Removed %d oldest entries to maintain 32-entry limit", removed)
    result = collection.update_one(
        {"userId": user_object_id},
        {"$set": {"ai_edit_daily_count": all_entries}},
    )
    logger.info(
        "Updated ai_edit_daily_count with %d entries (oldest first, max 32). Matched: %d, Modified: %d",
        len(all_entries), result.matched_count, result.modified_count,
    )


def log_api_call(
    endpoint: str,
    method: str,
    status_code: int = 200,
    request_data: Optional[Dict[str, Any]] = None,
    response_data: Optional[Dict[str, Any]] = None,
    error: Optional[str] = None,
    user_id: Optional[str] = None,
    ip_address: Optional[str] = None
) -> bool:
    """
    Log API call to MongoDB

    Args:
        endpoint: API endpoint path
        method: HTTP method (GET, POST, etc.)
        status_code: HTTP status code
        request_data: Request data/parameters
        response_data: Response data
        error: Error message if any
        user_id: User ID if authenticated
        ip_address: Client IP address

    Returns:
        True if logged successfully, False otherwise
    """
    try:
        db = get_database()
        if db is None:
            logger.warning("MongoDB not available, skipping API log")
            return False
        collection = db["api_calls"]
        log_entry = {
            "endpoint": endpoint,
            "method": method,
            "status_code": status_code,
            "timestamp": datetime.utcnow(),
            "request_data": request_data or {},
            "response_data": response_data or {},
            "error": error,
            "user_id": user_id,
            "ip_address": ip_address
        }
        result = collection.insert_one(log_entry)
        logger.info("API call logged to MongoDB: %s", result.inserted_id)
        return True
    except Exception as e:
        logger.error("Failed to log API call to MongoDB: %s", str(e))
        return False
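
# Illustrative call (endpoint path and payload are hypothetical):
#
#     log_api_call(
#         endpoint="/colorize",
#         method="POST",
#         status_code=200,
#         request_data={"model_type": "fastai"},
#         ip_address="203.0.113.7",
#     )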


def log_image_upload(
    image_id: str,
    filename: str,
    file_size: int,
    content_type: str,
    user_id: Optional[str] = None,
    ip_address: Optional[str] = None
) -> bool:
    """
    Log image upload to MongoDB

    Args:
        image_id: Unique image identifier
        filename: Original filename
        file_size: File size in bytes
        content_type: MIME type
        user_id: User ID if authenticated
        ip_address: Client IP address

    Returns:
        True if logged successfully, False otherwise
    """
    try:
        db = get_database()
        if db is None:
            logger.warning("MongoDB not available, skipping upload log")
            return False
        collection = db["image_uploads"]
        log_entry = {
            "image_id": image_id,
            "filename": filename,
            "file_size": file_size,
            "content_type": content_type,
            "uploaded_at": datetime.utcnow(),
            "user_id": user_id,
            "ip_address": ip_address
        }
        result = collection.insert_one(log_entry)
        logger.info("Image upload logged to MongoDB: %s", result.inserted_id)
        return True
    except Exception as e:
        logger.error("Failed to log image upload to MongoDB: %s", str(e))
        return False


def log_colorization(
    result_id: Optional[str] = None,
    image_id: Optional[str] = None,
    prompt: Optional[str] = None,
    model_type: Optional[str] = None,
    processing_time: Optional[float] = None,
    user_id: Optional[str] = None,
    ip_address: Optional[str] = None,
    status: str = "success",
    error: Optional[str] = None
) -> bool:
    """
    Log colorization request to MongoDB (colorization_db -> colorizations collection).
    Logs both successful and failed API calls.

    Args:
        result_id: Unique result identifier (None for failed requests)
        image_id: Original image identifier
        prompt: Text prompt used (if any)
        model_type: Model type used (fastai, pytorch, sdxl, etc.)
        processing_time: Time taken to process in seconds
        user_id: User ID if authenticated
        ip_address: Client IP address
        status: Status of the request ("success" or "failed")
        error: Error message if status is "failed"

    Returns:
        True if logged successfully, False otherwise
    """
    try:
        db = get_database()
        if db is None:
            logger.warning("MongoDB not available, skipping colorization log")
            return False
        collection = db["colorizations"]
        # Generate result_id if not provided (for failed requests)
        if not result_id:
            import uuid
            result_id = str(uuid.uuid4())
        log_entry = {
            "result_id": result_id,
            "image_id": image_id,
            "prompt": prompt,
            "model_type": model_type,
            "processing_time": processing_time,
            "created_at": datetime.utcnow(),
            "user_id": user_id,
            "ip_address": ip_address
        }
        # Add status and error fields only if provided (for new documents)
        # Existing documents won't have these fields, which is fine
        if status:
            log_entry["status"] = status
        if error:
            log_entry["error"] = error
        result = collection.insert_one(log_entry)
        logger.info("Colorization logged to MongoDB (status: %s): %s", status, result.inserted_id)
        return True
    except Exception as e:
        logger.error("Failed to log colorization to MongoDB: %s", str(e))
        return False
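
# Illustrative calls (identifiers are hypothetical). A failed request may omit
# result_id; one is generated so the attempt is still recorded:
#
#     log_colorization(result_id="res-123", image_id="img-456",
#                      model_type="fastai", processing_time=2.4)
#     log_colorization(image_id="img-456", model_type="sdxl",
#                      status="failed", error="CUDA out of memory")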


def log_media_click(
    user_id: Optional[Union[str, int, ObjectId]],
    category_id: Optional[str],
    *,
    endpoint_path: Optional[str] = None,
    default_category_id: Optional[str] = None,
) -> bool:
    """Log media clicks into the admin MongoDB (media_clicks collection).

    Only logs if user_id is provided. If user_id is None or empty, returns False
    without logging. Regular MongoDB logging (api_calls, image_uploads,
    colorizations) always happens regardless.
    """
    # Only log to media_clicks if user_id is provided
    if not user_id or (isinstance(user_id, str) and not user_id.strip()):
        logger.debug("Skipping media click log - user_id not provided")
        return False
    try:
        db = get_admin_database()
        if db is None:
            logger.warning("Admin MongoDB not available, skipping media click log")
            return False
        collection = db["media_clicks"]
        # Drop legacy index to avoid duplicate key errors (best effort)
        try:
            collection.drop_index("user_id_1_header_1_media_id_1")
        except Exception as exc:
            logger.debug("Legacy index drop skipped: %s", str(exc))
        user_object_id = _normalize_object_id(user_id)
        category_object_id = _resolve_category_id(category_id, endpoint_path, default_category_id)
        now = datetime.utcnow()
        today = _get_today_midnight_utc()
        logger.info("Media click - user_id input: %s, normalized: %s, category_id input: %s, normalized: %s",
                    user_id, str(user_object_id), category_id, str(category_object_id))
        # Try to update existing category in new schema (categories.categoryId)
        update_existing = collection.update_one(
            {"userId": user_object_id, "categories.categoryId": category_object_id},
            {
                "$inc": {
                    "categories.$.click_count": 1,
                    "ai_edit_complete": 1  # Increment usage count
                },
                "$set": {
                    "categories.$.lastClickedAt": now,
                    "updatedAt": now,
                    "ai_edit_last_date": now  # Update last used date (MongoDB Date object)
                },
            },
        )
        if update_existing.matched_count == 0:
            # Category not found in new schema, check if user exists
            user_exists = collection.find_one({"userId": user_object_id})
            if user_exists:
                # User exists but category doesn't in new schema - add to categories array
                collection.update_one(
                    {"userId": user_object_id},
                    {
                        "$inc": {"ai_edit_complete": 1},  # Increment usage count
                        "$set": {
                            "updatedAt": now,
                            "ai_edit_last_date": now  # Update last used date (MongoDB Date object)
                        },
                        "$push": {
                            "categories": {
                                "categoryId": category_object_id,
                                "click_count": 1,
                                "lastClickedAt": now,
                            }
                        },
                    },
                )
            else:
                # User doesn't exist - create new document with categories array.
                # ai_edit_complete is created by $inc (starts at 1 on first use);
                # it must not also appear in $setOnInsert, because MongoDB rejects
                # updates that target the same path with two operators.
                collection.update_one(
                    {"userId": user_object_id},
                    {
                        "$setOnInsert": {
                            "createdAt": now,
                            "ai_edit_daily_count": []  # Initialize empty array
                        },
                        "$inc": {"ai_edit_complete": 1},  # Creates the field at 1 on insert
                        "$set": {
                            "updatedAt": now,
                            "ai_edit_last_date": now  # Set last used date (MongoDB Date object)
                        },
                        "$push": {
                            "categories": {
                                "categoryId": category_object_id,
                                "click_count": 1,
                                "lastClickedAt": now,
                            }
                        },
                    },
                    upsert=True,
                )
        # Update daily count (after document is created/updated)
        _update_daily_count(collection, user_object_id, today)
        logger.info("Media click logged for user %s", str(user_object_id))
        return True
    except Exception as exc:
        logger.error("Failed to log media click to admin MongoDB: %s", str(exc))
        return False
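
# Illustrative call (ids are hypothetical): a click from an authenticated user
# against the colorize endpoint resolves its category via _resolve_category_id:
#
#     log_media_click(
#         user_id="64b7f3a9c2d1e4f5a6b7c8d9",
#         category_id=None,
#         endpoint_path="/colorize/upload",
#     )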


def close_connection():
    """Close MongoDB connections (main and admin)."""
    global _client, _db, _admin_client, _admin_db
    if _client:
        _client.close()
        _client = None
        _db = None
        logger.info("MongoDB connection closed")
    if _admin_client:
        _admin_client.close()
        _admin_client = None
        _admin_db = None
        logger.info("Admin MongoDB connection closed")
| # """ | |
| # MongoDB database connection and logging utilities, including admin media click logging. | |
| # """ | |
| # import os | |
| # import logging | |
| # from datetime import datetime, timedelta | |
| # from typing import Optional, Dict, Any, Union, List | |
| # from bson import ObjectId | |
| # from pymongo import MongoClient | |
| # from pymongo.errors import ConnectionFailure, ServerSelectionTimeoutError | |
| # logger = logging.getLogger(__name__) | |
| # # MongoDB connection | |
| # _client: Optional[MongoClient] = None | |
| # _db = None | |
| # # Admin MongoDB connection (media_clicks) | |
| # _admin_client: Optional[MongoClient] = None | |
| # _admin_db = None | |
| # def get_mongodb_client() -> Optional[MongoClient]: | |
| # """Get or create MongoDB client""" | |
| # global _client | |
| # if _client is None: | |
| # mongodb_uri = os.getenv("MONGODB_URI") | |
| # if not mongodb_uri: | |
| # logger.warning("MONGODB_URI environment variable not set. MongoDB features will be disabled.") | |
| # return None | |
| # try: | |
| # _client = MongoClient( | |
| # mongodb_uri, | |
| # serverSelectionTimeoutMS=5000, | |
| # connectTimeoutMS=5000 | |
| # ) | |
| # # Test connection | |
| # _client.admin.command('ping') | |
| # logger.info("MongoDB connection established successfully") | |
| # except (ConnectionFailure, ServerSelectionTimeoutError) as e: | |
| # logger.error("Failed to connect to MongoDB: %s", str(e)) | |
| # _client = None | |
| # return _client | |
| # def get_database(): | |
| # """Get database instance""" | |
| # global _db | |
| # if _db is None: | |
| # client = get_mongodb_client() | |
| # if client: | |
| # db_name = os.getenv("MONGODB_DB_NAME", "colorization_db") | |
| # _db = client[db_name] | |
| # else: | |
| # logger.warning("MongoDB client not available") | |
| # return _db | |
| # def get_admin_client() -> Optional[MongoClient]: | |
| # """Get or create admin MongoDB client (for media_clicks collection).""" | |
| # global _admin_client | |
| # if _admin_client is None: | |
| # mongodb_uri = os.getenv("MONGODB_ADMIN") | |
| # if not mongodb_uri: | |
| # logger.warning("MONGODB_ADMIN environment variable not set. Admin MongoDB features will be disabled.") | |
| # return None | |
| # try: | |
| # _admin_client = MongoClient( | |
| # mongodb_uri, | |
| # serverSelectionTimeoutMS=5000, | |
| # connectTimeoutMS=5000, | |
| # ) | |
| # _admin_client.admin.command("ping") | |
| # logger.info("Admin MongoDB connection established successfully") | |
| # except (ConnectionFailure, ServerSelectionTimeoutError) as exc: | |
| # logger.error("Failed to connect to Admin MongoDB: %s", str(exc)) | |
| # _admin_client = None | |
| # return _admin_client | |
| # def get_admin_database(): | |
| # """Get admin database instance.""" | |
| # global _admin_db | |
| # if _admin_db is None: | |
| # client = get_admin_client() | |
| # if client: | |
| # db_name = os.getenv("MONGODB_ADMIN_DB_NAME", "adminPanel") | |
| # _admin_db = client[db_name] | |
| # logger.info("Using admin database: %s", db_name) | |
| # else: | |
| # logger.warning("Admin MongoDB client not available") | |
| # return _admin_db | |
| # def _normalize_object_id(raw_value: Optional[Union[str, int, ObjectId]]) -> ObjectId: | |
| # """Normalize user id inputs into a deterministic ObjectId.""" | |
| # if isinstance(raw_value, ObjectId): | |
| # return raw_value | |
| # if raw_value is None: | |
| # return ObjectId() | |
| # # Handle empty strings as None | |
| # if isinstance(raw_value, str) and not raw_value.strip(): | |
| # return ObjectId() | |
| # try: | |
| # if isinstance(raw_value, int) or (isinstance(raw_value, str) and raw_value.strip().lstrip("-").isdigit()): | |
| # int_value = int(str(raw_value).strip()) | |
| # hex_str = format(abs(int_value), "x").zfill(24)[-24:] | |
| # if ObjectId.is_valid(hex_str): | |
| # return ObjectId(hex_str) | |
| # except Exception as exc: # pragma: no cover - defensive | |
| # logger.debug("Numeric user id normalization failed: %s", str(exc)) | |
| # if isinstance(raw_value, str): | |
| # candidate = raw_value.strip() | |
| # if ObjectId.is_valid(candidate): | |
| # return ObjectId(candidate) | |
| # return ObjectId() | |
| # def _objectid_from_any(value: str) -> ObjectId: | |
| # """Convert arbitrary string into an ObjectId deterministically when possible.""" | |
| # if ObjectId.is_valid(value): | |
| # return ObjectId(value) | |
| # try: | |
| # hex_str = value.encode("utf-8").hex().zfill(24)[-24:] | |
| # if ObjectId.is_valid(hex_str): | |
| # return ObjectId(hex_str) | |
| # except Exception as exc: # pragma: no cover - defensive | |
| # logger.debug("Category id normalization failed: %s", str(exc)) | |
| # return ObjectId() | |
| # def _resolve_category_id( | |
| # category_id: Optional[str], | |
| # endpoint_path: Optional[str], | |
| # default_category_id: Optional[str], | |
| # ) -> ObjectId: | |
| # """Pick category id from explicit value, endpoint default, or fallback.""" | |
| # # Handle empty strings as None | |
| # if isinstance(category_id, str) and not category_id.strip(): | |
| # category_id = None | |
| # endpoint_map = { | |
| # "colorization": os.getenv("DEFAULT_CATEGORY_COLORIZATION"), | |
| # "upload": os.getenv("DEFAULT_CATEGORY_COLORIZATION"), | |
| # "colorize": os.getenv("DEFAULT_CATEGORY_COLORIZATION"), | |
| # } | |
| # normalized_endpoint = None | |
| # if endpoint_path: | |
| # normalized_endpoint = endpoint_path.strip("/").split("/")[0].lower() or None | |
| # chosen = category_id | |
| # if not chosen and normalized_endpoint and endpoint_map.get(normalized_endpoint): | |
| # chosen = endpoint_map[normalized_endpoint] | |
| # if not chosen: | |
| # chosen = default_category_id or os.getenv("DEFAULT_CATEGORY_FALLBACK", "69368fcd2e46bd68ae1889b2") | |
| # return _objectid_from_any(chosen) | |
| # def _get_today_midnight_utc() -> datetime: | |
| # """Get today's date at midnight UTC (timezone-naive for MongoDB compatibility).""" | |
| # now = datetime.utcnow() | |
| # return datetime(now.year, now.month, now.day) | |
| # def _update_daily_count(collection, user_object_id: ObjectId, today: datetime) -> None: | |
| # """Update ai_edit_daily_count array: add today with count 1 if missing, fill gaps with count 0.""" | |
| # # Ensure today is at midnight | |
| # today_start = today.replace(hour=0, minute=0, second=0, microsecond=0) | |
| # # Check if today's date already exists | |
| # user_doc = collection.find_one({"userId": user_object_id}) | |
| # if not user_doc: | |
| # # User document doesn't exist yet - this shouldn't happen as we create it first | |
| # # But handle it gracefully by initializing the field | |
| # logger.warning("User document not found when updating daily count, initializing") | |
| # collection.update_one( | |
| # {"userId": user_object_id}, | |
| # { | |
| # "$setOnInsert": { | |
| # "ai_edit_daily_count": [{ | |
| # "date": today_start, | |
| # "count": 1 | |
| # }] | |
| # } | |
| # }, | |
| # upsert=False | |
| # ) | |
| # return | |
| # # Get existing daily counts | |
| # existing_counts = user_doc.get("ai_edit_daily_count", []) | |
| # # Check if today's date already exists | |
| # today_exists = False | |
| # for entry in existing_counts: | |
| # entry_date = entry.get("date") | |
| # if entry_date: | |
| # # Normalize to midnight for comparison | |
| # if isinstance(entry_date, datetime): | |
| # normalized_date = entry_date.replace(hour=0, minute=0, second=0, microsecond=0) | |
| # if normalized_date == today_start: | |
| # today_exists = True | |
| # break | |
| # # If today exists, do nothing (leave it as is) | |
| # if today_exists: | |
| # logger.debug("Today's date already exists in daily count, leaving unchanged: %s", today_start.isoformat()) | |
| # return | |
| # # Today doesn't exist - need to add it and fill missing dates | |
| # # Find the latest date in existing counts | |
| # last_date = None | |
| # if existing_counts: | |
| # dates = [] | |
| # for entry in existing_counts: | |
| # entry_date = entry.get("date") | |
| # if entry_date: | |
| # # Normalize to midnight for comparison | |
| # if isinstance(entry_date, datetime): | |
| # dates.append(entry_date.replace(hour=0, minute=0, second=0, microsecond=0)) | |
| # if dates: | |
| # last_date = max(dates) | |
| # # Generate missing dates between last_date and today | |
| # dates_to_add = [] | |
| # if last_date: | |
| # # Normalize last_date to midnight | |
| # last_date = last_date.replace(hour=0, minute=0, second=0, microsecond=0) | |
| # current_date = last_date + timedelta(days=1) | |
| # # Fill missing dates with count 0 | |
| # while current_date < today_start: | |
| # dates_to_add.append({ | |
| # "date": current_date, | |
| # "count": 0 | |
| # }) | |
| # current_date += timedelta(days=1) | |
| # # Add today's entry with count 1 | |
| # dates_to_add.append({ | |
| # "date": today_start, | |
| # "count": 1 | |
| # }) | |
| # # Merge existing entries with the new ones, sort by date (oldest first), | |
| # # and keep only the most recent 32 dates (drop the oldest beyond 32). | |
| # if dates_to_add: | |
| # all_entries = list(existing_counts) + dates_to_add | |
| # def _entry_sort_key(entry: Dict[str, Any]) -> datetime: | |
| # dt = entry.get("date") | |
| # if isinstance(dt, datetime): | |
| # return dt.replace(hour=0, minute=0, second=0, microsecond=0) | |
| # return datetime.min | |
| # all_entries.sort(key=_entry_sort_key) | |
| # if len(all_entries) > 32: | |
| # all_entries = all_entries[-32:] | |
| # collection.update_one( | |
| # {"userId": user_object_id}, | |
| # {"$set": {"ai_edit_daily_count": all_entries}}, | |
| # ) | |
| # logger.debug( | |
| # "Updated ai_edit_daily_count with %d entries (oldest first, max 32)", | |
| # len(all_entries), | |
| # ) | |
| # def log_api_call( | |
| # endpoint: str, | |
| # method: str, | |
| # status_code: int = 200, | |
| # request_data: Optional[Dict[str, Any]] = None, | |
| # response_data: Optional[Dict[str, Any]] = None, | |
| # error: Optional[str] = None, | |
| # user_id: Optional[str] = None, | |
| # ip_address: Optional[str] = None | |
| # ) -> bool: | |
| # """ | |
| # Log API call to MongoDB | |
| # Args: | |
| # endpoint: API endpoint path | |
| # method: HTTP method (GET, POST, etc.) | |
| # status_code: HTTP status code | |
| # request_data: Request data/parameters | |
| # response_data: Response data | |
| # error: Error message if any | |
| # user_id: User ID if authenticated | |
| # ip_address: Client IP address | |
| # Returns: | |
| # True if logged successfully, False otherwise | |
| # """ | |
| # try: | |
| # db = get_database() | |
| # if db is None: | |
| # logger.warning("MongoDB not available, skipping API log") | |
| # return False | |
| # collection = db["api_calls"] | |
| # log_entry = { | |
| # "endpoint": endpoint, | |
| # "method": method, | |
| # "status_code": status_code, | |
| # "timestamp": datetime.utcnow(), | |
| # "request_data": request_data or {}, | |
| # "response_data": response_data or {}, | |
| # "error": error, | |
| # "user_id": user_id, | |
| # "ip_address": ip_address | |
| # } | |
| # result = collection.insert_one(log_entry) | |
| # logger.info("API call logged to MongoDB: %s", result.inserted_id) | |
| # return True | |
| # except Exception as e: | |
| # logger.error("Failed to log API call to MongoDB: %s", str(e)) | |
| # return False | |
| # def log_image_upload( | |
| # image_id: str, | |
| # filename: str, | |
| # file_size: int, | |
| # content_type: str, | |
| # user_id: Optional[str] = None, | |
| # ip_address: Optional[str] = None | |
| # ) -> bool: | |
| # """ | |
| # Log image upload to MongoDB | |
| # Args: | |
| # image_id: Unique image identifier | |
| # filename: Original filename | |
| # file_size: File size in bytes | |
| # content_type: MIME type | |
| # user_id: User ID if authenticated | |
| # ip_address: Client IP address | |
| # Returns: | |
| # True if logged successfully, False otherwise | |
| # """ | |
| # try: | |
| # db = get_database() | |
| # if db is None: | |
| # logger.warning("MongoDB not available, skipping upload log") | |
| # return False | |
| # collection = db["image_uploads"] | |
| # log_entry = { | |
| # "image_id": image_id, | |
| # "filename": filename, | |
| # "file_size": file_size, | |
| # "content_type": content_type, | |
| # "uploaded_at": datetime.utcnow(), | |
| # "user_id": user_id, | |
| # "ip_address": ip_address | |
| # } | |
| # result = collection.insert_one(log_entry) | |
| # logger.info("Image upload logged to MongoDB: %s", result.inserted_id) | |
| # return True | |
| # except Exception as e: | |
| # logger.error("Failed to log image upload to MongoDB: %s", str(e)) | |
| # return False | |
| # def log_colorization( | |
| # result_id: Optional[str] = None, | |
| # image_id: Optional[str] = None, | |
| # prompt: Optional[str] = None, | |
| # model_type: Optional[str] = None, | |
| # processing_time: Optional[float] = None, | |
| # user_id: Optional[str] = None, | |
| # ip_address: Optional[str] = None, | |
| # status: str = "success", | |
| # error: Optional[str] = None | |
| # ) -> bool: | |
| # """ | |
| # Log colorization request to MongoDB (colorization_db -> colorizations collection) | |
| # Logs both successful and failed API calls. | |
| # Args: | |
| # result_id: Unique result identifier (None for failed requests) | |
| # image_id: Original image identifier | |
| # prompt: Text prompt used (if any) | |
| # model_type: Model type used (fastai, pytorch, sdxl, etc.) | |
| # processing_time: Time taken to process in seconds | |
| # user_id: User ID if authenticated | |
| # ip_address: Client IP address | |
| # status: Status of the request ("success" or "failed") | |
| # error: Error message if status is "failed" | |
| # Returns: | |
| # True if logged successfully, False otherwise | |
| # """ | |
| # try: | |
| # db = get_database() | |
| # if db is None: | |
| # logger.warning("MongoDB not available, skipping colorization log") | |
| # return False | |
| # collection = db["colorizations"] | |
| # # Generate result_id if not provided (for failed requests) | |
| # if not result_id: | |
| # import uuid | |
| # result_id = str(uuid.uuid4()) | |
| # log_entry = { | |
| # "result_id": result_id, | |
| # "image_id": image_id, | |
| # "prompt": prompt, | |
| # "model_type": model_type, | |
| # "processing_time": processing_time, | |
| # "created_at": datetime.utcnow(), | |
| # "user_id": user_id, | |
| # "ip_address": ip_address | |
| # } | |
| # # Add status and error fields only if provided (for new documents) | |
| # # Existing documents won't have these fields, which is fine | |
| # if status: | |
| # log_entry["status"] = status | |
| # if error: | |
| # log_entry["error"] = error | |
| # result = collection.insert_one(log_entry) | |
| # logger.info("Colorization logged to MongoDB (status: %s): %s", status, result.inserted_id) | |
| # return True | |
| # except Exception as e: | |
| # logger.error("Failed to log colorization to MongoDB: %s", str(e)) | |
| # return False | |
| # def log_media_click( | |
| # user_id: Optional[Union[str, int, ObjectId]], | |
| # category_id: Optional[str], | |
| # *, | |
| # endpoint_path: Optional[str] = None, | |
| # default_category_id: Optional[str] = None, | |
| # ) -> bool: | |
| # """Log media clicks into the admin MongoDB (media_clicks collection). | |
| # Only logs if user_id is provided. If user_id is None or empty, returns False without logging. | |
| # Regular MongoDB logging (api_calls, image_uploads, colorizations) always happens regardless. | |
| # """ | |
| # # Only log to media_clicks if user_id is provided | |
| # if not user_id or (isinstance(user_id, str) and not user_id.strip()): | |
| # logger.debug("Skipping media click log - user_id not provided") | |
| # return False | |
| # try: | |
| # db = get_admin_database() | |
| # if db is None: | |
| # logger.warning("Admin MongoDB not available, skipping media click log") | |
| # return False | |
| # collection = db["media_clicks"] | |
| # # Drop legacy index to avoid duplicate key errors (best effort) | |
| # try: | |
| # collection.drop_index("user_id_1_header_1_media_id_1") | |
| # except Exception as exc: | |
| # logger.debug("Legacy index drop skipped: %s", str(exc)) | |
| # user_object_id = _normalize_object_id(user_id) | |
| # category_object_id = _resolve_category_id(category_id, endpoint_path, default_category_id) | |
| # now = datetime.utcnow() | |
| # today = _get_today_midnight_utc() | |
| # logger.info("Media click - user_id input: %s, normalized: %s, category_id input: %s, normalized: %s", | |
| # user_id, str(user_object_id), category_id, str(category_object_id)) | |
| # # Try to update existing category in new schema (categories.categoryId) | |
| # update_existing = collection.update_one( | |
| # {"userId": user_object_id, "categories.categoryId": category_object_id}, | |
| # { | |
| # "$inc": { | |
| # "categories.$.click_count": 1, | |
| # "ai_edit_complete": 1 # Increment usage count | |
| # }, | |
| # "$set": { | |
| # "categories.$.lastClickedAt": now, | |
| # "updatedAt": now, | |
| # "ai_edit_last_date": now # Update last used date (MongoDB Date object) | |
| # }, | |
| # }, | |
| # ) | |
| # if update_existing.matched_count == 0: | |
| # # Category not found in new schema, check if user exists | |
| # user_exists = collection.find_one({"userId": user_object_id}) | |
| # if user_exists: | |
| # # User exists but category doesn't in new schema - add to categories array | |
| # collection.update_one( | |
| # {"userId": user_object_id}, | |
| # { | |
| # "$inc": {"ai_edit_complete": 1}, # Increment usage count | |
| # "$set": { | |
| # "updatedAt": now, | |
| # "ai_edit_last_date": now # Update last used date (MongoDB Date object) | |
| # }, | |
| # "$push": { | |
| # "categories": { | |
| # "categoryId": category_object_id, | |
| # "click_count": 1, | |
| # "lastClickedAt": now, | |
| # } | |
| # }, | |
| # }, | |
| # ) | |
| # else: | |
| # # User doesn't exist - create new document with categories array | |
| # # Default ai_edit_complete = 0, then increment to 1 on first use | |
| # collection.update_one( | |
| # {"userId": user_object_id}, | |
| # { | |
| # "$setOnInsert": { | |
| # "createdAt": now, | |
| # "ai_edit_complete": 0, # Default 0 for new users | |
| # "ai_edit_daily_count": [] # Initialize empty array | |
| # }, | |
| # "$inc": {"ai_edit_complete": 1}, # Increment to 1 on first use | |
| # "$set": { | |
| # "updatedAt": now, | |
| # "ai_edit_last_date": now # Set last used date (MongoDB Date object) | |
| # }, | |
| # "$push": { | |
| # "categories": { | |
| # "categoryId": category_object_id, | |
| # "click_count": 1, | |
| # "lastClickedAt": now, | |
| # } | |
| # }, | |
| # }, | |
| # upsert=True, | |
| # ) | |
| # # Update daily count (after document is created/updated) | |
| # _update_daily_count(collection, user_object_id, today) | |
| # logger.info("Media click logged for user %s", str(user_object_id)) | |
| # return True | |
| # except Exception as exc: | |
| # logger.error("Failed to log media click to admin MongoDB: %s", str(exc)) | |
| # return False | |
| # def close_connection(): | |
| # """Close MongoDB connection""" | |
| # global _client, _db, _admin_client, _admin_db | |
| # if _client: | |
| # _client.close() | |
| # _client = None | |
| # _db = None | |
| # logger.info("MongoDB connection closed") | |
| # if _admin_client: | |
| # _admin_client.close() | |
| # _admin_client = None | |
| # _admin_db = None | |
| # logger.info("Admin MongoDB connection closed") |