import os
import json
import redis
import numpy as np
from typing import List, Dict
from openai import OpenAI
from components.indexers.news_indexer import get_upstash_vector_store
from llama_index.core import StorageContext
from llama_index.core.vector_stores.types import VectorStoreQuery, MetadataFilter, MetadataFilters, FilterOperator
# 🔐 Environment variables
REDIS_URL = os.environ.get("UPSTASH_REDIS_URL", "redis://localhost:6379")
# Note: the Upstash Redis auth token is expected to be embedded in UPSTASH_REDIS_URL
# (e.g. rediss://default:<token>@<host>:<port>); it is not used as a cache key.
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
# ✅ Redis client
try:
redis_client = redis.Redis.from_url(REDIS_URL, decode_responses=True)
except Exception as e:
print("❌ [Redis Init Error]", e)
raise
# 📰 Topic list
TOPICS = ["India news", "World news", "Tech news", "Finance news", "Sports news"]
TOPIC_KEYS = [t.lower().replace(" news", "") for t in TOPICS]
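# e.g. "India news" -> "india", "World news" -> "world", "Tech news" -> "tech"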
# 🧠 Summarization prompt
BASE_PROMPT = (
"You are Nuse’s editorial summarizer. Read the excerpts below and extract the most important stories. "
"Return up to 3 punchy headlines, each under 20 words, written like a premium editorial bulletin."
)
# 📥 Load documents grouped by topic from Upstash
def load_all_documents_grouped_by_topic() -> Dict[str, List[str]]:
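    # Returns a mapping like {"india": ["<doc excerpt>", ...], "world": [...], ...},
    # one list of document excerpts per topic key.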
topic_docs = {key: [] for key in TOPIC_KEYS}
try:
vector_store = get_upstash_vector_store()
print("πŸ’‘ Successfully retrieved Upstash vector store.")
        # Debug: verify that the topic labels and derived keys line up before querying
        print(f"DEBUG: TOPICS = {TOPICS}")
        print(f"DEBUG: TOPIC_KEYS = {TOPIC_KEYS}")
        print(f"DEBUG: Length of TOPICS = {len(TOPICS)}")
        print(f"DEBUG: Length of TOPIC_KEYS = {len(TOPIC_KEYS)}")
for topic, key in zip(TOPICS, TOPIC_KEYS):
try:
# Upstash VectorStore expects the filter value to match the exact string
# of the topic as it was indexed. Make sure your 'topic' metadata
# in Upstash exactly matches the values in TOPICS (e.g., "India news").
# Construct MetadataFilters object
filters = MetadataFilters(
filters=[
MetadataFilter(key="topic", value=topic, operator=FilterOperator.EQ)
]
)
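                # With a random query embedding, ranking inside the filtered topic is
                # effectively arbitrary, so similarity_top_k simply caps how many nodes
                # come back for summarization.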
dummy_vector = np.random.rand(384).tolist() # Assuming MiniLM embeddings
query = VectorStoreQuery(
query_embedding=dummy_vector,
similarity_top_k=50, # Retrieve enough documents for summarization
filters=filters # Apply the metadata filter
)
print(f"πŸ”Ž Querying Upstash for topic: '{topic}'")
result = vector_store.query(query)
print(f"➑️ Found {len(result.nodes)} nodes for topic: '{topic}'.")
for node in result.nodes:
content = node.get_content().strip()
if content:
topic_docs[key].append(content)
# Optional: Print metadata to verify filtering
# print(f" Node metadata: {node.metadata}")
except Exception as e:
print(f"❌ [Topic Metadata Filter error for '{topic}']: {e}")
# Optional: Log the full traceback for more detailed debugging
# import traceback
# traceback.print_exc()
except Exception as e:
print("❌ [load_all_documents_grouped_by_topic Error]", e)
# import traceback
# traceback.print_exc()
return topic_docs
# 🧪 Summarize one topic at a time using OpenAI GPT-4
def summarize_topic(topic_key: str, docs: List[str]) -> List[Dict]:
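    # Returns a list of dicts shaped like:
    #   {"summary": "<headline>", "image_url": "<url>", "article_link": "<url>"}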
if not docs:
print(f"⚠️ No docs found for topic: {topic_key}, skipping summarization.")
return []
try:
client = OpenAI(api_key=OPENAI_API_KEY)
        # Join documents and truncate to ~12,000 characters as a conservative cap on prompt size
        content = "\n\n---\n\n".join(docs)[:12000]
        print(f"🧠 Summarizing topic via OpenAI: {topic_key} ({len(docs)} documents)")
completion = client.chat.completions.create(
model="gpt-4", # Or "gpt-4o" for potentially better performance
messages=[
{"role": "system", "content": BASE_PROMPT},
{"role": "user", "content": content},
],
max_tokens=512, # Enough tokens for 3 punchy headlines
temperature=0.7, # A bit creative but focused
)
text = completion.choices[0].message.content.strip()
summaries = []
# Parse the headlines, assuming they might be bullet points or lines
for line in text.splitlines():
            line = line.strip("-–• ")  # Remove common bullet characters
if line:
summaries.append({
"summary": line,
"image_url": "https://source.unsplash.com/800x600/?news", # Generic image, could be improved
"article_link": f"https://google.com/search?q={topic_key}+news" # Generic search link
})
return summaries
except Exception as e:
print(f"❌ [OpenAI Summarization Error for '{topic_key}']: {e}")
# import traceback
# traceback.print_exc()
return []
# 🚀 Main callable
def generate_and_cache_daily_feed():
try:
print("πŸ†• Running OpenAI-powered daily feed generator....")
topic_docs = load_all_documents_grouped_by_topic()
feed_map = {}
for topic_key in TOPIC_KEYS:
try:
summaries = summarize_topic(topic_key, topic_docs.get(topic_key, []))
feed_map[topic_key] = summaries
except Exception as e:
print(f"❌ [Topic Loop Error for '{topic_key}']: {e}")
# import traceback
# traceback.print_exc()
feed_map[topic_key] = []
final_feed = [{"topic": topic, "feed": feed_map[topic_key]} for topic, topic_key in zip(TOPICS, TOPIC_KEYS)]
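        # final_feed shape (illustrative):
        #   [{"topic": "India news", "feed": [{"summary": ..., "image_url": ..., "article_link": ...}, ...]}, ...]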
try:
            # Cache the feed under a fixed, descriptive key name
            cache_key_name = "daily_news_feed_cache"
redis_client.set(cache_key_name, json.dumps(final_feed, ensure_ascii=False))
# Set an expiry for the cache, e.g., 24 hours (86400 seconds)
redis_client.expire(cache_key_name, 86400)
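            # Note: redis-py's set() also accepts ex=86400, which would store the value
            # and its expiry in a single call instead of the two calls above.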
print(f"βœ… Cached daily feed under key '{cache_key_name}' with 24-hour expiry.")
except Exception as e:
print("❌ [Redis Cache Error]", e)
# import traceback
# traceback.print_exc()
return final_feed
except Exception as e:
print("❌ [generate_and_cache_daily_feed Overall Error]", e)
# import traceback
# traceback.print_exc()
return []
# 📦 Get cached data
def get_cached_daily_feed():
try:
cache_key_name = "daily_news_feed_cache" # Use the same key name as in generate_and_cache_daily_feed
cached = redis_client.get(cache_key_name)
if cached:
print(f"βœ… Retrieved cached daily feed from '{cache_key_name}'.")
return json.loads(cached)
else:
print(f"ℹ️ No cached data found under key '{cache_key_name}'.")
return []
except Exception as e:
print("❌ [get_cached_daily_feed Error]", e)
# import traceback
# traceback.print_exc()
return []
# Example usage: run this module directly for a quick end-to-end test
if __name__ == "__main__":
    # Ensure the required environment variables are set before running:
    #   os.environ["UPSTASH_REDIS_URL"] = "your_upstash_redis_url"  # auth token embedded in the URL
    #   os.environ["OPENAI_API_KEY"] = "your_openai_api_key"
    # For the Upstash Vector connection, UPSTASH_VECTOR_REST_URL and UPSTASH_VECTOR_REST_TOKEN
    # should be configured for `get_upstash_vector_store` in components.indexers.news_indexer.
# Generate and cache the feed
generated_feed = generate_and_cache_daily_feed()
print("\n--- Generated and Cached Feed ---")
# for item in generated_feed:
# print(f"Topic: {item['topic']}")
# for summary in item['feed']:
# print(f" - {summary['summary']}")
# print(json.dumps(generated_feed, indent=2, ensure_ascii=False)) # For full output
# Retrieve from cache
cached_feed = get_cached_daily_feed()
print("\n--- Retrieved from Cache ---")
# for item in cached_feed:
# print(f"Topic: {item['topic']}")
# for summary in item['feed']:
# print(f" - {summary['summary']}")
# print(json.dumps(cached_feed, indent=2, ensure_ascii=False)) # For full output