import os
import torch
import cv2
import instaloader
import requests
from PIL import Image
from transformers import AutoProcessor, AutoModelForCausalLM
from typing import Optional, List, Dict, Union
import streamlit as st


def download_instagram_reels(hashtag, num_reels=1, username="your_username", password="your_password"):
    # Remove previous downloads if they exist
    os.system("rm -rf downloaded_reels")
    os.makedirs("downloaded_reels", exist_ok=True)

    loader = instaloader.Instaloader(
        download_videos=True,
        download_video_thumbnails=True,
        download_comments=True
    )

    try:
        # Login to Instagram
        loader.login(username, password)

        # Get posts for the hashtag and keep the first `num_reels` videos
        posts = instaloader.Hashtag.from_name(loader.context, hashtag).get_posts()

        shortcodes = []
        reel_urls = []
        for post in posts:
            if post.is_video:
                shortcodes.append(post.shortcode)
                reel_urls.append(f"https://www.instagram.com/reel/{post.shortcode}/")
                if len(shortcodes) >= num_reels:
                    break

        # Download each reel by its shortcode
        for shortcode in shortcodes:
            post = instaloader.Post.from_shortcode(loader.context, shortcode)
            loader.download_post(post, target='downloaded_reels')

        # Find the downloaded video files
        video_files = [f for f in os.listdir('downloaded_reels') if f.endswith('.mp4')]
        if not video_files:
            raise ValueError("No video file found in the downloaded reels.")

        return [os.path.join('downloaded_reels', f) for f in video_files], reel_urls

    except Exception as e:
        print(f"Error downloading reels: {e}")
        return [], []


def parse_query_with_groq(
    query: str,
    groq_api_key: str,
    seed: int = 42,
    llama_model: str = "llama-3.2-11b-text-preview"
) -> Optional[str]:
    """
    Enhanced sentiment analysis with the Groq API.

    Args:
        query: Input text for sentiment analysis
        groq_api_key: API key for Groq
        seed: Random seed for reproducibility
        llama_model: Model identifier
    """
    url = "https://api.groq.com/openai/v1/chat/completions"

    # Normalize query (optional)
    # query = ' '.join(query.lower().split())

    headers = {
        "Authorization": f"Bearer {groq_api_key}",
        "Content-Type": "application/json"
    }

    system_message = """You are a precise sentiment analysis assistant.
Analyze the user_prompt and provide a JSON-formatted list of objects, where each object contains:
- sentiment_score: a float between -1 (very negative) and 1 (very positive)
- frame_index: the corresponding frame index

Strictly follow this JSON format:
[
    {"sentiment_score": <float>, "frame_index": <int>},
    ...
]
"""

    payload = {
        "model": llama_model,
        "response_format": {
            "type": "json_schema",
            "json_schema": {
                "type": "array",
                "items": {
                    "type": "object",
                    "properties": {
                        "sentiment_score": {"type": "number"},
                        "frame_index": {"type": "integer"}
                    },
                    "required": ["sentiment_score", "frame_index"]
                }
            }
        },
        "messages": [
            {"role": "system", "content": system_message},
            {"role": "user", "content": query}
        ],
        "temperature": 0,
        "max_tokens": 300,
        "seed": seed
    }

    try:
        response = requests.post(url, headers=headers, json=payload, timeout=30)
        response.raise_for_status()
        print(f"DEBUG: Raw response object is {response}")
        parsed_response = response.json()['choices'][0]['message']['content']
        print(f"DEBUG: Response content is {parsed_response}")
        return parsed_response
    except Exception as e:
        print(f"Sentiment Analysis Error: {e}")
        return None
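# The helper below is not part of the original pipeline; it is a minimal sketch showing one way
# to decode the JSON string returned by parse_query_with_groq into Python objects. The name
# parse_sentiment_response and the fall-back-to-empty-list behaviour are illustrative assumptions.
import json


def parse_sentiment_response(raw_response: Optional[str]) -> List[Dict[str, Union[float, int]]]:
    """Decode the model's JSON output into a list of {sentiment_score, frame_index} dicts."""
    if not raw_response:
        return []
    try:
        parsed = json.loads(raw_response)
    except (json.JSONDecodeError, TypeError) as e:
        print(f"Could not decode sentiment JSON: {e}")
        return []
    # Keep only well-formed entries so downstream code can rely on both keys being present.
    return [
        item for item in parsed
        if isinstance(item, dict) and "sentiment_score" in item and "frame_index" in item
    ]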
] """ payload = { "model": llama_model, "response_format": { "type": "json_schema", "json_schema": { "type": "array", "items": { "type": "object", "properties": { "sentiment_score": {"type": "number"}, "frame_index": {"type": "integer"} }, "required": ["sentiment_score", "frame_index"] } } }, "messages": [ {"role": "system", "content": system_message}, {"role": "user", "content": query} ], "temperature": 0, "max_tokens": 300, "seed": seed } try: response = requests.post(url, headers=headers, json=payload, timeout=30) response.raise_for_status() print(f"DEBUG : Raw Response is {response}") parsed_response = response.json()['choices'][0]['message']['content'] print(f"DEBUG : Raw Response is {parsed_response}") return parsed_response except Exception as e: print(f"Sentiment Analysis Error: {e}") return None def extract_frames(video_path, output_folder, fps=1): # Create the output folder if it doesn't exist os.makedirs(output_folder, exist_ok=True) # Open the video file cap = cv2.VideoCapture(video_path) # Check if the video was opened successfully if not cap.isOpened(): print(f"Error: Could not open video file {video_path}") return # Get the frames per second of the video video_fps = cap.get(cv2.CAP_PROP_FPS) # Calculate the interval between frames to capture based on desired fps frame_interval = int(video_fps / fps) count = 0 frame_count = 0 time_stamps = [] while True: # Read a frame from the video ret, frame = cap.read() # Break the loop if there are no more frames if not ret: break # Save every 'frame_interval' frame if count % frame_interval == 0: frame_filename = os.path.join(output_folder, f"image{frame_count}.jpg") cv2.imwrite(frame_filename, frame) print(f"Extracted: {frame_filename}") frame_count += 1 time_stamps.append(count/video_fps) count += 1 # Release the video capture object cap.release() print("Frame extraction completed.") return frame_count, time_stamps def download_instagram_reel_old(reel_url, username="shivani.sharma2814@gmail.com", password="instagram@123"): # Remove previous downloads if they exist os.system("rm -rf downloaded_reels") os.makedirs("downloaded_reels", exist_ok=True) # Create an instance of Instaloader print(f"Creating instance of instaloader") loader = instaloader.Instaloader( download_videos=True, download_video_thumbnails=True, download_comments=True ) try: # Login to Instagram loader.login(username, password) # Extract the shortcode from the URL shortcode = reel_url.split('/')[-2] # Download the reel using the shortcode post = instaloader.Post.from_shortcode(loader.context, shortcode) loader.download_post(post, target='downloaded_reels') # Extract comments comments = post.get_comments() print(f"Comments are : {comments}") for comment in comments: print(f"{comment.owner.username}: {comment.text}") # Find the video file name video_files = [f for f in os.listdir('downloaded_reels') if f.endswith('.mp4')] if not video_files: raise ValueError("No video file found in the downloaded reels.") return os.path.join('downloaded_reels', video_files[0]) except Exception as e: print(f"Error downloading reel: {e}") return None def analyze_frames_with_florence(image_folder, timestamps): # Set up device and dtype device = "cuda:0" if torch.cuda.is_available() else "cpu" torch_dtype = torch.float16 if torch.cuda.is_available() else torch.float32 # Load Florence-2 model model = AutoModelForCausalLM.from_pretrained( "microsoft/Florence-2-large", torch_dtype=torch_dtype, trust_remote_code=True ).to(device) processor = AutoProcessor.from_pretrained( 
"microsoft/Florence-2-large", trust_remote_code=True ) prompt = "" # Collect frame analysis results frame_analyses = [] # Iterate through all images in the specified folder N = len(os.listdir(image_folder)) # Count number of images in the folder for i in range(N): image_path = os.path.join(image_folder, f"image{i}.jpg") image = Image.open(image_path) inputs = processor(text=prompt, images=image, return_tensors="pt").to(device) generated_ids = model.generate( input_ids=inputs["input_ids"], pixel_values=inputs["pixel_values"], max_new_tokens=1024, num_beams=3, do_sample=False ) generated_text = processor.batch_decode(generated_ids, skip_special_tokens=False)[0] parsed_answer = processor.post_process_generation( generated_text, task=prompt, image_size=(image.width, image.height) ) frame_analyses.append({ 'Frame_Index': i, 'Caption': parsed_answer }) print(f"Frame {i}, TimeStamp {timestamps[i]} sec : {parsed_answer}") return frame_analyses def main(): # Specify the URL of the reel reel_url = "https://www.instagram.com/purnagummies/reel/C7RRVstqtwY/" fps = 0.5 # Download the reel st.title("BrandScan") hashtag = st.text_input("Enter the hashtag (without #):", "purnagummies") video_paths = [] if st.button("Download Reels"): if hashtag: with st.spinner("Downloading reels..."): video_paths, reel_urls = download_instagram_reels(hashtag) if reel_urls: st.success(f"Downloaded {len(video_paths)} reels:") for url in reel_urls: st.write(url) else: st.error("No reels found or an error occurred.") else: st.error("Please enter a valid hashtag.") #video_path = download_instagram_reel(reel_urls[0]) if len(video_paths) == 0: print("Failed to download the reel.") return #video_path video_path = video_paths[0] # Collect images from the video image_folder = "downloaded_reels/images" os.makedirs(image_folder, exist_ok=True) # Extract frames from the video N, timestamps = extract_frames(video_path, image_folder, fps) print(f"Analyzing video {video_path} with {N} frames extracted at {fps} frames per second") # Analyze frames with Florence-2 frame_analyses = analyze_frames_with_florence(image_folder, timestamps) # Optional: You can further process or store the frame_analyses as needed print("Frame analysis completed.") frame_analyses_str = "; \n" for item in frame_analyses: frame_analyses_str += item['Frame_Index'] + "; " + item['Caption'] + "\n" print(frame_analyses_str) sentiment_analysis = parse_query_with_groq(frame_analyses_str, os.getenv("GROQ_API_KEY")) print("Sentiment Analysis on the video:") print(sentiment_analysis) if __name__ == "__main__": main()