ragV98's picture
retrieval to retriever
a092d54
raw
history blame
3.09 kB
import os
import sys
import json

import requests

sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))

import redis
from typing import Dict, List, Optional
from llama_index.core import VectorStoreIndex
from llama_index.core.query_engine import RetrieverQueryEngine
from components.indexers.news_indexer import load_news_index
# Load environment variables
# Redis connection URL; falls back to a local instance for development.
REDIS_URL = os.environ.get("UPSTASH_REDIS_URL", "redis://localhost:6379")
# NOTE(review): this reads UPSTASH_REDIS_TOKEN but is used below as the Redis
# cache *key name* (see redis_client.set/get), not as an auth credential —
# confirm that naming is intended. Will be None if the env var is unset.
REDIS_KEY = os.environ.get("UPSTASH_REDIS_TOKEN")
MISTRAL_URL = os.environ.get("MISTRAL_URL") # Inference endpoint URL
HF_TOKEN = os.environ.get("HF_TOKEN") # Hugging Face endpoint token
# Connect to Redis
# decode_responses=True makes get() return str instead of bytes.
redis_client = redis.Redis.from_url(REDIS_URL, decode_responses=True)
# Topics to query
TOPICS = ["India news", "World news", "Tech news", "Finance news", "Sports news"]
# Prompt to summarize topic
def build_prompt(content: str, topic: str) -> str:
return (
f"You are a news summarizer. Summarize the following content in 25-30 words. "
f"Make it engaging and informative. Include appropriate emojis. Topic: {topic}\n\n{content}"
)
# Call Mistral via inference endpoint
def call_mistral(prompt: str) -> Optional[str]:
    """POST a chat-style prompt to the Mistral inference endpoint.

    Args:
        prompt: The fully-built user prompt text.

    Returns:
        The generated text with surrounding whitespace stripped, or None
        when the HTTP request fails or the response payload does not have
        the expected ``{"outputs": [{"content": ...}]}`` shape. Callers
        (see summarize_topic) treat a falsy result as "skip this item".
    """
    headers = {
        "Authorization": f"Bearer {HF_TOKEN}",
        "Content-Type": "application/json",
    }
    payload = {
        "inputs": [
            {"role": "user", "content": prompt}
        ]
    }
    try:
        response = requests.post(MISTRAL_URL, headers=headers, json=payload, timeout=20)
        response.raise_for_status()
        return response.json()["outputs"][0]["content"].strip()
    # Narrowed from a bare `except Exception`: RequestException covers
    # connection/timeout/HTTP errors; ValueError covers JSON decoding;
    # KeyError/IndexError/TypeError cover a mis-shaped response body.
    except (requests.RequestException, ValueError, KeyError, IndexError, TypeError) as e:
        print(f"⚠️ Mistral error: {e}")
        return None
# Generate summary for topic using Mistral
def summarize_topic(docs: List[str], topic: str) -> List[Dict]:
    """Summarize up to the first five documents for a topic into feed entries.

    Documents whose LLM call yields a falsy result are silently skipped,
    so the returned list may be shorter than the input (or empty).
    """
    article_link = "https://google.com/search?q=" + topic.replace(" ", "+")
    entries: List[Dict] = []
    for document in docs[:5]:
        summary = call_mistral(build_prompt(document, topic))
        if not summary:
            continue  # LLM call failed or returned nothing — skip this doc
        entries.append({
            "summary": summary,
            "image_url": "https://source.unsplash.com/800x600/?news",
            "article_link": article_link,
        })
    return entries
# Main generation pipeline
def generate_and_cache_daily_feed():
    """Build summarized feeds for every topic, cache them in Redis as JSON,
    and return the assembled list of per-topic feed dicts."""
    news_index: VectorStoreIndex = load_news_index()
    engine = RetrieverQueryEngine.from_args(news_index)

    feeds = []
    for topic in TOPICS:
        print(f"\n🔍 Generating for: {topic}")
        result = engine.query(topic)
        # Pull the raw text of every retrieved source node for summarization
        contents = [str(node.get_content()) for node in result.source_nodes]
        feeds.append({
            "topic": topic.lower().replace(" news", ""),
            "feed": summarize_topic(contents, topic),
        })

    # Cache to Redis (ensure_ascii=False keeps emojis/unicode intact)
    redis_client.set(REDIS_KEY, json.dumps(feeds, ensure_ascii=False))
    print(f"✅ Cached daily feed under key '{REDIS_KEY}'")
    return feeds
# Redis fetch for API
def get_cached_daily_feed():
    """Return the cached daily feed from Redis, or [] when nothing is cached."""
    raw = redis_client.get(REDIS_KEY)
    if not raw:
        return []
    return json.loads(raw)