from langchain_google_genai import ChatGoogleGenerativeAI import pandas as pd import os import io from flask import Flask, request, jsonify from flask_cors import CORS, cross_origin import firebase_admin import logging from firebase_admin import credentials, firestore from dotenv import load_dotenv from pandasai import SmartDatalake from pandasai.responses.response_parser import ResponseParser from langchain.prompts import PromptTemplate from langchain.chains import LLMChain from datetime import datetime import matplotlib.pyplot as plt from statsmodels.tsa.holtwinters import ExponentialSmoothing from prophet import Prophet load_dotenv() app = Flask(__name__) cors = CORS(app) # Initialize Firebase app if not firebase_admin._apps: cred = credentials.Certificate("quant-app-99d09-firebase-adminsdk-6prb1-37f34e1c91.json") firebase_admin.initialize_app(cred) db = firestore.client() class FlaskResponse(ResponseParser): def __init__(self, context) -> None: super().__init__(context) def format_dataframe(self, result): return result['value'].to_html() def format_plot(self, result): try: img_path = result['value'] except ValueError: img_path = str(result['value']) print("ValueError:", img_path) print("response_class_path:", img_path) return img_path def format_other(self, result): return str(result['value']) gemini_api_key = os.getenv('Gemini') llm = ChatGoogleGenerativeAI(api_key=gemini_api_key, model='gemini-1.5-flash-001', temperature=0.1) # Endpoint for handling questions to the bot using transaction data @app.route("/predict", methods=["POST"]) @cross_origin() def bot(): user_id = request.json.get("user_id") user_question = request.json.get("user_question") inventory_ref = db.collection("system_users").document(user_id).collection('inventory') tasks_ref = db.collection("system_users").document(user_id).collection('tasks') transactions_ref = db.collection("system_users").document(user_id).collection('transactions') inventory_list = [doc.to_dict() for doc in inventory_ref.stream()] tasks_list = [doc.to_dict() for doc in tasks_ref.stream()] transactions_list = [doc.to_dict() for doc in transactions_ref.stream()] inventory_df = pd.DataFrame(inventory_list) transactions_df = pd.DataFrame(transactions_list) tasks_df = pd.DataFrame(tasks_list) lake = SmartDatalake([inventory_df, transactions_df, tasks_df], config={"llm": llm, "response_parser": FlaskResponse, "enable_cache": False, "save_logs": False}) response = lake.chat(user_question) print(user_question) return jsonify(str(response)) # Marketing recommendations endpoint @app.route("/mrec", methods=["POST"]) @cross_origin() def marketing_rec(): user_id = request.json.get("user_id") transactions_ref = db.collection("system_users").document(user_id).collection('transactions') transactions_list = [doc.to_dict() for doc in transactions_ref.stream()] transactions_df = pd.DataFrame(transactions_list) prompt = PromptTemplate.from_template('You are a business analyst. Write a brief analysis and marketing tips for a small business using this transactions data {data_frame}') chain = LLMChain(llm=llm, prompt=prompt, verbose=True) response = chain.invoke(input=transactions_df) print(response) return jsonify(str(response['text'])) # Profit/Customer Engagement Prediction endpoint @app.route("/predict_metric", methods=["POST"]) @cross_origin() def predict_metric(): request_data = request.json user_id = request_data.get("user_id") interval = request_data.get("interval", 30) metric_type = request_data.get("metric_type", "Profit") # "Profit" or "Customer Engagement" transactions_ref = db.collection("system_users").document(user_id).collection("transactions") data = [] if metric_type == "Profit": # Fetch both Income and Expense transactions for Profit calculation income_query = transactions_ref.where("transactionType", "==", "Income").stream() expense_query = transactions_ref.where("transactionType", "==", "Expense").stream() income_data = {} expense_data = {} for doc in income_query: transaction = doc.to_dict() date_str = transaction["date"] amount = transaction["amountDue"] income_data[date_str] = income_data.get(date_str, 0) + amount for doc in expense_query: transaction = doc.to_dict() date_str = transaction["date"] amount = transaction["amountDue"] expense_data[date_str] = expense_data.get(date_str, 0) + amount # Calculate net profit for each date for date, income in income_data.items(): expense = expense_data.get(date, 0) data.append({"date": date, "amountDue": income - expense}) elif metric_type == "Customer Engagement": # Use count of Income transactions per day as Customer Engagement income_query = transactions_ref.where("transactionType", "==", "Income").stream() engagement_data = {} for doc in income_query: transaction = doc.to_dict() date_str = transaction["date"] engagement_data[date_str] = engagement_data.get(date_str, 0) + 1 for date, count in engagement_data.items(): data.append({"date": date, "amountDue": count}) # Create DataFrame from the aggregated data df = pd.DataFrame(data) # Ensure 'date' column is datetime df['date'] = pd.to_datetime(df['date']) df['date'] = df['date'].dt.tz_localize(None) # Set 'date' as index df = df.sort_values("date").set_index("date") # Resample daily to ensure regular intervals (fill missing dates) df = df.resample("D").sum().reset_index() df.columns = ["ds", "y"] # ds: date, y: target # Check if there's enough data to train the model if df.shape[0] < 10: return jsonify({"error": "Not enough data for prediction"}) # Initialize and fit the Prophet model model = Prophet(daily_seasonality=True, yearly_seasonality=True) model.fit(df) # DataFrame for future predictions future_dates = model.make_future_dataframe(periods=interval) forecast = model.predict(future_dates) # Extract the forecast for the requested interval forecast_data = forecast[['ds', 'yhat']].tail(interval) predictions = [{"date": row['ds'].strftime('%Y-%m-%d'), "value": row['yhat']} for _, row in forecast_data.iterrows()] # Return predictions in JSON format return jsonify({"predictedData": predictions}) if __name__ == "__main__": app.run(debug=True, host="0.0.0.0", port=7860)