# Hugging Face Space (status at capture time: Sleeping)
import asyncio
import datetime
import itertools
import os

import aiohttp
import instaloader
import pytz
import torch
import yaml
from aiofiles import open as aio_open
from PIL import Image
from qwen_vl_utils import process_vision_info
from smolagents import CodeAgent, DuckDuckGoSearchTool, HfApiModel, load_tool, tool
from transformers import Qwen2_5_VLForConditionalGeneration, AutoTokenizer, AutoProcessor

from Gradio_UI import GradioUI
from tools.final_answer import FinalAnswerTool


@tool
def my_insta_analizer(username: str) -> str:  # it's important to specify the return type
    """A tool that analyzes your Instagram profile and describes how you appear to people seeing it for the first time.

    Args:
        username: The Instagram username of the person whose profile needs to be analyzed. Always starts from "@". Example: @irusvvirus.
    """
    # Returns the vision model's description of a grid built from the latest
    # post previews, or a short explanatory message when the profile cannot
    # be analyzed (missing, private, or no usable previews).
    MAX_SIZE = (144, 192)  # (width, height) of one grid cell, 3:4 portrait
    TOP_K_POSTS = 12  # exactly fills the default 3x4 grid
    MODEL_ID = "Qwen/Qwen2.5-VL-7B-Instruct"

    username = username.replace("@", "")

    # Anonymous session: no login is performed, so only public data is read.
    loader = instaloader.Instaloader()
    try:
        profile = instaloader.Profile.from_username(loader.context, username)
    except instaloader.exceptions.ProfileNotExistsException:
        return f"Profile '{username}' does not exist."

    # Read general info
    user_meta = {
        "username": profile.username,
        "full Name": profile.full_name,
        "bio": profile.biography,
        "followers": profile.followers,
        "followees": profile.followees,
        "private": profile.is_private,
        "verified": profile.is_verified,
    }
    if user_meta["private"]:
        return "This profile is private. We are not allowed to access it."

    # Create the data directory only after the lookup succeeded, and name it
    # after the resolved profile rather than the raw caller-supplied string.
    data_root = os.path.join("data", profile.username)
    os.makedirs(data_root, exist_ok=True)

    def preview_path(shortcode):
        """Return the on-disk location of a post's preview image."""
        return os.path.join(data_root, f"{shortcode}_preview.jpg")

    async def download_post(post, session):
        """Save one post preview; return its metadata, or None on failure."""
        preview_url = post.url
        try:
            async with session.get(preview_url) as response:
                if response.status != 200:
                    print(f"❌ Failed to download: {preview_url}")
                    return None
                payload = await response.read()
        except aiohttp.ClientError as err:
            print(f"❌ Failed to download: {preview_url} ({err})")
            return None

        filepath = preview_path(post.shortcode)
        async with aio_open(filepath, "wb") as file:
            await file.write(payload)
        print(f"✅ Saved: {filepath}")
        return {
            "shortcode": post.shortcode,
            "likes": post.likes,
            "caption": post.caption,
            "date": str(post.date),
        }

    async def process_posts(posts):
        """Download up to TOP_K_POSTS previews concurrently.

        Only posts whose preview was actually written to disk are returned,
        so later steps never try to open a file that does not exist.
        """
        async with aiohttp.ClientSession() as session:
            tasks = []
            # islice stops after TOP_K_POSTS items without pulling one extra
            # post from the iterator.
            for i, post in enumerate(itertools.islice(posts, TOP_K_POSTS)):
                print(f"Downloading post {i}...")
                tasks.append(download_post(post, session))
            # Await inside the session block: the requests need it open.
            results = await asyncio.gather(*tasks)
        return [meta for meta in results if meta is not None]

    def create_image_grid(images, grid_size=(3, 4)):
        """Paste images row by row onto a white (cols x rows) canvas."""
        cols, rows = grid_size
        grid_image = Image.new(
            "RGB", (cols * MAX_SIZE[0], rows * MAX_SIZE[1]), "white"
        )
        for index, img in enumerate(images):
            row, col = divmod(index, cols)
            grid_image.paste(img, (col * MAX_SIZE[0], row * MAX_SIZE[1]))
        return grid_image

    def make_4_3_crop(image):
        """Crop the center of an image to a 3:4 (width:height) portrait."""
        width, height = image.size
        target_ratio = 3 / 4  # width / height
        if width / height > target_ratio:
            # Image is too wide, crop width
            new_width = int(height * target_ratio)
            new_height = height
        else:
            # Image is too tall, crop height
            new_width = width
            new_height = int(width / target_ratio)
        # Centered cropping box
        left = (width - new_width) // 2
        top = (height - new_height) // 2
        return image.crop((left, top, left + new_width, top + new_height))

    # Scrape posts
    posts_meta = asyncio.run(process_posts(profile.get_posts()))

    posts_images = []
    for meta in posts_meta:
        try:
            # convert() yields a loaded RGB copy, so the file handle can be
            # closed as soon as the with-block ends.
            with Image.open(preview_path(meta["shortcode"])) as raw:
                tile = make_4_3_crop(raw.convert("RGB"))
        except OSError as err:
            print(f"❌ Could not read preview for {meta['shortcode']}: {err}")
            continue
        tile.thumbnail(MAX_SIZE)
        posts_images.append(tile)

    if not posts_images:
        return "No post previews could be downloaded for this profile."

    posts_image = create_image_grid(posts_images)
    # NOTE(review): the collected metadata is not passed to the vision model
    # below; only the image grid is described.
    user_meta["posts"] = posts_meta

    # NOTE(review): the model and processor are loaded on every call.
    vl_model = Qwen2_5_VLForConditionalGeneration.from_pretrained(
        MODEL_ID,
        torch_dtype=torch.bfloat16,
        attn_implementation="flash_attention_2",
        device_map="auto",
    )
    processor = AutoProcessor.from_pretrained(MODEL_ID)
    messages = [
        {
            "role": "user",
            "content": [
                {"type": "image", "image": posts_image},
                {"type": "text", "text": "Describe this image."},
            ],
        }
    ]
    text = processor.apply_chat_template(
        messages, tokenize=False, add_generation_prompt=True
    )
    image_inputs, video_inputs = process_vision_info(messages)
    inputs = processor(
        text=[text],
        images=image_inputs,
        videos=video_inputs,
        padding=True,
        return_tensors="pt",
    )
    inputs = inputs.to(vl_model.device)
    generated_ids = vl_model.generate(**inputs, max_new_tokens=128)
    # Strip the prompt tokens so only the newly generated answer is decoded.
    generated_ids_trimmed = [
        out_ids[len(in_ids):]
        for in_ids, out_ids in zip(inputs.input_ids, generated_ids)
    ]
    output_text = processor.batch_decode(
        generated_ids_trimmed,
        skip_special_tokens=True,
        clean_up_tokenization_spaces=False,
    )
    # batch_decode returns one string per batch item; the batch holds a single
    # prompt, and the declared return type is str.
    return output_text[0]


@tool
def get_current_time_in_timezone(timezone: str) -> str:
    """A tool that fetches the current local time in a specified timezone.

    Args:
        timezone: A string representing a valid timezone (e.g., 'America/New_York').
    """
    # Any failure (e.g. an unknown timezone name) is reported back to the
    # agent as text instead of being raised.
    try:
        # Create timezone object
        tz = pytz.timezone(timezone)
        # Get current time in that timezone
        local_time = datetime.datetime.now(tz).strftime("%Y-%m-%d %H:%M:%S")
        return f"The current local time in {timezone} is: {local_time}"
    except Exception as e:
        return f"Error fetching time for timezone '{timezone}': {str(e)}"


# Application wiring: build the LLM client, load prompt templates, assemble
# the agent with its tools, and start the Gradio UI.
final_answer = FinalAnswerTool()

# If the agent does not answer, the model is overloaded; use another model or
# the following Hugging Face Endpoint, which also serves Qwen2.5 Coder:
# model_id='https://pflgm2locj2t89co.us-east-1.aws.endpoints.huggingface.cloud'
model = HfApiModel(
    max_tokens=2096,
    temperature=0.5,
    model_id='Qwen/Qwen2.5-Coder-32B-Instruct',  # it is possible that this model may be overloaded
    custom_role_conversions=None,
)

# Import tool from Hub
# NOTE(review): this tool is loaded with trust_remote_code=True but is not
# passed to the agent below; confirm it is still needed.
image_generation_tool = load_tool("agents-course/text-to-image", trust_remote_code=True)

# Explicit encoding so the templates decode the same way on every platform.
with open("prompts.yaml", "r", encoding="utf-8") as stream:
    prompt_templates = yaml.safe_load(stream)

agent = CodeAgent(
    model=model,
    tools=[my_insta_analizer, get_current_time_in_timezone, final_answer],  # add your tools here (don't remove final answer)
    max_steps=6,
    verbosity_level=1,
    grammar=None,
    planning_interval=None,
    name=None,
    description=None,
    prompt_templates=prompt_templates,
)

GradioUI(agent).launch()