# DataScribe — Streamlit app for AI-assisted web search and data extraction.
import streamlit as st | |
from streamlit_option_menu import option_menu | |
import pandas as pd | |
import os | |
from google.oauth2 import service_account | |
from googleapiclient.discovery import build | |
from streamlit_chat import message as st_message | |
import plotly.express as px | |
import re | |
import streamlit as st | |
import gspread | |
from google.oauth2.service_account import Credentials | |
import warnings | |
import time | |
from langchain.schema import HumanMessage, SystemMessage, AIMessage | |
from langchain.chat_models import ChatOpenAI | |
from langchain.memory import ConversationBufferWindowMemory | |
from langchain.prompts import PromptTemplate | |
from langchain_community.utilities import GoogleSerperAPIWrapper | |
from langchain.agents import initialize_agent, Tool | |
from langchain.agents import AgentType | |
from langchain_groq import ChatGroq | |
import numpy as np | |
import gspread | |
from dotenv import load_dotenv | |
warnings.filterwarnings("ignore", category=DeprecationWarning)

# Authorize a module-level gspread client from the local service-account key.
# Used by the "View & Download" page to write results back to a sheet.
scopes = ["https://www.googleapis.com/auth/spreadsheets"]
creds = Credentials.from_service_account_file("credentials.json", scopes=scopes)
client = gspread.authorize(creds)

# environment — API keys come from a local .env file
load_dotenv()
GROQ_API_KEY = os.getenv("GROQ_API_KEY")
SERPER_API_KEY = os.getenv("SERPER_API_KEY")

# ChatGroq picks up GROQ_API_KEY from the environment when not passed explicitly.
llm = ChatGroq(model="llama-3.1-70b-versatile")

# Initialize Google Serper API wrapper.
# BUG FIX: the wrapper's field is `serper_api_key`; the previous keyword
# `serp_api_key` did not match it, so the explicit key was never applied.
search = GoogleSerperAPIWrapper(serper_api_key=SERPER_API_KEY)

# Create the system and human messages for dynamic query processing
system_message_content = """
You are a helpful assistant designed to answer questions by extracting information from the web and external sources. Your goal is to provide the most relevant, concise, and accurate response to user queries.
"""
# Define the tool list | |
tools = [ | |
Tool( | |
name="Web Search", | |
func=search.run, | |
description="Searches the web for information related to the query" | |
) | |
] | |
# Initialize the agent with the tools | |
agent = initialize_agent( | |
tools, | |
ChatGroq(api_key=GROQ_API_KEY, model="llama-3.1-70b-versatile"), | |
agent_type=AgentType.SELF_ASK_WITH_SEARCH, | |
verbose=True, | |
memory=ConversationBufferWindowMemory(k=5, return_messages=True) | |
) | |
# Function to perform the web search and get results
def perform_web_search(query, max_retries=3, delay=2):
    """Run a Serper web search with bounded retries.

    Args:
        query: The fully-formed search query string.
        max_retries: Maximum number of attempts before giving up.
        delay: Seconds to wait between attempts.

    Returns:
        The raw search-result string, or the sentinel string "NaN" when
        every attempt failed (callers treat "NaN" as missing data).
    """
    for attempt in range(1, max_retries + 1):
        try:
            return search.run(query)
        except Exception:
            st.warning(f"Web search failed for query '{query}'. Retrying ({attempt}/{max_retries})...")
            # BUG FIX: don't sleep after the final attempt — we are about
            # to give up, so the extra delay only slowed the UI down.
            if attempt < max_retries:
                time.sleep(delay)
    st.error(f"Failed to perform web search for query '{query}' after {max_retries} retries.")
    return "NaN"
def update_google_sheet(sheet_id, range_name, data):
    """Replace a worksheet's contents with a DataFrame (header + rows).

    Args:
        sheet_id: Google Sheet document ID.
        range_name: A1-style range including the sheet name, e.g. "Sheet1!A1".
        data: pandas DataFrame to write; column names become the header row.

    Errors are reported through the Streamlit UI rather than raised.
    """
    try:
        # Define the Google Sheets API scope and a fresh client for this call.
        scopes = ["https://www.googleapis.com/auth/spreadsheets"]
        creds = Credentials.from_service_account_file("credentials.json", scopes=scopes)
        client = gspread.authorize(creds)
        # Split "SheetName!A1:C100" into worksheet name and cell range.
        # Default to "A1" when no cell range was given.
        sheet_name, _, cell_range = range_name.partition("!")
        cell_range = cell_range or "A1"
        sheet = client.open_by_key(sheet_id).worksheet(sheet_name)
        # Prepare data for update: header row followed by data rows.
        data_to_update = [data.columns.tolist()] + data.values.tolist()
        # Clear the existing content and write the new data.
        sheet.clear()
        # BUG FIX: pass only the cell range to worksheet.update — the old code
        # passed the full "SheetName!Range" string, which the worksheet-level
        # update does not accept. (Matches the split logic used by the
        # "View & Download" page.)
        sheet.update(cell_range, data_to_update)
        st.success("Data successfully updated in the Google Sheet!")
    except Exception as e:
        st.error(f"Error updating Google Sheet: {e}")
# Function to get LLM response for dynamic queries
def get_llm_response(entity, query, web_results):
    """Ask the agent to extract the queried field(s) from raw web results.

    Args:
        entity: The entity (row value) the query is about.
        query: The user's query text for this entity.
        web_results: Raw search-result text from `perform_web_search`.

    Returns:
        The cleaned extracted text, or "NaN" when the agent call fails
        (callers treat "NaN" as missing data).
    """
    # BUG FIX: removed a dead `prompt` f-string that was built but never used.
    human_message_content = f"""
Entity: {entity}
Query: {query}
Web Results: {web_results}
"""
    try:
        response = agent.invoke([system_message_content, human_message_content], handle_parsing_errors=True)
        extracted_info = response.get("output", "Information not available").strip()
        # Clean up leaked agent-scratchpad markers (Thought:/Action:) from the reply.
        cleaned_info = re.sub(r"(Thought:|Action:)[^A-Za-z0-9]*", "", extracted_info).strip()
        return cleaned_info
    except Exception:
        # Best-effort: any agent failure yields the missing-data sentinel.
        return "NaN"
# Retry logic for multiple web searches if necessary
def refine_answer_with_searches(entity, query, max_retries=3):
    """Search + extract, retrying while the answer looks empty or unavailable.

    BUG FIX: `max_retries` was previously ignored — exactly one extra search
    was performed regardless of its value. Now up to `max_retries` additional
    attempts are made while the extracted answer is suspiciously short or
    contains "not available".

    Returns:
        Tuple of (extracted_answer, search_results) from the final attempt.
    """
    search_results = perform_web_search(query.format(entity=entity))
    extracted_answer = get_llm_response(entity, query, search_results)
    attempts = 0
    while (len(extracted_answer.split()) <= 2
           or "not available" in extracted_answer.lower()) and attempts < max_retries:
        search_results = perform_web_search(query.format(entity=entity))
        extracted_answer = get_llm_response(entity, query, search_results)
        attempts += 1
    return extracted_answer, search_results
# Setup Google Sheets data fetch
def get_google_sheet_data(sheet_id, range_name):
    """Fetch a range from Google Sheets into a DataFrame.

    The first row of the range becomes the column header. Credentials come
    from Streamlit secrets (`gcp_service_account`).

    Auth/API errors propagate to the caller (the UI wraps this in try/except).
    """
    creds = service_account.Credentials.from_service_account_info(
        st.secrets["gcp_service_account"],
        scopes=["https://www.googleapis.com/auth/spreadsheets.readonly"],
    )
    service = build("sheets", "v4", credentials=creds)
    sheet = service.spreadsheets()
    result = sheet.values().get(spreadsheetId=sheet_id, range=range_name).execute()
    values = result.get("values", [])
    # BUG FIX: an empty range previously raised IndexError on values[0].
    if not values:
        return pd.DataFrame()
    header, *rows = values
    # BUG FIX: the Sheets API omits trailing empty cells, so rows can be
    # shorter than the header; pad them or the DataFrame constructor raises.
    width = len(header)
    rows = [row + [""] * (width - len(row)) for row in rows]
    return pd.DataFrame(rows, columns=header)
# Streamlit page configuration.
st.set_page_config(page_title="DataScribe", page_icon=":notebook_with_decorative_cover:", layout="wide")

# Sidebar navigation; `selected` drives the page branches below.
with st.sidebar:
    selected = option_menu(
        "DataScribe Menu",
        ["Home", "Upload Data", "Define Query", "Extract Information", "View & Download"],
        icons=["house", "cloud-upload", "gear", "search", "table"],
        menu_icon="cast",
        default_index=0
    )
if selected == "Home":
    # Landing page: hero header plus a 2x2 grid of feature cards.
    st.markdown("""
<h1 style="text-align:center; color:#4CAF50; font-size: 40px;">🚀 Welcome to DataScribe</h1>
<p style="text-align:center; font-size: 18px; color:#333;">An AI-powered information extraction tool to streamline data retrieval and analysis.</p>
""", unsafe_allow_html=True)
    st.markdown("""---""")

    def feature_card(title, description, icon, page):
        # Render one feature card: icon column + button-with-caption column.
        # NOTE(review): clicking stores `selected_page` in session state, but
        # nothing visible in this file reads it back — navigation appears to
        # happen only via the sidebar menu. TODO confirm intended behavior.
        col1, col2 = st.columns([1, 4])
        with col1:
            st.markdown(f"<div style='font-size: 40px; text-align:center;'>{icon}</div>", unsafe_allow_html=True)
        with col2:
            if st.button(f"{title}", key=title, help=description):
                st.session_state.selected_page = page
            st.markdown(f"<p style='font-size: 14px; color:#555;'>{description}</p>", unsafe_allow_html=True)

    # First row of feature cards.
    col1, col2 = st.columns([1, 1])
    with col1:
        feature_card(
            title="Upload Data",
            description="Upload data from CSV or Google Sheets to get started with your extraction.",
            icon="📄",
            page="Upload Data"
        )
    with col2:
        feature_card(
            title="Define Custom Queries",
            description="Set custom search queries for each entity in your dataset for specific information retrieval.",
            icon="🔍",
            page="Define Query"
        )

    # Second row of feature cards.
    col1, col2 = st.columns([1, 1])
    with col1:
        feature_card(
            title="Run Automated Searches",
            description="Execute automated web searches and extract relevant information using an AI-powered agent.",
            icon="🤖",
            page="Extract Information"
        )
    with col2:
        feature_card(
            title="View & Download Results",
            description="View extracted data in a structured format and download as a CSV or update Google Sheets.",
            icon="📊",
            page="View & Download"
        )
elif selected == "Upload Data": | |
st.header("Upload or Connect Your Data") | |
data_source = st.radio("Choose data source:", ["CSV Files", "Google Sheets"]) | |
if data_source == "CSV Files": | |
if "data" in st.session_state: | |
st.success("Data uploaded successfully! Here is a preview:") | |
st.dataframe(st.session_state["data"].head(10)) # Display only the first 10 rows for a cleaner view | |
else: | |
uploaded_files = st.file_uploader("Upload your CSV files", type=["csv"], accept_multiple_files=True) | |
if uploaded_files is not None: | |
dfs = [] | |
for uploaded_file in uploaded_files: | |
try: | |
df = pd.read_csv(uploaded_file) | |
dfs.append(df) | |
except Exception as e: | |
st.error(f"Error reading file {uploaded_file.name}: {e}") | |
if dfs: | |
full_data = pd.concat(dfs, ignore_index=True) | |
st.session_state["data"] = full_data | |
st.success("Data uploaded successfully! Here is a preview:") | |
st.dataframe(full_data.head(10)) # Show preview of first 10 rows | |
else: | |
st.warning("No valid data found in the uploaded files.") | |
if st.button("Clear Data"): | |
del st.session_state["data"] | |
st.success("Data has been cleared!") | |
elif data_source == "Google Sheets": | |
sheet_id = st.text_input("Enter Google Sheet ID") | |
range_name = st.text_input("Enter the data range (e.g., Sheet1!A1:C100)") | |
if sheet_id and range_name: | |
if st.button("Fetch Data"): | |
with st.spinner("Fetching data from Google Sheets..."): | |
try: | |
data = get_google_sheet_data(sheet_id, range_name) | |
st.session_state["data"] = data | |
st.success("Data fetched successfully! Here is a preview:") | |
st.dataframe(data.head(10)) # Show preview of first 10 rows | |
except Exception as e: | |
st.error(f"Error fetching data: {e}") | |
else: | |
st.warning("Please enter both Sheet ID and Range name before fetching data.") | |
elif selected == "Define Query": | |
st.header("Define Your Custom Query") | |
if "data" not in st.session_state or st.session_state["data"] is None: | |
st.warning("Please upload data first! Use the 'Upload Data' section to upload your data.") | |
else: | |
column = st.selectbox( | |
"Select entity column", | |
st.session_state["data"].columns, | |
help="Select the column that contains the entities for which you want to define queries." | |
) | |
st.markdown(""" | |
<style> | |
div[data-baseweb="select"] div[data-id="select"] {{ | |
background-color: #f0f8ff; | |
}} | |
</style> | |
""", unsafe_allow_html=True) | |
st.subheader("Define Fields to Extract") | |
num_fields = st.number_input( | |
"Number of fields to extract", | |
min_value=1, | |
value=1, | |
step=1, | |
help="Specify how many fields you want to extract from each entity." | |
) | |
fields = [] | |
for i in range(num_fields): | |
field = st.text_input( | |
f"Field {i+1} name", | |
key=f"field_{i}", | |
placeholder=f"Enter field name for {i+1}", | |
help="Name the field you want to extract from the entity." | |
) | |
if field: | |
fields.append(field) | |
if fields: | |
st.subheader("Query Template") | |
query_template = st.text_area( | |
"Enter query template (Use '{entity}' to represent each entity)", | |
value=f"Find the {', '.join(fields)} for {{entity}}", | |
help="You can use {entity} as a placeholder to represent each entity in the query." | |
) | |
if "{entity}" in query_template: | |
example_entity = str(st.session_state["data"][column].iloc[0]) | |
example_query = query_template.replace("{entity}", example_entity) | |
st.write("### Example Query Preview") | |
st.code(example_query) | |
if st.button("Save Query Configuration"): | |
if not fields: | |
st.error("Please define at least one field to extract.") | |
elif not query_template: | |
st.error("Please enter a query template.") | |
else: | |
st.session_state["column_selection"] = column | |
st.session_state["query_template"] = query_template | |
st.session_state["extraction_fields"] = fields | |
st.success("Query configuration saved successfully!") | |
elif selected == "Extract Information":
    # Extraction page: iterate the chosen entity column, run the
    # search + LLM pipeline per entity, and stash results in session state.
    st.header("Extract Information")
    if "query_template" in st.session_state and "data" in st.session_state:
        st.write("### Using Query Template:")
        st.code(st.session_state["query_template"])
        column_selection = st.session_state["column_selection"]
        entities_column = st.session_state["data"][column_selection]
        st.write("### Selected Entity Column:")
        st.dataframe(entities_column)
        if st.button("Start Extraction"):
            st.write("Data extraction is in progress. This may take a few moments.")
            # Custom styled progress bar
            progress_bar = st.progress(0)
            try:
                results = []
                for i, selected_entity in enumerate(entities_column):
                    # Substitute the entity into the saved template, then run
                    # the web-search + extraction pipeline for it.
                    user_query = st.session_state["query_template"].replace("{entity}", str(selected_entity))
                    final_answer, search_results = refine_answer_with_searches(selected_entity, user_query)
                    results.append({
                        "Entity": selected_entity,
                        "Extracted Information": final_answer,
                        "Search Results": search_results
                    })
                    # Update progress bar as a 0-100 percentage.
                    progress_bar.progress(int((i + 1) / len(entities_column) * 100))
                # Persist for the "View & Download" page.
                st.session_state["results"] = results
                st.write("### Extracted Information")
                for result in results:
                    st.write(f"**Entity:** {result['Entity']}")
                    st.write(f"**Extracted Information:** {result['Extracted Information']}")
                st.write("### Web Results:")
                for result in results:
                    st.write(result["Search Results"])
            except Exception as e:
                st.error(f"An error occurred while extracting information: {e}")
    else:
        st.warning("Please upload your data and define the query template.")
elif selected == "View & Download":
    # Results page: preview, CSV download, and optional write-back to a
    # Google Sheet via the module-level gspread `client`.
    st.header("View & Download Results")
    if "results" in st.session_state:
        results_df = pd.DataFrame(st.session_state["results"])
        st.write("### Results Preview")
        # Display results with a background color for the text-heavy columns.
        # NOTE(review): Styler.applymap is deprecated in recent pandas in
        # favor of Styler.map — confirm against the pinned pandas version.
        st.dataframe(results_df.style.applymap(lambda val: 'background-color: #d3f4ff' if isinstance(val, str) else '', subset=["Extracted Information", "Search Results"]))
        download_option = st.selectbox(
            "Select data to download:",
            ["All Results", "Extracted Information", "Web Results"]
        )
        # Narrow the frame to the chosen subset before download.
        if download_option == "All Results":
            data_to_download = results_df
        elif download_option == "Extracted Information":
            data_to_download = results_df[["Entity", "Extracted Information"]]
        elif download_option == "Web Results":
            data_to_download = results_df[["Entity", "Search Results"]]
        st.download_button(
            label=f"Download {download_option} as CSV",
            data=data_to_download.to_csv(index=False),
            file_name=f"{download_option.lower().replace(' ', '_')}.csv",
            mime="text/csv"
        )
        # To ensure the inputs and button are persistent, store their values in session_state
        if 'sheet_id' not in st.session_state:
            st.session_state.sheet_id = ''
        if 'range_name' not in st.session_state:
            st.session_state.range_name = ''
        sheet_id = st.text_input("Enter Google Sheet ID", value=st.session_state.sheet_id)
        range_name = st.text_input("Enter Range (e.g., 'Sheet1!A1')", value=st.session_state.range_name)
        if sheet_id and range_name:
            st.session_state.sheet_id = sheet_id
            st.session_state.range_name = range_name
            # Define data_to_update to update the Google Sheet: header row + data rows.
            data_to_update = [results_df.columns.tolist()] + results_df.values.tolist()
            # Update Google Sheets button
            if st.button("Update Google Sheet"):
                try:
                    if '!' not in range_name:
                        st.error("Invalid range format. Please use the format 'SheetName!Range'.")
                    else:
                        sheet_name, cell_range = range_name.split('!', 1)
                        sheet = client.open_by_key(sheet_id).worksheet(sheet_name)
                        sheet.clear()  # Clear the existing data before updating
                        sheet.update(f"{cell_range}", data_to_update)  # Update the data to the specified range
                        st.success("Data updated in the Google Sheet!")
                except Exception as e:
                    st.error(f"Error updating Google Sheet: {e}")
        else:
            st.warning("Please enter both the Sheet ID and Range name before updating.")
    else:
        st.warning("No results available to view. Please run the extraction process.")