oxipng / app.py
getapi's picture
Update app.py
873da01 verified
raw
history blame
9.26 kB
from asyncio import create_subprocess_shell, gather, sleep
from logging import ERROR, INFO, basicConfig, getLogger
from pathlib import Path
from shutil import rmtree
from subprocess import CalledProcessError, PIPE
from typing import Any, List
from uuid import uuid4
from PIL import Image
from fastapi import FastAPI, HTTPException
from fastapi.responses import PlainTextResponse
from httpx import AsyncClient, HTTPStatusError, RequestError
from pydantic import BaseModel, HttpUrl
from ytelegraph import TelegraphAPI
from uvicorn import run as uvicorn_run
need_logging = False
basicConfig(level=INFO if need_logging else ERROR)
logger = getLogger(__name__)
oxipng_bin = Path(__file__).parent / 'oxipng'
if not oxipng_bin.stat().st_mode & 0o111:
oxipng_bin.chmod(0o755)
tokens = [
# мне в общем-то все равно на эти токены
'7e0ea3da6a73d77003c1abba7f0ea13c',
'bc2e68b5918e5bb59ebca6c05d73daf9',
'fecbfbe0938bcd1df27b7a9be1702cc9',
'04e9981d4d0981964cb4c9753173244d',
'dee75b07981c7aa211628ea7c7cbc03d',
]
async def download_png(url: str, folder: str, client: AsyncClient, retries: int = 5) -> Path:
print(f'загрузка изображения: {url}')
for attempt in range(retries):
try:
response = await client.get(url)
response.raise_for_status()
file_path = Path(__file__).parent / folder / f'{uuid4()}.png'
file_path.parent.mkdir(parents=True, exist_ok=True)
file_path.write_bytes(response.content)
return file_path
except (HTTPStatusError, RequestError) as e:
if attempt < retries - 1:
await sleep(2 ** attempt)
else:
raise e
async def download_pngs(urls: str | list[str]) -> list[Any]:
urls = [urls] if isinstance(urls, str) else urls
print(f'скачивается список список из {len(urls)}: {urls}')
# бот coze имеет баг, и из воркфлоу прибавляет предыдущий ответ к ссылкам, если включен контекст чата:
valid_urls = [url for url in urls if url and '\n' not in url and '\\n' not in url and url.strip() != '']
if len(valid_urls) != len(urls):
print(f'некорректные ссылки удалены из списка: {set(urls) - set(valid_urls)}')
async with AsyncClient(verify=False, follow_redirects=True, timeout=30.0) as client:
tasks = [download_png(url, str(uuid4()), client) for url in valid_urls]
return list(await gather(*tasks))
async def optimize_png(image_path: Path, retries: int = 3) -> None:
command = f'{oxipng_bin.resolve()} --opt 2 --strip safe --out {image_path} {image_path}'
print(f'оптимизация картинки {image_path}')
for attempt in range(retries):
try:
process = await create_subprocess_shell(command, stdout=PIPE, stderr=PIPE)
stdout, stderr = await process.communicate()
if process.returncode == 0:
return
else:
raise CalledProcessError(process.returncode, command, output=stdout, stderr=stderr)
except CalledProcessError as e:
print(f'ошибка при оптимизации {image_path}')
if attempt < retries - 1:
await sleep(2 ** attempt)
else:
raise e
async def convert_to_jpeg(image_path: Path) -> Path:
print(f'конвертируется {image_path}')
try:
image = Image.open(image_path)
output_path = image_path.with_suffix('.jpg')
image.save(output_path, 'JPEG', quality=98, optimize=True)
image_path.unlink(missing_ok=True)
return output_path
except:
print(f'ошибка при конвертации {image_path}')
return image_path
async def convert_to_jpegs(image_paths: list[str | Path] | str | Path) -> tuple[Path]:
image_paths = [Path(image_file) for image_file in ([image_paths] if not isinstance(image_paths, list) else image_paths)]
print(f'оптимизируется список список из {len(image_paths)}: {image_paths}')
tasks = [convert_to_jpeg(image_path) for image_path in image_paths]
return await gather(*tasks)
async def optimize_pngs(image_paths: list[str | Path] | str | Path) -> None:
image_paths = [Path(image_file) for image_file in ([image_paths] if not isinstance(image_paths, list) else image_paths)]
print(f'оптимизируется список список из {len(image_paths)}: {image_paths}')
tasks = [optimize_png(image_path) for image_path in image_paths]
await gather(*tasks)
async def upload_image_to_imgbb(file_path: Path, file_type: str = 'png') -> str | None:
for token in tokens:
url = f'https://api.imgbb.com/1/upload?key={token}'
try:
with file_path.open('rb') as file:
files = {'image': (file_path.name, file, f'image/{file_type}')}
data = {}
async with AsyncClient() as client:
response = await client.post(url, files=files, data=data, timeout=30)
response.raise_for_status()
json = response.json()
if json.get('success'):
return json['data']['url']
except Exception as e:
print(f"ошибка при загрузке с {token}: {e}")
continue
return None
async def upload_image(file_path: Path | str, file_type: str = 'png') -> str | None:
file_path = Path(file_path)
return await upload_image_to_imgbb(file_path, file_type)
async def process_image(old_url: str, image_path: Path, convert: bool) -> tuple[str, Path]:
new_url = await upload_image(image_path, 'png' if not convert else 'jpeg')
if new_url:
print(f'загружено изображение {image_path} в {new_url}')
else:
new_url = old_url
print(f'не удалось загрузить изображение {image_path}, оставим старую ссылку: {old_url}')
try:
image_path.unlink()
except Exception as e:
print(f'не удалось удалить файл {image_path}: {e}')
return new_url, image_path
async def optimize_and_upload(images_urls: List[str] | str, convert: bool = False) -> List[str]:
images_urls = [images_urls] if isinstance(images_urls, str) else images_urls
print(f'принятые ссылки в обработку ({len(images_urls)}): {images_urls}')
images_paths = await download_pngs(images_urls)
if not convert:
await optimize_pngs(images_paths)
new_images_urls = []
images_paths = images_paths if not convert else await convert_to_jpegs(images_paths)
tasks = []
for old_url, image_path in zip(images_urls, images_paths):
tasks.append(process_image(old_url, image_path, convert))
results = await gather(*tasks)
new_images_urls = [result[0] for result in results]
print(f'новые ссылки: ({len(new_images_urls)}): {new_images_urls}')
try:
rmtree(images_paths[0].parent)
except Exception as e:
print(f'не удалось удалить файл {images_paths[0].parent}: {e}')
if convert:
content = '\n\n'.join([f'![]({url})' for url in new_images_urls])
try:
ph = TelegraphAPI()
ph_link = ph.create_page_md('Dall-E 3 картинки', content)
return [ph_link]
except Exception as e:
print(f'не удалось создать страницу в телеграфе: {e}')
try:
from aiorentry.client import Client as RentryClient
async with RentryClient('https://rentry.org') as client:
page = await client.new_page(content)
page_id = page.url
return [f'https://rentry.co/{page_id}', f'https://rentry.org/{page_id}']
except Exception as e:
print(f'не удалось создать страницу в rentry: {e}')
return new_images_urls
app = FastAPI()
class ImageURLs(BaseModel):
urls: List[HttpUrl]
@app.get('/')
async def read_root():
return PlainTextResponse('ну пролапс, ну и что', status_code=200)
@app.post('/pngopt_by_urls/')
async def optimize_images_endpoint(image_urls: ImageURLs):
try:
optimized_urls = await optimize_and_upload([str(url) for url in image_urls.urls])
return {"optimized_urls": optimized_urls}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.post('/jpegs_by_urls/')
async def optimize_images_endpoint(image_urls: ImageURLs):
try:
optimized_urls = await optimize_and_upload([str(url) for url in image_urls.urls], convert=True)
return {"optimized_urls": optimized_urls}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
if __name__ == "__main__":
uvicorn_run(app, host='0.0.0.0', port=7860, timeout_keep_alive=90, log_level='info', use_colors=False)