import os
import pickle
import re
import webbrowser
from threading import Thread

import gradio as gr
from deepgram import DeepgramClient, PrerecordedOptions
from dotenv import load_dotenv
from flask import Flask, request
from google_auth_oauthlib.flow import Flow
from googleapiclient.discovery import build
from googleapiclient.http import MediaFileUpload

# Read configuration (API key, client-secrets path) from a local .env file.
load_dotenv()

DEEPGRAM_API_KEY = os.getenv("DEEPGRAM_API_KEY")
# Path to the Google OAuth client-secrets JSON file, taken from the environment.
CLIENT_SECRETS_FILE = os.getenv("CLIENT_SECRETS_FILE")

# OAuth 2.0 configuration
SCOPES = [
    'https://www.googleapis.com/auth/drive.file',
    'https://www.googleapis.com/auth/userinfo.profile',
]
API_SERVICE_NAME = 'drive'
API_VERSION = 'v3'

# Global variables to store user credentials and info
user_creds = None
user_info = None
# Set to True by the OAuth callback once an authentication attempt has
# finished (successfully or not); ``user_info`` tells the two outcomes apart.
auth_completed = False
# Values shared between the UI callbacks (download link, file name, transcript).
function_variables = {}

# Flask app for handling OAuth callback
app = Flask(__name__)


@app.route('/oauth2callback')
def oauth2callback():
    """Handle the OAuth redirect: exchange the code and load the user's profile.

    Updates the module globals ``user_creds``, ``user_info`` and
    ``auth_completed``. ``auth_completed`` is set to True on every outcome so
    that the code polling it stops waiting right away; on failure ``user_info``
    is left as None, which is how a failed attempt is recognised.

    Returns:
        The text shown in the browser, with an HTTP 400 status when the token
        exchange could not be performed.
    """
    global user_creds, user_info, auth_completed

    auth_code = request.args.get('code')
    if not auth_code:
        # E.g. the user denied consent: the redirect then carries ``error``
        # instead of ``code``. Log it, but do not echo it back into the page.
        reason = request.args.get('error', 'no authorization code received')
        print(f"OAuth callback without authorization code: {reason}")
        user_info = None
        auth_completed = True
        return "Authentication failed. You can close this window and try again.", 400

    flow = Flow.from_client_secrets_file(
        CLIENT_SECRETS_FILE,
        scopes=SCOPES,
        redirect_uri='http://localhost:8080/oauth2callback',
    )
    try:
        flow.fetch_token(code=auth_code)
    except Exception as e:
        print(f"Error exchanging authorization code: {str(e)}")
        user_info = None
        auth_completed = True
        return "Authentication failed. You can close this window and try again.", 400

    user_creds = flow.credentials

    # Save credentials for future use.
    # NOTE: token.pickle holds live OAuth tokens; keep it out of version control.
    with open('token.pickle', 'wb') as token:
        pickle.dump(user_creds, token)

    # Get user info
    try:
        service = build('oauth2', 'v2', credentials=user_creds)
        profile = service.userinfo().get().execute()
    except Exception as e:
        print(f"Error getting user info: {str(e)}")
        profile = None

    # Publish the result before raising the completion flag, because another
    # thread polls ``auth_completed`` and then reads ``user_info``.
    user_info = profile
    auth_completed = True

    if profile is None:
        return (
            "Authentication failed: could not retrieve your profile. "
            "You can close this window and try again."
        )
    return "Authentication successful! You can close this window and return to the app."
def start_server():
    """Run the Flask OAuth callback server on port 8080 (blocks the caller)."""
    app.run(port=8080)


# Thread running the OAuth callback server; created on the first login attempt
# and reused afterwards so port 8080 is only bound once.
_server_thread = None


def login():
    """Start the Google OAuth flow in the browser and wait for it to finish.

    Starts the local callback server if it is not already running, opens the
    authorization URL in the default browser, then polls ``auth_completed``
    once per second for up to 60 seconds.

    Returns:
        A status message: the logged-in user's name, a timeout notice, or a
        failure notice.
    """
    global auth_completed, _server_thread
    import time

    auth_completed = False
    flow = Flow.from_client_secrets_file(
        CLIENT_SECRETS_FILE,
        SCOPES,
        redirect_uri='http://localhost:8080/oauth2callback',
    )
    auth_url, _ = flow.authorization_url(prompt='consent')

    # Start the local server only once. A second server thread would fail to
    # bind the port, and a non-daemon thread would keep the process alive.
    if _server_thread is None or not _server_thread.is_alive():
        _server_thread = Thread(target=start_server, daemon=True)
        _server_thread.start()

    # Open the authorization URL in the default browser.
    if not webbrowser.open(auth_url):
        # No browser could be launched; let the user open the URL by hand.
        print(f"Open this URL in a browser to log in: {auth_url}")

    # Wait for the authentication to complete.
    timeout = 60  # seconds
    deadline = time.monotonic() + timeout
    while not auth_completed:
        if time.monotonic() > deadline:
            return "Login timed out. Please try again."
        time.sleep(1)

    if user_info:
        return f"Logged in as: {user_info.get('name', 'Unknown')}"
    return "Login failed. Please check your credentials and try again."


def create_folder_if_not_exists(service, folder_name):
    """Return the id of the Drive folder named *folder_name*, creating it if absent.

    Args:
        service: Drive API service object as returned by ``build``.
        folder_name: Name of the folder to look up or create.

    Returns:
        The id of the first matching non-trashed folder, or of the newly
        created folder.
    """
    # Escape backslashes and single quotes so the name cannot break the query.
    escaped_name = folder_name.replace("\\", "\\\\").replace("'", "\\'")
    query = (
        f"name='{escaped_name}' and "
        "mimeType='application/vnd.google-apps.folder' and trashed=false"
    )
    results = service.files().list(q=query, fields="files(id)").execute()
    folders = results.get('files', [])
    if folders:
        return folders[0]['id']

    folder_metadata = {
        'name': folder_name,
        'mimeType': 'application/vnd.google-apps.folder',
    }
    folder = service.files().create(body=folder_metadata, fields='id').execute()
    return folder['id']


def upload_file(file):
    """Upload *file* to the 'GradioUploads' Drive folder and make it public.

    Stores the uploaded file's name and a direct download link in
    ``function_variables`` under ``'file_name'`` and ``'download_link'``.

    Args:
        file: The value delivered by the Gradio file input: an object with a
            ``name`` attribute holding the local path, or a plain path string.

    Returns:
        A status message containing the shareable and downloadable links, or
        an error message.
    """
    if not user_creds:
        return "Please login first."
    if file is None:
        return "Please select a file to upload."

    try:
        # Accept both file-like wrappers (``.name``) and plain path strings.
        local_path = getattr(file, 'name', file)

        drive_service = build(API_SERVICE_NAME, API_VERSION, credentials=user_creds)
        folder_id = create_folder_if_not_exists(drive_service, 'GradioUploads')

        file_metadata = {
            'name': os.path.basename(local_path),
            'parents': [folder_id],
        }
        media = MediaFileUpload(local_path, resumable=True)
        uploaded = drive_service.files().create(
            body=file_metadata,
            media_body=media,
            fields='id,webViewLink,name',
        ).execute()

        # Set the file to be publicly accessible, so it can be fetched by URL.
        permission = {
            'type': 'anyone',
            'role': 'reader',
        }
        drive_service.permissions().create(
            fileId=uploaded['id'], body=permission
        ).execute()

        download_link = (
            f"https://drive.google.com/uc?export=download&id={uploaded['id']}"
        )
        function_variables['file_name'] = uploaded.get('name')
        function_variables['download_link'] = download_link

        # user_info can be None when the profile lookup failed after login.
        uploader = (user_info or {}).get('name', 'Unknown')
        return (
            f"File uploaded by: {uploader}\n"
            f"Shareable Link: {uploaded['webViewLink']}\n"
            f"Downloadable Link: {download_link}"
        )
    except Exception as e:
        return f"An error occurred: {str(e)}"


def transcribe_audio():
    """Transcribe the most recently uploaded file with Deepgram.

    Reads ``function_variables['download_link']`` and stores the result in
    ``function_variables['transcript']``.

    Returns:
        The transcript text, or an error message when nothing has been
        uploaded or the transcription failed.
    """
    # Drop any earlier transcript so a failure here cannot leave a transcript
    # of a previous file to be saved under the new file's name.
    function_variables.pop('transcript', None)

    download_link = function_variables.get('download_link')
    if not download_link:
        return "Please upload an audio file first."

    try:
        deepgram = DeepgramClient(DEEPGRAM_API_KEY)
        options = PrerecordedOptions(
            model="nova-2",
            language="en",
            smart_format=True,
        )
        audio_source = {"url": str(download_link)}
        response = deepgram.listen.prerecorded.v("1").transcribe_url(
            audio_source, options
        )
        response_json = response.to_dict()  # Convert response to a dictionary
        transcript = (
            response_json['results']['channels'][0]['alternatives'][0]['transcript']
        )
    except Exception as e:
        print(f"Exception: {e}")
        # Show the failure in the UI instead of returning None (a blank box).
        return f"An error occurred during transcription: {str(e)}"

    function_variables['transcript'] = transcript
    return transcript


def print_transcript():
    """Write the current transcript to ``<audio name>_transcript.txt``.

    Reads ``function_variables['file_name']`` and
    ``function_variables['transcript']``, and records the written path in
    ``function_variables['transcript_file_path']``.

    Returns:
        The path of the transcript file, for the Gradio file output.

    Raises:
        gr.Error: If no transcript is available or the file cannot be written.
            The output is a file component, so an error string returned here
            would be treated as a file path.
    """
    transcript = function_variables.get('transcript')
    audio_name = function_variables.get('file_name')
    if transcript is None or not audio_name:
        raise gr.Error(
            "No transcript available. Upload and transcribe an audio file first."
        )

    # Strip only the final extension, so "my.talk.mp3" becomes "my.talk".
    stem = os.path.splitext(str(audio_name))[0]
    transcript_file_name = stem + "_transcript.txt"
    try:
        with open(transcript_file_name, 'w', encoding='utf-8') as out:
            out.write(transcript)
    except OSError as e:
        raise gr.Error(
            f"An error occurred while saving the transcript: {str(e)}"
        ) from e

    function_variables['transcript_file_path'] = transcript_file_name  # Save the file path
    return transcript_file_name


# Gradio interface
with gr.Blocks() as demo:
    gr.Markdown("# Upload File to Google Drive")

    with gr.Row():
        login_button = gr.Button("Login with Google")
        login_output = gr.Textbox(label="Login Status", lines=3)

    with gr.Row():
        file_input = gr.File(label="Upload a file")
        upload_output = gr.Textbox(label="Upload Result")
        upload_button = gr.Button("Upload to Google Drive")

    with gr.Row():
        transcribe_button = gr.Button("Transcribe the audio")
        print_transcript_button = gr.Button("Print Transcript")
        downloadble_preview = gr.Textbox(label="Transcript")
        download_file = gr.File(label="Download file")

    # Wire each button to its handler.
    login_button.click(login, outputs=login_output)
    upload_button.click(upload_file, inputs=file_input, outputs=upload_output)
    transcribe_button.click(transcribe_audio, outputs=downloadble_preview)
    print_transcript_button.click(print_transcript, outputs=download_file)


if __name__ == '__main__':
    demo.launch()