import torch import gradio as gr import spaces from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer import os from threading import Thread import random from datasets import load_dataset import numpy as np from sklearn.feature_extraction.text import TfidfVectorizer import pandas as pd from typing import List, Tuple import json from datetime import datetime # GPU 메모리 관리 torch.cuda.empty_cache() # 환경 변수 설정 HF_TOKEN = os.environ.get("HF_TOKEN", None) MODEL_ID = "CohereForAI/c4ai-command-r7b-12-2024" MODELS = os.environ.get("MODELS") MODEL_NAME = MODEL_ID.split("/")[-1] # 모델과 토크나이저 로드 model = AutoModelForCausalLM.from_pretrained( MODEL_ID, torch_dtype=torch.bfloat16, device_map="auto", ) tokenizer = AutoTokenizer.from_pretrained(MODEL_ID) # 위키피디아 데이터셋 로드 wiki_dataset = load_dataset("lcw99/wikipedia-korean-20240501-1million-qna") print("Wikipedia dataset loaded:", wiki_dataset) # TF-IDF 벡터라이저 초기화 및 학습 print("TF-IDF 벡터화 시작...") questions = wiki_dataset['train']['question'][:10000] # 처음 10000개만 사용 vectorizer = TfidfVectorizer(max_features=1000) question_vectors = vectorizer.fit_transform(questions) print("TF-IDF 벡터화 완료") class ChatHistory: def __init__(self): self.history = [] self.history_file = "/tmp/chat_history.json" self.load_history() def add_conversation(self, user_msg: str, assistant_msg: str): conversation = { "timestamp": datetime.now().isoformat(), "messages": [ {"role": "user", "content": user_msg}, {"role": "assistant", "content": assistant_msg} ] } self.history.append(conversation) self.save_history() def format_for_display(self): formatted = [] for conv in self.history: formatted.append([ conv["messages"][0]["content"], conv["messages"][1]["content"] ]) return formatted def get_messages_for_api(self): messages = [] for conv in self.history: messages.extend([ {"role": "user", "content": conv["messages"][0]["content"]}, {"role": "assistant", "content": conv["messages"][1]["content"]} ]) return messages def clear_history(self): self.history = [] self.save_history() def save_history(self): try: with open(self.history_file, 'w', encoding='utf-8') as f: json.dump(self.history, f, ensure_ascii=False, indent=2) except Exception as e: print(f"히스토리 저장 실패: {e}") def load_history(self): try: if os.path.exists(self.history_file): with open(self.history_file, 'r', encoding='utf-8') as f: self.history = json.load(f) except Exception as e: print(f"히스토리 로드 실패: {e}") self.history = [] # 전역 ChatHistory 인스턴스 생성 chat_history = ChatHistory() def find_relevant_context(query, top_k=3): # 쿼리 벡터화 query_vector = vectorizer.transform([query]) # 코사인 유사도 계산 similarities = (query_vector * question_vectors.T).toarray()[0] # 가장 유사한 질문들의 인덱스 top_indices = np.argsort(similarities)[-top_k:][::-1] # 관련 컨텍스트 추출 relevant_contexts = [] for idx in top_indices: if similarities[idx] > 0: relevant_contexts.append({ 'question': questions[idx], 'answer': wiki_dataset['train']['answer'][idx], 'similarity': similarities[idx] }) return relevant_contexts def analyze_file_content(content, file_type): """Analyze file content and return structural summary""" if file_type in ['parquet', 'csv']: try: lines = content.split('\n') header = lines[0] columns = header.count('|') - 1 rows = len(lines) - 3 return f"📊 데이터셋 구조: {columns}개 컬럼, {rows}개 데이터" except: return "❌ 데이터셋 구조 분석 실패" lines = content.split('\n') total_lines = len(lines) non_empty_lines = len([line for line in lines if line.strip()]) if any(keyword in content.lower() for keyword in ['def ', 'class ', 'import ', 'function']): functions = len([line for line in lines if 'def ' in line]) classes = len([line for line in lines if 'class ' in line]) imports = len([line for line in lines if 'import ' in line or 'from ' in line]) return f"💻 코드 구조: {total_lines}줄 (함수: {functions}, 클래스: {classes}, 임포트: {imports})" paragraphs = content.count('\n\n') + 1 words = len(content.split()) return f"📝 문서 구조: {total_lines}줄, {paragraphs}단락, 약 {words}단어" def read_uploaded_file(file): if file is None: return "", "" try: file_ext = os.path.splitext(file.name)[1].lower() if file_ext == '.parquet': df = pd.read_parquet(file.name, engine='pyarrow') content = df.head(10).to_markdown(index=False) return content, "parquet" elif file_ext == '.csv': encodings = ['utf-8', 'cp949', 'euc-kr', 'latin1'] for encoding in encodings: try: df = pd.read_csv(file.name, encoding=encoding) content = f"📊 데이터 미리보기:\n{df.head(10).to_markdown(index=False)}\n\n" content += f"\n📈 데이터 정보:\n" content += f"- 전체 행 수: {len(df)}\n" content += f"- 전체 열 수: {len(df.columns)}\n" content += f"- 컬럼 목록: {', '.join(df.columns)}\n" content += f"\n📋 컬럼 데이터 타입:\n" for col, dtype in df.dtypes.items(): content += f"- {col}: {dtype}\n" null_counts = df.isnull().sum() if null_counts.any(): content += f"\n⚠️ 결측치:\n" for col, null_count in null_counts[null_counts > 0].items(): content += f"- {col}: {null_count}개 누락\n" return content, "csv" except UnicodeDecodeError: continue raise UnicodeDecodeError(f"❌ 지원되는 인코딩으로 파일을 읽을 수 없습니다 ({', '.join(encodings)})") else: encodings = ['utf-8', 'cp949', 'euc-kr', 'latin1'] for encoding in encodings: try: with open(file.name, 'r', encoding=encoding) as f: content = f.read() return content, "text" except UnicodeDecodeError: continue raise UnicodeDecodeError(f"❌ 지원되는 인코딩으로 파일을 읽을 수 없습니다 ({', '.join(encodings)})") except Exception as e: return f"❌ 파일 읽기 오류: {str(e)}", "error" @spaces.GPU def stream_chat(message: str, history: list, uploaded_file, temperature: float, max_new_tokens: int, top_p: float, top_k: int, penalty: float): print(f'message is - {message}') print(f'history is - {history}') # 파일 업로드 처리 file_context = "" if uploaded_file: content, file_type = read_uploaded_file(uploaded_file) if content: file_context = f"\n\n업로드된 파일 내용:\n```\n{content}\n```" # 관련 컨텍스트 찾기 relevant_contexts = find_relevant_context(message) wiki_context = "\n\n관련 위키피디아 정보:\n" for ctx in relevant_contexts: wiki_context += f"Q: {ctx['question']}\nA: {ctx['answer']}\n유사도: {ctx['similarity']:.3f}\n\n" # 대화 히스토리 구성 conversation = [] for prompt, answer in history: conversation.extend([ {"role": "user", "content": prompt}, {"role": "assistant", "content": answer} ]) # 최종 프롬프트 구성 final_message = file_context + wiki_context + "\n현재 질문: " + message conversation.append({"role": "user", "content": final_message}) input_ids = tokenizer.apply_chat_template(conversation, tokenize=False, add_generation_prompt=True) inputs = tokenizer(input_ids, return_tensors="pt").to(0) streamer = TextIteratorStreamer(tokenizer, timeout=10., skip_prompt=True, skip_special_tokens=True) generate_kwargs = dict( inputs, streamer=streamer, top_k=top_k, top_p=top_p, repetition_penalty=penalty, max_new_tokens=max_new_tokens, do_sample=True, temperature=temperature, eos_token_id=[255001], ) thread = Thread(target=model.generate, kwargs=generate_kwargs) thread.start() buffer = "" for new_text in streamer: buffer += new_text yield buffer CSS = """ /* 전체 페이지 스타일링 */ body { background: linear-gradient(135deg, #f5f7fa 0%, #c3cfe2 100%); min-height: 100vh; font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif; } /* ... (이전의 CSS 스타일 유지) ... */ """ with gr.Blocks(css=CSS) as demo: with gr.Row(): with gr.Column(scale=2): chatbot = gr.Chatbot(height=500) msg = gr.Textbox( label="메시지 입력", show_label=False, placeholder="무엇이든 물어보세요... 💭", container=False ) with gr.Row(): clear = gr.ClearButton([msg, chatbot], value="대화내용 지우기") send = gr.Button("보내기 📤") with gr.Column(scale=1): gr.Markdown("### 파일 업로드 📁") file_upload = gr.File( label="파일 선택", file_types=["text", ".csv", ".parquet"], type="filepath" ) with gr.Accordion("고급 설정 ⚙️", open=False): temperature = gr.Slider( minimum=0, maximum=1, step=0.1, value=0.8, label="온도", ) max_new_tokens = gr.Slider( minimum=128, maximum=8000, step=1, value=4000, label="최대 토큰 수", ) top_p = gr.Slider( minimum=0.0, maximum=1.0, step=0.1, value=0.8, label="상위 확률", ) top_k = gr.Slider( minimum=1, maximum=20, step=1, value=20, label="상위 K", ) penalty = gr.Slider( minimum=0.0, maximum=2.0, step=0.1, value=1.0, label="반복 패널티", ) # 예시 질문 gr.Examples( examples=[ ["한국의 전통 절기와 24절기에 대해 자세히 설명해주세요."], ["우리나라 전통 음식 중 건강에 좋은 발효음식 5가지를 추천하고 그 효능을 설명해주세요."], ["한국의 대표적인 산들을 소개하고, 각 산의 특징과 등산 코스를 추천해주세요."], ["사물놀이의 악기 구성과 장단에 대해 초보자도 이해하기 쉽게 설명해주세요."], ["한국의 전통 건축물에 담긴 과학적 원리를 현대적 관점에서 분석해주세요."], ["조선시대 과거 시험 제도를 현대의 입시 제도와 비교하여 설명해주세요."], ["한국의 4대 궁궐을 비교하여 각각의 특징과 역사적 의미를 설명해주세요."], ["한국의 전통 놀이를 현대적으로 재해석하여 실내에서 할 수 있는 방법을 제안해주세요."], ["한글 창제 과정과 훈민정음의 과학적 원리를 상세히 설명해주세요."], ["한국의 전통 차 문화에 대해 설명하고, 계절별로 어울리는 전통차를 추천해주세요."], ["한국의 전통 의복인 한복의 구조와 특징을 과학적, 미학적 관점에서 분석해주세요."], ["한국의 전통 가옥 구조를 기후와 환경 관점에서 분석하고, 현대 건축에 적용할 수 있는 요소를 제안해주세요."] ], inputs=msg, ) # 이벤트 바인딩 msg.submit( stream_chat, inputs=[msg, chatbot, file_upload, temperature, max_new_tokens, top_p, top_k, penalty], outputs=[msg, chatbot] ) send.click( stream_chat, inputs=[msg, chatbot, file_upload, temperature, max_new_tokens, top_p, top_k, penalty], outputs=[msg, chatbot] ) # 파일 업로드시 자동 분석 file_upload.change( lambda: "파일 분석을 시작합니다...", outputs=msg ).then( stream_chat, inputs=[msg, chatbot, file_upload, temperature, max_new_tokens, top_p, top_k, penalty], outputs=[msg, chatbot] ) if __name__ == "__main__": demo.launch()