Spaces:
Running
Running
from typing import Annotated, Optional | |
from fastapi import FastAPI, Header, Query | |
import html2text | |
import requests | |
import httpx | |
import re | |
import json | |
import newspaper | |
from fastapi.middleware.cors import CORSMiddleware | |
from bs4 import BeautifulSoup | |
import googleapiclient | |
import googleapiclient.discovery | |
from datetime import datetime | |
app = FastAPI() | |
app.add_middleware( | |
CORSMiddleware, | |
allow_origins=["*"], | |
allow_credentials=True, | |
allow_methods=["*"], | |
allow_headers=["*"], | |
) | |
def news_details(url: str): | |
article = newspaper.Article(url) | |
article.download() | |
article.parse() | |
return { | |
"title": article.title, | |
"description": article.text, | |
"author": article.authors, | |
"date": article.publish_date, | |
} | |
async def linkedin_post_details(post_id: Optional[str] = None, url: Optional[str] = None): | |
if not url: | |
url = "https://www.linkedin.com/posts/"+post_id | |
res = requests.get(url, headers={"user-agent":"Googlebot", "accept-language": "en-US"}) | |
soup = BeautifulSoup(res.content, "html.parser") | |
script_tags = soup.find_all("script") | |
for script_tag in script_tags: | |
try: | |
script_tag = json.loads(script_tag.string) | |
if script_tag.get("datePublished"): | |
desc = script_tag.get("articleBody") | |
if not desc: | |
desc = script_tag.get("description") | |
author = script_tag.get("author") | |
full_name = author.get("name") | |
username = author.get("url").rsplit("/", 1)[-1] | |
user_type = author.get("@type").lower() | |
date = script_tag.get("datePublished") | |
except Exception as e: | |
continue | |
spans = soup.find_all( | |
"span", {"data-test-id": "social-actions__reaction-count"} | |
) | |
if spans: | |
reactions = spans[0].text.strip() | |
else: | |
reactions = '0' | |
try: | |
comments = str(soup.find("a", {"data-test-id": "social-actions__comments"}).get( | |
"data-num-comments" | |
)) | |
except: | |
comments = '0' | |
return { | |
"insights": { | |
"likeCount": None, | |
# "commentCount": int(comments.replace(",", "")), | |
"commentCount": comments, | |
"shareCount": None, | |
# "reactionCount": int(reactions.replace(",", "")), | |
"reactionCount":reactions, | |
"reactions": [], | |
}, | |
"description": desc, | |
"username": username, | |
"name": full_name, | |
"userType": user_type, | |
"date": date, | |
} | |
# async def linkedin_post_details(post_id: str): | |
# url = "https://www.linkedin.com/posts/"+post_id | |
# res = requests.get(url, headers={"user-agent":"Googlebot", "accept-language": "en-US"}) | |
# text_maker = html2text.HTML2Text() | |
# text_maker.ignore_links = True | |
# text_maker.ignore_images = True | |
# text_maker.bypass_tables = False | |
# docs = text_maker.handle(res.content.decode("utf-8")) | |
# chunks = docs.split("\n\n#") | |
# linkedin_content = chunks[1] | |
# user = linkedin_content.split("\n\n", 5) | |
# full_name = user[1] | |
# bio = user[2] | |
# try: | |
# date, edited = user[3].split(" ") | |
# edited = True | |
# except: | |
# date = user[3].strip() | |
# edited = False | |
# content = "\n\n".join(user[5:]) | |
# insights = chunks[3].split("\n\n")[2] | |
# likes = insights.split(" ", 1)[0].strip() | |
# comments = insights.rsplit(" ", 2)[1].strip() | |
# username = url.rsplit("/",1)[-1].split("_")[0] | |
# return { | |
# "userDetails": {"full_name": full_name, "username":username,"bio": bio}, | |
# "content": content, | |
# "date": date, | |
# "is_edited": edited, | |
# "insights": {"likeCount": likes, "commentCount": comments, "shareCount": None, "viewCount":None}, | |
# "username":username | |
# } | |
async def ig_post_detail(post_id: Optional[str] = None, url: Optional[str] = None): | |
if not url: | |
url = f"https://www.instagram.com/p/{post_id}" | |
res = requests.get( | |
url, | |
headers={ | |
"user-agent": "Googlebot", | |
"accept-language": "en-US" | |
}, | |
timeout=(10, 27), | |
) | |
soup = BeautifulSoup(res.content, "html.parser") | |
meta = soup.find("meta", {"name": "description"}) | |
content = meta.get("content") | |
like_split = content.split(" likes, ") | |
likes = like_split[0] | |
comment_split = like_split[1].split(" comments - ") | |
comments = comment_split[0] | |
author_split = comment_split[1].split(": "") | |
author_date = author_split[0].split(" on ") | |
username = author_date[0] | |
date = author_date[1].split(":")[0] | |
name_desc = ( | |
soup.find("meta", {"property": "og:title"}) | |
.get("content") | |
.split(" on Instagram: ", 1) | |
) | |
full_name = name_desc[0] | |
desc = name_desc[-1] | |
return { | |
"insights": { | |
"likeCount": likes, | |
"commentCount": comments, | |
"shareCount": None, | |
}, | |
"description": desc, | |
"username": username, | |
"name": full_name, | |
"date": date, | |
} | |
async def ig_post_detail_api(post_id: Optional[str] = None, url: Optional[str] = None): | |
if not post_id: | |
# url = f"https://www.instagram.com/p/{post_id}" | |
post_id = url.split("/")[-1] | |
query_hash = "2b0673e0dc4580674a88d426fe00ea90" | |
variables = { | |
"shortcode": post_id | |
} | |
variables_json = json.dumps(variables, separators=(',', ':')) | |
url = f"https://www.instagram.com/graphql/query/?query_hash={query_hash}&variables={variables_json}" | |
res = requests.get(url, headers={"user-agent":"Mozilla/5.0 AppleWebKit/537.36 (KHTML, like Gecko; compatible; Googlebot/2.1; +http://www.google.com/bot.html) Chrome/W.X.Y.Z Safari/537.36"}).json() | |
print(res) | |
res = res["data"]["shortcut_media"] | |
if res.get("edge_media_preview_like"): | |
likes = res.get("edge_media_preview_like").get("count", 0) | |
else: | |
likes = 0 | |
if res.get("edge_media_to_comment"): | |
comments = res.get("edge_media_to_comment").get("count", 0) | |
else: | |
comments = 0 | |
if res.get("edge_media_to_caption"): | |
desc = "" | |
for x in res.get("edge_media_to_caption"): | |
if x.get("node"): | |
desc += x.get("node").get("text", "") + "\n\n" | |
desc = desc[:-4] | |
username = res["owner"].get("username") | |
full_name = res["owner"].get("full_name") | |
date = str(datetime.fromtimestamp(int(res.get("taken_at_timestamp")))) | |
# res = requests.get( | |
# url, | |
# headers={ | |
# "user-agent": "Googlebot", | |
# "accept-language": "en-US" | |
# }, | |
# timeout=(10, 27), | |
# ) | |
# soup = BeautifulSoup(res.content, "html.parser") | |
# meta = soup.find("meta", {"name": "description"}) | |
# content = meta.get("content") | |
# like_split = content.split(" likes, ") | |
# likes = like_split[0] | |
# comment_split = like_split[1].split(" comments - ") | |
# comments = comment_split[0] | |
# author_split = comment_split[1].split(": "") | |
# author_date = author_split[0].split(" on ") | |
# username = author_date[0] | |
# date = author_date[1].split(":")[0] | |
# name_desc = ( | |
# soup.find("meta", {"property": "og:title"}) | |
# .get("content") | |
# .split(" on Instagram: ", 1) | |
# ) | |
# full_name = name_desc[0] | |
# desc = name_desc[-1] | |
return { | |
"insights": { | |
"likeCount": likes, | |
"commentCount": comments, | |
"shareCount": None, | |
}, | |
"description": desc, | |
"username": username, | |
"name": full_name, | |
"date": date, | |
} | |
async def fb_post_detail(username: Optional[str] = None, post_id: Optional[str] = None, url: Optional[str] = None, api_access_key: Optional[str] = None): | |
if not url: | |
url = f"https://www.facebook.com/{username}/posts/{post_id}" | |
else: | |
username = url.split("//www.facebook.com/",1)[-1].split("/",1)[0] | |
user_agent = "Googlebot" | |
res = requests.get( | |
url, | |
headers={ | |
"user-agent": user_agent, | |
"accept-language": "en-US" | |
}, | |
timeout=(10, 27), | |
) | |
soup = BeautifulSoup(res.content, "html.parser") | |
script_tags = soup.find_all("script") | |
print(len(script_tags)) | |
for script_tag in script_tags: | |
try: | |
if "important_reactors" in script_tag.string: | |
splitter = '"reaction_count":{"count":' | |
total_react, reaction_split = script_tag.string.split(splitter, 2)[1].split("},", 1) | |
total_react = total_react.split(',"')[0] | |
pattern = r"\[.*?\]" | |
reactions = re.search(pattern, reaction_split) | |
if reactions: | |
reactions = json.loads(reactions.group(0)) | |
else: | |
reactions = [] | |
reactions = [ | |
dict( | |
name=reaction["node"]["localized_name"].lower(), | |
count=reaction["reaction_count"], | |
is_visible=reaction["visible_in_bling_bar"], | |
) | |
for reaction in reactions | |
] | |
splitter = '"share_count":{"count":' | |
shares = script_tag.string.split(splitter, 2)[1].split(",")[0] | |
splitter = '"comments":{"total_count":' | |
comments = script_tag.string.split(splitter, 2)[1].split("}")[0] | |
likes = [x.get("count") for x in reactions if x.get("name") == "like"] | |
likes = likes[0] if likes else 0 | |
print(total_react, reactions, shares, comments, likes) | |
if '"message":{"text":"' in script_tag.string: | |
desc = script_tag.string.split('"message":{"text":"', 1)[-1].split('"},')[0] | |
except Exception as e: | |
print(e) | |
continue | |
name = soup.find("meta", {"property": "og:title"}).get("content") | |
date = None | |
if api_access_key: | |
if not post_id: | |
post_id = url.split("/")[-1] | |
try: | |
post_details = requests.get( | |
f"https://graph.facebook.com/v20.0/1066512588151225_{post_id}?fields=place,shares,targeting,updated_time,created_time,description,child_attachments,caption,event,message,message_tags,story,status_type,source,coordinates,backdated_time,story_tags,scheduled_publish_time,properties,attachments&access_token={api_access_key}", | |
headers={"user-agent": "Mozilla/5.0 AppleWebKit/537.36 (KHTML, like Gecko; compatible; Googlebot/2.1; +http://www.google.com/bot.html) Chrome/W.X.Y.Z Safari/537.36",} | |
).json() | |
if post_details.get("updated_time"): | |
date = post_details.get("updated_time") | |
else: | |
date = post_details.get("created_time") | |
except Exception as e: | |
print(e) | |
else: | |
post_details = None | |
return { | |
"insights": { | |
"likeCount": likes, | |
"commentCount": int(comments), | |
"shareCount": int(shares), | |
"reactionCount": int(total_react), | |
"reactions": reactions, | |
}, | |
"description": desc, | |
"username": username, | |
"name": name, | |
"date": date, | |
"details":post_details | |
} | |
async def google_search(q: str, delimiter: str = "\n---\n", sites: Annotated[list[str] | None, Query()] = None): | |
print(sites) | |
print(type(sites)) | |
url = f"https://www.google.com/search?q={q} " | |
if sites: | |
url += " OR ".join(["site:"+site for site in sites]) | |
texts = "" | |
soup = BeautifulSoup(requests.get(url).content, "html.parser") | |
for div in soup.find_all("div")[24:]: | |
if len(div.find_parents("div")) == 8: # Depth 4 means 3 parent divs (0-indexed) | |
# print(div.get_text().strip()) | |
href = div.find(href=True, recursive=True) | |
text = div.find(text=True, recursive=False) | |
if href and text: | |
print(text) | |
text = f'[{text}]({href["href"].split("/url?q=")[-1]})' | |
if text != None and text.strip(): | |
texts += text + delimiter | |
return {"results":texts} | |
async def google_search_url(q: str, sites: Annotated[list[str] | None, Query()] = None, start:int = 0, user_agent="Twitterbot"): | |
url = f"https://www.google.com/search?start={start}&q={q} " | |
if sites: | |
url += " OR ".join(["site:"+site for site in sites]) | |
res = requests.get( | |
url, | |
headers={ | |
"user-agent": user_agent, | |
"accept-language": "en-US" | |
}, | |
timeout=(10, 27), | |
) | |
soup = BeautifulSoup(res.content, "html.parser") | |
prefix = "/url?q=h" | |
len_prefix = len(prefix) | |
docs = [] | |
for div in soup.find_all(True): | |
if len(div.find_parents()) == 2: # Depth 4 means 3 parent divs (0-indexed) | |
a_tags = div.find_all("a") | |
for a in a_tags: | |
doc = a.get("href") | |
if ( | |
doc[:len_prefix] == prefix | |
and "google.com" not in doc[len_prefix - 1 :] | |
): | |
docs.append( | |
doc[len_prefix - 1 :] | |
.split("&")[0] | |
.replace("%3F", "?") | |
.replace("%3D", "=") | |
) | |
return {"results":docs} | |
async def tiktok_video_details(username: Optional[str] = None, video_id:Optional[str] = None, url: Optional[str] = None): | |
if not url: | |
if username[0] != "@": | |
username = "@" + username | |
url = f"https://www.tiktok.com/{username}/video/{video_id}" | |
else: | |
username = url.split("//www.tiktok.com/",1)[-1].split("/")[0] | |
# user_agent = "LinkedInBot" | |
user_agent = "Mozilla/5.0 (compatible; YandexBot/3.0; +http://yandex.com/bots)" | |
res = requests.get(url, headers={"user-agent": user_agent}) | |
# soup = BeautifulSoup(res.content, "html.parser") | |
# insights = soup.find("meta", {"property": "og:description"}).get("content") | |
# likes = insights.split(" ", 1)[0] | |
# desc = insights.rsplit(" comments. “", 1)[-1][:-1] | |
# comments = insights.split(", ", 1)[-1].split(" ", 1)[0] | |
# name = soup.find("meta", {"property": "og:title"}).get("content")[9:] | |
# return { | |
# "insights": {"likeCount": likes, "commentCount": comments, "shareCount":None, "viewCount":None}, | |
# "description": desc, | |
# "username": username, | |
# "name": name, | |
# } | |
text_maker = html2text.HTML2Text() | |
text_maker.ignore_links = True | |
text_maker.ignore_images = True | |
text_maker.bypass_tables = False | |
print("RESPONSE DETAIlL", res.content.decode("utf-8")) | |
docs = text_maker.handle(res.content.decode("utf-8")) | |
print("DOCS", docs) | |
content_detail = docs.split("###")[5] | |
likes, comments, bookmarks, shares = re.findall(r'\*\*([\w.]+)\*\*', content_detail) | |
profile = [x.strip() for x in content_detail.split("\n\nSpeed\n\n", 1)[1].split("\n", 6) if x.strip()] | |
username = profile[0] | |
date = profile[1].rsplit(" · ", 1)[-1] | |
desc = profile[-1][2:].replace("**", "") | |
return { | |
"insights":{ | |
"likeCount":likes, | |
"commentCount":comments, | |
"bookmarkCount":bookmarks, | |
"shareCount":shares | |
}, | |
"username":username, | |
"date":date, | |
"description":desc | |
} | |
async def yt_vid_detail(api_key:str, video_id: Optional[str] = None, url: Optional[str] = None): | |
# yt_ids = [doc.split("?v=")[-1] for doc in docs] | |
if url: | |
video_id = url.split("?v=")[-1] | |
youtube = googleapiclient.discovery.build( | |
"youtube", "v3", developerKey=api_key | |
) | |
# request = youtube.search().list(part="snippet", q="sari roti", type="video") | |
request = youtube.videos().list( | |
part="snippet,statistics,topicDetails", | |
# id=",".join(yt_ids), | |
id = video_id, | |
) | |
return request.execute()["items"] | |