Spaces:
Running
Running
try: | |
import os | |
import io | |
import json | |
import hashlib | |
from googleapiclient.discovery import build | |
from googleapiclient.http import MediaIoBaseDownload | |
from google.auth.transport.requests import Request | |
from google.oauth2.credentials import Credentials | |
from tqdm import tqdm | |
except ImportError as e: | |
# Se faltarem as bibliotecas necessárias, levantamos Exception | |
raise Exception( | |
"Faltam bibliotecas necessárias para o GoogleDriveDownloader. " | |
"Instale-as com:\n\n" | |
" pip install google-api-python-client google-auth-httplib2 google-auth-oauthlib tqdm\n\n" | |
f"Detalhes do erro: {str(e)}" | |
) | |
class GoogleDriveDownloader: | |
""" | |
Classe para autenticar e baixar arquivos do Google Drive, | |
preservando a estrutura de pastas e evitando downloads redundantes. | |
- Nunca abrirá navegador se não encontrar token válido (apenas levanta exceção). | |
- Pode ler 'credentials.json' e 'token.json' do disco ou das variáveis de ambiente. | |
""" | |
SCOPES = ['https://www.googleapis.com/auth/drive.readonly'] | |
def __init__(self, chunksize=100 * 1024 * 1024): | |
""" | |
:param chunksize: Tamanho (em bytes) de cada chunk ao baixar arquivos. | |
Ex.: 100MB = 100 * 1024 * 1024. | |
""" | |
self.chunksize = chunksize | |
self.service = None | |
def _get_credentials_from_env_or_file(self): | |
""" | |
Verifica se existem variáveis de ambiente para 'CREDENTIALS' e 'TOKEN'. | |
Caso contrário, tenta usar arquivos locais 'credentials.json' e 'token.json'. | |
Se o token local/ambiente não existir ou for inválido (sem refresh), | |
levanta exceção (não abrimos navegador neste fluxo). | |
""" | |
print("Procurando credentials na variavel de ambiente...") | |
env_credentials = os.environ.get("CREDENTIALS") # Conteúdo JSON do client secrets | |
env_token = os.environ.get("TOKEN") # Conteúdo JSON do token | |
creds = None | |
# 1) Carregar credenciais do ambiente, se houver | |
if env_credentials: | |
try: | |
creds_json = json.loads(env_credentials) | |
except json.JSONDecodeError: | |
raise ValueError("A variável de ambiente 'CREDENTIALS' não contém JSON válido.") | |
# Validamos o "client_id" para garantir que seja um JSON de credenciais mesmo | |
client_id = ( | |
creds_json.get("installed", {}).get("client_id") or | |
creds_json.get("web", {}).get("client_id") | |
) | |
if not client_id: | |
raise ValueError("Credenciais em memória não parecem válidas. Faltam campos 'client_id'.") | |
else: | |
# Se não há credenciais no ambiente, tentamos local | |
if not os.path.exists("credentials.json"): | |
raise FileNotFoundError( | |
"Nenhuma credencial encontrada em ambiente ou no arquivo 'credentials.json'." | |
) | |
print("Variavel não encontrada, usando credentials.json") | |
with open("credentials.json", 'r', encoding='utf-8') as f: | |
creds_json = json.load(f) | |
print("Procurando tokens na variavel de ambiente...") | |
token_data = None | |
if env_token: | |
try: | |
token_data = json.loads(env_token) | |
except json.JSONDecodeError: | |
raise ValueError("A variável de ambiente 'TOKEN' não contém JSON válido.") | |
else: | |
# Se não há token no ambiente, checamos arquivo local | |
if os.path.exists("token.json"): | |
print("Variavel não encontrada, usando token.json") | |
with open("token.json", 'r', encoding='utf-8') as tf: | |
token_data = json.load(tf) | |
else: | |
raise FileNotFoundError( | |
"Não há token no ambiente nem em 'token.json'. " | |
"Não é possível autenticar sem abrir navegador, então abortando." | |
) | |
# 3) Criar credenciais a partir do token_data | |
creds = Credentials.from_authorized_user_info(token_data, self.SCOPES) | |
# 4) Se expirou, tenta refresh | |
if not creds.valid: | |
if creds.expired and creds.refresh_token: | |
creds.refresh(Request()) | |
# Salva token atualizado, se estiver usando arquivo local | |
if not env_token: # só sobrescreve se está lendo do disco | |
with open("token.json", 'w', encoding='utf-8') as token_file: | |
token_file.write(creds.to_json()) | |
else: | |
# Se não é válido e não há refresh token, não temos como renovar sem navegador | |
raise RuntimeError( | |
"As credenciais de token são inválidas/expiradas e sem refresh token. " | |
"Não é possível abrir navegador neste fluxo, abortando." | |
) | |
return creds | |
def authenticate(self): | |
"""Cria e armazena o serviço do Drive API nesta instância.""" | |
creds = self._get_credentials_from_env_or_file() | |
self.service = build("drive", "v3", credentials=creds) | |
def _list_files_in_folder(self, folder_id): | |
"""Retorna a lista de itens (arquivos/pastas) diretamente em 'folder_id'.""" | |
items = [] | |
page_token = None | |
query = f"'{folder_id}' in parents and trashed=false" | |
while True: | |
response = self.service.files().list( | |
q=query, | |
spaces='drive', | |
fields='nextPageToken, files(id, name, mimeType)', | |
pageToken=page_token | |
).execute() | |
items.extend(response.get('files', [])) | |
page_token = response.get('nextPageToken', None) | |
if not page_token: | |
break | |
return items | |
def _get_file_metadata(self, file_id): | |
""" | |
Retorna (size, md5Checksum, modifiedTime) de um arquivo no Drive. | |
Se algum campo não existir, retorna valor padrão. | |
""" | |
data = self.service.files().get( | |
fileId=file_id, | |
fields='size, md5Checksum, modifiedTime' | |
).execute() | |
size = int(data.get('size', 0)) | |
md5 = data.get('md5Checksum', '') | |
modified_time = data.get('modifiedTime', '') | |
return size, md5, modified_time | |
def _get_all_items_recursively(self, folder_id, parent_path=''): | |
""" | |
Percorre recursivamente a pasta (folder_id) no Drive, | |
retornando lista de dicts (id, name, mimeType, path). | |
""" | |
results = [] | |
items = self._list_files_in_folder(folder_id) | |
for item in items: | |
current_path = os.path.join(parent_path, item['name']) | |
if item['mimeType'] == 'application/vnd.google-apps.folder': | |
results.append({ | |
'id': item['id'], | |
'name': item['name'], | |
'mimeType': item['mimeType'], | |
'path': current_path | |
}) | |
sub = self._get_all_items_recursively(item['id'], current_path) | |
results.extend(sub) | |
else: | |
results.append({ | |
'id': item['id'], | |
'name': item['name'], | |
'mimeType': item['mimeType'], | |
'path': parent_path | |
}) | |
return results | |
def _needs_download(self, local_folder, file_info): | |
""" | |
Verifica se o arquivo em 'file_info' precisa ser baixado. | |
- Se não existir localmente, retorna True. | |
- Se existir, compara tamanho e MD5 (quando disponível). | |
- Retorna True se for diferente, False se for idêntico. | |
""" | |
file_id = file_info['id'] | |
file_name = file_info['name'] | |
rel_path = file_info['path'] | |
drive_size, drive_md5, _ = self._get_file_metadata(file_id) | |
full_local_path = os.path.join(local_folder, rel_path, file_name) | |
if not os.path.exists(full_local_path): | |
return True # Não existe localmente | |
local_size = os.path.getsize(full_local_path) | |
if local_size != drive_size: | |
return True | |
if drive_md5: | |
with open(full_local_path, 'rb') as f: | |
local_md5 = hashlib.md5(f.read()).hexdigest() | |
if local_md5 != drive_md5: | |
return True | |
return False | |
def _download_single_file(self, file_id, file_name, relative_path, progress_bar): | |
""" | |
Faz download de um único arquivo do Drive, atualizando a barra de progresso global. | |
""" | |
# Como fizemos 'os.chdir(local_folder)' antes, 'relative_path' pode ser vazio. | |
# Então concatenamos sem o local_folder: | |
file_path = os.path.join(relative_path, file_name) | |
# Se o path do diretório for vazio, cai no '.' para evitar WinError 3 | |
dir_name = os.path.dirname(file_path) or '.' | |
os.makedirs(dir_name, exist_ok=True) | |
request = self.service.files().get_media(fileId=file_id) | |
with io.FileIO(file_path, 'wb') as fh: | |
downloader = MediaIoBaseDownload(fh, request, chunksize=self.chunksize) | |
done = False | |
previous_progress = 0 | |
while not done: | |
status, done = downloader.next_chunk() | |
if status: | |
current_progress = status.resumable_progress | |
chunk_downloaded = current_progress - previous_progress | |
previous_progress = current_progress | |
progress_bar.update(chunk_downloaded) | |
def download_from_folder(self, drive_folder_id: str, local_folder: str): | |
""" | |
Método principal para: | |
1. Autenticar sem abrir navegador (usa token local/ambiente). | |
2. Exibir "Iniciando verificação de documentos". | |
3. Listar recursivamente arquivos da pasta do Drive. | |
4. Verificar quais precisam de download. | |
5. Baixar apenas o necessário, com barra de progresso única. | |
""" | |
print("Iniciando verificação de documentos") | |
if not self.service: | |
self.authenticate() | |
print("Buscando lista de arquivos no Drive...") | |
all_items = self._get_all_items_recursively(drive_folder_id) | |
# Filtra apenas arquivos (exclui subpastas) | |
all_files = [f for f in all_items if f['mimeType'] != 'application/vnd.google-apps.folder'] | |
print("Verificando quais arquivos precisam ser baixados...") | |
files_to_download = [] | |
total_size_to_download = 0 | |
for info in all_files: | |
if self._needs_download(local_folder, info): | |
drive_size, _, _ = self._get_file_metadata(info['id']) | |
total_size_to_download += drive_size | |
files_to_download.append(info) | |
if not files_to_download: | |
print("Nenhum arquivo novo ou atualizado. Tudo sincronizado!") | |
return | |
print("Calculando total de bytes a serem baixados...") | |
# Ajusta a pasta local e cria se necessário | |
os.makedirs(local_folder, exist_ok=True) | |
# Muda diretório de trabalho para simplificar criação de subpastas | |
old_cwd = os.getcwd() | |
os.chdir(local_folder) | |
# Cria a barra de progresso global | |
progress_bar = tqdm( | |
total=total_size_to_download, | |
unit='B', | |
unit_scale=True, | |
desc='Baixando arquivos' | |
) | |
# Baixa só o que precisa | |
for file_info in files_to_download: | |
self._download_single_file( | |
file_id=file_info['id'], | |
file_name=file_info['name'], | |
relative_path=file_info['path'], | |
progress_bar=progress_bar | |
) | |
progress_bar.close() | |
os.chdir(old_cwd) | |
print("Download concluído com sucesso!") | |