File size: 8,941 Bytes
d4deb6e 8e30343 d10b568 8e30343 b58e8f7 d4deb6e 8e30343 9d603cc 8e30343 b58e8f7 8e30343 b58e8f7 8e30343 d4deb6e 8e30343 d4deb6e 8e30343 d4deb6e |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 |
import streamlit as st
import subprocess
import os
import re
import time
import base64
from patterns import count_patterns, list_patterns, pdf_request_patterns, greetings, farewell # Import patterns
from google.cloud import storage
from tempfile import NamedTemporaryFile
import json
# Base GCS path and local download path
BASE_GCS_PATH = "gs://fynd-assets-private/documents/daytrader/"
LOCAL_DOWNLOAD_PATH = "/Users/ninadmandavkar/Desktop/daytrader/08-2024/"
def fetch_pdf_from_gcs(pdf_name):
gcs_search_path = f"{BASE_GCS_PATH}**/*{pdf_name}*.pdf"
result = subprocess.run(
["gsutil", "-m", "cp", "-r", gcs_search_path, LOCAL_DOWNLOAD_PATH],
capture_output=True,
text=True
)
return result
def count_invoices_in_month(month_year):
gcs_search_path = f"{BASE_GCS_PATH}PDFs/**/*{month_year}*.pdf"
result = subprocess.run(
["gsutil", "ls", gcs_search_path],
capture_output=True,
text=True
)
if result.returncode == 0:
files = [os.path.basename(line)[:-4] for line in result.stdout.strip().split("\n") if line] # Remove .pdf extension
return len(files), files
else:
return None, []
def extract_pdf_id(text):
match = re.search(r'([A-Z]{2}-[A-Z]-[A-Z0-9]+-FY\d{2})', text)
return match.group(0) if match else None
def month_to_str(month):
month_map = {
"january": "01", "february": "02", "march": "03", "april": "04",
"may": "05", "june": "06", "july": "07", "august": "08",
"september": "09", "october": "10", "november": "11", "december": "12"
}
if month.isdigit() and 1 <= int(month) <= 12:
return f"{int(month):02d}-2024" # Assume the year is 2024 for this context
return month_map.get(month.lower(), None)
def typing_effect(text):
typed_text_placeholder = st.empty()
styled_text = f"""
<span style="background-image: linear-gradient(to right, #800000, #ff0000);
-webkit-background-clip: text;
-webkit-text-fill-color: transparent;">
{text}
</span>
"""
for i in range(len(text) + 1):
typed_text_placeholder.markdown(styled_text.replace(text, text[:i]), unsafe_allow_html=True)
time.sleep(0.018)
typed_text_placeholder.markdown(styled_text, unsafe_allow_html=True)
def chatbot_response(user_input):
lower_input = user_input.lower()
if any(greet in lower_input for greet in greetings):
return "Hello. Fynder here. How can I help you today?"
elif any(fare in lower_input for fare in farewell):
return "Goodbye! Let me know if you need help again."
# Check for PDF requests using imported patterns
for pattern in pdf_request_patterns:
pdf_request_match = re.search(pattern, lower_input)
if pdf_request_match:
pdf_id = pdf_request_match.group(1)
return f"Looking for PDF for invoice '{pdf_id}'. Please wait...", pdf_id
# Check for count requests using imported patterns
for pattern in count_patterns:
month_year_match = re.search(pattern, lower_input)
if month_year_match:
month_year = month_year_match.group(1)
if len(month_year.split()) == 2:
month_str = month_to_str(month_year.split()[0])
if month_str:
month_year = f"{month_str}-{month_year.split()[1]}"
count, _ = count_invoices_in_month(month_year)
if count is not None:
return f"There are {count} invoices generated in {month_year}."
else:
return "I encountered an error while fetching the invoice count. Please try again later."
# Check for list requests using imported patterns
for pattern in list_patterns:
list_match = re.search(pattern, lower_input)
if list_match:
month_year = list_match.group(1)
if len(month_year.split()) == 2:
month_str = month_to_str(month_year.split()[0])
if month_str:
month_year = f"{month_str}-{month_year.split()[1]}"
_, files = count_invoices_in_month(month_year)
if files:
return [f"**{file}**" for file in files] # Use markdown bold for emphasis
else:
return "No invoices found for that month."
pdf_id = extract_pdf_id(user_input)
if pdf_id:
if "when" in lower_input and "created" in lower_input:
return f"Checking creation date for '{pdf_id}'. Please wait...", pdf_id, True
else:
return f"Looking for '{pdf_id}' in GCS fynd prod 393805 bucket. Please wait...", pdf_id
return "I didn't quite understand that ;("
def download_pdf(file_path):
"""Reads a PDF file and returns its content in Base64 format."""
with open(file_path, "rb") as f:
pdf_data = f.read()
return base64.b64encode(pdf_data).decode('utf-8') # Encode to Base64 and decode to string
def main():
# st.markdown(
# """
# <h1 style="
# background-image: linear-gradient(to right, #800000, #ff0000);
# -webkit-background-clip: text;
# -webkit-text-fill-color: transparent;
# font-size: 3rem; /* Adjust size as needed */
# text-align: center; /* Center the title */
# ">
# Fynder
# </h1>
# """,
# unsafe_allow_html=True
# )
# Upload JSON credentials
uploaded_file = st.file_uploader("Upload your fynd_prod.json file", type="json")
if uploaded_file is not None:
# Save the file temporarily
with NamedTemporaryFile(delete=False, suffix=".json") as temp_file:
temp_file.write(uploaded_file.read())
json_path = temp_file.name
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "/Users/ninadmandavkar/Desktop/Fynd/alerter/fynd-prod.json"
with open(json_path) as f:
credentials_info = json.load(f)
st.write("Loaded credentials for:", credentials_info.get("client_email"))
st.markdown(
"""
<h1 style="
background-image: linear-gradient(to right, #800000, #ff0000);
-webkit-background-clip: text;
-webkit-text-fill-color: transparent;
font-size: 3rem; /* Adjust size as needed */
text-align: center; /* Center the title */
">
Fynder
</h1>
""",
unsafe_allow_html=True
)
user_input = st.text_input("", value="", help="Type your prompt here")
if user_input:
response = chatbot_response(user_input)
if isinstance(response, tuple):
bot_response, pdf_name = response
typing_effect(bot_response)
result = fetch_pdf_from_gcs(pdf_name)
if result.returncode == 0:
downloaded_files = [f for f in os.listdir(LOCAL_DOWNLOAD_PATH) if pdf_name in f and f.endswith('.pdf')]
if downloaded_files:
for pdf_file in downloaded_files:
file_path = os.path.join(LOCAL_DOWNLOAD_PATH, pdf_file)
pdf_data = download_pdf(file_path)
# Create a custom download button with gradient styling
download_button_html = f"""
<a href="data:application/pdf;base64,{pdf_data}" download="{pdf_file}"
style="
display: inline-block;
padding: 10px 20px;
color: white;
text-decoration: none;
border-radius: 20px;
background-image: linear-gradient(to right, #800000, #ff0000);
box-shadow: 0 4px 6px rgba(0, 0, 0, 0.1);
">
Download {pdf_file}
</a>
"""
st.markdown(download_button_html, unsafe_allow_html=True)
else:
typing_effect("No files matching that name were found.")
else:
typing_effect("There was an error fetching the PDF. Please check the name and try again.")
st.write(result.stderr)
elif isinstance(response, list): # When the response is a list of files
for pdf_file in response:
typing_effect(pdf_file) # Apply typing effect to each file
else:
typing_effect(response)
else:
st.markdown("")
if __name__ == "__main__":
main()
|