# app.py
from flask import Flask, render_template, request, session, redirect, url_for
from flask_session import Session
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM
import nltk
from rouge_score import rouge_scorer
from sacrebleu.metrics import BLEU
from datetime import datetime
import os
import math
import logging
import gc
import time
import re
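# The model, tokenizer, and metric scorers below are loaded once at import time,
# so every request handled by this Flask app shares the same in-memory instances.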
print("AI ๋ชจ๋ธ๊ณผ ํ‰๊ฐ€ ์ง€ํ‘œ๋ฅผ ๋กœ๋”ฉํ•ฉ๋‹ˆ๋‹ค...")
try:
    nltk_data_path = '/tmp/nltk_data'
    nltk.download('punkt', download_dir=nltk_data_path, quiet=True)
    nltk.data.path.append(nltk_data_path)

    model_name = "EleutherAI/polyglot-ko-1.3b"
    print(f"모델 로딩 중: {model_name}")
    tokenizer = AutoTokenizer.from_pretrained(
        model_name,
        trust_remote_code=True
    )
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = AutoModelForCausalLM.from_pretrained(
        model_name,
        torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32,
        low_cpu_mem_usage=True,
        trust_remote_code=True
    )
    model.to(device)

    # Put the model in inference mode and use fp16 weights on GPU
    model.eval()
    if torch.cuda.is_available():
        model.half()

    scorer = rouge_scorer.RougeScorer(['rouge1', 'rouge2', 'rougeL'], use_stemmer=True)
    bleu = BLEU()
    print("AI 모델 로딩 및 최적화 완료.")
    model_loaded = True
    if torch.cuda.is_available():
        print(f"GPU 메모리 사용량: {torch.cuda.memory_allocated() / 1024**3:.2f} GB")
except Exception as e:
    print(f"모델 로딩 중 심각한 오류 발생: {e}")
    model_loaded = False
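# Flask application and server-side (filesystem) session configuration.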
app = Flask(__name__)
app.config["SESSION_PERMANENT"] = False
app.config["SESSION_TYPE"] = "filesystem"
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY', os.urandom(24))
Session(app)
log_handler = logging.FileHandler('report_log.txt', encoding='utf-8')
log_handler.setLevel(logging.INFO)
log_formatter = logging.Formatter('%(asctime)s - %(message)s', '%Y-%m-%d %H:%M:%S')
log_handler.setFormatter(log_formatter)
app.logger.addHandler(log_handler)
app.logger.setLevel(logging.INFO)
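# --- Heuristics used to screen input text before perplexity evaluation ---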
def is_structured_text(text):
    """Determine whether the document is in a structured (numbered/outlined) format."""
    lines = text.split('\n')

    # Structural pattern counters
    structure_indicators = 0

    # Numbering patterns (1., 1), (1), 가., 가), ①, ㉠, etc.)
    numbering_patterns = [
        r'^\s*\d+[\.\)]\s+',       # 1. or 1)
        r'^\s*\(\d+\)\s+',         # (1)
        r'^\s*[가-힣][\.\)]\s+',    # 가. or 가)
        r'^\s*[①-⑳]\s+',           # ①
        r'^\s*[ⅰ-ⅹ][\.\)]\s+',     # ⅰ. or ⅰ)
        r'^\s*[a-zA-Z][\.\)]\s+',  # a. or A)
        r'^\s*[-•‣⃝]\s+',          # bullet points
    ]

    # Title/header patterns
    header_patterns = [
        r'^#{1,6}\s+',                  # Markdown headers
        r'^[제第]\s*\d+\s*[장절관조항]',   # 제1장, 제2절, etc.
        r'^\d+\.\s+\w+',                # 1. 서론
        r'^[Ⅰ-Ⅻ][\.\s]',                # Roman numerals
    ]

    total_lines = len([l for l in lines if l.strip()])
    if total_lines == 0:
        return False

    numbered_lines = 0
    header_lines = 0
    for line in lines:
        if not line.strip():
            continue
        # Check numbering patterns
        for pattern in numbering_patterns:
            if re.match(pattern, line):
                numbered_lines += 1
                break
        # Check header patterns
        for pattern in header_patterns:
            if re.match(pattern, line):
                header_lines += 1
                break

    # Fraction of non-empty lines that look structured
    structure_ratio = (numbered_lines + header_lines) / total_lines

    # Treat the document as structured if more than 20% of its lines match
    return structure_ratio > 0.2
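# Illustrative example: a document whose non-empty lines mostly begin with "1.",
# "가.", or "①" exceeds the 20% threshold above and is treated as structured,
# which relaxes the repetition checks in validate_ppl_text() below.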
def validate_ppl_text(text):
    """Validate text before perplexity evaluation (length and repetition checks)."""
    text_len = len(text)
    if text_len < 2000:
        return {"valid": False, "message": f"텍스트가 너무 짧습니다. 현재 {text_len}자, 최소 2000자 이상 입력해주세요."}

    # Relax the repetition checks for structured text
    is_structured = is_structured_text(text)

    # Character-level repetition check (runs of identical characters)
    char_repetitions = 0
    max_consecutive = 0
    current_consecutive = 1
    for i in range(1, len(text)):
        if text[i] == text[i-1] and text[i] not in ' \n\t':  # ignore whitespace
            current_consecutive += 1
            max_consecutive = max(max_consecutive, current_consecutive)
        else:
            if current_consecutive > 10:  # runs longer than 10 characters count as repetition
                char_repetitions += current_consecutive
            current_consecutive = 1
    if current_consecutive > 10:  # also count a run that ends at the last character
        char_repetitions += current_consecutive

    char_repetition_ratio = char_repetitions / text_len
    # Looser threshold for structured documents
    repetition_threshold = 0.5 if is_structured else 0.3
    if char_repetition_ratio > repetition_threshold:
        return {"valid": False, "message": f"반복 문자가 너무 많습니다 ({char_repetition_ratio*100:.1f}%). 정상적인 텍스트를 입력해주세요."}

    # Word-level repetition check
    words = text.split()
    if len(words) > 0:
        # Skip for structured documents (short numbered items and headings inflate repetition)
        if not is_structured:
            bigrams = [' '.join(words[i:i+2]) for i in range(len(words) - 1)]
            trigrams = [' '.join(words[i:i+3]) for i in range(len(words) - 2)]
            bigram_unique_ratio = len(set(bigrams)) / len(bigrams) if bigrams else 1
            trigram_unique_ratio = len(set(trigrams)) / len(trigrams) if trigrams else 1
            if bigram_unique_ratio < 0.5 or trigram_unique_ratio < 0.6:
                return {"valid": False, "message": "반복되는 단어 패턴이 너무 많습니다. 다양한 내용의 텍스트를 입력해주세요."}

    # Token-level repetition check
    tokens = tokenizer.convert_ids_to_tokens(tokenizer(text, max_length=1024, truncation=True).input_ids)
    # Structured documents get a lower n-gram uniqueness bar
    for n in range(2, 7):
        if len(tokens) >= n:
            ngrams = [tuple(tokens[i:i+n]) for i in range(len(tokens) - n + 1)]
            if ngrams:
                unique_ratio = len(set(ngrams)) / len(ngrams)
                if is_structured:
                    threshold = 0.2 + (n - 2) * 0.1  # lower bar for structured text
                else:
                    threshold = 0.3 + (n - 2) * 0.1
                if unique_ratio < threshold:
                    return {"valid": False, "message": f"반복되는 {n}-gram 패턴이 너무 많습니다. 다양한 내용의 텍스트를 입력해주세요."}

    word_count = len(words)
    structure_msg = " (구조화된 문서로 감지됨)" if is_structured else ""
    return {"valid": True, "message": f"✅ 검증 완료: {text_len}자, {word_count}단어{structure_msg}"}
def calculate_perplexity_logic(text, max_tokens=512, use_sliding_window=False):
    """Compute perplexity with repetition/diversity penalties and quality adjustments."""
    encodings = tokenizer(text, return_tensors="pt", max_length=max_tokens, truncation=True)
    input_ids = encodings.input_ids[0].to(device)
    if len(input_ids) < 10:
        raise ValueError("토큰 수가 너무 적습니다 (최소 10개)")
    tokens = tokenizer.convert_ids_to_tokens(input_ids)

    # Is this structured text?
    is_structured = is_structured_text(text)
    # Detect GPT-style, high-quality text
    is_high_quality = detect_high_quality_text(text)

    # Repetition penalties (distinguish meaningless repetition from legitimate repetition)
    repetition_penalties = {}

    # Count consecutive identical characters only (patterns like "아아아아")
    char_repetitions = 0
    for i in range(1, min(len(text), 1000)):
        if text[i] == text[i-1] and text[i] not in ' \n\t.,!?;:':
            char_repetitions += 1
    char_penalty = min(char_repetitions / 1000, 0.5)  # cap at 0.5

    # Token-level repetition check (relaxed: start at 3-grams, skip 2-grams)
    for n in range(3, 6):
        if len(tokens) >= n:
            ngrams = [tuple(tokens[i:i+n]) for i in range(len(tokens) - n + 1)]
            if ngrams:
                unique_ratio = len(set(ngrams)) / len(ngrams)
                # Relaxed weighting for Korean text
                repetition_penalties[f'{n}gram'] = max(0, (1 - unique_ratio) * 0.5)

    # Vocabulary diversity (taking Korean tokenization into account)
    unique_tokens = len(set(tokens))
    total_tokens = len(tokens)
    vocabulary_diversity = unique_tokens / total_tokens
    # Korean splits into more sub-tokens, so expect lower diversity
    expected_diversity = 0.3 if any(ord(c) > 0x3130 for c in text[:100]) else 0.5
    diversity_penalty = max(0, expected_diversity - vocabulary_diversity)

    # Bonus for high-quality / structured text
    quality_bonus = 1.0
    if is_high_quality:
        quality_bonus = 0.5  # 50% smaller penalty for high-quality text
    elif is_structured:
        quality_bonus = 0.7  # 30% smaller penalty for structured text

    # Combined penalty (heavily relaxed)
    avg_repetition = sum(repetition_penalties.values()) / max(len(repetition_penalties), 1)
    # Apply a strong penalty only when there is real meaningless repetition
    if char_penalty > 0.3:  # many consecutive repeated characters
        total_penalty = (avg_repetition * 2 + char_penalty * 3 + diversity_penalty)
        penalty_factor = math.exp(total_penalty * 3.0)
    else:  # normal case
        total_penalty = (avg_repetition * 0.3 + char_penalty * 0.5 + diversity_penalty * 0.5) * quality_bonus
        penalty_factor = math.exp(total_penalty * 1.5)  # much softer exponent

    seq_len = input_ids.size(0)
    with torch.no_grad():
        if not use_sliding_window or seq_len <= 256:
            outputs = model(input_ids.unsqueeze(0), labels=input_ids.unsqueeze(0))
            ppl = torch.exp(outputs.loss).item()
        else:
            max_length = 256
            stride = 128
            nlls = []
            for begin_loc in range(0, seq_len, stride):
                end_loc = min(begin_loc + max_length, seq_len)
                input_chunk = input_ids[begin_loc:end_loc].unsqueeze(0)
                try:
                    outputs = model(input_chunk, labels=input_chunk)
                    if outputs.loss is not None and torch.isfinite(outputs.loss):
                        nlls.append(outputs.loss)
                except Exception as chunk_error:
                    print(f"청크 처리 오류: {chunk_error}")
                    continue
            if not nlls:
                raise RuntimeError("유효한 loss 값을 계산할 수 없습니다")
            ppl = torch.exp(torch.mean(torch.stack(nlls))).item()

    # Base PPL correction for high-quality text
    if is_high_quality and ppl > 20:
        ppl = ppl * 0.6  # 40% reduction for high-quality text
    elif is_structured and ppl > 30:
        ppl = ppl * 0.8  # 20% reduction for structured text

    adjusted_ppl = ppl * penalty_factor
    # Cap extreme values (upper bound)
    if adjusted_ppl > 200 and char_penalty < 0.2:
        # If repetition is low but PPL is very high, clamp it
        adjusted_ppl = min(adjusted_ppl, 150)

    return {
        'base_ppl': ppl,
        'adjusted_ppl': adjusted_ppl,
        'penalty_factor': penalty_factor,
        'token_count': len(input_ids),
        'vocabulary_diversity': vocabulary_diversity,
        'char_repetition': char_penalty,
        'is_structured': is_structured,
        'is_high_quality': is_high_quality
    }
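# Worked example (illustrative): a base PPL of 35 with a penalty_factor of 1.10 yields
# an adjusted PPL of 38.5, which get_ppl_score() below maps to 1.8 of 3 points.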
def detect_high_quality_text(text):
    """Detect high-quality text (e.g. GPT-generated prose)."""
    indicators = 0

    # 1. Paragraph structure
    paragraphs = text.split('\n\n')
    if len(paragraphs) >= 3:
        indicators += 1

    # 2. Consistent sentence endings
    sentences = re.split(r'[.!?]\s', text)
    if len(sentences) > 5:
        # Do most sentences look complete?
        complete_sentences = sum(1 for s in sentences if len(s.strip()) > 10)
        if complete_sentences / len(sentences) > 0.8:
            indicators += 1

    # 3. Use of connectives / transition words
    transition_words = ['첫째', '둘째', '셋째', '따라서', '그러나', '또한', '예를 들어',
                        '결론적으로', '무엇보다', '나아가', '더불어', '특히', '이에 따라']
    transition_count = sum(1 for word in transition_words if word in text)
    if transition_count >= 3:
        indicators += 1

    # 4. Density of professional vocabulary
    professional_terms = ['체계', '구축', '기반', '활용', '분석', '모델', '시스템',
                          '프로세스', '전략', '방안', '효과', '개선', '고도화']
    prof_count = sum(text.count(term) for term in professional_terms)
    if prof_count > len(text.split()) * 0.02:  # more than 2% of the word count
        indicators += 1

    # 5. Uniform paragraph lengths
    if len(paragraphs) > 2:
        lengths = [len(p) for p in paragraphs if p.strip()]
        if lengths:
            avg_length = sum(lengths) / len(lengths)
            variance = sum((l - avg_length) ** 2 for l in lengths) / len(lengths)
            if variance < (avg_length * 0.5) ** 2:  # low variation
                indicators += 1

    # Treat the text as high quality if at least 3 indicators are met
    return indicators >= 3
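# Note: these are surface-level heuristics; well-edited human prose with clear
# transitions and consistent paragraphing can also satisfy three indicators.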
def get_ppl_calculation_mode(text_length):
    """Choose a PPL calculation mode based on input length (longer text uses a faster mode)."""
    if text_length > 2000:
        return "ultra_fast"
    elif text_length > 1000:
        return "fast"
    else:
        return "accurate"
def get_ppl_score(adjusted_ppl):
    # 3 points max, 5 bands (20% steps)
    if adjusted_ppl < 15: return 3.0      # 100%
    elif adjusted_ppl < 30: return 2.4    # 80%
    elif adjusted_ppl < 50: return 1.8    # 60%
    elif adjusted_ppl < 100: return 1.2   # 40%
    else: return 0.6                      # 20%

def get_rouge_score(final_rouge_score):
    # 3 points max, 5 bands (20% steps)
    if final_rouge_score >= 0.60: return 3.0    # 100%
    elif final_rouge_score >= 0.50: return 2.4  # 80%
    elif final_rouge_score >= 0.40: return 1.8  # 60%
    elif final_rouge_score >= 0.30: return 1.2  # 40%
    else: return 0.6                            # 20%

def get_bleu_score(bleu_score):
    # 2 points max, 5 bands (20% steps)
    if bleu_score >= 0.50: return 2.0    # 100%
    elif bleu_score >= 0.40: return 1.6  # 80%
    elif bleu_score >= 0.30: return 1.2  # 60%
    elif bleu_score >= 0.20: return 0.8  # 40%
    else: return 0.4                     # 20%
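# Example: a weighted ROUGE of 0.47 falls in the 0.40-0.50 band (1.8 of 3 points),
# and a BLEU of 0.33 falls in the 0.30-0.40 band (1.2 of 2 points).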
def cleanup_memory():
    if torch.cuda.is_available():
        torch.cuda.empty_cache()
    gc.collect()
@app.route('/', methods=['GET'])
def index():
    all_results = session.get('all_results', {})
    input_texts = session.get('input_texts', {})
    return render_template('index.html', model_loaded=model_loaded, all_results=all_results, input_texts=input_texts)
@app.route('/evaluate', methods=['POST'])
def evaluate_text():
    if 'all_results' not in session: session['all_results'] = {}
    if 'input_texts' not in session: session['input_texts'] = {}
    target_url = request.form.get('target_url')
    if target_url: session['all_results']['target_url'] = target_url
    metric = request.form.get('metric')
    results_to_store = {'metric': metric}
    try:
        if metric == 'perplexity':
            text = request.form.get('ppl_text', '').strip()
            session['input_texts']['ppl_text'] = text
            validation_result = validate_ppl_text(text)
            if not validation_result["valid"]:
                results_to_store['error'] = validation_result["message"]
            elif not model_loaded:
                results_to_store['error'] = "모델이 로딩되지 않았습니다."
            else:
                try:
                    cleanup_memory()
                    calc_mode = get_ppl_calculation_mode(len(text))
                    start_time = time.time()
                    if calc_mode == "ultra_fast":
                        ppl_result = calculate_perplexity_logic(text, max_tokens=256, use_sliding_window=False)
                    elif calc_mode == "fast":
                        ppl_result = calculate_perplexity_logic(text, max_tokens=384, use_sliding_window=False)
                    else:
                        ppl_result = calculate_perplexity_logic(text, max_tokens=512, use_sliding_window=True)
                    calc_time = time.time() - start_time
                    adjusted_ppl = ppl_result['adjusted_ppl']
                    results_to_store['score_value'] = adjusted_ppl
                    results_to_store['score_display'] = f"{adjusted_ppl:.4f}"
                    results_to_store['details'] = {
                        'base_ppl': f"{ppl_result['base_ppl']:.4f}",
                        'penalty_factor': f"{ppl_result['penalty_factor']:.4f}",
                        'token_count': ppl_result['token_count'],
                        'calc_time': f"{calc_time:.2f}s",
                        'calc_mode': calc_mode,
                        'is_structured': ppl_result.get('is_structured', False)
                    }
                    results_to_store['final_score'] = get_ppl_score(adjusted_ppl)
                    cleanup_memory()
                except Exception as ppl_error:
                    results_to_store['error'] = f"PPL 계산 중 오류: {ppl_error}"
            session['all_results']['perplexity'] = results_to_store
        elif metric == 'rouge':
            gen_text = request.form.get('rouge_generated', '').strip()
            ref_text = request.form.get('rouge_reference', '').strip()
            session['input_texts']['rouge_generated'] = gen_text
            session['input_texts']['rouge_reference'] = ref_text
            if not gen_text or not ref_text:
                results_to_store['error'] = "생성된 요약문과 참조 요약문을 모두 입력해주세요."
            else:
                scores = scorer.score(ref_text, gen_text)
                r1, r2, rL = scores['rouge1'].fmeasure, scores['rouge2'].fmeasure, scores['rougeL'].fmeasure
                weighted_avg = (r1 * 0.3 + r2 * 0.3 + rL * 0.4)
                len_gen = len(gen_text.split()); len_ref = len(ref_text.split())
                length_ratio = len_gen / len_ref if len_ref > 0 else 0
                if 0.8 <= length_ratio <= 1.2: length_penalty = 1.0
                elif length_ratio < 0.5 or length_ratio > 2.0: length_penalty = 0.8
                else: length_penalty = 0.9
                final_rouge_score = weighted_avg * length_penalty
                results_to_store['score_value'] = final_rouge_score
                results_to_store['score_display'] = f"{final_rouge_score:.4f}"
                results_to_store['details'] = {
                    'rouge1': f"{r1:.4f}",
                    'rouge2': f"{r2:.4f}",
                    'rougeL': f"{rL:.4f}",
                    'weighted_avg': f"{weighted_avg:.4f}",
                    'length_penalty': f"{length_penalty:.2f}"
                }
                results_to_store['final_score'] = get_rouge_score(final_rouge_score)
            session['all_results']['rouge'] = results_to_store
        elif metric == 'bleu':
            gen_text = request.form.get('bleu_generated', '').strip()
            ref_text1 = request.form.get('bleu_reference1', '').strip()
            ref_text2 = request.form.get('bleu_reference2', '').strip()
            session['input_texts']['bleu_generated'] = gen_text
            session['input_texts']['bleu_reference1'] = ref_text1
            session['input_texts']['bleu_reference2'] = ref_text2
            if not gen_text or not ref_text1 or not ref_text2:
                results_to_store['error'] = "생성된 번역문과 2개의 참조 번역문을 모두 입력해주세요."
            else:
                try:
                    # Handle differences between sacrebleu versions
                    references = [ref_text1, ref_text2]  # flat list of reference strings
                    # Try the newer API first
                    try:
                        # sacrebleu 2.x: sentence_score(hypothesis, list_of_references)
                        bleu_score = bleu.sentence_score(gen_text, references).score / 100
                    except Exception:
                        # Older version or an alternative call style
                        try:
                            # corpus_score expects one reference stream per reference
                            bleu_score = bleu.corpus_score([gen_text], [[ref_text1], [ref_text2]]).score / 100
                        except Exception:
                            # Most basic fallback
                            from sacrebleu import sentence_bleu
                            bleu_score = sentence_bleu(gen_text, references).score / 100
                    results_to_store['score_value'] = bleu_score
                    results_to_store['score_display'] = f"{bleu_score:.4f}"
                    results_to_store['final_score'] = get_bleu_score(bleu_score)
                except Exception as bleu_error:
                    # Fallback when sacrebleu fails entirely
                    try:
                        # Use NLTK BLEU instead
                        from nltk.translate.bleu_score import sentence_bleu as nltk_bleu
                        from nltk.translate.bleu_score import SmoothingFunction
                        gen_tokens = gen_text.split()
                        ref_tokens1 = ref_text1.split()
                        ref_tokens2 = ref_text2.split()
                        smoothing = SmoothingFunction().method1
                        bleu_score = nltk_bleu([ref_tokens1, ref_tokens2], gen_tokens, smoothing_function=smoothing)
                        results_to_store['score_value'] = bleu_score
                        results_to_store['score_display'] = f"{bleu_score:.4f}"
                        results_to_store['final_score'] = get_bleu_score(bleu_score)
                    except Exception as nltk_error:
                        results_to_store['error'] = f"BLEU 계산 중 오류: {str(bleu_error)[:100]}"
            session['all_results']['bleu'] = results_to_store
        elif metric in ['mmlu', 'truthfulqa', 'drop', 'mbpp_humaneval']:
            generated_text = request.form.get(f'{metric}_generated', '')
            reference_text = request.form.get(f'{metric}_reference', '')
            grade = request.form.get(f'{metric}_grade', '')
            session['input_texts'][f'{metric}_generated'] = generated_text
            session['input_texts'][f'{metric}_reference'] = reference_text
            # Adjusted weighting: each of these metrics is worth 3 points
            max_score = 3
            # 5 bands (20% steps)
            score_map = {
                '수': 1.0,  # 100%
                '우': 0.8,  # 80%
                '미': 0.6,  # 60%
                '양': 0.4,  # 40%
                '가': 0.2   # 20%
            }
            if grade and grade in score_map:
                final_score = max_score * score_map[grade]
                results_to_store['grade'] = grade
                results_to_store['final_score'] = final_score
            else:
                results_to_store['grade'] = None
                results_to_store['final_score'] = 0
                if not grade:
                    results_to_store['error'] = "평가 등급을 선택해주세요."
            session['all_results'][metric] = results_to_store

    except Exception as e:
        results_to_store['error'] = f"계산 중 오류 발생: {e}"
        session['all_results'][metric] = results_to_store
        app.logger.error(f"평가 중 오류 - 메트릭: {metric}, 오류: {e}")

    session.modified = True
    return redirect(url_for('index', _anchor=metric))
@app.route('/report')
def report():
    all_results = session.get('all_results', {})
    input_texts = session.get('input_texts', {})
    try:
        target_url = all_results.get('target_url', 'N/A')
        total_score = sum(res.get('final_score', 0) for res in all_results.values() if isinstance(res, dict))
        log_message = f"보고서 생성 - 대상: {target_url}, 총점: {total_score:.2f}/20"
        app.logger.info(log_message)
    except Exception as e:
        app.logger.error(f"로그 기록 중 오류 발생: {e}")
    return render_template('report.html', all_results=all_results, input_texts=input_texts)
@app.route('/reset')
def reset():
    session.pop('all_results', None)
    session.pop('input_texts', None)
    cleanup_memory()
    return redirect(url_for('index'))
@app.route('/memory_status')
def memory_status():
    status = {}
    if torch.cuda.is_available():
        status['gpu_allocated'] = f"{torch.cuda.memory_allocated() / 1024**3:.2f} GB"
        status['gpu_reserved'] = f"{torch.cuda.memory_reserved() / 1024**3:.2f} GB"
    # psutil is imported lazily here; it must be installed in the runtime environment
    import psutil
    process = psutil.Process()
    status['ram_usage'] = f"{process.memory_info().rss / 1024**3:.2f} GB"
    return status
if __name__ == '__main__':
    app.run(host='0.0.0.0', port=7860)