ARAZIM / app.py
RANA
connection issue 6
eddd084
raw
history blame contribute delete
No virus
12.4 kB
from flask import Flask, render_template, request, redirect, url_for, session, jsonify
from sklearn.metrics.pairwise import cosine_similarity
import numpy as np
from bson.objectid import ObjectId
import bcrypt
from passlib.hash import sha256_crypt
from flask_pymongo import PyMongo
from models import model_ir
from sklearn.feature_extraction.text import TfidfVectorizer
from config import DATABASE_URI, SECRET_KEY
app = Flask(__name__)
# Access environment variables
app.config["MONGO_URI"] = DATABASE_URI
app.config["SECRET_KEY"] = SECRET_KEY
mongo = PyMongo(app)
#### DB CONNECTION CHECKING
def test_db_connection():
try:
# Try to perform a simple query to check if the database connection is working
mongo.db.users.find_one() # Replace 'users' with the name of any existing collection in your database
# If the query above doesn't raise an exception, the connection is successful
return True
except Exception as e:
# If any exception occurs during the database query, the connection is not successful
return False
# ------------------------------------ ROUTES ------------------------------------------
#### AUTH PART #####
@app.route("/", methods=["GET", "POST"])
def login():
if request.method == "POST":
username = request.form.get("email")
password = request.form.get("password")
# Validate the username against the user collection in the database
user = mongo.db.users.find_one({"email": username})
if user:
# Verify the password based on the user's role
if user["role"] == "admin":
if bcrypt.checkpw(
password.encode("utf-8"), user["password"].encode("utf-8")
):
# Store the user's ID and name in the session
session["user_id"] = str(user["_id"])
session["user_name"] = user["name"]
return redirect(url_for("admin"))
else:
if bcrypt.checkpw(
password.encode("utf-8"), user["password"].encode("utf-8")
):
# Store the user's ID and name in the session
session["user_id"] = str(user["_id"])
session["user_name"] = user["name"]
return redirect(url_for("regular_user"))
# Handle invalid credentials (e.g., show an error message)
return "Invalid credentials or Account not Registered"
else:
return render_template(
"authentication-login.html", user_name=session.get("user_name")
)
@app.route("/register", methods=["GET", "POST"])
def register():
if request.method == "POST":
name = request.form.get("name")
email = request.form.get("email")
password = request.form.get("password")
# Hash the password based on the user's role
if email == "admin@example.com":
hashed_password = sha256_crypt.hash(password) # Use sha256_crypt for admin
else:
hashed_password = bcrypt.hashpw(
password.encode("utf-8"), bcrypt.gensalt()
).decode(
"utf-8"
) # Use bcrypt for regular users
role = "regular_user" # Default role for new users
if email == "admin@arazim.com":
role = "admin"
# Create a new user document
user = {
"name": name,
"email": email,
"password": hashed_password,
"role": role
# Add other user properties as needed
}
# Insert the user document into the users collection
mongo.db.users.insert_one(user)
# Redirect the user to the login page
return redirect(url_for("login"))
else:
return render_template("authentication-register.html")
@app.route("/logout")
def logout():
session.pop("user_id", None)
session.pop("user_name", None)
return redirect(url_for("login"))
@app.route("/admin")
def admin():
return render_template("admin-dashboard.html", user_name=session.get("user_name"))
@app.route("/admin/Results", methods=["GET", "POST"])
def AdminResults():
if request.method == "POST":
selected_rows = request.form.getlist("selected-rows")
results = session.get("results") # Retrieve the results from the session
# Store the selected_rows in the database using PyMongo
for row_index in selected_rows:
row = results[int(row_index) - 1]
mongo.db.customers.insert_one({"post": row[0], "label": row[1]})
# Redirect to a success page or perform any other actions
return render_template("success.html")
else:
results = model_ir.lr_BS()
session["results"] = results # Store the results in the session
return render_template(
"admin-dashboard.html", results=results, user_name=session.get("user_name")
)
@app.route("/regular_user")
def regular_user():
return render_template("regular_user.html", user_name=session.get("user_name"))
#### BUYER PART #####
@app.route("/buyer.html")
def buyer():
results = get_buyer_posts() # Retrieve buyer posts from the database
return render_template(
"buyer.html", user_name=session.get("user_name"), results=results
)
def get_buyer_posts():
# Retrieve posts from the customers collection with label "Buyer"
results = mongo.db.customers.find({"label": "Buyer"})
# Convert the cursor to a list of dictionaries with post IDs included
posts_with_ids = [
{"_id": str(post["_id"]), "post": post["post"]} for post in results
]
return posts_with_ids
@app.route("/similar_seller_posts/<buyer_post_id>")
def similar_seller_posts(buyer_post_id):
similar_posts = get_similar_seller_posts(buyer_post_id)
buyer_post = mongo.db.customers.find_one({"_id": ObjectId(buyer_post_id)})
return render_template(
"post-display.html",
buyer_post=buyer_post,
user_name=session.get("user_name"),
similar_posts=similar_posts,
)
def get_similar_seller_posts(buyer_post_id):
if buyer_post_id:
buyer_post = mongo.db.customers.find_one({"_id": ObjectId(buyer_post_id)})
if buyer_post:
buyer_attributes = extract_attributes(buyer_post["post"])
similar_seller_posts = content_based_filtering(buyer_attributes)
return similar_seller_posts
# Return an empty list if the buyer post is not found or buyer_post_id is None
return []
def extract_attributes(post):
# Extract relevant attributes from the post for content-based filtering
attributes = {
"content": post, # Use the entire post content as the "content" attribute
}
return attributes
def content_based_filtering(buyer_attributes):
# Retrieve seller posts from the database
seller_posts = mongo.db.customers.find({"label": "Seller"})
# Initialize variables to keep track of the most similar post and its similarity score
most_similar_post = None
highest_similarity_score = -1 # Initialize with a negative value
for post in seller_posts:
seller_attributes = extract_attributes(
post["post"]
) # Extract attributes from the "post" field
similarity_score = compute_similarity(buyer_attributes, seller_attributes)
# Check if the current post has a higher similarity score than the previous highest score
if similarity_score > highest_similarity_score:
highest_similarity_score = similarity_score
most_similar_post = post
if most_similar_post:
return [most_similar_post] # Return the most similar post as a list
else:
return [] # Return an empty list if no similar post is found
def compute_similarity(attributes1, attributes2):
# Convert attributes to feature vectors
vector1 = convert_attributes_to_vector(attributes1).reshape(1, -1)
vector2 = convert_attributes_to_vector(attributes2).reshape(1, -1)
# Compute cosine similarity between the vectors
similarity_score = cosine_similarity(vector1, vector2)[0][0]
return similarity_score
def convert_attributes_to_vector(attributes):
# Define a mapping between attributes and feature indices
feature_indices = {
"content": 0, # Use "content" as the only attribute
}
# Initialize a zero-filled feature vector
vector_length = len(feature_indices)
vector = np.zeros(vector_length)
# TF-IDF Vectorization for the "content" attribute
if "content" in attributes:
content = attributes["content"]
tfidf_vectorizer = TfidfVectorizer()
tfidf_matrix = tfidf_vectorizer.fit_transform([content])
content_vector = tfidf_matrix.toarray()
# Flatten the content vector to a 1D array
content_vector = content_vector.flatten()
# Assign the TF-IDF values to the corresponding feature index
if "content" in feature_indices:
feature_index = feature_indices["content"]
vector[feature_index] = content_vector[
0
] # Assuming only one post in this case
return vector
#### SELLER PART #####
@app.route("/seller.html")
def seller():
results = get_seller_posts() # Retrieve seller posts from the database
return render_template(
"seller.html", user_name=session.get("user_name"), results=results
)
def get_seller_posts():
results = mongo.db.customers.find({"label": "Seller"})
# Convert the cursor to a list of dictionaries with post IDs included
posts_with_ids = [
{"_id": str(post["_id"]), "post": post["post"]} for post in results
]
return posts_with_ids
@app.route("/similar_buyer_posts/<seller_post_id>")
def similar_buyer_posts(seller_post_id):
similar_posts = get_similar_buyer_posts(seller_post_id)
seller_post = mongo.db.customers.find_one({"_id": ObjectId(seller_post_id)})
return render_template(
"post-display-seller.html",
seller_post=seller_post,
user_name=session.get("user_name"),
similar_posts=similar_posts,
)
def get_similar_buyer_posts(seller_post_id):
if seller_post_id:
seller_post = mongo.db.customers.find_one({"_id": ObjectId(seller_post_id)})
if seller_post:
seller_attributes = extract_attributes(seller_post["post"])
similar_buyer_posts = content_based_filtering_seller(seller_attributes)
return similar_buyer_posts
# Return an empty list if the buyer post is not found or seller_post_id is None
return []
def content_based_filtering_seller(seller_attributes):
# Retrieve seller posts from the database
buyer_posts = mongo.db.customers.find({"label": "Buyer"})
# Initialize variables to keep track of the most similar post and its similarity score
most_similar_post = None
highest_similarity_score = -1 # Initialize with a negative value
for post in buyer_posts:
buyer_attributes = extract_attributes(
post["post"]
) # Extract attributes from the "post" field
similarity_score = compute_similarity(seller_attributes, buyer_attributes)
# Check if the current post has a higher similarity score than the previous highest score
if similarity_score > highest_similarity_score:
highest_similarity_score = similarity_score
most_similar_post = post
if most_similar_post:
return [most_similar_post] # Return the most similar post as a list
else:
return [] # Return an empty list if no similar post is found
##### OTHER FUNCTIONS #####
# New endpoint for seller and buyer post counts
@app.route("/post_counts", methods=["GET"])
def post_counts():
seller_count = mongo.db.customers.count_documents({"label": "Seller"})
buyer_count = mongo.db.customers.count_documents({"label": "Buyer"})
return jsonify({"seller_posts": seller_count, "buyer_posts": buyer_count})
if __name__ == "__main__":
try:
with app.app_context():
# Check the database connection before running the application
if test_db_connection():
app.run()
else:
print("Database connection failed.")
except Exception as e:
print(e)