Update app.py
app.py
CHANGED
Old side (context and removed lines):

@@ -1,4 +1,4 @@
-import os,
 from typing import List, Tuple, Dict, Any
 from PIL import Image
 
@@ -8,21 +8,23 @@ import torch.nn.functional as F
 import gradio as gr
 from datasets import load_dataset
 from sklearn.neighbors import NearestNeighbors
-from transformers import pipeline
 
 # =============== CONFIG ===============
 DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
 
-# Embeddings
 OPENCLIP_BACKBONE = "ViT-H-14"
 OPENCLIP_PRETRAIN = "laion2B-s32B-b79K"  # laion/CLIP-ViT-H-14-laion2B-s32B-b79K
-INDEX_SIZE = int(os.getenv("INDEX_SIZE", 400))  # how many dataset images to index
-TOPK_NEAREST = 5
 
-#
-
 
-# Optional
 USE_SD_VARIATIONS = True
 SD_MODEL = "lambdalabs/sd-image-variations-diffusers"
 # =====================================
@@ -48,110 +50,126 @@ def embed_image(img: Image.Image) -> np.ndarray:
     feats = F.normalize(feats, dim=-1).squeeze(0).detach().cpu().numpy().astype(np.float32)
     return feats  # shape [D]
 
-# ----------
-
-
 
-
     ds = load_dataset(DATASET_NAME, split=DATASET_SPLIT)
     n = min(n, len(ds))
-    imgs = []
     for i in range(n):
-
-        im =
-        if isinstance(im, Image.Image):
-
-
 
 def build_index(imgs: List[Image.Image]) -> Tuple[NearestNeighbors, np.ndarray]:
-    vecs = []
-    for im in imgs:
-        vecs.append(embed_image(im))
     X = np.stack(vecs, axis=0)
-    nn = NearestNeighbors(metric="cosine", n_neighbors=min(TOPK_NEAREST, len(imgs)))
     nn.fit(X)
     return nn, X
 
 print("Loading dataset & building index (first time only)...")
-DATASET_IMAGES
 NN_MODEL, EMB_MATRIX = build_index(DATASET_IMAGES)
-print(f"Index ready with {len(DATASET_IMAGES)} images.")
 
 def nearest5(pil_img: Image.Image) -> List[Tuple[Image.Image, str]]:
     q = embed_image(pil_img).reshape(1, -1)
-
-
     out = []
     for rank, (dist, idx) in enumerate(zip(dists[0], idxs[0]), start=1):
-        sim = 1.0 - float(dist)
         im = DATASET_IMAGES[int(idx)]
         caption = f"#{rank} sim={sim:.3f} idx={int(idx)}"
         out.append((im, caption))
-    return out
-
-# ---------- Emotion & Stress ----------
-EMO_MAP = {
-    "anger": "anger", "angry": "anger",
-    "disgust": "disgust",
-    "fear": "fear",
-    "happy": "happy", "happiness": "happy",
-    "neutral": "neutral", "calm": "neutral",
-    "sad": "sad", "sadness": "sad",
-    "surprise": "surprise",
-    "contempt": "contempt",
-}
-
-# higher == more stressed
-STRESS_WEIGHTS = {
-    "anger": 0.95,
-    "fear": 0.90,
-    "disgust": 0.70,
-    "sad": 0.80,
-    "surprise": 0.55,
-    "neutral": 0.30,
-    "contempt": 0.65,
-    "happy": 0.10,
-}
-
-def _bucket(p: float) -> str:
-    return "Low" if p < 33 else ("Medium" if p < 66 else "High")
-
-emo_pipe = pipeline("image-classification", model=EMO_MODEL, device=0 if DEVICE == "cuda" else -1)
 
-def
-
-
-
-
-
-
-
-
-
-
-
-    return
 
 def emotions_top3(pil_img: Image.Image) -> List[List[Any]]:
-
-    probs = _pipe_to_probs(res)
     items = sorted(probs.items(), key=lambda kv: kv[1], reverse=True)[:3]
     table = []
     for i, (emo, p) in enumerate(items, start=1):
         table.append([i, emo, round(100.0 * p, 2)])
-
 
 def stress_index(pil_img: Image.Image) -> Tuple[str, float]:
-
-
-    raw = 0.0
-    for k, v in probs.items():
-        w = STRESS_WEIGHTS.get(k, 0.5)
-        raw += v * w
     pct = max(0.0, min(100.0, 100.0 * raw))
     return f"{pct:.1f}% ({_bucket(pct)})", pct
 
-# ---------- Optional: SD image variations
 sd_pipe = None
 if USE_SD_VARIATIONS:
     try:
@@ -172,15 +190,13 @@ def generate_one_variation(pil_img: Image.Image, steps: int) -> Image.Image:
     return out
 
 # ===================== GRADIO UI =====================
-CSS = """
-.box { border: 1px solid #e5e7eb; border-radius: 12px; padding: 10px; }
-"""
 
-with gr.Blocks(title="Face Emotion & Stress Analyzer — CPU-friendly", css=CSS,
     gr.Markdown(
-        "### Face Emotion & Stress Analyzer —
         "- Embeddings: **laion/CLIP-ViT-H-14-laion2B-s32B-b79K** (open_clip)\n"
-        "- Emotion model: **
         "- Optional SD variations: **lambdalabs/sd-image-variations-diffusers** (1 synthetic only)\n"
         "- Right column shows nearest 5 images from the dataset (clickable)."
     )
@@ -246,9 +262,7 @@ with gr.Blocks(title="Face Emotion & Stress Analyzer — CPU-friendly", css=CSS,
         gal = nearest5(img)  # list[(PIL, caption)]
         gal_imgs = [g[0] for g in gal]
         gal_caps = [g[1] for g in gal]
-        # gr.Gallery accepts [(img, caption), ...]
         gallery = [(im, cap) for im, cap in zip(gal_imgs, gal_caps)]
-        # return
         return t3, s_label, gallery, gal_imgs, list(range(len(gal_imgs)))
 
     upload_image.change(
@@ -258,7 +272,6 @@ with gr.Blocks(title="Face Emotion & Stress Analyzer — CPU-friendly", css=CSS,
     )
 
     def on_gallery_select(evt: gr.SelectData, imgs: List[Image.Image], idxs: List[int]):
-        # evt.index is the clicked cell
         if imgs is None or not imgs:
             return [], ""
         i = int(evt.index) if evt is not None else 0
@@ -291,4 +304,4 @@ with gr.Blocks(title="Face Emotion & Stress Analyzer — CPU-friendly", css=CSS,
     )
 
 if __name__ == "__main__":
-    demo.launch()
New side (context and added lines):

@@ -1,4 +1,4 @@
+import os, numpy as np
 from typing import List, Tuple, Dict, Any
 from PIL import Image
 
@@ -8,21 +8,23 @@ import torch.nn.functional as F
 import gradio as gr
 from datasets import load_dataset
 from sklearn.neighbors import NearestNeighbors
 
 # =============== CONFIG ===============
 DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
 
+# Embeddings backbone
 OPENCLIP_BACKBONE = "ViT-H-14"
 OPENCLIP_PRETRAIN = "laion2B-s32B-b79K"  # laion/CLIP-ViT-H-14-laion2B-s32B-b79K
 
+# Dataset (THIS IS YOUR "MODEL" SOURCE NOW)
+DATASET_NAME = "tukey/human_face_emotions_roboflow"
+DATASET_SPLIT = "train"
+
+INDEX_SIZE = int(os.getenv("INDEX_SIZE", 400))  # how many dataset examples to index
+TOPK_NEAREST = 5       # shown in the nearest-neighbor gallery
+KNN_K_FOR_CLASS = 25   # neighbors used to weight emotions
 
+# Optional SD variations
 USE_SD_VARIATIONS = True
 SD_MODEL = "lambdalabs/sd-image-variations-diffusers"
 # =====================================
@@ -48,110 +50,126 @@ def embed_image(img: Image.Image) -> np.ndarray:
     feats = F.normalize(feats, dim=-1).squeeze(0).detach().cpu().numpy().astype(np.float32)
     return feats  # shape [D]
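The body of embed_image lies outside this hunk; only its tail is shown above as context. As a point of reference, here is a minimal sketch of how such an open_clip embedding helper is commonly written, reusing the config names from the diff. The loader call and the name embed_image_sketch are assumptions, not code from app.py.

import open_clip, torch
import torch.nn.functional as F

# Hypothetical setup mirroring OPENCLIP_BACKBONE / OPENCLIP_PRETRAIN above.
model, _, preprocess = open_clip.create_model_and_transforms(
    OPENCLIP_BACKBONE, pretrained=OPENCLIP_PRETRAIN
)
model = model.to(DEVICE).eval()

def embed_image_sketch(img):
    # PIL image -> [1, 3, H, W] tensor -> L2-normalized float32 vector
    x = preprocess(img.convert("RGB")).unsqueeze(0).to(DEVICE)
    with torch.no_grad():
        feats = model.encode_image(x)
    return F.normalize(feats, dim=-1).squeeze(0).cpu().numpy().astype("float32")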
 
+# ---------- Labels & stress mapping ----------
+EMO_MAP = {
+    "anger": "anger", "angry": "anger",
+    "disgust": "disgust",
+    "fear": "fear",
+    "happy": "happy", "happiness": "happy",
+    "neutral": "neutral", "calm": "neutral",
+    "sad": "sad", "sadness": "sad",
+    "surprise": "surprise",
+    "contempt": "contempt",
+}
+ALLOWED = set(EMO_MAP.values())  # strict whitelist
+
+STRESS_WEIGHTS = {
+    "anger": 0.95, "fear": 0.90, "disgust": 0.70, "sad": 0.80,
+    "surprise": 0.55, "neutral": 0.30, "contempt": 0.65, "happy": 0.10,
+}
+def _bucket(p: float) -> str:
+    return "Low" if p < 33 else ("Medium" if p < 66 else "High")
 
+# ---------- Load dataset & build index ----------
+def _extract_label(rec: Dict[str, Any]) -> str:
+    # handle the label fields the dataset may use
+    if "label" in rec and rec["label"]:
+        raw = rec["label"]
+        if isinstance(raw, (list, tuple)): raw = raw[0]
+        return str(raw).strip().lower()
+    if "labels" in rec and rec["labels"]:
+        raw = rec["labels"][0]
+        return str(raw).strip().lower()
+    if "qa" in rec and rec["qa"] and isinstance(rec["qa"], list):
+        qa0 = rec["qa"][0]
+        if qa0 and "answer" in qa0:
+            return str(qa0["answer"]).strip().lower()
+    return ""
+
+def _map_allowed(lbl: str) -> str:
+    # map to a canonical name; unknown labels are filtered out
+    mapped = EMO_MAP.get(lbl, lbl)
+    return mapped if mapped in ALLOWED else ""  # "" => drop
+
+def _load_images_labels_for_index(n: int) -> Tuple[List[Image.Image], List[str]]:
     ds = load_dataset(DATASET_NAME, split=DATASET_SPLIT)
+    imgs, labels = [], []
     n = min(n, len(ds))
     for i in range(n):
+        rec = ds[i]
+        im = rec.get("image")
+        if not isinstance(im, Image.Image):
+            continue
+        raw_lbl = _extract_label(rec)
+        mapped = _map_allowed(raw_lbl)
+        if not mapped:
+            continue  # drop disallowed/empty labels
+        imgs.append(im.copy())
+        labels.append(mapped)
+    return imgs, labels
 
 def build_index(imgs: List[Image.Image]) -> Tuple[NearestNeighbors, np.ndarray]:
+    vecs = [embed_image(im) for im in imgs]
     X = np.stack(vecs, axis=0)
+    nn = NearestNeighbors(metric="cosine", n_neighbors=min(max(TOPK_NEAREST, KNN_K_FOR_CLASS), len(imgs)))
     nn.fit(X)
     return nn, X
 
 print("Loading dataset & building index (first time only)...")
+DATASET_IMAGES, DATASET_LABELS = _load_images_labels_for_index(INDEX_SIZE)
+if len(DATASET_IMAGES) == 0:
+    raise RuntimeError("No images with allowed labels were loaded from the dataset.")
 NN_MODEL, EMB_MATRIX = build_index(DATASET_IMAGES)
+print(f"Index ready with {len(DATASET_IMAGES)} images (labels={sorted(set(DATASET_LABELS))}).")
 
+# ---------- Nearest & KNN-based classification ----------
 def nearest5(pil_img: Image.Image) -> List[Tuple[Image.Image, str]]:
     q = embed_image(pil_img).reshape(1, -1)
+    n = min(5, len(DATASET_IMAGES))
+    dists, idxs = NN_MODEL.kneighbors(q, n_neighbors=n)
     out = []
     for rank, (dist, idx) in enumerate(zip(dists[0], idxs[0]), start=1):
+        sim = max(0.0, 1.0 - float(dist))  # cosine distance -> similarity
         im = DATASET_IMAGES[int(idx)]
         caption = f"#{rank} sim={sim:.3f} idx={int(idx)}"
         out.append((im, caption))
+    return out
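Note on the distance-to-similarity conversion: scikit-learn's cosine metric returns 1 - cos(u, v), so `1.0 - dist` recovers the cosine similarity, and because embed_image L2-normalizes its output this is just a dot product. A small self-contained check with toy unit vectors (not the app's embeddings):

import numpy as np
from sklearn.neighbors import NearestNeighbors

ref = np.array([[1.0, 0.0], [0.6, 0.8]], dtype=np.float32)  # two unit vectors
q = np.array([[1.0, 0.0]], dtype=np.float32)

nn_demo = NearestNeighbors(metric="cosine", n_neighbors=2).fit(ref)
dists, idxs = nn_demo.kneighbors(q)
print(1.0 - dists[0])       # ~[1.0, 0.6]
print(ref[idxs[0]] @ q[0])  # same values as plain dot products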
 
+def knn_probs(pil_img: Image.Image, k: int = KNN_K_FOR_CLASS) -> Dict[str, float]:
+    q = embed_image(pil_img).reshape(1, -1)
+    k = min(k, len(DATASET_IMAGES))
+    dists, idxs = NN_MODEL.kneighbors(q, n_neighbors=k)
+    sims = 1.0 - dists[0]  # higher is better
+    sims = np.maximum(sims, 0.0)
+    votes: Dict[str, float] = {}
+    for sim, idx in zip(sims, idxs[0]):
+        lbl = DATASET_LABELS[int(idx)]
+        if lbl in ALLOWED:
+            votes[lbl] = votes.get(lbl, 0.0) + float(sim)
+    Z = sum(votes.values()) or 1.0
+    return {k: v / Z for k, v in votes.items()}
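The similarity-weighted vote in knn_probs reduces to the following tiny worked example (neighbor similarities and labels are made up):

# Three neighbors: similarities 0.9 and 0.8 vote "happy", 0.5 votes "sad".
votes = {}
for sim, lbl in [(0.9, "happy"), (0.8, "happy"), (0.5, "sad")]:
    votes[lbl] = votes.get(lbl, 0.0) + sim
Z = sum(votes.values())                                 # 2.2
print({k: round(v / Z, 3) for k, v in votes.items()})   # {'happy': 0.773, 'sad': 0.227}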
 
 def emotions_top3(pil_img: Image.Image) -> List[List[Any]]:
+    probs = knn_probs(pil_img)
     items = sorted(probs.items(), key=lambda kv: kv[1], reverse=True)[:3]
     table = []
     for i, (emo, p) in enumerate(items, start=1):
         table.append([i, emo, round(100.0 * p, 2)])
+    # pad to three rows if fewer emotions came back
+    seen = {r[1] for r in table}
+    for fill in ["neutral", "other"]:
+        if len(table) >= 3: break
+        if fill in ALLOWED and fill not in seen:
+            table.append([len(table)+1, fill, 0.0])
+    return table
 
 def stress_index(pil_img: Image.Image) -> Tuple[str, float]:
+    probs = knn_probs(pil_img)
+    raw = sum(probs.get(k, 0.0) * STRESS_WEIGHTS.get(k, 0.5) for k in ALLOWED)
     pct = max(0.0, min(100.0, 100.0 * raw))
     return f"{pct:.1f}% ({_bucket(pct)})", pct
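With the example distribution from the sketch above, the stress formula works out as follows (illustrative numbers only; the function itself iterates ALLOWED, which holds the same keys as STRESS_WEIGHTS):

probs = {"happy": 0.773, "sad": 0.227}
raw = sum(probs.get(k, 0.0) * STRESS_WEIGHTS.get(k, 0.5) for k in STRESS_WEIGHTS)
pct = max(0.0, min(100.0, 100.0 * raw))   # 0.773*0.10 + 0.227*0.80 = 0.2589 -> 25.9
print(f"{pct:.1f}% ({_bucket(pct)})")     # "25.9% (Low)"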
 
+# ---------- Optional: SD image variations ----------
 sd_pipe = None
 if USE_SD_VARIATIONS:
     try:
@@ -172,15 +190,13 @@ def generate_one_variation(pil_img: Image.Image, steps: int) -> Image.Image:
     return out
 
 # ===================== GRADIO UI =====================
+CSS = ".box { border: 1px solid #e5e7eb; border-radius: 12px; padding: 10px; }"
 
+with gr.Blocks(title="Face Emotion & Stress Analyzer — KNN over tukey dataset", css=CSS, fill_height=False) as demo:
     gr.Markdown(
+        "### Face Emotion & Stress Analyzer — **KNN over `tukey/human_face_emotions_roboflow`**\n"
         "- Embeddings: **laion/CLIP-ViT-H-14-laion2B-s32B-b79K** (open_clip)\n"
+        "- Emotion model: **KNN using labels from `tukey/human_face_emotions_roboflow`**\n"
         "- Optional SD variations: **lambdalabs/sd-image-variations-diffusers** (1 synthetic only)\n"
        "- Right column shows nearest 5 images from the dataset (clickable)."
     )
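The Blocks UI in this and the following hunks simply wires the module-level functions defined above to Gradio components. Outside the interface, the same pipeline can be exercised directly, for example (the file path is a placeholder):

from PIL import Image

img = Image.open("face.jpg").convert("RGB")    # placeholder path
top3 = emotions_top3(img)                      # [[rank, emotion, percent], ...]
stress_label, stress_pct = stress_index(img)
gallery_items = nearest5(img)                  # [(PIL.Image, caption), ...] for the gallery
print(top3, stress_label)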
@@ -246,9 +262,7 @@ with gr.Blocks(title="Face Emotion & Stress Analyzer — CPU-friendly", css=CSS,
         gal = nearest5(img)  # list[(PIL, caption)]
         gal_imgs = [g[0] for g in gal]
         gal_caps = [g[1] for g in gal]
         gallery = [(im, cap) for im, cap in zip(gal_imgs, gal_caps)]
         return t3, s_label, gallery, gal_imgs, list(range(len(gal_imgs)))
 
     upload_image.change(
@@ -258,7 +272,6 @@ with gr.Blocks(title="Face Emotion & Stress Analyzer — CPU-friendly", css=CSS,
     )
 
     def on_gallery_select(evt: gr.SelectData, imgs: List[Image.Image], idxs: List[int]):
         if imgs is None or not imgs:
             return [], ""
         i = int(evt.index) if evt is not None else 0
@@ -291,4 +304,4 @@ with gr.Blocks(title="Face Emotion & Stress Analyzer — CPU-friendly", css=CSS,
     )
 
 if __name__ == "__main__":
+    demo.launch()