# Spaces:
# Runtime error
# Runtime error
"""
OSINT engine for username and person search.
"""
import asyncio
import json
import os
import subprocess
import tempfile
from datetime import datetime
from typing import Dict, List, Any, Optional

import geopy
import whois
from geopy.exc import GeocoderTimedOut
from geopy.geocoders import Nominatim
from holehe.core import AsyncEngine
from holehe.localuseragent import ua
class OSINTEngine:
    """Collect OSINT data about usernames, people and domains.

    The engine combines three sources: a holehe engine (account lookup),
    the external ``sherlock`` command-line tool (profile lookup) and a
    Nominatim geocoder (location lookup). Domain data comes from ``whois``.

    Every lookup is best-effort: a failing source prints a message and
    contributes nothing, it never aborts the whole search.
    """

    # Seconds to wait for the external ``sherlock`` process before killing it.
    SHERLOCK_TIMEOUT = 30
    # Seconds to wait for a geocoding response.
    GEOCODE_TIMEOUT = 10
    # How many generated username variants ``search_person`` probes.
    MAX_USERNAME_VARIANTS = 3

    def __init__(self):
        """Create the holehe engine and the Nominatim geocoder."""
        self.holehe_engine = AsyncEngine()
        self.geocoder = Nominatim(user_agent="osint_search")

    @staticmethod
    def _dedupe_dicts(items: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Return *items* without duplicates, keeping first-seen order.

        Two dicts count as duplicates when they serialise to the same JSON
        text with sorted keys, so key order does not matter.
        """
        seen = set()
        unique = []
        for item in items:
            # default=str only affects the comparison key, not the stored item.
            key = json.dumps(item, sort_keys=True, default=str)
            if key not in seen:
                seen.add(key)
                unique.append(item)
        return unique

    @staticmethod
    def _parse_sherlock_line(line: str) -> Optional[Dict[str, Any]]:
        """Parse one ``platform | url`` line; return ``None`` if it has no ``|``.

        Only the first ``|`` separates the fields, so a URL that itself
        contains ``|`` no longer breaks parsing.
        """
        platform, separator, url = line.strip().partition("|")
        if not separator:
            return None
        return {
            "name": platform.strip(),
            "url": url.strip(),
            "type": "social",
        }

    def _run_sherlock(self, username: str) -> List[Dict[str, Any]]:
        """Run the ``sherlock`` CLI (blocking) and parse its output file.

        ``subprocess.run`` kills and reaps the child when the timeout expires,
        so a slow run does not leave an orphaned process behind. The resulting
        ``subprocess.TimeoutExpired`` propagates to the caller.
        """
        found = []
        with tempfile.TemporaryDirectory() as temp_dir:
            output_file = os.path.join(temp_dir, "sherlock_results.txt")
            subprocess.run(
                ["sherlock", username, "--output", output_file],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                timeout=self.SHERLOCK_TIMEOUT,
            )
            if os.path.exists(output_file):
                with open(output_file, "r", encoding="utf-8", errors="replace") as f:
                    for line in f:
                        entry = self._parse_sherlock_line(line)
                        if entry is not None:
                            found.append(entry)
        return found

    async def search_username(self, username: str) -> Dict[str, Any]:
        """Search for username across platforms.

        Returns a dict with ``platforms`` (list of ``name``/``url``/``type``
        dicts), ``emails`` (list) and ``metadata`` (always an empty dict here).
        Both lists are de-duplicated and keep discovery order.
        """
        results: Dict[str, Any] = {
            "platforms": [],
            "emails": [],
            "metadata": {},
        }

        # Holehe search
        # NOTE(review): assumes the engine exposes an awaitable ``check_all``
        # yielding dicts with "exists"/"name"/"url"/"type"/"email" keys --
        # confirm against the installed holehe version.
        try:
            holehe_results = await self.holehe_engine.check_all(username)
            for result in holehe_results:
                if not result.get("exists"):
                    continue
                results["platforms"].append({
                    "name": result.get("name"),
                    "url": result.get("url"),
                    "type": "social" if "social" in (result.get("type") or "") else "other",
                })
                # NOTE(review): the original indentation was lost; the email is
                # recorded only for accounts that exist -- confirm intent.
                if result.get("email"):
                    results["emails"].append(result["email"])
        except Exception as e:
            print(f"Holehe search error: {e}")

        # Sherlock search using subprocess, run in a worker thread so the
        # blocking wait does not stall the event loop.
        try:
            loop = asyncio.get_running_loop()
            sherlock_hits = await loop.run_in_executor(
                None, self._run_sherlock, username
            )
            results["platforms"].extend(sherlock_hits)
        except Exception as e:
            print(f"Sherlock search error: {e}")

        # Deduplicate results (order-preserving, so output is reproducible)
        results["platforms"] = self._dedupe_dicts(results["platforms"])
        results["emails"] = list(dict.fromkeys(results["emails"]))
        return results

    async def search_person(self, name: str, location: Optional[str] = None, age: Optional[int] = None) -> Dict[str, Any]:
        """Search for person information.

        Geocodes *location* when given, records the basic details, then runs
        ``search_username`` on the first ``MAX_USERNAME_VARIANTS`` username
        variants derived from *name* and merges the platforms found.
        """
        results: Dict[str, Any] = {
            "basic_info": {},
            "locations": [],
            "social_profiles": [],
            "metadata": {},
        }

        # Process location if provided
        if location:
            try:
                loop = asyncio.get_running_loop()
                # The geocoder call is blocking network I/O; keep it off the loop.
                location_info = await loop.run_in_executor(
                    None,
                    lambda: self.geocoder.geocode(location, timeout=self.GEOCODE_TIMEOUT),
                )
                if location_info:
                    results["locations"].append({
                        "address": location_info.address,
                        "latitude": location_info.latitude,
                        "longitude": location_info.longitude,
                    })
            except GeocoderTimedOut:
                print("Geocoding timed out")
            except Exception as e:
                print(f"Geocoding error: {e}")

        # Basic info. Falsy values (0, "") are reported as None, i.e. treated
        # as "not provided".
        results["basic_info"] = {
            "name": name,
            "age": age if age else None,
            "location": location if location else None,
        }

        # Search for potential usernames. Variants come back in a fixed order,
        # so the same name always probes the same usernames.
        usernames = self._generate_username_variants(name)
        for username in usernames[:self.MAX_USERNAME_VARIANTS]:
            username_results = await self.search_username(username)
            results["social_profiles"].extend(username_results["platforms"])

        # Deduplicate social profiles
        results["social_profiles"] = self._dedupe_dicts(results["social_profiles"])
        return results

    def _generate_username_variants(self, name: str) -> List[str]:
        """Generate possible username variants from a name.

        For two or more words the first and last word are combined in several
        ways; a single word yields the word itself plus two decorated forms.
        The result has no duplicates and keeps the generation order, most
        plain combination first. An empty or blank name yields ``[]``.
        """
        parts = name.lower().split()
        variants: List[str] = []
        if len(parts) >= 2:
            first, last = parts[0], parts[-1]
            variants.extend([
                first + last,
                first + "_" + last,
                first + "." + last,
                first[0] + last,
                first + last[0],
                last + first,
            ])
        elif len(parts) == 1:
            variants.extend([
                parts[0],
                parts[0] + "123",
                "the" + parts[0],
            ])
        # dict.fromkeys removes duplicates while preserving order; a plain
        # set() would make the "first 3 variants" slice arbitrary.
        return list(dict.fromkeys(variants))

    async def search_domain(self, domain: str) -> Dict[str, Any]:
        """Get information about a domain.

        Returns selected WHOIS fields plus the raw WHOIS result under ``raw``.
        On any failure returns ``{"error": <message>}`` instead of raising.
        """
        try:
            loop = asyncio.get_running_loop()
            # WHOIS lookup is blocking network I/O; run it in a worker thread.
            domain_info = await loop.run_in_executor(None, whois.whois, domain)
            return {
                "registrar": domain_info.registrar,
                "creation_date": domain_info.creation_date,
                "expiration_date": domain_info.expiration_date,
                "last_updated": domain_info.updated_date,
                "status": domain_info.status,
                "name_servers": domain_info.name_servers,
                "emails": domain_info.emails,
                "raw": domain_info,
            }
        except Exception as e:
            return {
                "error": str(e)
            }