import os
import spacy
import stanza
import pandas as pd
import re
import docx
from collections import Counter
from transformers import pipeline
import torch
from langdetect import detect
import streamlit as st
import io
from newspaper import Article  # ← for URL input
import google.generativeai as genai  # ← Gemini for insights
# ===============================
# 🔧 Safe SpaCy + Stanza Downloads
# ===============================
def safe_load_spacy():
    try:
        return spacy.load("en_core_web_trf")
    except OSError:
        try:
            return spacy.load("en_core_web_sm")
        except OSError:
            os.system("python -m spacy download en_core_web_sm")
            return spacy.load("en_core_web_sm")

nlp_en = safe_load_spacy()

stanza_dir = os.path.expanduser("~/.stanza_resources")
if not os.path.exists(stanza_dir):
    stanza.download('hi')
    stanza.download('ta')

nlp_hi = stanza.Pipeline('hi', processors='tokenize,pos', use_gpu=torch.cuda.is_available())
nlp_ta = stanza.Pipeline('ta', processors='tokenize,pos', use_gpu=torch.cuda.is_available())
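# NOTE: the directory check above assumes that if ~/.stanza_resources exists,
# both the 'hi' and 'ta' models are already inside it; if either is missing,
# the stanza.Pipeline call will fail, and re-running stanza.download for that
# language is the fix.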
# ===============================
# Gemini setup
# ===============================
api_key = os.getenv("GEMINI_API_KEY")
if not api_key:
    raise ValueError("❌ Missing GEMINI_API_KEY. Please set it in Hugging Face secrets or locally.")
genai.configure(api_key=api_key)
# ===============================
# Language-Aware Pipeline Loader
# ===============================
def load_pipelines(language_code):
    lang = language_code.upper()
    device = 0 if torch.cuda.is_available() else -1
    st.write(f"🌍 Language detected: {lang}")
    st.write(f"Device set to use {'cuda:0' if device == 0 else 'cpu'}")

    if lang == "EN":
        emo_model = "SamLowe/roberta-base-go_emotions"
    elif lang in ["HI", "TA"]:
        emo_model = "bhadresh-savani/bert-base-go-emotion"
    else:
        emo_model = "SamLowe/roberta-base-go_emotions"

    emotion_pipeline = pipeline(
        "text-classification",
        model=emo_model,
        tokenizer=emo_model,
        return_all_scores=True,
        device=device
    )

    if lang == "EN":
        sent_model = "distilbert-base-uncased-finetuned-sst-2-english"
    else:
        sent_model = "cardiffnlp/twitter-xlm-roberta-base-sentiment-multilingual"

    sentiment_pipeline = pipeline(
        "text-classification",
        model=sent_model,
        tokenizer=sent_model,
        return_all_scores=True,
        device=device
    )

    return emotion_pipeline, sentiment_pipeline
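# With return_all_scores=True, calling a pipeline on a single string returns a
# nested list — one inner list of {"label", "score"} dicts per input — which is
# why the callers below index with [0]. Illustrative output:
#   emotion_pipeline("I loved it")[0]
#   → [{"label": "joy", "score": 0.91}, {"label": "neutral", "score": 0.04}, ...]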
# ===============================
# DOCX Reader → keep paras separate
# ===============================
def read_and_split_articles(file_path):
    doc = docx.Document(file_path)
    paragraphs = [para.text.strip() for para in doc.paragraphs if para.text.strip()]
    return paragraphs
# ===============================
# URL Reader → title + main body
# ===============================
def read_article_from_url(url):
    article = Article(url)
    article.download()
    article.parse()
    title = article.title.strip()
    body = article.text.strip()
    full_text = f"{title}\n\n{body}"
    return full_text
# ===============================
# Filter Neutral
# ===============================
def filter_neutral(emotion_results, neutral_threshold=0.75):
    scores = {r["label"]: round(r["score"], 3)
              for r in sorted(emotion_results, key=lambda x: x["score"], reverse=True)}
    if "neutral" in scores and scores["neutral"] > neutral_threshold:
        scores.pop("neutral")
    return scores
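# Example (illustrative): given [{"label": "neutral", "score": 0.9},
# {"label": "joy", "score": 0.1}], "neutral" exceeds the 0.75 threshold and is
# dropped, so the function returns {"joy": 0.1}; below the threshold it stays.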
# ===============================
# Sentence Splitter
# ===============================
def split_sentences(text, lang):
    if lang == "hi":
        sentences = re.split(r'।', text)  # Devanagari danda
    elif lang == "ta":
        sentences = re.split(r'\.', text)
    else:
        doc = nlp_en(text)
        sentences = [sent.text.strip() for sent in doc.sents]
    return [s.strip() for s in sentences if s.strip()]
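# NOTE: Tamil is split on a bare '.', which will also break on abbreviations
# and decimals; the Stanza tokenizer loaded above could handle this more robustly.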
# ===============================
# POS Tagger
# ===============================
def get_pos_tags(sentence, lang):
    if lang == "en":
        doc = nlp_en(sentence)
        return [(token.text, token.pos_) for token in doc]
    elif lang == "hi":
        doc = nlp_hi(sentence)
        return [(word.text, word.upos) for sent in doc.sentences for word in sent.words]
    elif lang == "ta":
        doc = nlp_ta(sentence)
        return [(word.text, word.upos) for sent in doc.sentences for word in sent.words]
    else:
        return []
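# NOTE: get_pos_tags is not called anywhere in the Streamlit flow below; it is
# kept as a utility for POS-level analysis.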
# ===============================
# Gemini → Generate Insight + Rewrites
# ===============================
def generate_insight(paragraph, emotions, sentiment):
    """Use Gemini to suggest improvements and rewrites with the top 3 emotions only."""
    try:
        top_emotions = sorted(emotions.items(), key=lambda x: x[1], reverse=True)[:3]
        emo_text = ", ".join([f"{k}: {v}" for k, v in top_emotions])
        sent_text = f"{sentiment['label']} ({round(sentiment['score'], 3)})" if sentiment else "N/A"

        prompt = (
            f"Here is a paragraph:\n\n{paragraph}\n\n"
            f"Top 3 detected emotions: {emo_text}\n"
            f"Overall sentiment: {sent_text}\n\n"
            "👉 Please provide:\n"
            "1. A rewrite that keeps meaning intact but improves clarity and flow.\n"
            "2. A rewrite that emphasizes the detected emotions to increase engagement.\n"
            "Make them concrete and content-specific, not generic advice."
        )

        model = genai.GenerativeModel("gemini-1.5-flash")
        response = model.generate_content(prompt)
        return response.text.strip() if response and response.text else "No insight generated."
    except Exception as e:
        return f"⚠️ Insight generation failed: {str(e)}"
# ===============================
# Normalize Scores (scale to 1)
# ===============================
def normalize_scores(scores: dict):
    if not scores:
        return scores
    max_val = max(scores.values())
    if max_val == 0:
        return scores
    return {k: round(v / max_val, 3) for k, v in scores.items()}
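# Worked example: {"joy": 0.5, "anger": 0.25} → {"joy": 1.0, "anger": 0.5}.
# Dividing by the maximum rescales the strongest emotion to exactly 1 while
# preserving the relative ratios between emotions.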
# ===============================
# Analysis Function
# ===============================
def analyze_article(article_text, lang, emotion_pipeline, sentiment_pipeline):
    export_rows = []
    paragraphs = [p.strip() for p in article_text.split("\n\n") if p.strip()]
    if len(paragraphs) <= 1:
        paragraphs = [p.strip() for p in article_text.split("\n") if p.strip()]

    # Weighted overall results
    weighted_scores = {}
    total_length = 0
    all_sentiments = []
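    # Each sentence's emotion scores are weighted by its word count; dividing
    # by the article's total word count below yields a length-weighted mean,
    # so long sentences influence the overall profile more than short ones.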
    for para in paragraphs:
        sentences = split_sentences(para, lang[:2])
        for sentence in sentences:
            emo_results = emotion_pipeline(sentence[:512])[0]
            filtered = filter_neutral(emo_results)
            length = len(sentence.split())
            total_length += length
            for emo, score in filtered.items():
                weighted_scores[emo] = weighted_scores.get(emo, 0) + score * length
            sentiment_results = sentiment_pipeline(sentence[:512])[0]
            all_sentiments.append(max(sentiment_results, key=lambda x: x["score"]))

    if total_length > 0:
        weighted_scores = {emo: val / total_length for emo, val in weighted_scores.items()}
    weighted_scores = normalize_scores(weighted_scores)  # ← normalize to scale of 1
    overall_sentiment = max(all_sentiments, key=lambda x: x["score"]) if all_sentiments else {}
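    # NOTE: the overall sentiment is the single most confident sentence-level
    # prediction, not a majority vote across sentences.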
    st.subheader("📊 OVERALL (Weighted)")
    st.write("Emotions →", weighted_scores)
    st.write("Sentiment →", overall_sentiment)
    export_rows.append({
        "Type": "Overall",
        "Text": "Weighted across article",
        "Emotions": weighted_scores,
        "Sentiment": overall_sentiment
    })

    # Paragraph-level
    for p_idx, para in enumerate(paragraphs, start=1):
        para_counter = Counter()
        all_para_sentiments = []
        sentences = split_sentences(para, lang[:2])
        for sentence in sentences:
            results = emotion_pipeline(sentence[:512])[0]
            filtered = filter_neutral(results, neutral_threshold=0.75)
            for emo, score in filtered.items():
                para_counter[emo] += score
            sentiment_results = sentiment_pipeline(sentence[:512])[0]
            all_para_sentiments.append(max(sentiment_results, key=lambda x: x["score"]))

        para_emotions = dict(sorted(para_counter.items(), key=lambda x: x[1], reverse=True))
        para_emotions = normalize_scores(para_emotions)  # ← normalize to scale of 1
        para_sentiment = max(all_para_sentiments, key=lambda x: x["score"]) if all_para_sentiments else {}

        st.write(f"\n📄 Paragraph {p_idx}: {para}")
        st.write("Emotions →", para_emotions)
        st.write("Sentiment →", para_sentiment)

        insight = generate_insight(para, para_emotions, para_sentiment)
        st.write("💡 Insights + Rewrites →", insight)

        export_rows.append({
            "Type": "Paragraph",
            "Text": para,
            "Emotions": para_emotions,
            "Sentiment": para_sentiment,
            "Insight": insight
        })

    return export_rows
# ===============================
# Streamlit App
# ===============================
st.title("🌍 Multilingual Text Emotion + Sentiment Analyzer")

uploaded_file = st.file_uploader("Upload a DOCX file", type=["docx"])
url_input = st.text_input("Or enter an Article URL")
text_input = st.text_area("Or paste text here")

if st.button("🔍 Analyze"):
    with st.spinner("Running analysis... ⏳"):
        if uploaded_file:
            articles = read_and_split_articles(uploaded_file)
            text_to_analyze = "\n\n".join(articles)
        elif url_input.strip():
            text_to_analyze = read_article_from_url(url_input)
        elif text_input.strip():
            text_to_analyze = text_input
        else:
            st.warning("Please upload a DOCX, enter a URL, or paste text to analyze.")
            st.stop()

        detected_lang = detect(text_to_analyze[:200]) if text_to_analyze else "en"
        emotion_pipeline, sentiment_pipeline = load_pipelines(detected_lang)
        export_rows = analyze_article(text_to_analyze, detected_lang, emotion_pipeline, sentiment_pipeline)
        # ← Download buttons first
        df_export = pd.DataFrame(export_rows)

        csv = df_export.to_csv(index=False).encode("utf-8")
        st.download_button(
            label="⬇️ Download CSV",
            data=csv,
            file_name="analysis_results.csv",
            mime="text/csv",
        )

        excel_buffer = io.BytesIO()
        df_export.to_excel(excel_buffer, index=False, engine="xlsxwriter")
        excel_buffer.seek(0)  # rewind the buffer before handing it to Streamlit
        st.download_button(
            label="⬇️ Download Excel",
            data=excel_buffer,
            file_name="analysis_results.xlsx",
            mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
        )