""" YouTube Video Summarizer API - Hugging Face Spaces Edition Flask backend deployed on Hugging Face Spaces. Provides multilingual YouTube video summarization using: - Whisper (speech-to-text) - NLLB-200 (translation) - Groq API (summarization) All ML models are FREE and run locally on HF Spaces infrastructure. """ from flask import Flask, request, jsonify from flask_cors import CORS from dotenv import load_dotenv import os import logging from services.transcript import TranscriptService from services.summarizer import SummarizerService from config import ( SUPPORTED_LANGUAGES, get_language_name, is_english, ) # Load environment variables load_dotenv() # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) app = Flask(__name__) # Enable CORS for all origins (allow frontend from any domain) CORS(app, resources={ r"/*": { "origins": "*", "methods": ["GET", "POST", "OPTIONS"], "allow_headers": ["Content-Type", "Authorization"] } }) # Initialize services (lazy-loaded for heavy models) transcript_service = TranscriptService() summarizer_service = SummarizerService() # Translation service is lazy-loaded to avoid loading 2.4GB model on startup _translation_service = None def get_translation_service(): """Lazy-load the translation service.""" global _translation_service if _translation_service is None: from services.translation import TranslationService _translation_service = TranslationService() return _translation_service # ============================================================================= # ROOT & HEALTH ENDPOINTS # ============================================================================= @app.route('/', methods=['GET']) def root(): """Root endpoint - serves as health check for HF Spaces""" return jsonify({ 'status': 'healthy', 'service': 'YouTube Summarizer API', 'version': '2.0.0', 'docs': '/api/health for detailed status' }), 200 @app.route('/api/health', methods=['GET']) def health_check(): """Detailed health check endpoint""" return jsonify({ 'status': 'healthy', 'message': 'YouTube Summarizer API is running on Hugging Face Spaces', 'version': '2.0.0', 'features': ['multilingual', 'whisper', 'translation'], 'models': { 'whisper': 'openai/whisper-small', 'translation': 'facebook/nllb-200-distilled-600M', 'summarization': 'groq/llama-3.1-8b-instant' } }), 200 @app.route('/api/languages', methods=['GET']) def get_languages(): """Get list of supported languages""" return jsonify({ 'success': True, 'languages': SUPPORTED_LANGUAGES }), 200 @app.route('/api/warmup', methods=['POST']) def warmup_models(): """ Pre-load ML models to avoid delay on first request. This can take 2-5 minutes on first run (downloading models). """ try: results = {} data = request.get_json() or {} if data.get('translation', False): logger.info("Warming up translation model...") translation_service = get_translation_service() translation_service.warmup() results['translation'] = 'loaded' if data.get('whisper', False): logger.info("Warming up Whisper model...") from services.speech_to_text import SpeechToTextService stt = SpeechToTextService() stt.warmup() results['whisper'] = 'loaded' return jsonify({ 'success': True, 'message': 'Models warmed up successfully', 'models': results }), 200 except Exception as e: logger.error(f"Warmup failed: {e}") return jsonify({ 'error': 'Warmup failed', 'message': str(e) }), 500 # ============================================================================= # AUDIO TRANSCRIPTION ENDPOINTS (for Railway integration) # ============================================================================= @app.route('/api/transcribe-audio', methods=['POST']) def transcribe_audio(): """ Transcribe audio using Whisper. Receives audio as base64 from Railway backend. """ try: data = request.get_json() if not data or 'audio_base64' not in data: return jsonify({ 'error': 'Missing audio', 'message': 'Please provide audio_base64' }), 400 import base64 import tempfile # Decode audio audio_data = base64.b64decode(data['audio_base64']) # Save to temp file with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as f: f.write(audio_data) audio_path = f.name try: # Transcribe with Whisper from services.speech_to_text import SpeechToTextService stt = SpeechToTextService() result = stt.transcribe_audio(audio_path) return jsonify({ 'success': True, 'transcript': result['text'], 'language': result['language'], 'word_count': len(result['text'].split()) }), 200 finally: # Cleanup import os if os.path.exists(audio_path): os.remove(audio_path) except Exception as e: logger.error(f"Audio transcription failed: {e}") return jsonify({ 'error': 'Transcription failed', 'message': str(e) }), 500 @app.route('/api/process-audio', methods=['POST']) def process_audio(): """ Full pipeline for audio: Whisper transcription → Translation → Summary. Receives audio as base64 from Railway backend. """ try: data = request.get_json() if not data or 'audio_base64' not in data: return jsonify({ 'error': 'Missing audio', 'message': 'Please provide audio_base64' }), 400 import base64 import tempfile video_id = data.get('video_id', 'unknown') summary_type = data.get('summary_type', 'general') target_language = data.get('target_language', 'eng') # Decode audio audio_data = base64.b64decode(data['audio_base64']) # Save to temp file with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as f: f.write(audio_data) audio_path = f.name try: # Step 1: Transcribe with Whisper logger.info("Transcribing audio with Whisper...") from services.speech_to_text import SpeechToTextService stt = SpeechToTextService() whisper_result = stt.transcribe_audio(audio_path) transcript = whisper_result['text'] original_language = whisper_result['language'] original_word_count = len(transcript.split()) logger.info(f"Transcription complete. Language: {original_language}") # Step 2: Translate to English if needed english_transcript = transcript if not is_english(original_language): logger.info("Translating to English...") translation_service = get_translation_service() english_transcript = translation_service.translate_to_english( transcript, original_language ) # Step 3: Summarize logger.info("Generating summary...") summary = summarizer_service.summarize( text=english_transcript, summary_type=summary_type, chunk_size=2500, max_tokens=500 ) # Step 4: Translate summary to target language if needed final_summary = summary summary_language = "eng" if not is_english(target_language): logger.info(f"Translating summary to {target_language}...") translation_service = get_translation_service() final_summary = translation_service.translate_from_english(summary, target_language) summary_language = target_language # Calculate statistics summary_word_count = len(final_summary.split()) compression_ratio = (summary_word_count / original_word_count) * 100 if original_word_count > 0 else 0 response = { 'success': True, 'video_id': video_id, 'original_language': original_language, 'original_language_name': get_language_name(original_language), 'transcript': transcript, 'transcript_source': 'whisper', 'summary': final_summary, 'summary_language': summary_language, 'summary_language_name': get_language_name(summary_language), 'statistics': { 'original_word_count': original_word_count, 'summary_word_count': summary_word_count, 'compression_ratio': round(compression_ratio, 1), 'reading_time_minutes': max(1, summary_word_count // 200) } } if not is_english(original_language): response['english_transcript'] = english_transcript if not is_english(target_language): response['english_summary'] = summary logger.info("Audio processing complete!") return jsonify(response), 200 finally: # Cleanup import os if os.path.exists(audio_path): os.remove(audio_path) except Exception as e: logger.error(f"Audio processing failed: {e}") return jsonify({ 'error': 'Processing failed', 'message': str(e) }), 500 # ============================================================================= # TRANSCRIPT ENDPOINTS # ============================================================================= @app.route('/api/transcript', methods=['POST']) def get_transcript(): """ Extract transcript from YouTube video (multilingual). Request: { "url": "youtube_url", "use_whisper": true } Response: { "success": true, "transcript": "...", "language": "tam", ... } """ try: data = request.get_json() if not data or 'url' not in data: return jsonify({ 'error': 'Missing YouTube URL', 'message': 'Please provide a valid YouTube URL' }), 400 url = data['url'] use_whisper = data.get('use_whisper', True) video_id = transcript_service.extract_video_id(url) result = transcript_service.get_video_transcript(url, use_whisper_fallback=use_whisper) return jsonify({ 'success': True, 'video_id': video_id, 'transcript': result['transcript'], 'language': result['language'], 'language_name': get_language_name(result['language']), 'source': result['source'], 'word_count': result['word_count'] }), 200 except ValueError as e: return jsonify({'error': 'Invalid URL', 'message': str(e)}), 400 except Exception as e: logger.error(f"Transcript extraction failed: {e}") return jsonify({'error': 'Transcript extraction failed', 'message': str(e)}), 500 # ============================================================================= # TRANSLATION ENDPOINTS # ============================================================================= @app.route('/api/translate', methods=['POST']) def translate_text(): """ Translate text between languages. Request: { "text": "Hello", "source_lang": "eng", "target_lang": "hin" } Response: { "success": true, "translated_text": "नमस्ते", ... } """ try: data = request.get_json() if not data or 'text' not in data: return jsonify({ 'error': 'Missing text', 'message': 'Please provide text to translate' }), 400 text = data['text'] source_lang = data.get('source_lang', 'eng') target_lang = data.get('target_lang', 'hin') translation_service = get_translation_service() translated = translation_service.translate(text, source_lang, target_lang) return jsonify({ 'success': True, 'translated_text': translated, 'source_lang': source_lang, 'source_lang_name': get_language_name(source_lang), 'target_lang': target_lang, 'target_lang_name': get_language_name(target_lang) }), 200 except ValueError as e: return jsonify({'error': 'Invalid language', 'message': str(e)}), 400 except Exception as e: logger.error(f"Translation failed: {e}") return jsonify({'error': 'Translation failed', 'message': str(e)}), 500 @app.route('/api/detect-language', methods=['POST']) def detect_language(): """Detect the language of given text.""" try: data = request.get_json() if not data or 'text' not in data: return jsonify({ 'error': 'Missing text', 'message': 'Please provide text for language detection' }), 400 translation_service = get_translation_service() result = translation_service.detect_language(data['text']) return jsonify({ 'success': True, 'language': result['code'], 'language_name': result['name'] }), 200 except Exception as e: logger.error(f"Language detection failed: {e}") return jsonify({'error': 'Language detection failed', 'message': str(e)}), 500 # ============================================================================= # SUMMARIZATION ENDPOINTS # ============================================================================= @app.route('/api/summarize', methods=['POST']) def summarize(): """ Generate summary from transcript. Request: { "transcript": "...", "summary_type": "general" } Response: { "success": true, "summary": "...", "statistics": {...} } """ try: data = request.get_json() if not data or 'transcript' not in data: return jsonify({ 'error': 'Missing transcript', 'message': 'Please provide transcript text' }), 400 transcript = data['transcript'] summary_type = data.get('summary_type', 'general') chunk_size = data.get('chunk_size', 2500) max_tokens = data.get('max_tokens', 500) valid_types = ['general', 'detailed', 'bullet_points', 'key_takeaways'] if summary_type not in valid_types: return jsonify({ 'error': 'Invalid summary type', 'message': f'Must be one of: {", ".join(valid_types)}' }), 400 summary = summarizer_service.summarize( text=transcript, summary_type=summary_type, chunk_size=chunk_size, max_tokens=max_tokens ) summary_word_count = len(summary.split()) original_word_count = len(transcript.split()) compression_ratio = (summary_word_count / original_word_count) * 100 if original_word_count > 0 else 0 return jsonify({ 'success': True, 'summary': summary, 'statistics': { 'original_word_count': original_word_count, 'summary_word_count': summary_word_count, 'compression_ratio': round(compression_ratio, 1), 'reading_time_minutes': max(1, summary_word_count // 200) } }), 200 except Exception as e: logger.error(f"Summarization failed: {e}") return jsonify({'error': 'Summarization failed', 'message': str(e)}), 500 # ============================================================================= # FULL PIPELINE ENDPOINT # ============================================================================= @app.route('/api/process', methods=['POST']) def process_video(): """ Full multilingual pipeline: Transcript → Translation → Summary → Translation Request: { "url": "youtube_url", "summary_type": "general", "target_language": "hin" (optional) } """ try: data = request.get_json() if not data or 'url' not in data: return jsonify({ 'error': 'Missing YouTube URL', 'message': 'Please provide a valid YouTube URL' }), 400 url = data['url'] summary_type = data.get('summary_type', 'general') target_language = data.get('target_language', 'eng') chunk_size = data.get('chunk_size', 2500) max_tokens = data.get('max_tokens', 500) # Step 1: Extract video ID video_id = transcript_service.extract_video_id(url) logger.info(f"Processing video: {video_id}") # Step 2: Get transcript with language logger.info("Step 1/4: Extracting transcript...") transcript_result = transcript_service.get_video_transcript(url, use_whisper_fallback=True) original_transcript = transcript_result['transcript'] original_language = transcript_result['language'] original_word_count = transcript_result['word_count'] # Step 3: Translate to English if needed english_transcript = original_transcript if not is_english(original_language): logger.info("Step 2/4: Translating to English...") translation_service = get_translation_service() english_transcript = translation_service.translate_to_english( original_transcript, original_language ) else: logger.info("Step 2/4: Skipped (already English)") # Step 4: Summarize in English logger.info("Step 3/4: Generating summary...") summary = summarizer_service.summarize( text=english_transcript, summary_type=summary_type, chunk_size=chunk_size, max_tokens=max_tokens ) # Step 5: Translate summary to target language final_summary = summary summary_language = "eng" if not is_english(target_language): logger.info(f"Step 4/4: Translating summary to {target_language}...") translation_service = get_translation_service() final_summary = translation_service.translate_from_english(summary, target_language) summary_language = target_language else: logger.info("Step 4/4: Skipped (English output)") # Calculate statistics summary_word_count = len(final_summary.split()) compression_ratio = (summary_word_count / original_word_count) * 100 if original_word_count > 0 else 0 response = { 'success': True, 'video_id': video_id, 'original_language': original_language, 'original_language_name': get_language_name(original_language), 'transcript': original_transcript, 'transcript_source': transcript_result['source'], 'summary': final_summary, 'summary_language': summary_language, 'summary_language_name': get_language_name(summary_language), 'statistics': { 'original_word_count': original_word_count, 'summary_word_count': summary_word_count, 'compression_ratio': round(compression_ratio, 1), 'reading_time_minutes': max(1, summary_word_count // 200) } } if not is_english(original_language): response['english_transcript'] = english_transcript if not is_english(target_language): response['english_summary'] = summary logger.info("Processing complete!") return jsonify(response), 200 except ValueError as e: return jsonify({'error': 'Invalid URL', 'message': str(e)}), 400 except Exception as e: logger.error(f"Processing failed: {e}") return jsonify({'error': 'Processing failed', 'message': str(e)}), 500 # ============================================================================= # ERROR HANDLERS # ============================================================================= @app.errorhandler(404) def not_found(error): return jsonify({ 'error': 'Not found', 'message': 'The requested endpoint does not exist' }), 404 @app.errorhandler(500) def internal_error(error): return jsonify({ 'error': 'Internal server error', 'message': 'An unexpected error occurred' }), 500 # ============================================================================= # MAIN (for local testing only - gunicorn is used in production) # ============================================================================= if __name__ == '__main__': port = int(os.environ.get('PORT', 7860)) if not os.getenv('GROQ_API_KEY'): print("⚠️ Warning: GROQ_API_KEY not found") print("Set it in HF Spaces Settings → Secrets") print("🚀 Starting YouTube Summarizer API...") print(f"📡 API available at: http://localhost:{port}") app.run(debug=False, host='0.0.0.0', port=port)