| | """ |
| | AETHER Proto-AGI v2.2 - Core Infrastructure Module |
| | AI/데이터 인프라, 유틸리티, 외부 서비스 통합 |
| | """ |
| |
|
| | import json |
| | import sqlite3 |
| | import hashlib |
| | import numpy as np |
| | import requests |
| | from datetime import datetime, timedelta |
| | from dataclasses import dataclass, field, asdict |
| | from typing import Optional, List, Dict, Any, Generator, Tuple |
| | from enum import Enum |
| | from pathlib import Path |
| | from itertools import product |
| | import os |
| | import re |
| | import random |
| | import shutil |
| | from urllib.parse import urlparse, urljoin |
| | import time as time_module |
| | import tempfile |
| |
|
| | try: |
| | from bs4 import BeautifulSoup |
| | HAS_BS4 = True |
| | except ImportError: |
| | HAS_BS4 = False |
| | print("⚠️ beautifulsoup4 미설치. pip install beautifulsoup4") |
| |
|
| | try: |
| | from groq import Groq |
| | HAS_GROQ = True |
| | except ImportError: |
| | HAS_GROQ = False |
| | print("⚠️ groq 미설치. pip install groq") |
| |
|
| | try: |
| | import PyPDF2 |
| | HAS_PYPDF2 = True |
| | except ImportError: |
| | HAS_PYPDF2 = False |
| | print("⚠️ PyPDF2 미설치. pip install PyPDF2") |
| |
|
| | try: |
| | from docx import Document as DocxDocument |
| | HAS_DOCX = True |
| | except ImportError: |
| | HAS_DOCX = False |
| | print("⚠️ python-docx 미설치. pip install python-docx") |
| |
|
| | |
| |
|
| | PERSISTENT_DIR = "/data" |
| | LOCAL_FALLBACK_DIR = "./data" |
| | BACKUP_INTERVAL_MINUTES = 30 |
| |
|
| | VECTOR_DIM = 384 |
| | MAX_CONCURRENT_USERS = 10 |
| | MAX_QUEUE_SIZE = 30 |
| | STATUS_UPDATE_RATE = 10 |
| |
|
| | MEMORY_CONFIG = { |
| | "short_term": {"max_items": 50, "ttl_hours": 24}, |
| | "mid_term": {"max_items": 200, "ttl_days": 30}, |
| | "long_term": {"max_items": 1000, "ttl_days": 365} |
| | } |
| |
|
| | BRAVE_SEARCH_PURPOSES = { |
| | "土": {"purpose": "목표 관련 최신 동향 및 전체 맥락", "query_prefix": "latest trends"}, |
| | "金": {"purpose": "팩트체크 및 반론 근거 검증", "query_prefix": "fact check verify"}, |
| | "水": {"purpose": "심층 자료 조사 및 근거 수집", "query_prefix": "research data evidence"}, |
| | "木": {"purpose": "유사 사례 및 영감 소스 탐색", "query_prefix": "innovative examples case study"}, |
| | "火": {"purpose": "구현 방법 및 기술 문서 검색", "query_prefix": "how to implement tutorial"} |
| | } |
| |
|
| | COMIC_CSS = """ |
| | @import url('https://fonts.googleapis.com/css2?family=Bangers&family=Comic+Neue:wght@400;700&family=Noto+Sans+KR:wght@400;700&display=swap'); |
| | .gradio-container { background-color: #FEF9C3 !important; background-image: radial-gradient(#1F2937 1px, transparent 1px) !important; background-size: 20px 20px !important; min-height: 100vh !important; font-family: 'Noto Sans KR', 'Comic Neue', cursive, sans-serif !important; } |
| | footer, .footer, .gradio-container footer, .built-with, [class*="footer"], .gradio-footer, a[href*="gradio.app"] { display: none !important; visibility: hidden !important; height: 0 !important; } |
| | .header-container { text-align: center; padding: 25px 20px; background: #3B82F6; border: 4px solid #1F2937; border-radius: 12px; margin-bottom: 20px; box-shadow: 8px 8px 0 #1F2937; } |
| | .header-title { font-family: 'Bangers', cursive !important; color: #FFF !important; font-size: 3rem !important; text-shadow: 3px 3px 0 #1F2937 !important; letter-spacing: 3px !important; margin: 0 !important; } |
| | .header-subtitle { font-family: 'Noto Sans KR', sans-serif !important; font-size: 1.1rem !important; color: #FEF9C3 !important; margin-top: 8px !important; font-weight: 700 !important; } |
| | .element-badge { display: inline-block; padding: 8px 16px; border-radius: 20px; font-size: 0.95rem; margin: 4px; font-weight: 700; border: 3px solid #1F2937; box-shadow: 3px 3px 0 #1F2937; font-family: 'Noto Sans KR', sans-serif !important; } |
| | .badge-earth { background: linear-gradient(135deg, #D2691E, #8B4513); color: #FFF; } |
| | .badge-metal { background: linear-gradient(135deg, #E8E8E8, #C0C0C0); color: #1F2937; } |
| | .badge-water { background: linear-gradient(135deg, #00BFFF, #1E90FF); color: #FFF; } |
| | .badge-wood { background: linear-gradient(135deg, #32CD32, #228B22); color: #FFF; } |
| | .badge-fire { background: linear-gradient(135deg, #FF6347, #FF4500); color: #FFF; } |
| | .gr-panel, .gr-box, .gr-form, .block, .gr-group { background: #FFF !important; border: 3px solid #1F2937 !important; border-radius: 8px !important; box-shadow: 5px 5px 0 #1F2937 !important; } |
| | .gr-button-primary, button.primary, .gr-button.primary { background: #EF4444 !important; border: 3px solid #1F2937 !important; border-radius: 8px !important; color: #FFF !important; font-family: 'Bangers', cursive !important; font-size: 1.2rem !important; letter-spacing: 2px !important; padding: 12px 24px !important; box-shadow: 4px 4px 0 #1F2937 !important; text-shadow: 1px 1px 0 #1F2937 !important; transition: all 0.2s !important; } |
| | .gr-button-primary:hover, button.primary:hover { background: #DC2626 !important; transform: translate(-2px, -2px) !important; box-shadow: 6px 6px 0 #1F2937 !important; } |
| | .gr-button-secondary, button.secondary { background: #10B981 !important; border: 3px solid #1F2937 !important; border-radius: 8px !important; color: #FFF !important; font-weight: 700 !important; } |
| | textarea, input[type="text"], input[type="number"] { background: #FFF !important; border: 3px solid #1F2937 !important; border-radius: 8px !important; color: #1F2937 !important; font-family: 'Noto Sans KR', sans-serif !important; font-weight: 700 !important; } |
| | textarea:focus, input[type="text"]:focus { border-color: #3B82F6 !important; box-shadow: 3px 3px 0 #3B82F6 !important; } |
| | .info-box { background: #FACC15 !important; border: 3px solid #1F2937 !important; border-radius: 8px !important; padding: 12px 15px !important; margin: 10px 0 !important; box-shadow: 4px 4px 0 #1F2937 !important; font-family: 'Noto Sans KR', sans-serif !important; font-weight: 700 !important; color: #1F2937 !important; } |
| | .final-report-box { background: #ECFDF5 !important; border: 4px solid #10B981 !important; border-radius: 12px !important; box-shadow: 6px 6px 0 #059669 !important; padding: 5px !important; } |
| | .orchestration-log textarea { background: #1F2937 !important; color: #10B981 !important; font-family: 'Courier New', monospace !important; border: 3px solid #374151 !important; border-radius: 8px !important; font-size: 0.85rem !important; line-height: 1.4 !important; } |
| | label, .gr-input-label, .gr-block-label { color: #1F2937 !important; font-family: 'Noto Sans KR', sans-serif !important; font-weight: 700 !important; } |
| | .gr-accordion { background: #E0F2FE !important; border: 3px solid #1F2937 !important; border-radius: 8px !important; box-shadow: 4px 4px 0 #1F2937 !important; } |
| | .tab-nav button { font-family: 'Noto Sans KR', sans-serif !important; font-weight: 700 !important; border: 2px solid #1F2937 !important; margin: 2px !important; background: #FFF !important; transition: all 0.2s !important; } |
| | .tab-nav button.selected { background: #3B82F6 !important; color: #FFF !important; box-shadow: 3px 3px 0 #1F2937 !important; } |
| | .footer-comic { text-align: center; padding: 20px; background: #3B82F6; border: 4px solid #1F2937; border-radius: 12px; margin-top: 20px; box-shadow: 6px 6px 0 #1F2937; } |
| | .footer-comic p { font-family: 'Noto Sans KR', sans-serif !important; color: #FFF !important; margin: 5px 0 !important; font-weight: 700 !important; } |
| | ::-webkit-scrollbar { width: 12px; height: 12px; } |
| | ::-webkit-scrollbar-track { background: #FEF9C3; border: 2px solid #1F2937; } |
| | ::-webkit-scrollbar-thumb { background: #3B82F6; border: 2px solid #1F2937; border-radius: 6px; } |
| | ::-webkit-scrollbar-thumb:hover { background: #EF4444; } |
| | ::selection { background: #FACC15; color: #1F2937; } |
| | .gr-slider input[type="range"] { accent-color: #3B82F6 !important; } |
| | .gr-json { background: #FFF !important; border: 3px solid #1F2937 !important; border-radius: 8px !important; } |
| | .gr-plot { background: #FFF !important; border: 3px solid #1F2937 !important; border-radius: 8px !important; box-shadow: 4px 4px 0 #1F2937 !important; } |
| | .huggingface-space-link, a[href*="huggingface.co/spaces"], button[class*="share"], .share-button, [class*="hf-logo"], .gr-share-btn, #hf-logo, .hf-icon, svg[class*="hf"], div[class*="huggingface"], a[class*="huggingface"], .svelte-1rjryqp, header a[href*="huggingface"], .space-header { display: none !important; visibility: hidden !important; opacity: 0 !important; pointer-events: none !important; } |
| | .gr-radio label { padding: 12px 16px !important; border: 2px solid #1F2937 !important; border-radius: 8px !important; margin: 4px 0 !important; background: #FFF !important; transition: all 0.2s ease !important; cursor: pointer !important; } |
| | .gr-radio label:hover { background: #FEF3C7 !important; transform: translateX(3px) !important; } |
| | .gr-radio input:checked + label { background: linear-gradient(135deg, #3B82F6, #1E40AF) !important; color: #FFF !important; box-shadow: 3px 3px 0 #1F2937 !important; } |
| | .gr-radio input:disabled + label { opacity: 0.5 !important; cursor: not-allowed !important; background: #E5E7EB !important; color: #9CA3AF !important; } |
| | .model-info-box { background: #FEF3C7 !important; border: 2px dashed #F59E0B !important; border-radius: 8px !important; padding: 8px 12px !important; font-size: 0.9rem !important; } |
| | .upload-box { background: #F0FDF4 !important; border: 3px dashed #10B981 !important; border-radius: 12px !important; padding: 20px !important; } |
| | .download-btn { background: #8B5CF6 !important; color: white !important; } |
| | """ |
| |
|
| | |
| |
|
| | class UncertaintyLevel(Enum): |
| | HIGH = "high" |
| | MEDIUM = "medium" |
| | LOW = "low" |
| |
|
| | @dataclass |
| | class QualityAssessment: |
| | factual_confidence: float |
| | logical_coherence: float |
| | completeness: float |
| | specificity: float |
| | overall_score: float |
| | uncertainty_flags: List[str] = field(default_factory=list) |
| | needs_verification: List[str] = field(default_factory=list) |
| | recommendations: List[str] = field(default_factory=list) |
| |
|
| | @dataclass |
| | class GoalClarity: |
| | clarity_score: float |
| | ambiguous_terms: List[str] |
| | missing_context: List[str] |
| | suggested_clarifications: List[str] |
| | is_actionable: bool |
| | goal_type: str = "general" |
| |
|
| | @dataclass |
| | class Memory: |
| | id: str |
| | content: str |
| | memory_type: str |
| | element: str |
| | goal_context: str |
| | importance: float |
| | access_count: int |
| | created_at: str |
| | last_accessed: str |
| | embedding: Optional[List[float]] = None |
| | metadata: Dict[str, Any] = field(default_factory=dict) |
| |
|
| | @dataclass |
| | class SearchResult: |
| | query: str |
| | element: str |
| | results: List[Dict] |
| | timestamp: str |
| |
|
| | @dataclass |
| | class Knowledge: |
| | id: str |
| | goal: str |
| | query: str |
| | element: str |
| | search_results: str |
| | agent_output: str |
| | final_result: str |
| | embedding: List[float] |
| | created_at: str |
| | access_count: int = 0 |
| |
|
| | |
| |
|
| | _STORAGE_BASE_PATH = None |
| |
|
| | def _try_enable_persistent_storage_via_api() -> bool: |
| | hf_token = os.getenv("HF_TOKEN") |
| | space_id = os.getenv("SPACE_ID") |
| | if not hf_token: |
| | print("⚠️ HF_TOKEN 환경변수 없음 - API 활성화 불가") |
| | return False |
| | if not space_id: |
| | print("⚠️ SPACE_ID 환경변수 없음 - HF Spaces 환경이 아닌 것 같습니다") |
| | return False |
| | print(f"🔄 Persistent Storage API 활성화 시도...") |
| | print(f" Space ID: {space_id}") |
| | try: |
| | from huggingface_hub import HfApi, SpaceStorage |
| | api = HfApi(token=hf_token) |
| | try: |
| | runtime = api.get_space_runtime(repo_id=space_id) |
| | current_storage = getattr(runtime, 'storage', None) |
| | if current_storage: |
| | print(f"✅ Persistent Storage 이미 활성화됨: {current_storage}") |
| | return True |
| | else: |
| | print("📦 Persistent Storage 비활성화 상태 - 활성화 시도...") |
| | except Exception as e: |
| | print(f"⚠️ Space 상태 확인 실패: {e}") |
| | api.request_space_storage(repo_id=space_id, storage=SpaceStorage.SMALL) |
| | print("✅ Persistent Storage SMALL 활성화 요청 완료!") |
| | print("⏳ Space가 재시작됩니다. 잠시 후 /data 디렉토리가 생성됩니다.") |
| | return True |
| | except ImportError: |
| | print("⚠️ huggingface_hub 라이브러리 없음") |
| | return False |
| | except Exception as e: |
| | error_msg = str(e) |
| | if "already" in error_msg.lower() or "exists" in error_msg.lower(): |
| | print(f"✅ Persistent Storage 이미 활성화됨") |
| | return True |
| | elif "payment" in error_msg.lower() or "billing" in error_msg.lower(): |
| | print(f"⚠️ 결제 정보 필요: {error_msg}") |
| | else: |
| | print(f"⚠️ API 활성화 실패: {error_msg}") |
| | return False |
| |
|
| | def _determine_storage_path() -> str: |
| | global _STORAGE_BASE_PATH |
| | if _STORAGE_BASE_PATH is not None: |
| | return _STORAGE_BASE_PATH |
| | print("\n" + "=" * 60) |
| | print("🔍 스토리지 초기화 중...") |
| | print("=" * 60) |
| | if os.path.exists(PERSISTENT_DIR): |
| | try: |
| | test_file = os.path.join(PERSISTENT_DIR, ".write_test") |
| | with open(test_file, "w") as f: |
| | f.write("test") |
| | os.remove(test_file) |
| | _STORAGE_BASE_PATH = PERSISTENT_DIR |
| | print(f"✅ HF Spaces 영구 스토리지 활성화: {PERSISTENT_DIR}") |
| | existing_files = [f for f in os.listdir(PERSISTENT_DIR) if f.endswith('.db') or f.endswith('.json')] |
| | if existing_files: |
| | print(f"📁 기존 파일 발견: {existing_files}") |
| | else: |
| | print("📁 기존 파일 없음 (새로운 스토리지)") |
| | print("=" * 60 + "\n") |
| | return _STORAGE_BASE_PATH |
| | except Exception as e: |
| | print(f"⚠️ /data 쓰기 테스트 실패: {e}") |
| | else: |
| | print(f"⚠️ {PERSISTENT_DIR} 디렉토리 없음") |
| | print("\n🚀 API로 Persistent Storage 활성화 시도...") |
| | if _try_enable_persistent_storage_via_api(): |
| | print("💡 API 요청 완료. Space 재시작 후 /data 사용 가능") |
| | os.makedirs(LOCAL_FALLBACK_DIR, exist_ok=True) |
| | _STORAGE_BASE_PATH = LOCAL_FALLBACK_DIR |
| | print(f"\n🟡 현재 로컬 스토리지 사용: {LOCAL_FALLBACK_DIR}") |
| | print(" (Persistent Storage 활성화 후 Space 재시작 필요)") |
| | print("=" * 60 + "\n") |
| | return _STORAGE_BASE_PATH |
| |
|
| | def get_persistent_path(filename: str) -> str: |
| | base_path = _determine_storage_path() |
| | return os.path.join(base_path, filename) |
| |
|
| | def get_storage_info() -> dict: |
| | base_path = _determine_storage_path() |
| | db_path = os.path.join(base_path, "soma_ohaeng.db") |
| | info = { |
| | "base_path": base_path, |
| | "is_persistent": base_path == PERSISTENT_DIR, |
| | "db_path": db_path, |
| | "db_exists": os.path.exists(db_path), |
| | "db_size": 0, |
| | "files": [] |
| | } |
| | if os.path.exists(db_path): |
| | info["db_size"] = os.path.getsize(db_path) |
| | if os.path.exists(base_path): |
| | info["files"] = [f for f in os.listdir(base_path) if not f.startswith('.')] |
| | return info |
| |
|
| | def ensure_persistent_storage(): |
| | base_path = _determine_storage_path() |
| | os.makedirs(base_path, exist_ok=True) |
| | return base_path == PERSISTENT_DIR |
| |
|
| | def migrate_to_persistent_storage(): |
| | base_path = _determine_storage_path() |
| | local_files = ["soma_ohaeng.db", "creat.json"] |
| | for filename in local_files: |
| | target_path = os.path.join(base_path, filename) |
| | if os.path.exists(target_path): |
| | size = os.path.getsize(target_path) |
| | print(f"📁 {filename} 이미 존재 ({size:,} bytes)") |
| | continue |
| | source_paths = [ |
| | filename, |
| | os.path.join("./data", filename), |
| | os.path.join("/tmp", filename), |
| | ] |
| | for source_path in source_paths: |
| | if os.path.exists(source_path) and source_path != target_path: |
| | try: |
| | shutil.copy2(source_path, target_path) |
| | print(f"✅ 마이그레이션: {source_path} → {target_path}") |
| | break |
| | except Exception as e: |
| | print(f"⚠️ 마이그레이션 실패 {source_path}: {e}") |
| |
|
| | def backup_database(): |
| | db_path = get_persistent_path("soma_ohaeng.db") |
| | if os.path.exists(db_path): |
| | backup_name = f"soma_ohaeng_backup_{datetime.now().strftime('%Y%m%d_%H%M')}.db" |
| | backup_path = get_persistent_path(backup_name) |
| | try: |
| | shutil.copy2(db_path, backup_path) |
| | print(f"✅ DB 백업 완료: {backup_path}") |
| | cleanup_old_backups() |
| | return True |
| | except Exception as e: |
| | print(f"⚠️ 백업 실패: {e}") |
| | return False |
| |
|
| | def cleanup_old_backups(): |
| | base_path = _determine_storage_path() |
| | try: |
| | backup_files = sorted([ |
| | f for f in os.listdir(base_path) |
| | if f.startswith("soma_ohaeng_backup_") and f.endswith(".db") |
| | ], reverse=True) |
| | for old_backup in backup_files[5:]: |
| | try: |
| | os.remove(os.path.join(base_path, old_backup)) |
| | print(f"🗑️ 오래된 백업 삭제: {old_backup}") |
| | except: |
| | pass |
| | except: |
| | pass |
| |
|
| | def verify_db_persistence(): |
| | db_path = get_persistent_path("soma_ohaeng.db") |
| | print(f"\n🔍 DB 영속성 검증:") |
| | print(f" 경로: {db_path}") |
| | print(f" 존재: {os.path.exists(db_path)}") |
| | if os.path.exists(db_path): |
| | print(f" 크기: {os.path.getsize(db_path):,} bytes") |
| | try: |
| | conn = sqlite3.connect(db_path) |
| | c = conn.cursor() |
| | c.execute("SELECT name FROM sqlite_master WHERE type='table'") |
| | tables = [row[0] for row in c.fetchall()] |
| | print(f" 테이블: {tables}") |
| | for table in tables: |
| | c.execute(f"SELECT COUNT(*) FROM {table}") |
| | count = c.fetchone()[0] |
| | print(f" - {table}: {count}개 레코드") |
| | conn.close() |
| | except Exception as e: |
| | print(f" DB 읽기 오류: {e}") |
| | print() |
| |
|
| | |
| | ensure_persistent_storage() |
| | migrate_to_persistent_storage() |
| | verify_db_persistence() |
| |
|
| | DB_PATH = get_persistent_path("soma_ohaeng.db") |
| | CREAT_JSON_PATH = get_persistent_path("creat.json") |
| |
|
| | |
| |
|
| | def ensure_string(value: Any) -> str: |
| | if value is None: |
| | return "" |
| | if isinstance(value, str): |
| | return value |
| | if isinstance(value, (list, dict)): |
| | return json.dumps(value, ensure_ascii=False) |
| | return str(value) |
| |
|
| | def safe_float(value: Any, default: float = 0.0) -> float: |
| | if value is None: |
| | return default |
| | try: |
| | return float(value) |
| | except (ValueError, TypeError): |
| | return default |
| |
|
| | |
| |
|
| | class TimeAwareness: |
| | @staticmethod |
| | def now() -> datetime: |
| | return datetime.now() |
| | |
| | @staticmethod |
| | def get_formatted_time() -> str: |
| | now = datetime.now() |
| | weekdays = ["월요일", "화요일", "수요일", "목요일", "금요일", "토요일", "일요일"] |
| | weekday = weekdays[now.weekday()] |
| | return now.strftime(f"%Y년 %m월 %d일 ({weekday}) %H:%M:%S") |
| | |
| | @staticmethod |
| | def get_context_time() -> Dict: |
| | now = datetime.now() |
| | hour = now.hour |
| | if 5 <= hour < 12: time_of_day, greeting = "오전", "좋은 아침입니다" |
| | elif 12 <= hour < 14: time_of_day, greeting = "점심", "점심 시간입니다" |
| | elif 14 <= hour < 18: time_of_day, greeting = "오후", "좋은 오후입니다" |
| | elif 18 <= hour < 22: time_of_day, greeting = "저녁", "좋은 저녁입니다" |
| | else: time_of_day, greeting = "밤", "밤늦게까지 수고하십니다" |
| | month = now.month |
| | if month <= 3: quarter, half = "1분기", "상반기" |
| | elif month <= 6: quarter, half = "2분기", "상반기" |
| | elif month <= 9: quarter, half = "3분기", "하반기" |
| | else: quarter, half = "4분기", "하반기" |
| | return { |
| | "datetime": now, "formatted": TimeAwareness.get_formatted_time(), |
| | "year": now.year, "month": now.month, "day": now.day, |
| | "hour": now.hour, "minute": now.minute, |
| | "weekday": now.strftime("%A"), |
| | "weekday_kr": ["월요일", "화요일", "수요일", "목요일", "금요일", "토요일", "일요일"][now.weekday()], |
| | "time_of_day": time_of_day, "greeting": greeting, |
| | "quarter": quarter, "half": half, "timestamp": now.timestamp() |
| | } |
| | |
| | @staticmethod |
| | def get_time_prompt() -> str: |
| | ctx = TimeAwareness.get_context_time() |
| | return f"""[현재 시간 정보] |
| | - 일시: {ctx['formatted']} |
| | - 시간대: {ctx['time_of_day']} |
| | - 분기: {ctx['year']}년 {ctx['quarter']} ({ctx['half']}) |
| | """ |
| |
|
| | |
| |
|
| | class FileProcessor: |
| | @staticmethod |
| | def extract_text_from_pdf(file_path: str) -> str: |
| | if not HAS_PYPDF2: |
| | return "[ERROR] PyPDF2가 설치되지 않았습니다. pip install PyPDF2" |
| | try: |
| | text_content = [] |
| | with open(file_path, 'rb') as file: |
| | pdf_reader = PyPDF2.PdfReader(file) |
| | for page_num, page in enumerate(pdf_reader.pages): |
| | page_text = page.extract_text() |
| | if page_text: |
| | text_content.append(f"[페이지 {page_num + 1}]\n{page_text}") |
| | return "\n\n".join(text_content) if text_content else "[PDF에서 텍스트를 추출할 수 없습니다]" |
| | except Exception as e: |
| | return f"[PDF 읽기 오류: {str(e)}]" |
| | |
| | @staticmethod |
| | def extract_text_from_docx(file_path: str) -> str: |
| | if not HAS_DOCX: |
| | return "[ERROR] python-docx가 설치되지 않았습니다. pip install python-docx" |
| | try: |
| | doc = DocxDocument(file_path) |
| | paragraphs = [para.text for para in doc.paragraphs if para.text.strip()] |
| | return "\n\n".join(paragraphs) if paragraphs else "[문서에서 텍스트를 추출할 수 없습니다]" |
| | except Exception as e: |
| | return f"[DOCX 읽기 오류: {str(e)}]" |
| | |
| | @staticmethod |
| | def extract_text_from_txt(file_path: str) -> str: |
| | try: |
| | with open(file_path, 'r', encoding='utf-8') as f: |
| | return f.read() |
| | except UnicodeDecodeError: |
| | try: |
| | with open(file_path, 'r', encoding='cp949') as f: |
| | return f.read() |
| | except: |
| | return "[텍스트 파일 인코딩 오류]" |
| | except Exception as e: |
| | return f"[파일 읽기 오류: {str(e)}]" |
| | |
| | @staticmethod |
| | def process_uploaded_file(file) -> Tuple[str, str]: |
| | if file is None: |
| | return "", "" |
| | file_path = file.name if hasattr(file, 'name') else str(file) |
| | file_name = os.path.basename(file_path) |
| | file_ext = os.path.splitext(file_name)[1].lower() |
| | if file_ext == '.pdf': |
| | text = FileProcessor.extract_text_from_pdf(file_path) |
| | file_info = f"📄 PDF 파일: {file_name}" |
| | elif file_ext in ['.docx', '.doc']: |
| | text = FileProcessor.extract_text_from_docx(file_path) |
| | file_info = f"📝 Word 문서: {file_name}" |
| | elif file_ext in ['.txt', '.md', '.csv']: |
| | text = FileProcessor.extract_text_from_txt(file_path) |
| | file_info = f"📃 텍스트 파일: {file_name}" |
| | elif file_ext == '.json': |
| | text = FileProcessor.extract_text_from_txt(file_path) |
| | file_info = f"📋 JSON 파일: {file_name}" |
| | else: |
| | text = f"[지원하지 않는 파일 형식: {file_ext}]" |
| | file_info = f"❌ 지원 불가: {file_name}" |
| | if len(text) > 50000: |
| | text = text[:50000] + f"\n\n[... 총 {len(text):,}자 중 50,000자만 표시 ...]" |
| | return text, file_info |
| |
|
| | |
| |
|
| | class ExportManager: |
| | @staticmethod |
| | def export_to_markdown(report: str, goal: str) -> str: |
| | timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") |
| | filename = f"aether_report_{timestamp}.md" |
| | filepath = os.path.join(tempfile.gettempdir(), filename) |
| | content = f"""# AETHER Proto-AGI 분석 보고서 |
| | **생성 일시**: {datetime.now().strftime("%Y년 %m월 %d일 %H:%M:%S")} |
| | **분석 목표**: {goal} |
| | --- |
| | {report} |
| | --- |
| | *이 보고서는 AETHER Proto-AGI (SOMA 오행 순환 · SLAI 자기학습 · MAIA 창발)에 의해 자동 생성되었습니다.* |
| | """ |
| | with open(filepath, 'w', encoding='utf-8') as f: |
| | f.write(content) |
| | return filepath |
| | |
| | @staticmethod |
| | def export_to_json(report: str, goal: str, log: str, element_outputs: Dict) -> str: |
| | timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") |
| | filename = f"aether_report_{timestamp}.json" |
| | filepath = os.path.join(tempfile.gettempdir(), filename) |
| | data = { |
| | "meta": { |
| | "generator": "AETHER Proto-AGI", |
| | "version": "2.2", |
| | "timestamp": datetime.now().isoformat(), |
| | "goal": goal |
| | }, |
| | "report": report, |
| | "orchestration_log": log, |
| | "element_outputs": element_outputs, |
| | "analysis_summary": { |
| | "total_elements": len(element_outputs), |
| | "elements_processed": list(element_outputs.keys()) |
| | } |
| | } |
| | with open(filepath, 'w', encoding='utf-8') as f: |
| | json.dump(data, f, ensure_ascii=False, indent=2) |
| | return filepath |
| | |
| | @staticmethod |
| | def export_log_to_txt(log: str) -> str: |
| | timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") |
| | filename = f"aether_log_{timestamp}.txt" |
| | filepath = os.path.join(tempfile.gettempdir(), filename) |
| | with open(filepath, 'w', encoding='utf-8') as f: |
| | f.write(f"AETHER Proto-AGI 오케스트레이션 로그\n") |
| | f.write(f"생성 시간: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n") |
| | f.write("=" * 60 + "\n\n") |
| | f.write(log) |
| | return filepath |
| |
|
| | |
| |
|
| | class OutputFormatter: |
| | ELEMENT_STYLES = { |
| | "土": {"emoji": "🟤", "name": "감독", "icon": "🏛️"}, |
| | "金": {"emoji": "⚪", "name": "비평", "icon": "⚔️"}, |
| | "水": {"emoji": "🔵", "name": "리서치", "icon": "🔬"}, |
| | "木": {"emoji": "🟢", "name": "창발", "icon": "💡"}, |
| | "火": {"emoji": "🔴", "name": "실행", "icon": "🚀"} |
| | } |
| | |
| | @staticmethod |
| | def header(title: str, emoji: str = "🌀") -> str: |
| | return f"\n{'━' * 50}\n {emoji} {title}\n{'━' * 50}\n" |
| | |
| | @staticmethod |
| | def element_header(element_name: str) -> str: |
| | style = OutputFormatter.ELEMENT_STYLES.get(element_name, {}) |
| | emoji = style.get("emoji", "●") |
| | name = style.get("name", element_name) |
| | icon = style.get("icon", "") |
| | return f"\n┏━━ {emoji} {element_name}({name}) {icon} ━━┓\n" |
| | |
| | @staticmethod |
| | def search_result(count: int, cached: bool = False) -> str: |
| | status = "📦캐시" if cached else "🔍검색" |
| | bar = "█" * min(count, 10) + "░" * (10 - min(count, 10)) |
| | return f" {status} │{bar}│ {count}건\n" |
| | |
| | @staticmethod |
| | def satisfaction_bar(score: float) -> str: |
| | filled = int(score * 20) |
| | bar = "█" * filled + "░" * (20 - filled) |
| | pct = score * 100 |
| | if score >= 0.85: emoji, status = "🎯", "달성" |
| | elif score >= 0.7: emoji, status = "📈", "양호" |
| | elif score >= 0.5: emoji, status = "🔄", "진행" |
| | else: emoji, status = "⚠️", "개선" |
| | return f" {emoji} 만족도: {bar} {pct:.0f}% ({status})\n" |
| | |
| | @staticmethod |
| | def cycle_start(iteration: int, time_str: str) -> str: |
| | return f"\n╔══ 🔄 순환 #{iteration} ══╗\n║ ⏰ {time_str}\n╚{'═' * 30}╝\n" |
| | |
| | @staticmethod |
| | def compact_json(parsed: Dict, element_name: str) -> str: |
| | lines = [] |
| | style = OutputFormatter.ELEMENT_STYLES.get(element_name, {}) |
| | emoji = style.get("emoji", "●") |
| | if element_name == "土": |
| | if "assessment" in parsed: |
| | assessment = str(parsed['assessment'])[:200] |
| | lines.append(f" {emoji} 평가: {assessment}...") |
| | if "direction" in parsed: |
| | lines.append(f" → 방향: {str(parsed['direction'])[:150]}") |
| | if "fact_check" in parsed: |
| | lines.append(f" ✓ 팩트: {str(parsed['fact_check'])[:100]}") |
| | elif element_name == "金": |
| | if "critiques" in parsed and parsed["critiques"]: |
| | lines.append(f" {emoji} 비판점 {len(parsed['critiques'])}개:") |
| | for c in parsed["critiques"][:2]: |
| | lines.append(f" • {str(c)[:80]}") |
| | if "risks" in parsed and parsed["risks"]: |
| | lines.append(f" ⚠️ 리스크 {len(parsed['risks'])}개:") |
| | for r in parsed["risks"][:2]: |
| | lines.append(f" • {str(r)[:80]}") |
| | if "verified_facts" in parsed and parsed["verified_facts"]: |
| | lines.append(f" ✅ 검증 {len(parsed['verified_facts'])}개") |
| | elif element_name == "水": |
| | if "findings" in parsed and parsed["findings"]: |
| | lines.append(f" {emoji} 발견 {len(parsed['findings'])}건:") |
| | for f in parsed["findings"][:3]: |
| | lines.append(f" • {str(f)[:80]}") |
| | if "evidence" in parsed and parsed["evidence"]: |
| | lines.append(f" 📚 근거 {len(parsed['evidence'])}건") |
| | if "gaps" in parsed and parsed["gaps"]: |
| | lines.append(f" ❓ 추가조사 {len(parsed['gaps'])}건") |
| | elif element_name == "木": |
| | if "ideas" in parsed and parsed["ideas"]: |
| | lines.append(f" {emoji} 아이디어 {len(parsed['ideas'])}개 생성") |
| | if "selected_top3" in parsed and parsed["selected_top3"]: |
| | lines.append(f" 🏆 TOP 3:") |
| | for i, idea in enumerate(parsed["selected_top3"][:3], 1): |
| | lines.append(f" {i}. {str(idea)[:70]}...") |
| | if "novel_connections" in parsed and parsed["novel_connections"]: |
| | lines.append(f" 🔗 새로운 연결 {len(parsed['novel_connections'])}개") |
| | elif element_name == "火": |
| | if "result" in parsed: |
| | result = str(parsed['result'])[:300] |
| | lines.append(f" {emoji} 결과:") |
| | for i, chunk in enumerate(result.split('. ')[:4]): |
| | if chunk.strip(): |
| | lines.append(f" {chunk.strip()}.") |
| | if "deliverables" in parsed and parsed["deliverables"]: |
| | lines.append(f" 📦 산출물 {len(parsed['deliverables'])}개:") |
| | for d in parsed["deliverables"][:3]: |
| | lines.append(f" • {str(d)[:60]}") |
| | if "implementation" in parsed: |
| | lines.append(f" 🔧 구현: {str(parsed['implementation'])[:100]}...") |
| | return "\n".join(lines) + "\n" if lines else "" |
| | |
| | @staticmethod |
| | def final_marker() -> str: |
| | return "\n" + "═" * 50 + "\n 🎉 목표 달성 완료!\n" + "═" * 50 + "\n" |
| |
|
| | |
| |
|
| | class ReportGenerator: |
| | @staticmethod |
| | def generate(state, stats: Dict) -> str: |
| | goal = state.goal |
| | score = state.satisfaction_score |
| | iterations = state.iteration |
| | element_outputs = state.element_outputs |
| | earth_data = element_outputs.get("土", {}) |
| | metal_data = element_outputs.get("金", {}) |
| | water_data = element_outputs.get("水", {}) |
| | wood_data = element_outputs.get("木", {}) |
| | fire_data = element_outputs.get("火", {}) |
| | main_result = fire_data.get("result", "") |
| | key_insight = wood_data.get("key_insight", wood_data.get("reframe", "")) |
| | report = f"""# 🎯 분석 결과 |
| | --- |
| | ## 📋 질문 |
| | > **{goal}** |
| | --- |
| | ## 💡 핵심 답변 |
| | """ |
| | if main_result: |
| | report += f"{main_result}\n\n" |
| | else: |
| | assessment = earth_data.get("assessment", "") |
| | if assessment: |
| | report += f"{assessment}\n\n" |
| | else: |
| | report += "*분석이 완료되지 않았습니다. 순환 횟수를 늘려 다시 시도해주세요.*\n\n" |
| | if key_insight: |
| | report += f"""--- |
| | ## 🔑 핵심 인사이트 |
| | {key_insight} |
| | """ |
| | top3 = wood_data.get("selected_top3", []) |
| | if top3: |
| | report += """--- |
| | ## 💡 주요 아이디어 |
| | """ |
| | for i, idea in enumerate(top3[:3], 1): |
| | report += f"**{i}.** {idea}\n\n" |
| | verified = metal_data.get("verified_facts", []) |
| | if verified: |
| | report += """--- |
| | ## ✅ 검증된 사실 |
| | """ |
| | for fact in verified[:5]: |
| | report += f"- {fact}\n" |
| | report += "\n" |
| | risks = metal_data.get("risks", []) |
| | if risks: |
| | report += """--- |
| | ## ⚠️ 주요 리스크 |
| | """ |
| | for risk in risks[:3]: |
| | report += f"- {risk}\n" |
| | report += "\n" |
| | findings = water_data.get("findings", []) |
| | if findings: |
| | report += """--- |
| | ## 📊 주요 발견 |
| | """ |
| | for finding in findings[:5]: |
| | report += f"- {finding}\n" |
| | report += "\n" |
| | if hasattr(state, 'metacog_assessments') and state.metacog_assessments: |
| | avg_quality = sum(a.overall_score for a in state.metacog_assessments) / len(state.metacog_assessments) |
| | report += f"""--- |
| | ## 🧠 메타인지 품질 평가 |
| | | 지표 | 평균 점수 | |
| | |------|-----------| |
| | | 종합 품질 | {'█' * int(avg_quality * 10)}{'░' * (10 - int(avg_quality * 10))} {avg_quality:.0%} | |
| | | 분석 단계 | {len(state.metacog_assessments)}회 평가 | |
| | """ |
| | report += f"""--- |
| | ## 📊 분석 정보 |
| | | 항목 | 값 | |
| | |------|-----| |
| | | 분석 신뢰도 | {'█' * int(score * 10)}{'░' * (10 - int(score * 10))} {score:.0%} | |
| | | 분석 순환 | {iterations}회 | |
| | | 세션 ID | `{state.session_id}` | |
| | | 생성 시간 | {TimeAwareness.get_formatted_time()} | |
| | """ |
| | return report |
| | |
| | @staticmethod |
| | def generate_progress(state) -> str: |
| | progress_bar = '█' * int(state.satisfaction_score * 10) + '░' * (10 - int(state.satisfaction_score * 10)) |
| | current_element = "" |
| | if state.history: |
| | last = state.history[-1] |
| | elem_name = last.get("element", "") |
| | elem_map = { |
| | "土": "🟤 土 감독", "金": "⚪ 金 비평", "水": "🔵 水 리서치", |
| | "木": "🟢 木 창발", "火": "🔴 火 실행" |
| | } |
| | current_element = elem_map.get(elem_name, elem_name) |
| | return f"""# ⏳ 분석 진행 중... |
| | ## 📋 질문 |
| | > **{state.goal}** |
| | ## 📊 현재 상태 |
| | | 항목 | 상태 | |
| | |------|------| |
| | | 진행 순환 | {state.iteration}회 | |
| | | 현재 단계 | {current_element} | |
| | | 진행도 | {progress_bar} {state.satisfaction_score:.0%} | |
| | --- |
| | 💡 **오케스트레이션 로그**를 펼쳐서 실시간 분석 과정을 확인하세요. |
| | *土(감독) → 金(비평) → 水(리서치) → 木(창발) → 火(실행) 순환 중...* |
| | """ |
| |
|
| | |
| |
|
| | class MetaCognition: |
| | UNCERTAINTY_MARKERS = [ |
| | "아마", "maybe", "perhaps", "possibly", "might", "could be", |
| | "불확실", "uncertain", "unclear", "추측", "guess", "assume", |
| | "~일 수 있", "~할 수도", "정확하지 않", "확인 필요", |
| | "것 같", "듯하", "보임", "추정" |
| | ] |
| | HEDGE_WORDS = [ |
| | "일반적으로", "보통", "대체로", "주로", "often", "usually", |
| | "typically", "generally", "sometimes", "occasionally", |
| | "대부분", "많은 경우" |
| | ] |
| | FACTUAL_INDICATORS = [ |
| | r'\d{4}년', r'\d+%', r'\d+억', r'\d+조', r'\$\d+', |
| | r'[A-Z]{2,}', r'「.+」', r'".+"', r'\[출처\]', |
| | r'\d+월', r'\d+일', r'약 \d+', r'총 \d+' |
| | ] |
| | LOGIC_CONNECTORS = [ |
| | "따라서", "그러므로", "때문에", "결과적으로", "왜냐하면", |
| | "therefore", "because", "consequently", "thus", "hence", |
| | "이로 인해", "그 결과", "이에 따라" |
| | ] |
| | ABSTRACT_WARNINGS = [ |
| | "플랫폼", "시스템", "프레임워크", "프로그램", "메커니즘", |
| | "글로벌 거버넌스", "국제 협력", "전략적 파트너십", |
| | "~를 개발", "~를 구축", "~를 설계" |
| | ] |
| | |
| | def __init__(self): |
| | self.confidence_history: Dict[str, List[float]] = {} |
| | self.failure_patterns: List[Dict] = [] |
| | self.domain_confidence: Dict[str, float] = {} |
| | self.uncertainty_threshold = 0.35 |
| | self.min_specificity_threshold = 0.4 |
| | |
| | def assess_response_quality(self, response: str, goal: str, context: Dict = None) -> QualityAssessment: |
| | if not response or not response.strip(): |
| | return QualityAssessment( |
| | factual_confidence=0.0, logical_coherence=0.0, |
| | completeness=0.0, specificity=0.0, overall_score=0.0, |
| | uncertainty_flags=["빈 응답"], |
| | needs_verification=["전체 재생성 필요"], |
| | recommendations=["응답 재생성 필요"] |
| | ) |
| | factual = self._assess_factual_grounding(response) |
| | logic = self._assess_logical_coherence(response) |
| | complete = self._assess_completeness(response, goal) |
| | specific = self._assess_specificity(response) |
| | abstract_penalty = self._check_abstract_expressions(response) |
| | uncertainties = self._identify_uncertainties(response) |
| | verifications = self._identify_verification_needs(response, goal) |
| | weights = {"factual": 0.3, "logic": 0.2, "complete": 0.3, "specific": 0.2} |
| | overall = (factual * weights["factual"] + |
| | logic * weights["logic"] + |
| | complete * weights["complete"] + |
| | specific * weights["specific"]) |
| | if len(uncertainties) > 3: |
| | overall *= 0.85 |
| | overall *= (1 - abstract_penalty * 0.15) |
| | recommendations = self._generate_recommendations( |
| | factual, logic, complete, specific, uncertainties, abstract_penalty |
| | ) |
| | return QualityAssessment( |
| | factual_confidence=round(factual, 3), |
| | logical_coherence=round(logic, 3), |
| | completeness=round(complete, 3), |
| | specificity=round(specific, 3), |
| | overall_score=round(max(0, min(1, overall)), 3), |
| | uncertainty_flags=uncertainties, |
| | needs_verification=verifications, |
| | recommendations=recommendations |
| | ) |
| | |
| | def _assess_factual_grounding(self, response: str) -> float: |
| | score = 0.3 |
| | for pattern in self.FACTUAL_INDICATORS: |
| | matches = re.findall(pattern, response) |
| | score += min(len(matches) * 0.05, 0.25) |
| | source_patterns = [r'\[출처[:\s]', r'에 따르면', r'보도에 의하면', r'발표한 바', r'연구에 따르면'] |
| | for pattern in source_patterns: |
| | if re.search(pattern, response): |
| | score += 0.1 |
| | uncertainty_count = sum(1 for marker in self.UNCERTAINTY_MARKERS if marker in response.lower()) |
| | score -= uncertainty_count * 0.05 |
| | return max(0.0, min(1.0, score)) |
| | |
| | def _assess_logical_coherence(self, response: str) -> float: |
| | score = 0.5 |
| | connector_count = sum(1 for conn in self.LOGIC_CONNECTORS if conn in response) |
| | score += min(connector_count * 0.08, 0.25) |
| | sentences = re.split(r'[.!?。]', response) |
| | if len(sentences) > 3: |
| | score += 0.1 |
| | structure_patterns = [r'첫째|둘째|셋째', r'1\.|2\.|3\.', r'###\s', r'\*\*[^*]+\*\*', r'\|.*\|'] |
| | for pattern in structure_patterns: |
| | if re.search(pattern, response): |
| | score += 0.05 |
| | contradictions = self._detect_contradictions(response) |
| | score -= len(contradictions) * 0.15 |
| | return max(0.0, min(1.0, score)) |
| | |
| | def _detect_contradictions(self, response: str) -> List[str]: |
| | contradictions = [] |
| | contradiction_pairs = [ |
| | (r'증가.{0,30}감소', "증가/감소 모순"), |
| | (r'성공.{0,30}실패', "성공/실패 모순"), |
| | (r'긍정.{0,30}부정', "긍정/부정 모순"), |
| | (r'불가능.{0,20}가능', "가능/불가능 모순"), |
| | (r'강화.{0,20}약화', "강화/약화 모순"), |
| | ] |
| | for pattern, desc in contradiction_pairs: |
| | if re.search(pattern, response): |
| | contradictions.append(desc) |
| | return contradictions |
| | |
| | def _assess_completeness(self, response: str, goal: str) -> float: |
| | goal_keywords = self._extract_keywords(goal) |
| | if not goal_keywords: |
| | return 0.5 |
| | response_lower = response.lower() |
| | covered = sum(1 for kw in goal_keywords if kw.lower() in response_lower) |
| | coverage_ratio = covered / len(goal_keywords) if goal_keywords else 0 |
| | length_score = min(len(response) / 1500, 1.0) * 0.3 |
| | return min(1.0, coverage_ratio * 0.7 + length_score) |
| | |
| | def _extract_keywords(self, text: str) -> List[str]: |
| | stopwords = { |
| | '의', '가', '이', '은', '들', '는', '좀', '잘', '를', '을', '로', '으로', |
| | '에', '와', '과', '도', '만', '라', '하다', '있다', '되다', '이다', '그', |
| | '저', '것', '수', '등', '더', '때', 'the', 'a', 'an', 'is', 'are', 'was', |
| | 'were', 'be', 'been', 'being', 'have', 'has', 'had', 'do', 'does', 'did', |
| | 'will', 'would', 'could', 'should', 'may', 'might', 'must', 'shall', 'can', |
| | 'need', 'to', 'of', 'in', 'for', 'on', 'with', 'at', 'by', 'from', 'as', |
| | 'what', 'which', 'who', 'how', 'why', 'when', 'where' |
| | } |
| | words = re.findall(r'[가-힣a-zA-Z0-9]+', text) |
| | keywords = [w for w in words if len(w) > 1 and w.lower() not in stopwords] |
| | return keywords[:15] |
| | |
| | def _assess_specificity(self, response: str) -> float: |
| | score = 0.2 |
| | number_matches = re.findall(r'\d+(?:\.\d+)?(?:%|억|만|천|조|달러|원)?', response) |
| | score += min(len(number_matches) * 0.04, 0.25) |
| | proper_nouns = re.findall(r'[A-Z][a-z]+(?:\s[A-Z][a-z]+)*', response) |
| | korean_proper = re.findall( |
| | r'(?:미국|중국|일본|한국|유럽|러시아|인도|독일|영국|프랑스|' |
| | r'삼성|애플|구글|마이크로소프트|테슬라|아마존|화웨이|TSMC|엔비디아|' |
| | r'트럼프|바이든|시진핑|푸틴|OpenAI|Anthropic)', response |
| | ) |
| | score += min((len(proper_nouns) + len(korean_proper)) * 0.03, 0.25) |
| | date_patterns = re.findall(r'\d{4}년|\d{1,2}월|\d{1,2}일|20\d{2}', response) |
| | score += min(len(date_patterns) * 0.05, 0.15) |
| | vague_terms = ['것', '등', '여러', '다양한', '많은', '일부', '어떤', 'something', 'various', 'many', 'some'] |
| | vague_count = sum(1 for term in vague_terms if term in response) |
| | score -= vague_count * 0.03 |
| | return max(0.0, min(1.0, score)) |
| | |
| | def _check_abstract_expressions(self, response: str) -> float: |
| | count = 0 |
| | for expr in self.ABSTRACT_WARNINGS: |
| | if expr in response: |
| | count += 1 |
| | return min(count / 5, 1.0) |
| | |
| | def _identify_uncertainties(self, response: str) -> List[str]: |
| | uncertainties = [] |
| | response_lower = response.lower() |
| | for marker in self.UNCERTAINTY_MARKERS: |
| | if marker in response_lower: |
| | idx = response_lower.find(marker) |
| | context = response[max(0, idx-20):min(len(response), idx+50)] |
| | uncertainties.append(f"불확실: '{marker}' - ...{context}...") |
| | if len(uncertainties) >= 5: |
| | break |
| | return uncertainties |
| | |
| | def _identify_verification_needs(self, response: str, goal: str) -> List[str]: |
| | needs = [] |
| | numbers = re.findall(r'\d+(?:\.\d+)?(?:%|억|조)', response) |
| | if numbers: |
| | needs.append(f"숫자 데이터 검증 필요: {', '.join(numbers[:3])}") |
| | quotes = re.findall(r'"([^"]+)"', response) |
| | if quotes: |
| | needs.append(f"인용문 검증 필요: {len(quotes)}개") |
| | return needs[:5] |
| | |
| | def _generate_recommendations(self, factual: float, logic: float, |
| | complete: float, specific: float, |
| | uncertainties: List[str], |
| | abstract_penalty: float) -> List[str]: |
| | recommendations = [] |
| | if factual < 0.5: |
| | recommendations.append("검색 결과에서 구체적 사실/수치 인용 강화") |
| | if logic < 0.5: |
| | recommendations.append("논리 연결어 사용하여 인과관계 명확화") |
| | if complete < 0.5: |
| | recommendations.append("목표의 모든 측면에 대한 답변 보완") |
| | if specific < 0.4: |
| | recommendations.append("국가명, 기업명, 수치 등 구체적 정보 추가") |
| | if len(uncertainties) > 2: |
| | recommendations.append("불확실한 표현 대신 검증된 사실로 대체") |
| | if abstract_penalty > 0.3: |
| | recommendations.append("'플랫폼', '시스템' 등 추상적 용어 대신 구체적 방안 제시") |
| | return recommendations[:5] |
| | |
| | def analyze_goal_clarity(self, goal: str) -> GoalClarity: |
| | score = 0.5 |
| | ambiguous = [] |
| | missing = [] |
| | suggestions = [] |
| | ambiguous_patterns = [ |
| | ('좋은', "좋은의 기준 모호"), |
| | ('최적', "최적의 기준 모호"), |
| | ('효과적', "효과적의 기준 모호"), |
| | ('적절한', "적절함의 기준 모호"), |
| | ('더 나은', "비교 대상 불명확"), |
| | ] |
| | for term, reason in ambiguous_patterns: |
| | if term in goal: |
| | ambiguous.append(reason) |
| | score -= 0.1 |
| | if not re.search(r'\d{4}|20\d{2}', goal): |
| | missing.append("시간 범위") |
| | suggestions.append("시간 범위를 명시해주세요 (예: 2025년, 향후 5년)") |
| | if len(goal) < 20: |
| | missing.append("상세 맥락") |
| | suggestions.append("구체적인 맥락이나 조건을 추가해주세요") |
| | goal_type = self._detect_goal_type(goal) |
| | if goal_type in ["prediction", "strategy", "comparison", "analysis"]: |
| | score += 0.1 |
| | is_actionable = score >= 0.5 and len(missing) <= 2 |
| | return GoalClarity( |
| | clarity_score=round(max(0, min(1, score)), 3), |
| | ambiguous_terms=ambiguous, |
| | missing_context=missing, |
| | suggested_clarifications=suggestions[:5], |
| | is_actionable=is_actionable, |
| | goal_type=goal_type |
| | ) |
| | |
| | def _detect_goal_type(self, goal: str) -> str: |
| | goal_lower = goal.lower() |
| | if any(kw in goal_lower for kw in ['예측', '전망', '될까', '될 것', '미래', '2025', '2026', '2027', '2028']): |
| | return "prediction" |
| | elif any(kw in goal_lower for kw in ['전략', '방법', '어떻게', '방안', '계획', '수립']): |
| | return "strategy" |
| | elif any(kw in goal_lower for kw in ['비교', '차이', 'vs', 'VS', '대', '어느 것']): |
| | return "comparison" |
| | elif any(kw in goal_lower for kw in ['분석', '왜', '원인', '영향', '현황']): |
| | return "analysis" |
| | elif any(kw in goal_lower for kw in ['특허', '발명', '아이디어', '혁신']): |
| | return "invention" |
| | elif any(kw in goal_lower for kw in ['소설', '스토리', '시나리오', '웹툰']): |
| | return "story" |
| | elif any(kw in goal_lower for kw in ['요리', '레시피', '음식']): |
| | return "recipe" |
| | else: |
| | return "general" |
| | |
| | def should_ask_clarification(self, goal: str) -> Tuple[bool, Optional[str]]: |
| | clarity = self.analyze_goal_clarity(goal) |
| | if clarity.clarity_score < self.uncertainty_threshold: |
| | return True, self._build_clarification_message(clarity) |
| | if not clarity.is_actionable: |
| | return True, self._build_clarification_message(clarity) |
| | return False, None |
| | |
| | def _build_clarification_message(self, clarity: GoalClarity) -> str: |
| | msg_parts = ["🤔 목표를 더 명확히 해주시면 더 좋은 분석이 가능합니다:\n"] |
| | if clarity.ambiguous_terms: |
| | msg_parts.append(f"• 모호한 표현: {', '.join(clarity.ambiguous_terms[:3])}") |
| | if clarity.missing_context: |
| | msg_parts.append(f"• 누락된 맥락: {', '.join(clarity.missing_context[:3])}") |
| | if clarity.suggested_clarifications: |
| | msg_parts.append("\n💡 제안:") |
| | for i, sugg in enumerate(clarity.suggested_clarifications[:3], 1): |
| | msg_parts.append(f" {i}. {sugg}") |
| | return "\n".join(msg_parts) |
| | |
| | def format_assessment_for_log(self, assessment: QualityAssessment) -> str: |
| | lines = [ |
| | "┌─ 🧠 메타인지 평가 ─┐", |
| | f"│ 사실근거: {'█' * int(assessment.factual_confidence * 10)}{'░' * (10 - int(assessment.factual_confidence * 10))} {assessment.factual_confidence:.0%}", |
| | f"│ 논리성: {'█' * int(assessment.logical_coherence * 10)}{'░' * (10 - int(assessment.logical_coherence * 10))} {assessment.logical_coherence:.0%}", |
| | f"│ 완성도: {'█' * int(assessment.completeness * 10)}{'░' * (10 - int(assessment.completeness * 10))} {assessment.completeness:.0%}", |
| | f"│ 구체성: {'█' * int(assessment.specificity * 10)}{'░' * (10 - int(assessment.specificity * 10))} {assessment.specificity:.0%}", |
| | f"│ ────────────────", |
| | f"│ 종합: {'█' * int(assessment.overall_score * 10)}{'░' * (10 - int(assessment.overall_score * 10))} {assessment.overall_score:.0%}", |
| | ] |
| | if assessment.uncertainty_flags: |
| | lines.append(f"│ ⚠️ 불확실: {len(assessment.uncertainty_flags)}건") |
| | if assessment.recommendations: |
| | lines.append(f"│ 💡 권장: {assessment.recommendations[0][:25]}...") |
| | lines.append("└────────────────────┘") |
| | return "\n".join(lines) |
| |
|
| | |
| |
|
| | class BraveSearchClient: |
| | def __init__(self): |
| | self.api_key = os.getenv("BRAVE_API_KEY") |
| | self.base_url = "https://api.search.brave.com/res/v1/web/search" |
| | |
| | def search(self, query: str, element: str, count: int = 5) -> Dict: |
| | if not self.api_key: |
| | return {"success": False, "error": "BRAVE_API_KEY 환경변수가 설정되지 않았습니다.", "results": []} |
| | purpose_info = BRAVE_SEARCH_PURPOSES.get(element, {}) |
| | query_prefix = purpose_info.get("query_prefix", "") |
| | optimized_query = f"{query_prefix} {query}".strip() |
| | headers = { |
| | "Accept": "application/json", |
| | "Accept-Encoding": "gzip", |
| | "X-Subscription-Token": self.api_key |
| | } |
| | params = {"q": optimized_query, "count": count, "text_decorations": False, "search_lang": "ko"} |
| | try: |
| | response = requests.get(self.base_url, headers=headers, params=params, timeout=10) |
| | response.raise_for_status() |
| | data = response.json() |
| | results = [] |
| | for item in data.get("web", {}).get("results", []): |
| | results.append({ |
| | "title": item.get("title", ""), |
| | "url": item.get("url", ""), |
| | "description": item.get("description", ""), |
| | "age": item.get("age", "") |
| | }) |
| | return { |
| | "success": True, "query": optimized_query, "element": element, |
| | "purpose": purpose_info.get("purpose", ""), "results": results, "count": len(results) |
| | } |
| | except requests.exceptions.RequestException as e: |
| | return {"success": False, "error": str(e), "results": []} |
| | |
| | def format_for_prompt(self, search_result: Dict) -> str: |
| | if not search_result.get("success") or not search_result.get("results"): |
| | return "[검색 결과 없음]" |
| | lines = [f"[Brave Search - {search_result.get('purpose', '')}]", f"쿼리: {search_result.get('query', '')}", ""] |
| | for i, result in enumerate(search_result.get("results", [])[:5], 1): |
| | lines.append(f"{i}. **{result.get('title', '')}**") |
| | lines.append(f" {result.get('description', '')[:200]}...") |
| | lines.append(f" 출처: {result.get('url', '')}") |
| | lines.append("") |
| | return "\n".join(lines) |
| |
|
| | |
| |
|
| | class URLCrawler: |
| | def __init__(self): |
| | self.session = requests.Session() |
| | self.session.headers.update({ |
| | "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36", |
| | "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8", |
| | "Accept-Language": "ko-KR,ko;q=0.9,en-US;q=0.8,en;q=0.7" |
| | }) |
| | |
| | @staticmethod |
| | def extract_urls(text: str) -> List[str]: |
| | full_url_pattern = r'(?:https?://)?(?:www\.)?(?:[a-zA-Z0-9][-a-zA-Z0-9]*\.)+[a-zA-Z]{2,}(?:/[^\s]*)?' |
| | urls = re.findall(full_url_pattern, text) |
| | cleaned_urls = [] |
| | for url in urls: |
| | url = url.strip('.,;:!?()') |
| | if not url.startswith(('http://', 'https://')): |
| | url = 'https://' + url |
| | cleaned_urls.append(url) |
| | return list(set(cleaned_urls)) |
| | |
| | def crawl(self, url: str, max_length: int = 5000) -> Dict: |
| | if not HAS_BS4: |
| | return {"success": False, "error": "beautifulsoup4가 설치되지 않았습니다.", "url": url} |
| | try: |
| | if not url.startswith(('http://', 'https://')): |
| | url = 'https://' + url |
| | response = self.session.get(url, timeout=15, allow_redirects=True) |
| | response.raise_for_status() |
| | if response.encoding is None or response.encoding == 'ISO-8859-1': |
| | response.encoding = response.apparent_encoding or 'utf-8' |
| | soup = BeautifulSoup(response.text, 'html.parser') |
| | for element in soup(['script', 'style', 'nav', 'footer', 'header', 'aside', 'form', 'button', 'iframe', 'noscript']): |
| | element.decompose() |
| | title = soup.title.string if soup.title else "" |
| | meta_desc = "" |
| | meta_tag = soup.find('meta', attrs={'name': 'description'}) |
| | if meta_tag: |
| | meta_desc = meta_tag.get('content', '') |
| | main_content = soup.find('main') or soup.find('article') or soup.find('body') |
| | if main_content: |
| | text = main_content.get_text(separator='\n', strip=True) |
| | else: |
| | text = soup.get_text(separator='\n', strip=True) |
| | lines = [line.strip() for line in text.split('\n') if line.strip()] |
| | text = '\n'.join(lines) |
| | if len(text) > max_length: |
| | text = text[:max_length] + "...[truncated]" |
| | links = [] |
| | for a_tag in soup.find_all('a', href=True)[:10]: |
| | href = a_tag['href'] |
| | link_text = a_tag.get_text(strip=True) |
| | if href.startswith('/'): |
| | href = urljoin(url, href) |
| | if href.startswith('http'): |
| | links.append({"text": link_text[:50], "url": href}) |
| | return { |
| | "success": True, "url": url, "final_url": response.url, "title": title, |
| | "meta_description": meta_desc, "content": text, "content_length": len(text), |
| | "links": links, "status_code": response.status_code, |
| | "crawled_at": TimeAwareness.get_formatted_time() |
| | } |
| | except requests.exceptions.Timeout: |
| | return {"success": False, "error": "요청 시간 초과 (15초)", "url": url} |
| | except requests.exceptions.SSLError: |
| | return {"success": False, "error": "SSL 인증서 오류", "url": url} |
| | except requests.exceptions.RequestException as e: |
| | return {"success": False, "error": str(e), "url": url} |
| | except Exception as e: |
| | return {"success": False, "error": f"크롤링 오류: {str(e)}", "url": url} |
| |
|
| | |
| |
|
| | class SimpleVectorDB: |
| | def __init__(self, dimension: int = VECTOR_DIM): |
| | self.dimension = dimension |
| | self.vectors: Dict[str, np.ndarray] = {} |
| | self.metadata: Dict[str, Dict] = {} |
| | |
| | def add(self, id: str, vector: List[float], metadata: Dict = None): |
| | self.vectors[id] = np.array(vector, dtype=np.float32) |
| | self.metadata[id] = metadata or {} |
| | |
| | def search(self, query_vector: List[float], top_k: int = 5) -> List[Tuple[str, float, Dict]]: |
| | if not self.vectors: |
| | return [] |
| | query = np.array(query_vector, dtype=np.float32) |
| | results = [] |
| | for id, vec in self.vectors.items(): |
| | similarity = np.dot(query, vec) / (np.linalg.norm(query) * np.linalg.norm(vec) + 1e-8) |
| | results.append((id, float(similarity), self.metadata.get(id, {}))) |
| | results.sort(key=lambda x: x[1], reverse=True) |
| | return results[:top_k] |
| | |
| | def delete(self, id: str): |
| | self.vectors.pop(id, None) |
| | self.metadata.pop(id, None) |
| | |
| | def simple_embed(self, text: str) -> List[float]: |
| | hash_bytes = hashlib.sha384(text.encode()).digest() |
| | embedding = [float(b) / 255.0 - 0.5 for b in hash_bytes] |
| | while len(embedding) < self.dimension: |
| | embedding.extend(embedding[:self.dimension - len(embedding)]) |
| | return embedding[:self.dimension] |
| |
|
| | |
| |
|
| | class SLAIMemorySystem: |
| | def __init__(self, db_path: str = DB_PATH): |
| | self.db_path = db_path |
| | self.vector_db = SimpleVectorDB() |
| | self.knowledge_vector_db = SimpleVectorDB() |
| | self._init_db() |
| | self._load_vectors() |
| | |
| | def _init_db(self): |
| | conn = sqlite3.connect(self.db_path) |
| | c = conn.cursor() |
| | c.execute("""CREATE TABLE IF NOT EXISTS memories ( |
| | id TEXT PRIMARY KEY, content TEXT NOT NULL, memory_type TEXT NOT NULL, |
| | element TEXT, goal_context TEXT, importance REAL DEFAULT 0.5, |
| | access_count INTEGER DEFAULT 0, created_at TEXT, last_accessed TEXT, |
| | embedding BLOB, metadata TEXT)""") |
| | c.execute("""CREATE TABLE IF NOT EXISTS learning_patterns ( |
| | id TEXT PRIMARY KEY, pattern_type TEXT, input_pattern TEXT, |
| | output_pattern TEXT, success_count INTEGER DEFAULT 0, |
| | fail_count INTEGER DEFAULT 0, confidence REAL DEFAULT 0.5, |
| | created_at TEXT, updated_at TEXT)""") |
| | c.execute("""CREATE TABLE IF NOT EXISTS sessions ( |
| | id TEXT PRIMARY KEY, goal TEXT, iterations INTEGER, satisfied INTEGER, |
| | satisfaction_score REAL, final_output TEXT, created_at TEXT, history TEXT)""") |
| | c.execute("""CREATE TABLE IF NOT EXISTS knowledge ( |
| | id TEXT PRIMARY KEY, goal TEXT NOT NULL, query TEXT, element TEXT, |
| | search_results TEXT, agent_output TEXT, final_result TEXT, |
| | embedding BLOB, created_at TEXT, access_count INTEGER DEFAULT 0, |
| | tags TEXT, quality_score REAL DEFAULT 0.5)""") |
| | c.execute("""CREATE TABLE IF NOT EXISTS search_cache ( |
| | id TEXT PRIMARY KEY, query TEXT NOT NULL, element TEXT, |
| | results TEXT, created_at TEXT, expiry_at TEXT)""") |
| | conn.commit() |
| | conn.close() |
| | print(f"✅ DB 초기화 완료: {self.db_path}") |
| | |
| | def _load_vectors(self): |
| | conn = sqlite3.connect(self.db_path) |
| | c = conn.cursor() |
| | c.execute("SELECT id, content, embedding, metadata FROM memories WHERE embedding IS NOT NULL") |
| | for row in c.fetchall(): |
| | id, content, embedding_blob, metadata_str = row |
| | if embedding_blob: |
| | embedding = json.loads(embedding_blob) |
| | metadata = json.loads(metadata_str) if metadata_str else {} |
| | metadata["content"] = content |
| | self.vector_db.add(id, embedding, metadata) |
| | c.execute("SELECT id, goal, final_result, embedding FROM knowledge WHERE embedding IS NOT NULL") |
| | for row in c.fetchall(): |
| | id, goal, final_result, embedding_blob = row |
| | if embedding_blob: |
| | embedding = json.loads(embedding_blob) |
| | self.knowledge_vector_db.add(id, embedding, { |
| | "goal": goal, |
| | "final_result": final_result[:500] if final_result else "" |
| | }) |
| | conn.close() |
| | |
| | def store_memory(self, content: str, memory_type: str, element: str, goal_context: str, importance: float = 0.5) -> str: |
| | content = ensure_string(content) |
| | memory_type = ensure_string(memory_type) |
| | element = ensure_string(element) |
| | goal_context = ensure_string(goal_context) |
| | importance = safe_float(importance, 0.5) |
| | memory_id = hashlib.md5(f"{content}{datetime.now().isoformat()}".encode()).hexdigest()[:16] |
| | now = datetime.now().isoformat() |
| | embedding = self.vector_db.simple_embed(content) |
| | conn = sqlite3.connect(self.db_path) |
| | c = conn.cursor() |
| | c.execute("""INSERT OR REPLACE INTO memories |
| | (id, content, memory_type, element, goal_context, importance, access_count, |
| | created_at, last_accessed, embedding, metadata) |
| | VALUES (?, ?, ?, ?, ?, ?, 0, ?, ?, ?, ?)""", |
| | (memory_id, content, memory_type, element, goal_context, importance, |
| | now, now, json.dumps(embedding), json.dumps({}))) |
| | conn.commit() |
| | conn.close() |
| | self.vector_db.add(memory_id, embedding, { |
| | "content": content, "memory_type": memory_type, |
| | "element": element, "goal_context": goal_context |
| | }) |
| | self._cleanup_memories(memory_type) |
| | return memory_id |
| | |
| | def retrieve_memories(self, query: str, memory_type: str = None, top_k: int = 5) -> List[Dict]: |
| | query_embedding = self.vector_db.simple_embed(query) |
| | results = self.vector_db.search(query_embedding, top_k * 2) |
| | filtered = [] |
| | for id, similarity, metadata in results: |
| | if memory_type and metadata.get("memory_type") != memory_type: |
| | continue |
| | filtered.append({ |
| | "id": id, "content": metadata.get("content", ""), |
| | "similarity": similarity, "memory_type": metadata.get("memory_type"), |
| | "element": metadata.get("element") |
| | }) |
| | if len(filtered) >= top_k: |
| | break |
| | self._update_access_count([r["id"] for r in filtered]) |
| | return filtered |
| | |
| | def promote_memory(self, memory_id: str): |
| | conn = sqlite3.connect(self.db_path) |
| | c = conn.cursor() |
| | c.execute("SELECT memory_type, access_count, importance FROM memories WHERE id = ?", (memory_id,)) |
| | row = c.fetchone() |
| | if row: |
| | current_type, access_count, importance = row |
| | if current_type == "short_term" and (access_count >= 3 or importance >= 0.7): |
| | new_type = "mid_term" |
| | elif current_type == "mid_term" and (access_count >= 10 or importance >= 0.85): |
| | new_type = "long_term" |
| | else: |
| | new_type = current_type |
| | if new_type != current_type: |
| | c.execute("UPDATE memories SET memory_type = ? WHERE id = ?", (new_type, memory_id)) |
| | conn.commit() |
| | conn.close() |
| | |
| | def store_knowledge(self, goal: str, query: str, element: str, search_results: Dict, |
| | agent_output: str, final_result: Any, quality_score: float = 0.5, |
| | tags: List[str] = None) -> str: |
| | knowledge_id = hashlib.md5(f"{goal}{element}{datetime.now().isoformat()}".encode()).hexdigest()[:16] |
| | now = datetime.now().isoformat() |
| | goal = ensure_string(goal) |
| | query = ensure_string(query) |
| | element = ensure_string(element) |
| | agent_output = ensure_string(agent_output) |
| | final_result = ensure_string(final_result) |
| | quality_score = safe_float(quality_score, 0.5) |
| | if isinstance(search_results, dict): |
| | search_results_str = json.dumps(search_results, ensure_ascii=False) |
| | else: |
| | search_results_str = ensure_string(search_results) |
| | if tags is None: |
| | tags = [] |
| | tags_str = json.dumps(tags, ensure_ascii=False) |
| | embedding_text = f"{goal} {final_result[:500]}" |
| | embedding = self.knowledge_vector_db.simple_embed(embedding_text) |
| | conn = sqlite3.connect(self.db_path) |
| | c = conn.cursor() |
| | c.execute("""INSERT OR REPLACE INTO knowledge |
| | (id, goal, query, element, search_results, agent_output, final_result, |
| | embedding, created_at, access_count, tags, quality_score) |
| | VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, 0, ?, ?)""", |
| | (knowledge_id, goal, query, element, search_results_str, agent_output, |
| | final_result, json.dumps(embedding), now, tags_str, quality_score)) |
| | conn.commit() |
| | conn.close() |
| | self.knowledge_vector_db.add(knowledge_id, embedding, { |
| | "goal": goal, "final_result": final_result[:500] |
| | }) |
| | return knowledge_id |
| | |
| | def retrieve_knowledge(self, query: str, top_k: int = 5, min_similarity: float = 0.3) -> List[Dict]: |
| | query_embedding = self.knowledge_vector_db.simple_embed(query) |
| | results = self.knowledge_vector_db.search(query_embedding, top_k * 2) |
| | filtered = [] |
| | conn = sqlite3.connect(self.db_path) |
| | c = conn.cursor() |
| | for id, similarity, metadata in results: |
| | if similarity < min_similarity: |
| | continue |
| | c.execute("""SELECT goal, query, element, search_results, agent_output, |
| | final_result, quality_score, created_at FROM knowledge WHERE id = ?""", (id,)) |
| | row = c.fetchone() |
| | if row: |
| | try: |
| | search_results_parsed = json.loads(row[3]) if row[3] else {} |
| | except: |
| | search_results_parsed = {} |
| | filtered.append({ |
| | "id": id, "similarity": similarity, "goal": row[0], "query": row[1], |
| | "element": row[2], "search_results": search_results_parsed, |
| | "agent_output": row[4], "final_result": row[5], |
| | "quality_score": row[6], "created_at": row[7] |
| | }) |
| | c.execute("UPDATE knowledge SET access_count = access_count + 1 WHERE id = ?", (id,)) |
| | if len(filtered) >= top_k: |
| | break |
| | conn.commit() |
| | conn.close() |
| | return filtered |
| | |
| | def get_related_knowledge_for_element(self, goal: str, element: str, top_k: int = 3) -> List[Dict]: |
| | conn = sqlite3.connect(self.db_path) |
| | c = conn.cursor() |
| | query_embedding = self.knowledge_vector_db.simple_embed(goal) |
| | results = self.knowledge_vector_db.search(query_embedding, top_k * 3) |
| | filtered = [] |
| | for id, similarity, metadata in results: |
| | c.execute("SELECT element, agent_output, final_result FROM knowledge WHERE id = ?", (id,)) |
| | row = c.fetchone() |
| | if row and row[0] == element: |
| | filtered.append({ |
| | "id": id, "similarity": similarity, |
| | "agent_output": row[1][:300] if row[1] else "", |
| | "final_result": row[2][:300] if row[2] else "" |
| | }) |
| | if len(filtered) >= top_k: |
| | break |
| | conn.close() |
| | return filtered |
| | |
| | def cache_search(self, query: str, element: str, results: Dict, ttl_hours: int = 24) -> str: |
| | cache_id = hashlib.md5(f"{query}{element}".encode()).hexdigest()[:16] |
| | now = datetime.now() |
| | expiry = now + timedelta(hours=ttl_hours) |
| | conn = sqlite3.connect(self.db_path) |
| | c = conn.cursor() |
| | c.execute("""INSERT OR REPLACE INTO search_cache |
| | (id, query, element, results, created_at, expiry_at) VALUES (?, ?, ?, ?, ?, ?)""", |
| | (cache_id, query, element, json.dumps(results, ensure_ascii=False), |
| | now.isoformat(), expiry.isoformat())) |
| | conn.commit() |
| | conn.close() |
| | return cache_id |
| | |
| | def get_cached_search(self, query: str, element: str) -> Optional[Dict]: |
| | cache_id = hashlib.md5(f"{query}{element}".encode()).hexdigest()[:16] |
| | conn = sqlite3.connect(self.db_path) |
| | c = conn.cursor() |
| | c.execute("""SELECT results, expiry_at FROM search_cache |
| | WHERE id = ? AND expiry_at > ?""", (cache_id, datetime.now().isoformat())) |
| | row = c.fetchone() |
| | conn.close() |
| | if row: |
| | return json.loads(row[0]) |
| | return None |
| | |
| | def learn_pattern(self, input_pattern: str, output_pattern: str, pattern_type: str, success: bool): |
| | input_pattern = ensure_string(input_pattern) |
| | output_pattern = ensure_string(output_pattern) |
| | pattern_type = ensure_string(pattern_type) |
| | pattern_id = hashlib.md5(f"{pattern_type}{input_pattern}".encode()).hexdigest()[:16] |
| | now = datetime.now().isoformat() |
| | conn = sqlite3.connect(self.db_path) |
| | c = conn.cursor() |
| | c.execute("SELECT success_count, fail_count FROM learning_patterns WHERE id = ?", (pattern_id,)) |
| | row = c.fetchone() |
| | if row: |
| | success_count, fail_count = row |
| | if success: |
| | success_count += 1 |
| | else: |
| | fail_count += 1 |
| | confidence = success_count / (success_count + fail_count + 1) |
| | c.execute("""UPDATE learning_patterns |
| | SET success_count = ?, fail_count = ?, confidence = ?, updated_at = ? WHERE id = ?""", |
| | (success_count, fail_count, confidence, now, pattern_id)) |
| | else: |
| | success_count = 1 if success else 0 |
| | fail_count = 0 if success else 1 |
| | confidence = success_count / (success_count + fail_count + 1) |
| | c.execute("""INSERT INTO learning_patterns |
| | (id, pattern_type, input_pattern, output_pattern, success_count, fail_count, confidence, created_at, updated_at) |
| | VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""", |
| | (pattern_id, pattern_type, input_pattern, output_pattern, |
| | success_count, fail_count, confidence, now, now)) |
| | conn.commit() |
| | conn.close() |
| | |
| | def get_learned_patterns(self, pattern_type: str, min_confidence: float = 0.5) -> List[Dict]: |
| | conn = sqlite3.connect(self.db_path) |
| | c = conn.cursor() |
| | c.execute("""SELECT input_pattern, output_pattern, confidence FROM learning_patterns |
| | WHERE pattern_type = ? AND confidence >= ? ORDER BY confidence DESC""", |
| | (pattern_type, min_confidence)) |
| | results = [{"input": r[0], "output": r[1], "confidence": r[2]} for r in c.fetchall()] |
| | conn.close() |
| | return results |
| | |
| | def save_session(self, state): |
| | conn = sqlite3.connect(self.db_path) |
| | c = conn.cursor() |
| | c.execute("""INSERT OR REPLACE INTO sessions |
| | (id, goal, iterations, satisfied, satisfaction_score, final_output, created_at, history) |
| | VALUES (?, ?, ?, ?, ?, ?, ?, ?)""", |
| | (state.session_id, state.goal, state.iteration, 1 if state.satisfied else 0, |
| | state.satisfaction_score, ensure_string(state.final_output), |
| | datetime.now().isoformat(), json.dumps(state.history, ensure_ascii=False))) |
| | conn.commit() |
| | conn.close() |
| | |
| | def _update_access_count(self, memory_ids: List[str]): |
| | conn = sqlite3.connect(self.db_path) |
| | c = conn.cursor() |
| | now = datetime.now().isoformat() |
| | for mid in memory_ids: |
| | c.execute("""UPDATE memories SET access_count = access_count + 1, last_accessed = ? WHERE id = ?""", |
| | (now, mid)) |
| | conn.commit() |
| | conn.close() |
| | |
| | def _cleanup_memories(self, memory_type: str): |
| | config = MEMORY_CONFIG.get(memory_type, {}) |
| | max_items = config.get("max_items", 100) |
| | conn = sqlite3.connect(self.db_path) |
| | c = conn.cursor() |
| | c.execute("""DELETE FROM memories WHERE id IN |
| | (SELECT id FROM memories WHERE memory_type = ? ORDER BY last_accessed DESC LIMIT -1 OFFSET ?)""", |
| | (memory_type, max_items)) |
| | conn.commit() |
| | conn.close() |
| | |
| | def get_memory_stats(self) -> Dict: |
| | conn = sqlite3.connect(self.db_path) |
| | c = conn.cursor() |
| | stats = {} |
| | for mem_type in ["short_term", "mid_term", "long_term"]: |
| | c.execute("SELECT COUNT(*) FROM memories WHERE memory_type = ?", (mem_type,)) |
| | stats[mem_type] = c.fetchone()[0] |
| | c.execute("SELECT COUNT(*) FROM learning_patterns") |
| | stats["patterns"] = c.fetchone()[0] |
| | c.execute("SELECT COUNT(*) FROM sessions") |
| | stats["sessions"] = c.fetchone()[0] |
| | c.execute("SELECT COUNT(*) FROM knowledge") |
| | stats["knowledge"] = c.fetchone()[0] |
| | c.execute("SELECT COUNT(*) FROM search_cache") |
| | stats["search_cache"] = c.fetchone()[0] |
| | conn.close() |
| | return stats |
| | |
| | def get_dashboard_data(self) -> Dict: |
| | conn = sqlite3.connect(self.db_path) |
| | c = conn.cursor() |
| | data = { |
| | "memory": {}, "knowledge": {}, "learning": {}, |
| | "sessions": {}, "elements": {}, "timeline": {} |
| | } |
| | for mem_type in ["short_term", "mid_term", "long_term"]: |
| | c.execute("SELECT COUNT(*) FROM memories WHERE memory_type = ?", (mem_type,)) |
| | data["memory"][mem_type] = c.fetchone()[0] |
| | c.execute("SELECT COUNT(*) FROM knowledge") |
| | data["knowledge"]["total"] = c.fetchone()[0] |
| | c.execute("SELECT AVG(quality_score) FROM knowledge") |
| | avg_quality = c.fetchone()[0] |
| | data["knowledge"]["avg_quality"] = round(avg_quality, 3) if avg_quality else 0 |
| | c.execute("SELECT SUM(access_count) FROM knowledge") |
| | total_access = c.fetchone()[0] |
| | data["knowledge"]["total_access"] = total_access or 0 |
| | c.execute("SELECT COUNT(*) FROM learning_patterns") |
| | data["learning"]["total_patterns"] = c.fetchone()[0] |
| | c.execute("SELECT AVG(confidence) FROM learning_patterns") |
| | avg_conf = c.fetchone()[0] |
| | data["learning"]["avg_confidence"] = round(avg_conf, 3) if avg_conf else 0 |
| | c.execute("SELECT SUM(success_count), SUM(fail_count) FROM learning_patterns") |
| | row = c.fetchone() |
| | success = row[0] or 0 |
| | fail = row[1] or 0 |
| | data["learning"]["success_rate"] = round(success / (success + fail + 1) * 100, 1) |
| | data["learning"]["total_attempts"] = success + fail |
| | c.execute("SELECT COUNT(*) FROM sessions") |
| | data["sessions"]["total"] = c.fetchone()[0] |
| | c.execute("SELECT COUNT(*) FROM sessions WHERE satisfied = 1") |
| | data["sessions"]["completed"] = c.fetchone()[0] |
| | c.execute("SELECT AVG(satisfaction_score) FROM sessions") |
| | avg_sat = c.fetchone()[0] |
| | data["sessions"]["avg_satisfaction"] = round(avg_sat, 3) if avg_sat else 0 |
| | c.execute("SELECT AVG(iterations) FROM sessions") |
| | avg_iter = c.fetchone()[0] |
| | data["sessions"]["avg_iterations"] = round(avg_iter, 1) if avg_iter else 0 |
| | for element in ["土", "金", "水", "木", "火"]: |
| | c.execute("SELECT COUNT(*) FROM knowledge WHERE element = ?", (element,)) |
| | data["elements"][element] = c.fetchone()[0] |
| | data["timeline"]["dates"] = [] |
| | data["timeline"]["knowledge_count"] = [] |
| | data["timeline"]["memory_count"] = [] |
| | data["timeline"]["session_count"] = [] |
| | for i in range(6, -1, -1): |
| | date = (datetime.now() - timedelta(days=i)).strftime("%Y-%m-%d") |
| | data["timeline"]["dates"].append(date[-5:]) |
| | c.execute("SELECT COUNT(*) FROM knowledge WHERE DATE(created_at) = ?", (date,)) |
| | data["timeline"]["knowledge_count"].append(c.fetchone()[0]) |
| | c.execute("SELECT COUNT(*) FROM memories WHERE DATE(created_at) = ?", (date,)) |
| | data["timeline"]["memory_count"].append(c.fetchone()[0]) |
| | c.execute("SELECT COUNT(*) FROM sessions WHERE DATE(created_at) = ?", (date,)) |
| | data["timeline"]["session_count"].append(c.fetchone()[0]) |
| | |
| | |
| | c.execute("SELECT quality_score FROM knowledge WHERE quality_score IS NOT NULL") |
| | quality_scores = [row[0] for row in c.fetchall()] |
| | quality_dist = { |
| | "very_low": sum(1 for q in quality_scores if 0 <= q < 0.2), |
| | "low": sum(1 for q in quality_scores if 0.2 <= q < 0.4), |
| | "medium": sum(1 for q in quality_scores if 0.4 <= q < 0.6), |
| | "high": sum(1 for q in quality_scores if 0.6 <= q < 0.8), |
| | "very_high": sum(1 for q in quality_scores if 0.8 <= q <= 1.0) |
| | } |
| | data["quality_distribution"] = quality_dist |
| | |
| | |
| | element_performance = {} |
| | for element in ["土", "金", "水", "木", "火"]: |
| | c.execute("SELECT AVG(quality_score), COUNT(*) FROM knowledge WHERE element = ?", (element,)) |
| | row = c.fetchone() |
| | avg_quality = (row[0] * 100) if row[0] else 0 |
| | count = row[1] |
| | element_performance[element] = { |
| | "avg_quality": avg_quality, |
| | "avg_speed": min(100, count * 3), |
| | "count": count |
| | } |
| | data["element_performance"] = element_performance |
| | |
| | intelligence_score = self._calculate_intelligence_score(data) |
| | data["intelligence"] = intelligence_score |
| | conn.close() |
| | return data |
| | |
| | def _calculate_intelligence_score(self, data: Dict) -> Dict: |
| | scores = {} |
| | total_memory = sum(data["memory"].values()) |
| | long_term_ratio = data["memory"].get("long_term", 0) / max(total_memory, 1) |
| | scores["memory_score"] = min(100, total_memory * 2 + long_term_ratio * 50) |
| | knowledge_count = data["knowledge"].get("total", 0) |
| | avg_quality = data["knowledge"].get("avg_quality", 0) |
| | scores["knowledge_score"] = min(100, knowledge_count * 1.5 + avg_quality * 30) |
| | patterns = data["learning"].get("total_patterns", 0) |
| | success_rate = data["learning"].get("success_rate", 0) |
| | scores["learning_score"] = min(100, patterns * 3 + success_rate * 0.5) |
| | sessions = data["sessions"].get("total", 0) |
| | completed = data["sessions"].get("completed", 0) |
| | completion_rate = completed / max(sessions, 1) * 100 |
| | scores["experience_score"] = min(100, sessions * 2 + completion_rate * 0.3) |
| | scores["total"] = round( |
| | scores["memory_score"] * 0.2 + |
| | scores["knowledge_score"] * 0.3 + |
| | scores["learning_score"] * 0.25 + |
| | scores["experience_score"] * 0.25, 1) |
| | total = scores["total"] |
| | if total < 20: |
| | scores["level"], scores["level_name"] = "🌱 초보", "Novice" |
| | elif total < 40: |
| | scores["level"], scores["level_name"] = "🌿 성장", "Growing" |
| | elif total < 60: |
| | scores["level"], scores["level_name"] = "🌳 숙련", "Skilled" |
| | elif total < 80: |
| | scores["level"], scores["level_name"] = "🎋 전문", "Expert" |
| | else: |
| | scores["level"], scores["level_name"] = "🏆 마스터", "Master" |
| | return scores |
| |
|
| | |
| |
|
| | class EmergenceEngine: |
| | def __init__(self, creat_json_path: str = CREAT_JSON_PATH): |
| | self.creat_data = self._load_creat_json(creat_json_path) |
| | |
| | def _load_creat_json(self, path: str) -> Dict: |
| | if Path(path).exists(): |
| | with open(path, "r", encoding="utf-8") as f: |
| | return json.load(f) |
| | return {"_meta": {"description": "기본 창발 매트릭스"}} |
| | |
| | def _detect_question_type(self, context: str) -> str: |
| | if any(kw in context for kw in ["예측", "전망", "될까", "될 것", "미래", "2025", "2026", "2027", "2028"]): |
| | return "예측" |
| | elif any(kw in context for kw in ["전략", "방법", "어떻게", "방안", "계획", "수립"]): |
| | return "전략" |
| | elif any(kw in context for kw in ["비교", "차이", "vs", "VS", "대", "어느 것"]): |
| | return "비교" |
| | elif any(kw in context for kw in ["분석", "왜", "원인", "영향", "현황"]): |
| | return "분석" |
| | elif any(kw in context for kw in ["특허", "발명", "아이디어", "혁신", "신기술"]): |
| | return "발명" |
| | elif any(kw in context for kw in ["소설", "스토리", "시나리오", "웹툰", "드라마", "영화", "캐릭터", "플롯"]): |
| | return "스토리" |
| | elif any(kw in context for kw in ["요리", "레시피", "음식", "만들기", "조리", "맛", "재료"]): |
| | return "레시피" |
| | else: |
| | return "일반" |
| | |
| | def _detect_domains(self, context: str) -> List[str]: |
| | domains = [] |
| | if any(kw in context for kw in ["패권", "전쟁", "국제", "외교", "안보", "군사", "중국", "미국", "러시아", "NATO", "트럼프", "시진핑"]): |
| | domains.append("geopolitical") |
| | if any(kw in context for kw in ["경제", "금융", "주식", "투자", "GDP", "인플레이션", "금리", "달러", "환율", "무역", "관세"]): |
| | domains.append("economic") |
| | if any(kw in context for kw in ["기술", "AI", "반도체", "테크", "혁신", "디지털", "양자", "바이오", "로봇", "자율주행"]): |
| | domains.append("technology") |
| | if any(kw in context for kw in ["사업", "비즈니스", "스타트업", "마케팅", "전략", "경쟁", "시장", "매출", "고객"]): |
| | domains.append("business") |
| | if any(kw in context for kw in ["특허", "발명", "아이디어", "혁신", "신기술", "청구항"]): |
| | domains.append("invention") |
| | if any(kw in context for kw in ["소설", "스토리", "시나리오", "웹툰", "드라마", "영화", "캐릭터", "플롯", "장르"]): |
| | domains.append("story") |
| | if any(kw in context for kw in ["요리", "레시피", "음식", "조리", "맛", "재료", "식당", "셰프"]): |
| | domains.append("recipe") |
| | return domains if domains else ["general"] |
| | |
| | def generate_combinations(self, context: str, max_combinations: int = 30) -> List[Dict]: |
| | combinations = [] |
| | question_type = self._detect_question_type(context) |
| | domains = self._detect_domains(context) |
| | if "geopolitical" in domains and "geopolitical_analysis" in self.creat_data: |
| | geo = self.creat_data["geopolitical_analysis"] |
| | for dim in geo.get("power_dimensions", [])[:4]: |
| | combo = { |
| | "category": "지정학분석", "dimension": dim["type"], |
| | "idea": f"'{context}'를 {dim['type']} 관점에서 분석", |
| | "score": random.uniform(0.75, 0.95) |
| | } |
| | combinations.append(combo) |
| | if "technology" in domains and "technology_trends" in self.creat_data: |
| | tech = self.creat_data["technology_trends"] |
| | for frontier in tech.get("current_frontiers", []): |
| | combo = { |
| | "category": "기술프론티어", "domain": frontier["domain"], |
| | "idea": f"'{frontier['domain']}' 기술 관점에서 분석", |
| | "score": random.uniform(0.7, 0.9) |
| | } |
| | combinations.append(combo) |
| | if "business" in domains and "business_strategy" in self.creat_data: |
| | biz = self.creat_data["business_strategy"] |
| | for framework in biz.get("frameworks", [])[:4]: |
| | combo = { |
| | "category": "비즈니스프레임워크", "framework": framework["name"], |
| | "idea": f"'{framework['name']}' 프레임워크 적용", |
| | "score": random.uniform(0.65, 0.85) |
| | } |
| | combinations.append(combo) |
| | combinations.sort(key=lambda x: x["score"], reverse=True) |
| | return combinations[:max_combinations] |
| | |
| | def cross_combine(self, ideas: List[Dict], max_cross: int = 10) -> List[Dict]: |
| | cross_ideas = [] |
| | if len(ideas) < 2: |
| | return cross_ideas |
| | top_ideas = ideas[:min(6, len(ideas))] |
| | for i, idea1 in enumerate(top_ideas): |
| | for idea2 in top_ideas[i+1:]: |
| | if idea1["category"] != idea2["category"]: |
| | cross = { |
| | "category": "교차조합", |
| | "sources": [idea1["category"], idea2["category"]], |
| | "idea": f"[{idea1['category']} + {idea2['category']}] 융합 아이디어", |
| | "score": (idea1["score"] + idea2["score"]) / 2 * 1.15 |
| | } |
| | cross_ideas.append(cross) |
| | if len(cross_ideas) >= max_cross: |
| | break |
| | if len(cross_ideas) >= max_cross: |
| | break |
| | return cross_ideas |
| | |
| | def format_for_prompt(self, combinations: List[Dict]) -> str: |
| | lines = ["[브루트포스 창발 매트릭스]", ""] |
| | by_category = {} |
| | for combo in combinations[:20]: |
| | cat = combo["category"] |
| | if cat not in by_category: |
| | by_category[cat] = [] |
| | by_category[cat].append(combo) |
| | idx = 1 |
| | for cat, combos in by_category.items(): |
| | lines.append(f"━━ {cat} ━━") |
| | for combo in combos[:4]: |
| | score_bar = "●" * int(combo["score"] * 5) + "○" * (5 - int(combo["score"] * 5)) |
| | lines.append(f"{idx}. {score_bar} {combo['idea'][:80]}") |
| | idx += 1 |
| | lines.append("") |
| | return "\n".join(lines) |
| |
|
| | |
| |
|
| | class LLMClient: |
| | MODEL_LEVELS = { |
| | "LOW": {"name": "⚡ gpt-oss-120b", "provider": "groq", "model": "openai/gpt-oss-120b", "description": "빠른 응답, 기본 분석", "disabled": False}, |
| | "MIDDLE": {"name": "🔥 GLM-4.7", "provider": "fireworks", "model": "accounts/fireworks/models/glm-4p7", "description": "균형잡힌 성능, 심층 분석", "disabled": False}, |
| | "HIGH": {"name": "🧠 Claude 4.5 Sonnet", "provider": "replicate", "model": "anthropic/claude-4.5-sonnet", "description": "최고 품질, 심층 추론", "disabled": False} |
| | } |
| | |
| | def __init__(self, model_level: str = "LOW"): |
| | self.model_level = model_level |
| | self._setup_client() |
| | |
| | def _setup_client(self): |
| | level_config = self.MODEL_LEVELS.get(self.model_level, self.MODEL_LEVELS["LOW"]) |
| | self.provider = level_config["provider"] |
| | self.model = level_config["model"] |
| | self.client = None |
| | if self.provider == "groq": |
| | self.api_key = os.getenv("GROQ_API_KEY", "") |
| | if HAS_GROQ and self.api_key: |
| | try: |
| | self.client = Groq(api_key=self.api_key) |
| | except Exception as e: |
| | print(f"⚠️ Groq 클라이언트 초기화 실패: {e}") |
| | elif self.provider == "fireworks": |
| | self.api_key = os.getenv("FIREWORKS_API_KEY", "") |
| | self.api_url = "https://api.fireworks.ai/inference/v1/chat/completions" |
| | elif self.provider == "replicate": |
| | self.api_key = os.getenv("REPLICATE_API_TOKEN", "") |
| | if self.api_key: |
| | os.environ["REPLICATE_API_TOKEN"] = self.api_key |
| | |
| | def set_model_level(self, level: str) -> bool: |
| | if level in self.MODEL_LEVELS and not self.MODEL_LEVELS[level].get("disabled"): |
| | self.model_level = level |
| | self._setup_client() |
| | return True |
| | return False |
| | |
| | def get_model_info(self) -> str: |
| | config = self.MODEL_LEVELS.get(self.model_level, {}) |
| | return f"{config.get('name', 'Unknown')}" |
| | |
| | def chat(self, system_prompt: str, user_prompt: str, stream: bool = True) -> Generator[str, None, None]: |
| | if self.provider == "groq": |
| | yield from self._chat_groq(system_prompt, user_prompt, stream) |
| | elif self.provider == "fireworks": |
| | yield from self._chat_fireworks(system_prompt, user_prompt, stream) |
| | elif self.provider == "replicate": |
| | yield from self._chat_replicate(system_prompt, user_prompt, stream) |
| | else: |
| | yield from self._chat_groq(system_prompt, user_prompt, stream) |
| | |
| | def _chat_groq(self, system_prompt: str, user_prompt: str, stream: bool = True) -> Generator[str, None, None]: |
| | if not HAS_GROQ: |
| | yield "[ERROR] groq 라이브러리 미설치" |
| | return |
| | if not self.api_key: |
| | yield "[ERROR] GROQ_API_KEY 미설정" |
| | return |
| | if not self.client: |
| | try: |
| | self.client = Groq(api_key=self.api_key) |
| | except Exception as e: |
| | yield f"[ERROR] Groq 클라이언트 생성 실패: {e}" |
| | return |
| | try: |
| | messages = [ |
| | {"role": "system", "content": system_prompt}, |
| | {"role": "user", "content": user_prompt} |
| | ] |
| | if stream: |
| | completion = self.client.chat.completions.create( |
| | model=self.model, messages=messages, |
| | temperature=0.7, max_completion_tokens=8192, top_p=1, stream=True |
| | ) |
| | for chunk in completion: |
| | try: |
| | if chunk.choices and len(chunk.choices) > 0: |
| | delta = chunk.choices[0].delta |
| | if delta and delta.content: |
| | yield delta.content |
| | except: |
| | continue |
| | else: |
| | completion = self.client.chat.completions.create( |
| | model=self.model, messages=messages, |
| | temperature=0.7, max_completion_tokens=8192, top_p=1, stream=False |
| | ) |
| | if completion.choices and len(completion.choices) > 0: |
| | yield completion.choices[0].message.content or "" |
| | except Exception as e: |
| | yield f"\n[ERROR] Groq API: {str(e)[:150]}" |
| | |
| | def _chat_fireworks(self, system_prompt: str, user_prompt: str, stream: bool = True) -> Generator[str, None, None]: |
| | if not self.api_key: |
| | yield "[ERROR] FIREWORKS_API_KEY 미설정" |
| | return |
| | try: |
| | headers = { |
| | "Accept": "application/json", |
| | "Content-Type": "application/json", |
| | "Authorization": f"Bearer {self.api_key}" |
| | } |
| | messages = [ |
| | {"role": "system", "content": system_prompt}, |
| | {"role": "user", "content": user_prompt} |
| | ] |
| | payload = { |
| | "model": self.model, "max_tokens": 8192, "top_p": 1, "top_k": 40, |
| | "presence_penalty": 0, "frequency_penalty": 0, "temperature": 0.7, |
| | "messages": messages, "stream": stream |
| | } |
| | if stream: |
| | response = requests.post(self.api_url, headers=headers, json=payload, stream=True, timeout=120) |
| | for line in response.iter_lines(): |
| | if line: |
| | line_text = line.decode('utf-8') |
| | if line_text.startswith("data: "): |
| | data_str = line_text[6:] |
| | if data_str.strip() == "[DONE]": |
| | break |
| | try: |
| | data = json.loads(data_str) |
| | if data.get("choices") and data["choices"][0].get("delta", {}).get("content"): |
| | yield data["choices"][0]["delta"]["content"] |
| | except: |
| | continue |
| | else: |
| | response = requests.post(self.api_url, headers=headers, json=payload, timeout=120) |
| | result = response.json() |
| | if result.get("choices"): |
| | yield result["choices"][0].get("message", {}).get("content", "") |
| | except Exception as e: |
| | yield f"\n[ERROR] Fireworks API: {str(e)[:150]}" |
| | |
| | def _chat_replicate(self, system_prompt: str, user_prompt: str, stream: bool = True) -> Generator[str, None, None]: |
| | if not self.api_key: |
| | yield "[ERROR] REPLICATE_API_TOKEN 미설정" |
| | return |
| | try: |
| | import replicate |
| | full_prompt = f"""<system> |
| | {system_prompt} |
| | </system> |
| | <user> |
| | {user_prompt} |
| | </user>""" |
| | input_data = {"prompt": full_prompt, "max_tokens": 8192} |
| | if stream: |
| | for event in replicate.stream(self.model, input=input_data): |
| | yield str(event) |
| | else: |
| | output = replicate.run(self.model, input=input_data) |
| | if hasattr(output, '__iter__') and not isinstance(output, str): |
| | yield "".join(str(chunk) for chunk in output) |
| | else: |
| | yield str(output) |
| | except ImportError: |
| | yield "[ERROR] replicate 라이브러리 미설치. pip install replicate" |
| | except Exception as e: |
| | yield f"\n[ERROR] Replicate API: {str(e)[:150]}" |
| | |
| | def chat_sync(self, system_prompt: str, user_prompt: str) -> str: |
| | return "".join(self.chat(system_prompt, user_prompt, stream=False)) |
| |
|
| | |
| |
|
| | def create_dashboard_charts(data: Dict) -> Tuple: |
| | """Create intuitive AGI intelligence evolution dashboard charts""" |
| | import matplotlib.pyplot as plt |
| | import matplotlib |
| | matplotlib.use('Agg') |
| | plt.rcParams['font.family'] = ['DejaVu Sans', 'sans-serif'] |
| | plt.rcParams['axes.unicode_minus'] = False |
| | |
| | charts = {} |
| | |
| | |
| | fig1, ax1 = plt.subplots(figsize=(10, 5), facecolor='#FEF9C3') |
| | ax1.set_facecolor('#FEF9C3') |
| | timeline = data.get("timeline", {}) |
| | dates = timeline.get("dates", []) |
| | if dates and len(dates) > 0: |
| | knowledge_counts = timeline.get("knowledge_count", []) |
| | memory_counts = timeline.get("memory_count", []) |
| | iq_scores = [] |
| | base_iq = 20 |
| | for i, (k, m) in enumerate(zip(knowledge_counts, memory_counts)): |
| | daily_growth = (k * 2 + m * 0.5) |
| | base_iq += daily_growth |
| | iq_scores.append(min(100, base_iq)) |
| | line1 = ax1.plot(dates, iq_scores, marker='o', linewidth=3, |
| | color='#FACC15', markersize=8, label='Total IQ', zorder=3) |
| | ax1.fill_between(dates, iq_scores, alpha=0.3, color='#FACC15') |
| | if len(iq_scores) > 1: |
| | growth = iq_scores[-1] - iq_scores[0] |
| | growth_pct = (growth / iq_scores[0] * 100) if iq_scores[0] > 0 else 0 |
| | ax1.text(0.02, 0.98, f'📈 Growth: +{growth:.1f} ({growth_pct:+.1f}%)', |
| | transform=ax1.transAxes, fontsize=11, fontweight='bold', |
| | va='top', bbox=dict(boxstyle='round', facecolor='#10B981', alpha=0.8, edgecolor='#1F2937')) |
| | ax1.set_title('🧠 Intelligence Evolution Timeline', fontsize=14, fontweight='bold', pad=15) |
| | ax1.set_ylabel('IQ Score', fontsize=11, fontweight='bold') |
| | ax1.set_xlabel('Date', fontsize=11, fontweight='bold') |
| | ax1.grid(True, alpha=0.2, linestyle='--') |
| | ax1.spines['top'].set_visible(False) |
| | ax1.spines['right'].set_visible(False) |
| | ax1.legend(loc='lower right', fontsize=10) |
| | ax1.set_ylim(0, 105) |
| | else: |
| | ax1.text(0.5, 0.5, 'No Timeline Data', ha='center', va='center', fontsize=14, fontweight='bold') |
| | plt.tight_layout() |
| | charts["intelligence_timeline"] = fig1 |
| | |
| | |
| | fig2, ax2 = plt.subplots(figsize=(6, 4), facecolor='#FEF9C3') |
| | ax2.set_facecolor('#FEF9C3') |
| | quality_dist = data.get("quality_distribution", {}) |
| | if quality_dist: |
| | bins = ['0-20%', '20-40%', '40-60%', '60-80%', '80-100%'] |
| | counts = [ |
| | quality_dist.get('very_low', 0), |
| | quality_dist.get('low', 0), |
| | quality_dist.get('medium', 0), |
| | quality_dist.get('high', 0), |
| | quality_dist.get('very_high', 0) |
| | ] |
| | colors = ['#EF4444', '#F59E0B', '#FACC15', '#10B981', '#059669'] |
| | bars = ax2.bar(bins, counts, color=colors, edgecolor='#1F2937', linewidth=2) |
| | for bar in bars: |
| | height = bar.get_height() |
| | if height > 0: |
| | ax2.text(bar.get_x() + bar.get_width()/2, height + 0.5, |
| | int(height), ha='center', fontsize=10, fontweight='bold') |
| | ax2.set_title('📊 Knowledge Quality Distribution', fontsize=13, fontweight='bold') |
| | ax2.set_ylabel('Count', fontsize=10, fontweight='bold') |
| | ax2.set_xlabel('Quality Range', fontsize=10, fontweight='bold') |
| | ax2.spines['top'].set_visible(False) |
| | ax2.spines['right'].set_visible(False) |
| | else: |
| | ax2.text(0.5, 0.5, 'No Quality Data', ha='center', va='center', fontsize=12, fontweight='bold') |
| | plt.tight_layout() |
| | charts["quality_dist"] = fig2 |
| | |
| | |
| | fig3, ax3 = plt.subplots(figsize=(7, 4), facecolor='#FEF9C3') |
| | ax3.set_facecolor('#FEF9C3') |
| | elements = data.get("elements", {}) |
| | elem_performance = data.get("element_performance", {}) |
| | elem_names = ['土\nEarth', '金\nMetal', '水\nWater', '木\nWood', '火\nFire'] |
| | metrics = ['Activity', 'Quality', 'Speed'] |
| | matrix = [] |
| | for elem in ['土', '金', '水', '木', '火']: |
| | perf = elem_performance.get(elem, {}) |
| | row = [ |
| | min(100, elements.get(elem, 0) * 2), |
| | perf.get('avg_quality', 50), |
| | perf.get('avg_speed', 50) |
| | ] |
| | matrix.append(row) |
| | import numpy as np |
| | matrix = np.array(matrix) |
| | im = ax3.imshow(matrix.T, cmap='RdYlGn', aspect='auto', vmin=0, vmax=100) |
| | ax3.set_xticks(np.arange(len(elem_names))) |
| | ax3.set_yticks(np.arange(len(metrics))) |
| | ax3.set_xticklabels(elem_names, fontsize=10, fontweight='bold') |
| | ax3.set_yticklabels(metrics, fontsize=10, fontweight='bold') |
| | for i in range(len(metrics)): |
| | for j in range(len(elem_names)): |
| | text = ax3.text(j, i, f'{matrix[j, i]:.0f}', |
| | ha="center", va="center", color="black", fontsize=9, fontweight='bold') |
| | ax3.set_title('🔥 Five Elements Performance Matrix', fontsize=13, fontweight='bold', pad=10) |
| | cbar = plt.colorbar(im, ax=ax3, orientation='horizontal', pad=0.1, aspect=30) |
| | cbar.set_label('Performance Score', fontsize=9, fontweight='bold') |
| | plt.tight_layout() |
| | charts["element_heatmap"] = fig3 |
| | |
| | |
| | fig4, ax4 = plt.subplots(figsize=(6, 4), facecolor='#FEF9C3') |
| | ax4.set_facecolor('#FEF9C3') |
| | learning = data.get("learning", {}) |
| | success_rate = learning.get("success_rate", 0) |
| | categories = ['Success\nRate', 'Pattern\nConfidence', 'Knowledge\nReuse'] |
| | scores = [ |
| | success_rate, |
| | learning.get("avg_confidence", 0) * 100, |
| | data.get("knowledge", {}).get("avg_quality", 0) * 100 |
| | ] |
| | x = np.arange(len(categories)) |
| | bars = ax4.bar(x, scores, color=['#10B981', '#3B82F6', '#8B5CF6'], |
| | edgecolor='#1F2937', linewidth=2, width=0.6) |
| | for bar, score in zip(bars, scores): |
| | ax4.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 2, |
| | f'{score:.1f}%', ha='center', fontsize=11, fontweight='bold') |
| | ax4.set_ylim(0, 110) |
| | ax4.set_xticks(x) |
| | ax4.set_xticklabels(categories, fontsize=10, fontweight='bold') |
| | ax4.set_ylabel('Score (%)', fontsize=10, fontweight='bold') |
| | ax4.set_title('⚡ Learning Efficiency Metrics', fontsize=13, fontweight='bold') |
| | ax4.spines['top'].set_visible(False) |
| | ax4.spines['right'].set_visible(False) |
| | ax4.axhline(y=70, color='#FACC15', linestyle='--', linewidth=1.5, alpha=0.5, label='Target: 70%') |
| | ax4.legend(loc='upper right', fontsize=8) |
| | plt.tight_layout() |
| | charts["learning_efficiency"] = fig4 |
| | |
| | |
| | fig5, ax5 = plt.subplots(figsize=(6, 4), facecolor='#FEF9C3') |
| | ax5.set_facecolor('#FEF9C3') |
| | memory_data = data.get("memory", {}) |
| | short = memory_data.get("short_term", 0) |
| | mid = memory_data.get("mid_term", 0) |
| | long = memory_data.get("long_term", 0) |
| | total = short + mid + long |
| | if total > 0: |
| | stages = ['Input', 'Short→Mid', 'Mid→Long', 'Long-term\nStorage'] |
| | short_flow = [total, short, 0, 0] |
| | mid_flow = [0, mid * 0.6, mid, 0] |
| | long_flow = [0, 0, long * 0.3, long] |
| | ax5.barh(stages, short_flow, color='#EF4444', edgecolor='#1F2937', linewidth=1, label='Short-term') |
| | ax5.barh(stages, mid_flow, left=short_flow, color='#3B82F6', edgecolor='#1F2937', linewidth=1, label='Mid-term') |
| | ax5.barh(stages, long_flow, left=[s+m for s,m in zip(short_flow, mid_flow)], |
| | color='#10B981', edgecolor='#1F2937', linewidth=1, label='Long-term') |
| | ax5.set_title('💾 Memory Promotion Flow', fontsize=13, fontweight='bold') |
| | ax5.set_xlabel('Memory Count', fontsize=10, fontweight='bold') |
| | ax5.legend(loc='lower right', fontsize=9) |
| | ax5.spines['top'].set_visible(False) |
| | ax5.spines['right'].set_visible(False) |
| | else: |
| | ax5.text(0.5, 0.5, 'No Memory Data', ha='center', va='center', fontsize=12, fontweight='bold') |
| | plt.tight_layout() |
| | charts["memory_flow"] = fig5 |
| | |
| | |
| | fig6, ax6 = plt.subplots(figsize=(5, 4), facecolor='#FEF9C3') |
| | ax6.set_facecolor('#FEF9C3') |
| | sessions = data.get("sessions", {}) |
| | total_sess = sessions.get("total", 0) |
| | completed = sessions.get("completed", 0) |
| | avg_satisfaction = sessions.get("avg_satisfaction", 0) |
| | if total_sess > 0: |
| | completion_rate = (completed / total_sess) * 100 |
| | sizes = [completion_rate, 100 - completion_rate] |
| | colors = ['#10B981', '#E5E7EB'] |
| | wedges, texts, autotexts = ax6.pie(sizes, colors=colors, startangle=90, |
| | wedgeprops=dict(width=0.4, edgecolor='#1F2937', linewidth=2), |
| | autopct='%1.1f%%', textprops={'fontweight': 'bold', 'fontsize': 11}) |
| | ax6.text(0, 0, f'{completed}/{total_sess}\nCompleted', |
| | ha='center', va='center', fontsize=13, fontweight='bold', color='#1F2937') |
| | ax6.set_title(f'✅ Session Success Rate\n(Avg Satisfaction: {avg_satisfaction:.2f})', |
| | fontsize=12, fontweight='bold', pad=10) |
| | else: |
| | ax6.text(0.5, 0.5, 'No Sessions', ha='center', va='center', fontsize=12, fontweight='bold') |
| | plt.tight_layout() |
| | charts["session_success"] = fig6 |
| | |
| | return (charts.get("intelligence_timeline"), |
| | charts.get("quality_dist"), |
| | charts.get("element_heatmap"), |
| | charts.get("learning_efficiency"), |
| | charts.get("memory_flow"), |
| | charts.get("session_success")) |