test111 / scripts /db_helper.py
killbill007's picture
Upload 1398 files
35cdf61 verified
import psycopg2
from psycopg2 import sql
import uuid
from datetime import datetime, timedelta
db_params = {
'dbname': 'API_DB',
'user': 'postgres',
'password': '4b95dfe8-4644-46ce-a4fe-648d6d4860a4',
'host': 'localhost',
'port': '5432'
}
# conn = psycopg2.connect(**db_params)
# conn.autocommit = True
# cur = conn.cursor()
#user db
def insert_user(db_params ,uid: uuid.UUID, paid: bool, first_name: str, last_name: str, email: str, password: str):
if uid is None or paid is None:
return False
insert_query = """
INSERT INTO users (id, paid, first_name, last_name, email, dor, la, password)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s);
"""
try:
current_time = datetime.now()
conn = psycopg2.connect(**db_params)
cur = conn.cursor()
cur.execute(insert_query, (str(uid), paid, first_name, last_name, email, current_time, current_time, password))
conn.commit()
return True
except Exception as e:
print(f"Error: {e}")
return False
finally:
if cur:
cur.close()
if conn:
conn.close()
def user_exists(db_params, user_id: uuid.UUID) -> bool:
try:
conn = psycopg2.connect(**db_params)
cur = conn.cursor()
query = "SELECT EXISTS (SELECT 1 FROM users WHERE id = %s);"
cur.execute(query, (str(user_id),))
result = cur.fetchone()
if result:
exists = result[0]
return exists
else:
return False
except Exception as e:
print(f"Error checking if user exists: {e}")
return False
finally:
if cur:
cur.close()
if conn:
conn.close()
def has_user_paid(db_params, user_id: uuid.UUID) -> bool:
try:
conn = psycopg2.connect(**db_params)
cur = conn.cursor()
query = "SELECT paid FROM users WHERE id = %s;"
cur.execute(query, (str(user_id),))
result = cur.fetchone()
if result:
paid_status = result[0]
return paid_status
else:
return False
except Exception as e:
print(f"Error checking if user has paid: {e}")
return False
finally:
if cur:
cur.close()
if conn:
conn.close()
def get_user_id_by_email(db_params, email: str) -> uuid.UUID:
try:
conn = psycopg2.connect(**db_params)
cur = conn.cursor()
query = "SELECT id FROM users WHERE email = %s;"
cur.execute(query, (email,))
result = cur.fetchone()
if result:
user_id = result[0]
return uuid.UUID(user_id)
else:
return None
except Exception as e:
print(f"Error retrieving user ID by email: {e}")
return None
finally:
if cur:
cur.close()
if conn:
conn.close()
def user_change_password(db_params, email: str, old_password: str, new_password: str) -> bool:
try:
user_id = get_user_id_by_email(db_params, email)
if user_id is None:
return False
conn = psycopg2.connect(**db_params)
cur = conn.cursor()
query = "SELECT password FROM users WHERE id = %s;"
cur.execute(query, (str(user_id),))
result = cur.fetchone()
if result:
current_password = result[0]
if current_password == old_password:
update_query = "UPDATE users SET password = %s WHERE id = %s;"
cur.execute(update_query, (new_password, str(user_id)))
conn.commit()
return True
else:
return False
else:
return False
except Exception as e:
print(f"Error changing password: {e}")
return False
finally:
if cur:
cur.close()
if conn:
conn.close()
def update_user_paid_state(db_params, email: str = None, user_id: uuid.UUID = None, paid_state: bool = False) -> bool:
if email is None and user_id is None:
print("Error: Either email or user ID must be provided.")
return False
if email is not None:
user_id = get_user_id_by_email(db_params, email)
if user_id is None:
return False
try:
conn = psycopg2.connect(**db_params)
cur = conn.cursor()
update_query = "UPDATE users SET paid = %s WHERE id = %s;"
cur.execute(update_query, (paid_state, str(user_id)))
conn.commit()
if cur.rowcount > 0:
return True
else:
return False
except Exception as e:
print(f"Error updating paid state: {e}")
return False
finally:
if cur:
cur.close()
if conn:
conn.close()
def delete_user_by_id(db_params, user_id: uuid.UUID) -> bool:
try:
conn = psycopg2.connect(**db_params)
cur = conn.cursor()
delete_query = "DELETE FROM users WHERE id = %s;"
cur.execute(delete_query, (str(user_id),))
conn.commit()
if cur.rowcount > 0:
return True
else:
return False
except Exception as e:
print(f"Error deleting user by ID: {e}")
return False
finally:
if cur:
cur.close()
if conn:
conn.close()
def update_last_activity(db_params, email: str = None, user_id: uuid.UUID = None) -> bool:
if email is None and user_id is None:
print("Error: Either email or user ID must be provided.")
return False
if email is not None:
user_id = get_user_id_by_email(db_params, email)
if user_id is None:
return False
try:
conn = psycopg2.connect(**db_params)
cur = conn.cursor()
current_time = datetime.now()
update_query = "UPDATE users SET la = %s WHERE id = %s;"
cur.execute(update_query, (current_time, str(user_id)))
conn.commit()
if cur.rowcount > 0:
return True
else:
return False
except Exception as e:
print(f"Error updating last activity: {e}")
return False
finally:
if cur:
cur.close()
if conn:
conn.close()
#request table
def insert_request(db_params, request_id: uuid.UUID, video_id: uuid.UUID, user_id: uuid.UUID, image_base64: str, public: bool, report: bool,
request_service: str, registration_token: str) -> bool:
if not user_exists(db_params, user_id):
print("Error: User does not exist.")
return False
paid = has_user_paid(db_params, user_id)
insert_query = """
INSERT INTO request (id, video_id, user_id, image_base64, status, tor, toc, public, paid, report, request_service, registration_token)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
"""
conn = None
cur = None
try:
current_time = datetime.now()
conn = psycopg2.connect(**db_params)
cur = conn.cursor()
cur.execute(insert_query, (
str(request_id),
str(video_id),
str(user_id),
image_base64,
'Requested',
current_time,
None,
public,
paid,
report,
request_service,
registration_token
))
conn.commit()
if cur.rowcount > 0:
return True
else:
return False
except Exception as e:
print(f"Error inserting request: {e}")
return False
finally:
if cur:
cur.close()
if conn:
conn.close()
def get_request_data(db_params, request_id: uuid.UUID):
select_query = """
SELECT image_base64, video_id, request_service, registration_token
FROM request
WHERE id = %s;
"""
try:
conn = psycopg2.connect(**db_params)
cur = conn.cursor()
cur.execute(select_query, (str(request_id),))
result = cur.fetchone()
if result is None:
return None
image_base64 = result[0]
video_id = result[1]
request_service = result[2]
registration_token = result[3]
return [image_base64, video_id, request_service, registration_token]
except Exception as e:
print(f"Error retrieving image and video: {e}")
return None
finally:
if cur:
cur.close()
if conn:
conn.close()
def update_request_status(db_params, request_id: uuid.UUID, new_status: str) -> bool:
update_query = """
UPDATE request
SET status = %s,
dor = %s
WHERE id = %s;
"""
conn = None
cur = None
try:
current_time = datetime.now()
conn = psycopg2.connect(**db_params)
cur = conn.cursor()
cur.execute(update_query, (new_status, current_time, str(request_id)))
conn.commit()
if cur.rowcount > 0:
return True
else:
return False
except Exception as e:
print(f"Error updating request status: {e}")
return False
finally:
if cur:
cur.close()
if conn:
conn.close()
def check_and_update_request(db_params, request_service: str, video_id: uuid.UUID, image_base64: str, registration_token: str):
five_minutes_ago = datetime.now() - timedelta(minutes=5)
check_query = """
SELECT id, dor
FROM request
WHERE video_id = %s
AND image_base64 = %s
AND created_at >= %s
AND status = 'Requested'
AND request_service = %s
"""
try:
conn = psycopg2.connect(**db_params)
cur = conn.cursor()
cur.execute(check_query, (str(video_id), image_base64, five_minutes_ago, request_service))
existing_request = cur.fetchone()
if existing_request:
existing_request_id, dor, existing_token, existing_ls = existing_request
update_query = """
UPDATE request
SET dor = %s,
registration_token = %s,
ls = %s
WHERE id = %s
"""
cur.execute(update_query, (datetime.now(), registration_token, datetime.now(), str(existing_request_id)))
conn.commit()
return existing_request_id
return None
except Exception as e:
print(f"Error checking or updating request: {e}")
return None
finally:
if cur:
cur.close()
if conn:
conn.close()
def update_reported_status(db_params, request_id: uuid.UUID) -> bool:
update_query = """
UPDATE request
SET report = TRUE
WHERE id = %s;
"""
try:
conn = psycopg2.connect(**db_params)
cur = conn.cursor()
cur.execute(update_query, (str(request_id),))
conn.commit()
if cur.rowcount > 0:
return True
else:
return False
except Exception as e:
print(f"Error updating reported status: {e}")
return False
finally:
if cur:
cur.close()
if conn:
conn.close()
def request_id_exists(db_params, request_id: uuid.UUID) -> bool:
check_query = """
SELECT EXISTS (
SELECT 1
FROM request
WHERE id = %s
);
"""
try:
conn = psycopg2.connect(**db_params)
cur = conn.cursor()
cur.execute(check_query, (str(request_id),))
exists = cur.fetchone()[0]
return exists
except Exception as e:
print(f"Error checking if request ID exists: {e}")
return False
finally:
if cur:
cur.close()
if conn:
conn.close()
def delete_request(db_params, request_id: uuid.UUID) -> bool:
delete_query = """
DELETE FROM request
WHERE id = %s;
"""
try:
conn = psycopg2.connect(**db_params)
cur = conn.cursor()
cur.execute(delete_query, (str(request_id),))
conn.commit()
if cur.rowcount > 0:
return True
else:
return False
except Exception as e:
print(f"Error deleting request: {e}")
return False
finally:
if cur:
cur.close()
if conn:
conn.close()
def update_request_status(db_params, request_id: uuid.UUID, new_status: str) -> bool:
update_query = """
UPDATE request
SET status = %s
WHERE id = %s;
"""
try:
conn = psycopg2.connect(**db_params)
cur = conn.cursor()
cur.execute(update_query, (new_status, str(request_id)))
conn.commit()
if cur.rowcount > 0:
return True
else:
return False
except Exception as e:
print(f"Error updating request status: {e}")
return False
finally:
if cur:
cur.close()
if conn:
conn.close()
def get_oldest_requests(db_params, limit: int):
select_query = """
SELECT id, video_id, image_base64, tor, paid
FROM request
WHERE status = 'Requested'-- AND paid = True
ORDER BY CASE WHEN paid THEN 0 ELSE 1 END, tor ASC
LIMIT %s;
"""
try:
conn = psycopg2.connect(**db_params)
cur = conn.cursor()
cur.execute(select_query, (limit,))
rows = cur.fetchall()
if rows:
return rows
else:
return None
except Exception as e:
print(f"Error retrieving oldest requests: {e}")
return None
finally:
if cur:
cur.close()
if conn:
conn.close()
def request_remove_image(db_params, request_id: uuid.UUID) -> bool:
check_query = """
SELECT image_base64
FROM request
WHERE id = %s;
"""
update_query = """
UPDATE request
SET image_base64 = 'image_placeholder'
WHERE id = %s AND image_base64 IS NOT NULL AND image_base64 <> '';
"""
try:
conn = psycopg2.connect(**db_params)
cur = conn.cursor()
cur.execute(check_query, (str(request_id),))
result = cur.fetchone()
if result is None or result[0] in (None, ''):
return False
cur.execute(update_query, (str(request_id),))
conn.commit()
if cur.rowcount > 0:
return True
else:
return False
except Exception as e:
print(f"Error updating image_base64: {e}")
return False
finally:
if cur:
cur.close()
if conn:
conn.close()
# ban list
def insert_ban(db_params, ip: str, reason: str) -> bool:
try:
conn = psycopg2.connect(**db_params)
cursor = conn.cursor()
cursor.execute('''INSERT INTO ban_list (ip, reason) VALUES (%s, %s)''', (ip, reason))
conn.commit()
return True
except Exception as e:
print(f"Error inserting ban: {e}")
return False
finally:
if cursor:
cursor.close()
if conn:
conn.close()
def search_ban(db_params, ip: str) -> bool:
try:
conn = psycopg2.connect(**db_params)
cursor = conn.cursor()
cursor.execute('''SELECT * FROM ban_list WHERE ip = %s''', (ip,))
rows = cursor.fetchall()
return len(rows) > 0
except Exception as e:
print(f"Error searching ban list: {e}")
return False
finally:
if cursor:
cursor.close()
if conn:
conn.close()
#content
def get_videos_data(db_params):
conn = psycopg2.connect(**db_params)
cursor = conn.cursor()
try:
cursor.execute('''
SELECT id, title, backend_video_url, backend_thumbnail_url
FROM videos
ORDER BY created_at DESC
LIMIT 4
''')
latest_videos = cursor.fetchall()
cursor.execute('''
SELECT id, title, backend_video_url, backend_thumbnail_url
FROM videos
ORDER BY likes DESC
LIMIT 4
''')
trending_videos = cursor.fetchall()
cursor.execute('''
SELECT id, title, backend_video_url, backend_thumbnail_url
FROM videos
ORDER BY hits DESC
LIMIT 4
''')
hot_videos = cursor.fetchall()
cursor.execute('''
SELECT DISTINCT category FROM videos
''')
categories = cursor.fetchall()
def format_video(video):
return {
"id": video[0],
"url": video[2],
"title": video[1],
"info": "",
"thumbnail": video[3]
}
response = []
latest_category = {
"id": "latest",
"title": "Latest",
"urls": [format_video(video) for video in latest_videos]
}
trending_category = {
"id": "trending",
"title": "Trending",
"urls": [format_video(video) for video in trending_videos]
}
hot_category = {
"id": "hot",
"title": "Hot",
"urls": [format_video(video) for video in hot_videos]
}
response.extend([trending_category, latest_category, hot_category])
for category in categories:
category_name = category[0]
cursor.execute('''
SELECT id, title, backend_video_url, backend_thumbnail_url
FROM videos
WHERE category = %s
''', (category_name,))
videos_in_category = cursor.fetchall()
category_data = {
"id": category_name,
"title": category_name,
"urls": [format_video(video) for video in videos_in_category]
}
response.append(category_data)
except Exception as e:
print(f"An error occurred: {e}")
finally:
if cursor is not None:
cursor.close()
if conn is not None:
conn.close()
return response
def video_id_exists(db_params, video_id: uuid.UUID) -> bool:
check_query = """
SELECT EXISTS (
SELECT 1
FROM videos
WHERE id = %s
);
"""
try:
conn = psycopg2.connect(**db_params)
cur = conn.cursor()
cur.execute(check_query, (str(video_id),))
exists = cur.fetchone()[0]
return exists
except Exception as e:
print(f"Error checking if video ID exists: {e}")
return False
finally:
if cur:
cur.close()
if conn:
conn.close()
def increment_likes(db_params, video_id: uuid.UUID) -> bool:
increment_query = """
UPDATE videos
SET likes = likes + 1
WHERE id = %s;
"""
try:
conn = psycopg2.connect(**db_params)
cur = conn.cursor()
cur.execute(increment_query, (str(video_id),))
conn.commit()
if cur.rowcount > 0:
return True
else:
return False
except Exception as e:
print(f"Error incrementing likes: {e}")
return False
finally:
if cur:
cur.close()
if conn:
conn.close()
def increment_dislikes(db_params, video_id: uuid.UUID) -> bool:
increment_query = """
UPDATE videos
SET dislikes = dislikes + 1
WHERE id = %s;
"""
try:
conn = psycopg2.connect(**db_params)
cur = conn.cursor()
cur.execute(increment_query, (str(video_id),))
conn.commit()
if cur.rowcount > 0:
return True
else:
return False
except Exception as e:
print(f"Error incrementing dislikes: {e}")
return False
finally:
if cur:
cur.close()
if conn:
conn.close()