# KLTN / store_file_dropbox.py
# ohhhchank3's picture
# Update store_file_dropbox.py
# 502d9ec verified
import dropbox.files
import os
import main as m
import requests,base64
def refresh_token_dropbox():
    """Exchange the Dropbox refresh token for a new access token.

    Credentials are taken from the ``DROPBOX_APP_KEY``, ``DROPBOX_APP_SECRET``
    and ``DROPBOX_REFRESH_TOKEN`` environment variables when they are set.
    Otherwise the values embedded below are used, so existing deployments
    keep working unchanged.

    Returns:
        The access token string, or ``None`` when the JSON response of the
        token endpoint has no ``access_token`` field.

    Raises:
        requests.exceptions.RequestException: On network failure or timeout.
        ValueError: If the response body is not valid JSON.
    """
    # SECURITY: these fallback credentials are committed to source control.
    # They should be rotated and supplied only through the environment.
    app_key = os.environ.get("DROPBOX_APP_KEY", "m79mo7gg7mh8pez")
    app_secret = os.environ.get("DROPBOX_APP_SECRET", "by93yed7as4qlmo")
    refresh_token = os.environ.get(
        "DROPBOX_REFRESH_TOKEN",
        'VG4SP0Ugo4EAAAAAAAAAAbHf7esmak_gieNGvLHxiiAXGfSeKPVXbX1lm28DMz_o',
    )
    url = 'https://api.dropbox.com/oauth2/token'

    # The token endpoint authenticates the app with HTTP Basic auth.
    auth_string = f"{app_key}:{app_secret}"
    base64authorization = base64.b64encode(auth_string.encode()).decode('utf-8')
    headers = {
        'Authorization': f'Basic {base64authorization}',
        'Content-Type': 'application/x-www-form-urlencoded',
    }
    data = {
        'refresh_token': refresh_token,
        'grant_type': 'refresh_token',
    }

    # Without a timeout a stalled connection would block the caller forever.
    response = requests.post(url, headers=headers, data=data, timeout=30)
    response_json = response.json()
    return response_json.get('access_token', None)
# Module-level access token and Dropbox client, created once at import time.
# Functions in this module that reference the global `dbx` use this client.
# NOTE(review): presumably the access token is short-lived, so this client
# may stop working in a long-running process - confirm against Dropbox docs.
TOKEN = refresh_token_dropbox()
dbx=dropbox.Dropbox(TOKEN)
def upload_all_local_files():
    """Upload every regular file in ``local_files`` to the Dropbox root.

    Each file is uploaded to ``/<file name>`` through the module-level
    ``dbx`` client. Sub-directories and other non-file entries are skipped.
    """
    for file_name in os.listdir("local_files"):
        local_path = os.path.join("local_files", file_name)
        # os.listdir also yields directories; open() on one would raise.
        if not os.path.isfile(local_path):
            continue
        with open(local_path, "rb") as f:
            data = f.read()
        dbx.files_upload(data, f"/{file_name}")
def download_all_cloud_files():
    """Download every file in the Dropbox root folder into ``local_files1``.

    Uses the module-level ``dbx`` client. Folder entries are skipped because
    they cannot be downloaded as a file, and every page of the listing is
    walked so large folders are not truncated.
    """
    # files_download_to_file needs the target directory to exist.
    os.makedirs("local_files1", exist_ok=True)

    result = dbx.files_list_folder("")
    while True:
        for entry in result.entries:
            if not isinstance(entry, dropbox.files.FileMetadata):
                continue
            local_path = os.path.join("local_files1", entry.name)
            dbx.files_download_to_file(local_path, f"/{entry.name}")
        # A single listing call may return only part of the folder.
        if not result.has_more:
            break
        result = dbx.files_list_folder_continue(result.cursor)
def delete_file(id, name_file):
    """Delete ``/<id>/<name_file>`` from Dropbox and print the outcome.

    A new access token is requested and a new client is built on each call.
    Dropbox API errors are printed rather than raised.
    """
    file_path = f"/{id}/{name_file}"
    try:
        client = dropbox.Dropbox(refresh_token_dropbox())
        client.files_delete_v2(file_path)
    except dropbox.exceptions.ApiError as err:
        print(f"Lỗi khi xóa file '{file_path}': {err}")
    else:
        print(f"Xóa file '{file_path}' thành công.")
def list_files(id):
    """Return the names of the files stored directly under ``/<id>`` on Dropbox.

    Args:
        id: Name of the Dropbox folder (relative to the root) to list.

    Returns:
        A list of file names; folder entries are excluded. If the Dropbox
        API reports an error, it is printed and the names collected so far
        (possibly none) are returned.
    """
    file_names = []
    try:
        client = dropbox.Dropbox(refresh_token_dropbox())
        # List the folder, following the cursor until every page is read.
        result = client.files_list_folder(f"/{id}")
        while True:
            file_names.extend(
                os.path.basename(entry.path_display)
                for entry in result.entries
                if isinstance(entry, dropbox.files.FileMetadata)
            )
            # A single listing call may return only part of the folder.
            if not result.has_more:
                break
            result = client.files_list_folder_continue(result.cursor)
    except dropbox.exceptions.ApiError as e:
        print(f"Error listing files: {e}")
    return file_names
def upload_file_fix(local_path, cloud_path, token):
    """Upload one local file to ``cloud_path`` on Dropbox and print the result.

    ``token`` is accepted but not used: a new access token is requested on
    each call. Dropbox API errors are printed rather than raised.
    """
    try:
        client = dropbox.Dropbox(refresh_token_dropbox())
        with open(local_path, "rb") as source:
            payload = source.read()
        client.files_upload(payload, cloud_path)
    except dropbox.exceptions.ApiError as err:
        print(f"Error uploading file '{local_path}': {err}")
    else:
        print(f"Uploaded file '{local_path}' to '{cloud_path}'")
def upload_file(local_path, cloud_path):
    """Upload one local file to ``cloud_path`` on Dropbox.

    On success a confirmation line is printed. If the Dropbox API rejects
    the upload, the upload is attempted once more through
    ``upload_file_fix``, which prints any error it encounters.

    Args:
        local_path: Path of the file to read from the local disk.
        cloud_path: Destination path on Dropbox.
    """
    token = None
    try:
        token = refresh_token_dropbox()
        client = dropbox.Dropbox(token)
        with open(local_path, "rb") as f:
            data = f.read()
        client.files_upload(data, cloud_path)
        print(f"Uploaded file '{local_path}' to '{cloud_path}'")
    except dropbox.exceptions.ApiError:
        # upload_file_fix requires all three arguments; calling it with none
        # raised TypeError from inside this handler instead of retrying.
        upload_file_fix(local_path, cloud_path, token)
#download_file("demo1.pdf", os.path.join("local_files1", "demo1.pdf"))
def search_files_starting_with_j():
    """Print the names of root-folder entries whose name starts with "demo".

    Despite the function's name, the prefix that is matched is ``"demo"``.
    The module-level ``dbx`` client is used, and Dropbox API errors are
    printed rather than raised.
    """
    try:
        # List the entries of the root folder.
        listing = dbx.files_list_folder("")
        # Keep only the names that begin with "demo".
        matches = []
        for entry in listing.entries:
            if entry.name.startswith("demo"):
                matches.append(entry.name)
        # Print a header followed by one name per line.
        print("Files starting with 'demo':")
        for name in matches:
            print(name)
    except dropbox.exceptions.ApiError as err:
        print(f"Error searching for files: {err}")
def download_folder(id):
    """Download every file directly under ``/<id>`` into ``/code/temp/<id>``.

    A new access token is requested on each call. Folder entries are
    skipped. Dropbox API errors are printed rather than raised; files
    downloaded before the error remain on disk.

    Args:
        id: Name of the Dropbox folder (relative to the root) to download.
    """
    try:
        client = dropbox.Dropbox(refresh_token_dropbox())
        # Create the local directory if it does not exist yet.
        path = f"/code/temp/{id}"
        os.makedirs(path, exist_ok=True)
        # List the folder, following the cursor until every page is read.
        result = client.files_list_folder(f"/{id}")
        while True:
            # Download each file of the current page.
            for entry in result.entries:
                if not isinstance(entry, dropbox.files.FileMetadata):
                    continue
                cloud_file_path = entry.path_display
                file_name = os.path.basename(cloud_file_path)
                local_file_path = os.path.join(path, file_name)
                client.files_download_to_file(local_file_path, cloud_file_path)
                print(f"Downloaded file '{file_name}' to '{local_file_path}'")
            # A single listing call may return only part of the folder.
            if not result.has_more:
                break
            result = client.files_list_folder_continue(result.cursor)
    except dropbox.exceptions.ApiError as e:
        print(f"Error downloading file '{id}': {e}")
from fastapi import FastAPI, UploadFile, File, HTTPException
def download_file_id(file_name, id):
    """Download ``/<id>/<file_name>`` from Dropbox into ``/code/temp/<id>``.

    Args:
        file_name: Name of the file inside the Dropbox folder.
        id: Name of the Dropbox folder (relative to the root).

    Raises:
        HTTPException: With status 500 when the Dropbox API reports an error.
    """
    try:
        # Request a new token per call, as the other helpers in this module
        # do, instead of relying on the client created at import time.
        client = dropbox.Dropbox(refresh_token_dropbox())
        # Create the local directory if it does not exist yet.
        local_folder_path = f"/code/temp/{id}"
        os.makedirs(local_folder_path, exist_ok=True)
        local_file_path = os.path.join(local_folder_path, file_name)
        # Download first, so a failed request does not leave an empty local file.
        _metadata, response = client.files_download(f"/{id}/{file_name}")
        with open(local_file_path, "wb") as f:
            f.write(response.content)
        print(f"Downloaded file '{file_name}' to '{local_file_path}'")
    except dropbox.exceptions.ApiError as e:
        print(f"Error downloading file '{file_name}': {e}")
        raise HTTPException(status_code=500, detail="Internal Server Error") from e
def search_and_download_file(start_char, id):
    """Download the first file in ``/<id>`` whose name starts with ``start_char``.

    The file is written to ``/code/temp/<id>/<file name>``. If no file
    matches, a message is printed and nothing is downloaded.

    Args:
        start_char: Prefix the file name must start with.
        id: Name of the Dropbox folder (relative to the root) to search.

    Raises:
        HTTPException: With status 500 when the Dropbox API reports an error.
    """
    try:
        # Request a new token per call, as the other helpers in this module
        # do, instead of relying on the client created at import time.
        client = dropbox.Dropbox(refresh_token_dropbox())
        # Walk the folder listing page by page until a matching file is found.
        result = client.files_list_folder(f"/{id}")
        file_name = None
        while True:
            file_name = next(
                (
                    entry.name
                    for entry in result.entries
                    # Folders cannot be downloaded, so only files may match.
                    if isinstance(entry, dropbox.files.FileMetadata)
                    and entry.name.startswith(start_char)
                ),
                None,
            )
            if file_name is not None or not result.has_more:
                break
            result = client.files_list_folder_continue(result.cursor)

        if file_name is None:
            print(f"No file found starting with '{start_char}' in folder '{id}'")
            return

        # Create the local directory if it does not exist yet.
        local_folder_path = f"/code/temp/{id}"
        os.makedirs(local_folder_path, exist_ok=True)
        local_file_path = os.path.join(local_folder_path, file_name)
        # Download first, so a failed request does not leave an empty local file.
        _metadata, response = client.files_download(f"/{id}/{file_name}")
        with open(local_file_path, "wb") as f:
            f.write(response.content)
        print(f"Downloaded file '{file_name}' to '{local_file_path}'")
    except dropbox.exceptions.ApiError as e:
        print(f"Error searching or downloading file: {e}")
        raise HTTPException(status_code=500, detail="Internal Server Error") from e