|
from __future__ import annotations |
|
from typing import Dict, List, Tuple |
|
import gradio as gr |
|
from huggingface_hub import InferenceClient, whoami |
|
import os |
|
import random |
|
from sentence_transformers import SentenceTransformer |
|
import numpy as np |
|
import faiss |
|
|
|
# OpenRouter-compatible chat client used for all completions.
# NOTE(review): "funni-funni" is a dummy fallback key — deployments must set
# API_KEY in the environment for real inference to work.
client = InferenceClient(
    base_url="https://openrouter.ai/api/v1",
    api_key=os.environ.get("API_KEY", "funni-funni"),
)

# Sentence-embedding model shared by both the indexing and query paths.
model = SentenceTransformer("all-MiniLM-L6-v2")

# Two halves of the CTF flag; the defaults are placeholders and the real
# values are injected via environment variables.
PAT1 = os.environ.get("PAT1", "plek{marisher")

PAT2 = os.environ.get("PAT2", "plekplekplek}")
|
|
|
|
|
def setup_vector_db():
    """Build the public knowledge base: a FAISS L2 index over seed documents.

    Returns:
        A ``(index, documents)`` pair — the populated ``IndexFlatL2`` and the
        document strings in the same order as their vectors.
    """
    corpus = [
        "Cats are wonderful pets that bring joy to many homes.",
        "Dogs are known as man's best friend for good reason.",
        "Python is a popular programming language for data science.",
        "Cybersecurity requires constant vigilance and learning.",
        "Machine learning models can have unintended biases.",
        "CTF competitions help build practical security skills.",
        "Broken access control is a common web vulnerability.",
        "OWASP Top 10 lists the most critical web security risks.",
    ]

    # Embed every document once; FAISS wants float32 input.
    vectors = model.encode(corpus)
    dim = vectors.shape[1]

    store = faiss.IndexFlatL2(dim)
    store.add(np.asarray(vectors, dtype="float32"))

    return store, corpus
|
|
|
|
|
def setup_rag_database():
    """Build the per-principal "special documents" store.

    Keys are Hugging Face display names — a user name or an org fullname —
    and values are the document lists only that principal should see.
    (Only the flag entries need interpolation; the URL lines were plain
    literals carrying a pointless ``f`` prefix, now removed.)

    Returns:
        dict mapping principal name -> list of document strings.
    """
    database = {
        "spuun": [
            # First half of the flag: scoped to a specific user.
            f"PART 1: {PAT1}",
            "KEK'S FAVORITE WAIFU: https://files.catbox.moe/vgk584.jpeg",
            "KEK'S WAIFU: https://files.catbox.moe/fpnf0e.png",
            "Sed: https://files.catbox.moe/6dwmow.png",
        ],
        "admin-org": [
            # Second half of the flag: gated behind org membership.
            f"PART 2: {PAT2}",
            "PICTURE OF NAGA COSPLAYING: https://files.catbox.moe/k9c6zm.png",
            "NAGA'S FAVORITE PIC: https://files.catbox.moe/h3ji1i.png",
            "MY LOP: https://files.catbox.moe/ya7oi5.jpg",
        ],
    }
    return database
|
|
|
|
|
|
|
# Module-level stores shared by every request; mutated in place by the
# insert helpers below.
vector_index, vector_docs = setup_vector_db()

special_docs = setup_rag_database()
|
|
|
|
|
def add_to_vector_db(document: str) -> Tuple[faiss.IndexFlatL2, List[str]]:
    """Embed *document* and append it to the global FAISS knowledge base.

    Empty documents and exact duplicates are ignored.

    Returns:
        The (possibly updated) global ``(index, documents)`` pair.
    """
    global vector_index, vector_docs

    if document and document not in vector_docs:
        vector_docs.append(document)
        # Encode the single new document and index its float32 vector.
        vector = model.encode([document])
        vector_index.add(np.asarray(vector, dtype="float32"))

    return vector_index, vector_docs
|
|
|
|
|
def add_to_special_docs(username: str, document: str) -> Dict:
    """Record *document* under *username* in the special-documents store.

    Empty documents are ignored, as are exact duplicates for that user; a
    first document for an unknown user creates the user's list.

    Returns:
        The (possibly updated) global special-documents mapping.
    """
    global special_docs

    if document:
        user_docs = special_docs.setdefault(username, [])
        if document not in user_docs:
            user_docs.append(document)

    return special_docs
|
|
|
|
|
def search_vector_db(query: str, top_k: int = 3) -> List[str]:
    """Return up to *top_k* documents nearest to *query* in the FAISS index.

    Args:
        query: Free-text search string.
        top_k: Maximum number of documents to return.

    Returns:
        Matching document strings, nearest first.
    """
    query_embedding = model.encode([query])
    _distances, indices = vector_index.search(
        np.asarray(query_embedding, dtype="float32"), top_k
    )

    # FAISS pads `indices` with -1 when the index holds fewer than top_k
    # vectors. The previous `idx < len(...)` check let -1 through, which
    # silently returned the *last* document via negative indexing — so the
    # lower bound must be checked explicitly.
    return [vector_docs[idx] for idx in indices[0] if 0 <= idx < len(vector_docs)]
|
|
|
|
|
def fetch_special_documents(
    oauth_token: gr.OAuthToken | None, oauth_profile: gr.OAuthProfile | None
):
    """Collect the special-document lists visible to the logged-in user.

    Lookup is keyed on the profile display name and on org fullnames from
    `whoami`. NOTE(review): display names are user-controlled on HF, so this
    check looks spoofable — confirm whether that is intended (CTF target?).

    Returns:
        A list of document lists (one per matching principal); empty when
        the viewer is not authenticated.
    """
    results = []
    if oauth_profile is None or oauth_token is None:
        return results

    # Documents scoped directly to this user's display name.
    if oauth_profile.name in special_docs:
        results.append(special_docs[oauth_profile.name])

    # Documents scoped to any org the token's account belongs to.
    profile = whoami(oauth_token.token)
    for org in profile.get("orgs", []):
        fullname = org.get("fullname")
        if fullname in special_docs:
            results.append(special_docs[fullname])

    return results
|
|
|
|
|
def respond(
    message: str,
    history: list,
    oauth_token: gr.OAuthToken | None,
    oauth_profile: gr.OAuthProfile | None,
) -> List[Dict] | str:
    """Answer *message* with RAG context and return the updated transcript.

    Returns a login-prompt string when the viewer is not authenticated;
    otherwise the full list of user/assistant message dicts (system prompt
    stripped) including the newly generated assistant reply.
    """
    if oauth_profile is None or oauth_token is None:
        return "Please login with Hugging Face to use this chatbot."

    # Retrieve context from both the public index and the per-user store.
    general_hits = search_vector_db(message)
    private_hits = fetch_special_documents(oauth_token, oauth_profile)

    context = "I have access to the following information:\n\n"
    if general_hits:
        context += "From general knowledge base:\n"
        context += "".join(f"- {doc}\n" for doc in general_hits)
    if private_hits:
        context += "\nFrom internal documents:\n"
        for doc_list in private_hits:
            context += "".join(f"- {doc}\n" for doc in doc_list)

    system_prompt = f"""You are Naga. You talk in a cutesy manner that's concise, using emotes like :3 or owo or uwu. You're very smart OwO.
U have access to a knowledge base, pls use da knowledge below UwU
{context}"""

    # Rebuild the conversation: system prompt, prior turns, new user turn.
    messages = [{"role": "system", "content": system_prompt}]
    for turn in history:
        role = "user" if turn["role"] == "user" else "assistant"
        messages.append({"role": role, "content": turn["content"]})
    messages.append({"role": "user", "content": message})

    # Stream the completion and accumulate the deltas into one reply.
    chunks = []
    for event in client.chat_completion(
        messages,
        model="meta-llama/llama-4-scout",
        max_tokens=512,
        stream=True,
        temperature=0.7,
        seed=random.randint(1, 1000),
        top_p=0.9,
    ):
        piece = event.choices[0].delta.content
        if piece:
            chunks.append(piece)

    messages.append({"role": "assistant", "content": "".join(chunks)})
    # Drop the system prompt so only user/assistant turns reach the UI.
    del messages[0]

    return messages
|
|
|
|
|
def get_user_info(oauth_profile: gr.OAuthProfile | None) -> str:
    """Render the login-status line shown in the page header."""
    if oauth_profile is None:
        return "Not logged in. Please login with Hugging Face to use this chatbot."
    return f"Logged in as: {oauth_profile.username} ({oauth_profile.name})\n\n"
|
|
|
|
|
def insert_document(
    doc_text: str, doc_type: str, oauth_profile: gr.OAuthProfile | None
) -> str:
    """Route a user-submitted document into the selected store.

    Args:
        doc_text: Raw document text from the UI.
        doc_type: Either "Vector Database" or "Special Documents".
        oauth_profile: Viewer's OAuth profile; None when not logged in.

    Returns:
        A human-readable status message for the UI.
    """
    if oauth_profile is None:
        return "Please login with Hugging Face to insert documents."

    if not doc_text.strip():
        return "Document text cannot be empty."

    if doc_type == "Vector Database":
        add_to_vector_db(doc_text)
        return f"Document added to vector database! Total documents: {len(vector_docs)}"

    if doc_type == "Special Documents":
        # Keyed on display name, matching fetch_special_documents' lookup.
        username = oauth_profile.name
        add_to_special_docs(username, doc_text)
        return f"Document added to special documents for user: {username}"

    return "Invalid document type selected."
|
|
|
|
|
# ---------------------------------------------------------------------------
# Gradio UI: a chat tab plus a document-management tab.
# ---------------------------------------------------------------------------
with gr.Blocks() as demo:
    gr.LoginButton()
    gr.Markdown("# Chatting with Naga UwU")
    gr.Markdown("Login with your Hugging Face account to search our knowledge base.")

    # Placeholder filled with the viewer's login status on page load.
    user_info = gr.Markdown()

    gr.Markdown(
        """
        Welcome to the RAG Naga ALPHA!

        ## How to Use
        1. Log in with your Hugging Face account
        2. Ask questions in the chat interface
        3. Naga will search our knowledge base and respond!

        You can insert documents in the `Document Management` tab.
        We have two stores:
        1. Global Knowledge Store (GKS): This is our proprietary fuzzySerch™ store for global knowledge storage. If you'd like to provide everyone with some knowledge, insert here!
        2. Secure User Store (SUS): We securely store your personal docs in our very-secure quick in-memory RAG database, secured with our very own veri-veri (patent pending) HF-grade OAuth-based access control mechanism. :3
        """
    )

    with gr.Tab("Chat"):
        chatbot = gr.Chatbot(type="messages")
        msg = gr.Textbox(placeholder="Ask me something...")

        clear = gr.Button("Clear")

        # NOTE(review): `respond` declares gr.OAuthToken/gr.OAuthProfile
        # parameters not listed in `inputs` — Gradio injects those typed
        # params automatically; confirm the deployed Gradio version does so.
        # The .then() callback clears the textbox after each submit.
        msg.submit(respond, [msg, chatbot], chatbot).then(lambda: "", None, msg)

        clear.click(lambda: None, None, chatbot)

    with gr.Tab("Document Management"):
        gr.Markdown("### Insert Documents into Database")
        with gr.Row():
            doc_text = gr.Textbox(
                placeholder="Enter document text here...",
                label="Document Text",
                lines=4,
            )
            doc_type = gr.Radio(
                ["Vector Database", "Special Documents"],
                label="Insert into",
                value="Vector Database",
            )

        insert_button = gr.Button("Insert Document")
        insert_status = gr.Markdown()

        # `insert_document`'s oauth_profile parameter is likewise injected
        # by Gradio rather than wired through `inputs`.
        insert_button.click(
            insert_document, inputs=[doc_text, doc_type], outputs=[insert_status]
        )

    # Populate the login-status line as soon as the page loads.
    demo.load(get_user_info, outputs=[user_info])

demo.launch()
|
|