File size: 16,150 Bytes
e8f910e
 
f10d12b
4597eac
f10d12b
4597eac
41eb20c
4597eac
 
5cb3728
e8f910e
41eb20c
 
4597eac
e8f910e
 
41eb20c
 
4597eac
e8f910e
0ea702f
2816de1
f10d12b
5cb3728
f10d12b
 
4597eac
bcefe57
 
4597eac
 
41eb20c
4597eac
 
 
 
 
 
 
 
e8f910e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6a952be
 
 
e8f910e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f10d12b
e85275b
4597eac
 
9aa69e0
4597eac
f10d12b
 
4597eac
 
 
 
 
 
 
 
 
 
 
e85275b
346a13c
fa0c81d
346a13c
dae6ebe
9aa69e0
346a13c
4597eac
 
 
 
 
e85275b
4597eac
 
 
 
 
 
 
 
 
dae6ebe
4597eac
 
 
 
 
 
5cb3728
e85275b
5cb3728
 
 
 
 
 
 
dae6ebe
5cb3728
 
 
 
 
e85275b
5cb3728
 
 
 
4597eac
 
e85275b
4597eac
 
 
 
51463c5
6fcad5b
 
 
 
 
 
c5b38e2
6fcad5b
 
 
 
 
 
 
 
 
4597eac
 
c5b38e2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
51463c5
4597eac
c5b38e2
4597eac
873da01
 
 
 
c5b38e2
 
873da01
 
 
 
 
 
 
 
 
 
 
 
 
4597eac
c5b38e2
d0efad7
c627ca5
d0efad7
 
 
c5b38e2
d0efad7
c5b38e2
d0efad7
e8f910e
 
c5b38e2
 
 
77df0ca
d0efad7
4597eac
c5b38e2
 
5cb3728
77df0ca
4597eac
c5b38e2
77df0ca
873da01
77df0ca
c5b38e2
77df0ca
873da01
 
dae6ebe
77df0ca
f10d12b
 
 
dae6ebe
77df0ca
d0efad7
 
 
e8f910e
d0efad7
6a952be
c5b38e2
77df0ca
c5b38e2
e8f910e
77df0ca
c5b38e2
4597eac
4bbd25f
873da01
 
4597eac
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
5cb3728
 
 
51463c5
5cb3728
 
 
 
 
4597eac
6dd44a3
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
from asyncio import create_subprocess_shell, create_task, gather, run, sleep
from json import dumps
from logging import ERROR, INFO, basicConfig, getLogger
from pathlib import Path
from shutil import rmtree
from subprocess import CalledProcessError, PIPE
from typing import Any, List
from uuid import uuid4

from PIL import Image
from aiorentry.client import Client as RentryClient
from fastapi import FastAPI, HTTPException
from fastapi.responses import PlainTextResponse
from httpx import AsyncClient, HTTPStatusError, RequestError
from lxml import html
from markdown import markdown
from pydantic import BaseModel, HttpUrl
from uvicorn import run as uvicorn_run

proxy_endpoint = 'https://telegraphprxver.vercel.app'

need_logging = False

basicConfig(level=INFO if need_logging else ERROR)
logger = getLogger(__name__)

oxipng_bin = Path(__file__).parent / 'oxipng'
if not oxipng_bin.stat().st_mode & 0o111:
    oxipng_bin.chmod(0o755)

tokens = [
    # мне в общем-то все равно на эти токены
    '7e0ea3da6a73d77003c1abba7f0ea13c',
    'bc2e68b5918e5bb59ebca6c05d73daf9',
    'fecbfbe0938bcd1df27b7a9be1702cc9',
    '04e9981d4d0981964cb4c9753173244d',
    'dee75b07981c7aa211628ea7c7cbc03d',
]


class UserInfo:
    def __init__(self, token: str, short_name: str, author_name: str, author_url: str, auth_link: str):
        self.token = token
        self.short_name = short_name
        self.author_name = author_name
        self.author_url = author_url
        self.auth_link = auth_link

    def get_token(self) -> str:
        return self.token

    def get_short_name(self) -> str:
        return self.short_name

    def get_author_name(self) -> str:
        return self.author_name

    def get_author_url(self) -> str:
        return self.author_url

    def get_auth_link(self) -> str:
        return self.auth_link

    def __repr__(self):
        return f'UserAuthInfo(token={self.token}, short_name={self.short_name}, author_name={self.author_name}, author_url={self.author_url}, auth_link={self.auth_link})'


async def create_account(short_name: str, author_name: str, author_url: str) -> UserInfo:
    params = {
        'short_name': short_name,
        'author_name': author_name,
        'author_url': author_url
    }
    async with AsyncClient(verify=False, follow_redirects=True, timeout=30.0) as client:
        response = await client.get(f'{proxy_endpoint}/createAccount', params=params)
        json_response: dict = response.json()
        if json_response.get('ok'):
            result = json_response.get('result', {})
            return UserInfo(result['access_token'], result['short_name'], result['author_name'], result['author_url'], result['auth_url'])


tgph_acc = run(create_account('prolapse', 'prolapse', ''))


def md_to_dom(markdown_text: str) -> list[dict[str, str | list | dict | None]]:
    def handle_heading(element) -> dict[str, str | list | dict | None]:
        if element.tag == 'h1':
            return {'tag': 'h3', 'children': parse_children(element)}
        elif element.tag == 'h2':
            return {'tag': 'h4', 'children': parse_children(element)}
        else:
            return {'tag': 'p', 'children': [{'tag': 'strong', 'children': parse_children(element)}]}

    def handle_list(element) -> dict[str, str | list | dict | None]:
        return {'tag': element.tag, 'children': [{'tag': 'li', 'children': parse_children(li)} for li in element.xpath('./li')]}

    def handle_link(element) -> dict[str, str | list | dict | None]:
        return {'tag': 'a', 'attrs': {'href': element.get('href')}, 'children': parse_children(element)}

    def handle_media(element) -> dict[str, str | list | dict | None]:
        return {'tag': element.tag, 'attrs': {'src': element.get('src')}}

    def parse_children(element) -> list[str | dict[str, str | list | dict | None]]:
        return [parse_element(child) if child.tag else child.strip() for child in element.iterchildren() if child.tag or (isinstance(child, str) and child.strip())]

    def parse_element(element) -> dict[str, str | list | dict | None]:
        handlers = {'h1': handle_heading, 'h2': handle_heading, 'h3': handle_heading, 'h4': handle_heading, 'h5': handle_heading, 'h6': handle_heading, 'ul': handle_list, 'ol': handle_list, 'a': handle_link, 'img': handle_media, 'iframe': handle_media}
        handler = handlers.get(element.tag, lambda e: {'tag': e.tag, 'children': parse_children(e)})
        return handler(element)

    html_content = markdown(markdown_text, extensions=['extra', 'sane_lists'])
    tree = html.fromstring(html_content)
    return [parse_element(element) for element in tree.body]


async def tgph_create_page(token: str, title: str, markdown_text: str) -> str:
    params = {
        'access_token': token,
        'title': title,
        'content': dumps(md_to_dom(markdown_text)),
        'return_content': False
    }
    async with AsyncClient(verify=False, follow_redirects=True, timeout=30.0) as client:
        response = await client.get(f'{proxy_endpoint}/createPage', params=params)
        json_response: dict = response.json()
        if json_response.get('ok'):
            result = json_response.get('result', {})
        else:
            result = {}
            print(f'ошибка создания страницы: {json_response}')
        return result.get('path')


async def tgph_edit_page(token: str, page: str, title: str, markdown_text: str) -> str:
    params = {
        'access_token': token,
        'path': page,
        'title': title,
        'content': dumps(md_to_dom(markdown_text)),
        'return_content': False
    }
    async with AsyncClient(verify=False, follow_redirects=True, timeout=30.0) as client:
        response = await client.get(f'{proxy_endpoint}/editPage', params=params)
        json_response = response.json()
        if json_response.get('ok'):
            result = json_response.get('result', {})
        return result.get('path')


async def download_png(url: str, folder: str, client: AsyncClient, retries: int = 5) -> Path:
    # print(f'загрузка изображения: {url}')
    for attempt in range(retries):
        try:
            response = await client.get(url)
            response.raise_for_status()
            file_path = Path(__file__).parent / folder / f'{uuid4()}.png'
            file_path.parent.mkdir(parents=True, exist_ok=True)
            file_path.write_bytes(response.content)
            return file_path
        except (HTTPStatusError, RequestError) as e:
            if attempt < retries - 1:
                await sleep(2 ** attempt)
            else:
                raise e


async def download_pngs(urls: str | list[str]) -> list[Any]:
    urls = [urls] if isinstance(urls, str) else urls
    # print(f'скачивается список список из {len(urls)}: {urls}')
    # бот coze имеет баг, и из воркфлоу прибавляет предыдущий ответ к ссылкам, если включен контекст чата:
    valid_urls = [url for url in urls if url and '\n' not in url and '\\n' not in url and url.strip() != '']
    if len(valid_urls) != len(urls):
        print(f'некорректные ссылки удалены из списка: {set(urls) - set(valid_urls)}')
    async with AsyncClient(verify=False, follow_redirects=True, timeout=30.0) as client:
        tasks = [download_png(url, str(uuid4()), client) for url in valid_urls]
        return list(await gather(*tasks))


async def optimize_png(image_path: Path, retries: int = 3) -> None:
    command = f'{oxipng_bin.resolve()} --opt 2 --strip safe --out {image_path} {image_path}'
    # print(f'оптимизация картинки {image_path}')
    for attempt in range(retries):
        try:
            process = await create_subprocess_shell(command, stdout=PIPE, stderr=PIPE)
            stdout, stderr = await process.communicate()
            if process.returncode == 0:
                return
            else:
                raise CalledProcessError(process.returncode, command, output=stdout, stderr=stderr)
        except CalledProcessError as e:
            print(f'ошибка при оптимизации {image_path}')
            if attempt < retries - 1:
                await sleep(2 ** attempt)
            else:
                raise e


async def convert_to_jpeg(image_path: Path) -> Path:
    # print(f'конвертируется {image_path}')
    try:
        image = Image.open(image_path)
        output_path = image_path.with_suffix('.jpg')
        image.save(output_path, 'JPEG', quality=98, optimize=True)
        image_path.unlink(missing_ok=True)
        return output_path
    except:
        print(f'ошибка при конвертации {image_path}')
        return image_path


async def convert_to_jpegs(image_paths: list[str | Path] | str | Path) -> tuple[Path]:
    image_paths = [Path(image_file) for image_file in ([image_paths] if not isinstance(image_paths, list) else image_paths)]
    # print(f'оптимизируется список список из {len(image_paths)}: {image_paths}')
    tasks = [convert_to_jpeg(image_path) for image_path in image_paths]
    return await gather(*tasks)


async def optimize_pngs(image_paths: list[str | Path] | str | Path) -> None:
    image_paths = [Path(image_file) for image_file in ([image_paths] if not isinstance(image_paths, list) else image_paths)]
    # print(f'оптимизируется список список из {len(image_paths)}: {image_paths}')
    tasks = [optimize_png(image_path) for image_path in image_paths]
    await gather(*tasks)


async def upload_image_to_imgbb(file_path: Path, file_type: str = 'png') -> str | None:
    for token in tokens:
        url = f'https://api.imgbb.com/1/upload?key={token}'
        try:
            with file_path.open('rb') as file:
                files = {'image': (file_path.name, file, f'image/{file_type}')}
                data = {}
                async with AsyncClient(verify=False, follow_redirects=True, timeout=30.0) as client:
                    response = await client.post(url, files=files, data=data, timeout=30)
                    response.raise_for_status()
                    json = response.json()
                    if json.get('success'):
                        return json['data']['url']
        except Exception as e:
            print(f"ошибка при загрузке с {token}: {e}")
            continue
    return None


async def upload_image_to_freeimagehost(image_path: Path, file_type: str = 'png') -> str | None:
    try:
        async with AsyncClient(verify=False, follow_redirects=True, timeout=30.0) as client:
            with image_path.open("rb") as image_file:
                files = {'source': (image_path.name, image_file, f'image/{file_type}')}
                payload = {'key': '6d207e02198a847aa98d0a2a901485a5', 'action': 'upload', 'format': 'json'}
                response = await client.post('https://freeimage.host/api/1/upload', data=payload, files=files)
            response.raise_for_status()
            response_data = response.json()
            return response_data['image']['url']
    except Exception as e:
        print(f'ошибка при загрузке {image_path}: {e}')
        return None


async def upload_image(file_path: Path | str, file_type: str = 'png') -> str | None:
    file_path = Path(file_path)
    return await upload_image_to_freeimagehost(file_path, file_type) or upload_image_to_imgbb(file_path, file_type)


async def process_image(old_url: str, image_path: Path, convert: bool) -> tuple[str, Path]:
    new_url = await upload_image(image_path, 'png' if not convert else 'jpeg')
    if new_url:
        # print(f'загружено изображение {image_path} в {new_url}')
        pass
    else:
        new_url = old_url
        print(f'не удалось загрузить изображение {image_path}, оставим старую ссылку: {old_url}')

    try:
        image_path.unlink()
    except Exception as e:
        print(f'не удалось удалить файл {image_path}: {e}')

    return new_url, image_path


async def optimize_and_upload(images_urls: List[str] | str, convert: bool = False) -> List[str]:
    images_urls = [images_urls] if isinstance(images_urls, str) else images_urls
    if convert:
        try:
            ph_link = await tgph_create_page(tgph_acc.get_token(), 'DAll-E v3', '*изображения скоро появятся, обнови страницу...*')
        except Exception as e:
            print(f'не получилось создать страницу на телеграфе: {e}')
            ph_link = None
        async with RentryClient('https://rentry.org') as client:
            page = await client.new_page('изображения скоро появятся, обнови страницу...')
            page_id, code = page.url, page.edit_code
            continue_task = create_task(continue_optimizing_and_uploading(images_urls, page_id, code, ph_link))
            print(f'https://telegra.ph/{ph_link}', f'https://rentry.co/{page_id}', f'https://rentry.org/{page_id}')
            return [f'https://telegra.ph/{ph_link}', f'https://rentry.co/{page_id}', f'https://rentry.org/{page_id}'] if ph_link else [f'https://rentry.co/{page_id}', f'https://rentry.org/{page_id}']
    else:
        return await continue_optimizing_and_uploading(images_urls)


async def continue_optimizing_and_uploading(images_urls: list[str], page_id: str = None, code: str = None, ph_link: str = None) -> list[str]:
    images_paths = await download_pngs(images_urls)

    if not page_id:  # convert=False
        await optimize_pngs(images_paths)

    new_images_urls = []
    images_paths = images_paths if not page_id else await convert_to_jpegs(images_paths)

    tasks = []
    for old_url, image_path in zip(images_urls, images_paths):
        tasks.append(process_image(old_url, image_path, page_id is not None))

    results = await gather(*tasks)
    new_images_urls = [result[0] for result in results]
    print(f'новые ссылки: ({len(new_images_urls)}): {new_images_urls}')

    try:
        rmtree(images_paths[0].parent)
    except Exception as e:
        print(f'не удалось удалить файл {images_paths[0].parent}: {e}')

    content = '\n\n'.join([f'![]({url})' for url in new_images_urls])
    if ph_link:
        try:
            await tgph_edit_page(tgph_acc.get_token(), ph_link, 'DAll-E v3', content)
        except Exception as e:
            print(f'не удалось отредактировать на телеграфе: {e}')
    if page_id and code:
        try:
            async with RentryClient('https://rentry.org') as client:
                await client.edit_page(text=content, url=page_id, edit_code=code)
        except Exception as e:
            print(f'не удалось создать страницу в rentry: {e}')

    return new_images_urls if len(new_images_urls) > 0 and new_images_urls[0] else images_urls


app = FastAPI()


class ImageURLs(BaseModel):
    urls: List[HttpUrl]


@app.get('/')
async def read_root():
    return PlainTextResponse('ну пролапс, ну и что', status_code=200)


@app.post('/pngopt_by_urls/')
async def optimize_images_endpoint(image_urls: ImageURLs):
    try:
        optimized_urls = await optimize_and_upload([str(url) for url in image_urls.urls])
        return {"optimized_urls": optimized_urls}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@app.post('/jpegs_by_urls/')
async def optimize_images_endpoint(image_urls: ImageURLs):
    try:
        optimized_urls = await optimize_and_upload([str(url) for url in image_urls.urls], convert=True)
        return {"optimized_urls": optimized_urls}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


if __name__ == "__main__":
    uvicorn_run(app, host='0.0.0.0', port=7860, timeout_keep_alive=90, log_level='info', use_colors=False)