File size: 11,238 Bytes
c5b38e2 f10d12b 4597eac f10d12b 4597eac 41eb20c 4597eac 5cb3728 41eb20c 4597eac 41eb20c d0efad7 c5b38e2 41eb20c 4597eac 2816de1 f10d12b 5cb3728 f10d12b 4597eac bcefe57 4597eac 41eb20c 4597eac f10d12b e85275b 4597eac 9aa69e0 4597eac f10d12b 4597eac e85275b 346a13c fa0c81d 346a13c dae6ebe 9aa69e0 346a13c 4597eac e85275b 4597eac dae6ebe 4597eac 5cb3728 e85275b 5cb3728 dae6ebe 5cb3728 e85275b 5cb3728 4597eac e85275b 4597eac 51463c5 6fcad5b c5b38e2 6fcad5b 4597eac c5b38e2 51463c5 4597eac c5b38e2 4597eac 873da01 c5b38e2 873da01 4597eac c5b38e2 d0efad7 c5b38e2 d0efad7 c5b38e2 d0efad7 4bbd25f d0efad7 c5b38e2 77df0ca d0efad7 4597eac c5b38e2 5cb3728 77df0ca 4597eac c5b38e2 77df0ca 873da01 77df0ca c5b38e2 77df0ca 873da01 dae6ebe 77df0ca f10d12b dae6ebe 77df0ca d0efad7 c5b38e2 77df0ca c5b38e2 77df0ca c5b38e2 4597eac 4bbd25f 873da01 4597eac 5cb3728 51463c5 5cb3728 4597eac 6dd44a3 f34f487 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 |
from asyncio import create_subprocess_shell, create_task, gather, sleep
from logging import ERROR, INFO, basicConfig, getLogger
from pathlib import Path
from shutil import rmtree
from subprocess import CalledProcessError, PIPE
from typing import Any, List
from uuid import uuid4
from PIL import Image
from fastapi import FastAPI, HTTPException
from fastapi.responses import PlainTextResponse
from httpx import AsyncClient, HTTPStatusError, RequestError
from pydantic import BaseModel, HttpUrl
from ytelegraph import TelegraphAPI
from aiorentry.client import Client as RentryClient
from uvicorn import run as uvicorn_run
need_logging = False
basicConfig(level=INFO if need_logging else ERROR)
logger = getLogger(__name__)
oxipng_bin = Path(__file__).parent / 'oxipng'
if not oxipng_bin.stat().st_mode & 0o111:
oxipng_bin.chmod(0o755)
tokens = [
# мне в общем-то все равно на эти токены
'7e0ea3da6a73d77003c1abba7f0ea13c',
'bc2e68b5918e5bb59ebca6c05d73daf9',
'fecbfbe0938bcd1df27b7a9be1702cc9',
'04e9981d4d0981964cb4c9753173244d',
'dee75b07981c7aa211628ea7c7cbc03d',
]
async def download_png(url: str, folder: str, client: AsyncClient, retries: int = 5) -> Path:
# print(f'загрузка изображения: {url}')
for attempt in range(retries):
try:
response = await client.get(url)
response.raise_for_status()
file_path = Path(__file__).parent / folder / f'{uuid4()}.png'
file_path.parent.mkdir(parents=True, exist_ok=True)
file_path.write_bytes(response.content)
return file_path
except (HTTPStatusError, RequestError) as e:
if attempt < retries - 1:
await sleep(2 ** attempt)
else:
raise e
async def download_pngs(urls: str | list[str]) -> list[Any]:
urls = [urls] if isinstance(urls, str) else urls
# print(f'скачивается список список из {len(urls)}: {urls}')
# бот coze имеет баг, и из воркфлоу прибавляет предыдущий ответ к ссылкам, если включен контекст чата:
valid_urls = [url for url in urls if url and '\n' not in url and '\\n' not in url and url.strip() != '']
if len(valid_urls) != len(urls):
print(f'некорректные ссылки удалены из списка: {set(urls) - set(valid_urls)}')
async with AsyncClient(verify=False, follow_redirects=True, timeout=30.0) as client:
tasks = [download_png(url, str(uuid4()), client) for url in valid_urls]
return list(await gather(*tasks))
async def optimize_png(image_path: Path, retries: int = 3) -> None:
command = f'{oxipng_bin.resolve()} --opt 2 --strip safe --out {image_path} {image_path}'
# print(f'оптимизация картинки {image_path}')
for attempt in range(retries):
try:
process = await create_subprocess_shell(command, stdout=PIPE, stderr=PIPE)
stdout, stderr = await process.communicate()
if process.returncode == 0:
return
else:
raise CalledProcessError(process.returncode, command, output=stdout, stderr=stderr)
except CalledProcessError as e:
print(f'ошибка при оптимизации {image_path}')
if attempt < retries - 1:
await sleep(2 ** attempt)
else:
raise e
async def convert_to_jpeg(image_path: Path) -> Path:
# print(f'конвертируется {image_path}')
try:
image = Image.open(image_path)
output_path = image_path.with_suffix('.jpg')
image.save(output_path, 'JPEG', quality=98, optimize=True)
image_path.unlink(missing_ok=True)
return output_path
except:
print(f'ошибка при конвертации {image_path}')
return image_path
async def convert_to_jpegs(image_paths: list[str | Path] | str | Path) -> tuple[Path]:
image_paths = [Path(image_file) for image_file in ([image_paths] if not isinstance(image_paths, list) else image_paths)]
# print(f'оптимизируется список список из {len(image_paths)}: {image_paths}')
tasks = [convert_to_jpeg(image_path) for image_path in image_paths]
return await gather(*tasks)
async def optimize_pngs(image_paths: list[str | Path] | str | Path) -> None:
image_paths = [Path(image_file) for image_file in ([image_paths] if not isinstance(image_paths, list) else image_paths)]
# print(f'оптимизируется список список из {len(image_paths)}: {image_paths}')
tasks = [optimize_png(image_path) for image_path in image_paths]
await gather(*tasks)
async def upload_image_to_imgbb(file_path: Path, file_type: str = 'png') -> str | None:
for token in tokens:
url = f'https://api.imgbb.com/1/upload?key={token}'
try:
with file_path.open('rb') as file:
files = {'image': (file_path.name, file, f'image/{file_type}')}
data = {}
async with AsyncClient(verify=False, follow_redirects=True, timeout=30.0) as client:
response = await client.post(url, files=files, data=data, timeout=30)
response.raise_for_status()
json = response.json()
if json.get('success'):
return json['data']['url']
except Exception as e:
print(f"ошибка при загрузке с {token}: {e}")
continue
return None
async def upload_image_to_freeimagehost(image_path: Path, file_type: str = 'png') -> str | None:
try:
async with AsyncClient(verify=False, follow_redirects=True, timeout=30.0) as client:
with image_path.open("rb") as image_file:
files = {'source': (image_path.name, image_file, f'image/{file_type}')}
payload = {'key': '6d207e02198a847aa98d0a2a901485a5', 'action': 'upload', 'format': 'json'}
response = await client.post('https://freeimage.host/api/1/upload', data=payload, files=files)
response.raise_for_status()
response_data = response.json()
return response_data['image']['url']
except Exception as e:
print(f'ошибка при загрузке {image_path}: {e}')
return None
async def upload_image(file_path: Path | str, file_type: str = 'png') -> str | None:
file_path = Path(file_path)
return await upload_image_to_freeimagehost(file_path, file_type) or upload_image_to_imgbb(file_path, file_type)
async def process_image(old_url: str, image_path: Path, convert: bool) -> tuple[str, Path]:
new_url = await upload_image(image_path, 'png' if not convert else 'jpeg')
if new_url:
# print(f'загружено изображение {image_path} в {new_url}')
pass
else:
new_url = old_url
print(f'не удалось загрузить изображение {image_path}, оставим старую ссылку: {old_url}')
try:
image_path.unlink()
except Exception as e:
print(f'не удалось удалить файл {image_path}: {e}')
return new_url, image_path
async def optimize_and_upload(images_urls: List[str] | str, convert: bool = False) -> List[str]:
images_urls = [images_urls] if isinstance(images_urls, str) else images_urls
if convert:
try:
ph = TelegraphAPI()
ph_link = ph.create_page_md('DAll-E v3', 'изображения скоро появятся, обнови страницу...')
except Exception as e:
print(f'не получилось создать страницу на телеграфе: {e}')
ph_link = None
async with RentryClient('https://rentry.org') as client:
page = await client.new_page('изображения скоро появятся, обнови страницу...')
page_id, code = page.url, page.edit_code
continue_task = create_task(continue_optimizing_and_uploading(images_urls, page_id, code, ph_link))
print(f'https://rentry.co/{page_id}', f'https://rentry.org/{page_id}')
return [ph_link, f'https://rentry.co/{page_id}', f'https://rentry.org/{page_id}']
else:
return await continue_optimizing_and_uploading(images_urls)
async def continue_optimizing_and_uploading(images_urls: list[str], page_id: str = None, code: str = None, ph_link: str = None) -> list[str]:
images_paths = await download_pngs(images_urls)
if not page_id: # convert=False
await optimize_pngs(images_paths)
new_images_urls = []
images_paths = images_paths if not page_id else await convert_to_jpegs(images_paths)
tasks = []
for old_url, image_path in zip(images_urls, images_paths):
tasks.append(process_image(old_url, image_path, page_id is not None))
results = await gather(*tasks)
new_images_urls = [result[0] for result in results]
print(f'новые ссылки: ({len(new_images_urls)}): {new_images_urls}')
try:
rmtree(images_paths[0].parent)
except Exception as e:
print(f'не удалось удалить файл {images_paths[0].parent}: {e}')
content = '\n\n'.join([f'![]({url})' for url in new_images_urls])
if ph_link:
try:
ph = TelegraphAPI()
ph.edit_page_md(ph_link, content)
except Exception as e:
print(f'не удалось создать страницу на телеграфе: {e}')
if page_id and code:
try:
async with RentryClient('https://rentry.org') as client:
await client.edit_page(
text=content,
url=page_id,
edit_code=code,
)
except Exception as e:
print(f'не удалось создать страницу в rentry: {e}')
return new_images_urls if len(new_images_urls) > 0 and new_images_urls[0] else images_urls
app = FastAPI()
class ImageURLs(BaseModel):
urls: List[HttpUrl]
@app.get('/')
async def read_root():
return PlainTextResponse('ну пролапс, ну и что', status_code=200)
@app.post('/pngopt_by_urls/')
async def optimize_images_endpoint(image_urls: ImageURLs):
try:
optimized_urls = await optimize_and_upload([str(url) for url in image_urls.urls])
return {"optimized_urls": optimized_urls}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.post('/jpegs_by_urls/')
async def optimize_images_endpoint(image_urls: ImageURLs):
try:
optimized_urls = await optimize_and_upload([str(url) for url in image_urls.urls], convert=True)
return {"optimized_urls": optimized_urls}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
if __name__ == "__main__":
uvicorn_run(app, host='0.0.0.0', port=7860, timeout_keep_alive=90, log_level='info', use_colors=False)
|