KLTN / main.py
ohhhchank3's picture
Update main.py
ec4d6cd verified
from fastapi import FastAPI, File, UploadFile, HTTPException, Header,Query
from fastapi.responses import JSONResponse
import dropbox
from typing import Any, Dict, List, Optional, Union
import shutil
import os
import firebase_admin.auth
import nest_asyncio
from pyngrok import ngrok
import uvicorn
import store_file_dropbox as sf_dropbox
from typing import List
import server_function as sf
import firebase_admin
from firebase_admin import credentials,auth
import uvicorn
from fastapi import FastAPI
from fastapi.responses import JSONResponse
from fastapi.exceptions import HTTPException
from fastapi.requests import Request
global_TOKEN = ""
# Đọc token từ file
with open("/code/TOKEN.txt", "r") as f:
global_TOKEN = f.read().strip()
# Khởi tạo đối tượng Dropbox
dbx = dropbox.Dropbox(global_TOKEN)
NGROK_STATIC_DOMAIN = "rightly-poetic-amoeba.ngrok-free.app"
NGROK_TOKEN="2fEEmIGqKqV2EoE6n5lBRgObtov_4KmEfqwPJLdMi2G7GKnXU"
app = FastAPI(
title="Python ChatGPT plugin",
description="A Python ChatGPT plugin",
)
ALLOWED_EXTENSIONS = {'csv', 'txt', 'doc', 'docx', 'pdf', 'xlsx', 'pptx','xml','csv','sql','html','md','json'}
def allowed_file(filename):
return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS
import shutil,requests,json
from fastapi import Depends, Security
from fastapi.security import OAuth2PasswordBearer
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm,OAuth2AuthorizationCodeBearer
import jwt
from passlib.context import CryptContext
from datetime import datetime, timedelta
from typing import Optional
from fastapi.security import SecurityScopes
import base64
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/login")
if not firebase_admin._apps:
cred = credentials.Certificate("pondering-5ff7c-c033cfade319.json")
fred = firebase_admin.initialize_app(cred) #get your service account keys from firebase
def sign_in_with_email_and_password(email=None, password=None, return_secure_token=True):
rest_api_url = "https://identitytoolkit.googleapis.com/v1/accounts:signInWithPassword"
try:
payload = {
"returnSecureToken": return_secure_token
}
if email:
payload["email"] = email
if password:
payload["password"] = password
payload = json.dumps(payload)
r = requests.post(rest_api_url, params={"key": "AIzaSyDk579sNTudOnmAEHq4iOfTNIfqVzjFXYw"}, data=payload)
r.raise_for_status() # Raises an exception for 4xx and 5xx status codes
data = r.json()
if 'idToken' in data:
return data['email']
else:
# Handle unsuccessful login
return False
except requests.exceptions.RequestException as e:
print(f"Error signing in: {e}")
return False
# Khai báo cấu hình cho mã token
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7" # Đặt key bí mật cho mã token
ALGORITHM = "HS256" # Thuật toán mã hóa
# Khai báo cấu hình cho mật khẩu
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=30)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
def authenticate_user(email: str, password: str):
try:
user = auth.get_user_by_email(email)
# Check if the password is correct (you may need to handle this differently with Firebase)
# For example, if you're using Firebase Auth REST API:
# https://firebase.google.com/docs/reference/rest/auth#section-verify-password
return user.uid
except Exception as e:
print(f"Authentication failed: {e}")
return None
def create_access_token(data: dict, expires_delta: timedelta):
to_encode = data.copy()
expire = datetime.utcnow() + expires_delta
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
from auth_hander import signJWT
@app.post('/login')
async def login(email: str, password: str):
user = sign_in_with_email_and_password(email, password) # Your authentication logic
return signJWT(user)
@app.get('/login_google')
async def login_google(email: str):
user = email # Your authentication logic
return signJWT(user)
@app.get('/login1')
async def login1(email: str, password: str):
user = sign_in_with_email_and_password(email, password) # Your authentication logic
return signJWT(user)
def refresh_token_dropbox():
app_key = "m79mo7gg7mh8pez"
app_secret = "by93yed7as4qlmo"
refresh_token = 'VG4SP0Ugo4EAAAAAAAAAAbHf7esmak_gieNGvLHxiiAXGfSeKPVXbX1lm28DMz_o'
url = 'https://api.dropbox.com/oauth2/token'
auth_string = f"{app_key}:{app_secret}"
base64authorization = base64.b64encode(auth_string.encode()).decode('utf-8')
headers = {
'Authorization': f'Basic {base64authorization}',
'Content-Type': 'application/x-www-form-urlencoded'
}
data = {
'refresh_token': refresh_token,
'grant_type': 'refresh_token'
}
response = requests.post(url, headers=headers, data=data)
response_json = response.json()
access_token = response_json.get('access_token', None)
return access_token
def delete_local_directory(id):
try:
shutil.rmtree(f"./temp/{id}")
print(f"Deleted local directory '{id}' and all its contents")
except OSError as e:
print(f"Error deleting local directory '{id}': {e}")
#The goal of this file is to check whether the reques tis authorized or not [ verification of the proteced route]
from fastapi import Request, HTTPException
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from auth_hander import decodeJWT
from model import PostSchema, UserSchema, UserLoginSchema
class JWTBearer(HTTPBearer):
def __init__(self, auto_error: bool = True):
super(JWTBearer, self).__init__(auto_error=auto_error)
async def __call__(self, request: Request):
credentials: HTTPAuthorizationCredentials = await super(JWTBearer, self).__call__(request)
if credentials:
if not credentials.scheme == "Bearer":
raise HTTPException(status_code=403, detail="Invalid authentication scheme.")
if not self.verify_jwt(credentials.credentials):
raise HTTPException(status_code=403, detail="Invalid token or expired token.")
return credentials.credentials
else:
raise HTTPException(status_code=403, detail="Invalid authorization code.")
def verify_jwt(self, jwtoken: str) -> bool:
isTokenValid: bool = False
try:
payload = decodeJWT(jwtoken)
except:
payload = None
if payload:
isTokenValid = True
return isTokenValid
users = []
def check_user(data: UserLoginSchema):
for user in users:
if user.email == data.email and user.password == data.password:
return True
return False
@app.delete("/deletefolder/",dependencies=[Depends(JWTBearer())])
async def delete_folder(id: str):
delete_local_directory(id)
return {"message": f"Deleted folder '{id}' on Dropbox and its local copy"}
@app.get("/getNameFile/",dependencies=[Depends(JWTBearer())])
async def get_name(id: str):
file= sf_dropbox.list_files(id)
return JSONResponse(content={"message": f"{file}"}, status_code=200)
@app.delete("/deleteFile/",dependencies=[Depends(JWTBearer())])
async def get_name(id: str,name_file:str):
sf_dropbox.delete_file(id,name_file)
return JSONResponse(content={"message": f"delete {name_file} success"}, status_code=200)
import auth_hander as auth123
@app.get('/refresh_token_account/')
async def login(refresh_token:str,token_now, email: str):
user = auth123.get_refresh_token(refresh_token,token_now,email)
return user
@app.get("/query2/",dependencies=[Depends(JWTBearer())])
async def handle_nhuy(id: str = Query(None), question: str = Query(None)):
if id is None or question is None:
return JSONResponse(content={"error": "Thiếu tham số id hoặc question"}, status_code=400)
text_all = sf.extract_data2(id) # Giả sử extract_data cần tham số id
test = sf.question_answer_all_query_v1_id(question, text_all, id)
return {"message": test}
@app.get("/query2_upgrade/",dependencies=[Depends(JWTBearer())])
async def handle_query2_upgrade(id: str = Query(None), question: str = Query(None),chat_history: str = Query(None)):
if id is None or question is None:
return JSONResponse(content={"error": "Thiếu tham số id hoặc question"}, status_code=400)
text_all = sf.extract_data2(id) # Giả sử extract_data cần tham số id
test = sf.handle_query_upgrade_keyword(question,text_all,id,chat_history)
return {"message": test}
@app.get("/query2_upgrade_old/",dependencies=[Depends(JWTBearer())])
async def handle_query2_upgrade(id: str = Query(None), question: str = Query(None)):
if id is None or question is None:
return JSONResponse(content={"error": "Thiếu tham số id hoặc question"}, status_code=400)
text_all = sf.extract_data2(id) # Giả sử extract_data cần tham số id
test = sf.handle_query_upgrade_keyword_old(question,text_all,id)
return {"message": test}
@app.get("/query/",dependencies=[Depends(JWTBearer())])
async def handle_query(question: str):
text_all = sf.extract_data()
test = sf.question_answer_all_query_v1(question,text_all)
return {"message": f"{test}"}
@app.get("/extract_file2/",dependencies=[Depends(JWTBearer())])
async def extract_file(id: str):
text_all = sf.extract_data2(id) # Giả sử extract_data cần tham số id
return {"message": f"Xử lý thành công{text_all}"}
@app.get("/extract_file/",dependencies=[Depends(JWTBearer())])
async def extract_file():
text_all = sf.extract_data()
return {"message": f"Xử lý thành công{text_all}"}
@app.get("/downloadfolder/",dependencies=[Depends(JWTBearer())])
async def download_folder_from_dropbox(id:str):
try:
sf_dropbox.download_folder(id)
return JSONResponse(content={"message": f"Downloaded folder '{id}' to '{id}'"}, status_code=200)
except Exception as e:
print(f"Error downloading folder '{id}': {e}")
raise HTTPException(status_code=500, detail="Internal Server Error")
@app.get("/downloadfile/",dependencies=[Depends(JWTBearer())])
async def download_file_by_id(id: str, file_name: str):
sf_dropbox.search_and_download_file(file_name,id)
return JSONResponse(content={"message": f"Downloaded file '{file_name}' by ID '{id}'"}, status_code=200)
@app.get("/refresh_token/")
async def refresh_token():
#try:
token_dropbox = refresh_token_dropbox()
TOKEN1 = token_dropbox
#os.makedirs("/code/temp", exist_ok=True)
#with open(f"/code/temp/TOKEN.txt", 'w') as file:
# file.write(TOKEN1)
return JSONResponse(content={"message": f"{TOKEN1}"}, status_code=200)
#except Exception as e:
# raise HTTPException(status_code=500, detail="Internal Server Error")
@app.get("/searchfiles/", dependencies=[Depends(JWTBearer())])
async def search_files_starting_with(start_char: str):
try:
# Liệt kê tất cả các tệp trong thư mục trên Dropbox
result = dbx.files_list_folder("")
# Lọc các tệp bắt đầu bằng ký tự cụ thể
files_starting_with_char = [entry.name for entry in result.entries if entry.name.startswith(start_char)]
return files_starting_with_char
except dropbox.exceptions.ApiError as e:
print(f"Error searching for files: {e}")
raise HTTPException(status_code=500, detail="Internal Server Error")
@app.post("/uploadfile/",dependencies=[Depends(JWTBearer())])
async def upload_file(file: UploadFile = File(...)):
#try:
if not allowed_file(file.filename):
raise HTTPException(status_code=429, detail="File không được hỗ trợ")
# Tạo một thư mục tạm để lưu file
os.makedirs("/code/temp", exist_ok=True)
with open(f"/code/temp/{file.filename}", "wb") as buffer:
shutil.copyfileobj(file.file, buffer)
return JSONResponse(content={"message": "File đã được tải lên và lưu thành công"}, status_code=200)
#except Exception as e:
#raise HTTPException(status_code=500, detail="Internal Server Error")
@app.post("/uploadfile2/",dependencies=[Depends(JWTBearer())])
async def upload_file(file: UploadFile = File(...), id: str = Header(None)):
try:
if id is None:
raise HTTPException(status_code=400, detail="Yêu cầu không hợp lệ: id không được cung cấp")
if not allowed_file(file.filename):
raise HTTPException(status_code=429, detail="File không được hỗ trợ")
# Tạo một thư mục tạm để lưu file
os.makedirs(f"/code/temp/{id}", exist_ok=True)
with open(f"/code/temp/{id}/{file.filename}", "wb") as buffer:
shutil.copyfileobj(file.file, buffer)
return JSONResponse(content={"message": "File đã được tải lên và lưu thành công"}, status_code=200)
except Exception as e:
raise HTTPException(status_code=500, detail="Internal Server Error")
@app.post("/uploadfile_dropbox_fix/",dependencies=[Depends(JWTBearer())])
async def upload_file_dropbox_fix(files: List[UploadFile] = File(...), id: str = Header(None),token:str = Header(None)):
try:
if id is None:
raise HTTPException(status_code=401, detail="Yêu cầu không hợp lệ: id không được cung cấp")
if not allowed_file(file.filename):
raise HTTPException(status_code=406, detail="File không được hỗ trợ")
# Tạo một thư mục tạm để lưu file
temp_dir = f"/code/temp/{id}"
os.makedirs(temp_dir, exist_ok=True)
file_path = os.path.join(temp_dir, file.filename)
# Lưu file tạm thời
with open(file_path, "wb") as buffer:
shutil.copyfileobj(file.file, buffer)
# Tải file lên Dropbox
cloud_path = f"/{id}/{file.filename}"
sf_dropbox.upload_file_fix(file_path, cloud_path,token)
# Xóa file tạm thời
os.remove(file_path)
return JSONResponse(content={"message": "File đã được tải lên và lưu thành công"}, status_code=200)
except Exception as e:
raise HTTPException(status_code=500, detail="Sever Error")
@app.post("/uploadfile_dropbox/", dependencies=[Depends(JWTBearer())])
async def upload_files_dropbox(files: List[UploadFile] = File(...), id: str = Header(None)):
#try:
if id is None:
raise HTTPException(status_code=401, detail="Yêu cầu không hợp lệ: id không được cung cấp")
for file in files:
if not allowed_file(file.filename):
raise HTTPException(status_code=406, detail="File không được hỗ trợ")
# Tạo một thư mục tạm để lưu file
temp_dir = f"/code/temp/{id}"
os.makedirs(temp_dir, exist_ok=True)
file_path = os.path.join(temp_dir, file.filename)
# Lưu file tạm thời
with open(file_path, "wb") as buffer:
shutil.copyfileobj(file.file, buffer)
# Tải file lên Dropbox
cloud_path = f"/{id}/{file.filename}"
sf_dropbox.upload_file(file_path, cloud_path)
return JSONResponse(content={"message": "Các file đã được tải lên và lưu thành công"}, status_code=200)
#except Exception as e:
#raise HTTPException(status_code=500, detail="Lỗi máy chủ")