|
import pandas as pd |
|
import numpy as np |
|
from bs4 import BeautifulSoup |
|
import requests as r |
|
import regex as re |
|
from dateutil import parser |
|
import logging |
|
import multiprocessing |
|
from config import NEWS_EXTRACTOR_URL_TIMEOUT, RSS_FEEDS_TO_EXTRACT |
|
from logger import get_logger |
|
|
|
# Module-level logger shared by the parsing and extraction functions in this file.
logger = get_logger()
|
|
|
|
|
def date_time_parser(dt):
    """
    Computes the minutes elapsed since published time.

    Works for both ``pandas.Timestamp`` and plain ``datetime.datetime``
    values: the current time is taken in the value's own timezone
    (``tzinfo``), so aware and naive inputs are each compared like-for-like.

    :param dt: date (``datetime.datetime`` or ``pandas.Timestamp``)
    :return: int, minutes elapsed; 100000 when ``dt`` cannot be processed.
    """
    try:
        # ``tzinfo`` exists on datetime and Timestamp alike, whereas ``tz`` is
        # pandas-only and made every plain datetime fall into the fallback.
        elapsed = dt.now(dt.tzinfo) - dt
        return int(np.round(elapsed.total_seconds() / 60, 0))
    except (AttributeError, TypeError, ValueError, OverflowError):
        # Not a usable date (None, string, NaT, ...): return a large sentinel.
        return 100000
|
|
|
def text_clean(desc):
    """
    Cleans the text by removing markup and decoding a few HTML entities.

    Escaped angle brackets are decoded first so that escaped markup
    (``&lt;p&gt;``) is stripped together with literal tags.

    :param desc: string containing description
    :return: str, cleaned description; "" when ``desc`` is not a string.
    """
    if not isinstance(desc, str):
        return ""

    desc = desc.replace("&lt;", "<").replace("&gt;", ">")
    # Non-greedy match removes each <...> tag individually.
    desc = re.sub("<.*?>", "", desc)

    # Full entity forms are handled before the bare "#NN;" remnants so that
    # "&#39;" does not leave a stray "&" behind.
    replacements = (
        ("&#39;", "'"),
        ("#39;", "'"),
        ("&quot;", '"'),
        ("&nbsp;", " "),
        ("\u00a0", " "),
        ("&#32;", " "),
        ("#32;", " "),
    )
    for token, plain in replacements:
        desc = desc.replace(token, plain)
    return desc
|
|
|
|
|
def rss_parser(i):
    """
    Returns a data frame of parsed news item.

    Missing ``title``/``link``/``description`` tags become empty strings; a
    missing or empty ``pubDate`` falls back to a fixed date in the year 2000.
    Descriptions of 300 characters or more are cut to 300 plus "...".

    :param i: single news item in RSS feed.
    :return: Single-row data frame (title, url, description, parsed_date),
             or None when the item cannot be parsed.
    """
    try:
        item = BeautifulSoup(str(i), "xml")

        def _tag_text(name):
            # Text of the first <name> tag, or "" when the tag is absent.
            tag = item.find(name)
            return "" if tag is None else tag.get_text()

        title = text_clean(_tag_text("title"))
        url = _tag_text("link")
        desc = text_clean(_tag_text("description"))
        desc = f'{desc[:300]}...' if len(desc) >= 300 else desc

        # The fallback must test the tag's text: comparing the Tag object
        # itself with "" is never true, so an empty <pubDate/> used to reach
        # parser.parse("") and the whole item was dropped.
        date = _tag_text("pubDate").strip() or "Sat, 12 Aug 2000 13:39:15 +05:30"
        if "businesstoday.in" in url:
            # NOTE(review): presumably this feed labels IST timestamps as GMT - confirm.
            date = date.replace("GMT", "+0530")

        date1 = parser.parse(date)
    except Exception as e:
        logger.warning(f'Skipping item {i} due to an error {e}')
        return None
    return pd.DataFrame({"title": title,
                         "url": url,
                         "description": desc,
                         "parsed_date": date1}, index=[0])
|
|
|
|
|
def src_parse(rss):
    """
    Returns the source name for a feed URL: normally the host name without a
    leading "www." (eg. livemint.com is extracted from
    https://www.livemint.com/rss/markets).

    A few feeds are mapped to fixed names by substring; the most specific
    pattern is tested first and returns immediately, so 'ndtvprofit' is no
    longer overwritten by the broader 'ndtv' rule.

    :param rss: RSS URL
    :return: str, string containing the source name
    """
    if 'ndtvprofit' in rss:
        return 'ndtv profit'
    if 'ndtv' in rss:
        return 'ndtv.com'
    if 'telanganatoday' in rss:
        return 'telanganatoday.com'

    # Drop any scheme ("https://", "http://", ...) so that URLs without the
    # exact "https://www." prefix no longer yield "https:".
    host = rss.split("://", 1)[-1]
    host = host.split("/", 1)[0]
    if host.startswith("www."):
        host = host[len("www."):]
    return host
|
|
|
|
|
def news_agg(rss):
    """
    Returns feeds from each 'rss' URL.

    Downloads the feed, parses every ``<item>`` with ``rss_parser`` and adds
    the ``src`` and ``elapsed_time`` columns. ``parsed_date`` is converted to
    str after ``elapsed_time`` has been computed from it.

    :param rss: RSS URL.
    :return: Data frame of processed articles, or None when the feed yields
             no parsable items or any step fails.
    """
    try:
        headers = {
            'authority': 'www.google.com',
            'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
            'accept-language': 'en-US,en;q=0.9',
            'cache-control': 'max-age=0',
            'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36'
        }

        resp = r.get(rss, timeout=NEWS_EXTRACTOR_URL_TIMEOUT, headers=headers)
        logger.warning(f'{rss}: {resp.status_code}')
        feed = BeautifulSoup(resp.content, "xml")

        # Collect the per-item frames and concatenate once; concatenating
        # inside the loop copies the accumulated frame on every iteration.
        frames = [
            parsed_item
            for parsed_item in map(rss_parser, feed.find_all("item"))
            if parsed_item is not None
        ]
        if not frames:
            # Without this guard the column access below raised KeyError on
            # the empty frame and was reported as a generic extraction error.
            logger.warning(f'Skipping {rss} feed extraction: no parsable items found')
            return None

        rss_df = pd.concat(frames, axis=0, ignore_index=True)
        rss_df["description"] = rss_df["description"].replace([" NULL", ''], np.nan)

        rss_df["src"] = src_parse(rss)
        rss_df["elapsed_time"] = rss_df["parsed_date"].apply(date_time_parser)
        rss_df["parsed_date"] = rss_df["parsed_date"].astype("str")
    except Exception as e:
        logger.warning(f'Skipping {rss} feed extraction due to an error {e}')
        return None
    return rss_df
|
|
|
|
|
|
|
# Feeds configured in config.RSS_FEEDS_TO_EXTRACT; get_news() passes this to
# get_news_multi_process().
rss = RSS_FEEDS_TO_EXTRACT
|
|
|
|
|
def get_news_rss(url):
    """
    Function that is used in multiprocessing: extracts one feed and tidies it.

    Rows are ordered by elapsed time, de-duplicated on 'url', rows with an
    empty title are dropped, and a missing/blank description is replaced by
    the title.

    :param url: RSS URL.
    :return: Data frame of articles, or None when nothing is left or on error.
    """
    try:
        articles = news_agg(url)
        if articles is not None:
            articles = (
                articles.reset_index(drop=True)
                .sort_values(by="elapsed_time")
                .drop(columns=['elapsed_time'])
                .drop_duplicates(subset='url')
            )

            articles = articles.loc[(articles["title"] != ""), :].copy()

            # Rows whose description is NaN, empty or a single space.
            text = articles['description']
            blank = (text.isna()) | (text == '') | (text == ' ')
            articles.loc[blank, 'description'] = articles.loc[blank, 'title']

            if len(articles) == 0:
                articles = None
    except Exception as e:
        logger.warning(f'Skipping {url} feed processing due to an error {e}')
        return None
    return articles
|
|
|
|
|
def get_news_multi_process(urls):
    """
    Extracts all feeds in parallel and merges them into one data frame.

    One ``get_news_rss`` task per URL is submitted to a process pool sized to
    the CPU count; each result is awaited for up to 120 seconds.

    :param urls: iterable of RSS URLs.
    :return: Data frame of all extracted articles, or None when there are none.
    :raises multiprocessing.TimeoutError: when a task exceeds the 120 second
            wait (propagated to the caller, as before).
    """
    logger.warning('Entering get_news_multi_process() to extract new news articles')

    frames = []
    # The context manager terminates the pool on exit, so worker processes are
    # released even when a result times out and the exception propagates.
    with multiprocessing.Pool(processes=multiprocessing.cpu_count()) as pool:
        pending = [pool.apply_async(get_news_rss, [url]) for url in urls]

        for task in pending:
            rss_df = task.get(timeout=120)
            if rss_df is not None:
                frames.append(rss_df)

        # Normal path: shut the workers down gracefully before leaving.
        pool.close()
        pool.join()

    # Single concatenation instead of growing the frame once per feed.
    if frames:
        final_df = pd.concat(frames, axis=0, ignore_index=True)
    else:
        final_df = pd.DataFrame()

    logger.warning(f'Extracted {len(final_df)} new news articles.')
    logger.warning('Exiting get_news_multi_process()')

    if len(final_df) == 0:
        final_df = None

    return final_df
|
|
|
|
|
def get_news():
    """
    Extracts news from the module-level feed list ``rss``.

    :return: whatever get_news_multi_process returns for that list.
    """
    feed_urls = rss
    return get_news_multi_process(feed_urls)
|
|