# cdarticle / app.py
# AllanHill's picture
# Update app.py
# 95c4744 verified
import gradio as gr
import requests
import re
import json
from datetime import datetime
import os
from dotenv import load_dotenv
import markdown
from PIL import Image
from io import BytesIO
import time
# ========== Load environment variables ==========
# Settings are read from a dedicated env file rather than the default ".env".
load_dotenv(dotenv_path='rank_cd.env')

# ========== Module-level application state ==========
# These globals are read and rebound by the Gradio callbacks further down
# (each callback declares the ones it touches with ``global``).

# WordPress connection settings, seeded from the environment.
wp_config = dict(
    url=os.environ.get('WORDPRESS_URL', 'https://cdgarment.com'),
    username=os.environ.get('WORDPRESS_USERNAME', ''),
    password=os.environ.get('WORDPRESS_APP_PASSWORD', ''),
    status=os.environ.get('DEFAULT_STATUS', 'draft'),
)
# When True, save_config_callback refuses to overwrite wp_config.
wp_config_locked = False
# Status used for new posts.
# NOTE(review): DEFAULT_STATUS is loaded into wp_config['status'] above but
# this value is hard-coded -- confirm whether it should follow the env value.
post_status = "draft"
# Name of the parser strategy that produced parsed_data.
parse_method = "auto"

# Result of the most recent successful parse (dict) or None.
parsed_data = None
# Raw bytes and path of the most recently uploaded image.
uploaded_image = None
uploaded_image_path = None
# Alt text / caption used for the uploaded image.
image_description = ""
# ========== 增强解析器类 ==========
class GeminiContentParser:
    """Extract SEO metadata and article text from pasted Gemini output.

    Three strategies are tried in order: an embedded JSON payload, labelled
    ``Field: value`` lines with a symbol-marked article section, and a loose
    colon/whitespace separated layout. The first strategy that yields
    non-empty article content wins.
    """

    def __init__(self):
        # Ordered from the most structured format to the least structured.
        self.parsers = [
            self.parse_json_format,
            self.parse_markdown_table_format,
            self.parse_simple_format
        ]

    @staticmethod
    def _slugify(title):
        """Return a URL slug (at most 100 characters) built from *title*."""
        slug = re.sub(r'[^\w\s-]', '', title.lower())
        slug = re.sub(r'[-\s]+', '-', slug)
        return slug[:100]

    @staticmethod
    def _split_title_and_body(text):
        """Split *text* into (first line stripped, remaining lines joined)."""
        lines = text.split('\n')
        return lines[0].strip(), '\n'.join(lines[1:])

    @staticmethod
    def _split_tags(tags_str):
        """Split a comma/semicolon separated tag string into clean names."""
        return [tag.strip() for tag in re.split(r'[,;]', tags_str) if tag.strip()]

    @staticmethod
    def _empty_result(parse_method):
        """Return the result skeleton shared by the text-based parsers."""
        return {
            'seo_title': '',
            'primary_keyword': '',
            'meta_description': '',
            'tags': [],
            'article_title': '',
            'content': '',
            'url_slug': '',
            'parse_method': parse_method
        }

    def _finalize_titles(self, data):
        """Apply the title fallback, then derive the slug from the final title.

        The fallback must run first; otherwise content without an explicit
        SEO title would end up with a title but an empty slug.
        """
        if not data['seo_title'] and data['article_title']:
            data['seo_title'] = data['article_title']
        if data['seo_title']:
            data['url_slug'] = self._slugify(data['seo_title'])
        return data

    def parse(self, content: str):
        """Try each parsing strategy in turn.

        Returns:
            The first result dict that carries non-empty article content,
            or None when no strategy produced any.
        """
        for parser in self.parsers:
            result = parser(content)
            if result and (result.get('article_content') or result.get('content')):
                return result
        return None

    def parse_json_format(self, content: str):
        """Parse content that embeds a JSON object with SEO metadata.

        Returns:
            A result dict with ``parse_method == 'json'``, or None when no
            candidate snippet decodes to a JSON object.
        """
        json_patterns = [
            r'```json\s*(.*?)\s*```',
            r'{\s*"post_id".*?}',
            r'.*# Machine-Readable Data.*?({.*})',
            r'.*Machine-Readable Data.*?JSON.*?({.*})',
        ]
        for pattern in json_patterns:
            match = re.search(pattern, content, re.DOTALL | re.IGNORECASE)
            if not match:
                continue
            # Patterns without a capture group carry the JSON in the whole match.
            json_str = match.group(1) if match.re.groups else match.group(0)
            try:
                data = json.loads(json_str)
            except json.JSONDecodeError:
                continue
            # Valid JSON that is not an object (list, string, number) cannot
            # hold the expected keys; try the next pattern instead of crashing.
            if not isinstance(data, dict):
                continue
            seo_toolkit = data.get('seo_toolkit', {})
            if not isinstance(seo_toolkit, dict):
                seo_toolkit = {}

            article_content = data.get('article_content', '')
            if not isinstance(article_content, str) or not article_content:
                # The article is the text around the JSON snippet; keep
                # whichever side (before or after) is longer.
                before_json = content[:match.start()].strip()
                after_json = content[match.end():].strip()
                article_content = after_json if len(after_json) > len(before_json) else before_json

            lines = article_content.split('\n')
            # Drop Markdown heading markers from the first line.
            article_title = lines[0].strip('# ').strip() if lines else ""

            result = {
                # Fall back to the article's first line when the SEO title is
                # missing or empty.
                'seo_title': seo_toolkit.get('seo_title') or article_title,
                'primary_keyword': seo_toolkit.get('primary_keyword', ''),
                'secondary_keywords': seo_toolkit.get('secondary_keywords', []),
                'meta_description': seo_toolkit.get('meta_description', seo_toolkit.get('description', '')),
                'tags': seo_toolkit.get('tags', []),
                'article_title': article_title,
                'content': article_content,
                'post_id': data.get('post_id', ''),
                'character_count': seo_toolkit.get('character_count', 0),
                'parse_method': 'json'
            }
            if result['seo_title']:
                result['url_slug'] = self._slugify(result['seo_title'])
            return result
        return None

    def parse_markdown_table_format(self, content):
        """Parse ``Label: value`` lines plus a symbol-marked article section.

        Returns:
            A result dict with ``parse_method == 'markdown_table'`` (fields
            that were not found stay empty), or None if parsing raised.
        """
        data = self._empty_result('markdown_table')
        try:
            labelled_fields = (
                ('seo_title', 'SEO Title'),
                ('primary_keyword', 'Primary Keyword'),
                ('meta_description', 'Meta Description'),
            )
            for key, label in labelled_fields:
                # "\n|$" also accepts a field on the very last line of input.
                field_match = re.search(label + r':\s*(.+?)(?=\n|$)', content)
                if field_match:
                    data[key] = field_match.group(1).strip()

            tags_match = re.search(r'Tags:\s*(.+?)(?=\n|📝|$)', content)
            if tags_match:
                data['tags'] = self._split_tags(tags_match.group(1).strip())

            article_match = re.search(r'(?:📝|▶|●|◆).*?(?:Article|Content)[:-]?\s*(.+)', content, re.DOTALL)
            if article_match:
                title, body = self._split_title_and_body(article_match.group(1).strip())
                data['article_title'] = title
                data['content'] = body

            return self._finalize_titles(data)
        except Exception:
            # Best effort: any failure means "this format does not apply",
            # so parse() can move on to the next strategy.
            return None

    def parse_simple_format(self, content):
        """Parse loosely formatted ``Label: value`` / ``Label value`` content.

        Each field tries its specific label first and a shorter fallback
        label second, case-insensitively.

        Returns:
            A result dict with ``parse_method == 'simple'``, or None if
            parsing raised.
        """
        data = self._empty_result('simple')
        try:
            field_patterns = (
                ('seo_title', (r'SEO Title[:\s]+(.+)', r'Title[:\s]+(.+)')),
                ('primary_keyword', (r'Primary Keyword[:\s]+(.+)', r'Keyword[:\s]+(.+)')),
                ('meta_description', (r'Meta Description[:\s]+(.+)', r'Description[:\s]+(.+)')),
            )
            for key, patterns in field_patterns:
                for pattern in patterns:
                    field_match = re.search(pattern, content, re.IGNORECASE)
                    if field_match:
                        data[key] = field_match.group(1).strip()
                        break

            tags_match = re.search(r'Tags[:\s]+(.+)', content, re.IGNORECASE)
            if tags_match:
                data['tags'] = self._split_tags(tags_match.group(1).strip())

            article_match = re.search(r'(?:Article|Content)[:\s]+(.+)', content, re.DOTALL | re.IGNORECASE)
            if article_match:
                article_text = article_match.group(1).strip()
            else:
                # No explicit marker: treat everything after the first blank
                # line as the article.
                metadata_end = content.find('\n\n')
                article_text = content[metadata_end:].strip() if metadata_end != -1 else None

            if article_text is not None:
                title, body = self._split_title_and_body(article_text)
                data['article_title'] = title
                data['content'] = body

            return self._finalize_titles(data)
        except Exception:
            # Best effort, same contract as parse_markdown_table_format.
            return None
# ========== 功能函数 ==========
def parse_gemini_content(content):
    """Parse pasted Gemini output and remember the result in module state.

    On success the module globals ``parsed_data`` and ``parse_method`` are
    rebound to the new result and the strategy that produced it.

    Returns:
        ``(result, message)`` where *result* is the parsed dict, or None
        when the input is blank or no strategy could parse it, and
        *message* is a status line for the UI.
    """
    global parsed_data, parse_method

    # Guard: nothing to do for empty or whitespace-only input.
    if not (content and content.strip()):
        return None, "❌ 内容为空"

    result = GeminiContentParser().parse(content)
    if not result:
        return None, "❌ 无法用任何已知格式解析内容"

    parse_method = result.get('parse_method', 'unknown')
    parsed_data = result

    known_messages = {
        'json': '✅ 使用 JSON 格式解析',
        'markdown_table': 'ℹ️ 使用 Markdown 表格格式解析',
        'simple': 'ℹ️ 使用简单格式解析'
    }
    method_msg = known_messages.get(parse_method, f'ℹ️ 使用 {parse_method} 格式解析')
    return result, method_msg
def get_or_create_tag(tag_name, wp_config):
    """Return the WordPress tag ID for *tag_name*, creating the tag if needed.

    Args:
        tag_name: Display name of the tag.
        wp_config: Mapping with ``'url'``, ``'username'`` and ``'password'``.

    Returns:
        The tag ID on success; None when the name is blank or the tag could
        be neither found nor created; or an error-message string when a
        request raised. Callers must therefore type-check the result before
        using it as an ID.
    """
    name = "" if tag_name is None else str(tag_name).strip()
    if not name:
        # A blank name can never match or be created; skip the HTTP calls.
        return None

    request_timeout = 15  # seconds; without it a stalled server blocks forever
    try:
        auth = (wp_config['username'], wp_config['password'])
        endpoint = f"{wp_config['url']}/wp-json/wp/v2/tags"

        response = requests.get(
            endpoint,
            auth=auth,
            params={'search': name, 'per_page': 10},
            timeout=request_timeout
        )
        if response.status_code == 200:
            wanted = name.lower()
            # The search is fuzzy, so insist on an exact case-insensitive match.
            for tag in response.json():
                if tag['name'].lower() == wanted:
                    return tag['id']

        create_response = requests.post(
            endpoint,
            auth=auth,
            json={'name': name},
            timeout=request_timeout
        )
        if create_response.status_code == 201:
            return create_response.json()['id']
        if create_response.status_code == 400:
            # The tag may exist even though the search above missed it; the
            # "term_exists" error reply carries the existing term's ID.
            payload = create_response.json()
            if isinstance(payload, dict) and payload.get('code') == 'term_exists':
                existing_id = (payload.get('data') or {}).get('term_id')
                if isinstance(existing_id, int):
                    return existing_id
        return None
    except Exception as e:
        return f"❌ 处理标签 '{name}' 时出错: {str(e)}"
def _encode_image_for_upload(image_data, max_bytes=5 * 1024 * 1024, max_side=2000):
    """Return JPEG bytes for *image_data*, re-encoding only when required.

    The upload is always labelled ``image/jpeg``, so the bytes must really
    be JPEG. Data that already is JPEG and no larger than *max_bytes* is
    passed through untouched. Anything else is flattened onto a white
    background if it has transparency, converted to RGB, and saved as JPEG.
    Only oversized input is scaled down, so that its longest side is at
    most *max_side* pixels.
    """
    # Decoding from memory keeps no file handle open.
    img = Image.open(BytesIO(image_data))
    oversized = len(image_data) > max_bytes
    if img.format == 'JPEG' and not oversized:
        return image_data

    has_alpha = img.mode in ('RGBA', 'LA') or (img.mode == 'P' and 'transparency' in img.info)
    if has_alpha:
        # JPEG has no alpha channel: composite onto white instead of
        # letting transparent pixels come out with arbitrary colours.
        rgba = img.convert('RGBA')
        background = Image.new('RGB', rgba.size, (255, 255, 255))
        background.paste(rgba, mask=rgba.split()[-1])
        img = background
    elif img.mode != 'RGB':
        img = img.convert('RGB')

    if oversized and max(img.size) > max_side:
        ratio = max_side / max(img.size)
        new_size = tuple(int(dim * ratio) for dim in img.size)
        img = img.resize(new_size, Image.Resampling.LANCZOS)

    buffer = BytesIO()
    img.save(buffer, format='JPEG', quality=85, optimize=True)
    return buffer.getvalue()


def upload_image_to_wordpress(image_file, wp_config, filename_slug):
    """Upload an image to the WordPress media library.

    Args:
        image_file: Path to the image, or a binary file-like object.
        wp_config: Mapping with ``'url'``, ``'username'`` and ``'password'``.
        filename_slug: Base name for the uploaded file (``<slug>.jpg``);
            also used to derive the caption and description.

    Returns:
        ``(media_id, media_url, message)``. On failure the first two items
        are None and *message* describes the problem.

    The upload is attempted up to three times with a growing delay. After a
    successful upload the alt text (module global ``image_description``),
    caption and description are set in a second, best-effort request.
    """
    try:
        if isinstance(image_file, str):
            with open(image_file, 'rb') as f:
                raw_bytes = f.read()
        else:
            raw_bytes = image_file.read()
        image_data = _encode_image_for_upload(raw_bytes)

        # An empty slug would otherwise produce the filename ".jpg".
        slug = filename_slug or 'post'
        readable_title = slug.replace('-', ' ').title()
        files = {
            'file': (f"{slug}.jpg", image_data, 'image/jpeg')
        }
        auth = (wp_config['username'], wp_config['password'])
        media_endpoint = f"{wp_config['url']}/wp-json/wp/v2/media"

        max_retries = 3
        retry_delay = 2
        for attempt in range(max_retries):
            can_retry = attempt < max_retries - 1
            try:
                # Certificate verification stays enabled (the default): the
                # request carries credentials, and every other request in
                # this file verifies the server as well.
                response = requests.post(
                    media_endpoint,
                    auth=auth,
                    files=files,
                    timeout=60
                )
                if response.status_code == 201:
                    media_data = response.json()
                    try:
                        requests.post(
                            f"{media_endpoint}/{media_data['id']}",
                            auth=auth,
                            json={
                                'alt_text': image_description,
                                'caption': readable_title,
                                'description': f"Featured image for: {readable_title}"
                            },
                            timeout=10
                        )
                    except requests.exceptions.RequestException:
                        # Metadata is cosmetic; the upload itself succeeded.
                        pass
                    media_url = media_data.get('source_url', '')
                    return media_data['id'], media_url, f"✅ 图片上传成功!(ID: {media_data['id']})"
                if response.status_code == 413:
                    return None, None, "❌ 图片太大,请压缩后重试"
                if response.status_code == 401:
                    return None, None, "❌ 认证失败,请检查 WordPress 凭据"
                if not can_retry:
                    return None, None, f"❌ 图片上传失败,状态码: {response.status_code}"
            except requests.exceptions.Timeout:
                if not can_retry:
                    return None, None, "❌ 图片上传超时"
            except requests.exceptions.ConnectionError:
                if not can_retry:
                    return None, None, "❌ 连接失败,请检查网络和服务器"
            except Exception as e:
                return None, None, f"❌ 上传时出错: {str(e)}"
            # Only reached when another attempt is left: back off, then retry.
            time.sleep(retry_delay * (attempt + 1))
        return None, None, "❌ 图片上传失败"
    except Exception as e:
        return None, None, f"❌ 处理图片时出错: {str(e)}"
def _build_image_block(media_url, description, image_path):
    """Return the HTML snippet that appends the uploaded image to a post.

    *description* and *media_url* are HTML-escaped because they end up
    inside attribute values and element text. When *image_path* can be
    opened, the pixel dimensions are added to the caption.
    """
    from html import escape

    size_info = ""
    if image_path:
        try:
            with Image.open(image_path) as img:
                width, height = img.size
            size_info = f"尺寸: {width}×{height}px"
        except Exception:
            # The dimensions are decorative; an unreadable file only means
            # the caption goes without them.
            size_info = ""

    safe_url = escape(media_url)
    safe_alt = escape(description)
    caption = safe_alt if description else "文章配图"
    # No width/height attributes: the image is shown at its natural size.
    return f'''
<div class="uploaded-image-container" style="margin-top: 2rem; padding-top: 2rem; border-top: 1px solid #eee;">
<h3 style="margin-bottom: 1rem;">图片附件</h3>
<figure style="margin: 0;">
<img src="{safe_url}" alt="{safe_alt}"
style="max-width: 100%; height: auto; display: block; margin: 0 auto;">
<figcaption style="text-align: center; font-style: italic; color: #666; margin-top: 0.5rem;">
{caption} {size_info}
</figcaption>
</figure>
</div>
'''


def create_wordpress_post(parsed_data, wp_config, media_id=None, media_url=None):
    """Create a WordPress post from parsed content.

    Args:
        parsed_data: Result dict produced by the content parser.
        wp_config: Mapping with ``'url'``, ``'username'`` and ``'password'``.
        media_id: Optional media ID to set as the featured image.
        media_url: Optional URL of that image. When given, the image is
            also appended to the bottom of the post content.

    Returns:
        ``(post, None)`` with the created post's JSON on success, or
        ``(None, error_message)`` on failure.

    The post status comes from the module global ``post_status``. The
    image caption uses ``image_description`` and ``uploaded_image_path``.
    """
    request_timeout = 30  # seconds; keeps a stalled server from hanging the UI
    try:
        auth = (wp_config['username'], wp_config['password'])
        api_root = f"{wp_config['url']}/wp-json/wp/v2"
        seo_title = parsed_data.get('seo_title', '')
        meta_description = parsed_data.get('meta_description', '')

        rank_math_meta = {
            'rank_math_title': seo_title,
            'rank_math_description': meta_description,
            'rank_math_robots': ['index'] if post_status == 'publish' else ['noindex'],
            'rank_math_news_sitemap_robots': 'index',
            'rank_math_facebook_title': seo_title,
            'rank_math_facebook_description': meta_description,
            'rank_math_twitter_title': seo_title,
            'rank_math_twitter_description': meta_description,
            'rank_math_canonical_url': '',
        }
        if parsed_data.get('primary_keyword'):
            rank_math_meta['rank_math_focus_keyword'] = parsed_data['primary_keyword']

        # Convert the Markdown article to HTML.
        html_content = markdown.markdown(parsed_data.get('content', ''))
        if media_url:
            html_content += _build_image_block(media_url, image_description, uploaded_image_path)

        post_data = {
            # "or" also covers an empty title, not just a missing key.
            'title': seo_title or 'Untitled',
            'content': html_content,
            'slug': parsed_data.get('url_slug', ''),
            'status': post_status,
            'meta': rank_math_meta
        }

        # Resolve tag names to IDs; names that cannot be resolved are skipped
        # (get_or_create_tag returns None or an error string for those).
        if parsed_data.get('tags'):
            tag_ids = []
            for tag_name in parsed_data['tags']:
                tag_id = get_or_create_tag(tag_name, wp_config)
                if tag_id and isinstance(tag_id, int):
                    tag_ids.append(tag_id)
            if tag_ids:
                post_data['tags'] = tag_ids

        # Default category.
        post_data['categories'] = [1]

        if media_id:
            post_data['featured_media'] = media_id
            # Prefer the URL the API reports for the media item; fall back
            # to the URL from the upload when the lookup fails.
            social_image = media_url
            try:
                media_response = requests.get(
                    f"{api_root}/media/{media_id}",
                    auth=auth,
                    timeout=request_timeout
                )
                if media_response.status_code == 200:
                    social_image = media_response.json().get('source_url', media_url)
            except Exception:
                pass
            if social_image:
                # rank_math_meta is the same dict object as post_data['meta'].
                rank_math_meta['rank_math_facebook_image'] = social_image
                rank_math_meta['rank_math_twitter_image'] = social_image

        response = requests.post(
            f"{api_root}/posts",
            auth=auth,
            json=post_data,
            headers={'Content-Type': 'application/json'},
            timeout=request_timeout
        )
        if response.status_code != 201:
            return None, f"WordPress API 错误: {response.text}"

        post_result = response.json()
        # Point the canonical URL at the final permalink. The post already
        # exists here, so a failure of this follow-up must not be reported
        # as a failed publish (a retry would create a duplicate post).
        try:
            requests.post(
                f"{api_root}/posts/{post_result['id']}",
                auth=auth,
                json={'meta': {'rank_math_canonical_url': post_result['link']}},
                timeout=request_timeout
            )
        except requests.exceptions.RequestException:
            pass
        return post_result, None
    except Exception as e:
        return None, f"创建文章时出错: {str(e)}"
# ========== Gradio UI 回调函数 ==========
def parse_content_callback(content):
    """Gradio handler for the parse button.

    Returns:
        ``(status_message, preview_markdown)``; the preview is an empty
        string when parsing failed.
    """
    result, msg = parse_gemini_content(content)
    if not result:
        return msg, ""

    # Markdown summary of the parsed fields.
    preview = f"""
### 📊 解析结果
**标题:** {result['seo_title']}
**主关键词:** {result.get('primary_keyword', '无')}
**标签:** {', '.join(result.get('tags', []))}
**URL Slug:** {result.get('url_slug', '自动生成')}
**内容长度:** {len(result.get('content', ''))} 字符
"""
    return msg, preview
def update_image_description_callback(image):
    """Gradio handler for changes of the image upload field.

    Args:
        image: Path of the uploaded file, or a falsy value when the field
            was cleared.

    Returns:
        A 3-tuple: the generated description (twice, one per bound output
        slot) and an update for the preview component.

    Rebinds the module globals ``uploaded_image``, ``uploaded_image_path``
    and ``image_description``.
    """
    global uploaded_image, uploaded_image_path, image_description

    if not image:
        # Clearing the field must also forget the previous upload;
        # otherwise publish_callback would still upload the removed image.
        uploaded_image = None
        uploaded_image_path = None
        image_description = ""
        return "", "", gr.update(value=None)

    uploaded_image_path = image
    with open(image, 'rb') as f:
        uploaded_image = f.read()

    filename = os.path.basename(image)
    # splitext also handles names without an extension.
    name_without_ext = os.path.splitext(filename)[0]
    readable_name = name_without_ext.replace('_', ' ').replace('-', ' ').title()
    image_description = f"Featured image: {readable_name}"

    # Append the pixel dimensions when the file can be opened as an image.
    try:
        with Image.open(image) as img:
            width, height = img.size
        image_description += f" ({width}×{height}px)"
    except Exception:
        # The dimensions are decorative; keep the plain description.
        pass

    return image_description, image_description, gr.update(value=image, label=f"📷 {filename}")
def save_config_callback(url, username, password, status):
    """Gradio handler that stores the WordPress settings.

    While the configuration is locked nothing is changed. Otherwise the
    module globals ``wp_config`` and ``post_status`` are replaced; a
    trailing slash is removed from the URL.

    Returns:
        A status message for the UI.
    """
    global wp_config, wp_config_locked, post_status

    if wp_config_locked:
        return "🔒 配置已锁定,无法保存"

    wp_config = dict(
        url=url.rstrip('/'),
        username=username,
        password=password,
        status=status,
    )
    post_status = status
    return "✅ 配置已保存!"
def toggle_lock_callback(locked):
    """Gradio handler for the lock checkbox.

    Records the lock state in the module global ``wp_config_locked`` and
    returns three component updates that make the bound inputs editable
    only while the configuration is unlocked.
    """
    global wp_config_locked
    wp_config_locked = locked
    editable = not locked
    return tuple(gr.update(interactive=editable) for _ in range(3))
def publish_callback():
    """Gradio generator handler that publishes the parsed content.

    Yields ``(progress_message, result_markdown, result_json)`` tuples so
    the UI can show progress while the image upload and post creation run.

    This function is a generator, so a value passed to ``return`` would
    never reach the UI. Every message, including the early validation
    errors, is therefore yielded before returning.

    Reads the module globals ``parsed_data``, ``uploaded_image_path``,
    ``wp_config``, ``post_status`` and ``parse_method``.
    """
    if not parsed_data:
        yield "❌ 请先解析内容", "", ""
        return
    if not all([wp_config.get('url'), wp_config.get('username'), wp_config.get('password')]):
        yield "❌ WordPress 配置不完整", "", ""
        return

    media_id = None
    media_url = None
    if uploaded_image_path:
        # Only announce the upload when there is an image to upload.
        yield "🖼️ 正在上传图片到 WordPress...", "", ""
        media_id, media_url, img_msg = upload_image_to_wordpress(
            uploaded_image_path,
            wp_config,
            parsed_data.get('url_slug', 'post')
        )
        if not media_id:
            # The failure messages from the upload already start with "❌".
            yield img_msg, "", ""
            return

    yield "📤 正在创建文章并设置 Rank Math 元数据...", "", ""
    result, error = create_wordpress_post(parsed_data, wp_config, media_id, media_url)
    if not result:
        yield f"❌ 发布失败: {error}", "", ""
        return

    headline = f"✅ 文章{'已发布' if post_status == 'publish' else '已保存为草稿'}!(ID: {result['id']})"
    summary = f"""
### 📝 发布成功
**文章ID:** {result['id']}
**状态:** {result['status']}
**日期:** {result['date'][:10]}
**链接:** {result['link']}
**图片位置:** 已添加到文章内容底部(原始大小)
"""
    post_record = json.dumps({
        'post_id': result['id'],
        'title': result['title']['rendered'],
        'link': result['link'],
        'status': result['status'],
        'slug': result['slug'],
        'published_at': datetime.now().isoformat(),
        'parse_method': parse_method,
        'image_added_to_content': bool(media_url),
        'image_url': media_url
    }, indent=2)
    yield headline, summary, post_record
# ========== Gradio UI 布局 ==========
# NOTE(review): the nesting of the ``with`` blocks below was reconstructed
# from the order of the statements -- confirm the intended layout (in
# particular which widgets belong to the right-hand column).

# Sample input shown in the content box on start-up.
example_content = """# Article #25: Smart Manufacturing: AI & Automation in our Humen Factory
The apparel industry is undergoing a digital revolution...
---
# Machine-Readable Data (For your Script)
```json
{
"post_id": "25",
"seo_toolkit": {
"primary_keyword": "AI in Garment Manufacturing",
"secondary_keywords": [
"Smart apparel factory Humen",
"Automated fabric cutting",
"Digital clothing production 2026"
],
"seo_title": "Smart Manufacturing: AI & Automation in our Humen Factory | CdGarment",
"description": "Explore the future of fashion...",
"character_count": 159,
"tags": ["AI garment manufacturing", "smart factory", "automated apparel production", "digital fashion", "Humen factory"]
}
}
```"""


def _reveal_publish_outputs():
    """Return updates that make the JSON panel and download button visible."""
    return gr.update(visible=True), gr.update(visible=True)


with gr.Blocks(title="CdGarment WordPress Publisher", theme=gr.themes.Soft()) as demo:
    gr.Markdown("# 🏭 CdGarment WordPress Publisher")
    gr.Markdown("### ✓ 专为 Rank Math SEO 优化 • 图片将添加到文章底部(原始大小)")

    with gr.Row():
        # Left column: content input and parse preview.
        with gr.Column(scale=2):
            gr.Markdown("### 📋 粘贴 Gemini 内容")
            gr.Markdown("支持 JSON 和传统格式")
            content_input = gr.Textbox(
                label="粘贴你的 Gemini 输出",
                value=example_content,
                lines=15,
                placeholder="在此粘贴完整的 Gemini 输出(JSON 或传统格式)"
            )
            parse_btn = gr.Button("🔍 解析内容", variant="primary")
            parse_msg = gr.Markdown()
            preview_box = gr.Markdown()

        # Right column: image upload and WordPress settings.
        with gr.Column(scale=1):
            gr.Markdown("### 🖼️ 图片上传")
            gr.Markdown("上传的图片将:1) 设置为特色图片 2) 以原始大小添加到文章内容底部")
            image_upload = gr.File(
                label="上传特色图片",
                file_types=["image"],
                type="filepath"
            )
            img_desc_input = gr.Textbox(
                label="图片描述(用于alt文本)",
                placeholder="输入图片描述,将显示在图片下方",
                value=""
            )
            image_preview = gr.Image(label="图片预览", height=200)

            gr.Markdown("---")
            gr.Markdown("### ⚙️ WordPress 设置")
            lock_toggle = gr.Checkbox(label="🔒 锁定配置", value=False)
            wp_url = gr.Textbox(
                label="WordPress URL",
                value=wp_config.get('url', 'https://cdgarment.com')
            )
            wp_username = gr.Textbox(
                label="用户名",
                value=wp_config.get('username', ''),
                placeholder="admin"
            )
            wp_password = gr.Textbox(
                label="应用密码",
                value=wp_config.get('password', ''),
                type="password",
                placeholder="••••••••"
            )
            status_select = gr.Radio(
                choices=["draft", "publish"],
                value=post_status,
                label="文章状态"
            )
            save_btn = gr.Button("💾 保存配置")
            save_msg = gr.Markdown()

    # Publish section below the two columns.
    gr.Markdown("---")
    gr.Markdown("### 🚀 准备发布")
    with gr.Row():
        content_status = gr.Markdown("❌ 内容")
        image_status = gr.Markdown("⚠️ 图片可选")
        wp_status = gr.Markdown("❓ WordPress")
    status_display = gr.Markdown(f"📊 状态: {post_status.upper()}")
    publish_btn = gr.Button("🚀 推送到 WordPress", variant="primary", size="lg")
    publish_progress = gr.Markdown()
    publish_result = gr.Markdown()
    json_output = gr.JSON(label="文章数据", visible=False)
    download_btn = gr.DownloadButton(
        "📥 下载文章数据",
        value="",
        visible=False
    )

    # ========== Event bindings ==========
    parse_btn.click(
        parse_content_callback,
        inputs=[content_input],
        outputs=[parse_msg, preview_box]
    )
    # The callback returns the description twice, hence the repeated output.
    image_upload.change(
        update_image_description_callback,
        inputs=[image_upload],
        outputs=[img_desc_input, img_desc_input, image_preview]
    )
    lock_toggle.change(
        toggle_lock_callback,
        inputs=[lock_toggle],
        outputs=[wp_url, wp_username, wp_password]
    )
    save_btn.click(
        save_config_callback,
        inputs=[wp_url, wp_username, wp_password, status_select],
        outputs=[save_msg]
    )
    # After the publish handler finishes, reveal the JSON panel and the
    # download button.
    publish_btn.click(
        publish_callback,
        inputs=[],
        outputs=[publish_progress, publish_result, json_output]
    ).then(
        _reveal_publish_outputs,
        outputs=[json_output, download_btn]
    )

    # Footer; the values are evaluated once, when the UI is built.
    gr.Markdown(f"---\nCdGarment WordPress Publisher • {datetime.now().year} • 格式: {parse_method.upper()} • 图片将添加到文章底部")

if __name__ == "__main__":
    demo.launch(server_name="0.0.0.0", server_port=7860, share=True)