File size: 8,941 Bytes
d4deb6e
 
 
 
 
 
8e30343
d10b568
8e30343
b58e8f7
d4deb6e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
8e30343
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9d603cc
8e30343
b58e8f7
 
 
 
 
8e30343
 
 
 
 
 
 
 
 
 
 
 
b58e8f7
8e30343
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d4deb6e
8e30343
 
d4deb6e
8e30343
 
 
 
 
 
 
d4deb6e
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
import streamlit as st
import subprocess
import os
import re
import time
import base64
from patterns import count_patterns, list_patterns, pdf_request_patterns, greetings, farewell  # Import patterns
from google.cloud import storage
from tempfile import NamedTemporaryFile
import json

# Base GCS path and local download path
BASE_GCS_PATH = "gs://fynd-assets-private/documents/daytrader/"
LOCAL_DOWNLOAD_PATH = "/Users/ninadmandavkar/Desktop/daytrader/08-2024/"

def fetch_pdf_from_gcs(pdf_name):
    gcs_search_path = f"{BASE_GCS_PATH}**/*{pdf_name}*.pdf"
    result = subprocess.run(
        ["gsutil", "-m", "cp", "-r", gcs_search_path, LOCAL_DOWNLOAD_PATH],
        capture_output=True,
        text=True
    )
    return result

def count_invoices_in_month(month_year):
    gcs_search_path = f"{BASE_GCS_PATH}PDFs/**/*{month_year}*.pdf"
    result = subprocess.run(
        ["gsutil", "ls", gcs_search_path],
        capture_output=True,
        text=True
    )
    if result.returncode == 0:
        files = [os.path.basename(line)[:-4] for line in result.stdout.strip().split("\n") if line]  # Remove .pdf extension
        return len(files), files
    else:
        return None, []

def extract_pdf_id(text):
    match = re.search(r'([A-Z]{2}-[A-Z]-[A-Z0-9]+-FY\d{2})', text)
    return match.group(0) if match else None

def month_to_str(month):
    month_map = {
        "january": "01", "february": "02", "march": "03", "april": "04",
        "may": "05", "june": "06", "july": "07", "august": "08",
        "september": "09", "october": "10", "november": "11", "december": "12"
    }
    if month.isdigit() and 1 <= int(month) <= 12:
        return f"{int(month):02d}-2024"  # Assume the year is 2024 for this context
    return month_map.get(month.lower(), None)

def typing_effect(text):
    typed_text_placeholder = st.empty()
    styled_text = f"""
    <span style="background-image: linear-gradient(to right, #800000, #ff0000); 
                -webkit-background-clip: text; 
                -webkit-text-fill-color: transparent;">
        {text}
    </span>
    """
    for i in range(len(text) + 1):
        typed_text_placeholder.markdown(styled_text.replace(text, text[:i]), unsafe_allow_html=True)
        time.sleep(0.018)
    typed_text_placeholder.markdown(styled_text, unsafe_allow_html=True)

def chatbot_response(user_input):
    lower_input = user_input.lower()

    if any(greet in lower_input for greet in greetings):
        return "Hello. Fynder here. How can I help you today?"

    elif any(fare in lower_input for fare in farewell):
        return "Goodbye! Let me know if you need help again."

    # Check for PDF requests using imported patterns
    for pattern in pdf_request_patterns:
        pdf_request_match = re.search(pattern, lower_input)
        if pdf_request_match:
            pdf_id = pdf_request_match.group(1)
            return f"Looking for PDF for invoice '{pdf_id}'. Please wait...", pdf_id

    # Check for count requests using imported patterns
    for pattern in count_patterns:
        month_year_match = re.search(pattern, lower_input)
        if month_year_match:
            month_year = month_year_match.group(1)
            if len(month_year.split()) == 2:
                month_str = month_to_str(month_year.split()[0])
                if month_str:
                    month_year = f"{month_str}-{month_year.split()[1]}"
            count, _ = count_invoices_in_month(month_year)
            if count is not None:
                return f"There are {count} invoices generated in {month_year}."
            else:
                return "I encountered an error while fetching the invoice count. Please try again later."

    # Check for list requests using imported patterns
    for pattern in list_patterns:
        list_match = re.search(pattern, lower_input)
        if list_match:
            month_year = list_match.group(1)
            if len(month_year.split()) == 2:
                month_str = month_to_str(month_year.split()[0])
                if month_str:
                    month_year = f"{month_str}-{month_year.split()[1]}"
            _, files = count_invoices_in_month(month_year)
            if files:
                return [f"**{file}**" for file in files]  # Use markdown bold for emphasis
            else:
                return "No invoices found for that month."

    pdf_id = extract_pdf_id(user_input)
    if pdf_id:
        if "when" in lower_input and "created" in lower_input:
            return f"Checking creation date for '{pdf_id}'. Please wait...", pdf_id, True
        else:
            return f"Looking for '{pdf_id}' in GCS fynd prod 393805 bucket. Please wait...", pdf_id

    return "I didn't quite understand that ;("

def download_pdf(file_path):
    """Reads a PDF file and returns its content in Base64 format."""
    with open(file_path, "rb") as f:
        pdf_data = f.read()
    return base64.b64encode(pdf_data).decode('utf-8')  # Encode to Base64 and decode to string

def main():
    # st.markdown(
    # """
    # <h1 style="
    #     background-image: linear-gradient(to right, #800000, #ff0000); 
    #     -webkit-background-clip: text; 
    #     -webkit-text-fill-color: transparent; 
    #     font-size: 3rem;  /* Adjust size as needed */
    #     text-align: center; /* Center the title */
    # ">
    #     Fynder
    # </h1>
    # """, 
    # unsafe_allow_html=True
    # )

    # Upload JSON credentials
    uploaded_file = st.file_uploader("Upload your fynd_prod.json file", type="json")
    
    if uploaded_file is not None:
        # Save the file temporarily
        with NamedTemporaryFile(delete=False, suffix=".json") as temp_file:
            temp_file.write(uploaded_file.read())
            json_path = temp_file.name
        
        os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "/Users/ninadmandavkar/Desktop/Fynd/alerter/fynd-prod.json"

        with open(json_path) as f:
            credentials_info = json.load(f)
            st.write("Loaded credentials for:", credentials_info.get("client_email"))


        st.markdown(
            """
            <h1 style="
                background-image: linear-gradient(to right, #800000, #ff0000); 
                -webkit-background-clip: text; 
                -webkit-text-fill-color: transparent; 
                font-size: 3rem;  /* Adjust size as needed */
                text-align: center; /* Center the title */
            ">
                Fynder
            </h1>
            """, 
        unsafe_allow_html=True
        )
        
        
        user_input = st.text_input("", value="", help="Type your prompt here")

        if user_input:
            response = chatbot_response(user_input)

            if isinstance(response, tuple):
                bot_response, pdf_name = response
                typing_effect(bot_response)

                result = fetch_pdf_from_gcs(pdf_name)
                if result.returncode == 0:
                    downloaded_files = [f for f in os.listdir(LOCAL_DOWNLOAD_PATH) if pdf_name in f and f.endswith('.pdf')]
                    
                    if downloaded_files:
                        for pdf_file in downloaded_files:
                            file_path = os.path.join(LOCAL_DOWNLOAD_PATH, pdf_file)
                            pdf_data = download_pdf(file_path)

                            # Create a custom download button with gradient styling
                            download_button_html = f"""
                            <a href="data:application/pdf;base64,{pdf_data}" download="{pdf_file}" 
                            style="
                                    display: inline-block; 
                                    padding: 10px 20px; 
                                    color: white; 
                                    text-decoration: none; 
                                    border-radius: 20px; 
                                    background-image: linear-gradient(to right, #800000, #ff0000); 
                                    box-shadow: 0 4px 6px rgba(0, 0, 0, 0.1);
                                ">
                                Download {pdf_file}
                            </a>
                            """
                            st.markdown(download_button_html, unsafe_allow_html=True)
                    else:
                        typing_effect("No files matching that name were found.")
                else:
                    typing_effect("There was an error fetching the PDF. Please check the name and try again.")
                    st.write(result.stderr)

            elif isinstance(response, list):  # When the response is a list of files
                for pdf_file in response:
                    typing_effect(pdf_file)  # Apply typing effect to each file
            else:
                typing_effect(response)
    else:
        st.markdown("")

if __name__ == "__main__":
    main()