import gradio as gr
import requests
import json
import os
from datetime import datetime, timedelta
from huggingface_hub import InferenceClient  # imported but not used in this script

API_KEY = os.getenv("SERPHOUSE_API_KEY")
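# Note (deployment assumption, not part of the original script): os.getenv returns
# None when SERPHOUSE_API_KEY is unset, in which case every SERPHouse request below
# would fail with an authorization error, so export the variable before launching.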

COUNTRY_LANGUAGES = {
    "United States": "en",
    "United Kingdom": "en",
    "Taiwan": "zh-TW",
    "Canada": "en",
    "Australia": "en",
    "Germany": "de",
    "France": "fr",
    "Japan": "ja",
    "South Korea": "ko",
    "China": "zh",
    "India": "hi",
    "Brazil": "pt",
    "Mexico": "es",
    "Russia": "ru",
    "Italy": "it",
    "Spain": "es",
    "Netherlands": "nl",
    "Singapore": "en",
    "Hong Kong": "zh-HK",
    "Indonesia": "id",
    "Malaysia": "ms",
    "Philippines": "tl",
    "Thailand": "th",
    "Vietnam": "vi",
    "Belgium": "nl",
    "Denmark": "da",
    "Finland": "fi",
    "Ireland": "en",
    "Norway": "no",
    "Poland": "pl",
    "Sweden": "sv",
    "Switzerland": "de",
    "Austria": "de",
    "Czech Republic": "cs",
    "Greece": "el",
    "Hungary": "hu",
    "Portugal": "pt",
    "Romania": "ro",
    "Turkey": "tr",
    "Israel": "he",
    "Saudi Arabia": "ar",
    "United Arab Emirates": "ar",
    "South Africa": "en",
    "Argentina": "es",
    "Chile": "es",
    "Colombia": "es",
    "Peru": "es",
    "Venezuela": "es",
    "New Zealand": "en",
    "Bangladesh": "bn",
    "Pakistan": "ur",
    "Egypt": "ar",
    "Morocco": "ar",
    "Nigeria": "en",
    "Kenya": "sw",
    "Ukraine": "uk",
    "Croatia": "hr",
    "Slovakia": "sk",
    "Bulgaria": "bg",
    "Serbia": "sr",
    "Estonia": "et",
    "Latvia": "lv",
    "Lithuania": "lt",
    "Slovenia": "sl",
    "Luxembourg": "fr",
    "Malta": "mt",
    "Cyprus": "el",
    "Iceland": "is"
}

# SERPHouse "loc" values. In this script the location string is simply the country
# name itself, so the mapping mirrors COUNTRY_LANGUAGES' keys one-to-one.
COUNTRY_LOCATIONS = {country: country for country in COUNTRY_LANGUAGES}

MAJOR_COUNTRIES = list(COUNTRY_LOCATIONS.keys())

def translate_query(query, country):
    """Translate the query into the selected country's language via the public
    Google Translate endpoint; fall back to the original query on any failure."""
    try:
        # English queries are sent as-is.
        if is_english(query):
            print(f"English query detected - using as-is: {query}")
            return query

        if country in COUNTRY_LANGUAGES:
            # No translation needed when searching South Korea.
            if country == "South Korea":
                print(f"South Korea selected - using query as-is: {query}")
                return query

            target_lang = COUNTRY_LANGUAGES[country]
            print(f"Translation attempt: {query} -> {country}({target_lang})")

            url = "https://translate.googleapis.com/translate_a/single"
            params = {
                "client": "gtx",
                "sl": "auto",
                "tl": target_lang,
                "dt": "t",
                "q": query
            }

            response = requests.get(url, params=params)
            translated_text = response.json()[0][0][0]

            print(f"Translation done: {query} -> {translated_text} ({country})")
            return translated_text

        return query

    except Exception as e:
        print(f"Translation error: {str(e)}")
        return query

def is_english(text):
    # Treat the query as English if every character (ignoring spaces, hyphens
    # and underscores) is plain ASCII.
    return all(ord(char) < 128 for char in text.replace(' ', '').replace('-', '').replace('_', ''))


def is_korean(text):
    # True if the text contains at least one Hangul syllable.
    return any('\uAC00' <= char <= '\uD7A3' for char in text)

def search_serphouse(query, country, page=1, num_result=10):
    """Run a SERPHouse live news search for the (possibly translated) query."""
    url = "https://api.serphouse.com/serp/live"

    # Restrict results to roughly the last 24 hours.
    now = datetime.utcnow()
    yesterday = now - timedelta(days=1)
    date_range = f"{yesterday.strftime('%Y-%m-%d')},{now.strftime('%Y-%m-%d')}"

    translated_query = translate_query(query, country)
    print(f"Original query: {query}")
    print(f"Translated query: {translated_query}")

    payload = {
        "data": {
            "q": translated_query,
            "domain": "google.com",
            "loc": COUNTRY_LOCATIONS.get(country, "United States"),
            "lang": COUNTRY_LANGUAGES.get(country, "en"),
            "device": "desktop",
            "serp_type": "news",
            "page": str(page),
            "num": str(num_result),
            "date_range": date_range,
            "sort_by": "date"
        }
    }

    headers = {
        "accept": "application/json",
        "content-type": "application/json",
        "authorization": f"Bearer {API_KEY}"
    }

    try:
        response = requests.post(url, json=payload, headers=headers)
        print("Request payload:", json.dumps(payload, indent=2, ensure_ascii=False))
        print("Response status:", response.status_code)

        response.raise_for_status()
        return {"results": response.json(), "translated_query": translated_query}
    except requests.RequestException as e:
        return {"error": f"Error: {str(e)}", "translated_query": query}
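
# The parsing below assumes (based on this script's own handling, not official API
# docs) that a successful SERPHouse live response nests news items under
# response["results"]["results"]["news"], each item carrying some of the keys
# title / url (or link) / snippet / channel (or source) / time (or date) / img
# (or thumbnail).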
def format_results_from_raw(response_data):
    if "error" in response_data:
        return "Error: " + response_data["error"], []

    try:
        results = response_data["results"]
        translated_query = response_data["translated_query"]

        news_results = results.get('results', {}).get('results', {}).get('news', [])
        if not news_results:
            return "No search results found.", []

        articles = []
        for idx, result in enumerate(news_results, 1):
            articles.append({
                "index": idx,
                "title": result.get("title", "No title"),
                "link": result.get("url", result.get("link", "#")),
                "snippet": result.get("snippet", "No content"),
                "channel": result.get("channel", result.get("source", "Unknown")),
                "time": result.get("time", result.get("date", "Unknown time")),
                "image_url": result.get("img", result.get("thumbnail", "")),
                "translated_query": translated_query
            })
        return "", articles
    except Exception as e:
        return f"Error while processing results: {str(e)}", []

def serphouse_search(query, country):
    response_data = search_serphouse(query, country)
    return format_results_from_raw(response_data)


css = """
footer {visibility: hidden;}
"""

with gr.Blocks(theme="Nymbo/Nymbo_Theme", css=css, title="NewsAI Service") as iface:
    gr.Markdown("Enter a search term and select a country (67 countries); up to 100 news articles from the last 24 hours that match the term are displayed.")
    gr.Markdown("If you select a country and enter the search term in Korean, it is translated into the local language before searching. Example: with 'Taiwan' selected, entering '삼성' automatically searches for '三星'.")

    with gr.Column():
        with gr.Row():
            query = gr.Textbox(label="Search term")
            country = gr.Dropdown(MAJOR_COUNTRIES, label="Country", value="South Korea")

    # Hidden status widgets, revealed while a search is running.
    search_status = gr.Markdown(visible=False)
    translated_query_display = gr.Markdown(visible=False)

    search_button = gr.Button("Search", variant="primary")

    progress = gr.Progress()
    status_message = gr.Markdown(visible=False)
    articles_state = gr.State([])

    # Pre-create 100 hidden article slots; search_and_display toggles their
    # visibility and fills in title, image, snippet and source info.
    article_components = []
    for i in range(100):
        with gr.Group(visible=False) as article_group:
            title = gr.Markdown()
            image = gr.Image(width=200, height=150)
            snippet = gr.Markdown()
            info = gr.Markdown()

        article_components.append({
            'group': article_group,
            'title': title,
            'image': image,
            'snippet': snippet,
            'info': info,
            'index': i,
        })

    def search_and_display(query, country, articles_state, progress=gr.Progress()):
        search_status_output = gr.update(value="Search in progress. Please wait a moment...", visible=True)

        progress(0, desc="Translating search term...")

        translated_query = translate_query(query, country)
        translated_display = f"**Original search term:** {query}\n**Translated search term:** {translated_query}" if translated_query != query else f"**Search term:** {query}"

        progress(0.2, desc="Starting search...")
        error_message, articles = serphouse_search(query, country)
        progress(0.5, desc="Processing results...")

        outputs = [
            search_status_output,
            gr.update(value=translated_display, visible=True)
        ]

        if error_message:
            outputs.append(gr.update(value=error_message, visible=True))
            for comp in article_components:
                outputs.extend([
                    gr.update(visible=False), gr.update(), gr.update(),
                    gr.update(), gr.update()
                ])
            articles_state = []
        else:
            outputs.append(gr.update(value="", visible=False))
            total_articles = len(articles)
            for idx, comp in enumerate(article_components):
                if idx < total_articles:
                    # Scale the remaining half of the progress bar over the slots that
                    # actually receive an article (avoids division by zero and values > 1).
                    progress(0.5 + 0.5 * (idx + 1) / total_articles,
                             desc=f"Displaying results... {idx + 1}/{total_articles}")
                    article = articles[idx]
                    image_url = article['image_url']
                    image_update = gr.update(value=image_url, visible=True) if image_url and not image_url.startswith('data:image') else gr.update(value=None, visible=False)

                    outputs.extend([
                        gr.update(visible=True),
                        gr.update(value=f"### [{article['title']}]({article['link']})"),
                        image_update,
                        gr.update(value=f"**Summary:** {article['snippet']}"),
                        gr.update(value=f"**Source:** {article['channel']} | **Time:** {article['time']}")
                    ])
                else:
                    outputs.extend([
                        gr.update(visible=False), gr.update(), gr.update(),
                        gr.update(), gr.update()
                    ])
            articles_state = articles

        progress(1.0, desc="Done!")
        outputs.append(articles_state)
        outputs.append(gr.update(visible=False))

        # Hide the "in progress" banner now that everything is rendered.
        outputs[0] = gr.update(visible=False)

        return outputs
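
    # The order of search_outputs below must mirror the order of updates returned by
    # search_and_display: status banner, translated-query display, error message,
    # then five components per article slot, then articles_state and status_message.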
    search_outputs = [
        search_status,
        translated_query_display,
        gr.Markdown(visible=False)
    ]
    for comp in article_components:
        search_outputs.extend([comp['group'], comp['title'], comp['image'],
                               comp['snippet'], comp['info']])
    search_outputs.extend([articles_state, status_message])

    search_button.click(
        search_and_display,
        inputs=[query, country, articles_state],
        outputs=search_outputs,
        show_progress=True
    )


iface.launch()
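
# launch() starts the local Gradio server and blocks until it is stopped; options such
# as share=True or server_name="0.0.0.0" could be passed for remote access (a
# deployment note, not something the original script configures).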