|
from asyncio import create_subprocess_shell, gather, sleep |
|
from logging import ERROR, INFO, basicConfig, getLogger |
|
from pathlib import Path |
|
from shutil import rmtree |
|
from subprocess import CalledProcessError, PIPE |
|
from typing import Any, List |
|
from uuid import uuid4 |
|
|
|
from PIL import Image |
|
from fastapi import FastAPI, HTTPException |
|
from fastapi.responses import PlainTextResponse |
|
from httpx import AsyncClient, HTTPStatusError, RequestError |
|
from pydantic import BaseModel, HttpUrl |
|
from ytelegraph import TelegraphAPI |
|
from uvicorn import run as uvicorn_run |
|
|
|
need_logging = False |
|
|
|
basicConfig(level=INFO if need_logging else ERROR) |
|
logger = getLogger(__name__) |
|
|
|
oxipng_bin = Path(__file__).parent / 'oxipng' |
|
if not oxipng_bin.stat().st_mode & 0o111: |
|
oxipng_bin.chmod(0o755) |
|
|
|
tokens = [ |
|
|
|
'7e0ea3da6a73d77003c1abba7f0ea13c', |
|
'bc2e68b5918e5bb59ebca6c05d73daf9', |
|
'fecbfbe0938bcd1df27b7a9be1702cc9', |
|
'04e9981d4d0981964cb4c9753173244d', |
|
'dee75b07981c7aa211628ea7c7cbc03d', |
|
] |
|
|
|
|
|
async def download_png(url: str, folder: str, client: AsyncClient, retries: int = 5) -> Path: |
|
|
|
for attempt in range(retries): |
|
try: |
|
response = await client.get(url) |
|
response.raise_for_status() |
|
file_path = Path(__file__).parent / folder / f'{uuid4()}.png' |
|
file_path.parent.mkdir(parents=True, exist_ok=True) |
|
file_path.write_bytes(response.content) |
|
return file_path |
|
except (HTTPStatusError, RequestError) as e: |
|
if attempt < retries - 1: |
|
await sleep(2 ** attempt) |
|
else: |
|
raise e |
|
|
|
|
|
async def download_pngs(urls: str | list[str]) -> list[Any]: |
|
urls = [urls] if isinstance(urls, str) else urls |
|
|
|
|
|
valid_urls = [url for url in urls if url and '\n' not in url and '\\n' not in url and url.strip() != ''] |
|
if len(valid_urls) != len(urls): |
|
print(f'некорректные ссылки удалены из списка: {set(urls) - set(valid_urls)}') |
|
async with AsyncClient(verify=False, follow_redirects=True, timeout=30.0) as client: |
|
tasks = [download_png(url, str(uuid4()), client) for url in valid_urls] |
|
return list(await gather(*tasks)) |
|
|
|
|
|
async def optimize_png(image_path: Path, retries: int = 3) -> None: |
|
command = f'{oxipng_bin.resolve()} --opt 2 --strip safe --out {image_path} {image_path}' |
|
|
|
for attempt in range(retries): |
|
try: |
|
process = await create_subprocess_shell(command, stdout=PIPE, stderr=PIPE) |
|
stdout, stderr = await process.communicate() |
|
if process.returncode == 0: |
|
return |
|
else: |
|
raise CalledProcessError(process.returncode, command, output=stdout, stderr=stderr) |
|
except CalledProcessError as e: |
|
print(f'ошибка при оптимизации {image_path}') |
|
if attempt < retries - 1: |
|
await sleep(2 ** attempt) |
|
else: |
|
raise e |
|
|
|
|
|
async def convert_to_jpeg(image_path: Path) -> Path: |
|
|
|
try: |
|
image = Image.open(image_path) |
|
output_path = image_path.with_suffix('.jpg') |
|
image.save(output_path, 'JPEG', quality=98, optimize=True) |
|
image_path.unlink(missing_ok=True) |
|
return output_path |
|
except: |
|
print(f'ошибка при конвертации {image_path}') |
|
return image_path |
|
|
|
|
|
async def convert_to_jpegs(image_paths: list[str | Path] | str | Path) -> tuple[Path]: |
|
image_paths = [Path(image_file) for image_file in ([image_paths] if not isinstance(image_paths, list) else image_paths)] |
|
|
|
tasks = [convert_to_jpeg(image_path) for image_path in image_paths] |
|
return await gather(*tasks) |
|
|
|
|
|
async def optimize_pngs(image_paths: list[str | Path] | str | Path) -> None: |
|
image_paths = [Path(image_file) for image_file in ([image_paths] if not isinstance(image_paths, list) else image_paths)] |
|
|
|
tasks = [optimize_png(image_path) for image_path in image_paths] |
|
await gather(*tasks) |
|
|
|
|
|
async def upload_image_to_imgbb(file_path: Path, file_type: str = 'png') -> str | None: |
|
for token in tokens: |
|
url = f'https://api.imgbb.com/1/upload?key={token}' |
|
try: |
|
with file_path.open('rb') as file: |
|
files = {'image': (file_path.name, file, f'image/{file_type}')} |
|
data = {} |
|
async with AsyncClient() as client: |
|
response = await client.post(url, files=files, data=data, timeout=30) |
|
response.raise_for_status() |
|
json = response.json() |
|
if json.get('success'): |
|
return json['data']['url'] |
|
except Exception as e: |
|
print(f"ошибка при загрузке с {token}: {e}") |
|
continue |
|
return None |
|
|
|
|
|
async def upload_image(file_path: Path | str, file_type: str = 'png') -> str | None: |
|
file_path = Path(file_path) |
|
return await upload_image_to_imgbb(file_path, file_type) |
|
|
|
|
|
|
|
async def process_image(old_url: str, image_path: Path, convert: bool) -> tuple[str, Path]: |
|
new_url = await upload_image(image_path, 'png' if not convert else 'jpeg') |
|
if new_url: |
|
print(f'загружено изображение {image_path} в {new_url}') |
|
else: |
|
new_url = old_url |
|
print(f'не удалось загрузить изображение {image_path}, оставим старую ссылку: {old_url}') |
|
|
|
try: |
|
image_path.unlink() |
|
except Exception as e: |
|
print(f'не удалось удалить файл {image_path}: {e}') |
|
|
|
return new_url, image_path |
|
|
|
|
|
async def optimize_and_upload(images_urls: List[str] | str, convert: bool = False) -> List[str]: |
|
images_urls = [images_urls] if isinstance(images_urls, str) else images_urls |
|
|
|
|
|
images_paths = await download_pngs(images_urls) |
|
if not convert: |
|
await optimize_pngs(images_paths) |
|
|
|
new_images_urls = [] |
|
images_paths = images_paths if not convert else await convert_to_jpegs(images_paths) |
|
|
|
tasks = [] |
|
for old_url, image_path in zip(images_urls, images_paths): |
|
tasks.append(process_image(old_url, image_path, convert)) |
|
|
|
results = await gather(*tasks) |
|
new_images_urls = [result[0] for result in results] |
|
print(f'новые ссылки: ({len(new_images_urls)}): {new_images_urls}') |
|
|
|
try: |
|
rmtree(images_paths[0].parent) |
|
except Exception as e: |
|
print(f'не удалось удалить файл {images_paths[0].parent}: {e}') |
|
|
|
if convert: |
|
content = '\n\n'.join([f'![]({url})' for url in new_images_urls]) |
|
try: |
|
ph = TelegraphAPI() |
|
ph_link = ph.create_page_md('Dall-E 3 картинки', content) |
|
return [ph_link] |
|
except Exception as e: |
|
print(f'не удалось создать страницу в телеграфе: {e}') |
|
try: |
|
from aiorentry.client import Client as RentryClient |
|
async with RentryClient('https://rentry.org') as client: |
|
page = await client.new_page(content) |
|
page_id = page.url |
|
print(f'https://rentry.co/{page_id}', f'https://rentry.org/{page_id}') |
|
return [f'https://rentry.co/{page_id}', f'https://rentry.org/{page_id}'] |
|
except Exception as e: |
|
print(f'не удалось создать страницу в rentry: {e}') |
|
return new_images_urls |
|
|
|
|
|
|
|
|
|
app = FastAPI() |
|
|
|
|
|
class ImageURLs(BaseModel): |
|
urls: List[HttpUrl] |
|
|
|
|
|
@app.get('/') |
|
async def read_root(): |
|
return PlainTextResponse('ну пролапс, ну и что', status_code=200) |
|
|
|
|
|
@app.post('/pngopt_by_urls/') |
|
async def optimize_images_endpoint(image_urls: ImageURLs): |
|
try: |
|
optimized_urls = await optimize_and_upload([str(url) for url in image_urls.urls]) |
|
return {"optimized_urls": optimized_urls} |
|
except Exception as e: |
|
raise HTTPException(status_code=500, detail=str(e)) |
|
|
|
|
|
@app.post('/jpegs_by_urls/') |
|
async def optimize_images_endpoint(image_urls: ImageURLs): |
|
try: |
|
optimized_urls = await optimize_and_upload([str(url) for url in image_urls.urls], convert=True) |
|
return {"optimized_urls": optimized_urls} |
|
except Exception as e: |
|
raise HTTPException(status_code=500, detail=str(e)) |
|
|
|
|
|
if __name__ == "__main__": |
|
uvicorn_run(app, host='0.0.0.0', port=7860, timeout_keep_alive=90, log_level='info', use_colors=False) |
|
|
|
|