sathyaphaneeshwar's picture
Speech_to_test_Version1
538dd80
raw
history blame
7.22 kB
import gradio as gr
from google_auth_oauthlib.flow import Flow
from googleapiclient.discovery import build
from googleapiclient.http import MediaFileUpload
from deepgram import DeepgramClient, PrerecordedOptions
import os
import re
import pickle
import webbrowser
from threading import Thread
from flask import Flask, request
# OAuth 2.0 configuration
CLIENT_SECRETS_FILE = "C:/Users/sathy/OneDrive/Desktop/gradio_app/gradio3/client_secret2.json"
SCOPES = ['https://www.googleapis.com/auth/drive.file', 'https://www.googleapis.com/auth/userinfo.profile']
API_SERVICE_NAME = 'drive'
API_VERSION = 'v3'
# Global variables to store user credentials and info
user_creds = None
user_info = None
auth_completed = False
DEEPGRAM_API_KEY = "1e124afa2a0bdb7fbc62306edf5cb73f5439dc4c"
function_variables = {}
# Flask app for handling OAuth callback
app = Flask(__name__)
@app.route('/oauth2callback')
def oauth2callback():
global user_creds, user_info, auth_completed
flow = Flow.from_client_secrets_file(
CLIENT_SECRETS_FILE, SCOPES, redirect_uri='http://localhost:8080/oauth2callback')
flow.fetch_token(code=request.args.get('code'))
user_creds = flow.credentials
# Save credentials for future use
with open('token.pickle', 'wb') as token:
pickle.dump(user_creds, token)
# Get user info
try:
service = build('oauth2', 'v2', credentials=user_creds)
user_info = service.userinfo().get().execute()
auth_completed = True
except Exception as e:
print(f"Error getting user info: {str(e)}")
user_info = None
auth_completed = False
return "Authentication successful! You can close this window and return to the app."
def start_server():
app.run(port=8080)
def login():
global user_creds, user_info, auth_completed
auth_completed = False
flow = Flow.from_client_secrets_file(
CLIENT_SECRETS_FILE, SCOPES, redirect_uri='http://localhost:8080/oauth2callback')
auth_url, _ = flow.authorization_url(prompt='consent')
# Start the local server in a separate thread
Thread(target=start_server).start()
# Open the authorization URL in the default browser
webbrowser.open(auth_url)
# Wait for the authentication to complete
import time
timeout = 60 # 60 seconds timeout
start_time = time.time()
while not auth_completed:
time.sleep(1)
if time.time() - start_time > timeout:
return "Login timed out. Please try again."
if user_info:
return f"Logged in as: {user_info.get('name', 'Unknown')}"
else:
return "Login failed. Please check your credentials and try again."
def create_folder_if_not_exists(service, folder_name):
query = f"name='{folder_name}' and mimeType='application/vnd.google-apps.folder'"
results = service.files().list(q=query, fields="files(id)").execute()
folders = results.get('files', [])
if not folders:
folder_metadata = {
'name': folder_name,
'mimeType': 'application/vnd.google-apps.folder'
}
folder = service.files().create(body=folder_metadata, fields='id').execute()
return folder['id']
else:
return folders[0]['id']
def upload_file(file):
global user_creds, user_info
if not user_creds:
return "Please login first."
try:
drive_service = build(API_SERVICE_NAME, API_VERSION, credentials=user_creds)
folder_id = create_folder_if_not_exists(drive_service, 'GradioUploads')
file_metadata = {
'name': os.path.basename(file.name),
'parents': [folder_id]
}
media = MediaFileUpload(file.name, resumable=True)
file = drive_service.files().create(body=file_metadata, media_body=media, fields='id,webViewLink,name').execute()
uploaded_filename = file.get('name')
# Set the file to be publicly accessible
permission = {
'type': 'anyone',
'role': 'reader',
}
drive_service.permissions().create(fileId=file['id'], body=permission).execute()
file_id = file['id']
function_variables['file_name']=file.get('name')
download_link = f"https://drive.google.com/uc?export=download&id={file_id}"
function_variables['download_link'] = download_link
return f"File uploaded by: {user_info.get('name', 'Unknown')}\nShareable Link: {file['webViewLink']}\nDownloadable Link: {function_variables['download_link'] }"
except Exception as e:
return f"An error occurred: {str(e)}"
def transcribe_audio():
try:
deepgram = DeepgramClient(DEEPGRAM_API_KEY)
options = PrerecordedOptions(
model="nova-2",
language="en",
smart_format=True,
)
#AUDIO_URL = function_variables['download_link']
AUDIO_URL = {
"url": str(function_variables['download_link'])
}
#print(f"Audio link: {AUDIO_URL}") # Debugging line to verify URL
response = deepgram.listen.prerecorded.v("1").transcribe_url(AUDIO_URL, options)
response_json = response.to_dict() # Convert response to a dictionary
transcript = response_json['results']['channels'][0]['alternatives'][0]['transcript']
function_variables['transcript'] = transcript
#return audio_link
return transcript
except Exception as e:
print(f"Exception: {e}")
def print_transcript():
try:
audioname=str(function_variables['file_name'])
transcript_file_name = re.sub(r'\..*', '', audioname) + "_transcript.txt"
with open(transcript_file_name, 'w') as file:
file.write(function_variables['transcript'])
function_variables['transcript_file_path'] = transcript_file_name # Save the file path
return transcript_file_name
#return function_variables['download_link']
#return transcript_file_name, text_op
except Exception as e:
return f"An error occurred while saving the transcript: {str(e)}"
# Gradio interface
with gr.Blocks() as demo:
gr.Markdown("# Upload File to Google Drive")
with gr.Row():
login_button = gr.Button("Login with Google")
login_output = gr.Textbox(label="Login Status", lines=3)
with gr.Row():
file_input = gr.File(label="Upload a file")
upload_output = gr.Textbox(label="Upload Result")
upload_button = gr.Button("Upload to Google Drive")
with gr.Row():
transcribe_button = gr.Button("Transcribe the audio")
print_transcript_button = gr.Button("Print Transcript")
downloadble_preview = gr.Textbox(label="Transcript")
download_file = gr.File(label="Download file")
login_button.click(login, outputs=login_output)
upload_button.click(upload_file, inputs=file_input, outputs=upload_output)
transcribe_button.click(transcribe_audio, outputs=downloadble_preview)
#print_transcript_button.click(print_transcript, outputs=downloadble_preview)
print_transcript_button.click(print_transcript, outputs=download_file)
if __name__ == '__main__':
demo.launch()