import feedparser
from urllib.parse import quote
from newspaper import Article
from typing import Any, List, Tuple
from datetime import datetime, timedelta

from llama_index import (
    Document,
    PromptTemplate,
    ServiceContext,
    VectorStoreIndex,
    get_response_synthesizer,
)
from llama_index.schema import NodeWithScore
from llama_index.llms.base import llm_completion_callback
from llama_index.query_engine import RetrieverQueryEngine
from llama_index.postprocessor import SentenceTransformerRerank
from llama_index.retrievers import VectorIndexRetriever, BaseRetriever, BM25Retriever


class NewsFeedParser:

    def __init__(self):
        """
        Initializes the NewsFeedParser class.
        """
        self.articles_data = []

    def create_search_urls(self, question, base_urls):
        """
        Converts a question into properly formatted search URLs for multiple base URLs.

        Parameters:
        question (str): The query or question to be converted into search URLs.
        base_urls (list): A list of base URLs for the search services.

        Returns:
        list: A list of formatted search URLs, each containing the encoded question.
        """
        # URL-encode the question so it can be appended as a query string
        encoded_question = quote(question)

        # Construct the full search URL for each base URL
        search_urls = [base_url + encoded_question for base_url in base_urls]
        return search_urls

    def parse_feed(self, rss_url):
        """
        Parses the RSS feed from a given URL and processes each entry.

        Parameters:
        rss_url (str): URL of the RSS feed to be parsed.
        """
        news_feed = feedparser.parse(rss_url)
        content = news_feed.entries

        # Get the current date
        current_date = datetime.now()

        for entry in content:
            # Extract and parse the publication date
            date_format = "%a, %d %b %Y %H:%M:%S %Z"
            published_date = datetime.strptime(entry.published, date_format)

            # Keep only articles published within the last week
            if current_date - published_date <= timedelta(days=7):
                # Extract the article text
                article_text = self.extract_article_text(entry.link)

                # Only add to the list if article text is successfully extracted
                if article_text:
                    self.articles_data.append({
                        'link': entry.link,
                        'published': published_date.strftime("%Y-%m-%d %H:%M:%S"),
                        'article_text': article_text
                    })

    def extract_article_text(self, url):
        """
        Extracts text from a given article URL.

        Parameters:
        url (str): The URL of the article from which to extract text.

        Returns:
        str: Extracted article text. Returns None if extraction fails.
        """
        try:
            article = Article(url)
            article.download()
            article.parse()
            return article.text
        except Exception as e:
            print(f"An error occurred: {e}")
            return None

    def process_query(self, input_query):
        """
        Processes an input query to extract articles related to the query
        from multiple sources.

        Parameters:
        input_query (str): The query from which to extract information.

        Returns:
        list: A list of articles with their details.
""" # Define base URLs for multiple news sources base_urls = [ 'https://news.google.com/rss/search?q=', 'http://www.ft.com/rss/markets?q=' #'https://www.bloomberg.com/search?query=' ] # Step 1: Create search URLs for each base URL search_urls = self.create_search_urls(input_query, base_urls) # Step 2: Parse the feed for each search URL for url in search_urls: self.parse_feed(url) # Return the accumulated articles from all feeds return self.articles_data def chunk_text_by_words_with_overlap(self, text, max_words, overlap, metadata): """ Splits the text into chunks of a specified number of words with a specified overlap and attaches metadata to each chunk. """ words = text.split() chunks = [' '.join(words[i:i + max_words]) for i in range(0, len(words), max_words - overlap)] # Ensure the last chunk doesn't exceed the text length if len(chunks) > 1 and len(chunks[-1].split()) < overlap: chunks[-2] = ' '.join(chunks[-2:]) chunks.pop(-1) return [{'text': chunk, **metadata} for chunk in chunks] def process_and_chunk_articles(self, input_query, max_words=250, overlap=20): """ Processes an input query, fetches related articles, and chunks their text. Returns a list of Document objects with attached metadata. """ # Process the query and get articles articles = self.process_query(input_query) print(len(articles)) # Chunk each article's text and create Document objects documents = [] metadata_list = [] for article in articles: article_chunks = self.chunk_text_by_words_with_overlap( article['article_text'], max_words, overlap, metadata={'link': article['link'], 'published': article['published']} ) for chunk in article_chunks: documents.append(Document(text=chunk['text'])) metadata_list.append({'link': chunk['link'], 'published_date': chunk['published']}) # Add metadata to each document for doc, meta in zip(documents, metadata_list): doc.metadata = meta return documents class HybridRetriever(BaseRetriever): """ A hybrid retriever that combines results from two different retrieval methods: vector-based retrieval and BM25 retrieval. Attributes: vector_retriever: An instance of a retriever that uses vector embeddings for retrieval. bm25_retriever: An instance of a retriever that uses BM25 algorithm for retrieval. The class inherits from BaseRetriever, indicating that it follows a similar interface. """ def __init__(self, vector_retriever, bm25_retriever): """ Initializes the HybridRetriever with two different types of retrievers. Args: vector_retriever: The retriever instance which uses vector-based retrieval methods. bm25_retriever: The retriever instance which uses BM25 algorithm for retrieval. """ self.vector_retriever = vector_retriever self.bm25_retriever = bm25_retriever super().__init__() def _retrieve(self, query, **kwargs): """ Performs a retrieval operation by combining results from both the vector and BM25 retrievers. Args: query: The query string based on which the documents are to be retrieved. **kwargs: Additional keyword arguments that might be required for retrieval. Returns: all_nodes: A list of nodes (documents) retrieved by combining results from both retrievers. This ensures a diverse set of results leveraging the strengths of both retrieval methods. 
""" # Retrieve nodes using BM25 retriever bm25_nodes = self.bm25_retriever.retrieve(query, **kwargs) # Retrieve nodes using vector retriever vector_nodes = self.vector_retriever.retrieve(query, **kwargs) # Combine the two lists of nodes, ensuring no duplicates all_nodes = [] node_ids = set() for n in bm25_nodes + vector_nodes: # Check if node is already added; if not, add it to the list if n.node.node_id not in node_ids: all_nodes.append(n) node_ids.add(n.node.node_id) return all_nodes class NewsQueryEngine: """ A class to handle the process of setting up a query engine and performing queries on PDF documents. This class encapsulates the functionality of creating prompt templates, embedding models, service contexts, indexes, hybrid retrievers, response synthesizers, and executing queries on the set up engine. Attributes: documents (List): A list of documents to be indexed. llm (Language Model): The language model to be used for embeddings and queries. qa_prompt_tmpl (str): Template for creating query prompts. queries (List[str]): List of queries to be executed. Methods: setup_query_engine(): Sets up the query engine with all necessary components. execute_queries(): Executes the predefined queries and prints the results. """ def __init__(self, documents: List[Any], llm: Any, embed_model: Any): self.documents = documents self.llm = llm self.embed_model = embed_model self.qa_prompt_tmpl = ( "Context information is below.\n" "---------------------\n" "{context_str}\n" "---------------------\n" "As an experienced financial analyst and researcher, you are tasked with helping fellow analysts in research using the latest financial news.\n " "Your answer will be based on the snippets of latest news provided as context information for each query.\n " "For each query, provide a concise answer derived from the information provided in the form of news.\n" "Try to not assume any critical information that might impact the answer. \n " "Note any major issues in the paper's results and analysis.\n" "If a query cannot be answered due to lack of information in the context, state this explicitly.\n" "Query: {query_str}\n" "Answer:" ) def setup_query_engine(self) -> Any: """ Sets up the query engine by initializing and configuring the embedding model, service context, index, hybrid retriever (combining vector and BM25 retrievers), and the response synthesizer. Returns the configured query engine. 
""" # Initialize the service context with the language model and embedding model service_context = ServiceContext.from_defaults(llm=self.llm, embed_model=self.embed_model) # Create an index from documents index = VectorStoreIndex.from_documents(documents=self.documents, service_context=service_context) nodes = service_context.node_parser.get_nodes_from_documents(self.documents) # Set up vector and BM25 retrievers vector_retriever = index.as_retriever(similarity_top_k=5) bm25_retriever = BM25Retriever.from_defaults(nodes=nodes, similarity_top_k=5) hybrid_retriever = HybridRetriever(vector_retriever, bm25_retriever) # Configure the response synthesizer with the prompt template qa_prompt = PromptTemplate(self.qa_prompt_tmpl) response_synthesizer = get_response_synthesizer( service_context=service_context, text_qa_template=qa_prompt, response_mode="tree_summarize", ) # Assemble the query engine with a reranker and the synthesizer reranker = SentenceTransformerRerank(top_n=4, model="BAAI/bge-reranker-base") query_engine = RetrieverQueryEngine.from_args( retriever=hybrid_retriever, node_postprocessors=[reranker], response_synthesizer=response_synthesizer, ) return query_engine