"""Flask application for matching buyer and seller posts.

The module provides:

* login / registration / logout backed by the ``users`` MongoDB collection,
* an admin view that stores selected classifier results in the
  ``customers`` collection,
* buyer and seller listings plus a TF-IDF / cosine-similarity lookup that
  returns the most similar post carrying the opposite label,
* a JSON endpoint with the number of posts per label.
"""

import bcrypt
import numpy as np
from bson.objectid import ObjectId
from flask import Flask, jsonify, redirect, render_template, request, session, url_for
from flask_pymongo import PyMongo
from passlib.hash import sha256_crypt
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity

from config import DATABASE_URI, SECRET_KEY
from models import model_ir

app = Flask(__name__)

# The connection string and the session-signing key come from the project's
# ``config`` module.
app.config["MONGO_URI"] = DATABASE_URI
app.config["SECRET_KEY"] = SECRET_KEY

mongo = PyMongo(app)

# Values of the ``label`` field used in the ``customers`` collection.
BUYER_LABEL = "Buyer"
SELLER_LABEL = "Seller"

# Registering with this address yields the "admin" role.
# NOTE(review): nothing verifies ownership of the address and duplicate
# registrations are not rejected - confirm this is acceptable.
ADMIN_EMAIL = "admin@arazim.com"


#### DB CONNECTION CHECKING
def test_db_connection():
    """Return True when a trivial query on the ``users`` collection succeeds.

    Any exception raised by the query is reported on stdout and turned into
    ``False`` so the caller can decide whether to start the server.
    """
    try:
        mongo.db.users.find_one()
    except Exception as exc:  # any failure here means "not usable"
        print(f"Database connection check failed: {exc}")
        return False
    return True


# ------------------------------------ ROUTES ------------------------------------------


#### AUTH PART #####
def _verify_password(password, stored_hash):
    """Return True when *password* matches *stored_hash*.

    New accounts are hashed with bcrypt.  Earlier versions of ``register``
    hashed one special-cased address with ``sha256_crypt``; such hashes are
    still recognised so those accounts can log in.  Missing values or a hash
    in an unknown format count as a failed verification instead of raising.
    """
    if not password or not stored_hash:
        return False
    if isinstance(stored_hash, bytes):
        stored_hash = stored_hash.decode("utf-8")
    if sha256_crypt.identify(stored_hash):
        return sha256_crypt.verify(password, stored_hash)
    try:
        return bcrypt.checkpw(password.encode("utf-8"), stored_hash.encode("utf-8"))
    except ValueError:
        # bcrypt raises ValueError when the stored value is not a bcrypt hash.
        return False


@app.route("/", methods=["GET", "POST"])
def login():
    """Show the login form (GET) or authenticate the submitted user (POST).

    On success the user's id and name are stored in the session and the
    client is redirected to the admin or regular-user page depending on the
    stored role.  On failure a plain error message is returned.
    """
    if request.method != "POST":
        return render_template(
            "authentication-login.html", user_name=session.get("user_name")
        )

    email = request.form.get("email")
    password = request.form.get("password")

    user = mongo.db.users.find_one({"email": email}) if email else None
    if user and _verify_password(password, user.get("password")):
        session["user_id"] = str(user["_id"])
        session["user_name"] = user.get("name")
        endpoint = "admin" if user.get("role") == "admin" else "regular_user"
        return redirect(url_for(endpoint))

    return "Invalid credentials or Account not Registered"


@app.route("/register", methods=["GET", "POST"])
def register():
    """Show the registration form (GET) or create a new user (POST).

    Every password is hashed with bcrypt, which is the scheme ``login``
    verifies.  A request without an email or password is rejected with
    HTTP 400 instead of failing while hashing ``None``.
    """
    if request.method != "POST":
        return render_template("authentication-register.html")

    name = request.form.get("name")
    email = request.form.get("email")
    password = request.form.get("password")
    if not email or not password:
        return "Email and password are required", 400

    hashed_password = bcrypt.hashpw(
        password.encode("utf-8"), bcrypt.gensalt()
    ).decode("utf-8")
    role = "admin" if email == ADMIN_EMAIL else "regular_user"

    mongo.db.users.insert_one(
        {
            "name": name,
            "email": email,
            "password": hashed_password,
            "role": role,
        }
    )
    return redirect(url_for("login"))


@app.route("/logout")
def logout():
    """Remove the user's id and name from the session and go to the login page."""
    session.pop("user_id", None)
    session.pop("user_name", None)
    return redirect(url_for("login"))


@app.route("/admin")
def admin():
    """Render the admin dashboard.

    NOTE(review): the route does not check that the session belongs to an
    admin - confirm whether access control is enforced elsewhere.
    """
    return render_template("admin-dashboard.html", user_name=session.get("user_name"))


@app.route("/admin/Results", methods=["GET", "POST"])
def AdminResults():
    """Show classifier results (GET) or store the selected rows (POST).

    GET runs ``model_ir.lr_BS()``, keeps the result in the session and
    renders it.  POST reads the 1-based row numbers submitted as
    ``selected-rows`` and inserts the matching rows into ``customers`` as
    ``{"post": row[0], "label": row[1]}``.  Row numbers that are not
    integers or fall outside the stored results are ignored.
    """
    if request.method != "POST":
        results = model_ir.lr_BS()
        session["results"] = results
        return render_template(
            "admin-dashboard.html",
            results=results,
            user_name=session.get("user_name"),
        )

    # The session may hold no results if the POST arrives without a prior GET.
    results = session.get("results") or []
    for raw_index in request.form.getlist("selected-rows"):
        try:
            position = int(raw_index) - 1
        except ValueError:
            continue
        # Reject 0 and negatives explicitly: a negative index would silently
        # pick a row counted from the end of the list.
        if 0 <= position < len(results):
            row = results[position]
            mongo.db.customers.insert_one({"post": row[0], "label": row[1]})

    return render_template("success.html")


@app.route("/regular_user")
def regular_user():
    """Render the landing page for non-admin users."""
    return render_template("regular_user.html", user_name=session.get("user_name"))


#### SHARED POST HELPERS #####
def _posts_with_label(label):
    """Return ``[{"_id": str, "post": ...}]`` for every post with *label*."""
    cursor = mongo.db.customers.find({"label": label})
    return [{"_id": str(doc["_id"]), "post": doc["post"]} for doc in cursor]


def _find_post(post_id):
    """Return the ``customers`` document with *post_id*, or None.

    An empty or malformed id is treated as "not found" rather than letting
    ``ObjectId`` raise.
    """
    if not post_id or not ObjectId.is_valid(post_id):
        return None
    return mongo.db.customers.find_one({"_id": ObjectId(post_id)})


def _most_similar_post(query_attributes, candidate_label):
    """Return a one-element list with the best match, or an empty list.

    Every post labelled *candidate_label* is scored against
    *query_attributes* with ``compute_similarity``; on ties the first post
    returned by the database wins.
    """
    best_post = None
    best_score = -1  # lower than any possible similarity score
    for candidate in mongo.db.customers.find({"label": candidate_label}):
        score = compute_similarity(
            query_attributes, extract_attributes(candidate["post"])
        )
        if score > best_score:
            best_score = score
            best_post = candidate
    return [best_post] if best_post else []


#### BUYER PART #####
@app.route("/buyer.html")
def buyer():
    """Render the list of buyer posts."""
    return render_template(
        "buyer.html",
        user_name=session.get("user_name"),
        results=get_buyer_posts(),
    )


def get_buyer_posts():
    """Return all posts labelled "Buyer" with their ids converted to strings."""
    return _posts_with_label(BUYER_LABEL)


@app.route("/similar_seller_posts/<buyer_post_id>")
def similar_seller_posts(buyer_post_id):
    """Render a buyer post together with its most similar seller post."""
    return render_template(
        "post-display.html",
        buyer_post=_find_post(buyer_post_id),
        user_name=session.get("user_name"),
        similar_posts=get_similar_seller_posts(buyer_post_id),
    )


def get_similar_seller_posts(buyer_post_id):
    """Return the most similar seller post for the given buyer post id.

    The result is a list with at most one document; it is empty when the id
    is missing, malformed or unknown, or when there are no seller posts.
    """
    buyer_post = _find_post(buyer_post_id)
    if not buyer_post:
        return []
    return content_based_filtering(extract_attributes(buyer_post["post"]))


def extract_attributes(post):
    """Wrap a post's text in the attribute dict used for similarity scoring."""
    # The whole post text is the only attribute currently used.
    return {"content": post}


def content_based_filtering(buyer_attributes):
    """Return ``[post]`` for the seller post most similar to the buyer, else ``[]``."""
    return _most_similar_post(buyer_attributes, SELLER_LABEL)


def compute_similarity(attributes1, attributes2):
    """Return the cosine similarity of the two attribute dicts' ``content``.

    Both texts are vectorised with one TF-IDF model fitted on the pair, so
    the vectors share a vocabulary and are comparable.  When the texts
    contain no usable terms the similarity is defined as ``0.0``.
    """
    corpus = [
        attributes1.get("content") or "",
        attributes2.get("content") or "",
    ]
    vectorizer = TfidfVectorizer()
    try:
        vectorizer.fit(corpus)
    except ValueError:
        # Raised by scikit-learn when the vocabulary is empty.
        return 0.0

    vector1 = convert_attributes_to_vector(attributes1, vectorizer).reshape(1, -1)
    vector2 = convert_attributes_to_vector(attributes2, vectorizer).reshape(1, -1)
    return float(cosine_similarity(vector1, vector2)[0][0])


def convert_attributes_to_vector(attributes, vectorizer=None):
    """Return the TF-IDF vector of ``attributes["content"]`` as a 1-D array.

    Args:
        attributes: dict as produced by ``extract_attributes``.
        vectorizer: an already fitted ``TfidfVectorizer``.  Pass the same
            instance for every text that will be compared.  When omitted a
            vectorizer is fitted on this text alone; vectors produced that
            way are not comparable with each other.

    Returns:
        A 1-D ``numpy`` array; a single zero when no vectorizer is given
        and the text has no usable terms.
    """
    content = attributes.get("content") or ""
    if vectorizer is not None:
        return vectorizer.transform([content]).toarray().ravel()
    try:
        matrix = TfidfVectorizer().fit_transform([content])
    except ValueError:
        # Empty vocabulary: nothing to vectorise.
        return np.zeros(1)
    return matrix.toarray().ravel()


#### SELLER PART #####
@app.route("/seller.html")
def seller():
    """Render the list of seller posts."""
    return render_template(
        "seller.html",
        user_name=session.get("user_name"),
        results=get_seller_posts(),
    )


def get_seller_posts():
    """Return all posts labelled "Seller" with their ids converted to strings."""
    return _posts_with_label(SELLER_LABEL)


@app.route("/similar_buyer_posts/<seller_post_id>")
def similar_buyer_posts(seller_post_id):
    """Render a seller post together with its most similar buyer post."""
    return render_template(
        "post-display-seller.html",
        seller_post=_find_post(seller_post_id),
        user_name=session.get("user_name"),
        similar_posts=get_similar_buyer_posts(seller_post_id),
    )


def get_similar_buyer_posts(seller_post_id):
    """Return the most similar buyer post for the given seller post id.

    The result is a list with at most one document; it is empty when the id
    is missing, malformed or unknown, or when there are no buyer posts.
    """
    seller_post = _find_post(seller_post_id)
    if not seller_post:
        return []
    return content_based_filtering_seller(extract_attributes(seller_post["post"]))


def content_based_filtering_seller(seller_attributes):
    """Return ``[post]`` for the buyer post most similar to the seller, else ``[]``."""
    return _most_similar_post(seller_attributes, BUYER_LABEL)


##### OTHER FUNCTIONS #####
@app.route("/post_counts", methods=["GET"])
def post_counts():
    """Return the number of seller and buyer posts as JSON."""
    seller_count = mongo.db.customers.count_documents({"label": SELLER_LABEL})
    buyer_count = mongo.db.customers.count_documents({"label": BUYER_LABEL})
    return jsonify({"seller_posts": seller_count, "buyer_posts": buyer_count})


if __name__ == "__main__":
    try:
        with app.app_context():
            # Only start the server when the database is reachable.
            if test_db_connection():
                app.run()
            else:
                print("Database connection failed.")
    except Exception as e:
        print(e)