# Spaces:
# Sleeping
# Sleeping
# (Hosting-platform status banner captured when this file was exported;
#  commented out so the module parses.)
import os
import shutil
from typing import Optional, List, Dict, Union

import cv2
import instaloader
import requests
import streamlit as st
import torch
from dotenv import load_dotenv
from PIL import Image
from transformers import AutoProcessor, AutoModelForCausalLM
def download_instagram_reels(hashtag, num_reels=1, username="your_username", password="your_password"):
    """Download up to ``num_reels`` video posts for an Instagram hashtag.

    Args:
        hashtag: Hashtag to search (without the leading '#').
        num_reels: Maximum number of video posts to download.
        username: Instagram login name (placeholder default; pass a real one).
        password: Instagram password (placeholder default; pass a real one).

    Returns:
        Tuple ``(video_paths, reel_urls)`` — local .mp4 paths and the source
        post URLs. Both lists are empty if anything fails.
    """
    # Start from a clean download directory. shutil.rmtree is portable,
    # unlike the previous `os.system("rm -rf ...")` shell call.
    shutil.rmtree("downloaded_reels", ignore_errors=True)
    os.makedirs("downloaded_reels", exist_ok=True)
    loader = instaloader.Instaloader(download_videos=True, download_video_thumbnails=True, download_comments=True)
    try:
        loader.login(username, password)
        posts = instaloader.Hashtag.from_name(loader.context, hashtag).get_posts()
        # Collect URLs of the first `num_reels` video posts only.
        reel_urls = []
        for post in posts:
            if post.is_video:
                reel_urls.append(post.url)
                if len(reel_urls) >= num_reels:
                    break
        for reel_url in reel_urls:
            # Post URLs end with ".../<shortcode>/", so the shortcode is the
            # second-to-last path segment.
            shortcode = reel_url.split('/')[-2]
            post = instaloader.Post.from_shortcode(loader.context, shortcode)
            loader.download_post(post, target='downloaded_reels')
        video_files = [f for f in os.listdir('downloaded_reels') if f.endswith('.mp4')]
        if not video_files:
            raise ValueError("No video file found in the downloaded reels.")
        return [os.path.join('downloaded_reels', f) for f in video_files], reel_urls
    except Exception as e:
        # Best-effort: report and return empty results so the UI can show an
        # error instead of crashing.
        print(f"Error downloading reels: {e}")
        return [], []
def parse_query_with_groq(
    query: str,
    groq_api_key: str,
    seed: int = 42,
    llama_model: str = "llama-3.2-11b-text-preview"
) -> Optional[str]:
    """
    Run sentiment analysis on `query` via the Groq chat-completions API.

    Args:
        query: Input text for sentiment analysis (one frame description per line).
        groq_api_key: API key for Groq.
        seed: Random seed for reproducibility.
        llama_model: Model identifier.

    Returns:
        The model reply (expected to be a JSON list of
        ``{"sentiment_score": float, "frame_index": int}`` objects) as a raw
        string, or None if the request or response parsing fails.
    """
    # NOTE: `requests` must be imported at module level; the original file
    # used it here without importing it, which raised NameError at call time.
    url = "https://api.groq.com/openai/v1/chat/completions"
    headers = {
        "Authorization": f"Bearer {groq_api_key}",
        "Content-Type": "application/json"
    }
    system_message = """You are a precise sentiment analysis assistant.
Analyze the user_prompt and provide a JSON-formatted list of objects, where each object contains:
- sentiment_score: a float between -1 (very negative) and 1 (very positive)
- frame_index: the corresponding frame index
Strictly follow this JSON format:
[
{"sentiment_score": <float>, "frame_index": <int>},
...
]
"""
    payload = {
        "model": llama_model,
        # Ask the API to constrain the reply to the expected JSON shape.
        "response_format": {
            "type": "json_schema",
            "json_schema": {
                "type": "array",
                "items": {
                    "type": "object",
                    "properties": {
                        "sentiment_score": {"type": "number"},
                        "frame_index": {"type": "integer"}
                    },
                    "required": ["sentiment_score", "frame_index"]
                }
            }
        },
        "messages": [
            {"role": "system", "content": system_message},
            {"role": "user", "content": query}
        ],
        "temperature": 0,   # deterministic output
        "max_tokens": 300,
        "seed": seed
    }
    try:
        response = requests.post(url, headers=headers, json=payload, timeout=30)
        response.raise_for_status()
        print(f"DEBUG : Raw Response is {response}")
        parsed_response = response.json()['choices'][0]['message']['content']
        print(f"DEBUG : Raw Response is {parsed_response}")
        return parsed_response
    except Exception as e:
        # Network errors, HTTP errors, and malformed responses all fall
        # through to a None return; the caller prints whatever it gets.
        print(f"Sentiment Analysis Error: {e}")
        return None
def extract_frames(video_path, output_folder, fps=1):
    """Sample frames from a video at roughly `fps` frames per second.

    Frames are written to ``output_folder`` as ``image<i>.jpg``.

    Args:
        video_path: Path to the input video file.
        output_folder: Directory to write frames into (created if missing).
        fps: Desired sampling rate in frames per second.

    Returns:
        Tuple ``(frame_count, time_stamps)`` — number of frames written and
        their timestamps in seconds. Returns ``(0, [])`` if the video cannot
        be opened.
    """
    os.makedirs(output_folder, exist_ok=True)
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print(f"Error: Could not open video file {video_path}")
        # Bug fix: return an unpackable (count, timestamps) pair instead of
        # bare None, so callers doing `N, ts = extract_frames(...)` don't crash.
        return 0, []
    # CAP_PROP_FPS can be 0.0 for some containers; fall back to the requested
    # rate so the interval and timestamp math below stay well-defined.
    video_fps = cap.get(cv2.CAP_PROP_FPS) or fps
    # Bug fix: clamp to >= 1 — if fps >= video_fps the old int division gave
    # 0 and the modulo below raised ZeroDivisionError.
    frame_interval = max(1, int(video_fps / fps))
    count = 0
    frame_count = 0
    time_stamps = []
    while True:
        ret, frame = cap.read()
        if not ret:
            break  # end of stream
        if count % frame_interval == 0:
            frame_filename = os.path.join(output_folder, f"image{frame_count}.jpg")
            cv2.imwrite(frame_filename, frame)
            print(f"Extracted: {frame_filename}")
            frame_count += 1
            time_stamps.append(count / video_fps)
        count += 1
    cap.release()
    print("Frame extraction completed.")
    return frame_count, time_stamps
def download_instagram_reel_old(reel_url, username="your_username", password="your_password"):
    """Download a single Instagram reel from its URL and print its comments.

    Args:
        reel_url: Full URL of the reel (".../<shortcode>/").
        username: Instagram login name (placeholder default; pass a real one).
        password: Instagram password (placeholder default; pass a real one).

    Returns:
        Local path of the downloaded .mp4, or None on any failure.
    """
    # SECURITY FIX: the original defaults embedded a real email address and
    # password in source control. Replaced with the same placeholders used by
    # download_instagram_reels; real credentials must be passed by the caller.
    # Start from a clean download directory (portable replacement for rm -rf).
    shutil.rmtree("downloaded_reels", ignore_errors=True)
    os.makedirs("downloaded_reels", exist_ok=True)
    print(f"Creating instance of instaloader")
    loader = instaloader.Instaloader(
        download_videos=True,
        download_video_thumbnails=True,
        download_comments=True
    )
    try:
        loader.login(username, password)
        # URL shape is .../<shortcode>/, so the shortcode is the
        # second-to-last path segment.
        shortcode = reel_url.split('/')[-2]
        post = instaloader.Post.from_shortcode(loader.context, shortcode)
        loader.download_post(post, target='downloaded_reels')
        # Dump comments for inspection.
        comments = post.get_comments()
        print(f"Comments are : {comments}")
        for comment in comments:
            print(f"{comment.owner.username}: {comment.text}")
        video_files = [f for f in os.listdir('downloaded_reels') if f.endswith('.mp4')]
        if not video_files:
            raise ValueError("No video file found in the downloaded reels.")
        return os.path.join('downloaded_reels', video_files[0])
    except Exception as e:
        print(f"Error downloading reel: {e}")
        return None
def analyze_frames_with_florence(image_folder, timestamps):
    """Caption every extracted frame with the Florence-2 model.

    Args:
        image_folder: Directory containing frames named ``image<i>.jpg``
            with contiguous indices starting at 0.
        timestamps: Per-frame timestamps in seconds, indexed by frame number.

    Returns:
        List of dicts with keys 'Frame_Index' (int) and 'Caption'
        (Florence-2 post-processed output).
    """
    device = "cuda:0" if torch.cuda.is_available() else "cpu"
    # fp16 on GPU for speed/memory; fp32 on CPU where fp16 isn't supported.
    torch_dtype = torch.float16 if torch.cuda.is_available() else torch.float32
    model = AutoModelForCausalLM.from_pretrained(
        "microsoft/Florence-2-large",
        torch_dtype=torch_dtype,
        trust_remote_code=True
    ).to(device)
    processor = AutoProcessor.from_pretrained(
        "microsoft/Florence-2-large",
        trust_remote_code=True
    )
    prompt = "<DETAILED_CAPTION>"
    frame_analyses = []
    # Bug fix: count only the frame images. The old `len(os.listdir(...))`
    # counted every file in the folder, so any stray file made Image.open
    # fail on a missing image{i}.jpg.
    N = len([f for f in os.listdir(image_folder)
             if f.startswith("image") and f.endswith(".jpg")])
    for i in range(N):
        image_path = os.path.join(image_folder, f"image{i}.jpg")
        # Context manager releases the file handle promptly.
        with Image.open(image_path) as image:
            inputs = processor(text=prompt, images=image, return_tensors="pt").to(device)
            generated_ids = model.generate(
                input_ids=inputs["input_ids"],
                pixel_values=inputs["pixel_values"],
                max_new_tokens=1024,
                num_beams=3,
                do_sample=False  # deterministic captions
            )
            generated_text = processor.batch_decode(generated_ids, skip_special_tokens=False)[0]
            parsed_answer = processor.post_process_generation(
                generated_text,
                task=prompt,
                image_size=(image.width, image.height)
            )
        frame_analyses.append({
            'Frame_Index': i,
            'Caption': parsed_answer
        })
        print(f"Frame {i}, TimeStamp {timestamps[i]} sec : {parsed_answer}")
    return frame_analyses
def main():
    """Streamlit entry point: download reels for a hashtag, caption their
    frames with Florence-2, then run Groq sentiment analysis on the captions.
    """
    fps = 0.5  # frame-sampling rate for extract_frames
    st.title("BrandScan")
    hashtag = st.text_input("Enter the hashtag (without #):", "purnagummies")
    # Guard clauses: do nothing until the button is pressed with a hashtag.
    # Bug fix: the original referenced `video_paths` outside the branch that
    # defined it, raising NameError on every rerun without a button press.
    if not st.button("Download Reels"):
        return
    if not hashtag:
        st.error("Please enter a valid hashtag.")
        return
    with st.spinner("Downloading reels..."):
        video_paths, reel_urls = download_instagram_reels(hashtag)
    if not reel_urls:
        st.error("No reels found or an error occurred.")
        return
    st.success(f"Downloaded {len(video_paths)} reels:")
    for url in reel_urls:
        st.write(url)
    if not video_paths:
        print("Failed to download the reel.")
        return
    # Analyze the first downloaded reel only.
    video_path = video_paths[0]
    image_folder = "downloaded_reels/images"
    os.makedirs(image_folder, exist_ok=True)
    N, timestamps = extract_frames(video_path, image_folder, fps)
    print(f"Analyzing video {video_path} with {N} frames extracted at {fps} frames per second")
    frame_analyses = analyze_frames_with_florence(image_folder, timestamps)
    print("Frame analysis completed.")
    # Build "<index>; <caption>" lines for the sentiment prompt.
    frame_analyses_str = "<Frame_Index>; <Description>\n"
    for item in frame_analyses:
        # Bug fix: Frame_Index is an int — the original `int + str`
        # concatenation raised TypeError.
        frame_analyses_str += f"{item['Frame_Index']}; {item['Caption']}\n"
    print(frame_analyses_str)
    sentiment_analysis = parse_query_with_groq(frame_analyses_str, os.getenv("GROQ_API_KEY"))
    print("Sentiment Analysis on the video:")
    print(sentiment_analysis)
if __name__ == "__main__":
    main()