from authlib.integrations.flask_client import OAuth from authlib.common.security import generate_token import ffmpeg from flask import Flask, render_template, request, jsonify, url_for, redirect, session from functools import wraps import os import streamlink import threading import time from faster_whisper import WhisperModel import subprocess from datetime import datetime as dt from datetime import timedelta, timezone from apiclient import discovery from google.oauth2 import service_account import json # Import secrets client_secret = json.loads(os.environ.get("client_secret")) gdoc_id = os.environ.get("gdoc_id") GOOGLE_CLIENT_SECRET = os.environ.get("GOOGLE_CLIENT_SECRET") GOOGLE_CLIENT_ID = os.environ.get("GOOGLE_CLIENT_ID") allowed_users = os.environ.get("allowed_users") # Faster Whisper setup model_size = 'small' beamsize = 2 wmodel = WhisperModel(model_size, device="cpu", compute_type="int8") # Delete local_transcript if it exists if not os.path.exists('transcription_files'): os.makedirs('transcription_files') for f in os.listdir('transcription_files/'): os.remove(os.path.join('transcription_files/', f)) # clear any old files in transcription_files folder with open("client_secret.json", "w") as json_file: json.dump(client_secret, json_file, indent=4) scopes = ["https://www.googleapis.com/auth/documents", "https://www.googleapis.com/auth/drive.file"] credentials = service_account.Credentials.from_service_account_file('client_secret.json', scopes=scopes) service = discovery.build('docs', 'v1', credentials=credentials) local_tz = 5.5 # For timestamps local_transcript = 'transcription_files/tr.txt' pid_file = 'transcription_files/pid.txt' # Check if mp3 folder exists, and create it if it doesn't if not os.path.exists('mp3'): os.makedirs('mp3') # Delete any old files in mp3 folder for f in os.listdir('mp3/'): os.remove(os.path.join('mp3/', f)) app = Flask(__name__, static_url_path='/static') app.secret_key = os.urandom(12) oauth = OAuth(app) # Store the streamlink process stream_process = None recording = False mp3_extraction_process = None def update_gdoc(text, gdoc_id): # Update contents of google doc print('Updating Google Doc', gdoc_id) doc = service.documents().get(documentId=gdoc_id).execute() endindex = [p['endIndex'] for p in doc['body']['content'] if 'paragraph' in p][-1] try: body = {'requests': [{'insertText': {'location': {'index': endindex-1,}, 'text': ' ' + text}}]} result = service.documents().batchUpdate(documentId=gdoc_id, body=body).execute() print(result) except Exception as e: print(e) def process_complete_callback(retcode, **kwargs): if retcode == 0: print("FFmpeg process completed successfully!") else: print("FFmpeg process encountered an error.") def transcribe_audio(latest_file, time_counter): print('transcribing ', latest_file) segments, info = wmodel.transcribe(f"{latest_file}", beam_size=beamsize) # beamsize is 2. text = '' for segment in segments: text += segment.text transcribed = text.replace('\n', ' ').replace(' ', ' ') if time_counter%5 == 0: transcribed_sents = transcribed.split('. ') # Get the first fullstop break and append to previous para, before adding time code transcribed = transcribed_sents[0] + '\nTime ' + str((dt.now(timezone.utc) + timedelta(hours=local_tz)).strftime('%H:%M:%S')) + '\n' + '. '.join(transcribed_sents[1:]) time_counter += 1 return transcribed, time_counter def save_audio(youtube_url): global stream_process, recording, mp3_extraction_process try: streams = streamlink.streams(youtube_url) #if "audio" not in streams: # raise Exception("No audio stream found.") stream_url = streams["144p"].url time_counter = 0 while recording: # Save audio only into mp3 files saved_mp3 = f"mp3/audio_{int(time.time())}.mp3" mp3_extraction_process = ( ffmpeg .input(stream_url, t=30) .audio # TODO - change destination url to relevant url .output(saved_mp3) .overwrite_output() .global_args('-loglevel', 'panic') .run_async() ) print('pid', mp3_extraction_process.pid) # write the pid to pid_file with open(pid_file, 'w') as f: f.write(str(mp3_extraction_process.pid)) # If there is more than one mp3 file in the folder, transcribe the one that is not being written to mp3files = [f for f in os.listdir('mp3') if f.endswith('.mp3')] if len(mp3files) < 2: print('Sleeping for 30s as only one mp3 file in folder') time.sleep(30) else: starttime = time.time() file_to_transcribe = [f for f in mp3files if f != os.path.basename(saved_mp3)][0] print('Working on ', file_to_transcribe) transcribed, time_counter = transcribe_audio(f'mp3/{file_to_transcribe}', time_counter) os.remove(f'mp3/{file_to_transcribe}') update_gdoc(transcribed, gdoc_id) with open(local_transcript, 'a', encoding='utf-8', errors='ignore') as f: f.write(transcribed) elapsed_time = time.time() - starttime print('Time to transcribe:', elapsed_time, 'seconds') if elapsed_time < 30: print(f'Sleeping for {30-elapsed_time} as there are more than one mp3 files in folder') time.sleep(30-elapsed_time) #time.sleep(30) except Exception as e: recording = False print('exception', str(e)) return str(e) @app.route("/start_process", methods=["POST"]) def start_process(): if not os.path.isfile(local_transcript): global recording, stream_process with open(local_transcript, 'a', encoding='utf-8', errors='ignore') as f: f.write('') # Create the local transcript file, which is used as a check to prevent multiple recordings youtube_url = request.form.get("url") if not youtube_url: return jsonify({"message": "Please provide a valid YouTube URL."}), 400 if recording: return jsonify({"message": "A recording is already in progress."}), 400 print('In start process') recording = True stream_process = threading.Thread(target=save_audio, args=(youtube_url,)) stream_process.start() return jsonify({"message": "Recording started."}), 200 else: return jsonify({"message": "Recording is already in progress."}), 400 @app.route("/stop_process", methods=["POST"]) def stop_process(): global recording, stream_process, mp3_extraction_process if not recording: return jsonify({"message": "No recording is currently in progress."}), 400 print('In stop process') recording = False stream_process.join() stream_process = None mp3_extraction_process.terminate() mp3_extraction_process = None for f in os.listdir('mp3/'): os.remove(os.path.join('mp3/', f)) if os.path.isfile(local_transcript): os.remove(local_transcript) # check if pid_file exists, get the pid inside it and convert to int, and use os.kill to kill it if os.path.isfile(pid_file): with open(pid_file, 'r') as f: pid = int(f.read()) try: os.kill(pid, 9) # For linux print("Process terminated successfully in Linux.") except: try: process = subprocess.Popen(["taskkill", "/F", "/PID", str(pid)], stdout=subprocess.PIPE, stderr=subprocess.PIPE) # For Windows process.communicate() print("Process terminated successfully in Windows.") except Exception as e: print("Error:", e) os.remove(pid_file) return jsonify({"message": "Recording stopped."}), 200 @app.route('/google/') def google(): CONF_URL = 'https://accounts.google.com/.well-known/openid-configuration' oauth.register( name='google', client_id=GOOGLE_CLIENT_ID, client_secret=GOOGLE_CLIENT_SECRET, server_metadata_url=CONF_URL, client_kwargs={"scope": "openid email profile"} ) # Redirect to google_auth function/page redirect_uri = url_for('google_auth', _external=True) session['nonce'] = generate_token() return oauth.google.authorize_redirect(redirect_uri, nonce=session['nonce']) @app.route('/google/auth/') def google_auth(): token = oauth.google.authorize_access_token() user = oauth.google.parse_id_token(token, nonce=session['nonce']) session['user'] = user print('USER', user) # Redirect to home if login successful return redirect('/home') def is_not_logged_in(): return session.get('user') is None or session.get('nonce') is None # decorator to check if user is logged in, used for protected URLs def login_required(f): @wraps(f) def decorated_function(*args, **kwargs): if is_not_logged_in(): return redirect('/login') return f(*args, **kwargs) return decorated_function @app.route("/home") @login_required def home(): return render_template("home.html") @app.route("/", methods=["GET"]) @app.route("/login", methods=["GET"]) def login(): if not is_not_logged_in(): return redirect("/home") #return render_template("login.html") return render_template("home.html") if __name__ == "__main__": app.run(host="0.0.0.0", debug=True, port=7860)