# Spaces: Sleeping -- page-status text captured together with the listing; not part of the program.
import streamlit as st | |
from faster_whisper import WhisperModel | |
from moviepy.editor import VideoFileClip | |
import os | |
import traceback | |
from instagrapi import Client | |
from pathlib import Path | |
import torch | |
from transformers import AutoModelForSpeechSeq2Seq, AutoProcessor, pipeline | |
def convert_to_mp3(video_file):
    """Extract the audio track of a video into an MP3 file next to it.

    The conversion is done with moviepy (``VideoFileClip`` and
    ``write_audiofile``). The source video is deleted only after the MP3
    has been written successfully.

    Args:
        video_file: Path of the video file to convert.

    Returns:
        Path of the created MP3: ``video_file`` with its extension
        replaced by ``.mp3``.

    Raises:
        ValueError: If the clip exposes no audio (``audio`` is ``None``).
    """
    audio_path = os.path.splitext(video_file)[0] + ".mp3"
    video_clip = VideoFileClip(video_file)
    try:
        audio_clip = video_clip.audio
        if audio_clip is None:
            # Fail with a clear message instead of an AttributeError below.
            raise ValueError(f"No audio track found in {video_file!r}")
        try:
            audio_clip.write_audiofile(audio_path)
        finally:
            audio_clip.close()
    finally:
        # Always release the clip, even when the export fails.
        video_clip.close()
    # Reached only on success, so a failed conversion keeps the video.
    os.remove(video_file)
    return audio_path
def get_model(model_size="large-v3", device="cuda", compute_type="float16"):
    """Build a faster-whisper ``WhisperModel``.

    The defaults reproduce the previously hard-coded configuration, so
    existing ``get_model()`` callers are unaffected.

    Args:
        model_size: Model identifier passed to ``WhisperModel``.
        device: Device name passed to ``WhisperModel``.
        compute_type: Compute type passed to ``WhisperModel``.

    Returns:
        A newly constructed ``WhisperModel``; nothing is cached here.
    """
    return WhisperModel(model_size, device=device, compute_type=compute_type)
def transcribe_audio(audio_file):
    """Transcribe ``audio_file`` with the model returned by ``get_model``.

    Progress messages are shown through ``st.info`` before and after the
    ``transcribe`` call, which is made with ``beam_size=1``.

    Args:
        audio_file: Audio input handed to ``model.transcribe``.

    Returns:
        The first element of the pair returned by ``model.transcribe``
        (the segments); the second element is discarded.
    """
    whisper = get_model()
    st.info("transcribe_audio")
    # NOTE(review): faster-whisper appears to yield segments lazily, so the
    # real work may only happen when the caller iterates them -- confirm.
    segments, _ = whisper.transcribe(audio_file, beam_size=1)
    st.info("transcribe_audio done..")
    return segments
def transcribe_post():
    """Transcribe the reel in ``st.session_state.reel_id`` and offer the result.

    Steps, each reporting its own errors in the UI:

    1. Resolve the reel URL to a media pk.
    2. Download the video into ``/data``.
    3. Extract the audio with ``convert_to_mp3``.
    4. Transcribe it with ``transcribe_audio`` and write one line per
       segment to a ``.txt`` file in ``/data``.
    5. Render a download button for the transcript.

    Session state written (only after the transcript file has been written
    completely, so a failed run can simply be retried):

    * ``file_result``: download file name of the transcript.
    * ``file_transcript``: path of the transcript on disk.
    * ``transcribed_media_pk``: media pk the stored transcript belongs to.
    """
    media_pk = _resolve_media_pk()

    # A transcript kept from a different reel must neither block the new
    # transcription nor be served as the result for this reel.
    if media_pk and st.session_state.get("transcribed_media_pk") != media_pk:
        st.session_state.file_result = None
        st.session_state.file_transcript = None

    if media_pk and not st.session_state.file_result:
        video_path = _download_reel(media_pk)
        if video_path is not None:
            _transcribe_video(video_path, media_pk)
            if st.session_state.file_transcript:
                st.balloons()
            else:
                st.error("No transcription found!")

    if st.session_state.file_transcript and st.session_state.file_result:
        _render_transcript_download()


def _resolve_media_pk():
    """Return the media pk for the reel URL in session state, or None on error."""
    with st.spinner("Searching reel..."):
        try:
            media_pk = st.session_state.insta.media_pk_from_url(
                st.session_state.reel_id
            )
        except Exception:
            st.error(traceback.format_exc())
            st.error("Cannot load post")
            return None
    if media_pk is None:
        st.warning("Invalid reel!")
    return media_pk


def _download_reel(media_pk):
    """Download the reel video into /data; return its path, or None on error."""
    with st.spinner("Loading reel..."):
        try:
            media_path = st.session_state.insta.video_download(media_pk, "/data")
            return os.path.join("/data", media_path.name)
        except Exception:
            st.error(traceback.format_exc())
            st.error("Cannot download reel!")
            return None


def _transcribe_video(video_path, media_pk):
    """Extract audio, write the transcript, and record it in session state."""
    with st.spinner("Extracting audio..."):
        try:
            audio_path = convert_to_mp3(video_path)
        except Exception:
            st.error(traceback.format_exc())
            st.error("Cannot extract audio!")
            return
    st.success("Audio extracted!")

    # Keep the download name (bare file name) separate from the on-disk path;
    # prefixing "/data/" onto an already absolute path produced a path inside
    # a non-existent "/data//data" directory.
    result_name = os.path.basename(audio_path) + ".txt"
    transcript_path = os.path.join("/data", result_name)

    with st.spinner("Final step: transcribing audio..."):
        try:
            segments = transcribe_audio(audio_path)
            st.info("Transcription done! Saving...")
            with open(transcript_path, "w", encoding="utf-8") as f:
                # One segment per line.
                f.writelines(
                    f"[{segment.start}=>{segment.end}]: {segment.text}\n"
                    for segment in segments
                )
        except Exception:
            st.error(traceback.format_exc())
            st.error("Cannot transcribe audio!")
            return

    # Publish the result only once the file is fully written.
    st.session_state.file_result = result_name
    st.session_state.file_transcript = transcript_path
    st.session_state["transcribed_media_pk"] = media_pk


def _render_transcript_download():
    """Show the results header and a download button for the transcript."""
    st.header('Results', divider='orange')
    try:
        with open(st.session_state.file_transcript, "r", encoding="utf-8") as f:
            data = f.read()
        st.download_button(
            label="Download transcript",
            data=data,
            file_name=st.session_state.file_result,
            mime="text/plain",
        )
    except Exception:
        st.error(traceback.format_exc())
        st.error("Cannot load transcript result!")
def load_profile(username):
    """Look up an Instagram profile and store it in ``st.session_state.profile``.

    On success the profile returned by ``user_info_by_username`` is stored
    and an info message is shown. On any failure the stored profile is
    cleared, so a profile from an earlier successful lookup does not stay
    active, and an error message is shown.

    Args:
        username: Username passed to ``user_info_by_username``.
    """
    with st.spinner("Loading profile..."):
        try:
            profile = st.session_state.insta.user_info_by_username(username)
        except Exception:
            st.session_state.profile = None
            st.error("Profile not found!")
        else:
            st.session_state.profile = profile
            st.info("Profile loaded!")
def exec_transcribe():
    """Render the transcription section for a logged-in user.

    Nothing is rendered unless ``st.session_state.logged_in`` is truthy.
    The section offers a profile lookup ("Check") and, once a profile is
    stored in session state, a reel URL input with a "Transcribe" button.
    ``transcribe_post`` runs whenever ``st.session_state.reel_id`` is set.
    """
    state = st.session_state
    if not state.logged_in:
        return

    st.header('Transcription', divider='violet')
    with st.container(border=True):
        target_user = st.text_input("Profile", placeholder="Enter target profile")
        if st.button("Check"):
            if not target_user:
                st.warning("Please enter username")
            else:
                load_profile(target_user)

        # The reel controls only appear once a profile has been loaded.
        if not state.profile:
            return

        reel_url = st.text_input("Enter reel url")
        if st.button("Transcribe"):
            if reel_url:
                state.reel_id = reel_url
            else:
                st.warning("Please enter reel url")

        if state.reel_id:
            transcribe_post()
def login_user(username: str, password: str):
    """Render the "Login" button and log in to Instagram when it is clicked.

    ``st.session_state.logged_in`` is set to the outcome of the login
    attempt. The client settings are written to ``/data/session.json``
    only after a successful login. A failed login (exception or falsy
    result from ``login``) is reported in the UI instead of crashing the
    page.

    Args:
        username: Instagram username entered by the user.
        password: Instagram password entered by the user.
    """
    if not st.button("Login"):
        return

    if not (username and password):
        st.session_state.logged_in = False
        st.warning("Please enter username and password")
        return

    with st.spinner("Logging in..."):
        try:
            logged_in = bool(st.session_state.insta.login(username, password))
        except Exception as exc:
            st.session_state.logged_in = False
            st.error(f"Login failed: {exc}")
            return

        if logged_in:
            # Persist the session only when the login actually succeeded.
            st.session_state.insta.dump_settings("/data/session.json")
        else:
            st.error("Login failed!")
        st.session_state.logged_in = logged_in
def initialize_app():
    """Configure the Streamlit page and seed missing session-state keys.

    Sets the page title, icon and "About" menu entry, renders the app
    title, and gives every session-state key used by the app its initial
    value unless the key is already present.
    """
    st.set_page_config(
        page_title="Transcriber",
        page_icon="public/favicon.ico",
        menu_items={
            "About": "### Contact\n ✉️florinbobis@gmail.com",
        },
    )
    st.title("✍️Transcribe your reel")

    # Initial value for each key; keys that already exist are left untouched.
    initial_state = {
        "insta": None,
        "profile": None,
        "logged_in": False,
        "file_transcript": None,
        "file_result": None,
        "reel_id": None,
    }
    for key, value in initial_state.items():
        if key not in st.session_state:
            st.session_state[key] = value
def init_ig() -> Client:
    """Create an instagrapi ``Client`` and try to restore saved settings.

    Loading ``/data/session.json`` is best effort: any exception raised
    while loading is printed and the client is returned regardless.

    Returns:
        The new client, with ``delay_range`` set to ``[1, 3]``.
    """
    client = Client()
    # presumably the min/max delay between requests -- confirm with instagrapi
    client.delay_range = [1, 3]
    try:
        client.load_settings("/data/session.json")
    except Exception as err:
        print(err)
    return client
def main():
    """Entry point: set up the page, render the login form and the transcriber.

    The Instagram client is created once per browser session and kept in
    ``st.session_state.insta``. Streamlit re-runs the script on every
    interaction, so building a new client on each run would discard the
    one that just logged in and re-read the settings file every time.
    """
    initialize_app()
    if st.session_state.insta is None:
        st.session_state.insta = init_ig()

    st.header('IG Login', divider='blue')
    with st.container(border=True):
        username = st.text_input("Username", placeholder='Please enter your username')
        password = st.text_input("Password", type="password", placeholder='Please enter your password')
        login_user(username, password)

    exec_transcribe()


if __name__ == "__main__":
    main()