import os
import pickle
import re
import time
import webbrowser
from threading import Thread

import gradio as gr
from deepgram import DeepgramClient, PrerecordedOptions
from flask import Flask, request
from google_auth_oauthlib.flow import Flow
from googleapiclient.discovery import build
from googleapiclient.http import MediaFileUpload


# --- Google OAuth / Drive configuration -------------------------------------

# Path to the OAuth client-secrets JSON. Set GOOGLE_CLIENT_SECRETS_FILE to
# override; the fallback is the original developer-machine path and is kept
# only for backward compatibility.
CLIENT_SECRETS_FILE = os.environ.get("GOOGLE_CLIENT_SECRETS_FILE") or (
    "C:/Users/sathy/OneDrive/Desktop/gradio_app/gradio3/client_secret2.json"
)
SCOPES = [
    'https://www.googleapis.com/auth/drive.file',
    'https://www.googleapis.com/auth/userinfo.profile',
]
API_SERVICE_NAME = 'drive'
API_VERSION = 'v3'

# --- Session state shared by the Flask callback and the Gradio handlers -----

user_creds = None        # OAuth credentials, assigned by oauth2callback()
user_info = None         # profile dict fetched by oauth2callback()
auth_completed = False   # flag polled by login() while waiting for the callback

# SECURITY: an API key is committed in source. Prefer supplying it through the
# DEEPGRAM_API_KEY environment variable; the literal fallback is kept only so
# existing setups keep working and should be rotated and removed.
DEEPGRAM_API_KEY = os.environ.get("DEEPGRAM_API_KEY") or (
    "1e124afa2a0bdb7fbc62306edf5cb73f5439dc4c"
)

# Scratch values passed between button handlers. Keys written in this file:
# 'file_name', 'download_link', 'transcript', 'transcript_file_path'.
function_variables = {}


# Local web server that receives Google's OAuth redirect.
app = Flask(__name__)


@app.route('/oauth2callback')
def oauth2callback():
    """Handle Google's OAuth redirect and publish the outcome to ``login()``.

    Exchanges the ``code`` query parameter for credentials, stores them in the
    module-level ``user_creds`` (and in ``token.pickle``), then fetches the
    signed-in user's profile into ``user_info``.

    ``auth_completed`` is set to True once the attempt has finished, whether
    it succeeded or not, so the polling loop in ``login()`` stops waiting
    instead of running into its timeout. Success is signalled by ``user_info``
    being populated; on failure it is None.

    Returns:
        The text shown in the browser tab; on failure a ``(text, status)``
        tuple carrying an HTTP error status.
    """
    global user_creds, user_info, auth_completed

    failure_text = "Authentication failed. You can close this window and try again."

    code = request.args.get('code')
    if not code:
        # No code in the redirect (e.g. consent was denied): nothing to exchange.
        print(f"Authorization was not granted: {request.args.get('error', 'missing code')}")
        user_info = None
        auth_completed = True
        return failure_text, 400

    try:
        flow = Flow.from_client_secrets_file(
            CLIENT_SECRETS_FILE, SCOPES, redirect_uri='http://localhost:8080/oauth2callback')
        flow.fetch_token(code=code)
    except Exception as e:
        print(f"Error exchanging authorization code: {str(e)}")
        user_info = None
        auth_completed = True
        return failure_text, 400

    user_creds = flow.credentials

    # Persist the credentials; nothing in this file reads the pickle back.
    with open('token.pickle', 'wb') as token:
        pickle.dump(user_creds, token)

    try:
        service = build('oauth2', 'v2', credentials=user_creds)
        user_info = service.userinfo().get().execute()
    except Exception as e:
        print(f"Error getting user info: {str(e)}")
        user_info = None
        auth_completed = True
        return failure_text, 500

    # Set the flag last so login() only observes it once user_info is in place.
    auth_completed = True
    return "Authentication successful! You can close this window and return to the app."


def start_server():
    """Run the Flask OAuth callback server on port 8080.

    Blocks until the server stops, so it is meant to be used as a thread
    target rather than called directly from a request handler.
    """
    # This runs off the main thread. Keep the auto-reloader off explicitly so
    # an ambient debug setting cannot switch it on; the reloader is expected
    # to require the main thread.
    app.run(port=8080, use_reloader=False)


# Thread running the OAuth callback server; created on the first login.
_callback_server_thread = None


def _ensure_callback_server():
    """Start the callback server once and reuse it on later logins."""
    global _callback_server_thread
    if _callback_server_thread is not None and _callback_server_thread.is_alive():
        return
    # daemon=True: the server thread must not keep the process alive on exit.
    _callback_server_thread = Thread(target=start_server, daemon=True)
    _callback_server_thread.start()


def login():
    """Run the Google OAuth flow in the user's browser and report the result.

    Opens the consent page, makes sure the local callback server is running,
    then polls ``auth_completed`` for up to 60 seconds.

    Returns:
        A status message: the signed-in user's name, a timeout notice, or a
        failure notice.
    """
    global auth_completed

    auth_completed = False
    flow = Flow.from_client_secrets_file(
        CLIENT_SECRETS_FILE, SCOPES, redirect_uri='http://localhost:8080/oauth2callback')
    auth_url, _ = flow.authorization_url(prompt='consent')

    # Starting a fresh server on every click would try to rebind port 8080.
    _ensure_callback_server()
    webbrowser.open(auth_url)

    timeout = 60
    # monotonic() is immune to wall-clock adjustments during the wait.
    deadline = time.monotonic() + timeout
    while not auth_completed:
        time.sleep(1)
        if time.monotonic() > deadline:
            return "Login timed out. Please try again."

    if user_info:
        return f"Logged in as: {user_info.get('name', 'Unknown')}"
    return "Login failed. Please check your credentials and try again."


def create_folder_if_not_exists(service, folder_name):
    """Return the ID of the Drive folder named *folder_name*, creating it if absent.

    Args:
        service: Drive API client exposing ``files().list`` and ``files().create``.
        folder_name: Exact folder name to look up or create.

    Returns:
        The ID of the first matching non-trashed folder, or of the newly
        created folder when none matched.
    """
    folder_mime = 'application/vnd.google-apps.folder'

    # The name sits inside a single-quoted query literal: escape backslashes
    # first, then quotes, so a name like "Bob's" cannot break the query.
    escaped_name = folder_name.replace('\\', '\\\\').replace("'", "\\'")
    # trashed=false keeps a folder sitting in the bin from being reused.
    query = f"name='{escaped_name}' and mimeType='{folder_mime}' and trashed=false"

    results = service.files().list(q=query, fields="files(id)").execute()
    folders = results.get('files', [])
    if folders:
        return folders[0]['id']

    folder_metadata = {
        'name': folder_name,
        'mimeType': folder_mime,
    }
    folder = service.files().create(body=folder_metadata, fields='id').execute()
    return folder['id']


def upload_file(file):
    """Upload *file* to the 'GradioUploads' Drive folder and share it by link.

    On success the uploaded file's name and a direct-download URL are stored
    in ``function_variables`` ('file_name', 'download_link') for the
    transcription step.

    Args:
        file: Value delivered by the Gradio File input.

    Returns:
        A status message: the uploader name plus the share and download
        links, or a description of what went wrong.
    """
    if not user_creds:
        return "Please login first."
    if file is None:
        return "Please select a file to upload."

    # NOTE(review): the File input presumably delivers either an object with a
    # .name path or a plain path string depending on the Gradio version;
    # accept both - confirm against the installed version.
    local_path = getattr(file, 'name', file)

    try:
        drive_service = build(API_SERVICE_NAME, API_VERSION, credentials=user_creds)
        folder_id = create_folder_if_not_exists(drive_service, 'GradioUploads')

        file_metadata = {
            'name': os.path.basename(local_path),
            'parents': [folder_id],
        }
        media = MediaFileUpload(local_path, resumable=True)
        uploaded = drive_service.files().create(
            body=file_metadata, media_body=media, fields='id,webViewLink,name').execute()
        file_id = uploaded['id']

        # Grant read access to anyone with the link; transcribe_audio() hands
        # the download URL to an external service.
        permission = {
            'type': 'anyone',
            'role': 'reader',
        }
        drive_service.permissions().create(fileId=file_id, body=permission).execute()

        download_link = f"https://drive.google.com/uc?export=download&id={file_id}"
        function_variables['file_name'] = uploaded.get('name')
        function_variables['download_link'] = download_link
        # A transcript left over from a previous upload no longer matches.
        function_variables.pop('transcript', None)

        # user_info can be None when credentials exist but the profile lookup failed.
        uploader = (user_info or {}).get('name', 'Unknown')
        return f"File uploaded by: {uploader}\nShareable Link: {uploaded['webViewLink']}\nDownloadable Link: {download_link}"
    except Exception as e:
        return f"An error occurred: {str(e)}"


def transcribe_audio():
    """Transcribe the most recently uploaded file with Deepgram.

    Reads the download URL stored under ``function_variables['download_link']``
    and stores the resulting text under ``function_variables['transcript']``.

    Returns:
        The transcript text, or an error message when nothing has been
        uploaded yet or the request fails.
    """
    download_link = function_variables.get('download_link')
    if not download_link:
        return "Please upload a file first."

    try:
        deepgram = DeepgramClient(DEEPGRAM_API_KEY)
        options = PrerecordedOptions(
            model="nova-2",
            language="en",
            smart_format=True,
        )
        audio_source = {
            "url": str(download_link)
        }

        response = deepgram.listen.prerecorded.v("1").transcribe_url(audio_source, options)
        response_json = response.to_dict()
        transcript = response_json['results']['channels'][0]['alternatives'][0]['transcript']
    except Exception as e:
        print(f"Exception: {e}")
        # Return the failure instead of None so the UI shows it.
        return f"An error occurred during transcription: {str(e)}"

    function_variables['transcript'] = transcript
    return transcript


def print_transcript():
    """Write the stored transcript to ``<audio name>_transcript.txt``.

    Reads ``function_variables['file_name']`` and
    ``function_variables['transcript']``, writes the text file into the
    current working directory and records its name under
    ``function_variables['transcript_file_path']``.

    Returns:
        The name of the written file, or an error message string.

    NOTE(review): the result is wired to a gr.File output, which presumably
    expects a path; an error string may not display as intended - confirm.
    """
    if 'file_name' not in function_variables or 'transcript' not in function_variables:
        return "An error occurred while saving the transcript: upload and transcribe a file first."

    try:
        audio_name = str(function_variables['file_name'])
        # splitext drops only the final extension, so "a.b.mp3" keeps "a.b".
        stem = os.path.splitext(audio_name)[0]
        transcript_file_name = stem + "_transcript.txt"

        # Explicit UTF-8: the platform default may not encode every character.
        with open(transcript_file_name, 'w', encoding='utf-8') as out:
            out.write(function_variables['transcript'])

        function_variables['transcript_file_path'] = transcript_file_name
        return transcript_file_name
    except Exception as e:
        return f"An error occurred while saving the transcript: {str(e)}"


# Gradio front end: login, upload, transcribe, download.
# NOTE(review): the nesting below is reconstructed from the original's
# blank-line grouping because its indentation was lost - confirm the layout.
with gr.Blocks() as demo:
    gr.Markdown("# Upload File to Google Drive")

    with gr.Row():
        login_button = gr.Button("Login with Google")
        login_output = gr.Textbox(label="Login Status", lines=3)

    with gr.Row():
        file_input = gr.File(label="Upload a file")
        upload_output = gr.Textbox(label="Upload Result")

    upload_button = gr.Button("Upload to Google Drive")

    with gr.Row():
        transcribe_button = gr.Button("Transcribe the audio")
        print_transcript_button = gr.Button("Print Transcript")

    downloadble_preview = gr.Textbox(label="Transcript")
    download_file = gr.File(label="Download file")

    # Event wiring: each button calls one handler and fills one output.
    login_button.click(login, outputs=login_output)
    upload_button.click(upload_file, inputs=file_input, outputs=upload_output)
    transcribe_button.click(transcribe_audio, outputs=downloadble_preview)
    print_transcript_button.click(print_transcript, outputs=download_file)


if __name__ == '__main__':
    demo.launch()