|
import os |
|
from dotenv import load_dotenv |
|
import gradio as gr |
|
from huggingface_hub import InferenceClient |
|
import pandas as pd |
|
from typing import List, Tuple |
|
import json |
|
from datetime import datetime |
|
|
|
|
|
HF_TOKEN = os.getenv("HF_TOKEN") |
|
|
|
|
|
LLM_MODELS = { |
|
"Cohere c4ai-crp-08-2024": "CohereForAI/c4ai-command-r-plus-08-2024", |
|
"Meta Llama3.3-70B": "meta-llama/Llama-3.3-70B-Instruct" |
|
} |
|
|
|
class ChatHistory: |
|
def __init__(self): |
|
self.history = [] |
|
self.history_file = "/tmp/chat_history.json" |
|
self.load_history() |
|
|
|
def add_conversation(self, user_msg: str, assistant_msg: str): |
|
conversation = { |
|
"timestamp": datetime.now().isoformat(), |
|
"messages": [ |
|
{"role": "user", "content": user_msg}, |
|
{"role": "assistant", "content": assistant_msg} |
|
] |
|
} |
|
self.history.append(conversation) |
|
self.save_history() |
|
|
|
def format_for_display(self): |
|
|
|
formatted = [] |
|
for conv in self.history: |
|
formatted.append([ |
|
conv["messages"][0]["content"], |
|
conv["messages"][1]["content"] |
|
]) |
|
return formatted |
|
|
|
def get_messages_for_api(self): |
|
|
|
messages = [] |
|
for conv in self.history: |
|
messages.extend([ |
|
{"role": "user", "content": conv["messages"][0]["content"]}, |
|
{"role": "assistant", "content": conv["messages"][1]["content"]} |
|
]) |
|
return messages |
|
|
|
def clear_history(self): |
|
self.history = [] |
|
self.save_history() |
|
|
|
def save_history(self): |
|
try: |
|
with open(self.history_file, 'w', encoding='utf-8') as f: |
|
json.dump(self.history, f, ensure_ascii=False, indent=2) |
|
except Exception as e: |
|
print(f"νμ€ν 리 μ μ₯ μ€ν¨: {e}") |
|
|
|
def load_history(self): |
|
try: |
|
if os.path.exists(self.history_file): |
|
with open(self.history_file, 'r', encoding='utf-8') as f: |
|
self.history = json.load(f) |
|
except Exception as e: |
|
print(f"νμ€ν 리 λ‘λ μ€ν¨: {e}") |
|
self.history = [] |
|
|
|
|
|
|
|
chat_history = ChatHistory() |
|
|
|
def get_client(model_name="Cohere c4ai-crp-08-2024"): |
|
try: |
|
return InferenceClient(LLM_MODELS[model_name], token=HF_TOKEN) |
|
except Exception: |
|
return InferenceClient(LLM_MODELS["Meta Llama3.3-70B"], token=HF_TOKEN) |
|
|
|
def analyze_file_content(content, file_type): |
|
"""Analyze file content and return structural summary""" |
|
if file_type in ['parquet', 'csv']: |
|
try: |
|
lines = content.split('\n') |
|
header = lines[0] |
|
columns = header.count('|') - 1 |
|
rows = len(lines) - 3 |
|
return f"π λ°μ΄ν°μ
ꡬ쑰: {columns}κ° μ»¬λΌ, {rows}κ° λ°μ΄ν°" |
|
except: |
|
return "β λ°μ΄ν°μ
ꡬ쑰 λΆμ μ€ν¨" |
|
|
|
lines = content.split('\n') |
|
total_lines = len(lines) |
|
non_empty_lines = len([line for line in lines if line.strip()]) |
|
|
|
if any(keyword in content.lower() for keyword in ['def ', 'class ', 'import ', 'function']): |
|
functions = len([line for line in lines if 'def ' in line]) |
|
classes = len([line for line in lines if 'class ' in line]) |
|
imports = len([line for line in lines if 'import ' in line or 'from ' in line]) |
|
return f"π» μ½λ ꡬ쑰: {total_lines}μ€ (ν¨μ: {functions}, ν΄λμ€: {classes}, μν¬νΈ: {imports})" |
|
|
|
paragraphs = content.count('\n\n') + 1 |
|
words = len(content.split()) |
|
return f"π λ¬Έμ ꡬ쑰: {total_lines}μ€, {paragraphs}λ¨λ½, μ½ {words}λ¨μ΄" |
|
|
|
def read_uploaded_file(file): |
|
if file is None: |
|
return "", "" |
|
try: |
|
file_ext = os.path.splitext(file.name)[1].lower() |
|
|
|
if file_ext == '.parquet': |
|
df = pd.read_parquet(file.name, engine='pyarrow') |
|
content = df.head(10).to_markdown(index=False) |
|
return content, "parquet" |
|
elif file_ext == '.csv': |
|
encodings = ['utf-8', 'cp949', 'euc-kr', 'latin1'] |
|
for encoding in encodings: |
|
try: |
|
df = pd.read_csv(file.name, encoding=encoding) |
|
content = f"π λ°μ΄ν° 미리보기:\n{df.head(10).to_markdown(index=False)}\n\n" |
|
content += f"\nπ λ°μ΄ν° μ 보:\n" |
|
content += f"- μ 체 ν μ: {len(df)}\n" |
|
content += f"- μ 체 μ΄ μ: {len(df.columns)}\n" |
|
content += f"- μ»¬λΌ λͺ©λ‘: {', '.join(df.columns)}\n" |
|
content += f"\nπ μ»¬λΌ λ°μ΄ν° νμ
:\n" |
|
for col, dtype in df.dtypes.items(): |
|
content += f"- {col}: {dtype}\n" |
|
null_counts = df.isnull().sum() |
|
if null_counts.any(): |
|
content += f"\nβ οΈ κ²°μΈ‘μΉ:\n" |
|
for col, null_count in null_counts[null_counts > 0].items(): |
|
content += f"- {col}: {null_count}κ° λλ½\n" |
|
return content, "csv" |
|
except UnicodeDecodeError: |
|
continue |
|
raise UnicodeDecodeError(f"β μ§μλλ μΈμ½λ©μΌλ‘ νμΌμ μ½μ μ μμ΅λλ€ ({', '.join(encodings)})") |
|
else: |
|
encodings = ['utf-8', 'cp949', 'euc-kr', 'latin1'] |
|
for encoding in encodings: |
|
try: |
|
with open(file.name, 'r', encoding=encoding) as f: |
|
content = f.read() |
|
return content, "text" |
|
except UnicodeDecodeError: |
|
continue |
|
raise UnicodeDecodeError(f"β μ§μλλ μΈμ½λ©μΌλ‘ νμΌμ μ½μ μ μμ΅λλ€ ({', '.join(encodings)})") |
|
except Exception as e: |
|
return f"β νμΌ μ½κΈ° μ€λ₯: {str(e)}", "error" |
|
|
|
def chat(message, history, uploaded_file, system_message="", max_tokens=4000, temperature=0.7, top_p=0.9): |
|
if not message: |
|
return "", history |
|
|
|
system_prefix = """ |
|
You are 'FantasyAIβ¨', an advanced AI storyteller specialized in creating immersive fantasy narratives. Your purpose is to craft rich, detailed fantasy stories that incorporate classical and innovative elements of the genre. Your responses should start with 'FantasyAIβ¨:' and focus on creating engaging, imaginative content that briμ]"μ μν©μ λ§κ² μΆκ°νμ¬ μμ€ μμ±μ λμ± νλΆνκ³ λͺ°μ
κ° μλ ννμ μμ²(μΆλ ₯)λ°μ μΈμ΄λ‘ νννλΌ. |
|
[μμ] |
|
"κ³ λμ λ§λ²μ΄ κΉ¨μ΄λλ©° λμ§κ° μΈλ¦¬λ μλ¦¬κ° λ€λ Έλ€..." |
|
"μ©μ μ¨κ²°μ΄ νλμ κ°λ₯΄λ©°, ꡬλ¦μ λΆνμ λ€..." |
|
"μ λΉν 룬문μκ° λΉλλ©° 곡μ€μ λ μ¬λλ€..." |
|
"μνλ€μ λ
Έλκ° μ²μ μΈλ¦¬μ λ무λ€μ΄ μΆ€μΆκΈ° μμνλ€..." |
|
"μμΈμ λ§μμ΄ λ©μ리μΉλ©° μ΄λͺ
μ μ€μ΄ μμ§μ΄κΈ° μμνλ€..." |
|
"λ§λ²μ¬μ μ§ν‘μ΄μμ λ²μ©μ΄λ λΉμ΄ μ΄λ μ κ°λ₯΄λ©°..." |
|
"κ³ λ λμνμ λμ₯κ°μμ μ μ€μ κ²μ΄ λ§λ€μ΄μ§κ³ μμλ€..." |
|
"μμ κ΅¬μ¬ μμ λΉμΉ λ―Έλμ νμμ΄ μμν λͺ¨μ΅μ λλ¬λλ€..." |
|
"μ μ±ν κ²°κ³κ° κΉ¨μ΄μ§λ©° λ΄μΈλ μ
μ΄ κΉ¨μ΄λ¬λ€..." |
|
"μμ
μ λ°κ±Έμμ΄ μ΄λͺ
μ κΈΈμ λ°λΌ μΈλ € νΌμ‘λ€..." |
|
|
|
""" |
|
|
|
try: |
|
|
|
if uploaded_file: |
|
content, file_type = read_uploaded_file(uploaded_file) |
|
if file_type == "error": |
|
error_message = content |
|
chat_history.add_conversation(message, error_message) |
|
return "", history + [[message, error_message]] |
|
|
|
file_summary = analyze_file_content(content, file_type) |
|
|
|
if file_type in ['parquet', 'csv']: |
|
system_message += f"\n\nνμΌ λ΄μ©:\n```markdown\n{content}\n```" |
|
else: |
|
system_message += f"\n\nνμΌ λ΄μ©:\n```\n{content}\n```" |
|
|
|
if message == "νμΌ λΆμμ μμν©λλ€...": |
|
message = f"""[νμΌ κ΅¬μ‘° λΆμ] {file_summary} |
|
λ€μ κ΄μ μμ λμμ λλ¦¬κ² μ΅λλ€: |
|
1. π μ λ°μ μΈ λ΄μ© νμ
|
|
2. π‘ μ£Όμ νΉμ§ μ€λͺ
|
|
3. π― μ€μ©μ μΈ νμ© λ°©μ |
|
4. β¨ κ°μ μ μ |
|
5. π¬ μΆκ° μ§λ¬Έμ΄λ νμν μ€λͺ
""" |
|
|
|
|
|
messages = [{"role": "system", "content": system_prefix + system_message}] |
|
|
|
|
|
if history: |
|
for user_msg, assistant_msg in history: |
|
messages.append({"role": "user", "content": user_msg}) |
|
messages.append({"role": "assistant", "content": assistant_msg}) |
|
|
|
messages.append({"role": "user", "content": message}) |
|
|
|
|
|
client = get_client() |
|
partial_message = "" |
|
|
|
for msg in client.chat_completion( |
|
messages, |
|
max_tokens=max_tokens, |
|
stream=True, |
|
temperature=temperature, |
|
top_p=top_p, |
|
): |
|
token = msg.choices[0].delta.get('content', None) |
|
if token: |
|
partial_message += token |
|
current_history = history + [[message, partial_message]] |
|
yield "", current_history |
|
|
|
|
|
chat_history.add_conversation(message, partial_message) |
|
|
|
except Exception as e: |
|
error_msg = f"β μ€λ₯κ° λ°μνμ΅λλ€: {str(e)}" |
|
chat_history.add_conversation(message, error_msg) |
|
yield "", history + [[message, error_msg]] |
|
|
|
with gr.Blocks(theme="Yntec/HaleyCH_Theme_Orange", title="GiniGEN π€") as demo: |
|
|
|
initial_history = chat_history.format_for_display() |
|
with gr.Row(): |
|
with gr.Column(scale=2): |
|
chatbot = gr.Chatbot( |
|
value=initial_history, |
|
height=600, |
|
label="λνμ°½ π¬", |
|
show_label=True |
|
) |
|
|
|
|
|
msg = gr.Textbox( |
|
label="λ©μμ§ μ
λ ₯", |
|
show_label=False, |
|
placeholder="무μμ΄λ λ¬Όμ΄λ³΄μΈμ... π", |
|
container=False |
|
) |
|
with gr.Row(): |
|
clear = gr.ClearButton([msg, chatbot], value="λνλ΄μ© μ§μ°κΈ°") |
|
send = gr.Button("보λ΄κΈ° π€") |
|
|
|
with gr.Column(scale=1): |
|
gr.Markdown("### GiniGEN π€ [νμΌ μ
λ‘λ] π\nμ§μ νμ: ν
μ€νΈ, μ½λ, CSV, Parquet νμΌ") |
|
file_upload = gr.File( |
|
label="νμΌ μ ν", |
|
file_types=["text", ".csv", ".parquet"], |
|
type="filepath" |
|
) |
|
|
|
with gr.Accordion("κ³ κΈ μ€μ βοΈ", open=False): |
|
system_message = gr.Textbox(label="μμ€ν
λ©μμ§ π", value="") |
|
max_tokens = gr.Slider(minimum=1, maximum=8000, value=4000, label="μ΅λ ν ν° μ π") |
|
temperature = gr.Slider(minimum=0, maximum=1, value=0.7, label="μ°½μμ± μμ€ π‘οΈ") |
|
top_p = gr.Slider(minimum=0, maximum=1, value=0.9, label="μλ΅ λ€μμ± π") |
|
|
|
|
|
gr.Examples( |
|
examples=[ |
|
["ν₯λ―Έλ‘μ΄ μμ¬ 10κ°μ§λ₯Ό μ μν΄μ€μ π€"], |
|
["λμ± νμμ μ΄κ³ μμΈν λ¬μ¬λ₯Ό μμΈνν΄μ€μ π"], |
|
["μ΄μΈκ²(λ€λ₯Έ μ°¨μμ μΈμ) λ°°κ²½μΌλ‘ ν΄μ€μ π―"], |
|
["μμ§μλ μλλ‘ νμμν μμ¬λ‘ μμ±μ± β¨"], |
|
["κ³μ μ΄μ΄μ μμ±ν΄μ€ π€"], |
|
], |
|
inputs=msg, |
|
) |
|
|
|
|
|
def clear_chat(): |
|
chat_history.clear_history() |
|
return None, None |
|
|
|
|
|
msg.submit( |
|
chat, |
|
inputs=[msg, chatbot, file_upload, system_message, max_tokens, temperature, top_p], |
|
outputs=[msg, chatbot] |
|
) |
|
|
|
send.click( |
|
chat, |
|
inputs=[msg, chatbot, file_upload, system_message, max_tokens, temperature, top_p], |
|
outputs=[msg, chatbot] |
|
) |
|
|
|
clear.click( |
|
clear_chat, |
|
outputs=[msg, chatbot] |
|
) |
|
|
|
|
|
file_upload.change( |
|
lambda: "νμΌ λΆμμ μμν©λλ€...", |
|
outputs=msg |
|
).then( |
|
chat, |
|
inputs=[msg, chatbot, file_upload, system_message, max_tokens, temperature, top_p], |
|
outputs=[msg, chatbot] |
|
) |
|
|
|
if __name__ == "__main__": |
|
demo.launch() |