import gradio as gr
import requests
from bs4 import BeautifulSoup
import urllib.parse  # used to resolve the iframe path to an absolute URL
import re
import logging
import tempfile
import pandas as pd
import mecab  # python-mecab-ko library
import os
import time
import hmac
import hashlib
import base64
# Helper for debug logging
def debug_log(message: str):
    print(f"[DEBUG] {message}")
# [Base code] - Naver blog scraping
def scrape_naver_blog(url: str) -> str:
    debug_log("scrape_naver_blog started")
    debug_log(f"Requested URL: {url}")

    headers = {
        "User-Agent": (
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
            "AppleWebKit/537.36 (KHTML, like Gecko) "
            "Chrome/96.0.4664.110 Safari/537.36"
        )
    }

    try:
        # 1) Request the main Naver blog page
        response = requests.get(url, headers=headers)
        debug_log("HTTP GET request (main page) complete")
        if response.status_code != 200:
            debug_log(f"Request failed, status code: {response.status_code}")
            return f"An error occurred. Status code: {response.status_code}"

        # 2) Parse the main page
        soup = BeautifulSoup(response.text, "html.parser")
        debug_log("HTML parsing (main page) complete")

        # 3) Find the iframe tag (Naver blogs load the post body inside iframe#mainFrame)
        iframe = soup.select_one("iframe#mainFrame")
        if not iframe:
            debug_log("Could not find the iframe#mainFrame tag.")
            return "Could not find the content iframe."
        iframe_src = iframe.get("src")
        if not iframe_src:
            debug_log("The iframe has no src attribute.")
            return "Could not find the src of the content iframe."

        # 4) Resolve the iframe src to an absolute URL
        parsed_iframe_url = urllib.parse.urljoin(url, iframe_src)
        debug_log(f"iframe page request URL: {parsed_iframe_url}")

        # 5) Request and parse the iframe page
        iframe_response = requests.get(parsed_iframe_url, headers=headers)
        debug_log("HTTP GET request (iframe page) complete")
        if iframe_response.status_code != 200:
            debug_log(f"iframe request failed, status code: {iframe_response.status_code}")
            return f"An error occurred in the iframe. Status code: {iframe_response.status_code}"

        iframe_soup = BeautifulSoup(iframe_response.text, "html.parser")
        debug_log("HTML parsing (iframe page) complete")

        # 6) Extract the title and body
        title_div = iframe_soup.select_one('.se-module.se-module-text.se-title-text')
        title = title_div.get_text(strip=True) if title_div else "Title not found."
        debug_log(f"Extracted title: {title}")

        content_div = iframe_soup.select_one('.se-main-container')
        if content_div:
            content = content_div.get_text("\n", strip=True)
        else:
            content = "Body not found."
        debug_log("Body extraction complete")

        result = f"[Title]\n{title}\n\n[Body]\n{content}"
        debug_log("Title and body combined, ready to return")
        return result
    except Exception as e:
        debug_log(f"Error occurred: {str(e)}")
        return f"An error occurred while scraping: {str(e)}"
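# Usage sketch for the scraper (the post below is the same example used as the
# UI placeholder; any public blog.naver.com/<user>/<post-id> URL should behave
# the same way):
#
#   text = scrape_naver_blog("https://blog.naver.com/ssboost/222983068507")
#   print(text[:200])  # "[Title]\n...\n\n[Body]\n..."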
# [Reference code 1] Morphological analysis
def analyze_text(text: str):
    logging.basicConfig(level=logging.DEBUG)
    logger = logging.getLogger(__name__)
    logger.debug("Original text: %s", text)

    # 1. Keep Korean syllables only (strip whitespace, English, symbols, etc.)
    filtered_text = re.sub(r'[^가-힣]', '', text)
    logger.debug("Filtered text (Korean only, whitespace removed): %s", filtered_text)
    if not filtered_text:
        logger.debug("No valid Korean text.")
        return pd.DataFrame(columns=["Word", "Frequency"]), ""

    # 2. Morphological analysis with MeCab (count nouns and compound nouns only)
    mecab_instance = mecab.MeCab()
    tokens = mecab_instance.pos(filtered_text)
    logger.debug("Morphological analysis result: %s", tokens)

    freq = {}
    for word, pos in tokens:
        if word and word.strip():
            # NN* tags mark nouns in the MeCab-ko tagset
            if pos.startswith("NN"):
                freq[word] = freq.get(word, 0) + 1
                logger.debug("Word: %s, POS: %s, current count: %d", word, pos, freq[word])

    # 3. Sort by frequency, descending
    sorted_freq = sorted(freq.items(), key=lambda x: x[1], reverse=True)
    logger.debug("Word frequencies sorted in descending order: %s", sorted_freq)

    # 4. Build the result DataFrame
    df = pd.DataFrame(sorted_freq, columns=["Word", "Frequency"])
    logger.debug("Result DataFrame created, shape: %s", df.shape)

    # 5. Write the result to a temporary Excel file
    temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=".xlsx")
    df.to_excel(temp_file.name, index=False, engine='openpyxl')
    temp_file.close()
    logger.debug("Excel file created: %s", temp_file.name)

    return df, temp_file.name
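# Usage sketch (the sentence and counts are illustrative only):
#
#   df, xlsx_path = analyze_text("캠핑 장비 추천 캠핑 의자")
#   # df would hold rows like ("캠핑", 2), ("장비", 1), ... and xlsx_path
#   # points at a temporary .xlsx copy of the same table.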
# [Reference code 2] Naver Ads API: search volume and blog post count lookup
def generate_signature(timestamp, method, uri, secret_key):
    message = f"{timestamp}.{method}.{uri}"
    digest = hmac.new(secret_key.encode("utf-8"), message.encode("utf-8"), hashlib.sha256).digest()
    return base64.b64encode(digest).decode()

def get_header(method, uri, api_key, secret_key, customer_id):
    timestamp = str(round(time.time() * 1000))
    signature = generate_signature(timestamp, method, uri, secret_key)
    return {
        "Content-Type": "application/json; charset=UTF-8",
        "X-Timestamp": timestamp,
        "X-API-KEY": api_key,
        "X-Customer": str(customer_id),
        "X-Signature": signature
    }
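# The signature above is an HMAC-SHA256 over "{timestamp}.{method}.{uri}"
# (millisecond epoch timestamp), base64-encoded, which is what the Naver
# SearchAd API verifies server-side. A quick self-contained check with
# made-up credentials:
#
#   sig = generate_signature("1700000000000", "GET", "/keywordstool", "dummy-secret")
#   # sig is a deterministic base64 string for these inputs; the API recomputes
#   # it from the same three fields and your real secret key.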
def fetch_related_keywords(keyword):
    debug_log(f"fetch_related_keywords called, keyword: {keyword}")
    API_KEY = os.environ["NAVER_API_KEY"]
    SECRET_KEY = os.environ["NAVER_SECRET_KEY"]
    CUSTOMER_ID = os.environ["NAVER_CUSTOMER_ID"]

    BASE_URL = "https://api.naver.com"
    uri = "/keywordstool"
    method = "GET"
    headers = get_header(method, uri, API_KEY, SECRET_KEY, CUSTOMER_ID)

    params = {
        "hintKeywords": [keyword],
        "showDetail": "1"
    }
    response = requests.get(BASE_URL + uri, params=params, headers=headers)
    data = response.json()
    if "keywordList" not in data:
        return pd.DataFrame()

    df = pd.DataFrame(data["keywordList"])
    if len(df) > 100:
        df = df.head(100)

    # Counts can come back as non-numeric strings (e.g. "< 10"); fall back to 0
    def parse_count(x):
        try:
            return int(str(x).replace(",", ""))
        except (ValueError, TypeError):
            return 0

    df["PC Monthly Searches"] = df["monthlyPcQcCnt"].apply(parse_count)
    df["Mobile Monthly Searches"] = df["monthlyMobileQcCnt"].apply(parse_count)
    df["Total Monthly Searches"] = df["PC Monthly Searches"] + df["Mobile Monthly Searches"]
    df.rename(columns={"relKeyword": "Keyword"}, inplace=True)
    result_df = df[["Keyword", "PC Monthly Searches", "Mobile Monthly Searches", "Total Monthly Searches"]]
    debug_log("fetch_related_keywords complete")
    return result_df
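# Usage sketch (requires NAVER_API_KEY / NAVER_SECRET_KEY / NAVER_CUSTOMER_ID
# in the environment; the keyword is illustrative):
#
#   df = fetch_related_keywords("캠핑")
#   # df has columns ["Keyword", "PC Monthly Searches",
#   #                 "Mobile Monthly Searches", "Total Monthly Searches"],
#   # capped at 100 rows.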
def fetch_blog_count(keyword):
    debug_log(f"fetch_blog_count called, keyword: {keyword}")
    client_id = os.environ["NAVER_SEARCH_CLIENT_ID"]
    client_secret = os.environ["NAVER_SEARCH_CLIENT_SECRET"]
    url = "https://openapi.naver.com/v1/search/blog.json"
    headers = {
        "X-Naver-Client-Id": client_id,
        "X-Naver-Client-Secret": client_secret
    }
    params = {"query": keyword, "display": 1}
    response = requests.get(url, headers=headers, params=params)
    if response.status_code == 200:
        data = response.json()
        debug_log(f"fetch_blog_count result: {data.get('total', 0)}")
        return data.get("total", 0)
    else:
        debug_log(f"fetch_blog_count error, status code: {response.status_code}")
        return 0
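# Usage sketch (requires NAVER_SEARCH_CLIENT_ID / NAVER_SEARCH_CLIENT_SECRET;
# 'total' in the Open API response is the number of matching blog posts):
#
#   n = fetch_blog_count("캠핑")  # e.g. 1234567 (illustrative value)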
def create_excel_file(df):
    # delete=False so the file survives for Gradio to serve as a download
    with tempfile.NamedTemporaryFile(suffix=".xlsx", delete=False) as tmp:
        excel_path = tmp.name
    df.to_excel(excel_path, index=False)
    debug_log(f"Excel file created: {excel_path}")
    return excel_path
def process_keyword(keywords: str, include_related: bool):
    debug_log(f"process_keyword called, keywords: {keywords}, include related: {include_related}")
    # One keyword per line
    input_keywords = [k.strip() for k in keywords.splitlines() if k.strip()]
    result_dfs = []

    for idx, kw in enumerate(input_keywords):
        df_kw = fetch_related_keywords(kw)
        if df_kw.empty:
            continue
        row_kw = df_kw[df_kw["Keyword"] == kw]
        if not row_kw.empty:
            result_dfs.append(row_kw)
        else:
            result_dfs.append(df_kw.head(1))
        # Related keywords are appended only for the first input keyword
        if include_related and idx == 0:
            df_related = df_kw[df_kw["Keyword"] != kw]
            if not df_related.empty:
                result_dfs.append(df_related)

    if result_dfs:
        result_df = pd.concat(result_dfs, ignore_index=True)
        result_df.drop_duplicates(subset=["Keyword"], inplace=True)
    else:
        result_df = pd.DataFrame(columns=["Keyword", "PC Monthly Searches", "Mobile Monthly Searches", "Total Monthly Searches"])

    result_df["Blog Post Count"] = result_df["Keyword"].apply(fetch_blog_count)
    result_df.sort_values(by="Total Monthly Searches", ascending=False, inplace=True)
    debug_log("process_keyword complete")
    return result_df, create_excel_file(result_df)
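# Usage sketch (keywords are illustrative; expect one keywordstool round-trip
# per input line plus one blog-count lookup per resulting row):
#
#   df, xlsx_path = process_keyword("캠핑\n캠핑장비", include_related=False)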
# Combines [reference code 1] and [reference code 2]: morphological analysis
# enriched with search volume and blog post counts (with an option to drop
# frequency-1 words)
def morphological_analysis_and_enrich(text: str, remove_freq1: bool):
    debug_log("morphological_analysis_and_enrich started")

    df_freq, _ = analyze_text(text)
    if df_freq.empty:
        debug_log("Morphological analysis returned an empty DataFrame.")
        return df_freq, ""

    if remove_freq1:
        before_shape = df_freq.shape
        df_freq = df_freq[df_freq["Frequency"] != 1]
        debug_log(f"Frequency-1 removal applied: {before_shape} -> {df_freq.shape}")

    # Extract keywords from the analysis result (one word per line)
    keywords = "\n".join(df_freq["Word"].tolist())
    debug_log(f"Analyzed keywords: {keywords}")

    # Use [reference code 2] to look up search volume and blog post counts
    # for each keyword (related keywords excluded)
    df_keyword_info, _ = process_keyword(keywords, include_related=False)
    debug_log("Search volume and blog post count lookup complete")

    # Merge the frequency table with the search volume info, keyed on the word
    merged_df = pd.merge(df_freq, df_keyword_info, left_on="Word", right_on="Keyword", how="left")
    merged_df.drop(columns=["Keyword"], inplace=True)

    # Write the merged result to an Excel file
    merged_excel_path = create_excel_file(merged_df)
    debug_log("morphological_analysis_and_enrich complete")
    return merged_df, merged_excel_path
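# End-to-end sketch of the analysis path (the input text is illustrative):
#
#   df, xlsx = morphological_analysis_and_enrich("캠핑 장비 추천 캠핑", remove_freq1=True)
#   # df: one row per surviving noun, with Frequency plus the merged
#   # PC/Mobile/Total search volumes and Blog Post Count (NaN where the
#   # ad API returned no data for that word).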
# Newly added feature: scrape the entered blog link and show the result in an
# editable text box
def fetch_blog_content(url: str):
    debug_log("fetch_blog_content started")
    content = scrape_naver_blog(url)
    debug_log("fetch_blog_content complete")
    return content
# Gradio interface (single tab)
with gr.Blocks(title="Naver Blog Morphological Analysis Space", css=".gradio-container { max-width: 960px; margin: auto; }") as demo:
    gr.Markdown("# Naver Blog Morphological Analysis Space")
    with gr.Row():
        blog_url_input = gr.Textbox(label="Naver blog link", placeholder="e.g. https://blog.naver.com/ssboost/222983068507", lines=1)
    with gr.Row():
        scrape_button = gr.Button("Run Scraping")
    with gr.Row():
        blog_content_box = gr.Textbox(label="Blog content (editable)", lines=10, placeholder="The scraped blog content will appear here.")
    with gr.Row():
        remove_freq_checkbox = gr.Checkbox(label="Remove frequency-1 words", value=False)
    with gr.Row():
        analyze_button = gr.Button("Run Analysis")
    with gr.Row():
        analysis_result = gr.Dataframe(label="Analysis results (word, frequency, search volume, blog post count, etc.)")
    with gr.Row():
        analysis_excel = gr.File(label="Download Excel")

    # On scrape: fetch the blog body from the URL and place it in the editable text box
    scrape_button.click(fn=fetch_blog_content, inputs=blog_url_input, outputs=blog_content_box)
    # On analyze: run morphological analysis plus search volume / blog post count
    # lookup on the (possibly edited) blog content
    analyze_button.click(fn=morphological_analysis_and_enrich, inputs=[blog_content_box, remove_freq_checkbox], outputs=[analysis_result, analysis_excel])
if __name__ == "__main__":
    debug_log("Launching the Gradio app")
    demo.launch()
    debug_log("Gradio app stopped")