import pandas as pd
import numpy as np
import tensorflow as tf
from dateutil import parser

from config import (DISTILBERT_TOKENIZER_N_TOKENS,
                    NEWS_CATEGORY_CLASSIFIER_N_CLASSES,
                    CLASSIFIER_THRESHOLD,
                    NEWS_RETENTION_SECONDS)
from logger import get_logger
from find_similar_news import find_similar_news

logger = get_logger()

# Date string substituted by correct_date() for values that cannot be a
# date-time string.
_FALLBACK_DATE = "2020-11-07 00:36:44+05:30"

# Value returned by date_time_parser() when the elapsed time cannot be computed.
_UNKNOWN_ELAPSED_MINUTES = 100000


##########################


def correct_date(x):
    """
    Return ``x`` if it looks like a date-time string, else a fixed fallback.

    A value is accepted when it is a ``str`` containing a ``":"``. Anything
    else is logged and replaced by the fixed past timestamp
    ``"2020-11-07 00:36:44+05:30"``.

    :param x: raw published-date value.
    :return: str, either ``x`` unchanged or the fallback date string.
    """
    if isinstance(x, str) and ":" in x:
        return x
    logger.warning(f'correct_date() error: {x} is not the right date format')
    return _FALLBACK_DATE


def date_time_parser(dt):
    """
    Computes the minutes elapsed since published time.

    :param dt: ``datetime.datetime`` or ``pandas.Timestamp`` of publication
        (naive or timezone-aware).
    :return: int, minutes elapsed, rounded to the nearest minute; 100000 if
        the elapsed time cannot be computed.
    """
    try:
        # `tzinfo` exists on both datetime.datetime and pandas.Timestamp,
        # whereas `.tz` is Timestamp-only; relying on `.tz` made every plain
        # datetime fall into the error branch below.
        elapsed = dt.now(tz=dt.tzinfo) - dt
        return int(np.round(elapsed.total_seconds() / 60, 0))
    except Exception:
        logger.warning(f'date_time_parser() error: {dt} is not the right date format')
        return _UNKNOWN_ELAPSED_MINUTES


def delete_outdated_news(final_df: pd.DataFrame):
    """
    Drop the rows of ``final_df`` whose ``parsed_date`` is too old.

    Each ``parsed_date`` value is sanitised with ``correct_date()``, parsed
    with ``dateutil.parser.parse`` and converted to elapsed minutes with
    ``date_time_parser()``. Rows whose elapsed value is at most
    ``NEWS_RETENTION_SECONDS`` are kept.

    :param final_df: DataFrame with a ``parsed_date`` column. Not modified.
    :return: new DataFrame holding the retained rows, index reset to 0..n-1.
    :raises: whatever ``dateutil.parser.parse`` raises for a string that
        contains ``":"`` but is not a parseable date.
    """
    logger.warning("Entering delete_outdated_news()")
    elapsed_minutes = [
        date_time_parser(parser.parse(correct_date(raw_date)))
        for raw_date in final_df['parsed_date']
    ]
    # NOTE(review): elapsed values are in MINUTES while the threshold constant
    # is named ..._SECONDS. The comparison is kept exactly as it was; confirm
    # the unit of NEWS_RETENTION_SECONDS in config.
    is_recent = np.asarray(elapsed_minutes) <= NEWS_RETENTION_SECONDS
    final_df = final_df.loc[is_recent, :].copy()
    final_df.reset_index(drop=True, inplace=True)
    logger.warning("Exiting delete_outdated_news()")
    return final_df


############################################


def parse_prediction(tflite_pred, label_encoder):
    """
    Turn a matrix of class scores into predicted labels and their scores.

    :param tflite_pred: array reduced along ``axis=1``; presumably one row per
        sample and one column per class -- confirm against the model output.
    :param label_encoder: object whose ``inverse_transform`` maps class
        indices back to labels.
    :return: tuple ``(labels, probs)`` -- the label of the highest-scoring
        class per row, and that highest score per row.
    """
    top_class_idx = np.argmax(tflite_pred, axis=1)
    top_class_prob = np.max(tflite_pred, axis=1)
    top_class_label = label_encoder.inverse_transform(top_class_idx)
    return top_class_label, top_class_prob


def inference(text, interpreter, label_encoder, tokenizer):
    logger.warning('Entering inference()')
    batch_size = len(text)
    logger.warning(f'Samples to predict: {batch_size}')
if text != "": tokens = tokenizer(text, max_length=DISTILBERT_TOKENIZER_N_TOKENS, padding="max_length", truncation=True, return_tensors="tf") # tflite model inference interpreter.allocate_tensors() input_details = interpreter.get_input_details() output_details = interpreter.get_output_details()[0] attention_mask, input_ids = tokens['attention_mask'], tokens['input_ids'] interpreter.resize_tensor_input(input_details[0]['index'],[batch_size, DISTILBERT_TOKENIZER_N_TOKENS]) interpreter.resize_tensor_input(input_details[1]['index'],[batch_size, DISTILBERT_TOKENIZER_N_TOKENS]) interpreter.resize_tensor_input(output_details['index'],[batch_size, NEWS_CATEGORY_CLASSIFIER_N_CLASSES]) interpreter.allocate_tensors() interpreter.set_tensor(input_details[0]["index"], attention_mask) interpreter.set_tensor(input_details[1]["index"], input_ids) interpreter.invoke() tflite_pred = interpreter.get_tensor(output_details["index"]) tflite_pred = parse_prediction(tflite_pred, label_encoder) logger.warning('Exiting inference()') return tflite_pred def cols_check(new_cols, old_cols): return all([new_col==old_col for new_col, old_col in zip(new_cols, old_cols)]) def process_prediction_df(df, df_type: str="production"): logger.warning(f"Entering process_prediction_df(): {df_type}") df = df.copy() df.drop_duplicates(subset='url', keep='first', inplace=True) df.reset_index(drop=True, inplace=True) df.loc[df['pred_proba'] 0: headlines = [*new_news['title'].fillna("").str.strip()] descriptions = [*new_news['description'].fillna("").str.strip()] # label, prob = inference(headlines, interpreter, label_encoder, tokenizer) headlines_desc = [h if (h == d) else f"{h}. 
{d}" for h, d in zip(headlines, descriptions)] label, prob = inference(headlines_desc, interpreter, label_encoder, tokenizer) sent_embs = vectorizer.vectorize_(headlines, sent_model) sim_news = [find_similar_news(text, search_vec, collection, vectorizer, sent_model, ce_model) for search_vec, text in zip(sent_embs, headlines)] new_news['category'] = label new_news['pred_proba'] = prob new_news['similar_news'] = sim_news final_df = pd.concat([old_news, new_news], axis=0, ignore_index=True) final_df = process_prediction_df(final_df, df_type="production") prediction_df = new_news.copy() prediction_df.loc[(prediction_df['title'].str.contains('Pakistan')) & (prediction_df['category'] == 'NATION'), 'category'] = 'WORLD' logger.warning('Updated category of articles having Pakistan in title and category=NATION to WORLD in prediction_df') prediction_df.loc[(prediction_df['title'].str.contains('Zodiac Sign', case=False)) | (prediction_df['title'].str.contains('Horoscope', case=False)), 'category'] = 'ASTROLOGY' logger.warning('Updated category of articles having Zodiac Sign in title to ASTROLOGY in prediction_df') prediction_df = add_flags_to_prediction_df(prediction_df) else: logger.warning('INFO: Old & New Articles are the same. There is no requirement of updating them in the database. Database is not updated.') db_updation_required = 0 final_df = old_news.copy() prediction_df = final_df.copy() if len(final_df) == 0: final_df = None logger.warning('Exiting predict_news_category()') except Exception as e: logger.warning(f'Unexcpected error in predict_news_category()\n{e}') final_df = None prediction_df = final_df.copy() db_updation_required = 0 return final_df, prediction_df, db_updation_required