import torch
import os
from transformers import AutoModelForCausalLM, GemmaTokenizerFast, TextIteratorStreamer, AutoTokenizer
from interface import GemmaLLMInterface
from llama_index.core.node_parser import SentenceSplitter
from llama_index.embeddings.instructor import InstructorEmbedding
import gradio as gr
from llama_index.core import Settings, VectorStoreIndex, SimpleDirectoryReader, ChatPromptTemplate, PromptTemplate, load_index_from_storage, StorageContext
from llama_index.core.node_parser import SentenceSplitter
import spaces
from huggingface_hub import login
from llama_index.core.memory import ChatMemoryBuffer
from typing import Iterator, List, Any
from llama_index.core.chat_engine import CondensePlusContextChatEngine
from llama_index.core.llms import ChatMessage, MessageRole , CompletionResponse
from IPython.display import Markdown, display
import keras
import keras_nlp
#from langchain.embeddings.huggingface import HuggingFaceEmbeddings
#from llama_index import LangchainEmbedding, ServiceContext
# Set the backend before importing Keras (this only takes effect if it runs above the keras imports)
#os.environ["KERAS_BACKEND"] = "jax"
# Avoid memory fragmentation on JAX backend.
#os.environ["XLA_PYTHON_CLIENT_MEM_FRACTION"] = "1.00"
#os.getenv("KAGGLE_USERNAME")
#os.getenv("KAGGLE_KEY")
"""huggingface_token = os.getenv("HUGGINGFACE_TOKEN")
login(huggingface_token)
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")"""
# Let's load Gemma using Keras
gemma_model_id = "gemma2_instruct_2b_en"
gemma = keras_nlp.models.GemmaCausalLM.from_preset(gemma_model_id)

# Models LlamaIndex will use: Instructor embeddings for the vector index and
# the Keras Gemma model, wrapped by GemmaLLMInterface, as the LLM.
Settings.embed_model = InstructorEmbedding(model_name="hkunlp/instructor-base")
#Settings.embed_model = LangchainEmbedding(HuggingFaceEmbeddings(model_name='sentence-transformers/all-MiniLM-L6-v2'))
#Settings.llm = GemmaLLMInterface()
Settings.llm = GemmaLLMInterface(model=gemma)

# Source text file for each topic.
documents_paths = {
    'blockchain': 'data/blockchainprova.txt',
    'metaverse': 'data/metaverseprova.txt',
    'payment': 'data/paymentprova.txt'
}

# Module-wide mutable state. A `global` statement is not needed here: at
# module scope every assignment already binds a global name.
session_state = {
    "index": False,
    "documents_loaded": False,
    "document_db": None,
    "original_message": None,
    "clarification": False,
}

# Directory reserved for a persisted index; created eagerly at import time.
PERSIST_DIR = "./db"
os.makedirs(PERSIST_DIR, exist_ok=True)

# Italian instruction telling the model to ask which "Osservatori" topic
# (Blockchain, Payment or Metaverse) the question refers to.
ISTR = "In italiano, chiedi molto brevemente se la domanda si riferisce agli 'Osservatori Blockchain', 'Osservatori Payment' oppure 'Osservatori Metaverse'."

############################---------------------------------
# Sentence splitter shared by build_index: 256-token chunks with a 64-token
# overlap, paragraphs separated by a blank line.
parser = SentenceSplitter.from_defaults(
    chunk_size=256, chunk_overlap=64, paragraph_separator="\n\n"
)
def build_index(path: str):
    """Build a vector store index from a single file.

    The file at *path* is read with ``SimpleDirectoryReader``, split into
    nodes by the module-level ``parser``, and wrapped in a
    ``VectorStoreIndex``. The index is returned without being persisted.

    Args:
        path: Path of the file to index.

    Returns:
        The ``VectorStoreIndex`` built from the file's nodes.
    """
    reader = SimpleDirectoryReader(input_files=[path])
    loaded_docs = reader.load_data()
    chunks = parser.get_nodes_from_documents(loaded_docs)
    return VectorStoreIndex(chunks)
# define prompt viewing function
def display_prompt_dict(prompts_dict) -> None:
    """Show every prompt in *prompts_dict* in a notebook cell.

    For each entry the key is rendered as Markdown, the prompt template is
    printed as plain text, and a Markdown spacer follows.

    Args:
        prompts_dict: Mapping of prompt key to a prompt object exposing
            ``get_template()``.
            NOTE(review): presumably the result of ``get_prompts()`` on a
            LlamaIndex component -- confirm against the caller.
    """
    for key, prompt in prompts_dict.items():
        # The literals were previously split across physical lines, which is
        # a syntax error. Markdown ignores a lone newline, so the separators
        # are written as explicit <br> tags.
        text_md = f"**Prompt Key**: {key}<br>" f"**Text:** <br>"
        display(Markdown(text_md))
        print(prompt.get_template())
        display(Markdown("<br><br>"))
@spaces.GPU(duration=15)
def handle_query(query_str: str,
                 chat_history: list[tuple[str, str]]) -> Iterator[str]:
    """Stream an answer to *query_str* from the document chat engine.

    The index is built from ``data/blockchainprova.txt`` and queried through
    a "context" chat engine (top-3 similarity retrieval, 1500-token memory
    buffer, Italian system prompt). The previous turns are passed to the
    engine as chat history.

    Args:
        query_str: The user's latest message.
        chat_history: Earlier turns as ``(user, assistant)`` text pairs.

    Yields:
        The reply accumulated so far, once per streamed token. If the chat
        engine raises, a single ``"Error processing query: ..."`` string is
        yielded instead of propagating the exception.
    """
    # NOTE: the index is rebuilt from the source file on every call.
    index = build_index("data/blockchainprova.txt")

    # Convert the (user, assistant) pairs into LlamaIndex chat messages.
    conversation: List[ChatMessage] = []
    for user, assistant in chat_history:
        conversation.extend([
            ChatMessage(role=MessageRole.USER, content=user),
            ChatMessage(role=MessageRole.ASSISTANT, content=assistant),
        ])

    try:
        memory = ChatMemoryBuffer.from_defaults(token_limit=1500)
        chat_engine = index.as_chat_engine(
            chat_mode="context",
            similarity_top_k=3,
            memory=memory,
            system_prompt=(
                "Sei un assistente Q&A italiano di nome Odi, che risponde solo alle domande o richieste pertinenti in modo preciso."
                " Usa la cronologia della chat, o il contesto fornito, per interagire e aiutare l'utente a rispondere alla sua domanda."
            ),
        )

        response = chat_engine.stream_chat(query_str, chat_history=conversation)

        outputs = []
        for token in response.response_gen:
            print(f"Generated token: {token}")
            # Drop a leading "assistant:" role prefix when present, but keep
            # every token. Previously tokens without the prefix were
            # discarded, which lost all or most of the streamed reply.
            outputs.append(token.removeprefix("assistant:"))
            yield "".join(outputs)
    except Exception as e:
        # UI boundary: surface the failure as chat text instead of crashing.
        yield f"Error processing query: {str(e)}"