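# NOTE(review): the three lines below look like hosting-page status text captured
# along with the file rather than program code -- confirm and remove.
# Spaces:
# Sleeping
# Sleeping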
import asyncio
import contextlib
import json
import logging
import os
import threading

import mysql.connector
import pandas as pd
from fastapi import FastAPI, Request
from fastapi.responses import HTMLResponse
from llama_cpp import Llama
from transformers import pipeline
app = FastAPI()

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)


def _load_db_config(raw):
    """Turn the raw ``db`` environment value into connection keyword arguments.

    ``insert_data`` unpacks ``DB_CONFIG`` with ``**``, so it must be a mapping;
    ``os.getenv`` only ever yields a string (or ``None``).

    NOTE(review): assumes the variable holds a JSON object such as
    ``{"host": ..., "user": ..., "password": ..., "database": ...}`` -- confirm
    against the deployment's secret format.

    Args:
        raw: Value of the ``db`` environment variable, or ``None`` when unset.

    Returns:
        A dict of connection arguments; empty when the value is missing or
        cannot be decoded. The raw value is never logged because it is
        expected to contain credentials.
    """
    if not raw:
        logger.warning("Environment variable 'db' is not set; database configuration is empty.")
        return {}
    try:
        config = json.loads(raw)
    except ValueError:
        logger.error("Environment variable 'db' is not valid JSON; database configuration is empty.")
        return {}
    if not isinstance(config, dict):
        logger.error("Environment variable 'db' must decode to a JSON object; database configuration is empty.")
        return {}
    return config


# Email and database configuration
DB_CONFIG = _load_db_config(os.getenv('db'))

# System prompt for LLM (None when the 'prompt' environment variable is unset)
prompt = os.getenv('prompt')
# Function to insert extracted shipment details into MySQL database
def insert_data(extracted_details):
    """Insert one extracted shipment record into the ``shipment_details`` table.

    Nothing is written (and no connection is opened) when every shipment field
    is missing or an empty string. Database and unexpected errors are logged
    rather than raised, so a failed insert does not propagate to the caller.

    Args:
        extracted_details: Mapping of field name to extracted value. Only the
            eight shipment fields listed below are read; other keys are ignored.

    Returns:
        None.
    """
    # Order matches the column list of the INSERT statement below.
    required_fields = (
        'origin', 'destination', 'expected_shipment_datetime',
        'types_of_service', 'warehouse', 'description',
        'quantities', 'carrier_details',
    )
    sql = """
        INSERT INTO shipment_details (
            origin, destination, expected_shipment_datetime, types_of_service,
            warehouse, description, quantities, carrier_details
        ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
    """
    try:
        # Skip insertion if all required fields are empty -- checked before
        # connecting so an empty record costs no database round trip.
        if all(extracted_details.get(field) in (None, "") for field in required_fields):
            logger.info("Skipping insertion: All extracted values are empty.")
            return

        values = tuple(extracted_details.get(field) for field in required_fields)

        # closing() guarantees the cursor and connection are released on every
        # path, including when execute() or commit() raises.
        with contextlib.closing(mysql.connector.connect(**DB_CONFIG)) as mydb:
            with contextlib.closing(mydb.cursor()) as cursor:
                cursor.execute(sql, values)
            mydb.commit()
        logger.info("Data inserted successfully.")
    except mysql.connector.Error as db_err:
        logger.error(f"Database error: {db_err}")
    except Exception as ex:
        logger.error(f"Error inserting data: {ex}")
# Function to read and process emails
_llm = None  # Model instance shared across read_email() calls; loaded on first use.


def _get_llm():
    """Return the shared Llama model, loading it the first time it is needed.

    read_email() is invoked repeatedly by the processing loop; caching the
    model avoids reloading it on every pass. The model stays in memory for
    the lifetime of the process once loaded.
    """
    global _llm
    if _llm is None:
        logger.info("Loading Llama model...")
        _llm = Llama.from_pretrained(
            repo_id="microsoft/Phi-3-mini-4k-instruct-gguf",
            filename="Phi-3-mini-4k-instruct-fp16.gguf", n_ctx=2048
        )
        logger.info("Llama model loaded.")
    return _llm


def _parse_model_output(text):
    """Decode the first JSON object embedded in the model's text output.

    Args:
        text: Raw completion text, possibly with prose around the object.

    Returns:
        The decoded object as a dict.

    Raises:
        ValueError: If no object can be decoded from the text.
    """
    start = text.find('{')
    if start == -1:
        raise ValueError("no '{' found in model output")
    candidate = text[start:]
    decoder = json.JSONDecoder()
    try:
        # raw_decode stops at the end of the first complete value, so nested
        # objects and trailing text are both handled.
        parsed, _ = decoder.raw_decode(candidate)
    except ValueError:
        # Fall back to normalising single quotes for dict-literal style output.
        parsed, _ = decoder.raw_decode(candidate.replace("'", '"'))
    if not isinstance(parsed, dict):
        raise ValueError("model output did not decode to an object")
    return parsed


def read_email():
    """Run every email body in ``./emails.csv`` through the model and store the result.

    For each non-null value in the ``Body`` column, the model is prompted with
    the system prompt and the body, the JSON object in its reply is decoded,
    keys are normalised to lower snake case, placeholder metadata keys are
    added, and the record is handed to ``insert_data``. Emails whose model
    output cannot be decoded are logged and skipped.

    Returns:
        None.
    """
    llm = _get_llm()
    logger.info("Reading emails from CSV...")
    df = pd.read_csv('./emails.csv')
    for body in df['Body'].dropna():
        logger.info(f"Processing email: {body}")
        output = llm(
            f"<|system|>\n{prompt}<|end|><|user|>\n{body}<|end|>\n<|assistant|>",
            max_tokens=256,
            stop=["<|end|>"],
            echo=False)
        logger.info("Extracting details...")
        text = output['choices'][0]['text']
        # The text must be passed through a %s placeholder; a bare extra
        # argument makes the logging module report a formatting error.
        logger.info('the model output : \n%s', text)
        try:
            raw_details = _parse_model_output(text)
        except ValueError as err:
            logger.warning("Skipping email: could not parse model output (%s)", err)
            continue
        extracted_details = {
            key.lower().replace(" ", "_"): value
            for key, value in raw_details.items()
        }
        # Add meta data placeholders
        meta_data = {
            'sender': None,
            'receiver': None,
            'cc': None,
            'bcc': None,
            'subject': None
        }
        extracted_details.update(meta_data)
        logger.info(f"Full extracted data: {extracted_details}")
        insert_data(extracted_details)
# Global variable to control the email processing loop
running = False

# HTML content for the web interface.
# "{{ status }}" is a plain placeholder substituted with str.replace when the
# page is served; the two buttons POST to /start and /stop and show the reply.
html_content = """
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>Email Processing</title>
    <style>
        body { font-family: Arial, sans-serif; margin: 50px; }
        h1 { color: #333; }
        button {
            padding: 10px 20px;
            margin: 10px;
            background-color: #4CAF50;
            color: white;
            border: none;
            cursor: pointer;
        }
        button.stop { background-color: #f44336; }
        #status { font-weight: bold; }
    </style>
    <script>
        async function startLoop() {
            const response = await fetch('/start', { method: 'POST' });
            const result = await response.text();
            document.getElementById("status").innerHTML = result;
        }
        async function stopLoop() {
            const response = await fetch('/stop', { method: 'POST' });
            const result = await response.text();
            document.getElementById("status").innerHTML = result;
        }
    </script>
</head>
<body>
    <h1>Email Processing Status: <span id="status">{{ status }}</span></h1>
    <button onclick="startLoop()">Start</button>
    <button class="stop" onclick="stopLoop()">Stop</button>
</body>
</html>
"""
# Function to process emails in a loop asynchronously
async def email_processing_loop():
    """Repeatedly process emails until the ``running`` flag is cleared.

    Each pass calls the blocking ``read_email`` in the default executor so the
    event loop stays free to serve requests (including the stop request) while
    a pass is in progress. A failed pass is logged and the loop carries on
    after the usual delay instead of dying with ``running`` still set.

    Returns:
        None, once ``running`` is observed to be false.
    """
    global running
    logger.info("Starting email processing loop...")
    loop = asyncio.get_running_loop()
    while running:
        logger.info("Processing emails...")
        try:
            await loop.run_in_executor(None, read_email)
        except Exception:
            logger.exception("Email processing pass failed; will retry after the delay.")
        await asyncio.sleep(10)  # Non-blocking delay for the loop
# Endpoint to display the current email processor status
@app.get("/")
async def home():
    """Serve the control page with the current processor status filled in.

    The ``{{ status }}" placeholder in ``html_content`` is replaced with
    "Running" or "Stopped" according to the ``running`` flag. The database
    configuration is deliberately not printed here: it may hold credentials.

    Returns:
        An ``HTMLResponse`` with status code 200.
    """
    status = "Running" if running else "Stopped"
    return HTMLResponse(content=html_content.replace("{{ status }}", status), status_code=200)
# Endpoint to start the email processing loop
# Strong reference to the background task: the event loop only keeps weak
# references to tasks, so an unreferenced task can be garbage-collected.
_email_loop_task = None


@app.post("/start")
async def start_email_loop():
    """Set the ``running`` flag and make sure one processing loop is scheduled.

    If a previous loop task has not finished yet (for example it is still in
    its sleep after a recent stop), it is reused rather than starting a second
    loop alongside it; it will see the flag set again and continue.

    Returns:
        "Running" when the flag was switched on, "Already running" otherwise.
    """
    global running, _email_loop_task
    if running:
        return "Already running"
    running = True
    if _email_loop_task is None or _email_loop_task.done():
        _email_loop_task = asyncio.ensure_future(email_processing_loop())
    logger.info("Email processing loop started.")
    return "Running"
# Endpoint to stop the email processing loop
@app.post("/stop")
async def stop_email_loop():
    """Clear the ``running`` flag so the processing loop exits.

    The loop checks the flag at the top of each iteration, so it stops after
    its current pass and delay complete rather than immediately.

    Returns:
        "Stopped" when the flag was switched off, "Already stopped" otherwise.
    """
    global running
    if not running:
        return "Already stopped"
    running = False
    logger.info("Email processing loop stopped.")
    return "Stopped"
# Script entry point: serve the app directly when the file is executed.
if __name__ == "__main__":
    logger.info("Starting FastAPI server...")
    # uvicorn is imported only on this path, after the start-up message.
    import uvicorn

    # Listen on all interfaces, port 8000.
    uvicorn.run(app, host="0.0.0.0", port=8000)