import torch import gradio as gr import spaces from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer import os from threading import Thread import random from datasets import load_dataset import numpy as np from sklearn.feature_extraction.text import TfidfVectorizer import pandas as pd from typing import List, Tuple import json from datetime import datetime # GPU 메모리 관리 torch.cuda.empty_cache() # 환경 변수 설정 HF_TOKEN = os.environ.get("HF_TOKEN", None) MODEL_ID = "CohereForAI/c4ai-command-r7b-12-2024" MODELS = os.environ.get("MODELS") MODEL_NAME = MODEL_ID.split("/")[-1] # 모델과 토크나이저 로드 model = AutoModelForCausalLM.from_pretrained( MODEL_ID, torch_dtype=torch.bfloat16, device_map="auto", ) tokenizer = AutoTokenizer.from_pretrained(MODEL_ID) # 위키피디아 데이터셋 로드 wiki_dataset = load_dataset("lcw99/wikipedia-korean-20240501-1million-qna") print("Wikipedia dataset loaded:", wiki_dataset) # TF-IDF 벡터라이저 초기화 및 학습 print("TF-IDF 벡터화 시작...") questions = wiki_dataset['train']['question'][:10000] # 처음 10000개만 사용 vectorizer = TfidfVectorizer(max_features=1000) question_vectors = vectorizer.fit_transform(questions) print("TF-IDF 벡터화 완료") class ChatHistory: def __init__(self): self.history = [] self.history_file = "/tmp/chat_history.json" self.load_history() def add_conversation(self, user_msg: str, assistant_msg: str): conversation = { "timestamp": datetime.now().isoformat(), "messages": [ {"role": "user", "content": user_msg}, {"role": "assistant", "content": assistant_msg} ] } self.history.append(conversation) self.save_history() def format_for_display(self): formatted = [] for conv in self.history: formatted.append([ conv["messages"][0]["content"], conv["messages"][1]["content"] ]) return formatted def get_messages_for_api(self): messages = [] for conv in self.history: messages.extend([ {"role": "user", "content": conv["messages"][0]["content"]}, {"role": "assistant", "content": conv["messages"][1]["content"]} ]) return messages def clear_history(self): self.history = [] self.save_history() def save_history(self): try: with open(self.history_file, 'w', encoding='utf-8') as f: json.dump(self.history, f, ensure_ascii=False, indent=2) except Exception as e: print(f"히스토리 저장 실패: {e}") def load_history(self): try: if os.path.exists(self.history_file): with open(self.history_file, 'r', encoding='utf-8') as f: self.history = json.load(f) except Exception as e: print(f"히스토리 로드 실패: {e}") self.history = [] # 전역 ChatHistory 인스턴스 생성 chat_history = ChatHistory() def find_relevant_context(query, top_k=3): # 쿼리 벡터화 query_vector = vectorizer.transform([query]) # 코사인 유사도 계산 similarities = (query_vector * question_vectors.T).toarray()[0] # 가장 유사한 질문들의 인덱스 top_indices = np.argsort(similarities)[-top_k:][::-1] # 관련 컨텍스트 추출 relevant_contexts = [] for idx in top_indices: if similarities[idx] > 0: relevant_contexts.append({ 'question': questions[idx], 'answer': wiki_dataset['train']['answer'][idx], 'similarity': similarities[idx] }) return relevant_contexts def analyze_file_content(content, file_type): """Analyze file content and return structural summary""" if file_type in ['parquet', 'csv']: try: lines = content.split('\n') header = lines[0] columns = header.count('|') - 1 rows = len(lines) - 3 return f"📊 데이터셋 구조: {columns}개 컬럼, {rows}개 데이터" except: return "❌ 데이터셋 구조 분석 실패" lines = content.split('\n') total_lines = len(lines) non_empty_lines = len([line for line in lines if line.strip()]) if any(keyword in content.lower() for keyword in ['def ', 'class ', 'import ', 'function']): functions = len([line for line in lines if 'def ' in line]) classes = len([line for line in lines if 'class ' in line]) imports = len([line for line in lines if 'import ' in line or 'from ' in line]) return f"💻 코드 구조: {total_lines}줄 (함수: {functions}, 클래스: {classes}, 임포트: {imports})" paragraphs = content.count('\n\n') + 1 words = len(content.split()) return f"📝 문서 구조: {total_lines}줄, {paragraphs}단락, 약 {words}단어" def read_uploaded_file(file): if file is None: return "", "" try: file_ext = os.path.splitext(file.name)[1].lower() if file_ext == '.parquet': df = pd.read_parquet(file.name, engine='pyarrow') content = df.head(10).to_markdown(index=False) return content, "parquet" elif file_ext == '.csv': encodings = ['utf-8', 'cp949', 'euc-kr', 'latin1'] for encoding in encodings: try: df = pd.read_csv(file.name, encoding=encoding) content = f"📊 데이터 미리보기:\n{df.head(10).to_markdown(index=False)}\n\n" content += f"\n📈 데이터 정보:\n" content += f"- 전체 행 수: {len(df)}\n" content += f"- 전체 열 수: {len(df.columns)}\n" content += f"- 컬럼 목록: {', '.join(df.columns)}\n" content += f"\n📋 컬럼 데이터 타입:\n" for col, dtype in df.dtypes.items(): content += f"- {col}: {dtype}\n" null_counts = df.isnull().sum() if null_counts.any(): content += f"\n⚠️ 결측치:\n" for col, null_count in null_counts[null_counts > 0].items(): content += f"- {col}: {null_count}개 누락\n" return content, "csv" except UnicodeDecodeError: continue raise UnicodeDecodeError(f"❌ 지원되는 인코딩으로 파일을 읽을 수 없습니다 ({', '.join(encodings)})") else: encodings = ['utf-8', 'cp949', 'euc-kr', 'latin1'] for encoding in encodings: try: with open(file.name, 'r', encoding=encoding) as f: content = f.read() return content, "text" except UnicodeDecodeError: continue raise UnicodeDecodeError(f"❌ 지원되는 인코딩으로 파일을 읽을 수 없습니다 ({', '.join(encodings)})") except Exception as e: return f"❌ 파일 읽기 오류: {str(e)}", "error" @spaces.GPU def stream_chat(message: str, history: list, uploaded_file, temperature: float, max_new_tokens: int, top_p: float, top_k: int, penalty: float): try: print(f'message is - {message}') print(f'history is - {history}') # 파일 업로드 처리 file_context = "" if uploaded_file: content, file_type = read_uploaded_file(uploaded_file) if content: file_context = f"\n\n업로드된 파일 내용:\n```\n{content}\n```" # 관련 컨텍스트 찾기 relevant_contexts = find_relevant_context(message) wiki_context = "\n\n관련 위키피디아 정보:\n" for ctx in relevant_contexts: wiki_context += f"Q: {ctx['question']}\nA: {ctx['answer']}\n유사도: {ctx['similarity']:.3f}\n\n" # 대화 히스토리 구성 conversation = [] for prompt, answer in history: conversation.extend([ {"role": "user", "content": prompt}, {"role": "assistant", "content": answer} ]) # 최종 프롬프트 구성 final_message = file_context + wiki_context + "\n현재 질문: " + message conversation.append({"role": "user", "content": final_message}) input_ids = tokenizer.apply_chat_template(conversation, tokenize=False, add_generation_prompt=True) inputs = tokenizer(input_ids, return_tensors="pt").to(0) streamer = TextIteratorStreamer(tokenizer, timeout=10., skip_prompt=True, skip_special_tokens=True) generate_kwargs = dict( inputs, streamer=streamer, top_k=top_k, top_p=top_p, repetition_penalty=penalty, max_new_tokens=max_new_tokens, do_sample=True, temperature=temperature, eos_token_id=[255001], ) thread = Thread(target=model.generate, kwargs=generate_kwargs) thread.start() buffer = "" for new_text in streamer: buffer += new_text yield "", history + [[message, buffer]] except Exception as e: error_message = f"오류가 발생했습니다: {str(e)}" yield "", history + [[message, error_message]] CSS = """ /* 3D 스타일 CSS */ :root { --primary-color: #2196f3; --secondary-color: #1976d2; --background-color: #f0f2f5; --card-background: #ffffff; --text-color: #333333; --shadow-color: rgba(0, 0, 0, 0.1); } body { background: linear-gradient(135deg, #f5f7fa 0%, #c3cfe2 100%); min-height: 100vh; font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif; } .container { transform-style: preserve-3d; perspective: 1000px; } .chatbot { background: var(--card-background); border-radius: 20px; box-shadow: 0 10px 20px var(--shadow-color), 0 6px 6px var(--shadow-color); transform: translateZ(0); transition: transform 0.3s ease; backdrop-filter: blur(10px); } .chatbot:hover { transform: translateZ(10px); } /* 메시지 입력 영역 */ .input-area { background: var(--card-background); border-radius: 15px; padding: 15px; margin-top: 20px; box-shadow: 0 5px 15px var(--shadow-color), 0 3px 3px var(--shadow-color); transform: translateZ(0); transition: all 0.3s ease; display: flex; align-items: center; gap: 10px; } .input-area:hover { transform: translateZ(5px); } /* 버튼 스타일 */ .custom-button { background: linear-gradient(145deg, var(--primary-color), var(--secondary-color)); color: white; border: none; border-radius: 10px; padding: 10px 20px; font-weight: 600; cursor: pointer; transform: translateZ(0); transition: all 0.3s ease; box-shadow: 0 4px 6px var(--shadow-color), 0 1px 3px var(--shadow-color); } .custom-button:hover { transform: translateZ(5px) translateY(-2px); box-shadow: 0 7px 14px var(--shadow-color), 0 3px 6px var(--shadow-color); } /* 파일 업로드 버튼 */ .file-upload-button { background: linear-gradient(145deg, #64b5f6, #42a5f5); color: white; border: none; border-radius: 10px; padding: 8px 15px; margin-right: 10px; cursor: pointer; transform: translateZ(0); transition: all 0.3s ease; } .file-upload-button:hover { transform: translateZ(5px) translateY(-2px); } /* 메시지 스타일 */ .message { background: var(--card-background); border-radius: 15px; padding: 15px; margin: 10px 0; box-shadow: 0 4px 6px var(--shadow-color), 0 1px 3px var(--shadow-color); transform: translateZ(0); transition: all 0.3s ease; } .message:hover { transform: translateZ(5px); } /* 설정 패널 */ .settings-panel { background: var(--card-background); border-radius: 15px; padding: 15px; margin-top: 20px; box-shadow: 0 5px 15px var(--shadow-color), 0 3px 3px var(--shadow-color); transform: translateZ(0); transition: all 0.3s ease; } .settings-panel:hover { transform: translateZ(5px); } /* 슬라이더 스타일 */ .slider-container { margin: 10px 0; } .slider { -webkit-appearance: none; width: 100%; height: 5px; border-radius: 5px; background: #ddd; outline: none; } .slider::-webkit-slider-thumb { -webkit-appearance: none; appearance: none; width: 15px; height: 15px; border-radius: 50%; background: var(--primary-color); cursor: pointer; transition: all 0.3s ease; } .slider::-webkit-slider-thumb:hover { transform: scale(1.2); } """ # UI 구성 수정 with gr.Blocks(css=CSS) as demo: with gr.Column(): chatbot = gr.Chatbot( value=[], height=500, label="GiniGEN AI Assistant", elem_classes="chatbot" ) with gr.Row(elem_classes="input-area"): file_upload = gr.File( label="📎 파일 첨부", type="filepath", scale=1, # 정수값으로 수정 min_width=50, elem_classes="file-upload-button" ) msg = gr.Textbox( show_label=False, placeholder="메시지를 입력하세요... 💭", scale=4, # 정수값으로 수정 container=False ) send = gr.Button( "전송", scale=1, # 정수값으로 수정 min_width=50, elem_classes="custom-button" ) with gr.Accordion("🎮 고급 설정", open=False, elem_classes="settings-panel"): with gr.Row(): with gr.Column(scale=1): temperature = gr.Slider( minimum=0, maximum=1, step=0.1, value=0.8, label="창의성 수준 🎨", elem_classes="slider" ) max_new_tokens = gr.Slider( minimum=128, maximum=8000, step=1, value=4000, label="최대 토큰 수 📝", elem_classes="slider" ) with gr.Column(scale=1): top_p = gr.Slider( minimum=0.0, maximum=1.0, step=0.1, value=0.8, label="다양성 조절 🎯", elem_classes="slider" ) top_k = gr.Slider( minimum=1, maximum=20, step=1, value=20, label="선택 범위 📊", elem_classes="slider" ) penalty = gr.Slider( minimum=0.0, maximum=2.0, step=0.1, value=1.0, label="반복 억제 🔄", elem_classes="slider" ) # Examples 컴포넌트 수정 gr.Examples( examples=[ ["한국의 전통 절기와 24절기에 대해 자세히 설명해주세요."], ["우리나라 전통 음식 중 건강에 좋은 발효음식 5가지를 추천하고 그 효능을 설명해주세요."], ["한국의 대표적인 산들을 소개하고, 각 산의 특징과 등산 코스를 추천해주세요."], ["사물놀이의 악기 구성과 장단에 대해 초보자도 이해하기 쉽게 설명해주세요."], ["한국의 전통 건축물에 담긴 과학적 원리를 현대적 관점에서 분석해주세요."], ], inputs=msg ) # 이벤트 바인딩 (이전과 동일) msg.submit( stream_chat, inputs=[msg, chatbot, file_upload, temperature, max_new_tokens, top_p, top_k, penalty], outputs=[msg, chatbot] ) send.click( stream_chat, inputs=[msg, chatbot, file_upload, temperature, max_new_tokens, top_p, top_k, penalty], outputs=[msg, chatbot] ) def init_msg(): return "파일 분석을 시작합니다..." file_upload.change( init_msg, outputs=msg ).then( stream_chat, inputs=[msg, chatbot, file_upload, temperature, max_new_tokens, top_p, top_k, penalty], outputs=[msg, chatbot] ) if __name__ == "__main__": demo.launch()