| |
| import os |
| import time |
| import uuid |
| import psutil |
| import asyncio |
| import platform |
| from datetime import datetime |
| from typing import List, Optional |
| from contextlib import asynccontextmanager |
| from cachetools import TTLCache |
| from pydantic import BaseModel |
| from fastapi.responses import FileResponse, Response, StreamingResponse |
| from fastapi import FastAPI, Query, HTTPException, Body, BackgroundTasks, Header |
|
|
| from app.xs_dl import XoSoApp |
| from app.cl_lq import ClLqApp |
| from app.ht_rd import HentaiRandomApp |
| from app.ht_noti import HentaiApp |
| from app.ios_app import IOSAppFetcher |
| from app.tik_dl import TikTokDownloader |
| from app.byps_uv import BypassUV |
| from app.gs_code import GSCodeApp |
| from app.gm_crate import GmailLogic |
| from app.gs_daily import GSDailyApp |
| from app.invite_logic import DiscordInviteLogic |
| from app.tik_dldrct import TikTokDirect |
|
|
# Feature-module singletons, constructed once at import time.
# The lifespan handler in this file calls start()/stop() on all of them
# except gm_app and tik_app.
xs_app = XoSoApp()
lq_app = ClLqApp()
gm_app = GmailLogic()
gs_app = GSDailyApp()
bypass_app = BypassUV()
tik_app = TikTokDownloader()
tik_direct = TikTokDirect()
hentai_app = HentaiApp()
ios_app = IOSAppFetcher()
gs_code_app = GSCodeApp()
invite_app = DiscordInviteLogic()
ht_random_app = HentaiRandomApp()


# Shared secret that protected endpoints compare against; None when the
# API_KEY environment variable is not set.
API_KEY = os.environ.get("API_KEY")
# Wall-clock time at import, used by the status endpoint to report uptime.
START_TIME = time.time()
# Orders awaiting completion, keyed by order id: at most 100 entries, each
# kept for 24 hours.
order_cache = TTLCache(maxsize=100, ttl=24 * 60 * 60)
|
|
class ProductItem(BaseModel):
    # One line item of an order; used as the element type of
    # GmailRequest.products.
    name: str
    # Price is carried as a string and is not parsed in this file.
    price: str
|
|
class DailyRequest(BaseModel):
    # Request body of POST /api/v1/genshin/daily. All three fields are
    # forwarded unchanged to gs_app.run_daily_and_capture().
    cookie: str
    discord_id: str
    server: str
|
|
class RedeemRequest(BaseModel):
    # Request body of POST /api/v1/genshin/redeem. All three fields are
    # forwarded unchanged to gs_code_app.redeem_code().
    cookie: str
    server: str
    code: str
|
|
class CompleteRequest(BaseModel):
    # Request body of POST /api/v1/gmail/done.
    # order_id selects the entry previously stored in order_cache.
    order_id: str
    # tk / mk are passed straight through to gm_app.send_complete_email()
    # (presumably account name and password -- confirm against GmailLogic).
    tk: str
    mk: str
|
|
class GmailRequest(BaseModel):
    # Request body of POST /api/v1/gmail/create.
    to_email: str
    subject: str
    customer_name: str
    status: str
    total_price: str
    products: List[ProductItem]
    # When omitted, the endpoint asks gm_app for a generated order id.
    order_id: Optional[str] = None
    # Forwarded to gm_app.send_order_email() as from_name.
    from_name: Optional[str] = "Celeste Store"
|
|
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Start the background services before serving and stop them afterwards.

    All ``start()`` coroutines are awaited concurrently before the
    application begins handling requests; all ``stop()`` coroutines are
    awaited concurrently on the way out. ``gm_app`` and ``tik_app`` are not
    part of this lifecycle.

    Args:
        app: The FastAPI application (required by the lifespan protocol,
            not used here).
    """
    services = (
        invite_app, lq_app, gs_app,
        gs_code_app, hentai_app, ht_random_app,
        bypass_app, ios_app, xs_app, tik_direct,
    )
    await asyncio.gather(*(service.start() for service in services))
    try:
        yield
    finally:
        # Run the shutdown hooks even when an exception or cancellation is
        # thrown in at the yield point; otherwise the services would never
        # be stopped.
        await asyncio.gather(*(service.stop() for service in services))


app = FastAPI(lifespan=lifespan)
|
|
@app.get("/")
def home():
    """Return a status snapshot of the process and the host.

    Returns:
        A dict with process id, hostname, platform details, Python version,
        uptime (in seconds and as ``HH:MM:SS``), and RAM / CPU usage figures.
    """
    memory = psutil.virtual_memory()
    # Blocks for the sampling interval (0.5 s) to measure CPU utilisation.
    cpu_percent = psutil.cpu_percent(interval=0.5)

    # Clamp at zero so a backwards wall-clock adjustment cannot yield a
    # negative uptime.
    uptime_seconds = max(0, int(time.time() - START_TIME))

    # Format by hand: strftime("%H:%M:%S") on an epoch timestamp wraps back
    # to 00:00:00 after 24 hours of uptime. The hours field here keeps
    # growing instead.
    minutes, seconds = divmod(uptime_seconds, 60)
    hours, minutes = divmod(minutes, 60)
    uptime_string = f"{hours:02d}:{minutes:02d}:{seconds:02d}"

    return {
        "status": "online",
        "process_id": os.getpid(),
        "hostname": platform.node(),
        "uptime_hms": uptime_string,
        "platform": platform.system(),
        "uptime_seconds": uptime_seconds,
        "ram_usage_percent": memory.percent,
        "cpu_usage_percent": cpu_percent,
        "platform_release": platform.release(),
        "python_version": platform.python_version(),
        "used_ram_mb": round(memory.used / 1024 / 1024, 2),
        "total_ram_mb": round(memory.total / 1024 / 1024, 2),
    }
|
|
@app.get("/api/v1/invites/discord")
async def get_new_invite():
    """Return the result of ``invite_app.get_invite()``.

    Raises:
        HTTPException: 500 when the result contains an ``"error"`` key.
    """
    invite = await invite_app.get_invite()
    if "error" not in invite:
        return invite
    raise HTTPException(
        status_code=500,
        detail="500: Yo mama called, said stop hacking or she'll whoop ur ass. Listen to her for once.",
    )
|
|
@app.get("/api/v1/get/lqAov")
async def get_free_account():
    """Return the result of ``lq_app.fetch_acc()``.

    Raises:
        HTTPException: 500 when the result's ``"ok"`` entry is falsy.
    """
    account = await lq_app.fetch_acc()
    if account["ok"]:
        return account
    raise HTTPException(
        status_code=500,
        detail="500: Yo mama called, said stop hacking or she'll whoop ur ass. Listen to her for once.",
    )
|
|
@app.post("/api/v1/genshin/daily")
async def genshin_daily(data: DailyRequest, x_api_key: str = Header(None)):
    """Run ``gs_app.run_daily_and_capture`` for the supplied account.

    Args:
        data: Cookie, Discord id and server passed on to the service.
        x_api_key: API key header; must equal the configured ``API_KEY``.

    Raises:
        HTTPException: 500 when no ``API_KEY`` is configured, 403 when the
            header does not match it, 400 (with the service's error) when
            the result's ``"ok"`` entry is falsy.
    """
    # A server without a configured key refuses every request.
    if not API_KEY:
        raise HTTPException(
            status_code=500,
            detail="500: Yo mama called, said stop hacking or she'll whoop ur ass. Listen to her for once.",
        )
    if x_api_key != API_KEY:
        raise HTTPException(status_code=403, detail="API key invalid. Go touch grass")

    outcome = await gs_app.run_daily_and_capture(data.cookie, data.discord_id, data.server)
    if outcome["ok"]:
        return outcome
    raise HTTPException(status_code=400, detail=outcome["error"])
|
|
@app.post("/api/v1/genshin/redeem")
async def genshin_redeem(data: RedeemRequest, x_api_key: str = Header(None)):
    """Redeem a code through ``gs_code_app.redeem_code``.

    Args:
        data: Cookie, server and code passed on to the service.
        x_api_key: API key header; must equal the configured ``API_KEY``.

    Raises:
        HTTPException: 403 when no ``API_KEY`` is configured or the header
            does not match it; 400 (with the whole service result as
            detail) when the result's ``"ok"`` entry is falsy.
    """
    # Reject when API_KEY is unset: otherwise a request without the header
    # would compare None to None and be let through.
    if not API_KEY or x_api_key != API_KEY:
        raise HTTPException(status_code=403, detail="API key invalid. Go touch grass")

    res = await gs_code_app.redeem_code(data.cookie, data.server, data.code)
    if not res["ok"]:
        raise HTTPException(status_code=400, detail=res)
    return res
|
|
@app.get("/api/v1/hentai/newest")
async def hentai_newest(
    apikey: Optional[str] = Query(None),
    x_api_key: Optional[str] = Header(None, alias="X-API-Key"),
):
    """Return ``hentai_app.get_new(...)`` for the supplied key.

    The header key takes precedence over the ``apikey`` query parameter.
    No key check happens here; the supplied key and the configured
    ``API_KEY`` are both handed to ``hentai_app.get_new``.
    """
    return await hentai_app.get_new(x_api_key or apikey, API_KEY)
|
|
@app.get("/api/v1/hentai/info")
async def hentai_info(
    slug: str = Query(...),
    apikey: Optional[str] = Query(None),
    x_api_key: Optional[str] = Header(None, alias="X-API-Key"),
):
    """Return ``hentai_app.fetch_info(slug)`` for an authorised caller.

    The header key takes precedence over the ``apikey`` query parameter.

    Raises:
        HTTPException: 403 when no ``API_KEY`` is configured or the supplied
            key does not match it.
    """
    supplied_key = x_api_key or apikey
    authorized = bool(API_KEY) and supplied_key == API_KEY
    if not authorized:
        raise HTTPException(status_code=403, detail="API key invalid. Go touch grass")
    return await hentai_app.fetch_info(slug)
|
|
@app.get("/api/v1/hentai/random")
async def hentai_random(
    apikey: Optional[str] = Query(None),
    limit: str = Query("1"),
    x_api_key: Optional[str] = Header(None, alias="X-API-Key"),
):
    """Return ``ht_random_app.get_random(...)`` for the supplied key and limit.

    The header key takes precedence over the ``apikey`` query parameter.
    No key check happens here; the supplied key, the configured ``API_KEY``
    and ``limit`` (still a string) are handed to ``ht_random_app.get_random``.
    """
    return await ht_random_app.get_random(x_api_key or apikey, API_KEY, limit)
|
|
@app.get("/api/v1/tiktok/download")
async def tiktok_download(
    url: str = Query(...),
    hd: int = Query(1),
    apikey: Optional[str] = Query(None),
    x_api_key: Optional[str] = Header(None, alias="X-API-Key"),
):
    """Return ``tik_app.fetch_video(url, hd)`` for an authorised caller.

    The header key takes precedence over the ``apikey`` query parameter.

    Raises:
        HTTPException: 403 when no ``API_KEY`` is configured or the supplied
            key does not match it; 400 (with the service's error) when the
            result's ``"ok"`` entry is falsy.
    """
    supplied_key = x_api_key or apikey
    if not (API_KEY and supplied_key == API_KEY):
        raise HTTPException(status_code=403, detail="API key invalid. Go touch grass")

    video = await tik_app.fetch_video(url, hd)
    if video["ok"]:
        return video
    raise HTTPException(status_code=400, detail=video["error"])
|
|
@app.get("/api/v1/tiktok/dl-direct")
async def tiktok_direct(
    url: str = Query(None),
    token: str = Query(None),
    type: str = Query("mp4"),
    apikey: Optional[str] = Query(None),
    x_api_key: Optional[str] = Header(None, alias="X-API-Key"),
):
    """Create a download token for a URL, or stream the media behind a token.

    Two modes, checked in this order:

    * ``url`` given: return ``tik_direct.generate_token(url, type)``.
    * ``token`` given: look the token up with ``tik_direct.get_token_data``
      and stream the bytes of the stored URL back as an attachment.

    The header key takes precedence over the ``apikey`` query parameter.

    Raises:
        HTTPException: 403 when no ``API_KEY`` is configured or the supplied
            key does not match it; 400 when token generation reports failure
            or neither ``url`` nor ``token`` is given; 404 when the token
            lookup returns nothing.
    """
    key = x_api_key or apikey
    if not API_KEY or key != API_KEY:
        raise HTTPException(status_code=403, detail="API key invalid")

    if url:
        res = await tik_direct.generate_token(url, type)
        if not res["ok"]:
            raise HTTPException(status_code=400, detail=res["error"])
        return res

    if token:
        data = tik_direct.get_token_data(token)
        if not data:
            raise HTTPException(status_code=404, detail="Token expired or invalid")

        # The module never imports httpx, so the generator below used to
        # fail with NameError as soon as the response started streaming.
        import httpx

        async def stream():
            # Relay the upstream body chunk by chunk instead of buffering it.
            async with httpx.AsyncClient() as client:
                async with client.stream("GET", data["url"]) as upstream:
                    async for chunk in upstream.aiter_bytes():
                        yield chunk

        filename = f"tiktok_{int(time.time())}.{data['type']}"

        return StreamingResponse(
            stream(),
            media_type="video/mp4" if data["type"] == "mp4" else "audio/mpeg",
            headers={"Content-Disposition": f"attachment; filename={filename}"},
        )

    raise HTTPException(status_code=400, detail="Missing url or token")
|
|
@app.get("/api/v1/bypass")
async def bypass_link(
    url: str = Query(...),
    apikey: Optional[str] = Query(None),
    x_api_key: Optional[str] = Header(None, alias="X-API-Key"),
):
    """Return ``bypass_app.bypass(url)`` for an authorised caller.

    The header key takes precedence over the ``apikey`` query parameter.

    Raises:
        HTTPException: 403 when no ``API_KEY`` is configured or the supplied
            key does not match it; 400 (with the service's error) when the
            result's ``"ok"`` entry is falsy.
    """
    supplied_key = x_api_key or apikey
    if not (API_KEY and supplied_key == API_KEY):
        raise HTTPException(status_code=403, detail="API key invalid. Go touch grass")

    outcome = await bypass_app.bypass(url)
    if outcome["ok"]:
        return outcome
    raise HTTPException(status_code=400, detail=outcome["error"])
|
|
@app.get("/api/v1/ios/app")
async def ios_app_fetch(
    n: str = Query(...),
    apikey: Optional[str] = Query(None),
    x_api_key: Optional[str] = Header(None, alias="X-API-Key"),
):
    """Return ``ios_app.get_app_accounts(n)`` for an authorised caller.

    The header key takes precedence over the ``apikey`` query parameter.

    Raises:
        HTTPException: 403 when no ``API_KEY`` is configured or the supplied
            key does not match it; 404 (with the service's error) when the
            result's ``"ok"`` entry is falsy.
    """
    supplied_key = x_api_key or apikey
    if not (API_KEY and supplied_key == API_KEY):
        raise HTTPException(status_code=403, detail="API key invalid. Go touch grass")

    accounts = await ios_app.get_app_accounts(n)
    if accounts["ok"]:
        return accounts
    raise HTTPException(status_code=404, detail=accounts["error"])
|
|
@app.post("/api/v1/gmail/create")
async def create_gmail_notif(data: GmailRequest, x_api_key: str = Header(None)):
    """Record an order in ``order_cache`` and send its notification email.

    The order id is taken from the request or, when absent, generated by
    ``gm_app.generate_random_order_id()``. The cache entry is written before
    the email is sent and is not removed if sending fails.

    Args:
        data: Recipient, subject, customer, status, price and product list.
        x_api_key: API key header; must equal the configured ``API_KEY``.

    Returns:
        The result of ``gm_app.send_order_email`` with ``"order_id"`` added.

    Raises:
        HTTPException: 403 when no ``API_KEY`` is configured or the header
            does not match it; 500 when the send result's ``"ok"`` entry is
            falsy.
    """
    # Reject when API_KEY is unset: otherwise a request without the header
    # would compare None to None and be let through.
    if not API_KEY or x_api_key != API_KEY:
        raise HTTPException(status_code=403, detail="API key invalid. Go touch grass")

    final_order_id = data.order_id or gm_app.generate_random_order_id()
    # Keep what the completion endpoint needs later; only the first product's
    # name is stored.
    order_cache[final_order_id] = {
        "email": data.to_email,
        "name": data.customer_name,
        "product": data.products[0].name if data.products else "Sản phẩm",
        "price": data.total_price,
    }

    res = await gm_app.send_order_email(
        to_email=data.to_email,
        subject=data.subject,
        customer_name=data.customer_name,
        order_id=final_order_id,
        status=data.status,
        products=[p.dict() for p in data.products],
        total_price=data.total_price,
        from_name=data.from_name,
    )
    if not res["ok"]:
        raise HTTPException(
            status_code=500,
            detail="500: Yo mama called, said stop hacking or she'll whoop ur ass. Listen to her for once.",
        )

    res["order_id"] = final_order_id
    return res
|
|
@app.post("/api/v1/gmail/done")
async def complete_order(data: CompleteRequest, x_api_key: str = Header(None)):
    """Send the completion email for a cached order and drop it from the cache.

    Args:
        data: Order id plus the ``tk`` / ``mk`` values passed on to
            ``gm_app.send_complete_email``.
        x_api_key: API key header; must equal the configured ``API_KEY``.

    Returns:
        ``{"ok": True, "message": ...}`` naming the recipient address.

    Raises:
        HTTPException: 403 when no ``API_KEY`` is configured or the header
            does not match it; 404 when the order is not in ``order_cache``;
            500 when the send result's ``"ok"`` entry is falsy (the cache
            entry is kept in that case).
    """
    # Reject when API_KEY is unset: otherwise a request without the header
    # would compare None to None and be let through.
    if not API_KEY or x_api_key != API_KEY:
        raise HTTPException(status_code=403, detail="API key invalid. Go touch grass")

    cached_order = order_cache.get(data.order_id)
    if not cached_order:
        raise HTTPException(status_code=404, detail="Order not found or expired")

    res = await gm_app.send_complete_email(
        to_email=cached_order["email"],
        customer_name=cached_order["name"],
        order_id=data.order_id,
        product_name=cached_order["product"],
        price=cached_order["price"],
        tk=data.tk,
        mk=data.mk,
    )
    if not res["ok"]:
        raise HTTPException(
            status_code=500,
            detail="500: Yo mama called, said stop hacking or she'll whoop ur ass. Listen to her for once.",
        )

    # pop() rather than del: the entry may have expired, or been removed by
    # a concurrent request, while the email was being sent.
    order_cache.pop(data.order_id, None)
    return {"ok": True, "message": f"Sent the account to {cached_order['email']}"}
|
|
@app.get("/api/v1/xoso/xsmn")
async def get_xsmn(
    type: str = Query("latest"),
    date: Optional[str] = Query(None),
    apikey: Optional[str] = Query(None),
    x_api_key: Optional[str] = Header(None, alias="X-API-Key"),
):
    """Return ``xs_app.fetch_mn(...)`` for an authorised caller.

    ``date`` is only forwarded when ``type`` is ``"date"``; for any other
    ``type`` the service is called with ``None``. The header key takes
    precedence over the ``apikey`` query parameter.

    Raises:
        HTTPException: 403 when no ``API_KEY`` is configured or the supplied
            key does not match it; 400 (with the service's error) when the
            result's ``"ok"`` entry is falsy.
    """
    supplied_key = x_api_key or apikey
    if not (API_KEY and supplied_key == API_KEY):
        raise HTTPException(status_code=403, detail="API key invalid. Go touch grass")

    requested_date = None
    if type == "date":
        requested_date = date

    results = await xs_app.fetch_mn(requested_date)
    if results["ok"]:
        return results
    raise HTTPException(status_code=400, detail=results["error"])