# app/fetch/fetcher.py
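"""Fetch HTML pages and extract readable paragraphs.

Pipeline: fetch_html -> extract_main_text (trafilatura, then readability-lxml,
then plain BeautifulSoup text) -> _split_paragraphs.
"""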
from __future__ import annotations

import re
from typing import List, Optional
from urllib.parse import urlparse

import httpx
from bs4 import BeautifulSoup
from readability import Document
import trafilatura

# Browser-like User-Agent string sent with every request.
USER_AGENT = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
    "AppleWebKit/537.36 (KHTML, like Gecko) "
    "Chrome/120.0.0.0 Safari/537.36"
)

HEADERS = {
    "User-Agent": USER_AGENT,
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
    "Accept-Language": "en;q=0.9",
}

TIMEOUT = httpx.Timeout(10.0, connect=5.0)

# URL schemes and file extensions that are never worth fetching as HTML.
BLOCKED_SCHEMES = {"javascript", "data"}
BLOCKED_EXTS = {".pdf", ".jpg", ".jpeg", ".png", ".gif", ".webp", ".svg"}

def _looks_blocked(url: str) -> bool:
    """Return True for URLs we should not fetch (blocked scheme or binary-looking path)."""
    try:
        p = urlparse(url)
        if p.scheme in BLOCKED_SCHEMES:
            return True
        for ext in BLOCKED_EXTS:
            if p.path.lower().endswith(ext):
                return True
    except Exception:
        return True
    return False

async def fetch_html(url: str) -> Optional[str]:
    """Download a page and return its HTML, or None if no usable HTML can be fetched."""
    if _looks_blocked(url):
        return None
    try:
        async with httpx.AsyncClient(headers=HEADERS, timeout=TIMEOUT, follow_redirects=True) as client:
            resp = await client.get(url)
            resp.raise_for_status()
            ct = resp.headers.get("Content-Type", "")
            if "text/html" not in ct and "application/xhtml+xml" not in ct:
                return None
            return resp.text
    except httpx.HTTPError:
        # Network errors, timeouts and bad status codes all mean "no usable HTML".
        return None

def _clean_text(txt: str) -> str:
    txt = re.sub(r"\r\n|\r", "\n", txt)
    txt = re.sub(r"[ \t]+", " ", txt)
    txt = re.sub(r"\n{3,}", "\n\n", txt)
    return txt.strip()

def _bs4_text(html: str) -> str:
    soup = BeautifulSoup(html, "lxml")
    # Drop nav, footer, script, style
    for tag in soup(["script", "style", "noscript", "header", "footer", "nav", "aside"]):
        tag.decompose()
    return _clean_text(soup.get_text("\n"))

def extract_main_text(html: str, base_url: str | None = None) -> Optional[str]:
    """Extract the main article text, trying higher-quality extractors first."""
    # Try trafilatura first
    try:
        txt = trafilatura.extract(html, url=base_url, include_comments=False, include_tables=False)
        if txt and len(txt) >= 400:
            return _clean_text(txt)
    except Exception:
        pass
    # Fall back to readability-lxml
    try:
        doc = Document(html)
        summary_html = doc.summary() or ""
        txt = _bs4_text(summary_html)
        if txt and len(txt) >= 300:
            return _clean_text(txt)
    except Exception:
        pass
    # Last resort: whole page text
    try:
        txt = _bs4_text(html)
        if txt and len(txt) >= 200:
            return _clean_text(txt)
    except Exception:
        pass
    return None

def _split_paragraphs(text: str) -> List[str]:
    # Split on blank lines first
    parts = [p.strip() for p in re.split(r"\n\s*\n", text) if p.strip()]
    out: list[str] = []
    for p in parts:
        # Further split very long paragraphs by sentence groups
        if len(p) > 1200:
            chunks = re.split(r"(?<=[.!?])\s+(?=[A-Z0-9])", p)
            buf = []
            cur = ""
            for s in chunks:
                cur = (cur + " " + s).strip()
                if len(cur) >= 400:
                    buf.append(cur)
                    cur = ""
            if cur:
                buf.append(cur)
            out.extend(buf)
        else:
            out.append(p)
    # Filter short or junky lines
    out = [p for p in out if len(p) >= 160]
    # Deduplicate
    seen: set[str] = set()
    deduped: list[str] = []
    for p in out:
        key = re.sub(r"\W+", " ", p).strip().casefold()
        if key not in seen:
            seen.add(key)
            deduped.append(p)
    return deduped[:12]  # cap

async def get_paragraphs_for_url(url: str) -> List[str]:
    """Fetch a URL and return its extracted paragraphs, or an empty list on failure."""
    html = await fetch_html(url)
    if not html:
        return []
    text = extract_main_text(html, base_url=url)
    if not text:
        return []
    return _split_paragraphs(text)

async def get_paragraphs_with_fallback(url: str, snippet: str | None) -> List[str]:
    """Like get_paragraphs_for_url, but fall back to the provided snippet when extraction yields nothing."""
    paras = await get_paragraphs_for_url(url)
    if paras:
        return paras
    return [snippet] if snippet else []
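

# A minimal local usage sketch (not part of the app itself): fetch one URL and print
# the extracted paragraphs. The URL below is a placeholder, not one the app uses.
if __name__ == "__main__":
    import asyncio

    async def _demo() -> None:
        paras = await get_paragraphs_with_fallback("https://example.com/article", snippet=None)
        for i, p in enumerate(paras, 1):
            print(f"[{i}] {p[:120]}")

    asyncio.run(_demo())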