Spaces:
Sleeping
Sleeping
File size: 20,804 Bytes
23d4cfa 0b484f3 23d4cfa feefcf3 23d4cfa 8bec4e8 1c45af0 fb7175b 3e04bc5 67dfaaf 23d4cfa f834df3 3e04bc5 f834df3 3e04bc5 f834df3 3e04bc5 f834df3 3e04bc5 f834df3 23d4cfa e9724c9 fc4e5a1 e9724c9 fc4e5a1 e9724c9 fc4e5a1 e9724c9 fc4e5a1 1c45af0 e9724c9 fc4e5a1 8bec4e8 23d4cfa 8bec4e8 23d4cfa 3f4d8ed 23d4cfa 3f4d8ed 23d4cfa c58dd90 0dbdfb6 c58dd90 0dbdfb6 0bb279e 8833d57 0bb279e f779c4b 92ea8a8 93b36e1 f779c4b 93b36e1 c685b52 fb7175b c980ffd dafb74c c980ffd dafb74c c980ffd f779c4b c685b52 23d4cfa 92ea8a8 23d4cfa 0dbdfb6 23d4cfa f779c4b a05e96e 23d4cfa 73e1a8e 23d4cfa f779c4b 23d4cfa 73e1a8e da7713e 73e1a8e 23d4cfa f779c4b 23d4cfa f779c4b 73e1a8e 23d4cfa 5917224 6b61f59 c1fb085 a218ae1 c1fb085 6b61f59 f834df3 23d4cfa 93b36e1 641010d 93b36e1 c685b52 93b36e1 115e46c ab9afed 08da491 115e46c 2a6df21 bc388ae 0bb279e 0dbdfb6 de40089 23d4cfa f834df3 d74463a f834df3 2a6df21 c58dd90 2a6df21 65d9c89 23d4cfa 2a6df21 23d4cfa 115e46c 6f7b62a 115e46c abfad1f 6f7b62a 115e46c 2a6df21 115e46c f8fdfda 2a6df21 f8fdfda 115e46c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 |
from pytubefix import YouTube
from pytubefix.cli import on_progress
import time
import math
import gradio as gr
import ffmpeg
from faster_whisper import WhisperModel
import requests
import json
import arabic_reshaper # pip install arabic-reshaper
from bidi.algorithm import get_display # pip install python-bidi
from moviepy import VideoFileClip, TextClip, CompositeVideoClip, AudioFileClip, ColorClip
import pysrt
import instaloader
import time
import re
import concurrent.futures
import os
api_key = "268976:66f4f58a2a905"
def fetch_data(url):
try:
response = requests.get(url)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
print(f"An error occurred: {e}")
return None
def download_file(url):
try:
response = requests.get(url.split("#")[0], stream=True)
response.raise_for_status()
print(url.split("#")[1])
with open(url.split("#")[1], 'wb') as file:
for chunk in response.iter_content(chunk_size=8192):
if chunk:
file.write(chunk)
print(f"Downloaded successfully: {url.split('#')[1]}")
except requests.exceptions.RequestException as e:
print(f"An error occurred: {e}")
def download_chunk(url, start, end, filename, index):
headers = {'Range': f'bytes={start}-{end}'}
response = requests.get(url, headers=headers, stream=True)
response.raise_for_status()
chunk_filename = f'{filename}.part{index}'
with open(chunk_filename, 'wb') as file:
for chunk in response.iter_content(chunk_size=8192):
if chunk:
file.write(chunk)
return chunk_filename
def merge_files(filename, num_parts):
with open(filename, 'wb') as output_file:
for i in range(num_parts):
part_filename = f'{filename}.part{i}'
with open(part_filename, 'rb') as part_file:
output_file.write(part_file.read())
# Optionally, delete the part file after merging
# os.remove(part_filename)
def download_file_in_parallel(link, size, num_threads=4):
url = link.split("#")[0]
filename = link.split("#")[1]
print(url+" filename: "+filename)
response = requests.head(url)
#file_size = int(response.headers['Content-Length'])
chunk_size = size // num_threads
ranges = [(i * chunk_size, (i + 1) * chunk_size - 1) for i in range(num_threads)]
ranges[-1] = (ranges[-1][0], size - 1) # Adjust the last range to the end of the file
with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor:
futures = [
executor.submit(download_chunk, url, start, end, filename, i)
for i, (start, end) in enumerate(ranges)
]
for future in concurrent.futures.as_completed(futures):
future.result() # Ensure all threads complete
merge_files(filename, num_threads)
print(f'Downloaded successfully: {filename}')
def one_youtube(link, api_key):
# Fetch video ID
video_id_url = f"https://one-api.ir/youtube/?token={api_key}&action=getvideoid&link={link}"
video_data = fetch_data(video_id_url)
if not video_data:
return None, None
video_id = video_data["result"]
# Fetch video data
filter_option = "" # Replace with your filter option
video_data_url = f"https://youtube.one-api.ir/?token={api_key}&action=fullvideo&id={video_id}&filter={filter_option}"
video_data_2 = fetch_data(video_data_url)
if not video_data_2:
return None, None
formats_list = video_data_2["result"]["formats"]
file_name = video_data_2["result"]["title"]
video_name = f'{file_name}.mp4'
audio_name = f'{file_name}.mp3'
for f in formats_list:
if f["format_note"] == "360p":
download_id = f["id"]
video_size = f["filesize"]
for f in formats_list:
if f["format_note"] == "medium":
audio_id = f["id"]
audio_size = f["filesize"]
if not download_id or not audio_id:
return None, None
# Fetch video and audio links
video_link_url = f"https://youtube.one-api.ir/?token={api_key}&action=download&id={download_id}"
audio_link_url = f"https://youtube.one-api.ir/?token={api_key}&action=download&id={audio_id}"
video_link_data = fetch_data(video_link_url)
audio_link_data = fetch_data(audio_link_url)
if not video_link_data or not audio_link_data:
return None, None
video_link = video_link_data["result"]["link"]
audio_link = audio_link_data["result"]["link"]
vid_str=video_link+"#"+video_name
audio_str=audio_link+"#"+audio_name
# Download video and audio files
print(video_size , audio_size)
download_file_in_parallel(vid_str, video_size)
download_file_in_parallel(audio_str, audio_size)
return video_name, audio_name
# Define your functions here
def yt_download(url):
yt = YouTube(url)
print(yt.title)
video_path = f"{yt.title}.mp4"
ys = yt.streams.get_highest_resolution()
print(ys)
ys.download()
return video_path, yt.title
def insta_oneapi(url, api_key):
shortcode = url.split("/")[-1]
print(shortcode)
url_one="https://api.one-api.ir/instagram/v1/post/?shortcode="+shortcode
request_body = [{"shortcode": shortcode},]
headers = {"one-api-token": api_key, "Content-Type": "application/json"}
response = requests.get(url_one, headers=headers)
print(response)
if response.status_code == 200:
result = response.json()
try:
time.sleep(10)
response = requests.get(result["result"]['media'][0]["url"], stream=True)
response.raise_for_status()
with open("video.mp4", 'wb') as file:
for chunk in response.iter_content(chunk_size=8192):
if chunk:
file.write(chunk)
print(f"Downloaded successfully")
return "video.mp4"
except requests.exceptions.RequestException as e:
print(f"An error occurred: {e}")
else:
print(f"Error: {response.status_code}, {response.text}")
return None
def insta_download(permalink):
# Create an instance of Instaloader
L = instaloader.Instaloader()
try:
# Extract the shortcode from the permalink
if "instagram.com/reel/" in permalink:
shortcode = permalink.split("instagram.com/reel/")[-1].split("/")[0]
elif "instagram.com/p/" in permalink:
shortcode = permalink.split("instagram.com/p/")[-1].split("/")[0]
else:
raise ValueError("Invalid permalink format")
# Load the post using the shortcode
post = instaloader.Post.from_shortcode(L.context, shortcode)
# Check if the post is a video
if not post.is_video:
raise ValueError("The provided permalink is not a video.")
# Get the video URL
video_url = post.video_url
# Extract the filename from the URL
filename = video_url.split("/")[-1]
# Remove query parameters
filename = filename.split("?")[0]
# Download the video using requests
response = requests.get(video_url, stream=True)
response.raise_for_status() # Raise an error for bad responses
# Save the content to a file
with open(filename, 'wb') as file:
for chunk in response.iter_content(chunk_size=8192):
file.write(chunk)
print(f"Downloaded video {filename} successfully.")
return filename
except Exception as e:
print(f"Failed to download video from {permalink}: {e}")
def extract_audio(input_video_name):
# Define the input video file and output audio file
mp3_file = "audio.mp3"
# Load the video clip
video_clip = VideoFileClip(input_video_name)
# Extract the audio from the video clip
audio_clip = video_clip.audio
# Write the audio to a separate file
audio_clip.write_audiofile(mp3_file)
# Close the video and audio clips
audio_clip.close()
video_clip.close()
print("Audio extraction successful!")
return mp3_file
def transcribe(audio):
model = WhisperModel("tiny")
segments, info = model.transcribe(audio)
segments = list(segments)
for segment in segments:
print("[%.2fs -> %.2fs] %s" % (segment.start, segment.end, segment.text))
return segments
def format_time(seconds):
hours = math.floor(seconds / 3600)
seconds %= 3600
minutes = math.floor(seconds / 60)
seconds %= 60
milliseconds = round((seconds - math.floor(seconds)) * 1000)
seconds = math.floor(seconds)
formatted_time = f"{hours:02d}:{minutes:02d}:{seconds:01d},{milliseconds:03d}"
return formatted_time
def generate_subtitle_file(language, segments, input_video_name):
subtitle_file = f"sub-{input_video_name}.{language}.srt"
text = ""
for index, segment in enumerate(segments):
segment_start = format_time(segment.start)
segment_end = format_time(segment.end)
text += f"{str(index+1)} \n"
text += f"{segment_start} --> {segment_end} \n"
text += f"{segment.text} \n"
text += "\n"
f = open(subtitle_file, "w", encoding='utf8')
f.write(text)
f.close()
return subtitle_file
def read_srt_file(file_path):
try:
with open(file_path, 'r', encoding='utf-8') as file:
srt_content = file.read()
return srt_content
except FileNotFoundError:
print(f"The file {file_path} was not found.")
except Exception as e:
print(f"An error occurred: {e}")
def write_srt(subtitle_text, output_file="edited_srt.srt"):
with open(output_file+".srt", 'w', encoding="utf-8") as file:
file.write(subtitle_text)
return output_file+".srt"
def generate_translated_subtitle(language, segments, input_video_name):
input_video_name=input_video_name.split('/')[-1]
subtitle_file = f"{input_video_name}.srt"
text = ""
lines = segments.split('\n')
new_list = [item for item in lines if item != '']
segment_number = 1
for index, segment in enumerate(new_list):
if (index+1) % 3 == 1 or (index+1)==1:
text += f"{segment}\n"
segment_number += 1
if (index+1) % 3 == 2 or (index+1)==2:
text += segment + "\n"
if (index+1) % 3 == 0:
text += f"\u200F{segment}\n\n"
with open(subtitle_file, "w", encoding='utf8') as f:
f.write(text)
return subtitle_file
def clean_text(text):
# Remove 'srt ' from the start of each line
# Remove ''' from the start and end
text = re.sub(r"^```|```$", '', text)
text = re.sub(r'^srt', '', text, flags=re.MULTILINE)
return text
def split_srt_file(input_file, max_chars=3000):
# Read the contents of the SRT file
with open(input_file, 'r', encoding='utf-8') as file:
content = file.read()
# Split the content into individual subtitles
subtitles = content.strip().split('\n\n')
# Prepare to write the split files
output_files = []
current_file_content = ''
current_file_index = 1
for subtitle in subtitles:
# Check if adding this subtitle would exceed the character limit
if len(current_file_content) + len(subtitle) + 2 > max_chars: # +2 for \n\n
# Write the current file
output_file_name = f'split_{current_file_index}.srt'
with open(output_file_name, 'w', encoding='utf-8') as output_file:
output_file.write(current_file_content.strip())
output_files.append(output_file_name)
# Prepare for the next file
current_file_index += 1
current_file_content = subtitle + '\n\n'
else:
# If it fits, add the subtitle
current_file_content += subtitle + '\n\n'
# Write any remaining content to a new SRT file
if current_file_content:
output_file_name = f'split_{current_file_index}.srt'
with open(output_file_name, 'w', encoding='utf-8') as output_file:
output_file.write(current_file_content.strip())
output_files.append(output_file_name)
return output_files
def translate_text(api_key, source_lang, target_lang, text):
url = "https://api.one-api.ir/translate/v1/google/"
request_body = {"source": source_lang, "target": target_lang, "text": text}
headers = {"one-api-token": api_key, "Content-Type": "application/json"}
response = requests.post(url, headers=headers, json=request_body)
if response.status_code == 200:
result = response.json()
enhanced_text = enhance_text(api_key, text, result['result'])
return enhanced_text
else:
print(f"Error: {response.status_code}, {response.text}")
return None
def enhance_text(api_key, text):
url = "https://api.one-api.ir/chatbot/v1/gpt4o/"
# Prepare the request body
request_body = [{
"role": "user",
"content": "Translate the following English text into Persian, specifically for a voice-over. Prioritize a natural and engaging tone that resonates with a Persian-speaking audience. Make sure the translation is:Concise and Time-Conscious: It needs to be spoken fluently within approximately the same time it takes to say the original English. Avoid overly verbose phrasing.Clear and Understandable: Use vocabulary and sentence structures that are easily understood by a general Persian-speaking audience. Avoid jargon or overly technical terms unless absolutely necessary and always provide a culturally appropriate equivalent.Culturally Relevant: Adapt the translation to ensure it is culturally appropriate and resonates with Iranian sensibilities. Consider nuances of politeness, humor, and common expressions. Adapt phrases as necessary to better reflect the culture.Natural Sounding: Read the Persian translation aloud to ensure it sounds natural and flows well when spoken.Engaging: Use phrasing that keeps the listener interested and attentive. Consider using rhetorical devices or active voice where appropriate to make the delivery more impactful.[Optional: Include the intended audience demographics here, e.g., "Targeting a younger audience" or "Intended for a professional context". This helps tailor the language.][Optional: Mention the overall tone you're aiming for, e.g., "maintain a formal tone" or "use an informal and friendly tone".]Essentially, I need a translation that doesn't sound like a translation, but a message written originally in Persian that is easy to grasp and will keep the audience captivated.the text will send in next message .
},]
{
"role": "assistant",
"content": "okay"
},
{
"role": "user",
"content": text
}
]
# Add the API key to the request
headers = {
"one-api-token": api_key,
"Content-Type": "application/json"
}
# Make the POST request
attempts = 0
max_attempts = 3
while attempts < max_attempts:
response = requests.post(url, headers=headers, json=request_body)
if response.status_code == 200:
result = response.json()
if result["status"] == 200:
print("status: ", result["status"])
te = clean_text(result["result"][0])
print("result: ", te)
return te
else:
print(f"Error: status {result['status']}, retrying in 30 seconds...")
else:
print(f"Error: {response.status_code}, {response.text}, retrying in 30 seconds...")
attempts += 1
time.sleep(30)
print("Error Max attempts reached. Could not retrieve a successful response.")
return 0
def write_google(google_translate):
google = "google_translate.srt"
with open(google, 'a', encoding="utf-8") as f:
f.write(google_translate)
def time_to_seconds(time_obj):
return time_obj.hours * 3600 + time_obj.minutes * 60 + time_obj.seconds + time_obj.milliseconds / 1000
def create_subtitle_clips(subtitles, videosize, fontsize, font, color, debug):
subtitle_clips = []
color_clips=[]
for subtitle in subtitles:
start_time = time_to_seconds(subtitle.start) # Add 2 seconds offset
end_time = time_to_seconds(subtitle.end)
duration = end_time - start_time
video_width, video_height = videosize
max_width = video_width * 0.8
max_height = video_height * 0.2
text_clip = TextClip(font, subtitle.text, font_size=fontsize, size=(int(video_width * 0.8), int(video_height * 0.2)) ,text_align="center" ,color=color, method='caption').with_start(start_time).with_duration(duration)
myclip = ColorClip(size=(int(video_width * 0.8), int(video_height * 0.2)) , color=(0, 0, 0)).with_opacity(0.4).with_start(start_time).with_duration(duration)
subtitle_x_position = 'center'
subtitle_y_position = video_height * 0.68
text_position = (subtitle_x_position, subtitle_y_position)
subtitle_clips.append(text_clip.with_position(text_position))
color_clips.append(myclip.with_position(text_position))
return subtitle_clips, color_clips
def process_video(video, url, type):
if isinstance(video, str):
input_video = video.split("/")[-1]
print(input_video)
input_audio = extract_audio(video)
input_video_name = input_video.replace(".mp4", "")
else:
if type=="insta":
input_video=insta_oneapi(url, api_key)
input_video_name = input_video.replace(".mp4", "")
input_audio = extract_audio(input_video)
elif type=="youtube":
#input_video, input_audio = one_youtube(url, api_key)
input_video, title = yt_download(url)
input_video_name = input_video.replace(".mp4", "")
input_audio = extract_audio(input_video)
segments = transcribe(audio=input_audio)
language = "fa"
subtitle_file = generate_subtitle_file(language=language, segments=segments, input_video_name=input_video_name)
source_language = "en"
target_language = "fa"
#srt_string = read_srt_file(subtitle_file)
srt_files=split_srt_file(subtitle_file)
for i in srt_files:
srt_string = read_srt_file(f"{i}")
#google_translate = translate_text(api_key, source_language, target_language, srt_string)
google_translate = enhance_text(api_key, srt_string)
if google_translate == 0 :
google_translate = translate_text(api_key, source_language, target_language, srt_string)
write_google(google_translate)
time.sleep(15)
srt = read_srt_file("google_translate.srt")
os.remove("google_translate.srt")
return srt, video, input_audio
def video_edit(srt, input_video, input_audio= 'audio.mp3'):
input_video_name = input_video.replace(".mp4", "")
srt_name=generate_translated_subtitle("fa", srt, input_video_name)
return input_video, srt_name
""" input_video_name = input_video.replace(''.mp4', '')
video = VideoFileClip(input_video)
audio = AudioFileClip(input_audio)
video = video.with_audio(audio)
print(video)
output_video_file = input_video_name + '_subtitled' + '.mp4'
write_srt(srt)
subtitles = pysrt.open('edited_srt.srt', encoding='utf-8')
subtitle_clips = create_subtitle_clips(subtitles, video.size, 32, 'arial.ttf', 'white', False)
final_video = CompositeVideoClip([video] + subtitle_clips)
final_video.write_videofile(output_video_file, codec='libx264', audio_codec='aac', logger=None)
os.remove('google_translate.srt')
print('final')"""
with gr.Blocks() as demo:
gr.Markdown("Start typing below and then click **Run** to see the output.")
with gr.Row():
inp = gr.Textbox(placeholder="Enter URL or upload")
drp = gr.Dropdown(["insta", "youtube"])
btn = gr.Button("transcribe")
out = gr.Textbox(interactive=True)
video_file_input = gr.Video(label="Upload Video File")
video_path_output = gr.Textbox(label="Video Path", visible=False)
audio_path_output = gr.Textbox(label="Video Path", visible=False)
btn.click(fn=process_video, inputs=[video_file_input, inp, drp], outputs=[out, video_path_output, audio_path_output])
with gr.Row():
vid_out = gr.Video()
srt_file = gr.File()
btn2 = gr.Button("transcribe")
gr.on(
triggers=[btn2.click],
fn=write_google,
inputs=out,
).then(video_edit, [out, video_path_output, audio_path_output], outputs=[vid_out, srt_file])
demo.launch(debug=True) |