|
from flask import Flask, render_template, request, redirect, url_for, flash, jsonify |
|
from flask_login import UserMixin, LoginManager, login_user, login_required, logout_user, current_user |
|
from flask_wtf.csrf import generate_csrf |
|
from werkzeug.security import generate_password_hash, check_password_hash |
|
from werkzeug.utils import secure_filename |
|
from datetime import datetime |
|
import sqlite3 |
|
import uuid |
|
import os |
|
import asyncio |
|
from telethon import TelegramClient, events |
|
from FastTelethonhelper import fast_download, fast_upload |
|
|
|
|
|
# Raw settings read from the environment; each is None when its variable is
# unset. Presumably these feed the Telethon client imported above -- confirm.
app_id, api_hash, btoken, chnl = (
    os.getenv(name) for name in ("APP_ID", "API_HASH", "BOT", "CHN")
)

# Typed views of the raw values. int() raises TypeError on None, so APP_ID and
# CHN must be set for this module to import; str() turns an unset BOT into the
# literal text "None".
api_id = int(app_id)
bot_token = str(btoken)
channel = int(chnl)
|
|
|
# Flask application object and its static configuration.
app = Flask("Simplz")

# The secret key signs session cookies and CSRF tokens, so it should not live
# in source control. Prefer the SECRET_KEY environment variable; the historical
# placeholder is kept only as a fallback so existing setups keep starting.
app.config['SECRET_KEY'] = os.getenv('SECRET_KEY') or 'your_secret_key'

# Directory that create_post saves uploaded images into.
app.config['UPLOAD_FOLDER'] = 'static/users/uploaded_images'

# Lower-case file extensions accepted for uploaded images.
app.config['ALLOWED_EXTENSIONS'] = {'png', 'jpg', 'jpeg', 'gif'}

# Flask-Login setup: views marked @login_required send anonymous visitors to
# the 'login' endpoint.
login_manager = LoginManager(app)
login_manager.login_view = 'login'
|
|
|
|
|
def get_db():
    """Open a new connection to the application's SQLite database.

    The directory holding the database file is created on demand, because
    ``sqlite3.connect`` creates a missing file but not a missing directory.

    Returns:
        sqlite3.Connection: a connection whose rows are ``sqlite3.Row``
        objects, so columns can be read by name. The caller is responsible
        for closing it.
    """
    db_path = 'instance/database.db'
    os.makedirs(os.path.dirname(db_path), exist_ok=True)

    conn = sqlite3.connect(db_path)
    conn.row_factory = sqlite3.Row
    return conn
|
|
|
def close_db(conn):
    """Close a database connection.

    Args:
        conn: an open connection object exposing ``close()``.
    """
    conn.close()
    return None
|
|
|
|
|
class User(UserMixin):
    """In-memory representation of one row of the ``user`` table.

    Flask-Login's ``UserMixin`` supplies the session-related behaviour; this
    class only carries the column values and the follow-relationship lookup.
    """

    def __init__(self, id, username, email, password):
        """Store the given column values on the instance.

        Args:
            id: primary key of the user row.
            username: the user's login name.
            email: the user's email address.
            password: value of the ``password`` column (register stores a
                hash produced by ``generate_password_hash`` there).
        """
        self.id = id
        self.username = username
        self.email = email
        self.password = password

    def is_following(self, user):
        """Return True when this user follows *user*.

        The follow and unfollow views call this method, so it has to exist on
        the object Flask-Login exposes as ``current_user``.

        Args:
            user: the other user, given as an integer id, a mapping-like row
                with an ``'id'`` key (e.g. ``sqlite3.Row``), or an object with
                an ``id`` attribute.

        Returns:
            bool: whether a matching row exists in ``followers``.
        """
        if isinstance(user, int):
            followed_id = user
        elif hasattr(user, 'keys'):
            followed_id = user['id']
        else:
            followed_id = user.id

        conn = get_db()
        try:
            row = conn.execute(
                'SELECT 1 FROM followers WHERE follower_id = ? AND followed_id = ?',
                (self.id, followed_id),
            ).fetchone()
        finally:
            close_db(conn)
        return row is not None
|
|
|
@login_manager.user_loader
def load_user(user_id):
    """Flask-Login user loader: rebuild a User from its stored id.

    Args:
        user_id: the id to look up in the ``user`` table.

    Returns:
        A ``User`` built from the matching row, or ``None`` when no row
        matches.
    """
    db = get_db()
    row = db.execute('SELECT * FROM user WHERE id = ?', (user_id,)).fetchone()
    close_db(db)

    if not row:
        return None
    return User(row['id'], row['username'], row['email'], row['password'])
|
|
|
@app.route('/')
@login_required
def index():
    """Render the home feed for the logged-in user.

    The feed contains the user's own posts plus the posts of every user they
    follow, newest first (the ORDER BY applies to the whole UNION ALL).

    Returns:
        The rendered ``index.html`` template, given ``posts`` and a freshly
        generated ``csrf_token``.
    """
    conn = get_db()
    try:
        posts = conn.execute('''
            SELECT * FROM post WHERE user_id = ?
            UNION ALL
            SELECT * FROM post
            WHERE user_id IN (SELECT followed_id FROM followers WHERE follower_id = ?)
            ORDER BY created_at DESC
        ''', (current_user.id, current_user.id)).fetchall()
    finally:
        # Close the connection even when the query raises.
        close_db(conn)

    # Generate the token server-side, as search_user does. Reading it from the
    # X-CSRFToken request header echoed a client-supplied value and yielded
    # None on ordinary page loads.
    csrf_token = generate_csrf()
    return render_template('index.html', posts=posts, csrf_token=csrf_token)
|
|
|
@app.route('/register', methods=['GET', 'POST'])
def register():
    """Show the registration form (GET) or create an account (POST).

    On POST the form must provide ``email``, ``username``, ``password`` and
    ``password2``. The account is rejected, with a flashed message, when the
    passwords differ or when the username or email is already taken.

    Returns:
        A redirect to the login page after a successful registration,
        otherwise the rendered ``register.html`` template.
    """
    if request.method != 'POST':
        return render_template('register.html')

    email = request.form['email']
    username = request.form['username']
    password = request.form['password']
    password2 = request.form['password2']

    if password != password2:
        flash('Passwords do not match. Please try again.', 'error')
        return render_template('register.html')

    conn = get_db()
    try:
        existing_user_with_username = conn.execute(
            'SELECT * FROM user WHERE username = ?', (username,)
        ).fetchone()
        existing_user_with_email = conn.execute(
            'SELECT * FROM user WHERE email = ?', (email,)
        ).fetchone()

        if existing_user_with_username:
            flash('Username already exists. Please choose a different username.', 'error')
            return render_template('register.html')

        if existing_user_with_email:
            flash('Email address already registered. Please use a different email.', 'error')
            return render_template('register.html')

        hashed_password = generate_password_hash(password, method='scrypt')
        try:
            conn.execute(
                'INSERT INTO user (username, email, password) VALUES (?, ?, ?)',
                (username, email, hashed_password),
            )
            conn.commit()
        except sqlite3.IntegrityError:
            # A concurrent registration can take the name or email between the
            # lookups above and this insert; the UNIQUE constraints catch it.
            flash('Username or email address already registered. Please try again.', 'error')
            return render_template('register.html')
    finally:
        # Runs on every exit path, including the early returns above.
        close_db(conn)

    flash('Account created successfully! Please log in.', 'success')
    return redirect(url_for('login'))
|
|
|
@app.route('/login', methods=['GET', 'POST'])
def login():
    """Show the login form (GET) or authenticate the visitor (POST).

    Returns:
        A redirect to the index page when the submitted username and password
        match a stored account; otherwise the rendered ``login.html``
        template (with an error flashed after a failed attempt).
    """
    if request.method != 'POST':
        return render_template('login.html')

    username = request.form['username']
    password = request.form['password']

    db = get_db()
    row = db.execute('SELECT * FROM user WHERE username = ?', (username,)).fetchone()
    close_db(db)

    # The stored value is a hash; check_password_hash compares against it.
    if row and check_password_hash(row['password'], password):
        account = User(row['id'], row['username'], row['email'], row['password'])
        login_user(account)
        return redirect(url_for('index'))

    flash('Invalid username or password. Fields are case sensitive.', 'error')
    return render_template('login.html')
|
|
|
@app.route('/logout')
@login_required
def logout():
    """Log the current user out and redirect to the index endpoint."""
    logout_user()
    target = url_for('index')
    return redirect(target)
|
|
|
def allowed_file(filename):
    """Return True when *filename* ends in one of the allowed extensions.

    create_post calls this helper, which the file did not define. The check is
    case-insensitive against ``app.config['ALLOWED_EXTENSIONS']``.
    """
    _, dot, extension = filename.rpartition('.')
    return bool(dot) and extension.lower() in app.config['ALLOWED_EXTENSIONS']


@app.route('/create_post', methods=['POST'])
@login_required
def create_post():
    """Create a post for the logged-in user, optionally with an image.

    The form must provide ``content``; the ``image`` file field is optional.
    An image is stored only when its extension is allowed, under a name made
    of a random UUID followed by the sanitised original filename.

    Returns:
        A redirect to the index page.
    """
    content = request.form['content']
    # .get() so a form without the file field still creates a text-only post
    # instead of failing with 400 Bad Request.
    image = request.files.get('image')

    filename = None
    if image and allowed_file(image.filename):
        filename = str(uuid.uuid4()) + secure_filename(image.filename)
        # The upload directory may not exist yet on a fresh deployment.
        os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)
        image.save(os.path.join(app.config['UPLOAD_FOLDER'], filename))

    conn = get_db()
    try:
        conn.execute(
            'INSERT INTO post (content, user_id, filename) VALUES (?, ?, ?)',
            (content, current_user.id, filename),
        )
        conn.commit()
    finally:
        close_db(conn)
    return redirect(url_for('index'))
|
|
|
@app.route('/delete_post/<int:post_id>', methods=['POST'])
@login_required
def delete_post(post_id):
    """Delete one of the logged-in user's posts, with its image and likes.

    Nothing is deleted when the post does not exist or belongs to someone
    else. A failure to remove the image file is flashed to the user but does
    not stop the database rows from being deleted.

    Args:
        post_id: primary key of the post to delete.

    Returns:
        A redirect to the index page.
    """
    conn = get_db()
    try:
        post = conn.execute('SELECT * FROM post WHERE id = ?', (post_id,)).fetchone()

        if post and post['user_id'] == current_user.id:
            if post['filename']:
                try:
                    os.remove(os.path.join(app.config['UPLOAD_FOLDER'], post['filename']))
                except OSError as e:
                    flash(f"Error deleting image file: {str(e)}", "error")

            conn.execute('DELETE FROM post WHERE id = ?', (post_id,))

            # The schema created at start-up has no `like` table, so only clean
            # up likes when that table exists. Otherwise the statement raises
            # and the post deletion above is never committed.
            like_table = conn.execute(
                "SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = 'like'"
            ).fetchone()
            if like_table:
                conn.execute('DELETE FROM like WHERE post_id = ?', (post_id,))

            conn.commit()
    finally:
        close_db(conn)
    return redirect(url_for('index'))
|
|
|
@app.route('/follow/<int:user_id>', methods=['POST'])
@login_required
def follow(user_id):
    """Make the logged-in user follow the user with id *user_id*.

    Args:
        user_id: primary key of the user to follow.

    Returns:
        A redirect to the 'explore' endpoint when the target user does not
        exist, otherwise a redirect to the target's 'view_profile' page. The
        outcome is reported through a flashed message.
    """
    # NOTE(review): the 'explore' and 'view_profile' endpoints are not defined
    # in the part of the file visible here -- confirm they exist.
    conn = get_db()
    try:
        user_to_follow = conn.execute(
            'SELECT * FROM user WHERE id = ?', (user_id,)
        ).fetchone()

        if user_to_follow is None:
            flash('User not found.', 'error')
            return redirect(url_for('explore'))

        # Query the relationship directly on the open connection; the User
        # class in this file does not define an is_following method.
        already_following = conn.execute(
            'SELECT 1 FROM followers WHERE follower_id = ? AND followed_id = ?',
            (current_user.id, user_id),
        ).fetchone()
        if already_following:
            flash('You are already following this user.', 'info')
            return redirect(url_for('view_profile', user_id=user_id))

        try:
            conn.execute(
                'INSERT INTO followers (follower_id, followed_id) VALUES (?, ?)',
                (current_user.id, user_id),
            )
            conn.commit()
        except sqlite3.Error:
            app.logger.exception('Failed to follow user %s', user_id)
            flash('Failed to follow the user. Please try again.', 'error')
            return redirect(url_for('view_profile', user_id=user_id))

        flash(f"You are now following {user_to_follow['username']}.", 'success')
        return redirect(url_for('view_profile', user_id=user_id))
    finally:
        # Runs on every return path above, so the connection never leaks.
        close_db(conn)
|
|
|
@app.route('/unfollow/<int:user_id>', methods=['POST'])
@login_required
def unfollow(user_id):
    """Make the logged-in user stop following the user with id *user_id*.

    Args:
        user_id: primary key of the user to unfollow.

    Returns:
        A redirect to the index page when the target user does not exist,
        otherwise a redirect to the target's 'view_profile' page. The outcome
        is reported through a flashed message.
    """
    # NOTE(review): the 'view_profile' endpoint is not defined in the part of
    # the file visible here -- confirm it exists.
    conn = get_db()
    try:
        user_to_unfollow = conn.execute(
            'SELECT * FROM user WHERE id = ?', (user_id,)
        ).fetchone()

        if user_to_unfollow is None:
            flash('User not found.', 'danger')
            return redirect(url_for('index'))

        # Query the relationship directly on the open connection; the User
        # class in this file does not define an is_following method.
        currently_following = conn.execute(
            'SELECT 1 FROM followers WHERE follower_id = ? AND followed_id = ?',
            (current_user.id, user_id),
        ).fetchone()

        if currently_following:
            conn.execute(
                'DELETE FROM followers WHERE follower_id = ? AND followed_id = ?',
                (current_user.id, user_id),
            )
            conn.commit()
            flash('You have unfollowed {}.'.format(user_to_unfollow['username']), 'success')
        else:
            flash('You are not following this user.', 'info')
    finally:
        # Also covers the early "not found" return above.
        close_db(conn)

    return redirect(url_for('view_profile', user_id=user_id))
|
|
|
@app.route('/search_user', methods=['GET'])
def search_user():
    """Search users whose username, first name or last name contains a text.

    The text comes from the ``search_query`` query-string parameter and
    defaults to the empty string, which matches every non-NULL value.

    Returns:
        The rendered ``search_results.html`` template, given the matching
        ``users``, a fresh ``csrf_token`` and the original text as ``searchq``.
    """
    search_query = request.args.get('search_query', '')

    # Escape LIKE metacharacters so that '%' and '_' typed by the user match
    # literally instead of acting as wildcards.
    escaped = (
        search_query
        .replace('\\', '\\\\')
        .replace('%', '\\%')
        .replace('_', '\\_')
    )
    pattern = f'%{escaped}%'

    # NOTE(review): first_name and last_name are not part of the CREATE TABLE
    # statement visible in start_flask_app -- confirm the columns exist.
    conn = get_db()
    try:
        users = conn.execute('''
            SELECT * FROM user
            WHERE username LIKE ? ESCAPE '\\'
               OR first_name LIKE ? ESCAPE '\\'
               OR last_name LIKE ? ESCAPE '\\'
        ''', (pattern, pattern, pattern)).fetchall()
    finally:
        close_db(conn)

    csrf_token = generate_csrf()
    return render_template('search_results.html', users=users, csrf_token=csrf_token, searchq=search_query)
|
|
|
|
|
|
|
def start_flask_app(debug=True):
    """Create any missing database tables, then run the Flask server.

    Args:
        debug: passed to ``app.run``. Defaults to True to keep the previous
            behaviour; pass False outside development, because debug mode
            enables Werkzeug's interactive debugger.
    """
    with app.app_context():
        conn = get_db()
        try:
            conn.execute('''CREATE TABLE IF NOT EXISTS user (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                username TEXT UNIQUE,
                email TEXT UNIQUE,
                password TEXT
            )''')

            # search_user filters on first_name and last_name, so make sure
            # both columns exist -- on new and on already-created databases.
            existing_columns = {
                row[1] for row in conn.execute('PRAGMA table_info(user)')
            }
            for column in ('first_name', 'last_name'):
                if column not in existing_columns:
                    conn.execute(f'ALTER TABLE user ADD COLUMN {column} TEXT')

            conn.execute('''CREATE TABLE IF NOT EXISTS post (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                content TEXT,
                created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
                user_id INTEGER,
                filename TEXT,
                FOREIGN KEY (user_id) REFERENCES user (id)
            )''')

            conn.execute('''CREATE TABLE IF NOT EXISTS followers (
                follower_id INTEGER,
                followed_id INTEGER,
                PRIMARY KEY (follower_id, followed_id),
                FOREIGN KEY (follower_id) REFERENCES user (id),
                FOREIGN KEY (followed_id) REFERENCES user (id)
            )''')

            # NOTE(review): delete_post also deletes from a `like` table that
            # is not created here; its full schema is not visible -- confirm
            # where it is defined.
            conn.commit()
        finally:
            close_db(conn)

    app.run(debug=debug)
|
|
|
# Run the server only when this file is executed as a script, not on import.
if __name__ == "__main__":
    start_flask_app()
|
|