|
|
""" |
|
|
YouTube Video Summarizer API - Hugging Face Spaces Edition |
|
|
|
|
|
Flask backend deployed on Hugging Face Spaces. |
|
|
Provides multilingual YouTube video summarization using: |
|
|
- Whisper (speech-to-text) |
|
|
- NLLB-200 (translation) |
|
|
- Groq API (summarization) |
|
|
|
|
|
Whisper and NLLB-200 are FREE models that run locally on HF Spaces infrastructure; summarization is delegated to the Groq API (requires GROQ_API_KEY). |
|
|
""" |
|
|
|
|
|
from flask import Flask, request, jsonify |
|
|
from flask_cors import CORS |
|
|
from dotenv import load_dotenv |
|
|
import os |
|
|
import logging |
|
|
|
|
|
from services.transcript import TranscriptService |
|
|
from services.summarizer import SummarizerService |
|
|
from config import ( |
|
|
SUPPORTED_LANGUAGES, |
|
|
get_language_name, |
|
|
is_english, |
|
|
) |
|
|
|
|
|
|
|
|
# Read environment variables from a .env file before anything consults them.
load_dotenv()

# Module-wide logging at INFO level; all handlers below log through `logger`.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = Flask(__name__)

# CORS policy applied to every route: any origin, GET/POST/OPTIONS only,
# and only the Content-Type / Authorization request headers.
CORS(
    app,
    resources={
        r"/*": {
            "origins": "*",
            "methods": ["GET", "POST", "OPTIONS"],
            "allow_headers": ["Content-Type", "Authorization"],
        },
    },
)

# Services constructed once at import time and shared by all requests.
transcript_service = TranscriptService()
summarizer_service = SummarizerService()

# Filled in on first use by get_translation_service(); None until then.
_translation_service = None
|
|
|
|
|
def get_translation_service():
    """Return the shared TranslationService, creating it on the first call.

    The ``services.translation`` import is deferred until the first call, so
    importing this module does not import the translation module.
    """
    global _translation_service

    # Fast path: an instance was already created by an earlier call.
    if _translation_service is not None:
        return _translation_service

    from services.translation import TranslationService

    _translation_service = TranslationService()
    return _translation_service
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.route('/', methods=['GET'])
def root():
    """Answer GET / with a small static JSON status payload (HTTP 200).

    The docstring of the original notes this doubles as the HF Spaces
    health check.
    """
    status_payload = {
        'status': 'healthy',
        'service': 'YouTube Summarizer API',
        'version': '2.0.0',
        'docs': '/api/health for detailed status',
    }
    return jsonify(status_payload), 200
|
|
|
|
|
|
|
|
@app.route('/api/health', methods=['GET'])
def health_check():
    """Answer GET /api/health with a static description of the service.

    The payload is hard-coded: it lists the advertised features and model
    identifiers but does not probe whether any model is actually loaded.
    """
    model_names = {
        'whisper': 'openai/whisper-small',
        'translation': 'facebook/nllb-200-distilled-600M',
        'summarization': 'groq/llama-3.1-8b-instant',
    }
    health_payload = {
        'status': 'healthy',
        'message': 'YouTube Summarizer API is running on Hugging Face Spaces',
        'version': '2.0.0',
        'features': ['multilingual', 'whisper', 'translation'],
        'models': model_names,
    }
    return jsonify(health_payload), 200
|
|
|
|
|
|
|
|
@app.route('/api/languages', methods=['GET'])
def get_languages():
    """Answer GET /api/languages with the SUPPORTED_LANGUAGES config value."""
    languages_payload = {
        'success': True,
        'languages': SUPPORTED_LANGUAGES,
    }
    return jsonify(languages_payload), 200
|
|
|
|
|
|
|
|
@app.route('/api/warmup', methods=['POST'])
def warmup_models():
    """
    Pre-load ML models to avoid delay on first request.

    This can take 2-5 minutes on first run (downloading models).

    Request (JSON body, optional): { "translation": true, "whisper": true }
        Each flag defaults to false; a missing, empty, malformed or
        non-object body is treated as "no models requested".

    Response:
        200 { "success": true, "message": "...", "models": {...} } where
            "models" maps each warmed-up model to 'loaded'.
        500 { "error": "Warmup failed", "message": "..." } if loading raises.
    """
    # silent=True: a body-less POST (or one without a JSON content type) must
    # not raise here. Otherwise the HTTP error raised by get_json() would be
    # swallowed by the blanket handler below and reported as a 500.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    try:
        results = {}

        if data.get('translation', False):
            logger.info("Warming up translation model...")
            translation_service = get_translation_service()
            translation_service.warmup()
            results['translation'] = 'loaded'

        if data.get('whisper', False):
            logger.info("Warming up Whisper model...")
            # Imported lazily so the module is only imported when requested.
            from services.speech_to_text import SpeechToTextService
            stt = SpeechToTextService()
            stt.warmup()
            results['whisper'] = 'loaded'

        return jsonify({
            'success': True,
            'message': 'Models warmed up successfully',
            'models': results
        }), 200

    except Exception as e:
        # Top-level boundary: log with traceback and report a generic 500.
        logger.exception("Warmup failed: %s", e)
        return jsonify({
            'error': 'Warmup failed',
            'message': str(e)
        }), 500
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.route('/api/transcribe-audio', methods=['POST'])
def transcribe_audio():
    """
    Transcribe audio using Whisper.

    Receives audio as base64 from Railway backend.

    Request: { "audio_base64": "<base64-encoded audio>" }
    Response:
        200 { "success": true, "transcript": "...", "language": "...",
              "word_count": N }
        400 if the body is not a JSON object, lacks "audio_base64", or the
            value is not decodable / decodes to nothing.
        500 { "error": "Transcription failed", "message": "..." } otherwise.
    """
    import base64
    import tempfile

    # silent=True: malformed / non-JSON bodies yield None instead of raising
    # an HTTP error that the blanket handler below would turn into a 500.
    data = request.get_json(silent=True)
    if not isinstance(data, dict) or 'audio_base64' not in data:
        return jsonify({
            'error': 'Missing audio',
            'message': 'Please provide audio_base64'
        }), 400

    # Bad base64 is a client error, not a server failure.
    # binascii.Error subclasses ValueError; TypeError covers non-string input.
    try:
        audio_data = base64.b64decode(data['audio_base64'])
    except (ValueError, TypeError) as e:
        return jsonify({
            'error': 'Invalid audio',
            'message': f'audio_base64 could not be decoded: {e}'
        }), 400

    if not audio_data:
        return jsonify({
            'error': 'Invalid audio',
            'message': 'audio_base64 decoded to empty audio'
        }), 400

    audio_path = None
    try:
        # delete=False: the file is handed to the transcriber by path, so it
        # must outlive the `with`; it is removed in the `finally` below.
        with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as f:
            # Record the path before writing so a failed write is cleaned up.
            audio_path = f.name
            f.write(audio_data)

        from services.speech_to_text import SpeechToTextService
        stt = SpeechToTextService()
        result = stt.transcribe_audio(audio_path)

        return jsonify({
            'success': True,
            'transcript': result['text'],
            'language': result['language'],
            'word_count': len(result['text'].split())
        }), 200

    except Exception as e:
        # Top-level boundary: log with traceback and report a generic 500.
        logger.exception("Audio transcription failed: %s", e)
        return jsonify({
            'error': 'Transcription failed',
            'message': str(e)
        }), 500

    finally:
        # Best-effort cleanup: never let a failed delete replace the response.
        if audio_path is not None and os.path.exists(audio_path):
            try:
                os.remove(audio_path)
            except OSError as cleanup_error:
                logger.warning(
                    "Could not remove temp file %s: %s",
                    audio_path, cleanup_error
                )
|
|
|
|
|
|
|
|
@app.route('/api/process-audio', methods=['POST'])
def process_audio():
    """
    Full pipeline for audio: Whisper transcription → Translation → Summary.

    Receives audio as base64 from Railway backend.

    Request: {
        "audio_base64": "<base64-encoded audio>",   (required)
        "video_id": "...",                          (default "unknown")
        "summary_type": "general",                  (default "general")
        "target_language": "eng"                    (default "eng")
    }
    Response:
        200 with transcript, summary, language info and word statistics.
            "english_transcript" is added when the audio is not English,
            "english_summary" when the target language is not English.
        400 if the body is not a JSON object, lacks "audio_base64", or the
            value is not decodable / decodes to nothing.
        500 { "error": "Processing failed", "message": "..." } otherwise.
    """
    import base64
    import tempfile

    # silent=True: malformed / non-JSON bodies yield None instead of raising
    # an HTTP error that the blanket handler below would turn into a 500.
    data = request.get_json(silent=True)
    if not isinstance(data, dict) or 'audio_base64' not in data:
        return jsonify({
            'error': 'Missing audio',
            'message': 'Please provide audio_base64'
        }), 400

    video_id = data.get('video_id', 'unknown')
    summary_type = data.get('summary_type', 'general')
    target_language = data.get('target_language', 'eng')

    # Bad base64 is a client error, not a server failure.
    # binascii.Error subclasses ValueError; TypeError covers non-string input.
    try:
        audio_data = base64.b64decode(data['audio_base64'])
    except (ValueError, TypeError) as e:
        return jsonify({
            'error': 'Invalid audio',
            'message': f'audio_base64 could not be decoded: {e}'
        }), 400

    if not audio_data:
        return jsonify({
            'error': 'Invalid audio',
            'message': 'audio_base64 decoded to empty audio'
        }), 400

    audio_path = None
    try:
        # delete=False: the file is handed to the transcriber by path, so it
        # must outlive the `with`; it is removed in the `finally` below.
        with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as f:
            # Record the path before writing so a failed write is cleaned up.
            audio_path = f.name
            f.write(audio_data)

        # Step 1: speech-to-text.
        logger.info("Transcribing audio with Whisper...")
        from services.speech_to_text import SpeechToTextService
        stt = SpeechToTextService()
        whisper_result = stt.transcribe_audio(audio_path)

        transcript = whisper_result['text']
        original_language = whisper_result['language']
        original_word_count = len(transcript.split())

        logger.info(f"Transcription complete. Language: {original_language}")

        # Step 2: the summarizer is always fed English text.
        english_transcript = transcript
        if not is_english(original_language):
            logger.info("Translating to English...")
            translation_service = get_translation_service()
            english_transcript = translation_service.translate_to_english(
                transcript,
                original_language
            )

        # Step 3: summarize the English text.
        logger.info("Generating summary...")
        summary = summarizer_service.summarize(
            text=english_transcript,
            summary_type=summary_type,
            chunk_size=2500,
            max_tokens=500
        )

        # Step 4: translate the summary if a non-English output was requested.
        final_summary = summary
        summary_language = "eng"
        if not is_english(target_language):
            logger.info(f"Translating summary to {target_language}...")
            translation_service = get_translation_service()
            final_summary = translation_service.translate_from_english(
                summary, target_language
            )
            summary_language = target_language

        summary_word_count = len(final_summary.split())
        # Summary size as a percentage of the original; 0 guards an empty
        # transcript against division by zero.
        compression_ratio = (
            (summary_word_count / original_word_count) * 100
            if original_word_count > 0 else 0
        )

        response = {
            'success': True,
            'video_id': video_id,
            'original_language': original_language,
            'original_language_name': get_language_name(original_language),
            'transcript': transcript,
            'transcript_source': 'whisper',
            'summary': final_summary,
            'summary_language': summary_language,
            'summary_language_name': get_language_name(summary_language),
            'statistics': {
                'original_word_count': original_word_count,
                'summary_word_count': summary_word_count,
                'compression_ratio': round(compression_ratio, 1),
                # 200 words per minute, reported as at least one minute.
                'reading_time_minutes': max(1, summary_word_count // 200)
            }
        }

        # Expose the English intermediates only when a translation happened.
        if not is_english(original_language):
            response['english_transcript'] = english_transcript
        if not is_english(target_language):
            response['english_summary'] = summary

        logger.info("Audio processing complete!")
        return jsonify(response), 200

    except Exception as e:
        # Top-level boundary: log with traceback and report a generic 500.
        logger.exception("Audio processing failed: %s", e)
        return jsonify({
            'error': 'Processing failed',
            'message': str(e)
        }), 500

    finally:
        # Best-effort cleanup: never let a failed delete replace the response.
        if audio_path is not None and os.path.exists(audio_path):
            try:
                os.remove(audio_path)
            except OSError as cleanup_error:
                logger.warning(
                    "Could not remove temp file %s: %s",
                    audio_path, cleanup_error
                )
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.route('/api/transcript', methods=['POST'])
def get_transcript():
    """
    Extract transcript from YouTube video (multilingual).

    Request: { "url": "youtube_url", "use_whisper": true }
        "use_whisper" defaults to true and is forwarded as the
        use_whisper_fallback argument of the transcript service.
    Response: { "success": true, "transcript": "...", "language": "tam", ... }
        400 if the body is not a JSON object, lacks "url", or the transcript
            service raises ValueError.
        500 { "error": "Transcript extraction failed", ... } otherwise.
    """
    # silent=True: malformed / non-JSON bodies yield None instead of raising
    # an HTTP error that the blanket handler below would turn into a 500.
    data = request.get_json(silent=True)
    if not isinstance(data, dict) or 'url' not in data:
        return jsonify({
            'error': 'Missing YouTube URL',
            'message': 'Please provide a valid YouTube URL'
        }), 400

    try:
        url = data['url']
        use_whisper = data.get('use_whisper', True)

        video_id = transcript_service.extract_video_id(url)
        result = transcript_service.get_video_transcript(
            url, use_whisper_fallback=use_whisper
        )

        return jsonify({
            'success': True,
            'video_id': video_id,
            'transcript': result['transcript'],
            'language': result['language'],
            'language_name': get_language_name(result['language']),
            'source': result['source'],
            'word_count': result['word_count']
        }), 200

    except ValueError as e:
        # ValueError from the service calls is reported as a bad URL.
        return jsonify({'error': 'Invalid URL', 'message': str(e)}), 400
    except Exception as e:
        # Top-level boundary: log with traceback and report a generic 500.
        logger.exception("Transcript extraction failed: %s", e)
        return jsonify({
            'error': 'Transcript extraction failed',
            'message': str(e)
        }), 500
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.route('/api/translate', methods=['POST'])
def translate_text():
    """
    Translate text between languages.

    Request: { "text": "Hello", "source_lang": "eng", "target_lang": "hin" }
        "source_lang" defaults to "eng" and "target_lang" to "hin".
    Response: { "success": true, "translated_text": "नमस्ते", ... }
        400 if the body is not a JSON object, lacks "text", or the
            translation service raises ValueError.
        500 { "error": "Translation failed", ... } otherwise.
    """
    # silent=True: malformed / non-JSON bodies yield None instead of raising
    # an HTTP error that the blanket handler below would turn into a 500.
    data = request.get_json(silent=True)
    if not isinstance(data, dict) or 'text' not in data:
        return jsonify({
            'error': 'Missing text',
            'message': 'Please provide text to translate'
        }), 400

    try:
        text = data['text']
        source_lang = data.get('source_lang', 'eng')
        target_lang = data.get('target_lang', 'hin')

        translation_service = get_translation_service()
        translated = translation_service.translate(
            text, source_lang, target_lang
        )

        return jsonify({
            'success': True,
            'translated_text': translated,
            'source_lang': source_lang,
            'source_lang_name': get_language_name(source_lang),
            'target_lang': target_lang,
            'target_lang_name': get_language_name(target_lang)
        }), 200

    except ValueError as e:
        # ValueError from the service is reported as a bad language code.
        return jsonify({'error': 'Invalid language', 'message': str(e)}), 400
    except Exception as e:
        # Top-level boundary: log with traceback and report a generic 500.
        logger.exception("Translation failed: %s", e)
        return jsonify({
            'error': 'Translation failed',
            'message': str(e)
        }), 500
|
|
|
|
|
|
|
|
@app.route('/api/detect-language', methods=['POST'])
def detect_language():
    """
    Detect the language of given text.

    Request: { "text": "..." }
    Response:
        200 { "success": true, "language": <code>, "language_name": <name> }
            taken from the 'code' and 'name' keys of the service result.
        400 if the body is not a JSON object or lacks "text".
        500 { "error": "Language detection failed", ... } otherwise.
    """
    # silent=True: malformed / non-JSON bodies yield None instead of raising
    # an HTTP error that the blanket handler below would turn into a 500.
    data = request.get_json(silent=True)
    if not isinstance(data, dict) or 'text' not in data:
        return jsonify({
            'error': 'Missing text',
            'message': 'Please provide text for language detection'
        }), 400

    try:
        translation_service = get_translation_service()
        result = translation_service.detect_language(data['text'])

        return jsonify({
            'success': True,
            'language': result['code'],
            'language_name': result['name']
        }), 200

    except Exception as e:
        # Top-level boundary: log with traceback and report a generic 500.
        logger.exception("Language detection failed: %s", e)
        return jsonify({
            'error': 'Language detection failed',
            'message': str(e)
        }), 500
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.route('/api/summarize', methods=['POST'])
def summarize():
    """
    Generate summary from transcript.

    Request: { "transcript": "...", "summary_type": "general" }
        Optional: "chunk_size" (default 2500) and "max_tokens" (default 500),
        both forwarded unchanged to the summarizer service.
        "summary_type" must be one of: general, detailed, bullet_points,
        key_takeaways.
    Response: { "success": true, "summary": "...", "statistics": {...} }
        400 if the body is not a JSON object, lacks "transcript", the
            transcript is not a string, or "summary_type" is not recognised.
        500 { "error": "Summarization failed", ... } otherwise.
    """
    # silent=True: malformed / non-JSON bodies yield None instead of raising
    # an HTTP error that the blanket handler below would turn into a 500.
    data = request.get_json(silent=True)
    if not isinstance(data, dict) or 'transcript' not in data:
        return jsonify({
            'error': 'Missing transcript',
            'message': 'Please provide transcript text'
        }), 400

    transcript = data['transcript']
    # The word statistics below call .split(); a non-string transcript is a
    # client error rather than a server failure.
    if not isinstance(transcript, str):
        return jsonify({
            'error': 'Invalid transcript',
            'message': 'transcript must be a string'
        }), 400

    summary_type = data.get('summary_type', 'general')
    chunk_size = data.get('chunk_size', 2500)
    max_tokens = data.get('max_tokens', 500)

    valid_types = ['general', 'detailed', 'bullet_points', 'key_takeaways']
    if summary_type not in valid_types:
        return jsonify({
            'error': 'Invalid summary type',
            'message': f'Must be one of: {", ".join(valid_types)}'
        }), 400

    try:
        summary = summarizer_service.summarize(
            text=transcript,
            summary_type=summary_type,
            chunk_size=chunk_size,
            max_tokens=max_tokens
        )

        summary_word_count = len(summary.split())
        original_word_count = len(transcript.split())
        # Summary size as a percentage of the original; 0 guards an empty
        # transcript against division by zero.
        compression_ratio = (
            (summary_word_count / original_word_count) * 100
            if original_word_count > 0 else 0
        )

        return jsonify({
            'success': True,
            'summary': summary,
            'statistics': {
                'original_word_count': original_word_count,
                'summary_word_count': summary_word_count,
                'compression_ratio': round(compression_ratio, 1),
                # 200 words per minute, reported as at least one minute.
                'reading_time_minutes': max(1, summary_word_count // 200)
            }
        }), 200

    except Exception as e:
        # Top-level boundary: log with traceback and report a generic 500.
        logger.exception("Summarization failed: %s", e)
        return jsonify({
            'error': 'Summarization failed',
            'message': str(e)
        }), 500
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.route('/api/process', methods=['POST'])
def process_video():
    """
    Full multilingual pipeline: Transcript → Translation → Summary → Translation

    Request: {
        "url": "youtube_url",
        "summary_type": "general",
        "target_language": "hin" (optional)
    }
        Optional: "chunk_size" (default 2500) and "max_tokens" (default 500),
        both forwarded unchanged to the summarizer service.
        "summary_type" defaults to "general", "target_language" to "eng".
    Response:
        200 with transcript, summary, language info and word statistics.
            "english_transcript" is added when the video is not English,
            "english_summary" when the target language is not English.
        400 if the body is not a JSON object, lacks "url", or a ValueError
            is raised while processing.
        500 { "error": "Processing failed", ... } otherwise.
    """
    # silent=True: malformed / non-JSON bodies yield None instead of raising
    # an HTTP error that the blanket handler below would turn into a 500.
    data = request.get_json(silent=True)
    if not isinstance(data, dict) or 'url' not in data:
        return jsonify({
            'error': 'Missing YouTube URL',
            'message': 'Please provide a valid YouTube URL'
        }), 400

    try:
        url = data['url']
        summary_type = data.get('summary_type', 'general')
        target_language = data.get('target_language', 'eng')
        chunk_size = data.get('chunk_size', 2500)
        max_tokens = data.get('max_tokens', 500)

        video_id = transcript_service.extract_video_id(url)
        logger.info(f"Processing video: {video_id}")

        # Step 1: obtain the transcript, with the Whisper fallback enabled.
        logger.info("Step 1/4: Extracting transcript...")
        transcript_result = transcript_service.get_video_transcript(
            url, use_whisper_fallback=True
        )

        original_transcript = transcript_result['transcript']
        original_language = transcript_result['language']
        original_word_count = transcript_result['word_count']

        # Step 2: the summarizer is always fed English text.
        english_transcript = original_transcript
        if not is_english(original_language):
            logger.info("Step 2/4: Translating to English...")
            translation_service = get_translation_service()
            english_transcript = translation_service.translate_to_english(
                original_transcript,
                original_language
            )
        else:
            logger.info("Step 2/4: Skipped (already English)")

        # Step 3: summarize the English text.
        logger.info("Step 3/4: Generating summary...")
        summary = summarizer_service.summarize(
            text=english_transcript,
            summary_type=summary_type,
            chunk_size=chunk_size,
            max_tokens=max_tokens
        )

        # Step 4: translate the summary if a non-English output was requested.
        final_summary = summary
        summary_language = "eng"
        if not is_english(target_language):
            logger.info(f"Step 4/4: Translating summary to {target_language}...")
            translation_service = get_translation_service()
            final_summary = translation_service.translate_from_english(
                summary, target_language
            )
            summary_language = target_language
        else:
            logger.info("Step 4/4: Skipped (English output)")

        summary_word_count = len(final_summary.split())
        # Summary size as a percentage of the original; 0 guards an empty
        # transcript against division by zero.
        compression_ratio = (
            (summary_word_count / original_word_count) * 100
            if original_word_count > 0 else 0
        )

        response = {
            'success': True,
            'video_id': video_id,
            'original_language': original_language,
            'original_language_name': get_language_name(original_language),
            'transcript': original_transcript,
            'transcript_source': transcript_result['source'],
            'summary': final_summary,
            'summary_language': summary_language,
            'summary_language_name': get_language_name(summary_language),
            'statistics': {
                'original_word_count': original_word_count,
                'summary_word_count': summary_word_count,
                'compression_ratio': round(compression_ratio, 1),
                # 200 words per minute, reported as at least one minute.
                'reading_time_minutes': max(1, summary_word_count // 200)
            }
        }

        # Expose the English intermediates only when a translation happened.
        if not is_english(original_language):
            response['english_transcript'] = english_transcript
        if not is_english(target_language):
            response['english_summary'] = summary

        logger.info("Processing complete!")
        return jsonify(response), 200

    except ValueError as e:
        # Any ValueError raised during processing is reported as a bad URL.
        return jsonify({'error': 'Invalid URL', 'message': str(e)}), 400
    except Exception as e:
        # Top-level boundary: log with traceback and report a generic 500.
        logger.exception("Processing failed: %s", e)
        return jsonify({
            'error': 'Processing failed',
            'message': str(e)
        }), 500
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.errorhandler(404)
def not_found(error):
    """Render 404 errors as a JSON body; the ``error`` argument is unused."""
    body = {
        'error': 'Not found',
        'message': 'The requested endpoint does not exist',
    }
    return jsonify(body), 404
|
|
|
|
|
|
|
|
@app.errorhandler(500)
def internal_error(error):
    """Render 500 errors as a generic JSON body; ``error`` is not exposed."""
    body = {
        'error': 'Internal server error',
        'message': 'An unexpected error occurred',
    }
    return jsonify(body), 500
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == '__main__':
    # Script entry point: pick the port from $PORT (default 7860), warn when
    # the Groq key is absent, then serve on all interfaces without debug mode.
    port = int(os.environ.get('PORT', 7860))

    groq_key = os.getenv('GROQ_API_KEY')
    if not groq_key:
        for warning_line in (
            "⚠️ Warning: GROQ_API_KEY not found",
            "Set it in HF Spaces Settings → Secrets",
        ):
            print(warning_line)

    print("🚀 Starting YouTube Summarizer API...")
    print(f"📡 API available at: http://localhost:{port}")

    app.run(debug=False, host='0.0.0.0', port=port)
|
|
|