understanding commited on
Commit
0eb342a
·
verified ·
1 Parent(s): 83d77bb

Create bot/handlers.py

Browse files
Files changed (1) hide show
  1. bot/handlers.py +699 -0
bot/handlers.py ADDED
@@ -0,0 +1,699 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from __future__ import annotations
2
+
3
+ import os
4
+ import json
5
+ import asyncio
6
+ from typing import Optional
7
+
8
+ from hydrogram import Client, filters
9
+ from hydrogram.types import (
10
+ Message,
11
+ InlineKeyboardMarkup,
12
+ InlineKeyboardButton,
13
+ CallbackQuery,
14
+ )
15
+
16
+ from app.state import STATE, UploadJob
17
+ from app.settings import (
18
+ ROTATE_AFTER_PER_PROFILE,
19
+ DEFAULT_PRIVACY,
20
+ DEFAULT_TITLE_MODE,
21
+ YOUTUBE_CHUNK_SIZE,
22
+ PROGRESS_EDIT_EVERY_SEC,
23
+ MAX_CONCURRENT_UPLOADS,
24
+ TMP_DIR,
25
+ )
26
+ from app.progress import human_bytes, fmt_eta
27
+ from app.youtube_api import youtube_resumable_upload, _clean_title
28
+ from app.cf_api import CFClient
29
+ from app.config import Config
30
+
31
+ def _mkdir_tmp():
32
+ os.makedirs(TMP_DIR, exist_ok=True)
33
+
34
+ def menu_kb(is_owner: bool):
35
+ rows = [
36
+ [InlineKeyboardButton("📦 Accounts", callback_data="menu:accounts"),
37
+ InlineKeyboardButton("➕ Add Account", callback_data="menu:addacc")],
38
+ [InlineKeyboardButton("⚙️ Auto Mode", callback_data="menu:automode")],
39
+ ]
40
+ if is_owner:
41
+ rows.append([InlineKeyboardButton("📊 Stats", callback_data="menu:stats")])
42
+ rows.append([InlineKeyboardButton("✅ Allow User", callback_data="menu:allow"),
43
+ InlineKeyboardButton("⛔ Disallow User", callback_data="menu:disallow")])
44
+ return InlineKeyboardMarkup(rows)
45
+
46
+ def upload_options_kb(u: UploadJob):
47
+ # Compact stable format
48
+ rows = [
49
+ [
50
+ InlineKeyboardButton(f"Privacy: {u.privacy}", callback_data=f"u:{u.upload_id}:privacy"),
51
+ InlineKeyboardButton("Change", callback_data=f"u:{u.upload_id}:privacy_menu"),
52
+ ],
53
+ [
54
+ InlineKeyboardButton(f"Title: {u.title_mode}", callback_data=f"u:{u.upload_id}:title"),
55
+ InlineKeyboardButton("Change", callback_data=f"u:{u.upload_id}:title_menu"),
56
+ ],
57
+ [
58
+ InlineKeyboardButton("▶️ Start Upload", callback_data=f"u:{u.upload_id}:start"),
59
+ InlineKeyboardButton("✖️ Cancel", callback_data=f"u:{u.upload_id}:cancel"),
60
+ ],
61
+ ]
62
+ return InlineKeyboardMarkup(rows)
63
+
64
+ def privacy_kb(upload_id: str):
65
+ return InlineKeyboardMarkup([
66
+ [InlineKeyboardButton("private", callback_data=f"u:{upload_id}:set_priv:private"),
67
+ InlineKeyboardButton("unlisted", callback_data=f"u:{upload_id}:set_priv:unlisted"),
68
+ InlineKeyboardButton("public", callback_data=f"u:{upload_id}:set_priv:public")],
69
+ [InlineKeyboardButton("⬅ Back", callback_data=f"u:{upload_id}:back")],
70
+ ])
71
+
72
+ def title_kb(upload_id: str):
73
+ return InlineKeyboardMarkup([
74
+ [InlineKeyboardButton("filename", callback_data=f"u:{upload_id}:set_title:filename"),
75
+ InlineKeyboardButton("caption", callback_data=f"u:{upload_id}:set_title:caption")],
76
+ [InlineKeyboardButton("custom", callback_data=f"u:{upload_id}:set_title:custom")],
77
+ [InlineKeyboardButton("⬅ Back", callback_data=f"u:{upload_id}:back")],
78
+ ])
79
+
80
+ def accounts_kb(profiles: list, tg_id: str, default_profile_id: Optional[str], is_owner: bool):
81
+ rows = []
82
+ for p in profiles:
83
+ pid = p["profile_id"]
84
+ label = p.get("label") or "profile"
85
+ ch = p.get("channel_title") or p.get("channel_id") or "no-channel"
86
+ ok = "✅" if p.get("has_refresh") else "⚠️"
87
+ d = "⭐" if pid == default_profile_id else ""
88
+ rows.append([InlineKeyboardButton(f"{ok}{d} {label} — {ch}", callback_data=f"acc:info:{pid}")])
89
+ if profiles:
90
+ rows.append([InlineKeyboardButton("⭐ Set Default", callback_data="acc:setdef_menu")])
91
+ rows.append([InlineKeyboardButton("🗑 Remove Profile", callback_data="acc:remove_menu")])
92
+ rows.append([InlineKeyboardButton("⬅ Back", callback_data="menu:home")])
93
+ if is_owner:
94
+ rows.append([InlineKeyboardButton("List Allowed (if available)", callback_data="acc:list_allowed")])
95
+ return InlineKeyboardMarkup(rows)
96
+
97
+ def pick_profile_kb(profiles: list, action: str):
98
+ # action: "setdef" or "remove"
99
+ rows = []
100
+ for p in profiles:
101
+ pid = p["profile_id"]
102
+ label = p.get("label") or "profile"
103
+ ch = p.get("channel_title") or p.get("channel_id") or "no-channel"
104
+ ok = "✅" if p.get("has_refresh") else "⚠️"
105
+ rows.append([InlineKeyboardButton(f"{ok} {label} — {ch}", callback_data=f"acc:{action}:{pid}")])
106
+ rows.append([InlineKeyboardButton("⬅ Back", callback_data="menu:accounts")])
107
+ return InlineKeyboardMarkup(rows)
108
+
109
+ def is_owner(cfg: Config, user_id: int) -> bool:
110
+ return user_id == cfg.OWNER_ID
111
+
112
+ def _safe_title_from(job: UploadJob) -> str:
113
+ if job.title_mode == "custom" and job.custom_title.strip():
114
+ return _clean_title(job.custom_title.strip())
115
+ if job.title_mode == "filename":
116
+ base = os.path.splitext(job.file_name)[0]
117
+ return _clean_title(base)
118
+ # caption default
119
+ if job.caption.strip():
120
+ # first line as title
121
+ first = job.caption.strip().splitlines()[0]
122
+ return _clean_title(first)
123
+ base = os.path.splitext(job.file_name)[0]
124
+ return _clean_title(base)
125
+
126
+ async def register_handlers(app: Client, cfg: Config, cf: CFClient):
127
+
128
+ # semaphore size from settings
129
+ STATE.sem = asyncio.Semaphore(MAX_CONCURRENT_UPLOADS)
130
+
131
+ @app.on_message(filters.command("start"))
132
+ async def start_cmd(_, m: Message):
133
+ u = m.from_user.id
134
+ kb = menu_kb(is_owner(cfg, u))
135
+ await m.reply_text(
136
+ "✅ Bot online.\n\nSend a video to upload.\nUse buttons for accounts/settings.",
137
+ reply_markup=kb
138
+ )
139
+
140
+ @app.on_callback_query()
141
+ async def cb(_, q: CallbackQuery):
142
+ data = (q.data or "")
143
+ u = q.from_user.id
144
+ tg_id = str(u)
145
+
146
+ if data.startswith("menu:"):
147
+ act = data.split(":", 1)[1]
148
+
149
+ if act == "home":
150
+ await q.message.edit_text("Main menu:", reply_markup=menu_kb(is_owner(cfg, u)))
151
+ await q.answer()
152
+ return
153
+
154
+ if act == "accounts":
155
+ res = await cf.list_profiles_w2(tg_id)
156
+ profiles = res.get("profiles", [])
157
+ dpid = res.get("default_profile_id")
158
+ await q.message.edit_text(
159
+ f"📦 Accounts for {tg_id}\nDefault: {dpid}\n\n✅ authorized | ⚠️ not authorized",
160
+ reply_markup=accounts_kb(profiles, tg_id, dpid, is_owner(cfg, u))
161
+ )
162
+ await q.answer()
163
+ return
164
+
165
+ if act == "addacc":
166
+ STATE.waiting_client_id[u] = True
167
+ await q.message.edit_text(
168
+ "➕ Add Account\n\nSend:\n1) Google OAuth JSON file (client_secret_*.json)\nOR\n2) client_id text (ends with apps.googleusercontent.com)",
169
+ reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("⬅ Back", callback_data="menu:home")]])
170
+ )
171
+ await q.answer()
172
+ return
173
+
174
+ if act == "automode":
175
+ cur = STATE.auto_mode.get(u, False)
176
+ STATE.auto_mode[u] = not cur
177
+ v = "ON ✅" if STATE.auto_mode[u] else "OFF ❌"
178
+ await q.answer(f"Auto Mode: {v}", show_alert=True)
179
+ # no edit required
180
+ return
181
+
182
+ if act == "stats":
183
+ if not is_owner(cfg, u):
184
+ await q.answer("Owner only", show_alert=True)
185
+ return
186
+ res = await cf.stats_today()
187
+ if not res.get("ok"):
188
+ await q.answer("stats failed", show_alert=True)
189
+ return
190
+ text = (
191
+ f"📊 Stats (UTC day {res.get('day')})\n"
192
+ f"Uploads today total: {res.get('uploads_today_total')}\n"
193
+ f"Active users today: {res.get('active_users_today')}\n\n"
194
+ f"Errors last20:\n"
195
+ )
196
+ errs = res.get("errors_last20") or []
197
+ if not errs:
198
+ text += "— none —"
199
+ else:
200
+ for e in errs[:20]:
201
+ text += f"- {e.get('where')} | {e.get('tg_id')} | {e.get('err')}\n"
202
+ await q.message.edit_text(text, reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("⬅ Back", callback_data="menu:home")]]))
203
+ await q.answer()
204
+ return
205
+
206
+ if act == "allow":
207
+ if not is_owner(cfg, u):
208
+ await q.answer("Owner only", show_alert=True)
209
+ return
210
+ await q.message.edit_text(
211
+ "✅ Allow User\n\nForward a message from that user.\nIf forward privacy hides id, then send numeric user_id.",
212
+ reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("⬅ Back", callback_data="menu:home")]])
213
+ )
214
+ # mark mode using waiting_client_secret dict hack? better separate:
215
+ STATE.waiting_client_secret[u] = "__ALLOW__"
216
+ await q.answer()
217
+ return
218
+
219
+ if act == "disallow":
220
+ if not is_owner(cfg, u):
221
+ await q.answer("Owner only", show_alert=True)
222
+ return
223
+ await q.message.edit_text(
224
+ "⛔ Disallow User\n\nForward a message from that user OR send numeric user_id.",
225
+ reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("⬅ Back", callback_data="menu:home")]])
226
+ )
227
+ STATE.waiting_client_secret[u] = "__DISALLOW__"
228
+ await q.answer()
229
+ return
230
+
231
+ # accounts submenus
232
+ if data == "acc:setdef_menu":
233
+ res = await cf.list_profiles_w2(tg_id)
234
+ profiles = res.get("profiles", [])
235
+ await q.message.edit_text("Pick profile to set default:", reply_markup=pick_profile_kb(profiles, "setdef"))
236
+ await q.answer()
237
+ return
238
+
239
+ if data == "acc:remove_menu":
240
+ res = await cf.list_profiles_w2(tg_id)
241
+ profiles = res.get("profiles", [])
242
+ await q.message.edit_text("Pick profile to remove:", reply_markup=pick_profile_kb(profiles, "remove"))
243
+ await q.answer()
244
+ return
245
+
246
+ if data.startswith("acc:setdef:"):
247
+ pid = data.split(":", 2)[2]
248
+ r = await cf.profile_set_default(tg_id, pid)
249
+ await q.answer("Default set ✅" if r.get("ok") else f"Failed: {r.get('err')}", show_alert=True)
250
+ # back to accounts
251
+ res = await cf.list_profiles_w2(tg_id)
252
+ profiles = res.get("profiles", [])
253
+ dpid = res.get("default_profile_id")
254
+ await q.message.edit_text("📦 Accounts", reply_markup=accounts_kb(profiles, tg_id, dpid, is_owner(cfg, u)))
255
+ return
256
+
257
+ if data.startswith("acc:remove:"):
258
+ pid = data.split(":", 2)[2]
259
+ r = await cf.profile_remove(tg_id, pid)
260
+ await q.answer("Removed ✅" if r.get("ok") else f"Failed: {r.get('err')}", show_alert=True)
261
+ res = await cf.list_profiles_w2(tg_id)
262
+ profiles = res.get("profiles", [])
263
+ dpid = res.get("default_profile_id")
264
+ await q.message.edit_text("📦 Accounts", reply_markup=accounts_kb(profiles, tg_id, dpid, is_owner(cfg, u)))
265
+ return
266
+
267
+ # upload callbacks
268
+ if data.startswith("u:"):
269
+ parts = data.split(":")
270
+ upload_id = parts[1]
271
+ act = parts[2] if len(parts) > 2 else ""
272
+ job = STATE.uploads.get(upload_id)
273
+ if not job:
274
+ await q.answer("This upload expired/restarted.", show_alert=True)
275
+ return
276
+ if job.user_id != u:
277
+ await q.answer("Not yours", show_alert=True)
278
+ return
279
+
280
+ if act == "privacy_menu":
281
+ await q.message.edit_text("Choose privacy:", reply_markup=privacy_kb(upload_id))
282
+ await q.answer()
283
+ return
284
+ if act == "title_menu":
285
+ await q.message.edit_text("Choose title mode:", reply_markup=title_kb(upload_id))
286
+ await q.answer()
287
+ return
288
+ if act == "set_priv":
289
+ val = parts[3]
290
+ job.privacy = val
291
+ await q.message.edit_text(
292
+ f"📤 Ready\nFile: {job.file_name}\nPrivacy: {job.privacy}\nTitle mode: {job.title_mode}",
293
+ reply_markup=upload_options_kb(job)
294
+ )
295
+ await q.answer("Saved ✅")
296
+ return
297
+ if act == "set_title":
298
+ val = parts[3]
299
+ job.title_mode = val
300
+ if val == "custom":
301
+ STATE.waiting_custom_title[u] = upload_id
302
+ await q.message.edit_text("Send custom title text now (next message).", reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("⬅ Back", callback_data=f"u:{upload_id}:back")]]))
303
+ await q.answer()
304
+ return
305
+ await q.message.edit_text(
306
+ f"📤 Ready\nFile: {job.file_name}\nPrivacy: {job.privacy}\nTitle mode: {job.title_mode}",
307
+ reply_markup=upload_options_kb(job)
308
+ )
309
+ await q.answer("Saved ✅")
310
+ return
311
+ if act == "back":
312
+ await q.message.edit_text(
313
+ f"📤 Ready\nFile: {job.file_name}\nPrivacy: {job.privacy}\nTitle mode: {job.title_mode}",
314
+ reply_markup=upload_options_kb(job)
315
+ )
316
+ await q.answer()
317
+ return
318
+ if act == "cancel":
319
+ STATE.uploads.pop(upload_id, None)
320
+ await q.message.edit_text("Cancelled ✅", reply_markup=menu_kb(is_owner(cfg, u)))
321
+ await q.answer()
322
+ return
323
+ if act == "start":
324
+ await q.answer("Starting upload…")
325
+ asyncio.create_task(run_upload(app, cfg, cf, job))
326
+ return
327
+
328
+ await q.answer()
329
+
330
+ @app.on_message(filters.text & filters.private)
331
+ async def text_in(_, m: Message):
332
+ u = m.from_user.id
333
+
334
+ # custom title input
335
+ if u in STATE.waiting_custom_title:
336
+ upload_id = STATE.waiting_custom_title.pop(u)
337
+ job = STATE.uploads.get(upload_id)
338
+ if job:
339
+ job.custom_title = m.text.strip()
340
+ await m.reply_text(
341
+ f"✅ Custom title saved.\nNow press Start Upload for file: {job.file_name}",
342
+ reply_markup=upload_options_kb(job)
343
+ )
344
+ return
345
+
346
+ # owner allow/disallow mode
347
+ if u in STATE.waiting_client_secret and STATE.waiting_client_secret[u] in ("__ALLOW__", "__DISALLOW__"):
348
+ mode = STATE.waiting_client_secret.pop(u)
349
+ target = (m.text or "").strip()
350
+
351
+ # if numeric in text
352
+ tg_id = None
353
+ if target.isdigit():
354
+ tg_id = target
355
+ else:
356
+ await m.reply_text("Send numeric user_id or forward a message from user (if forward shows id).")
357
+ return
358
+
359
+ if mode == "__ALLOW__":
360
+ r = await cf.allow_user(tg_id)
361
+ await m.reply_text("✅ Allowed" if r.get("ok") else f"❌ Failed: {r.get('err')}", reply_markup=menu_kb(True))
362
+ return
363
+ else:
364
+ r = await cf.disallow_user(tg_id)
365
+ await m.reply_text("⛔ Disallowed" if r.get("ok") else f"❌ Failed: {r.get('err')}", reply_markup=menu_kb(True))
366
+ return
367
+
368
+ # add account flow (client_id then client_secret)
369
+ if STATE.waiting_client_id.get(u):
370
+ t = (m.text or "").strip()
371
+ if "apps.googleusercontent.com" in t:
372
+ STATE.waiting_client_id.pop(u, None)
373
+ STATE.waiting_client_secret[u] = t
374
+ await m.reply_text("Now send CLIENT_SECRET (starts with GOCSPX-...)")
375
+ return
376
+ await m.reply_text("Send client_id (apps.googleusercontent.com) or upload Google JSON file.")
377
+ return
378
+
379
+ if u in STATE.waiting_client_secret and STATE.waiting_client_secret[u] and not STATE.waiting_client_secret[u].startswith("__"):
380
+ client_id = STATE.waiting_client_secret[u]
381
+ client_secret = (m.text or "").strip()
382
+ if not client_secret:
383
+ await m.reply_text("Empty secret. Send again.")
384
+ return
385
+ STATE.waiting_client_secret.pop(u, None)
386
+
387
+ tg_id = str(u)
388
+ # create profile + login link
389
+ r = await cf.profile_add(tg_id, client_id, client_secret, label="main", ttl_sec=600)
390
+ if not r.get("ok"):
391
+ await m.reply_text(f"❌ Failed to add profile: {r.get('err')}")
392
+ return
393
+ login_url = r.get("login_url")
394
+ pid = r.get("profile_id")
395
+ await m.reply_text(
396
+ f"✅ Profile created: {pid}\n\nOpen login link to authorize:\n(If Telegram preview consumes ticket, just add again for now.)",
397
+ reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("🔐 Login with Google", url=login_url)]])
398
+ )
399
+ return
400
+
401
+ @app.on_message(filters.document & filters.private)
402
+ async def doc_in(_, m: Message):
403
+ u = m.from_user.id
404
+ if not STATE.waiting_client_id.get(u):
405
+ return
406
+
407
+ doc = m.document
408
+ name = (doc.file_name or "").lower()
409
+ if not name.endswith(".json"):
410
+ await m.reply_text("Send Google OAuth JSON file (client_secret_*.json).")
411
+ return
412
+
413
+ _mkdir_tmp()
414
+ path = os.path.join(TMP_DIR, f"cred_{u}.json")
415
+ await m.download(file_name=path)
416
+
417
+ try:
418
+ with open(path, "r", encoding="utf-8") as f:
419
+ j = json.load(f)
420
+ block = j.get("installed") or j.get("web") or {}
421
+ client_id = block.get("client_id", "")
422
+ client_secret = block.get("client_secret", "")
423
+ if not client_id or not client_secret:
424
+ raise ValueError("missing client_id/client_secret in json")
425
+ except Exception as e:
426
+ await m.reply_text(f"❌ Bad JSON: {e}")
427
+ return
428
+ finally:
429
+ try:
430
+ os.remove(path)
431
+ except Exception:
432
+ pass
433
+
434
+ STATE.waiting_client_id.pop(u, None)
435
+ tg_id = str(u)
436
+ r = await cf.profile_add(tg_id, client_id, client_secret, label="main", ttl_sec=600)
437
+ if not r.get("ok"):
438
+ await m.reply_text(f"❌ Failed to add profile: {r.get('err')}")
439
+ return
440
+ await m.reply_text(
441
+ f"✅ Profile created: {r.get('profile_id')}\nAuthorize now:",
442
+ reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("🔐 Login with Google", url=r.get("login_url"))]])
443
+ )
444
+
445
+ @app.on_message((filters.video | filters.document) & filters.private)
446
+ async def video_in(_, m: Message):
447
+ u = m.from_user.id
448
+ tg_id = str(u)
449
+
450
+ # owner always allowed
451
+ if not is_owner(cfg, u):
452
+ ok = await cf.is_allowed(tg_id)
453
+ if not ok:
454
+ await m.reply_text("⛔ You are not allowed. Ask owner to allow you.")
455
+ return
456
+
457
+ # Determine file
458
+ file_id = None
459
+ file_name = "video.mp4"
460
+ file_type = "video"
461
+ caption = (m.caption or "")
462
+
463
+ if m.video:
464
+ file_id = m.video.file_id
465
+ file_name = m.video.file_name or "video.mp4"
466
+ file_type = "video"
467
+ elif m.document and (m.document.mime_type or "").startswith("video/"):
468
+ file_id = m.document.file_id
469
+ file_name = m.document.file_name or "video.mp4"
470
+ file_type = "document"
471
+ else:
472
+ return
473
+
474
+ # Create upload job
475
+ upload_id = STATE.new_upload_id()
476
+ job = UploadJob(
477
+ upload_id=upload_id,
478
+ user_id=u,
479
+ chat_id=m.chat.id,
480
+ src_msg_id=m.id,
481
+ file_type=file_type,
482
+ tg_file_id=file_id,
483
+ file_name=file_name,
484
+ caption=caption,
485
+ privacy=DEFAULT_PRIVACY,
486
+ title_mode=DEFAULT_TITLE_MODE,
487
+ )
488
+ STATE.uploads[upload_id] = job
489
+
490
+ # Auto mode: start immediately with defaults (bulk-friendly)
491
+ auto = STATE.auto_mode.get(u, False)
492
+ if auto:
493
+ status = await m.reply_text(
494
+ f"📤 Auto Mode ON\nUploading with defaults…\nFile: {job.file_name}\nPrivacy: {job.privacy}\nTitle mode: {job.title_mode}",
495
+ )
496
+ job.status_msg_id = status.id
497
+ asyncio.create_task(run_upload(app, cfg, cf, job))
498
+ return
499
+
500
+ # Manual mode: show options
501
+ msg = await m.reply_text(
502
+ f"📤 Ready\nFile: {job.file_name}\nPrivacy: {job.privacy}\nTitle mode: {job.title_mode}",
503
+ reply_markup=upload_options_kb(job)
504
+ )
505
+ job.status_msg_id = msg.id
506
+
507
+ async def run_upload(app: Client, cfg: Config, cf: CFClient, job: UploadJob):
508
+ """
509
+ Upload pipeline:
510
+ - check default profile/channel
511
+ - pick rotated profile for same channel
512
+ - get access token
513
+ - download file
514
+ - resumable upload with progress (speed/ETA)
515
+ - record upload + cleanup
516
+ """
517
+ tg_id = str(job.user_id)
518
+
519
+ async with STATE.sem:
520
+ _mkdir_tmp()
521
+ local_path = os.path.join(TMP_DIR, f"{job.upload_id}_{job.file_name}")
522
+ status_msg = None
523
+ try:
524
+ status_msg = await app.get_messages(job.chat_id, job.status_msg_id) if job.status_msg_id else None
525
+ except Exception:
526
+ status_msg = None
527
+
528
+ def _edit(text: str):
529
+ async def _do():
530
+ try:
531
+ if status_msg:
532
+ await status_msg.edit_text(text)
533
+ except Exception:
534
+ pass
535
+ return _do()
536
+
537
+ try:
538
+ # 1) list profiles to find default + channel
539
+ profs = await cf.list_profiles_w2(tg_id)
540
+ if not profs.get("ok"):
541
+ await _edit(f"❌ Worker2 list_profiles failed: {profs.get('err')}")
542
+ return
543
+
544
+ default_pid = profs.get("default_profile_id")
545
+ profiles = profs.get("profiles") or []
546
+ default = next((p for p in profiles if p.get("profile_id") == default_pid), None)
547
+
548
+ if not default_pid or not default:
549
+ await _edit("⚠️ No default profile.\nGo: Accounts → set default.")
550
+ return
551
+ if not default.get("has_refresh"):
552
+ await _edit("⚠️ Default profile not authorized.\nGo: Accounts → choose an authorized profile and set default.")
553
+ return
554
+ channel_id = default.get("channel_id")
555
+ if not channel_id:
556
+ await _edit("⚠️ Default profile has no channel_id.\nAuthorize again and then set default.")
557
+ return
558
+
559
+ # 2) pick rotated profile for SAME channel
560
+ pick = await cf.pick_profile(tg_id, channel_id, ROTATE_AFTER_PER_PROFILE)
561
+ if not pick.get("ok"):
562
+ await _edit(f"❌ pick_profile failed: {pick.get('err')}")
563
+ return
564
+ profile_id = pick.get("profile_id")
565
+
566
+ # 3) access token
567
+ tok = await cf.access_token(tg_id, profile_id)
568
+ if not tok.get("ok"):
569
+ await cf.log_error(tg_id, profile_id, "access_token", tok.get("err", "token_failed"))
570
+ await _edit(f"❌ access_token failed: {tok.get('err')}")
571
+ return
572
+ access_token = tok.get("access_token")
573
+
574
+ # 4) download file with download progress
575
+ await _edit("⬇️ Downloading from Telegram…")
576
+
577
+ # hydrogram download (no progress hook reliably in all forks),
578
+ # so we just download and then show upload progress.
579
+ msg = await app.get_messages(job.chat_id, job.src_msg_id)
580
+ await msg.download(file_name=local_path)
581
+
582
+ size = os.path.getsize(local_path)
583
+
584
+ # 5) upload to YouTube with progress edits
585
+ title = _safe_title_from(job)
586
+ desc = job.caption.strip()
587
+
588
+ last_update = 0.0
589
+
590
+ async def progress_upload(done: int, total: int, speed: float, eta: float):
591
+ nonlocal last_update
592
+ import time
593
+ now = time.time()
594
+ if now - last_update < PROGRESS_EDIT_EVERY_SEC:
595
+ return
596
+ last_update = now
597
+ pct = (done / max(1, total)) * 100.0
598
+ text = (
599
+ f"⬆️ Uploading to YouTube…\n"
600
+ f"File: {job.file_name}\n"
601
+ f"Title: {title}\n"
602
+ f"Privacy: {job.privacy}\n\n"
603
+ f"{pct:.1f}% ({human_bytes(done)}/{human_bytes(total)})\n"
604
+ f"Speed: {human_bytes(speed)}/s\n"
605
+ f"ETA: {fmt_eta(eta)}"
606
+ )
607
+ await _edit(text)
608
+
609
+ await _edit(
610
+ f"⬆️ Starting YouTube upload…\n"
611
+ f"File: {job.file_name} ({human_bytes(size)})\n"
612
+ f"Title: {title}\n"
613
+ f"Privacy: {job.privacy}"
614
+ )
615
+
616
+ # First attempt
617
+ http = None
618
+ try:
619
+ import httpx
620
+ http = httpx.AsyncClient(timeout=None)
621
+ res = await youtube_resumable_upload(
622
+ access_token=access_token,
623
+ file_path=local_path,
624
+ title=title,
625
+ description=desc,
626
+ privacy=job.privacy,
627
+ chunk_size=YOUTUBE_CHUNK_SIZE,
628
+ progress_cb=lambda d, t, s, e: asyncio.create_task(progress_upload(d, t, s, e)),
629
+ http=http,
630
+ )
631
+ finally:
632
+ if http:
633
+ await http.aclose()
634
+
635
+ # Retry once if token expired
636
+ if not res.get("ok") and "401" in str(res.get("err", "")):
637
+ tok2 = await cf.access_token(tg_id, profile_id)
638
+ if tok2.get("ok"):
639
+ access_token = tok2.get("access_token")
640
+ http = None
641
+ try:
642
+ import httpx
643
+ http = httpx.AsyncClient(timeout=None)
644
+ res = await youtube_resumable_upload(
645
+ access_token=access_token,
646
+ file_path=local_path,
647
+ title=title,
648
+ description=desc,
649
+ privacy=job.privacy,
650
+ chunk_size=YOUTUBE_CHUNK_SIZE,
651
+ progress_cb=lambda d, t, s, e: asyncio.create_task(progress_upload(d, t, s, e)),
652
+ http=http,
653
+ )
654
+ finally:
655
+ if http:
656
+ await http.aclose()
657
+
658
+ if not res.get("ok"):
659
+ err = res.get("err", "upload_failed")
660
+ await cf.log_error(tg_id, profile_id, "youtube_upload", err)
661
+ await _edit(f"❌ Upload failed: {err}")
662
+ return
663
+
664
+ video_id = res.get("video_id")
665
+ link = f"https://youtu.be/{video_id}" if video_id else "no_link"
666
+
667
+ # 6) record upload
668
+ await cf.record_upload(tg_id, profile_id)
669
+
670
+ # 7) final message (stable format)
671
+ final_text = (
672
+ f"✅ Uploaded\n"
673
+ f"Title: {title}\n"
674
+ f"Privacy: {job.privacy}\n"
675
+ f"Channel: {tok.get('channel_title') or tok.get('channel_id')}\n"
676
+ f"Profile: {profile_id}\n"
677
+ f"Link: {link}"
678
+ )
679
+ await _edit(final_text)
680
+
681
+ except Exception as e:
682
+ try:
683
+ await cf.log_error(tg_id, "", "run_upload_exception", str(e)[:200])
684
+ except Exception:
685
+ pass
686
+ if status_msg:
687
+ try:
688
+ await status_msg.edit_text(f"❌ Crash: {e}")
689
+ except Exception:
690
+ pass
691
+ finally:
692
+ # cleanup
693
+ try:
694
+ if os.path.exists(local_path):
695
+ os.remove(local_path)
696
+ except Exception:
697
+ pass
698
+ # drop job
699
+ STATE.uploads.pop(job.upload_id, None)