# This file wraps around Pinpoint to provide simple prediction functionality.

import csv
import json
import os
import time
import uuid
from pprint import pprint

import Pinpoint.FeatureExtraction
from Pinpoint.RandomForest import *


class predictor():
    '''
    Thin wrapper around a Pinpoint random forest model and feature extractor
    that classifies a single piece of text per call.
    '''

    def __init__(self):
        '''
        Constructor. Builds the random forest wrapper, disables the
        psychological and behavioural feature groups, calls train_model with
        the "far-right-radical-language.model" location, and creates the
        feature extractor used by predict().
        '''
        self.model = random_forest()
        self.model.PSYCHOLOGICAL_SIGNALS_ENABLED = False  # Needs LIWC markup
        self.model.BEHAVIOURAL_FEATURES_ENABLED = False
        self.model.train_model(features_file=None, force_new_dataset=False,
                               model_location=r"far-right-radical-language.model")

        self.dict_of_users_all = {}
        self.feature_extractor = Pinpoint.FeatureExtraction.feature_extraction(
            violent_words_dataset_location="swears",
            baseline_training_dataset_location="LIWC2015 Results (Storm_Front_Posts).csv")

    @staticmethod
    def _remove_if_exists(path):
        '''
        Delete a temporary file, ignoring the case where it was never created.

        :param path: path of the file to delete
        '''
        try:
            os.remove(path)
        except FileNotFoundError:
            pass

    def predict(self, string_to_predict=None, username="unknown"):
        '''
        A wrapper function used to call pinpoint and predict if a given piece
        of text is extremist.

        The text is written to a uniquely named CSV file in the current
        working directory, run through the feature extractor, dumped to a JSON
        features file and finally handed to the trained model. The temporary
        CSV/JSON files are removed again before returning, whether the call
        succeeds, returns early or raises.

        :param string_to_predict: the text to classify (required)
        :param username: the username recorded alongside the message
        :return: boolean true/ false. False is also returned when the feature
            extractor or the model raises FileNotFoundError.
        :raises ValueError: if string_to_predict is None
        '''
        if string_to_predict is None:
            raise ValueError("No prediction material given...")

        # Two uuid1 values joined together name every artefact of this call.
        extended_prediction_uuid = str(uuid.uuid1()) + "-" + str(uuid.uuid1())

        self.model.model_folder = "{}-output".format(extended_prediction_uuid)
        self.feature_extractor.MESSAGE_TMP_CACHE_LOCATION = "{}-message-cache".format(extended_prediction_uuid)

        print("Starting prediction for {}".format(extended_prediction_uuid))

        messages_csv = "{}-all-messages.csv".format(extended_prediction_uuid)
        features_json = "./{}-messages.json".format(extended_prediction_uuid)
        # NOTE(review): presumably written by get_features_as_df next to the
        # JSON file; the original code deleted it after every prediction.
        features_json_csv = "{}-messages.json.csv".format(extended_prediction_uuid)

        users_posts = [{"username": "{}".format(username),
                        "timestamp": "tmp",
                        "message": "{}".format(string_to_predict)}]

        # Best-effort removal of a stale features file with the same name.
        try:
            os.remove(features_json)
        except OSError:
            pass

        try:
            # No header row is written, matching the original behaviour.
            # NOTE(review): confirm _get_standard_tweets expects headerless rows.
            with open(messages_csv, 'w', encoding='utf8', newline='') as output_file:
                writer = csv.DictWriter(output_file, fieldnames=["username", "timestamp", "message"])
                writer.writerows(users_posts)

            try:
                self.feature_extractor._get_standard_tweets(messages_csv)
            except FileNotFoundError:
                return False

            with open(features_json, 'w') as outfile:
                features = self.feature_extractor.completed_tweet_user_features
                json.dump(features, outfile, indent=4)

            rows = self.model.get_features_as_df(features_json, True)
            # Drop the label column so only feature columns reach the model.
            rows.pop("is_extremist")

            try:
                features = rows.loc[0]
                is_extremist = self.model.model.predict([features])
            except FileNotFoundError as e:
                is_extremist = False
                print("Message cache error, next - {}".format(e))

            print("Ending prediction for {}".format(extended_prediction_uuid))

            # Explicit comparison with True is kept from the original; bool()
            # collapses the (possibly array-like) comparison result.
            return bool(is_extremist == True)  # noqa: E712
        finally:
            # Always clean up, including on the early return and on errors.
            for tmp_path in (messages_csv, features_json_csv, features_json):
                self._remove_if_exists(tmp_path)