import gradio as gr
import pandas as pd
import torch
from transformers import AutoTokenizer, AutoModelForSequenceClassification
import plotly.express as px
from collections import defaultdict
from itertools import zip_longest
from openai import OpenAI
from pydantic import BaseModel, Field, field_validator, ValidationInfo
from typing import List
import instructor

from prompts import sentiments_prompt
# Load the model and tokenizer once at import time for efficiency
model_name = "tabularisai/multilingual-sentiment-analysis"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForSequenceClassification.from_pretrained(model_name)

# Weights used to collapse the 5-class probability distribution into one 0-1 score
SENTIMENT_WEIGHTS = {
    0: 0.0,    # Very Negative
    1: 0.25,   # Negative
    2: 0.5,    # Neutral
    3: 0.75,   # Positive
    4: 1.0,    # Very Positive
}
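# Worked example of the weighted score (illustrative probabilities, not real
# model output): for class probabilities [0.05, 0.10, 0.20, 0.40, 0.25] the
# weighted sum is 0.05*0.0 + 0.10*0.25 + 0.20*0.5 + 0.40*0.75 + 0.25*1.0 = 0.675,
# which scales to a sentiment score of 67.5 on the 0-100 range.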
class ExtractProductSentiment(BaseModel):
    """Extracts what people like and dislike about a product based on product reviews and sentiment scores (0-100)."""

    product_likes: List[str] = Field(..., description="What people like about the product. List of 3 sentences AT MOST. Must be aggregated in the order of importance.")
    product_dislikes: List[str] = Field(..., description="What people dislike about the product. List of 3 sentences AT MOST. Must be aggregated in the order of importance.")

    @field_validator("product_likes", "product_dislikes")
    @classmethod
    def validate_product_likes_and_dislikes(cls, v, info: ValidationInfo):
        if not v:
            raise ValueError(f"At least one {info.field_name} must be provided. If nothing to say, please enter 'None'")
        if len(v) > 3:
            raise ValueError(
                f"{info.field_name} contains {len(v)} points. Please aggregate the points to a maximum of 3 key points "
                "in order of importance. Combine similar points together."
            )
        return v
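# Sketch of the validator's intended failure mode (hypothetical values): a
# response with four likes, e.g. ExtractProductSentiment(product_likes=["a",
# "b", "c", "d"], product_dislikes=["x"]), raises a pydantic ValidationError;
# when the call goes through instructor with max_retries set, that error text
# is fed back to the LLM so it can aggregate down to three points.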
def predict_sentiment_with_scores(texts):
    """
    Predict sentiment for a list of texts and return both class labels and sentiment scores.
    """
    inputs = tokenizer(texts, return_tensors="pt", truncation=True, padding=True, max_length=512)
    with torch.no_grad():
        outputs = model(**inputs)
    probabilities = torch.nn.functional.softmax(outputs.logits, dim=-1)

    # Get predicted classes
    sentiment_map = {
        0: "Very Negative",
        1: "Negative",
        2: "Neutral",
        3: "Positive",
        4: "Very Positive",
    }
    predicted_classes = [sentiment_map[p] for p in torch.argmax(probabilities, dim=-1).tolist()]

    # Calculate sentiment scores (0-100)
    sentiment_scores = []
    for prob in probabilities:
        # Weighted sum of probabilities, scaled to 0-100
        score = sum(prob[i].item() * SENTIMENT_WEIGHTS[i] for i in range(len(prob)))
        sentiment_scores.append(round(score * 100, 2))

    return predicted_classes, sentiment_scores
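# Example usage (the returned values are illustrative, not guaranteed outputs):
# labels, scores = predict_sentiment_with_scores(["Love it!", "Battery died fast."])
# labels -> e.g. ["Very Positive", "Negative"]
# scores -> e.g. [93.1, 18.4]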
def get_product_sentiment(client, reviews: List[str], scores: List[float]) -> ExtractProductSentiment:
    """Extract product likes and dislikes using OpenAI."""
    # Combine reviews and scores for context
    review_context = "\n".join(
        f"Review (Score: {score}): {review}" for review, score in zip(reviews, scores)
    )
    prompt = sentiments_prompt.format(review_context=review_context)
    response = client.chat.completions.create(
        model="gpt-4o",
        response_model=ExtractProductSentiment,
        messages=[
            {"role": "system", "content": "You are a helpful product analyst."},
            {"role": "user", "content": prompt},
        ],
        max_retries=3,
    )
    return response
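# Note: because response_model is set on an instructor-patched client, the call
# returns a validated ExtractProductSentiment instance rather than a raw chat
# completion, and max_retries=3 re-prompts the model with any validation errors.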
def create_comparison_charts(sentiment_results, avg_sentiment_scores):
    """
    Create comparison charts for sentiment analysis across products.
    """
    # Create summary DataFrame
    summary_data = []
    for product, counts in sentiment_results.items():
        total = counts.sum()
        row = {
            'Product': product,
            'Average Sentiment Score': avg_sentiment_scores[product],
            'Total Reviews': total,
            'Very Positive %': round((counts.get('Very Positive', 0) / total) * 100, 2),
            'Positive %': round((counts.get('Positive', 0) / total) * 100, 2),
            'Neutral %': round((counts.get('Neutral', 0) / total) * 100, 2),
            'Negative %': round((counts.get('Negative', 0) / total) * 100, 2),
            'Very Negative %': round((counts.get('Very Negative', 0) / total) * 100, 2),
        }
        summary_data.append(row)
    summary_df = pd.DataFrame(summary_data)

    # Score comparison chart
    score_comparison_fig = px.bar(
        summary_df,
        x='Product',
        y='Average Sentiment Score',
        title='Average Sentiment Scores by Product',
        labels={'Average Sentiment Score': 'Score (0-100)'},
    )

    # Distribution chart
    distribution_data = []
    for product, counts in sentiment_results.items():
        # Aggregate positive and negative sentiments
        aggregated_counts = {
            'Positive': counts.get('Very Positive', 0) + counts.get('Positive', 0),
            'Neutral': counts.get('Neutral', 0),
            'Negative': counts.get('Very Negative', 0) + counts.get('Negative', 0),
        }
        for sentiment, count in aggregated_counts.items():
            distribution_data.append({
                'Product': product,
                'Sentiment': sentiment,
                'Count': count,
            })
    distribution_df = pd.DataFrame(distribution_data)
    distribution_fig = px.bar(
        distribution_df,
        x='Product',
        y='Count',
        color='Sentiment',
        title='Sentiment Distribution by Product',
        barmode='group',
        color_discrete_map={
            'Positive': '#2ECC71',   # Green
            'Neutral': '#F1C40F',    # Yellow
            'Negative': '#E74C3C',   # Red
        },
    )
    # Ratio chart: normalize counts within each product so the stacked bars show percentages
    distribution_df['Percent'] = distribution_df.groupby('Product')['Count'].transform(
        lambda counts: counts / counts.sum() * 100
    )
    ratio_fig = px.bar(
        distribution_df,
        x='Product',
        y='Percent',
        color='Sentiment',
        title='Sentiment Distribution Ratio by Product',
        barmode='relative',
        labels={'Percent': 'Share of Reviews (%)'},
    )

    return score_comparison_fig, distribution_fig, ratio_fig, summary_df
def process_single_sheet(df, product_name, openai_client):
    """
    Process a single dataframe and return sentiment analysis results.
    """
    if 'Reviews' not in df.columns:
        raise ValueError(f"'Reviews' column not found in sheet/file for {product_name}")

    # Coerce to strings so non-text cells (numbers, NaN) don't break the tokenizer
    reviews = df['Reviews'].fillna("").astype(str)
    sentiments, scores = predict_sentiment_with_scores(reviews.tolist())
    df['Sentiment'] = sentiments
    df['Sentiment_Score'] = scores

    # Extract product likes and dislikes
    try:
        product_sentiment = get_product_sentiment(openai_client, reviews.tolist(), scores)

        # Initialize empty columns
        df['Likes'] = ""
        df['Dislikes'] = ""

        # Populate the first rows with the extracted points; zip_longest keeps
        # every point even when the two lists differ in length
        likes_list = product_sentiment.product_likes
        dislikes_list = product_sentiment.product_dislikes
        for idx, (like, dislike) in enumerate(zip_longest(likes_list, dislikes_list, fillvalue="")):
            df.loc[idx, 'Likes'] = like
            df.loc[idx, 'Dislikes'] = dislike
    except Exception as e:
        print(f"Error extracting likes/dislikes for {product_name}: {e}")
        df['Likes'] = ""
        df['Dislikes'] = ""

    # Calculate sentiment distribution
    sentiment_counts = pd.Series(sentiments).value_counts()
    avg_sentiment_score = round(sum(scores) / len(scores), 2)

    return df, sentiment_counts, avg_sentiment_score
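# Minimal input sketch (hypothetical data): the function expects a DataFrame
# with a "Reviews" column, e.g.
#   df = pd.DataFrame({"Reviews": ["Great value", "Stopped working after a week"]})
#   processed_df, counts, avg_score = process_single_sheet(df, "Widget", client)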
def process_file(file_obj, api_key):
    """
    Process the input file and add sentiment analysis results.
    """
    try:
        if not api_key:
            raise ValueError("OpenAI API key is required")

        client = instructor.patch(OpenAI(api_key=api_key))
        file_path = file_obj.name
        sentiment_results = defaultdict(pd.Series)
        avg_sentiment_scores = {}
        all_processed_dfs = {}

        if file_path.lower().endswith('.csv'):
            df = pd.read_csv(file_path)
            product_name = "Product"  # Default name for CSV
            processed_df, sentiment_counts, avg_score = process_single_sheet(df, product_name, client)
            all_processed_dfs[product_name] = processed_df
            sentiment_results[product_name] = sentiment_counts
            avg_sentiment_scores[product_name] = avg_score
        elif file_path.lower().endswith(('.xlsx', '.xls')):
            excel_file = pd.ExcelFile(file_path)
            for sheet_name in excel_file.sheet_names:
                df = pd.read_excel(file_path, sheet_name=sheet_name)
                processed_df, sentiment_counts, avg_score = process_single_sheet(df, sheet_name, client)
                all_processed_dfs[sheet_name] = processed_df
                sentiment_results[sheet_name] = sentiment_counts
                avg_sentiment_scores[sheet_name] = avg_score
        else:
            raise ValueError("Unsupported file format. Please upload a CSV or Excel file.")

        # Create visualizations (the ratio chart is computed but not shown in the UI)
        score_comparison_fig, distribution_fig, ratio_fig, summary_df = create_comparison_charts(
            sentiment_results, avg_sentiment_scores
        )

        # Save results
        output_path = "sentiment_analysis_results.xlsx"
        with pd.ExcelWriter(output_path) as writer:
            for sheet_name, df in all_processed_dfs.items():
                df.to_excel(writer, sheet_name=sheet_name, index=False)
            if isinstance(summary_df, pd.DataFrame):  # Safety check
                summary_df.to_excel(writer, sheet_name='Summary', index=False)

        return score_comparison_fig, distribution_fig, summary_df, output_path
    except Exception as e:
        raise gr.Error(str(e))
# Build the Gradio interface
with gr.Blocks() as interface:
    gr.Markdown("# Product Review Sentiment Analysis")
    gr.Markdown("""
    ### Quick Guide
    1. **Excel File (Multiple Products)**:
       - Create a separate sheet for each product
       - Name each sheet with the product/company name
       - Include a "Reviews" column in each sheet
    2. **CSV File (Single Product)**:
       - Include a "Reviews" column

    Upload your file and click Analyze to get started.
    """)

    with gr.Row():
        api_key_input = gr.Textbox(
            label="OpenAI API Key",
            placeholder="Enter your OpenAI API key",
            type="password",
        )
    with gr.Row():
        file_input = gr.File(
            label="Upload File (CSV or Excel)",
            file_types=[".csv", ".xlsx", ".xls"],
        )
    with gr.Row():
        analyze_btn = gr.Button("Analyze Sentiments")
    with gr.Row():
        sentiment_score_plot = gr.Plot(label="Weighted Sentiment Scores")
    with gr.Row():
        distribution_plot = gr.Plot(label="Sentiment Distribution")
    with gr.Row():
        summary_table = gr.Dataframe(label="Summary Metrics")
    with gr.Row():
        output_file = gr.File(label="Download Full Report")

    analyze_btn.click(
        fn=process_file,
        inputs=[file_input, api_key_input],
        outputs=[sentiment_score_plot, distribution_plot, summary_table, output_file],
    )
# Launch the interface
interface.launch()
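# Assumed runtime dependencies (inferred from the imports above): gradio, pandas,
# torch, transformers, plotly, openai, pydantic v2 (for field_validator), and
# instructor, plus a local prompts.py exposing sentiments_prompt with a
# {review_context} placeholder.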