# -*- coding: utf-8 -*-
"""
Kumru 2B Chat - HF Spaces
"""
import os
import json
import time
import threading
import urllib.parse
import re
import uuid
from pathlib import Path
from typing import List, Dict, Optional, Union, Any
import gradio as gr
try:
from huggingface_hub import InferenceClient, snapshot_download, hf_hub_download
except Exception:
InferenceClient = None
snapshot_download = None
hf_hub_download = None
try:
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer
except Exception:
torch = None
AutoModelForCausalLM = None
AutoTokenizer = None
TextIteratorStreamer = None
try:
from ddgs import DDGS
except Exception:
DDGS = None
try:
from pypdf import PdfReader
except Exception:
PdfReader = None
try:
import docx
except Exception:
docx = None
try:
import structlog
from structlog.stdlib import BoundLogger, ProcessorFormatter
except Exception:
structlog = None
BoundLogger = None
ProcessorFormatter = None
# Web Agent dependencies
try:
import httpx
from bs4 import BeautifulSoup
import trafilatura
from newspaper import Article, Config
from fake_useragent import UserAgent
import feedparser
from googletrans import Translator
from selectolax.parser import HTMLParser
except Exception as e:
httpx = None
BeautifulSoup = None
trafilatura = None
Article = None
Config = None
UserAgent = None
feedparser = None
Translator = None
HTMLParser = None
try:
import platform
# Only import MLX on macOS as it's not stable on Linux
if platform.system() == "Darwin":
from mlx_lm import load as mlx_load, generate as mlx_generate
else:
mlx_load = None
mlx_generate = None
except Exception:
mlx_load = None
mlx_generate = None
try:
from peft import PeftModel
except Exception:
PeftModel = None
import logging
import requests
import sys
MODEL_ID = os.environ.get("KUMRU_MODEL_ID", "vngrs-ai/Kumru-2B")
MAX_FILE_BYTES = 5 * 1024 * 1024
MAX_PDF_PAGES = 8
MLX_REPO = "ibraschwan/Kumru-2B-mlx-4Bit"
BASE_REPO = "vngrs-ai/Kumru-2B-Base"
LORA_REPO = "ceofast/kumru-2b-lora"
def _setup_logger() -> Union["BoundLogger", logging.Logger]:
if structlog is not None and ProcessorFormatter is not None:
root = logging.getLogger()
root.setLevel(logging.INFO)
for h in list(root.handlers):
root.removeHandler(h)
json_handler = logging.StreamHandler(sys.stdout)
json_handler.setLevel(logging.INFO)
json_handler.setFormatter(
ProcessorFormatter(
processor=structlog.processors.JSONRenderer(),
foreign_pre_chain=[
structlog.processors.add_log_level,
structlog.processors.TimeStamper(fmt="iso"),
],
)
)
root.addHandler(json_handler)
console_handler = logging.StreamHandler(sys.stderr)
console_handler.setLevel(logging.INFO)
console_handler.setFormatter(
ProcessorFormatter(
processor=structlog.dev.ConsoleRenderer(colors=True),
foreign_pre_chain=[
structlog.processors.add_log_level,
structlog.processors.TimeStamper(fmt="iso"),
],
)
)
root.addHandler(console_handler)
structlog.configure(
processors=[
structlog.stdlib.add_logger_name,
structlog.stdlib.add_log_level,
structlog.processors.TimeStamper(fmt="iso"),
structlog.stdlib.ProcessorFormatter.wrap_for_formatter,
],
wrapper_class=structlog.stdlib.BoundLogger,
context_class=dict,
logger_factory=structlog.stdlib.LoggerFactory(),
cache_logger_on_first_use=True,
)
return structlog.get_logger("kumru")
logger = logging.getLogger("kumru")
if not logger.handlers:
logger.setLevel(logging.INFO)
h = logging.StreamHandler(sys.stdout)
h.setFormatter(logging.Formatter("%(message)s"))
logger.addHandler(h)
class _JSONAdapter(logging.LoggerAdapter):
def process(self, msg, kwargs):
if isinstance(msg, dict):
msg = json.dumps(msg, ensure_ascii=False)
return msg, kwargs
return _JSONAdapter(logger, {})
return logger
log = _setup_logger()
class WebAgent:
"""Advanced web agent for intelligent content extraction and analysis"""
def __init__(self):
self.session = None
self.user_agent = None
self.translator = None
self._init_components()
def _init_components(self):
"""Initialize web agent components"""
try:
if httpx is not None:
self.session = httpx.Client(
timeout=30.0,
follow_redirects=True,
limits=httpx.Limits(max_keepalive_connections=5, max_connections=10)
)
if UserAgent is not None:
self.user_agent = UserAgent()
if Translator is not None:
self.translator = Translator()
except Exception:
pass
def get_headers(self) -> Dict[str, str]:
"""Get randomized headers for web requests"""
headers = {
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
'Accept-Language': 'tr-TR,tr;q=0.9,en-US;q=0.8,en;q=0.7',
'Accept-Encoding': 'gzip, deflate, br',
'Connection': 'keep-alive',
'Upgrade-Insecure-Requests': '1',
'Sec-Fetch-Dest': 'document',
'Sec-Fetch-Mode': 'navigate',
'Sec-Fetch-Site': 'none',
'Cache-Control': 'max-age=0',
}
if self.user_agent:
try:
headers['User-Agent'] = self.user_agent.random
except Exception:
headers['User-Agent'] = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
return headers
def extract_content(self, url: str) -> Dict[str, str]:
"""Extract clean content from URL using multiple methods"""
result = {
'url': url,
'title': '',
'content': '',
'summary': '',
'publish_date': '',
'authors': [],
'method': 'failed'
}
if not self.session:
return result
try:
# Method 1: Trafilatura (most reliable for news articles)
if trafilatura is not None:
try:
downloaded = trafilatura.fetch_url(url)
if downloaded:
extracted = trafilatura.extract(
downloaded,
include_comments=False,
include_tables=True,
include_formatting=False,
output_format='text'
)
if extracted and len(extracted.strip()) > 100:
metadata = trafilatura.extract_metadata(downloaded)
result.update({
'content': extracted[:5000],
'title': metadata.title if metadata and metadata.title else '',
'publish_date': metadata.date if metadata and metadata.date else '',
'authors': [metadata.author] if metadata and metadata.author else [],
'method': 'trafilatura'
})
return result
except Exception:
pass
# Method 2: Newspaper3k
if Article is not None and Config is not None:
try:
config = Config()
config.browser_user_agent = self.get_headers().get('User-Agent', '')
config.request_timeout = 15
article = Article(url, config=config)
article.download()
article.parse()
if article.text and len(article.text.strip()) > 100:
result.update({
'content': article.text[:5000],
'title': article.title or '',
'summary': article.summary[:500] if article.summary else '',
'publish_date': str(article.publish_date) if article.publish_date else '',
'authors': article.authors or [],
'method': 'newspaper3k'
})
return result
except Exception:
pass
# Method 3: Raw HTTP + BeautifulSoup
if BeautifulSoup is not None:
try:
response = self.session.get(url, headers=self.get_headers())
response.raise_for_status()
soup = BeautifulSoup(response.content, 'html.parser')
# Remove script and style elements
for script in soup(["script", "style", "nav", "footer", "header", "aside"]):
script.decompose()
# Extract title
title_elem = soup.find('title')
title = title_elem.get_text().strip() if title_elem else ''
# Extract main content
content_selectors = [
'article', 'main', '.content', '.post', '.entry',
'[role="main"]', '.article-body', '.post-content'
]
content = ''
for selector in content_selectors:
elem = soup.select_one(selector)
if elem:
content = elem.get_text(separator=' ', strip=True)
break
if not content:
# Fallback to body text
content = soup.get_text(separator=' ', strip=True)
if content and len(content.strip()) > 100:
result.update({
'content': content[:5000],
'title': title,
'method': 'beautifulsoup'
})
return result
except Exception:
pass
except Exception:
pass
return result
def search_news(self, query: str, limit: int = 5) -> List[Dict[str, str]]:
"""Search for recent news articles"""
results = []
if not feedparser:
return results
# RSS feeds for Turkish news
rss_feeds = [
'https://www.hurriyet.com.tr/rss/anasayfa',
'https://www.milliyet.com.tr/rss/rssNew/SonDakikaRSS.xml',
'https://www.sabah.com.tr/rss/anasayfa.xml',
'https://www.cnnturk.com/feed/rss/all/news',
'https://www.ntv.com.tr/gundem.rss',
]
query_terms = [term.lower() for term in re.findall(r'\w+', query) if len(term) > 2]
for feed_url in rss_feeds[:3]: # Limit to avoid timeout
try:
feed = feedparser.parse(feed_url)
for entry in feed.entries[:10]: # Limit entries per feed
title = entry.get('title', '').lower()
summary = entry.get('summary', '').lower()
content = f"{title} {summary}"
# Check if query terms match
if any(term in content for term in query_terms):
results.append({
'title': entry.get('title', ''),
'url': entry.get('link', ''),
'snippet': entry.get('summary', '')[:300],
'date': entry.get('published', ''),
'engine': 'rss'
})
if len(results) >= limit:
break
if len(results) >= limit:
break
except Exception:
continue
return results
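# Each RSS hit returned by search_news has the shape (illustrative):
#   {'title': ..., 'url': ..., 'snippet': ..., 'date': ..., 'engine': 'rss'}
# Only feed entries whose title or summary contains a query term longer than two characters are kept.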
def get_realtime_info(self, query: str) -> Dict[str, Any]:
"""Get real-time information about the query"""
info = {
'current_time': time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()),
'query': query,
'news': [],
'trends': [],
'summary': ''
}
try:
# Get recent news
news_results = self.search_news(query, limit=3)
info['news'] = news_results
# Extract content from top news
detailed_news = []
for news_item in news_results[:2]: # Limit to avoid timeout
content = self.extract_content(news_item['url'])
if content['content']:
detailed_news.append({
'title': content['title'] or news_item['title'],
'content': content['content'][:1000],
'url': news_item['url'],
'date': content['publish_date'] or news_item['date']
})
info['detailed_news'] = detailed_news
# Create summary
if detailed_news:
summary_parts = []
for news in detailed_news:
summary_parts.append(f"• {news['title']}: {news['content'][:200]}...")
info['summary'] = "\n".join(summary_parts)
except Exception:
pass
return info
def close(self):
"""Clean up resources"""
if self.session:
try:
self.session.close()
except Exception:
pass
# Global web agent instance
web_agent = WebAgent()
def cleanup_memory():
"""Clean up memory for CPU-constrained environments"""
try:
import gc
import torch
# Force garbage collection
gc.collect()
# Clear PyTorch cache if available
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
# Clear any remaining tensor caches
if hasattr(torch, 'clear_autocast_cache'):
torch.clear_autocast_cache()
except Exception:
pass
def log_event(event_type: str, **kwargs):
"""Log events for debugging in CPU-constrained environments"""
import json
try:
event_data = {
"timestamp": time.time(),
"event": event_type,
**kwargs
}
# Only log to stdout in development, avoid file I/O in production
if os.environ.get("DEBUG", "").lower() in ("1", "true"):
print(f"LOG: {json.dumps(event_data)}")
# Cleanup memory after logging to prevent accumulation
if event_type in ["model_loaded", "local_generate_done"]:
cleanup_memory()
except Exception:
pass
def _mask_ip(ip: Optional[str]) -> Optional[str]:
if not ip:
return ip
parts = ip.split(".")
if len(parts) == 4:
parts[-1] = "x"
return ".".join(parts)
return ip
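# Illustrative behavior of _mask_ip (not exercised at runtime): the last octet of an IPv4
# address is replaced with "x"; anything else is returned unchanged, e.g.
#   _mask_ip("85.102.33.17") -> "85.102.33.x"
#   _mask_ip("2001:db8::1")  -> "2001:db8::1"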
def ensure_all_models_on_disk():
if snapshot_download is None:
return
try:
snapshot_download("vngrs-ai/Kumru-2B")
except Exception as e:
log_event("prefetch_error", repo="vngrs-ai/Kumru-2B", error=str(e))
try:
snapshot_download(MLX_REPO)
except Exception as e:
log_event("prefetch_error", repo=MLX_REPO, error=str(e))
try:
snapshot_download(BASE_REPO)
snapshot_download(LORA_REPO)
except Exception as e:
log_event("prefetch_error", repo="base_or_lora", error=str(e))
log_event("prefetch_done", repos=["vngrs-ai/Kumru-2B", MLX_REPO, BASE_REPO, LORA_REPO])
ensure_all_models_on_disk()
def build_prompt(system_message: str, history: List[Dict[str, str]], user_msg: str) -> str:
    lines = []
    if system_message:
        lines.append(f"System: {system_message.strip()}")
    # Include prior turns so the remote-inference fallback keeps conversational context.
    for turn in history or []:
        role = "User" if turn.get("role") == "user" else "Assistant"
        content = turn.get("content")
        if isinstance(content, str) and content.strip():
            lines.append(f"{role}: {content.strip()}")
    lines.append(f"User: {user_msg.strip()}")
    lines.append("Assistant:")
    return "\n".join(lines)
def attach_context(prompt: str, web_context: Optional[str], file_context: Optional[str]) -> str:
blocks = [prompt]
if web_context:
blocks.append("\n[Evidence]\n" + web_context.strip())
if file_context:
blocks.append("\n[Files]\n" + file_context.strip())
return "\n\n".join(blocks)
def read_file(path: str) -> str:
p = Path(path)
suffix = p.suffix.lower()
try:
if p.stat().st_size > MAX_FILE_BYTES:
return f"{p.name}: [SKIPPED: file too large]"
except Exception:
return f"{p.name}: [ERROR: unreadable file]"
if suffix in [".txt", ".md", ".csv", ".json"]:
return f"{p.name}:\n{p.read_text(encoding='utf-8', errors='ignore')[:20000]}"
if suffix == ".pdf" and PdfReader is not None:
text_parts = []
try:
with open(p, "rb") as f:
reader = PdfReader(f)
for page in reader.pages[:MAX_PDF_PAGES]:
try:
text_parts.append(page.extract_text() or "")
except Exception:
pass
except Exception as e:
return f"{p.name}: [ERROR {e}]"
return f"{p.name} (first {min(MAX_PDF_PAGES, len(text_parts))} pages):\n" + "\n".join(text_parts)[:20000]
if suffix == ".docx" and docx is not None:
try:
d = docx.Document(str(p))
text = "\n".join([para.text for para in d.paragraphs])
return f"{p.name}:\n{text[:20000]}"
except Exception as e:
return f"{p.name}: [ERROR {e}]"
return f"{p.name}: [Unsupported type or missing parser]"
def gather_files(files: List[gr.File]) -> str:
    if not files:
        return ""
    texts = []
    for f in files:
        # Gradio may pass plain file paths or tempfile-like objects depending on version/config.
        path = f if isinstance(f, str) else getattr(f, "name", str(f))
        try:
            texts.append(read_file(path))
        except Exception as e:
            texts.append(f"{Path(path).name}: [ERROR {e}]")
    return "\n\n".join(texts)
_tokenizer = None
_model = None
_backend = "transformers"
_mlx_model = None
_mlx_tokenizer = None
_qr_tokenizer = None
_qr_model = None
_QR_SYSTEM = (
"Görevin: web araması için son isteği kullanarak TEK satırlık, kısa ve odaklı bir Türkçe arama sorgusu üretmek.\n"
"- Özel isimleri ve yılları koru (örn. 2025-2026, Galatasaray).\n"
"- Gereksiz kelimeleri çıkar.\n"
"- En fazla 20 kelime.\n"
"- Sadece sorguyu yaz; açıklama, tırnak, son noktalama yok."
)
def _ensure_qr_model():
global _qr_tokenizer, _qr_model
if AutoTokenizer is None or AutoModelForCausalLM is None:
raise RuntimeError("Transformers is not available. Please install transformers and torch.")
if _qr_tokenizer is None or _qr_model is None:
t0 = time.perf_counter()
_qr_tokenizer = AutoTokenizer.from_pretrained("vngrs-ai/Kumru-2B")
_qr_model = AutoModelForCausalLM.from_pretrained("vngrs-ai/Kumru-2B", dtype="auto", device_map="auto")
try:
_qr_model.eval()
except Exception:
pass
log_event("model_loaded", model_id="vngrs-ai/Kumru-2B", elapsed_ms=int((time.perf_counter() - t0) * 1000))
def postprocess_query(q: str, user_msg: str) -> str:
q = re.sub(r"^[-•*\s]+", "", q).strip()
q = re.sub(r'^[\'"“”‘’]+|[\'"“”‘’]+$', "", q).strip()
q = re.sub(r"[.。!!??]+$", "", q).strip()
if len(q) < 3:
q = user_msg.strip()
return q[:300]
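# Illustrative clean-up performed by postprocess_query (assumed inputs, not real model output):
#   postprocess_query('- "Galatasaray fikstür 2025?"', user_msg) -> 'Galatasaray fikstür 2025'
#   postprocess_query("a", "Dolar kuru nedir")                   -> 'Dolar kuru nedir'  (too short, falls back)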
def build_search_query_llm(user_msg: str) -> str:
prompt = f"{_QR_SYSTEM}\n\nSohbet:\nUser: {user_msg.strip()}\n\nSorgu:"
t0 = time.perf_counter()
try:
_ensure_qr_model()
enc = _qr_tokenizer(prompt, return_tensors="pt", return_token_type_ids=False)
enc = {k: v.to(_qr_model.device) for k, v in enc.items()}
with torch.no_grad():
gen = _qr_model.generate(
**enc,
max_new_tokens=48,
do_sample=True,
temperature=0.3,
top_p=0.9,
repetition_penalty=1.05,
)
text = _qr_tokenizer.decode(gen[0], skip_special_tokens=True)
query = postprocess_query(text.split("Sorgu:")[-1].strip().splitlines()[0], user_msg)
log_event("qrewrite_local", elapsed_ms=int((time.perf_counter() - t0) * 1000), query=query)
return query
except Exception as e:
log_event("qrewrite_local_error", error=type(e).__name__, detail=str(e))
try:
if InferenceClient is None:
raise RuntimeError("huggingface_hub not available")
out = InferenceClient(model="vngrs-ai/Kumru-2B").text_generation(
prompt, max_new_tokens=48, temperature=0.3, top_p=0.9, return_full_text=False, stream=False
)
query = postprocess_query((out or "").strip().splitlines()[0], user_msg)
log_event("qrewrite_inference", elapsed_ms=int((time.perf_counter() - t0) * 1000), query=query)
return query
except Exception as e:
log_event("qrewrite_inference_error", error=type(e).__name__, detail=str(e))
fallback = user_msg.strip()[:300]
log_event("qrewrite_fallback", query=fallback)
return fallback
def normalize_url(u: str) -> str:
if not u:
return ""
try:
parsed = urllib.parse.urlsplit(u.strip())
path = parsed.path or "/"
return urllib.parse.urlunsplit((parsed.scheme.lower(), parsed.netloc.lower(), path, "", ""))
except Exception:
return u.strip()
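# Illustrative effect of normalize_url (used only for de-duplicating search results): the scheme
# and host are lower-cased and the query string / fragment are dropped, e.g.
#   normalize_url("HTTPS://Example.com/Haber?utm_source=x#top") -> "https://example.com/Haber"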
def enhanced_web_search(query: str, k: int, timelimit: str, region: str, safesearch: str, timeout: float = 12.0) -> List[Dict[str, str]]:
"""Enhanced web search combining multiple sources and real-time analysis"""
all_results = []
search_stats = {'ddg': 0, 'news': 0, 'realtime': 0, 'enhanced': 0}
# 1. Traditional DuckDuckGo search
ddg_results = ddg_search(query, k, timelimit, region, safesearch, timeout)
all_results.extend(ddg_results)
search_stats['ddg'] = len(ddg_results)
# 2. News search using web agent
try:
news_results = web_agent.search_news(query, limit=min(5, k//2))
all_results.extend(news_results)
search_stats['news'] = len(news_results)
except Exception:
pass
# 3. Real-time information
try:
realtime_info = web_agent.get_realtime_info(query)
if realtime_info.get('summary'):
all_results.append({
'title': f"Güncel Bilgiler: {query}",
'url': 'realtime://current',
'snippet': realtime_info['summary'],
'date': realtime_info['current_time'],
'engine': 'realtime'
})
search_stats['realtime'] = 1
except Exception:
pass
# 4. Enhanced content extraction for top results
enhanced_results = []
for i, result in enumerate(all_results[:3]): # Only enhance top 3 to avoid timeout
try:
if result['url'].startswith('http'):
content = web_agent.extract_content(result['url'])
if content['content']:
enhanced_result = result.copy()
enhanced_result['snippet'] = content['content'][:800] # More detailed content
enhanced_result['engine'] = f"{result.get('engine', 'unknown')}-enhanced"
enhanced_results.append(enhanced_result)
search_stats['enhanced'] += 1
else:
enhanced_results.append(result)
else:
enhanced_results.append(result)
except Exception:
enhanced_results.append(result)
# Add remaining results without enhancement
enhanced_results.extend(all_results[3:])
log_event("enhanced_search_done", query=query, stats=search_stats, total_results=len(enhanced_results))
return enhanced_results
def needs_web_search(user_msg: str, conversation_history: Optional[List[Dict[str, str]]] = None) -> Dict[str, Any]:
"""
    Intelligently decide whether answering the user's message requires a web search.
"""
analysis = {
'needs_search': False,
'confidence': 0.0,
'reasons': [],
'search_type': 'none', # 'current', 'factual', 'news', 'specific'
'suggested_query': ''
}
msg_lower = user_msg.lower()
    # 1. Phrases that signal a need for up-to-date information
current_info_keywords = [
'bugün', 'dün', 'şimdi', 'şu anda', 'son dakika', 'güncel', 'yeni', 'fresh',
'bu hafta', 'geçen hafta', 'bu ay', 'son', 'en son', 'latest', 'recent',
'ne oldu', 'ne oluyor', 'son durum', 'gelişmeler', 'haberler', 'news',
'2024', '2025', 'bu yıl', 'geçen yıl'
]
    # 2. Topics tied to news and current affairs
news_keywords = [
'haber', 'haberler', 'news', 'gündem', 'siyaset', 'ekonomi', 'spor',
'teknoloji', 'bilim', 'sağlık', 'eğitim', 'seçim', 'borsa', 'dolar',
'euro', 'kripto', 'bitcoin', 'weather', 'hava durumu', 'deprem',
'terör', 'savaş', 'covid', 'pandemi', 'aşı'
]
    # 3. Questions asking for specific factual information
factual_keywords = [
'kim', 'ne', 'nerede', 'ne zaman', 'nasıl', 'neden', 'kaç', 'hangi',
'who', 'what', 'where', 'when', 'how', 'why', 'which', 'how many',
'kaça', 'fiyat', 'price', 'cost', 'maliyet', 'ücret'
]
    # 4. Topics that require real-time data
realtime_keywords = [
'saat kaç', 'time', 'tarih', 'date', 'hava durumu', 'weather',
'trafik', 'traffic', 'uçuş', 'flight', 'sefer', 'schedule',
'açık mı', 'kapalı mı', 'çalışıyor mu', 'working'
]
    # 5. Cases that do NOT require a web search
no_search_keywords = [
'merhaba', 'selam', 'hello', 'nasılsın', 'naber', 'teşekkür',
'sağol', 'thanks', 'thank you', 'anladım', 'tamam', 'ok', 'okay',
'günaydın', 'iyi geceler', 'good morning', 'good night',
'kodla', 'kod yaz', 'program', 'script', 'function', 'algoritma',
'hesapla', 'calculate', 'çevir', 'translate', 'açıkla', 'explain',
'özetle', 'summarize', 'yaz', 'write', 'oluştur', 'create'
]
    # 6. Personal opinion / small-talk phrases
personal_keywords = [
'ne düşünüyorsun', 'fikrin', 'görüşün', 'opinion', 'think',
'nasıl', 'how do you', 'what do you think', 'sen', 'you',
'bence', 'sanırım', 'think', 'believe', 'feel'
]
confidence = 0.0
reasons = []
    # First check the cases where a web search is unnecessary
if any(keyword in msg_lower for keyword in no_search_keywords):
confidence -= 0.4
reasons.append("Genel konuşma/kod yazma isteği")
if any(keyword in msg_lower for keyword in personal_keywords):
confidence -= 0.3
reasons.append("Kişisel görüş sorusu")
    # Current-information check
if any(keyword in msg_lower for keyword in current_info_keywords):
confidence += 0.6
reasons.append("Güncel bilgi gereksinimi")
analysis['search_type'] = 'current'
    # News / current-affairs check
if any(keyword in msg_lower for keyword in news_keywords):
confidence += 0.5
reasons.append("Haber/gündem bilgisi")
analysis['search_type'] = 'news'
    # Factual-information check
if any(keyword in msg_lower for keyword in factual_keywords):
confidence += 0.4
reasons.append("Spesifik bilgi sorusu")
if analysis['search_type'] == 'none':
analysis['search_type'] = 'factual'
    # Real-time data check
if any(keyword in msg_lower for keyword in realtime_keywords):
confidence += 0.7
reasons.append("Gerçek zamanlı veri gereksinimi")
analysis['search_type'] = 'current'
    # Question-format check
question_patterns = [
r'\?', r'\bne\b.*\boluyor\b', r'\bkim\b.*\b(kim|ne|nerede)\b',
r'\bhangi\b', r'\bkaç\b', r'\bne zaman\b', r'\bnasıl\b'
]
if any(re.search(pattern, msg_lower) for pattern in question_patterns):
confidence += 0.3
reasons.append("Soru formatı")
    # Presence of dates/years (potentially time-sensitive information)
if re.search(r'\b(202[0-9]|19[0-9][0-9])\b', user_msg):
confidence += 0.2
reasons.append("Tarih/yıl referansı")
    # Presence of proper nouns (people, places, organizations)
    proper_nouns = re.findall(r'\b[A-ZÇĞİÖŞÜ][a-zçğıöşü]+\b', user_msg)
if len(proper_nouns) >= 2:
confidence += 0.3
reasons.append("Özel isim referansları")
    # Conversation-history analysis
if conversation_history:
        recent_messages = conversation_history[-3:]  # Last 3 messages
context_text = " ".join([msg.get('content', '') for msg in recent_messages])
        # Is a time-sensitive topic still being discussed in the conversation context?
if any(keyword in context_text.lower() for keyword in current_info_keywords + news_keywords):
confidence += 0.2
reasons.append("Sohbet bağlamında güncel konu")
    # Final decision
analysis['confidence'] = max(0.0, min(1.0, confidence))
analysis['needs_search'] = analysis['confidence'] > 0.4
analysis['reasons'] = reasons
    # Suggested search query
if analysis['needs_search']:
analysis['suggested_query'] = intelligent_search_query_builder(user_msg, conversation_history)
return analysis
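# Example of the decision produced by needs_web_search (illustrative; the exact score depends on
# which keyword buckets match, and confidence is clipped to the [0, 1] range):
#   needs_web_search("Dolar kuru bugün ne kadar?")
#     -> needs_search=True, search_type='news', reasons include "Güncel bilgi gereksinimi",
#        suggested_query built by intelligent_search_query_builder
#   needs_web_search("Merhaba, nasılsın?")
#     -> needs_search=False (small-talk keywords push the confidence below the 0.4 threshold)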
def intelligent_search_query_builder(user_msg: str, conversation_history: Optional[List[Dict[str, str]]] = None) -> str:
"""Build more intelligent search queries based on conversation context"""
# Extract key entities and topics
query_terms = []
# Current date awareness
current_year = time.strftime('%Y')
current_date = time.strftime('%Y-%m-%d')
# Check for temporal keywords
temporal_keywords = ['bugün', 'dün', 'bu hafta', 'geçen hafta', 'bu ay', 'geçen ay',
'şu anda', 'güncel', 'son', 'yeni', 'recent', 'latest', '2024', '2025']
has_temporal = any(keyword in user_msg.lower() for keyword in temporal_keywords)
# Extract main topic
    topic_words = re.findall(r'\b[A-ZÇĞİÖŞÜ][a-zçğıöşü]+\b', user_msg)  # Turkish proper nouns
topic_words.extend(re.findall(r'\b[A-Z][a-z]+\b', user_msg)) # English proper nouns
# Remove common words
stop_words = {'Bir', 'Bu', 'Şu', 'Ne', 'Nasıl', 'Neden', 'Kim', 'Nerede', 'When', 'What', 'How', 'Why', 'Who', 'Where'}
topic_words = [word for word in topic_words if word not in stop_words]
# Build query
if topic_words:
main_query = ' '.join(topic_words[:3]) # Top 3 entities
else:
# Fallback to important words
words = re.findall(r'\b\w{4,}\b', user_msg.lower())
main_query = ' '.join(words[:5])
# Add temporal context if needed
if has_temporal:
main_query += f" {current_year}"
# Add Turkish context for better local results
    if any(ord(c) > 127 for c in user_msg):  # Contains non-ASCII (likely Turkish) characters
main_query += " Türkiye"
return main_query[:200]
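# Illustrative query construction (assumes the proper-noun regex fix above; the appended year
# tracks the current date):
#   intelligent_search_query_builder("İstanbul hava durumu bugün nasıl?")
#     -> "İstanbul <current year> Türkiye"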
def ddg_search(query: str, k: int, timelimit: str, region: str, safesearch: str, timeout: float = 12.0) -> List[Dict[str, str]]:
rows: List[Dict[str, str]] = []
if not query or DDGS is None:
return rows
k = max(10, min(int(k), 30))
t0 = time.perf_counter()
try:
with DDGS(timeout=timeout) as ddgs:
for r in ddgs.text(query, region=region, safesearch=safesearch, timelimit=timelimit or None, max_results=k):
rows.append({
"title": r.get("title") or "(untitled)",
"url": r.get("href") or "",
"snippet": r.get("body") or "",
"date": r.get("published") or r.get("date") or "",
"engine": "ddg",
})
try:
for r in ddgs.news(query, region=region, safesearch=safesearch, timelimit=timelimit or None, max_results=min(k, 10)):
rows.append({
"title": r.get("title") or "(untitled)",
"url": r.get("url") or r.get("href") or "",
"snippet": r.get("body") or r.get("excerpt") or "",
"date": r.get("date") or r.get("published") or "",
"engine": "ddg-news",
})
except Exception:
pass
except Exception as e:
log_event("ddg_error", error=type(e).__name__, detail=str(e))
seen, deduped = {}, []
for r in rows:
key = normalize_url(r.get("url", ""))
if not key or key in seen:
continue
seen[key] = True
deduped.append(r)
if len(deduped) >= k:
break
log_event("ddg_done", query=query, results=len(deduped), elapsed_ms=int((time.perf_counter() - t0) * 1000))
return deduped
def tavily_search(query: str, api_key: Optional[str], k: int = 8) -> List[Dict[str, str]]:
rows: List[Dict[str, str]] = []
if not query or not api_key:
return rows
url = "https://api.tavily.com/search"
payload = {
"api_key": api_key,
"query": query,
"search_depth": "basic",
"max_results": int(max(3, min(k, 12))),
"include_domains": [],
"exclude_domains": [],
"include_answer": False,
"include_images": False,
"include_raw_content": False,
}
t0 = time.perf_counter()
try:
r = requests.post(url, json=payload, timeout=12)
r.raise_for_status()
data = r.json()
for item in data.get("results", []):
rows.append({
"title": item.get("title") or "(untitled)",
"url": item.get("url") or "",
"snippet": item.get("content") or item.get("snippet") or "",
"date": item.get("published_date") or "",
"engine": "tavily",
})
except Exception as e:
log_event("tavily_error", error=type(e).__name__, detail=str(e))
log_event("tavily_done", query=query, results=len(rows), elapsed_ms=int((time.perf_counter() - t0) * 1000))
return rows
def _score_result(r: Dict[str, str], terms: List[str]) -> float:
title = (r.get("title") or "").lower()
snippet = (r.get("snippet") or "").lower()
text = f"{title} {snippet}"
overlap = sum(1 for t in terms if t and t in text)
date = r.get("date") or ""
recency = 1.0 if re.search(r"\b202[3-6]\b", date) else 0.0
engine_bonus = 0.3 if r.get("engine") == "tavily" else 0.0
return overlap + recency + engine_bonus
def rank_results(rows: List[Dict[str, str]], query: str, k: int) -> List[Dict[str, str]]:
terms = [w.lower() for w in re.findall(r"\w{3,}", query)]
rows.sort(key=lambda r: _score_result(r, terms), reverse=True)
return rows[:k]
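# The ranking score is a simple heuristic: one point per query term found in title+snippet,
# plus 1.0 for a 2023-2026 date and 0.3 for Tavily results. A minimal sketch of the effect:
#   rows = [{"title": "Dolar kuru", "snippet": "güncel", "date": "2025-01-02", "engine": "ddg"},
#           {"title": "Eski haber", "snippet": "", "date": "2019", "engine": "ddg"}]
#   rank_results(rows, "dolar kuru güncel", 2)  # the 2025 article scores higher and comes first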
def format_evidence(rows: List[Dict[str, str]]) -> str:
if not rows:
return ""
lines = []
for i, r in enumerate(rows, 1):
date = f" • {r['date']}" if r.get("date") else ""
src = "(tavily)" if r.get("engine") == "tavily" else "(duckduckgo)"
lines.append(f"[{i}] {r.get('title')}{date}\n{r.get('url')}\n{r.get('snippet','')}\n{src}")
return "Merged & ranked sources:\n" + "\n\n".join(lines)
def _apply_chat_or_fallback(system_message: str, user_msg: str):
assert _tokenizer is not None and _model is not None
try:
messages = [{"role": "system", "content": system_message or ""}, {"role": "user", "content": user_msg}]
return _tokenizer.apply_chat_template(messages, return_tensors="pt", add_generation_prompt=True).to(_model.device)
except Exception:
prompt = f"System: {system_message.strip()}\nUser: {user_msg.strip()}\nAssistant:"
enc = _tokenizer(prompt, return_tensors="pt", return_token_type_ids=False)
return {k: v.to(_model.device) for k, v in enc.items()}
def _ensure_local_model(selected_model: str):
global _tokenizer, _model, _backend, _mlx_model, _mlx_tokenizer
_backend = "transformers"
_tokenizer = None
_model = None
_mlx_model = None
_mlx_tokenizer = None
t0 = time.perf_counter()
if selected_model == "ibraschwan/Kumru-2B-mlx-4Bit":
if mlx_load is None or mlx_generate is None:
raise RuntimeError(f"MLX is not available on {platform.system()}. Please use a different model.")
_backend = "mlx"
_mlx_model, _mlx_tokenizer = mlx_load(MLX_REPO)
log_event("model_loaded", model_id=selected_model, backend=_backend, elapsed_ms=int((time.perf_counter() - t0) * 1000))
return
if selected_model == "ceofast/kumru-2b-lora":
if AutoTokenizer is None or AutoModelForCausalLM is None or PeftModel is None:
raise RuntimeError("transformers/peft is not available.")
try:
# CPU-optimized settings for Hugging Face Spaces
import tempfile
offload_dir = tempfile.mkdtemp(prefix="kumru_offload_")
# Force CPU deployment with minimal memory usage
base_kwargs = {
"torch_dtype": "float32", # Use string for CPU compatibility
"device_map": "cpu", # Force CPU for HF Spaces
"offload_folder": offload_dir,
"low_cpu_mem_usage": True,
"use_cache": False, # Disable KV cache to save memory
}
# Load base model with minimal memory footprint
base = AutoModelForCausalLM.from_pretrained(BASE_REPO, **base_kwargs)
# Load LoRA adapter
_model = PeftModel.from_pretrained(base, LORA_REPO)
_tokenizer = AutoTokenizer.from_pretrained(BASE_REPO)
except Exception as e:
# Final fallback: Use base model instead of LoRA if memory issues persist
log_event("lora_fallback", model_id=selected_model, error=str(e), fallback_to="base_model")
_tokenizer = AutoTokenizer.from_pretrained("vngrs-ai/Kumru-2B")
_model = AutoModelForCausalLM.from_pretrained(
"vngrs-ai/Kumru-2B",
torch_dtype="float32",
device_map="cpu",
low_cpu_mem_usage=True,
use_cache=False
)
try:
_model.eval()
except Exception:
pass
log_event("model_loaded", model_id=selected_model, backend="peft_cpu", elapsed_ms=int((time.perf_counter() - t0) * 1000))
return
if AutoTokenizer is None or AutoModelForCausalLM is None:
raise RuntimeError("transformers is not available.")
_tokenizer = AutoTokenizer.from_pretrained(selected_model)
# Load model with memory-efficient settings
model_kwargs = {
"torch_dtype": torch.float16 if torch is not None else "auto",
"device_map": "auto",
"low_cpu_mem_usage": True
}
# Add offload directory if needed for large models
if torch is not None and torch.cuda.is_available():
import tempfile
offload_dir = tempfile.mkdtemp(prefix="kumru_model_offload_")
model_kwargs["offload_folder"] = offload_dir
model_kwargs["max_memory"] = {0: "6GB", "cpu": "8GB"}
_model = AutoModelForCausalLM.from_pretrained(selected_model, **model_kwargs)
try:
_model.eval()
except Exception:
pass
log_event("model_loaded", model_id=selected_model, backend=_backend, elapsed_ms=int((time.perf_counter() - t0) * 1000))
def local_stream_generate(selected_model, system_message, user_msg, max_tokens, temperature, top_p, web_ctx, file_ctx):
_ensure_local_model(selected_model)
if _backend == "mlx":
merged = user_msg
if web_ctx:
merged += f"\n\n[Evidence]\n{web_ctx}"
if file_ctx:
merged += f"\n\n[Files]\n{file_ctx}"
sysmsg = (system_message or "") + "\nWhen Evidence contains sources labeled like [1], [2], cite them in your answer using the same [n] markers."
try:
# Check for MLX components availability
if (_mlx_tokenizer is not None and
hasattr(_mlx_tokenizer, "apply_chat_template") and
getattr(_mlx_tokenizer, "chat_template", None) and
_mlx_model is not None and
mlx_generate is not None):
messages = [{"role": "system", "content": sysmsg}, {"role": "user", "content": merged}]
prompt = _mlx_tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
text = mlx_generate(_mlx_model, _mlx_tokenizer, prompt=prompt, max_tokens=int(max_tokens), temperature=float(temperature), top_p=float(top_p), verbose=False)
else:
prompt = f"System: {sysmsg}\nUser: {merged}\nAssistant:"
if _mlx_model is not None and _mlx_tokenizer is not None and mlx_generate is not None:
text = mlx_generate(_mlx_model, _mlx_tokenizer, prompt=prompt, max_tokens=int(max_tokens), temperature=float(temperature), top_p=float(top_p), verbose=False)
else:
text = "MLX Error: Model components not available"
except TypeError:
if _mlx_model is not None and _mlx_tokenizer is not None and mlx_generate is not None:
text = mlx_generate(_mlx_model, _mlx_tokenizer, prompt=prompt, max_tokens=int(max_tokens))
else:
text = "MLX Error: Model components not available"
except Exception as e:
text = f"MLX Error: {str(e)}"
emitted = ""
for i in range(0, len(text), 256):
emitted += text[i:i+256]
yield emitted
return
# Transformers path with null checking
if _model is None or _tokenizer is None:
yield "Error: Model not loaded"
return
system_with_hint = (system_message or "") + "\nWhen Evidence contains sources labeled like [1], [2], cite them in your answer using the same [n] markers."
base_inputs = _apply_chat_or_fallback(system_with_hint, user_msg)
try:
if isinstance(base_inputs, dict):
base_prompt = f"System: {system_with_hint.strip()}\nUser: {user_msg.strip()}\nAssistant:"
prompt = attach_context(base_prompt, web_ctx, file_ctx)
enc = _tokenizer(prompt, return_tensors="pt", return_token_type_ids=False)
            # Keep inputs on the same device as the model (CPU on Hugging Face Spaces)
            inputs = {k: v.to(_model.device) for k, v in enc.items()}
else:
merged_user = user_msg
if web_ctx:
merged_user += f"\n\n[Evidence]\n{web_ctx}"
if file_ctx:
merged_user += f"\n\n[Files]\n{file_ctx}"
inputs = _apply_chat_or_fallback(system_message, merged_user)
except Exception:
inputs = base_inputs
    streamer = TextIteratorStreamer(_tokenizer, skip_prompt=True, skip_special_tokens=True)
    gen_kwargs = dict(
        max_new_tokens=int(max_tokens),
        do_sample=True,
        temperature=float(temperature),
        top_p=float(top_p),
        repetition_penalty=1.1,
        streamer=streamer,
    )
    # The tokenizer fallback yields a dict of tensors (input_ids, attention_mask), while the chat
    # template path yields a single tensor; pass whichever form we have to generate().
    if isinstance(inputs, dict):
        gen_kwargs.update(inputs)
    else:
        gen_kwargs["inputs"] = inputs
t0 = time.perf_counter()
thread = threading.Thread(target=_model.generate, kwargs=gen_kwargs)
thread.start()
emitted = ""
for new_text in streamer:
emitted += new_text
yield emitted
log_event(
"local_generate_done",
elapsed_ms=int((time.perf_counter() - t0) * 1000),
max_new_tokens=int(max_tokens),
temperature=float(temperature),
top_p=float(top_p),
)
def pick_inference_model(selected_model: str) -> str:
    # The MLX and LoRA checkpoints are not served by the remote Inference API,
    # so fall back to the base chat model for them.
    unsupported = {MLX_REPO, LORA_REPO}
    if selected_model in unsupported:
        return "vngrs-ai/Kumru-2B"
    return selected_model
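# Illustrative fallback behavior: the MLX and LoRA repos cannot be queried through the remote
# Inference API, so they map to the base model, e.g.
#   pick_inference_model("ceofast/kumru-2b-lora") -> "vngrs-ai/Kumru-2B"
#   pick_inference_model("vngrs-ai/Kumru-2B")     -> "vngrs-ai/Kumru-2B"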
def respond(
message,
history: List[Dict[str, str]],
selected_model,
system_message,
max_tokens,
temperature,
top_p,
files,
include_files,
do_web_search,
web_k,
timelimit,
region,
safesearch,
tavily_api_key,
request: gr.Request,
):
req_id = str(uuid.uuid4())
user_msg = (message or "").strip()
if not user_msg:
yield "Please enter a prompt."
return
client_ip = None
username = None
session_hash = None
try:
client = getattr(request, "client", None)
client_ip = client.host if client else None
username = getattr(request, "username", None)
session_hash = getattr(request, "session_hash", None)
except Exception:
pass
log_event(
"request_start",
req_id=req_id,
user={"username": username, "ip": _mask_ip(client_ip), "session_hash": session_hash},
message_preview=user_msg[:200],
params={
"model": selected_model,
"max_tokens": int(max_tokens),
"temperature": float(temperature),
"top_p": float(top_p),
"do_web_search": bool(do_web_search),
"web_k": int(web_k),
"timelimit": timelimit,
"region": region,
"safesearch": safesearch,
"tavily_provided": bool(tavily_api_key),
},
model_id=selected_model,
)
web_ctx = ""
web_stats = {"ddg": 0, "tavily": 0, "ranked": 0, "query": None, "auto_search": False, "search_analysis": {}}
file_ctx = ""
used_mode = None
t_request_start = time.perf_counter()
response_text = ""
    # Intelligent web-search decision logic
search_needed = False
search_analysis = {}
if do_web_search:
        # The user explicitly enabled web search
search_needed = True
search_analysis = {"manual": True, "confidence": 1.0}
log_event("manual_search_enabled", req_id=req_id)
else:
        # Automatically decide whether a web search is needed
search_analysis = needs_web_search(user_msg, history)
search_needed = search_analysis['needs_search']
if search_needed:
web_stats["auto_search"] = True
log_event("auto_search_triggered", req_id=req_id,
confidence=search_analysis['confidence'],
reasons=search_analysis['reasons'],
search_type=search_analysis['search_type'])
else:
log_event("no_search_needed", req_id=req_id,
confidence=search_analysis['confidence'],
reasons=search_analysis['reasons'])
web_stats["search_analysis"] = search_analysis
if search_needed:
try:
            # Use the suggested query if available; otherwise build an intelligent one
if search_analysis.get('suggested_query'):
search_query_text = search_analysis['suggested_query']
else:
search_query_text = intelligent_search_query_builder(user_msg, history)
log_event("enhanced_query_built", req_id=req_id,
original=user_msg[:100],
enhanced=search_query_text,
auto_triggered=web_stats["auto_search"])
except Exception as e:
log_event("qrewrite_error", req_id=req_id, error=type(e).__name__, detail=str(e))
search_query_text = user_msg.strip()
k = max(10, min(int(web_k), 30))
# Use enhanced web search instead of basic search
try:
enhanced_results = enhanced_web_search(search_query_text, k=k, timelimit=timelimit, region=region, safesearch=safesearch)
web_ctx = format_evidence(enhanced_results)
web_stats.update({
"enhanced_search": len(enhanced_results),
"query": search_query_text,
"has_realtime": any(r.get('engine') == 'realtime' for r in enhanced_results),
"has_news": any(r.get('engine') == 'rss' for r in enhanced_results)
})
log_event("enhanced_web_search_done", req_id=req_id, **{k: v for k, v in web_stats.items() if k != 'search_analysis'})
except Exception as e:
# Fallback to basic search
log_event("enhanced_search_failed", req_id=req_id, error=str(e), fallback="basic_search")
ddg_rows = ddg_search(search_query_text, k=k, timelimit=timelimit, region=region, safesearch=safesearch)
tav_rows = tavily_search(search_query_text, api_key=(tavily_api_key or None), k=min(k, 12))
merged = (ddg_rows or []) + (tav_rows or [])
ranked = rank_results(merged, search_query_text, k)
web_ctx = format_evidence(ranked)
web_stats.update({"ddg": len(ddg_rows), "tavily": len(tav_rows), "ranked": len(ranked), "query": search_query_text})
else:
        # No web search performed
log_event("skipped_web_search", req_id=req_id, reason="not_needed")
if include_files and files:
file_ctx = gather_files(files)
try:
used_mode = "local"
for chunk in local_stream_generate(selected_model, system_message, user_msg, max_tokens, temperature, top_p, web_ctx, file_ctx):
response_text = chunk
yield chunk
log_event(
"response_complete",
req_id=req_id,
mode=used_mode,
elapsed_ms=int((time.perf_counter() - t_request_start) * 1000),
response_len=len(response_text),
response_preview=response_text[:2000],
web=web_stats,
files_count=len(files) if files else 0,
)
return
except Exception as e:
log_event("local_generate_error", req_id=req_id, error=type(e).__name__, detail=str(e))
if InferenceClient is None:
msg = "Local generation failed and huggingface_hub is not installed."
response_text = msg
yield msg
log_event(
"response_complete",
req_id=req_id,
mode="error",
elapsed_ms=int((time.perf_counter() - t_request_start) * 1000),
response_len=len(response_text),
response_preview=response_text[:2000],
web=web_stats,
files_count=len(files) if files else 0,
)
return
try:
used_mode = "inference"
remote_model = pick_inference_model(selected_model)
client = InferenceClient(model=remote_model)
base_prompt = build_prompt(system_message, history, message)
prompt = attach_context(base_prompt, web_ctx, file_ctx)
t0 = time.perf_counter()
for ev in client.text_generation(
prompt,
max_new_tokens=int(max_tokens),
stream=True,
temperature=float(temperature),
top_p=float(top_p),
return_full_text=False,
):
token_text = getattr(getattr(ev, "token", None), "text", None)
if token_text is None:
token_text = str(ev)
response_text += token_text
yield response_text
log_event(
"response_complete",
req_id=req_id,
mode=used_mode,
elapsed_ms=int((time.perf_counter() - t_request_start) * 1000),
latency_inference_ms=int((time.perf_counter() - t0) * 1000),
response_len=len(response_text),
response_preview=response_text[:2000],
web=web_stats,
files_count=len(files) if files else 0,
)
except Exception as e:
err = "Generation failed (both local and remote). Please check your environment."
response_text = err
yield err
log_event(
"response_complete",
req_id=req_id,
mode="error",
elapsed_ms=int((time.perf_counter() - t_request_start) * 1000),
error=type(e).__name__,
error_detail=str(e),
response_len=len(response_text),
response_preview=response_text[:2000],
web=web_stats,
files_count=len(files) if files else 0,
)
# Create model choices based on platform
def get_model_choices():
base_choices = [
"vngrs-ai/Kumru-2B",
"ceofast/kumru-2b-lora",
]
# Only add MLX model on macOS
if platform.system() == "Darwin" and mlx_load is not None:
base_choices.insert(1, "ibraschwan/Kumru-2B-mlx-4Bit")
return base_choices
model_dropdown = gr.Dropdown(
choices=get_model_choices(),
value="vngrs-ai/Kumru-2B",
label="Model",
)
chatbot = gr.ChatInterface(
respond,
type="messages",
title="Kumru 2B Chat",
description="""
<div style="text-align: center; max-width: 100%; padding: 20px;">
<h2 style="color: #2c3e50; margin-bottom: 20px;">🇹🇷 Kumru 2B - Akıllı Web Agent ile Güçlendirilmiş Türkçe Dil Modeli</h2>
<p style="color: #34495e; font-size: 16px; margin-bottom: 20px;">
<strong>🚀 Akıllı Özellikler:</strong><br>
🤖 <strong>Çoklu Model Desteği:</strong> Transformers, MLX ve LoRA backend'leri<br>
🧠 <strong>Akıllı Web Agent:</strong> Sorunuzu analiz eder, gerektiğinde otomatik web araması yapar<br>
🌐 <strong>Çoklu Arama:</strong> DuckDuckGo + Tavily + RSS + Gerçek zamanlı haber akışı<br>
📄 <strong>Dosya Desteği:</strong> PDF, DOCX, TXT, MD ve JSON dosyalarını analiz<br>
🎯 <strong>Akıllı Referanslar:</strong> Web araması sonuçlarını [1], [2] şeklinde referans gösterir<br>
⚡ <strong>Otomatik Karar:</strong> Güncel bilgi gerekip gerekmediğini akıllıca belirler
</p>
<p style="color: #7f8c8d; font-size: 14px; margin-bottom: 25px;">
<strong>Akıllı Kullanım:</strong> Sadece sorunuzu sorun! Sistem otomatik olarak güncel bilgi gerekip gerekmediğini analiz eder.
✅ Güncel haberler, tarihler, fiyatlar için otomatik web araması<br>
❌ Kod yazma, kişisel görüş, genel sohbet için gereksiz arama yapmaz
</p>
<div style="padding: 10px; border-radius: 5px; margin-top: 15px;">
<small style="color: #7f8c8d;">
💡 <strong>Örnekler:</strong><br>
🔍 <em>"Bugün ne oldu?"</em> → Otomatik web araması yapar<br>
💬 <em>"Nasılsın?"</em> → Web araması yapmaz<br>
🔍 <em>"Dolar kuru nedir?"</em> → Güncel veri için web araması yapar<br>
💬 <em>"Python kodu yaz"</em> → Web araması yapmaz
</small>
</div>
</div>
""",
textbox=gr.Textbox(placeholder="Type your message… (Shift+Enter = newline)"),
additional_inputs=[
model_dropdown,
gr.Textbox(
value=(
"Adın Kumru. Sen, VNGRS tarafından Türkçe dil yeteneklerini en üst düzeye çıkarmak için sıfırdan eğitilmiş, büyük bir dil modelisin. "
"Görevin, tüm sorulara sadece Türkçe olarak, doğru, akıcı ve doğal yanıtlar sağlamaktır. "
"Akıllı web arama sisteminle kullanıcının sorusunun güncel bilgi gerektirip gerektirmediğini otomatik olarak belirler ve gerektiğinde web araması yapar. "
"Web araması aktifse, güncel bilgileri Evidence bölümünden alacak ve [1], [2] şeklinde kaynak referansları vereceksin. "
"Gerçek zamanlı bilgilere erişimin var ve her zaman en güncel ve doğru bilgiyi sunmaya odaklanacaksın. "
f"Bugünün tarihi: {time.strftime('%Y-%m-%d %H:%M')}. "
"Sadece gerekli durumlarda web araması yap - kişisel görüş, kod yazma, genel sohbet için arama yapmaya gerek yok."
),
label="System message",
),
gr.Slider(minimum=1, maximum=2048, value=512, step=1, label="Max new tokens"),
gr.Slider(minimum=0.1, maximum=4.0, value=0.7, step=0.1, label="Temperature"),
gr.Slider(minimum=0.1, maximum=1.0, value=0.95, step=0.05, label="Top-p"),
gr.Files(file_count="multiple", label="Upload files (txt, md, pdf, docx)"),
gr.Checkbox(value=False, label="Include file contents"),
gr.Checkbox(value=False, label="Web aramasını zorla etkinleştir (Otomatik akıllı arama varsayılan olarak açık)"),
gr.Slider(minimum=10, maximum=30, value=12, step=1, label="Web results (10–30)"),
gr.Dropdown(
choices=[("24h", "d"), ("7 days", "w"), ("30 days", "m"), ("1 year", "y"), ("No limit", "")],
value="w",
label="Freshness (DuckDuckGo)"
),
gr.Dropdown(
choices=[("Turkey (tr-tr)", "tr-tr"), ("Global (wt-wt)", "wt-wt"), ("US (us-en)", "us-en")],
value="tr-tr",
label="Region (DuckDuckGo)"
),
gr.Dropdown(
choices=[("Moderate", "moderate"), ("Strict", "strict"), ("Off", "off")],
value="moderate",
label="Safe search (DuckDuckGo)"
),
gr.Textbox(value="", label="Tavily API Key (optional)", type="password"),
],
)
if __name__ == "__main__":
# CPU-only optimizations for Hugging Face Spaces
os.environ.setdefault("PYTORCH_CUDA_ALLOC_CONF", "max_split_size_mb:32")
os.environ.setdefault("TRANSFORMERS_CACHE", "/tmp/transformers_cache")
os.environ.setdefault("TOKENIZERS_PARALLELISM", "false") # Disable tokenizer parallelism
os.environ.setdefault("OMP_NUM_THREADS", "1") # Limit OpenMP threads
# Force CPU for transformers to ensure HF Spaces compatibility
try:
import torch
if hasattr(torch, 'set_default_tensor_type'):
torch.set_default_tensor_type('torch.FloatTensor')
except ImportError:
pass
# Get configuration from environment variables for cloud deployment
server_name = os.environ.get("GRADIO_SERVER_NAME", "0.0.0.0")
server_port = int(os.environ.get("GRADIO_SERVER_PORT", "7860"))
chatbot.launch(
server_name=server_name,
server_port=server_port,
share=False,
show_error=True,
favicon_path=None,
ssl_verify=False
)