"""Streamlit cybersecurity question-answering app.

Supports two approaches: retrieval-augmented generation (RAG) over a FAISS
vector store, and direct prompting of a hosted open-source LLM. Questions can
be asked one at a time or in bulk from a CSV file.

NOTE(review): this file was received with its formatting collapsed and several
expressions stripped. Pieces marked ``NOTE(review)`` below were reconstructed
from the surrounding call sites and must be confirmed against the original.
"""

import asyncio
import base64
import html
import logging
import os
import re
import tempfile
from io import BytesIO

import aiohttp
import bs4
import chardet
import pandas as pd
import streamlit as st
from langchain import HuggingFacePipeline
from langchain.chains import RetrievalQA
from langchain.prompts import PromptTemplate
from langchain.schema import Document
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import WebBaseLoader
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.embeddings import SentenceTransformerEmbeddings
from langchain_community.llms import HuggingFaceHub
from langchain_community.vectorstores import FAISS
from PyPDF2 import PdfReader
from transformers import pipeline, AutoTokenizer, AutoModelForCausalLM

# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# List of available open-source models (display name -> Hugging Face repo id)
open_source_models = {
    "Mistral-7B": "mistralai/Mistral-7B-Instruct-v0.3",
    "Llama-2-7B": "meta-llama/Llama-2-7b-chat-hf",
    "Zephyr-7B": "HuggingFaceH4/zephyr-7b-beta",
}

QUESTION_TYPES = ("factual", "contrastive", "opinion", "inferential")

# NOTE(review): the original chunking parameters are not visible in the source
# as received; these values are placeholders to be confirmed.
CHUNK_SIZE = 1000
CHUNK_OVERLAP = 100

# NOTE(review): the original list of seed URLs is missing from the source as
# received. Until it is restored, URLs are read from a comma-separated
# environment variable so the app can still be configured and started.
INITIAL_DATA_URLS = [
    url.strip()
    for url in os.environ.get("INITIAL_DATA_URLS", "").split(",")
    if url.strip()
]


# Initialize the HuggingFace model with caching
@st.cache_resource
def load_model(repo_id):
    """Return a HuggingFaceHub LLM client for *repo_id*, cached across reruns."""
    return HuggingFaceHub(
        repo_id=repo_id,
        model_kwargs={"temperature": 0.7, "max_length": 512},
    )


# Default LLM used by the RAG chains. Going through load_model() avoids
# building a new client on every Streamlit rerun.
llm = load_model(open_source_models["Mistral-7B"])


def _meta_content(soup, name):
    """Return the ``content`` attribute of ``<meta name=...>``, or ``""``."""
    tag = soup.find("meta", attrs={"name": name})
    return tag.get("content", "") if tag else ""


# Asynchronous fetching and processing URLs
async def fetch_and_process_url(session, url):
    """Download *url* and convert it into a list of ``Document`` objects.

    PDF responses yield one document per page that has extractable text; any
    other response is parsed as HTML and yields a single document. Failures
    are logged and result in an empty list (best-effort loading).
    """
    documents = []
    try:
        async with session.get(url) as response:
            if response.status != 200:
                logger.error(f"Failed to fetch URL {url}: HTTP {response.status}")
                return documents
            content = await response.read()
            content_type = response.headers.get("Content-Type", "")

        if "application/pdf" in content_type:
            reader = PdfReader(BytesIO(content))
            for page_number, page in enumerate(reader.pages, start=1):
                page_text = page.extract_text()  # extract once per page
                if page_text:
                    documents.append(
                        Document(
                            page_content=page_text,
                            metadata={"source": url, "page": page_number},
                        )
                    )
        else:
            text = content.decode("utf-8", errors="ignore")
            soup = bs4.BeautifulSoup(text, "html.parser")
            title_tag = soup.find("title")
            documents.append(
                Document(
                    page_content=soup.get_text(),
                    metadata={
                        "source": url,
                        "title": title_tag.text if title_tag else "",
                        "author": _meta_content(soup, "author"),
                        "publication_date": _meta_content(soup, "publication-date"),
                    },
                )
            )
    except Exception as e:
        # Deliberately broad: one bad URL must not abort the whole batch.
        logger.error(f"Failed to fetch or process URL {url}: {e}")
    return documents


async def load_data_async(file_paths):
    """Fetch all *file_paths* (URLs) concurrently and return their documents."""
    documents = []
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_and_process_url(session, file_path) for file_path in file_paths]
        results = await asyncio.gather(*tasks, return_exceptions=True)
    for result in results:
        if isinstance(result, Exception):
            logger.error(f"Error processing URL: {result}")
        elif result:
            documents.extend(result)
    logger.info(f"Total documents loaded: {len(documents)}")
    return documents


# Synchronous function to load initial data asynchronously
@st.cache_data
def load_initial_data():
    """Load the seed documents once and cache them across reruns."""
    return asyncio.run(load_data_async(INITIAL_DATA_URLS))


@st.cache_resource
def _load_embedding_model():
    """Return the sentence-embedding model, cached across reruns.

    NOTE(review): the original embedding model name is not visible in the
    source as received; the library default is used here.
    """
    return HuggingFaceEmbeddings()


def initialize_embeddings(documents):
    """Split *documents* into chunks and index them in a FAISS vector store.

    NOTE(review): the original definition is missing from the source as
    received. This reconstruction follows the call sites, which unpack the
    result as ``(vectorstore, split_documents)`` and read a ``chunk``
    metadata key from retrieved documents.
    """
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=CHUNK_SIZE, chunk_overlap=CHUNK_OVERLAP
    )
    split_documents = splitter.split_documents(documents)
    for chunk_number, doc in enumerate(split_documents, start=1):
        doc.metadata["chunk"] = chunk_number
    vectorstore = FAISS.from_documents(split_documents, _load_embedding_model())
    return vectorstore, split_documents


@st.cache_resource
def _build_initial_index(_documents):
    """Build the whole-dataset index once per server process.

    The leading underscore tells Streamlit not to hash the argument.
    """
    return initialize_embeddings(_documents)


# Main logic to initialize embeddings and vector store
initial_data = load_initial_data()
if not initial_data:
    raise ValueError("No documents were loaded from the provided URLs.")
vectorstore_local, all_documents = _build_initial_index(initial_data)


# Define zero-shot and few-shot prompt templates for each question type
_PROMPT_PREAMBLE = (
    "You are a Cybersecurity expert focusing on the latest trends and "
    "investigative techniques in cyber-attacks. Provide a concise answer "
    "based on the following context.\n"
)
_PROMPT_QUERY = "Context: {context}\nQuestion: {question}\nAnswer:"

# question type -> (example context, example question, example answer)
_FEW_SHOT_EXAMPLES = {
    "factual": (
        "In recent years, cyber-attacks have increased significantly.",
        "What are the main reasons for the rise in cyber-attacks?",
        "The main reasons include increased connectivity, outdated security "
        "infrastructure, and sophisticated attack methods.",
    ),
    "contrastive": (
        "Both phishing and malware attacks are common cyber threats.",
        "How do phishing attacks differ from malware attacks?",
        "Phishing attacks involve tricking individuals into revealing "
        "sensitive information, while malware attacks involve malicious "
        "software designed to damage or gain unauthorized access to systems.",
    ),
    "opinion": (
        "Many experts believe that AI can improve cybersecurity defenses.",
        "Do you think AI can effectively combat cyber-attacks?",
        "Yes, AI can help identify patterns and anomalies that indicate "
        "potential cyber-attacks, making it an effective tool for enhancing "
        "cybersecurity defenses.",
    ),
    "inferential": (
        "An organization faced a significant data breach last year.",
        "What measures can the organization take to prevent future breaches?",
        "The organization can implement stronger access controls, regular "
        "security audits, employee training, and advanced threat detection "
        "systems.",
    ),
}


def get_zero_shot_prompt(question_type):
    """Return the zero-shot prompt template for *question_type*.

    The template has ``{context}`` and ``{question}`` placeholders.

    Raises:
        KeyError: if *question_type* is not one of ``QUESTION_TYPES``.
    """
    if question_type not in QUESTION_TYPES:
        raise KeyError(question_type)
    return f"\n{_PROMPT_PREAMBLE}{_PROMPT_QUERY}\n"


def get_few_shot_prompt(question_type):
    """Return the few-shot prompt template for *question_type*.

    The worked example is placed before the real context and question so the
    final ``Answer:`` cue belongs to the user's question, not the example.

    Raises:
        KeyError: if *question_type* is not one of ``QUESTION_TYPES``.
    """
    example_context, example_question, example_answer = _FEW_SHOT_EXAMPLES[question_type]
    example = (
        "Example:\n"
        f"Context: {example_context}\n"
        f"Question: {example_question}\n"
        f"Answer: {example_answer}\n\n"
    )
    return f"\n{_PROMPT_PREAMBLE}{example}{_PROMPT_QUERY}\n"


# Checked in order. Contrastive and opinion come first because such questions
# usually also contain a generic interrogative ("what is the difference ...").
# "how many"/"how much" must be tested before the bare "how".
_QUESTION_TYPE_PATTERNS = (
    ("contrastive", re.compile(r"\b(?:compar|differ|similar)")),
    ("opinion", re.compile(r"\b(?:opinion|feel|think|believe)")),
    ("factual", re.compile(r"\b(?:what|when|who|how many|how much)\b")),
    ("inferential", re.compile(r"\b(?:why|how|cause|reason)")),
)


# Function to identify question type
def identify_question_type(question):
    """Classify *question* as factual, contrastive, opinion or inferential.

    Keywords are matched at word boundaries on the lower-cased question.
    Falls back to ``"factual"`` when nothing matches.
    """
    lowered = question.lower()
    for question_type, pattern in _QUESTION_TYPE_PATTERNS:
        if pattern.search(lowered):
            return question_type
    return "factual"  # Default to factual if no keywords match


# Function to filter documents based on metadata
def filter_documents_by_metadata(query, documents):
    """Return the documents whose string metadata contains any query term.

    Terms are the whitespace-separated words of *query*, lower-cased, with
    surrounding punctuation removed so that input such as
    ``"title: Cybersecurity, author: John Doe"`` still matches.
    """
    query_terms = [
        term
        for term in (raw.strip(",;:.'\"") for raw in query.lower().split())
        if term
    ]

    def doc_matches_query(doc):
        values = [
            value.lower() for value in doc.metadata.values() if isinstance(value, str)
        ]
        return any(term in value for term in query_terms for value in values)

    return [doc for doc in documents if doc_matches_query(doc)]


def _run_retrieval_qa(query, vectorstore, prompt_template):
    """Run a "stuff" RetrievalQA chain and return (answer, sources, contexts)."""
    qa_chain = RetrievalQA.from_chain_type(
        llm=llm,
        chain_type="stuff",
        retriever=vectorstore.as_retriever(
            search_type="similarity", search_kwargs={"k": 3}
        ),
        return_source_documents=True,
        chain_type_kwargs={
            "prompt": PromptTemplate(
                template=prompt_template, input_variables=["context", "question"]
            )
        },
    )
    response = qa_chain.invoke({"query": query})
    if not response["result"].strip():  # empty or whitespace-only answer
        return "Sorry, I don't know.", [], []
    sources = [
        Document(page_content=doc.page_content, metadata=doc.metadata)
        for doc in response["source_documents"]
    ]
    contexts = [doc.page_content for doc in sources]
    return response["result"], sources, contexts


# Retrieve and generate zero-shot answers
def retrieve_and_generate_zero_shot_answers(query, vectorstore, question_type):
    """Answer *query* from *vectorstore* using the zero-shot prompt."""
    return _run_retrieval_qa(query, vectorstore, get_zero_shot_prompt(question_type))


# Retrieve and generate few-shot answers
def retrieve_and_generate_few_shot_answers(query, vectorstore, question_type):
    """Answer *query* from *vectorstore* using the few-shot prompt."""
    return _run_retrieval_qa(query, vectorstore, get_few_shot_prompt(question_type))


# Function to generate answer using selected LLM
def generate_answer_with_llm(query, selected_model, question_type=None):
    """Prompt *selected_model* directly, without retrieval.

    Args:
        query: the user's question.
        selected_model: Hugging Face repo id of the model to use.
        question_type: one of ``QUESTION_TYPES``; detected from *query* when
            omitted.

    Returns:
        A ``(zero_shot_answer, few_shot_answer)`` tuple of strings.

    Raises:
        ValueError: if no model was selected.
    """
    if not selected_model:
        raise ValueError("No model selected. Please choose an open-source LLM.")
    if question_type is None:
        question_type = identify_question_type(query)

    model = load_model(selected_model)
    # No retrieval happens in this mode, so a fixed placeholder fills {context}.
    context = "This is a placeholder context. Replace with actual context if available."
    zero_shot_prompt = get_zero_shot_prompt(question_type).format(
        context=context, question=query
    )
    few_shot_prompt = get_few_shot_prompt(question_type).format(
        context=context, question=query
    )
    response_zero_shot = model.generate(prompts=[zero_shot_prompt])
    response_few_shot = model.generate(prompts=[few_shot_prompt])
    return (
        response_zero_shot.generations[0][0].text,
        response_few_shot.generations[0][0].text,
    )


def _split_urls_and_question(input_text):
    """Separate http(s) URL lines from question lines in *input_text*."""
    urls, question_lines = [], []
    for line in (raw.strip() for raw in input_text.split("\n")):
        if not line:
            continue
        if line.lower().startswith(("http://", "https://")):
            urls.append(line)
        else:
            question_lines.append(line)
    return urls, " ".join(question_lines)


# Handle user query or file paths
def handle_user_query_or_urls(query_mode, input_text, question_type, selected_model=None):
    """Answer *input_text* with both prompt styles for the given query mode.

    Args:
        query_mode: ``"entire dataset"``, ``"specific document"``,
            ``"metadata"`` or ``"external data"``.
        input_text: the question; in "external data" mode, URL lines and
            question lines mixed, one per line.
        question_type: one of ``QUESTION_TYPES``.
        selected_model: when given, bypass retrieval and prompt this Hugging
            Face repo id directly.

    Returns:
        ``(zero_shot_answer, zero_shot_sources, zero_shot_contexts,
        few_shot_answer, few_shot_sources, few_shot_contexts)``.

    Raises:
        ValueError: if the chosen mode yields no documents to index.
    """
    if selected_model:
        # Direct LLM mode never consults a vector store, so don't build one.
        zero_shot_answer, few_shot_answer = generate_answer_with_llm(
            input_text, selected_model, question_type=question_type
        )
        return zero_shot_answer, [], [], few_shot_answer, [], []

    query = input_text
    if query_mode == "external data":
        file_paths, question = _split_urls_and_question(input_text)
        external_data = asyncio.run(load_data_async(file_paths)) if file_paths else []
        if not external_data:
            raise ValueError("No valid documents found from the provided file paths.")
        vectorstore, _ = initialize_embeddings(external_data)
        # Ask only the question text; fall back to the raw input if the user
        # supplied nothing but URLs.
        query = question or input_text
    elif query_mode == "specific document":
        documents = [Document(page_content=input_text, metadata={"source": "user"})]
        vectorstore, _ = initialize_embeddings(documents)
    elif query_mode == "metadata":
        filtered_documents = filter_documents_by_metadata(input_text, all_documents)
        if not filtered_documents:
            raise ValueError("No documents match the provided metadata filters.")
        vectorstore, _ = initialize_embeddings(filtered_documents)
    else:  # entire dataset
        vectorstore = vectorstore_local  # precomputed index for the whole dataset

    zero_shot_answer, zero_shot_sources, zero_shot_contexts = (
        retrieve_and_generate_zero_shot_answers(query, vectorstore, question_type)
    )
    few_shot_answer, few_shot_sources, few_shot_contexts = (
        retrieve_and_generate_few_shot_answers(query, vectorstore, question_type)
    )
    return (
        zero_shot_answer,
        zero_shot_sources,
        zero_shot_contexts,
        few_shot_answer,
        few_shot_sources,
        few_shot_contexts,
    )


def _publication_date(metadata):
    """Return the publication date under either key used in this file."""
    return metadata.get("publication_date") or metadata.get("date_published")


def _summarize_sources(sources):
    """Render source documents' metadata as one ``"; "``-joined string."""
    return "; ".join(
        f"source: {doc.metadata.get('source')}, page: {doc.metadata.get('page')}, "
        f"title: {doc.metadata.get('title')}, author: {doc.metadata.get('author')}, "
        f"date_published: {_publication_date(doc.metadata)}, "
        f"chunk: {doc.metadata.get('chunk')}"
        for doc in sources
    )


def process_csv_file(file_path, vectorstore, save_path, selected_model=None, query_mode="entire dataset"):
    """Answer every question in a CSV file and write the results back out.

    Args:
        file_path: CSV with a ``questions`` column (and a
            ``specific document`` column in "specific document" mode).
        vectorstore: unused; kept for backward compatibility with callers.
        save_path: destination CSV; when empty, *file_path* is overwritten.
        selected_model: optional Hugging Face repo id for direct LLM mode.
        query_mode: see ``handle_user_query_or_urls``.

    Errors are reported through ``st.error`` rather than raised.
    """
    try:
        # Detect file encoding
        with open(file_path, "rb") as f:
            result = chardet.detect(f.read())
        encoding = result["encoding"]
        df = pd.read_csv(file_path, encoding=encoding)
    except Exception as e:
        st.error(f"An error occurred while reading the CSV file: {e}")
        return

    if "questions" not in df.columns:
        st.error("The CSV file must contain a 'questions' column.")
        return
    if query_mode == "specific document" and "specific document" not in df.columns:
        st.error("The CSV file must contain a 'specific document' column.")
        return
    # Rows without a question cannot be answered; drop them so the result
    # columns stay aligned with the questions column.
    df = df.dropna(subset=["questions"]).reset_index(drop=True)

    question_types = []
    zero_shot_answers = []
    few_shot_answers = []
    zero_shot_sources_list = []
    few_shot_sources_list = []
    zero_shot_contexts_list = []
    few_shot_contexts_list = []

    for _, row in df.iterrows():
        question = str(row["questions"])
        question_type = identify_question_type(question)
        question_types.append(question_type)

        query_input = question
        if query_mode == "specific document":
            specific_document = row["specific document"]
            # pd.isna catches empty cells, which pandas reads as a truthy NaN.
            if pd.isna(specific_document) or not str(specific_document).strip():
                st.error(f"Content for the specific document is missing in the CSV file for question: {question}")
                return  # Exit function if content is missing for any specific document
            query_input = str(specific_document) + " " + question

        (
            zero_shot_answer,
            zero_shot_sources,
            zero_shot_contexts,
            few_shot_answer,
            few_shot_sources,
            few_shot_contexts,
        ) = handle_user_query_or_urls(query_mode, query_input, question_type, selected_model)

        zero_shot_answers.append(zero_shot_answer)
        few_shot_answers.append(few_shot_answer)
        zero_shot_sources_list.append(_summarize_sources(zero_shot_sources))
        few_shot_sources_list.append(_summarize_sources(few_shot_sources))
        zero_shot_contexts_list.append(" ".join(zero_shot_contexts))
        few_shot_contexts_list.append(" ".join(few_shot_contexts))

    result_df = pd.DataFrame({
        "questions": df["questions"],
        "question_type": question_types,
        "zero_shot_answers": zero_shot_answers,
        "zero_shot_sources": zero_shot_sources_list,
        "zero_shot_contexts": zero_shot_contexts_list,
        "few_shot_answers": few_shot_answers,
        "few_shot_sources": few_shot_sources_list,
        "few_shot_contexts": few_shot_contexts_list,
    })

    try:
        if save_path:
            result_df.to_csv(save_path, index=False, encoding="utf-8")
            st.success(f"CSV file processed and saved to {save_path} successfully.")
        else:
            result_df.to_csv(file_path, index=False, encoding="utf-8")
            st.success("CSV file processed and updated successfully.")

        csv = result_df.to_csv(index=False, encoding="utf-8")
        b64 = base64.b64encode(csv.encode()).decode()  # str -> bytes -> base64 str
        # NOTE(review): the anchor markup was stripped from the source as
        # received; reconstructed as a data-URI download link.
        href = (
            f'<a href="data:file/csv;base64,{b64}" download="processed_results.csv">'
            "Download processed CSV file</a>"
        )
        st.markdown(href, unsafe_allow_html=True)
    except Exception as e:
        st.error(f"An error occurred while saving the CSV file: {e}")


def format_source_document(doc):
    """Return an HTML snippet listing the metadata of a source document.

    Values are HTML-escaped because they originate from fetched web pages and
    the snippet is rendered with ``unsafe_allow_html=True``.

    NOTE(review): the original markup was stripped from the source as
    received; only the field labels and lookups are original.
    """
    metadata = doc.metadata
    fields = (
        ("Source", metadata.get("source")),
        ("Page", metadata.get("page", "None")),
        ("Title", metadata.get("title", "None")),
        ("Author", metadata.get("author", "None")),
        ("Date Published", _publication_date(metadata) or "None"),
        ("Venue", metadata.get("venue", "None")),
        ("Chunk", metadata.get("chunk", "None")),
    )
    rows = "\n".join(
        f"<p><strong>{label}:</strong> {html.escape(str(value))}</p>"
        for label, value in fields
    )
    return f"<div>\n{rows}\n</div>"


def _render_html_text(text):
    """Show *text* as escaped HTML, preserving line breaks."""
    escaped = html.escape(text).replace("\n", "<br>")
    st.markdown(f"<div>{escaped}</div>", unsafe_allow_html=True)


def _render_rag_result(label, answer, contexts, sources):
    """Show one answer together with its retrieved context and sources."""
    st.write(f"**{label} Answer:**")
    _render_html_text(answer)
    st.write(f"**{label} Context:**")
    _render_html_text(" ".join(contexts))
    st.write(f"**{label} Source Documents:**")
    for doc in sources:
        st.markdown(format_source_document(doc), unsafe_allow_html=True)


# query mode -> (optional hint shown above the input, text-area label)
_QUERY_MODE_INPUTS = {
    "entire dataset": (None, "Enter your question to ask it from entire dataset"),
    "metadata": (
        "Enter query with metadata filters (e.g., 'title: Cybersecurity, author: John Doe, date: 2023'):",
        "Enter your question with specific metadata:",
    ),
    "specific document": (
        "Enter content from a specific document along with question",
        "Enter your question from entered content:",
    ),
    "external data": (
        "Enter external URLs/file paths along with question",
        "Enter your question with URLs/file paths:",
    ),
}


def _render_rag_single_query():
    """Render the single-question RAG form and its results."""
    query_mode = st.selectbox("Select query mode:", list(_QUERY_MODE_INPUTS))
    question_type = st.selectbox("Select question type:", list(QUESTION_TYPES))
    hint, label = _QUERY_MODE_INPUTS[query_mode]
    if hint:
        st.write(hint)
    input_text = st.text_area(label)

    if not st.button("Get Answer"):
        return
    if not input_text.strip():
        st.warning("Please enter a question first.")
        return
    try:
        (
            zero_shot_answer,
            zero_shot_sources,
            zero_shot_contexts,
            few_shot_answer,
            few_shot_sources,
            few_shot_contexts,
        ) = handle_user_query_or_urls(query_mode, input_text, question_type)
        st.write(f"**Question Type:** {question_type.capitalize()}")
        _render_rag_result("Zero-Shot", zero_shot_answer, zero_shot_contexts, zero_shot_sources)
        _render_rag_result("Few-Shot", few_shot_answer, few_shot_contexts, few_shot_sources)
    except Exception as e:
        st.error(f"An error occurred: {e}")


def _render_llm_single_query(selected_model_repo):
    """Render the single-question direct-LLM form and its results."""
    question_type = st.selectbox("Select question type:", list(QUESTION_TYPES))
    input_text = st.text_area("Enter your question:")

    if not st.button("Get Answer"):
        return
    if not input_text.strip():
        st.warning("Please enter a question first.")
        return
    try:
        zero_shot_answer, few_shot_answer = generate_answer_with_llm(
            input_text, selected_model_repo, question_type=question_type
        )
        st.write(f"**Question Type:** {question_type.capitalize()}")
        st.write("**Zero-Shot Answer:**")
        _render_html_text(zero_shot_answer)
        st.write("**Few-Shot Answer:**")
        _render_html_text(few_shot_answer)
    except Exception as e:
        st.error(f"An error occurred: {e}")


def _save_upload_to_temp(uploaded_file):
    """Write an uploaded file to the temp directory and return its path.

    NOTE(review): the original destination path was stripped from the source
    as received. Only the base name is used so a crafted upload name cannot
    escape the temp directory.
    """
    file_path = os.path.join(
        tempfile.gettempdir(), os.path.basename(uploaded_file.name)
    )
    with open(file_path, "wb") as f:
        f.write(uploaded_file.getbuffer())
    return file_path


def _render_bulk_queries(query_mode="entire dataset", selected_model=None, require_model=False):
    """Render the CSV upload form and process the file on request."""
    uploaded_file = st.file_uploader("Choose a CSV file with questions", type="csv")
    # Must be created outside the button branch: a widget created only while
    # the button reads True disappears on the next rerun and never yields a
    # value to the processing step.
    save_path = st.text_input("Enter the path to save the processed CSV results file:")

    if not st.button("Process CSV"):
        return
    if uploaded_file is None:
        st.warning("Please upload a CSV file first.")
        return
    if require_model and not selected_model:
        st.warning("Please select an open-source LLM first.")
        return
    try:
        file_path = _save_upload_to_temp(uploaded_file)
        process_csv_file(
            file_path,
            vectorstore_local,
            save_path,
            selected_model=selected_model,
            query_mode=query_mode,
        )
    except Exception as e:
        st.error(f"An error occurred while processing the CSV file: {e}")


# Create Streamlit interface
def main():
    """Render the app: choose approach and query type, then dispatch."""
    st.title("Cybersecurity QA System")
    st.write("Ask any questions related to cybersecurity, including trends, attack techniques, and investigative methods.")

    # Step 1: Choose between RAG and LLM
    approach = st.selectbox(
        "Select approach:",
        ["Retrieval Augmentation Generation (RAG)", "Large Language Model (LLM)"],
    )
    # Step 2: Choose single query or bulk queries
    query_type = st.selectbox("Select query type:", ["Single query", "Bulk queries (CSV file)"])
    single_query = query_type == "Single query"

    if approach == "Retrieval Augmentation Generation (RAG)":
        if single_query:
            _render_rag_single_query()
        else:  # Bulk queries
            query_mode_csv = st.selectbox(
                "Select query mode for CSV processing:", list(_QUERY_MODE_INPUTS)
            )
            _render_bulk_queries(query_mode=query_mode_csv)
    else:  # Large Language Model (LLM)
        selected_model = st.selectbox(
            "Select Open-Source LLM:", ["None"] + list(open_source_models.keys())
        )
        # "None" is not a key, so .get() yields None instead of raising.
        selected_model_repo = open_source_models.get(selected_model)
        if single_query:
            _render_llm_single_query(selected_model_repo)
        else:  # Bulk queries
            _render_bulk_queries(selected_model=selected_model_repo, require_model=True)


if __name__ == '__main__':
    main()