import streamlit as st import pandas as pd import re import requests import urllib.request from PIL import Image from transformers import pipeline import tempfile import cv2 import io import yt_dlp import os # Add a styled disclaimer at the top st.markdown( """
**Disclaimer:** You are recommended to give any images and videos from your local device. In case of URLs, give the url of the website's image from chrome by copying image address. And give the URL of twitter videos for video captioning by URL.
""", unsafe_allow_html=True ) # Load the Salesforce BLIP model for image captioning captioning_model = pipeline("image-to-text", model="Salesforce/blip-image-captioning-base") # Load the summarization model for summarizing captions summarizer = pipeline("summarization", model="facebook/bart-large-cnn") # Function to extract URLs from a text def extract_urls(text): url_pattern = re.compile(r'https?://\S+') return url_pattern.findall(text) # Function to fetch image from URL def fetch_image_from_url(url): try: response = urllib.request.urlopen(url) image_data = response.read() image = Image.open(io.BytesIO(image_data)) return image except Exception as e: return None # Function to convert video to 30 FPS def convert_video_to_30fps(video_path): cap = cv2.VideoCapture(video_path) fourcc = cv2.VideoWriter_fourcc(*'mp4v') # Output format fps = 30 # Desired FPS width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) # Temporary file to save the 30 FPS video converted_video_path = tempfile.NamedTemporaryFile(delete=False, suffix=".mp4").name out = cv2.VideoWriter(converted_video_path, fourcc, fps, (width, height)) while True: ret, frame = cap.read() if not ret: break out.write(frame) # Write the frame into the new video cap.release() out.release() return converted_video_path # Function to extract frames from a 30 FPS video at 1-second intervals def extract_frames(video_stream): frames = [] with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as temp_video_file: temp_video_file.write(video_stream.read()) temp_video_file_path = temp_video_file.name # Convert video to 30 FPS converted_video_path = convert_video_to_30fps(temp_video_file_path) cap = cv2.VideoCapture(converted_video_path) fps = cap.get(cv2.CAP_PROP_FPS) # This should now be 30 FPS frame_interval = int(fps) # Frame interval for 1 second while True: success, frame = cap.read() if not success: break current_frame_number = int(cap.get(cv2.CAP_PROP_POS_FRAMES)) if current_frame_number % frame_interval == 0: # Extract one frame per second frames.append(Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))) cap.release() return frames # Function to generate captions for a list of frames def generate_captions(frames): captions = [] for frame in frames: caption = captioning_model(frame) if caption and 'generated_text' in caption[0]: captions.append(caption[0]['generated_text']) return captions # Function to generate caption for a single image def generate_caption_for_image(image): caption = captioning_model(image) if caption and 'generated_text' in caption[0]: return caption[0]['generated_text'] return "No caption generated." # Function to summarize the captions def summarize_captions(captions): combined_captions = " ".join(captions) summary = summarizer(combined_captions, max_length=150, min_length=30, do_sample=False) return summary[0]['summary_text'] # Function to download Twitter video using yt-dlp def download_twitter_video(url): url = url.replace("x.com", "twitter.com") # Convert the URL if needed ydl_opts = { 'format': 'best', 'outtmpl': 'downloaded_video.%(ext)s', 'quiet': True, 'noplaylist': True, } try: with yt_dlp.YoutubeDL(ydl_opts) as ydl: info_dict = ydl.extract_info(url, download=False) video_url = info_dict.get("url", None) response = requests.get(video_url) if response.status_code == 200: return io.BytesIO(response.content) else: return None except Exception as e: st.error(f"An error occurred: {e}") return None # Function to process URLs in a DataFrame def process_urls_in_dataframe(df): results = [] for index, row in df.iterrows(): for cell in row: if pd.notna(cell): urls = extract_urls(str(cell)) for url in urls: if url.startswith("https://x.com"): st.write(f"Processing video URL: {url}") video_stream = download_twitter_video(url) if video_stream: frames = extract_frames(video_stream) if frames: captions = generate_captions(frames) summary = summarize_captions(captions) results.append({"URL": url, "Caption": summary}) save_results_to_csv(results) else: st.error(f"Failed to extract frames from video: {url}") else: st.error(f"Failed to fetch video: {url}") else: st.write(f"Processing image URL: {url}") image = fetch_image_from_url(url) if image: caption = generate_caption_for_image(image) results.append({"URL": url, "Caption": caption}) save_results_to_csv(results) return results # Function to save results to a CSV file def save_results_to_csv(results): file_path = "captions_results.csv" df = pd.DataFrame(results) if not os.path.isfile(file_path): df.to_csv(file_path, index=False, mode='w', header=True) else: df.to_csv(file_path, index=False, mode='a', header=False) # Streamlit app st.title("Captioning Application") # Section to process uploaded CSV or Excel files st.subheader("Process URLs from File") uploaded_file = st.file_uploader("Upload a CSV or Excel file", type=["csv", "xlsx"]) if uploaded_file is not None: st.write("Processing file...") if uploaded_file.name.endswith("csv"): df = pd.read_csv(uploaded_file) else: df = pd.read_excel(uploaded_file) results = process_urls_in_dataframe(df) if results: st.write(f"Processed {len(results)} URLs from the file.") st.write("Results saved to captions_results.csv") else: st.write("No URLs found or processed.") # Section to process URLs for images and videos st.subheader("Process URLs Directly") # Upload image URL image_url = st.text_input("Enter Image URL:") if image_url: st.write(f"Processing Image URL: {image_url}") image = fetch_image_from_url(image_url) if image: caption = generate_caption_for_image(image) st.image(image, caption="Uploaded Image", use_column_width=True) st.write(f"Caption: {caption}") # Collect results in a list of dictionaries results = [{"URL": image_url, "Caption": caption}] # Save the results to the CSV file save_results_to_csv(results) st.success("Results saved to captions_results.csv") # Upload video URL video_url = st.text_input("Enter Video URL:") if video_url: st.write(f"Processing Video URL: {video_url}") if video_url.startswith("https://x.com"): video_stream = download_twitter_video(video_url) if video_stream: frames = extract_frames(video_stream) if frames: captions = generate_captions(frames) summary = summarize_captions(captions) st.write(f"Caption: {summary}") # Collect results in a list of dictionaries results = [{"URL": video_url, "Caption": summary}] # Save the results to the CSV file save_results_to_csv(results) st.success("Results saved to captions_results.csv") else: st.error("Failed to extract frames from video.") else: st.error("Failed to fetch video.") else: st.error("Only Twitter video URLs are supported.") # Section to process local files st.subheader("Process Local Files") uploaded_local_file = st.file_uploader("Upload a local image or video file", type=["jpg", "jpeg", "png", "mp4"]) if uploaded_local_file is not None: if uploaded_local_file.type.startswith("image"): image = Image.open(uploaded_local_file) caption = generate_caption_for_image(image) st.image(image, caption="Uploaded Image", use_column_width=True) st.write(f"Caption: {caption}") elif uploaded_local_file.type.startswith("video"): video_stream = io.BytesIO(uploaded_local_file.read()) frames = extract_frames(video_stream) if frames: captions = generate_captions(frames) summary = summarize_captions(captions) st.video(uploaded_local_file) st.write(f"Summary of Captions: {summary}") else: st.error("Failed to extract frames from video.") st.write("Upload a file or enter a URL to start processing.")