from base64 import b64decode from contextlib import asynccontextmanager from datetime import datetime from io import BytesIO from json import loads from logging import DEBUG, Formatter, INFO, StreamHandler, WARNING, getLogger from pathlib import Path from typing import AsyncGenerator, BinaryIO from apscheduler.schedulers.asyncio import AsyncIOScheduler from fastapi import FastAPI, File, Form, Request, UploadFile from fastapi.responses import FileResponse, HTMLResponse, JSONResponse from httpx import AsyncClient from patchright.async_api import Page, async_playwright from prlps_fakeua import UserAgent from starlette.responses import Response ua = UserAgent() AUTH_TOKEN = '' screenshot_path = Path(__file__).parent / 'screenshot.jpeg' token_path = Path(__file__).parent / 'token.json' BROWSER_TIMEOUT_SECONDS = 15 logger = getLogger('RHYMES_AI_API') logger.setLevel(DEBUG) handler = StreamHandler() handler.setLevel(INFO) formatter = Formatter('%(asctime)s | %(levelname)s : %(message)s', datefmt='%d.%m.%Y %H:%M:%S') handler.setFormatter(formatter) logger.addHandler(handler) getLogger('httpx').setLevel(WARNING) logger.info('инициализация приложения...') async def make_screenshot(page: Page): await page.screenshot(type='jpeg', path=screenshot_path.resolve().as_posix(), quality=85, full_page=True) logger.info('скриншот создан') async def refresh_token(): max_timeout = BROWSER_TIMEOUT_SECONDS * 1000 user_token = None async with async_playwright() as playwright: logger.info('запуск браузера') browser = await playwright.chromium.launch(headless=True, args=['--disable-blink-features=AutomationControlled']) context = await browser.new_context( color_scheme='dark', ignore_https_errors=True, locale='en-US', user_agent=ua.random, no_viewport=True, ) context.set_default_timeout(max_timeout) context.set_default_navigation_timeout(max_timeout) page = await context.new_page() page.set_default_timeout(max_timeout) page.set_default_navigation_timeout(max_timeout) try: logger.info('открытие страницы авторизации') await page.goto('https://rhymes.ai/', wait_until='networkidle') await make_screenshot(page) local_storage = await page.evaluate('() => {return JSON.stringify(localStorage);}') user_token = loads(local_storage).get('y-rhymes-user-token', '').split('|-door-|')[-1] await make_screenshot(page) finally: await page.close() await context.close() await browser.close() logger.info('работа браузера завершена') return user_token async def get_token(): global AUTH_TOKEN token = await refresh_token() if token: AUTH_TOKEN = token logger.info('токен получен') else: logger.error('токен не был получен') async def periodic_get_token(scheduler: AsyncIOScheduler): logger.info('запуск задачи периодического обновления токена') scheduler.add_job( get_token, trigger='interval', hours=24, next_run_time=datetime.now(), misfire_grace_time=3600 ) async def process_requests(api_key: str, image_file: BinaryIO, content: str) -> str | None: async with AsyncClient() as client: models_response = await client.get( 'https://api.rhymes.ai/web/v1/models', headers={ 'accept': '*/*', 'authorization': api_key, 'content-type': 'application/json' } ) if models_response.status_code != 200: return None models_data = models_response.json() model_id = models_data.get('data', [{}])[0].get('id') if not model_id: return None conversation_response = await client.post( 'https://api.rhymes.ai/web/v1/conversation/create', headers={ 'accept': '*/*', 'authorization': api_key, 'content-type': 'application/json' }, json={ 'model': model_id, 'language': 'en' } ) if conversation_response.status_code != 200: return None conversation_data = conversation_response.json() conversation_id = conversation_data.get('data', {}).get('conversationId') if not conversation_id: return None upload_response = await client.post( 'https://api.rhymes.ai/web/v1/upload/files', headers={ 'accept': '*/*', 'authorization': api_key }, files={ 'file': ('image.jpeg', image_file, 'image/jpeg'), 'la': (None, '1') } ) if upload_response.status_code != 200: return None upload_data = upload_response.json() file_url = upload_data.get('data', [{}])[0].get('fileUrl') if not file_url: return None chat_response = await client.post( 'https://api.rhymes.ai/web/v1/stream/chat', headers={ 'accept': 'text/event-stream', 'authorization': api_key, 'content-type': 'application/json' }, json={ 'content': content, 'conversationId': conversation_id, 'regenerate': 0, 'parent_message_id': None, 'model': model_id, 'imageList': [file_url], 'videoImageUrl': {}, 'fullImagePath': '', 'split_image': False } ) if chat_response.status_code != 200: return None async for line in chat_response.aiter_lines(): if line.startswith('data:'): message_data = loads(line[5:]) if message_data.get('lastOne'): result: str = message_data.get('content') return result return None @asynccontextmanager async def app_lifespan(_) -> AsyncGenerator: logger.info('запуск приложения') scheduler = AsyncIOScheduler() await periodic_get_token(scheduler) try: logger.info('запуск переодических задач') scheduler.start() logger.info('старт API') yield finally: scheduler.shutdown() logger.info('приложение завершено') app = FastAPI(lifespan=app_lifespan, title='RHYMES_AI_API') banned_endpoints = [ '/openapi.json', '/docs', '/docs/oauth2-redirect', 'swagger_ui_redirect', '/redoc', ] @app.middleware('http') async def block_banned_endpoints(request: Request, call_next): logger.debug(f'получен запрос: {request.url.path}') if request.url.path in banned_endpoints: logger.warning(f'запрещенный endpoint: {request.url.path}') return Response(status_code=403) response = await call_next(request) return response @app.post('/describe/') async def describe(question: str = Form(...), file: UploadFile = File(...)): logger.info('запрос `describe`') caption = await process_requests(AUTH_TOKEN, file.file, question) return JSONResponse({'caption': caption}, status_code=200, media_type='application/json') @app.post('/v1/describe') async def describe_v1(request: Request): logger.info('запрос `describe_v1`') body = await request.json() content_text = '' image_data = None messages = body.get('messages', []) for message in messages: role = message.get('role') content = message.get('content') if role in ['system', 'user']: if isinstance(content, str): content_text += content + ' ' elif isinstance(content, list): for item in content: if item.get('type') == 'text': content_text += item.get('text', '') + ' ' elif item.get('type') == 'image_url': image_url = item.get('image_url', {}) url = image_url.get('url') if url and url.startswith('data:image/'): base64_encoded_data = url.split('base64,', 1)[1] image_data = b64decode(base64_encoded_data) if not content_text.strip() or image_data is None: return JSONResponse({'caption': 'изображение должно быть передано как строка base64 `data:image/jpeg;base64,{base64_img}` а также текст'}, status_code=400) try: caption = await process_requests(AUTH_TOKEN, BytesIO(image_data), content_text.strip()) return JSONResponse({'caption': caption}, status_code=200) except Exception as e: return JSONResponse({'caption': str(e)}, status_code=500) @app.get('/') async def root(): return HTMLResponse('ну пролапс, ну и что', status_code=200) @app.get('/api/update_token') async def update_token(): logger.info('запрос `update_token`') task = get_token() return JSONResponse({'status': 'обновление токена запущено'}, status_code=200, media_type='application/json') @app.get('/api/show_last_screen') async def show_last_screen(): logger.info('запрос `show_last_screen`') return FileResponse(screenshot_path.resolve().as_posix(), media_type='image/jpeg', status_code=200) if __name__ == '__main__': from uvicorn import run as uvicorn_run logger.info('запуск сервера uvicorn') uvicorn_run(app, host='0.0.0.0', port=7860)