# Bi Encoders Only
import math
from collections import Counter
import torch
import numpy as np
from typing import Any, Dict, Optional
from engineering_parser import extract_steps
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity
from rouge_score import rouge_scorer
from bert_score import BERTScorer


# Model & Scorer Initialization
# These models are loaded once at module import, so repeated evaluations reuse them.
print("Initializing evaluation models...")
SENTENCE_MODEL = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')
ROUGE_SCORER = rouge_scorer.RougeScorer(['rouge2', 'rougeL', 'rougeLsum'], use_stemmer=True)
BERT_SCORER = BERTScorer(model_type='allenai/longformer-base-4096', 
                         device='cuda' if torch.cuda.is_available() else 'cpu')
print("Evaluation models initialized.")


# Bag-of-words cosine similarity, returned only when it falls within a given range
def cos_similarity(a: str, b: str, min_val: float = 0.0, max_val: float = 1.0) -> Optional[float]:
    """Pure cosine similarity between two raw strings (bag-of-words).

    Returns the similarity if it lies within [min_val, max_val]; otherwise None.
    """
    c1, c2 = Counter(a.split()), Counter(b.split())
    vocab = set(c1) | set(c2)
    dot = sum(c1[w] * c2[w] for w in vocab)
    n1 = math.sqrt(sum(c1[w] ** 2 for w in vocab))
    n2 = math.sqrt(sum(c2[w] ** 2 for w in vocab))
    val = dot / (n1 * n2) if n1 and n2 else 0.0
    return val if min_val <= val <= max_val else None
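
# Example (hypothetical strings), illustrating the range filter:
#   cos_similarity("shear force at midspan", "shear force at support")  -> 0.75
#   cos_similarity("shear force", "thermal expansion", min_val=0.5)     -> None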


def safe_bert_score(gt: str, pred: str) -> float:
    """ A wrapper for BERTScore to handle potential errors and empty strings. """
    if not all(isinstance(s, str) for s in [gt, pred]) or not gt.strip() or not pred.strip():
        return 0.0
    try:
        # BERTScore returns Precision, Recall, and F1. We use F1.
        _, _, f1 = BERT_SCORER.score([pred], [gt])
        return f1.item()
    except Exception as e:
        print(f"Warning: BERTScore failed with error: {e}")
        return 0.0

def evaluate_trace_eng(gt_solution: str, pred_generation: str) -> Dict[str, Any]:
    """
    Compares a ground-truth engineering solution with a model's generation.

    This function calculates final answer accuracy, reasoning step recall/precision
    (the ChainEval metric), and standard text similarity scores.

    Args:
        gt_solution: The ground-truth solution text.
        pred_generation: The model-generated solution text.

    Returns:
        A dictionary containing all calculated evaluation metrics.
    """
    if not gt_solution or not pred_generation:
        return {
            'error': 'Input solution or generation is empty.',
            'recall': 0, 'precision': 0, 'step_f1': 0, 'final_answer_match': 0,
            'rouge2': 0, 'rougeL': 0, 'rougeLsum': 0, 'bertscore': 0
        }
    
    # 1. Standard Text Similarity Metrics
    rouge_scores = ROUGE_SCORER.score(gt_solution, pred_generation)
    bertscore = safe_bert_score(gt_solution, pred_generation)
    
    # 2. Parse Both Traces using the Engineering Parser
    gt_steps, gt_step_answers, gt_final_answer = extract_steps(gt_solution)
    pred_steps, pred_step_answers, pred_final_answer = extract_steps(pred_generation)

    # 3. Final Answer Match
    final_answer_match = 0
    # Use a 1% relative tolerance for final answers, common in engineering.
    FINAL_ANSWER_TOLERANCE = 0.01
    if gt_final_answer is not None and pred_final_answer is not None:
        if abs(gt_final_answer) > 1e-9:  # Relative tolerance for non-zero answers (avoids division by zero)
            if abs(gt_final_answer - pred_final_answer) / abs(gt_final_answer) < FINAL_ANSWER_TOLERANCE:
                final_answer_match = 1
        else:  # Handle cases where the answer is zero
            if abs(gt_final_answer - pred_final_answer) < 1e-9:
                final_answer_match = 1

    # 4. Reasoning Step Evaluation (ChainEval Metric)
    if not gt_steps or not pred_steps:
        # Cannot calculate recall/precision if either trace has no steps
        recall, precision = 0, 0
    else:
        # Embed all step texts for semantic comparison
        gt_embeddings = SENTENCE_MODEL.encode(gt_steps)
        pred_embeddings = SENTENCE_MODEL.encode(pred_steps)

        # Create a numeric correctness matrix: 1 if numbers match, 0 otherwise
        numeric_correctness = np.zeros((len(gt_steps), len(pred_steps)))
        STEP_ANSWER_TOLERANCE = 0.02  # Use a slightly higher 2% tolerance for intermediate steps
        for i, gt_ans in enumerate(gt_step_answers):
            if gt_ans is None: continue
            for j, pred_ans in enumerate(pred_step_answers):
                if pred_ans is None: continue
                
                if abs(gt_ans) > 1e-9:
                    if (abs(gt_ans - pred_ans) / abs(gt_ans)) < STEP_ANSWER_TOLERANCE:
                        numeric_correctness[i, j] = 1
                elif abs(gt_ans - pred_ans) < 1e-9:
                    numeric_correctness[i, j] = 1
        
        # Calculate semantic similarity matrix
        semantic_similarity = cosine_similarity(gt_embeddings, pred_embeddings)
        
        # Combine matrices: similarity is only valid if numbers are also correct
        combined_matrix = np.multiply(semantic_similarity, numeric_correctness)
        
        # Calculate recall and precision
        SIMILARITY_THRESHOLD = 0.7  # Threshold for considering a step as "matched"
        recall = float(np.sum(np.max(combined_matrix, axis=1) > SIMILARITY_THRESHOLD) / len(gt_steps))
        precision = float(np.sum(np.max(combined_matrix, axis=0) > SIMILARITY_THRESHOLD) / len(pred_steps))

    # 5. Compile and Return All Metrics
    step_f1 = 0
    if recall + precision > 0:
        step_f1 = 2 * (recall * precision) / (recall + precision)

    return {
        'recall': recall,
        'precision': precision,
        'step_f1': step_f1,
        'final_answer_match': final_answer_match,
        'rouge2': rouge_scores['rouge2'].fmeasure,
        'rougeL': rouge_scores['rougeL'].fmeasure,
        'rougeLsum': rouge_scores['rougeLsum'].fmeasure,
        'bertscore': bertscore
    }
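

# Minimal usage sketch (hypothetical traces): the strings below assume a
# "Step N: ... / Final Answer: ..." layout that engineering_parser.extract_steps
# is expected to parse; adapt them to whatever format the parser actually handles.
if __name__ == "__main__":
    gt = (
        "Step 1: Compute the total load. W = w * L = 5 kN/m * 4 m = 20 kN.\n"
        "Step 2: Compute the maximum bending moment. M = W * L / 8 = 10 kN*m.\n"
        "Final Answer: 10"
    )
    pred = (
        "Step 1: The total load is W = 5 * 4 = 20 kN.\n"
        "Step 2: The maximum moment is M = 20 * 4 / 8 = 10 kN*m.\n"
        "Final Answer: 10"
    )
    metrics = evaluate_trace_eng(gt, pred)
    for name, value in metrics.items():
        print(f"{name}: {value}")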