from fastapi import FastAPI, File, UploadFile, HTTPException, Header,Query
from fastapi.responses import JSONResponse
import dropbox
from typing import Any, Dict, List, Optional, Union
import shutil
import os
import firebase_admin.auth
import nest_asyncio
from pyngrok import ngrok
import uvicorn
import store_file_dropbox as sf_dropbox
from typing import List
import server_function as sf
import firebase_admin
from firebase_admin import credentials,auth
import uvicorn
from fastapi import FastAPI
from fastapi.responses import JSONResponse
from fastapi.exceptions import HTTPException
from fastapi.requests import Request
global_TOKEN = ""
# Read the Dropbox access token from file.
with open("/code/TOKEN.txt", "r") as f:
    global_TOKEN = f.read().strip()
# Create the Dropbox client used by the route handlers below.
dbx = dropbox.Dropbox(global_TOKEN)
NGROK_STATIC_DOMAIN = "rightly-poetic-amoeba.ngrok-free.app"
# The FastAPI application object that route decorators attach to.
app = FastAPI(
    title="Python ChatGPT plugin",
    description="A Python ChatGPT plugin",
)
# File extensions (lower-case, without the leading dot) accepted for upload.
ALLOWED_EXTENSIONS = {
    'csv', 'txt', 'doc', 'docx', 'pdf', 'xlsx', 'pptx',
    'xml', 'sql', 'html', 'md', 'json',
}


def allowed_file(filename):
    """Return True when *filename* ends with an extension in ALLOWED_EXTENSIONS.

    The extension is the text after the last dot, compared case-insensitively.
    A missing (``None``) or empty filename, or one without a dot, is rejected
    instead of raising.
    """
    if not filename or '.' not in filename:
        return False
    return filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS
import shutil,requests,json
from fastapi import Depends, Security
from fastapi.security import OAuth2PasswordBearer
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm,OAuth2AuthorizationCodeBearer
import jwt
from passlib.context import CryptContext
from datetime import datetime, timedelta
from typing import Optional
from fastapi.security import SecurityScopes
import base64
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/login")
# Initialise the Firebase Admin SDK only when no app has been registered yet.
if not firebase_admin._apps:
    cred = credentials.Certificate("pondering-5ff7c-c033cfade319.json")
    fred = firebase_admin.initialize_app(cred)  # get your service account keys from firebase
def sign_in_with_email_and_password(email=None, password=None, return_secure_token=True):
    """Sign in through the Identity Toolkit ``accounts:signInWithPassword`` REST endpoint.

    Args:
        email: Account email; left out of the request payload when falsy.
        password: Account password; left out of the request payload when falsy.
        return_secure_token: Sent as the ``returnSecureToken`` payload field.

    Returns:
        The ``email`` field of the response when the response contains an
        ``idToken``; ``False`` when it does not or when the request fails.
    """
    rest_api_url = "https://identitytoolkit.googleapis.com/v1/accounts:signInWithPassword"
    payload = {
        "returnSecureToken": return_secure_token
    }
    if email:
        payload["email"] = email
    if password:
        payload["password"] = password
    payload = json.dumps(payload)
    try:
        # NOTE(review): the API key is hard-coded in source; consider loading
        # it from configuration instead.
        r = requests.post(
            rest_api_url,
            params={"key": "AIzaSyDk579sNTudOnmAEHq4iOfTNIfqVzjFXYw"},
            data=payload,
            timeout=10,  # without a timeout a stalled connection blocks forever
        )
        r.raise_for_status()  # Raises an exception for 4xx and 5xx status codes
        data = r.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError covers a response body that is not valid JSON.
        print(f"Error signing in: {e}")
        return False
    if 'idToken' in data:
        return data['email']
    # Handle unsuccessful login
    return False
# Configuration for signing tokens
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7" # Secret key used to sign tokens
ALGORITHM = "HS256" # Signing algorithm
# Configuration for password hashing
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    """Encode *data* as a JWT signed with SECRET_KEY using ALGORITHM.

    Args:
        data: Claims to embed; copied, so the caller's dict is not modified.
        expires_delta: Lifetime of the token. When falsy, the token expires
            30 minutes from now.

    Returns:
        The encoded token produced by ``jwt.encode``.

    Note:
        A second ``create_access_token`` is defined later in this module and
        replaces this one at import time.
    """
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        # Only fall back to the default when no lifetime was supplied.
        expire = datetime.utcnow() + timedelta(minutes=30)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt
def authenticate_user(email: str, password: str):
    """Look up a Firebase user by email and return its uid.

    Args:
        email: Email address passed to ``auth.get_user_by_email``.
        password: Accepted but not checked by this function.

    Returns:
        The user's ``uid``, or ``None`` when the lookup raises.
    """
    try:
        user = auth.get_user_by_email(email)
        # Check if the password is correct (you may need to handle this differently with Firebase)
        # For example, if you're using Firebase Auth REST API:
        # https://firebase.google.com/docs/reference/rest/auth#section-verify-password
        return user.uid
    except Exception as e:
        print(f"Authentication failed: {e}")
        return None
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    """Encode *data* as a JWT signed with SECRET_KEY using ALGORITHM.

    This definition replaces the earlier ``create_access_token`` in this
    module, so it keeps that one's optional ``expires_delta``: callers that
    pass only ``data`` keep working.

    Args:
        data: Claims to embed; copied, so the caller's dict is not modified.
        expires_delta: Lifetime of the token. ``None`` means 30 minutes; any
            explicit value (including a zero delta) is used as given.

    Returns:
        The encoded token produced by ``jwt.encode``.
    """
    if expires_delta is None:
        expires_delta = timedelta(minutes=30)
    to_encode = data.copy()
    expire = datetime.utcnow() + expires_delta
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt
from auth_hander import signJWT
async def login(email: str, password: str):
    """Sign in with email and password and return a signed JWT.

    Raises:
        HTTPException: 401 when ``sign_in_with_email_and_password`` reports a
            failed sign-in (it returns ``False`` in that case).

    Returns:
        Whatever ``signJWT`` returns for the authenticated email.
    """
    user = sign_in_with_email_and_password(email, password)  # Your authentication logic
    if not user:
        # Never hand a failed sign-in (False) to signJWT.
        raise HTTPException(status_code=401, detail="Invalid email or password.")
    return signJWT(user)
async def login_google(email: str):
    """Return the result of ``signJWT`` for the given email.

    NOTE(review): the email is passed to ``signJWT`` without any check that
    the caller owns it — confirm that verification happens upstream.
    """
    user = email  # Your authentication logic
    return signJWT(user)
async def login1(email: str, password: str):
    """Sign in with email and password and return a signed JWT.

    Raises:
        HTTPException: 401 when ``sign_in_with_email_and_password`` reports a
            failed sign-in (it returns ``False`` in that case).

    Returns:
        Whatever ``signJWT`` returns for the authenticated email.
    """
    user = sign_in_with_email_and_password(email, password)  # Your authentication logic
    if not user:
        # Never hand a failed sign-in (False) to signJWT.
        raise HTTPException(status_code=401, detail="Invalid email or password.")
    return signJWT(user)
def refresh_token_dropbox():
    """Exchange the stored Dropbox refresh token for a new access token.

    Posts a ``refresh_token`` grant to the Dropbox OAuth2 token endpoint,
    authenticated with HTTP Basic credentials built from the app key and
    secret.

    Returns:
        The ``access_token`` field of the JSON response, or ``None`` when the
        response has no such field.
    """
    # NOTE(review): app key, app secret and refresh token are hard-coded in
    # source; consider loading them from configuration instead.
    app_key = "m79mo7gg7mh8pez"
    app_secret = "by93yed7as4qlmo"
    refresh_token = 'VG4SP0Ugo4EAAAAAAAAAAbHf7esmak_gieNGvLHxiiAXGfSeKPVXbX1lm28DMz_o'
    url = 'https://api.dropbox.com/oauth2/token'
    auth_string = f"{app_key}:{app_secret}"
    base64authorization = base64.b64encode(auth_string.encode()).decode('utf-8')
    headers = {
        'Authorization': f'Basic {base64authorization}',
        'Content-Type': 'application/x-www-form-urlencoded'
    }
    data = {
        'refresh_token': refresh_token,
        'grant_type': 'refresh_token'
    }
    # A timeout keeps a stalled connection from blocking the caller forever.
    response = requests.post(url, headers=headers, data=data, timeout=10)
    response_json = response.json()
    access_token = response_json.get('access_token', None)
    return access_token
def delete_local_directory(id):
    """Report the deletion of the local directory named by *id*.

    An ``OSError`` raised inside the guarded section is printed, not
    propagated.
    """
    try:
        # NOTE(review): no statement that actually removes the directory is
        # visible here; only the success message is. Confirm the removal call
        # (and the path it should target) before relying on this function.
        print(f"Deleted local directory '{id}' and all its contents")
    except OSError as e:
        print(f"Error deleting local directory '{id}': {e}")
# The goal of this file is to check whether the request is authorized or not [verification of the protected route]
from fastapi import Request, HTTPException
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from auth_hander import decodeJWT
from model import PostSchema, UserSchema, UserLoginSchema
class JWTBearer(HTTPBearer):
    """Bearer-token dependency that also requires the token to decode as a JWT."""

    def __init__(self, auto_error: bool = True):
        super().__init__(auto_error=auto_error)

    async def __call__(self, request: Request):
        """Return the raw bearer token from *request* when it is acceptable.

        Raises:
            HTTPException: 403 when no credentials are present, when the
                scheme is not ``Bearer``, or when ``verify_jwt`` rejects the
                token.
        """
        # Named ``creds`` so the module-level ``credentials`` import is not shadowed.
        creds: HTTPAuthorizationCredentials = await super().__call__(request)
        if creds:
            if not creds.scheme == "Bearer":
                raise HTTPException(status_code=403, detail="Invalid authentication scheme.")
            if not self.verify_jwt(creds.credentials):
                raise HTTPException(status_code=403, detail="Invalid token or expired token.")
            return creds.credentials
        else:
            raise HTTPException(status_code=403, detail="Invalid authorization code.")

    def verify_jwt(self, jwtoken: str) -> bool:
        """Return True when ``decodeJWT`` yields a truthy payload for *jwtoken*."""
        try:
            payload = decodeJWT(jwtoken)
        except Exception:
            # A token that cannot be decoded is treated as invalid.
            payload = None
        return bool(payload)
# In-memory list of registered users that check_user compares against.
users = []


def check_user(data: UserLoginSchema):
    """Return True when some entry in ``users`` has the same email and password as *data*.

    NOTE(review): passwords are compared as plain values here.
    """
    return any(
        user.email == data.email and user.password == data.password
        for user in users
    )
async def delete_folder(id: str):
    """Return the confirmation message for deleting folder *id*.

    NOTE(review): no deletion call (Dropbox or local) is visible in this
    handler; only the confirmation is returned — confirm against the
    deployed version.
    """
    return {"message": f"Deleted folder '{id}' on Dropbox and its local copy"}
async def get_name(id: str):
    """Return the result of ``sf_dropbox.list_files(id)`` in a JSON message.

    Note:
        Another ``get_name`` is defined right after this one and rebinds the
        module-level name.
    """
    file = sf_dropbox.list_files(id)
    return JSONResponse(content={"message": f"{file}"}, status_code=200)
async def get_name(id: str, name_file: str):
    """Return the confirmation message for deleting *name_file*.

    NOTE(review): no deletion call is visible in this handler; only the
    confirmation is returned — confirm against the deployed version.
    """
    return JSONResponse(content={"message": f"delete {name_file} success"}, status_code=200)
import auth_hander as auth123
async def login(refresh_token: str, token_now, email: str):
    """Return the result of ``auth123.get_refresh_token`` for the given arguments.

    Note:
        This rebinds the module-level name ``login`` defined earlier in the
        file.
    """
    user = auth123.get_refresh_token(refresh_token, token_now, email)
    return user
async def handle_nhuy(id: str = Query(None), question: str = Query(None)):
    """Answer *question* against the data extracted for *id*.

    Returns:
        A 400 JSON error when either query parameter is missing; otherwise
        ``{"message": ...}`` with the result of
        ``sf.question_answer_all_query_v1_id``.
    """
    if id is None or question is None:
        return JSONResponse(content={"error": "Thiếu tham số id hoặc question"}, status_code=400)
    text_all = sf.extract_data2(id)  # extract_data2 is given the id
    test = sf.question_answer_all_query_v1_id(question, text_all, id)
    return {"message": test}
async def handle_query2_upgrade(id: str = Query(None), question: str = Query(None), chat_history: str = Query(None)):
    """Answer *question* against the data extracted for *id*, with chat history.

    Returns:
        A 400 JSON error when ``id`` or ``question`` is missing; otherwise
        ``{"message": ...}`` with the result of
        ``sf.handle_query_upgrade_keyword``.

    Note:
        Another ``handle_query2_upgrade`` is defined right after this one and
        rebinds the module-level name.
    """
    if id is None or question is None:
        return JSONResponse(content={"error": "Thiếu tham số id hoặc question"}, status_code=400)
    text_all = sf.extract_data2(id)  # extract_data2 is given the id
    test = sf.handle_query_upgrade_keyword(question, text_all, id, chat_history)
    return {"message": test}
async def handle_query2_upgrade(id: str = Query(None), question: str = Query(None)):
    """Answer *question* against the data extracted for *id* (older keyword handler).

    Returns:
        A 400 JSON error when either query parameter is missing; otherwise
        ``{"message": ...}`` with the result of
        ``sf.handle_query_upgrade_keyword_old``.
    """
    if id is None or question is None:
        return JSONResponse(content={"error": "Thiếu tham số id hoặc question"}, status_code=400)
    text_all = sf.extract_data2(id)  # extract_data2 is given the id
    test = sf.handle_query_upgrade_keyword_old(question, text_all, id)
    return {"message": test}
async def handle_query(question: str):
    """Answer *question* against the data returned by ``sf.extract_data()``.

    Returns:
        ``{"message": ...}`` with the stringified result of
        ``sf.question_answer_all_query_v1``.
    """
    text_all = sf.extract_data()
    test = sf.question_answer_all_query_v1(question, text_all)
    return {"message": f"{test}"}
async def extract_file(id: str):
    """Run ``sf.extract_data2(id)`` and return its result inside a message.

    Note:
        Another ``extract_file`` is defined right after this one and rebinds
        the module-level name.
    """
    text_all = sf.extract_data2(id)  # extract_data2 is given the id
    return {"message": f"Xử lý thành công{text_all}"}
async def extract_file():
    """Run ``sf.extract_data()`` and return its result inside a message."""
    text_all = sf.extract_data()
    return {"message": f"Xử lý thành công{text_all}"}
async def download_folder_from_dropbox(id: str):
    """Return the confirmation message for downloading folder *id*.

    Raises:
        HTTPException: 500 when the guarded section raises.

    NOTE(review): no download call is visible in this handler; only the
    confirmation is returned — confirm against the deployed version.
    """
    try:
        return JSONResponse(content={"message": f"Downloaded folder '{id}' to '{id}'"}, status_code=200)
    except Exception as e:
        print(f"Error downloading folder '{id}': {e}")
        raise HTTPException(status_code=500, detail="Internal Server Error")
async def download_file_by_id(id: str, file_name: str):
    """Return the confirmation message for downloading *file_name* under *id*.

    NOTE(review): no download call is visible in this handler; only the
    confirmation is returned — confirm against the deployed version.
    """
    return JSONResponse(content={"message": f"Downloaded file '{file_name}' by ID '{id}'"}, status_code=200)
async def refresh_token():
    """Fetch a new Dropbox access token and return it in a JSON message.

    The token comes from ``refresh_token_dropbox()`` and is returned
    stringified under the ``message`` key; it is not written anywhere.
    """
    token_dropbox = refresh_token_dropbox()
    return JSONResponse(content={"message": f"{token_dropbox}"}, status_code=200)
@app.get("/searchfiles/", dependencies=[Depends(JWTBearer())])
async def search_files_starting_with(start_char: str):
    """Return the names of entries in the Dropbox root folder that start with *start_char*.

    Raises:
        HTTPException: 500 when the Dropbox API reports an error.
    """
    try:
        # List every entry in the Dropbox root folder. A single
        # files_list_folder call returns only one page, so keep following the
        # cursor while the result reports more entries.
        result = dbx.files_list_folder("")
        entries = list(result.entries)
        while result.has_more:
            result = dbx.files_list_folder_continue(result.cursor)
            entries.extend(result.entries)
    except dropbox.exceptions.ApiError as e:
        print(f"Error searching for files: {e}")
        raise HTTPException(status_code=500, detail="Internal Server Error")
    # Keep only the entries whose name starts with the requested prefix.
    files_starting_with_char = [entry.name for entry in entries if entry.name.startswith(start_char)]
    return files_starting_with_char
async def upload_file(file: UploadFile = File(...)):
    """Save an uploaded file under ``/code/temp``.

    Raises:
        HTTPException: 429 when the file's extension is not allowed.

    Note:
        Another ``upload_file`` is defined right after this one and rebinds
        the module-level name.
    """
    # The filename is client-controlled: keep only its final path component so
    # the write cannot escape the temp directory.
    filename = os.path.basename(file.filename or "")
    if not allowed_file(filename):
        raise HTTPException(status_code=429, detail="File không được hỗ trợ")
    # Create a temporary directory to hold the file.
    os.makedirs("/code/temp", exist_ok=True)
    with open(f"/code/temp/{filename}", "wb") as buffer:
        shutil.copyfileobj(file.file, buffer)
    return JSONResponse(content={"message": "File đã được tải lên và lưu thành công"}, status_code=200)
async def upload_file(file: UploadFile = File(...), id: str = Header(None)):
    """Save an uploaded file under ``/code/temp/<id>``.

    Raises:
        HTTPException: 400 when the ``id`` header is missing, 429 when the
            file's extension is not allowed, 500 when saving the file fails.
    """
    if id is None:
        raise HTTPException(status_code=400, detail="Yêu cầu không hợp lệ: id không được cung cấp")
    # The filename is client-controlled: keep only its final path component so
    # the write cannot escape the temp directory.
    filename = os.path.basename(file.filename or "")
    if not allowed_file(filename):
        raise HTTPException(status_code=429, detail="File không được hỗ trợ")
    # NOTE(review): ``id`` also comes from the client and is used as a path
    # component unchecked — consider validating it.
    # Only the file I/O is guarded, so the 400/429 errors above are not
    # rewritten into a 500.
    try:
        # Create a temporary directory to hold the file.
        os.makedirs(f"/code/temp/{id}", exist_ok=True)
        with open(f"/code/temp/{id}/{filename}", "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)
    except Exception as e:
        raise HTTPException(status_code=500, detail="Internal Server Error") from e
    return JSONResponse(content={"message": "File đã được tải lên và lưu thành công"}, status_code=200)
async def upload_file_dropbox_fix(files: List[UploadFile] = File(...), id: str = Header(None), token: str = Header(None)):
    """Save each uploaded file under ``/code/temp/<id>`` and upload it to Dropbox.

    Each file is uploaded to ``/<id>/<filename>`` through
    ``sf_dropbox.upload_file_fix`` using the ``token`` header.

    Raises:
        HTTPException: 401 when the ``id`` header is missing, 406 when a
            file's extension is not allowed, 500 when saving or uploading
            fails.
    """
    if id is None:
        raise HTTPException(status_code=401, detail="Yêu cầu không hợp lệ: id không được cung cấp")
    # NOTE(review): ``id`` comes from the client and is used as a path
    # component unchecked — consider validating it.
    temp_dir = f"/code/temp/{id}"
    for file in files:
        # The filename is client-controlled: keep only its final path
        # component so the write cannot escape the temp directory.
        filename = os.path.basename(file.filename or "")
        if not allowed_file(filename):
            raise HTTPException(status_code=406, detail="File không được hỗ trợ")
        # Only I/O and upload are guarded, so the 401/406 errors are not
        # rewritten into a 500.
        try:
            # Create a temporary directory to hold the file.
            os.makedirs(temp_dir, exist_ok=True)
            file_path = os.path.join(temp_dir, filename)
            # Save the file temporarily.
            with open(file_path, "wb") as buffer:
                shutil.copyfileobj(file.file, buffer)
            # Upload the file to Dropbox.
            cloud_path = f"/{id}/{filename}"
            sf_dropbox.upload_file_fix(file_path, cloud_path, token)
            # NOTE(review): the original comment here says the temporary file
            # is deleted, but no removal call is visible — confirm whether
            # the local copy should be kept.
        except Exception as e:
            raise HTTPException(status_code=500, detail="Sever Error") from e
    return JSONResponse(content={"message": "File đã được tải lên và lưu thành công"}, status_code=200)
@app.post("/uploadfile_dropbox/", dependencies=[Depends(JWTBearer())])
async def upload_files_dropbox(files: List[UploadFile] = File(...), id: str = Header(None)):
    """Save each uploaded file under ``/code/temp/<id>`` and upload it to Dropbox.

    Each file is uploaded to ``/<id>/<filename>`` through
    ``sf_dropbox.upload_file``. Files are handled in order; an unsupported
    file aborts the request, leaving earlier files already uploaded.

    Raises:
        HTTPException: 401 when the ``id`` header is missing, 406 when a
            file's extension is not allowed.
    """
    if id is None:
        raise HTTPException(status_code=401, detail="Yêu cầu không hợp lệ: id không được cung cấp")
    # NOTE(review): ``id`` comes from the client and is used as a path
    # component unchecked — consider validating it.
    temp_dir = f"/code/temp/{id}"
    for file in files:
        # The filename is client-controlled: keep only its final path
        # component so the write cannot escape the temp directory.
        filename = os.path.basename(file.filename or "")
        if not allowed_file(filename):
            raise HTTPException(status_code=406, detail="File không được hỗ trợ")
        # Create a temporary directory to hold the file.
        os.makedirs(temp_dir, exist_ok=True)
        file_path = os.path.join(temp_dir, filename)
        # Save the file temporarily.
        with open(file_path, "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)
        # Upload the file to Dropbox.
        cloud_path = f"/{id}/{filename}"
        sf_dropbox.upload_file(file_path, cloud_path)
    return JSONResponse(content={"message": "Các file đã được tải lên và lưu thành công"}, status_code=200)