ragV98 committed
Commit 4df303e · 1 Parent(s): f090069

structural changes

components/generators/daily_feed.py CHANGED
@@ -2,20 +2,21 @@ import os
 import json
 import redis
 from typing import List, Dict
+from llama_index.core import VectorStoreIndex, StorageContext, load_index_from_storage
 from llama_index.core.schema import Document
+from llama_index.core.query_engine import RetrieverQueryEngine
 from components.LLMs.Mistral import call_mistral
 
 # 🔐 Environment variables
 REDIS_URL = os.environ.get("UPSTASH_REDIS_URL", "redis://localhost:6379")
 REDIS_KEY = os.environ.get("UPSTASH_REDIS_TOKEN")
+INDEX_DIR = os.environ.get("INDEX_DIR", "storage/index")
 
 # ✅ Redis client
 redis_client = redis.Redis.from_url(REDIS_URL, decode_responses=True)
 
 # 📰 Topic list
 TOPICS = ["India news", "World news", "Tech news", "Finance news", "Sports news"]
-
-# 🔧 Flattened topic keys for JSON output
 TOPIC_KEYS = [t.lower().replace(" news", "") for t in TOPICS]
 
 # 🧠 Summarization prompt
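
The new storage imports and `INDEX_DIR` tie the feed generator to a LlamaIndex index persisted on disk, presumably the one `get_or_build_index_from_docs` writes during ingest. For orientation, a minimal sketch of how an index ends up at that path with these same APIs (the document list here is hypothetical; `storage/index` is the commit's default):

```python
from llama_index.core import VectorStoreIndex, StorageContext, load_index_from_storage
from llama_index.core.schema import Document

# Build once (the ingest pipeline presumably does the equivalent) and persist to disk.
index = VectorStoreIndex.from_documents([Document(text="example article")])
index.storage_context.persist(persist_dir="storage/index")

# Reload later, as the new load_all_documents() below does.
storage_context = StorageContext.from_defaults(persist_dir="storage/index")
index = load_index_from_storage(storage_context)
```

Also worth noting: `REDIS_KEY` is read from `UPSTASH_REDIS_TOKEN`, so the cache key is the token's value; if that variable is unset, `redis_client.set(None, ...)` will likely fail.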
@@ -65,15 +66,29 @@ def categorize_summary(summary: str) -> str:
     else:
         return "world"
 
-# 🧪 Summarize the entire day’s documents in one LLM pass
-def summarize_all_documents(documents: List[Document]) -> Dict[str, List[Dict]]:
-    merged_text = "\n\n---\n\n".join(doc.text.strip() for doc in documents if doc.text.strip())
+# 📥 Load all documents from the vector store
+def load_all_documents() -> List[str]:
+    storage_context = StorageContext.from_defaults(persist_dir=INDEX_DIR)
+    index = load_index_from_storage(storage_context)
+    retriever = index.as_retriever(similarity_top_k=50)
+    query_engine = RetrieverQueryEngine(retriever=retriever)
+
+    combined_docs = []
+    for topic in TOPICS:
+        response = query_engine.query(topic)
+        for node in response.source_nodes:
+            doc_text = str(node.get_content()).strip()
+            if doc_text:
+                combined_docs.append(doc_text)
+    return combined_docs
 
+# 🧪 Summarize entire day's content in one call
+def summarize_and_categorize(docs: List[str]) -> Dict[str, List[Dict]]:
+    merged_text = "\n\n---\n\n".join(docs)
     print("\n🧠 Sending merged prompt to summarizer...\n")
     summary_block = call_mistral(base_prompt=BASE_PROMPT, tail_prompt=merged_text)
 
     categorized_feed = {key: [] for key in TOPIC_KEYS}
-
     if summary_block:
         for line in summary_block.splitlines():
             line = line.strip()
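
`load_all_documents()` reads only `response.source_nodes`, so the `RetrieverQueryEngine` wrapper adds a full LLM answer-synthesis pass per topic whose output is then discarded. A leaner variant (hypothetical, reusing the module's `TOPICS` and `INDEX_DIR`) would call the retriever directly:

```python
from typing import List
from llama_index.core import StorageContext, load_index_from_storage

def load_all_documents_lean() -> List[str]:
    # Retrieve nodes for each topic without synthesizing an answer.
    storage_context = StorageContext.from_defaults(persist_dir=INDEX_DIR)
    index = load_index_from_storage(storage_context)
    retriever = index.as_retriever(similarity_top_k=50)

    combined_docs = []
    for topic in TOPICS:
        for node in retriever.retrieve(topic):  # NodeWithScore objects
            text = node.get_content().strip()
            if text:
                combined_docs.append(text)
    return combined_docs
```

Since the five topic queries can return overlapping nodes, deduplicating `combined_docs` before merging would also keep the prompt smaller.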
@@ -88,16 +103,21 @@ def summarize_all_documents(documents: List[Document]) -> Dict[str, List[Dict]]:
             })
     return categorized_feed
 
-# 🚀 Final callable to build and cache the feed
-def generate_and_cache_daily_feed(documents: List[Document]):
-    all_feed = summarize_all_documents(documents)
-    final_feed = [{"topic": topic, "feed": all_feed[topic]} for topic in TOPIC_KEYS]
+# 🚀 Main callable
+def generate_and_cache_daily_feed():
+    docs = load_all_documents()
+    if not docs:
+        print("⚠️ No documents found in vector store.")
+        return []
+
+    feed_map = summarize_and_categorize(docs)
+    final_feed = [{"topic": topic, "feed": feed_map[topic]} for topic in TOPIC_KEYS]
 
     redis_client.set(REDIS_KEY, json.dumps(final_feed, ensure_ascii=False))
     print(f"✅ Cached daily feed under key '{REDIS_KEY}'")
     return final_feed
 
-# 📦 Utility to read cached data
+# 📦 Get cached data
 def get_cached_daily_feed():
     cached = redis_client.get(REDIS_KEY)
     return json.loads(cached) if cached else []
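
With the zero-argument signature, the generator can be exercised on its own. A hypothetical round trip, assuming `UPSTASH_REDIS_URL`/`UPSTASH_REDIS_TOKEN` are set and an index exists under `INDEX_DIR`:

```python
from components.generators.daily_feed import (
    generate_and_cache_daily_feed,
    get_cached_daily_feed,
)

feed = generate_and_cache_daily_feed()   # load nodes -> one Mistral call -> write to Redis
cached = get_cached_daily_feed()         # read the JSON payload back from Redis
# Both are shaped like: [{"topic": "india", "feed": [...]}, ..., {"topic": "sports", "feed": [...]}]
```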
 
pipeline/news_ingest.py CHANGED
@@ -101,10 +101,10 @@ async def main():
     documents = await build_documents(all_articles)
     get_or_build_index_from_docs(documents)
 
-    print("⚡ Generating daily feed...")
-    generate_and_cache_daily_feed(documents)  # ✅ SYNC CALL
+    # print("⚡ Generating daily feed...")
+    # generate_and_cache_daily_feed(documents)  # ✅ SYNC CALL
 
-    print(f"✅ Indexed, headlines generated, and stored at: {INDEX_DIR}")
+    # print(f"✅ Indexed, headlines generated, and stored at: {INDEX_DIR}")
 
 # 🏁 Entrypoint
 if __name__ == "__main__":
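
Note that the commented-out call still passes `documents`, while `generate_and_cache_daily_feed()` now takes no arguments, so re-enabling these lines verbatim would raise a `TypeError`. If feed generation is meant to run inside the ingest pipeline again, the call would be argument-free, e.g.:

```python
# Hypothetical re-enable inside main(), matching the new signature:
print("⚡ Generating daily feed...")
generate_and_cache_daily_feed()  # reads the freshly persisted index from INDEX_DIR
```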