# Google Drive helper module: OAuth authentication plus upload/download
# and folder-maintenance utilities for exports and business-card images.
# Stdlib imports, then Google API client libraries.
import os
import pickle
import base64
from google.auth.transport.requests import Request
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.http import MediaFileUpload, MediaIoBaseUpload, MediaIoBaseDownload
import io
from pathlib import Path  # NOTE(review): appears unused in this file -- confirm before removing
import logging
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# --- CONFIGURATION --- |
# Get credentials from environment variables (may be None when unset;
# get_drive_service() validates them before starting the OAuth flow).
CLIENT_ID = os.getenv("GOOGLE_CLIENT_ID")
CLIENT_SECRET = os.getenv("GOOGLE_CLIENT_SECRET")
# Google Drive folder IDs (hard-coded targets in the project's Drive)
EXPORTS_FOLDER_ID = "1k5iP4egzLrGJwnHkMhxt9bAkaCiieojO"  # For Excel exports
IMAGES_FOLDER_ID = "1gd280IqcAzpAFTPeYsZjoBUOU9S7Zx3c"  # For business card images
# Scopes define the level of access you are requesting.
# drive.file grants access only to files this app creates or opens.
SCOPES = ['https://www.googleapis.com/auth/drive.file']
# On-disk cache of the pickled OAuth credentials object.
TOKEN_PICKLE_FILE = 'token.pickle'
def get_drive_service():
    """Authenticate with Google and return a Drive v3 service object.

    Credential resolution order:
      1. If ``token.pickle`` is missing, recreate it from the
         GOOGLE_TOKEN_BASE64 environment variable (deployment environments
         without a persisted file).
      2. Load cached credentials from ``token.pickle``.
      3. Refresh expired credentials, or run the interactive local-server
         OAuth flow as a last resort.

    Returns:
        googleapiclient Resource: an authenticated Drive v3 service client.

    Raises:
        ValueError: if the OAuth flow is required but GOOGLE_CLIENT_ID /
            GOOGLE_CLIENT_SECRET are not set.
    """
    creds = None
    # --- NEW CODE FOR DEPLOYMENT ENVIRONMENTS ---
    # If token file doesn't exist, try to create it from environment variable
    if not os.path.exists(TOKEN_PICKLE_FILE):
        encoded_token = os.environ.get('GOOGLE_TOKEN_BASE64')
        if encoded_token:
            logger.info("Found token in environment variable. Recreating token.pickle file.")
            try:
                decoded_token = base64.b64decode(encoded_token)
                with open(TOKEN_PICKLE_FILE, "wb") as token_file:
                    token_file.write(decoded_token)
                logger.info("Successfully recreated token.pickle from environment variable")
            except Exception as e:
                # Best effort: on failure we fall through to the normal flow below.
                logger.error(f"Failed to decode token from environment variable: {e}")
    # --- END OF NEW CODE ---
    # The file token.pickle stores the user's access and refresh tokens.
    # NOTE(review): pickle.load on a file reconstructed from an env var is
    # only safe if that env var is trusted -- confirm deployment controls.
    if os.path.exists(TOKEN_PICKLE_FILE):
        with open(TOKEN_PICKLE_FILE, 'rb') as token:
            creds = pickle.load(token)
    # If there are no (valid) credentials available, let the user log in.
    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            logger.info("Refreshing expired credentials")
            # NOTE(review): refreshed credentials are not re-saved to
            # token.pickle here; only brand-new credentials are persisted.
            creds.refresh(Request())
        else:
            if not CLIENT_ID or not CLIENT_SECRET:
                raise ValueError("GOOGLE_CLIENT_ID and GOOGLE_CLIENT_SECRET environment variables are required")
            logger.info("Starting OAuth flow for new credentials")
            # Use client_config dictionary instead of a client_secret.json file
            client_config = {
                "installed": {
                    "client_id": CLIENT_ID,
                    "client_secret": CLIENT_SECRET,
                    "auth_uri": "https://accounts.google.com/o/oauth2/auth",
                    "token_uri": "https://oauth2.googleapis.com/token",
                    "redirect_uris": ["http://localhost"]
                }
            }
            flow = InstalledAppFlow.from_client_config(client_config, SCOPES)
            # Opens a browser on a random local port; interactive environments only.
            creds = flow.run_local_server(port=0)
            # Save the credentials for the next run
            with open(TOKEN_PICKLE_FILE, 'wb') as token:
                pickle.dump(creds, token)
            logger.info("Saved new credentials to token.pickle")
    return build('drive', 'v3', credentials=creds)
def upload_file_to_drive(service, file_path=None, file_data=None, filename=None, folder_id=None, mimetype='application/octet-stream'):
    """
    Uploads a file to a specific folder in Google Drive.

    Args:
        service: Google Drive service object
        file_path: Path to local file (for file uploads)
        file_data: Bytes data (for in-memory uploads); b"" is accepted and
            creates a zero-byte file
        filename: Name for the file in Drive (required with file_data;
            defaults to the basename of file_path for path uploads)
        folder_id: ID of the target folder; file lands in the Drive root
            when omitted
        mimetype: MIME type of the file

    Returns:
        dict: File information (id, name, webViewLink) or None if failed
    """
    try:
        if file_path and os.path.exists(file_path):
            # Upload from local file
            if not filename:
                filename = os.path.basename(file_path)
            media = MediaFileUpload(file_path, mimetype=mimetype, resumable=True)
            logger.info(f"Uploading file from path: {file_path}")
        elif file_data is not None and filename:
            # Upload from bytes data. Checked with `is not None` (not
            # truthiness) so an empty payload (b"") is still uploaded
            # instead of being rejected as "missing".
            file_io = io.BytesIO(file_data)
            media = MediaIoBaseUpload(file_io, mimetype=mimetype, resumable=True)
            logger.info(f"Uploading file from memory: {filename}")
        else:
            logger.error("Either file_path or (file_data + filename) must be provided")
            return None
        # Define the file's metadata
        file_metadata = {
            'name': filename,
            'parents': [folder_id] if folder_id else []
        }
        logger.info(f"Uploading '{filename}' to Google Drive folder {folder_id}")
        # Execute the upload request
        file = service.files().create(
            body=file_metadata,
            media_body=media,
            fields='id, webViewLink, name'
        ).execute()
        logger.info(f"✅ File uploaded successfully!")
        logger.info(f"   File ID: {file.get('id')}")
        logger.info(f"   File Name: {file.get('name')}")
        logger.info(f"   View Link: {file.get('webViewLink')}")
        return {
            'id': file.get('id'),
            'name': file.get('name'),
            'webViewLink': file.get('webViewLink')
        }
    except Exception as e:
        logger.error(f"Failed to upload file to Google Drive: {e}")
        return None
def upload_excel_to_exports_folder(service, file_path=None, file_data=None, filename=None):
    """Upload an Excel workbook into the shared exports folder on Drive."""
    # Fixed MIME type for .xlsx workbooks.
    xlsx_mimetype = 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'
    return upload_file_to_drive(
        service,
        file_path=file_path,
        file_data=file_data,
        filename=filename,
        folder_id=EXPORTS_FOLDER_ID,
        mimetype=xlsx_mimetype,
    )
def upload_image_to_images_folder(service, file_path=None, file_data=None, filename=None, mimetype='image/png'):
    """Upload a business-card image into the shared images folder on Drive."""
    # Delegate to the generic uploader with the images folder as the target.
    upload_kwargs = {
        'file_path': file_path,
        'file_data': file_data,
        'filename': filename,
        'folder_id': IMAGES_FOLDER_ID,
        'mimetype': mimetype,
    }
    return upload_file_to_drive(service, **upload_kwargs)
def list_files_in_folder(service, folder_id, max_results=100):
    """Return metadata for the files inside one Google Drive folder.

    Args:
        service: Google Drive service object.
        folder_id: ID of the folder to enumerate.
        max_results: maximum number of entries to request (single page).

    Returns:
        list[dict]: entries with id/name/size/createdTime/webViewLink;
        an empty list when the API call fails.
    """
    try:
        response = service.files().list(
            q=f"'{folder_id}' in parents",
            pageSize=max_results,
            fields="files(id, name, size, createdTime, webViewLink)",
        ).execute()
        found = response.get('files', [])
        logger.info(f"Found {len(found)} files in folder {folder_id}")
        return found
    except Exception as e:
        logger.error(f"Failed to list files in folder {folder_id}: {e}")
        return []
def download_file_from_drive(service, file_id, file_path):
    """Download one Drive file (by ID) to a local path.

    Returns:
        bool: True when the download completed, False on any error.
    """
    try:
        media_request = service.files().get_media(fileId=file_id)
        with open(file_path, 'wb') as destination:
            chunker = MediaIoBaseDownload(destination, media_request)
            finished = False
            # Pull the file down chunk by chunk until the downloader reports completion.
            while not finished:
                _status, finished = chunker.next_chunk()
        logger.info(f"Successfully downloaded file {file_id} to {file_path}")
        return True
    except Exception as e:
        logger.error(f"Failed to download file {file_id}: {e}")
        return False
def delete_file_from_drive(service, file_id):
    """Permanently delete a file from Google Drive.

    Returns:
        bool: True on success, False when the API call fails.
    """
    try:
        service.files().delete(fileId=file_id).execute()
    except Exception as e:
        logger.error(f"Failed to delete file {file_id}: {e}")
        return False
    logger.info(f"Successfully deleted file {file_id} from Google Drive")
    return True
def get_existing_cumulative_file(service):
    """Locate the cumulative Excel workbook in the exports folder.

    Returns:
        dict | None: metadata of the newest 'all_business_cards_total.xlsx'
        entry, or None when no such file exists or an error occurs.
    """
    try:
        matches = [
            entry
            for entry in list_files_in_folder(service, EXPORTS_FOLDER_ID)
            if entry['name'] == 'all_business_cards_total.xlsx'
        ]
        if not matches:
            logger.info("No existing cumulative file found")
            return None
        logger.info(f"Found {len(matches)} cumulative files")
        # Return the most recent one (createdTime is an ISO-style string,
        # so lexicographic max picks the newest).
        most_recent = max(matches, key=lambda entry: entry['createdTime'])
        logger.info(f"Most recent cumulative file: {most_recent['name']} (ID: {most_recent['id']})")
        return most_recent
    except Exception as e:
        logger.error(f"Failed to get existing cumulative file: {e}")
        return None
def cleanup_duplicate_cumulative_files(service):
    """Delete duplicate cumulative workbooks, keeping only the newest one.

    Returns:
        int: number of duplicates deleted (0 when nothing to clean or on error).
    """
    try:
        duplicates = [
            entry
            for entry in list_files_in_folder(service, EXPORTS_FOLDER_ID)
            if entry['name'] == 'all_business_cards_total.xlsx'
        ]
        if len(duplicates) <= 1:
            logger.info("No duplicate cumulative files found")
            return 0
        logger.info(f"Found {len(duplicates)} duplicate cumulative files, cleaning up...")
        # Newest first; everything after index 0 is redundant.
        duplicates.sort(key=lambda entry: entry['createdTime'], reverse=True)
        stale = duplicates[1:]
        for entry in stale:
            logger.info(f"Deleting duplicate file: {entry['name']} (ID: {entry['id']})")
            delete_file_from_drive(service, entry['id'])
        logger.info(f"Cleaned up {len(stale)} duplicate files")
        return len(stale)
    except Exception as e:
        logger.error(f"Failed to cleanup duplicate files: {e}")
        return 0
def cleanup_old_current_run_files(service, keep_count=5):
    """Prune old 'current_run_*.xlsx' exports, keeping the newest few.

    Args:
        service: Google Drive service object.
        keep_count: how many of the most recent run files to retain.

    Returns:
        int: number of files deleted (0 when nothing to prune or on error).
    """
    try:
        run_files = [
            entry
            for entry in list_files_in_folder(service, EXPORTS_FOLDER_ID)
            if entry['name'].startswith('current_run_') and entry['name'].endswith('.xlsx')
        ]
        if len(run_files) <= keep_count:
            logger.info(f"Found {len(run_files)} current run files, no cleanup needed")
            return 0
        logger.info(f"Found {len(run_files)} current run files, keeping {keep_count} most recent...")
        # Newest first; everything past keep_count is expired.
        run_files.sort(key=lambda entry: entry['createdTime'], reverse=True)
        expired = run_files[keep_count:]
        for entry in expired:
            logger.info(f"Deleting old current run file: {entry['name']} (ID: {entry['id']})")
            delete_file_from_drive(service, entry['id'])
        logger.info(f"Cleaned up {len(expired)} old current run files")
        return len(expired)
    except Exception as e:
        logger.error(f"Failed to cleanup old current run files: {e}")
        return 0
if __name__ == '__main__':
    # Smoke test: authenticate, then verify read access to both configured
    # folders. Requires live credentials/network; errors are logged, not raised.
    try:
        drive_service = get_drive_service()
        logger.info("Google Drive service initialized successfully")
        # List files in both folders to verify access
        exports_files = list_files_in_folder(drive_service, EXPORTS_FOLDER_ID)
        images_files = list_files_in_folder(drive_service, IMAGES_FOLDER_ID)
        print(f"Exports folder contains {len(exports_files)} files")
        print(f"Images folder contains {len(images_files)} files")
    except Exception as e:
        logger.error(f"Failed to initialize Google Drive: {e}")