from typing import Annotated, Optional

from fastapi import FastAPI, Header, Query
import html2text
import requests
import httpx
import re
import json
import newspaper
from fastapi.middleware.cors import CORSMiddleware
from bs4 import BeautifulSoup
import googleapiclient
import googleapiclient.discovery
from datetime import datetime

app = FastAPI()

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


@app.get("/news_details")
def news_details(url: str):
    # Download and parse the article with newspaper.
    article = newspaper.Article(url)
    article.download()
    article.parse()
    return {
        "title": article.title,
        "description": article.text,
        "author": article.authors,
        "date": article.publish_date,
    }


@app.get("/linkedin_post_details")
async def linkedin_post_details(post_id: Optional[str] = None, url: Optional[str] = None):
    if not url:
        url = "https://www.linkedin.com/posts/" + post_id
    res = requests.get(url, headers={"user-agent": "Googlebot", "accept-language": "en-US"})
    soup = BeautifulSoup(res.content, "html.parser")

    # Pull the post metadata out of the JSON-LD <script> tags; default everything
    # so the response is still well-formed when no matching tag is found.
    desc = full_name = username = user_type = date = None
    script_tags = soup.find_all("script")
    for script_tag in script_tags:
        try:
            data = json.loads(script_tag.string)
            if data.get("datePublished"):
                desc = data.get("articleBody")
                if not desc:
                    desc = data.get("description")
                author = data.get("author")
                full_name = author.get("name")
                username = author.get("url").rsplit("/", 1)[-1]
                user_type = author.get("@type").lower()
                date = data.get("datePublished")
        except Exception:
            continue

    spans = soup.find_all(
        "span", {"data-test-id": "social-actions__reaction-count"}
    )
    if spans:
        reactions = spans[0].text.strip()
    else:
        reactions = "0"
    try:
        comments = str(
            soup.find("a", {"data-test-id": "social-actions__comments"}).get(
                "data-num-comments"
            )
        )
    except Exception:
        comments = "0"

    return {
        "insights": {
            "likeCount": None,
            # "commentCount": int(comments.replace(",", "")),
            "commentCount": comments,
            "shareCount": None,
            # "reactionCount": int(reactions.replace(",", "")),
            "reactionCount": reactions,
            "reactions": [],
        },
        "description": desc,
        "username": username,
        "name": full_name,
        "userType": user_type,
        "date": date,
    }


# async def linkedin_post_details(post_id: str):
#     url = "https://www.linkedin.com/posts/" + post_id
#     res = requests.get(url, headers={"user-agent": "Googlebot", "accept-language": "en-US"})
#     text_maker = html2text.HTML2Text()
#     text_maker.ignore_links = True
#     text_maker.ignore_images = True
#     text_maker.bypass_tables = False
#     docs = text_maker.handle(res.content.decode("utf-8"))
#     chunks = docs.split("\n\n#")
#     linkedin_content = chunks[1]
#     user = linkedin_content.split("\n\n", 5)
#     full_name = user[1]
#     bio = user[2]
#     try:
#         date, edited = user[3].split(" ")
#         edited = True
#     except:
#         date = user[3].strip()
#         edited = False
#     content = "\n\n".join(user[5:])
#     insights = chunks[3].split("\n\n")[2]
#     likes = insights.split(" ", 1)[0].strip()
#     comments = insights.rsplit(" ", 2)[1].strip()
#     username = url.rsplit("/", 1)[-1].split("_")[0]
#     return {
#         "userDetails": {"full_name": full_name, "username": username, "bio": bio},
#         "content": content,
#         "date": date,
#         "is_edited": edited,
#         "insights": {"likeCount": likes, "commentCount": comments, "shareCount": None, "viewCount": None},
#         "username": username,
#     }


@app.get("/instagram_post_details")
async def ig_post_detail(post_id: Optional[str] = None, url: Optional[str] = None):
    if not url:
        url = f"https://www.instagram.com/p/{post_id}"
    res = requests.get(
        url,
        headers={
            "user-agent": "Googlebot",
            "accept-language": "en-US"
        },
        timeout=(10, 27),
    )
    soup = BeautifulSoup(res.content, "html.parser")
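    # Assumption: Instagram's crawler-facing meta description still reads roughly
    # '<n> likes, <m> comments - <username> on <date>: "<caption>"'; the string
    # splits below will raise an IndexError if that wording changes.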
    meta = soup.find("meta", {"name": "description"})
    content = meta.get("content")
    like_split = content.split(" likes, ")
    likes = like_split[0]
    comment_split = like_split[1].split(" comments - ")
    comments = comment_split[0]
    # The original splitter here was garbled; ': "' is assumed as the separator
    # between the author/date part and the quoted caption.
    author_split = comment_split[1].split(': "')
    author_date = author_split[0].split(" on ")
    username = author_date[0]
    date = author_date[1].split(":")[0]
    name_desc = (
        soup.find("meta", {"property": "og:title"})
        .get("content")
        .split(" on Instagram: ", 1)
    )
    full_name = name_desc[0]
    desc = name_desc[-1]
    return {
        "insights": {
            "likeCount": likes,
            "commentCount": comments,
            "shareCount": None,
        },
        "description": desc,
        "username": username,
        "name": full_name,
        "date": date,
    }


@app.get("/instagram_post_details_api")
async def ig_post_detail_api(post_id: Optional[str] = None, url: Optional[str] = None):
    if not post_id:
        # url = f"https://www.instagram.com/p/{post_id}"
        post_id = url.split("/")[-1]
    # Query Instagram's public GraphQL endpoint for the post (shortcode) details.
    query_hash = "2b0673e0dc4580674a88d426fe00ea90"
    variables = {
        "shortcode": post_id
    }
    variables_json = json.dumps(variables, separators=(',', ':'))
    url = f"https://www.instagram.com/graphql/query/?query_hash={query_hash}&variables={variables_json}"
    res = requests.get(
        url,
        headers={
            "user-agent": "Mozilla/5.0 AppleWebKit/537.36 (KHTML, like Gecko; compatible; Googlebot/2.1; +http://www.google.com/bot.html) Chrome/W.X.Y.Z Safari/537.36"
        },
    ).json()
    print(res)
    # The GraphQL payload nests the post under data.shortcode_media.
    res = res["data"]["shortcode_media"]
    if res.get("edge_media_preview_like"):
        likes = res.get("edge_media_preview_like").get("count", 0)
    else:
        likes = 0
    if res.get("edge_media_to_comment"):
        comments = res.get("edge_media_to_comment").get("count", 0)
    else:
        comments = 0
    desc = ""
    if res.get("edge_media_to_caption"):
        # Caption text lives under the "edges" list of nodes.
        for x in res.get("edge_media_to_caption").get("edges", []):
            if x.get("node"):
                desc += x.get("node").get("text", "") + "\n\n"
        desc = desc[:-2]
    username = res["owner"].get("username")
    full_name = res["owner"].get("full_name")
    date = str(datetime.fromtimestamp(int(res.get("taken_at_timestamp"))))

    return {
        "insights": {
            "likeCount": likes,
            "commentCount": comments,
            "shareCount": None,
        },
        "description": desc,
        "username": username,
        "name": full_name,
        "date": date,
    }


@app.get("/facebook_post_details")
async def fb_post_detail(username: Optional[str] = None, post_id: Optional[str] = None,
                         url: Optional[str] = None, api_access_key: Optional[str] = None):
    if not url:
        url = f"https://www.facebook.com/{username}/posts/{post_id}"
    else:
        username = url.split("//www.facebook.com/", 1)[-1].split("/", 1)[0]
    user_agent = "Googlebot"
    res = requests.get(
        url,
        headers={
            "user-agent": user_agent,
            "accept-language": "en-US"
        },
        timeout=(10, 27),
    )
    soup = BeautifulSoup(res.content, "html.parser")
    script_tags = soup.find_all("script")
    print(len(script_tags))

    # Defaults so the response stays well-formed when the embedded JSON is not found.
    likes = 0
    total_react = "0"
    reactions = []
    shares = "0"
    comments = "0"
    desc = None

    for script_tag in script_tags:
        try:
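            # Assumption: engagement counts are embedded in an inline JSON blob; the
            # string splitters below depend on Facebook's current serialized key names
            # ("reaction_count", "share_count", "comments") and this tag is skipped
            # if the markup changes.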
            if "important_reactors" in script_tag.string:
                splitter = '"reaction_count":{"count":'
                total_react, reaction_split = script_tag.string.split(splitter, 2)[1].split("},", 1)
                total_react = total_react.split(',"')[0]
                pattern = r"\[.*?\]"
                reactions = re.search(pattern, reaction_split)
                if reactions:
                    reactions = json.loads(reactions.group(0))
                else:
                    reactions = []
                reactions = [
                    dict(
                        name=reaction["node"]["localized_name"].lower(),
                        count=reaction["reaction_count"],
                        is_visible=reaction["visible_in_bling_bar"],
                    )
                    for reaction in reactions
                ]
                splitter = '"share_count":{"count":'
                shares = script_tag.string.split(splitter, 2)[1].split(",")[0]
                splitter = '"comments":{"total_count":'
                comments = script_tag.string.split(splitter, 2)[1].split("}")[0]
                likes = [x.get("count") for x in reactions if x.get("name") == "like"]
                likes = likes[0] if likes else 0
                print(total_react, reactions, shares, comments, likes)
            if '"message":{"text":"' in script_tag.string:
                desc = script_tag.string.split('"message":{"text":"', 1)[-1].split('"},')[0]
        except Exception as e:
            print(e)
            continue

    name = soup.find("meta", {"property": "og:title"}).get("content")
    date = None
    post_details = None
    if api_access_key:
        if not post_id:
            post_id = url.split("/")[-1]
        try:
            # Note: the page id in the Graph API object id below is hardcoded.
            post_details = requests.get(
                f"https://graph.facebook.com/v20.0/1066512588151225_{post_id}?fields=place,shares,targeting,updated_time,created_time,description,child_attachments,caption,event,message,message_tags,story,status_type,source,coordinates,backdated_time,story_tags,scheduled_publish_time,properties,attachments&access_token={api_access_key}",
                headers={
                    "user-agent": "Mozilla/5.0 AppleWebKit/537.36 (KHTML, like Gecko; compatible; Googlebot/2.1; +http://www.google.com/bot.html) Chrome/W.X.Y.Z Safari/537.36",
                },
            ).json()
            if post_details.get("updated_time"):
                date = post_details.get("updated_time")
            else:
                date = post_details.get("created_time")
        except Exception as e:
            print(e)

    return {
        "insights": {
            "likeCount": likes,
            "commentCount": int(comments),
            "shareCount": int(shares),
            "reactionCount": int(total_react),
            "reactions": reactions,
        },
        "description": desc,
        "username": username,
        "name": name,
        "date": date,
        "details": post_details,
    }


@app.get("/google_search")
async def google_search(q: str, delimiter: str = "\n---\n",
                        sites: Annotated[list[str] | None, Query()] = None):
    print(sites)
    print(type(sites))
    url = f"https://www.google.com/search?q={q} "
    if sites:
        url += " OR ".join(["site:" + site for site in sites])
    texts = ""
    soup = BeautifulSoup(requests.get(url).content, "html.parser")
    for div in soup.find_all("div")[24:]:
        # Only look at divs nested eight <div> levels deep, where the result
        # entries sit in Google's no-JavaScript markup.
        if len(div.find_parents("div")) == 8:
            # print(div.get_text().strip())
            href = div.find(href=True, recursive=True)
            text = div.find(text=True, recursive=False)
            if href and text:
                print(text)
                text = f'[{text}]({href["href"].split("/url?q=")[-1]})'
            if text is not None and text.strip():
                texts += text + delimiter
    return {"results": texts}


@app.get("/google_search_urls")
async def google_search_url(q: str, sites: Annotated[list[str] | None, Query()] = None,
                            start: int = 0, user_agent="Twitterbot"):
    url = f"https://www.google.com/search?start={start}&q={q} "
    if sites:
        url += " OR ".join(["site:" + site for site in sites])
    res = requests.get(
        url,
        headers={
            "user-agent": user_agent,
            "accept-language": "en-US"
        },
        timeout=(10, 27),
    )
    soup = BeautifulSoup(res.content, "html.parser")
    prefix = "/url?q=h"
    len_prefix = len(prefix)
    docs = []
    for div in soup.find_all(True):
        # Only consider tags two levels below the document root, where the
        # result links sit in the crawler-facing markup.
        if len(div.find_parents()) == 2:
            a_tags = div.find_all("a")
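            # Keep only "/url?q=<target>" redirect links and drop anything that
            # points back to a google.com domain.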
            for a in a_tags:
                doc = a.get("href")
                if (
                    doc[:len_prefix] == prefix
                    and "google.com" not in doc[len_prefix - 1:]
                ):
                    docs.append(
                        doc[len_prefix - 1:]
                        .split("&")[0]
                        .replace("%3F", "?")
                        .replace("%3D", "=")
                    )
    return {"results": docs}


@app.get("/tiktok_video_details")
async def tiktok_video_details(username: Optional[str] = None, video_id: Optional[str] = None,
                               url: Optional[str] = None):
    if not url:
        if username[0] != "@":
            username = "@" + username
        url = f"https://www.tiktok.com/{username}/video/{video_id}"
    else:
        username = url.split("//www.tiktok.com/", 1)[-1].split("/")[0]
    # user_agent = "LinkedInBot"
    user_agent = "Mozilla/5.0 (compatible; YandexBot/3.0; +http://yandex.com/bots)"
    res = requests.get(url, headers={"user-agent": user_agent})

    # soup = BeautifulSoup(res.content, "html.parser")
    # insights = soup.find("meta", {"property": "og:description"}).get("content")
    # likes = insights.split(" ", 1)[0]
    # desc = insights.rsplit(" comments. “", 1)[-1][:-1]
    # comments = insights.split(", ", 1)[-1].split(" ", 1)[0]
    # name = soup.find("meta", {"property": "og:title"}).get("content")[9:]
    # return {
    #     "insights": {"likeCount": likes, "commentCount": comments, "shareCount": None, "viewCount": None},
    #     "description": desc,
    #     "username": username,
    #     "name": name,
    # }

    # Convert the rendered page to markdown-ish text and pick the engagement
    # counts out of the bolded segments.
    text_maker = html2text.HTML2Text()
    text_maker.ignore_links = True
    text_maker.ignore_images = True
    text_maker.bypass_tables = False
    print("RESPONSE DETAIL", res.content.decode("utf-8"))
    docs = text_maker.handle(res.content.decode("utf-8"))
    print("DOCS", docs)
    content_detail = docs.split("###")[5]
    likes, comments, bookmarks, shares = re.findall(r'\*\*([\w.]+)\*\*', content_detail)
    profile = [x.strip() for x in content_detail.split("\n\nSpeed\n\n", 1)[1].split("\n", 6) if x.strip()]
    username = profile[0]
    date = profile[1].rsplit(" · ", 1)[-1]
    desc = profile[-1][2:].replace("**", "")
    return {
        "insights": {
            "likeCount": likes,
            "commentCount": comments,
            "bookmarkCount": bookmarks,
            "shareCount": shares,
        },
        "username": username,
        "date": date,
        "description": desc,
    }


@app.get("/youtube_video_details")
async def yt_vid_detail(api_key: str, video_id: Optional[str] = None, url: Optional[str] = None):
    # yt_ids = [doc.split("?v=")[-1] for doc in docs]
    if url:
        video_id = url.split("?v=")[-1]
    youtube = googleapiclient.discovery.build(
        "youtube", "v3", developerKey=api_key
    )
    # request = youtube.search().list(part="snippet", q="sari roti", type="video")
    request = youtube.videos().list(
        part="snippet,statistics,topicDetails",
        # id=",".join(yt_ids),
        id=video_id,
    )
    return request.execute()["items"]
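
# A minimal way to run this service locally (a sketch, assuming uvicorn is
# installed and that this module is named "main"):
#
#     uvicorn main:app --reload --port 8000
#
# Example request once the server is up:
#
#     curl "http://localhost:8000/news_details?url=https://example.com/some-article"
if __name__ == "__main__":
    import uvicorn  # assumption: uvicorn is available in the environment

    uvicorn.run(app, host="0.0.0.0", port=8000)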