# Spaces: Sleeping
import streamlit as st | |
from google_auth_oauthlib.flow import Flow | |
from googleapiclient.discovery import build | |
from google.oauth2.credentials import Credentials | |
import os | |
import random | |
from io import BytesIO | |
from PIL import Image | |
# OAuth configuration shared by every flow this script creates.
# Read-only Drive scope: the code below only lists files and downloads media.
SCOPES = [
    "https://www.googleapis.com/auth/drive.readonly",
]
# Path of the client secrets file passed to Flow.from_client_secrets_file().
CLIENT_SECRETS_FILE = "client_secrets.json"
def create_flow():
    """Build a new OAuth ``Flow`` from the client secrets file.

    Reads the module-level ``CLIENT_SECRETS_FILE``, ``SCOPES`` and
    ``REDIRECT_URI`` names, so ``REDIRECT_URI`` must already be assigned
    when this function is called.

    Returns:
        The object returned by ``Flow.from_client_secrets_file``.
    """
    flow_options = {'scopes': SCOPES, 'redirect_uri': REDIRECT_URI}
    flow = Flow.from_client_secrets_file(CLIENT_SECRETS_FILE, **flow_options)
    return flow
def list_folders(service):
    """Return every non-trashed Drive folder as an ``{id: name}`` mapping.

    The listing is paginated: each response may carry a ``nextPageToken``,
    which is fed back as ``pageToken`` until no token is returned, so
    folders beyond the first page are included.

    Args:
        service: Drive v3 service object exposing ``files().list(...)``.

    Returns:
        dict mapping each folder id to its name (empty if none exist).
    """
    folders = {}
    page_token = None
    while True:
        response = service.files().list(
            q="mimeType='application/vnd.google-apps.folder' and trashed=false",
            spaces='drive',
            fields='nextPageToken, files(id, name)',
            pageToken=page_token,
        ).execute()
        for item in response.get('files', []):
            folders[item['id']] = item['name']
        page_token = response.get('nextPageToken')
        if not page_token:
            return folders
def list_images_in_folder(service, folder_id):
    """Return all non-trashed image files directly inside a Drive folder.

    Args:
        service: Drive v3 service object exposing ``files().list(...)``.
        folder_id: id of the parent folder to search.

    Returns:
        list of file dicts with ``id``, ``name`` and ``mimeType`` keys,
        gathered across every result page (empty if none match).
    """
    # Drive's documented membership test for parents is "'<id>' in parents".
    query = (
        f"'{folder_id}' in parents"
        " and mimeType contains 'image/' and trashed=false"
    )
    images = []
    page_token = None
    while True:
        response = service.files().list(
            q=query,
            spaces='drive',
            fields='nextPageToken, files(id, name, mimeType)',
            pageToken=page_token,
        ).execute()
        images.extend(response.get('files', []))
        # Follow nextPageToken so results past the first page are not lost.
        page_token = response.get('nextPageToken')
        if not page_token:
            return images
def download_image(service, file_id):
    """Download a Drive file's content and return it as ``bytes``.

    Args:
        service: Drive v3 service object exposing ``files().get_media``.
        file_id: id of the file to download.

    Returns:
        The complete file content as a bytes object.
    """
    # Imported locally so this function does not depend on the import that
    # the script performs only under its ``__main__`` guard.
    from googleapiclient.http import MediaIoBaseDownload

    request = service.files().get_media(fileId=file_id)
    buffer = BytesIO()
    downloader = MediaIoBaseDownload(buffer, request)
    done = False
    while not done:
        # next_chunk() yields (status, done); only the done flag is needed.
        _, done = downloader.next_chunk()
    return buffer.getvalue()
def creds_to_dict(creds):
    """Copy selected attributes of a credentials object into a plain dict.

    Args:
        creds: object exposing ``token``, ``refresh_token``, ``token_uri``,
            ``client_id``, ``client_secret`` and ``scopes`` attributes.

    Returns:
        dict keyed by those six attribute names, in that order.
    """
    field_names = (
        'token',
        'refresh_token',
        'token_uri',
        'client_id',
        'client_secret',
        'scopes',
    )
    return {name: getattr(creds, name) for name in field_names}
def _images_in_stable_order(folder_id, images):
    """Return *images* in a shuffled order that is kept across reruns.

    The order is stored in ``st.session_state`` and only regenerated when
    the selected folder or the set of image ids changes; in that case the
    stored ``image_index`` is reset to 0 as well.
    """
    by_id = {image['id']: image for image in images}
    saved_order = st.session_state.get('image_order', [])
    same_folder = st.session_state.get('image_folder_id') == folder_id
    if not same_folder or set(saved_order) != set(by_id):
        saved_order = random.sample(list(by_id), len(by_id))
        st.session_state['image_folder_id'] = folder_id
        st.session_state['image_order'] = saved_order
        st.session_state['image_index'] = 0
    return [by_id[image_id] for image_id in saved_order]


def main():
    """Render the viewer page.

    Without stored credentials, shows an authorization link. Otherwise
    lists the user's Drive folders, lets the user pick one, and shows one
    image from it at a time with a "Next Image" button.
    """
    st.title("Google Drive Image Viewer")

    stored_credentials = st.session_state.get('credentials')
    if stored_credentials is None:
        flow = create_flow()
        authorization_url, _ = flow.authorization_url(
            access_type='offline',
            include_granted_scopes='true',
            prompt='consent',
        )
        st.write(f"Please [authorize]({authorization_url}) to continue.")
        return

    creds = Credentials(**stored_credentials)
    service = build('drive', 'v3', credentials=creds)

    folders = list_folders(service)
    if not folders:
        st.write("No folders found in your Drive.")
        return

    # Select by position so folders that share a name stay distinguishable.
    folder_ids = list(folders)
    selected_index = st.selectbox(
        "Select a folder",
        range(len(folder_ids)),
        format_func=lambda position: folders[folder_ids[position]],
    )
    folder_id = folder_ids[selected_index]

    images = list_images_in_folder(service, folder_id)
    if not images:
        st.write("No images found in this folder.")
        return

    # The script is re-executed on every interaction, so the shuffle must be
    # done once and remembered; reshuffling each run would make the index
    # point at an arbitrary image.
    ordered_images = _images_in_stable_order(folder_id, images)
    # Modulo guards against an index left over from a larger listing.
    index = st.session_state.get('image_index', 0) % len(ordered_images)
    image = ordered_images[index]

    img_data = download_image(service, image['id'])
    st.image(Image.open(BytesIO(img_data)), caption=image['name'])

    if st.button("Next Image"):
        st.session_state['image_index'] = (index + 1) % len(ordered_images)
        st.rerun()
if __name__ == '__main__':
    # Kept at module scope for code that looks up MediaIoBaseDownload as a
    # global name.
    from googleapiclient.http import MediaIoBaseDownload

    # Cache the redirect URI in the session; create_flow() reads the
    # module-level REDIRECT_URI name assigned here.
    if 'REDIRECT_URI' not in st.session_state:
        st.session_state['REDIRECT_URI'] = st.secrets["REDIRECT_URI"]
    REDIRECT_URI = st.session_state['REDIRECT_URI']

    if 'credentials' not in st.session_state:
        st.session_state['credentials'] = None

    # st.query_params maps each key to a single string, so the value is used
    # as-is; indexing it with [0] would keep only its first character.
    code = st.query_params.get('code')
    if code and st.session_state['credentials'] is None:
        flow = create_flow()
        flow.fetch_token(code=code)
        st.session_state['credentials'] = creds_to_dict(flow.credentials)
        # Remove the code from the URL so the rerun does not try to exchange
        # the same authorization code a second time.
        st.query_params.clear()
        st.rerun()
    else:
        main()