| """ |
| API OpenAI-compatible wrappant aifreeforever.com |
| Endpoints : |
| GET /v1/models |
| POST /v1/chat/completions (stream=false & stream=true) |
| """ |
|
|
| import asyncio, json, time, uuid |
| from typing import Optional |
| from fastapi import FastAPI, HTTPException |
| from fastapi.responses import StreamingResponse |
| from pydantic import BaseModel |
| from playwright.async_api import async_playwright, Page, TimeoutError as PWTimeout |
|
|
| |
| |
| |
| app = FastAPI(title="Free Chat API") |
|
|
| |
| SEM = asyncio.Semaphore(1) |
|
|
| MODEL_NAME = "aifreeforever" |
|
|
| |
| |
| |
class Message(BaseModel):
    """One chat message in the OpenAI chat format."""

    # role: "system", "user" or "assistant" (others are ignored by
    # chat_completions); content: the message text.
    role: str
    content: str
|
|
class ChatRequest(BaseModel):
    """Request body for POST /v1/chat/completions (OpenAI-compatible)."""

    model: Optional[str] = MODEL_NAME
    messages: list[Message]
    stream: Optional[bool] = False
    # temperature/max_tokens are accepted for client compatibility only;
    # the upstream site offers no way to forward them.
    temperature: Optional[float] = None
    max_tokens: Optional[int] = None
|
|
| |
| |
| |
| URL = "https://aifreeforever.com/tools/free-chatgpt-no-login" |
| HEADLESS = True |
| MAX_RETRIES = 5 |
|
|
| SEL_TEXTAREA = 'textarea[placeholder*="Ask anything"]' |
| SEL_SEND_BTN = 'button.absolute.right-3.top-4.w-10.h-10' |
| SEL_BOT_MSG = '.flex.justify-start .rounded-2xl.shadow-sm.bg-white.border.border-gray-100' |
| SEL_BOT_CONTENT = '.markdown-content' |
| SEL_ACCEPT_BTN = 'button:has-text("Accept & Continue")' |
| SEL_AGE_BTN = 'button:has-text("13 and Over")' |
| SEL_COPY_BTN = ( |
| 'button.text-xs.text-gray-500.flex.items-center.gap-1' |
| '.px-2.py-1.rounded-md.transition-colors:has-text("Copy")' |
| ) |
| SEL_ERROR_MSG = 'p.text-red-600:has-text("Failed to send message")' |
|
|
| COOKIE_SELECTORS = [ |
| 'button.unic-agree-all-button', |
| 'button:has-text("Accepter et continuer")', |
| 'button:has-text("Accepter tout")', |
| 'button:has-text("Accept all")', |
| 'button:has-text("Accept All")', |
| 'button:has-text("I agree")', |
| ] |
|
|
| AD_SELECTORS = [ |
| 'button:has(path[fill="#1D1D1B"][opacity="0.7"])', |
| 'button:has(path[style*="stroke: rgb(255, 255, 255)"][style*="stroke-width: 6.353"])', |
| 'button[aria-label*="close" i]', |
| 'button[aria-label*="fermer" i]', |
| 'button[title*="close" i]', |
| 'button:has-text("Γ")', |
| 'button:has-text("β")', |
| 'button:has-text("Close")', |
| '[class*="close-btn"]', |
| ] |
|
|
| |
| |
| |
|
|
async def _try_click(page: "Page", selector: str, timeout: int) -> bool:
    """Best-effort click on the first element matching *selector*.

    Waits up to *timeout* ms for the element to become visible, then clicks
    it.  Returns True on success, False on ANY failure.  This helper exists
    to dismiss ads/cookie/consent overlays that may or may not be present,
    so a failed click must never abort the surrounding flow.

    Fix: the original caught only TimeoutError, so other Playwright errors
    (click intercepted, element detached mid-click, ...) propagated through
    click_if_visible() into the bare asyncio.gather in chat() and crashed
    the whole request.  All exceptions now map to False.
    """
    try:
        target = page.locator(selector).first
        await target.wait_for(state="visible", timeout=timeout)
        await target.click()
        return True
    except Exception:
        # Absent, hidden, detached or intercepted: report failure, don't raise.
        return False
|
|
async def click_first_visible(page: Page, selectors: list[str], timeout: int = 4000):
    """Try to click every selector in *selectors* concurrently.

    Returns True when at least one click succeeded, False otherwise.
    Exceptions raised by individual attempts are swallowed.
    """
    outcomes = await asyncio.gather(
        *(_try_click(page, candidate, timeout) for candidate in selectors),
        return_exceptions=True,
    )
    return any(outcome is True for outcome in outcomes)
|
|
async def click_if_visible(page: Page, selector: str, timeout: int = 8000) -> bool:
    """Click *selector* if it becomes visible within *timeout* ms; report success."""
    clicked = await _try_click(page, selector, timeout)
    return clicked
|
|
async def send_with_retry(page: Page, prompt: str) -> bool:
    """Click the send button, retrying up to MAX_RETRIES times.

    The textarea is assumed to be pre-filled by the caller on the first
    attempt; on every retry the prompt is re-filled before clicking again.
    Success means the site did not display its send-failure banner.
    """
    for attempt in range(MAX_RETRIES):
        if attempt:  # first pass relies on the caller having filled the box
            await page.locator(SEL_TEXTAREA).fill(prompt)
        await page.locator(SEL_SEND_BTN).click(timeout=15_000)
        await asyncio.sleep(2)
        error_banners = await page.locator(SEL_ERROR_MSG).count()
        if not error_banners:
            return True
    return False
|
|
async def try_copy_button(page: Page) -> str | None:
    """Grab the latest bot reply via its "Copy" button and the clipboard.

    Returns the clipboard text when it looks like a real answer (more than
    20 chars once stripped), otherwise None.  Any Playwright or clipboard
    failure is treated as "not available" rather than an error.
    """
    try:
        bubbles = page.locator(SEL_BOT_MSG)
        total = await bubbles.count()
        if total == 0:
            return None

        # Prefer the newest bubble that actually carries rendered content.
        target = None
        for idx in reversed(range(total)):
            candidate = bubbles.nth(idx)
            if await candidate.locator(SEL_BOT_CONTENT).count() > 0:
                target = candidate
                break
        if target is None:
            target = bubbles.nth(total - 1)

        button = target.locator(SEL_COPY_BTN)
        await button.wait_for(state="visible", timeout=3000)
        await button.click()
        await asyncio.sleep(0.8)  # give the page time to populate the clipboard

        clipboard = await page.evaluate("async () => navigator.clipboard.readText()")
        if clipboard and len(clipboard.strip()) > 20:
            return clipboard.strip()
    except Exception:
        pass  # best-effort: caller falls back to DOM scraping
    return None
|
|
async def scrape_last_bot_message(page: Page) -> str | None:
    """Read the newest bot reply directly from the DOM.

    Walks the bot bubbles from newest to oldest.  A bubble with rendered
    markdown wins when its text exceeds 20 chars; a bubble without markdown
    qualifies only when its raw text exceeds 80 chars and is not the consent
    dialog.  Returns None when nothing qualifies or on any error.
    """
    try:
        bubbles = page.locator(SEL_BOT_MSG)
        total = await bubbles.count()
        for idx in reversed(range(total)):
            bubble = bubbles.nth(idx)
            rendered = bubble.locator(SEL_BOT_CONTENT)
            if await rendered.count() > 0:
                candidate = (await rendered.first.inner_text()).strip()
                if len(candidate) > 20:
                    return candidate
            else:
                raw = (await bubble.inner_text()).strip()
                if len(raw) > 80 and "Accept & Continue" not in raw:
                    return raw
    except Exception:
        pass
    return None
|
|
async def get_response(page: Page) -> str:
    """Return the bot's reply, preferring the clipboard over DOM scraping."""
    from_clipboard = await try_copy_button(page)
    if from_clipboard:
        return from_clipboard
    scraped = await scrape_last_bot_message(page)
    return scraped or ""
|
|
async def wait_for_stream_end(page: Page, timeout_s: int = 120) -> None:
    """Poll the newest bot message until its text stops changing.

    Samples the last content-bearing bot bubble every ``interval`` seconds.
    Once the text is unchanged for 3 consecutive samples AND the "Copy"
    button is visible on the last bubble (the site renders it only after
    streaming finishes), the stream is considered complete.  Gives up
    silently after *timeout_s* seconds.
    """
    prev_text = ""
    stable = 0        # consecutive polls with identical text
    elapsed = 0.0
    interval = 0.6    # seconds between polls
    while elapsed < timeout_s:
        await asyncio.sleep(interval)
        elapsed += interval
        msgs = page.locator(SEL_BOT_MSG)
        count = await msgs.count()
        if count == 0:
            continue
        # Take the newest bubble whose rendered markdown exceeds 20 chars.
        current = ""
        for i in range(count - 1, -1, -1):
            content_el = msgs.nth(i).locator(SEL_BOT_CONTENT)
            if await content_el.count() > 0:
                t = (await content_el.first.inner_text()).strip()
                if len(t) > 20:
                    current = t
                    break
        if not current:
            continue
        if current != prev_text:
            stable = 0
        else:
            stable += 1
            if stable >= 3:
                # Text is stable; confirm via the Copy button before returning.
                try:
                    await msgs.nth(count - 1).locator(SEL_COPY_BTN).wait_for(
                        state="visible", timeout=1500
                    )
                    return
                except PWTimeout:
                    # Button not rendered yet: still streaming, keep polling.
                    stable = 0
        prev_text = current
|
|
| |
| |
| |
|
|
async def chat(prompt: str) -> str:
    """Drive a headless Chromium session through one prompt/answer round-trip.

    Launches a fresh browser per call, dismisses ad/cookie/consent dialogs,
    submits *prompt*, waits for the streamed answer to settle, and returns
    the reply text ("" when sending failed or nothing could be read).
    Navigation/selector timeouts propagate to the caller; the browser is
    always closed via the ``finally`` block.
    """
    async with async_playwright() as p:
        browser = await p.chromium.launch(
            headless=HEADLESS,
            args=[
                # Reduce the automation fingerprint and container friction.
                "--disable-blink-features=AutomationControlled",
                "--disable-extensions", "--disable-default-apps",
                "--no-first-run", "--no-sandbox", "--disable-gpu",
                "--disable-dev-shm-usage",
                "--window-size=1280,720",
            ],
        )
        context = await browser.new_context(
            user_agent=(
                "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                "AppleWebKit/537.36 (KHTML, like Gecko) "
                "Chrome/124.0.0.0 Safari/537.36"
            ),
            viewport={"width": 1280, "height": 720},
            # Clipboard permissions are required by try_copy_button().
            permissions=["clipboard-read", "clipboard-write"],
            java_script_enabled=True,
        )
        # Skip heavy resources (images/media/fonts) to speed up page loads.
        await context.route(
            "**/*",
            lambda route: route.abort()
            if route.request.resource_type in ("image", "media", "font")
            else route.continue_(),
        )
        page = await context.new_page()

        try:
            await page.goto(URL, wait_until="domcontentloaded", timeout=60_000)

            # Dismiss ad overlays and cookie banners concurrently (best-effort).
            await asyncio.gather(
                click_first_visible(page, AD_SELECTORS, timeout=3000),
                click_first_visible(page, COOKIE_SELECTORS, timeout=5000),
            )

            await page.wait_for_selector(SEL_TEXTAREA, timeout=30_000)
            await page.locator(SEL_TEXTAREA).fill(prompt)

            if not await send_with_retry(page, prompt):
                return ""

            # Age/terms dialogs may pop up right after the first send.
            await asyncio.gather(
                click_if_visible(page, SEL_AGE_BTN, timeout=8000),
                click_if_visible(page, SEL_ACCEPT_BTN, timeout=8000),
            )

            await wait_for_stream_end(page, timeout_s=120)
            response = await get_response(page)
        finally:
            await browser.close()

    return response
|
|
| |
| |
| |
|
|
def _make_id():
    """Generate an OpenAI-style completion id ("chatcmpl-" + 29 hex chars)."""
    suffix = uuid.uuid4().hex[:29]
    return "chatcmpl-" + suffix


def _completion_response(content: str, model: str) -> dict:
    """Build a non-streaming OpenAI ``chat.completion`` payload for *content*.

    Token usage cannot be measured for the upstream site, so all counts
    are reported as 0.
    """
    choice = {
        "index": 0,
        "message": {"role": "assistant", "content": content},
        "finish_reason": "stop",
    }
    usage = {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0}
    return {
        "id": _make_id(),
        "object": "chat.completion",
        "created": int(time.time()),
        "model": model,
        "choices": [choice],
        "usage": usage,
    }
|
|
async def _stream_chunks(content: str, model: str):
    """Yield *content* as OpenAI-format SSE streaming chunks.

    Emits a role-announcing first chunk, then one chunk per whitespace-split
    word (with a small delay to simulate streaming), then a final chunk with
    ``finish_reason: "stop"`` followed by the ``[DONE]`` sentinel.
    """
    cid = _make_id()
    created = int(time.time())

    def _sse(delta: dict, finish) -> str:
        # Serialize one chunk in the chat.completion.chunk envelope.
        payload = {
            "id": cid, "object": "chat.completion.chunk",
            "created": created, "model": model,
            "choices": [{"index": 0, "delta": delta, "finish_reason": finish}],
        }
        return f"data: {json.dumps(payload)}\n\n"

    # Opening chunk announces the assistant role.
    yield _sse({"role": "assistant", "content": ""}, None)

    # Body: one token per word, re-inserting the separating spaces.
    for idx, word in enumerate(content.split(" ")):
        yield _sse({"content": word if idx == 0 else f" {word}"}, None)
        await asyncio.sleep(0.02)

    # Closing chunk, then the SSE termination sentinel.
    yield _sse({}, "stop")
    yield "data: [DONE]\n\n"
|
|
| |
| |
| |
|
|
| @app.get("/") |
| async def root(): |
| return {"status": "ok", "message": "OpenAI-compatible API. Use /v1/chat/completions"} |
|
|
| @app.get("/v1/models") |
| async def list_models(): |
| return { |
| "object": "list", |
| "data": [ |
| { |
| "id": MODEL_NAME, |
| "object": "model", |
| "created": 1700000000, |
| "owned_by": "aifreeforever", |
| } |
| ], |
| } |
|
|
| @app.post("/v1/chat/completions") |
| async def chat_completions(req: ChatRequest): |
| |
| |
| parts = [] |
| for m in req.messages: |
| if m.role == "system": |
| parts.append(f"[System] {m.content}") |
| elif m.role == "user": |
| parts.append(f"{m.content}") |
| elif m.role == "assistant": |
| parts.append(f"[Assistant] {m.content}") |
| prompt = "\n\n".join(parts) |
|
|
| if not prompt.strip(): |
| raise HTTPException(status_code=400, detail="Empty prompt") |
|
|
| |
| async with SEM: |
| try: |
| content = await chat(prompt) |
| except Exception as e: |
| raise HTTPException(status_code=500, detail=str(e)) |
|
|
| if not content: |
| raise HTTPException(status_code=502, detail="No response from upstream") |
|
|
| model = req.model or MODEL_NAME |
|
|
| |
| if req.stream: |
| return StreamingResponse( |
| _stream_chunks(content, model), |
| media_type="text/event-stream", |
| ) |
|
|
| |
| return _completion_response(content, model) |