import logging
import multiprocessing

import numpy as np
import pandas as pd
import regex as re
import requests as r
from bs4 import BeautifulSoup
from dateutil import parser


def date_time_parser(dt):
    """
    Computes the minutes elapsed since the published time.
    :param dt: timezone-aware published datetime.
    :return: int, minutes elapsed.
    """
    # dt.now(dt.tz) resolves "now" in the article's own timezone before diffing.
    return int(np.round((dt.now(dt.tz) - dt).total_seconds() / 60, 0))


def text_clean(desc):
    """
    Cleans the text by decoding common HTML entities and stripping tags.
    :param desc: string containing the description.
    :return: str, cleaned description.
    """
    desc = desc.replace("&lt;", "<")
    desc = desc.replace("&gt;", ">")
    desc = re.sub("<.*?>", "", desc)  # drop any remaining HTML tags
    desc = desc.replace("#39;", "'")
    desc = desc.replace("&quot;", '"')
    desc = desc.replace("&nbsp;", " ")
    desc = desc.replace("#32;", " ")
    return desc


def rss_parser(i):
    """
    Returns a data frame of a parsed news item.
    :param i: single news <item> in an RSS feed.
    :return: DataFrame with one row for the parsed news item.
    """
    b1 = BeautifulSoup(str(i), "xml")

    title = "" if b1.find("title") is None else b1.find("title").get_text()
    title = text_clean(title)

    url = "" if b1.find("link") is None else b1.find("link").get_text()

    desc = "" if b1.find("description") is None else b1.find("description").get_text()
    desc = text_clean(desc)
    desc = f"{desc[:300]}..." if len(desc) >= 300 else desc  # truncate long descriptions

    # Fall back to a sentinel date when <pubDate> is missing or empty.
    pub_date = b1.find("pubDate")
    if pub_date is None or pub_date.get_text().strip() == "":
        date = "Sat, 12 Aug 2000 13:39:15 +05:30"
    else:
        date = pub_date.get_text()

    # The businesstoday.in feed labels IST timestamps as GMT; swap in the +0530 offset.
    if url.find("businesstoday.in") >= 0:
        date = date.replace("GMT", "+0530")

    date1 = parser.parse(date)

    return pd.DataFrame(
        {"title": title, "url": url, "description": desc, "parsed_date": date1},
        index=[0],
    )


def src_parse(rss):
    """
    Returns the root domain name (e.g. livemint.com is extracted from
    https://www.livemint.com/...).
    :param rss: RSS URL.
    :return: str, the source name.
    """
    if rss.find('ndtvprofit') >= 0:
        rss = 'ndtv profit'
    elif rss.find('ndtv') >= 0:
        rss = 'ndtv.com'
    elif rss.find('telanganatoday') >= 0:
        rss = 'telanganatoday.com'
    rss = rss.replace("https://www.", "")
    rss = rss.split("/")
    return rss[0]


def news_agg(rss):
    """
    Returns feeds from a single 'rss' URL.
    :param rss: RSS URL.
    :return: DataFrame of processed articles.
""" try: rss_df = pd.DataFrame() # user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36" headers = { 'authority': 'www.google.com', 'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7', 'accept-language': 'en-US,en;q=0.9', 'cache-control': 'max-age=0', 'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36' } timeout = 5 resp = r.get(rss, timeout=timeout, headers=headers) logging.warning(f'{rss}: {resp.status_code}') b = BeautifulSoup(resp.content, "xml") items = b.find_all("item") for i in items: # rss_df = rss_df.append(rss_parser(i)).copy() rss_df = pd.concat([rss_df, rss_parser(i)], axis=0) rss_df.reset_index(drop=True, inplace=True) rss_df["description"] = rss_df["description"].replace([" NULL", ''], np.nan) #### UNCOMMENT IN CASE OF OOM ERROR IN RENDER # rss_df.dropna(inplace=True) #### rss_df["src"] = src_parse(rss) rss_df["elapsed_time"] = rss_df["parsed_date"].apply(date_time_parser) rss_df["parsed_date"] = rss_df["parsed_date"].astype("str") # rss_df["elapsed_time_str"] = rss_df["elapsed_time"].apply(elapsed_time_str) except Exception as e: print(e) pass return rss_df # List of RSS feeds rss = ['https://www.economictimes.indiatimes.com/rssfeedstopstories.cms', 'https://www.thehindu.com/news/feeder/default.rss', # 'https://telanganatoday.com/feed', 'https://www.businesstoday.in/rssfeeds/?id=225346', 'https://feeds.feedburner.com/ndtvnews-latest', 'https://www.hindustantimes.com/feeds/rss/world-news/rssfeed.xml', 'https://www.indiatoday.in/rss/1206578', 'https://www.moneycontrol.com/rss/latestnews.xml', 'https://www.livemint.com/rss/news', 'https://www.zeebiz.com/latest.xml/feed', 'https://www.timesofindia.indiatimes.com/rssfeedmostrecent.cms'] def get_news_rss(url): # final_df = pd.DataFrame() # for i in rss: # # final_df = final_df.append(news_agg(i)) # final_df = pd.concat([final_df, news_agg(i)], axis=0) final_df = news_agg(url) final_df.reset_index(drop=True, inplace=True) final_df.sort_values(by="elapsed_time", inplace=True) # final_df['src_time'] = final_df['src'] + (" " * 5) + final_df["elapsed_time_str"] # final_df.drop(columns=['date', 'parsed_date', 'src', 'elapsed_time', 'elapsed_time_str'], inplace=True) final_df.drop(columns=['elapsed_time'], inplace=True) #### UNCOMMENT 1ST STATEMENT AND REMOVE 2ND STATEMENT IN CASE OF OOM ERROR IN RENDER # final_df.drop_duplicates(subset='description', inplace=True) final_df.drop_duplicates(subset='url', inplace=True) #### final_df = final_df.loc[(final_df["title"] != ""), :].copy() final_df.loc[(final_df['description'].isna()) | (final_df['description']=='')| (final_df['description']==' '), 'description'] = final_df.loc[(final_df['description'].isna()) | (final_df['description']=='')| (final_df['description']==' '), 'title'] return final_df def get_news_multi_process(urls): ''' Get the data shape by parallely calculating lenght of each chunk and aggregating them to get lenght of complete training dataset ''' pool = multiprocessing.Pool(processes=multiprocessing.cpu_count()) results = [] for url in urls: f = pool.apply_async(get_news_rss, [url]) # asynchronously applying function to chunk. 


def get_news_multi_process(urls):
    """
    Fetches every RSS feed in parallel (one worker per URL) and
    concatenates the per-feed results into a single data frame.
    :param urls: list of RSS URLs.
    :return: DataFrame of articles from all feeds.
    """
    pool = multiprocessing.Pool(processes=multiprocessing.cpu_count())

    results = []
    for url in urls:
        # Asynchronously apply get_news_rss to each URL; every worker
        # starts fetching and parsing its feed in parallel.
        f = pool.apply_async(get_news_rss, [url])
        results.append(f)

    final_df = pd.DataFrame()
    for f in results:
        # Collect the output of each parallel job.
        final_df = pd.concat([final_df, f.get(timeout=120)], axis=0)

    final_df.reset_index(drop=True, inplace=True)
    logging.warning(final_df['src'].unique())

    pool.close()
    pool.join()
    return final_df


def get_news():
    return get_news_multi_process(rss)
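

# Minimal entry-point sketch (not in the original module), assuming the script may
# be run directly: the __main__ guard also keeps multiprocessing.Pool from
# re-importing the module and re-spawning workers on platforms that use the
# "spawn" start method (Windows, recent macOS).
if __name__ == "__main__":
    news_df = get_news()
    # Print a small sample to confirm the feeds were fetched and merged.
    print(news_df[["src", "title", "parsed_date"]].head(10))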