Spaces:
Sleeping
Sleeping
#!/usr/bin/env python3 | |
# -*- coding: utf-8 -*- | |
# (c) Shrimadhav U K | |
# the logging things | |
import logging | |
logging.basicConfig(level=logging.DEBUG, | |
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') | |
logger = logging.getLogger(__name__) | |
import asyncio | |
import os | |
import time | |
from hachoir.metadata import extractMetadata | |
from hachoir.parser import createParser | |
async def place_water_mark(input_file, output_file, water_mark_file): | |
watermarked_file = output_file + ".watermark.png" | |
metadata = extractMetadata(createParser(input_file)) | |
width = metadata.get("width") | |
# https://stackoverflow.com/a/34547184/4723940 | |
shrink_watermark_file_genertor_command = [ | |
"ffmpeg", | |
"-i", water_mark_file, | |
"-y -v quiet", | |
"-vf", | |
"scale={}*0.5:-1".format(width), | |
watermarked_file | |
] | |
# print(shrink_watermark_file_genertor_command) | |
process = await asyncio.create_subprocess_exec( | |
*shrink_watermark_file_genertor_command, | |
# stdout must a pipe to be accessible as process.stdout | |
stdout=asyncio.subprocess.PIPE, | |
stderr=asyncio.subprocess.PIPE, | |
) | |
# Wait for the subprocess to finish | |
stdout, stderr = await process.communicate() | |
e_response = stderr.decode().strip() | |
t_response = stdout.decode().strip() | |
commands_to_execute = [ | |
"ffmpeg", | |
"-i", input_file, | |
"-i", watermarked_file, | |
"-filter_complex", | |
# https://stackoverflow.com/a/16235519 | |
# "\"[0:0] scale=400:225 [wm]; [wm][1:0] overlay=305:0 [out]\"", | |
# "-map \"[out]\" -b:v 896k -r 20 -an ", | |
"\"overlay=(main_w-overlay_w):(main_h-overlay_h)\"", | |
# "-vf \"drawtext=text='@FFMovingPictureExpertGroupBOT':x=W-(W/2):y=H-(H/2):fontfile=" + Config.FONT_FILE + ":fontsize=12:fontcolor=white:shadowcolor=black:shadowx=5:shadowy=5\"", | |
output_file | |
] | |
# print(commands_to_execute) | |
process = await asyncio.create_subprocess_exec( | |
*commands_to_execute, | |
# stdout must a pipe to be accessible as process.stdout | |
stdout=asyncio.subprocess.PIPE, | |
stderr=asyncio.subprocess.PIPE, | |
) | |
# Wait for the subprocess to finish | |
stdout, stderr = await process.communicate() | |
e_response = stderr.decode().strip() | |
t_response = stdout.decode().strip() | |
return output_file | |
async def take_screen_shot(video_file, output_directory, ttl): | |
# https://stackoverflow.com/a/13891070/4723940 | |
out_put_file_name = output_directory + \ | |
"/" + str(time.time()) + ".jpg" | |
file_genertor_command = [ | |
"ffmpeg", | |
"-ss", | |
str(ttl), | |
"-i", | |
video_file, | |
"-vframes", | |
"1", | |
out_put_file_name | |
] | |
# width = "90" | |
process = await asyncio.create_subprocess_exec( | |
*file_genertor_command, | |
# stdout must a pipe to be accessible as process.stdout | |
stdout=asyncio.subprocess.PIPE, | |
stderr=asyncio.subprocess.PIPE, | |
) | |
# Wait for the subprocess to finish | |
stdout, stderr = await process.communicate() | |
e_response = stderr.decode().strip() | |
t_response = stdout.decode().strip() | |
if os.path.lexists(out_put_file_name): | |
return out_put_file_name | |
else: | |
return None | |
# https://github.com/Nekmo/telegram-upload/blob/master/telegram_upload/video.py#L26 | |
async def cult_small_video(video_file, output_directory, start_time, end_time): | |
# https://stackoverflow.com/a/13891070/4723940 | |
out_put_file_name = output_directory + \ | |
"/" + str(round(time.time())) + ".mp4" | |
file_genertor_command = [ | |
"ffmpeg", | |
"-i", | |
video_file, | |
"-ss", | |
start_time, | |
"-to", | |
end_time, | |
"-async", | |
"1", | |
"-strict", | |
"-2", | |
out_put_file_name | |
] | |
process = await asyncio.create_subprocess_exec( | |
*file_genertor_command, | |
# stdout must a pipe to be accessible as process.stdout | |
stdout=asyncio.subprocess.PIPE, | |
stderr=asyncio.subprocess.PIPE, | |
) | |
# Wait for the subprocess to finish | |
stdout, stderr = await process.communicate() | |
e_response = stderr.decode().strip() | |
t_response = stdout.decode().strip() | |
if os.path.lexists(out_put_file_name): | |
return out_put_file_name | |
else: | |
return None | |
async def generate_screen_shots( | |
video_file, | |
output_directory, | |
is_watermarkable, | |
wf, | |
min_duration, | |
no_of_photos | |
): | |
metadata = extractMetadata(createParser(video_file)) | |
duration = 0 | |
if metadata is not None: | |
if metadata.has("duration"): | |
duration = metadata.get('duration').seconds | |
if duration > min_duration: | |
images = [] | |
ttl_step = duration // no_of_photos | |
current_ttl = ttl_step | |
for looper in range(0, no_of_photos): | |
ss_img = await take_screen_shot(video_file, output_directory, current_ttl) | |
current_ttl = current_ttl + ttl_step | |
if is_watermarkable: | |
ss_img = await place_water_mark(ss_img, output_directory + "/" + str(time.time()) + ".jpg", wf) | |
images.append(ss_img) | |
return images | |
else: | |
return None | |