import streamlit as st import pandas as pd import requests from bs4 import BeautifulSoup import re import time import nltk from nltk.tokenize import word_tokenize from nltk.corpus import stopwords from collections import Counter import json import os from datetime import datetime, timedelta from openai import OpenAI # 새로운 import 방식 from dotenv import load_dotenv import traceback import plotly.graph_objects as go import schedule import threading import matplotlib.pyplot as plt import kss # KoNLPy 대신 KSS 사용 from PIL import Image import base64 from io import BytesIO import logging # 로깅 설정 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.StreamHandler(), logging.FileHandler('/tmp/crawler.log') ] ) # 워드클라우드 추가 try: from wordcloud import WordCloud except ImportError: st.error("wordcloud 패키지를 설치해주세요: pip install wordcloud") WordCloud = None # 스케줄러 상태 클래스 추가 class SchedulerState: def __init__(self): self.is_running = False self.thread = None self.last_run = None self.next_run = None self.scheduled_jobs = [] self.scheduled_results = [] # 전역 스케줄러 상태 객체 생성 (스레드 안에서 사용) global_scheduler_state = SchedulerState() # API 키 관리를 위한 세션 상태 초기화 if 'openai_client' not in st.session_state: st.session_state.openai_client = None # 여러 방법으로 API 키 로드 시도 load_dotenv() # .env 파일에서 로드 시도 # OpenAI 클라이언트 초기화를 위한 함수 def init_openai_client(api_key=None): try: if api_key: client = OpenAI(api_key=api_key) # 간단한 API 키 유효성 검사 client.models.list() # API 키가 유효한지 테스트 return client return None except Exception as e: st.error(f"API 키 초기화 오류: {str(e)}") return None # 1. 환경 변수에서 API 키 확인 api_key = os.environ.get('OPENAI_API_KEY') if api_key: st.session_state.openai_client = init_openai_client(api_key) # 2. Streamlit secrets에서 API 키 확인 if not st.session_state.openai_client: try: if 'OPENAI_API_KEY' in st.secrets: st.session_state.openai_client = init_openai_client(st.secrets['OPENAI_API_KEY']) except Exception as e: pass # secrets 파일이 없어도 오류 발생하지 않음 # NLTK 데이터 경로 설정 - 현재 워크스페이스의 nltk_data 사용 nltk_data_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'nltk_data') nltk.data.path.insert(0, nltk_data_path) # 필요한 NLTK 데이터 확인 try: nltk.data.find('tokenizers/punkt') except LookupError: nltk.download('punkt', download_dir=nltk_data_path) try: nltk.data.find('corpora/stopwords') except LookupError: nltk.download('stopwords', download_dir=nltk_data_path) # 페이지 설정 st.set_page_config(page_title="뉴스 기사 도구", page_icon="📰", layout="wide") # 사이드바에 API 키 입력 필드 추가 with st.sidebar: st.title("뉴스 기사 도구") menu = st.radio( "메뉴 선택", ["뉴스 기사 크롤링", "기사 분석하기", "새 기사 생성하기", "뉴스 기사 예약하기"] ) st.divider() api_key = st.text_input("OpenAI API 키 입력", type="password") if api_key: client = init_openai_client(api_key) if client: st.session_state.openai_client = client st.success("API 키가 성공적으로 설정되었습니다!") else: st.error("유효하지 않은 API 키입니다.") # 저장된 기사를 불러오는 함수 def load_saved_articles(): if os.path.exists('/tmp/saved_articles/articles.json'): with open('/tmp/saved_articles/articles.json', 'r', encoding='utf-8') as f: return json.load(f) return [] # 기사를 저장하는 함수 def save_articles(articles): os.makedirs('/tmp/saved_articles', exist_ok=True) with open('/tmp/saved_articles/articles.json', 'w', encoding='utf-8') as f: json.dump(articles, f, ensure_ascii=False, indent=2) @st.cache_data def crawl_naver_news(keyword, num_articles=5): """ 네이버 뉴스 기사를 수집하는 함수 """ logging.info(f"크롤링 시작: 키워드={keyword}, 기사 수={num_articles}") url = f"https://search.naver.com/search.naver?where=news&query={keyword}" results = [] try: # 페이지 요청 logging.info(f"요청 URL: {url}") response = requests.get(url) logging.info(f"응답 상태 코드: {response.status_code}") soup = BeautifulSoup(response.text, 'html.parser') # 뉴스 아이템 찾기 news_items = soup.select('div.sds-comps-base-layout.sds-comps-full-layout') logging.info(f"찾은 뉴스 아이템 수: {len(news_items)}") # 각 뉴스 아이템에서 정보 추출 for i, item in enumerate(news_items): if i >= num_articles: break try: # 제목과 링크 추출 title_element = item.select_one('a.X0fMYp2dHd0TCUS2hjww span') if not title_element: continue title = title_element.text.strip() link_element = item.select_one('a.X0fMYp2dHd0TCUS2hjww') link = link_element['href'] if link_element else "" # 언론사 추출 press_element = item.select_one('div.sds-comps-profile-info-title span.sds-comps-text-type-body2') source = press_element.text.strip() if press_element else "알 수 없음" # 날짜 추출 date_element = item.select_one('span.r0VOr') date = date_element.text.strip() if date_element else "알 수 없음" # 미리보기 내용 추출 desc_element = item.select_one('a.X0fMYp2dHd0TCUS2hjww.IaKmSOGPdofdPwPE6cyU > span') description = desc_element.text.strip() if desc_element else "내용 없음" results.append({ 'title': title, 'link': link, 'description': description, 'source': source, 'date': date, 'content': "" }) logging.info(f"기사 추출 성공: {title}") except Exception as e: logging.error(f"기사 정보 추출 중 오류 발생: {str(e)}", exc_info=True) continue except Exception as e: logging.error(f"페이지 요청 중 오류 발생: {str(e)}", exc_info=True) logging.info(f"크롤링 완료: {len(results)}개 기사 수집") return results # 기사 원문 가져오기 def get_article_content(url): logging.info(f"기사 원문 가져오기 시작: {url}") try: response = requests.get(url, timeout=5) logging.info(f"원문 요청 상태 코드: {response.status_code}") soup = BeautifulSoup(response.text, 'html.parser') # 네이버 뉴스 본문 찾기 content = soup.select_one('#dic_area') if content: text = content.text.strip() text = re.sub(r'\s+', ' ', text) logging.info("네이버 뉴스 본문 추출 성공") return text # 다른 뉴스 사이트 본문 찾기 content = soup.select_one('.article_body, .article-body, .article-content, .news-content-inner') if content: text = content.text.strip() text = re.sub(r'\s+', ' ', text) logging.info("일반 뉴스 본문 추출 성공") return text logging.warning("본문을 찾을 수 없음") return "본문을 가져올 수 없습니다." except Exception as e: logging.error(f"원문 가져오기 오류: {str(e)}", exc_info=True) return f"오류 발생: {str(e)}" # NLTK를 이용한 키워드 분석 (KSS 활용) def analyze_keywords(text, top_n=10): # 한국어 불용어 목록 korean_stopwords = ['이', '그', '저', '것', '및', '등', '를', '을', '에', '에서', '의', '으로', '로'] # KSS를 사용한 문장 분리 및 토큰화 try: sentences = kss.split_sentences(text) tokens = [] for sentence in sentences: # 간단한 토큰화 (공백 기준) tokens.extend(sentence.split()) except: # KSS 실패시 기본 토큰화 tokens = text.split() tokens = [word for word in tokens if word.isalnum() and len(word) > 1 and word not in korean_stopwords] word_count = Counter(tokens) top_keywords = word_count.most_common(top_n) return top_keywords #워드 클라우드용 분석 def extract_keywords_for_wordcloud(text, top_n=50): if not text or len(text.strip()) < 10: return {} try: try: tokens = word_tokenize(text.lower()) except Exception as e: st.warning(f"{str(e)} 오류발생") tokens = text.lower().split() stop_words = set() try: stop_words = set(stopwords.words('english')) except Exception: pass korea_stop_words = { '및', '등', '를', '이', '의', '가', '에', '는', '으로', '에서', '그', '또', '또는', '하는', '할', '하고', '있다', '이다', '위해', '것이다', '것은', '대한', '때문', '그리고', '하지만', '그러나', '그래서', '입니다', '합니다', '습니다', '요', '죠', '고', '과', '와', '도', '은', '수', '것', '들', '제', '저', '년', '월', '일', '시', '분', '초', '지난', '올해', '내년', '최근', '현재', '오늘', '내일', '어제', '오전', '오후', '부터', '까지', '에게', '께서', '이라고', '라고', '하며', '하면서', '따라', '통해', '관련', '한편', '특히', '가장', '매우', '더', '덜', '많이', '조금', '항상', '자주', '가끔', '거의', '전혀', '바로', '정말', '만약', '비롯한', '등을', '등이', '등의', '등과', '등도', '등에', '등에서', '기자', '뉴스', '사진', '연합뉴스', '뉴시스', '제공', '무단', '전재', '재배포', '금지', '앵커', '멘트', '일보', '데일리', '경제', '사회', '정치', '세계', '과학', '아이티', '닷컴', '씨넷', '블로터', '전자신문' } stop_words.update(korea_stop_words) # 1글자 이상이고 불용어가 아닌 토큰만 필터링 filtered_tokens = [word for word in tokens if len(word) > 1 and word not in stop_words] # 단어 빈도 계산 word_freq = {} for word in filtered_tokens: if word.isalnum(): # 알파벳과 숫자만 포함된 단어만 허용 word_freq[word] = word_freq.get(word, 0) + 1 # 빈도순으로 정렬하여 상위 n개 반환 sorted_words = sorted(word_freq.items(), key=lambda x: x[1], reverse=True) if not sorted_words: return {"data": 1, "analysis": 1, "news": 1} return dict(sorted_words[:top_n]) except Exception as e: st.error(f"오류발생 {str(e)}") return {"data": 1, "analysis": 1, "news": 1} # 워드 클라우드 생성 함수 def generate_wordcloud(keywords_dict): if not WordCloud: st.warning("워드클라우드 설치안되어 있습니다.") return None try: # 기본 WordCloud 객체 (폰트 경로 없이) wc = WordCloud( width=800, height=400, background_color='white', colormap='viridis', max_font_size=150, random_state=42 ) try: import os script_dir = os.path.dirname(os.path.abspath(__file__)) # 사용자가 루트에 넣은 폰트 파일 이름을 지정합니다. # 만약 다른 이름의 폰트를 사용했다면 이 부분을 수정해주세요. (예: "YourFontName.ttf") possible_font_paths = ["NanumGothic.ttf"] font_path = None for path_segment in possible_font_paths: candidate = os.path.join(script_dir, path_segment) if os.path.exists(candidate): font_path = candidate break # font_path가 성공적으로 찾아진 경우에만 폰트 경로를 포함하여 WordCloud 재생성 if font_path: wc = WordCloud( font_path=font_path, width=800, height=400, background_color='white', colormap='viridis', max_font_size=150, random_state=42 ).generate_from_frequencies(keywords_dict) else: st.warning(f"지정된 한국어 글꼴 파일({', '.join(possible_font_paths)})을 스크립트 디렉터리에서 찾을 수 없습니다. 워드클라우드가 깨질 수 있습니다.") except Exception as e: print(f"글꼴 로딩 중 오류 발생: {str(e)}") st.warning(f"글꼴 로딩 중 예상치 못한 오류가 발생했습니다: {str(e)}") # 사용자에게도 경고 표시 # 최종적으로 wc 객체 반환 (폰트가 적용되었거나, 기본 객체이거나) return wc.generate_from_frequencies(keywords_dict) if isinstance(wc, WordCloud) else None except Exception as e: st.error(f"워드클라우드 생성 중 오류발생: {str(e)}") return None # 뉴스 분석 함수 def analyze_news_content(news_df): if news_df.empty: return "데이터가 없습니다" results = {} #카테고리별 if 'source' in news_df.columns: results['source_counts'] = news_df['source'].value_counts().to_dict() #카테고리별 if 'date' in news_df.columns: results['date_counts'] = news_df['date'].value_counts().to_dict() #키워드분석 all_text = " ".join(news_df['title'].fillna('') + " " + news_df['content'].fillna('')) if len(all_text.strip()) > 0: results['top_keywords_for_wordcloud']= extract_keywords_for_wordcloud(all_text, top_n=50) results['top_keywords'] = analyze_keywords(all_text) else: results['top_keywords_for_wordcloud']={} results['top_keywords'] = [] return results # OpenAI API를 이용한 새 기사 생성 (새로운 버전 방식) def generate_article(original_content, prompt_text): try: if not st.session_state.openai_client: return "OpenAI API 키가 설정되지 않았습니다." response = st.session_state.openai_client.chat.completions.create( model="gpt-4.1-nano", # 또는 사용 가능한 적절한 모델 messages=[ {"role": "system", "content": "당신은 전문적인 뉴스 기자입니다. 주어진 내용을 바탕으로 새로운 기사를 작성해주세요."}, {"role": "user", "content": f"다음 내용을 바탕으로 {prompt_text}\n\n{original_content[:1000]}"} ], max_tokens=2000 ) return response.choices[0].message.content except Exception as e: return f"기사 생성 오류: {str(e)}" # 여러 제목으로부터 기사 생성하는 함수 추가 def generate_article_from_titles(titles, prompt_text): try: if not st.session_state.openai_client: return "OpenAI API 키가 설정되지 않았습니다." titles_text = "\n".join([f"- {title}" for title in titles]) response = st.session_state.openai_client.chat.completions.create( model="gpt-4.1-nano", # 또는 사용 가능한 적절한 모델 messages=[ {"role": "system", "content": "당신은 전문적인 뉴스 기자입니다. 주어진 여러 뉴스 제목을 바탕으로 새로운 통합 기사를 작성해주세요."}, {"role": "user", "content": f"다음 뉴스 제목들을 바탕으로 {prompt_text}\n\n{titles_text}"} ], max_tokens=2000 ) return response.choices[0].message.content except Exception as e: return f"기사 생성 오류: {str(e)}" # OpenAI API를 이용한 이미지 생성 (새로운 버전 방식) def generate_image(prompt): try: if not st.session_state.openai_client: return "OpenAI API 키가 설정되지 않았습니다." # GPT Image 1 모델로 이미지 생성 result = st.session_state.openai_client.images.generate( model="gpt-image-1", # 새로운 모델명 사용 prompt=prompt, size="1024x1024" ) # base64 이미지 데이터를 디코딩 image_base64 = result.data[0].b64_json image_bytes = base64.b64decode(image_base64) # BytesIO 객체로 변환 image = BytesIO(image_bytes) # PIL Image로 변환하여 크기 조정 (선택사항) pil_image = Image.open(image) pil_image = pil_image.resize((800, 800), Image.LANCZOS) # 크기 조정 # 다시 BytesIO로 변환 output = BytesIO() pil_image.save(output, format="JPEG", quality=80, optimize=True) output.seek(0) return output except Exception as e: return f"이미지 생성 오류: {str(e)}" # 스케줄러 관련 함수들 def get_next_run_time(hour, minute): now = datetime.now() next_run = now.replace(hour=hour, minute=minute, second=0, microsecond=0) if next_run <= now: next_run += timedelta(days=1) return next_run def run_scheduled_task(): try: while global_scheduler_state.is_running: schedule.run_pending() time.sleep(1) except Exception as e: print(f"스케줄러 에러 발생: {e}") traceback.print_exc() def perform_news_task(task_type, keyword, num_articles, file_prefix): logging.info(f"스케줄러 작업 시작: {task_type}, 키워드={keyword}") try: articles = crawl_naver_news(keyword, num_articles) logging.info(f"수집된 기사 수: {len(articles)}") # 기사 내용 가져오기 for i, article in enumerate(articles): logging.info(f"기사 {i+1}/{len(articles)} 원문 가져오기: {article['title']}") article['content'] = get_article_content(article['link']) time.sleep(0.5) # 서버 부하 방지 # 결과 저장 os.makedirs('/tmp/scheduled_news', exist_ok=True) timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") filename = f"/tmp/scheduled_news/{file_prefix}_{task_type}_{timestamp}.json" with open(filename, 'w', encoding='utf-8') as f: json.dump(articles, f, ensure_ascii=False, indent=2) logging.info(f"결과 저장 완료: {filename}") global_scheduler_state.last_run = datetime.now() print(f"{datetime.now()} - {task_type} 뉴스 기사 수집 완료: {keyword}") # 전역 상태에 수집 결과를 저장 result_item = { 'task_type': task_type, 'keyword': keyword, 'timestamp': timestamp, 'num_articles': len(articles), 'filename': filename } global_scheduler_state.scheduled_results.append(result_item) except Exception as e: logging.error(f"작업 실행 중 오류 발생: {str(e)}", exc_info=True) traceback.print_exc() def start_scheduler(daily_tasks, interval_tasks): if not global_scheduler_state.is_running: schedule.clear() global_scheduler_state.scheduled_jobs = [] # 일별 태스크 등록 for task in daily_tasks: hour = task['hour'] minute = task['minute'] keyword = task['keyword'] num_articles = task['num_articles'] job_id = f"daily_{keyword}_{hour}_{minute}" schedule.every().day.at(f"{hour:02d}:{minute:02d}").do( perform_news_task, "daily", keyword, num_articles, job_id ).tag(job_id) global_scheduler_state.scheduled_jobs.append({ 'id': job_id, 'type': 'daily', 'time': f"{hour:02d}:{minute:02d}", 'keyword': keyword, 'num_articles': num_articles }) # 시간 간격 태스크 등록 for task in interval_tasks: interval_minutes = task['interval_minutes'] keyword = task['keyword'] num_articles = task['num_articles'] run_immediately = task['run_immediately'] job_id = f"interval_{keyword}_{interval_minutes}" if run_immediately: # 즉시 실행 perform_news_task("interval", keyword, num_articles, job_id) # 분 간격으로 예약 schedule.every(interval_minutes).minutes.do( perform_news_task, "interval", keyword, num_articles, job_id ).tag(job_id) global_scheduler_state.scheduled_jobs.append({ 'id': job_id, 'type': 'interval', 'interval': f"{interval_minutes}분마다", 'keyword': keyword, 'num_articles': num_articles, 'run_immediately': run_immediately }) # 다음 실행 시간 계산 next_run = schedule.next_run() if next_run: global_scheduler_state.next_run = next_run # 스케줄러 쓰레드 시작 global_scheduler_state.is_running = True global_scheduler_state.thread = threading.Thread( target=run_scheduled_task, daemon=True ) global_scheduler_state.thread.start() # 상태를 세션 상태로도 복사 (UI 표시용) if 'scheduler_status' not in st.session_state: st.session_state.scheduler_status = {} st.session_state.scheduler_status = { 'is_running': global_scheduler_state.is_running, 'last_run': global_scheduler_state.last_run, 'next_run': global_scheduler_state.next_run, 'jobs_count': len(global_scheduler_state.scheduled_jobs) } def stop_scheduler(): if global_scheduler_state.is_running: global_scheduler_state.is_running = False schedule.clear() if global_scheduler_state.thread: global_scheduler_state.thread.join(timeout=1) global_scheduler_state.next_run = None global_scheduler_state.scheduled_jobs = [] # UI 상태 업데이트 if 'scheduler_status' in st.session_state: st.session_state.scheduler_status['is_running'] = False # 메뉴에 따른 화면 표시 if menu == "뉴스 기사 크롤링": st.header("뉴스 기사 크롤링") keyword = st.text_input("검색어 입력", "인공지능") num_articles = st.slider("가져올 기사 수", min_value=1, max_value=20, value=5) if st.button("기사 가져오기"): with st.spinner("기사를 수집 중입니다..."): articles = crawl_naver_news(keyword, num_articles) # 기사 내용 가져오기 for i, article in enumerate(articles): st.progress((i + 1) / len(articles)) article['content'] = get_article_content(article['link']) time.sleep(0.5) # 서버 부하 방지 # 결과 저장 및 표시 save_articles(articles) st.success(f"{len(articles)}개의 기사를 수집했습니다!") # 수집한 기사 표시 for article in articles: with st.expander(f"{article['title']} - {article['source']}"): st.write(f"**출처:** {article['source']}") st.write(f"**날짜:** {article['date']}") st.write(f"**요약:** {article['description']}") st.write(f"**링크:** {article['link']}") st.write("**본문 미리보기:**") st.write(article['content'][:300] + "..." if len(article['content']) > 300 else article['content']) elif menu == "기사 분석하기": st.header("기사 분석하기") articles = load_saved_articles() if not articles: st.warning("저장된 기사가 없습니다. 먼저 '뉴스 기사 크롤링' 메뉴에서 기사를 수집해주세요.") else: # 기사 선택 titles = [article['title'] for article in articles] selected_title = st.selectbox("분석할 기사 선택", titles) selected_article = next((a for a in articles if a['title'] == selected_title), None) if selected_article: st.write(f"**제목:** {selected_article['title']}") st.write(f"**출처:** {selected_article['source']}") # 본문 표시 with st.expander("기사 본문 보기"): st.write(selected_article['content']) # 분석 방법 선택 analysis_type = st.radio( "분석 방법", ["키워드 분석", "감정 분석", "텍스트 통계"] ) if analysis_type == "키워드 분석": if st.button("키워드 분석하기"): with st.spinner("키워드를 분석 중입니다..."): keyword_tab1, keyword_tab2 = st.tabs(["키워드 빈도", "워드클라우드"]) with keyword_tab1: keywords = analyze_keywords(selected_article['content']) # Plotly를 사용한 시각화 df = pd.DataFrame(keywords, columns=['단어', '빈도수']) fig = go.Figure(data=[ go.Bar( x=df['단어'], y=df['빈도수'], marker_color='rgb(55, 83, 109)' ) ]) fig.update_layout( title='키워드 빈도 분석', xaxis_title='키워드', yaxis_title='빈도수', height=500, margin=dict(l=50, r=50, t=80, b=50) ) st.plotly_chart(fig, use_container_width=True) st.write("**주요 키워드:**") for word, count in keywords: st.write(f"- {word}: {count}회") with keyword_tab2: keyword_dict = extract_keywords_for_wordcloud(selected_article['content']) wc = generate_wordcloud(keyword_dict) if wc: fig, ax = plt.subplots(figsize=(10, 5)) ax.imshow(wc, interpolation='bilinear') ax.axis('off') st.pyplot(fig) # 키워드 상위 20개 표시 st.write("**상위 20개 키워드:**") top_keywords = sorted(keyword_dict.items(), key=lambda x: x[1], reverse=True)[:20] keyword_df = pd.DataFrame(top_keywords, columns=['키워드', '빈도']) st.dataframe(keyword_df) else: st.error("워드클라우드를 생성할 수 없습니다.") elif analysis_type == "텍스트 통계": if st.button("텍스트 통계 분석"): content = selected_article['content'] # 텍스트 통계 계산 word_count = len(re.findall(r'\b\w+\b', content)) char_count = len(content) try: # KSS로 문장 분리 sentences = kss.split_sentences(content) sentence_count = len(sentences) except: # KSS 실패시 기본 문장 분리 sentence_count = len(re.split(r'[.!?]+', content)) avg_word_length = sum(len(word) for word in re.findall(r'\b\w+\b', content)) / word_count if word_count > 0 else 0 avg_sentence_length = word_count / sentence_count if sentence_count > 0 else 0 # 통계 표시 st.subheader("텍스트 통계") col1, col2, col3 = st.columns(3) with col1: st.metric("단어 수", f"{word_count:,}") with col2: st.metric("문자 수", f"{char_count:,}") with col3: st.metric("문장 수", f"{sentence_count:,}") col1, col2 = st.columns(2) with col1: st.metric("평균 단어 길이", f"{avg_word_length:.1f}자") with col2: st.metric("평균 문장 길이", f"{avg_sentence_length:.1f}단어") # 텍스트 복잡성 점수 complexity_score = min(10, (avg_sentence_length / 10) * 5 + (avg_word_length / 5) * 5) st.progress(complexity_score / 10) st.write(f"텍스트 복잡성 점수: {complexity_score:.1f}/10") # 품사 분석 부분 제거 (KoNLPy 의존성 제거) st.info("상세 품사 분석은 현재 지원되지 않습니다.") elif analysis_type == "감정 분석": if st.button("감정 분석하기"): if st.session_state.openai_client: with st.spinner("기사의 감정을 분석 중입니다..."): try: response = st.session_state.openai_client.chat.completions.create( model="gpt-4.1-mini", messages=[ {"role": "system", "content": """당신은 텍스트의 감정과 논조를 분석하는 전문가입니다. 다음 뉴스 기사의 감정과 논조를 분석하고, 반드시 아래 형식의 JSON으로 응답해주세요: { "sentiment": "긍정적/부정적/중립적", "reason": "이유 설명...", "keywords": [ {"word": "키워드1", "score": 8}, {"word": "키워드2", "score": 7} ] }"""}, {"role": "user", "content": f"다음 뉴스 기사를 분석해 주세요:\n\n제목: {selected_article['title']}\n\n내용: {selected_article['content'][:1500]}"} ], max_tokens=800, response_format={ "type": "json_object" } # JSON 응답 형식 강제 ) # 응답 내용 확인 및 디버깅 content = response.choices[0].message.content logging.info(f"API 응답: {content}") # JSON 파싱 try: analysis_result = json.loads(content) except json.JSONDecodeError as e: logging.error(f"JSON 파싱 오류: {str(e)}") logging.error(f"파싱 시도한 내용: {content}") st.error("API 응답을 파싱하는 중 오류가 발생했습니다. 응답 형식이 올바르지 않습니다.") st.stop() # return 대신 st.stop() 사용 # 결과 시각화 st.subheader("감정 분석 결과") # 1. 감정 타입에 따른 시각적 표현 sentiment_type = analysis_result.get('sentiment', '중립적') col1, col2, col3 = st.columns([1, 3, 1]) with col2: if sentiment_type == "긍정적": st.markdown(f"""
감정 강도: 높음
감정 강도: 높음
감정 강도: 중간
강도: {score}/10