Spaces:
Sleeping
Sleeping
import dropbox.files | |
import os | |
import main as m | |
import requests,base64 | |
def refresh_token_dropbox():
    """Exchange the Dropbox refresh token for an access token.

    Posts a ``refresh_token`` grant to the Dropbox OAuth2 token endpoint,
    authenticating with an HTTP Basic header built from the app key and
    app secret.

    Credentials are taken from the ``DROPBOX_APP_KEY``,
    ``DROPBOX_APP_SECRET`` and ``DROPBOX_REFRESH_TOKEN`` environment
    variables when they are set; otherwise the values that were previously
    embedded in this module are used, so existing deployments keep working.

    Returns:
        The ``access_token`` value of the JSON response, or ``None`` when
        the response body has no such key.

    Raises:
        requests.exceptions.RequestException: If the HTTP request fails or
            does not complete within the timeout.
    """
    # NOTE(security): the fallback literals below are live credentials that
    # are checked into the source. They should be rotated and supplied only
    # through the environment; they are kept here for backward compatibility.
    app_key = os.environ.get("DROPBOX_APP_KEY", "m79mo7gg7mh8pez")
    app_secret = os.environ.get("DROPBOX_APP_SECRET", "by93yed7as4qlmo")
    refresh_token = os.environ.get(
        "DROPBOX_REFRESH_TOKEN",
        'VG4SP0Ugo4EAAAAAAAAAAbHf7esmak_gieNGvLHxiiAXGfSeKPVXbX1lm28DMz_o',
    )
    url = 'https://api.dropbox.com/oauth2/token'

    auth_string = f"{app_key}:{app_secret}"
    base64authorization = base64.b64encode(auth_string.encode()).decode('utf-8')
    headers = {
        'Authorization': f'Basic {base64authorization}',
        'Content-Type': 'application/x-www-form-urlencoded',
    }
    data = {
        'refresh_token': refresh_token,
        'grant_type': 'refresh_token',
    }

    # Without a timeout, requests waits indefinitely on a stalled connection,
    # which would block every caller of this helper.
    response = requests.post(url, headers=headers, data=data, timeout=30)
    return response.json().get('access_token')
# Module-level Dropbox client, created once at import time from an access
# token obtained through refresh_token_dropbox().
TOKEN = refresh_token_dropbox()
dbx = dropbox.Dropbox(oauth2_access_token=TOKEN)
def upload_all_local_files():
    """Upload every regular file in ``local_files`` to the Dropbox root.

    Each file is read fully into memory and uploaded to ``/<file name>``.
    Sub-directories of ``local_files`` are skipped.

    A Dropbox client is built from a freshly refreshed access token, as the
    other per-call helpers in this module do, instead of relying on the
    client created at import time.
    """
    client = dropbox.Dropbox(refresh_token_dropbox())
    for file_name in os.listdir("local_files"):
        local_path = os.path.join("local_files", file_name)
        # os.listdir also yields directories; open() on one would raise.
        if not os.path.isfile(local_path):
            continue
        with open(local_path, "rb") as f:
            data = f.read()
        client.files_upload(data, f"/{file_name}")
def download_all_cloud_files():
    """Download every file in the Dropbox root folder into ``local_files1``.

    The local directory is created if it does not exist. Only file entries
    are downloaded; folder entries in the listing are skipped. All pages of
    the folder listing are followed.

    A Dropbox client is built from a freshly refreshed access token, as the
    other per-call helpers in this module do, instead of relying on the
    client created at import time.
    """
    client = dropbox.Dropbox(refresh_token_dropbox())
    os.makedirs("local_files1", exist_ok=True)

    result = client.files_list_folder("")
    while True:
        for entry in result.entries:
            # Folder entries cannot be downloaded as files.
            if isinstance(entry, dropbox.files.FileMetadata):
                client.files_download_to_file(
                    os.path.join("local_files1", entry.name),
                    f"/{entry.name}",
                )
        # A single listing call returns one page; keep going until done.
        if not result.has_more:
            break
        result = client.files_list_folder_continue(result.cursor)
def delete_file(id, name_file):
    """Delete ``/<id>/<name_file>`` from Dropbox and print the outcome.

    A new access token and client are created for the call. A Dropbox
    ``ApiError`` is caught and reported with a printed message rather than
    propagated.

    Args:
        id: Name of the Dropbox folder (directly under the root).
        name_file: Name of the file inside that folder.
    """
    try:
        access_token = refresh_token_dropbox()
        client = dropbox.Dropbox(access_token)
        file_path = f"/{id}/{name_file}"
        client.files_delete_v2(file_path)
    except dropbox.exceptions.ApiError as e:
        print(f"Lỗi khi xóa file '{file_path}': {e}")
    else:
        print(f"Xóa file '{file_path}' thành công.")
def list_files(id):
    """Return the names of the files stored in the Dropbox folder ``/<id>``.

    Only file entries are reported; sub-folders are ignored. All pages of
    the folder listing are followed, so folders with many entries are not
    truncated to the first page.

    Args:
        id: Name of the Dropbox folder (directly under the root).

    Returns:
        A list of file base names. If a Dropbox ``ApiError`` occurs, the
        error is printed and the names collected so far (possibly none) are
        returned.
    """
    file_names = []
    try:
        client = dropbox.Dropbox(refresh_token_dropbox())
        # List the entries of the folder on Dropbox.
        result = client.files_list_folder(f"/{id}")
        while True:
            # Keep the base name of each file entry.
            file_names.extend(
                os.path.basename(entry.path_display)
                for entry in result.entries
                if isinstance(entry, dropbox.files.FileMetadata)
            )
            if not result.has_more:
                break
            result = client.files_list_folder_continue(result.cursor)
    except dropbox.exceptions.ApiError as e:
        print(f"Error listing files: {e}")
    return file_names
def upload_file_fix(local_path, cloud_path, token):
    """Upload a local file to Dropbox, printing the result.

    The file is read fully into memory and sent with a client built from a
    newly refreshed access token. A Dropbox ``ApiError`` is caught and
    printed instead of being raised.

    Args:
        local_path: Path of the file to read.
        cloud_path: Destination path on Dropbox.
        token: Not used by the body; a fresh token is always requested.
    """
    try:
        client = dropbox.Dropbox(refresh_token_dropbox())
        with open(local_path, "rb") as source:
            payload = source.read()
        client.files_upload(payload, cloud_path)
    except dropbox.exceptions.ApiError as e:
        print(f"Error uploading file '{local_path}': {e}")
    else:
        print(f"Uploaded file '{local_path}' to '{cloud_path}'")
def upload_file(local_path, cloud_path):
    """Upload a local file to Dropbox, retrying once on an API error.

    The file is read fully into memory and uploaded with a client built
    from a newly refreshed access token. If Dropbox reports an ``ApiError``,
    the upload is attempted a second time through ``upload_file_fix``, which
    prints any error from that second attempt.

    Args:
        local_path: Path of the file to read.
        cloud_path: Destination path on Dropbox.
    """
    token = refresh_token_dropbox()
    try:
        client = dropbox.Dropbox(token)
        with open(local_path, "rb") as f:
            data = f.read()
        client.files_upload(data, cloud_path)
        print(f"Uploaded file '{local_path}' to '{cloud_path}'")
    except dropbox.exceptions.ApiError:
        # The fallback takes three required arguments; calling it with none
        # raised TypeError instead of retrying the upload.
        upload_file_fix(local_path, cloud_path, token)
#download_file("demo1.pdf", os.path.join("local_files1", "demo1.pdf")) | |
def search_files_starting_with_j():
    """Print the names of root-folder entries that start with ``"demo"``.

    Despite the function name, the prefix that is matched is ``"demo"``.
    The listing is taken from the module-level ``dbx`` client. A Dropbox
    ``ApiError`` is caught and printed.
    """
    try:
        root_listing = dbx.files_list_folder("")
    except dropbox.exceptions.ApiError as e:
        print(f"Error searching for files: {e}")
        return

    # Collect the matching names first, then print them under one header.
    matching_names = []
    for entry in root_listing.entries:
        if entry.name.startswith("demo"):
            matching_names.append(entry.name)

    print("Files starting with 'demo':")
    for name in matching_names:
        print(name)
def download_folder(id):
    """Download every file of the Dropbox folder ``/<id>`` to ``/code/temp/<id>``.

    The local directory is created if needed. Only file entries are
    downloaded; sub-folders are skipped. All pages of the folder listing are
    followed. A Dropbox ``ApiError`` is caught and printed, which stops the
    remaining downloads.

    Args:
        id: Name of the Dropbox folder (directly under the root); also used
            as the name of the local sub-directory.
    """
    try:
        client = dropbox.Dropbox(refresh_token_dropbox())
        # Create the local directory if it does not exist yet.
        path = f"/code/temp/{id}"
        os.makedirs(path, exist_ok=True)
        # List the entries of the folder on Dropbox.
        result = client.files_list_folder(f"/{id}")
        while True:
            # Download each file entry.
            for entry in result.entries:
                if isinstance(entry, dropbox.files.FileMetadata):
                    cloud_file_path = entry.path_display
                    file_name = os.path.basename(cloud_file_path)
                    local_file_path = os.path.join(path, file_name)
                    client.files_download_to_file(local_file_path, cloud_file_path)
                    print(f"Downloaded file '{file_name}' to '{local_file_path}'")
            # A single listing call returns one page; keep going until done.
            if not result.has_more:
                break
            result = client.files_list_folder_continue(result.cursor)
    except dropbox.exceptions.ApiError as e:
        print(f"Error downloading file '{id}': {e}")
from fastapi import FastAPI, UploadFile, File, HTTPException | |
def download_file_id(file_name, id):
    """Download ``/<id>/<file_name>`` from Dropbox into ``/code/temp/<id>``.

    The local directory is created if needed. A Dropbox client is built
    from a freshly refreshed access token, as the other per-call helpers in
    this module do, instead of relying on the client created at import time.

    Args:
        file_name: Name of the file inside the Dropbox folder.
        id: Name of the Dropbox folder (directly under the root); also used
            as the name of the local sub-directory.

    Raises:
        HTTPException: With status 500 when Dropbox reports an ``ApiError``.
    """
    try:
        client = dropbox.Dropbox(refresh_token_dropbox())
        # Create the local directory if it does not exist yet.
        local_folder_path = f"/code/temp/{id}"
        os.makedirs(local_folder_path, exist_ok=True)
        # Download the file from Dropbox into the local directory. Letting
        # the SDK write the file avoids opening the destination before the
        # download has succeeded, which left an empty file behind on errors.
        local_file_path = os.path.join(local_folder_path, file_name)
        client.files_download_to_file(local_file_path, f"/{id}/{file_name}")
        print(f"Downloaded file '{file_name}' to '{local_file_path}'")
    except dropbox.exceptions.ApiError as e:
        print(f"Error downloading file '{file_name}': {e}")
        raise HTTPException(status_code=500, detail="Internal Server Error") from e
def search_and_download_file(start_char, id):
    """Download the first file in ``/<id>`` whose name starts with ``start_char``.

    The folder listing is scanned page by page and only file entries are
    considered. The first match is saved under ``/code/temp/<id>``, which is
    created if needed. When nothing matches, a message is printed and the
    function returns without downloading.

    A Dropbox client is built from a freshly refreshed access token, as the
    other per-call helpers in this module do, instead of relying on the
    client created at import time.

    Args:
        start_char: Prefix the file name must start with.
        id: Name of the Dropbox folder (directly under the root); also used
            as the name of the local sub-directory.

    Raises:
        HTTPException: With status 500 when Dropbox reports an ``ApiError``.
    """
    try:
        client = dropbox.Dropbox(refresh_token_dropbox())
        # List the entries of the folder on Dropbox.
        result = client.files_list_folder(f"/{id}")
        while True:
            # Pick the first file on this page that starts with the prefix.
            # Folder entries are ignored because they cannot be downloaded.
            file_name = next(
                (
                    entry.name
                    for entry in result.entries
                    if isinstance(entry, dropbox.files.FileMetadata)
                    and entry.name.startswith(start_char)
                ),
                None,
            )
            if file_name is not None or not result.has_more:
                break
            result = client.files_list_folder_continue(result.cursor)

        if file_name is None:
            print(f"No file found starting with '{start_char}' in folder '{id}'")
            return

        # Download the file from Dropbox into the local directory. Letting
        # the SDK write the file avoids opening the destination before the
        # download has succeeded, which left an empty file behind on errors.
        local_folder_path = f"/code/temp/{id}"
        os.makedirs(local_folder_path, exist_ok=True)
        local_file_path = os.path.join(local_folder_path, file_name)
        client.files_download_to_file(local_file_path, f"/{id}/{file_name}")
        print(f"Downloaded file '{file_name}' to '{local_file_path}'")
    except dropbox.exceptions.ApiError as e:
        print(f"Error searching or downloading file: {e}")
        raise HTTPException(status_code=500, detail="Internal Server Error") from e