# app.py | |
import os | |
import re | |
import torch | |
# import pdfplumber | |
from chromadb.utils import embedding_functions | |
from rerankers import Reranker | |
# from transformers import GPT2TokenizerFast | |
from groq import Groq | |
from chromadb import PersistentClient | |
import gradio as gr | |
# Retrieve the API key from environment variables (Hugging Face Secrets) | |
groq_api_key = os.environ.get('GROQ_API_KEY') | |
# Initialize the chat client with the API key | |
chat_client = Groq(api_key=groq_api_key) | |
model = "llama-3.2-90b-text-preview" | |
def edit_text(text): | |
pattern = re.compile(r'\[(\d+)\]') | |
prev_num = None | |
output = [] | |
last_end = 0 | |
# Iterate over the citations using finditer | |
for match in pattern.finditer(text): | |
start, end = match.span() | |
current_num = match.group(1) | |
# Append text from the end of the last match to the start of the current match | |
output.append(text[last_end:start]) | |
if prev_num == current_num: | |
# Remove the previous citation by not appending it | |
pass # We effectively skip appending anything, which removes the duplicate | |
else: | |
# Append the current citation with 'Page ' added | |
output.append(f'[Page {current_num}]') | |
prev_num = current_num | |
last_end = end | |
# Append any remaining text after the last citation | |
output.append(text[last_end:]) | |
# Join all parts to form the modified text | |
modified_text = ''.join(output) | |
return modified_text | |
# def edit_text(text): | |
# # Find all citations and their positions | |
# citation_matches = list(re.finditer(r'\[(\d+)\]', text)) | |
# # List to store indices of citations to remove | |
# indices_to_remove = [] | |
# prev_num = None | |
# prev_index = None | |
# # Identify consecutive duplicate citations | |
# for i in range(len(citation_matches)): | |
# current_citation = citation_matches[i] | |
# current_num = current_citation.group(1) | |
# if prev_num == current_num: | |
# # Mark the previous citation for removal | |
# indices_to_remove.append(prev_index) | |
# prev_num = current_num | |
# prev_index = i | |
# # Reconstruct the text with modifications | |
# output_parts = [] | |
# last_end = 0 | |
# for i in range(len(citation_matches)): | |
# m = citation_matches[i] | |
# start, end = m.span() | |
# if i in indices_to_remove: | |
# # Remove citation | |
# output_parts.append(text[last_end:start]) | |
# else: | |
# # Keep and modify citation | |
# output_parts.append(text[last_end:start]) | |
# page_num = m.group(1) | |
# new_citation = '[Page ' + page_num + ']' | |
# output_parts.append(new_citation) | |
# last_end = end | |
# # Append any remaining text after the last citation | |
# output_parts.append(text[last_end:]) | |
# modified_text = ''.join(output_parts) | |
# return modified_text | |
# def parse_pdf(pdf_path): | |
# texts = [] | |
# with pdfplumber.open(pdf_path) as pdf: | |
# for page_num, page in enumerate(pdf.pages, start=1): | |
# text = page.extract_text() | |
# if text: | |
# texts.append({ | |
# 'text': text, | |
# 'metadata': { | |
# 'page_number': page_num | |
# } | |
# }) | |
# return texts | |
# def preprocess_text(text): | |
# # ... (same as your original function) | |
# text = re.sub(r'\s+', ' ', text) | |
# text = text.strip() | |
# return text | |
def call_Llama_api(query, context): | |
chat_completion = chat_client.chat.completions.create( | |
messages = [ | |
{ | |
"role": "system", | |
"content": "You are a car technician. Given the user's question and relevant excerpts from different car manuals, answer the question by including direct quotes from the car manual. Always cite the page number in [] as mentioned in the excerpts used (the ones you are quoting from).\n\nFor Example: User Question: What is storage capacity of Tesla Model 3? Relevant Excerpt(s) : Page 108:: Front storage is 50 litres. , Page 150:: Rear storage is 240 litres.\n Answer: Tesla model 3 front storage is 50 litres [108] and rear storage is 240 litres [150]. Therefore total storage is 50+240 = 290 litres.\n\nHope the example above helped. Only cite an excerpt when you are explicitly referencing it." | |
}, | |
{ | |
"role": "user", | |
"content": "User Question: " + query + "\n\nRelevant Excerpt(s):\n\n" + context, | |
} | |
], | |
temperature=0.6, | |
max_tokens=200, | |
top_p=1, | |
stream=False, | |
stop= None, | |
model = model | |
) | |
response = chat_completion.choices[0].message.content | |
return response | |
# def call_Llama_api(query, context): | |
# # ... (same as your original function) | |
# chat_completion = chat_client.chat.completions.create( | |
# messages=[ | |
# { | |
# "role": "system", | |
# "content": "You are a car technician. Given the user's question and relevant excerpts from different car manuals, answer the question by including direct quotes from the correct car manual. Be concise and to the point in your response." | |
# }, | |
# { | |
# "role": "user", | |
# "content": "User Question: " + query + "\n\nRelevant Excerpt(s):\n\n" + context, | |
# } | |
# ], | |
# temperature=0.6, | |
# max_tokens=200, | |
# top_p=1, | |
# stream=False, | |
# stop=None, | |
# model=model | |
# ) | |
# response = chat_completion.choices[0].message.content | |
# return response | |
# def chunk_texts(texts, max_tokens=500, overlap_tokens=50): | |
# """ | |
# Splits texts into chunks based on paragraphs with overlap to preserve context. | |
# """ | |
# global tokenizer | |
# chunks = [] | |
# for item in texts: | |
# text = preprocess_text(item['text']) | |
# if not text: | |
# continue | |
# metadata = item['metadata'] | |
# # Split text into paragraphs | |
# paragraphs = text.split('\n\n') | |
# current_chunk = '' | |
# current_tokens = 0 | |
# for i, paragraph in enumerate(paragraphs): | |
# paragraph = paragraph.strip() | |
# if not paragraph: | |
# continue | |
# paragraph_tokens = len(tokenizer.encode(paragraph)) | |
# if current_tokens + paragraph_tokens <= max_tokens: | |
# current_chunk += paragraph + '\n\n' | |
# current_tokens += paragraph_tokens | |
# else: | |
# # Save the current chunk | |
# chunk = { | |
# 'text': current_chunk.strip(), | |
# 'metadata': metadata | |
# } | |
# chunks.append(chunk) | |
# # Start a new chunk with overlap | |
# overlap_text = ' '.join(current_chunk.split()[-overlap_tokens:]) | |
# current_chunk = overlap_text + ' ' + paragraph + '\n\n' | |
# current_tokens = len(tokenizer.encode(current_chunk)) | |
# if current_chunk: | |
# chunk = { | |
# 'text': current_chunk.strip(), | |
# 'metadata': metadata | |
# } | |
# chunks.append(chunk) | |
# return chunks | |
def is_car_model_available(query, available_models): | |
# ... (same as your original function) | |
for model in available_models: | |
if model.lower() in query.lower(): | |
return model | |
return None | |
# def extract_car_model(pdf_filename): | |
# base_name = os.path.basename(pdf_filename) | |
# match = re.search(r'manual_(.+)\.pdf', base_name) | |
# if match: | |
# model_name = match.group(1).replace('_', ' ').title() | |
# return model_name | |
# else: | |
# return 'Unknown Model' | |
def colbert_rerank(query=None, chunks=None): | |
# ... (same as your original function) | |
d = ranker.rank(query=query, docs=chunks) | |
reranked_chunks = [d[i].text for i in range(len(chunks))] | |
return reranked_chunks | |
def process_query(query): | |
# Use global variables | |
global available_car_models, collection | |
# print("Input Query:",query) | |
# print(type(query)) | |
car_model = is_car_model_available(query, available_car_models) | |
if not car_model: | |
return "The manual for the specified car model is not present." | |
# Initial retrieval from ChromaDB | |
results = collection.query( | |
query_texts=[query], | |
n_results=50, | |
where={"car_model": car_model}, | |
include=['documents', 'metadatas'] | |
) | |
if not results['documents']: | |
return "No relevant information found in the manual." | |
# Extract chunks and metadata | |
pre_chunks = results['documents'][0] | |
metadatas = results['metadatas'][0] | |
chunks = [f'Page {y["page_number"]}:: {x}' for x,y in zip(pre_chunks,metadatas)] | |
reranked_chunks = colbert_rerank(query, chunks) | |
final_context = " ".join(reranked_chunks[:10]) | |
answer = call_Llama_api(query, final_context) | |
last_complete = answer.rfind('.') | |
# last_newline = answer.rfind('\n') | |
# last_complete = max(last_period, last_newline) | |
if last_complete != -1: | |
answer = answer[:last_complete + 1].strip() | |
answer = edit_text(answer) | |
# Prepare citations | |
# citations = [ | |
# f"Page {meta.get('page_number', 'N/A')}" for meta in metadatas[:5] | |
# ] | |
# citations_text = "Pages cited from:\n" + "\n".join(citations) | |
# return f"{answer}\n\n{citations_text}" | |
return answer | |
# Initialize global variables | |
def initialize(): | |
global collection, available_car_models, ranker | |
# Check for CUDA availability | |
device = 'cuda' if torch.cuda.is_available() else 'cpu' | |
print(f"Using device: {device}. new commit") | |
# tokenizer = GPT2TokenizerFast.from_pretrained("gpt2") # For token counting | |
# Initialize embedding model | |
embedding_function = embedding_functions.SentenceTransformerEmbeddingFunction( | |
model_name="all-MiniLM-L12-v2", device=device | |
) | |
client = PersistentClient(path="./chromadb") | |
# Get the collection | |
collection_name = "car_manuals5" | |
# if collection_name in [col.name for col in client.list_collections()]: | |
# collection = client.get_collection( | |
# name=collection_name, | |
# embedding_function=embedding_function | |
# ) | |
available_car_models = ['TIAGO', 'Astor'] | |
# else: | |
collection = client.get_collection( | |
name=collection_name, | |
embedding_function=embedding_function | |
) | |
# collection = client.get_or_create_collection( | |
# name=collection_name, | |
# embedding_function=embedding_function | |
# ) | |
# Set available car models | |
# available_car_models = ['TIAGO', 'Astor'] | |
# pdf_files = ['./car_manuals/manual_Tiago.pdf', './car_manuals/manual_Astor.pdf'] | |
# available_car_models = [] | |
# for pdf_file in pdf_files: | |
# print(f"Parsing {pdf_file}...") | |
# pdf_texts = parse_pdf(pdf_file) | |
# car_model = extract_car_model(pdf_file) | |
# available_car_models.append(car_model) | |
# # Add car model to metadata | |
# for item in pdf_texts: | |
# item['metadata']['car_model'] = car_model | |
# # Chunk texts using the refined strategy | |
# chunks = chunk_texts(pdf_texts, max_tokens=500, overlap_tokens=50) | |
# # Prepare data for ChromaDB | |
# documents = [chunk['text'] for chunk in chunks] | |
# metadatas = [chunk['metadata'] for chunk in chunks] | |
# ids = [f"{car_model}_{i}" for i in range(len(documents))] | |
# # Add to ChromaDB collection | |
# collection.add( | |
# documents=documents, | |
# metadatas=metadatas, | |
# ids=ids | |
# ) | |
# Initialize the ranker | |
ranker = Reranker("answerdotai/answerai-colbert-small-v1", model_type='colbert') | |
# Call initialize function | |
initialize() | |
# Set up the Gradio interface | |
iface = gr.Interface( | |
fn=process_query, | |
inputs=gr.Textbox(lines=2, placeholder='Enter your question here...'), | |
outputs='text', | |
title='Car Manual Assistant', | |
description='Ask a question about Tata Tiago or MG Astor.', | |
) | |
if __name__ == "__main__": | |
# iface.launch(server_name="0.0.0.0", server_port=7860) | |
iface.launch() |