Qxlite-AI / main.py
yoursdvniel's picture
Update main.py
a588d97 verified
raw
history blame
6.81 kB
from langchain_google_genai import ChatGoogleGenerativeAI
import pandas as pd
import os
import io
from flask import Flask, request, jsonify
from flask_cors import CORS, cross_origin
import firebase_admin
import logging
from firebase_admin import credentials, firestore
from dotenv import load_dotenv
from pandasai import SmartDatalake
from pandasai.responses.response_parser import ResponseParser
from langchain.prompts import PromptTemplate
from langchain.chains import LLMChain
from datetime import datetime
import matplotlib.pyplot as plt
from statsmodels.tsa.holtwinters import ExponentialSmoothing
from prophet import Prophet
# Load environment variables (python-dotenv) before anything reads them.
load_dotenv()

# Flask application object; CORS support is attached to the whole app.
app = Flask(__name__)
cors = CORS(app)

# Initialize the Firebase Admin SDK from the bundled service-account file.
# Guarded so the app is only initialised when none has been registered yet.
if not firebase_admin._apps:
    cred = credentials.Certificate("quant-app-99d09-firebase-adminsdk-6prb1-37f34e1c91.json")
    firebase_admin.initialize_app(cred)

# Firestore client used by every endpoint in this module.
db = firestore.client()
class FlaskResponse(ResponseParser):
    """PandasAI response parser whose outputs can be returned from Flask views.

    Each ``format_*`` method receives a ``result`` mapping and reads the
    payload stored under its ``'value'`` key.
    """

    def __init__(self, context) -> None:
        """Forward ``context`` to the base ``ResponseParser``."""
        super().__init__(context)

    def format_dataframe(self, result):
        """Return the DataFrame in ``result['value']`` rendered as an HTML table."""
        return result['value'].to_html()

    def format_plot(self, result):
        """Return ``result['value']`` unchanged and log it to stdout.

        NOTE(review): presumably the value is the path of the saved chart
        image -- confirm against the PandasAI version in use.
        """
        # A plain key lookup cannot raise ValueError, so the former
        # ``try/except ValueError`` fallback was unreachable and is gone.
        img_path = result['value']
        print("response_class_path:", img_path)
        return img_path

    def format_other(self, result):
        """Return any other result value converted with ``str``."""
        return str(result['value'])
# The Gemini API key is taken from the "Gemini" environment variable
# (None when the variable is not set).
gemini_api_key = os.environ.get('Gemini')

# Chat model shared by all endpoints in this module.
llm = ChatGoogleGenerativeAI(
    model='gemini-1.5-flash-001',
    temperature=0.1,
    api_key=gemini_api_key,
)
# Endpoint for answering user questions over the user's inventory, transaction and task data
@app.route("/predict", methods=["POST"])
@cross_origin()
def bot():
    """Answer a free-form question about one user's Firestore data.

    Request body (JSON object):
        user_id: id of the document under ``system_users``.
        user_question: the question forwarded to the data lake.

    The user's ``inventory``, ``transactions`` and ``tasks`` sub-collections
    are loaded into DataFrames and queried through a PandasAI
    ``SmartDatalake`` backed by the module-level ``llm``.

    Returns:
        The string form of the data-lake response as JSON, or a JSON
        ``{"error": ...}`` object with status 400 when a required field is
        missing or the body is not a JSON object.
    """
    # silent=True yields None instead of raising on a missing/invalid body,
    # so malformed requests get the same explicit 400 as missing fields.
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        payload = {}

    user_id = payload.get("user_id")
    user_question = payload.get("user_question")
    if not isinstance(user_id, str) or not user_id or not user_question:
        return jsonify({"error": "user_id and user_question are required"}), 400

    user_doc = db.collection("system_users").document(user_id)

    # Frame order matches what the data lake has always been given:
    # inventory, transactions, tasks.
    frames = [
        pd.DataFrame([doc.to_dict() for doc in user_doc.collection(name).stream()])
        for name in ("inventory", "transactions", "tasks")
    ]

    lake = SmartDatalake(
        frames,
        config={
            "llm": llm,
            "response_parser": FlaskResponse,
            "enable_cache": False,
            "save_logs": False,
        },
    )
    response = lake.chat(user_question)
    print(user_question)
    return jsonify(str(response))
# Marketing recommendations endpoint
@app.route("/mrec", methods=["POST"])
@cross_origin()
def marketing_rec():
    """Generate a short business analysis with marketing tips for one user.

    Request body (JSON object):
        user_id: id of the document under ``system_users``.

    The user's ``transactions`` sub-collection is loaded into a DataFrame and
    substituted into the prompt, which is run through the module-level
    ``llm``.

    Returns:
        The generated text as a JSON string, or a JSON ``{"error": ...}``
        object with status 400 when ``user_id`` is missing or the body is not
        a JSON object.
    """
    # silent=True yields None instead of raising on a missing/invalid body.
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        payload = {}

    user_id = payload.get("user_id")
    if not isinstance(user_id, str) or not user_id:
        return jsonify({"error": "user_id is required"}), 400

    transactions_ref = (
        db.collection("system_users").document(user_id).collection('transactions')
    )
    transactions_df = pd.DataFrame([doc.to_dict() for doc in transactions_ref.stream()])

    prompt = PromptTemplate.from_template('You are a business analyst. Write a brief analysis and marketing tips for a small business using this transactions data {data_frame}')
    chain = LLMChain(llm=llm, prompt=prompt, verbose=True)

    # Name the prompt variable explicitly instead of relying on the chain
    # to map a bare positional value onto its single input key.
    response = chain.invoke({"data_frame": transactions_df})
    print(response)
    return jsonify(str(response['text']))
# Profit/Customer Engagement Prediction endpoint
@app.route("/predict_metric", methods=["POST"])
@cross_origin()
def predict_metric():
    """Forecast a daily business metric for one user with Prophet.

    Request body (JSON object):
        user_id: id of the document under ``system_users``.
        interval: number of future days to forecast (default 30).
        metric_type: ``"Profit"`` (default) -- daily income minus daily
            expense -- or ``"Customer Engagement"`` -- number of income
            transactions per day.

    Returns:
        ``{"predictedData": [{"date": "YYYY-MM-DD", "value": float}, ...]}``
        on success; ``{"error": "Not enough data for prediction"}`` when
        fewer than 10 daily points are available; a JSON ``{"error": ...}``
        object with status 400 when the request itself is invalid.
    """
    # silent=True yields None instead of raising on a missing/invalid body.
    request_data = request.get_json(silent=True)
    if not isinstance(request_data, dict):
        request_data = {}

    user_id = request_data.get("user_id")
    if not isinstance(user_id, str) or not user_id:
        return jsonify({"error": "user_id is required"}), 400

    try:
        interval = int(request_data.get("interval", 30))
    except (TypeError, ValueError):
        interval = -1
    if interval < 0:
        return jsonify({"error": "interval must be a non-negative integer"}), 400

    metric_type = request_data.get("metric_type", "Profit")
    transactions_ref = (
        db.collection("system_users").document(user_id).collection("transactions")
    )

    if metric_type == "Profit":
        income_by_date = _total_by_date(
            transactions_ref.where("transactionType", "==", "Income").stream(),
            _amount_due,
        )
        expense_by_date = _total_by_date(
            transactions_ref.where("transactionType", "==", "Expense").stream(),
            _amount_due,
        )
        # Use the union of dates: a day with expenses but no income is a
        # loss and must appear in the series as a negative value.
        daily_values = {
            day: income_by_date.get(day, 0) - expense_by_date.get(day, 0)
            for day in set(income_by_date) | set(expense_by_date)
        }
    elif metric_type == "Customer Engagement":
        # Engagement is the count of Income transactions per date.
        daily_values = _total_by_date(
            transactions_ref.where("transactionType", "==", "Income").stream(),
            _one,
        )
    else:
        return jsonify({"error": "Unsupported metric_type"}), 400

    # Without any rows the frame would have no 'date' column to parse.
    if not daily_values:
        return jsonify({"error": "Not enough data for prediction"})

    df = pd.DataFrame(
        [{"date": day, "amountDue": value} for day, value in daily_values.items()]
    )

    # Parse the dates and drop timezone info before handing them to Prophet.
    df['date'] = pd.to_datetime(df['date'])
    df['date'] = df['date'].dt.tz_localize(None)

    # Resample to one row per calendar day; days without data sum to 0.
    df = df.sort_values("date").set_index("date")
    df = df.resample("D").sum().reset_index()
    df.columns = ["ds", "y"]  # Prophet's expected names: ds = date, y = target

    # Check if there's enough data to train the model
    if df.shape[0] < 10:
        return jsonify({"error": "Not enough data for prediction"})

    model = Prophet(daily_seasonality=True, yearly_seasonality=True)
    model.fit(df)

    # The future frame holds the history plus `interval` new days, so the
    # requested forecast is its last `interval` rows.
    future_dates = model.make_future_dataframe(periods=interval)
    forecast = model.predict(future_dates)
    forecast_data = forecast[['ds', 'yhat']].tail(interval)

    predictions = [
        {"date": ds.strftime('%Y-%m-%d'), "value": float(yhat)}
        for ds, yhat in zip(forecast_data['ds'], forecast_data['yhat'])
    ]
    return jsonify({"predictedData": predictions})


def _total_by_date(docs, value_of):
    """Sum ``value_of(transaction)`` per ``transaction["date"]`` over ``docs``."""
    totals = {}
    for doc in docs:
        transaction = doc.to_dict()
        day = transaction["date"]
        totals[day] = totals.get(day, 0) + value_of(transaction)
    return totals


def _amount_due(transaction):
    """Return the ``amountDue`` field of a transaction dict."""
    return transaction["amountDue"]


def _one(transaction):
    """Return 1 so that ``_total_by_date`` counts transactions."""
    return 1
if __name__ == "__main__":
    # The server listens on every interface (0.0.0.0), so the interactive
    # debugger -- which lets a client execute arbitrary code -- must stay
    # off unless it is explicitly requested, e.g. FLASK_DEBUG=1 for local
    # development.
    debug_enabled = os.getenv("FLASK_DEBUG", "").strip().lower() in {"1", "true", "yes", "on"}
    app.run(debug=debug_enabled, host="0.0.0.0", port=7860)