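"""Publish Markdown files from docs/ to a Blogger blog via the Blogger v3 API.

Each *.md file becomes (or updates) a post titled after the file stem.
A local published_posts.json registry keeps the post id and a content hash
per file, so unchanged files are skipped unless --force is given.
"""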
import os
import json
import hashlib
import time
import re
import argparse
from pathlib import Path
import markdown
from markdown.extensions import tables, fenced_code, codehilite, toc
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
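
# Required environment variables: BLOG_ID (target blog) and TOKEN_FILE (path to
# an OAuth token previously authorized for the Blogger scope).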
BLOG_ID = os.environ["BLOG_ID"]
TOKEN_FILE = os.environ["TOKEN_FILE"]
PUBLISHED_FILE = "published_posts.json"
GH_PAGES_BASE = "https://kagvi13.github.io/HMP/"


def convert_md_links(md_text: str) -> str:
    """Convert relative *.md links into absolute links to GitHub Pages."""
    def replacer(match):
        text = match.group(1)
        link = match.group(2)
        # Leave absolute URLs and non-Markdown targets untouched.
        if link.startswith("http://") or link.startswith("https://") or not link.endswith(".md"):
            return match.group(0)
        abs_link = GH_PAGES_BASE + link.replace(".md", "").lstrip("./")
        return f"[{text}]({abs_link})"
    return re.sub(r"\[([^\]]+)\]\(([^)]+)\)", replacer, md_text)
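# Illustrative example (hypothetical file names):
#   "[Roadmap](roadmap.md)"        -> "[Roadmap](https://kagvi13.github.io/HMP/roadmap)"
#   "[Spec](https://example.com)"  -> left untouched (absolute URL)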


def load_published():
    """Load the local registry of published posts (file stem -> {id, hash})."""
    if Path(PUBLISHED_FILE).exists():
        with open(PUBLISHED_FILE, "r", encoding="utf-8") as f:
            return json.load(f)
    print("⚠ published_posts.json not found, starting from scratch.")
    return {}


def save_published(data):
    with open(PUBLISHED_FILE, "w", encoding="utf-8") as f:
        json.dump(data, f, ensure_ascii=False, indent=2)


def file_hash(path):
    # MD5 is used only as a cheap change-detection fingerprint, not for security.
    return hashlib.md5(Path(path).read_bytes()).hexdigest()


def get_existing_posts(service):
    """Return a title → post_id mapping for posts that can be edited."""
    existing = {}
    nextPageToken = None
    while True:
        try:
            response = service.posts().list(blogId=BLOG_ID, maxResults=500, pageToken=nextPageToken).execute()
            for post in response.get("items", []):
                post_id = post["id"]
                title = post["title"]
                existing[title] = post_id
            nextPageToken = response.get("nextPageToken")
            if not nextPageToken:
                break
        except HttpError as e:
            print(f"❌ Failed to fetch the list of posts: {e}")
            break
    return existing


def main(force: bool = False):
    creds = None
    if os.path.exists(TOKEN_FILE):
        creds = Credentials.from_authorized_user_file(TOKEN_FILE, ["https://www.googleapis.com/auth/blogger"])
    # Refresh an expired token and persist it so subsequent runs reuse it.
    if creds and creds.expired and creds.refresh_token:
        creds.refresh(Request())
        with open(TOKEN_FILE, "w") as token_file:
            token_file.write(creds.to_json())
    service = build("blogger", "v3", credentials=creds)
    published = load_published()
    existing_posts = get_existing_posts(service)
    # Walk every Markdown file under docs/ and publish or update it.
    md_files = list(Path("docs").rglob("*.md"))
    for md_file in md_files:
        name = md_file.stem
        h = file_hash(md_file)
        md_text = md_file.read_text(encoding="utf-8")
        # NOTE: the source link assumes the file sits directly under docs/
        # (rglob also matches nested files, whose GitHub path would differ).
        source_link = f"Source: [ {md_file.name} ](https://github.com/kagvi13/HMP/blob/main/docs/{md_file.name})\n\n"
        md_text = source_link + md_text
        md_text = convert_md_links(md_text)
        html_content = markdown.markdown(
            md_text,
            extensions=["tables", "fenced_code", "codehilite", "toc"]
        )
        # Inline CSS so wide tables and code blocks scroll instead of overflowing the Blogger layout.
        style = """
        <style>
        table { display: block; max-width: 100%; overflow-x: auto; border-collapse: collapse; }
        th, td { border: 1px solid #ccc; padding: 6px 12px; }
        pre { display: block; max-width: 100%; overflow-x: auto; padding: 10px; background-color: #f8f8f8; border: 1px solid #ddd; border-radius: 6px; font-family: monospace; white-space: pre; }
        </style>
        """
        html_content = style + html_content
        body = {
            "kind": "blogger#post",
            "title": name,
            "content": html_content,
        }
        try:
            post_id = existing_posts.get(name)
            if post_id:
                # The post already exists on the blog; compare hashes to see if it changed.
                if not force and name in published and published[name]["hash"] == h:
                    print(f"✅ Post '{name}' is unchanged, skipping.")
                    continue
                try:
                    post = service.posts().update(blogId=BLOG_ID, postId=post_id, body=body).execute()
                    print(f"♻ Updated post: {post['url']}")
                except HttpError as e:
                    if e.resp.status == 403:
                        # No permission to edit the existing post: create a new one instead.
                        post = service.posts().insert(blogId=BLOG_ID, body=body).execute()
                        print(f"⚠ Post existed but we lack edit rights. Created a new one: {post['url']}")
                        post_id = post["id"]
                    else:
                        raise e
            else:
                post = service.posts().insert(blogId=BLOG_ID, body=body).execute()
                print(f"🆕 Post published: {post['url']}")
                post_id = post["id"]
            published[name] = {"id": post_id, "hash": h}
            save_published(published)
            print("⏱ Pausing 60 seconds before the next post...")
            time.sleep(60)
        except HttpError as e:
            print(f"❌ Failed to publish {name}: {e}")
            save_published(published)
            break


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--force", action="store_true", help="Update all posts, even if unchanged")
    args = parser.parse_args()
    main(force=args.force)
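# Example invocation (the script filename is illustrative):
#   BLOG_ID=<blog-id> TOKEN_FILE=token.json python publish_docs_to_blogger.py --force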