|
|
|
|
|
|
|
import os |
|
import logging |
|
from pymongo import MongoClient, errors |
|
from datetime import datetime |
|
from werkzeug.security import generate_password_hash |
|
from services.utils import PoultryFarmBot |
|
from services.chatbot import build_chatbot_interface |
|
from transformers import AutoModelForCausalLM, AutoTokenizer |
|
from flask import Flask, render_template, request, redirect, url_for, session |
|
from celery import Celery |
|
import time |
|
import threading |
|
import gunicorn.app.base |
|
from gunicorn.six import iteritems |
|
|
|
|
|
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') |
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
app = Flask(__name__) |
|
app.secret_key = os.environ.get("SECRET_KEY", "default_secret_key") |
|
|
|
|
|
CELERY_BROKER_URL = os.environ.get("CELERY_BROKER_URL", "redis://localhost:6379/0") |
|
app.config['CELERY_BROKER_URL'] = CELERY_BROKER_URL |
|
app.config['CELERY_RESULT_BACKEND'] = CELERY_BROKER_URL |
|
celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL']) |
|
celery.conf.update(app.config) |
|
|
|
|
|
MONGO_URI = os.environ.get("MONGO_URI") |
|
if not MONGO_URI: |
|
logger.error("MONGO_URI is not set in the environment variables.") |
|
raise ValueError("MONGO_URI environment variable is required but not set.") |
|
|
|
|
|
max_retries = 3 |
|
for attempt in range(max_retries): |
|
try: |
|
logger.info(f"Connecting to MongoDB (Attempt {attempt + 1}/{max_retries}).") |
|
client = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000) |
|
client.server_info() |
|
db = client.poultry_farm |
|
enquiries_collection = db.enquiries |
|
users_collection = db.users |
|
logs_collection = db.logs |
|
break |
|
except errors.ServerSelectionTimeoutError as e: |
|
logger.error(f"Failed to connect to MongoDB (Attempt {attempt + 1}/{max_retries}): {e}") |
|
if attempt < max_retries - 1: |
|
time.sleep(5) |
|
else: |
|
raise ConnectionError("Could not connect to MongoDB. Please check the MONGO_URI and ensure the database is running.") |
|
|
|
def log_to_db(level, message): |
|
try: |
|
log_entry = { |
|
"level": level, |
|
"message": message, |
|
"timestamp": datetime.utcnow() |
|
} |
|
logs_collection.insert_one(log_entry) |
|
except Exception as e: |
|
logger.error(f"Failed to log to database: {e}") |
|
|
|
|
|
class MongoHandler(logging.Handler): |
|
def emit(self, record): |
|
log_entry = self.format(record) |
|
try: |
|
log_to_db(record.levelname, log_entry) |
|
except Exception as e: |
|
logger.error(f"Failed to emit log to MongoDB: {e}") |
|
|
|
mongo_handler = MongoHandler() |
|
mongo_handler.setLevel(logging.INFO) |
|
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') |
|
mongo_handler.setFormatter(formatter) |
|
logger.addHandler(mongo_handler) |
|
|
|
|
|
tok = os.environ.get('HF_TOKEN') |
|
if tok: |
|
|
|
logger.info("Logging in to Hugging Face.") |
|
else: |
|
logger.warning("Hugging Face token not found in environment variables.") |
|
|
|
|
|
logger.info("Initializing PoultryFarmBot instance.") |
|
bot = PoultryFarmBot(db) |
|
|
|
|
|
model = None |
|
tokenizer = None |
|
model_loading_event = threading.Event() |
|
|
|
|
|
@celery.task |
|
def load_model_and_tokenizer(): |
|
global model, tokenizer |
|
try: |
|
logger.info("Loading Llama 3.2 model and tokenizer.") |
|
model_name = "meta-llama/Llama-3.2-3B" |
|
tokenizer = AutoTokenizer.from_pretrained(model_name) |
|
model = AutoModelForCausalLM.from_pretrained(model_name) |
|
if tokenizer.pad_token is None: |
|
logger.info("Adding padding token to tokenizer.") |
|
tokenizer.add_special_tokens({'pad_token': '[PAD]'}) |
|
model.resize_token_embeddings(len(tokenizer)) |
|
logger.info("Model and tokenizer loaded successfully.") |
|
model_loading_event.set() |
|
except Exception as e: |
|
logger.error(f"Failed to load Llama 3.2 model or tokenizer: {e}") |
|
model_loading_event.set() |
|
raise RuntimeError("Could not load the Llama 3.2 model or tokenizer. Please check the configuration.") |
|
|
|
|
|
def authenticate_user(username, password): |
|
""" |
|
Authenticate a user with username and password. |
|
|
|
Args: |
|
username (str): Username for authentication. |
|
password (str): Password for authentication. |
|
|
|
Returns: |
|
bool: True if authentication is successful, False otherwise. |
|
""" |
|
user = bot.authenticate_user(username, password) |
|
if user: |
|
return True, user |
|
else: |
|
return False, None |
|
|
|
|
|
def register_user(username, password): |
|
""" |
|
Register a new user with username and password. |
|
|
|
Args: |
|
username (str): Username for registration. |
|
password (str): Password for registration. |
|
|
|
Returns: |
|
bool: True if registration is successful, False otherwise. |
|
""" |
|
try: |
|
if users_collection.find_one({"username": username}): |
|
logger.warning("Username already exists: %s", username) |
|
return False |
|
hashed_password = generate_password_hash(password) |
|
users_collection.insert_one({"username": username, "password": hashed_password}) |
|
logger.info("User registered successfully: %s", username) |
|
return True |
|
except Exception as e: |
|
logger.error(f"Failed to register user: {e}") |
|
return False |
|
|
|
|
|
@app.route('/', methods=['GET', 'POST']) |
|
def login(): |
|
if request.method == 'POST': |
|
username = request.form['username'] |
|
password = request.form['password'] |
|
success, user = authenticate_user(username, password) |
|
if success: |
|
logger.info("Authentication successful for user: %s", username) |
|
session['username'] = username |
|
|
|
|
|
load_model_and_tokenizer.apply_async() |
|
|
|
return redirect(url_for('chatbot')) |
|
else: |
|
logger.warning("Authentication failed for user: %s", username) |
|
return render_template('login.html', error="Invalid username or password.") |
|
return render_template('login.html') |
|
|
|
@app.route('/register', methods=['GET', 'POST']) |
|
def register(): |
|
if request.method == 'POST': |
|
username = request.form['username'] |
|
password = request.form['password'] |
|
success = register_user(username, password) |
|
if success: |
|
logger.info("Registration successful for user: %s", username) |
|
return redirect(url_for('login')) |
|
else: |
|
logger.warning("Registration failed for user: %s", username) |
|
return render_template('register.html', error="Username already exists. Please choose a different one.") |
|
return render_template('register.html') |
|
|
|
@app.route('/chatbot') |
|
def chatbot(): |
|
if 'username' not in session: |
|
return redirect(url_for('login')) |
|
username = session['username'] |
|
|
|
|
|
if not model_loading_event.wait(timeout=100): |
|
return "Model loading timed out. Please try again later." |
|
|
|
if model is None or tokenizer is None: |
|
return "Model failed to load. Please try again later." |
|
|
|
logger.info("Launching Gradio chatbot interface for user: %s", username) |
|
chatbot_interface = build_chatbot_interface({'username': username}) |
|
return chatbot_interface.launch(inline=True, share=True) |
|
|
|
|
|
class StandaloneApplication(gunicorn.app.base.BaseApplication): |
|
def __init__(self, app, options=None): |
|
self.application = app |
|
self.options = options or {} |
|
super().__init__() |
|
|
|
def load_config(self): |
|
config = {key: value for key, value in iteritems(self.options) if key in self.cfg.settings and value is not None} |
|
for key, value in config.items(): |
|
self.cfg.set(key.lower(), value) |
|
|
|
def load(self): |
|
return self.application |
|
|
|
|
|
if __name__ == "__main__": |
|
try: |
|
options = { |
|
'bind': '%s:%s' % ('0.0.0.0', '5000'), |
|
'workers': 2, |
|
'threads': 4, |
|
'timeout': 120, |
|
} |
|
StandaloneApplication(app, options).run() |
|
except Exception as e: |
|
logger.error(f"Failed to launch Flask server: {e}") |
|
raise RuntimeError("Could not launch the Flask server. Please check the application setup.") |
|
|