Spaces:
Running
on
Zero
Running
on
Zero
File size: 16,700 Bytes
9290385 bec1fe0 9290385 b4e2f0d 9290385 bec1fe0 9290385 bec1fe0 b4e2f0d bec1fe0 b4e2f0d bec1fe0 b4e2f0d 5b17871 b4e2f0d 5b17871 b4e2f0d 9290385 7f2ec60 9290385 7f2ec60 9290385 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 |
import math, json
import gradio as gr
import torch, pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline
# Optional ZeroGPU integration: use the real `spaces` package when it can be
# imported, otherwise install a stand-in so `@spaces.GPU(...)` still works.
try:
    import spaces
except ImportError:
    ZEROGPU_AVAILABLE = False
    print("ZeroGPU not available, running in standard mode")

    def spaces_gpu_decorator(duration=60):
        """Return a no-op decorator; `duration` is accepted and ignored."""
        return lambda func: func

    # Minimal stand-in exposing only the `GPU` attribute used by this file.
    spaces = type("spaces", (), {"GPU": spaces_gpu_decorator})
else:
    ZEROGPU_AVAILABLE = True
    print("ZeroGPU support enabled")
# Model configuration - Foundation-Sec-8B only
MODEL_NAME = "fdtn-ai/Foundation-Sec-8B"

# Load the tokenizer and model through a text-generation pipeline.
# Attempt 1 asks for bfloat16 weights with automatic device placement; if
# anything in that attempt fails, attempt 2 retries with the pipeline defaults.
# When both attempts fail the module aborts with a RuntimeError.
print(f"Loading model: {MODEL_NAME}")
try:
    print("Initializing Foundation-Sec-8B model...")
    tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
    text_pipeline = pipeline(
        "text-generation",
        model=MODEL_NAME,
        tokenizer=tokenizer,
        torch_dtype=torch.bfloat16,
        device_map="auto",
        trust_remote_code=True,
    )
    print("Foundation-Sec-8B model initialized successfully")
    # Keep direct handles on the pipeline's model and tokenizer; the scoring
    # functions below call them without going through the pipeline.
    model = text_pipeline.model
    tok = text_pipeline.tokenizer
except Exception as primary_error:
    print(f"Error initializing Foundation-Sec-8B model: {primary_error}")
    print("Trying with simplified parameters...")
    try:
        # Fallback: let the pipeline choose dtype, device and tokenizer.
        text_pipeline = pipeline(
            "text-generation",
            model=MODEL_NAME,
            trust_remote_code=True,
        )
        model = text_pipeline.model
        tok = text_pipeline.tokenizer
        print("Foundation-Sec-8B model loaded with simplified parameters")
    except Exception as fallback_error:
        print(f"Failed to load Foundation-Sec-8B model: {fallback_error}")
        failure_message = (
            "Could not load Foundation-Sec-8B model. Please ensure the model "
            f"is accessible and try again. Error: {fallback_error}"
        )
        raise RuntimeError(failure_message)

# Log device information
if not hasattr(model, "device"):
    # No `device` attribute: report where the first parameter lives instead.
    device_info = next(model.parameters()).device
    print(f"Model parameters on device: {device_info}")
else:
    print(f"Model loaded on device: {model.device}")

print(f"CUDA available: {torch.cuda.is_available()}")
if torch.cuda.is_available():
    print(f"CUDA device count: {torch.cuda.device_count()}")
    print(f"Current CUDA device: {torch.cuda.current_device()}")
    print(f"CUDA device name: {torch.cuda.get_device_name()}")
# Configuration parameters
# Exponent of the token-count penalty used by binary_assoc_score, which
# returns prob / n_tok ** LEN_ALPHA: 0 disables the penalty, larger values
# penalise longer phrases more strongly.
LEN_ALPHA = 0.7

# Sample data for testing
CAMPAIGN_LIST = [
    "Operation Aurora", "Dust Storm", "ShadowHammer", "NotPetya", "SolarWinds",
]

ACTOR_LIST = [
    "APT1",
    "APT28",
    "APT33",
    "APT38",
    "FIN8",
]

# Sample ATT&CK technique IDs, each followed by a display name.
# NOTE(review): the T1027 label may differ from the name published by
# MITRE ATT&CK (presumably "Obfuscated Files or Information") - confirm.
TECHNIQUE_LIST = [
    "T1059 Command and Scripting Interpreter",
    "T1566 Phishing",
    "T1027 Obfuscated/Stored Files",
    "T1036 Masquerading",
    "T1105 Ingress Tool Transfer",
    "T1018 Remote System Discovery",
    "T1568 Dynamic Resolution",
]
@torch.no_grad()
def phrase_log_prob(prompt, phrase):
    """Return the summed log-probability of `phrase`'s tokens following `prompt`.

    The prompt is tokenized with the tokenizer's default special tokens and the
    phrase without any. Both are concatenated and scored in a single forward
    pass: with causal attention the logits at position i depend only on tokens
    0..i, so this yields the same per-token distributions as feeding the
    sequence one token at a time, without re-running the model for every
    phrase token.

    NOTE(review): the phrase is tokenized on its own, so whether a separating
    space precedes it in the scored text depends on the tokenizer - confirm
    this matches the intended prompt wording.

    Args:
        prompt: Conditioning text.
        phrase: Continuation whose log-probability is measured.

    Returns:
        Sum over phrase tokens of log P(token | prompt, earlier phrase tokens)
        as a Python float; 0.0 when the phrase tokenizes to nothing.

    Raises:
        ValueError: If the prompt tokenizes to zero tokens while the phrase
            does not (there is no context to predict the first token from).
        Exception: Any tokenizer/model error is logged and re-raised.
    """
    try:
        # Log GPU usage information
        device_info = next(model.parameters()).device
        print(f"Running phrase_log_prob on device: {device_info}")

        target_device = model.device
        prompt_ids = tok(prompt, return_tensors="pt")["input_ids"][0].to(target_device)
        phrase_ids = tok(phrase, add_special_tokens=False)["input_ids"]

        if not phrase_ids:
            # Nothing to score; the empty sum of log-probabilities is 0.
            return 0.0
        if prompt_ids.numel() == 0:
            raise ValueError("prompt produced no tokens; cannot score a phrase without context")

        phrase_tensor = torch.tensor(phrase_ids, dtype=torch.long, device=target_device)
        sequence = torch.cat([prompt_ids, phrase_tensor]).unsqueeze(0)

        # logits[i] predicts token i + 1, so phrase token j (at position
        # len(prompt) + j) is predicted by row len(prompt) - 1 + j.
        first_row = prompt_ids.numel() - 1
        logits = model(sequence).logits[0, first_row:first_row + len(phrase_ids)].float()
        token_log_probs = torch.log_softmax(logits, dim=-1)
        chosen = token_log_probs.gather(1, phrase_tensor.unsqueeze(1))

        # Accumulate in float64, like the original Python-float running sum.
        return chosen.double().sum().item()
    except Exception as e:
        print(f"Error in phrase_log_prob: {e}")
        raise
def binary_assoc_score(prompt: str, phrase: str, neg="does NOT use", prompt_template="typically uses") -> float:
    """
    Calculate binary association score: p ≈ P(use) / (P(use)+P(not use))

    The phrase is scored after `prompt` and after a negated prompt obtained by
    replacing every occurrence of `prompt_template` in `prompt` with `neg`.
    The two log-probabilities are combined with a logistic function and the
    result is divided by n_tok ** LEN_ALPHA to penalise longer phrases.

    If `prompt_template` does not occur in `prompt`, the replacement changes
    nothing, both prompts are identical and the unnormalised probability is
    exactly 0.5.

    Args:
        prompt: Base prompt string
        phrase: Phrase to evaluate
        neg: Negative template to replace positive template
        prompt_template: Positive template to be replaced

    Returns:
        Length-normalized association score between 0 and 1
    """
    lp_pos = phrase_log_prob(prompt, phrase)
    lp_neg = phrase_log_prob(prompt.replace(prompt_template, neg), phrase)

    # Logistic transformation. math.exp raises OverflowError for very large
    # arguments; in that regime the logistic value is indistinguishable from 0.
    try:
        prob = 1 / (1 + math.exp(lp_neg - lp_pos))
    except OverflowError:
        prob = 0.0

    # Length normalization. A phrase with zero tokens would make the divisor
    # 0 ** LEN_ALPHA == 0, so treat it as a single token instead of dividing
    # by zero.
    n_tok = len(tok(phrase, add_special_tokens=False)["input_ids"])
    return prob / (max(n_tok, 1) ** LEN_ALPHA)
def campaign_actor_associations(campaigns, actors, prompt_template="is associated with", neg_template="is NOT associated with"):
    """Score every campaign/actor pair and rank the actors for each campaign.

    Each campaign's prompt is "<campaign> <prompt_template>"; the negated
    prompt swaps `prompt_template` for `neg_template`. Both templates are
    forwarded to binary_assoc_score so that the swap actually matches the
    prompt text.

    Args:
        campaigns: Iterable of campaign names.
        actors: Iterable of actor names.
        prompt_template: Positive wording appended to the campaign name.
        neg_template: Negative wording substituted for `prompt_template`.

    Returns:
        Dict mapping each campaign to a list of (actor, score) tuples sorted
        by score, highest first. Duplicate actor names collapse to one entry.
    """
    results = {}
    for camp in campaigns:
        prompt_base = f"{camp} {prompt_template}"
        actor_scores = {
            actor: binary_assoc_score(
                prompt_base, actor, neg=neg_template, prompt_template=prompt_template
            )
            for actor in actors
        }
        # Sort by score, best first (ties keep input order).
        results[camp] = sorted(actor_scores.items(), key=lambda item: item[1], reverse=True)
    return results
def campaign_technique_matrix(campaigns, techniques, prompt_template="typically uses", neg_template="typically does NOT use"):
    """
    Build the Campaign × Technique table of binary association scores.

    Every cell is binary_assoc_score evaluated on the prompt
    "<campaign> <prompt_template>", with `neg_template` as the negated wording.

    Args:
        campaigns: List of campaign names
        techniques: List of technique names
        prompt_template: Template for positive association
        neg_template: Template for negative association

    Returns:
        DataFrame with campaigns as rows, techniques as columns, scores as values
    """
    score_table = {}
    for campaign in campaigns:
        base_prompt = "{} {}".format(campaign, prompt_template)
        per_technique = {}
        for technique in techniques:
            per_technique[technique] = binary_assoc_score(
                base_prompt,
                technique,
                neg=neg_template,
                prompt_template=prompt_template,
            )
        score_table[campaign] = per_technique
    return pd.DataFrame.from_dict(score_table, orient="index")
def campaign_actor_matrix(campaigns, actors, prompt_template="is associated with", neg_template="is NOT associated with"):
    """Build the Campaign × Actor table of binary association scores.

    Each campaign's prompt is "<campaign> <prompt_template>"; the negated
    prompt swaps `prompt_template` for `neg_template`. Both templates are
    forwarded to binary_assoc_score so that the swap actually matches the
    prompt text.

    Args:
        campaigns: Iterable of campaign names.
        actors: Iterable of actor names.
        prompt_template: Positive wording appended to the campaign name.
        neg_template: Negative wording substituted for `prompt_template`.

    Returns:
        DataFrame with campaigns as rows, actors as columns, scores as values.
    """
    rows = {}
    for camp in campaigns:
        prompt_base = f"{camp} {prompt_template}"
        rows[camp] = {
            actor: binary_assoc_score(
                prompt_base, actor, neg=neg_template, prompt_template=prompt_template
            )
            for actor in actors
        }
    return pd.DataFrame.from_dict(rows, orient="index")
def campaign_actor_probs(campaigns, actors, prompt_template="is conducted by"):
    """
    Build the Campaign × Actor probability table.

    For each campaign, every actor is scored with phrase_log_prob on the
    prompt "<campaign> <prompt_template>" and the scores are softmax-normalized
    across the actors, so each row sums to 1.

    Args:
        campaigns: List of campaign names
        actors: List of actor names
        prompt_template: Template for actor association prompt

    Returns:
        DataFrame with campaigns as rows, actors as columns, probabilities as values
    """
    table = {}
    for campaign in campaigns:
        context = f"{campaign} {prompt_template}"

        log_probs = []
        for actor in actors:
            log_probs.append(phrase_log_prob(context, actor))

        # Softmax: subtract the largest log-probability before exponentiating
        # so the exponentials stay in a safe numeric range.
        peak = max(log_probs)
        weights = [math.exp(value - peak) for value in log_probs]
        total = sum(weights)

        table[campaign] = dict(zip(actors, (weight / total for weight in weights)))
    return pd.DataFrame.from_dict(table, orient="index")
@spaces.GPU(duration=120)
def generate_actor_heatmap(c_list, a_list, actor_prompt_template):
    """Generate Campaign-Actor association heatmap with probability visualization.

    Args:
        c_list: Comma-separated campaign names.
        a_list: Comma-separated actor names.
        actor_prompt_template: Wording appended to each campaign name to form
            the prompt passed to campaign_actor_probs.

    Returns:
        A matplotlib figure: the annotated heatmap on success, or a text-only
        figure asking for input (either list empty) or describing the error.
        Every figure is deregistered from pyplot before it is returned so that
        repeated requests do not accumulate open figures.
    """

    def _message_figure(message, **text_style):
        """Return a pyplot-deregistered, axis-free figure showing `message`."""
        msg_fig, msg_ax = plt.subplots(figsize=(8, 6))
        msg_ax.text(0.5, 0.5, message, ha='center', va='center', **text_style)
        msg_ax.set_xlim(0, 1)
        msg_ax.set_ylim(0, 1)
        msg_ax.axis('off')
        # Closing only removes the figure from pyplot's registry; the Figure
        # object itself stays usable by the caller.
        plt.close(msg_fig)
        return msg_fig

    heatmap_fig = None
    try:
        campaigns = [c.strip() for c in c_list.split(",") if c.strip()]
        actors = [a.strip() for a in a_list.split(",") if a.strip()]
        if not campaigns or not actors:
            return _message_figure('Please enter both Campaigns and Actors', fontsize=16)

        print(f"Processing {len(campaigns)} campaigns and {len(actors)} actors...")
        print(f"Using prompt template: '{actor_prompt_template}'")

        # Check GPU availability
        if torch.cuda.is_available():
            print(f"GPU computation enabled - Device: {torch.cuda.get_device_name()}")
        else:
            print("Running on CPU")

        # Calculate probability matrix
        df_ca = campaign_actor_probs(campaigns, actors, actor_prompt_template)
        print(f"Actor probability matrix shape: {df_ca.shape}")
        print("Actor probability matrix:")
        print(df_ca.round(4))

        # Create heatmap with matplotlib/seaborn; the canvas grows with the
        # number of actors (width) and campaigns (height).
        heatmap_fig, ax = plt.subplots(
            figsize=(max(8, len(actors) * 1.2), max(6, len(campaigns) * 0.8))
        )
        sns.heatmap(df_ca, annot=True, cmap='plasma', fmt='.3f',
                    cbar_kws={'label': 'P(actor)'}, ax=ax)
        ax.set_title('Campaign-Actor Probabilities (softmax normalized)',
                     fontsize=14, pad=20)
        ax.set_xlabel('Actor', fontsize=12)
        ax.set_ylabel('Campaign', fontsize=12)

        # Adjust label rotation
        plt.setp(ax.get_xticklabels(), rotation=45, ha='right')
        plt.setp(ax.get_yticklabels(), rotation=0)

        # Lay out this specific figure rather than pyplot's global "current
        # figure", which another request could have changed in the meantime.
        heatmap_fig.tight_layout()
        print("Actor heatmap generated successfully!")
        return heatmap_fig
    except Exception as e:
        print(f"Error in generate_actor_heatmap: {e}")
        import traceback
        traceback.print_exc()
        return _message_figure(f'Error occurred: {str(e)}', fontsize=12, color='red')
    finally:
        # Runs on success and on failure: release the heatmap figure from
        # pyplot whether it is being returned or was abandoned half-built.
        if heatmap_fig is not None:
            plt.close(heatmap_fig)
@spaces.GPU(duration=120)
def generate_technique_heatmap(c_list, t_list, technique_prompt_template, technique_neg_template):
    """Generate Campaign-Technique association heatmap with binary scoring visualization.

    Args:
        c_list: Comma-separated campaign names.
        t_list: Comma-separated technique names.
        technique_prompt_template: Positive wording passed to
            campaign_technique_matrix.
        technique_neg_template: Negative wording passed to
            campaign_technique_matrix.

    Returns:
        A matplotlib figure: the annotated heatmap on success, or a text-only
        figure asking for input (either list empty) or describing the error.
        Every figure is deregistered from pyplot before it is returned so that
        repeated requests do not accumulate open figures.
    """

    def _message_figure(message, **text_style):
        """Return a pyplot-deregistered, axis-free figure showing `message`."""
        msg_fig, msg_ax = plt.subplots(figsize=(8, 6))
        msg_ax.text(0.5, 0.5, message, ha='center', va='center', **text_style)
        msg_ax.set_xlim(0, 1)
        msg_ax.set_ylim(0, 1)
        msg_ax.axis('off')
        # Closing only removes the figure from pyplot's registry; the Figure
        # object itself stays usable by the caller.
        plt.close(msg_fig)
        return msg_fig

    heatmap_fig = None
    try:
        campaigns = [c.strip() for c in c_list.split(",") if c.strip()]
        techniques = [t.strip() for t in t_list.split(",") if t.strip()]
        if not campaigns or not techniques:
            return _message_figure('Please enter both Campaigns and Techniques', fontsize=16)

        print(f"Processing {len(campaigns)} campaigns and {len(techniques)} techniques...")
        print(f"Using prompt templates: '{technique_prompt_template}' / '{technique_neg_template}'")

        # Check GPU availability
        if torch.cuda.is_available():
            print(f"GPU computation enabled - Device: {torch.cuda.get_device_name()}")
        else:
            print("Running on CPU")

        # Calculate score matrix
        df_ct = campaign_technique_matrix(
            campaigns, techniques, technique_prompt_template, technique_neg_template
        )
        print(f"Score matrix shape: {df_ct.shape}")
        print("Score matrix:")
        print(df_ct.round(4))

        # Create heatmap with matplotlib/seaborn; the canvas grows with the
        # number of techniques (width) and campaigns (height).
        heatmap_fig, ax = plt.subplots(
            figsize=(max(8, len(techniques) * 1.2), max(6, len(campaigns) * 0.8))
        )
        sns.heatmap(df_ct, annot=True, cmap='viridis', fmt='.3f',
                    cbar_kws={'label': 'Association Score'}, ax=ax)
        ax.set_title('Campaign-Technique Associations (len-norm, independent)',
                     fontsize=14, pad=20)
        ax.set_xlabel('Technique', fontsize=12)
        ax.set_ylabel('Campaign', fontsize=12)

        # Adjust label rotation
        plt.setp(ax.get_xticklabels(), rotation=45, ha='right')
        plt.setp(ax.get_yticklabels(), rotation=0)

        # Lay out this specific figure rather than pyplot's global "current
        # figure", which another request could have changed in the meantime.
        heatmap_fig.tight_layout()
        print("Technique heatmap generated successfully!")
        return heatmap_fig
    except Exception as e:
        print(f"Error in generate_technique_heatmap: {e}")
        import traceback
        traceback.print_exc()
        return _message_figure(f'Error occurred: {str(e)}', fontsize=12, color='red')
    finally:
        # Runs on success and on failure: release the heatmap figure from
        # pyplot whether it is being returned or was abandoned half-built.
        if heatmap_fig is not None:
            plt.close(heatmap_fig)
# Gradio UI: one shared campaign input, then two independent sections (actors
# and techniques), each with its own inputs, button and plot output.
# Default list values are derived from CAMPAIGN_LIST / ACTOR_LIST /
# TECHNIQUE_LIST and the help text reads LEN_ALPHA, so the UI cannot drift
# from the module's sample data and configuration.
with gr.Blocks(title="LLM Threat Graph Demo") as demo:
    gr.Markdown("# 🕸️ LLM Threat Association Analysis\n*Visualizing Campaign-Actor-Technique relationships using Language Models*")

    # Common inputs
    with gr.Row():
        campaigns = gr.Textbox(
            value=", ".join(CAMPAIGN_LIST),
            label="Campaigns (comma-separated)",
            placeholder="e.g., Operation Aurora, NotPetya, Stuxnet"
        )

    # Campaign-Actor section (probabilistic)
    gr.Markdown("## 👤 Campaign-Actor Associations")
    gr.Markdown("Visualizing Campaign-Actor relationships with probabilistic heatmaps")
    gr.Markdown("""
**Calculation Method**: `P(actor | "{campaign} is conducted by") (softmax normalized)`
1. Calculate `phrase_log_prob("{campaign} is conducted by", actor)` for each Actor
2. Apply softmax normalization to create probability distribution (probabilities sum to 1.0 per Campaign)
3. Result: Shows relative likelihood of each Actor conducting each Campaign
""")
    with gr.Row():
        actor_prompt_template = gr.Textbox(
            value="is conducted by",
            label="Actor Prompt Template",
            placeholder="e.g., is conducted by, is attributed to"
        )
    # NOTE(review): the source's indentation was lost; confirm whether this
    # textbox belongs inside the row above or on its own line as placed here.
    actors = gr.Textbox(
        value=", ".join(ACTOR_LIST),
        label="Actors (comma-separated)",
        placeholder="e.g., APT1, Lazarus Group, Cozy Bear"
    )
    btn_actor = gr.Button("Generate Actor Heatmap", variant="primary")
    plot_actor = gr.Plot(label="Campaign-Actor Heatmap")
    btn_actor.click(
        fn=generate_actor_heatmap,
        inputs=[campaigns, actors, actor_prompt_template],
        outputs=plot_actor,
        show_progress=True
    )

    # Campaign-Technique section (independent scoring)
    gr.Markdown("## 🛠️ Campaign-Technique Associations")
    gr.Markdown("Visualizing Campaign-Technique relationships with independent association scores")
    # f-string: doubled braces render as literal {campaign}; LEN_ALPHA is
    # interpolated so the documented exponent always matches the code.
    gr.Markdown(f"""
**Calculation Method**: `Binary Association Score (length-normalized, independent)`
1. For each Technique, calculate:
- `lp_pos = phrase_log_prob("{{campaign}} typically uses", technique)`
- `lp_neg = phrase_log_prob("{{campaign}} typically does NOT use", technique)`
2. Apply logistic transformation: `prob = 1 / (1 + exp(lp_neg - lp_pos))`
3. Length normalization: `score = prob / (n_tokens^{LEN_ALPHA})` (penalty for longer phrases)
4. Result: Independent association scores (0-1) for each Campaign-Technique pair
""")
    with gr.Row():
        technique_prompt_template = gr.Textbox(
            value="typically uses",
            label="Technique Prompt Template (positive)",
            placeholder="e.g., typically uses, commonly employs"
        )
        technique_neg_template = gr.Textbox(
            value="typically does NOT use",
            label="Technique Prompt Template (negative)",
            placeholder="e.g., typically does NOT use, never employs"
        )
    # NOTE(review): as above, confirm whether this textbox was originally
    # inside the preceding row.
    techniques = gr.Textbox(
        value=", ".join(TECHNIQUE_LIST),
        label="Techniques (comma-separated)",
        placeholder="e.g., T1059 Command and Scripting Interpreter, T1566 Phishing"
    )
    btn_technique = gr.Button("Generate Technique Heatmap", variant="primary")
    plot_technique = gr.Plot(label="Campaign-Technique Heatmap")
    btn_technique.click(
        fn=generate_technique_heatmap,
        inputs=[campaigns, techniques, technique_prompt_template, technique_neg_template],
        outputs=plot_technique,
        show_progress=True
    )

demo.launch()
|