import os import torch import unicodedata import re import json import textwrap import numpy as np from pathlib import Path import gradio as gr import traceback from transformers import AutoTokenizer, AutoModel from sklearn.feature_extraction.text import HashingVectorizer from sklearn.preprocessing import normalize import chromadb import logging import shutil # Setup logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) # --------------------------- CONFIG ----------------------------------- DATA_PATH = "dataset14.json" # Changed from dataset13.json to dataset14.json DB_DIR = "chroma_new_db" COL_NAME = "dataset13_grbert_charword" MODEL_NAME = "sentence-transformers/paraphrase-xlm-r-multilingual-v1" CHUNK_SIZE = 512 CHUNK_OVERLAP = 40 BATCH_EMB = 64 ALPHA_BASE = 0.50 # default α ALPHA_LONGQ = 0.65 # α όταν το query > 30 λέξεις DEVICE = "cuda" if torch.cuda.is_available() else "cpu" MAX_TEXT_LENGTH = 10000 # Limit maximum text length to avoid tokenizer warnings # For simple operation when debugging DEBUG_MODE = True # Initialize globals as None tok = None model = None col = None raw_chunks = [] pre_chunks = [] metas = [] ids = [] X_char = None X_word = None char_vec = None word_vec = None # ----------------------- PRE-/POST HELPERS ---------------------------- def strip_acc(s: str) -> str: return ''.join(ch for ch in unicodedata.normalize('NFD', s) if not unicodedata.combining(ch)) STOP = {"σχετικο", "σχετικά", "με", "και"} def preprocess(txt: str) -> str: txt = strip_acc(txt.lower()) txt = re.sub(r"[^a-zα-ω0-9 ]", " ", txt) txt = re.sub(r"\s+", " ", txt).strip() return " ".join(w for w in txt.split() if w not in STOP) def chunk_text(text, tok): # Tokenize text ids = tok(text)["input_ids"] if len(ids) <= CHUNK_SIZE: return [text] step = CHUNK_SIZE - CHUNK_OVERLAP return [tok.decode(ids[i:i+CHUNK_SIZE], skip_special_tokens=True) for i in range(0, len(ids), step)] def cls_embed(texts, tok, model, bs=BATCH_EMB): out = [] for i in range(0, len(texts), bs): try: enc = tok(texts[i:i+bs], padding=True, truncation=True, max_length=CHUNK_SIZE, return_tensors="pt").to(DEVICE) with torch.no_grad(): hs = model(**enc, output_hidden_states=True).hidden_states cls = torch.stack(hs[-4:], 0).mean(0)[:, 0, :] cls = torch.nn.functional.normalize(cls, p=2, dim=1) out.append(cls.cpu()) except Exception as e: logger.error(f"Error encoding batch: {e}") # Skip this batch if there's an error continue if not out: raise ValueError("Failed to encode any text batches") return torch.cat(out).numpy() def format_result(hit): title = hit['metadata']['title'] score = hit['score'] snippet = hit['snippet'] url = hit['metadata'].get('url', '') result = f"### {title} (score={score:.3f})\n\n" result += f"{snippet}\n\n" if url: result += f"[Δείτε το έγγραφο]({url})\n\n" result += "---\n\n" return result def reset_db(): """Remove existing DB directory if it exists to create a fresh one""" if os.path.exists(DB_DIR): logger.info(f"Removing existing DB directory: {DB_DIR}") shutil.rmtree(DB_DIR) os.makedirs(DB_DIR, exist_ok=True) logger.info(f"Created fresh DB directory: {DB_DIR}") def load_resources(): global tok, model, col, raw_chunks, pre_chunks, metas, ids, X_char, X_word, char_vec, word_vec try: logger.info("Loading Model...") tok = AutoTokenizer.from_pretrained(MODEL_NAME) model = AutoModel.from_pretrained(MODEL_NAME).to(DEVICE).eval() logger.info("Reading JSON...") try: with open(DATA_PATH, encoding="utf-8") as f: docs_json = json.load(f) logger.info(f"Successfully loaded {len(docs_json)} documents from {DATA_PATH}") except Exception as e: logger.error(f"Error loading JSON file {DATA_PATH}: {e}") raise # Reset and create fresh database reset_db() # Process documents raw_chunks, pre_chunks, metas, ids = [], [], [], [] for d in docs_json: text = d.get("text", "") if not text: logger.warning(f"Document {d.get('id', 'unknown')} has no text, skipping") continue try: for idx, chunk in enumerate(chunk_text(text, tok)): raw_chunks.append(chunk) pre_chunks.append(preprocess(chunk)) metas.append({ "id": d.get("id", f"doc_{len(raw_chunks)}"), "title": d.get("title", "Untitled Document"), "url": d.get("url", ""), "chunk": idx }) ids.append(f'{d.get("id", f"doc_{len(raw_chunks)}")}_{idx}') except Exception as e: logger.error(f"Error processing document: {e}") continue logger.info(f" → total chunks: {len(raw_chunks):,}") if len(raw_chunks) == 0: raise ValueError("No documents were processed successfully") logger.info("Building lexical matrices...") char_vec = HashingVectorizer(analyzer="char_wb", ngram_range=(2, 5), n_features=2**20, norm=None, alternate_sign=False, binary=True) word_vec = HashingVectorizer(analyzer="word", ngram_range=(1, 2), n_features=2**19, norm=None, alternate_sign=False, binary=True) logger.info(f"Building char matrix for {len(pre_chunks)} chunks...") X_char = normalize(char_vec.transform(pre_chunks)) logger.info(f"Building word matrix for {len(pre_chunks)} chunks...") X_word = normalize(word_vec.transform(pre_chunks)) logger.info("Creating new Chroma DB...") client = chromadb.PersistentClient(path=DB_DIR) # Create a new collection col = client.create_collection(name=COL_NAME, metadata={"hnsw:space": "cosine"}) logger.info("Encoding & adding documents to ChromaDB...") # Add documents to collection in smaller batches to avoid memory issues batch_size = min(BATCH_EMB, 32) # Use smaller batch size for adding to DB for start in range(0, len(pre_chunks), batch_size): end = min(start + batch_size, len(pre_chunks)) batch_ids = ids[start:end] batch_docs = pre_chunks[start:end] batch_metas = metas[start:end] try: batch_embs = cls_embed(batch_docs, tok, model, bs=batch_size) col.add( documents=batch_docs, embeddings=batch_embs.tolist(), ids=batch_ids, metadatas=batch_metas ) logger.info(f"Added batch {start}-{end} to ChromaDB") except Exception as e: logger.error(f"Error adding batch {start}-{end} to ChromaDB: {e}") # Continue with next batch logger.info(f"✓ Resources loaded. Database has {col.count()} entries.") except Exception as e: logger.error(f"Error in load_resources: {e}") logger.error(traceback.format_exc()) raise def dummy_search(): """Fallback search function when normal search fails or for debugging""" return [ { "score": 0.95, "snippet": "Υποψήφιοι μπορούν να είναι επιστήμονες ανεγνωρισμένου επιστημονικού κύρους κάτοχοι διδακτορικού διπλώματος. Τα προσόντα που απαιτείται να έχουν οι υποψήφιοι περιλαμβάνουν διδακτική εμπειρία σε συναφές αντικείμενο και ερευνητικό έργο...", "metadata": {"title": "Προκήρυξη Εντεταλμένων Διδασκόντων Πανεπιστημίου Αθηνών", "url": ""} }, { "score": 0.85, "snippet": "Απαιτούμενα δικαιολογητικά: 1. Αίτηση υποψηφιότητας, 2. Αντίγραφο διδακτορικού διπλώματος, 3. Βιογραφικό σημείωμα, 4. Αναλυτικό υπόμνημα επιστημονικών δημοσιεύσεων, 5. Αποδεικτικά διδακτικής εμπειρίας...", "metadata": {"title": "Δικαιολογητικά για Εντεταλμένους Διδάσκοντες", "url": ""} } ] def hybrid_search(query, k=5): global X_char, X_word, char_vec, word_vec, ids, raw_chunks, pre_chunks, metas, col # For debugging - if resources didn't load properly if DEBUG_MODE and (col is None or len(ids) == 0): logger.warning("Using dummy search because resources are not loaded") return dummy_search() try: q_pre = preprocess(query) words = q_pre.split() alpha = ALPHA_LONGQ if len(words) > 30 else ALPHA_BASE # exact substring boost exact_ids = {ids[i] for i, t in enumerate(pre_chunks) if q_pre in t} # semantic q_emb = cls_embed([q_pre], tok, model)[0] sem = col.query(query_embeddings=[q_emb.tolist()], n_results=min(k*30, len(ids)), include=["distances"]) sem_sims = {d: 1-dist for d, dist in zip(sem["ids"][0], sem["distances"][0])} # lexical char q_char = normalize(char_vec.transform([q_pre])) char_sim = (q_char @ X_char.T).toarray()[0] # lexical word q_word = normalize(word_vec.transform([q_pre])) word_sim = (q_word @ X_word.T).toarray()[0] lex_sims = {} for idx, (c, w) in enumerate(zip(char_sim, word_sim)): if c > 0 or w > 0: lex_sims[ids[idx]] = 0.85*c + 0.15*w # blend + exact boost all_ids = set(sem_sims) | set(lex_sims) | exact_ids scored = [] for doc_id in all_ids: s = alpha*sem_sims.get(doc_id, 0) + (1-alpha)*lex_sims.get(doc_id, 0) if doc_id in exact_ids: s = 1.0 scored.append((doc_id, s)) # 1) Ταξινόμησε κατά score φθίνουσα scored.sort(key=lambda x: -x[1]) # 2) Φτιάξε top-k μοναδικά έγγραφα hits = [] seen_doc_ids = set() for chunk_id, score in scored: idx = ids.index(chunk_id) doc_id = metas[idx]['id'] if doc_id in seen_doc_ids: continue # προσθέτουμε το πρώτο (το καλύτερο) chunk για αυτό το doc hits.append({ "score": score, "snippet": raw_chunks[idx][:300] + " …", "metadata": metas[idx] }) seen_doc_ids.add(doc_id) if len(hits) >= k: break return hits except Exception as e: logger.error(f"Error in hybrid_search: {e}") logger.error(traceback.format_exc()) if DEBUG_MODE: return dummy_search() else: raise def chat_response(message, history): if not message.strip(): return "Παρακαλώ πληκτρολογήστε μια ερώτηση.", "" try: logger.info(f"Processing query: {message[:50]}...") results = hybrid_search(message, k=5) if not results: return "Δεν βρέθηκαν σχετικά έγγραφα για την ερώτησή σας.", "" response = "# Σχετικά Έγγραφα\n\n" for hit in results: response += format_result(hit) return response, "" except Exception as e: logger.error(f"Error processing query: {e}") logger.error(traceback.format_exc()) return f"Συνέβη ένα σφάλμα κατά την αναζήτηση: {str(e)[:200]}. Παρακαλώ δοκιμάστε ξανά με διαφορετική ερώτηση.", "" # -------------------- GRADIO INTERFACE ------------------------------ def create_demo(): with gr.Blocks(css="footer {visibility: hidden}") as demo: gr.Markdown(""" # Αναζήτηση Εγγράφων Ελληνικού Δημόσιου Τομέα Αυτό το chatbot σάς επιτρέπει να αναζητήσετε και να ανακτήσετε έγγραφα του Ελληνικού δημόσιου τομέα (όπως Προκηρύξεις Εντεταλμένων Διδασκόντων στα Πανεπιστήμια) μέσω ερωτήσεων σε φυσική γλώσσα. **Παραδείγματα ερωτήσεων:** - Ποιες είναι οι προϋποθέσεις για να γίνει κάποιος εντεταλμένος διδάσκων; - Ποια δικαιολογητικά χρειάζονται για την αίτηση; - Πότε λήγει η προθεσμία υποβολής αιτήσεων; """) chatbot = gr.Chatbot( label="Αποτελέσματα Αναζήτησης", bubble_full_width=False, height=600, avatar_images=(None, None), # Removed avatar to simplify show_copy_button=True ) with gr.Row(): msg = gr.Textbox( placeholder="Πληκτρολογήστε την ερώτησή σας εδώ...", container=False, scale=9 ) submit = gr.Button("Αναζήτηση", scale=1) clear = gr.Button("Καθαρισμός") examples = gr.Examples( examples=[ "Ποιες είναι οι προϋποθέσεις για να γίνει κάποιος εντεταλμένος διδάσκων;", "Ποια δικαιολογητικά χρειάζονται για την αίτηση;", "Πότε λήγει η προθεσμία υποβολής αιτήσεων;" ], inputs=msg ) # Define a function to correctly update the chatbot def add_message(user_message, history): # First add the user's message to history history = history + [(user_message, None)] return history, "" def bot_response(history): try: user_message = history[-1][0] bot_message, _ = chat_response(user_message, history[:-1]) history[-1] = (user_message, bot_message) return history except Exception as e: logger.error(f"Error in bot_response: {e}") logger.error(traceback.format_exc()) error_message = f"Συνέβη ένα σφάλμα: {str(e)[:200]}" history[-1] = (user_message, error_message) return history # Fix event binding msg.submit(add_message, [msg, chatbot], [chatbot, msg]).then( bot_response, chatbot, chatbot ) submit.click(add_message, [msg, chatbot], [chatbot, msg]).then( bot_response, chatbot, chatbot ) clear.click(lambda: [], outputs=[chatbot]) gr.Markdown(""" ### Πληροφορίες Αυτή η εφαρμογή χρησιμοποιεί hybrid search με LLM embeddings (GreekBERT) και lexical search (TF-IDF) για να βρει τα πιο σχετικά έγγραφα με βάση την ερώτησή σας. """) return demo # ---------------------- MAIN ----------------------------------------- if __name__ == "__main__": try: logger.info("Starting application...") if not os.path.exists(DATA_PATH): logger.error(f"ERROR: Dataset file {DATA_PATH} not found!") with gr.Blocks() as error_demo: gr.Markdown(f""" # Σφάλμα: Δεν βρέθηκε το αρχείο δεδομένων Δεν μπορεί να βρεθεί το αρχείο: {DATA_PATH} Παρακαλώ βεβαιωθείτε ότι το αρχείο dataset14.json υπάρχει στον φάκελο της εφαρμογής. """) error_demo.launch(share=True) else: # Try load resources but continue with dummy data if it fails try: load_resources() except Exception as e: logger.error(f"Failed to load resources: {e}") logger.error("Continuing with DEBUG mode") DEBUG_MODE = True demo = create_demo() # Check if running on Hugging Face Spaces if os.environ.get('SPACE_ID'): demo.launch() # Don't use share=True on Hugging Face Spaces else: demo.launch(share=True) # Only use share=True locally except Exception as e: logger.error(f"Application failed to start: {e}") logger.error(traceback.format_exc()) # In case of failure, try to create a minimal UI to show error with gr.Blocks() as error_demo: gr.Markdown(f""" # Σφάλμα Εφαρμογής Η εφαρμογή αντιμετώπισε σφάλμα κατά την εκκίνηση: ``` {str(e)} {traceback.format_exc()} ``` Παρακαλώ επικοινωνήστε με τον διαχειριστή. """) # Also check here if running on Hugging Face Spaces if os.environ.get('SPACE_ID'): error_demo.launch() else: error_demo.launch(share=True)