# FedAdv / attacks.py — MarshallCN, commit 685f135: "add targeted attack"
"""
attacks.py
提供对检测模型(以 YOLOv8/ultralytics 为主)执行 FGSM 与 PGD 的实现。
设计思路与注意事项:
- 假定我们可以访问到底层的 torch.nn.Module(例如 ultralytics.YOLO 实例的 .model 成员)
并能以 tensor 输入直接跑 forward(),得到原始预测张量 (batch, N_preds, C)
其中通常 C = 5 + num_classes(bbox4 + obj_conf + class_logits)。
- 计算 loss: 对每个 anchor/pred,取 obj_conf * max_class_prob 作为该预测的置信度,
把全局置信度求和作为被攻击的目标函数;对该目标函数**做最小化**以让检测置信下降。
- FGSM: x_adv = x - eps * sign(grad(loss))
- PGD: 多步迭代,每步做 x = x - alpha * sign(grad), 并投影到 L_inf 球体:|x-x_orig|<=eps
- 如果你的 ultralytics 版本/模型封装与假定不同,代码会抛错并提示如何修改。
"""
from typing import Tuple, Optional
import torch
import torch.nn as nn
import numpy as np
from PIL import Image
import torchvision.transforms as T
import math
import torch.nn.functional as F
from typing import Tuple, Dict
# ============= Resize image =====================
def _get_max_stride(net) -> int:
s = getattr(net, "stride", None)
if isinstance(s, torch.Tensor):
return int(s.max().item())
try:
return int(max(s))
except Exception:
return 32 # 兜底
def letterbox_tensor(
    x: torch.Tensor,
    *,
    imgsz: Optional[int],
    stride: int,
    fill: float = 114.0 / 255.0,
    scaleup: bool = True
) -> Tuple[torch.Tensor, Dict]:
    """
    Differentiable letterbox: aspect-preserving resize + symmetric padding.

    Args:
        x: input image tensor [1,3,H,W] with values in [0,1].
        imgsz: target square side. If None, max(H, W) is rounded up to the
            nearest multiple of `stride` (mimics YOLO's native auto-shaping).
            (Annotation fixed: the original declared `int` but None is accepted.)
        stride: model stride the output side must be a multiple of.
        fill: padding value; 114/255 matches ultralytics' grey padding.
        scaleup: if False, never enlarge the image (only shrink).

    Returns:
        (x_lb, meta): letterboxed tensor [1,3,imgsz,imgsz] plus a meta dict
        with "ratio", "pad" (left, top), "resized_shape" (h, w) and "imgsz",
        sufficient to invert the transform later.
    """
    assert x.ndim == 4 and x.shape[0] == 1
    _, C, H, W = x.shape
    if imgsz is None:
        # Round max(H, W) up to a stride multiple (native auto-shape behaviour).
        imgsz = int(math.ceil(max(H, W) / stride) * stride)
    r = min(imgsz / H, imgsz / W)  # uniform scale factor
    if not scaleup:
        r = min(r, 1.0)
    new_w = int(round(W * r))
    new_h = int(round(H * r))
    # Aspect-preserving resize first ...
    if (new_h, new_w) != (H, W):
        x = F.interpolate(x, size=(new_h, new_w), mode="bilinear", align_corners=False)
    # ... then pad symmetrically to (imgsz, imgsz), matching YOLO's convention.
    dw = imgsz - new_w
    dh = imgsz - new_h
    left, right = dw // 2, dw - dw // 2
    top, bottom = dh // 2, dh - dh // 2
    x = F.pad(x, (left, right, top, bottom), mode="constant", value=fill)
    meta = {
        "ratio": r,
        "pad": (left, top),
        "resized_shape": (new_h, new_w),
        "imgsz": imgsz,
    }
    return x, meta
def unletterbox_to_original(
    x_lb: torch.Tensor, meta: Dict, orig_hw: Tuple[int, int]
) -> torch.Tensor:
    """Invert a letterbox: crop away the padding, then resize back to (H0, W0)."""
    assert x_lb.ndim == 4 and x_lb.shape[0] == 1
    H0, W0 = orig_hw
    left, top = meta["pad"]
    h_r, w_r = meta["resized_shape"]
    # Crop out the scaled content region (drop the grey padding).
    content = x_lb[..., top:top + h_r, left:left + w_r]  # [1,3,h_r,w_r]
    # Resize back to the original resolution.
    return F.interpolate(content, size=(H0, W0), mode="bilinear", align_corners=False)
# ----- basic preprocessing / deprocessing (RGB PIL <-> torch tensor) -----
# Shared torchvision transforms used by pil_to_tensor / tensor_to_pil below.
_to_tensor = T.Compose([
    T.ToTensor(),  # float in [0,1], shape C,H,W
])
# Inverse direction: [C,H,W] float tensor in [0,1] -> PIL image.
_to_pil = T.ToPILImage()
def pil_to_tensor(img_pil: Image.Image, device: torch.device) -> torch.Tensor:
    """Convert a PIL RGB image to a float tensor [1,3,H,W] on `device`.

    The returned tensor has requires_grad=True so input gradients can be taken.
    """
    batched = _to_tensor(img_pil).unsqueeze(0)  # 1,C,H,W
    batched = batched.to(device)
    batched.requires_grad = True
    return batched
def tensor_to_pil(t: torch.Tensor) -> Image.Image:
    """Convert a [1,3,H,W] tensor (values 0..1) into a PIL RGB image."""
    img = t.detach().cpu().squeeze(0)
    img = img.clamp(0.0, 1.0)
    return _to_pil(img)
# ----- helper to obtain underlying torch module from ultralytics YOLO wrapper -----
def get_torch_module_from_ultralytics(model) -> nn.Module:
    """
    Locate the underlying torch nn.Module inside a detector wrapper.

    For ultralytics.YOLO the raw network usually lives at `.model`; a few
    other common wrapper attribute names are probed as a fallback.
    """
    for attr in ("model", "module", "net", "model_"):
        candidate = getattr(model, attr, None)
        if isinstance(candidate, nn.Module):
            return candidate
    raise RuntimeError("无法找到底层 torch.nn.Module。请确保传入的是 ultralytics.YOLO 实例且能访问 model.model。")
# ----- interpret raw model outputs to confidences -----
def _ensure_bcn(preds):
assert preds.ndim == 3
B, C1, C2 = preds.shape
if C1 - 4 > 0 and C2 >= 1000: # [B, 4+nc, N]
return preds
if C2 - 4 > 0 and C1 >= 1000: # [B, N, 4+nc]
return preds.permute(0, 2, 1).contiguous()
return preds
def _xywh_to_xyxy(xywh):
x,y,w,h = xywh.unbind(-1)
return torch.stack([x-w/2, y-h/2, x+w/2, y+h/2], dim=-1)
def _xyxy_to_xywh(xyxy):
x1,y1,x2,y2 = xyxy.unbind(-1)
cx = (x1+x2)/2; cy = (y1+y2)/2
w = (x2-x1).clamp(min=0); h = (y2-y1).clamp(min=0)
return torch.stack([cx,cy,w,h], dim=-1)
def _map_xyxy_to_letterbox(xyxy_tensor, meta):
if meta is None:
return xyxy_tensor
r = meta.get('ratio', meta.get('scale', (1.0, 1.0)))
p = meta.get('pad', (0.0, 0.0))
if isinstance(r, (int, float)):
r = (float(r), float(r))
rx, ry = float(r[0]), float(r[1])
px, py = float(p[0]), float(p[1])
x1 = xyxy_tensor[:, 0] * rx + px
y1 = xyxy_tensor[:, 1] * ry + py
x2 = xyxy_tensor[:, 2] * rx + px
y2 = xyxy_tensor[:, 3] * ry + py
return torch.stack([x1, y1, x2, y2], dim=-1)
def _iou_xyxy(b_xyxy, g_xyxy):
N, M = b_xyxy.size(0), g_xyxy.size(0)
b = b_xyxy[:, None, :].expand(N, M, 4)
g = g_xyxy[None, :, :].expand(N, M, 4)
inter_x1 = torch.maximum(b[...,0], g[...,0])
inter_y1 = torch.maximum(b[...,1], g[...,1])
inter_x2 = torch.minimum(b[...,2], g[...,2])
inter_y2 = torch.minimum(b[...,3], g[...,3])
inter_w = (inter_x2 - inter_x1).clamp(min=0)
inter_h = (inter_y2 - inter_y1).clamp(min=0)
inter = inter_w * inter_h
area_b = (b[...,2]-b[...,0]).clamp(min=0) * (b[...,3]-b[...,1]).clamp(min=0)
area_g = (g[...,2]-g[...,0]).clamp(min=0) * (g[...,3]-g[...,1]).clamp(min=0)
return inter / (area_b + area_g - inter + 1e-9)
def _gt_list_to_xyxy_tensor(gt_list, device, meta=None):
if not gt_list:
return torch.empty(0, 4, device=device, dtype=torch.float32)
xyxy = torch.tensor([b['xyxy'] for b in gt_list], dtype=torch.float32, device=device)
return _map_xyxy_to_letterbox(xyxy, meta)
def preds_to_targeted_loss(
    preds,                       # [B,4+nc,N] or [B,N,4+nc]; class part ideally raw logits
    target_cls: int,
    gt_xywh,                     # directly supports list[{'xyxy':..., 'cls':..., 'conf':...}]
    topk: int = 20,
    kappa: float = 0.1,          # CW margin
    lambda_margin: float = 1.0,
    lambda_keep: float = 0.2,
    lambda_target: float = 0.0,  # new: restores the -p_t.mean() term
    debug: bool = False,
    meta: dict | None = None,    # pass the letterbox meta if GT is in original-image coords
):
    """
    Targeted classification loss over a detector's raw predictions.

    Operates on batch index 0 only. Selects up to `topk` candidate predictions
    (by IoU with GT boxes when available, else by current max class confidence)
    and combines three terms on their class logits:
      1) a CW-style margin pushing the target-class logit above all others,
      2) a KL "keep" term anchoring the adversarial distribution to the clean one,
      3) an optional direct -z_target term (weighted by lambda_target).

    Returns a scalar loss tensor; callers here negate it before backward().
    """
    preds = _ensure_bcn(preds)
    B, C, N = preds.shape
    nc = C - 4
    assert 0 <= target_cls < nc
    # Parse GT (list -> tensor in letterbox coords).
    gt_xyxy_lb = _gt_list_to_xyxy_tensor(gt_xywh, preds.device, meta=meta)  # [M,4]
    boxes_bxn4 = preds[:, :4, :].permute(0, 2, 1)  # [B,N,4] (xywh, letterbox)
    logits_bxcn = preds[:, 4:, :]  # [B,nc,N]
    # If the class part looks like probabilities (all in 0..1), convert to logits.
    zmin, zmax = logits_bxcn.min().item(), logits_bxcn.max().item()
    if 0.0 <= zmin and zmax <= 1.0:
        p = logits_bxcn.clamp(1e-6, 1-1e-6)
        logits_bxcn = torch.log(p) - torch.log1p(-p)
    # Select the candidate indices most related to the GT (batch=0).
    b_xyxy = _xywh_to_xyxy(boxes_bxn4[0])  # [N,4]
    if gt_xyxy_lb.numel() > 0:
        iou = _iou_xyxy(b_xyxy, gt_xyxy_lb)  # [N,M]
        best_per_gt = iou.argmax(dim=0)  # [M]
        idx = torch.unique(best_per_gt, sorted=False)
        if idx.numel() < topk:
            # Pad the selection with the globally best-IoU candidates.
            topvals = iou.max(dim=1).values
            topidx2 = torch.topk(topvals, k=min(topk, N)).indices
            idx = torch.unique(torch.cat([idx, topidx2], 0), sorted=False)[:topk]
    else:
        # No GT: take top-k by current max class confidence.
        z = logits_bxcn[0]  # [nc,N]
        pmax = z.softmax(dim=0).max(dim=0).values
        idx = torch.topk(pmax, k=min(topk, N)).indices
    if idx.numel() == 0:
        idx = torch.arange(min(topk, N), device=preds.device)
    # Class logits of the selected candidates: [K,nc].
    z = logits_bxcn[0, :, idx].T  # [K,nc]
    # 1) CW-style margin: penalize whenever any other class beats target by < kappa.
    mask = torch.ones(nc, device=z.device, dtype=torch.bool)
    mask[target_cls] = False
    z_t = z[:, target_cls]
    z_oth = z[:, mask].max(dim=1).values
    loss_margin = F.relu(kappa + z_oth - z_t).mean()
    # 2) keep term (KL >= 0): anchor to the clean (detached) distribution.
    with torch.no_grad():
        p_clean = z.detach().softmax(dim=1)
    logp_adv = z.log_softmax(dim=1)
    loss_keep = F.kl_div(logp_adv, p_clean, reduction="batchmean")
    # 3) legacy term: directly push up the target-class logit.
    loss_target = -z_t.mean()
    loss = (
        lambda_margin * loss_margin
        + lambda_keep * loss_keep
        + lambda_target * loss_target
    )
    if debug:
        # Fraction of selected candidates already classified as the target class.
        same_ratio = (z.argmax(dim=1) == target_cls).float().mean().item()
        print(
            f"[dbg] K={idx.numel()} nc={nc} target={target_cls} "
            f"margin={loss_margin.item():.6f} keep={loss_keep.item():.6f} "
            f"targ={loss_target.item():.6f} same_ratio={same_ratio:.3f} "
            f"z_t_mean={z_t.mean().item():.3f} z_oth_mean={z_oth.mean().item():.3f}"
        )
    return loss
# def preds_to_confidence_sum(preds: torch.Tensor) -> torch.Tensor:
# """
# preds: tensor shape (batch, N_preds, C) or (batch, C, H, W) depending on model.
# We support the common YOLO format where last dim: [x,y,w,h,obj_conf, class_probs...]
# Returns scalar: sum of (obj_conf * max_class_prob) over batch and predictions.
# """
# if preds is None:
# raise ValueError("preds is None")
# # handle shape (batch, N_preds, C)
# if preds.ndim == 3:
# # assume last dim: 5 + num_classes
# if preds.shape[-1] < 6:
# # can't interpret
# raise RuntimeError(f"preds last dim too small ({preds.shape[-1]}). Expecting >=6.")
# obj_conf = preds[..., 4] # (batch, N)
# class_probs = preds[..., 5:] # (batch, N, num_cls)
# max_class, _ = class_probs.max(dim=-1) # (batch, N)
# conf = obj_conf * max_class
# return conf.sum()
# # some models output (batch, C, H, W) - flatten
# if preds.ndim == 4:
# # try to collapse so that last dim is class
# b, c, h, w = preds.shape
# flat = preds.view(b, c, -1).permute(0, 2, 1) # (batch, N, C)
# return preds_to_confidence_sum(flat)
# raise RuntimeError(f"Unhandled preds dimensionality: {preds.shape}")
# ----- core attack implementations -----
def fgsm_attack_on_detector(
    model,
    img_pil: Image.Image,
    eps: float = 0.03,
    device: Optional[torch.device] = None,
    imgsz: Optional[int] = None,  # None = auto-align to a stride multiple; 640 also works
    gt_xywh: torch.Tensor | None = None,  # optional target boxes (letterbox coords)
    target_cls: int = 2,
) -> Image.Image:
    """
    Perform a single-step FGSM on a detection model (white-box).
    - model: ultralytics.YOLO wrapper (or anything where get_torch_module_from_ultralytics works)
    - img_pil: input PIL RGB
    - eps: max per-pixel perturbation in [0,1] (L_inf)
    Returns PIL image of adversarial example.
    """
    device = device or (torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu"))
    # get torch module
    net = get_torch_module_from_ultralytics(model)
    net = net.to(device).eval()
    for p in net.parameters():
        p.requires_grad_(False)  # recommended: avoid computing gradients w.r.t. parameters
    # (a) original image -> [1,3,H0,W0]; detach so only x_lb tracks gradients
    x_orig = pil_to_tensor(img_pil, device)
    H0, W0 = x_orig.shape[-2:]
    x_orig = x_orig.detach()
    # (b) differentiable letterbox
    s = _get_max_stride(net)
    x_lb, meta = letterbox_tensor(x_orig, imgsz=imgsz, stride=s, fill=114/255.0)
    x_lb = x_lb.clone().detach().requires_grad_(True)
    # (c) forward pass and the targeted loss
    with torch.enable_grad():
        preds = net(x_lb)
        if isinstance(preds, (tuple, list)):
            tensor_pred = next((p for p in preds if isinstance(p, torch.Tensor) and p.ndim >= 3), None)
            if tensor_pred is None:
                raise RuntimeError("模型 forward 返回了 tuple/list,但无法从中找到预测张量。")
            preds = tensor_pred
        # Negated so that the gradient-ascent step below DECREASES the targeted loss.
        loss = - preds_to_targeted_loss(
            preds,
            target_cls=target_cls,
            gt_xywh=gt_xywh,  # pass your list[dict] directly
            topk=20,
            kappa=0.1,
            lambda_margin=1.0,
            lambda_keep=0.2,
            lambda_target=0.0,  # restores the influence of -p_t.mean()
            debug=False,
            meta=meta  # if GT is in original-image coords, meta MUST be passed
        )
        # loss = - preds_to_confidence_sum(preds)
        loss.backward()
    # (d) apply the FGSM perturbation in letterboxed space
    # FGSM update: x_adv = x + eps * sign(grad(loss wrt x))
    with torch.no_grad():
        adv_lb = (x_lb + eps * x_lb.grad.sign()).clamp(0, 1)
    # cleanup (optional for a single step; mandatory inside a PGD loop)
    x_lb.grad = None
    net.zero_grad(set_to_none=True)
    # (e) map back to the original image size
    adv_orig = unletterbox_to_original(adv_lb, meta, (H0, W0)).detach()
    # (f) back to PIL
    adv_pil = tensor_to_pil(adv_orig)
    return adv_pil
def pgd_attack_on_detector(
    model,
    img_pil: Image.Image,
    eps: float = 0.03,  # L_inf radius (inputs live in [0,1])
    alpha: float = 0.007,  # step size
    iters: int = 10,
    device: Optional[torch.device] = None,
    imgsz: Optional[int] = None,  # None = auto-align to a stride multiple; 640 also works
    gt_xywh: torch.Tensor | None = None,  # optional target boxes (letterbox coords)
    target_cls: int = 2,
):
    """
    Run PGD in YOLO's letterbox domain; after the iterations, map the adversarial
    sample back to the original image size and return it as a PIL image.
    Depends on the helpers implemented above: pil_to_tensor, tensor_to_pil,
    letterbox_tensor, unletterbox_to_original, _get_max_stride.
    """
    device = device or (torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu"))
    net = get_torch_module_from_ultralytics(model).to(device).eval()
    # Only the input needs gradients; freeze parameters to save resources.
    for p in net.parameters():
        p.requires_grad_(False)
    # original image -> Tensor([1,3,H0,W0], values in [0,1])
    x0 = pil_to_tensor(img_pil, device).detach()
    H0, W0 = x0.shape[-2:]
    # differentiable letterbox (aspect-preserving resize + symmetric pad to a stride multiple)
    s = _get_max_stride(net)
    x_lb_orig, meta = letterbox_tensor(x0, imgsz=imgsz, stride=s, fill=114/255.0)  # [1,3,S,S]
    x = x_lb_orig.clone().detach().requires_grad_(True)
    # targeted = True
    # sign = -1.0 if targeted else 1.0  # negative sign for targeted, positive otherwise
    for _ in range(iters):
        # forward + backward (gradients required)
        preds = net(x)
        if isinstance(preds, (tuple, list)):
            preds = next((p for p in preds if isinstance(p, torch.Tensor) and p.ndim >= 3), None)
            if preds is None:
                raise RuntimeError("模型 forward 返回 tuple/list,但未找到预测张量。")
        # Negated so that the ascent step below DECREASES the targeted loss.
        loss = - preds_to_targeted_loss(
            preds,
            target_cls=target_cls,
            gt_xywh=gt_xywh,  # pass your list[dict] directly
            topk=20,
            kappa=0.1,
            lambda_margin=1.0,
            lambda_keep=0.2,
            lambda_target=0.0,  # restores the influence of -p_t.mean()
            debug=False,
            meta=meta  # if GT is in original-image coords, meta MUST be passed
        )
        # loss = - preds_to_confidence_sum(preds)  # we want total confidence to drop -> minimize
        loss.backward()
        # update step and projection (outside the autograd graph)
        with torch.no_grad():
            x.add_(alpha * x.grad.sign())
            # project onto the L_inf ball: clipping delta is more stable
            delta = (x - x_lb_orig).clamp(-eps, eps)
            x.copy_((x_lb_orig + delta).clamp(0.0, 1.0))
        # clean up and prepare the next iteration
        x.grad = None
        net.zero_grad(set_to_none=True)
        x.requires_grad_(True)
    # map back to the original image size
    adv_orig = unletterbox_to_original(x.detach(), meta, (H0, W0)).detach()
    return tensor_to_pil(adv_orig)
# ----- graceful fallback / demo noise if whitebox impossible -----
def demo_random_perturbation(img_pil: Image.Image, eps: float = 0.03) -> Image.Image:
    """Non-gradient demo perturbation used as a fallback when white-box access fails.

    Adds eps * random-sign noise per pixel (an L_inf-bounded random step).
    """
    pixels = np.asarray(img_pil).astype(np.float32) / 255.0
    random_signs = np.sign(np.random.randn(*pixels.shape)).astype(np.float32)
    perturbed = np.clip(pixels + eps * random_signs, 0.0, 1.0)
    return Image.fromarray((perturbed * 255).astype(np.uint8))