ref_id changes and prompt revision
components/generators/daily_feed.py  +113 -37
pipeline/news_ingest.py  +11 -5
components/generators/daily_feed.py
CHANGED
@@ -6,56 +6,106 @@ from typing import List, Dict
 from openai import OpenAI
 from components.indexers.news_indexer import get_upstash_vector_store
 from llama_index.core.vector_stores.types import VectorStoreQuery, MetadataFilter, MetadataFilters, FilterOperator
+import logging
+
+# Configure logging
+logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

 # Environment variables
 REDIS_URL = os.environ.get("UPSTASH_REDIS_URL", "redis://localhost:6379")
 OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")

 # Redis client
+try:
+    redis_client = redis.Redis.from_url(REDIS_URL, decode_responses=True)
+except Exception as e:
+    logging.error(f"[Redis Init Error]: {e}")
+    # Decide if you want to raise an exception or continue without Redis
+    raise  # It's critical for caching, so raising is appropriate here

 # Topics
 TOPICS = ["India news", "World news", "Tech news", "Finance news", "Sports news"]
 TOPIC_KEYS = [t.lower().replace(" news", "") for t in TOPICS]

 # Summarization Prompt
+# Removed the "numbered headline" instruction as we want LLM to just give plain headlines
 BASE_PROMPT = (
     "You are Nuse's editorial summarizer. Read the excerpts below and extract the most important stories. "
     "Return up to 3 punchy headlines, each under 20 words. Each headline should be followed by a short explanation of why the story matters."
+    "Don't include unnecessary text like 'this is important because' or 'why this matters because..' just state the logic and nothing else."
 )

 # Load documents and metadata
 def load_docs_by_topic_with_refs() -> Dict[str, List[Dict]]:
     topic_docs = {key: [] for key in TOPIC_KEYS}
+    logging.info("Starting to load documents by topic from Upstash Vector Store...")
     try:
         vector_store = get_upstash_vector_store()
+        for full_topic_name, topic_key in zip(TOPICS, TOPIC_KEYS):
             filters = MetadataFilters(
                 filters=[MetadataFilter(key="topic", value=topic_key, operator=FilterOperator.EQ)]
             )
+            # A random vector for querying, adjust similarity_top_k based on your data density
+            dummy_vector = np.random.rand(384).tolist()
             query = VectorStoreQuery(query_embedding=dummy_vector, similarity_top_k=50, filters=filters)
+
+            logging.info(f"Querying for topic '{topic_key}' with filter value '{topic_key}'.")
             result = vector_store.query(query)
+            logging.info(f"Found {len(result.nodes)} nodes for topic '{topic_key}'.")
+
             for node in result.nodes:
                 content = node.get_content().strip()
+                # *** IMPORTANT: Retrieve the 'headline_id' from node.metadata ***
+                headline_id = node.metadata.get("headline_id")
+
+                # You can also get other metadata like title, url, source if needed for the summary output or debugging
+                title = node.metadata.get("title", "No Title")
+                url = node.metadata.get("url", "#")
+                source = node.metadata.get("source", "Unknown Source")
+
+                if content and headline_id is not None:
+                    topic_docs[topic_key].append({
+                        "text": content,
+                        "headline_id": headline_id,  # Store the actual ID
+                        "title": title,
+                        "url": url,
+                        "source": source
+                    })
+                elif content and headline_id is None:
+                    logging.warning(f"Node found without 'headline_id' for topic '{topic_key}': URL {node.metadata.get('url', 'N/A')}")
+
     except Exception as e:
+        logging.error(f"[load_docs_by_topic_with_refs Error]: {e}", exc_info=True)
     return topic_docs

 # Topic summarizer
+def summarize_topic(topic_key: str, docs: List[Dict]) -> List[Dict]:  # Removed start_index
     if not docs:
+        logging.warning(f"No docs for topic: {topic_key}, skipping summarization.")
+        return []
+
+    # Get the headline_id of the first document to use as a representative ID for the summary
+    # This assumes the summary is broadly about the topic, and we just need *a* representative ID.
+    representative_headline_id = docs[0].get("headline_id") if docs else None
+    representative_article_link = docs[0].get("url") if docs else f"https://google.com/search?q={topic_key}+news"
+    representative_title = docs[0].get("title") if docs else f"Summary for {topic_key}"
+
+    # Concatenate document texts for summarization
+    # Ensure all document texts are strings before joining
+    content = "\n\n---\n\n".join([str(d["text"]) for d in docs if "text" in d and d["text"] is not None])
+
+    if not content:
+        logging.warning(f"No valid text content found in docs for topic: {topic_key}, skipping summarization.")
         return []

+    content = content[:12000]  # Truncate to avoid excessive token usage
+
+    logging.info(f"Summarizing topic via OpenAI: '{topic_key}' ({len(docs)} documents)")
     try:
         client = OpenAI(api_key=OPENAI_API_KEY)
         response = client.chat.completions.create(
+            model="gpt-4",  # Consider "gpt-4o" or "gpt-3.5-turbo" for cost/speed
             messages=[
                 {"role": "system", "content": BASE_PROMPT},
                 {"role": "user", "content": content},
@@ -63,45 +113,60 @@ def summarize_topic(topic_key: str, docs: List[Dict], start_index: int) -> List[
             max_tokens=512,
             temperature=0.7,
         )
+        llm_output = response.choices[0].message.content.strip()
+
+        # Parse headlines based on the expected format
+        headlines = []
+        for line in llm_output.splitlines():
+            line = line.strip("-–• ")  # Clean bullet points
+            if ":" in line:
+                parts = line.split(':', 1)  # Split only on the first colon
+                if len(parts) == 2:
+                    headline_text = parts[0].strip()
+                    explanation_text = parts[1].strip()
+                    if headline_text and explanation_text:  # Ensure both parts are non-empty
+                        headlines.append({"summary": headline_text, "explanation": explanation_text})
+
         result = []
+        # Assign the representative ID to each generated summary
+        for i, h_item in enumerate(headlines):
+            result.append({
+                "summary": h_item["summary"],
+                "explanation": h_item["explanation"],
+                "headline_id": representative_headline_id,  # Use the representative ID
+                "image_url": "https://source.unsplash.com/800x600/?news",
+                "article_link": representative_article_link,  # Use representative link
+                "representative_title": representative_title  # Add the representative title for context
+            })
+
+        logging.info(f"Successfully generated {len(result)} summaries for topic '{topic_key}'.")
         return result
     except Exception as e:
+        logging.error(f"[Summarize topic '{topic_key}' Error]: {e}", exc_info=True)
         return []

 # Generate and cache feed
 def generate_and_cache_daily_feed():
     try:
+        logging.info("Generating daily feed...")
+        topic_docs = load_docs_by_topic_with_refs()  # This now returns docs with 'headline_id'
         feed_map = {}
+        # global_ref is no longer needed in this context for generating summary IDs

         for topic_key in TOPIC_KEYS:
             try:
+                # Pass the documents directly to the summarizer
+                summaries = summarize_topic(topic_key, topic_docs.get(topic_key, []))
                 feed_map[topic_key] = summaries
             except Exception as e:
+                logging.error(f"[Topic summarization loop error for '{topic_key}']: {e}", exc_info=True)
                 feed_map[topic_key] = []

         final_feed = []
+        for topic_display_name, topic_key in zip(TOPICS, TOPIC_KEYS):
             topic_feed = feed_map.get(topic_key, [])
             final_feed.append({
+                "topic": topic_display_name,
                 "feed": topic_feed
             })

@@ -109,15 +174,15 @@ def generate_and_cache_daily_feed():
         try:
             cache_key = "daily_news_feed_cache"
             redis_client.set(cache_key, json.dumps(final_feed, ensure_ascii=False))
+            redis_client.expire(cache_key, 86400)  # Set expiry to 24 hours
+            logging.info(f"Cached feed under key '{cache_key}' with 24-hour expiry.")
         except Exception as e:
+            logging.error(f"[Redis cache error]: {e}", exc_info=True)

         return final_feed

     except Exception as e:
+        logging.critical(f"[generate_and_cache_daily_feed Overall Error]: {e}", exc_info=True)
         return []

 # Retrieve from cache
@@ -125,12 +190,23 @@ def get_cached_daily_feed():
     try:
         cache_key = "daily_news_feed_cache"
         cached = redis_client.get(cache_key)
+        if cached:
+            logging.info(f"Retrieved cached daily feed from '{cache_key}'.")
+            return json.loads(cached)
+        else:
+            logging.info(f"No cached data found under key '{cache_key}'.")
+            return []
     except Exception as e:
+        logging.error(f"[get_cached_daily_feed Error]: {e}", exc_info=True)
         return []

 # Run if main
 if __name__ == "__main__":
     feed = generate_and_cache_daily_feed()
+    print("\n--- Generated Daily Feed ---")
     print(json.dumps(feed, indent=2, ensure_ascii=False))
+
+    # Example of retrieving from cache
+    # cached_data = get_cached_daily_feed()
+    # print("\n--- Cached Daily Feed ---")
+    # print(json.dumps(cached_data, indent=2, ensure_ascii=False))
pipeline/news_ingest.py
CHANGED
@@ -37,10 +37,14 @@ def write_articles_jsonl(articles: List[Dict], file_path: str):

 # Convert raw scraped data into Document objects
 async def build_documents(data: List[Dict]) -> List[Document]:
+    # --- IMPORTANT CHANGE HERE ---
+    # The 'data' list from all_articles already contains the 'headline_id' (which was 'counter').
+    # We will use that directly.
     return [
         Document(
             text=entry["content"],
             metadata={
+                "headline_id": entry["headline_id"],  # Use the pre-assigned ID
                 "title": entry["title"],
                 "url": entry["url"],
                 "topic": entry["topic"].lower().replace(" news", ""),  # normalized topic key
@@ -58,7 +62,8 @@ async def main():
     print("Fetching news URLs from Google...")

     all_articles = []
+    # This counter will be used for your simple sequential ID
+    global_headline_id_counter = 1

     for query in QUERIES:
         print(f"Searching for: {query}")
@@ -77,15 +82,16 @@
             article_text = scrape_url(url)

             if article_text:
+                # Assign the simple sequential ID here
                 all_articles.append({
+                    "headline_id": global_headline_id_counter,  # Assign the unique ID
                     "topic": query,
+                    "title": title,  # Keep title clean, the numbering can be for display later
                     "url": url,
                     "source": source,
                     "content": article_text
                 })
+                global_headline_id_counter += 1  # Increment for the next article
             else:
                 print(f"Skipped: {url}")

@@ -107,4 +113,4 @@

 # Entrypoint
 if __name__ == "__main__":
+    asyncio.run(main())