ryuzaki-api / tools.py
randydev's picture
Update tools.py
4dba342 verified
raw
history blame
5.84 kB
import io
import requests
import os
import re
import uuid
from PIL import Image, ImageEnhance
from fastapi import APIRouter, Depends
from fastapi.responses import StreamingResponse
from fastapi import UploadFile
from fastapi import *
from fastapi.responses import *
from fastapi.responses import JSONResponse
from fastapi import HTTPException
from dotenv import load_dotenv
from pydantic import BaseModel
from pymongo import MongoClient
from models import *
from RyuzakiLib import AsyicXSearcher
# Load variables from a .env file (if any) into the process environment before
# the required settings below are read.
load_dotenv()
# Base URL of the upstream tools service; tools_search() builds every request
# URL from it. A missing variable raises KeyError at import time.
TOOLS_NEW_URL = os.environ["TOOLS_NEW_URL"]
# MongoDB connection string; also required at import time.
MONGO_URL = os.environ["MONGO_URL"]
# Module-wide MongoDB client shared by the helpers in this file.
client_mongo = MongoClient(MONGO_URL)
db = client_mongo["tiktokbot"]
# User documents; get_all_api_keys() reads their "ryuzaki_api_key" field.
collection = db["users"]
# Router on which the /akeno/* endpoints below are registered.
router = APIRouter()
# Request body for the search endpoint.
# Documented with comments rather than a docstring so the generated schema
# is not altered.
class XnxxSearch(BaseModel):
    # Search text forwarded to the upstream service as the "q" parameter.
    q: str
# Request body for the download endpoints.
# Documented with comments rather than a docstring so the generated schema
# is not altered.
class XnxxLinks(BaseModel):
    # Link of the media page the upstream service should resolve.
    url: str
def get_all_api_keys():
    """Collect every non-empty ``ryuzaki_api_key`` from the users collection.

    Returns:
        list: The truthy ``ryuzaki_api_key`` values in cursor order. Documents
        that lack the field, or hold an empty value, are skipped.
    """
    candidates = (
        document.get("ryuzaki_api_key") for document in collection.find({})
    )
    return [key for key in candidates if key]
def validate_api_key(api_key: str = Header(...)):
    """FastAPI dependency that rejects requests carrying an unknown API key.

    Args:
        api_key: Required value that FastAPI reads from the request headers.

    Raises:
        HTTPException: 401 with detail "Invalid API key" when the key is not
            among the keys returned by ``get_all_api_keys()``.
    """
    known_keys = get_all_api_keys()
    if api_key in known_keys:
        return
    raise HTTPException(status_code=401, detail="Invalid API key")
async def tools_search(
    name=None,
    parameter=None,
    ai_model=None,
    upload_check=False
):
    """Build the upstream URL for a tool call.

    Args:
        name: Tool name, used only when ``upload_check`` is falsy.
        parameter: Query string (lookup form) or final path segment (upload
            form).
        ai_model: Path segment used only when ``upload_check`` is truthy.
        upload_check: Selects the upload form of the URL.

    Returns:
        str: ``{TOOLS_NEW_URL}/{ai_model}/{parameter}`` for uploads, otherwise
        ``{TOOLS_NEW_URL}/tools/{name}?{parameter}``.
    """
    if not upload_check:
        return f"{TOOLS_NEW_URL}/tools/{name}?{parameter}"
    return f"{TOOLS_NEW_URL}/{ai_model}/{parameter}"
async def toanime(input):
    """Send an image to the upstream "toanime" service and return its result.

    The image is re-encoded as JPEG in memory and posted as the multipart
    field ``image``.

    Args:
        input: Anything ``PIL.Image.open`` accepts (the endpoint in this file
            passes a filesystem path).

    Returns:
        dict | str: ``{"image_data": ..., "image_size": ...}`` built from the
        ``result`` and ``size`` keys of the upstream JSON when it answers
        HTTP 200. On any failure (unreadable image, network error, timeout,
        non-200 status, unexpected JSON) the string ``'Identifikasi Gagal'``
        is returned instead of raising.
    """
    url = await tools_search(
        ai_model="ai",
        parameter="toanime",
        upload_check=True
    )
    # Modes the JPEG encoder can write directly; anything else (RGBA, LA, P,
    # ...) would make ``save`` raise and turn valid uploads into failures.
    jpeg_modes = ("1", "L", "RGB", "CMYK", "YCbCr")
    try:
        buffer = io.BytesIO()
        # The context manager releases the underlying file handle, so the
        # caller can delete the source file afterwards.
        with Image.open(input) as source:
            if source.mode in jpeg_modes:
                source.save(buffer, format='JPEG')
            else:
                source.convert("RGB").save(buffer, format='JPEG')
        buffer.seek(0)
        files = {
            'image': ('toanime.jpg', buffer, 'image/jpeg')
        }
        # NOTE(review): requests.post is synchronous and blocks the event loop
        # while it runs. The timeout at least bounds that wait; without it a
        # stalled upstream would hang this request forever.
        response = requests.post(
            url,
            files=files,
            headers={
                'accept': 'application/json'
            },
            timeout=60
        )
        if response.status_code != 200:
            return 'Identifikasi Gagal'
        data = response.json()
        return {
            "image_data": data['result'],
            "image_size": data['size']
        }
    except Exception:
        # Deliberate best-effort: callers expect the failure string, not an
        # exception.
        return 'Identifikasi Gagal'
@router.post("/akeno/toanime", response_model=SuccessResponse, responses={422: {"model": SuccessResponse}})
async def toanime_endpoint(
    file: UploadFile = File(...),
    api_key: str = Depends(validate_api_key)
):
    """Convert an uploaded image through the upstream "toanime" service.

    The upload is written to a temporary file under ``./uploads``, passed to
    ``toanime()``, and removed again whatever the outcome.

    Args:
        file: The uploaded image.
        api_key: Unused; present so ``validate_api_key`` runs for the request.

    Returns:
        SuccessResponse: ``status="True"`` with ``randydev={"url": ...}`` on
        success, otherwise ``status="False"`` with an ``error`` message.

    Raises:
        HTTPException: 500 "Failed to save file" when the upload cannot be
            written to disk.
    """
    upload_dir = "./uploads"
    # Never build the path from the client-supplied filename: a name such as
    # "../../app.py" would let a caller overwrite, and via the cleanup below
    # delete, arbitrary files. A random name also keeps concurrent uploads
    # from colliding. No extension is needed because the image is opened by
    # content.
    file_path = os.path.join(upload_dir, uuid.uuid4().hex)
    try:
        try:
            os.makedirs(upload_dir, exist_ok=True)
            with open(file_path, "wb") as f:
                f.write(await file.read())
        except Exception:
            raise HTTPException(status_code=500, detail="Failed to save file")
        try:
            response = await toanime(file_path)
            # toanime() returns a plain string on failure; indexing it raises
            # and lands in the error branch below.
            url_image = response["image_data"]
            return SuccessResponse(
                status="True",
                randydev={"url": url_image}
            )
        except Exception:
            return SuccessResponse(
                status="False",
                randydev={"error": "Error during processing"}
            )
    finally:
        # Covers every exit path, including a partially written upload.
        if os.path.exists(file_path):
            os.remove(file_path)
@router.post("/akeno/xnxxsearch", response_model=SuccessResponse, responses={422: {"model": SuccessResponse}})
async def xnxx_search(
    payload: XnxxSearch,
    api_key: None = Depends(validate_api_key)
):
    """Forward a search query to the upstream ``xnxxsearch`` tool.

    Args:
        payload: Request body; its ``q`` field is the search text.
        api_key: Unused; present so ``validate_api_key`` runs for the request.

    Returns:
        SuccessResponse: ``status="True"`` with ``randydev={"results": ...}``
        taken from the upstream ``result`` key, or ``status="False"`` with an
        ``error`` message when the upstream call fails.
    """
    from urllib.parse import quote

    # The model declares the field as ``q``; reading ``payload.query`` raised
    # AttributeError on every call. Percent-encode the value so characters
    # such as "&", "#" or spaces cannot break or extend the upstream query.
    url = await tools_search(
        name="xnxxsearch",
        parameter=f"q={quote(payload.q, safe='')}"
    )
    try:
        response = await AsyicXSearcher.search(url, re_json=True)
        result = response["result"]
        return SuccessResponse(
            status="True",
            randydev={"results": result}
        )
    except Exception:
        # ``except Exception`` (not a bare except) so task cancellation and
        # interpreter shutdown still propagate.
        return SuccessResponse(
            status="False",
            randydev={"error": "Error during processing"}
        )
@router.post("/akeno/xnxx-dl", response_model=SuccessResponse, responses={422: {"model": SuccessResponse}})
async def xnxx_download(
    payload: XnxxLinks,
    api_key: None = Depends(validate_api_key)
):
    """Forward a media link to the upstream ``xnxxdl`` tool.

    Args:
        payload: Request body; its ``url`` field is the link to resolve.
        api_key: Unused; present so ``validate_api_key`` runs for the request.

    Returns:
        SuccessResponse: ``status="True"`` with ``randydev={"results": ...}``
        taken from the upstream ``result`` key, or ``status="False"`` with an
        ``error`` message when the upstream call fails.
    """
    from urllib.parse import quote

    # The model declares the field as ``url``; reading ``payload.link`` raised
    # AttributeError on every call. Percent-encode the link so its own "?",
    # "&" and "#" are not parsed as part of the upstream query string.
    url = await tools_search(
        name="xnxxdl",
        parameter=f"url={quote(payload.url, safe='')}"
    )
    try:
        response = await AsyicXSearcher.search(url, re_json=True)
        result = response["result"]
        return SuccessResponse(
            status="True",
            randydev={"results": result}
        )
    except Exception:
        # ``except Exception`` (not a bare except) so task cancellation and
        # interpreter shutdown still propagate.
        return SuccessResponse(
            status="False",
            randydev={"error": "Error during processing"}
        )
@router.post("/akeno/xnxx-videodl", response_model=SuccessResponse, responses={422: {"model": SuccessResponse}})
async def xnxx_videodl(
    payload: XnxxLinks,
    api_key: None = Depends(validate_api_key)
):
    """Forward a media link to the upstream ``xvideosdl`` tool.

    Args:
        payload: Request body; its ``url`` field is the link to resolve.
        api_key: Unused; present so ``validate_api_key`` runs for the request.

    Returns:
        SuccessResponse: ``status="True"`` with ``randydev={"results": ...}``
        taken from the upstream ``result`` key, or ``status="False"`` with an
        ``error`` message when the upstream call fails.
    """
    from urllib.parse import quote

    # The model declares the field as ``url``; reading ``payload.link`` raised
    # AttributeError on every call. Percent-encode the link so its own "?",
    # "&" and "#" are not parsed as part of the upstream query string.
    url = await tools_search(
        name="xvideosdl",
        parameter=f"url={quote(payload.url, safe='')}"
    )
    try:
        response = await AsyicXSearcher.search(url, re_json=True)
        result = response["result"]
        return SuccessResponse(
            status="True",
            randydev={"results": result}
        )
    except Exception:
        # ``except Exception`` (not a bare except) so task cancellation and
        # interpreter shutdown still propagate.
        return SuccessResponse(
            status="False",
            randydev={"error": "Error during processing"}
        )
@router.post("/akeno/instagramdl", response_model=SuccessResponse, responses={422: {"model": SuccessResponse}})
async def instagramdl(
    payload: XnxxLinks,
    api_key: None = Depends(validate_api_key)
):
    """Forward a media link to the upstream ``instagramdl`` tool.

    Args:
        payload: Request body; its ``url`` field is the link to resolve.
        api_key: Unused; present so ``validate_api_key`` runs for the request.

    Returns:
        SuccessResponse: ``status="True"`` with ``randydev={"results": ...}``
        taken from the upstream ``result`` key, or ``status="False"`` with an
        ``error`` message when the upstream call fails.
    """
    from urllib.parse import quote

    # The model declares the field as ``url``; reading ``payload.link`` raised
    # AttributeError on every call. Percent-encode the link so its own "?",
    # "&" and "#" are not parsed as part of the upstream query string.
    url = await tools_search(
        name="instagramdl",
        parameter=f"url={quote(payload.url, safe='')}"
    )
    try:
        response = await AsyicXSearcher.search(url, re_json=True)
        result = response["result"]
        return SuccessResponse(
            status="True",
            randydev={"results": result}
        )
    except Exception:
        # ``except Exception`` (not a bare except) so task cancellation and
        # interpreter shutdown still propagate.
        return SuccessResponse(
            status="False",
            randydev={"error": "Error during processing"}
        )