|
import pandas as pd |
|
import numpy as np |
|
import tensorflow as tf |
|
from config import CLASSIFIER_THRESHOLD, NEWS_RETENTION_SECONDS |
|
|
|
from logger import get_logger |
|
from find_similar_news import find_similar_news |
|
|
|
logger = get_logger() |
|
|
|
|
|
|
|
from dateutil import parser |
|
def correct_date(x):
    """Return *x* unchanged when it looks like a timestamp string.

    A value qualifies when it is a ``str`` containing at least one ``:``.
    Anything else is logged and replaced by a fixed fallback timestamp so
    downstream date parsing never sees a malformed value.
    """
    if isinstance(x, str) and ":" in x:
        return x
    logger.warning(f'correct_date() error: {x} is not the right date format')
    return "2020-11-07 00:36:44+05:30"
|
|
|
def date_time_parser(dt):
    """
    Computes the minutes elapsed since published time.

    :param dt: timezone-aware datetime (e.g. from ``dateutil.parser.parse``)
        or a pandas Timestamp.
    :return: int, minutes elapsed; 100000 sentinel when *dt* is unusable.
    """
    try:
        # BUG FIX: the original read `dt.tz`, which only exists on pandas
        # Timestamp objects. Plain datetime objects (what dateutil.parser
        # returns) expose `.tzinfo`, so the old code raised AttributeError
        # for them and every such row got the 100000 sentinel.
        tz = getattr(dt, "tzinfo", None)
        return int(np.round((dt.now(tz) - dt).total_seconds() / 60, 0))
    # Narrowed from a bare `except:` so SystemExit/KeyboardInterrupt
    # are not swallowed.
    except Exception:
        logger.warning(f'date_time_parser() error: {dt} is not the right date format')
        return 100000
|
|
|
def delete_outdated_news(final_df: pd.DataFrame):
    """Drop rows whose publish time is older than the retention window.

    Sanitizes ``parsed_date``, parses it, computes elapsed minutes per row and
    keeps only rows with ``elapsed_time <= NEWS_RETENTION_SECONDS``.
    NOTE(review): the threshold name says seconds but is compared against
    minutes — confirm the config value's unit.

    :param final_df: frame containing a ``parsed_date`` column.
    :return: filtered copy with helper columns removed and index reset.
    """
    logger.warning("Entering delete_outdated_news()")
    df = final_df.copy()
    sanitized = [correct_date(raw) for raw in df['parsed_date']]
    df["parsed_date_1"] = [parser.parse(value) for value in sanitized]
    # Iterate the assigned column (not the raw list): pandas may have
    # converted the values to Timestamps, which is what the original saw.
    df["elapsed_time"] = [date_time_parser(ts) for ts in df['parsed_date_1']]
    fresh = df["elapsed_time"] <= NEWS_RETENTION_SECONDS
    df = df.loc[fresh, :].copy()
    df = df.drop(columns=['elapsed_time', 'parsed_date_1']).reset_index(drop=True)
    logger.warning("Exiting delete_outdated_news()")
    return df
|
|
|
|
|
def find_path(url):
    """Extract a readable topic path from a news-article URL.

    Strips scheme/host noise, splits the remainder on '/', and filters out
    segments that are article-id artifacts (``articleshow``, ``.cms``/``.ece``/
    ``.htm`` pages), empty parts, headline-like slugs (more than 4 hyphens),
    or anything containing a space.

    :param url: article URL ('' allowed).
    :return: '' for empty input; the joined sub-path when more than two
        segments survive; the last surviving segment otherwise; '-' when
        nothing survives.
    """
    if url == '':
        return ''
    url = url.replace("-/-", "-")
    # BUG FIX (dead code removed): the original also rewrote `url` afterwards
    # ("//" -> "/", stripping a timesofindia prefix) but never read those
    # results — only the `url_split` pipeline below affects the output.
    path = url.replace("https://", "").replace("www.", "").strip()
    segments = [
        seg for seg in path.split("/")
        if seg != ""
        and seg != "articleshow"
        and ".cms" not in seg
        and ".ece" not in seg
        and ".htm" not in seg
        and len(seg.split('-')) <= 5
        and " " not in seg
    ]
    if len(segments) > 2:
        # Drop the leading host segment and keep the rest of the path.
        return "/".join(segments[1:])
    if segments:
        return segments[-1]
    return '-'
|
|
|
|
|
|
|
|
|
|
|
def parse_prediction(tflite_pred, label_encoder):
    """Decode a matrix of class probabilities into labels and confidences.

    :param tflite_pred: 2-D array-like, one row of class probabilities per sample.
    :param label_encoder: fitted encoder exposing ``inverse_transform``.
    :return: tuple (labels, confidences) — the decoded argmax label and the
        winning probability for every row.
    """
    winning_indices = np.argmax(tflite_pred, axis=1)
    decoded_labels = label_encoder.inverse_transform(winning_indices)
    winning_probs = np.max(tflite_pred, axis=1)
    return decoded_labels, winning_probs
|
|
|
|
|
def inference(text, calibrated_model, label_encoder):
    """Run the calibrated classifier on *text* and decode the predictions.

    :param text: sequence of document strings (an empty string means
        "nothing to predict").
    :param calibrated_model: classifier exposing ``predict_proba``.
    :param label_encoder: fitted encoder passed through to parse_prediction().
    :return: (labels, probabilities) tuple from parse_prediction(); a pair of
        empty arrays when there is no input.
    """
    logger.warning('Entering inference()')

    logger.warning(f'Samples to predict: {len(text)}')
    if text != "":
        tflite_pred = calibrated_model.predict_proba(text)
        tflite_pred = parse_prediction(tflite_pred, label_encoder)
    else:
        # BUG FIX: the original left tflite_pred unbound on this path and
        # raised UnboundLocalError at the return below.
        tflite_pred = (np.array([]), np.array([]))
    logger.warning('Exiting inference()')
    return tflite_pred
|
|
|
def cols_check(new_cols, old_cols):
    """Return True when the two column sequences are identical element-wise.

    BUG FIX: the original relied on ``zip``, which truncates to the shorter
    sequence, so e.g. cols_check(['a'], ['a', 'b']) returned True. A length
    mismatch is now an explicit failure.
    """
    if len(new_cols) != len(old_cols):
        return False
    return all(new_col == old_col for new_col, old_col in zip(new_cols, old_cols))
|
|
|
def process_prediction_df(df, df_type: str="production"):
    """Deduplicate a prediction frame by URL and renumber its index.

    :param df: frame containing a ``url`` column.
    :param df_type: tag used only for log messages.
    :return: deduplicated copy (first occurrence of each URL wins).
    """
    logger.warning(f"Entering process_prediction_df(): {df_type}")
    deduped = df.copy()
    deduped = deduped.drop_duplicates(subset='url', keep='first')
    deduped = deduped.reset_index(drop=True)
    logger.warning(f"Exiting process_prediction_df(): {df_type}")
    return deduped
|
|
|
def add_flags_to_prediction_df(df):
    """Shape a freshly-predicted frame into the stored-prediction schema.

    Builds a combined ``text`` column from title + description (title alone
    when they are identical), initializes bookkeeping flags, renames columns
    to the y_pred/y_true convention, seeds y_true from y_pred, and drops
    columns not persisted with predictions.

    :param df: frame with title, description, category, parsed_date,
        pred_proba, similar_news and src columns.
    :return: transformed copy; the input frame is not modified.
    """
    out = df.copy()
    titles = list(out['title'].fillna("").str.strip())
    summaries = list(out['description'].fillna("").str.strip())
    out['text'] = [
        title if title == summary else f"{title}. {summary}"
        for title, summary in zip(titles, summaries)
    ]
    out['used_for_training'] = 0
    out['allocated'] = 0
    out = out.rename(columns={'category': 'y_pred', 'parsed_date': 'date_time', 'pred_proba': 'y_pred_proba'})
    out = out.drop(columns=['similar_news', 'src', 'title', 'description'])
    # New predictions start with y_true mirroring y_pred until reviewed.
    out['y_true_proba'] = out['y_pred_proba']
    out['y_true'] = out['y_pred']
    out['updated_prediction'] = 1
    return out
|
|
|
|
|
def predict_news_category_similar_news(old_news: pd.DataFrame, new_news: pd.DataFrame, calibrated_model, label_encoder,
                                       collection, vectorizer, sent_model, ce_model):
    """Merge new articles into the stored set, predicting only unseen ones.

    Outdated rows are dropped from both frames first. When the old frame has
    no prior ``category`` column everything is (re)predicted; otherwise only
    new URLs absent from the old frame are classified, and each classified
    row also gets a similar-news lookup.

    :param old_news: previously stored articles, or anything non-DataFrame
        when there is no history.
    :param new_news: freshly scraped articles, or anything non-DataFrame.
    :param calibrated_model: classifier exposing ``predict_proba``.
    :param label_encoder: encoder used to decode class indices.
    :param collection: search-index handle forwarded to find_similar_news.
    :param vectorizer: embedding helper exposing ``vectorize_``.
    :param sent_model: sentence-embedding model forwarded to the helpers.
    :param ce_model: cross-encoder model forwarded to find_similar_news.
    :return: (final_df, prediction_df, db_updation_required) — the merged
        store (or None), the rows to upsert, and a 0/1 update flag.
    """
    try:
        db_updation_required = 1
        logger.warning('Entering predict_news_category()')
        logger.warning(f'old news: {old_news}\nnew_news: {new_news}')
        if not isinstance(new_news, pd.DataFrame):
            raise Exception('No New News Found')
        else:

            logger.warning(f'{len(new_news)} new news items found before deleting outdated news')
            new_news = delete_outdated_news(new_news)
            logger.warning(f'{len(new_news)} new news items found after deleting outdated news')
            logger.warning(f'new news columns: {[*new_news.columns]}')
            # NOTE(review): a single surviving row is treated the same as
            # none — confirm this is intentional.
            if len(new_news) <= 1:
                new_news = None

        if not isinstance(new_news, pd.DataFrame):
            raise Exception('No New News Found after deleting outdated news')

        if isinstance(old_news, pd.DataFrame):

            logger.warning(f'{len(old_news)} old news items found before deleting outdated news')
            old_news = delete_outdated_news(old_news)
            logger.warning(f'{len(old_news)} old news items found after deleting outdated news')
            logger.warning(f'old news columns: {[*old_news.columns]}')

            # presumably the database's document id (e.g. Mongo _id); raises
            # KeyError (caught below) if the column is absent — confirm.
            old_news.drop(columns='_id', inplace=True)
            logger.warning('Dropped _id column from old news data frame.')
            if len(old_news) <= 1:
                old_news = None
        else:
            logger.warning('No old news is found')
            old_news = new_news.copy()

        if not isinstance(old_news, pd.DataFrame):
            logger.warning('No old news is found after deleting outdate news')
            old_news = new_news.copy()

        if 'category' not in [*old_news.columns]:
            # No prior predictions: classify the whole merged frame.
            logger.warning('No prior predictions found in old news')
            if not cols_check([*new_news.columns], [*old_news.columns]):
                raise Exception("New and old cols don't match")
            final_df = pd.concat([old_news, new_news], axis=0, ignore_index=True)
            final_df.drop_duplicates(subset='url', keep='first', inplace=True)

            headlines = [*final_df['title'].fillna("").str.strip()]
            descriptions = [*final_df['description'].fillna("").str.strip()]
            paths = [*final_df['url'].map(find_path).fillna("").str.strip()]

            # "path: headline" (plus description when it differs) is the
            # classifier input.
            headlines_desc = [f"{p}: {h}" if (h == d) else f"{p}: {h}. {d}" for p, h, d in zip(paths, headlines, descriptions)]
            label, prob = inference(headlines_desc, calibrated_model, label_encoder)

            sent_embs = vectorizer.vectorize_(headlines, sent_model)
            sim_news = [find_similar_news(text, search_vec, collection, vectorizer, sent_model, ce_model) for search_vec, text in zip(sent_embs, headlines)]
            final_df['category'] = label
            final_df['pred_proba'] = prob
            final_df['similar_news'] = sim_news

            final_df = process_prediction_df(final_df, df_type="production & prediction")
            prediction_df = final_df.copy()
            prediction_df = add_flags_to_prediction_df(prediction_df)

        else:
            # Prior predictions exist: classify only URLs not seen before.
            logger.warning('Prior predictions found in old news')
            # Old frame carries 3 extra prediction columns at the end.
            if not cols_check([*new_news.columns], [*old_news.columns][:-3]):
                raise Exception("New and old cols don't match")
            old_urls = [*old_news['url']]
            # .copy() added: the .loc slice otherwise triggers
            # SettingWithCopyWarning on the column assignments below.
            new_news = new_news.loc[new_news['url'].isin(old_urls) == False, :].copy()
            if len(new_news) > 0:

                headlines = [*new_news['title'].fillna("").str.strip()]
                descriptions = [*new_news['description'].fillna("").str.strip()]
                paths = [*new_news['url'].map(find_path).fillna("").str.strip()]

                headlines_desc = [f"{p}: {h}" if (h == d) else f"{p}: {h}. {d}" for p, h, d in zip(paths, headlines, descriptions)]
                label, prob = inference(headlines_desc, calibrated_model, label_encoder)

                sent_embs = vectorizer.vectorize_(headlines, sent_model)
                sim_news = [find_similar_news(text, search_vec, collection, vectorizer, sent_model, ce_model) for search_vec, text in zip(sent_embs, headlines)]
                new_news['category'] = label
                new_news['pred_proba'] = prob
                new_news['similar_news'] = sim_news
                final_df = pd.concat([old_news, new_news], axis=0, ignore_index=True)

                final_df = process_prediction_df(final_df, df_type="production")
                prediction_df = new_news.copy()

                prediction_df = add_flags_to_prediction_df(prediction_df)

            else:
                logger.warning('INFO: Old & New Articles are the same. There is no requirement of updating them in the database. Database is not updated.')
                db_updation_required = 0
                final_df = old_news.copy()
                prediction_df = final_df.copy()

        if len(final_df) == 0:
            final_df = None

        logger.warning('Exiting predict_news_category()')
    except Exception as e:
        logger.warning(f'Unexpected error in predict_news_category()\n{e}')
        final_df = None
        # BUG FIX: final_df is None at this point, so the original
        # `prediction_df = final_df.copy()` raised AttributeError inside
        # the error handler; return None directly instead.
        prediction_df = None
        db_updation_required = 0
    return final_df, prediction_df, db_updation_required
|
|