from asyncio import CancelledError, Queue, create_task, gather, sleep
from contextlib import asynccontextmanager, suppress
from json import dumps, loads
from logging import INFO, basicConfig, getLogger
from pathlib import Path
from typing import Literal

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from fastapi import FastAPI, HTTPException
from fastapi.responses import JSONResponse, PlainTextResponse
from proxybroker import Broker
from uvicorn import run as uvicorn_run
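
# Overview: every hour the service collects public HTTP, HTTPS and SOCKS5
# proxies with proxybroker, writes the checked results into per-type JSON
# files plus a merged proxies.json, and serves them via a small FastAPI app.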

scheduler = AsyncIOScheduler()

try:
    workdir = Path(__file__).parent
except NameError:
    # __file__ can be undefined (e.g. in an interactive session); fall back to the cwd
    workdir = Path.cwd()

logfile = workdir / 'log.log'

basicConfig(
    level=INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    filename=str(logfile),
    filemode='a'
)
logger = getLogger('proxy_collector')
logger.info('starting...')


def delete_logs():
    # Truncate rather than unlink: the logging FileHandler keeps the file open,
    # so unlinking it would leave later records writing to a deleted inode.
    try:
        logfile.write_text('')
    except Exception as e:
        logger.error(f'failed to clear the log: {e}')


is_first_run = True

http_collected_json = workdir / 'http_proxies.json'
https_collected_json = workdir / 'https_proxies.json'
socks5_collected_json = workdir / 'socks5_proxies.json'
collected_json = workdir / 'proxies.json'

# ISO 3166-1 alpha-2 codes for Broker.find()'s country filter
countries_list = ['US', 'CA', 'FR', 'FI', 'HR', 'ME', 'CH', 'SE', 'EE', 'DE', 'GB', 'IT', 'NL', 'PL', 'CZ', 'RS', 'RO', 'MD', 'AT', 'BE', 'BG', 'HU', 'DK', 'IS', 'KZ', 'LV', 'LT', 'LU', 'NO', 'PT', 'SK', 'SI']


def create_json_from_proxies(proxy_lines: list[str], filename: Path):
    logger.info('saving the proxy file')
    countries = set()
    proxies = []

    for line in proxy_lines:
        try:
            parts = line.split()
            country = parts[1]
            ping = float(parts[2].strip('s'))
            protocol = parts[3].strip('[]')
            host = parts[4].rstrip('>')

            if 'HTTP:' in protocol:
                # HTTP entries include an anonymity level, which shifts
                # the host one token to the right
                protocol = 'HTTP'
                host = parts[5].rstrip(']>')
        except (IndexError, ValueError) as e:
            logger.warning(f'skipping unparsable proxy line {line!r}: {e}')
            continue

        countries.add(country)
        proxies.append({'country': country, 'ping': ping, 'protocol': protocol, 'host': host})

    data = {
        'countries': sorted(countries),
        'proxies': proxies
    }
    filename.write_text(dumps(data, indent=4))
    return filename
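
# Parsing sketch: proxybroker Proxy reprs look roughly like
#   <Proxy US 0.32s [HTTPS] 93.157.248.105:8080>
#   <Proxy US 0.32s [HTTP: High] 93.157.248.105:8080>
# (format assumed from proxybroker's Proxy.__repr__; the address is
# illustrative), which the loop above turns into entries such as
#   {"country": "US", "ping": 0.32, "protocol": "HTTPS", "host": "93.157.248.105:8080"}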


async def collect_proxies(proxies_queue: Queue):
    # Drain the queue until the broker signals completion with a None sentinel
    proxies_list = []
    while True:
        proxy = await proxies_queue.get()
        if proxy is None:
            break
        proxies_list.append(f'{proxy}')
    return proxies_list


async def sort_proxies_and_merge(files: list[Path], output_file: Path):
    logger.info('merging proxy files')
    all_countries = set()
    proxies_by_type = {}
    for file in files:
        if not (file.is_file() and file.stat().st_size > 0):
            continue
        data = loads(file.read_text(encoding='utf-8'))
        proxies = data.get('proxies')
        if not proxies:
            continue
        # each input file holds a single type, so the first entry determines it
        protocol = proxies[0].get('protocol')
        if not protocol:
            continue
        proxy_type = protocol.lower()
        sorted_proxies = sorted(proxies, key=lambda x: x.get('ping'))
        proxies_by_type[proxy_type] = {
            'countries': sorted(set(proxy.get('country') for proxy in sorted_proxies if proxy.get('country'))),
            'proxies': sorted_proxies
        }
        all_countries.update(proxies_by_type[proxy_type]['countries'])
    merged_data = {'countries': sorted(all_countries), 'proxies_by_type': proxies_by_type}
    output_file.write_text(dumps(merged_data, indent=4))
    return output_file
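
# Resulting proxies.json shape (derived from the code above):
#   {
#       "countries": ["CA", "US", ...],
#       "proxies_by_type": {
#           "http":   {"countries": [...], "proxies": [...]},
#           "https":  {"countries": [...], "proxies": [...]},
#           "socks5": {"countries": [...], "proxies": [...]}
#       }
#   }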


async def stop_broker_after_timeout(broker: Broker, timeout_minutes: int):
    # Watchdog: Broker.find() with limit=0 can run indefinitely, so force
    # a stop once the deadline passes
    await sleep(timeout_minutes * 60)
    try:
        broker.stop()
    except Exception as e:
        logger.error(f'failed to stop the broker: {e}')


async def find_proxies_by_type(proxy_type: Literal['HTTP', 'HTTPS', 'SOCKS5'], output_json_file: Path, timeout_minutes: int = 50):
    logger.info(f'started collecting {proxy_type} proxies')
    output_json_file.write_text(dumps({'countries': None, 'proxies': []}, indent=4))
    proxies_queue = Queue()
    broker = Broker(proxies_queue, timeout=8, max_conn=200, max_tries=3, verify_ssl=False)
    stop_task = create_task(stop_broker_after_timeout(broker, timeout_minutes))
    await broker.find(types=[proxy_type], countries=countries_list, limit=0)
    # find() may finish before the deadline; cancel the watchdog instead of
    # sleeping out the rest of the timeout
    stop_task.cancel()
    with suppress(CancelledError):
        await stop_task
    proxies_list = await collect_proxies(proxies_queue)
    saved_proxy = create_json_from_proxies(proxies_list, output_json_file)
    logger.info(f'finished collecting {proxy_type} proxies')
    return saved_proxy
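
# A single pass can also be run standalone for debugging; a minimal sketch
# (out.json is a hypothetical path, the short timeout is just for testing):
#
#     from asyncio import run
#     run(find_proxies_by_type('SOCKS5', workdir / 'out.json', timeout_minutes=1))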


async def find_proxies():
    global is_first_run
    # a shorter first pass so the API has data soon after startup
    timeout_minutes = 10 if is_first_run else 50
    logger.info('started collection tasks for all proxy types')
    results = await gather(
        find_proxies_by_type('HTTP', http_collected_json, timeout_minutes),
        find_proxies_by_type('HTTPS', https_collected_json, timeout_minutes),
        find_proxies_by_type('SOCKS5', socks5_collected_json, timeout_minutes)
    )
    await sort_proxies_and_merge(list(results), collected_json)
    is_first_run = False
    logger.info('proxy collection tasks finished')


scheduler.add_job(find_proxies, 'interval', max_instances=1, minutes=60)
scheduler.add_job(delete_logs, 'interval', max_instances=1, minutes=1440)
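
# Note: with max_instances=1, APScheduler skips (and logs) an interval run
# that fires while the previous pass is still going; tuning that with e.g.
# coalesce or misfire_grace_time is possible but left at the defaults here.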


@asynccontextmanager
async def app_lifespan(app: FastAPI):
    scheduler.start()
    task = create_task(find_proxies())
    yield
    # a collection pass can run for up to an hour; cancel it rather than
    # blocking shutdown until it completes
    task.cancel()
    with suppress(CancelledError):
        await task
    scheduler.shutdown()


app = FastAPI(lifespan=app_lifespan)


def not_ready_yet():
    return JSONResponse({'error': 'not ready yet, collecting and checking proxies takes about an hour'}, status_code=200)


@app.post('/{path:path}')
async def reject_post(path: str):
    # the API is read-only; Starlette routes must start with '/', so a
    # catch-all path parameter stands in for the original '*'
    raise HTTPException(status_code=405)


@app.get('/all/')
async def get_proxies():
    if collected_json.exists():
        return loads(collected_json.read_text())
    else:
        return not_ready_yet()


@app.get('/http/')
async def get_http_proxies():
    if http_collected_json.exists():
        return loads(http_collected_json.read_text())
    else:
        return not_ready_yet()


@app.get('/https/')
async def get_https_proxies():
    if https_collected_json.exists():
        return loads(https_collected_json.read_text())
    else:
        return not_ready_yet()


@app.get('/socks5/')
async def get_socks5_proxies():
    if socks5_collected_json.exists():
        return loads(socks5_collected_json.read_text())
    else:
        return not_ready_yet()


@app.get('/log/')
async def get_logs():
    if logfile.exists():
        return PlainTextResponse(logfile.read_text(encoding='utf-8'), status_code=200)
    else:
        return PlainTextResponse('the log is empty', status_code=200)


@app.get('/')
async def read_root():
    return PlainTextResponse('well, prolapse, so what', status_code=200)


if __name__ == '__main__':
    uvicorn_run(app, host='0.0.0.0', port=7860, timeout_keep_alive=90)
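
# Example requests once the first pass has completed (host/port as configured above):
#
#     curl http://localhost:7860/all/
#     curl http://localhost:7860/http/
#     curl http://localhost:7860/socks5/
#     curl http://localhost:7860/log/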