import streamlit as st import pandas as pd import requests from bs4 import BeautifulSoup import re import time import nltk from nltk.tokenize import word_tokenize from nltk.corpus import stopwords from collections import Counter import json import os from datetime import datetime, timedelta from openai import OpenAI # 새로운 import 방식 from dotenv import load_dotenv import traceback import plotly.graph_objects as go import schedule import threading import matplotlib.pyplot as plt import kss # KoNLPy 대신 KSS 사용 from PIL import Image import base64 from io import BytesIO import logging # 로깅 설정 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.StreamHandler(), logging.FileHandler('/tmp/crawler.log') ] ) # 워드클라우드 추가 try: from wordcloud import WordCloud except ImportError: st.error("wordcloud 패키지를 설치해주세요: pip install wordcloud") WordCloud = None # 스케줄러 상태 클래스 추가 class SchedulerState: def __init__(self): self.is_running = False self.thread = None self.last_run = None self.next_run = None self.scheduled_jobs = [] self.scheduled_results = [] # 전역 스케줄러 상태 객체 생성 (스레드 안에서 사용) global_scheduler_state = SchedulerState() # API 키 관리를 위한 세션 상태 초기화 if 'openai_client' not in st.session_state: st.session_state.openai_client = None # 여러 방법으로 API 키 로드 시도 load_dotenv() # .env 파일에서 로드 시도 # OpenAI 클라이언트 초기화를 위한 함수 def init_openai_client(api_key=None): try: if api_key: client = OpenAI(api_key=api_key) # 간단한 API 키 유효성 검사 client.models.list() # API 키가 유효한지 테스트 return client return None except Exception as e: st.error(f"API 키 초기화 오류: {str(e)}") return None # 1. 환경 변수에서 API 키 확인 api_key = os.environ.get('OPENAI_API_KEY') if api_key: st.session_state.openai_client = init_openai_client(api_key) # 2. Streamlit secrets에서 API 키 확인 if not st.session_state.openai_client: try: if 'OPENAI_API_KEY' in st.secrets: st.session_state.openai_client = init_openai_client(st.secrets['OPENAI_API_KEY']) except Exception as e: pass # secrets 파일이 없어도 오류 발생하지 않음 # NLTK 데이터 경로 설정 - 현재 워크스페이스의 nltk_data 사용 nltk_data_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'nltk_data') nltk.data.path.insert(0, nltk_data_path) # 필요한 NLTK 데이터 확인 try: nltk.data.find('tokenizers/punkt') except LookupError: nltk.download('punkt', download_dir=nltk_data_path) try: nltk.data.find('corpora/stopwords') except LookupError: nltk.download('stopwords', download_dir=nltk_data_path) # 페이지 설정 st.set_page_config(page_title="뉴스 기사 도구", page_icon="📰", layout="wide") # 사이드바에 API 키 입력 필드 추가 with st.sidebar: st.title("뉴스 기사 도구") menu = st.radio( "메뉴 선택", ["뉴스 기사 크롤링", "기사 분석하기", "새 기사 생성하기", "뉴스 기사 예약하기"] ) st.divider() api_key = st.text_input("OpenAI API 키 입력", type="password") if api_key: client = init_openai_client(api_key) if client: st.session_state.openai_client = client st.success("API 키가 성공적으로 설정되었습니다!") else: st.error("유효하지 않은 API 키입니다.") # 저장된 기사를 불러오는 함수 def load_saved_articles(): if os.path.exists('/tmp/saved_articles/articles.json'): with open('/tmp/saved_articles/articles.json', 'r', encoding='utf-8') as f: return json.load(f) return [] # 기사를 저장하는 함수 def save_articles(articles): os.makedirs('/tmp/saved_articles', exist_ok=True) with open('/tmp/saved_articles/articles.json', 'w', encoding='utf-8') as f: json.dump(articles, f, ensure_ascii=False, indent=2) @st.cache_data def crawl_naver_news(keyword, num_articles=5): """ 네이버 뉴스 기사를 수집하는 함수 """ logging.info(f"크롤링 시작: 키워드={keyword}, 기사 수={num_articles}") url = f"https://search.naver.com/search.naver?where=news&query={keyword}" results = [] try: # 페이지 요청 logging.info(f"요청 URL: {url}") response = requests.get(url) logging.info(f"응답 상태 코드: {response.status_code}") soup = BeautifulSoup(response.text, 'html.parser') # 뉴스 아이템 찾기 news_items = soup.select('div.sds-comps-base-layout.sds-comps-full-layout') logging.info(f"찾은 뉴스 아이템 수: {len(news_items)}") # 각 뉴스 아이템에서 정보 추출 for i, item in enumerate(news_items): if i >= num_articles: break try: # 제목과 링크 추출 title_element = item.select_one('a.X0fMYp2dHd0TCUS2hjww span') if not title_element: continue title = title_element.text.strip() link_element = item.select_one('a.X0fMYp2dHd0TCUS2hjww') link = link_element['href'] if link_element else "" # 언론사 추출 press_element = item.select_one('div.sds-comps-profile-info-title span.sds-comps-text-type-body2') source = press_element.text.strip() if press_element else "알 수 없음" # 날짜 추출 date_element = item.select_one('span.r0VOr') date = date_element.text.strip() if date_element else "알 수 없음" # 미리보기 내용 추출 desc_element = item.select_one('a.X0fMYp2dHd0TCUS2hjww.IaKmSOGPdofdPwPE6cyU > span') description = desc_element.text.strip() if desc_element else "내용 없음" results.append({ 'title': title, 'link': link, 'description': description, 'source': source, 'date': date, 'content': "" }) logging.info(f"기사 추출 성공: {title}") except Exception as e: logging.error(f"기사 정보 추출 중 오류 발생: {str(e)}", exc_info=True) continue except Exception as e: logging.error(f"페이지 요청 중 오류 발생: {str(e)}", exc_info=True) logging.info(f"크롤링 완료: {len(results)}개 기사 수집") return results # 기사 원문 가져오기 def get_article_content(url): logging.info(f"기사 원문 가져오기 시작: {url}") try: response = requests.get(url, timeout=5) logging.info(f"원문 요청 상태 코드: {response.status_code}") soup = BeautifulSoup(response.text, 'html.parser') # 네이버 뉴스 본문 찾기 content = soup.select_one('#dic_area') if content: text = content.text.strip() text = re.sub(r'\s+', ' ', text) logging.info("네이버 뉴스 본문 추출 성공") return text # 다른 뉴스 사이트 본문 찾기 content = soup.select_one('.article_body, .article-body, .article-content, .news-content-inner') if content: text = content.text.strip() text = re.sub(r'\s+', ' ', text) logging.info("일반 뉴스 본문 추출 성공") return text logging.warning("본문을 찾을 수 없음") return "본문을 가져올 수 없습니다." except Exception as e: logging.error(f"원문 가져오기 오류: {str(e)}", exc_info=True) return f"오류 발생: {str(e)}" # NLTK를 이용한 키워드 분석 (KSS 활용) def analyze_keywords(text, top_n=10): # 한국어 불용어 목록 korean_stopwords = ['이', '그', '저', '것', '및', '등', '를', '을', '에', '에서', '의', '으로', '로'] # KSS를 사용한 문장 분리 및 토큰화 try: sentences = kss.split_sentences(text) tokens = [] for sentence in sentences: # 간단한 토큰화 (공백 기준) tokens.extend(sentence.split()) except: # KSS 실패시 기본 토큰화 tokens = text.split() tokens = [word for word in tokens if word.isalnum() and len(word) > 1 and word not in korean_stopwords] word_count = Counter(tokens) top_keywords = word_count.most_common(top_n) return top_keywords #워드 클라우드용 분석 def extract_keywords_for_wordcloud(text, top_n=50): if not text or len(text.strip()) < 10: return {} try: try: tokens = word_tokenize(text.lower()) except Exception as e: st.warning(f"{str(e)} 오류발생") tokens = text.lower().split() stop_words = set() try: stop_words = set(stopwords.words('english')) except Exception: pass korea_stop_words = { '및', '등', '를', '이', '의', '가', '에', '는', '으로', '에서', '그', '또', '또는', '하는', '할', '하고', '있다', '이다', '위해', '것이다', '것은', '대한', '때문', '그리고', '하지만', '그러나', '그래서', '입니다', '합니다', '습니다', '요', '죠', '고', '과', '와', '도', '은', '수', '것', '들', '제', '저', '년', '월', '일', '시', '분', '초', '지난', '올해', '내년', '최근', '현재', '오늘', '내일', '어제', '오전', '오후', '부터', '까지', '에게', '께서', '이라고', '라고', '하며', '하면서', '따라', '통해', '관련', '한편', '특히', '가장', '매우', '더', '덜', '많이', '조금', '항상', '자주', '가끔', '거의', '전혀', '바로', '정말', '만약', '비롯한', '등을', '등이', '등의', '등과', '등도', '등에', '등에서', '기자', '뉴스', '사진', '연합뉴스', '뉴시스', '제공', '무단', '전재', '재배포', '금지', '앵커', '멘트', '일보', '데일리', '경제', '사회', '정치', '세계', '과학', '아이티', '닷컴', '씨넷', '블로터', '전자신문' } stop_words.update(korea_stop_words) # 1글자 이상이고 불용어가 아닌 토큰만 필터링 filtered_tokens = [word for word in tokens if len(word) > 1 and word not in stop_words] # 단어 빈도 계산 word_freq = {} for word in filtered_tokens: if word.isalnum(): # 알파벳과 숫자만 포함된 단어만 허용 word_freq[word] = word_freq.get(word, 0) + 1 # 빈도순으로 정렬하여 상위 n개 반환 sorted_words = sorted(word_freq.items(), key=lambda x: x[1], reverse=True) if not sorted_words: return {"data": 1, "analysis": 1, "news": 1} return dict(sorted_words[:top_n]) except Exception as e: st.error(f"오류발생 {str(e)}") return {"data": 1, "analysis": 1, "news": 1} # 워드 클라우드 생성 함수 def generate_wordcloud(keywords_dict): if not WordCloud: st.warning("워드클라우드 설치안되어 있습니다.") return None try: # 기본 WordCloud 객체 (폰트 경로 없이) wc = WordCloud( width=800, height=400, background_color='white', colormap='viridis', max_font_size=150, random_state=42 ) try: import os script_dir = os.path.dirname(os.path.abspath(__file__)) # 사용자가 루트에 넣은 폰트 파일 이름을 지정합니다. # 만약 다른 이름의 폰트를 사용했다면 이 부분을 수정해주세요. (예: "YourFontName.ttf") possible_font_paths = ["NanumGothic.ttf"] font_path = None for path_segment in possible_font_paths: candidate = os.path.join(script_dir, path_segment) if os.path.exists(candidate): font_path = candidate break # font_path가 성공적으로 찾아진 경우에만 폰트 경로를 포함하여 WordCloud 재생성 if font_path: wc = WordCloud( font_path=font_path, width=800, height=400, background_color='white', colormap='viridis', max_font_size=150, random_state=42 ).generate_from_frequencies(keywords_dict) else: st.warning(f"지정된 한국어 글꼴 파일({', '.join(possible_font_paths)})을 스크립트 디렉터리에서 찾을 수 없습니다. 워드클라우드가 깨질 수 있습니다.") except Exception as e: print(f"글꼴 로딩 중 오류 발생: {str(e)}") st.warning(f"글꼴 로딩 중 예상치 못한 오류가 발생했습니다: {str(e)}") # 사용자에게도 경고 표시 # 최종적으로 wc 객체 반환 (폰트가 적용되었거나, 기본 객체이거나) return wc.generate_from_frequencies(keywords_dict) if isinstance(wc, WordCloud) else None except Exception as e: st.error(f"워드클라우드 생성 중 오류발생: {str(e)}") return None # 뉴스 분석 함수 def analyze_news_content(news_df): if news_df.empty: return "데이터가 없습니다" results = {} #카테고리별 if 'source' in news_df.columns: results['source_counts'] = news_df['source'].value_counts().to_dict() #카테고리별 if 'date' in news_df.columns: results['date_counts'] = news_df['date'].value_counts().to_dict() #키워드분석 all_text = " ".join(news_df['title'].fillna('') + " " + news_df['content'].fillna('')) if len(all_text.strip()) > 0: results['top_keywords_for_wordcloud']= extract_keywords_for_wordcloud(all_text, top_n=50) results['top_keywords'] = analyze_keywords(all_text) else: results['top_keywords_for_wordcloud']={} results['top_keywords'] = [] return results # OpenAI API를 이용한 새 기사 생성 (새로운 버전 방식) def generate_article(original_content, prompt_text): try: if not st.session_state.openai_client: return "OpenAI API 키가 설정되지 않았습니다." response = st.session_state.openai_client.chat.completions.create( model="gpt-4.1-nano", # 또는 사용 가능한 적절한 모델 messages=[ {"role": "system", "content": "당신은 전문적인 뉴스 기자입니다. 주어진 내용을 바탕으로 새로운 기사를 작성해주세요."}, {"role": "user", "content": f"다음 내용을 바탕으로 {prompt_text}\n\n{original_content[:1000]}"} ], max_tokens=2000 ) return response.choices[0].message.content except Exception as e: return f"기사 생성 오류: {str(e)}" # 여러 제목으로부터 기사 생성하는 함수 추가 def generate_article_from_titles(titles, prompt_text): try: if not st.session_state.openai_client: return "OpenAI API 키가 설정되지 않았습니다." titles_text = "\n".join([f"- {title}" for title in titles]) response = st.session_state.openai_client.chat.completions.create( model="gpt-4.1-nano", # 또는 사용 가능한 적절한 모델 messages=[ {"role": "system", "content": "당신은 전문적인 뉴스 기자입니다. 주어진 여러 뉴스 제목을 바탕으로 새로운 통합 기사를 작성해주세요."}, {"role": "user", "content": f"다음 뉴스 제목들을 바탕으로 {prompt_text}\n\n{titles_text}"} ], max_tokens=2000 ) return response.choices[0].message.content except Exception as e: return f"기사 생성 오류: {str(e)}" # OpenAI API를 이용한 이미지 생성 (새로운 버전 방식) def generate_image(prompt): try: if not st.session_state.openai_client: return "OpenAI API 키가 설정되지 않았습니다." # GPT Image 1 모델로 이미지 생성 result = st.session_state.openai_client.images.generate( model="gpt-image-1", # 새로운 모델명 사용 prompt=prompt, size="1024x1024" ) # base64 이미지 데이터를 디코딩 image_base64 = result.data[0].b64_json image_bytes = base64.b64decode(image_base64) # BytesIO 객체로 변환 image = BytesIO(image_bytes) # PIL Image로 변환하여 크기 조정 (선택사항) pil_image = Image.open(image) pil_image = pil_image.resize((800, 800), Image.LANCZOS) # 크기 조정 # 다시 BytesIO로 변환 output = BytesIO() pil_image.save(output, format="JPEG", quality=80, optimize=True) output.seek(0) return output except Exception as e: return f"이미지 생성 오류: {str(e)}" # 스케줄러 관련 함수들 def get_next_run_time(hour, minute): now = datetime.now() next_run = now.replace(hour=hour, minute=minute, second=0, microsecond=0) if next_run <= now: next_run += timedelta(days=1) return next_run def run_scheduled_task(): try: while global_scheduler_state.is_running: schedule.run_pending() time.sleep(1) except Exception as e: print(f"스케줄러 에러 발생: {e}") traceback.print_exc() def perform_news_task(task_type, keyword, num_articles, file_prefix): logging.info(f"스케줄러 작업 시작: {task_type}, 키워드={keyword}") try: articles = crawl_naver_news(keyword, num_articles) logging.info(f"수집된 기사 수: {len(articles)}") # 기사 내용 가져오기 for i, article in enumerate(articles): logging.info(f"기사 {i+1}/{len(articles)} 원문 가져오기: {article['title']}") article['content'] = get_article_content(article['link']) time.sleep(0.5) # 서버 부하 방지 # 결과 저장 os.makedirs('/tmp/scheduled_news', exist_ok=True) timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") filename = f"/tmp/scheduled_news/{file_prefix}_{task_type}_{timestamp}.json" with open(filename, 'w', encoding='utf-8') as f: json.dump(articles, f, ensure_ascii=False, indent=2) logging.info(f"결과 저장 완료: {filename}") global_scheduler_state.last_run = datetime.now() print(f"{datetime.now()} - {task_type} 뉴스 기사 수집 완료: {keyword}") # 전역 상태에 수집 결과를 저장 result_item = { 'task_type': task_type, 'keyword': keyword, 'timestamp': timestamp, 'num_articles': len(articles), 'filename': filename } global_scheduler_state.scheduled_results.append(result_item) except Exception as e: logging.error(f"작업 실행 중 오류 발생: {str(e)}", exc_info=True) traceback.print_exc() def start_scheduler(daily_tasks, interval_tasks): if not global_scheduler_state.is_running: schedule.clear() global_scheduler_state.scheduled_jobs = [] # 일별 태스크 등록 for task in daily_tasks: hour = task['hour'] minute = task['minute'] keyword = task['keyword'] num_articles = task['num_articles'] job_id = f"daily_{keyword}_{hour}_{minute}" schedule.every().day.at(f"{hour:02d}:{minute:02d}").do( perform_news_task, "daily", keyword, num_articles, job_id ).tag(job_id) global_scheduler_state.scheduled_jobs.append({ 'id': job_id, 'type': 'daily', 'time': f"{hour:02d}:{minute:02d}", 'keyword': keyword, 'num_articles': num_articles }) # 시간 간격 태스크 등록 for task in interval_tasks: interval_minutes = task['interval_minutes'] keyword = task['keyword'] num_articles = task['num_articles'] run_immediately = task['run_immediately'] job_id = f"interval_{keyword}_{interval_minutes}" if run_immediately: # 즉시 실행 perform_news_task("interval", keyword, num_articles, job_id) # 분 간격으로 예약 schedule.every(interval_minutes).minutes.do( perform_news_task, "interval", keyword, num_articles, job_id ).tag(job_id) global_scheduler_state.scheduled_jobs.append({ 'id': job_id, 'type': 'interval', 'interval': f"{interval_minutes}분마다", 'keyword': keyword, 'num_articles': num_articles, 'run_immediately': run_immediately }) # 다음 실행 시간 계산 next_run = schedule.next_run() if next_run: global_scheduler_state.next_run = next_run # 스케줄러 쓰레드 시작 global_scheduler_state.is_running = True global_scheduler_state.thread = threading.Thread( target=run_scheduled_task, daemon=True ) global_scheduler_state.thread.start() # 상태를 세션 상태로도 복사 (UI 표시용) if 'scheduler_status' not in st.session_state: st.session_state.scheduler_status = {} st.session_state.scheduler_status = { 'is_running': global_scheduler_state.is_running, 'last_run': global_scheduler_state.last_run, 'next_run': global_scheduler_state.next_run, 'jobs_count': len(global_scheduler_state.scheduled_jobs) } def stop_scheduler(): if global_scheduler_state.is_running: global_scheduler_state.is_running = False schedule.clear() if global_scheduler_state.thread: global_scheduler_state.thread.join(timeout=1) global_scheduler_state.next_run = None global_scheduler_state.scheduled_jobs = [] # UI 상태 업데이트 if 'scheduler_status' in st.session_state: st.session_state.scheduler_status['is_running'] = False # 메뉴에 따른 화면 표시 if menu == "뉴스 기사 크롤링": st.header("뉴스 기사 크롤링") keyword = st.text_input("검색어 입력", "인공지능") num_articles = st.slider("가져올 기사 수", min_value=1, max_value=20, value=5) if st.button("기사 가져오기"): with st.spinner("기사를 수집 중입니다..."): articles = crawl_naver_news(keyword, num_articles) # 기사 내용 가져오기 for i, article in enumerate(articles): st.progress((i + 1) / len(articles)) article['content'] = get_article_content(article['link']) time.sleep(0.5) # 서버 부하 방지 # 결과 저장 및 표시 save_articles(articles) st.success(f"{len(articles)}개의 기사를 수집했습니다!") # 수집한 기사 표시 for article in articles: with st.expander(f"{article['title']} - {article['source']}"): st.write(f"**출처:** {article['source']}") st.write(f"**날짜:** {article['date']}") st.write(f"**요약:** {article['description']}") st.write(f"**링크:** {article['link']}") st.write("**본문 미리보기:**") st.write(article['content'][:300] + "..." if len(article['content']) > 300 else article['content']) elif menu == "기사 분석하기": st.header("기사 분석하기") articles = load_saved_articles() if not articles: st.warning("저장된 기사가 없습니다. 먼저 '뉴스 기사 크롤링' 메뉴에서 기사를 수집해주세요.") else: # 기사 선택 titles = [article['title'] for article in articles] selected_title = st.selectbox("분석할 기사 선택", titles) selected_article = next((a for a in articles if a['title'] == selected_title), None) if selected_article: st.write(f"**제목:** {selected_article['title']}") st.write(f"**출처:** {selected_article['source']}") # 본문 표시 with st.expander("기사 본문 보기"): st.write(selected_article['content']) # 분석 방법 선택 analysis_type = st.radio( "분석 방법", ["키워드 분석", "감정 분석", "텍스트 통계"] ) if analysis_type == "키워드 분석": if st.button("키워드 분석하기"): with st.spinner("키워드를 분석 중입니다..."): keyword_tab1, keyword_tab2 = st.tabs(["키워드 빈도", "워드클라우드"]) with keyword_tab1: keywords = analyze_keywords(selected_article['content']) # Plotly를 사용한 시각화 df = pd.DataFrame(keywords, columns=['단어', '빈도수']) fig = go.Figure(data=[ go.Bar( x=df['단어'], y=df['빈도수'], marker_color='rgb(55, 83, 109)' ) ]) fig.update_layout( title='키워드 빈도 분석', xaxis_title='키워드', yaxis_title='빈도수', height=500, margin=dict(l=50, r=50, t=80, b=50) ) st.plotly_chart(fig, use_container_width=True) st.write("**주요 키워드:**") for word, count in keywords: st.write(f"- {word}: {count}회") with keyword_tab2: keyword_dict = extract_keywords_for_wordcloud(selected_article['content']) wc = generate_wordcloud(keyword_dict) if wc: fig, ax = plt.subplots(figsize=(10, 5)) ax.imshow(wc, interpolation='bilinear') ax.axis('off') st.pyplot(fig) # 키워드 상위 20개 표시 st.write("**상위 20개 키워드:**") top_keywords = sorted(keyword_dict.items(), key=lambda x: x[1], reverse=True)[:20] keyword_df = pd.DataFrame(top_keywords, columns=['키워드', '빈도']) st.dataframe(keyword_df) else: st.error("워드클라우드를 생성할 수 없습니다.") elif analysis_type == "텍스트 통계": if st.button("텍스트 통계 분석"): content = selected_article['content'] # 텍스트 통계 계산 word_count = len(re.findall(r'\b\w+\b', content)) char_count = len(content) try: # KSS로 문장 분리 sentences = kss.split_sentences(content) sentence_count = len(sentences) except: # KSS 실패시 기본 문장 분리 sentence_count = len(re.split(r'[.!?]+', content)) avg_word_length = sum(len(word) for word in re.findall(r'\b\w+\b', content)) / word_count if word_count > 0 else 0 avg_sentence_length = word_count / sentence_count if sentence_count > 0 else 0 # 통계 표시 st.subheader("텍스트 통계") col1, col2, col3 = st.columns(3) with col1: st.metric("단어 수", f"{word_count:,}") with col2: st.metric("문자 수", f"{char_count:,}") with col3: st.metric("문장 수", f"{sentence_count:,}") col1, col2 = st.columns(2) with col1: st.metric("평균 단어 길이", f"{avg_word_length:.1f}자") with col2: st.metric("평균 문장 길이", f"{avg_sentence_length:.1f}단어") # 텍스트 복잡성 점수 complexity_score = min(10, (avg_sentence_length / 10) * 5 + (avg_word_length / 5) * 5) st.progress(complexity_score / 10) st.write(f"텍스트 복잡성 점수: {complexity_score:.1f}/10") # 품사 분석 부분 제거 (KoNLPy 의존성 제거) st.info("상세 품사 분석은 현재 지원되지 않습니다.") elif analysis_type == "감정 분석": if st.button("감정 분석하기"): if st.session_state.openai_client: with st.spinner("기사의 감정을 분석 중입니다..."): try: response = st.session_state.openai_client.chat.completions.create( model="gpt-4.1-mini", messages=[ {"role": "system", "content": """당신은 텍스트의 감정과 논조를 분석하는 전문가입니다. 다음 뉴스 기사의 감정과 논조를 분석하고, 반드시 아래 형식의 JSON으로 응답해주세요: { "sentiment": "긍정적/부정적/중립적", "reason": "이유 설명...", "keywords": [ {"word": "키워드1", "score": 8}, {"word": "키워드2", "score": 7} ] }"""}, {"role": "user", "content": f"다음 뉴스 기사를 분석해 주세요:\n\n제목: {selected_article['title']}\n\n내용: {selected_article['content'][:1500]}"} ], max_tokens=800, response_format={ "type": "json_object" } # JSON 응답 형식 강제 ) # 응답 내용 확인 및 디버깅 content = response.choices[0].message.content logging.info(f"API 응답: {content}") # JSON 파싱 try: analysis_result = json.loads(content) except json.JSONDecodeError as e: logging.error(f"JSON 파싱 오류: {str(e)}") logging.error(f"파싱 시도한 내용: {content}") st.error("API 응답을 파싱하는 중 오류가 발생했습니다. 응답 형식이 올바르지 않습니다.") st.stop() # return 대신 st.stop() 사용 # 결과 시각화 st.subheader("감정 분석 결과") # 1. 감정 타입에 따른 시각적 표현 sentiment_type = analysis_result.get('sentiment', '중립적') col1, col2, col3 = st.columns([1, 3, 1]) with col2: if sentiment_type == "긍정적": st.markdown(f"""

😀 긍정적 논조 😀

감정 강도: 높음

""", unsafe_allow_html=True) elif sentiment_type == "부정적": st.markdown(f"""

😞 부정적 논조 😞

감정 강도: 높음

""", unsafe_allow_html=True) else: st.markdown(f"""

😐 중립적 논조 😐

감정 강도: 중간

""", unsafe_allow_html=True) # 2. 이유 설명 st.markdown("### 분석 근거") st.markdown(f"
{analysis_result.get('reason', '')}
", unsafe_allow_html=True) # 3. 감정 키워드 시각화 st.markdown("### 핵심 감정 키워드") # 키워드 데이터 준비 keywords = analysis_result.get('keywords', []) if keywords: # 막대 차트용 데이터 keyword_names = [item.get('word', '') for item in keywords] keyword_scores = [item.get('score', 0) for item in keywords] # 레이더 차트 생성 fig = go.Figure() # 색상 설정 if sentiment_type == "긍정적": fill_color = 'rgba(76, 175, 80, 0.3)' # 연한 초록색 line_color = 'rgba(76, 175, 80, 1)' # 진한 초록색 elif sentiment_type == "부정적": fill_color = 'rgba(244, 67, 54, 0.3)' # 연한 빨간색 line_color = 'rgba(244, 67, 54, 1)' # 진한 빨간색 else: fill_color = 'rgba(158, 158, 158, 0.3)' # 연한 회색 line_color = 'rgba(158, 158, 158, 1)' # 진한 회색 # 레이더 차트 데이터 준비 - 마지막 점이 첫 점과 연결되도록 데이터 추가 radar_keywords = keyword_names.copy() radar_scores = keyword_scores.copy() # 레이더 차트 생성 fig.add_trace(go.Scatterpolar( r=radar_scores, theta=radar_keywords, fill='toself', fillcolor=fill_color, line=dict(color=line_color, width=2), name='감정 키워드' )) # 레이더 차트 레이아웃 설정 fig.update_layout( polar=dict( radialaxis=dict( visible=True, range=[0, 10], tickmode='linear', tick0=0, dtick=2 ) ), showlegend=False, title={ 'text': '감정 키워드 레이더 분석', 'y':0.95, 'x':0.5, 'xanchor': 'center', 'yanchor': 'top' }, height=500, width=500, margin=dict(l=80, r=80, t=80, b=80) ) # 차트 중앙에 표시 col1, col2, col3 = st.columns([1, 2, 1]) with col2: st.plotly_chart(fig) # 키워드 카드로 표시 st.markdown("#### 키워드 세부 설명") cols = st.columns(min(len(keywords), 5)) for i, keyword in enumerate(keywords): with cols[i % len(cols)]: word = keyword.get('word', '') score = keyword.get('score', 0) # 점수에 따른 색상 계산 r, g, b = 0, 0, 0 if sentiment_type == "긍정적": g = min(200 + score * 5, 255) r = max(255 - score * 20, 100) elif sentiment_type == "부정적": r = min(200 + score * 5, 255) g = max(255 - score * 20, 100) else: r = g = b = 128 # 카드 생성 st.markdown(f"""

{word}

강도: {score}/10

""", unsafe_allow_html=True) else: st.info("키워드를 추출하지 못했습니다.") # 4. 요약 통계 st.markdown("### 주요 통계") col1, col2, col3 = st.columns(3) with col1: st.metric(label="긍정/부정 점수", value=f"{7 if sentiment_type == '긍정적' else 3 if sentiment_type == '부정적' else 5}/10") with col2: st.metric(label="키워드 수", value=len(keywords)) with col3: avg_score = sum(keyword_scores) / len(keyword_scores) if keyword_scores else 0 st.metric(label="평균 강도", value=f"{avg_score:.1f}/10") except Exception as e: st.error(f"감정 분석 오류: {str(e)}") st.error(traceback.format_exc()) else: st.warning("OpenAI API 키를 사이드바에서 설정해주세요.") elif menu == "새 기사 생성하기": st.header("새 기사 생성하기") articles = load_saved_articles() if not articles: st.warning("저장된 기사가 없습니다. 먼저 '뉴스 기사 크롤링' 메뉴에서 기사를 수집해주세요.") else: # 탭 추가: 단일 기사로 생성 vs 다중 제목으로 생성 tab1, tab2 = st.tabs(["단일 기사로 생성", "여러 제목으로 생성"]) with tab1: # 기존 코드: 단일 기사 선택 titles = [article['title'] for article in articles] selected_title = st.selectbox("원본 기사 선택", titles, key="single_article") selected_article = next((a for a in articles if a['title'] == selected_title), None) if selected_article: st.write(f"**원본 제목:** {selected_article['title']}") with st.expander("원본 기사 내용"): st.write(selected_article['content']) prompt_text ="""다음 기사 양식을 따라서 다시 작성해줘. 역할: 당신은 신문사의 기자입니다. 작업: 최근 일어난 사건에 대한 보도자료를 작성해야 합니다. 자료는 사실을 기반으로 하며, 객관적이고 정확해야 합니다. 지침: 제공된 정보를 바탕으로 신문 보도자료 형식에 맞춰 기사를 작성하세요. 기사 제목은 주제를 명확히 반영하고 독자의 관심을 끌 수 있도록 작성합니다. 기사 내용은 정확하고 간결하며 설득력 있는 문장으로 구성합니다. 관련자의 인터뷰를 인용 형태로 넣어주세요. 위의 정보와 지침을 참고하여 신문 보도자료 형식의 기사를 작성해 주세요""" # 이미지 생성 여부 선택 옵션 추가 generate_image_too = st.checkbox("기사 생성 후 이미지도 함께 생성하기", value=True, key="single_image") if st.button("새 기사 생성하기", key="generate_single"): if st.session_state.openai_client: with st.spinner("기사를 생성 중입니다..."): new_article = generate_article(selected_article['content'], prompt_text) st.write("**생성된 기사:**") st.write(new_article) # 이미지 생성하기 (옵션이 선택된 경우) if generate_image_too: with st.spinner("기사 관련 이미지를 생성 중입니다..."): image_prompt = f"""신문기사 제목 "{selected_article['title']}" 을 보고 이미지를 만들어줘 이미지에는 다음 요소가 포함되어야 합니다: - 기사를 이해할 수 있는 도식 - 기사 내용과 관련된 텍스트 - 심플하게 처리 """ # 이미지 생성 image = generate_image(image_prompt) if isinstance(image, BytesIO): st.subheader("생성된 이미지:") st.image(image, use_column_width=True) else: st.error(image) # 생성된 기사 저장 옵션 if st.button("생성된 기사 저장", key="save_single"): new_article_data = { 'title': f"[생성됨] {selected_article['title']}", 'source': f"AI 생성 (원본: {selected_article['source']})", 'date': datetime.now().strftime("%Y-%m-%d %H:%M"), 'description': new_article[:100] + "...", 'link': "", 'content': new_article } articles.append(new_article_data) save_articles(articles) st.success("생성된 기사가 저장되었습니다!") else: st.warning("OpenAI API 키를 사이드바에서 설정해주세요.") with tab2: # 새로운 기능: 여러 제목으로 기사 생성 st.subheader("여러 제목을 기반으로 하나의 기사 생성하기") # 다중 선택 위젯으로 여러 제목 선택 가능 titles = [article['title'] for article in articles] selected_titles = st.multiselect("여러 기사 제목 선택 (2개 이상 권장)", titles) if selected_titles: st.write(f"**선택된 제목 수:** {len(selected_titles)}개") with st.expander("선택된 제목 목록"): for i, title in enumerate(selected_titles): st.write(f"{i+1}. {title}") multi_prompt_text = """다음 뉴스 제목들을 종합하여 하나의 통합된 기사로 작성해줘. 역할: 당신은 신문사의 기자입니다. 작업: 여러 뉴스 제목에서 공통 주제를 파악하고, 이를 종합한 보도자료를 작성해야 합니다. 지침: - 제공된 여러 제목을 종합적으로 분석하여 하나의 일관된 기사를 작성하세요. - 기사 제목은 제공된 모든 제목의 핵심 주제를 담아야 합니다. - 기사 내용은 제목들이 다루는 모든 주요 주제를 포함해야 합니다. - 관련자의 가상 인터뷰를 인용 형태로 넣어주세요. - 제공된 제목들의 맥락을 유지하면서 일관성 있는 내러티브를 구성하세요.""" # 프롬프트 편집 옵션 custom_prompt = st.checkbox("직접 프롬프트 작성하기") if custom_prompt: multi_prompt_text = st.text_area("프롬프트 직접 입력", multi_prompt_text, height=250) # 이미지 생성 옵션 generate_multi_image = st.checkbox("기사 생성 후 이미지도 함께 생성하기", value=True, key="multi_image") if st.button("새 기사 생성하기", key="generate_multi"): if st.session_state.openai_client: if len(selected_titles) < 1: st.error("최소 1개 이상의 제목을 선택해주세요.") else: with st.spinner("여러 제목으로부터 기사를 생성 중입니다..."): # 선택된 제목들을 이용하여 새 기사 생성 new_article = generate_article_from_titles(selected_titles, multi_prompt_text) st.write("**생성된 기사:**") st.write(new_article) # 이미지 생성 (옵션이 선택된 경우) if generate_multi_image: with st.spinner("기사 관련 이미지를 생성 중입니다..."): combined_titles = " / ".join(selected_titles[:3]) # 처음 3개 제목만 사용 image_prompt = f"""여러 뉴스를 종합한 기사 "{combined_titles}" 관련 이미지를 만들어줘. 이미지에는 다음 요소가 포함되어야 합니다: - 여러 뉴스의 공통 주제를 시각화한 도식 - 핵심 키워드나 개념 - 심플하고 통합된 디자인 """ # 이미지 생성 image = generate_image(image_prompt) if isinstance(image, BytesIO): st.subheader("생성된 이미지:") st.image(image, use_column_width=True) else: st.error(image) # 생성된 기사 저장 옵션 if st.button("생성된 기사 저장", key="save_multi"): # 통합 제목 생성 (첫 번째 제목 + 추가 제목 수) if len(selected_titles) > 1: combined_title = f"{selected_titles[0]} 외 {len(selected_titles)-1}건 관련 소식" else: combined_title = selected_titles[0] new_article_data = { 'title': f"[여러 제목 통합] {combined_title}", 'source': "AI 생성 (여러 제목 통합)", 'date': datetime.now().strftime("%Y-%m-%d %H:%M"), 'description': new_article[:100] + "...", 'link': "", 'content': new_article } articles.append(new_article_data) save_articles(articles) st.success("생성된 기사가 저장되었습니다!") else: st.warning("OpenAI API 키를 사이드바에서 설정해주세요.") elif menu == "뉴스 기사 예약하기": st.header("뉴스 기사 예약하기") # 탭 생성 tab1, tab2, tab3 = st.tabs(["일별 예약", "시간 간격 예약", "스케줄러 상태"]) # 일별 예약 탭 with tab1: st.subheader("매일 정해진 시간에 기사 수집하기") # 키워드 입력 daily_keyword = st.text_input("검색 키워드", value="인공지능", key="daily_keyword") daily_num_articles = st.slider("수집할 기사 수", min_value=1, max_value=20, value=5, key="daily_num_articles") # 시간 설정 daily_col1, daily_col2 = st.columns(2) with daily_col1: daily_hour = st.selectbox("시", range(24), format_func=lambda x: f"{x:02d}시", key="daily_hour") with daily_col2: daily_minute = st.selectbox("분", range(0, 60, 5), format_func=lambda x: f"{x:02d}분", key="daily_minute") # 일별 예약 리스트 if 'daily_tasks' not in st.session_state: st.session_state.daily_tasks = [] if st.button("일별 예약 추가"): st.session_state.daily_tasks.append({ 'hour': daily_hour, 'minute': daily_minute, 'keyword': daily_keyword, 'num_articles': daily_num_articles }) st.success(f"일별 예약이 추가되었습니다: 매일 {daily_hour:02d}:{daily_minute:02d} - '{daily_keyword}'") # 예약 목록 표시 if st.session_state.daily_tasks: st.subheader("일별 예약 목록") for i, task in enumerate(st.session_state.daily_tasks): st.write(f"{i+1}. 매일 {task['hour']:02d}:{task['minute']:02d} - '{task['keyword']}' ({task['num_articles']}개)") if st.button("일별 예약 초기화"): st.session_state.daily_tasks = [] st.warning("일별 예약이 모두 초기화되었습니다.") # 시간 간격 예약 탭 with tab2: st.subheader("시간 간격으로 기사 수집하기") # 키워드 입력 interval_keyword = st.text_input("검색 키워드", value="빅데이터", key="interval_keyword") interval_num_articles = st.slider("수집할 기사 수", min_value=1, max_value=20, value=5, key="interval_num_articles") # 시간 간격 설정 interval_minutes = st.number_input("실행 간격(분)", min_value=1, max_value=60*24, value=30, key="interval_minutes") # 즉시 실행 여부 run_immediately = st.checkbox("즉시 실행", value=True, help="체크하면 스케줄러 시작 시 즉시 실행합니다.") # 시간 간격 예약 리스트 if 'interval_tasks' not in st.session_state: st.session_state.interval_tasks = [] if st.button("시간 간격 예약 추가"): st.session_state.interval_tasks.append({ 'interval_minutes': interval_minutes, 'keyword': interval_keyword, 'num_articles': interval_num_articles, 'run_immediately': run_immediately }) st.success(f"시간 간격 예약이 추가되었습니다: {interval_minutes}분마다 - '{interval_keyword}'") # 예약 목록 표시 if st.session_state.interval_tasks: st.subheader("시간 간격 예약 목록") for i, task in enumerate(st.session_state.interval_tasks): immediate_text = "즉시 실행 후 " if task['run_immediately'] else "" st.write(f"{i+1}. {immediate_text}{task['interval_minutes']}분마다 - '{task['keyword']}' ({task['num_articles']}개)") if st.button("시간 간격 예약 초기화"): st.session_state.interval_tasks = [] st.warning("시간 간격 예약이 모두 초기화되었습니다.") # 스케줄러 상태 탭 with tab3: st.subheader("스케줄러 제어 및 상태") # 로그 뷰어를 상단에 배치 st.subheader("실시간 로그") log_container = st.empty() def update_logs(): try: with open('/tmp/crawler.log', 'r') as f: logs = f.readlines() return ''.join(logs[-100:]) # 최근 100줄만 표시 except Exception as e: return f"로그 파일을 읽을 수 없습니다: {str(e)}" # 로그 자동 업데이트 if st.checkbox("로그 자동 업데이트", value=True): log_content = update_logs() log_container.text_area("최근 로그", value=log_content, height=400) else: if st.button("로그 새로고침"): log_content = update_logs() log_container.text_area("최근 로그", value=log_content, height=400) st.divider() # 스케줄러 제어 col1, col2 = st.columns(2) with col1: # 스케줄러 시작/중지 버튼 if not global_scheduler_state.is_running: if st.button("스케줄러 시작"): if not st.session_state.daily_tasks and not st.session_state.interval_tasks: st.error("예약된 작업이 없습니다. 먼저 일별 예약 또는 시간 간격 예약을 추가해주세요.") else: start_scheduler(st.session_state.daily_tasks, st.session_state.interval_tasks) st.success("스케줄러가 시작되었습니다.") else: if st.button("스케줄러 중지"): stop_scheduler() st.warning("스케줄러가 중지되었습니다.") with col2: # 스케줄러 상태 표시 if 'scheduler_status' in st.session_state: st.write(f"상태: {'실행중' if global_scheduler_state.is_running else '중지'}") if global_scheduler_state.last_run: st.write(f"마지막 실행: {global_scheduler_state.last_run.strftime('%Y-%m-%d %H:%M:%S')}") if global_scheduler_state.next_run and global_scheduler_state.is_running: st.write(f"다음 실행: {global_scheduler_state.next_run.strftime('%Y-%m-%d %H:%M:%S')}") else: st.write("상태: 중지") # 예약된 작업 목록 if global_scheduler_state.scheduled_jobs: st.subheader("현재 실행 중인 예약 작업") for i, job in enumerate(global_scheduler_state.scheduled_jobs): if job['type'] == 'daily': st.write(f"{i+1}. [일별] 매일 {job['time']} - '{job['keyword']}' ({job['num_articles']}개)") else: immediate_text = "[즉시 실행 후] " if job.get('run_immediately', False) else "" st.write(f"{i+1}. [간격] {immediate_text}{job['interval']} - '{job['keyword']}' ({job['num_articles']}개)") # 스케줄러 실행 결과 if global_scheduler_state.scheduled_results: st.subheader("스케줄러 실행 결과") # 결과를 UI에 표시하기 전에 복사 results_for_display = global_scheduler_state.scheduled_results.copy() if results_for_display: result_df = pd.DataFrame(results_for_display) result_df['실행시간'] = result_df['timestamp'].apply(lambda x: datetime.strptime(x, "%Y%m%d_%H%M%S").strftime("%Y-%m-%d %H:%M:%S")) result_df = result_df.rename(columns={ 'task_type': '작업유형', 'keyword': '키워드', 'num_articles': '기사수', 'filename': '파일명' }) result_df['작업유형'] = result_df['작업유형'].apply(lambda x: '일별' if x == 'daily' else '시간간격') st.dataframe( result_df[['작업유형', '키워드', '기사수', '실행시간', '파일명']], hide_index=True ) # 수집된 파일 보기 if os.path.exists('/tmp/scheduled_news'): files = [f for f in os.listdir('/tmp/scheduled_news') if f.endswith('.json')] if files: st.subheader("수집된 파일 열기") selected_file = st.selectbox("파일 선택", files, index=len(files)-1) if selected_file and st.button("파일 내용 보기"): with open(os.path.join('/tmp/scheduled_news', selected_file), 'r', encoding='utf-8') as f: articles = json.load(f) st.write(f"**파일명:** {selected_file}") st.write(f"**수집 기사 수:** {len(articles)}개") for article in articles: with st.expander(f"{article['title']} - {article['source']}"): st.write(f"**출처:** {article['source']}") st.write(f"**날짜:** {article['date']}") st.write(f"**링크:** {article['link']}") st.write("**본문:**") st.write(article['content'][:500] + "..." if len(article['content']) > 500 else article['content']) # 푸터 st.markdown("---") st.markdown("© 뉴스 기사 도구 @conanssam")