"""FastAPI endpoint To run locally use 'uvicorn app:app --host localhost --port 7860' or `python -m uvicorn app:app --reload --host localhost --port 7860` """ import datetime as dt import json import logging import sys #sys.setrecursionlimit(20000) import pandas as pd import numpy as np import os import random from typing import Dict, List import uvicorn from fastapi import FastAPI, HTTPException, Request, Response from fastapi.responses import HTMLResponse from fastapi.staticfiles import StaticFiles from fastapi.templating import Jinja2Templates import scripts.sentiment as sentiment import scripts.twitter_scraper as ts from scripts.summarization import bert_summarization from scripts.twitter_scraper import get_latest_account_tweets from scripts import twitter_scraper as ts import scripts.utils as utils from scripts import generative import nltk logging.basicConfig(level=logging.INFO) app = FastAPI() templates = Jinja2Templates(directory="templates") app.mount("/static", StaticFiles(directory="static"), name="static") # Construct absolute path to models folder models_path = os.path.abspath("models") username_list = [ "alikarimi_ak8", "elonmusk", "BarackObama", "taylorlorenz", "cathiedwood", "ylecun", ] ## Static objects/paths start_date = dt.date(year=2023, month=2, day=1) end_date = dt.date(year=2023, month=3, day=22) @app.get("/", response_class=HTMLResponse) async def webpage(request: Request): return templates.TemplateResponse("index.html", {"request": request}) @app.get("/accounts") def get_accounts() -> List[dict]: import pandas as pd logging.info(f"Pulling account information on {username_list}") account_info_list = [ ts.get_twitter_account_info(twitter_handle=account) for account in username_list ] df_account = pd.DataFrame(account_info_list) df_account = df_account.style.bar( subset=["follower_count", "friends_count"], color="#d65f5f" ) df_account = df_account.format( {"follower_count": "{:,.0f}", "friends_count": "{:,.0f}"} ) html_table = df_account.to_html(classes="center", index=False) return HTMLResponse(content=html_table, status_code=200) @app.get("/tweets/{username}") def get_tweets_username(username: str) -> dict: # if username in username_list: # query = f"from:{username} since:{start_date} until:{end_date}" # return ts.get_tweets(query=query) # else: # return {"detail": "Account not in scope of project."} # Method 1: Using Tweepy method # df_tweets = get_latest_account_tweets(username) # Method 2: Use Snscrape df_tweets = ts.get_tweets(handle=username) if isinstance(df_tweets, pd.DataFrame): print(df_tweets.head(2)) print(df_tweets.shape) df_tweets = df_tweets[["handle", "created_at", "full_text"]] df_tweets = df_tweets.sort_values("created_at", ascending=True).tail(10) df_tweets_html = df_tweets.to_html(classes="center", index=False) return HTMLResponse(content=df_tweets_html, status_code=200) else: print("Error: Failed to retrieve tweets.") return df_tweets @app.get("/audience/{username}", response_model=dict) def get_audience(username: str) -> dict: if username in username_list: query = f"from:{username} since:{start_date} until:{end_date}" tweets = ts.get_tweets(query=query) n_samples = 5 # Random sample 3 tweets from user tweets_sampled = random.sample(tweets, n_samples) # Get all replies to sampled tweets tweet_threads = [] for tweet in tweets_sampled: threads = ts.get_replies( username=tweet["username"], conversation_id=tweet["conversation_id"], max_tweets=100, ) tweet_threads += threads # Get usernames from sample threads tweets usernames = [t["username"] for t in tweet_threads] # Get user info from sample replies to sampled tweets of user info_accounts = [ ts.get_twitter_account_info(twitter_handle=account) for account in usernames ] # "follower_count":1,"friends_count":20,"verified":false} # Get stats for followers/audience engaging with tweets follower_counts = [ info_accounts[i]["follower_count"] for i in range(len(info_accounts)) ] friends_counts = [ info_accounts[i]["friends_count"] for i in range(len(info_accounts)) ] verified_counts = [ 1 if info_accounts[i]["verified"] == True else 0 for i in range(len(info_accounts)) ] return { "sample_size": len(info_accounts), "mean_follower_count": round(np.mean(follower_counts), 3), "mean_friends_count": round(np.mean(friends_counts), 3), "mean_verified": round(np.mean(verified_counts), 3), } else: response = Response(content="Account not in scope of project.", status_code=404) return response @app.get("/sentiment/{username}") async def get_sentiment(username: str) -> Dict[str, Dict[str, float]]: if username not in username_list: raise HTTPException(status_code=404, detail="Account not in scope of project.") query = f"from:{username} since:{start_date} until:{end_date}" tweets = ts.get_tweets(query=query) n_samples = 5 tweets_sampled = random.sample(tweets, n_samples) tweet_threads = [] for tweet in tweets_sampled: threads = ts.get_replies( username=tweet["username"], conversation_id=tweet["conversation_id"], max_tweets=100, ) tweet_threads += threads print( f"Total replies to {n_samples} sampled tweets from username: {username}, {len(tweet_threads)}" ) ## Sentiment scoring print(f"Running tweet sentiment scoring on username: {username} tweets") tweets_scores = sentiment.get_tweets_sentiment(tweets=tweets) mean_tweets_score = round(np.mean(tweets_scores), 2) ci_tweets = utils.wilson_score_interval(tweets_scores) # Get sentiment of the threads from tweets # Get username tweets sentiment print(f"Running tweet thread sentiment scoring on username: {username} tweets") threads_scores = sentiment.get_tweets_sentiment(tweets=tweet_threads) mean_threads_score = round(np.mean(threads_scores), 2) ci_threads = utils.wilson_score_interval(threads_scores) return { "thread_level": { "mean": mean_threads_score, "confidence_interal": ci_threads, }, "audience_level": { "mean": mean_tweets_score, "confidence_interval": ci_tweets, }, } @app.post("/api/generate") async def generate_text(request: Request): print("*" * 50) data = await request.json() print("*" * 50) print("POST Request:") # Check length of input, if it is greater than 10 tokens, the text is sent off to a summarizer to generate: try: generated_text = generative.generate_account_text( prompt=data["text"], model_dir=os.path.join(models_path, data["account"]) ) logging.info("INFO: Successfully generate text from model.") except Exception as e: logging.error(f"Error generating text: {e}") return {"error": "Error generating text"} # return one example generated_text = generated_text[0]["generated_text"] ################################################### ## Clean up generate text # Get rid of final sentence # sentences = nltk.sent_tokenize(generated_text) # unique_sentences = set() # non_duplicate_sentences = [] # for sentence in sentences: # if sentence not in unique_sentences: # non_duplicate_sentences.append(sentence) # unique_sentences.add(sentence) # final_text = " ".join(non_duplicate_sentences[:-1]) final_text= generated_text return {"generated_text": final_text} @app.post("/api/generate_summary") async def generate_summary(request: Request): """Generate summary from tweets Args: request: The HTTP request. Returns: The generated text. """ print("*" * 50) data = await request.json() # Get the list of text texts = data["text"] # Generate the summary summary = "This is a placeholder for summary model being returned" # Return the summary return {"summary": summary} @app.get("/examples1") async def read_examples(): with open("templates/charts/handle_sentiment_breakdown.html") as f: html = f.read() return HTMLResponse(content=html) @app.get("/examples2") async def read_examples(): with open("templates/charts/handle_sentiment_timesteps.html") as f: html = f.read() return HTMLResponse(content=html) # uvicorn --workers=2 app:app # if __name__ == "__main__": # # uvicorn.run(app, host="0.0.0.0", port=8000) # uvicorn.run("app:app", host="127.0.0.1", port=5049, reload=True)