Spaces:
Sleeping
Sleeping
| try: | |
| import os | |
| import io | |
| import json | |
| import hashlib | |
| from googleapiclient.discovery import build | |
| from googleapiclient.http import MediaIoBaseDownload | |
| from google.auth.transport.requests import Request | |
| from google.oauth2.credentials import Credentials | |
| from tqdm import tqdm | |
| except ImportError as e: | |
| # Se faltarem as bibliotecas necessárias, levantamos Exception | |
| raise Exception( | |
| "Faltam bibliotecas necessárias para o GoogleDriveDownloader. " | |
| "Instale-as com:\n\n" | |
| " pip install google-api-python-client google-auth-httplib2 google-auth-oauthlib tqdm\n\n" | |
| f"Detalhes do erro: {str(e)}" | |
| ) | |
| class GoogleDriveDownloader: | |
| """ | |
| Classe para autenticar e baixar arquivos do Google Drive, | |
| preservando a estrutura de pastas e evitando downloads redundantes. | |
| - Nunca abrirá navegador se não encontrar token válido (apenas levanta exceção). | |
| - Pode ler 'credentials.json' e 'token.json' do disco ou das variáveis de ambiente. | |
| """ | |
| SCOPES = ['https://www.googleapis.com/auth/drive.readonly'] | |
| def __init__(self, chunksize=100 * 1024 * 1024): | |
| """ | |
| :param chunksize: Tamanho (em bytes) de cada chunk ao baixar arquivos. | |
| Ex.: 100MB = 100 * 1024 * 1024. | |
| """ | |
| self.chunksize = chunksize | |
| self.service = None | |
| def _get_credentials_from_env_or_file(self): | |
| """ | |
| Verifica se existem variáveis de ambiente para 'CREDENTIALS' e 'TOKEN'. | |
| Caso contrário, tenta usar arquivos locais 'credentials.json' e 'token.json'. | |
| Se o token local/ambiente não existir ou for inválido (sem refresh), | |
| levanta exceção (não abrimos navegador neste fluxo). | |
| """ | |
| print("Procurando credentials na variavel de ambiente...") | |
| env_credentials = os.environ.get("CREDENTIALS") # Conteúdo JSON do client secrets | |
| env_token = os.environ.get("TOKEN") # Conteúdo JSON do token | |
| creds = None | |
| # 1) Carregar credenciais do ambiente, se houver | |
| if env_credentials: | |
| try: | |
| creds_json = json.loads(env_credentials) | |
| except json.JSONDecodeError: | |
| raise ValueError("A variável de ambiente 'CREDENTIALS' não contém JSON válido.") | |
| # Validamos o "client_id" para garantir que seja um JSON de credenciais mesmo | |
| client_id = ( | |
| creds_json.get("installed", {}).get("client_id") or | |
| creds_json.get("web", {}).get("client_id") | |
| ) | |
| if not client_id: | |
| raise ValueError("Credenciais em memória não parecem válidas. Faltam campos 'client_id'.") | |
| else: | |
| # Se não há credenciais no ambiente, tentamos local | |
| if not os.path.exists("credentials.json"): | |
| raise FileNotFoundError( | |
| "Nenhuma credencial encontrada em ambiente ou no arquivo 'credentials.json'." | |
| ) | |
| print("Variavel não encontrada, usando credentials.json") | |
| with open("credentials.json", 'r', encoding='utf-8') as f: | |
| creds_json = json.load(f) | |
| print("Procurando tokens na variavel de ambiente...") | |
| token_data = None | |
| if env_token: | |
| try: | |
| token_data = json.loads(env_token) | |
| except json.JSONDecodeError: | |
| raise ValueError("A variável de ambiente 'TOKEN' não contém JSON válido.") | |
| else: | |
| # Se não há token no ambiente, checamos arquivo local | |
| if os.path.exists("token.json"): | |
| print("Variavel não encontrada, usando token.json") | |
| with open("token.json", 'r', encoding='utf-8') as tf: | |
| token_data = json.load(tf) | |
| else: | |
| raise FileNotFoundError( | |
| "Não há token no ambiente nem em 'token.json'. " | |
| "Não é possível autenticar sem abrir navegador, então abortando." | |
| ) | |
| # 3) Criar credenciais a partir do token_data | |
| creds = Credentials.from_authorized_user_info(token_data, self.SCOPES) | |
| # 4) Se expirou, tenta refresh | |
| if not creds.valid: | |
| if creds.expired and creds.refresh_token: | |
| creds.refresh(Request()) | |
| # Salva token atualizado, se estiver usando arquivo local | |
| if not env_token: # só sobrescreve se está lendo do disco | |
| with open("token.json", 'w', encoding='utf-8') as token_file: | |
| token_file.write(creds.to_json()) | |
| else: | |
| # Se não é válido e não há refresh token, não temos como renovar sem navegador | |
| raise RuntimeError( | |
| "As credenciais de token são inválidas/expiradas e sem refresh token. " | |
| "Não é possível abrir navegador neste fluxo, abortando." | |
| ) | |
| return creds | |
| def authenticate(self): | |
| """Cria e armazena o serviço do Drive API nesta instância.""" | |
| creds = self._get_credentials_from_env_or_file() | |
| self.service = build("drive", "v3", credentials=creds) | |
| def _list_files_in_folder(self, folder_id): | |
| """Retorna a lista de itens (arquivos/pastas) diretamente em 'folder_id'.""" | |
| items = [] | |
| page_token = None | |
| query = f"'{folder_id}' in parents and trashed=false" | |
| while True: | |
| response = self.service.files().list( | |
| q=query, | |
| spaces='drive', | |
| fields='nextPageToken, files(id, name, mimeType)', | |
| pageToken=page_token | |
| ).execute() | |
| items.extend(response.get('files', [])) | |
| page_token = response.get('nextPageToken', None) | |
| if not page_token: | |
| break | |
| return items | |
| def _get_file_metadata(self, file_id): | |
| """ | |
| Retorna (size, md5Checksum, modifiedTime) de um arquivo no Drive. | |
| Se algum campo não existir, retorna valor padrão. | |
| """ | |
| data = self.service.files().get( | |
| fileId=file_id, | |
| fields='size, md5Checksum, modifiedTime' | |
| ).execute() | |
| size = int(data.get('size', 0)) | |
| md5 = data.get('md5Checksum', '') | |
| modified_time = data.get('modifiedTime', '') | |
| return size, md5, modified_time | |
| def _get_all_items_recursively(self, folder_id, parent_path=''): | |
| """ | |
| Percorre recursivamente a pasta (folder_id) no Drive, | |
| retornando lista de dicts (id, name, mimeType, path). | |
| """ | |
| results = [] | |
| items = self._list_files_in_folder(folder_id) | |
| for item in items: | |
| current_path = os.path.join(parent_path, item['name']) | |
| if item['mimeType'] == 'application/vnd.google-apps.folder': | |
| results.append({ | |
| 'id': item['id'], | |
| 'name': item['name'], | |
| 'mimeType': item['mimeType'], | |
| 'path': current_path | |
| }) | |
| sub = self._get_all_items_recursively(item['id'], current_path) | |
| results.extend(sub) | |
| else: | |
| results.append({ | |
| 'id': item['id'], | |
| 'name': item['name'], | |
| 'mimeType': item['mimeType'], | |
| 'path': parent_path | |
| }) | |
| return results | |
| def _needs_download(self, local_folder, file_info): | |
| """ | |
| Verifica se o arquivo em 'file_info' precisa ser baixado. | |
| - Se não existir localmente, retorna True. | |
| - Se existir, compara tamanho e MD5 (quando disponível). | |
| - Retorna True se for diferente, False se for idêntico. | |
| """ | |
| file_id = file_info['id'] | |
| file_name = file_info['name'] | |
| rel_path = file_info['path'] | |
| drive_size, drive_md5, _ = self._get_file_metadata(file_id) | |
| full_local_path = os.path.join(local_folder, rel_path, file_name) | |
| if not os.path.exists(full_local_path): | |
| return True # Não existe localmente | |
| local_size = os.path.getsize(full_local_path) | |
| if local_size != drive_size: | |
| return True | |
| if drive_md5: | |
| with open(full_local_path, 'rb') as f: | |
| local_md5 = hashlib.md5(f.read()).hexdigest() | |
| if local_md5 != drive_md5: | |
| return True | |
| return False | |
| def _download_single_file(self, file_id, file_name, relative_path, progress_bar): | |
| """ | |
| Faz download de um único arquivo do Drive, atualizando a barra de progresso global. | |
| """ | |
| # Como fizemos 'os.chdir(local_folder)' antes, 'relative_path' pode ser vazio. | |
| # Então concatenamos sem o local_folder: | |
| file_path = os.path.join(relative_path, file_name) | |
| # Se o path do diretório for vazio, cai no '.' para evitar WinError 3 | |
| dir_name = os.path.dirname(file_path) or '.' | |
| os.makedirs(dir_name, exist_ok=True) | |
| request = self.service.files().get_media(fileId=file_id) | |
| with io.FileIO(file_path, 'wb') as fh: | |
| downloader = MediaIoBaseDownload(fh, request, chunksize=self.chunksize) | |
| done = False | |
| previous_progress = 0 | |
| while not done: | |
| status, done = downloader.next_chunk() | |
| if status: | |
| current_progress = status.resumable_progress | |
| chunk_downloaded = current_progress - previous_progress | |
| previous_progress = current_progress | |
| progress_bar.update(chunk_downloaded) | |
| def download_from_folder(self, drive_folder_id: str, local_folder: str): | |
| """ | |
| Método principal para: | |
| 1. Autenticar sem abrir navegador (usa token local/ambiente). | |
| 2. Exibir "Iniciando verificação de documentos". | |
| 3. Listar recursivamente arquivos da pasta do Drive. | |
| 4. Verificar quais precisam de download. | |
| 5. Baixar apenas o necessário, com barra de progresso única. | |
| """ | |
| print("Iniciando verificação de documentos") | |
| if not self.service: | |
| self.authenticate() | |
| print("Buscando lista de arquivos no Drive...") | |
| all_items = self._get_all_items_recursively(drive_folder_id) | |
| # Filtra apenas arquivos (exclui subpastas) | |
| all_files = [f for f in all_items if f['mimeType'] != 'application/vnd.google-apps.folder'] | |
| print("Verificando quais arquivos precisam ser baixados...") | |
| files_to_download = [] | |
| total_size_to_download = 0 | |
| for info in all_files: | |
| if self._needs_download(local_folder, info): | |
| drive_size, _, _ = self._get_file_metadata(info['id']) | |
| total_size_to_download += drive_size | |
| files_to_download.append(info) | |
| if not files_to_download: | |
| print("Nenhum arquivo novo ou atualizado. Tudo sincronizado!") | |
| return | |
| print("Calculando total de bytes a serem baixados...") | |
| # Ajusta a pasta local e cria se necessário | |
| os.makedirs(local_folder, exist_ok=True) | |
| # Muda diretório de trabalho para simplificar criação de subpastas | |
| old_cwd = os.getcwd() | |
| os.chdir(local_folder) | |
| # Cria a barra de progresso global | |
| progress_bar = tqdm( | |
| total=total_size_to_download, | |
| unit='B', | |
| unit_scale=True, | |
| desc='Baixando arquivos' | |
| ) | |
| # Baixa só o que precisa | |
| for file_info in files_to_download: | |
| self._download_single_file( | |
| file_id=file_info['id'], | |
| file_name=file_info['name'], | |
| relative_path=file_info['path'], | |
| progress_bar=progress_bar | |
| ) | |
| progress_bar.close() | |
| os.chdir(old_cwd) | |
| print("Download concluído com sucesso!") | |