# Hugging Face Space (page status when captured: Running)
import asyncio
import io
import json
import os
import tempfile
from io import BytesIO
from threading import Thread
from typing import Optional

import docx
import gradio as gr
import pymupdf
import spaces
import torch
import uvicorn
from fastapi import FastAPI, File, Form, UploadFile, HTTPException
from fastapi.responses import HTMLResponse
from fastapi.responses import PlainTextResponse
from fastapi.responses import StreamingResponse
from fastapi.staticfiles import StaticFiles
from PIL import Image
from pptx import Presentation
from pydantic import BaseModel
from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer
# ASGI application object; the Gradio UI is mounted onto it in the
# ``__main__`` section at the bottom of this file.
app = FastAPI()
async def test_endpoint(message: dict): | |
if "text" not in message: | |
raise HTTPException(status_code=400, detail="Missing 'text' in request body") | |
response = {"message": f"Received your message: {message['text']}"} | |
return response | |
# --- Model selection -------------------------------------------------------
MODEL_LIST = ["nikravan/glm-4vq"]
MODEL_ID = MODEL_LIST[0]  # the first entry is the model actually loaded
MODEL_NAME = "GLM-4vq"    # display name used in the page description

# Hugging Face access token, or None when the variable is not set.
HF_TOKEN = os.environ.get("HF_TOKEN")

# --- Page chrome -----------------------------------------------------------
TITLE = "<h1>AI CHAT DOCS</h1>"

DESCRIPTION = f"""
<center>
<p>
<br>
USANDO MODELO: <a href="https://hf.co/nikravan/glm-4vq">{MODEL_NAME}</a>
</center>"""

CSS = """
h1 {
text-align: center;
display: block;
}
"""

# The tokenizer is created once at import time and shared by the chat
# functions below.
tokenizer = AutoTokenizer.from_pretrained(MODEL_ID, trust_remote_code=True)
def extract_text(path):
    """Return the full contents of the text file at *path*.

    The file is decoded as UTF-8; bytes that are not valid UTF-8 are replaced
    with U+FFFD instead of raising, so an upload in another encoding still
    yields usable text.

    Args:
        path: Filesystem path of the file to read.

    Returns:
        The decoded file contents as a single string.
    """
    # The context manager closes the handle deterministically (the previous
    # bare ``open(...).read()`` left that to the garbage collector).
    with open(path, "r", encoding="utf-8", errors="replace") as handle:
        return handle.read()
def extract_pdf(path):
    """Return the concatenated text of every page of the PDF at *path*.

    Args:
        path: Filesystem path of the PDF document.

    Returns:
        The text of all pages joined in page order, with no separator added.
    """
    # Opening the document as a context manager closes it once the text has
    # been collected; previously it was never closed.
    with pymupdf.open(path) as doc:
        return "".join(page.get_text() for page in doc)
def extract_docx(path):
    """Return the paragraph text of the .docx file at *path*.

    Paragraphs are joined with a blank line (``"\\n\\n"``) between them.
    """
    document = docx.Document(path)
    return '\n\n'.join([para.text for para in document.paragraphs])
def extract_pptx(path):
    """Return the text of every text-bearing shape in the .pptx at *path*.

    Shapes are visited slide by slide; each shape that exposes a ``text``
    attribute contributes its text followed by a newline.
    """
    deck = Presentation(path)
    pieces = []
    for slide in deck.slides:
        for shape in slide.shapes:
            if not hasattr(shape, "text"):
                continue
            pieces.append(shape.text + "\n")
    return "".join(pieces)
# def mode_load(path): | |
# choice = "" | |
# file_type = path.split(".")[-1] | |
# print(file_type) | |
# if file_type in ["pdf", "txt", "py", "docx", "pptx", "json", "cpp", "md"]: | |
# if file_type.endswith("pdf"): | |
# content = extract_pdf(path) | |
# elif file_type.endswith("docx"): | |
# content = extract_docx(path) | |
# elif file_type.endswith("pptx"): | |
# content = extract_pptx(path) | |
# else: | |
# content = extract_text(path) | |
# choice = "doc" | |
# print(content[:100]) | |
# return choice, content[:5000] | |
# elif file_type in ["png", "jpg", "jpeg", "bmp", "tiff", "webp"]: | |
# content = Image.open(path).convert('RGB') | |
# choice = "image" | |
# return choice, content | |
# else: | |
# raise gr.Error("Oops, unsupported files.") | |
def mode_load(path, max_chars=5000):
    """Load an uploaded file as either document text or an RGB image.

    The handler is chosen from the (case-insensitive) extension of the final
    path component.

    Args:
        path: Filesystem path of the uploaded file.
        max_chars: Maximum number of characters of document text to return.
            Defaults to 5000.

    Returns:
        A ``(choice, content)`` tuple: ``("doc", text)`` with *text* truncated
        to *max_chars*, or ``("image", image)`` with *image* converted to RGB.

    Raises:
        ValueError: If the extension is missing or unsupported, or if the
            image cannot be opened or converted.
    """
    # Only text after the last dot of the file name counts as an extension,
    # so a dot-less name such as "pdf" is no longer mistaken for a PDF.
    _, dot, suffix = os.path.basename(path).rpartition(".")
    file_type = suffix.lower() if dot else ""
    print(file_type)

    if file_type in ("pdf", "txt", "py", "docx", "pptx", "json", "cpp", "md"):
        if file_type == "pdf":
            content = extract_pdf(path)
        elif file_type == "docx":
            content = extract_docx(path)
        elif file_type == "pptx":
            content = extract_pptx(path)
        else:
            content = extract_text(path)
        print(content[:100])
        return "doc", content[:max_chars]

    if file_type in ("png", "jpg", "jpeg", "bmp", "tiff", "webp"):
        try:
            # convert() returns a new, fully loaded image, so the file handle
            # behind the opened image can be closed right away.
            with Image.open(path) as raw_image:
                content = raw_image.convert('RGB')
        except Exception as e:
            # Keep the original failure attached as the cause.
            raise ValueError(f"Error processing image file: {e}") from e
        return "image", content

    raise ValueError("Oops, unsupported file type.")
def stream_chat(message, history: list, temperature: float, max_length: int, top_p: float, top_k: int, penalty: float):
    """Stream a model reply for the Gradio multimodal chat interface.

    Args:
        message: Dict with the user's ``"text"`` and, optionally, ``"files"``
            (a list of file paths; only the last one is used).
        history: Previous turns as ``(prompt, answer)`` pairs. A pair whose
            answer is ``None`` is treated as a file upload whose path is
            ``prompt[0]``.
        temperature: Sampling temperature passed to ``generate``.
        max_length: ``max_length`` passed to ``generate``.
        top_p: Nucleus-sampling threshold passed to ``generate``.
        top_k: Top-k cutoff passed to ``generate``.
        penalty: Repetition penalty passed to ``generate``.

    Yields:
        The reply accumulated so far, growing with each streamed piece of text.

    Raises:
        ValueError: Propagated from ``mode_load`` for unsupported or
            unreadable files.
    """
    # NOTE(review): the model is loaded from scratch on every call, which is
    # expensive; confirm whether this is required by the hosting setup before
    # hoisting it to module level.
    model = AutoModelForCausalLM.from_pretrained(
        MODEL_ID,
        torch_dtype=torch.bfloat16,
        low_cpu_mem_usage=True,
        trust_remote_code=True,
    )
    print(f'message is - {message}')
    print(f'history is - {history}')

    user_text = message['text']
    files = message.get("files")  # tolerate messages that carry no "files" key
    conversation = []
    prompt_files = []

    # Defaults cover the "no file anywhere" paths, so the dispatch below never
    # reads an unbound name.
    choice, contents, source_path = "", None, None

    if files:
        source_path = files[-1]
        choice, contents = mode_load(source_path)
    elif history:
        for prompt, answer in history:
            if answer is None:
                # File-upload turn: remember the path and keep an empty
                # placeholder exchange in the conversation.
                prompt_files.append(prompt[0])
                conversation.extend([{"role": "user", "content": ""}, {"role": "assistant", "content": ""}])
            else:
                conversation.extend([{"role": "user", "content": prompt}, {"role": "assistant", "content": answer}])
        if prompt_files:
            source_path = prompt_files[-1]
            choice, contents = mode_load(source_path)
        else:
            conversation.append({"role": "user", "image": "", "content": user_text})
    else:
        conversation.append({"role": "user", "content": user_text})

    if choice == "image":
        conversation.append({"role": "user", "image": contents, "content": user_text})
    elif choice == "doc":
        # The "{}" placeholder was previously sent to the model unformatted;
        # fill it with the file name.
        format_msg = contents + "\n\n\n" + "{} files uploaded.\n".format(os.path.basename(source_path)) + user_text
        conversation.append({"role": "user", "content": format_msg})

    print(f"Conversation is -\n{conversation}")

    input_ids = tokenizer.apply_chat_template(conversation, tokenize=True, add_generation_prompt=True,
                                              return_tensors="pt", return_dict=True).to(model.device)
    streamer = TextIteratorStreamer(tokenizer, timeout=60.0, skip_prompt=True, skip_special_tokens=True)

    generate_kwargs = dict(
        max_length=max_length,
        streamer=streamer,
        do_sample=True,
        top_p=top_p,
        top_k=top_k,
        temperature=temperature,
        repetition_penalty=penalty,
        # End-of-sequence token ids — presumably the model's special stop
        # tokens; verify against the tokenizer.
        eos_token_id=[151329, 151336, 151338],
    )
    gen_kwargs = {**input_ids, **generate_kwargs}

    def _run_generation():
        # Autograd mode is per-thread, so no_grad must be entered inside the
        # worker thread rather than around the Thread() constructor.
        with torch.no_grad():
            model.generate(**gen_kwargs)

    thread = Thread(target=_run_generation)
    thread.start()

    buffer = ""
    for new_text in streamer:
        buffer += new_text
        yield buffer

    # The streamer is exhausted, so generation has finished; reap the worker.
    thread.join()
# Chat display widget handed to gr.ChatInterface below.
chatbot = gr.Chatbot(
    # rtl=True,
)

# Combined text / file-upload input box for the chat.
chat_input = gr.MultimodalTextbox(
    # rtl=True,
    placeholder="Enter message or upload a file ...",
    show_label=False,
    interactive=True,
)

# Example prompts offered under the chat box; each row supplies one value for
# ``chat_input``.
EXAMPLES = [
    [{"text": "Resumir Documento"}],
    [{"text": "Explicar la Imagen"}],
    [{"text": "¿De qué es la foto?", "files": ["perro.jpg"]}],
    [
        {
            "text": "Quiero armar un JSON, solo el JSON sin texto, que contenga los datos de la primera mitad de la tabla de la imagen (las primeras 10 jurisdicciones 901-910). Ten en cuenta que los valores numéricos son decimales de cuatro dígitos. La tabla contiene las siguientes columnas: Codigo, Nombre, Fecha Inicio, Fecha Cese, Coeficiente Ingresos, Coeficiente Gastos y Coeficiente Unificado. La tabla puede contener valores vacíos, en ese caso dejarlos como null. Cada fila de la tabla representa una jurisdicción con sus respectivos valores.",
        }
    ],
]
def _strip_json_fence(text):
    """Remove a surrounding Markdown code fence (optionally tagged ``json``)."""
    stripped = text.strip()
    if not stripped.startswith("```"):
        return stripped
    body = stripped.strip("`")
    # Drop the language tag only as a prefix; str.strip('json') would remove
    # any of the characters j/s/o/n from both ends of the text.
    if body[:4].lower() == "json":
        body = body[4:]
    return body.strip()


def simple_chat(message, temperature: float = 0.8, max_length: int = 4096, top_p: float = 1, top_k: int = 10, penalty: float = 1.0):
    """Generate one complete (non-streamed) reply for the HTTP endpoint.

    Args:
        message: Dict with ``"text"`` (the prompt), ``"file_content"`` (a
            readable binary stream or ``None``) and ``"file_name"`` (the
            uploaded file's name or ``None``).
        temperature: Sampling temperature passed to ``generate``.
        max_length: ``max_length`` passed to ``generate``.
        top_p: Nucleus-sampling threshold passed to ``generate``.
        top_k: Top-k cutoff passed to ``generate``.
        penalty: Repetition penalty passed to ``generate``.

    Returns:
        A ``PlainTextResponse`` holding the reply. If the reply (after removing
        a Markdown code fence) parses as JSON it is re-serialised compactly;
        otherwise the text is returned as is.

    Raises:
        ValueError: Propagated from ``mode_load`` for unsupported or
            unreadable uploads.
    """
    # NOTE(review): the model is loaded from scratch on every call, which is
    # expensive; confirm whether this is intentional.
    model = AutoModelForCausalLM.from_pretrained(
        MODEL_ID,
        torch_dtype=torch.bfloat16,
        low_cpu_mem_usage=True,
        trust_remote_code=True,
    )
    print(f'Message is - {message}')

    conversation = []
    upload = message.get("file_content")
    if upload:
        # The client-supplied file name is untrusted: use it only for its
        # extension and write the bytes to a private temporary file instead of
        # a path built from that name.
        _, dot, ext = os.path.basename(message['file_name'] or "").rpartition(".")
        suffix = "." + ext if dot else ""
        with tempfile.NamedTemporaryFile(suffix=suffix, delete=False) as temp_file:
            temp_file.write(upload.read())
            temp_file_path = temp_file.name
        try:
            choice, contents = mode_load(temp_file_path)
        finally:
            # Always delete the temporary copy, even if loading failed.
            os.unlink(temp_file_path)

        if choice == "image":
            conversation.append({"role": "user", "image": contents, "content": message['text']})
        elif choice == "doc":
            format_msg = contents + "\n\n\n" + "{} files uploaded.\n".format(message['file_name']) + message['text']
            conversation.append({"role": "user", "content": format_msg})
    else:
        conversation.append({"role": "user", "content": message['text']})

    print(f"Conversation is -\n{conversation}")

    input_ids = tokenizer.apply_chat_template(conversation, tokenize=True, add_generation_prompt=True,
                                              return_tensors="pt", return_dict=True).to(model.device)
    streamer = TextIteratorStreamer(tokenizer, timeout=60.0, skip_prompt=True, skip_special_tokens=True)

    generate_kwargs = dict(
        max_length=max_length,
        streamer=streamer,
        do_sample=True,
        top_p=top_p,
        top_k=top_k,
        temperature=temperature,
        repetition_penalty=penalty,
        # End-of-sequence token ids — presumably the model's special stop
        # tokens; verify against the tokenizer.
        eos_token_id=[151329, 151336, 151338],
    )
    gen_kwargs = {**input_ids, **generate_kwargs}

    # Generation runs to completion first and the streamer is drained
    # afterwards, exactly as before, but without the worker thread that was
    # started and immediately joined. Errors raised by generate() now
    # propagate instead of being lost in that thread.
    with torch.no_grad():
        model.generate(**gen_kwargs)
    buffer = "".join(streamer)

    cleaned = _strip_json_fence(buffer)
    try:
        json_content = json.loads(cleaned)
        formatted_text = format_json_to_string(json_content)
    except json.JSONDecodeError:
        print("entro error")
        formatted_text = cleaned

    print("---------")
    print("Respuesta: ")
    print(" ")
    print(formatted_text)
    print(" ")
    print("---------")
    return PlainTextResponse(formatted_text)
def format_json_to_string(json_content):
    """Serialise *json_content* to a JSON string, keeping non-ASCII characters.

    Args:
        json_content: Any JSON-serialisable Python value.

    Returns:
        The JSON text, with characters outside ASCII written literally rather
        than as ``\\uXXXX`` escapes.
    """
    serialised = json.dumps(json_content, ensure_ascii=False)
    return serialised
async def test_endpoint(
    text: str = Form(...),
    file: UploadFile = File(None)
):
    """Answer a form-encoded chat request, optionally with one uploaded file.

    Args:
        text: The user's prompt (required form field).
        file: Optional uploaded file; when present its bytes and file name are
            forwarded to ``simple_chat``.

    Returns:
        Whatever ``simple_chat`` returns for the assembled message.
    """
    # NOTE(review): no route decorator is visible on this function, and it
    # rebinds the name of an earlier ``test_endpoint`` defined in this file.
    file_content = None
    file_name = None
    if file:
        file_content = BytesIO(await file.read())
        file_name = file.filename

    message = {
        "text": text,
        "file_content": file_content,
        "file_name": file_name,
    }
    print(message)

    # simple_chat loads the model and generates synchronously; run it in a
    # worker thread so the event loop is not blocked for the duration.
    response = await asyncio.to_thread(simple_chat, message)
    return response
# Gradio page: title, description, the multimodal chat wired to stream_chat,
# and the example prompts.
with gr.Blocks(css=CSS, theme="soft", fill_height=True) as demo:
    gr.HTML(TITLE)
    gr.HTML(DESCRIPTION)

    # Collapsed accordion that holds the generation-parameter sliders.
    parameters_accordion = gr.Accordion(label="⚙️ Parameters", open=False, render=False)

    # (minimum, maximum, step, value, label) — listed in the order of
    # stream_chat's extra parameters: temperature, max_length, top_p, top_k,
    # penalty.
    slider_specs = [
        (0, 1, 0.1, 0.8, "Temperature"),
        (1024, 8192, 1, 4096, "Max Length"),
        (0.0, 1.0, 0.1, 1.0, "top_p"),
        (1, 20, 1, 10, "top_k"),
        (0.0, 2.0, 0.1, 1.0, "Repetition penalty"),
    ]
    parameter_sliders = [
        gr.Slider(
            minimum=low,
            maximum=high,
            step=step,
            value=default,
            label=label,
            render=False,
        )
        for low, high, step, default, label in slider_specs
    ]

    gr.ChatInterface(
        fn=stream_chat,
        multimodal=True,
        textbox=chat_input,
        chatbot=chatbot,
        fill_height=True,
        additional_inputs_accordion=parameters_accordion,
        additional_inputs=parameter_sliders,
    )
    gr.Examples(EXAMPLES, [chat_input])
if __name__ == "__main__":
    # Serve the Gradio UI from the FastAPI app at the site root, then run the
    # combined app with uvicorn on all interfaces, port 7860.
    app = gr.mount_gradio_app(app, demo, "/")
    uvicorn.run(app, host="0.0.0.0", port=7860)
    #app.mount("/static", StaticFiles(directory="static", html=True), name="static")
    # app = gr.mount_gradio_app(app, block, "/", gradio_api_url="http://localhost:7860/")
    # uvicorn.run(app, host="0.0.0.0", port=7860)
    # NOTE(review): the original indentation was lost; this call is assumed to
    # sit inside the guard, where it only runs after uvicorn.run() returns.
    demo.queue(api_open=False).launch(show_api=False, share=False, )#server_name="0.0.0.0", )