import asyncio

import httpx
from aiocache import caches
from aiocache.decorators import cached
from fastapi import FastAPI, HTTPException
from fastapi.responses import RedirectResponse

app = FastAPI()

# Configure the default in-memory cache.
caches.set_config({
    'default': {
        'cache': "aiocache.SimpleMemoryCache",
        'ttl': 1800,  # Time to live in seconds (30 minutes)
    }
})


async def get_proxies(url):
    """Fetch a single proxy list and return it as a list of 'host:port' strings."""
    async with httpx.AsyncClient() as client:
        try:
            response = await client.get(url)
            response.raise_for_status()
            return response.text.strip().split('\n')
        except httpx.HTTPStatusError as e:
            raise HTTPException(status_code=500, detail=f"HTTP status error while fetching {url}: {str(e)}")
        except httpx.RequestError as e:
            raise HTTPException(status_code=500, detail=f"Request error while fetching {url}: {str(e)}")


async def fetch_proxies(urls):
    """Fetch all source URLs concurrently and merge the results.

    Sources that fail are logged and skipped so that a single bad source
    does not fail the whole request.
    """
    tasks = [asyncio.create_task(get_proxies(url)) for url in urls]
    results = await asyncio.gather(*tasks, return_exceptions=True)

    proxies = []
    for result in results:
        if isinstance(result, Exception):
            # Log the error for debugging purposes and skip this source.
            print(f"Error occurred: {result}")
            continue
        proxies.extend(result)
    return proxies


def get_http():
    urls = [
        "https://openproxylist.xyz/http.txt",
        "https://api.proxyscrape.com/v2/?request=displayproxies&protocol=http&timeout=10000&country=all&ssl=all&anonymity=all",
        "https://proxyspace.pro/http.txt",
        "https://proxyspace.pro/https.txt",
        "https://www.proxy-list.download/api/v1/get?type=http",
        "https://www.proxy-list.download/api/v1/get?type=https",
        "https://rootjazz.com/proxies/proxies.txt",
        "https://cdn.jsdelivr.net/gh/aslisk/proxyhttps/https.txt",
        "https://cdn.jsdelivr.net/gh/clarketm/proxy-list/proxy-list-raw.txt",
        "https://cdn.jsdelivr.net/gh/hendrikbgr/Free-Proxy-Repo/proxy_list.txt",
        "https://cdn.jsdelivr.net/gh/jetkai/proxy-list/online-proxies/txt/proxies-http.txt",
        "https://cdn.jsdelivr.net/gh/jetkai/proxy-list/online-proxies/txt/proxies-https.txt",
        "https://cdn.jsdelivr.net/gh/mmpx12/proxy-list/https.txt",
        "https://cdn.jsdelivr.net/gh/mmpx12/proxy-list/http.txt",
        "https://cdn.jsdelivr.net/gh/roosterkid/openproxylist/HTTPS_RAW.txt",
        "https://cdn.jsdelivr.net/gh/ShiftyTR/Proxy-List/https.txt",
        "https://cdn.jsdelivr.net/gh/sunny9577/proxy-scraper/proxies.txt"
    ]
    return urls


def get_socks4():
    urls = [
        "https://openproxylist.xyz/socks4.txt",
        "https://api.proxyscrape.com/v2/?request=getproxies&protocol=socks4",
        "https://proxyspace.pro/socks4.txt",
        "https://www.proxy-list.download/api/v1/get?type=socks4",
        "https://cdn.jsdelivr.net/gh/B4RC0DE-TM/proxy-list/SOCKS4.txt",
        "https://cdn.jsdelivr.net/gh/jetkai/proxy-list/online-proxies/txt/proxies-socks4.txt",
        "https://cdn.jsdelivr.net/gh/roosterkid/openproxylist/SOCKS4_RAW.txt",
        "https://cdn.jsdelivr.net/gh/TheSpeedX/PROXY-List/socks4.txt"
    ]
    return urls


def get_socks5():
    urls = [
        "https://openproxylist.xyz/socks5.txt",
        "https://api.proxyscrape.com/v2/?request=getproxies&protocol=socks5",
        "https://proxyspace.pro/socks5.txt",
        "https://www.proxy-list.download/api/v1/get?type=socks5",
        "https://cdn.jsdelivr.net/gh/jetkai/proxy-list/online-proxies/txt/proxies-socks5.txt",
        "https://cdn.jsdelivr.net/gh/mmpx12/proxy-list/socks5.txt",
        "https://cdn.jsdelivr.net/gh/roosterkid/openproxylist/SOCKS5_RAW.txt",
        "https://cdn.jsdelivr.net/gh/TheSpeedX/PROXY-List/socks5.txt"
    ]
    return urls


def get_all():
    urls = get_http() + get_socks4() + get_socks5()
    return urls


@app.get("/", include_in_schema=False)
async def head_root():
    # Redirect the bare root path to the HTTP proxy list.
    return RedirectResponse(url="/http")
@app.get("/http", response_model=dict) async def get_http_proxies(): urls = get_http() proxies = await fetch_proxies(urls) return {"proxies": proxies} @cached(key="socks4_proxies", ttl=1800) @app.get("/socks4", response_model=dict) async def get_socks4_proxies(): urls = get_socks4() proxies = await fetch_proxies(urls) return {"proxies": proxies} @cached(key="socks5_proxies", ttl=1800) @app.get("/socks5", response_model=dict) async def get_socks5_proxies(): urls = get_socks5() proxies = await fetch_proxies(urls) return {"proxies": proxies} @cached(key="all_proxies", ttl=1800) @app.get("/all", response_model=dict) async def get_all_proxies(): urls = get_all() proxies = await fetch_proxies(urls) return {"proxies": proxies} if __name__ == "__main__": import uvicorn uvicorn.run(app)