import os |
import logging |
from fastapi import FastAPI, File, UploadFile, HTTPException, APIRouter, Form, Request |
from pydantic import BaseModel |
from services.file_upload_service import FileHandler |
from services.chat_service import ChatAgentService |
from fastapi.staticfiles import StaticFiles |
from fastapi.templating import Jinja2Templates |
from fastapi.middleware.cors import CORSMiddleware |
from dotenv import load_dotenv |
import mysql.connector |
# --- Application bootstrap -------------------------------------------------
# Everything below runs once at import time: it creates the FastAPI app,
# loads environment configuration, wires static files / templates, sets up
# file logging and registers the CORS middleware.
app = FastAPI(title="RedmindGen", description="Chat with your Data", version="1.0.0")

# Pull settings from a local .env file (if any) into the process environment
# before any os.getenv() lookup below.
load_dotenv()

# Location of the document vector store; passed to FileHandler by upload_file.
vector_db_path_documents = os.getenv('VECTOR_DB_PATH_DOCUMENTS')

# Static assets and Jinja2 templates are resolved relative to the working
# directory the server is started from.
app.mount("/static", StaticFiles(directory="static"), name="static")
templates = Jinja2Templates(directory="templates")

# Fall back to the current directory when LOG_PATH is not configured;
# os.path.join(None, ...) would otherwise raise TypeError at import time.
LOG_PATH = os.getenv('LOG_PATH') or "."
LOG_FILENAME = "redmindgen.log"
full_log_path = os.path.join(LOG_PATH, LOG_FILENAME)
logging.basicConfig(
    filename=full_log_path,
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
)
logging.info("File upload start")

# Origins allowed to make cross-origin requests.
# NOTE(review): the two empty entries look like unfilled placeholders --
# confirm whether real origins belong here or they can be removed.
origins = [
    "http://localhost:8000",
    "",
    "",
]

app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
@app.get("/")
async def read_root(request: Request):
    """Serve the landing page by rendering the ``index.html`` template."""
    # Jinja2Templates needs the current request in the template context.
    context = {"request": request}
    return templates.TemplateResponse("index.html", context)
def verify_user(username: str, password: str) -> str:
    """
    Check a username/password pair against the ``user_detail`` table.

    Args:
        username: Login name to look up.
        password: Password to match against the stored ``password`` column.

    Returns:
        ``"success"`` if a row matches both values, otherwise ``"failure"``.

    Raises:
        mysql.connector.Error: If connecting to or querying the database fails.
    """
    # NOTE(review): connection settings are hard-coded (root user, empty
    # password) and the password is compared as stored, i.e. in plain text.
    # Both should be revisited; they are left unchanged here so existing
    # deployments keep working.
    cnx = mysql.connector.connect(user='root', password='',
                                  host='localhost',
                                  database='redmind_gpt')
    try:
        # A buffered cursor fetches the whole result set on execute(), so
        # closing it after a single fetchone() cannot fail on unread rows.
        cursor = cnx.cursor(buffered=True)
        try:
            # Parameterized query: the driver escapes both values.
            query = "SELECT * FROM user_detail WHERE username = %s AND password = %s"
            cursor.execute(query, (username, password))
            result = cursor.fetchone()
        finally:
            cursor.close()
    finally:
        # Always release the connection; previously the close calls sat
        # after the return statements and were never reached.
        cnx.close()

    return "success" if result is not None else "failure"
@app.post("/validate-user")
async def validate_user(request: Request, username: str = Form(...), password: str = Form(...)):
    """
    Handle the login form submission.

    The submitted credentials are passed to ``verify_user``. On success the
    ``dashboard.html`` template is rendered with the username in its context;
    on any other outcome the ``index.html`` template is rendered again.
    """
    # Guard clause: anything other than an explicit success goes back to login.
    if verify_user(username, password) != 'success':
        return templates.TemplateResponse("index.html", {"request": request})

    dashboard_context = {"request": request, "username": username}
    return templates.TemplateResponse("dashboard.html", dashboard_context)
@app.get("/knowledgebase")
async def knowledgebase(request: Request):
    """Render the knowledgebase page from the ``knowledgebase.html`` template."""
    context = {"request": request}
    return templates.TemplateResponse("knowledgebase.html", context)
@app.get("/chatbot")
async def chatbot(request: Request):
    """Render the chatbot page from the ``chatpage.html`` template."""
    context = {"request": request}
    return templates.TemplateResponse("chatpage.html", context)
@app.get("/company_profile")
async def company_profile(request: Request):
    """Render the company profile page from the ``company_profile.html`` template."""
    context = {"request": request}
    return templates.TemplateResponse("company_profile.html", context)
@app.get("/data_connectors")
async def data_connectors(request: Request):
    """Render the data connectors page from the ``data_connectors.html`` template."""
    context = {"request": request}
    return templates.TemplateResponse("data_connectors.html", context)
@app.get("/API_connectors")
async def API_connectors(request: Request):
    """Render the API connectors page from the ``API_connectors.html`` template."""
    context = {"request": request}
    return templates.TemplateResponse("API_connectors.html", context)
@app.post("/upload_file/")
async def upload_file(
    file: UploadFile = File(..., description="Upload a file"),
    document_name: str = Form(..., description="The name of the document."),
    document_description: str = Form(..., description="A brief description of the document."),
    department: str = Form(..., description="The department associated with the document."),
    version: str = Form(..., description="The version of the document."),
    last_updated: str = Form(..., description="The date when the document was last updated.")
):
    """
    Accept a document upload and hand it to ``FileHandler``.

    The file extension is validated first; only TXT, PDF, DOCX, CSV and XLSX
    files are accepted. The file and its form metadata are then passed to
    ``FileHandler.handle_file_upload``.

    Returns:
        Whatever ``FileHandler.handle_file_upload`` returns.

    Raises:
        HTTPException: 400 if the file name is missing, has no extension, or
            has an unsupported extension; 500 if the handler fails with an
            unexpected error. An ``HTTPException`` raised by the handler is
            passed through unchanged.
    """
    logging.info("upload_file called")

    allowed_extensions = {'txt', 'pdf', 'docx', 'csv', 'xlsx'}
    # filename may be None, and a name without any dot has no extension;
    # rpartition lets both cases be rejected instead of crashing or passing.
    _, dot, extension = (file.filename or "").rpartition('.')
    if not dot or extension.lower() not in allowed_extensions:
        raise HTTPException(status_code=400,
                            detail="Unsupported file type. Supported types: TXT, PDF, DOCX, CSV, XLSX.")

    try:
        file_handler = FileHandler(vector_db_path=vector_db_path_documents)
        logging.info("Using vector DB path: %s", vector_db_path_documents)
        result = await file_handler.handle_file_upload(file, document_name, document_description, department, version,
                                                       last_updated)
    except HTTPException:
        # Keep the status code chosen by the handler; do not rewrap as 500.
        raise
    except Exception as e:
        logging.error(f"Error handling file uploads: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e)) from e

    logging.info("upload_file completed")
    return result
@app.post("/chat_with_agent")
async def chat_with_agent(request: Request, user_question: str = Form(...)):
    """
    Answer a user's question through ``ChatAgentService``.

    Args:
        request: The incoming request (not used by this handler).
        user_question: The question text submitted in the form.

    Returns:
        Whatever ``ChatAgentService.answer_question`` returns.

    Raises:
        HTTPException: 500 if creating the service or answering the question
            fails with an unexpected error. An ``HTTPException`` raised by the
            service is passed through unchanged.
    """
    logging.info("chat_with_agent called")
    # The question is user-supplied content, so it is only logged at debug
    # level rather than being written to stdout or the INFO log.
    logging.debug("User question: %s", user_question)

    try:
        # Constructing the service inside the try block ensures a failing
        # constructor is logged like any other chat error.
        chat_service = ChatAgentService()
        answer = chat_service.answer_question(user_question)
    except HTTPException:
        raise
    except Exception as e:
        logging.error(f"Error in chat api: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e)) from e

    logging.info("answer returned")
    return answer