import os
import json
import redis
import numpy as np
from typing import List, Dict
from openai import OpenAI
from components.indexers.news_indexer import get_upstash_vector_store
from llama_index.core.vector_stores.types import VectorStoreQuery, MetadataFilter, MetadataFilters, FilterOperator
# 🌍 Environment variables
REDIS_URL = os.environ.get("UPSTASH_REDIS_URL", "redis://localhost:6379")
REDIS_KEY = os.environ.get("UPSTASH_REDIS_TOKEN")  # Auth token; the cache key itself is hardcoded below
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")

# ✅ Redis client
try:
    redis_client = redis.Redis.from_url(REDIS_URL, decode_responses=True)
except Exception as e:
    print("❌ [Redis Init Error]", e)
    raise

# 📰 Topic list
TOPICS = ["India news", "World news", "Tech news", "Finance news", "Sports news"]
TOPIC_KEYS = [t.lower().replace(" news", "") for t in TOPICS]

# 🧠 Summarization prompt
BASE_PROMPT = (
    "You are Nuse's editorial summarizer. Read the excerpts below and extract the most important stories. "
    "Return up to 3 punchy headlines, each under 20 words, written like a premium editorial bulletin."
)
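
# The parsing in summarize_topic below assumes the model returns one headline per
# line, optionally bulleted, e.g. (illustrative output only):
#   - Markets rally as central bank holds rates steady
#   - Monsoon arrives early across southern India
#   - Chipmaker posts record quarterly profit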

# 📥 Load documents grouped by topic from Upstash
def load_all_documents_grouped_by_topic() -> Dict[str, List[str]]:
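    """Query Upstash once per topic, filtering on the 'topic' metadata field,
    and return the raw document texts grouped under each topic key."""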
    topic_docs = {key: [] for key in TOPIC_KEYS}
    try:
        vector_store = get_upstash_vector_store()
        print("📡 Successfully retrieved Upstash vector store.")
        # Debug: confirm the topic lists line up before querying
        print(f"DEBUG: TOPICS = {TOPICS}")
        print(f"DEBUG: TOPIC_KEYS = {TOPIC_KEYS}")
        print(f"DEBUG: Length of TOPICS = {len(TOPICS)}")
        print(f"DEBUG: Length of TOPIC_KEYS = {len(TOPIC_KEYS)}")
        for topic, key in zip(TOPICS, TOPIC_KEYS):
            try:
                # The filter value must match the 'topic' metadata exactly as it was
                # indexed in Upstash (e.g., "India news").
                filters = MetadataFilters(
                    filters=[
                        MetadataFilter(key="topic", value=topic, operator=FilterOperator.EQ)
                    ]
                )
                # Upstash requires a query embedding, so use a random vector and rely
                # on the metadata filter to select documents (384 dims assumes MiniLM).
                dummy_vector = np.random.rand(384).tolist()
                query = VectorStoreQuery(
                    query_embedding=dummy_vector,
                    similarity_top_k=50,  # Retrieve enough documents for summarization
                    filters=filters,  # Apply the metadata filter
                )
                print(f"🔍 Querying Upstash for topic: '{topic}'")
                result = vector_store.query(query)
                print(f"➡️ Found {len(result.nodes)} nodes for topic: '{topic}'.")
                for node in result.nodes:
                    content = node.get_content().strip()
                    if content:
                        topic_docs[key].append(content)
                    # print(f"   Node metadata: {node.metadata}")  # Uncomment to verify filtering
            except Exception as e:
                print(f"❌ [Topic Metadata Filter error for '{topic}']: {e}")
    except Exception as e:
        print("❌ [load_all_documents_grouped_by_topic Error]", e)
    return topic_docs


# 🧪 Summarize one topic at a time using OpenAI GPT-4
def summarize_topic(topic_key: str, docs: List[str]) -> List[Dict]:
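    """Summarize one topic's documents into up to three headline dicts
    ('summary', 'image_url', 'article_link'). Returns [] if there are no docs
    or the OpenAI call fails."""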
    if not docs:
        print(f"⚠️ No docs found for topic: {topic_key}, skipping summarization.")
        return []
    try:
        client = OpenAI(api_key=OPENAI_API_KEY)
        # Join the documents and truncate to 12,000 characters so the request stays
        # comfortably within the model's context window.
        content = "\n\n---\n\n".join(docs)[:12000]
        print(f"🧠 Summarizing topic via OpenAI: {topic_key} ({len(docs)} documents)")
        completion = client.chat.completions.create(
            model="gpt-4",  # Or "gpt-4o" for potentially better performance
            messages=[
                {"role": "system", "content": BASE_PROMPT},
                {"role": "user", "content": content},
            ],
            max_tokens=512,  # Enough tokens for 3 punchy headlines
            temperature=0.7,  # A bit creative but focused
        )
        text = completion.choices[0].message.content.strip()
        summaries = []
        # Parse the headlines, which may come back as bullet points or plain lines
        for line in text.splitlines():
            line = line.strip("-–• ")  # Remove common bullet characters
            if line:
                summaries.append({
                    "summary": line,
                    "image_url": "https://source.unsplash.com/800x600/?news",  # Generic image, could be improved
                    "article_link": f"https://google.com/search?q={topic_key}+news",  # Generic search link
                })
        return summaries
    except Exception as e:
        print(f"❌ [OpenAI Summarization Error for '{topic_key}']: {e}")
        return []


# 🚀 Main callable
def generate_and_cache_daily_feed():
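    """Generate summaries for every topic, cache the combined feed in Redis as
    JSON for 24 hours, and return it as [{"topic": ..., "feed": [...]}, ...]."""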
    try:
        print("🚀 Running OpenAI-powered daily feed generator...")
        topic_docs = load_all_documents_grouped_by_topic()
        feed_map = {}
        for topic_key in TOPIC_KEYS:
            try:
                summaries = summarize_topic(topic_key, topic_docs.get(topic_key, []))
                feed_map[topic_key] = summaries
            except Exception as e:
                print(f"❌ [Topic Loop Error for '{topic_key}']: {e}")
                feed_map[topic_key] = []
        final_feed = [
            {"topic": topic, "feed": feed_map[topic_key]}
            for topic, topic_key in zip(TOPICS, TOPIC_KEYS)
        ]
        try:
            # Use a descriptive, fixed cache key (not the Redis token) and expire it
            # after 24 hours (86,400 seconds).
            cache_key_name = "daily_news_feed_cache"
            redis_client.set(cache_key_name, json.dumps(final_feed, ensure_ascii=False), ex=86400)
            print(f"✅ Cached daily feed under key '{cache_key_name}' with 24-hour expiry.")
        except Exception as e:
            print("❌ [Redis Cache Error]", e)
        return final_feed
    except Exception as e:
        print("❌ [generate_and_cache_daily_feed Overall Error]", e)
        return []


# 📦 Get cached data
def get_cached_daily_feed():
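    """Return the cached daily feed from Redis, or [] if nothing is cached."""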
    try:
        # Must match the key used in generate_and_cache_daily_feed
        cache_key_name = "daily_news_feed_cache"
        cached = redis_client.get(cache_key_name)
        if cached:
            print(f"✅ Retrieved cached daily feed from '{cache_key_name}'.")
            return json.loads(cached)
        else:
            print(f"ℹ️ No cached data found under key '{cache_key_name}'.")
            return []
    except Exception as e:
        print("❌ [get_cached_daily_feed Error]", e)
        return []


# Example of how to run it (for testing purposes, if this were the main script)
if __name__ == "__main__":
    # Ensure your environment variables are set before running:
    # os.environ["UPSTASH_REDIS_URL"] = "your_upstash_redis_url"
    # os.environ["UPSTASH_REDIS_TOKEN"] = "your_upstash_redis_token"
    # os.environ["OPENAI_API_KEY"] = "your_openai_api_key"
    #
    # Note that UPSTASH_REDIS_TOKEN is an auth token, not a cache key; the feed is
    # stored under the hardcoded "daily_news_feed_cache" key above. For the Upstash
    # Vector connection, ensure UPSTASH_VECTOR_REST_URL and UPSTASH_VECTOR_REST_TOKEN
    # are configured for `get_upstash_vector_store` in components/indexers/news_indexer.py.
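    # For reference, `get_upstash_vector_store` is assumed to wrap llama-index's
    # Upstash integration roughly like the sketch below (hypothetical; the real
    # implementation in news_indexer.py may differ):
    #
    #     from llama_index.vector_stores.upstash import UpstashVectorStore
    #
    #     def get_upstash_vector_store() -> UpstashVectorStore:
    #         return UpstashVectorStore(
    #             url=os.environ["UPSTASH_VECTOR_REST_URL"],
    #             token=os.environ["UPSTASH_VECTOR_REST_TOKEN"],
    #         )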
    # Generate and cache the feed
    generated_feed = generate_and_cache_daily_feed()
    print("\n--- Generated and Cached Feed ---")
    print(json.dumps(generated_feed, indent=2, ensure_ascii=False))

    # Retrieve from cache
    cached_feed = get_cached_daily_feed()
    print("\n--- Retrieved from Cache ---")
    print(json.dumps(cached_feed, indent=2, ensure_ascii=False))