Spaces:
Running
Running
import os | |
import random | |
import uuid | |
import time | |
import json | |
import base64 | |
import asyncio | |
import logging | |
import atexit | |
import numpy as np | |
import requests | |
from flask import Flask, render_template, request, redirect | |
from threading import Thread | |
from pymongo import MongoClient | |
from google.oauth2.credentials import Credentials | |
from google.auth.transport.requests import Request | |
from googleapiclient.discovery import build | |
from googleapiclient.http import MediaFileUpload | |
from moviepy.editor import VideoFileClip, ColorClip, CompositeVideoClip, ImageClip | |
from moviepy.video.fx import resize | |
from moviepy.editor import VideoFileClip, CompositeVideoClip, ColorClip, ImageClip, TextClip | |
from PIL import Image, ImageDraw, ImageFont | |
from datetime import datetime, timedelta, timezone | |
from moviepy.editor import * | |
import builtins | |
import logging | |
import re | |
import builtins | |
import re | |
import subprocess | |
# paste your full cookie JSON here | |
raw_cookies = [ | |
{"name": "datr", "value": "ejpyaDZsc2mYbftSKZ3Sk2Lu"}, | |
{"name": "ds_user_id", "value": "75543047114"}, | |
{"name": "csrftoken", "value": "UCHoYMciHF3W2HynHCA3fBEZfJd1beB5"}, | |
{"name": "ig_did", "value": "E6AD4B02-7FC5-4C36-9570-7AE89F1A85D3"}, | |
{"name": "wd", "value": "468x935"}, | |
{"name": "mid", "value": "aHI6egABAAEk7DQNY7Nj1qArGgnq"}, | |
{"name": "sessionid", "value": "75543047114%3AmWa1OwxIcbgqMd%3A6%3AAYcaKZIghR6DJJsEZSZjsunYklzQ4jcTREcsdBkIrA"}, | |
{"name": "dpr", "value": "2.608695652173913"}, | |
{"name": "rur", "value": "\"NHA\\05475543047114\\0541783852609:01feb3e34e17329d37efbc1dfbc7d6c7cbdfd7496e0c5ea0f5e76a3125f0dda6e0b45e18\""} | |
] | |
# Convert to requests cookie dict | |
cookies = {c['name']: c['value'] for c in raw_cookies} | |
#====== LOGGING SETUP ====== | |
logging.basicConfig( | |
level=logging.INFO, | |
format="%(asctime)s - %(levelname)s - %(message)s", | |
handlers=[ | |
logging.FileHandler("app.log"), | |
logging.StreamHandler() | |
] | |
) | |
logger = logging.getLogger(__name__) | |
# ====== FLASK APP ====== | |
app = Flask(__name__) | |
# ====== MONGO DB ====== | |
client = MongoClient(os.getenv("MONGO_URI", "mongodb+srv://kanhagarg930123:kanha@kanha.jhlzb3k.mongodb.net/?retryWrites=true&w=majority&appName=kanha")) | |
db1 = client.shortttt | |
meta = db1.meta | |
db2 = client["instagram_bot"] | |
reel_progress = db2["reel_progresss"] | |
# ====== CONSTANTS ====== | |
CAPTIONS = [ | |
"Wait for it π", "Watch till end π", "Try not to laugh π€£", | |
"Don't skip this π₯", "You won't expect this! π", "Keep watching π", | |
"Stay till end! π₯", "Funniest one yet" | |
] | |
BLOCKLIST = [ | |
"nsfw","18+", "xxx", "sexy", "adult", "porn", "onlyfans", "escort", | |
"betting", "gambling", "iplwin", "1xbet", "winzo", "my11circle", "dream11", | |
"rummy", "teenpatti", "fantasy", "casino", "promotion" | |
] | |
UPLOAD_TIMES = [] | |
NEXT_RESET = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0) + timedelta(days=1) | |
# ====== FUNCTIONS ====== | |
def get_next_part(): | |
last = meta.find_one(sort=[("part", -1)]) | |
return 1 if not last else last["part"] + 1 | |
def generate_description(title): | |
return f"Watch this hilarious clip: {title}" | |
# MongoDB integration assumed via reel_progress collection | |
def get_next_fetch_index(username): | |
entry = reel_progress.find_one({"username": username}) | |
return entry["last_index"] if entry and "last_index" in entry else 0 | |
def update_fetch_index(username, new_index): | |
reel_progress.update_one( | |
{"username": username}, | |
{"$set": {"last_index": new_index}}, | |
upsert=True | |
) | |
def is_reel_fetched_or_skipped(username, reel_id): | |
entry = reel_progress.find_one({"username": username}) | |
if not entry: | |
return False | |
return reel_id in entry.get("fetched_ids", []) or reel_id in entry.get("skipped_ids", []) | |
def mark_reel_fetched(username, reel_id): | |
reel_progress.update_one( | |
{"username": username}, | |
{"$addToSet": {"fetched_ids": reel_id}}, | |
upsert=True | |
) | |
def mark_reel_skipped(username, reel_id): | |
reel_progress.update_one( | |
{"username": username}, | |
{"$addToSet": {"skipped_ids": reel_id}}, | |
upsert=True | |
) | |
import os | |
import base64 | |
import tempfile | |
from telethon.sync import TelegramClient | |
import os | |
from telethon.sync import TelegramClient | |
from telethon.sessions import StringSession | |
TG_API_ID = int(os.getenv("API_ID", 3704772)) | |
TG_API_HASH = os.getenv("API_HASH", "b8e50a035abb851c0dd424e14cac4c06") | |
SESSION_B64 = os.getenv("SESSION", "1BVtsOIEBu5jNa-E88vbC710wzyAKnteiQK7WQNvDK6wCeD3Q_c33uSaqvlvCUMglbhuVtugstOwIuE7WV2EuLGUEHH3AFJ84MZ93WVs010bx4N9nQOaQJNjIZDe4Nllq9r5PKRXxjgwuSqN-B7_TfpjQJT_ztOqQNZTQV3o9EqPBXfpiMzVF5U638wuRDVkInjbgAkI9ao36KDmcvBJzAW91l27loIsUL-Zst9H6XbAbVgqfs1fTOI6xQqPEyrFA-gB7lHbKrkqILwAmK88tTRQFHOvvXkGkJpDrXb1MZQssaVilGfXQ7sXWPltMpHklRy8HaPdfOUJ8t105DHXf-CN6SvzombM=") | |
if not SESSION_B64: | |
raise ValueError("SESSION is not set") | |
client = TelegramClient(StringSession(SESSION_B64), TG_API_ID, TG_API_HASH) | |
import json | |
import time | |
import random | |
import pathlib | |
import requests | |
from typing import Optional, Tuple | |
from telethon.sync import TelegramClient | |
from telethon.tl.functions.messages import GetHistoryRequest | |
# Assumes MongoDB functions are available: | |
# get_next_fetch_index, update_fetch_index, is_reel_fetched_or_skipped, | |
# mark_reel_fetched, mark_reel_skipped | |
RAW_USERNAMES = os.getenv("RAW_USERNAMES", "nickyteja06,videshi__indian,_notyourtype_yt_,sisterfunnyreel").split(",") | |
REACTED_USERNAMES = os.getenv("REACTED_USERNAMES", "biggestreact").split(",") | |
import os | |
import json | |
import time | |
import random | |
import pathlib | |
import requests | |
from typing import Optional, Tuple | |
from telethon.tl.functions.messages import GetHistoryRequest | |
# Constants | |
IG_HEADERS = { | |
"User-Agent": "Mozilla/5.0", | |
# Optional: Uncomment if using sessionid | |
# "Cookie": f"sessionid={os.getenv('IG_SESSION_ID')}" | |
} | |
import requests | |
def get_user_id(username: str) -> Optional[str]: | |
Β Β headers = { | |
Β Β Β Β "User-Agent": "Mozilla/5.0 (Linux; Android 10; Mobile)", | |
Β Β Β Β "X-IG-App-ID": "936619743392459", | |
Β Β Β Β "Referer": f"https://www.instagram.com/{username}/", | |
Β Β Β Β "X-Requested-With": "XMLHttpRequest" | |
Β Β } | |
Β Β try: | |
Β Β Β Β res = requests.get( | |
Β Β Β Β Β Β f"https://www.instagram.com/api/v1/users/web_profile_info/?username={username}", | |
Β Β Β Β Β Β headers=headers, | |
Β Β Β Β Β Β cookies=cookies, Β # cookies from JSON you pasted | |
Β Β Β Β Β Β timeout=10 | |
Β Β Β Β ) | |
Β Β Β Β if res.status_code == 200: | |
Β Β Β Β Β Β return res.json()["data"]["user"]["id"] | |
Β Β Β Β print(f"β οΈ Failed to get user ID: {res.status_code}") | |
Β Β except Exception as e: | |
Β Β Β Β print(f"β Error fetching user ID for @{username}: {e}") | |
Β Β return None | |
def get_links(user_id: str, limit: int = 5) -> list[Tuple[str, str]]: | |
links = [] | |
query_hash = "58b6785bea111c67129decbe6a448951" | |
end_cursor = "" | |
while len(links) < limit: | |
variables = { | |
"id": user_id, | |
"first": 12, | |
"after": end_cursor | |
} | |
params = { | |
"query_hash": query_hash, | |
"variables": json.dumps(variables) | |
} | |
try: | |
res = requests.get("https://www.instagram.com/graphql/query/", params=params, headers=IG_HEADERS, timeout=10) | |
if res.status_code != 200: | |
logger.warning(f"[β οΈ] Failed to get links for user {user_id}, status {res.status_code}") | |
break | |
media = res.json()["data"]["user"]["edge_owner_to_timeline_media"] | |
for edge in media["edges"]: | |
reel_id = edge["node"]["id"] | |
shortcode = edge["node"]["shortcode"] | |
link = f"https://www.instagram.com/p/{shortcode}/" | |
links.append((reel_id, link)) | |
if len(links) >= limit: | |
break | |
if not media["page_info"]["has_next_page"]: | |
break | |
end_cursor = media["page_info"]["end_cursor"] | |
except Exception as e: | |
logger.warning(f"[β] get_links() error: {e}") | |
break | |
logger.info(f"[π] Found {len(links)} links for user ID {user_id}") | |
return links | |
def send_to_bot_and_get_video(link: str) -> Optional[str]: | |
try: | |
entity = client.loop.run_until_complete(client.get_entity("instasavegrambot")) | |
client.loop.run_until_complete(client.send_message(entity, link)) | |
for _ in range(15): # Wait up to 30s | |
time.sleep(2) | |
history = client.loop.run_until_complete( | |
client(GetHistoryRequest( | |
peer=entity, | |
limit=2, | |
offset_id=0, | |
offset_date=None, | |
add_offset=0, | |
max_id=0, | |
min_id=0, | |
hash=0 | |
)) | |
) | |
for msg in history.messages: | |
logger.info(f"[π¨] Bot message: {msg.message or '[media]'}") | |
if msg.video and 20 <= msg.video.duration <= 180: | |
path = f"reels/{msg.id}.mp4" | |
pathlib.Path("reels").mkdir(exist_ok=True) | |
client.loop.run_until_complete(msg.download_media(file=path)) | |
logger.info(f"[β ] Downloaded reel to {path}") | |
return path | |
except Exception as e: | |
logger.warning(f"[β] send_to_bot_and_get_video error: {e}") | |
return None | |
def fetch_valid_reel() -> Tuple[Optional[str], Optional[str]]: | |
with client: | |
pools = [(RAW_USERNAMES, "raw"), (REACTED_USERNAMES, "reacted")] | |
pools = [(p, t) for p, t in pools if p] | |
if not pools: | |
logger.warning("[β οΈ] No usernames in either RAW or REACTED pools.") | |
return None, None | |
random.shuffle(pools) | |
for usernames, reel_type in pools: | |
random.shuffle(usernames) | |
for username in usernames: | |
logger.info(f"[π] Trying @{username} ({reel_type})") | |
uid = get_user_id(username) | |
if not uid: | |
logger.warning(f"[π«] Skipping @{username}, failed to get user ID.") | |
continue | |
fetch_idx = get_next_fetch_index(username) | |
links = get_links(uid, limit=5) | |
for idx, (reel_id, link) in enumerate(links[fetch_idx:], start=fetch_idx): | |
if is_reel_fetched_or_skipped(username, reel_id): | |
logger.info(f"[β©] Already processed: @{username}/{reel_id}") | |
continue | |
logger.info(f"[π₯] Sending link to bot: {link}") | |
video_path = send_to_bot_and_get_video(link) | |
if not video_path: | |
logger.warning(f"[β οΈ] Failed to fetch: {link}") | |
mark_reel_skipped(username, reel_id) | |
continue | |
mark_reel_fetched(username, reel_id) | |
update_fetch_index(username, idx + 1) | |
return video_path, reel_type | |
update_fetch_index(username, fetch_idx + len(links)) | |
logger.warning("[π«] No valid reels found after checking all usernames.") | |
return None, None | |
def patch_moviepy(): | |
original_resizer = resize.resize | |
def patched_resizer(clip, *args, **kwargs): | |
newsize = kwargs.get("newsize", None) | |
if newsize: | |
newsize = tuple(map(int, newsize)) | |
clip = clip.fl_image(lambda img: img.resize(newsize, Image.Resampling.LANCZOS)) | |
else: | |
clip = original_resizer(clip, *args, **kwargs) | |
return clip | |
resize.resize = patched_resizer | |
patch_moviepy() | |
import emoji | |
from PIL import Image, ImageDraw, ImageFont | |
import emoji | |
import os | |
import requests | |
def create_text_image(text, width, height): | |
img = Image.new("RGB", (width, height), color=(255, 255, 255)) | |
draw = ImageDraw.Draw(img) | |
# Load font | |
try: | |
font = ImageFont.truetype("DejaVuSans-Bold.ttf", size=60) | |
except: | |
font = ImageFont.load_default() | |
# Extract emoji and clean text | |
emojis = emoji.emoji_list(text) | |
pure_text = emoji.replace_emoji(text, replace='') | |
# Adjust font size to fit | |
max_font_size = 70 | |
while True: | |
font = ImageFont.truetype("DejaVuSans-Bold.ttf", size=max_font_size) | |
text_width, text_height = draw.textsize(pure_text, font=font) | |
total_width = text_width + (len(emojis) * 60) + 20 | |
if total_width <= width - 40 or max_font_size <= 30: | |
break | |
max_font_size -= 2 | |
# Starting X & Y for centered layout | |
start_x = (width - total_width) // 2 | |
y = (height - text_height) // 2 | |
# Draw text first | |
draw.text((start_x, y), pure_text, font=font, fill=(0, 0, 0)) | |
# Then draw emojis to the right of the text | |
x = start_x + text_width + 10 | |
for e in emojis: | |
hexcode = '-'.join(f"{ord(c):x}" for c in e['emoji']) | |
emoji_path = f"emoji_pngs/{hexcode}.png" | |
if not os.path.exists(emoji_path): | |
download_emoji_png(e['emoji']) | |
if os.path.exists(emoji_path): | |
emoji_img = Image.open(emoji_path).convert("RGBA") | |
emoji_img = emoji_img.resize((60, 60)) | |
img.paste(emoji_img, (x, y), emoji_img) | |
x += 60 + 4 | |
return img | |
from PIL import Image, ImageDraw, ImageFont | |
import numpy as np | |
from moviepy.editor import ImageClip | |
def generate_watermark_img(text, width, height=50): | |
img = Image.new("RGBA", (width, height), (0, 0, 0, 0)) | |
draw = ImageDraw.Draw(img) | |
try: | |
font = ImageFont.truetype("DejaVuSans-Bold.ttf", size=35) | |
except: | |
font = ImageFont.load_default() | |
text_width, text_height = draw.textsize(text, font=font) | |
draw.text((5, height - text_height - 2), text, fill="white", font=font, stroke_width=1, stroke_fill="black") | |
return img | |
def download_emoji_png(emoji_char): | |
hexcode = '-'.join(f"{ord(c):x}" for c in emoji_char) | |
url = f"https://github.com/twitter/twemoji/raw/master/assets/72x72/{hexcode}.png" | |
os.makedirs("emoji_pngs", exist_ok=True) | |
path = f"emoji_pngs/{hexcode}.png" | |
try: | |
r = requests.get(url) | |
if r.status_code == 200: | |
with open(path, "wb") as f: | |
f.write(r.content) | |
print(f"β Downloaded emoji: {emoji_char} β {path}") | |
else: | |
print(f"β Failed to download emoji: {emoji_char}") | |
except Exception as e: | |
print(f"β οΈ Error downloading emoji {emoji_char}: {e}") | |
def edit_video(video_path): | |
clip = VideoFileClip(video_path) | |
video_width = clip.w | |
video_height = clip.h | |
bar_height = 120 | |
total_height = video_height + bar_height | |
# === 1. Background Canvas | |
final_bg = ColorClip(size=(video_width, total_height), color=(255, 255, 255), duration=clip.duration) | |
# === 2. Caption Bar (Top) | |
caption = random.choice(CAPTIONS) | |
caption_img = create_text_image(caption, video_width, bar_height) | |
caption_clip = ImageClip(np.array(caption_img)).set_duration(clip.duration).set_position((0, 0)) | |
# === 3. Eye Protection Overlay (6% White Transparent) | |
eye_protection = ColorClip(size=(video_width, video_height), color=(255, 255, 255), duration=clip.duration) | |
eye_protection = eye_protection.set_opacity(0.1).set_position((0, bar_height)) | |
# === 4. Watermark (Bottom-left using Pillow + ImageClip) | |
watermark_img = generate_watermark_img("@fulltosscomedy4u", video_width, height=50) | |
watermark_clip = ImageClip(np.array(watermark_img)).set_duration(clip.duration).set_position(("left", bar_height + video_height - 50)) | |
# === 5. Position original video below top bar | |
video_clip = clip.set_position((0, bar_height)) | |
# === 6. Combine everything | |
final = CompositeVideoClip( | |
[final_bg, caption_clip, video_clip, eye_protection, watermark_clip], | |
size=(video_width, total_height) | |
) | |
os.makedirs("edited", exist_ok=True) | |
output_path = f"edited/{uuid.uuid4().hex}.mp4" | |
final.write_videofile( | |
output_path, | |
codec="libx264", | |
audio_codec="aac", | |
preset="slow", | |
bitrate="12000k", | |
verbose=False, | |
logger=None | |
) | |
return output_path | |
def edit_video_raw(video_path): | |
clip = VideoFileClip(video_path) | |
video_width = clip.w | |
video_height = clip.h | |
top_bar_height = 120 | |
mid_bar_height = 80 | |
total_height = video_height + top_bar_height + mid_bar_height | |
# === 1. Background Canvas | |
final_bg = ColorClip(size=(video_width, total_height), color=(255, 255, 255), duration=clip.duration + 6) | |
# === 2. Top Caption (Black on White) | |
caption = random.choice(CAPTIONS) | |
caption_img = create_text_image( | |
caption, | |
width=video_width, | |
height=top_bar_height, | |
font_size=min(max(int(video_width * 0.08), 48), 80), # Auto-resize | |
align="center", | |
bg_color=(255, 255, 255), | |
text_color=(0, 0, 0) | |
) | |
caption_clip = ImageClip(np.array(caption_img)).set_duration(clip.duration + 6).set_position((0, 0)) | |
# === 3. Middle Caption (White on Black) | |
mid_text = random.choice([ | |
"Pura 1 din laga tab ye reel mili π€£", | |
"Ye miss mat kr dena π", | |
"Kha thi ye reel ab tak π€¨π€", | |
"Wait, ye dekh kr hi janna π₯π₯" | |
]) | |
mid_img = create_text_image( | |
mid_text, | |
width=video_width, | |
height=mid_bar_height, | |
font_size=min(max(int(video_width * 0.06), 40), 64), | |
align="center", | |
bg_color=(0, 0, 0), | |
text_color=(255, 255, 255) | |
) | |
mid_caption_clip = ImageClip(np.array(mid_img)).set_duration(clip.duration + 6).set_position((0, top_bar_height)) | |
# === 4. Eye Protection Overlay | |
eye_protection = ColorClip(size=(video_width, video_height), color=(255, 255, 255), duration=clip.duration + 6) | |
eye_protection = eye_protection.set_opacity(0.1).set_position((0, top_bar_height + mid_bar_height)) | |
# === 5. Watermark (Bottom Left) | |
watermark_img = generate_watermark_img("@fulltosscomedy4u", video_width, height=50) | |
watermark_clip = ImageClip(np.array(watermark_img)).set_duration(clip.duration + 6).set_position(("left", total_height - 50)) | |
# === 6. Meme Section: Laugh -> Freeze -> Laugh | |
laugh_index = random.choice([1, 2]) | |
laugh_clip = VideoFileClip(f"laugh/{laugh_index}.mp4").resize(width=video_width).set_duration(2) | |
freeze_frame = laugh_clip.to_ImageClip().set_duration(max(1, clip.duration - 4)) | |
meme_part = concatenate_videoclips([laugh_clip, freeze_frame, laugh_clip]) | |
# === 7. Combine All Video Parts | |
full_video = concatenate_videoclips([meme_part, clip]) | |
full_video = full_video.set_position((0, top_bar_height + mid_bar_height)) | |
# === 8. Composite Final Video | |
final = CompositeVideoClip( | |
[final_bg, caption_clip, mid_caption_clip, full_video, eye_protection, watermark_clip], | |
size=(video_width, total_height) | |
) | |
# === 9. Export | |
os.makedirs("edited", exist_ok=True) | |
output_path = f"edited/{uuid.uuid4().hex}.mp4" | |
final.write_videofile( | |
output_path, | |
codec="libx264", | |
audio_codec="aac", | |
preset="slow", | |
bitrate="12000k", | |
threads=4, | |
verbose=False, | |
logger=None | |
) | |
return output_path | |
def upload_to_youtube(video_path, title, desc): | |
creds = Credentials( | |
token=None, | |
refresh_token=os.getenv("YT_REFRESH_TOKEN"), | |
token_uri="https://oauth2.googleapis.com/token", | |
client_id=os.getenv("YT_CLIENT_ID", "387804137131-sjih05p3329n5n72gsgglv1tb9t62882.apps.googleusercontent.com"), | |
client_secret=os.getenv("YT_CLIENT_SECRET", "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"), | |
scopes=["https://www.googleapis.com/auth/youtube.upload"] | |
) | |
creds.refresh(Request()) | |
youtube = build("youtube", "v3", credentials=creds) | |
request = youtube.videos().insert( | |
part="snippet,status", | |
body={ | |
"snippet": { | |
"title": title, | |
"description": desc, | |
"tags": ["funny", "memes", "comedy", "shorts"], | |
"categoryId": "23" | |
}, | |
"status": { | |
"privacyStatus": "public", | |
"madeForKids": False | |
} | |
}, | |
media_body=MediaFileUpload(video_path) | |
) | |
res = request.execute() | |
logger.info(f"Uploaded: https://youtube.com/watch?v={res['id']}") | |
return f"https://youtube.com/watch?v={res['id']}" | |
first_run = True | |
def save_to_db(part, title, desc, link): | |
meta.insert_one({"part": part, "title": title, "description": desc, "link": link, "uploaded": time.time()}) | |
def auto_loop(): | |
asyncio.set_event_loop(asyncio.new_event_loop()) | |
global UPLOAD_TIMES, NEXT_RESET, first_run | |
ist = timezone(timedelta(hours=5, minutes=30)) # IST timezone | |
daily_upload_count = random.randint(3, 5) | |
uploads_done_today = 0 | |
NEXT_RESET = datetime.now(ist).replace(hour=0, minute=0, second=0, microsecond=0) + timedelta(days=1) | |
logger.info(f"[π ] Today's upload target: {daily_upload_count} reels.") | |
def wait_until(hour: int, minute: int = 0): | |
now = datetime.now(ist) | |
target = now.replace(hour=hour, minute=minute, second=0, microsecond=0) | |
if target < now: | |
return | |
logger.info(f"[π] Waiting until {target.strftime('%H:%M')} IST...") | |
while datetime.now(ist) < target: | |
time.sleep(10) | |
wait_until(8) | |
while True: | |
try: | |
now = datetime.now(ist) | |
if now >= NEXT_RESET: | |
UPLOAD_TIMES.clear() | |
NEXT_RESET = now.replace(hour=0, minute=0, second=0, microsecond=0) + timedelta(days=1) | |
daily_upload_count = random.randint(3, 5) | |
uploads_done_today = 0 | |
logger.info(f"[π] Reset for new day. New target: {daily_upload_count} uploads.") | |
if uploads_done_today >= daily_upload_count: | |
logger.info("[β ] Daily upload target reached.") | |
time.sleep(60) | |
continue | |
if now.hour < 8 or now.hour >= 20: | |
time.sleep(60) | |
continue | |
if not UPLOAD_TIMES or (now - UPLOAD_TIMES[-1]).total_seconds() >= random.randint(7200, 14400): | |
video_path, reel_type = fetch_valid_reel() | |
if not video_path: | |
logger.warning("[β οΈ] No valid reel found. Retrying...") | |
time.sleep(60) | |
continue | |
if reel_type == "reacted": | |
edited = edit_video(video_path) | |
else: | |
edited = edit_video_raw(video_path) | |
part = get_next_part() | |
title = f"Try not to laugh || #{part} #funny #memes #comedy #shorts" | |
desc = generate_description(title) | |
link = upload_to_youtube(edited, title, desc) | |
save_to_db(part, title, desc, link) | |
logger.info(f"[π€] Uploaded #{part}: {link}") | |
UPLOAD_TIMES.append(now) | |
uploads_done_today += 1 | |
os.remove(video) | |
os.remove(edited) | |
if uploads_done_today < daily_upload_count: | |
gap_seconds = random.randint(7200, 14400) | |
next_time = datetime.now(ist) + timedelta(seconds=gap_seconds) | |
if next_time.hour >= 20: | |
logger.info("[π] Next upload would exceed 8PM. Skipping.") | |
continue | |
logger.info(f"[β³] Waiting ~{gap_seconds // 60} minutes before next upload.") | |
time.sleep(gap_seconds) | |
else: | |
time.sleep(60) | |
except Exception as e: | |
logger.error(f"Loop error: {e}") | |
time.sleep(60) | |
def home(): | |
last = meta.find_one(sort=[("uploaded", -1)]) | |
video_id = last["link"].split("v=")[-1] if last and "link" in last else None | |
return render_template("index.html", time=time.ctime(), video_id=video_id) | |
def run_now(): | |
Thread(target=auto_loop, daemon=True).start() | |
return redirect("/") | |
if __name__ == "__main__": | |
Thread(target=auto_loop, daemon=True).start() | |
app.run(host="0.0.0.0", port=7860) |