import os
import json
import redis
import numpy as np
from typing import List, Dict
from openai import OpenAI
from components.indexers.news_indexer import get_upstash_vector_store
from llama_index.core.vector_stores.types import VectorStoreQuery, MetadataFilter, MetadataFilters, FilterOperator
# 🔐 Environment variables
REDIS_URL = os.environ.get("UPSTASH_REDIS_URL", "redis://localhost:6379")
REDIS_KEY = os.environ.get("UPSTASH_REDIS_TOKEN")  # not referenced below; the Upstash token is expected to be embedded in REDIS_URL
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
# ✅ Redis client
try:
redis_client = redis.Redis.from_url(REDIS_URL, decode_responses=True)
except Exception as e:
print("❌ [Redis Init Error]", e)
raise
# 📰 Topic list
TOPICS = ["India news", "World news", "Tech news", "Finance news", "Sports news"]
# Short topic keys ("india", "world", "tech", "finance", "sports") used as metadata filter values and dict keys
TOPIC_KEYS = [t.lower().replace(" news", "") for t in TOPICS]
# 🧠 Summarization prompt
BASE_PROMPT = (
"You are Nuse’s editorial summarizer. Read the excerpts below and extract the most important stories. "
"Return up to 3 punchy headlines, each under 20 words, written like a premium editorial bulletin."
)
# 📥 Load documents grouped by topic from Upstash
def load_all_documents_grouped_by_topic() -> Dict[str, List[str]]:
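    """Query the Upstash vector store once per topic and return a dict mapping each
    topic key (e.g. "india") to the list of document texts retrieved for it."""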
topic_docs = {key: [] for key in TOPIC_KEYS}
try:
vector_store = get_upstash_vector_store()
        print("💡 Successfully retrieved Upstash vector store.")
        # Sanity check: TOPICS and TOPIC_KEYS must stay aligned for the zip() below
        print(f"DEBUG: TOPICS = {TOPICS}")
        print(f"DEBUG: TOPIC_KEYS = {TOPIC_KEYS}")
        print(f"DEBUG: Length of TOPICS = {len(TOPICS)}")
        print(f"DEBUG: Length of TOPIC_KEYS = {len(TOPIC_KEYS)}")
for full_topic_name, topic_key_for_filter in zip(TOPICS, TOPIC_KEYS):
try:
                # Filter on the short topic key (e.g. "india"), which matches the metadata stored at
                # index time, rather than the display name (e.g. "India news").
filters = MetadataFilters(
filters=[
MetadataFilter(key="topic", value=topic_key_for_filter, operator=FilterOperator.EQ)
]
)
                # Random 384-dim query vector (MiniLM embedding size); results come from the metadata
                # filter, so the embedding only needs the correct dimensionality.
                dummy_vector = np.random.rand(384).tolist()
query = VectorStoreQuery(
query_embedding=dummy_vector,
similarity_top_k=50, # Retrieve enough documents for summarization
filters=filters # Apply the metadata filter
)
                print(f"🔎 Querying Upstash for topic: '{full_topic_name}' using filter value '{topic_key_for_filter}'")
                result = vector_store.query(query)
                print(f"➡️ Found {len(result.nodes)} nodes for topic: '{full_topic_name}'.")
for node in result.nodes:
content = node.get_content().strip()
if content:
topic_docs[topic_key_for_filter].append(content)
# Optional: Print metadata to verify filtering
# print(f" Node metadata: {node.metadata}")
except Exception as e:
print(f"❌ [Topic Metadata Filter error for '{full_topic_name}']: {e}")
except Exception as e:
print("❌ [load_all_documents_grouped_by_topic Error]", e)
return topic_docs
# 🧪 Summarize one topic at a time using OpenAI GPT-4
def summarize_topic(topic_key: str, docs: List[str]) -> List[Dict]:
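    """Summarize one topic's documents with OpenAI; return a list of dicts with
    "summary", "image_url", and "article_link" keys (empty on failure or no docs)."""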
if not docs:
print(f"⚠️ No docs found for topic: {topic_key}, skipping summarization.")
return []
try:
client = OpenAI(api_key=OPENAI_API_KEY)
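        # Join the excerpts with separators and truncate to ~12k characters to stay within the model's context window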
content = "\n\n---\n\n".join(docs)[:12000]
print(f"🧠 Summarizing topic via OpenAI: {topic_key} ({len(docs)} documents)")
completion = client.chat.completions.create(
model="gpt-4",
messages=[
{"role": "system", "content": BASE_PROMPT},
{"role": "user", "content": content},
],
max_tokens=512,
temperature=0.7,
)
text = completion.choices[0].message.content.strip()
summaries = []
for line in text.splitlines():
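            # Treat each non-empty line of the completion as one headline, stripping bullet/dash prefixes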
            line = line.strip("-–• ")
if line:
summaries.append({
"summary": line,
"image_url": "https://source.unsplash.com/800x600/?news",
"article_link": f"https://google.com/search?q={topic_key}+news"
})
return summaries
except Exception as e:
print(f"❌ [OpenAI Summarization Error for '{topic_key}']: {e}")
return []
# 🚀 Main callable
def generate_and_cache_daily_feed():
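    """Build the daily feed (one entry per topic), cache it in Redis for 24 hours, and
    return it; returns an empty list if generation fails outright."""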
try:
        print("🆕 Running OpenAI-powered daily feed generator...")
topic_docs = load_all_documents_grouped_by_topic()
feed_map = {}
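        # Summarize each topic independently so one topic's failure doesn't abort the whole feed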
for topic_key in TOPIC_KEYS:
try:
summaries = summarize_topic(topic_key, topic_docs.get(topic_key, []))
feed_map[topic_key] = summaries
except Exception as e:
print(f"❌ [Topic Loop Error for '{topic_key}']: {e}")
feed_map[topic_key] = []
        # Pair each display name from TOPICS with the summaries keyed by TOPIC_KEYS
final_feed = [{"topic": display_name, "feed": feed_map[actual_key]}
for display_name, actual_key in zip(TOPICS, TOPIC_KEYS)]
try:
cache_key_name = "daily_news_feed_cache"
            redis_client.set(cache_key_name, json.dumps(final_feed, ensure_ascii=False), ex=86400)
            print(f"✅ Cached daily feed under key '{cache_key_name}' with 24-hour expiry.")
except Exception as e:
print("❌ [Redis Cache Error]", e)
return final_feed
except Exception as e:
print("❌ [generate_and_cache_daily_feed Overall Error]", e)
return []
# 📦 Get cached data
def get_cached_daily_feed():
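    """Return the cached daily feed from Redis, or an empty list if nothing is cached or Redis fails."""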
try:
cache_key_name = "daily_news_feed_cache"
cached = redis_client.get(cache_key_name)
if cached:
            print(f"✅ Retrieved cached daily feed from '{cache_key_name}'.")
return json.loads(cached)
else:
print(f"ℹ️ No cached data found under key '{cache_key_name}'.")
return []
except Exception as e:
print("❌ [get_cached_daily_feed Error]", e)
return []
# Example usage: run this module directly for a quick end-to-end test
if __name__ == "__main__":
# Ensure your environment variables are set before running
# os.environ["UPSTASH_REDIS_URL"] = "your_upstash_redis_url"
# os.environ["UPSTASH_REDIS_TOKEN"] = "your_upstash_redis_token"
# os.environ["OPENAI_API_KEY"] = "your_openai_api_key"
generated_feed = generate_and_cache_daily_feed()
print("\n--- Generated and Cached Feed ---")
# print(json.dumps(generated_feed, indent=2, ensure_ascii=False))
cached_feed = get_cached_daily_feed()
print("\n--- Retrieved from Cache ---")
# print(json.dumps(cached_feed, indent=2, ensure_ascii=False))