# emaildockerdemo / app.py
# Rsnarsna's picture
# Update app.py
# 012015d verified
from fastapi import FastAPI, Request
from fastapi.responses import HTMLResponse
import threading
import asyncio
import mysql.connector
import json
import logging
import pandas as pd
from llama_cpp import Llama
from transformers import pipeline
import os
app = FastAPI()

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)


def _load_db_config(raw):
    """Turn the raw ``db`` environment value into connection keyword arguments.

    ``insert_data`` unpacks ``DB_CONFIG`` with ``**``, which requires a
    mapping, while ``os.getenv`` only ever yields a string (or ``None``).

    NOTE(review): this assumes the variable holds a JSON object such as
    ``{"host": "...", "user": "...", "password": "...", "database": "..."}``
    -- confirm against the deployment's secret format.

    Args:
        raw: Value returned by ``os.getenv('db')``.

    Returns:
        The decoded ``dict`` when ``raw`` is a JSON object; otherwise ``raw``
        itself, unchanged, so behaviour is no worse than before parsing was
        attempted.
    """
    if not raw:
        logger.warning("Environment variable 'db' is not set; database inserts will fail.")
        return raw
    try:
        parsed = json.loads(raw)
    except ValueError:
        # Deliberately do not log the value itself: it contains credentials.
        logger.warning("Environment variable 'db' is not valid JSON; using it as-is.")
        return raw
    if not isinstance(parsed, dict):
        logger.warning("Environment variable 'db' is not a JSON object; using it as-is.")
        return raw
    return parsed


# Email and database configuration
DB_CONFIG = _load_db_config(os.getenv('db'))

# System prompt for LLM
prompt = os.getenv('prompt')
# Function to insert extracted shipment details into MySQL database
def insert_data(extracted_details):
    """Insert one set of extracted shipment details into ``shipment_details``.

    The record is skipped (and no database connection is opened) when every
    required field is ``None`` or an empty string. Any error is logged rather
    than raised, so a single bad record does not interrupt the caller.

    Args:
        extracted_details: Mapping of field name to extracted value. Only the
            keys listed in ``required_fields`` are read; missing keys are
            inserted as ``None``.

    Returns:
        None.
    """
    # Column order here must match the column list in the INSERT statement.
    required_fields = (
        'origin', 'destination', 'expected_shipment_datetime',
        'types_of_service', 'warehouse', 'description',
        'quantities', 'carrier_details',
    )
    sql = """
INSERT INTO shipment_details (
origin, destination, expected_shipment_datetime, types_of_service,
warehouse, description, quantities, carrier_details
) VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
"""
    mydb = None
    cursor = None
    try:
        # Skip insertion if all required fields are empty. Checked before
        # connecting so that skipped records cost no database round trip.
        if all(extracted_details.get(field) in (None, "") for field in required_fields):
            logger.info("Skipping insertion: All extracted values are empty.")
            return

        values = tuple(extracted_details.get(field) for field in required_fields)

        mydb = mysql.connector.connect(**DB_CONFIG)
        cursor = mydb.cursor()
        cursor.execute(sql, values)
        mydb.commit()
        logger.info("Data inserted successfully.")
    except mysql.connector.Error as db_err:
        logger.error(f"Database error: {db_err}")
    except Exception as ex:
        logger.error(f"Error inserting data: {ex}")
    finally:
        # Always release the cursor and connection; this function is called
        # once per email, so leaking them would exhaust server connections.
        for resource in (cursor, mydb):
            if resource is None:
                continue
            try:
                resource.close()
            except Exception as close_err:
                logger.warning(f"Error closing database resource: {close_err}")
# Function to read and process emails
# Model instance shared by every read_email() call. Loading the GGUF weights
# is expensive, so it is done once on first use instead of on every pass.
_llm = None


def _get_llm():
    """Return the shared Llama model, loading it on the first call."""
    global _llm
    if _llm is None:
        logger.info("Loading Llama model...")
        _llm = Llama.from_pretrained(
            repo_id="microsoft/Phi-3-mini-4k-instruct-gguf",
            filename="Phi-3-mini-4k-instruct-fp16.gguf", n_ctx=2048
        )
        logger.info("Llama model loaded.")
    return _llm


def _parse_model_output(text):
    """Extract the first JSON object in ``text`` as a dict with normalised keys.

    Keys are lower-cased and their spaces replaced by underscores.

    Raises:
        ValueError: If no JSON object can be decoded from ``text``
            (``json.JSONDecodeError`` is a ``ValueError`` subclass).
    """
    start = text.find('{')
    if start == -1:
        raise ValueError("no '{' found in model output")
    try:
        # raw_decode parses exactly one JSON value and ignores trailing text,
        # so nested objects and chatter after the object are both handled.
        parsed, _ = json.JSONDecoder().raw_decode(text[start:])
    except json.JSONDecodeError:
        # Fall back to the previous lenient handling for single-quoted
        # pseudo-JSON: cut at the first '}' and swap the quote characters.
        end = text.find('}', start)
        if end == -1:
            raise
        parsed = json.loads(text[start:end + 1].replace("'", '"'))
    if not isinstance(parsed, dict):
        raise ValueError("model output is not a JSON object")
    return {key.lower().replace(" ", "_"): value for key, value in parsed.items()}


def read_email():
    """Run every email body in ``./emails.csv`` through the LLM and store the result.

    For each non-empty value of the ``Body`` column the model is prompted with
    the module-level system ``prompt``, the JSON object in its reply is
    parsed, metadata placeholders are added, and the record is handed to
    ``insert_data``. An email whose reply cannot be parsed is logged and
    skipped so the remaining emails are still processed.

    Returns:
        None.
    """
    llm = _get_llm()

    logger.info("Reading emails from CSV...")
    df = pd.read_csv('./emails.csv')

    # Metadata placeholders added to every record.
    meta_data = {
        'sender': None,
        'receiver': None,
        'cc': None,
        'bcc': None,
        'subject': None
    }

    # dropna(): empty CSV cells are read as NaN, which would otherwise be
    # sent to the model as the literal text "nan".
    for body in df['Body'].dropna():
        logger.info(f"Processing email: {body}")
        output = llm(
            f"<|system|>\n{prompt}<|end|><|user|>\n{body}<|end|>\n<|assistant|>",
            max_tokens=256,
            stop=["<|end|>"],
            echo=False)

        logger.info("Extracting details...")
        text = output['choices'][0]['text']
        # A %s placeholder is required; passing the text as a bare extra
        # argument makes the logging module report a formatting error.
        logger.info('the model output : \n%s', text)

        try:
            extracted_details = _parse_model_output(text)
        except ValueError as parse_err:
            logger.error(f"Skipping email, could not parse model output: {parse_err}")
            continue

        extracted_details.update(meta_data)
        logger.info(f"Full extracted data: {extracted_details}")
        insert_data(extracted_details)
# Global flag controlling the email processing loop: the /start endpoint sets
# it to True, the /stop endpoint sets it back to False, and
# email_processing_loop() keeps iterating for as long as it is True.
running: bool = False
# HTML content for the web interface. The "{{ status }}" placeholder is
# substituted by home() before the page is served. The /start and /stop
# endpoints return plain Python strings, which FastAPI sends as JSON-encoded
# bodies, so the script decodes them with response.json(); reading them with
# response.text() would show the surrounding double quotes in the page.
html_content = """
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Email Processing</title>
<style>
body { font-family: Arial, sans-serif; margin: 50px; }
h1 { color: #333; }
button {
padding: 10px 20px;
margin: 10px;
background-color: #4CAF50;
color: white;
border: none;
cursor: pointer;
}
button.stop { background-color: #f44336; }
#status { font-weight: bold; }
</style>
<script>
async function startLoop() {
const response = await fetch('/start', { method: 'POST' });
const result = await response.json();
document.getElementById("status").innerHTML = result;
}
async function stopLoop() {
const response = await fetch('/stop', { method: 'POST' });
const result = await response.json();
document.getElementById("status").innerHTML = result;
}
</script>
</head>
<body>
<h1>Email Processing Status: <span id="status">{{ status }}</span></h1>
<button onclick="startLoop()">Start</button>
<button class="stop" onclick="stopLoop()">Stop</button>
</body>
</html>
"""
# Function to process emails in a loop asynchronously
async def email_processing_loop():
    """Call ``read_email`` repeatedly, 10 seconds apart, while ``running`` is true.

    ``read_email`` is synchronous and slow (model inference plus database
    writes), so it is run in the event loop's default executor; calling it
    directly would block the event loop and keep the server from answering
    ``/stop`` or ``/`` until the pass finished. A failed pass is logged and
    retried after the delay, because letting the exception end the task
    would leave ``running`` stuck at ``True`` with nothing running.
    """
    logger.info("Starting email processing loop...")
    loop = asyncio.get_running_loop()
    while running:
        logger.info("Processing emails...")
        try:
            await loop.run_in_executor(None, read_email)
        except asyncio.CancelledError:
            # Never swallow cancellation (it is an Exception subclass on
            # Python 3.7).
            raise
        except Exception:
            logger.exception("Email processing pass failed; will retry after the delay.")
        await asyncio.sleep(10)  # Non-blocking delay for the loop
# Endpoint to display the current email processor status
@app.get("/", response_class=HTMLResponse)
async def home():
    """Serve the control page with the current loop status filled in.

    Returns:
        An ``HTMLResponse`` (status 200) containing ``html_content`` with the
        ``{{ status }}`` placeholder replaced by ``"Running"`` or
        ``"Stopped"`` according to the module-level ``running`` flag.
    """
    # The previous debug print of os.getenv('db') was removed: it wrote the
    # database credentials to stdout on every page view.
    status = "Running" if running else "Stopped"
    return HTMLResponse(content=html_content.replace("{{ status }}", status), status_code=200)
# Endpoint to start the email processing loop
# Strong references to in-flight loop tasks. The event loop itself only keeps
# weak references to tasks, so a task whose handle is discarded may be
# garbage-collected before it finishes.
_background_tasks = set()


@app.post("/start")
async def start_email_loop():
    """Start the background email processing loop if it is not already running.

    Returns:
        ``"Running"`` when the loop was started by this call, or
        ``"Already running"`` when the ``running`` flag was already set.
    """
    global running
    if running:
        return "Already running"

    running = True
    task = asyncio.ensure_future(email_processing_loop())
    # Hold the task until it completes, then drop the reference.
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)
    logger.info("Email processing loop started.")
    return "Running"
# Endpoint to stop the email processing loop
@app.post("/stop")
async def stop_email_loop():
    """Ask the background email processing loop to stop.

    Clears the module-level ``running`` flag, which
    ``email_processing_loop`` checks in its ``while`` condition.

    Returns:
        ``"Stopped"`` when the flag was cleared by this call, or
        ``"Already stopped"`` when it was not set.
    """
    global running
    if not running:
        return "Already stopped"

    running = False
    logger.info("Email processing loop stopped.")
    return "Stopped"
# Script entry point: serve the FastAPI app with uvicorn when this file is
# executed directly rather than imported.
if __name__ == "__main__":
    logger.info("Starting FastAPI server...")
    import uvicorn

    # Listen on every interface, port 8000.
    server_options = {"host": "0.0.0.0", "port": 8000}
    uvicorn.run(app, **server_options)