|
|
|
import gradio as gr |
|
import pixeltable as pxt |
|
import os |
|
import getpass |
|
from pixeltable.functions.video import extract_audio |
|
from pixeltable.functions import openai as pxop |
|
import openai |
|
|
|
|
|
|
|
# --- Pixeltable storage locations -------------------------------------------
# The table is addressed as "<db_directory>.<table_name>".
table_name = "video_table"
db_directory = "video_db"

# --- Upload limit (megabytes) -----------------------------------------------
MAX_VIDEO_SIZE_MB = 35

# --- OpenAI settings ---------------------------------------------------------
WHISPER_MODEL = "whisper-1"  # speech-to-text model
GPT_MODEL = "gpt-4o-mini-2024-07-18"  # chat model that writes the post
MAX_TOKENS = 500  # passed as max_tokens to the chat completion
|
|
|
|
|
# Prompt for the OpenAI key (without echoing it) only when the environment
# does not already provide one; the entered value is stored in os.environ.
if os.environ.get("OPENAI_API_KEY") is None:
    os.environ["OPENAI_API_KEY"] = getpass.getpass("OpenAI API Key:")
|
|
|
|
|
# Fully qualified table path. pxt.list_tables() is expected to report
# qualified paths, so existence checks must compare against this value rather
# than the bare table name (the bare-name comparison could never match).
_table_path = f"{db_directory}.{table_name}"

# Start from a clean slate on every launch.
# NOTE(review): force=True is expected to also remove the tables inside the
# directory, which makes the drop_table below a safety net only — confirm
# against the pinned Pixeltable version.
pxt.drop_dir(db_directory, force=True)
if _table_path in pxt.list_tables():
    pxt.drop_table(_table_path)

# (Re)create the directory that holds the table.
if db_directory not in pxt.list_dirs():
    pxt.create_dir(db_directory)
else:
    print(f"Directory {db_directory} already exists. Using the existing directory.")

# (Re)create the table, or attach to it if it somehow survived the drop above.
if _table_path not in pxt.list_tables():
    t = pxt.create_table(
        _table_path,
        {
            "video": pxt.VideoType(),  # the uploaded video file
            "video_filename": pxt.StringType(),  # basename of the upload
            "sm_type": pxt.StringType(),  # target social media platform
            "sm_post": pxt.StringType(),  # generated post ("" until generated)
        },
    )
else:
    # pxt.get_table is the documented accessor for an existing table.
    t = pxt.get_table(_table_path)
    print(f"Table {table_name} already exists. Using the existing table.")
|
|
|
|
|
def generate_social_media_post(transcript_text, social_media_type):
    """Ask the chat model for a post about a video, tailored to one platform.

    Args:
        transcript_text: Transcript of the video the post is based on.
        social_media_type: Name of the target platform; it is interpolated
            into both the system and the user prompt.

    Returns:
        The message content of the first choice in the completion response.
    """
    system_prompt = f"You are an expert in creating social media content for {social_media_type}."
    user_prompt = f"Generate an effective and casual social media post based on this video transcript below. Make it a viral and suitable post for {social_media_type}. Transcript:\n{transcript_text}."

    conversation = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_prompt},
    ]
    completion = openai.chat.completions.create(
        model=GPT_MODEL,
        messages=conversation,
        max_tokens=MAX_TOKENS,
    )
    first_choice = completion.choices[0]
    return first_choice.message.content
|
|
|
|
|
def _post_row_filter(video_filename, social_media_type):
    """Build the predicate selecting the row for one (video, platform) pair."""
    return (t.video_filename == video_filename) & (t.sm_type == social_media_type)


def _ensure_transcription_columns():
    """Add the ``audio`` and ``transcription`` computed columns if missing."""
    # Columns that already exist are left alone. The previous code re-assigned
    # `t.audio` / `t.transcription` as plain Python attributes in that case,
    # which does not add or recompute a table column.
    # NOTE(review): computed columns are expected to be evaluated by
    # Pixeltable for newly inserted rows — confirm for the pinned version.
    if not t.get_column(name="audio"):
        t["audio"] = extract_audio(t.video, format="mp3")

    print("########### processing transcription #############")

    if not t.get_column(name="transcription"):
        t["transcription"] = pxop.transcriptions(t.audio, model=WHISPER_MODEL)


def process_and_generate_post(video_file, social_media_type):
    """Return a social media post for an uploaded video and target platform.

    The video is registered in the table ``t`` once per (filename, platform)
    pair. The first time a (filename, platform) pair is requested, a post is
    generated from the video's transcription and written back to the row's
    ``sm_post`` column; later requests for the same pair return the stored
    post instead of calling the model again.

    Args:
        video_file: Path of the uploaded video; a falsy value means that
            nothing was uploaded.
        social_media_type: Name of the target platform.

    Returns:
        The post text, or a human-readable message when no file was given,
        the file exceeds ``MAX_VIDEO_SIZE_MB``, no matching row is found, or
        any exception is raised while processing (this function is the UI
        boundary, so errors are reported as text rather than propagated).
    """
    if not video_file:
        return "Please upload a video file."

    try:
        size_mb = os.path.getsize(video_file) / (1024 * 1024)
        if size_mb > MAX_VIDEO_SIZE_MB:
            return f"The video file is larger than {MAX_VIDEO_SIZE_MB} MB. Please upload a smaller file."

        video_filename = os.path.basename(video_file)

        # Both lookups must happen before the insert below, otherwise the
        # freshly inserted row would make the video look already known.
        video_seen_before = (
            t.where(t.video_filename == video_filename).count() >= 1
        )
        row_exists = (
            t.where(_post_row_filter(video_filename, social_media_type)).count() >= 1
        )

        # One row per (video, platform). The original three-part condition
        # (empty table / unknown video / known video with a new platform)
        # reduces to exactly this check.
        if not row_exists:
            t.insert(
                [
                    {
                        "video": video_file,
                        "video_filename": video_filename,
                        "sm_type": social_media_type,
                        "sm_post": "",
                    }
                ]
            )

        if not video_seen_before:
            _ensure_transcription_columns()

        matches = t.where(
            _post_row_filter(video_filename, social_media_type)
        ).tail(1)
        if len(matches) == 0:
            return "No matching video found in the table. Please ensure the video is uploaded correctly and try again."
        row = matches[0]

        stored_post = row["sm_post"]
        if stored_post:
            print("retrieving existing social media post")
            return stored_post

        print("generating new social media post")
        social_media_post = generate_social_media_post(
            row["transcription"]["text"], social_media_type
        )

        # Persist the post in the table itself. `row` is only a plain dict
        # copy of the query result, so updating it (as the previous code did)
        # never reached the table and the stored post could not be reused.
        t.update(
            {"sm_post": social_media_post},
            where=_post_row_filter(video_filename, social_media_type),
        )

        # Return the generated text directly; the previous code returned the
        # stale row value, which was "" whenever the row already existed.
        return social_media_post

    except Exception as e:
        return f"An error occurred: {e}"
|
|
|
|
|
def gradio_interface():
    """Assemble the Gradio Blocks UI for the post generator and return it.

    Top to bottom the page contains: banner markdown, a video upload, a
    platform dropdown, a "Generate Post" button, the output textbox and a set
    of example videos. Clicking the button runs ``process_and_generate_post``
    on the uploaded video and the selected platform and shows the result in
    the textbox.
    """
    # Banner markdown blocks, rendered in order at the top of the page.
    banner_sections = (
        """<center><font size=12>Video to Social Media Post Generator</center>""",
        (
            '<div align="center">\n'
            '<img src="https://raw.githubusercontent.com/pixeltable/pixeltable/main/docs/source/data/pixeltable-logo-large.png" alt="Pixeltable" width="20%" />\n'
        ),
        """<center><font size=6>Data Ops powered by <a href="https://github.com/pixeltable/pixeltable">Pixeltable</a></center>""",
        (
            "<center>Pixeltable is a Python library providing a declarative interface for multimodal data (text, images, audio, video). It features built-in versioning, lineage tracking, and incremental updates, enabling users to store, transform, index, and iterate on data for their ML workflows. Data transformations, model inference, and custom logic are embedded as computed columns.\n"
            "</center>"
        ),
    )
    platforms = ["X (Twitter)", "Facebook", "LinkedIn"]
    example_videos = [["example1.mp4"], ["example2.mp4"]]

    with gr.Blocks(theme=gr.themes.Glass()) as demo:
        for section in banner_sections:
            gr.Markdown(section)

        video_input = gr.Video(
            label=f"Upload Video File (max {MAX_VIDEO_SIZE_MB} MB):",
            include_audio=True,
            max_length=300,
            height='400px',
        )
        social_media_type = gr.Dropdown(
            choices=platforms,
            label="Select Social Media Platform:",
            value=platforms[0],
        )
        generate_btn = gr.Button("Generate Post", interactive=True)
        output = gr.Textbox(
            label="Generated Social Media Post",
            show_copy_button=True,
        )

        # Example videos are offered as inputs for the video component.
        gr.Examples(example_videos, inputs=[video_input])

        # Wire the button to the processing function.
        generate_btn.click(
            fn=process_and_generate_post,
            inputs=[video_input, social_media_type],
            outputs=[output],
        )

    return demo
|
|
|
|
|
# Build the UI, then start the Gradio server with show_api=False.
demo = gradio_interface()
demo.launch(show_api=False)