import pandas as pd
import numpy as np
from bs4 import BeautifulSoup
import requests as r
import regex as re
from dateutil import parser
import logging
import multiprocessing
from config import NEWS_EXTRACTOR_URL_TIMEOUT, RSS_FEEDS_TO_EXTRACT
from logger import get_logger
logger = get_logger()
def text_clean(desc):
    """
    Cleans the text by removing HTML entities, tags and special characters.
    :param desc: string containing the description
    :return: str, cleaned description
    """
    try:
        desc = desc.replace("&lt;", "<")
        desc = desc.replace("&gt;", ">")
        desc = re.sub("<.*?>", "", desc)  # strip any remaining HTML tags
        desc = desc.replace("#39;", "'")
        desc = desc.replace("&quot;", '"')
        desc = desc.replace("&nbsp;", " ")
        desc = desc.replace("#32;", " ")
    except Exception:
        desc = ""
    return desc
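
# Illustrative example (not part of the pipeline): entity-laden feed text is
# reduced to plain prose, e.g.
#   text_clean("&lt;b&gt;Sensex&lt;/b&gt; ends #39;higher#39;")  ->  "Sensex ends 'higher'"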
def rss_parser(i):
    """
    Returns a data frame of one parsed news item.
    :param i: single news item (<item> tag) in an RSS feed.
    :return: single-row data frame of the parsed news item, or None on error.
    """
    try:
        b1 = BeautifulSoup(str(i), "xml")
        title = "" if b1.find("title") is None else b1.find("title").get_text()
        title = text_clean(title)
        url = "" if b1.find("link") is None else b1.find("link").get_text()
        desc = "" if b1.find("description") is None else b1.find("description").get_text()
        desc = text_clean(desc)
        desc = f'{desc[:300]}...' if len(desc) >= 300 else desc
        # Fall back to a fixed sentinel date when the feed omits pubDate.
        pub_date = b1.find("pubDate")
        date = "Sat, 12 Aug 2000 13:39:15 +05:30" if (pub_date == "" or pub_date is None) else pub_date.get_text()
        # businesstoday.in labels IST timestamps as GMT; correct the offset.
        if url.find("businesstoday.in") >= 0:
            date = date.replace("GMT", "+0530")
        date1 = parser.parse(date)
    except Exception as e:
        logger.warning(f'Skipping item {i} due to an error: {e}')
        return None
    return pd.DataFrame({"title": title,
                         "url": url,
                         "description": desc,
                         "parsed_date": date1}, index=[0])
def src_parse(rss):
    """
    Returns the root domain name (e.g. livemint.com is extracted from www.livemint.com).
    :param rss: RSS URL
    :return: str, string containing the source name
    """
    # NDTV feeds need explicit mapping; 'ndtvprofit' must be checked before
    # the broader 'ndtv' match, or it would be overwritten to 'ndtv.com'.
    if rss.find('ndtvprofit') >= 0:
        rss = 'ndtv profit'
    elif rss.find('ndtv') >= 0:
        rss = 'ndtv.com'
    elif rss.find('telanganatoday') >= 0:
        rss = 'telanganatoday.com'
    rss = rss.replace("https://www.", "")
    rss = rss.split("/")
    return rss[0]
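
# Illustrative examples (hypothetical URLs):
#   src_parse("https://www.livemint.com/rss/markets")  ->  "livemint.com"
#   src_parse("https://www.ndtvprofit.com/rss")        ->  "ndtv profit"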
def news_agg(rss):
    """
    Returns feeds from each 'rss' URL.
    :param rss: RSS URL.
    :return: data frame of processed articles, or None if nothing was parsed.
    """
    try:
        rss_df = pd.DataFrame()
        # Browser-like headers so feeds that block default clients still respond.
        headers = {
            'authority': 'www.google.com',
            'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
            'accept-language': 'en-US,en;q=0.9',
            'cache-control': 'max-age=0',
            'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36'
        }
        resp = r.get(rss, timeout=NEWS_EXTRACTOR_URL_TIMEOUT, headers=headers)
        logger.warning(f'{rss}: {resp.status_code}')
        b = BeautifulSoup(resp.content, "xml")
        items = b.find_all("item")
        for i in items:
            parsed_item = rss_parser(i)
            if parsed_item is not None:
                rss_df = pd.concat([rss_df, parsed_item], axis=0)
        rss_df.reset_index(drop=True, inplace=True)
        rss_df["description"] = rss_df["description"].replace([" NULL", ''], np.nan)
        rss_df["src"] = src_parse(rss)
        rss_df["parsed_date"] = rss_df["parsed_date"].astype("str")
        if len(rss_df) == 0:
            rss_df = None
    except Exception as e:
        logger.warning(f'Skipping {rss} feed extraction due to an error: {e}')
        return None
    return rss_df
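
# A successful news_agg() call yields one row per <item> with columns
# title, url, description, parsed_date (stringified) and src, e.g.
#   news_agg("https://www.livemint.com/rss/markets")  # hypothetical feed URL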
# List of RSS feeds
rss = RSS_FEEDS_TO_EXTRACT
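# RSS_FEEDS_TO_EXTRACT is assumed to be a list of feed URL strings defined in
# config.py, e.g. ["https://www.livemint.com/rss/markets", ...] (illustrative).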
def get_news_rss(url):
    '''
    Per-feed worker used in multiprocessing: extracts, deduplicates and
    backfills articles from a single RSS URL.
    '''
    try:
        final_df = news_agg(url)
        if final_df is not None:
            final_df.reset_index(drop=True, inplace=True)
            final_df.drop_duplicates(subset='url', inplace=True)
            # Drop items without a title, then backfill blank descriptions with the title.
            final_df = final_df.loc[(final_df["title"] != ""), :].copy()
            empty_desc = (final_df['description'].isna()
                          | (final_df['description'] == '')
                          | (final_df['description'] == ' '))
            final_df.loc[empty_desc, 'description'] = final_df.loc[empty_desc, 'title']
            if len(final_df) == 0:
                final_df = None
    except Exception as e:
        logger.warning(f'Skipping {url} feed processing due to an error: {e}')
        return None
    return final_df
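
# Illustrative single-feed call (hypothetical URL); returns a deduplicated
# frame, or None when the feed yields nothing usable:
#   get_news_rss("https://www.telanganatoday.com/feed")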
def get_news_multi_process(urls):
    '''
    Extracts articles from every feed URL in parallel, one worker per feed,
    and aggregates the per-feed frames into a single data frame.
    '''
    logger.warning('Entering get_news_multi_process() to extract new news articles')
    pool = multiprocessing.Pool(processes=multiprocessing.cpu_count())
    results = []
    for url in urls:
        # Each worker begins fetching and parsing its feed asynchronously.
        f = pool.apply_async(get_news_rss, [url])
        results.append(f)
    final_df = pd.DataFrame()
    for f in results:
        rss_df = f.get(timeout=120)  # collect the output of each parallel job
        if rss_df is not None:
            final_df = pd.concat([final_df, rss_df], axis=0)
    final_df.reset_index(drop=True, inplace=True)
    pool.close()
    pool.join()
    logger.warning(f'Extracted {len(final_df)} new news articles.')
    logger.warning('Exiting get_news_multi_process()')
    if len(final_df) == 0:
        final_df = None
    return final_df
def get_news():
    return get_news_multi_process(rss)
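
# Minimal sketch of how this module might be exercised directly (assumes the
# config values resolve to reachable feeds; columns are those built above).
if __name__ == "__main__":
    articles = get_news()
    if articles is not None:
        print(articles[["src", "title", "parsed_date"]].head())
    else:
        print("No articles extracted.")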