| """ |
| Comprehensive Japanese Counseling Model Benchmark Script |
| Based on KokoroChat paper evaluation methodology |
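
Example (illustrative invocation; the script filename is a placeholder):
    python benchmark.py --base_model LiquidAI/LFM2-1.2B --finetuned_model ./merged_counselor_model --num_samples 50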
| """ |
|
|
| import torch |
| from transformers import AutoModelForCausalLM, AutoTokenizer |
| import numpy as np |
| from typing import List, Dict, Tuple, Optional, Any |
| import json |
| from tqdm import tqdm |
| import os |
| import gc |
| import warnings |
| from datetime import datetime |
| import pandas as pd |
| import matplotlib.pyplot as plt |
| import seaborn as sns |
| from collections import defaultdict |
| import MeCab |
| from rouge_score import rouge_scorer |
| from nltk.translate.bleu_score import sentence_bleu, corpus_bleu, SmoothingFunction |
| import sacrebleu |
| from bert_score import score as bert_score |
| import re |
| import statistics |
|
|
| warnings.filterwarnings('ignore') |
|
|
| |
| plt.style.use('seaborn-v0_8-darkgrid') |
| sns.set_palette("husl") |
|
|
| class JapaneseCounselingBenchmark: |
| """ |
| Comprehensive benchmark suite for Japanese counseling models |
| Following KokoroChat paper evaluation methodology |
| """ |
| |
| def __init__(self, |
| base_model_name: str = "LiquidAI/LFM2-1.2B", |
| finetuned_model_path: str = "./merged_counselor_model", |
| test_data_path: str = "./processed_data_score70/test.jsonl", |
                 device: Optional[str] = None):
| """ |
| Initialize Japanese counseling benchmark |
| |
| Args: |
| base_model_name: Name/path of base model |
| finetuned_model_path: Path to fine-tuned merged model |
| test_data_path: Path to test dataset |
| device: Device to run on (cuda/cpu) |
| """ |
| self.base_model_name = base_model_name |
| self.finetuned_model_path = finetuned_model_path |
| self.test_data_path = test_data_path |
| self.device = device or ("cuda" if torch.cuda.is_available() else "cpu") |
| |
| print("="*80) |
| print("🎌 Japanese Counseling Model Benchmark Suite") |
| print("="*80) |
| print(f"📍 Device: {self.device}") |
| if self.device == "cuda": |
| print(f" GPU: {torch.cuda.get_device_name(0)}") |
| print(f" Memory: {torch.cuda.get_device_properties(0).total_memory / 1e9:.2f} GB") |
| |
| |
| try: |
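            # "-Owakati" makes MeCab emit space-separated surface forms (wakati-gaki)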
| self.mecab = MeCab.Tagger("-Owakati") |
| print("✅ MeCab initialized for Japanese tokenization") |
        except Exception:
| print("⚠️ MeCab not available. Install with: apt-get install mecab libmecab-dev mecab-ipadic-utf8") |
| print(" and: pip install mecab-python3") |
| print(" Using fallback character-level tokenization") |
| self.mecab = None |
| |
| |
        # rouge_score's default tokenizer keeps only [a-z0-9], which deletes Japanese text,
        # so score MeCab-pretokenized strings with a pass-through whitespace tokenizer instead
        # (assumes a rouge-score version that accepts a custom `tokenizer`).
        class _WhitespaceTokenizer:
            def tokenize(self, text):
                return text.split()

        self.rouge_scorer = rouge_scorer.RougeScorer(
            ['rouge1', 'rouge2', 'rougeL'],
            use_stemmer=False,
            tokenizer=_WhitespaceTokenizer()
        )
| |
| |
| self.smoothing = SmoothingFunction().method1 |
| |
| |
| self.results = {} |
| self.detailed_results = [] |
| |
| def tokenize_japanese(self, text: str) -> List[str]: |
| """ |
| Tokenize Japanese text using MeCab or fallback method |
| |
| Args: |
| text: Japanese text to tokenize |
| |
| Returns: |
| List of tokens |
| """ |
        # Prefer MeCab word segmentation; fall back to the heuristic character-level split below
        if self.mecab:
| try: |
| |
| tokens = self.mecab.parse(text).strip().split() |
| return tokens if tokens else list(text) |
            except Exception:
| |
| pass |
| |
| |
| |
| text = re.sub(r'[。、!?\n\s]', ' ', text) |
| |
| words = text.split() |
| if words: |
| |
| tokens = [] |
| for word in words: |
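                # Heuristic: keep short chunks whole, split longer runs into single characters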
| if len(word) <= 4: |
| tokens.append(word) |
| else: |
| tokens.extend(list(word)) |
| return tokens |
| else: |
| |
| return list(text.replace(' ', '')) |
| |
| def load_test_data(self, max_samples: Optional[int] = None) -> List[Dict]: |
| """ |
| Load test dataset |
| |
| Args: |
| max_samples: Maximum number of samples to load |
| |
| Returns: |
| List of test examples |
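
        Expected JSONL record (illustrative; 'score' and 'topic' are optional):
            {"text": "### Instruction: ... ### Input: <相談内容> ... ### Response: <応答>",
             "score": 75, "topic": "ストレス"}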
| """ |
| print(f"\n📚 Loading test data from {self.test_data_path}") |
| |
| test_data = [] |
| |
| if not os.path.exists(self.test_data_path): |
| print(f"❌ Test data not found at {self.test_data_path}") |
| print(" Creating synthetic test data for demonstration...") |
| return self.create_synthetic_test_data() |
| |
| with open(self.test_data_path, 'r', encoding='utf-8') as f: |
| for i, line in enumerate(f): |
| if max_samples and i >= max_samples: |
| break |
| try: |
| data = json.loads(line) |
| |
| |
| text = data.get('text', '') |
| |
| |
| if "### Input:" in text and "### Response:" in text: |
| parts = text.split("### Input:") |
| if len(parts) > 1: |
| input_part = parts[1].split("### Response:")[0].strip() |
| response_part = text.split("### Response:")[1].strip() |
| |
| test_data.append({ |
| 'input': input_part, |
| 'reference': response_part, |
| 'score': data.get('score', 0), |
| 'topic': data.get('topic', 'Unknown') |
| }) |
| except Exception as e: |
| print(f"⚠️ Error parsing line {i}: {e}") |
| continue |
| |
| if not test_data: |
| print("⚠️ No valid test data found. Creating synthetic data...") |
| return self.create_synthetic_test_data() |
| |
| print(f"✅ Loaded {len(test_data)} test examples") |
| return test_data |
| |
| def create_synthetic_test_data(self) -> List[Dict]: |
| """Create synthetic test data for demonstration""" |
| synthetic_data = [ |
| { |
| 'input': '最近ストレスを感じています。', |
| 'reference': 'ストレスを感じているのですね。それは大変つらいことだと思います。どのような状況でストレスを感じることが多いですか?', |
| 'score': 75, |
| 'topic': 'ストレス' |
| }, |
| { |
| 'input': '仕事がうまくいかなくて悩んでいます。', |
| 'reference': '仕事でお悩みなのですね。うまくいかないと感じると、本当に辛いですよね。具体的にどのような点で困難を感じていらっしゃいますか?', |
| 'score': 78, |
| 'topic': '仕事' |
| }, |
| { |
| 'input': '人間関係で困っています。', |
| 'reference': '人間関係の悩みは本当に心が疲れますよね。お気持ちお察しします。どのような関係性でお困りでしょうか?', |
| 'score': 80, |
| 'topic': '人間関係' |
| }, |
| { |
| 'input': '将来が不安です。', |
| 'reference': '将来への不安を抱えていらっしゃるのですね。先が見えない不安は、とても重く感じられることと思います。', |
| 'score': 72, |
| 'topic': '不安' |
| }, |
| { |
| 'input': '自信が持てません。', |
| 'reference': '自信が持てないというお気持ち、よくわかります。多くの方が同じような悩みを抱えています。', |
| 'score': 76, |
| 'topic': '自信' |
| } |
| ] |
| return synthetic_data |
| |
| def load_models(self): |
| """Load base and fine-tuned models""" |
| print("\n🤖 Loading models for benchmarking...") |
| |
| |
| print(" Loading tokenizer...") |
| try: |
| self.tokenizer = AutoTokenizer.from_pretrained(self.base_model_name) |
        except Exception:
| print(" Using GPT2 tokenizer as fallback...") |
| self.tokenizer = AutoTokenizer.from_pretrained("gpt2") |
| |
| if self.tokenizer.pad_token is None: |
| self.tokenizer.pad_token = self.tokenizer.eos_token |
| |
| |
| print(" Loading base model...") |
| try: |
| self.base_model = AutoModelForCausalLM.from_pretrained( |
| self.base_model_name, |
| torch_dtype=torch.float16 if self.device == "cuda" else torch.float32, |
| device_map="auto" if self.device == "cuda" else None, |
| trust_remote_code=True, |
| low_cpu_mem_usage=True |
| ) |
| except Exception as e: |
| print(f" ⚠️ Could not load base model {self.base_model_name}: {e}") |
| print(" Using GPT2 as fallback base model...") |
| self.base_model = AutoModelForCausalLM.from_pretrained( |
| "gpt2", |
| torch_dtype=torch.float16 if self.device == "cuda" else torch.float32, |
| device_map="auto" if self.device == "cuda" else None |
| ) |
| self.base_model.eval() |
| |
| |
| print(f" Loading fine-tuned model from {self.finetuned_model_path}...") |
| |
| |
| if not os.path.exists(self.finetuned_model_path): |
| print(f" ⚠️ Fine-tuned model not found at {self.finetuned_model_path}") |
| print(" Using base model for both comparisons (for demonstration)") |
| self.finetuned_model = self.base_model |
| else: |
| try: |
| self.finetuned_model = AutoModelForCausalLM.from_pretrained( |
| self.finetuned_model_path, |
| torch_dtype=torch.float16 if self.device == "cuda" else torch.float32, |
| device_map="auto" if self.device == "cuda" else None, |
| trust_remote_code=True, |
| low_cpu_mem_usage=True, |
| local_files_only=True |
| ) |
| self.finetuned_model.eval() |
| except Exception as e: |
| print(f" ⚠️ Error loading fine-tuned model: {e}") |
| print(" Using base model for comparison") |
| self.finetuned_model = self.base_model |
| |
| print("✅ Models loaded successfully!") |
| |
| def generate_response(self, model, prompt: str, max_length: int = 150) -> str: |
| """ |
| Generate response from model |
| |
| Args: |
| model: Model to use for generation |
| prompt: Input prompt |
| max_length: Maximum length of generated response |
| |
| Returns: |
| Generated response text |
| """ |
| |
        # Build an Alpaca-style prompt whose section markers match the test-data format
        formatted_prompt = f"""### Instruction:
| あなたは思いやりのある心理カウンセラーです。 |
| クライアントの感情を理解し、共感的で支援的な応答を提供してください。 |
| |
| ### Input: |
| {prompt} |
| |
| ### Response: |
| """ |
| |
| |
| inputs = self.tokenizer( |
| formatted_prompt, |
| return_tensors="pt", |
| truncation=True, |
| max_length=512 |
| ) |
| |
        # Move inputs to the model's device (handles CPU fallback and device_map placement)
        inputs = {k: v.to(model.device) for k, v in inputs.items()}
| |
| |
| try: |
| with torch.no_grad(): |
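                # Nucleus sampling with a mild repetition penalty; fix a seed or disable
                # sampling if strictly reproducible scores are needed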
| outputs = model.generate( |
| **inputs, |
| max_new_tokens=max_length, |
| temperature=0.7, |
| do_sample=True, |
| top_p=0.9, |
| repetition_penalty=1.1, |
| pad_token_id=self.tokenizer.pad_token_id, |
| eos_token_id=self.tokenizer.eos_token_id |
| ) |
| |
| |
            # Decode only the newly generated tokens; slicing the decoded string by the
            # prompt length is unreliable because tokenization does not round-trip exactly
            prompt_length = inputs["input_ids"].shape[1]
            response = self.tokenizer.decode(
                outputs[0][prompt_length:], skip_special_tokens=True
            ).strip()
| except Exception as e: |
| print(f" ⚠️ Generation error: {e}") |
| response = "申し訳ございません。応答を生成できませんでした。" |
| |
| return response |
| |
| def calculate_bleu_scores(self, reference: str, hypothesis: str) -> Dict[str, float]: |
| """ |
| Calculate BLEU scores using Japanese tokenization |
| |
| Args: |
| reference: Reference text |
| hypothesis: Generated text |
| |
| Returns: |
| Dictionary of BLEU scores |
| """ |
| |
| ref_tokens = self.tokenize_japanese(reference) |
| hyp_tokens = self.tokenize_japanese(hypothesis) |
| |
| |
| if not ref_tokens: |
| ref_tokens = ['empty'] |
| if not hyp_tokens: |
| hyp_tokens = ['empty'] |
| |
| |
| scores = {} |
| |
| try: |
| |
| for n in range(1, 5): |
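                # Cumulative BLEU-n: uniform weights over 1..n-grams, zero weight beyond n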
| weights = tuple([1/n] * n + [0] * (4-n)) |
| score = sentence_bleu( |
| [ref_tokens], |
| hyp_tokens, |
| weights=weights, |
| smoothing_function=self.smoothing |
| ) |
| scores[f'BLEU-{n}'] = score |
| except Exception as e: |
| print(f" ⚠️ BLEU calculation error: {e}") |
| for n in range(1, 5): |
| scores[f'BLEU-{n}'] = 0.0 |
| |
| return scores |
| |
| def calculate_rouge_scores(self, reference: str, hypothesis: str) -> Dict[str, float]: |
| """ |
| Calculate ROUGE scores for Japanese text |
| |
| Args: |
| reference: Reference text |
| hypothesis: Generated text |
| |
| Returns: |
| Dictionary of ROUGE scores |
| """ |
| try: |
| |
| if self.mecab: |
| ref_tokenized = ' '.join(self.tokenize_japanese(reference)) |
| hyp_tokenized = ' '.join(self.tokenize_japanese(hypothesis)) |
| else: |
| |
| ref_tokenized = ' '.join(list(reference)) |
| hyp_tokenized = ' '.join(list(hypothesis)) |
| |
| |
| scores = self.rouge_scorer.score(ref_tokenized, hyp_tokenized) |
| |
| return { |
| 'ROUGE-1': scores['rouge1'].fmeasure, |
| 'ROUGE-2': scores['rouge2'].fmeasure, |
| 'ROUGE-L': scores['rougeL'].fmeasure |
| } |
| except Exception as e: |
| print(f" ⚠️ ROUGE calculation error: {e}") |
| return { |
| 'ROUGE-1': 0.0, |
| 'ROUGE-2': 0.0, |
| 'ROUGE-L': 0.0 |
| } |
| |
| def calculate_bert_score(self, references: List[str], hypotheses: List[str]) -> Dict[str, float]: |
| """ |
| Calculate BERTScore for semantic similarity |
| |
| Args: |
| references: List of reference texts |
| hypotheses: List of generated texts |
| |
| Returns: |
| Dictionary with BERTScore metrics |
| """ |
| try: |
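            # bert_score returns per-sentence precision/recall/F1 tensors; lang="ja" selects
            # the library's default multilingual checkpoint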
| |
| P, R, F1 = bert_score( |
| hypotheses, |
| references, |
| lang='ja', |
| verbose=False, |
| device=self.device |
| ) |
| |
| return { |
| 'BERTScore_P': float(P.mean()), |
| 'BERTScore_R': float(R.mean()), |
| 'BERTScore_F1': float(F1.mean()) |
| } |
| except Exception as e: |
| print(f" ⚠️ BERTScore calculation failed: {e}") |
| print(" Install with: pip install bert-score") |
| return { |
| 'BERTScore_P': 0.0, |
| 'BERTScore_R': 0.0, |
| 'BERTScore_F1': 0.0 |
| } |
| |
| def evaluate_counseling_quality(self, response: str) -> Dict[str, float]: |
| """ |
| Evaluate counseling-specific qualities |
| Based on KokoroChat paper evaluation criteria |
| |
| Args: |
| response: Generated counseling response |
| |
| Returns: |
| Dictionary of counseling quality scores |
| """ |
| scores = {} |
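        # The heuristics below are lightweight keyword/length proxies for counseling quality
        # (empathy, support, active listening, positivity, response length), each in [0, 1];
        # they approximate, but do not reproduce, the paper's human evaluation scores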
| |
| |
| empathy_keywords = [ |
| 'わかります', '理解', '共感', 'お気持ち', 'つらい', |
| '大変', 'お察し', 'そうですね', 'なるほど', '感じ' |
| ] |
| empathy_score = sum(1 for keyword in empathy_keywords if keyword in response) |
| scores['empathy'] = min(empathy_score / 5.0, 1.0) |
| |
| |
| support_keywords = [ |
| 'サポート', '支援', '助け', '一緒に', '協力', |
| '応援', 'お手伝い', '力になり', '相談', '話を聞' |
| ] |
| support_score = sum(1 for keyword in support_keywords if keyword in response) |
| scores['support'] = min(support_score / 5.0, 1.0) |
| |
| |
| listening_indicators = ['?', 'でしょうか', 'ですか', 'いかがですか', 'どのような'] |
| scores['active_listening'] = 1.0 if any(ind in response for ind in listening_indicators) else 0.3 |
| |
| |
| positive_keywords = ['大丈夫', '良い', '素晴らしい', '頑張', '希望', '改善', '解決'] |
| positive_score = sum(1 for keyword in positive_keywords if keyword in response) |
| scores['positivity'] = min(positive_score / 3.0, 1.0) |
| |
| |
| response_length = len(response) |
| if 30 <= response_length <= 200: |
| scores['appropriateness'] = 1.0 |
| elif 20 <= response_length < 30 or 200 < response_length <= 300: |
| scores['appropriateness'] = 0.7 |
| else: |
| scores['appropriateness'] = 0.4 |
| |
| return scores |
| |
| def run_comprehensive_benchmark(self, num_samples: Optional[int] = None): |
| """ |
| Run comprehensive benchmark evaluation |
| |
| Args: |
| num_samples: Number of samples to evaluate (None for all) |
| """ |
| print("\n" + "="*80) |
| print("🚀 Running Comprehensive Benchmark") |
| print("="*80) |
| |
| |
| test_data = self.load_test_data(max_samples=num_samples) |
| |
| if not test_data: |
| raise ValueError("No test data available!") |
| |
| |
| base_metrics = defaultdict(list) |
| finetuned_metrics = defaultdict(list) |
| |
| |
| all_references = [] |
| all_base_responses = [] |
| all_finetuned_responses = [] |
| |
| print(f"\n📊 Evaluating {len(test_data)} test examples...") |
| print("-"*80) |
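        # The loop below generates a response from each model and scores it against the reference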
| |
| |
| for i, example in enumerate(tqdm(test_data, desc="Evaluating")): |
| input_text = example['input'] |
| reference = example['reference'] |
| |
| |
| base_response = self.generate_response(self.base_model, input_text) |
| finetuned_response = self.generate_response(self.finetuned_model, input_text) |
| |
| |
| all_references.append(reference) |
| all_base_responses.append(base_response) |
| all_finetuned_responses.append(finetuned_response) |
| |
| |
| base_bleu = self.calculate_bleu_scores(reference, base_response) |
| finetuned_bleu = self.calculate_bleu_scores(reference, finetuned_response) |
| |
| for key, value in base_bleu.items(): |
| base_metrics[key].append(value) |
| for key, value in finetuned_bleu.items(): |
| finetuned_metrics[key].append(value) |
| |
| |
| base_rouge = self.calculate_rouge_scores(reference, base_response) |
| finetuned_rouge = self.calculate_rouge_scores(reference, finetuned_response) |
| |
| for key, value in base_rouge.items(): |
| base_metrics[key].append(value) |
| for key, value in finetuned_rouge.items(): |
| finetuned_metrics[key].append(value) |
| |
| |
| base_quality = self.evaluate_counseling_quality(base_response) |
| finetuned_quality = self.evaluate_counseling_quality(finetuned_response) |
| |
| for key, value in base_quality.items(): |
| base_metrics[f'quality_{key}'].append(value) |
| for key, value in finetuned_quality.items(): |
| finetuned_metrics[f'quality_{key}'].append(value) |
| |
| |
| self.detailed_results.append({ |
| 'input': input_text, |
| 'reference': reference, |
| 'base_response': base_response, |
| 'finetuned_response': finetuned_response, |
| 'base_metrics': {**base_bleu, **base_rouge, **base_quality}, |
| 'finetuned_metrics': {**finetuned_bleu, **finetuned_rouge, **finetuned_quality} |
| }) |
| |
| |
| if i < 3: |
| print(f"\n📝 Example {i+1}:") |
| print(f"Input: {input_text[:100]}...") |
| print(f"Base BLEU-4: {base_bleu['BLEU-4']:.3f}, Fine-tuned BLEU-4: {finetuned_bleu['BLEU-4']:.3f}") |
| |
| |
| if len(all_references) > 0: |
| print("\n🧮 Calculating BERTScore...") |
| base_bert = self.calculate_bert_score(all_references, all_base_responses) |
| finetuned_bert = self.calculate_bert_score(all_references, all_finetuned_responses) |
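            # BERTScore is computed once over the whole corpus; the means are replicated
            # per example below so they aggregate alongside the per-example metrics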
| |
| for key, value in base_bert.items(): |
| base_metrics[key] = [value] * len(test_data) |
| for key, value in finetuned_bert.items(): |
| finetuned_metrics[key] = [value] * len(test_data) |
| |
| |
| self.results = self.calculate_aggregate_statistics(base_metrics, finetuned_metrics) |
| |
| |
| self.print_results() |
| |
| return self.results |
| |
| def calculate_aggregate_statistics(self, base_metrics: Dict, finetuned_metrics: Dict) -> Dict: |
| """ |
| Calculate aggregate statistics from collected metrics |
| |
| Args: |
| base_metrics: Base model metrics |
| finetuned_metrics: Fine-tuned model metrics |
| |
| Returns: |
| Dictionary of aggregate results |
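
            Shape (sketch):
                {'metrics': {metric: {'base': {'mean', 'std', 'min', 'max'},
                                      'finetuned': {...}}},
                 'improvements': {metric: percent_change},
                 'summary': {'bleu_avg_improvement', 'rouge_avg_improvement',
                             'quality_avg_improvement', 'overall_improvement'}}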
| """ |
| results = { |
| 'metrics': {}, |
| 'improvements': {}, |
| 'summary': {} |
| } |
| |
| |
| all_metric_names = set(base_metrics.keys()) | set(finetuned_metrics.keys()) |
| |
| for metric in all_metric_names: |
| base_values = base_metrics.get(metric, [0]) |
| finetuned_values = finetuned_metrics.get(metric, [0]) |
| |
| results['metrics'][metric] = { |
| 'base': { |
| 'mean': float(np.mean(base_values)), |
| 'std': float(np.std(base_values)), |
| 'min': float(np.min(base_values)), |
| 'max': float(np.max(base_values)) |
| }, |
| 'finetuned': { |
| 'mean': float(np.mean(finetuned_values)), |
| 'std': float(np.std(finetuned_values)), |
| 'min': float(np.min(finetuned_values)), |
| 'max': float(np.max(finetuned_values)) |
| } |
| } |
| |
| |
| base_mean = np.mean(base_values) |
| finetuned_mean = np.mean(finetuned_values) |
| if base_mean > 0: |
| improvement = ((finetuned_mean - base_mean) / base_mean) * 100 |
| else: |
| improvement = 0 |
| |
| results['improvements'][metric] = improvement |
| |
| |
| bleu_metrics = [m for m in results['metrics'] if 'BLEU' in m] |
| rouge_metrics = [m for m in results['metrics'] if 'ROUGE' in m] |
| quality_metrics = [m for m in results['metrics'] if 'quality' in m] |
| |
| |
| results['summary'] = { |
| 'bleu_avg_improvement': np.mean([results['improvements'][m] for m in bleu_metrics]) if bleu_metrics else 0, |
| 'rouge_avg_improvement': np.mean([results['improvements'][m] for m in rouge_metrics]) if rouge_metrics else 0, |
| 'quality_avg_improvement': np.mean([results['improvements'][m] for m in quality_metrics]) if quality_metrics else 0, |
| 'overall_improvement': np.mean(list(results['improvements'].values())) if results['improvements'] else 0 |
| } |
| |
| return results |
| |
| def print_results(self): |
| """Print formatted benchmark results""" |
| print("\n" + "="*80) |
| print("📊 BENCHMARK RESULTS") |
| print("="*80) |
| |
| |
| bleu_metrics = sorted([m for m in self.results['metrics'] if 'BLEU' in m]) |
| rouge_metrics = sorted([m for m in self.results['metrics'] if 'ROUGE' in m]) |
| bert_metrics = sorted([m for m in self.results['metrics'] if 'BERT' in m]) |
| quality_metrics = sorted([m for m in self.results['metrics'] if 'quality' in m]) |
| |
| |
| if bleu_metrics: |
| print("\n📘 BLEU Scores:") |
| print("-"*60) |
| print(f"{'Metric':<15} {'Base Model':<20} {'Fine-tuned':<20} {'Improvement':<15}") |
| print("-"*60) |
| for metric in bleu_metrics: |
| base = self.results['metrics'][metric]['base']['mean'] |
| finetuned = self.results['metrics'][metric]['finetuned']['mean'] |
| improvement = self.results['improvements'][metric] |
| print(f"{metric:<15} {base:.4f}±{self.results['metrics'][metric]['base']['std']:.3f} " |
| f"{finetuned:.4f}±{self.results['metrics'][metric]['finetuned']['std']:.3f} " |
| f"{improvement:+.1f}%") |
| |
| |
| if rouge_metrics: |
| print("\n📕 ROUGE Scores:") |
| print("-"*60) |
| for metric in rouge_metrics: |
| base = self.results['metrics'][metric]['base']['mean'] |
| finetuned = self.results['metrics'][metric]['finetuned']['mean'] |
| improvement = self.results['improvements'][metric] |
| print(f"{metric:<15} {base:.4f}±{self.results['metrics'][metric]['base']['std']:.3f} " |
| f"{finetuned:.4f}±{self.results['metrics'][metric]['finetuned']['std']:.3f} " |
| f"{improvement:+.1f}%") |
| |
| |
| if bert_metrics: |
| print("\n📗 BERTScore:") |
| print("-"*60) |
| for metric in bert_metrics: |
| base = self.results['metrics'][metric]['base']['mean'] |
| finetuned = self.results['metrics'][metric]['finetuned']['mean'] |
| improvement = self.results['improvements'][metric] |
| print(f"{metric:<15} {base:.4f} {finetuned:.4f} {improvement:+.1f}%") |
| |
| |
| if quality_metrics: |
| print("\n💬 Counseling Quality Metrics:") |
| print("-"*60) |
| for metric in quality_metrics: |
| base = self.results['metrics'][metric]['base']['mean'] |
| finetuned = self.results['metrics'][metric]['finetuned']['mean'] |
| improvement = self.results['improvements'][metric] |
| metric_name = metric.replace('quality_', '').capitalize() |
| print(f"{metric_name:<15} {base:.4f}±{self.results['metrics'][metric]['base']['std']:.3f} " |
| f"{finetuned:.4f}±{self.results['metrics'][metric]['finetuned']['std']:.3f} " |
| f"{improvement:+.1f}%") |
| |
| |
| print("\n" + "="*80) |
| print("📈 SUMMARY") |
| print("="*80) |
| print(f"Average BLEU Improvement: {self.results['summary']['bleu_avg_improvement']:+.1f}%") |
| print(f"Average ROUGE Improvement: {self.results['summary']['rouge_avg_improvement']:+.1f}%") |
| print(f"Average Quality Improvement: {self.results['summary']['quality_avg_improvement']:+.1f}%") |
| print(f"Overall Improvement: {self.results['summary']['overall_improvement']:+.1f}%") |
| print("="*80) |
| |
| def save_results(self, output_dir: str = "./benchmark_results"): |
| """Save all benchmark results""" |
| os.makedirs(output_dir, exist_ok=True) |
| |
| |
| with open(os.path.join(output_dir, "detailed_results.json"), 'w', encoding='utf-8') as f: |
| json.dump(self.detailed_results, f, ensure_ascii=False, indent=2, default=str) |
| |
| |
| with open(os.path.join(output_dir, "aggregate_results.json"), 'w', encoding='utf-8') as f: |
| json.dump(self.results, f, ensure_ascii=False, indent=2, default=str) |
| |
| print(f"✅ Results saved to {output_dir}/") |
|
|
|
|
| def main(): |
| """Main execution function""" |
| import argparse |
| |
| parser = argparse.ArgumentParser(description='Japanese Counseling Model Benchmark') |
| parser.add_argument('--base_model', type=str, default='LiquidAI/LFM2-1.2B', |
| help='Base model name or path') |
| parser.add_argument('--finetuned_model', type=str, default='./merged_counselor_model', |
| help='Path to fine-tuned merged model') |
| parser.add_argument('--test_data', type=str, default='./processed_data_score70/test.jsonl', |
| help='Path to test data') |
| parser.add_argument('--num_samples', type=int, default=None, |
| help='Number of samples to evaluate (None for all)') |
| parser.add_argument('--output_dir', type=str, default='./benchmark_results', |
| help='Directory to save results') |
| |
| args = parser.parse_args() |
| |
| try: |
| |
| print("🎌 Initializing Japanese Counseling Benchmark Suite") |
| benchmark = JapaneseCounselingBenchmark( |
| base_model_name=args.base_model, |
| finetuned_model_path=args.finetuned_model, |
| test_data_path=args.test_data |
| ) |
| |
| |
| benchmark.load_models() |
| |
| |
| results = benchmark.run_comprehensive_benchmark(num_samples=args.num_samples) |
| |
| |
| benchmark.save_results(args.output_dir) |
| |
| print("\n✅ Benchmark completed successfully!") |
| print(f"📁 Results saved to {args.output_dir}/") |
| |
| except Exception as e: |
| print(f"\n❌ Error during benchmarking: {e}") |
| import traceback |
| traceback.print_exc() |
|
|
|
|
| if __name__ == "__main__": |
| main() |
|
|