import os
from urllib.parse import urljoin

import httpx
import requests
from bs4 import BeautifulSoup
from fastapi import Depends, FastAPI, HTTPException
from fastapi.security import OAuth2PasswordBearer
from langchain_community.document_loaders import (
    OnlinePDFLoader,
    WebBaseLoader,
    YoutubeLoader,
)

app = FastAPI()

API_KEY = os.environ["API_KEY"]

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")


async def validate_token(token: str = Depends(oauth2_scheme)):
    if token != API_KEY:
        raise HTTPException(status_code=401, detail="Invalid API Key")

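
# Example request (illustrative only): every endpoint expects the API key as a
# bearer token, and FastAPI exposes the plain function parameters below as
# query parameters. Host and port assume a default local `uvicorn` run.
#
#   curl -X POST "http://127.0.0.1:8000/extract_text?url=https://example.com&use_jina=false" \
#        -H "Authorization: Bearer $API_KEY"
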
@app.post("/extract_text", tags=["Text Extraction"], dependencies=[Depends(validate_token)])
def extract_text(url: str, language: str = "ja", length: int = 150000, use_jina: bool = True):
    try:
        if "youtube.com" in url or "youtu.be" in url:
            # YouTube URL: fetch the transcript in the requested language.
            loader = YoutubeLoader.from_youtube_url(
                youtube_url=url,
                add_video_info=True,
                language=[language],
            )
            docs = loader.load()
            text_content = "\n\n".join(doc.page_content for doc in docs)
        elif url.endswith(".pdf"):
            # PDF URL: download and extract the text.
            loader = OnlinePDFLoader(url)
            docs = loader.load()
            text_content = docs[0].page_content
        else:
            # Any other URL: either proxy through the Jina Reader, which
            # returns the page as LLM-friendly text, or fetch the page
            # directly and convert the HTML to Markdown ourselves.
            if use_jina:
                response = requests.get("https://r.jina.ai/" + url, timeout=10)
                text_content = response.text
            else:
                response = requests.get(url, timeout=10)
                text_content = convert_to_markdown(response.text, url)

        if len(text_content) < length:
            return {"text_content": text_content}
        # Too long: keep the head and tail halves within the length budget.
        half = length // 2
        return {"text_content": text_content[:half] + text_content[-half:]}
    except Exception as e:
        return {"message": str(e)}


@app.post("/httpx_bs", tags=["Text Extraction with BeautifulSoup"], dependencies=[Depends(validate_token)])
def httpx_bs(url: str, length: int = 150000):
    try:
        response = httpx.get(url)
        text_content = convert_to_markdown(response.text, url)
        if len(text_content) < length:
            return {"text_content": text_content}
        half = length // 2
        return {"text_content": text_content[:half] + text_content[-half:]}
    except Exception as e:
        return {"message": str(e)}


@app.post("/extract_from_url", tags=["Text Extraction from URL"], dependencies=[Depends(validate_token)])
def extract_from_url(url: str, length: int = 150000, tool: str = "httpx"):
    try:
        if tool == "jina":
            response = requests.get("https://r.jina.ai/" + url, timeout=10)
            text_content = response.text
        elif tool == "httpx":
            response = httpx.get(url)
            text_content = convert_to_markdown(response.text, url)
        elif tool == "requests":
            response = requests.get(url, timeout=10)
            text_content = convert_to_markdown(response.text, url)
        elif tool == "webbaseloader":
            loader = WebBaseLoader(url)
            docs = loader.load()
            text_content = docs[0].page_content
        else:
            raise ValueError(
                "Invalid tool specified. Choose from 'jina', 'httpx', 'requests', or 'webbaseloader'."
            )

        if len(text_content) < length:
            return {"text_content": text_content}
        half = length // 2
        return {"text_content": text_content[:half] + text_content[-half:]}
    except Exception as e:
        return {"message": str(e)}


def convert_to_markdown(response_text, url):
    soup = BeautifulSoup(response_text, 'html.parser')
    markdown = ""

    # Title
    if soup.title and soup.title.string:
        markdown += f"# {soup.title.string.strip()}\n\n"

    # Main content (this example only looks inside the <body> tag).
    main_content = soup.body
    if main_content:
        for element in main_content.find_all(['h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'p', 'a', 'ul', 'ol']):
            if element.name.startswith('h'):
                level = int(element.name[1])
                markdown += f"{'#' * level} {element.get_text().strip()}\n\n"
            elif element.name == 'p':
                markdown += f"{element.get_text().strip()}\n\n"
            elif element.name == 'a':
                href = element.get('href')
                if href:
                    # Resolve relative links against the page URL.
                    full_url = urljoin(url, href)
                    markdown += f"[{element.get_text().strip()}]({full_url})\n\n"
            elif element.name in ['ul', 'ol']:
                for li in element.find_all('li'):
                    markdown += f"- {li.get_text().strip()}\n"
                markdown += "\n"
    return markdown
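
# Minimal smoke test (a sketch, not part of the service): it assumes API_KEY
# is set in the environment and that outbound network access is available,
# since /extract_from_url fetches the target URL for real.
if __name__ == "__main__":
    from fastapi.testclient import TestClient

    client = TestClient(app)
    resp = client.post(
        "/extract_from_url",
        params={"url": "https://example.com", "tool": "httpx"},
        headers={"Authorization": f"Bearer {API_KEY}"},
    )
    print(resp.status_code, str(resp.json())[:200])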