# lumen/app.py — OSINT verification API
# (mirrored from the "lumen" Hugging Face Space by mdAmin313, commit 4d01f43, 9.6 kB)
import asyncio
import json
import logging
import os
import re
from datetime import datetime
from typing import Any, Dict, List, Optional

import google.generativeai as genai
import httpx
from fastapi import Depends, FastAPI, Header, HTTPException
from pydantic import BaseModel, Field

# Optional dependencies: degrade gracefully when they are not installed.
try:
    from newspaper import Article
except Exception:
    Article = None
try:
    from googlesearch import search as google_search
except Exception:
    google_search = None
# -------------------------
# Logging setup
# -------------------------
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
)
# Shared application logger used by the fetch/search helpers below.
logger = logging.getLogger("app")
# -------------------------
# Config
# -------------------------
# All configuration comes from environment variables; the service degrades
# gracefully (skips a data source) when an optional key is missing.
NEWSAPI_KEY = os.getenv("NEWSAPI_KEY")  # newsapi.org key; NewsAPI search skipped when unset
GNEWS_KEY = os.getenv("GNEWS_KEY")  # gnews.io key; GNews search skipped when unset
AI_PROVIDER = os.getenv("AI_PROVIDER", "none")  # "gemini" or "openai"
AI_API_KEY = os.getenv("AI_API_KEY")  # key for the selected AI provider
API_KEY = os.getenv("API_KEY", "changeme")  # protect your API
# -------------------------
# Dependencies
# -------------------------
def verify_api_key(x_api_key: str = Header(...)):
    """FastAPI dependency: reject requests whose X-API-Key header differs from API_KEY.

    Raises:
        HTTPException: 401 when the header does not match.
    """
    import secrets

    # compare_digest is constant-time, so the comparison does not leak
    # information about the key through response-timing differences.
    # (Both operands are str; assumes the key is ASCII — TODO confirm.)
    if not secrets.compare_digest(x_api_key, API_KEY):
        raise HTTPException(status_code=401, detail="Invalid or missing API Key")
# -------------------------
# Helpers
# -------------------------
def parse_iso_date(value: str) -> Optional[str]:
    """Normalize an ISO-8601 timestamp into the local timezone.

    Accepts a trailing-'Z' UTC suffix (rejected by datetime.fromisoformat on
    Python < 3.11) by rewriting it to '+00:00'.

    Returns:
        An aware ISO-8601 string in the local timezone, or None for empty
        or unparsable input.
    """
    if not value:
        return None
    try:
        parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
    except (ValueError, AttributeError):
        # ValueError: malformed timestamp; AttributeError: non-string input.
        return None
    return parsed.astimezone().isoformat()
async def fetch_json(client: httpx.AsyncClient, url: str, params: Dict[str, Any]) -> Dict[str, Any]:
    """GET *url* with *params* and decode the JSON body.

    Best-effort: any transport error, non-2xx status, or invalid JSON is
    logged and reported as an empty dict instead of raising.
    """
    try:
        response = await client.get(url, params=params, timeout=15)
        response.raise_for_status()
        return response.json()
    except Exception as exc:
        logger.error(f"Error fetching {url}: {exc}")
        return {}
# -------------------------
# Models
# -------------------------
class VerifyIn(BaseModel):
    """Request body for POST /verify."""

    text: str = Field(..., description="Claim text to verify")
    lang: str = Field("en", description="Language (ISO 639-1)")
class ArticleItem(BaseModel):
    """A news-article reference, optionally enriched with its full text."""

    title: Optional[str] = None
    url: Optional[str] = None
    source: Optional[str] = None  # publisher name from the news API
    publishedAt: Optional[str] = None  # normalized ISO-8601 string (see parse_iso_date)
    content: Optional[str] = None  # full article body when extraction succeeds
    confidence: float = 0.0  # heuristic score from body length (see score_article_content)
class SocialHit(BaseModel):
    """A single social-media / web mention found via site-restricted search."""

    title: Optional[str] = None
    url: Optional[str] = None
    source: Optional[str] = None  # originating bucket, e.g. "twitter" or "reddit"
class Classification(BaseModel):
    """Lightweight classification of the submitted text."""

    category: str  # currently always "claim" (set in /verify)
    keywords: List[str] = []  # keyword candidates extracted from the text
class Verdict(BaseModel):
    """Fact-check outcome produced by ai_evaluate."""

    verdict: str  # e.g. "True"/"False"/"Misleading"/"Unverifiable", or fallback labels
    reason: str  # human-readable justification
    confidence: float  # 0.0 - 1.0
class VerifyOut(BaseModel):
    """Response body for POST /verify."""

    classification: Classification
    summary: str  # claim text truncated to ~200 chars
    verdict: Verdict
    news: List[ArticleItem] = []  # enriched news evidence
    social: Dict[str, List[SocialHit]] = {}  # hits keyed by platform bucket
    timeline: List[ArticleItem] = []  # the same news sorted chronologically
# -------------------------
# Core Logic
# -------------------------
# Words of 4+ ASCII letters; extraction happens on lowercased text.
_WORD_RE = re.compile(r"[A-Za-z]{4,}")
# Common filler words excluded from keyword candidates. Built once at import
# time instead of on every call.
_STOPWORDS = frozenset(
    "this that with from into about your they it's dont cant wont very more "
    "less most the for and not but or yet so on in at by to of as is are be".split()
)


def lightweight_keywords(text: str) -> List[str]:
    """Extract up to 12 keyword candidates from *text*.

    Keeps lowercase words of 4+ letters that are not stopwords, preserving
    order of appearance (duplicates are not removed).
    """
    words = _WORD_RE.findall(text.lower())
    return [w for w in words if w not in _STOPWORDS][:12]
async def search_newsapi(query: str, lang: str) -> List[ArticleItem]:
    """Search NewsAPI's 'everything' endpoint for *query*.

    Returns up to 10 relevance-sorted articles, or [] when NEWSAPI_KEY is
    not configured or the request fails.
    """
    if not NEWSAPI_KEY:
        return []
    params = {
        "q": query,
        "language": lang,
        "pageSize": 10,
        "sortBy": "relevancy",
        "apiKey": NEWSAPI_KEY,
    }
    async with httpx.AsyncClient() as client:
        payload = await fetch_json(client, "https://newsapi.org/v2/everything", params)
    items: List[ArticleItem] = []
    for entry in payload.get("articles", []):
        source_info = entry.get("source") or {}
        items.append(
            ArticleItem(
                title=entry.get("title"),
                url=entry.get("url"),
                source=source_info.get("name"),
                publishedAt=parse_iso_date(entry.get("publishedAt")),
            )
        )
    return items
async def search_gnews(query: str, lang: str) -> List[ArticleItem]:
    """Search the GNews v4 API for *query*.

    Returns up to 10 articles, or [] when GNEWS_KEY is not configured or
    the request fails.
    """
    if not GNEWS_KEY:
        return []
    params = {"q": query, "lang": lang, "token": GNEWS_KEY, "max": 10}
    async with httpx.AsyncClient() as client:
        payload = await fetch_json(client, "https://gnews.io/api/v4/search", params)
    items: List[ArticleItem] = []
    for entry in payload.get("articles", []):
        source_info = entry.get("source") or {}
        items.append(
            ArticleItem(
                title=entry.get("title"),
                url=entry.get("url"),
                source=source_info.get("name"),
                publishedAt=parse_iso_date(entry.get("publishedAt")),
            )
        )
    return items
async def fetch_article_body(url: str) -> Optional[str]:
    """Download and parse the article at *url*, returning its plain text.

    The newspaper package's download()/parse() are blocking network and
    parsing calls, so they run in a worker thread via asyncio.to_thread
    instead of stalling the event loop (the original awaited nothing and
    blocked the loop for the whole download).

    Returns:
        The extracted body text, or None when the newspaper package is not
        installed or download/parsing fails (best-effort, never raises).
    """
    if not Article:
        return None

    def _extract() -> Optional[str]:
        # Blocking work, executed off the event loop.
        art = Article(url)
        art.download()
        art.parse()
        return art.text

    try:
        return await asyncio.to_thread(_extract)
    except Exception:
        return None
async def score_article_content(text: Optional[str]) -> float:
    """Heuristic confidence score for an extracted article body.

    Longer bodies score higher: 0.2 for missing/empty text up to 1.0 for
    bodies over 3000 characters.
    """
    if not text:
        return 0.2
    size = len(text)
    # (minimum length, score) pairs checked from the largest bracket down.
    for threshold, score in ((3000, 1.0), (800, 0.7), (300, 0.5)):
        if size > threshold:
            return score
    return 0.3
async def gather_social(query: str, limit: int = 5) -> Dict[str, List[SocialHit]]:
    """Collect up to *limit* site-restricted Google results per platform.

    Returns a dict with fixed keys (twitter/reddit/facebook/google_news);
    a bucket stays empty when the googlesearch package is unavailable or
    that platform's lookup fails.
    """
    results: Dict[str, List[SocialHit]] = {
        "twitter": [],
        "reddit": [],
        "facebook": [],
        "google_news": [],
    }
    if not google_search:
        return results
    sites = {
        "twitter": "site:twitter.com",
        "reddit": "site:reddit.com",
        "facebook": "site:facebook.com",
        "google_news": "site:news.google.com",
    }
    for key, prefix in sites.items():
        try:
            # google_search performs blocking network I/O and yields lazily;
            # materialize the generator inside a worker thread so the event
            # loop is not stalled (the original iterated it on the loop).
            urls = await asyncio.to_thread(
                lambda p=prefix: list(google_search(f"{p} {query}", num=limit, stop=limit))
            )
            results[key] = [SocialHit(url=u, source=key) for u in urls]
        except Exception as e:
            logger.warning(f"Social search failed for {key}: {e}")
    return results
# Initialize the Gemini client once at import time when configured;
# ai_evaluate falls back to rule-based scoring when gemini_model is None.
if AI_PROVIDER == "gemini" and AI_API_KEY:
    genai.configure(api_key=AI_API_KEY)
    gemini_model = genai.GenerativeModel("gemini-2.5-flash")
else:
    gemini_model = None
async def ai_evaluate(user_text: str, context_articles: List[ArticleItem]) -> Verdict:
    """Judge a claim against the gathered evidence.

    Uses Gemini when configured (gemini_model is not None); otherwise, or on
    any Gemini failure, falls back to a source-count heuristic. Never raises.
    """
    # --- if Gemini enabled ---
    if gemini_model:
        sources_text = "\n".join(f"- {a.title or ''} ({a.url})" for a in context_articles[:10])
        prompt = f"""
You are a fact-checking assistant.
Task: Analyze the following claim and evidence.
Decide if the claim is True, False, Misleading, or Unverifiable.
Explain reasoning clearly.
Claim: {user_text}
Evidence from news:
{sources_text}
Respond with JSON:
{{
"verdict": "True/False/Misleading/Unverifiable",
"reason": "explanation here",
"confidence": 0.0 to 1.0
}}
"""
        try:
            # generate_content is a blocking HTTP call; run it in a worker
            # thread so the event loop stays responsive.
            response = await asyncio.to_thread(gemini_model.generate_content, prompt)
            raw = (response.text or "").strip()
            # Gemini often wraps its JSON answer in ```json ... ``` fences,
            # which would make json.loads fail; strip them first.
            if raw.startswith("```"):
                raw = raw.strip("`").strip()
                if raw[:4].lower() == "json":
                    raw = raw[4:]
            # Top-level `json` import is used here; the previous local
            # re-import shadowed it redundantly.
            data = json.loads(raw)
            return Verdict(
                verdict=data.get("verdict", "Unclear"),
                reason=data.get("reason", "No reasoning provided"),
                confidence=float(data.get("confidence", 0.5)),
            )
        except Exception as e:
            logger.error(f"Gemini evaluation failed: {e}")
    # --- fallback (rule-based) ---
    sources = len([a for a in context_articles if a.url])
    if sources >= 3:
        verdict, conf = "Likely true", 0.8
    elif sources == 0:
        verdict, conf = "Insufficient evidence", 0.4
    else:
        verdict, conf = "Unclear", 0.5
    return Verdict(
        verdict=verdict,
        reason=f"Fallback evaluation with {sources} sources.",
        confidence=conf,
    )
def make_timeline(items: List[ArticleItem]) -> List[ArticleItem]:
    """Return *items* sorted chronologically by publishedAt.

    Items with a missing or unparsable publishedAt sort to the end. The key
    is a (missing, datetime) tuple: parse_iso_date produces timezone-aware
    ISO strings, and the previous implementation compared those aware
    datetimes against the naive datetime.min sentinel, raising TypeError
    whenever dated and undated items were mixed. With the tuple key, the
    naive sentinel is only ever compared against other sentinels.
    """
    def sort_key(item: ArticleItem):
        if item.publishedAt:
            try:
                # Aware datetime when publishedAt came from parse_iso_date.
                return (0, datetime.fromisoformat(item.publishedAt))
            except (ValueError, TypeError):
                pass
        return (1, datetime.min)

    return sorted(items, key=sort_key)
# -------------------------
# FastAPI App
# -------------------------
app = FastAPI(title="OSINT Verification API", version="1.0.0")


@app.get("/health")
async def health():
    """Unauthenticated liveness probe."""
    return {"status": "ok"}
@app.post("/verify", response_model=VerifyOut, dependencies=[Depends(verify_api_key)])
async def verify(payload: VerifyIn):
    """Verify a textual claim: classify, gather evidence, evaluate, build a timeline."""
    claim = payload.text.strip()
    if not claim:
        raise HTTPException(status_code=400, detail="Empty text")

    # Step 1: lightweight keyword-based classification.
    keywords = lightweight_keywords(claim)
    classification = Classification(category="claim", keywords=keywords)

    # Step 2: query news APIs and social search concurrently.
    query = " ".join(keywords) if keywords else claim[:200]
    newsapi_hits, gnews_hits, social = await asyncio.gather(
        search_newsapi(query, payload.lang),
        search_gnews(query, payload.lang),
        gather_social(query),
    )
    articles = newsapi_hits + gnews_hits

    # Step 3: enrich every article with its body text and a confidence score.
    async def enrich(item: ArticleItem) -> ArticleItem:
        body = await fetch_article_body(item.url) if item.url else None
        item.content = body
        item.confidence = await score_article_content(body)
        return item

    enriched = list(await asyncio.gather(*(enrich(a) for a in articles)))

    # Step 4: AI (or rule-based fallback) verdict on the claim.
    summary = claim if len(claim) <= 200 else claim[:200] + "..."
    verdict = await ai_evaluate(claim, enriched)

    # Step 5: assemble the response, including a chronological timeline.
    return VerifyOut(
        classification=classification,
        summary=summary,
        verdict=verdict,
        news=enriched,
        social=social,
        timeline=make_timeline(enriched),
    )
if __name__ == "__main__":
    import uvicorn

    # PORT overridable via env; 7860 is the conventional Hugging Face Spaces port.
    uvicorn.run(app, host="0.0.0.0", port=int(os.getenv("PORT", "7860")))