# PassiveOSINTControlPanel/agent/osint_agent.py
"""OSINT expert agent built on the Anthropic Messages API with extended thinking and prompt caching."""

from __future__ import annotations

import os
from collections.abc import Generator
from typing import Optional

import anthropic

OSINT_SYSTEM_PROMPT = """You are a senior OSINT analyst and dark web intelligence specialist with \
over 15 years of experience in digital forensics, threat intelligence, and cyber investigations. \
You support defensive security operations, authorized penetration testing engagements, academic \
research, journalism, and law enforcement investigations. You never assist with illegal activity, \
unauthorized access, or any action that harms individuals or organizations without consent.
## Core Competencies
### 1. Passive Reconnaissance
- DNS enumeration: A/AAAA/MX/NS/TXT/SPF/DMARC/DKIM record analysis, zone transfer checks, \
subdomain discovery via brute-force wordlists, CT log mining (crt.sh, Censys, Facebook CT)
- WHOIS & RDAP analysis: registrar history, registrant pivots, privacy shield identification, \
domain age, creation/expiry patterns, bulk WHOIS for related domains
- Certificate Transparency: SSL/TLS certificate enumeration, SAN field expansion, wildcard \
certificate analysis, certificate issuance timeline analysis
- ASN & BGP intelligence: IP-to-ASN mapping, BGP route history, RPKI validation, IXP peering, \
prefix hijack detection (BGPMon, RIPE RIS)
- Shodan/Censys/FOFA: exposed services, default credentials, banner grabbing, industrial \
control systems (ICS/SCADA), VPN endpoints, remote access solutions
- Google dorks & advanced search operators: site:, filetype:, inurl:, intitle:, cache:, \
before:/after: operators for OSINT pivots
### 2. Dark Web Intelligence
- .onion site analysis: Tor hidden service fingerprinting, server misconfigurations that \
expose clearnet IPs, uptime monitoring, content archiving
- Marketplace & forum monitoring: vendor profiling, product listings, feedback analysis, \
PGP key pivots, cryptocurrency address extraction
- Paste site monitoring: Pastebin, PrivateBin, Ghostbin — automated scraping for credential \
leaks, source code, PII, configuration files
- Cryptocurrency transaction tracing: Bitcoin/Monero address clustering, exchange \
identification, mixing service detection, on-chain analytics (Chainalysis-style methodology)
- Dark web search engines: Ahmia, Torch, Haystak — indexed .onion content discovery
- I2P & Freenet: alternative anonymity networks, eepsite discovery, distributed content
### 3. Threat Intelligence
- IOC extraction & enrichment: IPs, domains, URLs, hashes, email addresses — VirusTotal, \
OTX AlienVault, ThreatFox, Shodan enrichment
- MITRE ATT&CK mapping: TTP identification, adversary group attribution, technique \
clustering, campaign correlation
- Threat actor profiling: infrastructure reuse, TTPs, victimology, geopolitical motivation, \
malware family association
- C2 infrastructure analysis: beacon intervals, JA3/JA3S fingerprints, domain fronting \
detection, fast-flux DNS, DGA identification
- Malware analysis (static): PE header analysis, import table review, string extraction, \
YARA rule development, packer identification
### 4. Data Breach Analysis
- Credential exposure: Have I Been Pwned (HIBP) API, Dehashed, IntelX — email/domain \
queries for breach membership
- Combo list analysis: password pattern analysis, credential stuffing risk assessment, \
hash identification (MD5/SHA1/bcrypt/NTLM)
- Database leak assessment: schema identification, PII scope determination, impact \
classification per GDPR/CCPA frameworks
- Breach timeline correlation: linking breach dates to threat actor activity, campaign \
attribution, victim notification guidance
### 5. Social Media Intelligence (SOCMINT)
- Cross-platform entity resolution: username pivots across Twitter/X, Reddit, GitHub, \
Telegram, Discord, LinkedIn, Instagram using Sherlock/Maigret methodology
- Geolocation from imagery: EXIF metadata, background landmark analysis, shadow direction, \
vegetation/architecture analysis
- Network graph analysis: follower/following relationship mapping, community detection, \
bot network identification, coordinated inauthentic behavior
- Account authenticity assessment: creation date, follower/following ratio, posting \
frequency, engagement metrics, profile image reverse search
- Telegram & Discord OSINT: channel membership scraping, message archiving, admin \
identification, invite link analysis
### 6. Network Reconnaissance
- IP geolocation & hosting: MaxMind, ip-api, RIPE/ARIN/APNIC WHOIS, hosting provider \
identification, datacenter vs. residential classification
- CDN & reverse proxy detection: Cloudflare, Akamai, Fastly fingerprinting, origin IP \
discovery techniques (historical DNS, SSL cert SANs, favicon hash)
- Email header analysis: SPF/DKIM/DMARC validation, hop-by-hop IP tracing, relay \
identification, phishing infrastructure detection
- BGP & routing analysis: prefix announcement history, route leaks, anycast detection, \
traffic engineering inference
- SSL/TLS analysis: cipher suite enumeration, certificate chain validation, CT log \
correlation, HPKP/HSTS analysis
### 7. Digital Footprint & Attack Surface Analysis
- External attack surface mapping: internet-exposed assets, shadow IT discovery, \
forgotten subdomains, acquisition-inherited infrastructure
- GitHub & code repository OSINT: secret scanning (API keys, credentials in commit \
history), employee identification, internal tooling discovery, dependency analysis
- Cloud storage enumeration: misconfigured S3 buckets, Azure Blob, GCP buckets — \
Grayhat Warfare, S3Scanner methodology
- Job posting intelligence: technology stack inference from job requirements, \
internal tool names, team structure
- Data broker & people-search exposure: Spokeo, BeenVerified, Pipl; opt-out guidance \
and data removal strategies
## Intelligence Reporting Standards
- Follow the Traffic Light Protocol (TLP 2.0): TLP:RED, TLP:AMBER, TLP:AMBER+STRICT, TLP:GREEN, TLP:CLEAR
- Structure reports with: Executive Summary, Technical Findings, IOC Table, \
Attribution Confidence Level, Recommended Actions
- Cite sources and collection timestamps for every finding
- Assess confidence using structured analytic techniques (SATs), e.g. Analysis of Competing \
Hypotheses (ACH) and Red Team analysis
- Grade sources with the Admiralty (NATO) system: A-F for source reliability, 1-6 for \
information credibility
## Legal & Ethical Framework
- Only perform authorized investigations with explicit scope definition
- Passive reconnaissance only unless active testing is explicitly authorized in writing
- Respect robots.txt and ToS where legally required
- Handle PII per applicable regulations (GDPR, CCPA, HIPAA)
- Never access systems without authorization; the US Computer Fraud and Abuse Act (CFAA) \
and equivalent laws in other jurisdictions apply
- Provide defensive recommendations alongside every offensive finding
When analyzing targets, always clarify the authorization status before proceeding. \
For ambiguous requests, default to the most restrictive interpretation and recommend \
obtaining proper authorization."""


class OSINTAgent:
    """Dark web and OSINT expert agent with multi-turn conversation, prompt caching, and extended thinking."""

    def __init__(
        self,
        api_key: Optional[str] = None,
        # Assumption: the `thinking` parameter used below needs a model that supports
        # extended thinking (Claude 3.7 Sonnet or later); Claude 3.5 Sonnet does not.
        model: str = "claude-3-7-sonnet-20250219",
    ) -> None:
        self.client = anthropic.Anthropic(
            api_key=api_key or os.environ.get("ANTHROPIC_API_KEY")
        )
        self.model = model
        self.conversation_history: list[dict] = []

    def _build_system(self) -> list[dict]:
        """Return system prompt blocks with cache_control for prompt caching."""
        return [
            {
                "type": "text",
                "text": OSINT_SYSTEM_PROMPT,
                "cache_control": {"type": "ephemeral"},
            }
        ]

    def chat(self, user_message: str) -> str:
        """Send a message and return the full assistant response (non-streaming)."""
        self.conversation_history.append({"role": "user", "content": user_message})
        response = self.client.messages.create(
            model=self.model,
            max_tokens=16000,
            thinking={"type": "enabled", "budget_tokens": 4000},
            system=self._build_system(),
            messages=self.conversation_history,
        )
        # Return the first text block; thinking blocks are skipped.
        assistant_text = next(
            (b.text for b in response.content if b.type == "text"), ""
        )
        # Keep the full content blocks (thinking included) in the history.
        self.conversation_history.append(
            {"role": "assistant", "content": response.content}
        )
        return assistant_text

    def stream_chat(self, user_message: str) -> Generator[str, None, None]:
        """Stream a response; yields text chunks as they arrive."""
        self.conversation_history.append({"role": "user", "content": user_message})
        with self.client.messages.stream(
            model=self.model,
            max_tokens=16000,
            thinking={"type": "enabled", "budget_tokens": 4000},
            system=self._build_system(),
            messages=self.conversation_history,
        ) as stream:
            for text in stream.text_stream:
                yield text
            # Read the final message while the stream context is still open.
            final = stream.get_final_message()
        self.conversation_history.append(
            {"role": "assistant", "content": final.content}
        )

    @staticmethod
    def build_analysis_prompt(
        target: str, analysis_type: str, context: Optional[str] = None
    ) -> str:
        """Build the prompt for an analysis type; unknown types fall back to "full"."""
        prompts = {
            "full": (
                f"Conduct a comprehensive OSINT analysis of: **{target}**\n\n"
                "Cover all applicable domains: passive recon, dark web presence, threat intelligence, "
                "data breach exposure, social media footprint, network reconnaissance, and attack surface. "
                "Structure with clear sections, an IOC table where applicable, confidence levels, "
                "and defensive recommendations."
            ),
            "passive": (
                f"Perform passive reconnaissance on: **{target}**\n\n"
                "Cover DNS records, WHOIS/RDAP history, certificate transparency logs, ASN/BGP data, "
                "and Shodan/Censys exposure. List discovered subdomains, IPs, and exposed services. "
                "Flag misconfigurations and security concerns."
            ),
            "threat": (
                f"Conduct a threat intelligence analysis for: **{target}**\n\n"
                "Identify associated IOCs, map to MITRE ATT&CK TTPs, assess threat actor attribution, "
                "analyze C2 infrastructure patterns, and provide enrichment methodology per indicator."
            ),
            "footprint": (
                f"Map the digital footprint and external attack surface for: **{target}**\n\n"
                "Identify internet-exposed assets, shadow IT, misconfigured cloud storage, "
                "GitHub/code repo exposure, and data broker presence. Prioritize by risk level."
            ),
            "breach": (
                f"Analyze data breach and credential exposure for: **{target}**\n\n"
                "Check breach databases (HIBP methodology), assess credential stuffing risk, "
                "identify leaked internal data, and provide remediation steps."
            ),
            "darkweb": (
                f"Investigate dark web presence and mentions of: **{target}**\n\n"
                "Search for mentions on forums, marketplaces, and paste sites. Identify any data for sale, "
                "threat actor discussions, or planned attacks. Extract cryptocurrency addresses where applicable."
            ),
            "socmint": (
                f"Perform social media intelligence (SOCMINT) analysis for: **{target}**\n\n"
                "Map accounts across platforms, analyze network relationships, assess account authenticity, "
                "extract geolocation indicators, and identify key affiliations."
            ),
        }
        prompt = prompts.get(analysis_type, prompts["full"])
        if context:
            prompt += f"\n\nAdditional context: {context}"
        return prompt

    def analyze_target(
        self,
        target: str,
        analysis_type: str = "full",
        context: Optional[str] = None,
    ) -> str:
        """Run a structured OSINT analysis against a target.

        analysis_type options: full, passive, threat, footprint, breach, darkweb, socmint
        """
        # The static helper is defined as build_analysis_prompt (no leading underscore).
        prompt = self.build_analysis_prompt(target, analysis_type, context)
        return self.chat(prompt)

    def generate_ioc_report(self, iocs: list[str]) -> str:
        """Generate an enriched IOC report for a list of indicators."""
        ioc_list = "\n".join(f"- {ioc}" for ioc in iocs)
        prompt = (
            f"Generate a structured IOC report for the following indicators:\n\n{ioc_list}\n\n"
            "For each IOC: classify the type (IP/domain/URL/hash/email), describe enrichment steps "
            "using VirusTotal, Shodan, WHOIS, OTX AlienVault, and ThreatFox, assess maliciousness "
            "confidence (High/Medium/Low), map to MITRE ATT&CK if applicable, and recommend defensive "
            "actions (firewall rules, SIEM detections, threat hunting queries)."
        )
        return self.chat(prompt)

    def explain_technique(self, technique: str) -> str:
        """Explain an OSINT technique, tool, or concept in depth."""
        prompt = (
            f"Provide a detailed technical explanation of: **{technique}**\n\n"
            "Include: how it works, relevant tools and commands, example use cases in authorized "
            "investigations, limitations and caveats, and defensive countermeasures."
        )
        return self.chat(prompt)

    def reset(self) -> None:
        """Clear conversation history to start a fresh session."""
        self.conversation_history = []
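
`generate_ioc_report` asks the model to classify each indicator as IP, domain, URL, hash, or email. A caller might want to pre-classify indicators locally before sending them, for example to drop malformed input. The sketch below is one possible way to do that with the standard library only. The helper name `classify_ioc`, the regular expressions, and the hash-length table are assumptions made for illustration and are not part of the original module. It does not handle defanged indicators such as `hxxp://` or `example[.]com`.

```python
import ipaddress
import re

# Hypothetical helper: none of these names exist in the original module.
_HASH_LENGTHS = {32: "md5", 40: "sha1", 64: "sha256"}  # hex digest lengths
_HEX_RE = re.compile(r"[0-9a-fA-F]+")
_URL_RE = re.compile(r"[a-zA-Z][a-zA-Z0-9+.-]*://\S+")
_EMAIL_RE = re.compile(r"[^@\s]+@[^@\s]+\.[^@\s]+")
_DOMAIN_RE = re.compile(
    r"(?=.{1,253}$)"
    r"([a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+"
    r"[a-zA-Z]{2,63}"
)


def classify_ioc(ioc: str) -> str:
    """Return a coarse type label for an indicator, or "unknown"."""
    value = ioc.strip()
    # IPv4 and IPv6 addresses are validated by the standard library.
    try:
        ipaddress.ip_address(value)
        return "ip"
    except ValueError:
        pass
    if _URL_RE.fullmatch(value):
        return "url"
    if _EMAIL_RE.fullmatch(value):
        return "email"
    # A pure hex string of a known digest length is treated as a hash.
    if _HEX_RE.fullmatch(value) and len(value) in _HASH_LENGTHS:
        return f"hash:{_HASH_LENGTHS[len(value)]}"
    if _DOMAIN_RE.fullmatch(value):
        return "domain"
    return "unknown"
```

Indicators labeled `"unknown"` could be filtered out or flagged before the list is passed to `generate_ioc_report`. The patterns are deliberately loose, so the labels are a first pass and not a validation step.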