import torch import gradio as gr import spaces from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer import os from threading import Thread import random from datasets import load_dataset import numpy as np from sklearn.feature_extraction.text import TfidfVectorizer import pandas as pd from typing import List, Tuple import json from datetime import datetime # GPU 메모리 관리 torch.cuda.empty_cache() # 환경 변수 설정 HF_TOKEN = os.environ.get("HF_TOKEN", None) MODEL_ID = "CohereForAI/c4ai-command-r7b-12-2024" MODELS = os.environ.get("MODELS") MODEL_NAME = MODEL_ID.split("/")[-1] # 모델과 토크나이저 로드 model = AutoModelForCausalLM.from_pretrained( MODEL_ID, torch_dtype=torch.bfloat16, device_map="auto", ) tokenizer = AutoTokenizer.from_pretrained(MODEL_ID) # 위키피디아 데이터셋 로드 wiki_dataset = load_dataset("lcw99/wikipedia-korean-20240501-1million-qna") print("Wikipedia dataset loaded:", wiki_dataset) # TF-IDF 벡터라이저 초기화 및 학습 print("TF-IDF 벡터화 시작...") questions = wiki_dataset['train']['question'][:10000] # 처음 10000개만 사용 vectorizer = TfidfVectorizer(max_features=1000) question_vectors = vectorizer.fit_transform(questions) print("TF-IDF 벡터화 완료") class ChatHistory: def __init__(self): self.history = [] self.history_file = "/tmp/chat_history.json" self.load_history() def add_conversation(self, user_msg: str, assistant_msg: str): conversation = { "timestamp": datetime.now().isoformat(), "messages": [ {"role": "user", "content": user_msg}, {"role": "assistant", "content": assistant_msg} ] } self.history.append(conversation) self.save_history() def format_for_display(self): formatted = [] for conv in self.history: formatted.append([ conv["messages"][0]["content"], conv["messages"][1]["content"] ]) return formatted def get_messages_for_api(self): messages = [] for conv in self.history: messages.extend([ {"role": "user", "content": conv["messages"][0]["content"]}, {"role": "assistant", "content": conv["messages"][1]["content"]} ]) return messages def clear_history(self): self.history = [] self.save_history() def save_history(self): try: with open(self.history_file, 'w', encoding='utf-8') as f: json.dump(self.history, f, ensure_ascii=False, indent=2) except Exception as e: print(f"히스토리 저장 실패: {e}") def load_history(self): try: if os.path.exists(self.history_file): with open(self.history_file, 'r', encoding='utf-8') as f: self.history = json.load(f) except Exception as e: print(f"히스토리 로드 실패: {e}") self.history = [] # 전역 ChatHistory 인스턴스 생성 chat_history = ChatHistory() def find_relevant_context(query, top_k=3): # 쿼리 벡터화 query_vector = vectorizer.transform([query]) # 코사인 유사도 계산 similarities = (query_vector * question_vectors.T).toarray()[0] # 가장 유사한 질문들의 인덱스 top_indices = np.argsort(similarities)[-top_k:][::-1] # 관련 컨텍스트 추출 relevant_contexts = [] for idx in top_indices: if similarities[idx] > 0: relevant_contexts.append({ 'question': questions[idx], 'answer': wiki_dataset['train']['answer'][idx], 'similarity': similarities[idx] }) return relevant_contexts def analyze_file_content(content, file_type): """Analyze file content and return structural summary""" if file_type in ['parquet', 'csv']: try: lines = content.split('\n') header = lines[0] columns = header.count('|') - 1 rows = len(lines) - 3 return f"📊 데이터셋 구조: {columns}개 컬럼, {rows}개 데이터" except: return "❌ 데이터셋 구조 분석 실패" lines = content.split('\n') total_lines = len(lines) non_empty_lines = len([line for line in lines if line.strip()]) if any(keyword in content.lower() for keyword in ['def ', 'class ', 'import ', 'function']): functions = len([line for line in lines if 'def ' in line]) classes = len([line for line in lines if 'class ' in line]) imports = len([line for line in lines if 'import ' in line or 'from ' in line]) return f"💻 코드 구조: {total_lines}줄 (함수: {functions}, 클래스: {classes}, 임포트: {imports})" paragraphs = content.count('\n\n') + 1 words = len(content.split()) return f"📝 문서 구조: {total_lines}줄, {paragraphs}단락, 약 {words}단어" def read_uploaded_file(file): if file is None: return "", "" try: file_ext = os.path.splitext(file.name)[1].lower() if file_ext == '.parquet': df = pd.read_parquet(file.name, engine='pyarrow') content = df.head(10).to_markdown(index=False) return content, "parquet" elif file_ext == '.csv': encodings = ['utf-8', 'cp949', 'euc-kr', 'latin1'] for encoding in encodings: try: df = pd.read_csv(file.name, encoding=encoding) content = f"📊 데이터 미리보기:\n{df.head(10).to_markdown(index=False)}\n\n" content += f"\n📈 데이터 정보:\n" content += f"- 전체 행 수: {len(df)}\n" content += f"- 전체 열 수: {len(df.columns)}\n" content += f"- 컬럼 목록: {', '.join(df.columns)}\n" content += f"\n📋 컬럼 데이터 타입:\n" for col, dtype in df.dtypes.items(): content += f"- {col}: {dtype}\n" null_counts = df.isnull().sum() if null_counts.any(): content += f"\n⚠️ 결측치:\n" for col, null_count in null_counts[null_counts > 0].items(): content += f"- {col}: {null_count}개 누락\n" return content, "csv" except UnicodeDecodeError: continue raise UnicodeDecodeError(f"❌ 지원되는 인코딩으로 파일을 읽을 수 없습니다 ({', '.join(encodings)})") else: encodings = ['utf-8', 'cp949', 'euc-kr', 'latin1'] for encoding in encodings: try: with open(file.name, 'r', encoding=encoding) as f: content = f.read() return content, "text" except UnicodeDecodeError: continue raise UnicodeDecodeError(f"❌ 지원되는 인코딩으로 파일을 읽을 수 없습니다 ({', '.join(encodings)})") except Exception as e: return f"❌ 파일 읽기 오류: {str(e)}", "error" def read_uploaded_file(file): if file is None: return "", "" try: file_ext = os.path.splitext(file.name)[1].lower() if file_ext == '.parquet': df = pd.read_parquet(file.name) content = f"📊 데이터 미리보기:\n{df.head(10).to_markdown(index=False)}\n\n" content += f"\n📈 데이터 정보:\n" content += f"- 전체 행 수: {len(df)}\n" content += f"- 전체 열 수: {len(df.columns)}\n" content += f"- 컬럼 목록: {', '.join(df.columns)}\n" return content, "parquet" elif file_ext == '.csv': encodings = ['utf-8', 'cp949', 'euc-kr', 'latin1'] for encoding in encodings: try: df = pd.read_csv(file.name, encoding=encoding) content = f"📊 데이터 미리보기:\n{df.head(10).to_markdown(index=False)}\n\n" content += f"\n📈 데이터 정보:\n" content += f"- 전체 행 수: {len(df)}\n" content += f"- 전체 열 수: {len(df.columns)}\n" content += f"- 컬럼 목록: {', '.join(df.columns)}\n" content += f"\n📋 컬럼 데이터 타입:\n" for col, dtype in df.dtypes.items(): content += f"- {col}: {dtype}\n" null_counts = df.isnull().sum() if null_counts.any(): content += f"\n⚠️ 결측치:\n" for col, null_count in null_counts[null_counts > 0].items(): content += f"- {col}: {null_count}개 누락\n" return content, "csv" except UnicodeDecodeError: continue raise UnicodeDecodeError(f"지원되는 인코딩으로 파일을 읽을 수 없습니다 ({', '.join(encodings)})") else: # 텍스트 파일 encodings = ['utf-8', 'cp949', 'euc-kr', 'latin1'] for encoding in encodings: try: with open(file.name, 'r', encoding=encoding) as f: content = f.read() # 파일 내용 분석 lines = content.split('\n') total_lines = len(lines) non_empty_lines = len([line for line in lines if line.strip()]) # 코드 파일 여부 확인 is_code = any(keyword in content.lower() for keyword in ['def ', 'class ', 'import ', 'function']) if is_code: # 코드 파일 분석 functions = len([line for line in lines if 'def ' in line]) classes = len([line for line in lines if 'class ' in line]) imports = len([line for line in lines if 'import ' in line or 'from ' in line]) analysis = f"\n📝 코드 분석:\n" analysis += f"- 전체 라인 수: {total_lines}\n" analysis += f"- 함수 수: {functions}\n" analysis += f"- 클래스 수: {classes}\n" analysis += f"- import 문 수: {imports}\n" else: # 일반 텍스트 파일 분석 words = len(content.split()) chars = len(content) analysis = f"\n📝 텍스트 분석:\n" analysis += f"- 전체 라인 수: {total_lines}\n" analysis += f"- 실제 내용이 있는 라인 수: {non_empty_lines}\n" analysis += f"- 단어 수: {words}\n" analysis += f"- 문자 수: {chars}\n" return content + analysis, "text" except UnicodeDecodeError: continue raise UnicodeDecodeError(f"지원되는 인코딩으로 파일을 읽을 수 없습니다 ({', '.join(encodings)})") except Exception as e: return f"파일 읽기 오류: {str(e)}", "error" # 파일 업로드 이벤트 핸들링 수정 def init_msg(): return "파일을 분석하고 있습니다..." CSS = """ /* 3D 스타일 CSS */ :root { --primary-color: #2196f3; --secondary-color: #1976d2; --background-color: #f0f2f5; --card-background: #ffffff; --text-color: #333333; --shadow-color: rgba(0, 0, 0, 0.1); } body { background: linear-gradient(135deg, #f5f7fa 0%, #c3cfe2 100%); min-height: 100vh; font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif; } .container { transform-style: preserve-3d; perspective: 1000px; } .chatbot { background: var(--card-background); border-radius: 20px; box-shadow: 0 10px 20px var(--shadow-color), 0 6px 6px var(--shadow-color); transform: translateZ(0); transition: transform 0.3s ease; backdrop-filter: blur(10px); } .chatbot:hover { transform: translateZ(10px); } /* 메시지 입력 영역 */ .input-area { background: var(--card-background); border-radius: 15px; padding: 15px; margin-top: 20px; box-shadow: 0 5px 15px var(--shadow-color), 0 3px 3px var(--shadow-color); transform: translateZ(0); transition: all 0.3s ease; display: flex; align-items: center; gap: 10px; } .input-area:hover { transform: translateZ(5px); } /* 버튼 스타일 */ .custom-button { background: linear-gradient(145deg, var(--primary-color), var(--secondary-color)); color: white; border: none; border-radius: 10px; padding: 10px 20px; font-weight: 600; cursor: pointer; transform: translateZ(0); transition: all 0.3s ease; box-shadow: 0 4px 6px var(--shadow-color), 0 1px 3px var(--shadow-color); } .custom-button:hover { transform: translateZ(5px) translateY(-2px); box-shadow: 0 7px 14px var(--shadow-color), 0 3px 6px var(--shadow-color); } /* 파일 업로드 버튼 */ .file-upload-icon { background: linear-gradient(145deg, #64b5f6, #42a5f5); color: white; border-radius: 8px; font-size: 2em; cursor: pointer; display: flex; align-items: center; justify-content: center; height: 70px; width: 70px; transition: all 0.3s ease; box-shadow: 0 2px 5px rgba(0,0,0,0.1); } .file-upload-icon:hover { transform: translateY(-2px); box-shadow: 0 4px 8px rgba(0,0,0,0.2); } /* 파일 업로드 버튼 내부 요소 스타일링 */ .file-upload-icon > .wrap { display: flex !important; align-items: center; justify-content: center; width: 100%; height: 100%; } .file-upload-icon > .wrap > p { display: none !important; } .file-upload-icon > .wrap::before { content: "📁"; font-size: 2em; display: block; } /* 메시지 스타일 */ .message { background: var(--card-background); border-radius: 15px; padding: 15px; margin: 10px 0; box-shadow: 0 4px 6px var(--shadow-color), 0 1px 3px var(--shadow-color); transform: translateZ(0); transition: all 0.3s ease; } .message:hover { transform: translateZ(5px); } .chat-container { height: 600px !important; margin-bottom: 10px; } .input-container { height: 70px !important; display: flex; align-items: center; gap: 10px; margin-top: 5px; } .input-textbox { height: 70px !important; border-radius: 8px !important; font-size: 1.1em !important; padding: 10px 15px !important; display: flex !important; align-items: flex-start !important; /* 텍스트 입력 위치를 위로 조정 */ } .input-textbox textarea { padding-top: 5px !important; /* 텍스트 상단 여백 조정 */ } .send-button { height: 70px !important; min-width: 70px !important; font-size: 1.1em !important; } /* 설정 패널 기본 스타일 */ .settings-panel { padding: 20px; margin-top: 20px; } """ @spaces.GPU def stream_chat(message: str, history: list, uploaded_file, temperature: float, max_new_tokens: int, top_p: float, top_k: int, penalty: float): try: print(f'message is - {message}') print(f'history is - {history}') # 파일 업로드 처리 file_context = "" if uploaded_file and message == "파일을 분석하고 있습니다...": try: content, file_type = read_uploaded_file(uploaded_file) if content: file_analysis = analyze_file_content(content, file_type) file_context = f"\n\n📄 파일 분석 결과:\n{file_analysis}\n\n파일 내용:\n```\n{content}\n```" message = "업로드된 파일을 분석해주세요." except Exception as e: print(f"파일 분석 오류: {str(e)}") file_context = f"\n\n❌ 파일 분석 중 오류가 발생했습니다: {str(e)}" # 관련 컨텍스트 찾기 try: relevant_contexts = find_relevant_context(message) wiki_context = "\n\n관련 위키피디아 정보:\n" for ctx in relevant_contexts: wiki_context += f"Q: {ctx['question']}\nA: {ctx['answer']}\n유사도: {ctx['similarity']:.3f}\n\n" except Exception as e: print(f"컨텍스트 검색 오류: {str(e)}") wiki_context = "" # 대화 히스토리 구성 conversation = [] for prompt, answer in history: conversation.extend([ {"role": "user", "content": prompt}, {"role": "assistant", "content": answer} ]) # 최종 프롬프트 구성 final_message = file_context + wiki_context + "\n현재 질문: " + message conversation.append({"role": "user", "content": final_message}) # 토크나이저 설정 input_ids = tokenizer.apply_chat_template(conversation, tokenize=False, add_generation_prompt=True) inputs = tokenizer(input_ids, return_tensors="pt").to(0) streamer = TextIteratorStreamer(tokenizer, timeout=10., skip_prompt=True, skip_special_tokens=True) generate_kwargs = dict( inputs, streamer=streamer, top_k=top_k, top_p=top_p, repetition_penalty=penalty, max_new_tokens=max_new_tokens, do_sample=True, temperature=temperature, eos_token_id=[255001], ) thread = Thread(target=model.generate, kwargs=generate_kwargs) thread.start() buffer = "" for new_text in streamer: buffer += new_text yield "", history + [[message, buffer]] except Exception as e: error_message = f"오류가 발생했습니다: {str(e)}" print(f"Stream chat 오류: {error_message}") yield "", history + [[message, error_message]] def create_demo(): with gr.Blocks(css=CSS) as demo: with gr.Column(): chatbot = gr.Chatbot( value=[], height=600, label="GiniGEN AI Assistant", elem_classes="chat-container" ) with gr.Row(elem_classes="input-container"): with gr.Column(scale=1, min_width=70): file_upload = gr.File( type="filepath", elem_classes="file-upload-icon", scale=1, container=True, interactive=True, show_label=False ) with gr.Column(scale=4): msg = gr.Textbox( show_label=False, placeholder="메시지를 입력하세요... 💭", container=False, elem_classes="input-textbox", scale=1 ) with gr.Column(scale=1, min_width=70): send = gr.Button( "전송", elem_classes="send-button custom-button", scale=1 ) # 이벤트 바인딩 수정 def init_msg(): return "파일을 분석하고 있습니다..." file_upload.change( fn=init_msg, outputs=msg, queue=False ).then( fn=stream_chat, inputs=[msg, chatbot, file_upload, temperature, max_new_tokens, top_p, top_k, penalty], outputs=[msg, chatbot], queue=True ) with gr.Accordion("🎮 고급 설정", open=False): with gr.Row(): with gr.Column(scale=1): temperature = gr.Slider( minimum=0, maximum=1, step=0.1, value=0.8, label="창의성 수준 🎨" ) max_new_tokens = gr.Slider( minimum=128, maximum=8000, step=1, value=4000, label="최대 토큰 수 📝" ) with gr.Column(scale=1): top_p = gr.Slider( minimum=0.0, maximum=1.0, step=0.1, value=0.8, label="다양성 조절 🎯" ) top_k = gr.Slider( minimum=1, maximum=20, step=1, value=20, label="선택 범위 📊" ) penalty = gr.Slider( minimum=0.0, maximum=2.0, step=0.1, value=1.0, label="반복 억제 🔄" ) # Examples 위치 수정 gr.Examples( examples=[ ["다음 코드의 문제점을 찾아내고 개선된 버전을 제시해주세요:\ndef fibonacci(n):\n if n <= 1: return n\n return fibonacci(n-1) + fibonacci(n-2)"], ["다음 영어 문장을 한국어로 번역하고, 어휘와 문법적 특징을 설명해주세요: 'The implementation of artificial intelligence in healthcare has revolutionized patient care, yet it raises ethical concerns regarding privacy and decision-making autonomy.'"], ["주어진 데이터를 분석하고 인사이트를 도출해주세요:\n연도별 매출액(억원)\n2019: 1200\n2020: 980\n2021: 1450\n2022: 2100\n2023: 1890"], ["다음 시나리오에 대한 SWOT 분석을 해주세요: '전통적인 오프라인 서점이 온라인 플랫폼으로의 전환을 고려중입니다. 독자들의 디지털 콘텐츠 소비가 증가하는 상황에서 경쟁력을 유지하면서 기존 고객층도 지키고 싶습니다.'"], ["다음 수학 문제를 단계별로 자세히 풀이해주세요: '한 원의 넓이가 그 원에 내접하는 정사각형 넓이의 2배일 때, 원의 반지름과 정사각형의 한 변의 길이의 관계를 구하시오.'"], ["다음 SQL 쿼리를 최적화하고 개선점을 설명해주세요:\nSELECT * FROM orders o\nLEFT JOIN customers c ON o.customer_id = c.id\nWHERE YEAR(o.order_date) = 2023\nAND c.country = 'Korea'\nORDER BY o.order_date DESC;"], ["다음 마케팅 캠페인의 ROI를 분석하고 개선방안을 제시해주세요:\n총 비용: 5000만원\n도달자 수: 100만명\n클릭률: 2.3%\n전환율: 0.8%\n평균 구매액: 35,000원"], ], inputs=msg ) # 이벤트 바인딩 msg.submit( stream_chat, inputs=[msg, chatbot, file_upload, temperature, max_new_tokens, top_p, top_k, penalty], outputs=[msg, chatbot] ) send.click( stream_chat, inputs=[msg, chatbot, file_upload, temperature, max_new_tokens, top_p, top_k, penalty], outputs=[msg, chatbot] ) file_upload.change( init_msg, outputs=msg ).then( stream_chat, inputs=[msg, chatbot, file_upload, temperature, max_new_tokens, top_p, top_k, penalty], outputs=[msg, chatbot] ) return demo if __name__ == "__main__": demo = create_demo() demo.launch()