# URL2Text2 / app.py
import os
from urllib.parse import urljoin

import httpx
import requests
from bs4 import BeautifulSoup
from fastapi import FastAPI, HTTPException, Depends
from fastapi.security import OAuth2PasswordBearer
from langchain_community.document_loaders import (
    OnlinePDFLoader,
    WebBaseLoader,
    YoutubeLoader,
)

app = FastAPI()
API_KEY = os.environ["API_KEY"]
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
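# Note: this app defines no /token route; OAuth2PasswordBearer here only pulls
# the Bearer token out of the Authorization header, so tokenUrl is nominal.
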
async def validate_token(token: str = Depends(oauth2_scheme)):
    if token != API_KEY:
        raise HTTPException(status_code=401, detail="Invalid API Key")
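
# Every endpoint expects the API key as a Bearer token, e.g. (host illustrative):
#   curl -X POST "https://<your-space-host>/extract_text?url=https://example.com" \
#        -H "Authorization: Bearer $API_KEY"
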
@app.post("/extract_text", tags=["Text Extraction"], dependencies=[Depends(validate_token)])
def extract_text(url: str, language: str = "ja", length: int = 150000,use_jina:bool = True):
try:
if "youtube.com" in url or "youtu.be" in url:
# YouTubeの場合
loader = YoutubeLoader.from_youtube_url(
youtube_url=url,
add_video_info=True,
language=[language],
)
docs = loader.load()
text_content = str(docs)
elif url.endswith(".pdf"):
# PDFの場合
loader = OnlinePDFLoader(url)
docs = loader.load()
text_content = docs[0].page_content
else:
# それ以外の場合
# loader = WebBaseLoader(url)
# docs = loader.load()
# text_content = docs[0].page_content
if use_jina:
response = requests.get("https://r.jina.ai/"+ url)
text_content = response.text
else:
response = requests.get(url,timeout = 10)
text_content = str(convert_to_markdown(response.text,url))
if len(text_content) < length:
return {"text_content": text_content}
else:
return {
"text_content": text_content[: int(length / 2)]
+ text_content[len(text_content) - int(length / 2) :]
}
except Exception as e:
error_msg = str(e)
return {"message": error_msg}
@app.post("/httpx_bs", tags=["Text Extraction and beautiful soup"], dependencies=[Depends(validate_token)])
def httpx_bs(url: str, length: int = 150000):
try:
response = httpx.get(url)
text_content = str(convert_to_markdown(response,url))
if len(text_content) < length:
return {"text_content": text_content}
else:
return {
"text_content": text_content[: int(length / 2)]
+ text_content[len(text_content) - int(length / 2) :]
}
except Exception as e:
error_msg = str(e)
return {"message": error_msg}
@app.post("/extract_from_url", tags=["Text Extraction from URL"], dependencies=[Depends(validate_token)])
def extract_from_url(url: str, length: int = 150000, tool: str = "httpx"):
try:
if tool == "jina":
response = requests.get("https://r.jina.ai/" + url)
text_content = response.text
elif tool == "httpx":
response = httpx.get(url)
text_content = str(convert_to_markdown(response.text, url))
elif tool == "requests":
response = requests.get(url, timeout=10)
text_content = str(convert_to_markdown(response.text, url))
elif tool == "webbaseloader":
loader = WebBaseLoader(url)
docs = loader.load()
text_content = docs[0].page_content
else:
raise ValueError("Invalid tool specified. Choose from 'jina', 'httpx', 'requests', or 'webbaseloader'.")
if len(text_content) < length:
return {"text_content": text_content}
else:
return {
"text_content": text_content[: int(length / 2)]
+ text_content[len(text_content) - int(length / 2) :]
}
except Exception as e:
error_msg = str(e)
return {"message": error_msg}
def convert_to_markdown(response_text, url):
    # if response.status_code != 200:
    #     return f"Error: status code {response.status_code}"
    soup = BeautifulSoup(response_text, 'html.parser')
    markdown = ""
    # Title
    if soup.title and soup.title.string:
        markdown += f"# {soup.title.string.strip()}\n\n"
    # Main content (here, everything inside the body tag)
    main_content = soup.body
    if main_content:
        for element in main_content.find_all(['h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'p', 'a', 'ul', 'ol']):
            if element.name.startswith('h'):
                level = int(element.name[1])
                markdown += f"{'#' * level} {element.get_text().strip()}\n\n"
            elif element.name == 'p':
                markdown += f"{element.get_text().strip()}\n\n"
            elif element.name == 'a':
                href = element.get('href')
                if href:
                    # Resolve relative links against the page URL
                    full_url = urljoin(url, href)
                    markdown += f"[{element.get_text().strip()}]({full_url})\n\n"
            elif element.name in ['ul', 'ol']:
                for li in element.find_all('li'):
                    markdown += f"- {li.get_text().strip()}\n"
                markdown += "\n"
    return markdown
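
# Local entry point (a sketch: on Hugging Face Spaces the container usually
# starts the server itself; 7860 is the customary Spaces port):
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=7860)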