"""FastAPI endpoint To run locally use 'uvicorn app:app --host localhost --port 7860' or `python -m uvicorn app:app --reload --host localhost --port 7860` """ import datetime as dt import json import logging import sys import spacy # sys.setrecursionlimit(20000) import pandas as pd import numpy as np import os import random from typing import Dict, List import uvicorn from fastapi import FastAPI, HTTPException, Request, Response from fastapi.responses import HTMLResponse, JSONResponse from fastapi.staticfiles import StaticFiles from fastapi.templating import Jinja2Templates from rouge_score import rouge_scorer # Scripts import scripts.sentiment as sentiment import scripts.twitter_scraper as ts from scripts import sentiment from scripts.summarization import bert_summarization from scripts.twitter_scraper import get_latest_account_tweets from scripts.sentiment import twitter_sentiment_api_score from scripts import twitter_scraper as ts import scripts.utils as utils from scripts import translation from scripts import generative import nltk nltk.download('punkt') punkt_download_location = nltk.data.path[0] logging.info(f"punkt_download_location: {punkt_download_location}") logging.basicConfig(level=logging.INFO) pd.set_option('display.max_colwidth', 20) app = FastAPI() templates = Jinja2Templates(directory="templates") app.mount("/static", StaticFiles(directory="static"), name="static") # Construct absolute path to models folder models_path = os.path.abspath("models") username_list = [ "alikarimi_ak8", "elonmusk", "BarackObama", "taylorlorenz", "cathiedwood", "ylecun", ] ## Static objects/paths start_date = dt.date(year=2023, month=2, day=1) end_date = dt.date(year=2023, month=3, day=22) # Load spacy module on app start nlp = spacy.load("en_core_web_sm") nlp.add_pipe("sentencizer") @app.get("/", response_class=HTMLResponse) async def webpage(request: Request): return templates.TemplateResponse("index.html", {"request": request}) @app.get("/accounts") async def get_accounts() -> List[dict]: import pandas as pd logging.info(f"Pulling account information on {username_list}") account_info_list = [ ts.get_twitter_account_info(twitter_handle=account) for account in username_list ] df_account = pd.DataFrame(account_info_list) df_account = df_account.style.bar( subset=["follower_count", "friends_count"], color="#d65f5f" ) df_account = df_account.format( {"follower_count": "{:,.0f}", "friends_count": "{:,.0f}"} ) html_table = df_account.to_html(classes="center", index=False) return HTMLResponse(content=html_table, status_code=200) @app.get("/tweets/{username}") def get_tweets_username(username: str) -> dict: # Method 2: Use Snscrape df_tweets = ts.get_tweets(handle=username) if isinstance(df_tweets, pd.DataFrame): df_tweets = df_tweets[["handle", "created_at","retweet_count","view_count","like_count", "full_text"]] df_tweets["created_at"] = df_tweets["created_at"].dt.strftime( "%Y-%m-%d %H:%M:%S" ) df_tweets = df_tweets.sort_values("created_at", ascending=False) # Additional processing logging.info("Running sentiment on tweets") sentiments = twitter_sentiment_api_score( df_tweets['full_text'].to_list(), use_api=False ) df_tweets["sentiment"] = [s['argmax'] for s in sentiments] # if username == "alikarimi_ak8": # p = translation.PersianTextProcessor() # df_tweets['full_text_translated'] = df_tweets["full_text"].apply(lambda c: p.translate_text(persian_text = c)) df_tweets_html = df_tweets.to_html(classes="center", index=False, escape=False) df_tweets.to_html(open("df_tweets_html.html", "w")) df_tweets_data = df_tweets.to_dict(orient="records") response_data = {"html": df_tweets_html, "data": df_tweets_data} return JSONResponse(content=response_data, status_code=200) else: print("Error: Failed to retrieve tweets.") return df_tweets @app.get("/audience/{username}", response_model=dict) async def get_audience(username: str) -> dict: if username in username_list: query = f"from:{username} since:{start_date} until:{end_date}" tweets = ts.get_tweets(query=query) n_samples = 5 # Random sample 3 tweets from user tweets_sampled = random.sample(tweets, n_samples) # Get all replies to sampled tweets tweet_threads = [] for tweet in tweets_sampled: threads = ts.get_replies( username=tweet["username"], conversation_id=tweet["conversation_id"], max_tweets=100, ) tweet_threads += threads # Get usernames from sample threads tweets usernames = [t["username"] for t in tweet_threads] # Get user info from sample replies to sampled tweets of user info_accounts = [ ts.get_twitter_account_info(twitter_handle=account) for account in usernames ] # "follower_count":1,"friends_count":20,"verified":false} # Get stats for followers/audience engaging with tweets follower_counts = [ info_accounts[i]["follower_count"] for i in range(len(info_accounts)) ] friends_counts = [ info_accounts[i]["friends_count"] for i in range(len(info_accounts)) ] verified_counts = [ 1 if info_accounts[i]["verified"] == True else 0 for i in range(len(info_accounts)) ] return { "sample_size": len(info_accounts), "mean_follower_count": round(np.mean(follower_counts), 3), "mean_friends_count": round(np.mean(friends_counts), 3), "mean_verified": round(np.mean(verified_counts), 3), } else: response = Response(content="Account not in scope of project.", status_code=404) return response @app.get("/sentiment/{username}") async def get_sentiment(username: str) -> Dict[str, Dict[str, float]]: if username not in username_list: raise HTTPException(status_code=404, detail="Account not in scope of project.") query = f"from:{username} since:{start_date} until:{end_date}" tweets = ts.get_tweets(query=query) n_samples = 5 tweets_sampled = random.sample(tweets, n_samples) tweet_threads = [] for tweet in tweets_sampled: threads = ts.get_replies( username=tweet["username"], conversation_id=tweet["conversation_id"], max_tweets=100, ) tweet_threads += threads print( f"Total replies to {n_samples} sampled tweets from username: {username}, {len(tweet_threads)}" ) ## Sentiment scoring print(f"Running tweet sentiment scoring on username: {username} tweets") tweets_scores = sentiment.get_tweets_sentiment(tweets=tweets) mean_tweets_score = round(np.mean(tweets_scores), 2) ci_tweets = utils.wilson_score_interval(tweets_scores) # Get sentiment of the threads from tweets # Get username tweets sentiment print(f"Running tweet thread sentiment scoring on username: {username} tweets") threads_scores = sentiment.get_tweets_sentiment(tweets=tweet_threads) mean_threads_score = round(np.mean(threads_scores), 2) ci_threads = utils.wilson_score_interval(threads_scores) return { "thread_level": { "mean": mean_threads_score, "confidence_interal": ci_threads, }, "audience_level": { "mean": mean_tweets_score, "confidence_interval": ci_tweets, }, } ## APIs: Primarily called by the index page @app.post("/api/generate") async def generate_text(request: Request): """Generate text from a prompt. Args: request: The HTTP request. Returns: The generated text. """ print("*" * 50) data = await request.json() print("*" * 50) logging.info("POST to api/generate received and processing") # Check length of input, if it is greater than 10 tokens, the text is sent off to a summarizer to generate: try: generated_text = generative.generate_account_text( prompt=data["text"], model_dir=os.path.join(models_path, data["account"]) ) logging.info("INFO: Successfully generate text from model.") except Exception as e: logging.error(f"Error generating text: {e}") return {"error": "Error generating text"} # return one example generated_text = generated_text[0]["generated_text"] ################################################### ## Clean up generate text # Get rid of final sentence sentences = nltk.sent_tokenize(generated_text) unique_sentences = set() non_duplicate_sentences = [] for sentence in sentences: if sentence not in unique_sentences: non_duplicate_sentences.append(sentence) unique_sentences.add(sentence) final_text = " ".join(non_duplicate_sentences[:-1]) return {"generated_text": final_text} @app.post("/api/generate_summary") async def generate_summary(request: Request): """Generate summary from tweets Args: request: The HTTP request. Returns: The generated text. """ print("*" * 50) data = await request.json() print("data", data["tweetsData"]) # Get the list of text tweets = [t["full_text"] for t in data["tweetsData"]] # Concatenate tweets into a single string text = " .".join(tweets) sentences = nlp(text).sents sentences = list(sentences) # Option 2 sampled_sentences = random.sample(sentences, int(0.1 * len(sentences))) sampled_sentences = [sentiment.tweet_cleaner(s.text) for s in sampled_sentences] # Join the strings into one text blob tweet_blob = " ".join(sampled_sentences) # Generate the summary summary = bert_summarization(tweet_blob) print("Summary:", summary) # Return the summary return {"tweets_summary": summary} ## Historical Tweets pages @app.get("/examples1") async def read_examples(): with open("templates/charts/handle_sentiment_breakdown.html") as f: html = f.read() return HTMLResponse(content=html) @app.get("/examples2") async def read_examples(): with open("templates/charts/handle_sentiment_timesteps.html") as f: html = f.read() return HTMLResponse(content=html) # uvicorn --workers=2 app:app if __name__ == "__main__": # uvicorn.run(app, host="0.0.0.0", port=8000) uvicorn.run("app:app", host="127.0.0.1", port=5050, reload=True)