latest_news_backend_with_cat_pred_similar_news / news_category_similar_news_prediction.py
lalithadevi's picture
Update news_category_similar_news_prediction.py
2c0a5dc verified
import pandas as pd
import numpy as np
import tensorflow as tf
from config import (DISTILBERT_TOKENIZER_N_TOKENS,
NEWS_CATEGORY_CLASSIFIER_N_CLASSES,
CLASSIFIER_THRESHOLD, NEWS_RETENTION_SECONDS)
from logger import get_logger
from find_similar_news import find_similar_news
logger = get_logger()
##########################
from dateutil import parser
def correct_date(x):
if (not isinstance(x, str)) or (str(x).find(":") == -1):
logger.warning(f'correct_date() error: {x} is not the right date format')
return "2020-11-07 00:36:44+05:30"
return x
def date_time_parser(dt):
"""
Computes the minutes elapsed since published time.
:param dt: date
:return: int, minutes elapsed.
"""
try:
return int(np.round((dt.now(dt.tz) - dt).total_seconds() / 60, 0))
except:
logger.warning(f'date_time_parser() error: {dt} is not the right date format')
return 100000
def delete_outdated_news(final_df: pd.DataFrame):
logger.warning("Entering delete_outdated_news()")
final_df = final_df.copy()
final_df["parsed_date_1"] = [correct_date(date_) for date_ in final_df['parsed_date']]
final_df["parsed_date_1"] = [parser.parse(date_) for date_ in final_df['parsed_date_1']]
final_df["elapsed_time"] = [date_time_parser(date_) for date_ in final_df['parsed_date_1']]
final_df = final_df.loc[final_df["elapsed_time"] <= NEWS_RETENTION_SECONDS, :].copy()
final_df.drop(columns=['elapsed_time', 'parsed_date_1'], inplace=True)
final_df.reset_index(drop=True, inplace=True)
logger.warning("Exiting delete_outdated_news()")
return final_df
############################################
def parse_prediction(tflite_pred, label_encoder):
tflite_pred_argmax = np.argmax(tflite_pred, axis=1)
tflite_pred_label = label_encoder.inverse_transform(tflite_pred_argmax)
tflite_pred_prob = np.max(tflite_pred, axis=1)
return tflite_pred_label, tflite_pred_prob
def inference(text, interpreter, label_encoder, tokenizer):
logger.warning('Entering inference()')
batch_size = len(text)
logger.warning(f'Samples to predict: {batch_size}')
if text != "":
tokens = tokenizer(text, max_length=DISTILBERT_TOKENIZER_N_TOKENS, padding="max_length", truncation=True, return_tensors="tf")
# tflite model inference
interpreter.allocate_tensors()
input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()[0]
attention_mask, input_ids = tokens['attention_mask'], tokens['input_ids']
interpreter.resize_tensor_input(input_details[0]['index'],[batch_size, DISTILBERT_TOKENIZER_N_TOKENS])
interpreter.resize_tensor_input(input_details[1]['index'],[batch_size, DISTILBERT_TOKENIZER_N_TOKENS])
interpreter.resize_tensor_input(output_details['index'],[batch_size, NEWS_CATEGORY_CLASSIFIER_N_CLASSES])
interpreter.allocate_tensors()
interpreter.set_tensor(input_details[0]["index"], attention_mask)
interpreter.set_tensor(input_details[1]["index"], input_ids)
interpreter.invoke()
tflite_pred = interpreter.get_tensor(output_details["index"])
tflite_pred = parse_prediction(tflite_pred, label_encoder)
logger.warning('Exiting inference()')
return tflite_pred
def cols_check(new_cols, old_cols):
return all([new_col==old_col for new_col, old_col in zip(new_cols, old_cols)])
def process_prediction_df(df, df_type: str="production"):
logger.warning(f"Entering process_prediction_df(): {df_type}")
df = df.copy()
df.drop_duplicates(subset='url', keep='first', inplace=True)
df.reset_index(drop=True, inplace=True)
df.loc[df['pred_proba']<CLASSIFIER_THRESHOLD, 'category'] = 'NATION'
df.loc[(df['title'].str.contains('Pakistan')) & (df['category'] == 'NATION'), 'category'] = 'WORLD'
logger.warning('Updated category of articles having Pakistan in title and category=NATION to WORLD')
df.loc[(df['title'].str.contains('Zodiac Sign', case=False)) | (df['title'].str.contains('Horoscope', case=False)), 'category'] = 'ASTROLOGY'
logger.warning('Updated category of articles having Zodiac Sign in title to ASTROLOGY')
logger.warning(f"Exiting process_prediction_df(): {df_type}")
return df
def add_flags_to_prediction_df(df):
df = df.copy()
headlines = [*df['title'].fillna("").str.strip()]
descriptions = [*df['description'].fillna("").str.strip()]
headlines_desc = [h if (h == d) else f"{h}. {d}" for h, d in zip(headlines, descriptions)]
df['text'] = headlines_desc
df['used_for_training'] = 0
df['allocated'] = 0
df.rename(columns={'category': 'y_pred', 'parsed_date': 'date_time', 'pred_proba': 'y_pred_proba'}, inplace=True)
df.drop(columns=['similar_news', 'src', 'title', 'description'], inplace=True)
df['y_true_proba'] = df['y_pred_proba']
df['y_true'] = df['y_pred']
return df
def predict_news_category_similar_news(old_news: pd.DataFrame, new_news: pd.DataFrame, interpreter, label_encoder, tokenizer,
collection, vectorizer, sent_model, ce_model):
try:
db_updation_required = 1
logger.warning('Entering predict_news_category()')
logger.warning(f'old news: {old_news}\nnew_news: {new_news}')
if not isinstance(new_news, pd.DataFrame):
raise Exception('No New News Found')
else:
# new_news = new_news.copy()
logger.warning(f'{len(new_news)} new news items found before deleting outdated news')
new_news = delete_outdated_news(new_news)
logger.warning(f'{len(new_news)} new news items found after deleting outdated news')
logger.warning(f'new news columns: {[*new_news.columns]}')
if len(new_news) <= 1:
new_news = None
if not isinstance(new_news, pd.DataFrame):
raise Exception('No New News Found after deleting outdated news')
if isinstance(old_news, pd.DataFrame):
# old_news = old_news.copy()
logger.warning(f'{len(old_news)} old news items found before deleting outdated news')
old_news = delete_outdated_news(old_news)
logger.warning(f'{len(old_news)} old news items found after deleting outdated news')
logger.warning(f'old news columns: {[*old_news.columns]}')
old_news.drop(columns='_id', inplace=True)
logger.warning('Dropped _id column from old news data frame.')
if len(old_news) <= 1:
old_news = None
else:
logger.warning('No old news is found')
old_news = new_news.copy()
if not isinstance(old_news, pd.DataFrame):
logger.warning('No old news is found after deleting outdate news')
old_news = new_news.copy()
if 'category' not in [*old_news.columns]:
logger.warning('No prior predictions found in old news')
if not cols_check([*new_news.columns], [*old_news.columns]):
raise Exception("New and old cols don't match")
final_df = pd.concat([old_news, new_news], axis=0, ignore_index=True)
final_df.drop_duplicates(subset='url', keep='first', inplace=True)
headlines = [*final_df['title'].fillna("").str.strip()]
descriptions = [*final_df['description'].fillna("").str.strip()]
# label, prob = inference(headlines, interpreter, label_encoder, tokenizer)
headlines_desc = [h if (h == d) else f"{h}. {d}" for h, d in zip(headlines, descriptions)]
label, prob = inference(headlines_desc, interpreter, label_encoder, tokenizer)
sent_embs = vectorizer.vectorize_(headlines, sent_model)
sim_news = [find_similar_news(text, search_vec, collection, vectorizer, sent_model, ce_model) for search_vec, text in zip(sent_embs, headlines)]
final_df['category'] = label
final_df['pred_proba'] = prob
final_df['similar_news'] = sim_news
final_df = process_prediction_df(final_df, df_type="production & prediction")
prediction_df = final_df.copy()
prediction_df = add_flags_to_prediction_df(prediction_df)
else:
logger.warning('Prior predictions found in old news')
if not cols_check([*new_news.columns], [*old_news.columns][:-3]):
raise Exception("New and old cols don't match")
old_urls = [*old_news['url']]
new_news = new_news.loc[new_news['url'].isin(old_urls) == False, :]
if len(new_news) > 0:
headlines = [*new_news['title'].fillna("").str.strip()]
descriptions = [*new_news['description'].fillna("").str.strip()]
# label, prob = inference(headlines, interpreter, label_encoder, tokenizer)
headlines_desc = [h if (h == d) else f"{h}. {d}" for h, d in zip(headlines, descriptions)]
label, prob = inference(headlines_desc, interpreter, label_encoder, tokenizer)
sent_embs = vectorizer.vectorize_(headlines, sent_model)
sim_news = [find_similar_news(text, search_vec, collection, vectorizer, sent_model, ce_model) for search_vec, text in zip(sent_embs, headlines)]
new_news['category'] = label
new_news['pred_proba'] = prob
new_news['similar_news'] = sim_news
final_df = pd.concat([old_news, new_news], axis=0, ignore_index=True)
final_df = process_prediction_df(final_df, df_type="production")
prediction_df = new_news.copy()
prediction_df.loc[(prediction_df['title'].str.contains('Pakistan')) & (prediction_df['category'] == 'NATION'), 'category'] = 'WORLD'
logger.warning('Updated category of articles having Pakistan in title and category=NATION to WORLD in prediction_df')
prediction_df.loc[(prediction_df['title'].str.contains('Zodiac Sign', case=False)) | (prediction_df['title'].str.contains('Horoscope', case=False)), 'category'] = 'ASTROLOGY'
logger.warning('Updated category of articles having Zodiac Sign in title to ASTROLOGY in prediction_df')
prediction_df = add_flags_to_prediction_df(prediction_df)
else:
logger.warning('INFO: Old & New Articles are the same. There is no requirement of updating them in the database. Database is not updated.')
db_updation_required = 0
final_df = old_news.copy()
prediction_df = final_df.copy()
if len(final_df) == 0:
final_df = None
logger.warning('Exiting predict_news_category()')
except Exception as e:
logger.warning(f'Unexcpected error in predict_news_category()\n{e}')
final_df = None
prediction_df = final_df.copy()
db_updation_required = 0
return final_df, prediction_df, db_updation_required