# Adv_bot / helper_funcs /help_Nekmo_ffmpeg.py
# imseldrith's picture
# Upload 3 files
# 2c82039
# raw
# history blame contribute delete
# No virus
# 5.23 kB
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# (c) Shrimadhav U K
# the logging things
import logging
logging.basicConfig(level=logging.DEBUG,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
import asyncio
import os
import time
from hachoir.metadata import extractMetadata
from hachoir.parser import createParser
async def place_water_mark(input_file, output_file, water_mark_file):
    """Overlay *water_mark_file* on *input_file* and write *output_file*.

    The watermark is first scaled to half the width of *input_file* (the
    width is read with hachoir) and then overlaid at the bottom-right corner
    by ffmpeg.  When the width cannot be determined, or the scaled copy
    cannot be produced, the unscaled watermark is overlaid instead.

    Args:
        input_file: Path of the image/video to be watermarked.
        output_file: Path ffmpeg is asked to write the result to.
        water_mark_file: Path of the watermark image.

    Returns:
        *output_file*, whether or not ffmpeg succeeded; failures are logged
        and callers that care should check that the file exists.
    """
    # Temporary, scaled copy of the watermark; removed before returning.
    watermarked_file = output_file + ".watermark.png"

    width = None
    parser = createParser(input_file)
    if parser is not None:
        # Closing the parser releases the file handle hachoir opened.
        with parser:
            metadata = extractMetadata(parser)
        if metadata is not None and metadata.has("width"):
            width = metadata.get("width")

    overlay_source = water_mark_file
    try:
        if width is None:
            logger.warning(
                "Could not read the width of %s; using unscaled watermark",
                input_file,
            )
        else:
            # https://stackoverflow.com/a/34547184/4723940
            # Every option is its own argv element: there is no shell here
            # to split a combined "-y -v quiet" string.
            shrink_watermark_file_genertor_command = [
                "ffmpeg",
                "-i", water_mark_file,
                "-y",
                "-v", "quiet",
                "-vf",
                "scale={}*0.5:-1".format(width),
                watermarked_file,
            ]
            await _run_watermark_command(
                shrink_watermark_file_genertor_command
            )
            if os.path.lexists(watermarked_file):
                overlay_source = watermarked_file

        commands_to_execute = [
            "ffmpeg",
            "-i", input_file,
            "-i", overlay_source,
            "-filter_complex",
            # https://stackoverflow.com/a/16235519
            # No surrounding quotes: without a shell they would become part
            # of the filter name and ffmpeg would reject the graph.
            "overlay=(main_w-overlay_w):(main_h-overlay_h)",
            output_file,
        ]
        await _run_watermark_command(commands_to_execute)
    finally:
        if os.path.lexists(watermarked_file):
            os.remove(watermarked_file)
    return output_file


async def _run_watermark_command(command):
    """Run *command* to completion, log a failure, return its exit code."""
    process = await asyncio.create_subprocess_exec(
        *command,
        # Both streams are piped so communicate() can drain them.
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    _, stderr = await process.communicate()
    if process.returncode != 0:
        # errors="replace": tool output is not guaranteed to be UTF-8.
        logger.warning(
            "%s exited with code %s: %s",
            command[0],
            process.returncode,
            stderr.decode(errors="replace").strip(),
        )
    return process.returncode
async def take_screen_shot(video_file, output_directory, ttl):
    """Save one frame of *video_file* as a JPEG inside *output_directory*.

    Args:
        video_file: Path of the video handed to ffmpeg as input.
        output_directory: Directory the screenshot is written to; the file
            name is the current timestamp with a ``.jpg`` suffix.
        ttl: Seek position, converted with ``str()`` and passed to ffmpeg's
            ``-ss`` option.

    Returns:
        The path of the screenshot, or ``None`` when ffmpeg did not create
        the file (the failure is logged).
    """
    # https://stackoverflow.com/a/13891070/4723940
    out_put_file_name = output_directory + "/" + str(time.time()) + ".jpg"
    file_genertor_command = [
        "ffmpeg",
        "-ss",
        str(ttl),
        "-i",
        video_file,
        "-vframes",
        "1",
        out_put_file_name,
    ]
    process = await asyncio.create_subprocess_exec(
        *file_genertor_command,
        # Both streams are piped so communicate() can drain them.
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    # Wait for the subprocess to finish
    _, stderr = await process.communicate()
    if os.path.lexists(out_put_file_name):
        return out_put_file_name
    # errors="replace": ffmpeg echoes file metadata that need not be UTF-8,
    # and a decoding error must not mask the real failure.
    logger.warning(
        "ffmpeg (exit code %s) produced no screenshot of %s at %s: %s",
        process.returncode,
        video_file,
        ttl,
        stderr.decode(errors="replace").strip(),
    )
    return None
# https://github.com/Nekmo/telegram-upload/blob/master/telegram_upload/video.py#L26
async def cult_small_video(video_file, output_directory, start_time, end_time):
    """Cut the span *start_time*..*end_time* out of *video_file* into an MP4.

    Args:
        video_file: Path of the video handed to ffmpeg as input.
        output_directory: Directory the clip is written to; the file name is
            the current timestamp rounded to whole seconds, suffixed ``.mp4``.
        start_time: Value for ffmpeg's ``-ss`` option (converted with
            ``str()``, so numbers are accepted as well as strings).
        end_time: Value for ffmpeg's ``-to`` option (converted likewise).

    Returns:
        The path of the clip, or ``None`` when ffmpeg did not create the
        file (the failure is logged).
    """
    # https://stackoverflow.com/a/13891070/4723940
    out_put_file_name = (
        output_directory + "/" + str(round(time.time())) + ".mp4"
    )
    file_genertor_command = [
        "ffmpeg",
        "-i",
        video_file,
        "-ss",
        # argv elements must be strings; numeric offsets would raise.
        str(start_time),
        "-to",
        str(end_time),
        "-async",
        "1",
        "-strict",
        "-2",
        out_put_file_name,
    ]
    process = await asyncio.create_subprocess_exec(
        *file_genertor_command,
        # Both streams are piped so communicate() can drain them.
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    # Wait for the subprocess to finish
    _, stderr = await process.communicate()
    if os.path.lexists(out_put_file_name):
        return out_put_file_name
    # errors="replace": ffmpeg echoes file metadata that need not be UTF-8,
    # and a decoding error must not mask the real failure.
    logger.warning(
        "ffmpeg (exit code %s) produced no clip of %s (%s to %s): %s",
        process.returncode,
        video_file,
        start_time,
        end_time,
        stderr.decode(errors="replace").strip(),
    )
    return None
async def generate_screen_shots(
    video_file,
    output_directory,
    is_watermarkable,
    wf,
    min_duration,
    no_of_photos
):
    """Take evenly spaced screenshots of *video_file*.

    The duration is read with hachoir and split into *no_of_photos* equal
    steps; one screenshot is taken at the end of each step.

    Args:
        video_file: Path of the video to capture from.
        output_directory: Directory the screenshots are written to.
        is_watermarkable: When true, each screenshot is passed through
            ``place_water_mark`` and the watermarked path is returned.
        wf: Path of the watermark image given to ``place_water_mark``.
        min_duration: Screenshots are only taken when the duration in
            seconds is strictly greater than this value.
        no_of_photos: Number of screenshots to attempt.

    Returns:
        ``None`` when the duration is unknown or not greater than
        *min_duration*; otherwise a list of screenshot paths.  Frames that
        ffmpeg failed to produce are skipped, so the list can be shorter
        than *no_of_photos*.
    """
    duration = 0
    parser = createParser(video_file)
    if parser is not None:
        # Closing the parser releases the file handle hachoir opened.
        with parser:
            metadata = extractMetadata(parser)
        if metadata is not None and metadata.has("duration"):
            # total_seconds() keeps the days component that .seconds drops.
            duration = int(metadata.get("duration").total_seconds())

    if duration <= min_duration:
        return None
    if no_of_photos <= 0:
        # Nothing requested; also avoids dividing by zero below.
        return []

    ttl_step = duration // no_of_photos
    images = []
    for shot_number in range(1, no_of_photos + 1):
        current_ttl = shot_number * ttl_step
        ss_img = await take_screen_shot(
            video_file, output_directory, current_ttl
        )
        if ss_img is None:
            # A missing frame must not reach place_water_mark or the result.
            logger.warning(
                "No screenshot of %s at %s seconds; skipping",
                video_file,
                current_ttl,
            )
            continue
        if is_watermarkable:
            ss_img = await place_water_mark(
                ss_img,
                output_directory + "/" + str(time.time()) + ".jpg",
                wf,
            )
        images.append(ss_img)
    return images