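"""Daily news feed generation for Nuse.

Pulls indexed documents from the Upstash vector store grouped by topic, summarizes
each topic into short headlines with OpenAI, and caches the combined feed in Redis
for 24 hours.

Expected environment variables: UPSTASH_REDIS_URL, UPSTASH_REDIS_TOKEN, OPENAI_API_KEY.
"""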
import os
import json

import redis
import numpy as np
from typing import List, Dict

from openai import OpenAI

from components.indexers.news_indexer import get_upstash_vector_store
from llama_index.core import StorageContext
from llama_index.core.vector_stores.types import (
    VectorStoreQuery,
    MetadataFilter,
    MetadataFilters,
    FilterOperator,
)

REDIS_URL = os.environ.get("UPSTASH_REDIS_URL", "redis://localhost:6379")
REDIS_KEY = os.environ.get("UPSTASH_REDIS_TOKEN")  # Not used below; redis-py reads credentials from REDIS_URL.
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")

try:
    redis_client = redis.Redis.from_url(REDIS_URL, decode_responses=True)
except Exception as e:
    print("❌ [Redis Init Error]", e)
    raise

TOPICS = ["India news", "World news", "Tech news", "Finance news", "Sports news"]
TOPIC_KEYS = [t.lower().replace(" news", "") for t in TOPICS]

BASE_PROMPT = (
    "You are Nuse's editorial summarizer. Read the excerpts below and extract the most important stories. "
    "Return up to 3 punchy headlines, each under 20 words, written like a premium editorial bulletin."
)


def load_all_documents_grouped_by_topic() -> Dict[str, List[str]]:
    """Return indexed document texts from the Upstash vector store, grouped by topic key."""
    topic_docs = {key: [] for key in TOPIC_KEYS}

    try:
        vector_store = get_upstash_vector_store()
        print("📡 Successfully retrieved Upstash vector store.")

        print(f"DEBUG: TOPICS = {TOPICS}")
        print(f"DEBUG: TOPIC_KEYS = {TOPIC_KEYS}")
        print(f"DEBUG: Length of TOPICS = {len(TOPICS)}")
        print(f"DEBUG: Length of TOPIC_KEYS = {len(TOPIC_KEYS)}")

        for topic, key in zip(TOPICS, TOPIC_KEYS):
            try:
                filters = MetadataFilters(
                    filters=[
                        MetadataFilter(key="topic", value=topic, operator=FilterOperator.EQ)
                    ]
                )

                # Placeholder query vector: retrieval here relies on the metadata filter,
                # not on similarity. The 384 dimensions must match the index's embedding size.
                dummy_vector = np.random.rand(384).tolist()
                query = VectorStoreQuery(
                    query_embedding=dummy_vector,
                    similarity_top_k=50,
                    filters=filters,
                )

                print(f"🔍 Querying Upstash for topic: '{topic}'")
                result = vector_store.query(query)
                print(f"➡️ Found {len(result.nodes)} nodes for topic: '{topic}'.")

                for node in result.nodes:
                    content = node.get_content().strip()
                    if content:
                        topic_docs[key].append(content)

            except Exception as e:
                print(f"❌ [Topic Metadata Filter error for '{topic}']: {e}")

    except Exception as e:
        print("❌ [load_all_documents_grouped_by_topic Error]", e)

    return topic_docs


def summarize_topic(topic_key: str, docs: List[str]) -> List[Dict]:
    """Summarize one topic's documents into headline dicts via OpenAI."""
    if not docs:
        print(f"⚠️ No docs found for topic: {topic_key}, skipping summarization.")
        return []

    try:
        client = OpenAI(api_key=OPENAI_API_KEY)

        # Join excerpts and cap the prompt at ~12k characters to stay within context limits.
        content = "\n\n---\n\n".join(docs)[:12000]

        print(f"🧠 Summarizing topic via OpenAI: {topic_key} ({len(docs)} documents)")
        completion = client.chat.completions.create(
            model="gpt-4",
            messages=[
                {"role": "system", "content": BASE_PROMPT},
                {"role": "user", "content": content},
            ],
            max_tokens=512,
            temperature=0.7,
        )

        text = completion.choices[0].message.content.strip()

        summaries = []
        for line in text.splitlines():
            line = line.strip("-–• ")
            if line:
                summaries.append({
                    "summary": line,
                    "image_url": "https://source.unsplash.com/800x600/?news",
                    "article_link": f"https://google.com/search?q={topic_key}+news"
                })
        return summaries

    except Exception as e:
        print(f"❌ [OpenAI Summarization Error for '{topic_key}']: {e}")
        return []


def generate_and_cache_daily_feed():
    """Build the per-topic feed, cache it in Redis for 24 hours, and return it."""
    try:
        print("🚀 Running OpenAI-powered daily feed generator...")
        topic_docs = load_all_documents_grouped_by_topic()
        feed_map = {}

        for topic_key in TOPIC_KEYS:
            try:
                summaries = summarize_topic(topic_key, topic_docs.get(topic_key, []))
                feed_map[topic_key] = summaries
            except Exception as e:
                print(f"❌ [Topic Loop Error for '{topic_key}']: {e}")
                feed_map[topic_key] = []

        final_feed = [{"topic": topic, "feed": feed_map[topic_key]} for topic, topic_key in zip(TOPICS, TOPIC_KEYS)]

        try:
            cache_key_name = "daily_news_feed_cache"
            redis_client.set(cache_key_name, json.dumps(final_feed, ensure_ascii=False))
            redis_client.expire(cache_key_name, 86400)
            print(f"✅ Cached daily feed under key '{cache_key_name}' with 24-hour expiry.")
        except Exception as e:
            print("❌ [Redis Cache Error]", e)

        return final_feed

    except Exception as e:
        print("❌ [generate_and_cache_daily_feed Overall Error]", e)
        return []


def get_cached_daily_feed():
    """Return the cached daily feed from Redis, or an empty list if none exists."""
    try:
        cache_key_name = "daily_news_feed_cache"
        cached = redis_client.get(cache_key_name)
        if cached:
            print(f"✅ Retrieved cached daily feed from '{cache_key_name}'.")
            return json.loads(cached)
        else:
            print(f"ℹ️ No cached data found under key '{cache_key_name}'.")
            return []
    except Exception as e:
        print("❌ [get_cached_daily_feed Error]", e)
        return []


if __name__ == "__main__":
    generated_feed = generate_and_cache_daily_feed()
    print("\n--- Generated and Cached Feed ---")

    cached_feed = get_cached_daily_feed()
    print("\n--- Retrieved from Cache ---")
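    # Not in the original script: show what was read back from Redis for comparison.
    print(json.dumps(cached_feed, indent=2, ensure_ascii=False))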