# Page header captured along with the file (not part of the program): "Spaces: Sleeping"
import os | |
from threading import Thread | |
from typing import Iterator | |
import json | |
from datetime import datetime | |
from pathlib import Path | |
from uuid import uuid4 | |
import gradio as gr | |
import spaces | |
import torch | |
from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer, AutoModel | |
from pathlib import Path | |
from pinecone.grpc import PineconeGRPC as Pinecone | |
import torch | |
from huggingface_hub import CommitScheduler | |
# --- Chat-log persistence ---------------------------------------------------
# Hub token used by the CommitScheduler below (None when HF_UPLOAD is unset).
HF_UPLOAD = os.environ.get("HF_UPLOAD")

# Each process writes to its own uniquely named file inside json_dataset/.
JSON_DATASET_DIR = Path("json_dataset")
JSON_DATASET_DIR.mkdir(parents=True, exist_ok=True)
JSON_DATASET_PATH = JSON_DATASET_DIR / f"train-{uuid4()}.json"

# Ties JSON_DATASET_DIR to the "data" folder of the dataset repo. Per the
# huggingface_hub documentation a CommitScheduler pushes the folder in the
# background on a schedule; `scheduler.lock` is taken by `save_json` when
# writing.
scheduler = CommitScheduler(
    repo_id="psyche/llama3-mrc-chat-log",
    repo_type="dataset",
    folder_path=JSON_DATASET_DIR,
    path_in_repo="data",
    token=HF_UPLOAD,
)

# --- Retrieval backend ------------------------------------------------------
# Pinecone client and index handle. In the code visible here, only the
# disabled retrieval path references `index`.
pc = Pinecone(api_key=os.environ.get("PINECONE"))
index = pc.Index("commonsense")

# Disabled dense-retriever setup, kept for reference (previously parked in a
# bare string literal). The CPU fallback device string is lowercase "cpu";
# torch rejects "CPU".
#
# device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# retriever_tokenizer = AutoTokenizer.from_pretrained("psyche/dpr-longformer-ko-4096")
# retriever = AutoModel.from_pretrained("psyche/dpr-longformer-ko-4096")
# retriever.eval()
# retriever.to(device)
def save_json(question: str, answer: str) -> None:
    """Append one question/answer record to the local JSON-lines chat log.

    The record holds the question, the answer, the current local timestamp
    in ISO-8601 form, and an empty ``label`` field.

    Args:
        question: The user message that was sent to the model.
        answer: The complete generated reply.
    """
    record = {
        "question": question,
        "answer": answer,
        "datetime": datetime.now().isoformat(),
        "label": "",
    }
    # ensure_ascii=False keeps non-ASCII text readable in the log, so the
    # file must be opened as UTF-8 explicitly rather than with the platform's
    # locale encoding.
    line = json.dumps(record, ensure_ascii=False) + "\n"
    # The scheduler's lock is held for the duration of the append; presumably
    # this keeps a background upload from picking up a half-written line.
    with scheduler.lock, JSON_DATASET_PATH.open("a", encoding="utf-8") as f:
        f.write(line)
# Generation limits. The first two bound and seed the "Max new tokens" slider;
# the third is the prompt length (in tokens) beyond which `generate` trims.
MAX_MAX_NEW_TOKENS = 8192
DEFAULT_MAX_NEW_TOKENS = 4096
MAX_INPUT_TOKEN_LENGTH = 2048

# Markdown shown at the top of the page.
DESCRIPTION = """\
# Llama-3 8B Korean QA Chatbot \
"""

# The model and tokenizer are only loaded when a CUDA device is present;
# otherwise the page just gains a notice that the demo cannot run.
if torch.cuda.is_available():
    model_id = "psyche/llama3-8b-instruct-ko"
    #model_id = "psyche/meta-llama3-experiments"
    model = AutoModelForCausalLM.from_pretrained(model_id, device_map="auto", load_in_4bit=True, revision="v4.3")
    tokenizer = AutoTokenizer.from_pretrained(model_id, revision="v4.3")
else:
    # The emoji was mis-encoded in the captured source; restored to U+1F976.
    DESCRIPTION += "\n<p>Running on CPU 🥶 This demo does not work on CPU.</p>"
def _build_conversation(
    message: str,
    chat_history: list[tuple[str, str]],
    system_prompt: str,
) -> list[dict[str, str]]:
    """Return the chat-template message list: system prompt, past turns, new message."""
    conversation = []
    if system_prompt:
        conversation.append({"role": "system", "content": system_prompt})
    for user, assistant in chat_history:
        conversation.append({"role": "user", "content": user})
        conversation.append({"role": "assistant", "content": assistant})
    conversation.append({"role": "user", "content": message})
    return conversation


def generate(
    message: str,
    chat_history: list[tuple[str, str]],
    system_prompt: str,
    max_new_tokens: int = 1024,
    temperature: float = 0.6,
    top_p: float = 0.9,
    top_k: int = 50,
    repetition_penalty: float = 1.2,
) -> Iterator[str]:
    """Stream the model's reply to ``message`` and log the finished exchange.

    Args:
        message: The new user message.
        chat_history: Earlier ``(user, assistant)`` turns, oldest first.
        system_prompt: Optional system prompt; skipped when empty.
        max_new_tokens: Passed to ``model.generate``.
        temperature: Sampling temperature passed to ``model.generate``.
        top_p: Nucleus-sampling threshold passed to ``model.generate``.
        top_k: Top-k cutoff passed to ``model.generate``.
        repetition_penalty: Passed to ``model.generate``.

    Yields:
        The reply accumulated so far, once per chunk received from the
        streamer.
    """
    # Disabled retrieval-augmentation step, kept for reference (previously
    # parked in a no-op string literal). When enabled it rewrites `message`
    # before it is added to the conversation.
    # NOTE(review): the Korean prompt text below looks mis-encoded in this
    # copy of the file; restore it from the original before re-enabling.
    #
    # retriever_inputs = retriever_tokenizer([message], max_length=1024, truncation=True, return_tensors="pt")
    # retriever_inputs = {k:v.to(retriever.device) for k,v in retriever_inputs.items()}
    # with torch.no_grad():
    #     embeddings = retriever(**retriever_inputs).pooler_output
    # embeddings = embeddings.cpu().numpy()
    # results = index.query(
    #     vector=embeddings[0],
    #     top_k=1,
    #     include_values=False,
    #     include_metadata=True
    # )
    # results = [result for result in results["matches"] if result["score"] > 0.6]
    # if len(results) > 0:
    #     message = results[0]["metadata"]["text"] + f"\n\nμ λ¬Έλ§₯μ μ°Έκ³ νμ¬ μ§λ¬Έ '{message}'μ λ΅νλ©΄?"
    conversation = _build_conversation(message, chat_history, system_prompt)

    input_ids = tokenizer.apply_chat_template(conversation, return_tensors="pt", add_generation_prompt=True)
    if input_ids.shape[1] > MAX_INPUT_TOKEN_LENGTH:
        # Keep only the most recent tokens; the oldest context is dropped.
        input_ids = input_ids[:, -MAX_INPUT_TOKEN_LENGTH:]
        gr.Warning(f"Trimmed input from conversation as it was longer than {MAX_INPUT_TOKEN_LENGTH} tokens.")
    input_ids = input_ids.to(model.device)

    streamer = TextIteratorStreamer(tokenizer, timeout=10.0, skip_prompt=True, skip_special_tokens=True)
    generate_kwargs = {
        "input_ids": input_ids,
        "streamer": streamer,
        "max_new_tokens": max_new_tokens,
        "do_sample": True,
        "top_p": top_p,
        "top_k": top_k,
        "temperature": temperature,
        "num_beams": 1,
        "repetition_penalty": repetition_penalty,
    }
    # Generation runs in a worker thread so this generator can forward text
    # from the streamer as it arrives.
    worker = Thread(target=model.generate, kwargs=generate_kwargs)
    worker.start()

    answer = ""
    for text in streamer:
        answer += text
        yield answer

    # The streamer is exhausted, so generation is finishing; wait for the
    # worker to exit before logging instead of leaving the thread unjoined.
    worker.join()
    save_json(message, answer)
# (label, minimum, maximum, step, initial value) for each sampling slider,
# listed in the order `generate` accepts them after `system_prompt`.
_SLIDER_SPECS = [
    ("Max new tokens", 1, MAX_MAX_NEW_TOKENS, 1, DEFAULT_MAX_NEW_TOKENS),
    ("Temperature", 0.01, 4.0, 0.1, 0.01),
    ("Top-p (nucleus sampling)", 0.05, 1.0, 0.05, 0.9),
    ("Top-k", 1, 1000, 1, 50),
    ("Repetition penalty", 1.0, 2.0, 0.05, 1.15),
]

# Extra controls passed to `generate`: the system prompt box first, then one
# slider per entry above.
_additional_inputs = [gr.Textbox(label="System prompt", lines=6)]
_additional_inputs.extend(
    gr.Slider(label=label, minimum=low, maximum=high, step=step, value=initial)
    for label, low, high, step, initial in _SLIDER_SPECS
)

# Chat UI wired to `generate`, with no stop button and five example prompts.
# NOTE(review): the example prompts look like Korean text that was
# mis-encoded in this copy of the file; they are reproduced unchanged here
# and should be restored from the original source.
chat_interface = gr.ChatInterface(
    fn=generate,
    additional_inputs=_additional_inputs,
    stop_btn=None,
    examples=[
        ["μλ ?"],
        ["λκ° ν μ μλκ² λμΌ?"],
        ["νμ΄μ¬μ λν΄μ μλ €μ€"],
        ["λνλ―Όκ΅μ μλλ?"],
        ["λ λλ μ΄λλλΌ λ μ΄μΌ?"],
    ],
)
# Page layout: description text, a "duplicate this Space" button, then the
# chat interface, styled by style.css.
demo = gr.Blocks(css="style.css")
with demo:
    gr.Markdown(DESCRIPTION)
    gr.DuplicateButton(value="Duplicate Space for private use", elem_id="duplicate-button")
    chat_interface.render()

if __name__ == "__main__":
    # Enable the request queue (max_size=20), then start the app.
    queued_demo = demo.queue(max_size=20)
    queued_demo.launch()