# Spaces:
# sahanind
# /
# No application file
#
# simplz / newweb.py
# sahanind's picture
# Create newweb.py
# 7c4a8d6 verified
# raw
# history blame
# 9.21 kB
from flask import Flask, render_template, request, redirect, url_for, flash, jsonify
from flask_login import UserMixin, LoginManager, login_user, login_required, logout_user, current_user
from flask_wtf.csrf import generate_csrf
from werkzeug.security import generate_password_hash, check_password_hash
from werkzeug.utils import secure_filename
from datetime import datetime
import sqlite3
import uuid
import os
import asyncio
from telethon import TelegramClient, events
from FastTelethonhelper import fast_download, fast_upload
# Configuration
# Credentials are read from the environment. They are not used by the code
# visible in this part of the file; presumably they feed the Telethon client
# imported above -- TODO confirm.
app_id = os.getenv("APP_ID")
api_hash = os.getenv("API_HASH")
btoken = os.getenv("BOT")
chnl = os.getenv("CHN")
# NOTE: int(None) raises TypeError, so APP_ID and CHN must be set before this
# module is imported. str(None) does not raise: an unset BOT silently becomes
# the literal string "None".
api_id = int(app_id)
bot_token = str(btoken)
channel = int(chnl)

app = Flask("Simplz")
# The session-signing key should never live in source control. Take it from
# the SECRET_KEY environment variable when provided; the historical
# placeholder is kept only as a fallback so existing setups keep starting.
app.config['SECRET_KEY'] = os.getenv('SECRET_KEY', 'your_secret_key')
app.config['UPLOAD_FOLDER'] = 'static/users/uploaded_images'
app.config['ALLOWED_EXTENSIONS'] = {'png', 'jpg', 'jpeg', 'gif'}

login_manager = LoginManager(app)
# Name of the view that handles sign-in for Flask-Login.
login_manager.login_view = 'login'
# Database functions
def get_db():
    """Open a connection to the application's SQLite database.

    The directory holding the database file is created on demand:
    ``sqlite3.connect`` can create the file itself but not a missing parent
    directory, so a fresh checkout would otherwise fail with
    ``sqlite3.OperationalError``.

    Returns:
        sqlite3.Connection: a connection whose rows are ``sqlite3.Row``
        objects, so columns can be read by name. The caller is responsible
        for closing it.
    """
    db_path = os.path.join('instance', 'database.db')
    os.makedirs(os.path.dirname(db_path), exist_ok=True)
    conn = sqlite3.connect(db_path)
    conn.row_factory = sqlite3.Row
    return conn
def close_db(conn):
    """Close a database connection.

    Args:
        conn: the connection object to close; its ``close()`` method is
            called and nothing is returned.
    """
    conn.close()
# User model
class User(UserMixin):
    """In-memory representation of one row of the ``user`` table.

    Inherits from Flask-Login's ``UserMixin`` so instances can be passed to
    ``login_user`` and returned from the user loader.
    """

    def __init__(self, id, username, email, password):
        """Store the user's columns as attributes.

        Args:
            id: primary key of the row (parameter name kept for callers).
            username: the account's username.
            email: the account's email address.
            password: the stored password value (the hash written at
                registration).
        """
        self.id = id
        self.username = username
        self.email = email
        self.password = password

    def is_following(self, user):
        """Tell whether this user follows ``user``.

        The follow/unfollow routes call this method on ``current_user``;
        without it they fail with ``AttributeError``.

        Args:
            user: the account to check. Either an object exposing ``.id``
                or a mapping/row that can be indexed with ``'id'``.

        Returns:
            bool: True when a matching row exists in ``followers``.
        """
        followed_id = user.id if hasattr(user, 'id') else user['id']
        conn = get_db()
        try:
            row = conn.execute(
                'SELECT 1 FROM followers WHERE follower_id = ? AND followed_id = ?',
                (self.id, followed_id),
            ).fetchone()
        finally:
            close_db(conn)
        return row is not None
@login_manager.user_loader
def load_user(user_id):
    """Flask-Login user loader: look a user up by primary key.

    Args:
        user_id: identifier handed over by Flask-Login.

    Returns:
        A ``User`` built from the matching ``user`` row, or ``None`` when no
        row has that id.
    """
    connection = get_db()
    row = connection.execute('SELECT * FROM user WHERE id = ?', (user_id,)).fetchone()
    close_db(connection)
    if not row:
        return None
    return User(row['id'], row['username'], row['email'], row['password'])
@app.route('/')
@login_required
def index():
    """Render the home feed for the logged-in user.

    The feed contains the user's own posts plus the posts of every account
    they follow, newest first.

    Returns:
        The rendered ``index.html`` template, given ``posts`` and a
        ``csrf_token``.
    """
    conn = get_db()
    try:
        # A single OR query instead of UNION ALL: a user who follows their
        # own account would otherwise see each of their posts twice.
        posts = conn.execute('''
            SELECT * FROM post
            WHERE user_id = ?
               OR user_id IN (SELECT followed_id FROM followers WHERE follower_id = ?)
            ORDER BY created_at DESC
        ''', (current_user.id, current_user.id)).fetchall()
    finally:
        # Release the connection even when the query raises.
        close_db(conn)
    # Issue a token the same way search_user does. Reading the X-CSRFToken
    # request header, as before, yields None on an ordinary page load.
    csrf_token = generate_csrf()
    return render_template('index.html', posts=posts, csrf_token=csrf_token)
@app.route('/register', methods=['GET', 'POST'])
def register():
    """Show the registration form (GET) or create a new account (POST).

    On POST the form must supply ``email``, ``username``, ``password`` and
    ``password2``. The account is created only when both passwords match and
    neither the username nor the email is already taken; otherwise the form
    is re-rendered with a flashed error.

    Returns:
        The rendered ``register.html`` template, or a redirect to the login
        view after a successful registration.
    """
    if request.method != 'POST':
        return render_template('register.html')

    email = request.form['email']
    username = request.form['username']
    password = request.form['password']
    password2 = request.form['password2']

    if password != password2:
        flash('Passwords do not match. Please try again.', 'error')
        return render_template('register.html')

    conn = get_db()
    try:
        # try/finally guarantees the connection is closed on the early
        # returns below, which previously leaked it.
        if conn.execute('SELECT 1 FROM user WHERE username = ?', (username,)).fetchone():
            flash('Username already exists. Please choose a different username.', 'error')
            return render_template('register.html')
        if conn.execute('SELECT 1 FROM user WHERE email = ?', (email,)).fetchone():
            flash('Email address already registered. Please use a different email.', 'error')
            return render_template('register.html')

        hashed_password = generate_password_hash(password, method='scrypt')
        try:
            conn.execute(
                'INSERT INTO user (username, email, password) VALUES (?, ?, ?)',
                (username, email, hashed_password),
            )
            conn.commit()
        except sqlite3.IntegrityError:
            # A concurrent registration can claim the name or email between
            # the checks above and this insert; the UNIQUE constraints then
            # reject the row.
            flash('Username or email already registered. Please try again.', 'error')
            return render_template('register.html')
    finally:
        close_db(conn)

    flash('Account created successfully! Please log in.', 'success')
    return redirect(url_for('login'))
@app.route('/login', methods=['GET', 'POST'])
def login():
    """Show the login form (GET) or authenticate the submitted credentials (POST).

    Returns:
        A redirect to the index view when the username exists and the
        password matches its stored hash; otherwise the rendered
        ``login.html`` template (with a flashed error after a failed POST).
    """
    if request.method != 'POST':
        return render_template('login.html')

    username = request.form['username']
    password = request.form['password']

    connection = get_db()
    row = connection.execute('SELECT * FROM user WHERE username = ?', (username,)).fetchone()
    close_db(connection)

    # Reject unknown usernames and wrong passwords with the same message.
    if not row or not check_password_hash(row['password'], password):
        flash('Invalid username or password. Fields are case sensitive.', 'error')
        return render_template('login.html')

    account = User(row['id'], row['username'], row['email'], row['password'])
    login_user(account)
    return redirect(url_for('index'))
@app.route('/logout')
@login_required
def logout():
    """Log the current user out and redirect to the index view."""
    logout_user()
    destination = url_for('index')
    return redirect(destination)
@app.route('/create_post', methods=['POST'])
@login_required
def create_post():
    """Create a post for the logged-in user, optionally with an image.

    The form must supply ``content``. An uploaded ``image`` is stored in the
    configured upload folder under a UUID-prefixed sanitized name when
    ``allowed_file`` accepts its filename; otherwise the post is saved
    without an image.

    Returns:
        A redirect to the index view.
    """
    content = request.form['content']
    # .get(): the image is optional, so a request without the file part must
    # not be rejected with a 400 by the key lookup.
    image = request.files.get('image')

    filename = None
    # NOTE(review): allowed_file is not defined in the visible part of this
    # file; it is assumed to exist elsewhere in the module.
    if image and allowed_file(image.filename):
        filename = str(uuid.uuid4()) + secure_filename(image.filename)
        upload_dir = app.config['UPLOAD_FOLDER']
        # The upload folder may not exist yet on a fresh deployment.
        os.makedirs(upload_dir, exist_ok=True)
        image.save(os.path.join(upload_dir, filename))

    conn = get_db()
    try:
        conn.execute(
            'INSERT INTO post (content, user_id, filename) VALUES (?, ?, ?)',
            (content, current_user.id, filename),
        )
        conn.commit()
    finally:
        # Release the connection even when the insert raises.
        close_db(conn)
    return redirect(url_for('index'))
@app.route('/delete_post/<int:post_id>', methods=['POST'])
@login_required
def delete_post(post_id):
    """Delete one of the logged-in user's posts, its likes and its image.

    Nothing is deleted when the post does not exist or belongs to another
    user; the request is redirected either way.

    Args:
        post_id: primary key of the post, taken from the URL.

    Returns:
        A redirect to the index view.
    """
    conn = get_db()
    try:
        post = conn.execute('SELECT * FROM post WHERE id = ?', (post_id,)).fetchone()
        if post and post['user_id'] == current_user.id:
            # Remove the database rows first, so a failing statement cannot
            # leave a post that points at an already-deleted image.
            conn.execute('DELETE FROM like WHERE post_id = ?', (post_id,))
            conn.execute('DELETE FROM post WHERE id = ?', (post_id,))
            conn.commit()
            if post['filename']:
                try:
                    os.remove(os.path.join(app.config['UPLOAD_FOLDER'], post['filename']))
                except OSError as e:
                    # Best effort: a missing or locked file should not undo
                    # the deletion, but the user is told about it.
                    flash(f"Error deleting image file: {str(e)}", "error")
    finally:
        # Release the connection even when a statement raises.
        close_db(conn)
    return redirect(url_for('index'))
@app.route('/follow/<int:user_id>', methods=['POST'])
@login_required
def follow(user_id):
    """Make the logged-in user follow the account with id ``user_id``.

    Args:
        user_id: primary key of the account to follow, taken from the URL.

    Returns:
        A redirect to the ``explore`` view when the account does not exist,
        otherwise a redirect to that account's profile. The outcome is
        reported through a flashed message.
    """
    conn = get_db()
    try:
        # try/finally closes the connection on every exit path; the early
        # returns below previously leaked it.
        user_to_follow = conn.execute('SELECT * FROM user WHERE id = ?', (user_id,)).fetchone()
        if user_to_follow is None:
            flash('User not found.', 'error')
            return redirect(url_for('explore'))

        # Query the followers table directly on the open connection, so this
        # route does not depend on a helper method of the user object.
        already_following = conn.execute(
            'SELECT 1 FROM followers WHERE follower_id = ? AND followed_id = ?',
            (current_user.id, user_id),
        ).fetchone() is not None
        if already_following:
            flash('You are already following this user.', 'info')
            return redirect(url_for('view_profile', user_id=user_id))

        try:
            conn.execute(
                'INSERT INTO followers (follower_id, followed_id) VALUES (?, ?)',
                (current_user.id, user_id),
            )
            conn.commit()
        except sqlite3.Error:
            flash('Failed to follow the user. Please try again.', 'error')
            # Log with traceback through the application logger, not print.
            app.logger.exception('Failed to follow user %s', user_id)
            return redirect(url_for('view_profile', user_id=user_id))

        flash(f"You are now following {user_to_follow['username']}.", 'success')
    finally:
        close_db(conn)
    return redirect(url_for('view_profile', user_id=user_id))
@app.route('/unfollow/<int:user_id>', methods=['POST'])
@login_required
def unfollow(user_id):
    """Make the logged-in user stop following the account with id ``user_id``.

    Args:
        user_id: primary key of the account to unfollow, taken from the URL.

    Returns:
        A redirect to the index view when the account does not exist,
        otherwise a redirect to that account's profile. The outcome is
        reported through a flashed message.
    """
    conn = get_db()
    try:
        # try/finally closes the connection on every exit path; the
        # "not found" return previously leaked it.
        user_to_unfollow = conn.execute('SELECT * FROM user WHERE id = ?', (user_id,)).fetchone()
        if user_to_unfollow is None:
            flash('User not found.', 'danger')
            return redirect(url_for('index'))

        # Query the followers table directly on the open connection, so this
        # route does not depend on a helper method of the user object.
        currently_following = conn.execute(
            'SELECT 1 FROM followers WHERE follower_id = ? AND followed_id = ?',
            (current_user.id, user_id),
        ).fetchone() is not None

        if currently_following:
            conn.execute(
                'DELETE FROM followers WHERE follower_id = ? AND followed_id = ?',
                (current_user.id, user_id),
            )
            conn.commit()
            flash(f"You have unfollowed {user_to_unfollow['username']}.", 'success')
        else:
            flash('You are not following this user.', 'info')
    finally:
        close_db(conn)
    return redirect(url_for('view_profile', user_id=user_id))
@app.route('/search_user', methods=['GET'])
def search_user():
    """Search users whose name fields contain the ``search_query`` argument.

    The username is matched, and also ``first_name`` / ``last_name`` when the
    ``user`` table has those columns. The schema created by this file does
    not define them, and referencing a missing column makes SQLite raise
    "no such column".

    Returns:
        The rendered ``search_results.html`` template, given the matching
        ``users``, a fresh ``csrf_token`` and the original query as
        ``searchq``.
    """
    search_query = request.args.get('search_query', '')
    pattern = f'%{search_query}%'

    conn = get_db()
    try:
        existing_columns = {col['name'] for col in conn.execute('PRAGMA table_info(user)')}
        # Column names come from this fixed tuple, never from the request,
        # so interpolating them into the statement is safe.
        searchable = [
            name for name in ('username', 'first_name', 'last_name')
            if name in existing_columns
        ]
        if searchable:
            where_clause = ' OR '.join(f'{name} LIKE ?' for name in searchable)
            users = conn.execute(
                f'SELECT * FROM user WHERE {where_clause}',
                [pattern] * len(searchable),
            ).fetchall()
        else:
            # No searchable column (e.g. table not created yet): no results.
            users = []
    finally:
        # Release the connection even when the query raises.
        close_db(conn)

    csrf_token = generate_csrf()
    return render_template('search_results.html', users=users, csrf_token=csrf_token, searchq=search_query)
# Other routes and functions remain mostly unchanged
def start_flask_app():
    """Create the database tables if needed and start the Flask server.

    Creates the ``user``, ``post`` and ``followers`` tables when they do not
    exist yet, then runs the application.
    """
    with app.app_context():
        # Ensure database and tables exist
        # sqlite3 cannot create the parent directory of the database file,
        # so make sure it is there on a fresh checkout.
        os.makedirs('instance', exist_ok=True)
        conn = get_db()
        try:
            conn.execute('''CREATE TABLE IF NOT EXISTS user (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                username TEXT UNIQUE,
                email TEXT UNIQUE,
                password TEXT
            )''')
            conn.execute('''CREATE TABLE IF NOT EXISTS post (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                content TEXT,
                created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
                user_id INTEGER,
                filename TEXT,
                FOREIGN KEY (user_id) REFERENCES user (id)
            )''')
            conn.execute('''CREATE TABLE IF NOT EXISTS followers (
                follower_id INTEGER,
                followed_id INTEGER,
                PRIMARY KEY (follower_id, followed_id),
                FOREIGN KEY (follower_id) REFERENCES user (id),
                FOREIGN KEY (followed_id) REFERENCES user (id)
            )''')
        finally:
            # Release the connection even when a statement raises.
            close_db(conn)
        # NOTE(review): debug=True enables Werkzeug's interactive debugger,
        # which must not be exposed on a publicly reachable deployment.
        # Left unchanged here; confirm this entry point is development-only.
        app.run(debug=True)


if __name__ == '__main__':
    start_flask_app()