# Page residue from the hosting site, kept as comments so the module parses:
# nyasukun's picture
# b4e2f0d
import math, json
import gradio as gr
import torch, pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline
# ZeroGPU support: use the real `spaces` package when it is installed,
# otherwise install a no-op stand-in so `@spaces.GPU(...)` still works locally.
try:
    import spaces
    ZEROGPU_AVAILABLE = True
    print("ZeroGPU support enabled")
except ImportError:
    ZEROGPU_AVAILABLE = False
    print("ZeroGPU not available, running in standard mode")

    # Create dummy decorator for local development
    def spaces_gpu_decorator(duration=60):
        """No-op replacement for ``spaces.GPU``.

        Works both when called with arguments (``@spaces.GPU(duration=120)``)
        and when applied bare (``@spaces.GPU``). In the bare form the function
        being decorated arrives in the ``duration`` slot and is returned
        unchanged.
        """
        if callable(duration):
            return duration

        def decorator(func):
            return func

        return decorator

    # staticmethod keeps the stub from being bound as a method if it is ever
    # looked up through an instance rather than through the class itself.
    spaces = type('spaces', (), {'GPU': staticmethod(spaces_gpu_decorator)})
# Model configuration - Foundation-Sec-8B only
MODEL_NAME = "fdtn-ai/Foundation-Sec-8B"

# Build the text-generation pipeline. The first attempt requests bfloat16
# weights with automatic device placement; if anything in that attempt fails,
# a second attempt is made with only the bare minimum of arguments.
print(f"Loading model: {MODEL_NAME}")
try:
    print("Initializing Foundation-Sec-8B model...")
    tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
    text_pipeline = pipeline(
        "text-generation",
        model=MODEL_NAME,
        tokenizer=tokenizer,
        torch_dtype=torch.bfloat16,
        device_map="auto",
        trust_remote_code=True,
    )
    print("Foundation-Sec-8B model initialized successfully")
    # Keep direct handles on the underlying model and tokenizer
    model, tok = text_pipeline.model, text_pipeline.tokenizer
except Exception as e:
    print(f"Error initializing Foundation-Sec-8B model: {e}")
    print("Trying with simplified parameters...")
    try:
        # Fallback: let the pipeline pick its own defaults
        text_pipeline = pipeline(
            "text-generation",
            model=MODEL_NAME,
            trust_remote_code=True,
        )
        model, tok = text_pipeline.model, text_pipeline.tokenizer
        print("Foundation-Sec-8B model loaded with simplified parameters")
    except Exception as e2:
        print(f"Failed to load Foundation-Sec-8B model: {e2}")
        failure_message = (
            "Could not load Foundation-Sec-8B model. "
            f"Please ensure the model is accessible and try again. Error: {e2}"
        )
        raise RuntimeError(failure_message)
# Report where the model lives and what CUDA hardware is visible
if not hasattr(model, 'device'):
    # No `device` attribute: fall back to the device of the first parameter
    device_info = next(model.parameters()).device
    print(f"Model parameters on device: {device_info}")
else:
    print(f"Model loaded on device: {model.device}")

print(f"CUDA available: {torch.cuda.is_available()}")
if torch.cuda.is_available():
    for cuda_line in (
        f"CUDA device count: {torch.cuda.device_count()}",
        f"Current CUDA device: {torch.cuda.current_device()}",
        f"CUDA device name: {torch.cuda.get_device_name()}",
    ):
        print(cuda_line)
# Scoring configuration
# Length correction factor (0=no correction, 1=full average logP); used as the
# exponent on the phrase token count when association scores are normalized.
LEN_ALPHA = 0.7

# Sample campaigns and actors for testing
CAMPAIGN_LIST = ["Operation Aurora", "Dust Storm", "ShadowHammer", "NotPetya", "SolarWinds"]
ACTOR_LIST = [
    "APT1",
    "APT28",
    "APT33",
    "APT38",
    "FIN8",
]

# Sample ATT&CK techniques as (ID, name) pairs, rendered as "ID name" strings
_TECHNIQUE_PAIRS = (
    ("T1059", "Command and Scripting Interpreter"),
    ("T1566", "Phishing"),
    ("T1027", "Obfuscated/Stored Files"),
    ("T1036", "Masquerading"),
    ("T1105", "Ingress Tool Transfer"),
    ("T1018", "Remote System Discovery"),
    ("T1568", "Dynamic Resolution"),
)
TECHNIQUE_LIST = [f"{tech_id} {tech_name}" for tech_id, tech_name in _TECHNIQUE_PAIRS]
@torch.no_grad()
def phrase_log_prob(prompt, phrase):
    """Return log P(phrase | prompt) under the loaded causal language model.

    The phrase is tokenized without special tokens and appended to the prompt
    tokens. The result is the sum, over the phrase tokens, of the log
    probability the model assigns to each token given everything before it.

    All phrase tokens are scored in a single forward pass over
    ``prompt + phrase``: the logits at position ``i`` are the model's
    prediction for token ``i + 1``, so the rows starting at the last prompt
    position are exactly the distributions the token-by-token loop would see.

    Args:
        prompt: Conditioning text.
        phrase: Text whose conditional log probability is wanted.

    Returns:
        The summed log probability as a float; 0.0 when the phrase tokenizes
        to nothing.

    Raises:
        ValueError: If the prompt tokenizes to zero tokens, leaving no
            position to predict the first phrase token from.
        Exception: Any tokenizer/model error is logged and re-raised.
    """
    try:
        device = model.device
        prompt_ids = tok(prompt, return_tensors="pt").to(device)["input_ids"]
        # NOTE(review): the phrase is tokenized on its own, so no separator is
        # inserted between prompt and phrase; confirm whether callers should
        # pass phrases with a leading space.
        phrase_ids = tok(phrase, add_special_tokens=False)["input_ids"]
        if not phrase_ids:
            return 0.0

        n_prompt = prompt_ids.shape[1]
        if n_prompt == 0:
            raise ValueError("prompt produced no tokens; cannot condition the phrase on it")

        phrase_tensor = torch.tensor([phrase_ids], device=device)
        sequence = torch.cat([prompt_ids, phrase_tensor], dim=1)

        # Keep only the rows that predict phrase tokens before upcasting, so
        # the float32 copy stays small.
        first = n_prompt - 1
        logits = model(sequence).logits[0, first:first + len(phrase_ids)].float()
        log_probs = torch.log_softmax(logits, dim=-1)

        # Pick, for each step, the log probability of the actual phrase token.
        picked = log_probs.gather(1, phrase_tensor[0].unsqueeze(1))
        return picked.double().sum().item()
    except Exception as e:
        print(f"Error in phrase_log_prob: {e}")
        raise
def binary_assoc_score(prompt: str, phrase: str, neg="does NOT use", prompt_template="typically uses") -> float:
    """
    Calculate binary association score: p ≈ P(use) / (P(use)+P(not use))
    Applies length normalization to correct for longer phrases.

    The negative prompt is built by replacing ``prompt_template`` with ``neg``
    inside ``prompt``. If that replacement changes nothing, both prompts are
    identical and the score carries no information; a warning is printed.

    Args:
        prompt: Base prompt string
        phrase: Phrase to evaluate
        neg: Negative template to replace positive template
        prompt_template: Positive template to be replaced
    Returns:
        Length-normalized association score between 0 and 1
    """
    neg_prompt = prompt.replace(prompt_template, neg)
    if neg_prompt == prompt:
        print(
            f"Warning: negative prompt equals positive prompt for '{prompt}' "
            f"(template '{prompt_template}' not replaced); score is uninformative"
        )

    lp_pos = phrase_log_prob(prompt, phrase)
    lp_neg = phrase_log_prob(neg_prompt, phrase)

    # Logistic transformation. math.exp raises OverflowError once the gap
    # exceeds roughly 709; at that point the probability is 0 to double
    # precision.
    try:
        prob = 1 / (1 + math.exp(lp_neg - lp_pos))
    except OverflowError:
        prob = 0.0

    # Length normalization. Clamp the token count to 1 so a phrase that
    # tokenizes to nothing does not divide by zero.
    n_tok = len(tok(phrase, add_special_tokens=False)["input_ids"])
    return prob / (max(n_tok, 1) ** LEN_ALPHA)
def campaign_actor_associations(campaigns, actors, prompt_template="is associated with", neg_template="is NOT associated with"):
    """
    Score every Campaign × Actor pair and rank the actors for each campaign.

    The positive prompt is ``"{campaign} {prompt_template}"``; the negative
    prompt is obtained by swapping ``prompt_template`` for ``neg_template``.
    Both templates are passed to ``binary_assoc_score`` so the swap actually
    takes effect.

    Args:
        campaigns: Iterable of campaign names
        actors: Iterable of actor names
        prompt_template: Template for positive association
        neg_template: Template for negative association
    Returns:
        Dict mapping each campaign to a list of ``(actor, score)`` tuples
        sorted by score, highest first
    """
    results = {}
    for camp in campaigns:
        prompt_base = f"{camp} {prompt_template}"
        actor_scores = {
            actor: binary_assoc_score(
                prompt_base, actor, neg=neg_template, prompt_template=prompt_template
            )
            for actor in actors
        }
        # Sort by score, best match first
        results[camp] = sorted(actor_scores.items(), key=lambda item: item[1], reverse=True)
    return results
def campaign_technique_matrix(campaigns, techniques, prompt_template="typically uses", neg_template="typically does NOT use"):
    """
    Build the Campaign × Technique score table with binary association scoring.

    Each cell is ``binary_assoc_score`` evaluated on the prompt
    ``"{campaign} {prompt_template}"`` for one technique, with
    ``neg_template`` as the negative counterpart.

    Args:
        campaigns: List of campaign names
        techniques: List of technique names
        prompt_template: Template for positive association
        neg_template: Template for negative association
    Returns:
        DataFrame with campaigns as rows, techniques as columns, scores as values
    """
    def score_row(campaign_name):
        # One row of the table: technique -> score for this campaign
        base_prompt = f"{campaign_name} {prompt_template}"
        row = {}
        for technique in techniques:
            row[technique] = binary_assoc_score(
                base_prompt,
                technique,
                neg=neg_template,
                prompt_template=prompt_template,
            )
        return row

    table = {campaign_name: score_row(campaign_name) for campaign_name in campaigns}
    return pd.DataFrame.from_dict(table, orient="index")
def campaign_actor_matrix(campaigns, actors, prompt_template="is associated with", neg_template="is NOT associated with"):
    """
    Generate the Campaign × Actor association matrix using binary scoring.

    The positive prompt is ``"{campaign} {prompt_template}"``; the negative
    prompt is obtained by swapping ``prompt_template`` for ``neg_template``.
    Both templates are passed to ``binary_assoc_score`` so the swap actually
    takes effect.

    Args:
        campaigns: List of campaign names
        actors: List of actor names
        prompt_template: Template for positive association
        neg_template: Template for negative association
    Returns:
        DataFrame with campaigns as rows, actors as columns, scores as values
    """
    rows = {}
    for camp in campaigns:
        prompt_base = f"{camp} {prompt_template}"
        rows[camp] = {
            actor: binary_assoc_score(
                prompt_base, actor, neg=neg_template, prompt_template=prompt_template
            )
            for actor in actors
        }
    return pd.DataFrame.from_dict(rows, orient="index")
def campaign_actor_probs(campaigns, actors, prompt_template="is conducted by"):
    """
    Build the Campaign × Actor probability table via softmax over actors.

    For each campaign the prompt ``"{campaign} {prompt_template}"`` is scored
    against every actor with ``phrase_log_prob``; the log probabilities are
    then softmax-normalized so each row sums to 1.

    Args:
        campaigns: List of campaign names
        actors: List of actor names
        prompt_template: Template for actor association prompt
    Returns:
        DataFrame with campaigns as rows, actors as columns, probabilities as values
    """
    def normalized_row(campaign_name):
        context = f"{campaign_name} {prompt_template}"
        log_scores = []
        for actor_name in actors:
            log_scores.append(phrase_log_prob(context, actor_name))

        # Subtract the maximum before exponentiating for numerical stability
        peak = max(log_scores)
        weights = [math.exp(value - peak) for value in log_scores]
        total = sum(weights)

        row = {}
        for actor_name, weight in zip(actors, weights):
            row[actor_name] = weight / total
        return row

    table = {}
    for campaign_name in campaigns:
        table[campaign_name] = normalized_row(campaign_name)
    return pd.DataFrame.from_dict(table, orient="index")
@spaces.GPU(duration=120)
def generate_actor_heatmap(c_list, a_list, actor_prompt_template):
    """Generate Campaign-Actor association heatmap with probability visualization.

    Args:
        c_list: Comma-separated campaign names.
        a_list: Comma-separated actor names.
        actor_prompt_template: Text placed between the campaign name and the
            actor when building prompts.

    Returns:
        A matplotlib Figure: the heatmap on success, or a figure carrying a
        text message when an input list is empty or an error occurred. The
        figure is detached from pyplot before being returned so that repeated
        requests do not accumulate open figures.
    """
    def _message_figure(text, **text_style):
        # Blank canvas with a centered message, released from pyplot's registry
        msg_fig, msg_ax = plt.subplots(figsize=(8, 6))
        msg_ax.text(0.5, 0.5, text, ha='center', va='center', **text_style)
        msg_ax.set_xlim(0, 1)
        msg_ax.set_ylim(0, 1)
        msg_ax.axis('off')
        plt.close(msg_fig)
        return msg_fig

    fig = None
    try:
        campaigns = [c.strip() for c in c_list.split(",") if c.strip()]
        actors = [a.strip() for a in a_list.split(",") if a.strip()]
        if not campaigns or not actors:
            return _message_figure('Please enter both Campaigns and Actors', fontsize=16)

        print(f"Processing {len(campaigns)} campaigns and {len(actors)} actors...")
        print(f"Using prompt template: '{actor_prompt_template}'")

        # Check GPU availability
        if torch.cuda.is_available():
            print(f"GPU computation enabled - Device: {torch.cuda.get_device_name()}")
        else:
            print("Running on CPU")

        # Calculate probability matrix
        df_ca = campaign_actor_probs(campaigns, actors, actor_prompt_template)
        print(f"Actor probability matrix shape: {df_ca.shape}")
        print("Actor probability matrix:")
        print(df_ca.round(4))

        # Create heatmap with matplotlib/seaborn
        fig, ax = plt.subplots(figsize=(max(8, len(actors)*1.2), max(6, len(campaigns)*0.8)))
        sns.heatmap(df_ca, annot=True, cmap='plasma', fmt='.3f',
                    cbar_kws={'label': 'P(actor)'}, ax=ax)
        ax.set_title('Campaign-Actor Probabilities (softmax normalized)',
                     fontsize=14, pad=20)
        ax.set_xlabel('Actor', fontsize=12)
        ax.set_ylabel('Campaign', fontsize=12)

        # Adjust label rotation
        plt.setp(ax.get_xticklabels(), rotation=45, ha='right')
        plt.setp(ax.get_yticklabels(), rotation=0)
        fig.tight_layout()

        print("Actor heatmap generated successfully!")
        return fig
    except Exception as e:
        print(f"Error in generate_actor_heatmap: {e}")
        import traceback
        traceback.print_exc()
        return _message_figure(f'Error occurred: {e}', fontsize=12, color='red')
    finally:
        # Release the heatmap figure from pyplot on success and on failure;
        # the Figure object itself stays usable by the caller.
        if fig is not None:
            plt.close(fig)
@spaces.GPU(duration=120)
def generate_technique_heatmap(c_list, t_list, technique_prompt_template, technique_neg_template):
    """Generate Campaign-Technique association heatmap with binary scoring visualization.

    Args:
        c_list: Comma-separated campaign names.
        t_list: Comma-separated technique names.
        technique_prompt_template: Positive association template.
        technique_neg_template: Negative association template.

    Returns:
        A matplotlib Figure: the heatmap on success, or a figure carrying a
        text message when an input list is empty or an error occurred. The
        figure is detached from pyplot before being returned so that repeated
        requests do not accumulate open figures.
    """
    def _message_figure(text, **text_style):
        # Blank canvas with a centered message, released from pyplot's registry
        msg_fig, msg_ax = plt.subplots(figsize=(8, 6))
        msg_ax.text(0.5, 0.5, text, ha='center', va='center', **text_style)
        msg_ax.set_xlim(0, 1)
        msg_ax.set_ylim(0, 1)
        msg_ax.axis('off')
        plt.close(msg_fig)
        return msg_fig

    fig = None
    try:
        campaigns = [c.strip() for c in c_list.split(",") if c.strip()]
        techniques = [t.strip() for t in t_list.split(",") if t.strip()]
        if not campaigns or not techniques:
            return _message_figure('Please enter both Campaigns and Techniques', fontsize=16)

        print(f"Processing {len(campaigns)} campaigns and {len(techniques)} techniques...")
        print(f"Using prompt templates: '{technique_prompt_template}' / '{technique_neg_template}'")

        # Check GPU availability
        if torch.cuda.is_available():
            print(f"GPU computation enabled - Device: {torch.cuda.get_device_name()}")
        else:
            print("Running on CPU")

        # Calculate score matrix
        df_ct = campaign_technique_matrix(campaigns, techniques, technique_prompt_template, technique_neg_template)
        print(f"Score matrix shape: {df_ct.shape}")
        print("Score matrix:")
        print(df_ct.round(4))

        # Create heatmap with matplotlib/seaborn
        fig, ax = plt.subplots(figsize=(max(8, len(techniques)*1.2), max(6, len(campaigns)*0.8)))
        sns.heatmap(df_ct, annot=True, cmap='viridis', fmt='.3f',
                    cbar_kws={'label': 'Association Score'}, ax=ax)
        ax.set_title('Campaign-Technique Associations (len-norm, independent)',
                     fontsize=14, pad=20)
        ax.set_xlabel('Technique', fontsize=12)
        ax.set_ylabel('Campaign', fontsize=12)

        # Adjust label rotation
        plt.setp(ax.get_xticklabels(), rotation=45, ha='right')
        plt.setp(ax.get_yticklabels(), rotation=0)
        fig.tight_layout()

        print("Technique heatmap generated successfully!")
        return fig
    except Exception as e:
        print(f"Error in generate_technique_heatmap: {e}")
        import traceback
        traceback.print_exc()
        return _message_figure(f'Error occurred: {e}', fontsize=12, color='red')
    finally:
        # Release the heatmap figure from pyplot on success and on failure;
        # the Figure object itself stays usable by the caller.
        if fig is not None:
            plt.close(fig)
# Gradio UI: one shared campaign input, then an actor section (softmax
# probabilities) and a technique section (independent binary scores), each with
# its own button and plot. Default textbox values are derived from the sample
# constants so the UI and the constants cannot drift apart.
# NOTE(review): which widgets sit inside each gr.Row() is inferred (template
# boxes inside the row, list boxes below it) — confirm against the intended layout.
with gr.Blocks(title="LLM Threat Graph Demo") as demo:
    gr.Markdown("# 🕸️ LLM Threat Association Analysis\n*Visualizing Campaign-Actor-Technique relationships using Language Models*")

    # Common inputs
    with gr.Row():
        campaigns = gr.Textbox(
            ", ".join(CAMPAIGN_LIST),
            label="Campaigns (comma-separated)",
            placeholder="e.g., Operation Aurora, NotPetya, Stuxnet"
        )

    # Campaign-Actor section (probabilistic)
    gr.Markdown("## 👤 Campaign-Actor Associations")
    gr.Markdown("Visualizing Campaign-Actor relationships with probabilistic heatmaps")
    gr.Markdown("""
**Calculation Method**: `P(actor | "{campaign} is conducted by") (softmax normalized)`
1. Calculate `phrase_log_prob("{campaign} is conducted by", actor)` for each Actor
2. Apply softmax normalization to create probability distribution (probabilities sum to 1.0 per Campaign)
3. Result: Shows relative likelihood of each Actor conducting each Campaign
""")
    with gr.Row():
        actor_prompt_template = gr.Textbox(
            "is conducted by",
            label="Actor Prompt Template",
            placeholder="e.g., is conducted by, is attributed to"
        )
    actors = gr.Textbox(
        ", ".join(ACTOR_LIST),
        label="Actors (comma-separated)",
        placeholder="e.g., APT1, Lazarus Group, Cozy Bear"
    )
    btn_actor = gr.Button("Generate Actor Heatmap", variant="primary")
    plot_actor = gr.Plot(label="Campaign-Actor Heatmap")
    btn_actor.click(
        fn=generate_actor_heatmap,
        inputs=[campaigns, actors, actor_prompt_template],
        outputs=plot_actor,
        show_progress=True
    )

    # Campaign-Technique section (independent scoring)
    gr.Markdown("## 🛠️ Campaign-Technique Associations")
    gr.Markdown("Visualizing Campaign-Technique relationships with independent association scores")
    gr.Markdown("""
**Calculation Method**: `Binary Association Score (length-normalized, independent)`
1. For each Technique, calculate:
- `lp_pos = phrase_log_prob("{campaign} typically uses", technique)`
- `lp_neg = phrase_log_prob("{campaign} typically does NOT use", technique)`
2. Apply logistic transformation: `prob = 1 / (1 + exp(lp_neg - lp_pos))`
3. Length normalization: `score = prob / (n_tokens^0.7)` (penalty for longer phrases)
4. Result: Independent association scores (0-1) for each Campaign-Technique pair
""")
    with gr.Row():
        technique_prompt_template = gr.Textbox(
            "typically uses",
            label="Technique Prompt Template (positive)",
            placeholder="e.g., typically uses, commonly employs"
        )
        technique_neg_template = gr.Textbox(
            "typically does NOT use",
            label="Technique Prompt Template (negative)",
            placeholder="e.g., typically does NOT use, never employs"
        )
    techniques = gr.Textbox(
        ", ".join(TECHNIQUE_LIST),
        label="Techniques (comma-separated)",
        placeholder="e.g., T1059 Command and Scripting Interpreter, T1566 Phishing"
    )
    btn_technique = gr.Button("Generate Technique Heatmap", variant="primary")
    plot_technique = gr.Plot(label="Campaign-Technique Heatmap")
    btn_technique.click(
        fn=generate_technique_heatmap,
        inputs=[campaigns, techniques, technique_prompt_template, technique_neg_template],
        outputs=plot_technique,
        show_progress=True
    )

demo.launch()