Spaces:
Sleeping
Sleeping
| from fastapi import FastAPI, File, UploadFile, HTTPException, Body, Form | |
| from fastapi.responses import FileResponse, JSONResponse | |
| from fastapi.staticfiles import StaticFiles | |
| import os | |
| import cv2 | |
| import torch | |
| import json | |
| import re | |
| import logging | |
| from datetime import datetime | |
| from paddleocr import PaddleOCR | |
| from sentence_transformers import CrossEncoder | |
| from textblob import TextBlob | |
| from ultralytics import YOLO | |
| import nltk | |
| from nltk.tokenize import sent_tokenize | |
# --- CONFIGURATION ---
logging.basicConfig(level=logging.INFO)
# sent_tokenize() needs NLTK's Punkt sentence-tokenizer data. Recent NLTK
# releases load it from the "punkt_tab" package while older ones use "punkt",
# so both are requested.
# NOTE(review): requesting a package name the installed NLTK does not know is
# expected to be reported, not raised - confirm against the pinned version.
for _punkt_package in ("punkt", "punkt_tab"):
    nltk.download(_punkt_package)
app = FastAPI()

# Folders Setup
# Create the served folders before they are mounted below.
os.makedirs("static", exist_ok=True)
os.makedirs("uploads", exist_ok=True)
os.makedirs("uploads/notes", exist_ok=True)
app.mount("/static", StaticFiles(directory="static"), name="static")
app.mount("/uploads", StaticFiles(directory="uploads"), name="uploads")

# Security Constants
# The teacher registration code can be supplied through the SECRET_ADMIN_CODE
# environment variable so deployments need not rely on the value committed in
# source. An unset or empty variable falls back to the historical literal, so
# the code can never become the empty string.
SECRET_ADMIN_CODE = os.environ.get("SECRET_ADMIN_CODE") or "MPGI2025"
DB_FILE = "users.json"
NOTES_DB = "notes.json"

# --- LOAD AI MODELS ---
# Use the GPU for the cross-encoder when torch can see one.
device = 'cuda' if torch.cuda.is_available() else 'cpu'
# ✅ FIX: Removed 'show_log=False' to fix PaddleOCR version error
ocr = PaddleOCR(use_angle_cls=True, lang='en')
grade_model = CrossEncoder('cross-encoder/stsb-roberta-large', device=device)
yolo_model = YOLO("yolov8n.pt")
| # --- DATABASE HELPERS --- | |
def load_users():
    """Return the user database stored in ``DB_FILE``.

    If the file does not exist yet, it is first created with empty
    ``teachers`` and ``students`` tables and then read back.
    """
    db_is_missing = not os.path.exists(DB_FILE)
    if db_is_missing:
        empty_db = {"teachers": {}, "students": {}}
        with open(DB_FILE, "w") as handle:
            json.dump(empty_db, handle)

    with open(DB_FILE, "r") as handle:
        users = json.load(handle)
    return users
def save_users(data):
    """Overwrite ``DB_FILE`` with *data* serialised as 4-space-indented JSON."""
    with open(DB_FILE, "w") as handle:
        json.dump(data, handle, indent=4)
def load_notes_db():
    """Return the contents of the notes database stored in ``NOTES_DB``.

    If the file does not exist yet, it is first created holding an empty
    JSON list and then read back.
    """
    db_is_missing = not os.path.exists(NOTES_DB)
    if db_is_missing:
        with open(NOTES_DB, "w") as handle:
            json.dump([], handle)

    with open(NOTES_DB, "r") as handle:
        stored_notes = json.load(handle)
    return stored_notes
def save_note_entry(entry):
    """Put *entry* at the front of the notes database and write it back.

    The database is re-read through ``load_notes_db()`` on every call and
    rewritten in full as 4-space-indented JSON.
    """
    all_notes = load_notes_db()
    # Index 0 keeps the list ordered newest first.
    all_notes.insert(0, entry)
    with open(NOTES_DB, "w") as handle:
        json.dump(all_notes, handle, indent=4)
| # --- AUTH ROUTES --- | |
async def register_user(payload: dict = Body(...)):
    """Register a new teacher or student account.

    Reads ``user_id``, ``password``, ``name`` and ``role`` from the JSON
    body, plus ``admin_code`` when ``role`` is ``"teacher"``. Any role other
    than ``"teacher"`` is filed under the ``students`` table.

    Returns:
        ``{"success": True, "message": "Registered Successfully!"}``.

    Raises:
        HTTPException: 400 when a required field is missing/empty, when
            ``user_id`` is not a string, or when the ID already exists in
            its category; 403 when a teacher registration does not carry
            the correct admin code.
    """
    import hmac  # function-scope import: nothing else in the file needs it

    user_id = payload.get("user_id")
    password = payload.get("password")
    name = payload.get("name")
    role = payload.get("role")
    admin_code = payload.get("admin_code", "")

    if not all([user_id, password, name, role]):
        raise HTTPException(status_code=400, detail="Missing fields")

    # The ID becomes a JSON object key. A non-string ID would slip past the
    # duplicate check below (123 is not "123") yet be written out as "123",
    # silently replacing an existing account; a list/dict would raise.
    if not isinstance(user_id, str):
        raise HTTPException(status_code=400, detail="user_id must be a string")

    if role == "teacher":
        # Constant-time comparison; bytes are used because compare_digest
        # only accepts ASCII when given str arguments.
        code_ok = isinstance(admin_code, str) and hmac.compare_digest(
            admin_code.encode("utf-8"), SECRET_ADMIN_CODE.encode("utf-8")
        )
        if not code_ok:
            raise HTTPException(status_code=403, detail="Invalid Admin Code!")

    db = load_users()
    category = "teachers" if role == "teacher" else "students"
    # setdefault tolerates a database file that lacks one of the tables.
    accounts = db.setdefault(category, {})
    if user_id in accounts:
        raise HTTPException(status_code=400, detail="User ID already exists!")

    # NOTE(review): the password is stored exactly as received (plaintext).
    # Hashing it requires a coordinated change in login_user.
    accounts[user_id] = {"name": name, "password": password, "role": role}
    save_users(db)
    return {"success": True, "message": "Registered Successfully!"}
async def login_user(payload: dict = Body(...)):
    """Check submitted credentials and tell the client where to go next.

    Reads ``user_id``, ``password`` and ``role`` from the JSON body. The
    account is looked up in the ``teachers`` table when ``role`` is
    ``"teacher"`` and in ``students`` otherwise.

    Returns:
        A dict with ``success``, ``name``, ``user_id``, ``role`` and a
        ``redirect`` path (``/teacher`` or ``/student``).

    Raises:
        HTTPException: 401 when the account is unknown or the password
            does not match.
    """
    import hmac  # function-scope import: nothing else in the file needs it

    user_id = payload.get("user_id")
    password = payload.get("password")
    role = payload.get("role")

    db = load_users()
    category = "teachers" if role == "teacher" else "students"

    # Keys of a JSON-loaded table are always strings, so only a string ID can
    # match; an unhashable value (list/dict) would otherwise raise here.
    user = db.get(category, {}).get(user_id) if isinstance(user_id, str) else None

    authenticated = False
    if user:
        stored = user.get("password")
        if isinstance(stored, str) and isinstance(password, str):
            # Constant-time comparison; bytes are used because
            # compare_digest only accepts ASCII when given str arguments.
            authenticated = hmac.compare_digest(
                stored.encode("utf-8"), password.encode("utf-8")
            )
        elif stored is not None:
            # Records whose password was saved as a non-string JSON value
            # keep the plain equality check. A record with no password at
            # all never authenticates.
            authenticated = stored == password

    if authenticated:
        return {
            "success": True, "name": user["name"],
            "user_id": user_id, "role": role,
            "redirect": "/teacher" if role == "teacher" else "/student"
        }
    raise HTTPException(status_code=401, detail="Invalid Credentials")
| # --- NOTES SHARING ROUTES --- | |
async def share_note(file: UploadFile = File(...), title: str = Form(...)):
    """Store an uploaded note under ``uploads/notes`` and record it.

    The stored name is a timestamp followed by the client file name with
    every character outside ``[A-Za-z0-9_.-]``-style word characters
    replaced by ``_``. A metadata entry (title, stored file name, upload
    date, size in KB and public URL) is saved through ``save_note_entry``.

    Returns:
        ``{"success": True, "note": entry}`` where ``entry`` is the saved
        metadata dict.
    """
    # The client may omit the file name; fall back to a generic one instead
    # of letting re.sub fail on None.
    original_name = file.filename or "note"
    clean_name = re.sub(r'[^\w\-_\.]', '_', original_name)

    # One timestamp for both the stored name and the displayed date. The
    # prefix carries the full date so that same-named files uploaded at the
    # same clock time on different days do not overwrite each other.
    uploaded_at = datetime.now()
    filename = f"{uploaded_at.strftime('%Y%m%d_%H%M%S')}_{clean_name}"
    file_path = os.path.join("uploads/notes", filename)

    content = await file.read()
    with open(file_path, "wb") as f:
        f.write(content)

    entry = {
        "title": title,
        "filename": filename,
        "date": uploaded_at.strftime("%d %b, %Y"),
        "size": f"{round(len(content) / 1024, 1)} KB",
        "url": f"/uploads/notes/{filename}"
    }
    save_note_entry(entry)
    return {"success": True, "note": entry}
async def get_all_notes():
    """Return everything currently stored in the notes database."""
    stored_notes = load_notes_db()
    return stored_notes
| # --- PAGE ROUTES --- | |
def serve_login():
    """Serve the login page from the static folder."""
    page = FileResponse("static/login.html")
    return page
def serve_teacher():
    """Serve the teacher page from the static folder."""
    page = FileResponse("static/teacher.html")
    return page
def serve_student():
    """Serve the student page from the static folder."""
    page = FileResponse("static/student.html")
    return page
def serve_dashboard():
    """Serve the dashboard page from the static folder."""
    page = FileResponse("static/dashboard.html")
    return page
| # --- GRADING LOGIC --- | |
def _ocr_entry_text(entry):
    """Return the text carried by one OCR entry, or None if there is none.

    Accepts both ``(text, confidence)`` pairs and box-first entries of the
    form ``[box, (text, confidence)]``.
    """
    try:
        first, second = entry
    except (TypeError, ValueError):
        return None
    if isinstance(first, str):
        return first
    if isinstance(second, (list, tuple)) and second and isinstance(second[0], str):
        return second[0]
    return None


def get_student_id_from_text(result_pairs, cleaned_text):
    """Find a student ID (a run of at least six digits) in OCR output.

    Each OCR entry is checked on its own first: spaces and hyphens are
    removed, and if what remains is all digits and at least six characters
    long it is returned. Otherwise the first standalone run of six or more
    digits in *cleaned_text* is used.

    Args:
        result_pairs: iterable of OCR entries, either ``(text, confidence)``
            pairs or ``[box, (text, confidence)]`` lines; ``None`` is
            treated as empty. Entries of any other shape are skipped.
        cleaned_text: the full recognised text; ``None`` is treated as "".

    Returns:
        The ID as a string of digits, or ``None`` when nothing qualifies.
    """
    for entry in result_pairs or ():
        text = _ocr_entry_text(entry)
        if text is None:
            continue
        t_norm = text.replace(" ", "").replace("-", "")
        if t_norm.isdigit() and len(t_norm) >= 6:
            return t_norm

    match = re.search(r"\b\d{6,}\b", cleaned_text or "")
    return match.group() if match else None
def load_rubric():
    """Return the grading rubric parsed from ``static/rubric.json``.

    Returns an empty dict when the file does not exist. The file is read as
    UTF-8 so the result does not depend on the host's locale encoding.
    Malformed JSON still raises, so a broken rubric is not silently treated
    as "no questions".
    """
    path = os.path.join("static", "rubric.json")
    # Open directly instead of testing for existence first: this avoids the
    # gap between the check and the open.
    try:
        with open(path, "r", encoding="utf-8") as f:
            return json.load(f)
    except FileNotFoundError:
        return {}
async def upload_and_grade(file: UploadFile = File(...)):
    """Save an uploaded answer sheet, grade it, and store the report.

    Steps: the image is written to ``uploads``, passed through the YOLO
    model (any box with confidence above 0.5 counts as a diagram) and
    through OCR; the recognised text is split into sentences and every
    rubric question is scored by the best cross-encoder match between its
    model answer and any sentence. A score above 0.25 earns
    ``score * marks`` (capped at the question's marks). A detected diagram
    adds a 1.0 bonus. The report is returned and also written next to the
    image as ``<name>.json``.

    Raises:
        HTTPException: 400 when the file name is unusable, ends in
            ``.json`` (it would collide with a stored report), or the file
            cannot be decoded as an image.
    """
    # The client-supplied name is untrusted: keep only its final component,
    # replace every character outside the safe set (same pattern as
    # share_note) and drop leading dots so it cannot escape "uploads".
    raw_name = os.path.basename(file.filename or "")
    safe_name = re.sub(r'[^\w\-_\.]', '_', raw_name).lstrip(".")
    stem, ext = os.path.splitext(safe_name)
    if not safe_name or ext.lower() == ".json":
        raise HTTPException(status_code=400, detail="Invalid filename")

    path = os.path.join("uploads", safe_name)
    # Derive the report path from the stem so that any extension (.jpeg,
    # .JPG, ...) yields a separate .json file instead of overwriting the image.
    report_path = os.path.join("uploads", stem + ".json")

    with open(path, "wb") as f:
        f.write(await file.read())

    img = cv2.imread(path)
    if img is None:
        # Not a decodable image: do not leave the stray file behind.
        os.remove(path)
        raise HTTPException(status_code=400, detail="Uploaded file is not a readable image")

    # YOLO Diagram
    yolo_res = yolo_model(img)
    diagram_found = any(float(box.conf.item()) > 0.5 for box in yolo_res[0].boxes)

    # OCR
    ocr_res = ocr.ocr(img, cls=True)
    # The first page may be missing or None when nothing was recognised.
    ocr_lines = (ocr_res[0] if ocr_res else None) or []
    texts = [line[1][0] for line in ocr_lines]
    raw_text = " ".join(texts)

    cleaned = raw_text.replace("..", ".").replace("_", " ")
    # Insert a space before a question label glued to the previous word.
    cleaned = re.sub(r'(?<=[a-z0-9])(Q\d+)', r' \1', cleaned)
    sentences = sent_tokenize(cleaned)

    # Pass text-first pairs: the ID helper unpacks each entry as (text, _).
    student_id = get_student_id_from_text([(text, None) for text in texts], cleaned)

    rubric = load_rubric()
    report = {}
    total_score = 0
    total_possible = 0
    for q, data in rubric.items():
        max_m = data.get("marks", 5)
        model_ans = data.get("model_answer", q)
        total_possible += max_m

        candidates = [[model_ans, s] for s in sentences]
        best_score = float(max(grade_model.predict(candidates))) if candidates else 0

        marks = 0
        status = "Not Found"
        if best_score > 0.25:
            marks = round(min(best_score * max_m, max_m), 2)
            status = "Matched"

        report[q] = {"status": status, "marks_awarded": marks, "score_percentage": round(best_score*100, 2)}
        total_score += marks

    if diagram_found:
        total_score += 1.0
        report["Diagram Bonus"] = {"status": "Detected", "marks_awarded": 1.0, "score_percentage": 100}

    res_data = {
        "filename": safe_name,
        "student_id": student_id,
        "total_marks_awarded": round(total_score, 2),
        "total_possible_marks": total_possible,
        "grading_report": report,
        "processed_image_path": f"/uploads/{safe_name}"
    }
    with open(report_path, "w") as f:
        json.dump(res_data, f)
    return res_data
async def view_report(id: str):
    """Return the stored grading report whose ``student_id`` equals *id*.

    Every ``.json`` file directly inside ``uploads`` is examined; files that
    cannot be read, are not valid JSON, or do not hold a JSON object are
    skipped so that one bad file cannot break every lookup. When the report
    has a ``filename``, ``processed_image_path`` is refreshed from it.

    Raises:
        HTTPException: 404 when no report matches.
    """
    for name in os.listdir("uploads"):
        if not name.endswith(".json"):
            continue
        try:
            with open(os.path.join("uploads", name)) as jf:
                data = json.load(jf)
        except (OSError, ValueError):
            # ValueError covers both JSON syntax errors and decoding errors.
            continue
        if not isinstance(data, dict):
            continue
        if data.get("student_id") == id:
            filename = data.get("filename")
            if filename is not None:
                data["processed_image_path"] = f"/uploads/{filename}"
            return data
    raise HTTPException(status_code=404, detail="Not Found")
async def get_analytics():
    """Average the per-question score percentages over all stored reports.

    Every ``.json`` file directly inside ``uploads`` is read (in sorted name
    order, so the output order is reproducible). Files that cannot be read
    or parsed, and entries that are not in the expected shape or carry a
    non-numeric score, are skipped instead of failing the whole request.

    Returns:
        ``{"rubric_points": [...], "rubric_scores": [...]}`` - the question
        labels and, in the same order, their mean ``score_percentage``
        rounded to two decimals. An entry without ``score_percentage``
        counts as 0.
    """
    scores = {}
    for name in sorted(os.listdir("uploads")):
        if not name.endswith(".json"):
            continue
        try:
            with open(os.path.join("uploads", name)) as jf:
                data = json.load(jf)
        except (OSError, ValueError):
            # ValueError covers both JSON syntax errors and decoding errors.
            continue
        if not isinstance(data, dict):
            continue
        grading = data.get("grading_report", {})
        if not isinstance(grading, dict):
            continue
        for question, info in grading.items():
            if not isinstance(info, dict):
                continue
            score = info.get("score_percentage", 0)
            # bool is an int subclass; exclude it so True/False are not
            # averaged as 1/0.
            if isinstance(score, bool) or not isinstance(score, (int, float)):
                continue
            scores.setdefault(question, []).append(score)
    return {
        "rubric_points": list(scores.keys()),
        "rubric_scores": [round(sum(v)/len(v), 2) for v in scores.values()]
    }