# src/app.py — "Update src/app.py (#3)", commit 4d576fd (author: pradeep4321)
import csv
import json
import os
import re

import faiss
import nltk
import numpy as np
import pandas as pd
import streamlit as st
from nltk.corpus import wordnet
from rank_bm25 import BM25Okapi
from rapidfuzz import fuzz
from sentence_transformers import SentenceTransformer
from sklearn.feature_extraction.text import TfidfVectorizer
# ==============================
# INITIAL SETUP
# ==============================
# WordNet corpus is required by the "Query Expansion" search mode below.
nltk.download('wordnet', quiet=True)
# CSV file where user activity (logins, searches, logouts) is recorded.
LOG_FILE = "user_logs.csv"
# ==============================
# LOGGING FUNCTION
# ==============================
def log_activity(user, action, query, search_type, path=None):
    """Append one activity record to the CSV activity log (best-effort).

    Parameters
    ----------
    user : str
        Username performing the action.
    action : str
        Action label, e.g. "Login Success", "Search", "Logout".
    query : str
        Search query text, or "-" when not applicable.
    search_type : str
        Search mode used, or "-" when not applicable.
    path : str, optional
        Log file to append to; defaults to the module-level LOG_FILE.

    Logging must never break the app, so any failure is swallowed.
    """
    if path is None:
        path = LOG_FILE
    log_entry = {
        "User": user,
        "Action": action,
        "Query": query,
        "Search Type": search_type,
        "Time": str(pd.Timestamp.now()),
    }
    try:
        # FIX: append a single row instead of re-reading, concatenating and
        # rewriting the entire file on every call (old version was O(file
        # size) per log entry and used a bare `except:`).
        write_header = not os.path.exists(path)
        with open(path, "a", newline="", encoding="utf-8") as f:
            writer = csv.DictWriter(f, fieldnames=list(log_entry))
            if write_header:
                writer.writeheader()
            writer.writerow(log_entry)
    except Exception:
        # Deliberately best-effort: a logging failure must not crash the UI.
        pass
# ==============================
# AUTHENTICATION
# ==============================
def login():
    """Render the login form and authenticate against the USERS secret.

    USERS is a JSON object mapping username -> {"password": ..., "role": ...},
    read from the environment first, then from Streamlit secrets. On success,
    session state is populated and the app reruns; on failure an error is
    shown. Calls st.stop() when USERS is missing or malformed.
    """
    st.title("πŸ” Advanced Multi Searchs")
    users_json = os.environ.get("USERS")
    if not users_json:
        try:
            users_json = st.secrets.get("USERS")
        except Exception:
            # st.secrets raises when no secrets file is configured at all;
            # fall through to the "not configured" error below.
            users_json = None
    # FIX 1: Empty check
    if not users_json or str(users_json).strip() == "":
        st.error("⚠️ USERS not configured in Hugging Face secrets!")
        st.stop()
    # FIX 2: JSON validation
    try:
        users = json.loads(users_json)
    except Exception:
        st.error("❌ Invalid USERS JSON format!")
        # SECURITY FIX: do NOT echo users_json back to the page — it contains
        # every account's password (the old st.code(users_json) leaked them).
        st.stop()
    username = st.text_input("Username")
    password = st.text_input("Password", type="password")
    if st.button("Login"):
        # .get() guards against malformed user entries missing "password";
        # they are treated as invalid credentials instead of crashing.
        if username in users and users[username].get("password") == password:
            st.session_state["authenticated"] = True
            st.session_state["user"] = username
            st.session_state["role"] = users[username]["role"]
            st.session_state["login_time"] = pd.Timestamp.now()
            log_activity(username, "Login Success", "-", "-")
            st.rerun()
        else:
            log_activity(username, "Login Failed", "-", "-")
            st.error("❌ Invalid credentials")
# Session control: default to unauthenticated, and gate the rest of the
# script behind the login form (st.stop() aborts the current rerun so
# nothing below executes until login succeeds).
if "authenticated" not in st.session_state:
    st.session_state["authenticated"] = False
if not st.session_state["authenticated"]:
    login()
    st.stop()
# ==============================
# UI
# ==============================
# NOTE(review): st.set_page_config is documented to be the first Streamlit
# command in a script, but login() above already called st.title — confirm
# this does not raise/warn on the deployed Streamlit version; consider
# moving this call to the top of the file.
st.set_page_config(page_title="Multi Search Engine", layout="wide")
st.title("πŸ” Advanced Multi-Search Product Engine")
# Sidebar: show the logged-in identity and offer logout.
st.sidebar.success(f"πŸ‘€ {st.session_state['user']}")
st.sidebar.info(f"Role: {st.session_state['role']}")
if st.sidebar.button("πŸšͺ Logout"):
    log_activity(st.session_state["user"], "Logout", "-", "-")
    # Clearing session state drops the "authenticated" flag, so the next
    # rerun falls back to the login form.
    st.session_state.clear()
    st.rerun()
# ==============================
# LOAD MODEL
# ==============================
@st.cache_resource
def load_model():
    """Load and cache the sentence-embedding model (CPU only)."""
    encoder = SentenceTransformer('all-MiniLM-L6-v2', device='cpu')
    return encoder
model = load_model()
# ==============================
# LOAD DATA
# ==============================
@st.cache_data
def load_data():
    """Read the product CSV and add a single searchable text column.

    Returns the DataFrame with a "combined" column (name, category, brand
    and description joined by spaces), or None when the file is missing
    (an error is rendered in that case).
    """
    path = "src/products_10k.csv"
    if not os.path.exists(path):
        st.error("Dataset not found!")
        return None
    df = pd.read_csv(path)
    # Concatenate the four text fields, treating missing values as "".
    text_cols = ["product_name", "category", "brand", "description"]
    filled = [df[col].fillna("") for col in text_cols]
    df["combined"] = filled[0].str.cat(filled[1:], sep=" ")
    return df
df = load_data()
if df is None:
    # Dataset missing — load_data already rendered the error message.
    st.stop()
# ==============================
# DATA PREVIEW
# ==============================
st.subheader("πŸ“„ Data Preview")
rows = st.selectbox("Rows to view", [10, 20, 50, 100])
st.dataframe(df.head(rows))
# Corpus searched by every mode: one concatenated text string per product.
products = df["combined"].tolist()
# ==============================
# PREPROCESS
# ==============================
@st.cache_resource
def preprocess(products):
    """Build all retrieval structures for the corpus once and cache them.

    Returns a 5-tuple: (fitted TfidfVectorizer, TF-IDF document matrix,
    dense embeddings, FAISS inner-product index, BM25 index).
    """
    vectorizer = TfidfVectorizer()
    doc_matrix = vectorizer.fit_transform(products)
    # Dense sentence embeddings, L2-normalised so inner product == cosine.
    vectors = model.encode(products, show_progress_bar=False)
    faiss.normalize_L2(vectors)
    ip_index = faiss.IndexFlatIP(vectors.shape[1])
    ip_index.add(np.array(vectors))
    # BM25 over whitespace-tokenised, lower-cased documents.
    tokenised_docs = [doc.lower().split() for doc in products]
    ranker = BM25Okapi(tokenised_docs)
    return vectorizer, doc_matrix, vectors, ip_index, ranker
tfidf, tf_matrix, embs, faiss_index, bm25 = preprocess(products)
# ==============================
# SYNONYMS
# ==============================
def get_synonyms(word):
    """Return the WordNet lemma names for *word* as a de-duplicated list.

    Order is arbitrary (set-backed); an unknown word yields [].
    """
    lemma_names = {
        lemma.name()
        for synset in wordnet.synsets(word)
        for lemma in synset.lemmas()
    }
    return list(lemma_names)
# ==============================
# SEARCH ENGINE (15 TYPES)
# ==============================
def search_engine(query, mode, top_k):
    """Dispatch *query* to one of 15 search strategies over `products`.

    Parameters
    ----------
    query : str
        Raw user query text.
    mode : str
        One of the entries in `search_types`.
    top_k : int
        Result budget; used directly only by "FAISS" (and passed through by
        the hybrid modes) — other modes score the whole corpus and rely on
        the caller to sort and trim.

    Returns
    -------
    list[tuple[int, float]]
        (document index, score) pairs; unknown modes return [].
    """
    if mode == "Keyword":
        # Case-insensitive substring match anywhere in the document.
        return [(i, 1) for i, p in enumerate(products) if query.lower() in p.lower()]
    elif mode == "Regex":
        # FIX: an invalid user pattern (e.g. a lone "(") used to raise an
        # uncaught re.error and crash the app; treat it as "no matches".
        try:
            pattern = re.compile(query, re.IGNORECASE)
        except re.error:
            return []
        return [(i, 1) for i, p in enumerate(products) if pattern.search(p)]
    elif mode == "Boolean":
        # NOTE(review): the substring test also fires on "AND"/"OR" inside
        # words (e.g. "BRAND"); kept as-is to preserve existing behavior.
        if "AND" in query:
            terms = query.split("AND")
            return [(i, 1) for i, p in enumerate(products)
                    if all(t.strip().lower() in p.lower() for t in terms)]
        elif "OR" in query:
            terms = query.split("OR")
            return [(i, 1) for i, p in enumerate(products)
                    if any(t.strip().lower() in p.lower() for t in terms)]
        return []
    elif mode == "Fuzzy":
        # Whole-document similarity ratio (0-100), best first.
        return sorted([(i, fuzz.ratio(query, p)) for i, p in enumerate(products)],
                      key=lambda x: x[1], reverse=True)
    elif mode == "N-Gram":
        # Query as a substring of any single token.
        return [(i, 1) for i, p in enumerate(products)
                if any(query.lower() in w for w in p.lower().split())]
    elif mode == "Prefix":
        return [(i, 1) for i, p in enumerate(products)
                if any(w.startswith(query.lower()) for w in p.lower().split())]
    elif mode == "Suffix":
        return [(i, 1) for i, p in enumerate(products)
                if any(w.endswith(query.lower()) for w in p.lower().split())]
    elif mode == "TF-IDF":
        # Sparse dot product between every document and the query vector.
        scores = (tf_matrix @ tfidf.transform([query]).T).toarray().flatten()
        return list(enumerate(scores))
    elif mode == "BM25":
        return list(enumerate(bm25.get_scores(query.lower().split())))
    elif mode == "Semantic":
        # Dense cosine similarity (corpus embeddings are pre-normalised).
        q_emb = model.encode([query])
        faiss.normalize_L2(q_emb)
        scores = np.dot(embs, q_emb.T).flatten()
        return list(enumerate(scores))
    elif mode == "FAISS":
        q_emb = model.encode([query])
        faiss.normalize_L2(q_emb)
        # I holds document ids (padded with -1 when the corpus is smaller
        # than top_k — the caller filters those), D the matching scores.
        D, I = faiss_index.search(np.array(q_emb), top_k)
        return [(i, float(D[0][idx])) for idx, i in enumerate(I[0])]
    elif mode == "Hybrid":
        # Unweighted sum of TF-IDF and semantic scores per document.
        tfidf_s = dict(search_engine(query, "TF-IDF", top_k))
        sem_s = dict(search_engine(query, "Semantic", top_k))
        return [(i, tfidf_s.get(i, 0) + sem_s.get(i, 0)) for i in range(len(products))]
    elif mode == "Query Expansion":
        # Augment the query with WordNet synonyms, then run TF-IDF.
        expanded = query.split()
        for w in query.split():
            expanded += get_synonyms(w)
        return search_engine(" ".join(expanded), "TF-IDF", top_k)
    elif mode == "Weighted Hybrid":
        tfidf_s = dict(search_engine(query, "TF-IDF", top_k))
        sem_s = dict(search_engine(query, "Semantic", top_k))
        bm25_s = dict(search_engine(query, "BM25", top_k))
        return [(i,
                 0.4 * tfidf_s.get(i, 0) +
                 0.4 * sem_s.get(i, 0) +
                 0.2 * bm25_s.get(i, 0))
                for i in range(len(products))]
    elif mode == "Ensemble":
        # Max-normalise each signal before summing so no single scorer
        # dominates; the 1e-6 guards against division by zero when a
        # scorer returns all zeros.
        tfidf_s = np.array([s for _, s in search_engine(query, "TF-IDF", top_k)])
        sem_s = np.array([s for _, s in search_engine(query, "Semantic", top_k)])
        bm25_s = np.array([s for _, s in search_engine(query, "BM25", top_k)])
        combined = (
            tfidf_s / (np.max(tfidf_s) + 1e-6) +
            sem_s / (np.max(sem_s) + 1e-6) +
            bm25_s / (np.max(bm25_s) + 1e-6)
        )
        return list(enumerate(combined))
    return []
# ==============================
# UI SEARCH
# ==============================
# The 15 supported search strategies, dispatched by search_engine().
search_types = [
    "Keyword","Regex","Boolean","Fuzzy","N-Gram","Prefix","Suffix",
    "TF-IDF","BM25","Semantic","FAISS","Hybrid",
    "Query Expansion","Weighted Hybrid","Ensemble"
]
search_type = st.selectbox("πŸ”Ž Search Type", search_types)
query = st.text_input("Enter query")
top_k = st.slider("Top Results", 5, 50, 10)
if st.button("Search"):
    if not query:
        st.warning("Enter query")
    else:
        results = search_engine(query, search_type, top_k)
        # Rank by score (descending) and keep only the requested number.
        results = sorted(results, key=lambda x: x[1], reverse=True)[:top_k]
        log_activity(st.session_state["user"], "Search", query, search_type)
        # FAISS pads short result sets with index -1; drop those entries.
        idx = [i for i, _ in results if i != -1]
        scores = [round(s, 4) for i, s in results if i != -1]
        if idx:
            out = df.iloc[idx].copy()
            out["Score"] = scores
            # Hide the internal "combined" search column from the user.
            st.dataframe(out.drop(columns=["combined"]), use_container_width=True)
        else:
            st.info("No results found")
# ==============================
# ADMIN LOG VIEW
# ==============================
# Only admin users may inspect and download the activity log.
if st.session_state["role"] == "admin":
    st.sidebar.subheader("πŸ“Š Activity Logs")
    if os.path.exists(LOG_FILE):
        log_df = pd.read_csv(LOG_FILE)
        # Show only the ten most recent entries in the sidebar.
        st.sidebar.dataframe(log_df.tail(10))
        with open(LOG_FILE, "rb") as f:
            st.sidebar.download_button("⬇ Download Logs", f, file_name="logs.csv")
    else:
        st.sidebar.write("No logs yet")