from fastapi import FastAPI, HTTPException from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel import pipe_line_obsei import support_function as sf from typing import Dict, Any import json import asyncio from crawl4ai import AsyncWebCrawler from crawl4ai.extraction_strategy import JsonCssExtractionStrategy import modal crawler = modal.Image.debian_slim(python_version="3.10").pip_install_from_requirements("requirements.txt").run_commands( "apt-get update", "apt-get install -y software-properties-common", "apt-add-repository non-free", "apt-add-repository contrib", "playwright install-deps chromium", "playwright install chromium", "playwright install", ) # Định nghĩa model request body class URLProcessRequest(BaseModel): target_url: str # URL cần phân tích primary_db: str # Tên database chính primary_collection: str # Tên collection chính backup_db: str # Tên database dự phòng backup_collection: str class SchemaReq(BaseModel): schema: dict[str, Any] url: str class SchemaReqMain(BaseModel): schema: dict[str, Any] url: str scroll: bool category: str class TimeProcessRequest(BaseModel): target_time : str # Thời gian cần chuẩn hóa async def return_json(schema: Dict[str, Any], url: str) -> Dict[str, Any]: extraction_strategy = JsonCssExtractionStrategy(schema, verbose=True) async with AsyncWebCrawler(always_by_pass_cache=True) as crawler: result = await crawler.arun( url=url, exclude_external_links=True, bypass_cache=True, verbose=False, warning=False, extraction_strategy=extraction_strategy ) # Parse nội dung đã crawl thành JSON news_teasers = json.loads(result.extracted_content) return news_teasers async def return_json_main (schema: dict[str, Any], url: str, scroll: bool, category: str): js_code = """ (async function() { let lastHeight = 0; while (document.body.scrollHeight !== lastHeight) { lastHeight = document.body.scrollHeight; window.scrollTo(0, document.body.scrollHeight); await new Promise(resolve => setTimeout(resolve, 1000)); // Đợi 1 giây giữa mỗi lần cuộn } })(); """ if scroll else "" extraction_strategy = JsonCssExtractionStrategy(schema, verbose=True) async with AsyncWebCrawler(always_bypass_cache=True) as crawler: result = await crawler.arun( url=url, extraction_strategy=extraction_strategy, bypass_cache=True, warning=False, js_code=js_code ) await asyncio.sleep(5) news_teasers = json.loads(result.extracted_content) if isinstance(news_teasers, list): for item in news_teasers: item["category"] = category else: print("Unexpected data format:", news_teasers) return [] return news_teasers # Khởi tạo FastAPI app = FastAPI( title="Obsei", description="API để xử lý dữ liệu từ các nguồn web", swagger_ui_parameters={"syntaxHighlight.theme": "obsidian"}, version="1.0.0", contact={ "name": "Vo Nhu Y", "url": "https://pychatbot1.streamlit.app", "email": "vonhuy5112002@gmail.com", }, license_info={ "name": "Apache 2.0", "url": "https://www.apache.org/licenses/LICENSE-2.0.html", } ) # Cấu hình CORS origins = [ "http://localhost:8000", "https://yourfrontendapp.com", # Thêm domain của frontend nếu cần ] app.add_middleware( CORSMiddleware, allow_origins=origins, allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) @app.get("/") async def root(): return {"message": "Welcome to ChatBot HCMUTE API!"} @app.post("/api/v1/obsei/process_url/") async def process_url_api(request: URLProcessRequest): """ API nhận request body chứa thông tin URL và các thông tin database cần thiết, sau đó xử lý URL, phân tích và lưu dữ liệu vào MongoDB. """ try: # Lấy dữ liệu từ request body target_url = request.target_url primary_db = request.primary_db primary_collection = request.primary_collection backup_db = request.backup_db backup_collection = request.backup_collection # Gọi hàm `process_url` đã định nghĩa processed_text, content_data = await pipe_line_obsei.process_url( target_url, primary_db, primary_collection, backup_db, backup_collection ) return { "processed_text": processed_text, "content_data": content_data, } except Exception as e: raise HTTPException( status_code=500, detail=f"An error occurred while processing the request: {str(e)}" ) # API cho hàm chuẩn hóa thời gian @app.post("/api/v1/obsei/chuan_hoa_time/") # endpoint chuẩn hóa thời gian async def chuan_hoa_time_api(request: TimeProcessRequest): """ API nhận chuỗi thời gian và chuẩn hóa thời gian theo định dạng mong muốn. """ try: time_str = request.target_time # Bạn có thể thay đổi thuộc tính phù hợp formatted_time = sf.chuan_hoa_time(time_str) return { "formatted_time": formatted_time } except Exception as e: raise HTTPException( status_code=500, detail=f"An error occurred while processing the time: {str(e)}" ) # API cho crawling với schema @app.post("/api/v1/crawl/") async def crawl_url(request: SchemaReq): """ API nhận request body chứa schema và URL, sau đó crawl và trả về dữ liệu. """ try: # Lấy schema và URL từ request body schema = request.schema url = request.url # Gọi hàm `return_json` để lấy dữ liệu đã crawl print(schema) print(url) data = await return_json(schema, url) # Gọi async function mà không cần asyncio.run() return {"status": "success", "data": data} except Exception as e: raise HTTPException( status_code=500, detail=f"An error occurred while processing the request: {str(e)}" ) @app.post("/api/v1/crawl_main/") async def crawl_url(request: SchemaReqMain): """ API nhận request body chứa schema và URL, sau đó crawl và trả về dữ liệu. """ try: # Lấy schema và URL từ request body schema = request.schema url = request.url scroll = request.scroll category = request.category # Gọi hàm `return_json` để lấy dữ liệu đã crawl data = await return_json_main(schema, url,scroll,category ) # Gọi async function mà không cần asyncio.run() return {"status": "success", "data": data} except Exception as e: raise HTTPException( status_code=500, detail=f"An error occurred while processing the request: {str(e)}" )