Spaces:
Sleeping
Sleeping
from fastapi import FastAPI, HTTPException | |
from fastapi.responses import StreamingResponse | |
from fastapi.responses import JSONResponse | |
from pydantic import BaseModel | |
from huggingface_hub import InferenceClient | |
import uvicorn | |
from typing import Generator | |
import json # Asegúrate de que esta línea esté al principio del archivo | |
import nltk | |
import os | |
import google.protobuf # This line should execute without errors if protobuf is installed correctly | |
import sentencepiece | |
from transformers import pipeline, AutoTokenizer,AutoModelForSeq2SeqLM | |
nltk.data.path.append(os.getenv('NLTK_DATA')) | |
app = FastAPI() | |
# Initialize the InferenceClient with your model | |
client = InferenceClient("mistralai/Mistral-7B-Instruct-v0.2") | |
# summarizer = pipeline("summarization", model="sshleifer/distilbart-cnn-12-6") | |
class Item(BaseModel): | |
prompt: str | |
history: list | |
system_prompt: str | |
temperature: float = 0.8 | |
max_new_tokens: int = 12000 | |
top_p: float = 0.15 | |
repetition_penalty: float = 1.0 | |
def format_prompt(current_prompt, history): | |
formatted_history = "<s>" | |
for entry in history: | |
if entry["role"] == "user": | |
formatted_history += f"[USER] {entry['content']} [/USER]" | |
elif entry["role"] == "assistant": | |
formatted_history += f"[ASSISTANT] {entry['content']} [/ASSISTANT]" | |
formatted_history += f"[USER] {current_prompt} [/USER]</s>" | |
return formatted_history | |
def generate_stream(item: Item) -> Generator[bytes, None, None]: | |
formatted_prompt = format_prompt(f"{item.system_prompt}, {item.prompt}", item.history) | |
# Estimate token count for the formatted_prompt | |
input_token_count = len(nltk.word_tokenize(formatted_prompt)) # NLTK tokenization | |
# Ensure total token count doesn't exceed the maximum limit | |
max_tokens_allowed = 32768 | |
max_new_tokens_adjusted = max(1, min(item.max_new_tokens, max_tokens_allowed - input_token_count)) | |
generate_kwargs = { | |
"temperature": item.temperature, | |
"max_new_tokens": max_new_tokens_adjusted, | |
"top_p": item.top_p, | |
"repetition_penalty": item.repetition_penalty, | |
"do_sample": True, | |
"seed": 42, | |
} | |
# Stream the response from the InferenceClient | |
for response in client.text_generation(formatted_prompt, **generate_kwargs, stream=True, details=True): | |
# This assumes 'details=True' gives you a structure where you can access the text like this | |
chunk = { | |
"text": response.token.text, | |
"complete": response.generated_text is not None # Adjust based on how you detect completion | |
} | |
yield json.dumps(chunk).encode("utf-8") + b"\n" | |
class SummarizeRequest(BaseModel): | |
text: str | |
async def generate_text(item: Item): | |
# Stream response back to the client | |
return StreamingResponse(generate_stream(item), media_type="application/x-ndjson") | |
def split_text_by_tokens(text, max_tokens=1024): | |
print("Tokenizing text...") | |
tokens = tokenizer.tokenize(text) | |
chunks = [] | |
token_counts = [] | |
for i in range(0, len(tokens), max_tokens): | |
chunk = tokenizer.convert_tokens_to_string(tokens[i:i+max_tokens]) | |
chunks.append(chunk) | |
token_counts.append(len(tokenizer.encode(chunk))) # Count tokens of the current chunk | |
print("Tokenization complete.") | |
return chunks, token_counts | |
# Load the tokenizer and model from Hugging Face Hub | |
tokenizer = AutoTokenizer.from_pretrained("nsi319/legal-pegasus") | |
model = AutoModelForSeq2SeqLM.from_pretrained("nsi319/legal-pegasus") | |
class SummarizeRequest(BaseModel): | |
text: str | |
def chunk_text(text, max_length=1024): | |
"""Split the text into manageable parts for the model to handle.""" | |
words = text.split() | |
current_chunk = "" | |
chunks = [] | |
for word in words: | |
if len(tokenizer.encode(current_chunk + word)) < max_length: | |
current_chunk += word + ' ' | |
else: | |
chunks.append(current_chunk.strip()) | |
current_chunk = word + ' ' | |
chunks.append(current_chunk.strip()) # Add the last chunk | |
return chunks | |
def summarize_legal_text(text): | |
"""Generate summaries for each chunk and combine them.""" | |
chunks = chunk_text(text, max_length=900) # A bit less than 1024 to be safe | |
all_summaries = [] | |
for chunk in chunks: | |
inputs = tokenizer.encode(chunk, return_tensors='pt', max_length=1024, truncation=True) | |
summary_ids = model.generate( | |
inputs, | |
num_beams=5, | |
no_repeat_ngram_size=3, | |
length_penalty=1.0, | |
min_length=150, | |
max_length=300, # You can adjust this based on your needs | |
early_stopping=True | |
) | |
summary = tokenizer.decode(summary_ids[0], skip_special_tokens=True) | |
all_summaries.append(summary) | |
return " ".join(all_summaries) | |
async def summarize_text(request: SummarizeRequest): | |
try: | |
summarized_text = summarize_legal_text(request.text) | |
return JSONResponse(content={"summary": summarized_text}) | |
except Exception as e: | |
print(f"Error during summarization: {e}") | |
raise HTTPException(status_code=500, detail=str(e)) | |
if __name__ == "__main__": | |
uvicorn.run(app, host="0.0.0.0", port=8000) | |