from fastapi import FastAPI, HTTPException from fastapi.responses import StreamingResponse from fastapi.responses import JSONResponse from pydantic import BaseModel from huggingface_hub import InferenceClient import uvicorn from typing import Generator import json # Asegúrate de que esta línea esté al principio del archivo import nltk import os import google.protobuf # This line should execute without errors if protobuf is installed correctly import sentencepiece from transformers import pipeline, AutoTokenizer,AutoModelForSeq2SeqLM nltk.data.path.append(os.getenv('NLTK_DATA')) app = FastAPI() # Initialize the InferenceClient with your model client = InferenceClient("mistralai/Mistral-7B-Instruct-v0.2") # summarizer = pipeline("summarization", model="sshleifer/distilbart-cnn-12-6") class Item(BaseModel): prompt: str history: list system_prompt: str temperature: float = 0.8 max_new_tokens: int = 12000 top_p: float = 0.15 repetition_penalty: float = 1.0 def format_prompt(current_prompt, history): formatted_history = "" for entry in history: if entry["role"] == "user": formatted_history += f"[USER] {entry['content']} [/USER]" elif entry["role"] == "assistant": formatted_history += f"[ASSISTANT] {entry['content']} [/ASSISTANT]" formatted_history += f"[USER] {current_prompt} [/USER]" return formatted_history def generate_stream(item: Item) -> Generator[bytes, None, None]: formatted_prompt = format_prompt(f"{item.system_prompt}, {item.prompt}", item.history) # Estimate token count for the formatted_prompt input_token_count = len(nltk.word_tokenize(formatted_prompt)) # NLTK tokenization # Ensure total token count doesn't exceed the maximum limit max_tokens_allowed = 32768 max_new_tokens_adjusted = max(1, min(item.max_new_tokens, max_tokens_allowed - input_token_count)) generate_kwargs = { "temperature": item.temperature, "max_new_tokens": max_new_tokens_adjusted, "top_p": item.top_p, "repetition_penalty": item.repetition_penalty, "do_sample": True, "seed": 42, } # Stream the response from the InferenceClient for response in client.text_generation(formatted_prompt, **generate_kwargs, stream=True, details=True): # This assumes 'details=True' gives you a structure where you can access the text like this chunk = { "text": response.token.text, "complete": response.generated_text is not None # Adjust based on how you detect completion } yield json.dumps(chunk).encode("utf-8") + b"\n" class SummarizeRequest(BaseModel): text: str @app.post("/generate/") async def generate_text(item: Item): # Stream response back to the client return StreamingResponse(generate_stream(item), media_type="application/x-ndjson") def split_text_by_tokens(text, max_tokens=1024): print("Tokenizing text...") tokens = tokenizer.tokenize(text) chunks = [] token_counts = [] for i in range(0, len(tokens), max_tokens): chunk = tokenizer.convert_tokens_to_string(tokens[i:i+max_tokens]) chunks.append(chunk) token_counts.append(len(tokenizer.encode(chunk))) # Count tokens of the current chunk print("Tokenization complete.") return chunks, token_counts # Load the tokenizer and model from Hugging Face Hub tokenizer = AutoTokenizer.from_pretrained("nsi319/legal-pegasus") model = AutoModelForSeq2SeqLM.from_pretrained("nsi319/legal-pegasus") def summarize_legal_text(text): # Ensure the text is within the maximum length limit for the model inputs = tokenizer.encode(text, return_tensors='pt', max_length=1024, truncation=True) # Generate summary summary_ids = model.generate( inputs, num_beams=5, no_repeat_ngram_size=3, length_penalty=1.0, min_length=150, max_length=1000, early_stopping=True ) # Decode generated tokens to a string summary = tokenizer.decode(summary_ids[0], skip_special_tokens=True) return summary class SummarizeRequest(BaseModel): text: str @app.post("/summarize") async def summarize_text(request: SummarizeRequest): try: # Use the newly defined summarization function summarized_text = summarize_legal_text(request.text) return JSONResponse(content={"summary": summarized_text}) except Exception as e: print(f"Error during summarization: {e}") raise HTTPException(status_code=500, detail=str(e)) if __name__ == "__main__": uvicorn.run(app, host="0.0.0.0", port=8000)