|
import gradio as gr |
|
import requests |
|
import json |
|
import os |
|
from datetime import datetime, timedelta |
|
from huggingface_hub import InferenceClient |
|
|
|
MAX_COUNTRY_RESULTS = 100 |
|
MAX_GLOBAL_RESULTS = 1000 |
|
|
|
def create_article_components(max_results): |
|
article_components = [] |
|
for i in range(max_results): |
|
with gr.Group(visible=False) as article_group: |
|
title = gr.Markdown() |
|
image = gr.Image(width=200, height=150) |
|
snippet = gr.Markdown() |
|
info = gr.Markdown() |
|
|
|
article_components.append({ |
|
'group': article_group, |
|
'title': title, |
|
'image': image, |
|
'snippet': snippet, |
|
'info': info, |
|
'index': i, |
|
}) |
|
return article_components |
|
|
|
API_KEY = os.getenv("SERPHOUSE_API_KEY") |
|
|
|
|
|
|
|
COUNTRY_LANGUAGES = { |
|
"United States": "en", |
|
"United Kingdom": "en", |
|
"Taiwan": "zh-TW", |
|
"Canada": "en", |
|
"Australia": "en", |
|
"Germany": "de", |
|
"France": "fr", |
|
"Japan": "ja", |
|
"South Korea": "ko", |
|
"China": "zh", |
|
"India": "hi", |
|
"Brazil": "pt", |
|
"Mexico": "es", |
|
"Russia": "ru", |
|
"Italy": "it", |
|
"Spain": "es", |
|
"Netherlands": "nl", |
|
"Singapore": "en", |
|
"Hong Kong": "zh-HK", |
|
"Indonesia": "id", |
|
"Malaysia": "ms", |
|
"Philippines": "tl", |
|
"Thailand": "th", |
|
"Vietnam": "vi", |
|
"Belgium": "nl", |
|
"Denmark": "da", |
|
"Finland": "fi", |
|
"Ireland": "en", |
|
"Norway": "no", |
|
"Poland": "pl", |
|
"Sweden": "sv", |
|
"Switzerland": "de", |
|
"Austria": "de", |
|
"Czech Republic": "cs", |
|
"Greece": "el", |
|
"Hungary": "hu", |
|
"Portugal": "pt", |
|
"Romania": "ro", |
|
"Turkey": "tr", |
|
"Israel": "he", |
|
"Saudi Arabia": "ar", |
|
"United Arab Emirates": "ar", |
|
"South Africa": "en", |
|
"Argentina": "es", |
|
"Chile": "es", |
|
"Colombia": "es", |
|
"Peru": "es", |
|
"Venezuela": "es", |
|
"New Zealand": "en", |
|
"Bangladesh": "bn", |
|
"Pakistan": "ur", |
|
"Egypt": "ar", |
|
"Morocco": "ar", |
|
"Nigeria": "en", |
|
"Kenya": "sw", |
|
"Ukraine": "uk", |
|
"Croatia": "hr", |
|
"Slovakia": "sk", |
|
"Bulgaria": "bg", |
|
"Serbia": "sr", |
|
"Estonia": "et", |
|
"Latvia": "lv", |
|
"Lithuania": "lt", |
|
"Slovenia": "sl", |
|
"Luxembourg": "fr", |
|
"Malta": "mt", |
|
"Cyprus": "el", |
|
"Iceland": "is" |
|
} |
|
|
|
COUNTRY_LOCATIONS = { |
|
"United States": "United States", |
|
"United Kingdom": "United Kingdom", |
|
"Taiwan": "Taiwan", |
|
"Canada": "Canada", |
|
"Australia": "Australia", |
|
"Germany": "Germany", |
|
"France": "France", |
|
"Japan": "Japan", |
|
"South Korea": "South Korea", |
|
"China": "China", |
|
"India": "India", |
|
"Brazil": "Brazil", |
|
"Mexico": "Mexico", |
|
"Russia": "Russia", |
|
"Italy": "Italy", |
|
"Spain": "Spain", |
|
"Netherlands": "Netherlands", |
|
"Singapore": "Singapore", |
|
"Hong Kong": "Hong Kong", |
|
"Indonesia": "Indonesia", |
|
"Malaysia": "Malaysia", |
|
"Philippines": "Philippines", |
|
"Thailand": "Thailand", |
|
"Vietnam": "Vietnam", |
|
"Belgium": "Belgium", |
|
"Denmark": "Denmark", |
|
"Finland": "Finland", |
|
"Ireland": "Ireland", |
|
"Norway": "Norway", |
|
"Poland": "Poland", |
|
"Sweden": "Sweden", |
|
"Switzerland": "Switzerland", |
|
"Austria": "Austria", |
|
"Czech Republic": "Czech Republic", |
|
"Greece": "Greece", |
|
"Hungary": "Hungary", |
|
"Portugal": "Portugal", |
|
"Romania": "Romania", |
|
"Turkey": "Turkey", |
|
"Israel": "Israel", |
|
"Saudi Arabia": "Saudi Arabia", |
|
"United Arab Emirates": "United Arab Emirates", |
|
"South Africa": "South Africa", |
|
"Argentina": "Argentina", |
|
"Chile": "Chile", |
|
"Colombia": "Colombia", |
|
"Peru": "Peru", |
|
"Venezuela": "Venezuela", |
|
"New Zealand": "New Zealand", |
|
"Bangladesh": "Bangladesh", |
|
"Pakistan": "Pakistan", |
|
"Egypt": "Egypt", |
|
"Morocco": "Morocco", |
|
"Nigeria": "Nigeria", |
|
"Kenya": "Kenya", |
|
"Ukraine": "Ukraine", |
|
"Croatia": "Croatia", |
|
"Slovakia": "Slovakia", |
|
"Bulgaria": "Bulgaria", |
|
"Serbia": "Serbia", |
|
"Estonia": "Estonia", |
|
"Latvia": "Latvia", |
|
"Lithuania": "Lithuania", |
|
"Slovenia": "Slovenia", |
|
"Luxembourg": "Luxembourg", |
|
"Malta": "Malta", |
|
"Cyprus": "Cyprus", |
|
"Iceland": "Iceland" |
|
} |
|
|
|
MAJOR_COUNTRIES = list(COUNTRY_LOCATIONS.keys()) |
|
|
|
def translate_query(query, country): |
|
try: |
|
|
|
if is_english(query): |
|
print(f"μμ΄ κ²μμ΄ κ°μ§ - μλ³Έ μ¬μ©: {query}") |
|
return query |
|
|
|
|
|
if country in COUNTRY_LANGUAGES: |
|
|
|
if country == "South Korea": |
|
print(f"νκ΅ μ ν - μλ³Έ μ¬μ©: {query}") |
|
return query |
|
|
|
target_lang = COUNTRY_LANGUAGES[country] |
|
print(f"λ²μ μλ: {query} -> {country}({target_lang})") |
|
|
|
url = f"https://translate.googleapis.com/translate_a/single" |
|
params = { |
|
"client": "gtx", |
|
"sl": "auto", |
|
"tl": target_lang, |
|
"dt": "t", |
|
"q": query |
|
} |
|
|
|
response = requests.get(url, params=params) |
|
translated_text = response.json()[0][0][0] |
|
print(f"λ²μ μλ£: {query} -> {translated_text} ({country})") |
|
return translated_text |
|
|
|
return query |
|
|
|
except Exception as e: |
|
print(f"λ²μ μ€λ₯: {str(e)}") |
|
return query |
|
|
|
def translate_to_korean(text): |
|
try: |
|
url = "https://translate.googleapis.com/translate_a/single" |
|
params = { |
|
"client": "gtx", |
|
"sl": "auto", |
|
"tl": "ko", |
|
"dt": "t", |
|
"q": text |
|
} |
|
|
|
response = requests.get(url, params=params) |
|
translated_text = response.json()[0][0][0] |
|
return translated_text |
|
except Exception as e: |
|
print(f"νκΈ λ²μ μ€λ₯: {str(e)}") |
|
return text |
|
|
|
def is_english(text): |
|
return all(ord(char) < 128 for char in text.replace(' ', '').replace('-', '').replace('_', '')) |
|
|
|
def is_korean(text): |
|
return any('\uAC00' <= char <= '\uD7A3' for char in text) |
|
|
|
def search_serphouse(query, country, page=1, num_result=10): |
|
url = "https://api.serphouse.com/serp/live" |
|
|
|
now = datetime.utcnow() |
|
yesterday = now - timedelta(days=1) |
|
date_range = f"{yesterday.strftime('%Y-%m-%d')},{now.strftime('%Y-%m-%d')}" |
|
|
|
translated_query = translate_query(query, country) |
|
print(f"Original query: {query}") |
|
print(f"Translated query: {translated_query}") |
|
|
|
payload = { |
|
"data": { |
|
"q": translated_query, |
|
"domain": "google.com", |
|
"loc": COUNTRY_LOCATIONS.get(country, "United States"), |
|
"lang": COUNTRY_LANGUAGES.get(country, "en"), |
|
"device": "desktop", |
|
"serp_type": "news", |
|
"page": "1", |
|
"num": "10", |
|
"date_range": date_range, |
|
"sort_by": "date" |
|
} |
|
} |
|
|
|
headers = { |
|
"accept": "application/json", |
|
"content-type": "application/json", |
|
"authorization": f"Bearer {API_KEY}" |
|
} |
|
|
|
try: |
|
response = requests.post(url, json=payload, headers=headers) |
|
print("Request payload:", json.dumps(payload, indent=2, ensure_ascii=False)) |
|
print("Response status:", response.status_code) |
|
|
|
response.raise_for_status() |
|
return {"results": response.json(), "translated_query": translated_query} |
|
except requests.RequestException as e: |
|
return {"error": f"Error: {str(e)}", "translated_query": query} |
|
|
|
def format_results_from_raw(response_data): |
|
if "error" in response_data: |
|
return "Error: " + response_data["error"], [] |
|
|
|
try: |
|
results = response_data["results"] |
|
translated_query = response_data["translated_query"] |
|
|
|
news_results = results.get('results', {}).get('results', {}).get('news', []) |
|
if not news_results: |
|
return "κ²μ κ²°κ³Όκ° μμ΅λλ€.", [] |
|
|
|
articles = [] |
|
for idx, result in enumerate(news_results, 1): |
|
articles.append({ |
|
"index": idx, |
|
"title": result.get("title", "μ λͺ© μμ"), |
|
"link": result.get("url", result.get("link", "#")), |
|
"snippet": result.get("snippet", "λ΄μ© μμ"), |
|
"channel": result.get("channel", result.get("source", "μ μ μμ")), |
|
"time": result.get("time", result.get("date", "μ μ μλ μκ°")), |
|
"image_url": result.get("img", result.get("thumbnail", "")), |
|
"translated_query": translated_query |
|
}) |
|
return "", articles |
|
except Exception as e: |
|
return f"κ²°κ³Ό μ²λ¦¬ μ€ μ€λ₯ λ°μ: {str(e)}", [] |
|
|
|
def serphouse_search(query, country): |
|
response_data = search_serphouse(query, country) |
|
return format_results_from_raw(response_data) |
|
|
|
css = """ |
|
footer {visibility: hidden;} |
|
#status_area { |
|
background: rgba(255, 255, 255, 0.9); /* μ½κ° ν¬λͺ
ν ν°μ λ°°κ²½ */ |
|
padding: 15px; |
|
border-bottom: 1px solid #ddd; |
|
margin-bottom: 20px; |
|
box-shadow: 0 2px 5px rgba(0,0,0,0.1); /* λΆλλ¬μ΄ κ·Έλ¦Όμ ν¨κ³Ό */ |
|
} |
|
#results_area { |
|
padding: 10px; |
|
margin-top: 10px; |
|
} |
|
/* ν μ€νμΌ κ°μ */ |
|
.tabs { |
|
border-bottom: 2px solid #ddd !important; |
|
margin-bottom: 20px !important; |
|
} |
|
.tab-nav { |
|
border-bottom: none !important; |
|
margin-bottom: 0 !important; |
|
} |
|
.tab-nav button { |
|
font-weight: bold !important; |
|
padding: 10px 20px !important; |
|
} |
|
.tab-nav button.selected { |
|
border-bottom: 2px solid #1f77b4 !important; /* μ νλ ν κ°μ‘° */ |
|
color: #1f77b4 !important; |
|
} |
|
/* κ²μ μν λ©μμ§ μ€νμΌ */ |
|
#status_area .markdown-text { |
|
font-size: 1.1em; |
|
color: #2c3e50; |
|
padding: 10px 0; |
|
} |
|
/* κ²μ κ²°κ³Ό 컨ν
μ΄λ μ€νμΌ */ |
|
.group { |
|
border: 1px solid #eee; |
|
padding: 15px; |
|
margin-bottom: 15px; |
|
border-radius: 5px; |
|
background: white; |
|
} |
|
/* κ²μ λ²νΌ μ€νμΌ */ |
|
.primary-btn { |
|
background: #1f77b4 !important; |
|
border: none !important; |
|
} |
|
/* κ²μμ΄ μ
λ ₯μ°½ μ€νμΌ */ |
|
.textbox { |
|
border: 1px solid #ddd !important; |
|
border-radius: 4px !important; |
|
} |
|
""" |
|
|
|
with gr.Blocks(theme="Nymbo/Nymbo_Theme", css=css, title="NewsAI μλΉμ€") as iface: |
|
with gr.Tabs(): |
|
|
|
with gr.Tab("κ΅κ°λ³"): |
|
gr.Markdown("κ²μμ΄λ₯Ό μ
λ ₯νκ³ μνλ κ΅κ°(68κ°κ΅)λ₯Ό μ ννλ©΄, κ²μμ΄μ μΌμΉνλ 24μκ° μ΄λ΄ λ΄μ€λ₯Ό μ΅λ 100κ° μΆλ ₯ν©λλ€.") |
|
gr.Markdown("κ΅κ° μ νν κ²μμ΄μ 'νκΈ'μ μ
λ ₯νλ©΄ νμ§ μΈμ΄λ‘ λ²μλμ΄ κ²μν©λλ€. μ: 'Taiwan' κ΅κ° μ νν 'μΌμ±' μ
λ ₯μ 'δΈζ'μΌλ‘ μλ κ²μ") |
|
|
|
with gr.Column(): |
|
with gr.Row(): |
|
query = gr.Textbox(label="κ²μμ΄") |
|
country = gr.Dropdown(MAJOR_COUNTRIES, label="κ΅κ°", value="South Korea") |
|
|
|
status_message = gr.Markdown("", visible=True) |
|
translated_query_display = gr.Markdown(visible=False) |
|
search_button = gr.Button("κ²μ", variant="primary") |
|
|
|
progress = gr.Progress() |
|
articles_state = gr.State([]) |
|
|
|
article_components = [] |
|
for i in range(100): |
|
with gr.Group(visible=False) as article_group: |
|
title = gr.Markdown() |
|
image = gr.Image(width=200, height=150) |
|
snippet = gr.Markdown() |
|
info = gr.Markdown() |
|
|
|
article_components.append({ |
|
'group': article_group, |
|
'title': title, |
|
'image': image, |
|
'snippet': snippet, |
|
'info': info, |
|
'index': i, |
|
}) |
|
|
|
with gr.Tab("μ μΈκ³"): |
|
gr.Markdown("κ²μμ΄λ₯Ό μ
λ ₯νλ©΄ 68κ°κ΅ μ 체μ λν΄ κ΅κ°λ³λ‘ ꡬλΆνμ¬ 24μκ° μ΄λ΄ λ΄μ€κ° μ΅λ 1000κ° μμ°¨ μΆλ ₯λ©λλ€.") |
|
gr.Markdown("κ΅κ° μ νν κ²μμ΄μ 'νκΈ'μ μ
λ ₯νλ©΄ νμ§ μΈμ΄λ‘ λ²μλμ΄ κ²μν©λλ€. μ: 'Taiwan' κ΅κ° μ νν 'μΌμ±' μ
λ ₯μ 'δΈζ'μΌλ‘ μλ κ²μ") |
|
|
|
with gr.Column(): |
|
|
|
with gr.Column(elem_id="status_area"): |
|
with gr.Row(): |
|
query_global = gr.Textbox(label="κ²μμ΄") |
|
search_button_global = gr.Button("μ μΈκ³ κ²μ", variant="primary") |
|
|
|
status_message_global = gr.Markdown("") |
|
translated_query_display_global = gr.Markdown("") |
|
|
|
|
|
with gr.Column(elem_id="results_area"): |
|
articles_state_global = gr.State([]) |
|
|
|
global_article_components = [] |
|
for i in range(1000): |
|
with gr.Group(visible=False) as article_group: |
|
title = gr.Markdown() |
|
image = gr.Image(width=200, height=150) |
|
snippet = gr.Markdown() |
|
info = gr.Markdown() |
|
|
|
global_article_components.append({ |
|
'group': article_group, |
|
'title': title, |
|
'image': image, |
|
'snippet': snippet, |
|
'info': info, |
|
'index': i, |
|
}) |
|
|
|
def search_and_display(query, country, articles_state, progress=gr.Progress()): |
|
status_msg = "κ²μμ μ§νμ€μ
λλ€. μ μλ§ κΈ°λ€λ¦¬μΈμ..." |
|
|
|
progress(0, desc="κ²μμ΄ λ²μ μ€...") |
|
translated_query = translate_query(query, country) |
|
translated_display = f"**μλ³Έ κ²μμ΄:** {query}\n**λ²μλ κ²μμ΄:** {translated_query}" if translated_query != query else f"**κ²μμ΄:** {query}" |
|
|
|
progress(0.2, desc="κ²μ μμ...") |
|
error_message, articles = serphouse_search(query, country) |
|
progress(0.5, desc="κ²°κ³Ό μ²λ¦¬ μ€...") |
|
|
|
outputs = [] |
|
outputs.append(gr.update(value=status_msg, visible=True)) |
|
outputs.append(gr.update(value=translated_display, visible=True)) |
|
|
|
if error_message: |
|
outputs.append(gr.update(value=error_message, visible=True)) |
|
for comp in article_components: |
|
outputs.extend([ |
|
gr.update(visible=False), gr.update(), gr.update(), |
|
gr.update(), gr.update() |
|
]) |
|
articles_state = [] |
|
else: |
|
outputs.append(gr.update(value="", visible=False)) |
|
total_articles = len(articles) |
|
for idx, comp in enumerate(article_components): |
|
progress((idx + 1) / total_articles, desc=f"κ²°κ³Ό νμ μ€... {idx + 1}/{total_articles}") |
|
if idx < len(articles): |
|
article = articles[idx] |
|
image_url = article['image_url'] |
|
image_update = gr.update(value=image_url, visible=True) if image_url and not image_url.startswith('data:image') else gr.update(value=None, visible=False) |
|
|
|
korean_summary = translate_to_korean(article['snippet']) |
|
|
|
outputs.extend([ |
|
gr.update(visible=True), |
|
gr.update(value=f"### [{article['title']}]({article['link']})"), |
|
image_update, |
|
gr.update(value=f"**μμ½:** {article['snippet']}\n\n**νκΈ μμ½:** {korean_summary}"), |
|
gr.update(value=f"**μΆμ²:** {article['channel']} | **μκ°:** {article['time']}") |
|
]) |
|
else: |
|
outputs.extend([ |
|
gr.update(visible=False), gr.update(), gr.update(), |
|
gr.update(), gr.update() |
|
]) |
|
articles_state = articles |
|
|
|
progress(1.0, desc="μλ£!") |
|
outputs.append(articles_state) |
|
outputs[0] = gr.update(value="", visible=False) |
|
|
|
return outputs |
|
|
|
def search_global(query, articles_state_global): |
|
status_msg = "μ μΈκ³ κ²μμ μμν©λλ€..." |
|
all_results = [] |
|
|
|
outputs = [ |
|
gr.update(value=status_msg, visible=True), |
|
gr.update(value=f"**κ²μμ΄:** {query}", visible=True), |
|
] |
|
|
|
for _ in global_article_components: |
|
outputs.extend([ |
|
gr.update(visible=False), gr.update(), gr.update(), |
|
gr.update(), gr.update() |
|
]) |
|
outputs.append([]) |
|
|
|
yield outputs |
|
|
|
total_countries = len(COUNTRY_LOCATIONS) |
|
for idx, (country, location) in enumerate(COUNTRY_LOCATIONS.items(), 1): |
|
try: |
|
status_msg = f"{country} κ²μ μ€... ({idx}/{total_countries} κ΅κ°)" |
|
outputs[0] = gr.update(value=status_msg, visible=True) |
|
yield outputs |
|
|
|
error_message, articles = serphouse_search(query, country) |
|
if not error_message and articles: |
|
for article in articles: |
|
article['source_country'] = country |
|
|
|
all_results.extend(articles) |
|
sorted_results = sorted(all_results, key=lambda x: x.get('time', ''), reverse=True) |
|
|
|
seen_urls = set() |
|
unique_results = [] |
|
for article in sorted_results: |
|
url = article.get('link', '') |
|
if url not in seen_urls: |
|
seen_urls.add(url) |
|
unique_results.append(article) |
|
|
|
unique_results = unique_results[:1000] |
|
|
|
outputs = [ |
|
gr.update(value=f"{idx}/{total_countries} κ΅κ° κ²μ μλ£\nνμ¬κΉμ§ λ°κ²¬λ λ΄μ€: {len(unique_results)}건", visible=True), |
|
gr.update(value=f"**κ²μμ΄:** {query}", visible=True), |
|
] |
|
|
|
for idx, comp in enumerate(global_article_components): |
|
if idx < len(unique_results): |
|
article = unique_results[idx] |
|
image_url = article.get('image_url', '') |
|
image_update = gr.update(value=image_url, visible=True) if image_url and not image_url.startswith('data:image') else gr.update(value=None, visible=False) |
|
|
|
korean_summary = translate_to_korean(article['snippet']) |
|
|
|
outputs.extend([ |
|
gr.update(visible=True), |
|
gr.update(value=f"### [{article['title']}]({article['link']})"), |
|
image_update, |
|
gr.update(value=f"**μμ½:** {article['snippet']}\n\n**νκΈ μμ½:** {korean_summary}"), |
|
gr.update(value=f"**μΆμ²:** {article['channel']} | **κ΅κ°:** {article['source_country']} | **μκ°:** {article['time']}") |
|
]) |
|
else: |
|
outputs.extend([ |
|
gr.update(visible=False), gr.update(), gr.update(), |
|
gr.update(), gr.update() |
|
]) |
|
|
|
outputs.append(unique_results) |
|
yield outputs |
|
|
|
except Exception as e: |
|
print(f"Error searching {country}: {str(e)}") |
|
continue |
|
|
|
final_status = f"κ²μ μλ£! μ΄ {len(unique_results)}κ°μ λ΄μ€κ° λ°κ²¬λμμ΅λλ€." |
|
outputs[0] = gr.update(value=final_status, visible=True) |
|
yield outputs |
|
|
|
search_outputs = [ |
|
status_message, |
|
translated_query_display, |
|
gr.Markdown(visible=False) |
|
] |
|
|
|
for comp in article_components: |
|
search_outputs.extend([ |
|
comp['group'], comp['title'], comp['image'], |
|
comp['snippet'], comp['info'] |
|
]) |
|
search_outputs.append(articles_state) |
|
|
|
search_button.click( |
|
search_and_display, |
|
inputs=[query, country, articles_state], |
|
outputs=search_outputs, |
|
show_progress=True |
|
) |
|
|
|
global_search_outputs = [ |
|
status_message_global, |
|
translated_query_display_global, |
|
] |
|
|
|
for comp in global_article_components: |
|
global_search_outputs.extend([ |
|
comp['group'], comp['title'], comp['image'], |
|
comp['snippet'], comp['info'] |
|
]) |
|
global_search_outputs.append(articles_state_global) |
|
|
|
search_button_global.click( |
|
search_global, |
|
inputs=[query_global, articles_state_global], |
|
outputs=global_search_outputs |
|
) |
|
|
|
iface.launch(auth=("it1","chosun1")) |