livetranscribe / app.py
Reggie's picture
process kill prints
0e033bb
from authlib.integrations.flask_client import OAuth
from authlib.common.security import generate_token
import ffmpeg
from flask import Flask, render_template, request, jsonify, url_for, redirect, session
from functools import wraps
import os
import streamlink
import threading
import time
from faster_whisper import WhisperModel
import subprocess
from datetime import datetime as dt
from datetime import timedelta, timezone
from apiclient import discovery
from google.oauth2 import service_account
import json
# ---------------------------------------------------------------------------
# Secrets and deployment settings, all supplied through environment variables
# ---------------------------------------------------------------------------
# "client_secret" holds the service-account key as a JSON string; it is parsed
# here and written to client_secret.json further down.
client_secret = json.loads(os.environ.get("client_secret"))
gdoc_id = os.environ.get("gdoc_id")  # Google Doc that receives the transcript
GOOGLE_CLIENT_SECRET = os.environ.get("GOOGLE_CLIENT_SECRET")
GOOGLE_CLIENT_ID = os.environ.get("GOOGLE_CLIENT_ID")
# NOTE(review): allowed_users is read but not referenced in the visible code.
allowed_users = os.environ.get("allowed_users")

# ---------------------------------------------------------------------------
# Faster Whisper setup (model is loaded once, at import time)
# ---------------------------------------------------------------------------
model_size = 'small'
beamsize = 2
wmodel = WhisperModel(model_size, device="cpu", compute_type="int8")

# ---------------------------------------------------------------------------
# Working folder for the transcript marker and the pid file
# ---------------------------------------------------------------------------
# Create the folder when missing, then delete whatever a previous run left.
if not os.path.exists('transcription_files'):
    os.makedirs('transcription_files')
for f in os.listdir('transcription_files/'):
    os.remove(os.path.join('transcription_files/', f))

# ---------------------------------------------------------------------------
# Google Docs client, authenticated with the service-account key
# ---------------------------------------------------------------------------
# The key is dumped to disk because the credentials are loaded from a file.
with open("client_secret.json", "w") as json_file:
    json.dump(client_secret, json_file, indent=4)
scopes = [
    "https://www.googleapis.com/auth/documents",
    "https://www.googleapis.com/auth/drive.file",
]
credentials = service_account.Credentials.from_service_account_file('client_secret.json', scopes=scopes)
service = discovery.build('docs', 'v1', credentials=credentials)

local_tz = 5.5  # Hours added to UTC when transcript timestamps are written
local_transcript = 'transcription_files/tr.txt'
pid_file = 'transcription_files/pid.txt'

# ---------------------------------------------------------------------------
# Folder for recorded mp3 segments: create when missing, then empty it
# ---------------------------------------------------------------------------
if not os.path.exists('mp3'):
    os.makedirs('mp3')
for f in os.listdir('mp3/'):
    os.remove(os.path.join('mp3/', f))

# ---------------------------------------------------------------------------
# Flask application and OAuth registry
# ---------------------------------------------------------------------------
app = Flask(__name__, static_url_path='/static')
app.secret_key = os.urandom(12)  # fresh random key on every start
oauth = OAuth(app)

# Recording state shared between the request handlers and the worker thread
stream_process = None           # worker threading.Thread while recording
recording = False               # loop flag checked by the worker
mp3_extraction_process = None   # most recent ffmpeg process
def update_gdoc(text, gdoc_id):
    """Append ``text`` to the end of a Google Doc.

    The document is fetched to find the end index of its last paragraph, and
    the text (prefixed with one space) is inserted just before that index.

    The whole operation is best-effort: a failure while reading *or* writing
    the document is printed and swallowed, so a transient API error does not
    propagate into the caller.

    Args:
        text: Text to append.
        gdoc_id: ID of the Google Doc to update.
    """
    print('Updating Google Doc', gdoc_id)
    try:
        doc = service.documents().get(documentId=gdoc_id).execute()
        paragraph_ends = [
            element['endIndex']
            for element in doc['body']['content']
            if 'paragraph' in element
        ]
        if not paragraph_ends:
            # Nothing to anchor the insert to; skip instead of raising IndexError.
            print('Google Doc has no paragraph to append to; skipping update')
            return
        # Insert one position before the paragraph's end index, as the
        # original code did.
        insert_at = paragraph_ends[-1] - 1
        body = {'requests': [{'insertText': {'location': {'index': insert_at}, 'text': ' ' + text}}]}
        result = service.documents().batchUpdate(documentId=gdoc_id, body=body).execute()
        print(result)
    except Exception as e:
        print(e)
def process_complete_callback(retcode, **kwargs):
    """Print whether an FFmpeg process finished cleanly.

    Args:
        retcode: Exit code of the process; ``0`` is treated as success.
        **kwargs: Accepted and ignored.
    """
    message = (
        "FFmpeg process completed successfully!"
        if retcode == 0
        else "FFmpeg process encountered an error."
    )
    print(message)
def transcribe_audio(latest_file, time_counter):
    """Transcribe one audio file and return its text with the updated counter.

    When ``time_counter`` is a multiple of 5, a ``Time HH:MM:SS`` line is
    inserted after the first ``'. '`` break so the first sentence stays
    attached to the preceding text. The clock is UTC shifted by ``local_tz``
    hours.

    Args:
        latest_file: Path of the audio file to transcribe.
        time_counter: Number of chunks transcribed so far.

    Returns:
        A ``(transcribed_text, time_counter + 1)`` tuple.
    """
    print('transcribing ', latest_file)
    segments, _ = wmodel.transcribe(str(latest_file), beam_size=beamsize)  # beamsize is 2.
    joined = ''.join(segment.text for segment in segments)
    # NOTE(review): as written the second replace maps a space to a space and
    # has no effect; presumably it was meant to collapse double spaces - confirm.
    transcribed = joined.replace('\n', ' ').replace(' ', ' ')
    if time_counter % 5 == 0:
        # Keep the first sentence with the previous paragraph, then add the time code.
        first_sentence, *remaining = transcribed.split('. ')
        shifted_now = dt.now(timezone.utc) + timedelta(hours=local_tz)
        stamp = str(shifted_now.strftime('%H:%M:%S'))
        transcribed = first_sentence + '\nTime ' + stamp + '\n' + '. '.join(remaining)
    return transcribed, time_counter + 1
def save_audio(youtube_url):
    """Record a stream in 30-second mp3 segments and transcribe them as they finish.

    Runs as the worker thread started by ``start_process`` and loops for as
    long as the module-level ``recording`` flag is true. Every iteration:

    1. starts an asynchronous ffmpeg process that writes the next 30 seconds
       of audio to ``mp3/audio_<unix time>.mp3`` and records its pid in
       ``pid_file``;
    2. if an earlier segment is present in ``mp3/``, transcribes the oldest
       one, deletes it, appends the text to the Google Doc and to
       ``local_transcript``;
    3. sleeps for whatever is left of the 30-second window.

    Args:
        youtube_url: URL handed to streamlink to resolve the stream.

    Returns:
        ``None`` when the loop ends because ``recording`` was cleared, or the
        exception message as a string when recording aborted on an error.
    """
    global stream_process, recording, mp3_extraction_process
    try:
        streams = streamlink.streams(youtube_url)
        # The 144p variant is used; only its audio track is kept below.
        stream_url = streams["144p"].url
        time_counter = 0
        while recording:
            # Save audio only into mp3 files
            saved_mp3 = f"mp3/audio_{int(time.time())}.mp3"
            mp3_extraction_process = (
                ffmpeg
                .input(stream_url, t=30)
                .audio
                # TODO - change destination url to relevant url
                .output(saved_mp3)
                .overwrite_output()
                .global_args('-loglevel', 'panic')
                .run_async()
            )
            print('pid', mp3_extraction_process.pid)
            # Persist the pid so stop_process can kill ffmpeg from the file.
            with open(pid_file, 'w') as f:
                f.write(str(mp3_extraction_process.pid))
            # If there is more than one mp3 file in the folder, transcribe one
            # that is not being written to.
            mp3files = [name for name in os.listdir('mp3') if name.endswith('.mp3')]
            if len(mp3files) < 2:
                print('Sleeping for 30s as only one mp3 file in folder')
                time.sleep(30)
            else:
                starttime = time.time()
                current_name = os.path.basename(saved_mp3)
                # os.listdir order is arbitrary. The file names embed the unix
                # timestamp, so the smallest name is the oldest segment; taking
                # it keeps the transcript in order when a backlog builds up.
                file_to_transcribe = min(name for name in mp3files if name != current_name)
                print('Working on ', file_to_transcribe)
                transcribed, time_counter = transcribe_audio(f'mp3/{file_to_transcribe}', time_counter)
                os.remove(f'mp3/{file_to_transcribe}')
                update_gdoc(transcribed, gdoc_id)
                with open(local_transcript, 'a', encoding='utf-8', errors='ignore') as f:
                    f.write(transcribed)
                elapsed_time = time.time() - starttime
                print('Time to transcribe:', elapsed_time, 'seconds')
                if elapsed_time < 30:
                    print(f'Sleeping for {30-elapsed_time} as there are more than one mp3 files in folder')
                    time.sleep(30-elapsed_time)
    except Exception as e:
        recording = False
        print('exception', str(e))
        # start_process refuses to run while the marker file exists and
        # stop_process refuses to run once `recording` is False, so a marker
        # left behind here would lock the app until a restart.
        if os.path.isfile(local_transcript):
            try:
                os.remove(local_transcript)
            except OSError as cleanup_error:
                print('exception', str(cleanup_error))
        return str(e)
@app.route("/start_process", methods=["POST"])
def start_process():
    """Start recording and transcribing the stream given in the ``url`` form field.

    The existence of ``local_transcript`` is the marker that a recording is
    running. The request is validated *before* that marker is created, so a
    rejected request (missing URL, recording flag already set) does not leave
    a stale marker that blocks every later start.

    Returns:
        A JSON message with status 200 when the worker thread was started,
        or status 400 when a recording is already running or no URL was sent.
    """
    global recording, stream_process
    if os.path.isfile(local_transcript):
        return jsonify({"message": "Recording is already in progress."}), 400
    youtube_url = request.form.get("url")
    if not youtube_url:
        return jsonify({"message": "Please provide a valid YouTube URL."}), 400
    if recording:
        return jsonify({"message": "A recording is already in progress."}), 400
    # Create the local transcript file, which is used as a check to prevent
    # multiple recordings.
    with open(local_transcript, 'a', encoding='utf-8', errors='ignore') as f:
        f.write('')
    print('In start process')
    recording = True
    stream_process = threading.Thread(target=save_audio, args=(youtube_url,))
    stream_process.start()
    return jsonify({"message": "Recording started."}), 200
def _kill_recorded_pid(skip_pid=None):
    """Kill the process whose pid is stored in ``pid_file``, best-effort.

    Args:
        skip_pid: Pid that was already terminated and reaped through its
            in-memory handle. It is not signalled again, because the OS may
            have reassigned that pid to an unrelated process.
    """
    try:
        with open(pid_file, 'r') as f:
            pid = int(f.read())
    except (OSError, ValueError) as e:
        # Unreadable or empty pid file: nothing to kill.
        print("Error:", e)
        return
    if pid == skip_pid:
        return
    try:
        os.kill(pid, 9)  # For linux
        print("Process terminated successfully in Linux.")
    except ProcessLookupError:
        print("Process", pid, "has already exited.")
    except OSError as kill_error:
        if os.name != 'nt':
            # taskkill only exists on Windows; report the real error instead.
            print("Error:", kill_error)
            return
        try:
            process = subprocess.Popen(["taskkill", "/F", "/PID", str(pid)], stdout=subprocess.PIPE, stderr=subprocess.PIPE)  # For Windows
            process.communicate()
            print("Process terminated successfully in Windows.")
        except Exception as e:
            print("Error:", e)


@app.route("/stop_process", methods=["POST"])
def stop_process():
    """Stop the running recording and remove everything it left behind.

    Clears the ``recording`` flag, waits for the worker thread to leave its
    loop, terminates the last ffmpeg process, deletes the mp3 segments, the
    transcript marker file and the pid file.

    Returns:
        A JSON message with status 200 once cleanup is done, or status 400
        when no recording is in progress.
    """
    global recording, stream_process, mp3_extraction_process
    if not recording:
        return jsonify({"message": "No recording is currently in progress."}), 400
    print('In stop process')
    recording = False
    # The worker re-checks the flag only once per loop iteration, so this join
    # can block until the current sleep or transcription finishes.
    if stream_process is not None:
        stream_process.join()
    stream_process = None

    # The handle is still None when stop arrives before the worker has
    # launched its first ffmpeg process.
    reaped_pid = None
    if mp3_extraction_process is not None:
        mp3_extraction_process.terminate()
        try:
            # Wait for ffmpeg to exit so its output file is released before
            # the mp3 folder is emptied.
            mp3_extraction_process.wait(timeout=5)
        except subprocess.TimeoutExpired:
            mp3_extraction_process.kill()
            mp3_extraction_process.wait()
        reaped_pid = mp3_extraction_process.pid
    mp3_extraction_process = None

    for f in os.listdir('mp3/'):
        os.remove(os.path.join('mp3/', f))
    if os.path.isfile(local_transcript):
        os.remove(local_transcript)
    # Fallback for a process that is only known through the pid file.
    if os.path.isfile(pid_file):
        _kill_recorded_pid(reaped_pid)
        os.remove(pid_file)
    return jsonify({"message": "Recording stopped."}), 200
@app.route('/google/')
def google():
    """Register the Google OAuth client and redirect the browser to Google sign-in.

    A fresh nonce is stored in the session under ``'nonce'`` and sent with
    the authorization request; ``google_auth`` reads it back when parsing the
    ID token.
    """
    discovery_url = 'https://accounts.google.com/.well-known/openid-configuration'
    client_settings = {
        'name': 'google',
        'client_id': GOOGLE_CLIENT_ID,
        'client_secret': GOOGLE_CLIENT_SECRET,
        'server_metadata_url': discovery_url,
        'client_kwargs': {"scope": "openid email profile"},
    }
    oauth.register(**client_settings)
    # Google sends the user back to the google_auth view after sign-in.
    callback_url = url_for('google_auth', _external=True)
    nonce = generate_token()
    session['nonce'] = nonce
    return oauth.google.authorize_redirect(callback_url, nonce=nonce)
@app.route('/google/auth/')
def google_auth():
    """OAuth callback: exchange the code for a token and store the user in the session.

    The ID token is parsed with the nonce saved in the session by ``google``.
    On success the parsed user is stored under ``session['user']`` and the
    browser is redirected to ``/home``.
    """
    token = oauth.google.authorize_access_token()
    expected_nonce = session['nonce']
    user = oauth.google.parse_id_token(token, nonce=expected_nonce)
    session['user'] = user
    print('USER', user)
    # Login succeeded; continue to the home page.
    return redirect('/home')
def is_not_logged_in():
    """Return True when the session lacks a ``'user'`` or a ``'nonce'`` value."""
    required_keys = ('user', 'nonce')
    return any(session.get(key) is None for key in required_keys)
# Decorator for protected URLs: only logged-in sessions reach the wrapped view.
def login_required(f):
    """Wrap view ``f`` so that sessions without a login are redirected to ``/login``.

    Args:
        f: The view function to protect.

    Returns:
        A wrapper that calls ``f`` with the original arguments when the
        session is logged in, and otherwise returns a redirect to ``/login``.
    """
    @wraps(f)
    def guarded_view(*args, **kwargs):
        if not is_not_logged_in():
            return f(*args, **kwargs)
        return redirect('/login')
    return guarded_view
@app.route("/home")
@login_required
def home():
    """Render the home page; reachable only by logged-in sessions."""
    page = render_template("home.html")
    return page
@app.route("/", methods=["GET"])
@app.route("/login", methods=["GET"])
def login():
    """Entry page: send logged-in sessions to ``/home``, render ``home.html`` otherwise."""
    already_logged_in = not is_not_logged_in()
    if already_logged_in:
        return redirect("/home")
    # NOTE(review): the dedicated login.html page is disabled here, so
    # sessions without a login are served home.html directly - confirm this
    # is intended.
    return render_template("home.html")
if __name__ == "__main__":
    # Debug mode enables Werkzeug's interactive debugger, which lets anyone
    # who can reach the server run arbitrary Python. The app listens on all
    # interfaces, so debug is off unless FLASK_DEBUG is set explicitly.
    debug_enabled = os.environ.get("FLASK_DEBUG", "").strip().lower() in ("1", "true", "yes")
    app.run(host="0.0.0.0", debug=debug_enabled, port=7860)