Spaces:
Runtime error
Runtime error
from authlib.integrations.flask_client import OAuth | |
from authlib.common.security import generate_token | |
import ffmpeg | |
from flask import Flask, render_template, request, jsonify, url_for, redirect, session | |
from functools import wraps | |
import os | |
import streamlink | |
import threading | |
import time | |
from faster_whisper import WhisperModel | |
import subprocess | |
from datetime import datetime as dt | |
from datetime import timedelta, timezone | |
from apiclient import discovery | |
from google.oauth2 import service_account | |
import json | |
# Import secrets | |
client_secret = json.loads(os.environ.get("client_secret")) | |
gdoc_id = os.environ.get("gdoc_id") | |
GOOGLE_CLIENT_SECRET = os.environ.get("GOOGLE_CLIENT_SECRET") | |
GOOGLE_CLIENT_ID = os.environ.get("GOOGLE_CLIENT_ID") | |
allowed_users = os.environ.get("allowed_users") | |
# Faster Whisper setup | |
model_size = 'small' | |
beamsize = 2 | |
wmodel = WhisperModel(model_size, device="cpu", compute_type="int8") | |
# Delete local_transcript if it exists | |
if not os.path.exists('transcription_files'): os.makedirs('transcription_files') | |
for f in os.listdir('transcription_files/'): os.remove(os.path.join('transcription_files/', f)) # clear any old files in transcription_files folder | |
with open("client_secret.json", "w") as json_file: json.dump(client_secret, json_file, indent=4) | |
scopes = ["https://www.googleapis.com/auth/documents", "https://www.googleapis.com/auth/drive.file"] | |
credentials = service_account.Credentials.from_service_account_file('client_secret.json', scopes=scopes) | |
service = discovery.build('docs', 'v1', credentials=credentials) | |
local_tz = 5.5 # For timestamps | |
local_transcript = 'transcription_files/tr.txt' | |
pid_file = 'transcription_files/pid.txt' | |
# Check if mp3 folder exists, and create it if it doesn't | |
if not os.path.exists('mp3'): os.makedirs('mp3') | |
# Delete any old files in mp3 folder | |
for f in os.listdir('mp3/'): os.remove(os.path.join('mp3/', f)) | |
app = Flask(__name__, static_url_path='/static') | |
app.secret_key = os.urandom(12) | |
oauth = OAuth(app) | |
# Store the streamlink process | |
stream_process = None | |
recording = False | |
mp3_extraction_process = None | |
def update_gdoc(text, gdoc_id): # Update contents of google doc | |
print('Updating Google Doc', gdoc_id) | |
doc = service.documents().get(documentId=gdoc_id).execute() | |
endindex = [p['endIndex'] for p in doc['body']['content'] if 'paragraph' in p][-1] | |
try: | |
body = {'requests': [{'insertText': {'location': {'index': endindex-1,}, 'text': ' ' + text}}]} | |
result = service.documents().batchUpdate(documentId=gdoc_id, body=body).execute() | |
print(result) | |
except Exception as e: | |
print(e) | |
def process_complete_callback(retcode, **kwargs): | |
if retcode == 0: | |
print("FFmpeg process completed successfully!") | |
else: | |
print("FFmpeg process encountered an error.") | |
def transcribe_audio(latest_file, time_counter): | |
print('transcribing ', latest_file) | |
segments, info = wmodel.transcribe(f"{latest_file}", beam_size=beamsize) # beamsize is 2. | |
text = '' | |
for segment in segments: | |
text += segment.text | |
transcribed = text.replace('\n', ' ').replace(' ', ' ') | |
if time_counter%5 == 0: | |
transcribed_sents = transcribed.split('. ') # Get the first fullstop break and append to previous para, before adding time code | |
transcribed = transcribed_sents[0] + '\nTime ' + str((dt.now(timezone.utc) + timedelta(hours=local_tz)).strftime('%H:%M:%S')) + '\n' + '. '.join(transcribed_sents[1:]) | |
time_counter += 1 | |
return transcribed, time_counter | |
def save_audio(youtube_url): | |
global stream_process, recording, mp3_extraction_process | |
try: | |
streams = streamlink.streams(youtube_url) | |
#if "audio" not in streams: | |
# raise Exception("No audio stream found.") | |
stream_url = streams["144p"].url | |
time_counter = 0 | |
while recording: | |
# Save audio only into mp3 files | |
saved_mp3 = f"mp3/audio_{int(time.time())}.mp3" | |
mp3_extraction_process = ( | |
ffmpeg | |
.input(stream_url, t=30) | |
.audio | |
# TODO - change destination url to relevant url | |
.output(saved_mp3) | |
.overwrite_output() | |
.global_args('-loglevel', 'panic') | |
.run_async() | |
) | |
print('pid', mp3_extraction_process.pid) | |
# write the pid to pid_file | |
with open(pid_file, 'w') as f: f.write(str(mp3_extraction_process.pid)) | |
# If there is more than one mp3 file in the folder, transcribe the one that is not being written to | |
mp3files = [f for f in os.listdir('mp3') if f.endswith('.mp3')] | |
if len(mp3files) < 2: | |
print('Sleeping for 30s as only one mp3 file in folder') | |
time.sleep(30) | |
else: | |
starttime = time.time() | |
file_to_transcribe = [f for f in mp3files if f != os.path.basename(saved_mp3)][0] | |
print('Working on ', file_to_transcribe) | |
transcribed, time_counter = transcribe_audio(f'mp3/{file_to_transcribe}', time_counter) | |
os.remove(f'mp3/{file_to_transcribe}') | |
update_gdoc(transcribed, gdoc_id) | |
with open(local_transcript, 'a', encoding='utf-8', errors='ignore') as f: f.write(transcribed) | |
elapsed_time = time.time() - starttime | |
print('Time to transcribe:', elapsed_time, 'seconds') | |
if elapsed_time < 30: | |
print(f'Sleeping for {30-elapsed_time} as there are more than one mp3 files in folder') | |
time.sleep(30-elapsed_time) | |
#time.sleep(30) | |
except Exception as e: | |
recording = False | |
print('exception', str(e)) | |
return str(e) | |
def start_process(): | |
if not os.path.isfile(local_transcript): | |
global recording, stream_process | |
with open(local_transcript, 'a', encoding='utf-8', errors='ignore') as f: f.write('') # Create the local transcript file, which is used as a check to prevent multiple recordings | |
youtube_url = request.form.get("url") | |
if not youtube_url: | |
return jsonify({"message": "Please provide a valid YouTube URL."}), 400 | |
if recording: | |
return jsonify({"message": "A recording is already in progress."}), 400 | |
print('In start process') | |
recording = True | |
stream_process = threading.Thread(target=save_audio, args=(youtube_url,)) | |
stream_process.start() | |
return jsonify({"message": "Recording started."}), 200 | |
else: return jsonify({"message": "Recording is already in progress."}), 400 | |
def stop_process(): | |
global recording, stream_process, mp3_extraction_process | |
if not recording: | |
return jsonify({"message": "No recording is currently in progress."}), 400 | |
print('In stop process') | |
recording = False | |
stream_process.join() | |
stream_process = None | |
mp3_extraction_process.terminate() | |
mp3_extraction_process = None | |
for f in os.listdir('mp3/'): os.remove(os.path.join('mp3/', f)) | |
if os.path.isfile(local_transcript): os.remove(local_transcript) | |
# check if pid_file exists, get the pid inside it and convert to int, and use os.kill to kill it | |
if os.path.isfile(pid_file): | |
with open(pid_file, 'r') as f: pid = int(f.read()) | |
try: os.kill(pid, 9) # For linux | |
except: | |
try: | |
process = subprocess.Popen(["taskkill", "/F", "/PID", str(pid)], stdout=subprocess.PIPE, stderr=subprocess.PIPE) # For Windows | |
process.communicate() | |
print("Process terminated successfully.") | |
except Exception as e: | |
print("Error:", e) | |
os.remove(pid_file) | |
return jsonify({"message": "Recording stopped."}), 200 | |
def google(): | |
CONF_URL = 'https://accounts.google.com/.well-known/openid-configuration' | |
oauth.register( | |
name='google', | |
client_id=GOOGLE_CLIENT_ID, | |
client_secret=GOOGLE_CLIENT_SECRET, | |
server_metadata_url=CONF_URL, | |
client_kwargs={"scope": "openid email profile"} | |
) | |
# Redirect to google_auth function/page | |
redirect_uri = url_for('google_auth', _external=True) | |
session['nonce'] = generate_token() | |
return oauth.google.authorize_redirect(redirect_uri, nonce=session['nonce']) | |
def google_auth(): | |
token = oauth.google.authorize_access_token() | |
user = oauth.google.parse_id_token(token, nonce=session['nonce']) | |
session['user'] = user | |
print('USER', user) | |
# Redirect to home if login successful | |
return redirect('/home') | |
def is_not_logged_in(): | |
return session.get('user') is None or session.get('nonce') is None | |
# decorator to check if user is logged in, used for protected URLs | |
def login_required(f): | |
def decorated_function(*args, **kwargs): | |
if is_not_logged_in(): | |
return redirect('/login') | |
return f(*args, **kwargs) | |
return decorated_function | |
def home(): | |
return render_template("home.html") | |
def login(): | |
if not is_not_logged_in(): | |
return redirect("/home") | |
#return render_template("login.html") | |
return render_template("home.html") | |
if __name__ == "__main__": | |
app.run(host="0.0.0.0", debug=True, port=7860) |