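"""Gradio app that turns an uploaded video into a platform-specific social media post.

The app stores videos in a Pixeltable table, extracts the audio track, transcribes it
with OpenAI Whisper, and drafts a post for the selected platform with an OpenAI chat model.
"""
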
# Import necessary libraries
import gradio as gr
import pixeltable as pxt
import os
import getpass
from pixeltable.functions.video import extract_audio
from pixeltable.functions import openai as pxop
import openai
# Set up Pixeltable database and table
db_directory = "video_db"
table_name = "video_table"
# Define constants
MAX_VIDEO_SIZE_MB = 35
GPT_MODEL = "gpt-4o-mini-2024-07-18"
MAX_TOKENS = 500
WHISPER_MODEL = "whisper-1"
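
# Note: GPT_MODEL and WHISPER_MODEL are passed straight to the OpenAI API,
# so they must name models that are available to the configured API key.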

# Set OpenAI API key
if "OPENAI_API_KEY" not in os.environ:
    os.environ["OPENAI_API_KEY"] = getpass.getpass("OpenAI API Key:")

# Clean up existing database and table if they exist
pxt.drop_dir(db_directory, force=True)
if f"{db_directory}.{table_name}" in pxt.list_tables():
    pxt.drop_table(f"{db_directory}.{table_name}")

# Create or use existing directory and table
if db_directory not in pxt.list_dirs():
    pxt.create_dir(db_directory)
else:
    print(f"Directory {db_directory} already exists. Using the existing directory.")

if f"{db_directory}.{table_name}" not in pxt.list_tables():
    t = pxt.create_table(
        f"{db_directory}.{table_name}",
        {
            "video": pxt.VideoType(),
            "video_filename": pxt.StringType(),
            "sm_type": pxt.StringType(),
            "sm_post": pxt.StringType(),
        },
    )
else:
    t = pxt.get_table(f"{db_directory}.{table_name}")
    print(f"Table {table_name} already exists. Using the existing table.")

# Function to generate a social media post using the OpenAI chat completions API
def generate_social_media_post(transcript_text, social_media_type):
    response = openai.chat.completions.create(
        model=GPT_MODEL,
        messages=[
            {
                "role": "system",
                "content": f"You are an expert in creating social media content for {social_media_type}.",
            },
            {
                "role": "user",
                "content": f"Generate an effective and casual social media post based on the video transcript below. Make it viral and suitable for {social_media_type}. Transcript:\n{transcript_text}",
            },
        ],
        max_tokens=MAX_TOKENS,
    )
    return response.choices[0].message.content
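
# Illustrative call (the transcript snippet is invented for the example):
#   generate_social_media_post("Welcome to this quick demo of our new dashboard...", "LinkedIn")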

# Function to process the uploaded video and generate the post
def process_and_generate_post(video_file, social_media_type):
    if video_file:
        try:
            # Check video file size
            video_size = os.path.getsize(video_file) / (1024 * 1024)  # Convert to MB
            if video_size > MAX_VIDEO_SIZE_MB:
                return f"The video file is larger than {MAX_VIDEO_SIZE_MB} MB. Please upload a smaller file."

            video_filename = os.path.basename(video_file)
            tr_audio_gen_flag = True
            sm_gen_flag = True

            # Check if video already exists in the table
            video_df = t.where(t.video_filename == video_filename).tail(1)
            if t.select().where(t.video_filename == video_filename).count() >= 1:
                tr_audio_gen_flag = False

            # Check if video and social media type combination exists
            video_type_df = t.where(
                (t.video_filename == video_filename) & (t.sm_type == social_media_type)
            ).tail(1)
            if video_type_df:
                sm_gen_flag = False

            # Insert video into Pixeltable if it doesn't exist or if it's a new social media type
            if (
                (t.count() < 1)
                or not (
                    t.select().where(t.video_filename == video_filename).count() >= 1
                )
                or (video_df and not video_type_df)
            ):
                t.insert(
                    [
                        {
                            "video": video_file,
                            "video_filename": video_filename,
                            "sm_type": social_media_type,
                            "sm_post": "",
                        }
                    ]
                )
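
            # Note: t["audio"] and t["transcription"] below add computed columns;
            # Pixeltable evaluates a computed column for existing rows and for each
            # new row on insert, so extraction and transcription only need to be
            # set up once per table.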
            # Extract audio and transcribe if needed
            if tr_audio_gen_flag:
                if not t.get_column(name="audio"):
                    t["audio"] = extract_audio(t.video, format="mp3")
                else:
                    t.audio = extract_audio(t.video, format="mp3")
                print("########### processing transcription #############")
                if not t.get_column(name="transcription"):
                    t["transcription"] = pxop.transcriptions(
                        t.audio, model=WHISPER_MODEL
                    )
                else:
                    t.transcription = pxop.transcriptions(t.audio, model=WHISPER_MODEL)
            # Get the current video data
            filtered_df = t.where(
                (t.video_filename == video_filename) & (t.sm_type == social_media_type)
            ).tail(1)
            if len(filtered_df) == 0:
                return "No matching video found in the table. Please ensure the video is uploaded correctly and try again."
            cur_video_df = filtered_df[0]
            plain_text = cur_video_df["transcription"]["text"]

            # Generate or retrieve social media post
            if (
                t.select()
                .where(
                    (t.video_filename == video_filename)
                    & (t.sm_type == social_media_type)
                    & (t.sm_post != "")
                )
                .count()
                >= 1
            ):
                print("retrieving existing social media post")
                social_media_post = (
                    t.select(t.sm_post)
                    .where(
                        (t.sm_type == social_media_type)
                        & (t.video_filename == video_filename)
                    )
                    .collect()["sm_post"]
                )
            else:
                print("generating new social media post")
                social_media_post = generate_social_media_post(
                    plain_text, social_media_type
                )
                if sm_gen_flag:
                    cur_video_df.update({"sm_post": social_media_post})
            return cur_video_df["sm_post"]
        except Exception as e:
            return f"An error occurred: {e}"
    else:
        return "Please upload a video file."

# Gradio Interface
def gradio_interface():
    with gr.Blocks(theme=gr.themes.Glass()) as demo:
        # Set up the UI components
        gr.Markdown(
            """<center><font size=12>Video to Social Media Post Generator</center>"""
        )
        gr.Markdown(
            """<div align="center">
            <img src="https://raw.githubusercontent.com/pixeltable/pixeltable/main/docs/source/data/pixeltable-logo-large.png" alt="Pixeltable" width="20%" />
            """
        )
        gr.Markdown(
            """<center><font size=6>Data Ops powered by <a href="https://github.com/pixeltable/pixeltable">Pixeltable</a></center>"""
        )
        gr.Markdown(
            """<center>Pixeltable is a Python library providing a declarative interface for multimodal data (text, images, audio, video). It features built-in versioning, lineage tracking, and incremental updates, enabling users to store, transform, index, and iterate on data for their ML workflows. Data transformations, model inference, and custom logic are embedded as computed columns.
            </center>"""
        )
        video_input = gr.Video(
            label=f"Upload Video File (max {MAX_VIDEO_SIZE_MB} MB):",
            include_audio=True,
            max_length=300,
            height="400px",
        )
        social_media_type = gr.Dropdown(
            choices=["X (Twitter)", "Facebook", "LinkedIn"],
            label="Select Social Media Platform:",
            value="X (Twitter)",
        )
        generate_btn = gr.Button("Generate Post", interactive=True)
        output = gr.Textbox(label="Generated Social Media Post", show_copy_button=True)
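        # The example videos below ("example1.mp4", "example2.mp4") are assumed to be
        # present in the app's working directory.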
        examples = gr.Examples(
            [["example1.mp4"], ["example2.mp4"]], inputs=[video_input]
        )
        # Connect the generate button to the processing function
        generate_btn.click(
            fn=process_and_generate_post,
            inputs=[video_input, social_media_type],
            outputs=[output],
        )
    return demo

# Launch the Gradio interface
gradio_interface().launch(show_api=False)