| """ |
| pipeline.py |
| Orchestrates fetch -> clean -> summarize in a single pipeline call. |
| Author: algorembrant |
| """ |
|
|
| from __future__ import annotations |
|
|
| import os |
| import sys |
| from dataclasses import dataclass, field |
| from typing import Optional |
|
|
| from fetcher import TranscriptResult, fetch, extract_video_id |
| from cleaner import clean |
| from summarizer import summarize |
| from config import DEFAULT_MODEL, QUALITY_MODEL |
|
|
|
|
| |
| |
| |
|
|
@dataclass
class PipelineResult:
    """Artifacts and errors produced by one pipeline run for a single video.

    String fields default to "" so callers can always read them; a stage
    that did not run (or failed) simply leaves its field empty.
    """

    video_id: str  # extracted YouTube ID ("" when extraction itself failed)
    raw: str = ""  # raw transcript, rendered per the requested transcript_format
    cleaned: str = ""  # paragraph-cleaned transcript (set only when cleaning ran)
    summary: str = ""  # AI summary (set only when summarizing ran)
    errors: list[str] = field(default_factory=list)  # one message per failed stage

    @property
    def success(self) -> bool:
        """True when no stage recorded an error."""
        return not self.errors
|
|
|
|
| |
| |
| |
|
|
def run(
    url_or_id: str,
    languages: list[str] | None = None,
    do_clean: bool = False,
    do_summarize: bool = False,
    summary_mode: str = "brief",
    model: str = DEFAULT_MODEL,
    quality: bool = False,
    stream: bool = True,
    output_dir: str | None = None,
    transcript_format: str = "text",
    timestamps: bool = False,
) -> PipelineResult:
    """
    Full pipeline for one video: fetch -> (clean) -> (summarize) -> (save).

    Args:
        url_or_id: YouTube URL or video ID.
        languages: Language preference list.
        do_clean: Run paragraph cleaner.
        do_summarize: Run summarizer.
        summary_mode: One of 'brief', 'detailed', 'bullets', 'outline'.
        model: Anthropic model identifier.
        quality: Use the higher-quality model instead of the default fast one.
        stream: Stream AI tokens to stderr.
        output_dir: Directory to write output files (optional).
        transcript_format: Raw transcript format: 'text', 'json', 'srt', 'vtt'.
        timestamps: Include timestamps in plain-text transcript.

    Returns:
        PipelineResult with all produced artifacts. Failures are recorded in
        `result.errors` instead of raised, so batch callers can keep going;
        check `result.success` before trusting the artifacts.
    """
    chosen_model = QUALITY_MODEL if quality else model
    result = PipelineResult(video_id="")

    try:
        video_id = extract_video_id(url_or_id)
        result.video_id = video_id
    except ValueError as exc:
        result.errors.append(str(exc))
        return result

    print(f"\n[fetch] {video_id}", file=sys.stderr)
    # Guard the fetch: a network hiccup or missing transcript should be
    # recorded on the result (like every other stage), not abort the whole
    # batch in run_batch().
    try:
        transcript: TranscriptResult = fetch(video_id, languages=languages)
    except Exception as exc:
        result.errors.append(f"Fetch error: {exc}")
        return result
    result.raw = transcript.formatted(transcript_format, timestamps=timestamps)
    plain_text = transcript.plain_text

    if do_clean:
        print("\n[clean] Running paragraph cleaner...", file=sys.stderr)
        try:
            result.cleaned = clean(plain_text, model=chosen_model, stream=stream)
        except Exception as exc:
            result.errors.append(f"Cleaner error: {exc}")

    if do_summarize:
        print(f"\n[summarize] Mode: {summary_mode}", file=sys.stderr)
        # Prefer the cleaned text when available; fall back to the raw
        # plain text so a cleaner failure doesn't block summarization.
        source_text = result.cleaned if result.cleaned else plain_text
        try:
            result.summary = summarize(
                source_text, mode=summary_mode, model=chosen_model, stream=stream
            )
        except Exception as exc:
            result.errors.append(f"Summarizer error: {exc}")

    if output_dir:
        _save(result, output_dir, transcript_format)

    return result
|
|
|
|
def _save(result: PipelineResult, output_dir: str, fmt: str) -> None:
    """Persist every non-empty artifact on *result* into *output_dir*.

    Creates the directory if needed; each written path is logged to stderr.
    """
    os.makedirs(output_dir, exist_ok=True)
    vid = result.video_id

    # Raw transcript keeps the extension of its requested format; unknown
    # formats fall back to plain .txt.
    ext = {"text": "txt", "json": "json", "srt": "srt", "vtt": "vtt"}.get(fmt, "txt")

    # (content, filename) pairs -- empty content means the stage never ran.
    artifacts = [
        (result.raw, f"{vid}_transcript.{ext}"),
        (result.cleaned, f"{vid}_cleaned.txt"),
        (result.summary, f"{vid}_summary.txt"),
    ]

    for content, filename in artifacts:
        if not content:
            continue
        path = os.path.join(output_dir, filename)
        _write(path, content)
        print(f"[saved] {path}", file=sys.stderr)
|
|
|
|
| def _write(path: str, content: str) -> None: |
| with open(path, "w", encoding="utf-8") as f: |
| f.write(content) |
|
|
|
|
| |
| |
| |
|
|
def run_batch(
    urls_or_ids: list[str],
    **kwargs,
) -> list[PipelineResult]:
    """
    Process several videos one after another through `run()`.

    All keyword arguments are forwarded unchanged to `run()`.

    Returns:
        One PipelineResult per input, in the same order.
    """
    banner = "=" * 60
    total = len(urls_or_ids)
    results: list[PipelineResult] = []
    for index, item in enumerate(urls_or_ids, 1):
        # Progress banner goes to stderr so stdout stays clean for artifacts.
        print(f"\n{banner}", file=sys.stderr)
        print(f"[{index}/{total}] Processing: {item}", file=sys.stderr)
        print(banner, file=sys.stderr)
        results.append(run(item, **kwargs))
    return results
|
|