# web_scrape / app.py
from typing import Annotated, Optional
from fastapi import FastAPI, Header, Query
import html2text
import requests
import httpx
import re
import json
import newspaper
from fastapi.middleware.cors import CORSMiddleware
from bs4 import BeautifulSoup
import googleapiclient
import googleapiclient.discovery
from datetime import datetime

app = FastAPI()

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

@app.get("/news_details")
def news_details(url: str):
    article = newspaper.Article(url)
    article.download()
    article.parse()

    return {
        "title": article.title,
        "description": article.text,
        "author": article.authors,
        "date": article.publish_date,
    }
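
# Example request against a locally running instance (hypothetical article URL;
# any page supported by newspaper should work):
#   GET /news_details?url=https://example.com/some-news-article
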
@app.get("/linkedin_post_details")
async def linkedin_post_details(post_id: Optional[str] = None, url: Optional[str] = None):
    if not url:
        url = "https://www.linkedin.com/posts/" + post_id
    res = requests.get(url, headers={"user-agent": "Googlebot", "accept-language": "en-US"})
    soup = BeautifulSoup(res.content, "html.parser")

    # Defaults in case no JSON-LD script tag with post metadata is found.
    desc = full_name = username = user_type = date = None

    script_tags = soup.find_all("script")
    for script_tag in script_tags:
        try:
            data = json.loads(script_tag.string)
            if data.get("datePublished"):
                desc = data.get("articleBody") or data.get("description")
                author = data.get("author")
                full_name = author.get("name")
                username = author.get("url").rsplit("/", 1)[-1]
                user_type = author.get("@type").lower()
                date = data.get("datePublished")
        except Exception:
            continue

    spans = soup.find_all(
        "span", {"data-test-id": "social-actions__reaction-count"}
    )
    reactions = spans[0].text.strip() if spans else "0"

    try:
        comments = str(
            soup.find("a", {"data-test-id": "social-actions__comments"}).get(
                "data-num-comments"
            )
        )
    except Exception:
        comments = "0"

    return {
        "insights": {
            "likeCount": None,
            "commentCount": comments,
            "shareCount": None,
            "reactionCount": reactions,
            "reactions": [],
        },
        "description": desc,
        "username": username,
        "name": full_name,
        "userType": user_type,
        "date": date,
    }
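
# The loop above relies on LinkedIn embedding a JSON-LD <script> block; a sketch of
# the fields it reads (structure assumed from the keys accessed above, example values
# are illustrative only):
#   {
#     "datePublished": "2024-01-01T00:00:00Z",
#     "articleBody": "...",          # falls back to "description" when absent
#     "author": {"name": "...", "url": "https://www.linkedin.com/in/<handle>", "@type": "Person"}
#   }
# The commented-out function below is an earlier html2text-based parser kept for reference.
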
# async def linkedin_post_details(post_id: str):
# url = "https://www.linkedin.com/posts/"+post_id
# res = requests.get(url, headers={"user-agent":"Googlebot", "accept-language": "en-US"})
# text_maker = html2text.HTML2Text()
# text_maker.ignore_links = True
# text_maker.ignore_images = True
# text_maker.bypass_tables = False
# docs = text_maker.handle(res.content.decode("utf-8"))
# chunks = docs.split("\n\n#")
# linkedin_content = chunks[1]
# user = linkedin_content.split("\n\n", 5)
# full_name = user[1]
# bio = user[2]
# try:
# date, edited = user[3].split(" ")
# edited = True
# except:
# date = user[3].strip()
# edited = False
# content = "\n\n".join(user[5:])
# insights = chunks[3].split("\n\n")[2]
# likes = insights.split(" ", 1)[0].strip()
# comments = insights.rsplit(" ", 2)[1].strip()
# username = url.rsplit("/",1)[-1].split("_")[0]
# return {
# "userDetails": {"full_name": full_name, "username":username,"bio": bio},
# "content": content,
# "date": date,
# "is_edited": edited,
# "insights": {"likeCount": likes, "commentCount": comments, "shareCount": None, "viewCount":None},
# "username":username
# }
@app.get("/instagram_post_details")
async def ig_post_detail(post_id: Optional[str] = None, url: Optional[str] = None):
    if not url:
        url = f"https://www.instagram.com/p/{post_id}"
    res = requests.get(
        url,
        headers={
            "user-agent": "Googlebot",
            "accept-language": "en-US"
        },
        timeout=(10, 27),
    )
    soup = BeautifulSoup(res.content, "html.parser")

    meta = soup.find("meta", {"name": "description"})
    content = meta.get("content")
    like_split = content.split(" likes, ")
    likes = like_split[0]
    comment_split = like_split[1].split(" comments - ")
    comments = comment_split[0]
    # The caption is wrapped in curly quotes in the meta description.
    author_split = comment_split[1].split(": “")
    author_date = author_split[0].split(" on ")
    username = author_date[0]
    date = author_date[1].split(":")[0]

    name_desc = (
        soup.find("meta", {"property": "og:title"})
        .get("content")
        .split(" on Instagram: ", 1)
    )
    full_name = name_desc[0]
    desc = name_desc[-1]

    return {
        "insights": {
            "likeCount": likes,
            "commentCount": comments,
            "shareCount": None,
        },
        "description": desc,
        "username": username,
        "name": full_name,
        "date": date,
    }
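
# The parsing above assumes Instagram's description meta tag follows this shape
# (inferred from the split calls, not an official format):
#   "<likes> likes, <comments> comments - <username> on <Month D, YYYY>: “<caption>”"
# Example request: GET /instagram_post_details?post_id=<shortcode>
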
@app.get("/instagram_post_details_api")
async def ig_post_detail_api(post_id: Optional[str] = None, url: Optional[str] = None):
    if not post_id:
        # Derive the shortcode from a URL like https://www.instagram.com/p/<shortcode>/
        post_id = url.rstrip("/").split("/")[-1]
    query_hash = "2b0673e0dc4580674a88d426fe00ea90"
    variables = {
        "shortcode": post_id
    }
    variables_json = json.dumps(variables, separators=(',', ':'))
    url = f"https://www.instagram.com/graphql/query/?query_hash={query_hash}&variables={variables_json}"
    res = requests.get(
        url,
        headers={"user-agent": "Mozilla/5.0 AppleWebKit/537.36 (KHTML, like Gecko; compatible; Googlebot/2.1; +http://www.google.com/bot.html) Chrome/W.X.Y.Z Safari/537.36"},
    ).json()
    print(res)
    res = res["data"]["shortcode_media"]

    if res.get("edge_media_preview_like"):
        likes = res.get("edge_media_preview_like").get("count", 0)
    else:
        likes = 0
    if res.get("edge_media_to_comment"):
        comments = res.get("edge_media_to_comment").get("count", 0)
    else:
        comments = 0

    desc = ""
    if res.get("edge_media_to_caption"):
        # Caption edges look like {"edges": [{"node": {"text": "..."}}]}
        caption_parts = [
            edge["node"].get("text", "")
            for edge in res["edge_media_to_caption"].get("edges", [])
            if edge.get("node")
        ]
        desc = "\n\n".join(caption_parts)

    username = res["owner"].get("username")
    full_name = res["owner"].get("full_name")
    date = str(datetime.fromtimestamp(int(res.get("taken_at_timestamp"))))
    return {
        "insights": {
            "likeCount": likes,
            "commentCount": comments,
            "shareCount": None,
        },
        "description": desc,
        "username": username,
        "name": full_name,
        "date": date,
    }
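
# Sketch of the GraphQL fields read above (shape assumed from the keys accessed;
# this undocumented Instagram endpoint may change or be gated at any time):
#   data.shortcode_media.edge_media_preview_like.count
#   data.shortcode_media.edge_media_to_comment.count
#   data.shortcode_media.edge_media_to_caption.edges[].node.text
#   data.shortcode_media.owner.{username, full_name}
#   data.shortcode_media.taken_at_timestamp
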
@app.get("/facebook_post_details")
async def fb_post_detail(username: Optional[str] = None, post_id: Optional[str] = None, url: Optional[str] = None, api_access_key: Optional[str] = None):
    if not url:
        url = f"https://www.facebook.com/{username}/posts/{post_id}"
    else:
        username = url.split("//www.facebook.com/", 1)[-1].split("/", 1)[0]

    user_agent = "Googlebot"
    res = requests.get(
        url,
        headers={
            "user-agent": user_agent,
            "accept-language": "en-US"
        },
        timeout=(10, 27),
    )
    soup = BeautifulSoup(res.content, "html.parser")
    script_tags = soup.find_all("script")
    print(len(script_tags))

    # Defaults in case the embedded JSON fragments are not found.
    total_react, shares, comments = "0", "0", "0"
    likes = 0
    reactions = []
    desc = None

    for script_tag in script_tags:
        try:
            if "important_reactors" in script_tag.string:
                splitter = '"reaction_count":{"count":'
                total_react, reaction_split = script_tag.string.split(splitter, 2)[1].split("},", 1)
                total_react = total_react.split(',"')[0]

                pattern = r"\[.*?\]"
                reactions = re.search(pattern, reaction_split)
                reactions = json.loads(reactions.group(0)) if reactions else []
                reactions = [
                    dict(
                        name=reaction["node"]["localized_name"].lower(),
                        count=reaction["reaction_count"],
                        is_visible=reaction["visible_in_bling_bar"],
                    )
                    for reaction in reactions
                ]

                splitter = '"share_count":{"count":'
                shares = script_tag.string.split(splitter, 2)[1].split(",")[0]
                splitter = '"comments":{"total_count":'
                comments = script_tag.string.split(splitter, 2)[1].split("}")[0]

                likes = [x.get("count") for x in reactions if x.get("name") == "like"]
                likes = likes[0] if likes else 0
                print(total_react, reactions, shares, comments, likes)
            if '"message":{"text":"' in script_tag.string:
                desc = script_tag.string.split('"message":{"text":"', 1)[-1].split('"},')[0]
        except Exception as e:
            print(e)
            continue
    name = soup.find("meta", {"property": "og:title"}).get("content")

    date = None
    post_details = None
    if api_access_key:
        if not post_id:
            post_id = url.split("/")[-1]
        try:
            post_details = requests.get(
                f"https://graph.facebook.com/v20.0/1066512588151225_{post_id}?fields=place,shares,targeting,updated_time,created_time,description,child_attachments,caption,event,message,message_tags,story,status_type,source,coordinates,backdated_time,story_tags,scheduled_publish_time,properties,attachments&access_token={api_access_key}",
                headers={"user-agent": "Mozilla/5.0 AppleWebKit/537.36 (KHTML, like Gecko; compatible; Googlebot/2.1; +http://www.google.com/bot.html) Chrome/W.X.Y.Z Safari/537.36"},
            ).json()
            if post_details.get("updated_time"):
                date = post_details.get("updated_time")
            else:
                date = post_details.get("created_time")
        except Exception as e:
            print(e)
    return {
        "insights": {
            "likeCount": likes,
            "commentCount": int(comments),
            "shareCount": int(shares),
            "reactionCount": int(total_react),
            "reactions": reactions,
        },
        "description": desc,
        "username": username,
        "name": name,
        "date": date,
        "details": post_details,
    }
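
# The scraping path above searches the inline <script> payloads for these JSON
# fragments (taken from the split markers used in the loop):
#   '"reaction_count":{"count":', '"share_count":{"count":',
#   '"comments":{"total_count":', '"message":{"text":"'
# Passing api_access_key switches the date/details lookup to the Graph API, which
# requires a token with access to the page ID hard-coded in the URL above.
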
@app.get("/google_search")
async def google_search(q: str, delimiter: str = "\n---\n", sites: Annotated[list[str] | None, Query()] = None):
    print(sites)
    print(type(sites))
    url = f"https://www.google.com/search?q={q} "
    if sites:
        url += " OR ".join(["site:" + site for site in sites])

    texts = ""
    soup = BeautifulSoup(requests.get(url).content, "html.parser")
    for div in soup.find_all("div")[24:]:
        # Only keep divs at the nesting depth where result snippets live.
        if len(div.find_parents("div")) == 8:
            href = div.find(href=True, recursive=True)
            text = div.find(string=True, recursive=False)
            if href and text:
                print(text)
                text = f'[{text}]({href["href"].split("/url?q=")[-1]})'
            if text is not None and text.strip():
                texts += text + delimiter
    return {"results": texts}
@app.get("/google_search_urls")
async def google_search_url(q: str, sites: Annotated[list[str] | None, Query()] = None, start:int = 0, user_agent="Twitterbot"):
url = f"https://www.google.com/search?start={start}&q={q} "
if sites:
url += " OR ".join(["site:"+site for site in sites])
res = requests.get(
url,
headers={
"user-agent": user_agent,
"accept-language": "en-US"
},
timeout=(10, 27),
)
soup = BeautifulSoup(res.content, "html.parser")
prefix = "/url?q=h"
len_prefix = len(prefix)
docs = []
for div in soup.find_all(True):
if len(div.find_parents()) == 2: # Depth 4 means 3 parent divs (0-indexed)
a_tags = div.find_all("a")
for a in a_tags:
doc = a.get("href")
if (
doc[:len_prefix] == prefix
and "google.com" not in doc[len_prefix - 1 :]
):
docs.append(
doc[len_prefix - 1 :]
.split("&")[0]
.replace("%3F", "?")
.replace("%3D", "=")
)
return {"results":docs}
@app.get("/tiktok_video_details")
async def tiktok_video_details(username: Optional[str] = None, video_id:Optional[str] = None, url: Optional[str] = None):
    if not url:
        if username[0] != "@":
            username = "@" + username
        url = f"https://www.tiktok.com/{username}/video/{video_id}"
    else:
        username = url.split("//www.tiktok.com/", 1)[-1].split("/")[0]

    # user_agent = "LinkedInBot"
    user_agent = "Mozilla/5.0 (compatible; YandexBot/3.0; +http://yandex.com/bots)"
    res = requests.get(url, headers={"user-agent": user_agent})

    # soup = BeautifulSoup(res.content, "html.parser")
    # insights = soup.find("meta", {"property": "og:description"}).get("content")
    # likes = insights.split(" ", 1)[0]
    # desc = insights.rsplit(" comments. “", 1)[-1][:-1]
    # comments = insights.split(", ", 1)[-1].split(" ", 1)[0]
    # name = soup.find("meta", {"property": "og:title"}).get("content")[9:]
    # return {
    #     "insights": {"likeCount": likes, "commentCount": comments, "shareCount": None, "viewCount": None},
    #     "description": desc,
    #     "username": username,
    #     "name": name,
    # }

    text_maker = html2text.HTML2Text()
    text_maker.ignore_links = True
    text_maker.ignore_images = True
    text_maker.bypass_tables = False

    print("RESPONSE DETAIL", res.content.decode("utf-8"))
    docs = text_maker.handle(res.content.decode("utf-8"))
    print("DOCS", docs)

    content_detail = docs.split("###")[5]
    likes, comments, bookmarks, shares = re.findall(r'\*\*([\w.]+)\*\*', content_detail)

    profile = [x.strip() for x in content_detail.split("\n\nSpeed\n\n", 1)[1].split("\n", 6) if x.strip()]
    username = profile[0]
    date = profile[1].rsplit(" · ", 1)[-1]
    desc = profile[-1][2:].replace("**", "")

    return {
        "insights": {
            "likeCount": likes,
            "commentCount": comments,
            "bookmarkCount": bookmarks,
            "shareCount": shares
        },
        "username": username,
        "date": date,
        "description": desc
    }
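
# Example request (hypothetical handle and video ID):
#   GET /tiktok_video_details?username=someuser&video_id=1234567890123456789
# Note: this parser depends on the html2text rendering of TikTok's server-side
# markup, so layout changes on TikTok's side can break the split indices above.
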
@app.get("/youtube_video_details")
async def yt_vid_detail(api_key:str, video_id: Optional[str] = None, url: Optional[str] = None):
    # yt_ids = [doc.split("?v=")[-1] for doc in docs]
    if url:
        video_id = url.split("?v=")[-1]
    youtube = googleapiclient.discovery.build(
        "youtube", "v3", developerKey=api_key
    )
    # request = youtube.search().list(part="snippet", q="sari roti", type="video")
    request = youtube.videos().list(
        part="snippet,statistics,topicDetails",
        # id=",".join(yt_ids),
        id=video_id,
    )
    return request.execute()["items"]
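
# Example request: GET /youtube_video_details?api_key=<YouTube Data API v3 key>&video_id=<video_id>
#
# A minimal sketch for running and calling this app locally, assuming uvicorn and
# httpx are installed (host and port are assumptions, not part of this repo):
#
#   uvicorn app:app --host 0.0.0.0 --port 7860
#
#   import httpx
#   r = httpx.get(
#       "http://localhost:7860/news_details",
#       params={"url": "https://example.com/some-article"},
#       timeout=30,
#   )
#   print(r.json())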