Spaces:
Building
Building
import os | |
from dotenv import load_dotenv | |
import gradio as gr | |
from huggingface_hub import InferenceClient | |
import pandas as pd | |
from typing import List, Tuple | |
import json | |
from datetime import datetime | |
# νκ²½ λ³μ μ€μ | |
HF_TOKEN = os.getenv("HF_TOKEN") | |
# LLM Models Definition | |
LLM_MODELS = { | |
"Cohere c4ai-crp-08-2024": "CohereForAI/c4ai-command-r-plus-08-2024", # Default | |
"Meta Llama3.3-70B": "meta-llama/Llama-3.3-70B-Instruct" # Backup model | |
} | |
# λν νμ€ν 리λ₯Ό μ μ₯ν ν΄λμ€ | |
class ChatHistory: | |
def __init__(self): | |
self.history = [] | |
self.history_file = "/tmp/chat_history.json" # HF Spaceμμ μ¬μ©ν μμ κ²½λ‘ | |
self.load_history() | |
def add_message(self, role: str, content: str): | |
message = { | |
"role": role, | |
"content": content, | |
"timestamp": datetime.now().isoformat() | |
} | |
self.history.append(message) | |
self.save_history() | |
def get_history(self): | |
return self.history | |
def clear_history(self): | |
self.history = [] | |
self.save_history() | |
def save_history(self): | |
try: | |
with open(self.history_file, 'w', encoding='utf-8') as f: | |
json.dump(self.history, f, ensure_ascii=False, indent=2) | |
except Exception as e: | |
print(f"νμ€ν 리 μ μ₯ μ€ν¨: {e}") | |
def load_history(self): | |
try: | |
if os.path.exists(self.history_file): | |
with open(self.history_file, 'r', encoding='utf-8') as f: | |
self.history = json.load(f) | |
except Exception as e: | |
print(f"νμ€ν 리 λ‘λ μ€ν¨: {e}") | |
self.history = [] | |
# μ μ ChatHistory μΈμ€ν΄μ€ μμ± | |
chat_history = ChatHistory() | |
def get_client(model_name="Cohere c4ai-crp-08-2024"): | |
try: | |
return InferenceClient(LLM_MODELS[model_name], token=HF_TOKEN) | |
except Exception: | |
return InferenceClient(LLM_MODELS["Meta Llama3.3-70B"], token=HF_TOKEN) | |
def analyze_file_content(content, file_type): | |
"""Analyze file content and return structural summary""" | |
if file_type in ['parquet', 'csv']: | |
try: | |
lines = content.split('\n') | |
header = lines[0] | |
columns = header.count('|') - 1 | |
rows = len(lines) - 3 | |
return f"π λ°μ΄ν°μ ꡬ쑰: {columns}κ° μ»¬λΌ, {rows}κ° λ°μ΄ν°" | |
except: | |
return "β λ°μ΄ν°μ ꡬ쑰 λΆμ μ€ν¨" | |
lines = content.split('\n') | |
total_lines = len(lines) | |
non_empty_lines = len([line for line in lines if line.strip()]) | |
if any(keyword in content.lower() for keyword in ['def ', 'class ', 'import ', 'function']): | |
functions = len([line for line in lines if 'def ' in line]) | |
classes = len([line for line in lines if 'class ' in line]) | |
imports = len([line for line in lines if 'import ' in line or 'from ' in line]) | |
return f"π» μ½λ ꡬ쑰: {total_lines}μ€ (ν¨μ: {functions}, ν΄λμ€: {classes}, μν¬νΈ: {imports})" | |
paragraphs = content.count('\n\n') + 1 | |
words = len(content.split()) | |
return f"π λ¬Έμ ꡬ쑰: {total_lines}μ€, {paragraphs}λ¨λ½, μ½ {words}λ¨μ΄" | |
def read_uploaded_file(file): | |
if file is None: | |
return "", "" | |
try: | |
file_ext = os.path.splitext(file.name)[1].lower() | |
if file_ext == '.parquet': | |
df = pd.read_parquet(file.name, engine='pyarrow') | |
content = df.head(10).to_markdown(index=False) | |
return content, "parquet" | |
elif file_ext == '.csv': | |
encodings = ['utf-8', 'cp949', 'euc-kr', 'latin1'] | |
for encoding in encodings: | |
try: | |
df = pd.read_csv(file.name, encoding=encoding) | |
content = f"π λ°μ΄ν° 미리보기:\n{df.head(10).to_markdown(index=False)}\n\n" | |
content += f"\nπ λ°μ΄ν° μ 보:\n" | |
content += f"- μ 체 ν μ: {len(df)}\n" | |
content += f"- μ 체 μ΄ μ: {len(df.columns)}\n" | |
content += f"- μ»¬λΌ λͺ©λ‘: {', '.join(df.columns)}\n" | |
content += f"\nπ μ»¬λΌ λ°μ΄ν° νμ :\n" | |
for col, dtype in df.dtypes.items(): | |
content += f"- {col}: {dtype}\n" | |
null_counts = df.isnull().sum() | |
if null_counts.any(): | |
content += f"\nβ οΈ κ²°μΈ‘μΉ:\n" | |
for col, null_count in null_counts[null_counts > 0].items(): | |
content += f"- {col}: {null_count}κ° λλ½\n" | |
return content, "csv" | |
except UnicodeDecodeError: | |
continue | |
raise UnicodeDecodeError(f"β μ§μλλ μΈμ½λ©μΌλ‘ νμΌμ μ½μ μ μμ΅λλ€ ({', '.join(encodings)})") | |
else: | |
encodings = ['utf-8', 'cp949', 'euc-kr', 'latin1'] | |
for encoding in encodings: | |
try: | |
with open(file.name, 'r', encoding=encoding) as f: | |
content = f.read() | |
return content, "text" | |
except UnicodeDecodeError: | |
continue | |
raise UnicodeDecodeError(f"β μ§μλλ μΈμ½λ©μΌλ‘ νμΌμ μ½μ μ μμ΅λλ€ ({', '.join(encodings)})") | |
except Exception as e: | |
return f"β νμΌ μ½κΈ° μ€λ₯: {str(e)}", "error" | |
def format_history(history): | |
formatted_history = [] | |
for user_msg, assistant_msg in history: | |
formatted_history.append({"role": "user", "content": user_msg}) | |
if assistant_msg: | |
formatted_history.append({"role": "assistant", "content": assistant_msg}) | |
return formatted_history | |
def chat(message, history, uploaded_file, system_message="", max_tokens=4000, temperature=0.7, top_p=0.9): | |
system_prefix = """μ λ μ¬λ¬λΆμ μΉκ·Όνκ³ μ§μ μΈ AI μ΄μμ€ν΄νΈ 'GiniGEN'μ λλ€.. λ€μκ³Ό κ°μ μμΉμΌλ‘ μν΅νκ² μ΅λλ€: | |
1. π€ μΉκ·Όνκ³ κ³΅κ°μ μΈ νλλ‘ λν | |
2. π‘ λͺ ννκ³ μ΄ν΄νκΈ° μ¬μ΄ μ€λͺ μ 곡 | |
3. π― μ§λ¬Έμ μλλ₯Ό μ νν νμ νμ¬ λ§μΆ€ν λ΅λ³ | |
4. π νμν κ²½μ° μ λ‘λλ νμΌ λ΄μ©μ μ°Έκ³ νμ¬ κ΅¬μ²΄μ μΈ λμ μ 곡 | |
5. β¨ μΆκ°μ μΈ ν΅μ°°κ³Ό μ μμ ν΅ν κ°μΉ μλ λν | |
νμ μμ λ°λ₯΄κ³ μΉμ νκ² μλ΅νλ©°, νμν κ²½μ° κ΅¬μ²΄μ μΈ μμλ μ€λͺ μ μΆκ°νμ¬ | |
μ΄ν΄λ₯Ό λκ² μ΅λλ€.""" | |
# μ¬μ©μ λ©μμ§ μ μ₯ | |
chat_history.add_message("user", message) | |
if uploaded_file: | |
content, file_type = read_uploaded_file(uploaded_file) | |
if file_type == "error": | |
error_message = content | |
chat_history.add_message("assistant", error_message) | |
return "", [{"role": "user", "content": message}, | |
{"role": "assistant", "content": error_message}] | |
file_summary = analyze_file_content(content, file_type) | |
if file_type in ['parquet', 'csv']: | |
system_message += f"\n\nνμΌ λ΄μ©:\n```markdown\n{content}\n```" | |
else: | |
system_message += f"\n\nνμΌ λ΄μ©:\n```\n{content}\n```" | |
if message == "νμΌ λΆμμ μμν©λλ€...": | |
message = f"""[νμΌ κ΅¬μ‘° λΆμ] {file_summary} | |
λ€μ κ΄μ μμ λμμ λλ¦¬κ² μ΅λλ€: | |
1. π μ λ°μ μΈ λ΄μ© νμ | |
2. π‘ μ£Όμ νΉμ§ μ€λͺ | |
3. π― μ€μ©μ μΈ νμ© λ°©μ | |
4. β¨ κ°μ μ μ | |
5. π¬ μΆκ° μ§λ¬Έμ΄λ νμν μ€λͺ """ | |
messages = [{"role": "system", "content": f"{system_prefix} {system_message}"}] | |
if history is not None: | |
for item in history: | |
if isinstance(item, dict): | |
messages.append(item) | |
elif isinstance(item, (list, tuple)) and len(item) == 2: | |
messages.append({"role": "user", "content": item[0]}) | |
if item[1]: | |
messages.append({"role": "assistant", "content": item[1]}) | |
messages.append({"role": "user", "content": message}) | |
try: | |
client = get_client() | |
partial_message = "" | |
current_history = [] | |
for msg in client.chat_completion( | |
messages, | |
max_tokens=max_tokens, | |
stream=True, | |
temperature=temperature, | |
top_p=top_p, | |
): | |
token = msg.choices[0].delta.get('content', None) | |
if token: | |
partial_message += token | |
current_history = [ | |
{"role": "user", "content": message}, | |
{"role": "assistant", "content": partial_message} | |
] | |
yield "", current_history | |
# μμ±λ μλ΅ μ μ₯ | |
chat_history.add_message("assistant", partial_message) | |
except Exception as e: | |
error_msg = f"β μ€λ₯κ° λ°μνμ΅λλ€: {str(e)}" | |
chat_history.add_message("assistant", error_msg) | |
error_history = [ | |
{"role": "user", "content": message}, | |
{"role": "assistant", "content": error_msg} | |
] | |
yield "", error_history | |
with gr.Blocks(theme="Yntec/HaleyCH_Theme_Orange", title="GiniGEN π€") as demo: | |
gr.HTML( | |
""" | |
<div style="text-align: center; max-width: 800px; margin: 0 auto;"> | |
<h3 style="font-size: 3em; font-weight: 600; margin: 0.5em;">GiniGEN π€</h3> | |
</div> | |
""" | |
) | |
with gr.Row(): | |
with gr.Column(scale=2): | |
chatbot = gr.Chatbot( | |
height=600, | |
label="λνμ°½ π¬", | |
show_label=True, | |
type="messages" | |
) | |
msg = gr.Textbox( | |
label="λ©μμ§ μ λ ₯", | |
show_label=False, | |
placeholder="무μμ΄λ λ¬Όμ΄λ³΄μΈμ... π", | |
container=False | |
) | |
with gr.Row(): | |
clear = gr.ClearButton([msg, chatbot], value="λνλ΄μ© μ§μ°κΈ°") | |
send = gr.Button("보λ΄κΈ° π€") | |
with gr.Column(scale=1): | |
gr.Markdown("### νμΌ μ λ‘λ π\nμ§μ νμ: ν μ€νΈ, μ½λ, CSV, Parquet νμΌ") | |
file_upload = gr.File( | |
label="νμΌ μ ν", | |
file_types=["text", ".csv", ".parquet"], | |
type="filepath" | |
) | |
with gr.Accordion("κ³ κΈ μ€μ βοΈ", open=False): | |
system_message = gr.Textbox(label="μμ€ν λ©μμ§ π", value="") | |
max_tokens = gr.Slider(minimum=1, maximum=8000, value=4000, label="μ΅λ ν ν° μ π") | |
temperature = gr.Slider(minimum=0, maximum=1, value=0.7, label="μ°½μμ± μμ€ π‘οΈ") | |
top_p = gr.Slider(minimum=0, maximum=1, value=0.9, label="μλ΅ λ€μμ± π") | |
# μμ μ§λ¬Έ | |
gr.Examples( | |
examples=[ | |
["μλ νμΈμ! μ΄λ€ λμμ΄ νμνμ κ°μ? π€"], | |
["μ΄ λ΄μ©μ λν΄ μ’ λ μμΈν μ€λͺ ν΄ μ£Όμ€ μ μλμ? π‘"], | |
["μ κ° μ΄ν΄νκΈ° μ½κ² μ€λͺ ν΄ μ£Όμκ² μ΄μ? π"], | |
["μ΄ λ΄μ©μ μ€μ λ‘ μ΄λ»κ² νμ©ν μ μμκΉμ? π―"], | |
["μΆκ°λ‘ μ‘°μΈν΄ μ£Όμ€ λ΄μ©μ΄ μμΌμ κ°μ? β¨"], | |
["κΆκΈν μ μ΄ λ μλλ° μ¬μ€λ΄λ λ κΉμ? π€"], | |
], | |
inputs=msg, | |
) | |
# λνλ΄μ© μ§μ°κΈ° λ²νΌμ νμ€ν 리 μ΄κΈ°ν κΈ°λ₯ μΆκ° | |
def clear_chat(): | |
chat_history.clear_history() | |
return None, None | |
# μ΄λ²€νΈ λ°μΈλ© | |
msg.submit( | |
chat, | |
inputs=[msg, chatbot, file_upload, system_message, max_tokens, temperature, top_p], | |
outputs=[msg, chatbot] | |
) | |
send.click( | |
chat, | |
inputs=[msg, chatbot, file_upload, system_message, max_tokens, temperature, top_p], | |
outputs=[msg, chatbot] | |
) | |
clear.click( | |
clear_chat, | |
outputs=[msg, chatbot] | |
) | |
# νμΌ μ λ‘λμ μλ λΆμ | |
file_upload.change( | |
lambda: "νμΌ λΆμμ μμν©λλ€...", | |
outputs=msg | |
).then( | |
chat, | |
inputs=[msg, chatbot, file_upload, system_message, max_tokens, temperature, top_p], | |
outputs=[msg, chatbot] | |
) | |
if __name__ == "__main__": | |
demo.launch() |