bskrishna2006
Add youtube_transcript_api for better cloud compatibility
b4562f5
"""
Transcript Service for YouTube Videos
This service extracts transcripts from YouTube videos using multiple methods:
1. First, try youtube_transcript_api (works well on cloud platforms)
2. Then try yt-dlp subtitle extraction
3. If no subtitles available, fallback to audio extraction + Whisper transcription
The fallback uses the SpeechToTextService for local Whisper transcription.
"""
import re
import os
import tempfile
import logging
from typing import Optional, Tuple, List
# Try to import youtube_transcript_api (more reliable for cloud deployments)
try:
from youtube_transcript_api import YouTubeTranscriptApi
from youtube_transcript_api._errors import TranscriptsDisabled, NoTranscriptFound
HAS_YOUTUBE_TRANSCRIPT_API = True
except ImportError:
HAS_YOUTUBE_TRANSCRIPT_API = False
import yt_dlp
# Configure logging.
# NOTE: basicConfig() runs at import time and configures the *root* logger at
# INFO level; per the stdlib it does nothing if the root logger already has
# handlers, so an application that configures logging first keeps its setup.
logging.basicConfig(level=logging.INFO)
# Module-level logger named after this module; used by TranscriptService.
logger = logging.getLogger(__name__)
class TranscriptService:
    """
    Service for extracting transcripts from YouTube videos.

    Three strategies are tried, cheapest first:
    1. youtube_transcript_api (published captions, no media download)
    2. yt-dlp subtitle extraction (VTT download, no ML models)
    3. Audio transcription via Whisper (slower, requires SpeechToTextService)
    """

    # Transcripts shorter than this many characters are treated as unusable.
    _MIN_TRANSCRIPT_CHARS = 50

    # Caption languages tried, in priority order, by get_transcript_api().
    _PREFERRED_LANGS = (
        "en", "en-IN", "hi", "ta", "te", "kn", "ml", "gu", "bn", "mr", "pa", "ur",
    )

    # Base language tag -> three-letter code returned in the "language" field.
    # Regional/variant tags ("en-IN", "ur-PK", "en-orig") are reduced to their
    # base tag before lookup, so one table serves both extraction paths.
    _LANG_MAP = {
        "en": "eng",
        "hi": "hin",
        "ta": "tam",
        "te": "tel",
        "kn": "kan",
        "ml": "mal",
        "gu": "guj",
        "bn": "ben",
        "mr": "mar",
        "pa": "pan",
        "ur": "urd",
    }

    # An 11-character video ID that is not the prefix of a longer token.
    _VIDEO_ID = r"([0-9A-Za-z_-]{11})(?![0-9A-Za-z_-])"

    # Recognised URL shapes, checked in order.
    _VIDEO_ID_PATTERNS = (
        re.compile(r"(?:^|[?&])v=" + _VIDEO_ID),                       # watch?v=ID
        re.compile(r"youtu\.be/" + _VIDEO_ID),                         # youtu.be/ID
        re.compile(r"/(?:embed|shorts|live|watch|v|e)/" + _VIDEO_ID),  # /embed/ID ...
    )

    def __init__(self):
        """Initialize the transcript service."""
        self._speech_to_text = None  # Lazy-loaded

    def _get_speech_to_text_service(self):
        """Lazy-load the SpeechToTextService to avoid loading Whisper unless needed."""
        if self._speech_to_text is None:
            # Imported here so the Whisper stack is only pulled in on demand.
            from services.speech_to_text import SpeechToTextService
            self._speech_to_text = SpeechToTextService()
        return self._speech_to_text

    def extract_video_id(self, url: str) -> str:
        """
        Extract video ID from YouTube URL.

        Only explicit video locations are accepted (``v=`` query parameter,
        ``youtu.be/<id>``, and ``/embed|shorts|live|watch|v|e/<id>`` paths), so
        an arbitrary 11-character path or host fragment is never mistaken for
        an ID.

        Args:
            url: YouTube URL in various formats

        Returns:
            11-character video ID

        Raises:
            ValueError: If URL is invalid (including empty or non-string input)
        """
        if isinstance(url, str):
            candidate = url.strip()
            for pattern in self._VIDEO_ID_PATTERNS:
                found = pattern.search(candidate)
                if found:
                    return found.group(1)
        raise ValueError("Invalid YouTube URL")

    def clean_autogen_transcript(self, text: str) -> str:
        """
        Clean auto-generated YouTube captions.

        Removes:
        - <c>...</c> tags, including class-qualified ones such as <c.colorE5E5E5>
        - Inline timestamps like <00:00:06.480> (or <00:06.480>)
        - Runs of whitespace

        Args:
            text: Raw caption text

        Returns:
            Cleaned transcript text ("" for empty input)
        """
        if not text:
            return ""
        # Remove <c>, </c> and <c.someclass> tags (but not e.g. <cite>).
        text = re.sub(r"</?c(?:\.[^>]*)?>", "", text)
        # Remove inline cue timestamps.
        text = re.sub(r"<(?:\d{2,}:)?\d{2}:\d{2}\.\d{3}>", "", text)
        # Collapse whitespace.
        return re.sub(r"\s+", " ", text).strip()

    @classmethod
    def _normalize_language(cls, code):
        """Map a caption language tag to its three-letter code, or return it unchanged."""
        if not code:
            return code
        base = re.split(r"[-_]", code, maxsplit=1)[0].lower()
        return cls._LANG_MAP.get(base, code)

    @staticmethod
    def _build_result(text: str, language: str, source: str) -> dict:
        """Assemble the result dictionary shared by the caption-based methods."""
        return {
            "transcript": text,
            "language": language,
            "source": source,
            "word_count": len(text.split()),
        }

    @staticmethod
    def _list_transcripts(video_id: str):
        """Return the transcript listing for *video_id* across library versions."""
        legacy = getattr(YouTubeTranscriptApi, "list_transcripts", None)
        if legacy is not None:
            return legacy(video_id)
        # Newer youtube_transcript_api releases dropped the static helper in
        # favour of an instance method.
        return YouTubeTranscriptApi().list(video_id)

    @classmethod
    def _pick_transcript(cls, transcript_list):
        """Pick a manual transcript, else a generated one, else any available one."""
        preferred = list(cls._PREFERRED_LANGS)
        finders = (
            transcript_list.find_manually_created_transcript,
            transcript_list.find_generated_transcript,
        )
        for finder in finders:
            try:
                # The library walks the language list in priority order itself.
                return finder(preferred)
            except NoTranscriptFound:
                continue
        return next(iter(transcript_list), None)

    @staticmethod
    def _entry_text(entry) -> str:
        """Return the text of one transcript entry (dict or snippet object)."""
        if isinstance(entry, dict):
            return entry.get("text") or ""
        return getattr(entry, "text", "") or ""

    def get_transcript_api(self, video_id: str) -> Optional[dict]:
        """
        Get transcript using youtube_transcript_api (works better on cloud platforms).

        Args:
            video_id: YouTube video ID

        Returns:
            Dictionary with transcript, language, source and word_count,
            or None if not available (library missing, captions disabled,
            nothing found, transcript too short, or any library error)
        """
        if not HAS_YOUTUBE_TRANSCRIPT_API:
            logger.info("youtube_transcript_api not installed, skipping...")
            return None

        try:
            transcript = self._pick_transcript(self._list_transcripts(video_id))
            if transcript is None:
                return None

            # Fetch the actual transcript and combine all text.
            entries = transcript.fetch()
            full_text = " ".join(self._entry_text(entry) for entry in entries)
            clean_text = self.clean_autogen_transcript(full_text)
            if len(clean_text) < self._MIN_TRANSCRIPT_CHARS:
                logger.info("Transcript too short")
                return None

            normalized_lang = self._normalize_language(transcript.language_code)
            logger.info("Transcript fetched via API (language: %s)", normalized_lang)
            return self._build_result(clean_text, normalized_lang, "youtube_api")
        except TranscriptsDisabled:
            logger.info("Transcripts are disabled for this video")
            return None
        except NoTranscriptFound:
            logger.info("No transcript found for this video")
            return None
        except Exception as e:
            # Best-effort: any other failure just means "try the next method".
            logger.warning("youtube_transcript_api failed: %s", e)
            return None

    @staticmethod
    def _find_subtitle_file(directory: str, video_id: str,
                            preferred_lang: str) -> Tuple[Optional[str], str]:
        """Locate a downloaded ``<video_id>[.<lang>].vtt`` file, preferring *preferred_lang*."""
        candidates = []
        # Sorted so the choice does not depend on directory listing order.
        for name in sorted(os.listdir(directory)):
            if name.startswith(video_id) and name.endswith(".vtt"):
                parts = name.split(".")
                # Format: videoId.lang.vtt
                code = parts[-2] if len(parts) >= 3 else "eng"
                candidates.append((name, code))
        if not candidates:
            return None, "eng"

        chosen = candidates[0]
        if preferred_lang:
            for name, code in candidates:
                if code == preferred_lang or code.startswith(preferred_lang + "-"):
                    chosen = (name, code)
                    break
        return os.path.join(directory, chosen[0]), chosen[1]

    def _vtt_lines_to_text(self, lines) -> str:
        """Reduce the lines of a WebVTT file to its caption text."""
        kept = []
        previous = None
        seen_cue = False
        in_header = False
        for raw in lines:
            line = raw.strip().lstrip("\ufeff")
            if not line:
                # A blank line terminates the header block.
                in_header = False
                continue
            if line.startswith("WEBVTT"):
                in_header = not seen_cue
                continue
            if "-->" in line:
                seen_cue = True
                in_header = False
                continue
            # Skip header metadata ("Kind:", "Language:") and numeric cue ids.
            if in_header or re.fullmatch(r"\d+", line):
                continue
            cleaned = self.clean_autogen_transcript(line)
            # Auto-generated captions repeat the previous line in each rolling
            # cue; keep only the first occurrence of consecutive duplicates.
            if not cleaned or cleaned == previous:
                continue
            kept.append(cleaned)
            previous = cleaned
        return " ".join(kept)

    def get_subtitles(self, url: str, lang: str = "en") -> Optional[dict]:
        """
        Try to get existing subtitles from YouTube using yt-dlp.

        Args:
            url: YouTube video URL
            lang: Preferred language code (default: "en"). It is not forwarded
                to yt-dlp; it only decides which file is used when several
                subtitle files were written.

        Returns:
            Dictionary with transcript, language, source and word_count,
            or None if no usable subtitles could be extracted
        """
        with tempfile.TemporaryDirectory() as temp_dir:
            ydl_opts = {
                "skip_download": True,
                "writesubtitles": True,
                "writeautomaticsub": True,
                "subtitlesformat": "vtt",
                "outtmpl": os.path.join(temp_dir, "%(id)s.%(ext)s"),
                "quiet": True,
                # A watch URL carrying "&list=" must resolve to the single video,
                # otherwise info["id"] would not match the subtitle file name.
                "noplaylist": True,
            }
            try:
                with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                    # One extraction both returns the metadata and writes the
                    # subtitle files (media itself is skipped).
                    info = ydl.extract_info(url, download=True)

                sub_file, detected_lang = self._find_subtitle_file(
                    temp_dir, info["id"], lang
                )
                if sub_file is None:
                    logger.info("No subtitle file found")
                    return None

                with open(sub_file, "r", encoding="utf-8") as f:
                    raw_text = self._vtt_lines_to_text(f)

                clean_text = self.clean_autogen_transcript(raw_text)
                if len(clean_text) < self._MIN_TRANSCRIPT_CHARS:
                    logger.info("Extracted subtitles too short")
                    return None

                normalized_lang = self._normalize_language(detected_lang)
                logger.info(
                    "Subtitles extracted successfully (language: %s)", normalized_lang
                )
                return self._build_result(clean_text, normalized_lang, "subtitles")
            except Exception as e:
                # Best-effort: any failure just means "try the next method".
                logger.warning("Subtitle extraction failed: %s", e)
                return None

    def get_video_transcript(self, url: str, use_whisper_fallback: bool = True) -> dict:
        """
        Get transcript from a YouTube video.

        Tries multiple methods in order:
        1. youtube_transcript_api (works best on cloud platforms)
        2. yt-dlp subtitle extraction
        3. Whisper transcription (fallback)

        Args:
            url: YouTube video URL
            use_whisper_fallback: Whether to use Whisper if no subtitles (default: True)

        Returns:
            Dictionary with:
            - transcript: The transcript text
            - language: Detected/extracted language code
            - source: "youtube_api", "subtitles", or "whisper"
            - word_count: Number of words

        Raises:
            ValueError: If the URL does not contain a video ID
            Exception: If transcript cannot be obtained
        """
        # Extract video ID for API-based methods
        video_id = self.extract_video_id(url)

        # Method 1: Try youtube_transcript_api first (best for cloud platforms)
        logger.info("Attempting to get transcript via YouTube API...")
        result = self.get_transcript_api(video_id)
        if result:
            return result

        # Method 2: Try yt-dlp subtitle extraction
        logger.info("Attempting to get subtitles via yt-dlp...")
        result = self.get_subtitles(url)
        if result:
            return result

        if not use_whisper_fallback:
            raise Exception("No subtitles available and Whisper fallback is disabled")

        # Method 3: Fallback to Whisper transcription
        logger.info("No subtitles found. Falling back to Whisper transcription...")
        try:
            stt_service = self._get_speech_to_text_service()
            whisper_result = stt_service.transcribe_youtube_video(url)
            return {
                "transcript": whisper_result["text"],
                "language": whisper_result["language"],
                "source": "whisper",
                "word_count": whisper_result["word_count"],
            }
        except Exception as e:
            logger.error("Whisper transcription failed: %s", e)
            # Chain the cause so the original traceback is preserved.
            raise Exception(f"Could not retrieve transcript: {str(e)}") from e

    def get_video_transcript_legacy(self, url: str, lang: str = "en") -> str:
        """
        Legacy method for backward compatibility.

        Returns only the transcript text (no language info). The ``lang``
        argument is accepted for signature compatibility and is not used.
        """
        result = self.get_video_transcript(url, use_whisper_fallback=True)
        return result["transcript"]