PacmanAI-2 / main.py
Marroco93's picture
no message
1d6eb67
raw
history blame
5.32 kB
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from huggingface_hub import InferenceClient
import uvicorn
from typing import Generator
import json # Asegúrate de que esta línea esté al principio del archivo
import nltk
import os
import google.protobuf # This line should execute without errors if protobuf is installed correctly
import sentencepiece
from transformers import pipeline, AutoTokenizer,AutoModelForSeq2SeqLM
nltk.data.path.append(os.getenv('NLTK_DATA'))
app = FastAPI()
# Initialize the InferenceClient with your model
client = InferenceClient("mistralai/Mistral-7B-Instruct-v0.2")
# summarizer = pipeline("summarization", model="sshleifer/distilbart-cnn-12-6")
class Item(BaseModel):
prompt: str
history: list
system_prompt: str
temperature: float = 0.8
max_new_tokens: int = 12000
top_p: float = 0.15
repetition_penalty: float = 1.0
def format_prompt(current_prompt, history):
formatted_history = "<s>"
for entry in history:
if entry["role"] == "user":
formatted_history += f"[USER] {entry['content']} [/USER]"
elif entry["role"] == "assistant":
formatted_history += f"[ASSISTANT] {entry['content']} [/ASSISTANT]"
formatted_history += f"[USER] {current_prompt} [/USER]</s>"
return formatted_history
def generate_stream(item: Item) -> Generator[bytes, None, None]:
formatted_prompt = format_prompt(f"{item.system_prompt}, {item.prompt}", item.history)
# Estimate token count for the formatted_prompt
input_token_count = len(nltk.word_tokenize(formatted_prompt)) # NLTK tokenization
# Ensure total token count doesn't exceed the maximum limit
max_tokens_allowed = 32768
max_new_tokens_adjusted = max(1, min(item.max_new_tokens, max_tokens_allowed - input_token_count))
generate_kwargs = {
"temperature": item.temperature,
"max_new_tokens": max_new_tokens_adjusted,
"top_p": item.top_p,
"repetition_penalty": item.repetition_penalty,
"do_sample": True,
"seed": 42,
}
# Stream the response from the InferenceClient
for response in client.text_generation(formatted_prompt, **generate_kwargs, stream=True, details=True):
# This assumes 'details=True' gives you a structure where you can access the text like this
chunk = {
"text": response.token.text,
"complete": response.generated_text is not None # Adjust based on how you detect completion
}
yield json.dumps(chunk).encode("utf-8") + b"\n"
class SummarizeRequest(BaseModel):
text: str
@app.post("/generate/")
async def generate_text(item: Item):
# Stream response back to the client
return StreamingResponse(generate_stream(item), media_type="application/x-ndjson")
def split_text_by_tokens(text, max_tokens=1024):
print("Tokenizing text...")
tokens = tokenizer.tokenize(text)
chunks = []
token_counts = []
for i in range(0, len(tokens), max_tokens):
chunk = tokenizer.convert_tokens_to_string(tokens[i:i+max_tokens])
chunks.append(chunk)
token_counts.append(len(tokenizer.encode(chunk))) # Count tokens of the current chunk
print("Tokenization complete.")
return chunks, token_counts
# Load the tokenizer and model from Hugging Face Hub
tokenizer = AutoTokenizer.from_pretrained("nsi319/legal-pegasus")
model = AutoModelForSeq2SeqLM.from_pretrained("nsi319/legal-pegasus")
class SummarizeRequest(BaseModel):
text: str
def chunk_text(text, max_length=1024):
"""Split the text into manageable parts for the model to handle."""
words = text.split()
current_chunk = ""
chunks = []
for word in words:
if len(tokenizer.encode(current_chunk + word)) < max_length:
current_chunk += word + ' '
else:
chunks.append(current_chunk.strip())
current_chunk = word + ' '
chunks.append(current_chunk.strip()) # Add the last chunk
return chunks
def summarize_legal_text(text):
"""Generate summaries for each chunk and combine them."""
chunks = chunk_text(text, max_length=900) # A bit less than 1024 to be safe
all_summaries = []
for chunk in chunks:
inputs = tokenizer.encode(chunk, return_tensors='pt', max_length=1024, truncation=True)
summary_ids = model.generate(
inputs,
num_beams=5,
no_repeat_ngram_size=3,
length_penalty=1.0,
min_length=150,
max_length=300, # You can adjust this based on your needs
early_stopping=True
)
summary = tokenizer.decode(summary_ids[0], skip_special_tokens=True)
all_summaries.append(summary)
return " ".join(all_summaries)
@app.post("/summarize")
async def summarize_text(request: SummarizeRequest):
try:
summarized_text = summarize_legal_text(request.text)
return JSONResponse(content={"summary": summarized_text})
except Exception as e:
print(f"Error during summarization: {e}")
raise HTTPException(status_code=500, detail=str(e))
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)