Spaces:
Sleeping
Sleeping
| import streamlit as st | |
| from PIL import Image | |
| import requests | |
| import io | |
| import os | |
| import random | |
| import time | |
| st.set_page_config(page_title="RecycleRight Budapest", page_icon="♻️", layout="wide") | |
| # ============== VISION AGENT ============== | |
| class VisionAgent: | |
| def __init__(self): | |
| self.token = os.getenv("HF_TOKEN") | |
| if not self.token and "HF_TOKEN" in st.secrets: | |
| self.token = st.secrets["HF_TOKEN"] | |
| # Use the standard API URL (Router URL can be finicky with these models) | |
| self.base_url = "https://api-inference.huggingface.co/models" | |
| # 🔄 CHANGED MODELS to ones that are still active on Free Tier | |
| self.classifier_model = "microsoft/resnet-50" | |
| self.captioner_model = "nlpconnect/vit-gpt2-image-captioning" | |
| def query_api(self, model_id, image_bytes): | |
| headers = {"Authorization": f"Bearer {self.token}"} | |
| # Retry logic | |
| max_retries = 3 | |
| for attempt in range(max_retries): | |
| try: | |
| response = requests.post( | |
| f"{self.base_url}/{model_id}", | |
| headers=headers, | |
| data=image_bytes, | |
| timeout=20 # 20s is enough | |
| ) | |
| # SUCCESS | |
| if response.status_code == 200: | |
| return response | |
| # LOADING (Cold Start) | |
| elif response.status_code == 503: | |
| error_data = response.json() | |
| wait_time = error_data.get("estimated_time", 15) | |
| st.toast(f"⏳ Waking up {model_id}... {wait_time:.0f}s", icon="💤") | |
| time.sleep(wait_time) | |
| continue | |
| # If 410/404/500 -> Fail immediately, don't retry | |
| else: | |
| print(f"Error {response.status_code}: {response.text}") | |
| return None | |
| except Exception as e: | |
| print(f"Connection Error: {e}") | |
| return None | |
| return None | |
| def process(self, image, filename=""): | |
| # Image Prep | |
| if image.mode != 'RGB': image = image.convert('RGB') | |
| buf = io.BytesIO() | |
| image.save(buf, format='JPEG') | |
| image_bytes = buf.getvalue() | |
| if not self.token: | |
| st.error("❌ HF_TOKEN is missing.") | |
| return self.fallback_detection(filename) | |
| detected_keywords = [] | |
| display_description = "" | |
| # 1. Classifier (ResNet) | |
| st.toast(f"🤖 Identifying object...", icon="🔍") | |
| resp_class = self.query_api(self.classifier_model, image_bytes) | |
| if resp_class and resp_class.status_code == 200: | |
| results = resp_class.json() | |
| if isinstance(results, list): | |
| # ResNet returns [{'label': 'water bottle', 'score': 0.9}] | |
| top_labels = [item.get('label', '') for item in results[:3]] | |
| detected_keywords.extend(top_labels) | |
| st.success(f"🏷️ Detected: {', '.join(top_labels)}") | |
| # 2. Captioner (ViT-GPT2) | |
| st.toast(f"👁️ Analyzing context...", icon="📝") | |
| resp_cap = self.query_api(self.captioner_model, image_bytes) | |
| if resp_cap and resp_cap.status_code == 200: | |
| results = resp_cap.json() | |
| # Format: [{'generated_text': 'a woman holding a bottle'}] | |
| if isinstance(results, list) and len(results) > 0 and 'generated_text' in results[0]: | |
| caption = results[0]['generated_text'] | |
| display_description = caption | |
| detected_keywords.append(caption) | |
| # Check if we got ANYTHING | |
| combined_text = " ".join(detected_keywords).lower() | |
| if not combined_text.strip(): | |
| st.warning("⚠️ AI models unavailable. Switching to filename detection.") | |
| return self.fallback_detection(filename) | |
| return self.analyze(combined_text, display_description) | |
| def analyze(self, text, display_desc=""): | |
| # ... (Keep your existing analyze logic exactly as it is) ... | |
| # COPY PASTE YOUR EXISTING ANALYZE FUNCTION HERE | |
| result = { | |
| "description": display_desc if display_desc else text, | |
| "items": [], | |
| "materials": [], | |
| "condition": "clean", | |
| "needs_separation": False | |
| } | |
| # 1. Plastic | |
| if any(w in text for w in ["plastic", "bottle", "poly", "water bottle"]): | |
| if "glass" not in text and "wine" not in text: | |
| result["materials"].append("plastic") | |
| # 2. Glass | |
| if any(w in text for w in ["glass", "wine", "beer bottle", "jar"]): | |
| result["materials"].append("glass") | |
| # 3. Metal / Aluminum | |
| if any(w in text for w in ["can", "aluminum", "tin", "soda", "beer", "coke"]): | |
| if "trash can" not in text: | |
| result["materials"].append("metal") | |
| result["items"].append("can") | |
| # 4. Paper / Cardboard | |
| if any(w in text for w in ["cardboard", "box", "carton", "pizza"]): | |
| result["materials"].append("cardboard") | |
| if any(w in text for w in ["paper", "newspaper", "magazine"]): | |
| result["materials"].append("paper") | |
| # 5. Special Items | |
| if "pizza" in text: | |
| result["items"].append("pizza") | |
| result["condition"] = "dirty" | |
| if any(w in text for w in ["bottle", "can", "soda", "beer", "coke"]): | |
| result["items"].append("deposit") | |
| result["items"].append("bottle") | |
| if "coffee" in text or "cup" in text: | |
| result["items"].append("cup") | |
| result["items"].append("takeaway") | |
| if any(w in text for w in ["dirty", "trash", "garbage", "waste", "rotten", "greasy"]): | |
| result["condition"] = "dirty" | |
| result["items"] = list(set(result["items"])) | |
| result["materials"] = list(set(result["materials"])) | |
| if not result["materials"] and not result["items"]: | |
| result["materials"].append("unknown") | |
| return result | |
| def fallback_detection(self, filename): | |
| st.warning("using filename detection") | |
| return self.analyze(filename.lower() if filename else "") | |
| #def fallback_detection(self, filename): | |
| # st.warning(f"⚠️ Using filename detection for: {filename}") | |
| # # Simple logic for fallback | |
| # fname = filename.lower() if filename else "" | |
| # result = {"description": fname, "items": [], "materials": [], "condition": "clean", "needs_separation": False} | |
| # | |
| # if "bottle" in fname: result["items"].append("bottle"); result["materials"].append("plastic") | |
| # elif "can" in fname: result["items"].append("can"); result["materials"].append("metal") | |
| # elif "pizza" in fname: result["items"].append("pizza"); result["materials"].append("cardboard"); result["condition"]="dirty" | |
| # else: result["materials"].append("unknown") | |
| # return result | |
| # ============== RULES AGENT ============== | |
| class RulesAgent: | |
| def __init__(self): | |
| self.rules = { | |
| "plastic": {"bin": "SÁRGA", "emoji": "🟡", "label": "Műanyag és Fém"}, | |
| "metal": {"bin": "SÁRGA", "emoji": "🟡", "label": "Műanyag és Fém"}, | |
| "paper": {"bin": "KÉK", "emoji": "🔵", "label": "Papír"}, | |
| "cardboard": {"bin": "KÉK", "emoji": "🔵", "label": "Papír"}, | |
| "glass": {"bin": "GYŰJTŐPONT", "emoji": "🟢", "label": "Üveg"}, | |
| "hazardous": {"bin": "KÜLÖNLEGES", "emoji": "⚠️", "label": "Veszélyes hulladék"} | |
| } | |
| self.special_items = { | |
| "deposit": {"bin": "REpont", "emoji": "♻️", "reason": "Return for 50 Ft refund!"}, | |
| "takeaway": {"bin": "FEKETE", "emoji": "⚫", "reason": "Plastic lining prevents recycling"}, | |
| "pizza": {"bin": "FEKETE", "emoji": "⚫", "reason": "Grease contamination"}, | |
| "batteries": {"bin": "ELEMGYŰJTŐ", "emoji": "🔋", "reason": "Toxic materials"} | |
| } | |
| def process(self, vision_result): | |
| items = vision_result.get("items", []) | |
| materials = vision_result.get("materials", []) | |
| condition = vision_result.get("condition", "clean") | |
| result = {"bins_needed": [], "steps": [], "warnings": []} | |
| step = 1 | |
| handled = set() | |
| # Separation warning | |
| if vision_result.get("needs_separation"): | |
| result["steps"].append(f"{step}. 🔄 **Separate different materials first**") | |
| step += 1 | |
| # Special items | |
| for item in items: | |
| if item in self.special_items: | |
| special = self.special_items[item] | |
| result["steps"].append(f"{step}. {special['emoji']} **{item.title()} → {special['bin']}**") | |
| result["bins_needed"].append({"bin": special["bin"], "emoji": special["emoji"], "label": special["bin"]}) | |
| result["warnings"].append(f"💡 {special['reason']}") | |
| step += 1 | |
| # Mark materials as handled | |
| if item == "deposit": | |
| handled.update(["plastic", "metal", "glass"]) | |
| elif item == "takeaway": | |
| handled.add("paper") | |
| # Regular materials | |
| for mat in materials: | |
| if mat in handled: | |
| continue | |
| if mat in self.rules: | |
| rule = self.rules[mat] | |
| # Dirty paper/cardboard | |
| if condition == "dirty" and mat in ["paper", "cardboard"]: | |
| result["steps"].append(f"{step}. ⚫ **Dirty {mat} → FEKETE**") | |
| result["warnings"].append(f"⚠️ Contaminated {mat} cannot be recycled!") | |
| else: | |
| result["steps"].append(f"{step}. {rule['emoji']} **{mat.title()} → {rule['bin']}**") | |
| result["bins_needed"].append(rule) | |
| step += 1 | |
| # Remove duplicate bins | |
| seen = set() | |
| unique_bins = [] | |
| for bin_info in result["bins_needed"]: | |
| if bin_info["bin"] not in seen: | |
| unique_bins.append(bin_info) | |
| seen.add(bin_info["bin"]) | |
| result["bins_needed"] = unique_bins | |
| return result | |
| # ============== RECOMMENDATION AGENT ============== | |
| class RecommendationAgent: | |
| def __init__(self): | |
| self.co2_map = { | |
| "plastic": 2.5, | |
| "metal": 8.1, | |
| "paper": 3.9, | |
| "cardboard": 3.9, | |
| "glass": 0.5 | |
| } | |
| self.tips = { | |
| "plastic": "💡 Crush bottles to save 70% truck space!", | |
| "metal": "💡 Aluminum recycles infinitely without quality loss!", | |
| "paper": "💡 Recycling 1 ton saves 17 trees!", | |
| "glass": "💡 Glass recycles endlessly - no quality degradation!" | |
| } | |
| self.locations = { | |
| "REpont": ["📍 Tesco Árkád Shopping Center", "📍 Spar Westend", "📍 Auchan Budaörs"], | |
| "GYŰJTŐPONT": ["📍 Glass: Major intersections", "📍 Map: fkf.hu/hulladekgyujto-szigetek"], | |
| "ELEMGYŰJTŐ": ["📍 Media Markt", "📍 DM drugstores", "📍 Tesco Customer Service"] | |
| } | |
| def process(self, vision_result, rules_result): | |
| materials = vision_result.get("materials", []) | |
| items = vision_result.get("items", []) | |
| # Calculate CO2 impact | |
| co2 = sum(self.co2_map.get(m, 0) * 0.2 for m in materials) | |
| impact_msg = f"♻️ Saves ~**{co2:.1f} kg CO₂**" | |
| if co2 > 1: | |
| car_km = co2 * 4.6 | |
| impact_msg += f"\n🚗 Equivalent to {car_km:.1f} km car travel!" | |
| # Generate tips | |
| tips = [self.tips[m] for m in materials if m in self.tips] | |
| if "deposit" in items: | |
| tips.append("💰 Each bottle/can = 50 Ft refund!") | |
| # Find locations | |
| locations = [] | |
| if "deposit" in items: | |
| locations = self.locations["REpont"] | |
| elif "batteries" in items: | |
| locations = self.locations["ELEMGYŰJTŐ"] | |
| elif "glass" in materials: | |
| locations = self.locations["GYŰJTŐPONT"] | |
| # Fun facts | |
| facts = [ | |
| "🎯 Budapest aims for 65% recycling by 2035!", | |
| "📊 Average Hungarian produces 385 kg waste/year", | |
| "🏆 Proper sorting reduces costs by 40%!", | |
| "🌱 Recycling 1 ton plastic saves 5,774 kWh energy" | |
| ] | |
| return { | |
| "impact": impact_msg, | |
| "co2_kg": co2, | |
| "tips": tips, | |
| "locations": locations, | |
| "fun_fact": random.choice(facts) | |
| } | |
| # ============== MAIN APP ============== | |
| def load_agents(): | |
| return VisionAgent(), RulesAgent(), RecommendationAgent() | |
| def main(): | |
| st.title("♻️ RecycleRight Budapest") | |
| st.markdown("**Multi-Agent AI Recycling Assistant**") | |
| vision, rules, recommender = load_agents() | |
| with st.sidebar: | |
| st.markdown("## 🗑️ Budapest Bins") | |
| bins = [ | |
| ("🟡 SÁRGA", "Plastic & Metal"), | |
| ("🔵 KÉK", "Paper & Cardboard"), | |
| ("⚫ FEKETE", "General Waste"), | |
| ("🟢 GYŰJTŐPONT", "Glass (Collection Points)"), | |
| ("♻️ REpont", "Deposit Returns (50 Ft)") | |
| ] | |
| for emoji_bin, desc in bins: | |
| st.markdown(f"**{emoji_bin}** - {desc}") | |
| st.markdown("---") | |
| st.markdown("### 🤖 Agent Status") | |
| st.success("✅ Vision Agent ") | |
| st.success("✅ Rules Agent (Budapest FKF)") | |
| st.success("✅ Recommendation Agent") | |
| tab1, tab2 = st.tabs(["📸 Analyze", "🎮 Examples"]) | |
| with tab1: | |
| st.info("💡 **Multi-Model AI:** Tries different vision models for reliability!") | |
| uploaded = st.file_uploader("📁 Upload image", type=['jpg','png','jpeg']) | |
| if uploaded: | |
| img = Image.open(uploaded) | |
| st.image(img, caption=f"Uploaded: {uploaded.name}", use_column_width=True) | |
| if st.button("🔍 **ANALYZE**", type="primary", use_container_width=True): | |
| with st.spinner("🤖 Multi-agent processing..."): | |
| # Vision analysis | |
| v_result = vision.process(img, filename=uploaded.name) | |
| # Rules application | |
| r_result = rules.process(v_result) | |
| # Recommendations | |
| rec = recommender.process(v_result, r_result) | |
| st.balloons() | |
| st.success("✅ **Analysis Complete!**") | |
| # Display results | |
| col1, col2 = st.columns([2, 1]) | |
| with col1: | |
| st.markdown("### 👁️ Detection") | |
| st.write(f"**Description:** {v_result['description']}") | |
| st.write(f"**Items:** {', '.join(v_result['items']) if v_result['items'] else 'None detected'}") | |
| st.write(f"**Materials:** {', '.join(v_result['materials']) if v_result['materials'] else 'None detected'}") | |
| st.write(f"**Condition:** {v_result['condition']}") | |
| st.markdown("### 📋 Disposal Instructions") | |
| if r_result['steps']: | |
| for step in r_result['steps']: | |
| st.markdown(step) | |
| else: | |
| st.info("No specific instructions - general waste disposal") | |
| for warning in r_result['warnings']: | |
| st.warning(warning) | |
| with col2: | |
| st.markdown("### 🗑️ Bins Needed") | |
| if r_result['bins_needed']: | |
| for bin_info in r_result['bins_needed']: | |
| st.metric(bin_info.get('label', bin_info['bin']), f"{bin_info['emoji']} {bin_info['bin']}") | |
| else: | |
| st.info("No specific bin required") | |
| st.markdown("### 🌍 Environmental Impact") | |
| st.info(rec['impact']) | |
| if rec['locations']: | |
| st.markdown("### 📍 Where to Go") | |
| for loc in rec['locations']: | |
| st.markdown(loc) | |
| # Tips | |
| if rec['tips']: | |
| st.markdown("### 💡 Pro Tips") | |
| cols = st.columns(len(rec['tips'])) | |
| for i, tip in enumerate(rec['tips']): | |
| with cols[i]: | |
| st.info(tip) | |
| st.markdown(f"**Did you know?** {rec['fun_fact']}") | |
| with tab2: | |
| st.markdown("### 🎮 Quick Test Examples") | |
| examples = [ | |
| ("🥤 Plastic Bottle", {"items": ["bottle", "deposit"], "materials": ["plastic"], "condition": "clean", "needs_separation": False, "description": "plastic water bottle"}), | |
| ("☕ Coffee Cup", {"items": ["cup", "takeaway"], "materials": ["paper"], "condition": "clean", "needs_separation": True, "description": "disposable coffee cup"}), | |
| ("🍕 Pizza Box", {"items": ["pizza", "box"], "materials": ["cardboard"], "condition": "dirty", "needs_separation": False, "description": "greasy pizza box"}), | |
| ("🍷 Wine Bottle", {"items": ["bottle"], "materials": ["glass"], "condition": "clean", "needs_separation": False, "description": "glass wine bottle"}), | |
| ("🥫 Metal Can", {"items": ["can", "deposit"], "materials": ["metal"], "condition": "clean", "needs_separation": False, "description": "aluminum soda can"}), | |
| ("🔋 Batteries", {"items": ["batteries"], "materials": ["hazardous"], "condition": "used", "needs_separation": False, "description": "AA batteries"}) | |
| ] | |
| cols = st.columns(3) | |
| for i, (name, data) in enumerate(examples): | |
| with cols[i % 3]: | |
| if st.button(name, use_container_width=True): | |
| r = rules.process(data) | |
| rec = recommender.process(data, r) | |
| st.markdown(f"#### {name}") | |
| st.write(f"**Items:** {', '.join(data['items'])}") | |
| for step in r['steps']: | |
| st.markdown(step) | |
| st.info(rec['impact']) | |
| if __name__ == "__main__": | |
| main() | |