from asyncio import Queue, create_task, gather, sleep |
from contextlib import asynccontextmanager |
from json import dumps, loads |
from logging import INFO, basicConfig, getLogger |
from pathlib import Path |
from typing import Literal |
from apscheduler.schedulers.asyncio import AsyncIOScheduler |
from fastapi import FastAPI, HTTPException |
from fastapi.responses import JSONResponse, PlainTextResponse |
from proxybroker import Broker |
from uvicorn import run as uvicorn_run |
# Shared asyncio scheduler; jobs are registered further down in this module.
scheduler = AsyncIOScheduler()

# Directory that holds the log file and the generated JSON files.
# ``__file__`` is not defined in some execution contexts (e.g. an interactive
# session); the lookup then raises NameError, which is the only error the
# fallback is meant to cover.
try:
    workdir = Path(__file__).parent
except NameError:
    # NOTE(review): the fallback is the *parent* of the current working
    # directory, not the directory itself -- confirm this is intended.
    workdir = Path.cwd().parent
# Application log, appended to across restarts.
logfile = workdir / 'log.log'
basicConfig(
    level=INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    filename=str(logfile),
    filemode='a',
    # The log messages are Cyrillic and the /log/ endpoint reads this file as
    # UTF-8, so write it as UTF-8 instead of the platform default encoding.
    encoding='utf-8',
)
logger = getLogger('proxy_collector')
logger.info('запуск...')
def delete_logs():
    """Clear the application log file.

    The file is truncated in place rather than unlinked: the logging handler
    configured by ``basicConfig`` keeps ``logfile`` open in append mode, so
    removing the path would leave the handler writing to a file that is no
    longer reachable by name (and fails outright on platforms that refuse to
    delete open files). After truncation the handler simply continues at the
    new end of file.

    Failures are logged and never raised, so a scheduled run cannot fail.
    """
    try:
        with logfile.open('w', encoding='utf-8'):
            pass  # opening in 'w' mode is what truncates the file
    except OSError as e:
        logger.error(f'ошибка при удалении логов: {e}')
# Flag read and reset by find_proxies(): while it is True the collection
# cycle runs with the shorter timeout.
is_first_run = True

# Output files, all stored in ``workdir``: the merged result and one file
# per proxy protocol.
collected_json = workdir / 'proxies.json'
http_collected_json = workdir / 'http_proxies.json'
https_collected_json = workdir / 'https_proxies.json'
socks5_collected_json = workdir / 'socks5_proxies.json'

# Country codes passed to the broker as the ``countries`` filter.
countries_list = [
    'US', 'CA', 'FR', 'FI', 'HR', 'ME', 'CH', 'SE',
    'EE', 'DE', 'GB', 'IT', 'NL', 'PL', 'CZ', 'RS',
    'RO', 'MD', 'AT', 'BE', 'BG', 'HU', 'DK', 'IS',
    'KZ', 'LV', 'LT', 'LU', 'NO', 'PT', 'SK', 'SI',
]
def create_json_from_proxies(proxy_lines: list[str], filename: Path):
    """Parse textual proxy descriptions and save them to *filename* as JSON.

    Every line is split on whitespace and read positionally: token 1 is the
    country code, token 2 the ping with an ``s`` suffix, token 3 the opening
    of the bracketed protocol list and the last token the host followed by
    ``>``. Examples of accepted lines::

        <Proxy US 0.33s [HTTPS] 1.2.3.4:8080>
        <Proxy DE 1.20s [HTTP: High] 5.6.7.8:3128>

    (Presumably these are the string forms of the broker's proxy objects as
    gathered by ``collect_proxies`` -- verify against the caller.)

    Only the first protocol of the bracketed list is recorded. The host is
    always taken from the final token, so extra words inside the brackets
    cannot shift it. Lines that do not fit the layout are logged and skipped
    instead of aborting the whole save.

    Args:
        proxy_lines: Proxy descriptions, one per element.
        filename: Destination file; it is overwritten.

    Returns:
        *filename*, after the JSON document
        ``{"countries": [...sorted...], "proxies": [...]}`` has been written.
    """
    logger.info('сохранение файла прокси')
    countries = set()
    proxies = []
    for line in proxy_lines:
        parts = line.split()
        # Need at least: marker, country, ping, protocol, host.
        if len(parts) < 5:
            logger.warning(f'пропущена некорректная строка прокси: {line!r}')
            continue
        try:
            ping = float(parts[2].strip('s'))
        except ValueError:
            logger.warning(f'пропущена некорректная строка прокси: {line!r}')
            continue
        country = parts[1]
        # '[HTTP:' -> 'HTTP', '[HTTPS]' -> 'HTTPS', '[HTTPS,' -> 'HTTPS'.
        protocol = parts[3].strip('[],').rstrip(':')
        host = parts[-1].rstrip(']>')
        countries.add(country)
        proxies.append({"country": country, "ping": ping, "protocol": protocol, "host": host})
    data = {
        'countries': sorted(countries),
        'proxies': proxies,
    }
    filename.write_text(dumps(data, indent=4), encoding='utf-8')
    return filename
async def collect_proxies(proxies_queue: Queue):
    """Drain *proxies_queue* into a list of strings.

    Items are awaited one at a time and stored in their formatted (text)
    form. A ``None`` item acts as the end marker: it stops the loop and is
    not included in the result.

    Returns:
        The formatted items in the order they were taken from the queue.
    """
    collected = []
    while (item := await proxies_queue.get()) is not None:
        collected.append(format(item))
    return collected
async def sort_proxies_and_merge(files: list[Path], output_file: Path):
    """Merge per-protocol proxy files into one JSON document.

    Each existing, non-empty file in *files* is expected to hold a JSON
    object with a ``proxies`` list. The protocol of the first proxy in a file
    (lower-cased) becomes that file's key in ``proxies_by_type``. Files that
    are missing, empty, unparsable, without proxies or without a protocol on
    the first proxy are skipped.

    For every accepted file the proxies are sorted by ascending ``ping``
    (entries without a ping go last) and the distinct country codes are
    stored sorted, so the output is deterministic.

    Args:
        files: Candidate input files.
        output_file: Destination file; it is overwritten.

    Returns:
        *output_file*, after ``{"countries": [...], "proxies_by_type": {...}}``
        has been written.
    """
    logger.info('объединение файлов прокси')

    def ping_key(proxy):
        # A missing ping cannot be compared with floats; push it to the end.
        ping = proxy.get('ping')
        return (ping is None, ping or 0.0)

    all_countries = set()
    proxies_by_type = {}
    for file in files:
        if not file.is_file() or file.stat().st_size == 0:
            continue
        try:
            data = loads(file.read_text(encoding='utf-8'))
        except ValueError as e:
            # Covers invalid JSON and undecodable bytes; one bad file must
            # not prevent the others from being merged.
            logger.warning(f'пропущен повреждённый файл {file.name}: {e}')
            continue
        proxies = data.get('proxies')
        if not proxies:
            continue
        protocol = proxies[0].get('protocol')
        if not protocol:
            continue
        sorted_proxies = sorted(proxies, key=ping_key)
        countries = sorted(
            {proxy.get('country') for proxy in sorted_proxies if proxy.get('country')}
        )
        proxies_by_type[protocol.lower()] = {
            'countries': countries,
            'proxies': sorted_proxies,
        }
        all_countries.update(countries)
    merged_data = {'countries': sorted(all_countries), 'proxies_by_type': proxies_by_type}
    output_file.write_text(dumps(merged_data, indent=4), encoding='utf-8')
    return output_file
async def stop_broker_after_timeout(broker: Broker, timeout_minutes: int):
    """Sleep for *timeout_minutes* minutes, then call ``broker.stop()``.

    Stopping is best effort: an error raised by ``stop()`` is logged and not
    propagated, so the awaiting collector can still continue. Cancellation of
    this coroutine during the sleep is not intercepted.

    Args:
        broker: Object whose ``stop()`` method ends the search.
        timeout_minutes: Delay before stopping, in minutes.
    """
    await sleep(timeout_minutes * 60)
    try:
        broker.stop()
    except Exception as e:
        # Deliberately non-fatal, but no longer silent.
        logger.warning(f'ошибка при остановке брокера: {e}')
async def find_proxies_by_type(proxy_type: Literal['HTTP', 'HTTPS', 'SOCKS5'], output_json_file: Path, timeout_minutes: int = 50):
    """Collect proxies of one protocol and store them in *output_json_file*.

    Steps:
      1. Overwrite *output_json_file* with an empty placeholder document.
      2. Start a broker search for *proxy_type* limited to ``countries_list``
         and a timer task that stops the broker after *timeout_minutes*.
      3. After the timer has fired, drain the queue with ``collect_proxies``
         (which reads until a ``None`` item) and save the result with
         ``create_json_from_proxies``.

    Args:
        proxy_type: Protocol to search for.
        output_json_file: Destination file for this protocol.
        timeout_minutes: How long the broker is allowed to run.

    Returns:
        The path returned by ``create_json_from_proxies``.

    Raises:
        Whatever ``broker.find`` raises; the timer task is cancelled first so
        it is not left running in the background.
    """
    logger.info(f'начат сбор прокси {proxy_type}')
    # NOTE(review): this replaces the previous results with an empty document
    # for the whole duration of the collection -- confirm that is intended.
    output_json_file.write_text(dumps({'countries': None, 'proxies': []}, indent=4))
    proxies_queue = Queue()
    broker = Broker(proxies_queue, timeout=8, max_conn=200, max_tries=3, verify_ssl=False)
    stop_task = create_task(stop_broker_after_timeout(broker, timeout_minutes))
    try:
        await broker.find(types=[proxy_type], countries=countries_list, limit=0)
        await stop_task
    finally:
        # No-op when the timer already finished; otherwise (search failed or
        # this coroutine was cancelled) make sure the timer does not linger.
        stop_task.cancel()
    # Presumably broker.stop() enqueues the None end marker that
    # collect_proxies waits for -- verify against the broker implementation.
    proxies_list = await collect_proxies(proxies_queue)
    saved_proxy = create_json_from_proxies(proxies_list, output_json_file)
    logger.info(f'завершён сбор прокси {proxy_type}')
    return saved_proxy
async def find_proxies():
    """Run one full collection cycle for all protocols and merge the results.

    The three per-protocol collectors run concurrently. While
    ``is_first_run`` is set they get a 10 minute timeout, afterwards 50
    minutes. A collector that fails is logged and left out; the files of the
    collectors that succeeded are still merged into ``collected_json``. If
    every collector fails, the previous merged file is left untouched and
    ``is_first_run`` keeps its value, so the next cycle is short again.
    """
    global is_first_run
    timeout_minutes = 10 if is_first_run else 50
    targets = (
        ('HTTP', http_collected_json),
        ('HTTPS', https_collected_json),
        ('SOCKS5', socks5_collected_json),
    )
    logger.info('запущены задачи по сбору всех типов прокси')
    # return_exceptions=True keeps one failing protocol from discarding the
    # results of the other two.
    results = await gather(
        *(find_proxies_by_type(proxy_type, path, timeout_minutes) for proxy_type, path in targets),
        return_exceptions=True,
    )
    saved_files = []
    for (proxy_type, _), result in zip(targets, results):
        if isinstance(result, BaseException):
            logger.error(f'ошибка при сборе прокси {proxy_type}: {result!r}')
        else:
            saved_files.append(result)
    if not saved_files:
        logger.error('не удалось собрать прокси ни одного типа')
        return
    await sort_proxies_and_merge(saved_files, collected_json)
    is_first_run = False
    logger.info('задачи по сбору прокси завершены')
# Periodic jobs: a full proxy collection every 60 minutes and a log cleanup
# every 1440 minutes; each job is limited to a single concurrent instance.
scheduler.add_job(
    find_proxies,
    trigger='interval',
    minutes=60,
    max_instances=1,
)
scheduler.add_job(
    delete_logs,
    trigger='interval',
    minutes=1440,
    max_instances=1,
)
@asynccontextmanager
async def app_lifespan(app: FastAPI):
    """Application lifespan: start background work, clean it up on exit.

    On startup the scheduler is started and an initial ``find_proxies`` run
    is launched as a background task. On shutdown that task is cancelled if
    it is still running (instead of blocking shutdown until the collection
    finishes), its outcome is drained, and the scheduler is shut down even if
    the task had failed.
    """
    scheduler.start()
    task = create_task(find_proxies())
    try:
        yield
    finally:
        task.cancel()  # no-op when the initial run has already finished
        # gather(..., return_exceptions=True) hands back a cancellation or a
        # failure as a value, so neither can skip the scheduler shutdown.
        (outcome,) = await gather(task, return_exceptions=True)
        if isinstance(outcome, Exception):
            logger.error(f'ошибка начального сбора прокси: {outcome!r}')
        scheduler.shutdown()


app = FastAPI(lifespan=app_lifespan)
def not_redy_yet():
    """Build the JSON reply used when a requested proxy file does not exist.

    Returns:
        A ``JSONResponse`` with status 200 and a single ``error`` message.
    """
    payload = {"error": "ёще не готово, сбор и проверка прокси занимает около часа"}
    return JSONResponse(content=payload, status_code=200)
# NOTE(review): the '*' pattern has no leading '/', so it presumably never
# matches a real request path -- confirm and, if a catch-all is wanted,
# choose a pattern deliberately (it would also affect how unmatched GET
# requests are answered). A later function in this module reuses the name
# ``read_root``; the route is registered by the decorator, so it still works.
@app.post('*')
async def read_root():
    """Reject POST requests with 405 Method Not Allowed.

    Raises:
        HTTPException: always, with status code 405. The exception has to be
        raised; returning it would be serialized as a normal response body.
    """
    raise HTTPException(status_code=405)
@app.get('/all/')
async def get_proxies():
    """Serve the merged proxy file, or the "not ready" reply if it is absent."""
    if not collected_json.exists():
        return not_redy_yet()
    raw = collected_json.read_text()
    return loads(raw)
@app.get('/http/')
async def get_http_proxies():
    """Serve the HTTP proxy file, or the "not ready" reply if it is absent."""
    if not http_collected_json.exists():
        return not_redy_yet()
    raw = http_collected_json.read_text()
    return loads(raw)
@app.get('/https/')
async def get_https_proxies():
    """Serve the HTTPS proxy file, or the "not ready" reply if it is absent."""
    if not https_collected_json.exists():
        return not_redy_yet()
    raw = https_collected_json.read_text()
    return loads(raw)
@app.get('/socks5/')
async def get_socks5_proxies():
    """Serve the SOCKS5 proxy file, or the "not ready" reply if it is absent."""
    if not socks5_collected_json.exists():
        return not_redy_yet()
    raw = socks5_collected_json.read_text()
    return loads(raw)
@app.get('/log/')
async def get_logs():
    """Return the application log as plain text.

    A missing log file and an existing but empty one are both answered with
    the "log is empty" message. Bytes that are not valid UTF-8 are replaced
    rather than turning the request into a server error.
    """
    try:
        text = logfile.read_text(encoding='utf-8', errors='replace')
    except FileNotFoundError:
        text = ''
    if not text:
        return PlainTextResponse('лог пуст', status_code=200)
    return PlainTextResponse(text, status_code=200)
@app.get('/')
async def read_root():
    """Answer requests to the root path with a fixed plain-text message."""
    greeting = 'ну пролапс, ну и что'
    return PlainTextResponse(content=greeting, status_code=200)
if __name__ == "__main__":
    # Script entry point: serve the app with uvicorn using fixed settings.
    server_options = {
        'host': '',
        'port': 7860,
        'timeout_keep_alive': 90,
    }
    uvicorn_run(app, **server_options)