MOD_OSINT / src /streamlit_app.py
moddux's picture
Update src/streamlit_app.py
aa1662c verified
import ast
import importlib.util
import inspect
import json
import os
import tempfile
import traceback
from pathlib import Path

import streamlit as st
# --- EXISTING PIPELINE IMPORTS (keep these as in original app) ---
from modules.ingestion.ingest_data import run as ingest_run
from modules.preprocessing.preprocess_data import run as preprocess_run
from modules.ml_analysis.ml_analysis import run as ml_run
from modules.correlation.correlate_ioc import run as correlate_run
from modules.export.export_results import run as export_run
# Page-level configuration; this is the first Streamlit call made in this file.
st.set_page_config(page_title="Modular OSINT Pipeline", layout="wide")
# Heading shown at the top of the main page.
st.title("🚀 Modular OSINT Pipeline Dashboard")
# --- PIPELINE WORKFLOW ---
def write_temp(data: dict) -> str:
    """Serialize ``data`` to a new temporary ``.json`` file and return its path.

    The file is created with ``delete=False`` so it outlives this call and can
    be handed to a pipeline stage by path; nothing in this function removes it.

    Args:
        data: JSON-serializable payload to write.

    Returns:
        Filesystem path of the newly written file.

    Raises:
        TypeError: If ``data`` is not JSON-serializable.
    """
    # Encode before creating the file so a serialization failure leaves no
    # stray temp file behind.
    payload = json.dumps(data).encode()
    # The context manager closes the handle even if the write itself fails.
    with tempfile.NamedTemporaryFile(delete=False, suffix=".json") as handle:
        handle.write(payload)
    return handle.name
def _run_stage(state_key, runner, payload):
    """Run one pipeline stage on ``payload`` and cache its output under ``state_key``.

    ``payload`` is written to a temp JSON file whose path is passed to ``runner``;
    the runner's result must expose ``.json()`` returning a JSON string.
    """
    if payload is None:
        # Without this guard a literal "null" document would be fed to the stage.
        st.warning("No input available for this stage yet - run an earlier stage first.")
        return
    result = runner(write_temp(payload))
    st.session_state[state_key] = json.loads(result.json())


def _show_stage_output(state_key, heading):
    """Render a stage's cached output, if that stage has produced one."""
    if state_key in st.session_state:
        st.markdown(heading)
        st.json(st.session_state[state_key])


# The parsed upload is kept in session state so it survives Streamlit reruns.
uploaded = st.file_uploader("Upload initial OSINTModuleInput JSON", type=["json"])
if uploaded:
    try:
        init_input = json.load(uploaded)
    except ValueError as exc:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueError subclasses.
        st.error(f"Uploaded file is not valid JSON: {exc}")
    else:
        st.session_state["input"] = init_input

if "input" in st.session_state:
    state = st.session_state
    st.markdown("### 🔍 Initial Input")
    st.json(state["input"])

    col1, col2 = st.columns(2)

    # Each stage reads the most recent upstream output that exists in session state.
    with col1:
        if st.button("Run Ingestion"):
            _run_stage("ingest", ingest_run, state["input"])
        _show_stage_output("ingest", "#### Ingestion Output")

        if st.button("Run Preprocessing"):
            _run_stage("preprocess", preprocess_run, state.get("ingest", state["input"]))
        _show_stage_output("preprocess", "#### Preprocessing Output")

    with col2:
        if st.button("Run ML Analysis"):
            _run_stage("ml", ml_run, state.get("preprocess", state.get("ingest")))
        _show_stage_output("ml", "#### ML Analysis Output")

        if st.button("Run Correlation"):
            _run_stage("correlate", correlate_run, state.get("ml", state.get("preprocess")))
        _show_stage_output("correlate", "#### Correlation Output")

    # Export sits below the two columns and spans the full page width.
    if st.button("Run Export"):
        _run_stage("export", export_run, state.get("correlate", state.get("ml")))
    if "export" in state:
        st.markdown("#### Export Output")
        st.json(state["export"])
        st.download_button(
            label="Download Exported Results",
            data=json.dumps(state["export"], indent=2),
            file_name="osint_export.json",
            mime="application/json",
        )
# --- MULTI-DIRECTORY MODULE LAUNCHER SECTION ---
# Root folder scanned for runnable scripts; relative, so it resolves against the working directory.
MODULES_DIR = Path("Modules")
# JSON registry of module descriptions; it is only read when the file exists.
MODULE_REGISTRY = MODULES_DIR.joinpath("module_registry.json")
st.sidebar.header("Standalone & Subdirectory Modules")
def discover_py_modules(directory):
    """Recursively collect the ``.py`` scripts under ``directory``.

    Args:
        directory: Root folder to scan (``str`` or ``Path``). A missing folder
            yields an empty result because ``os.walk`` ignores errors by default.

    Returns:
        A sorted list of ``Path`` objects relative to ``directory``;
        ``__init__.py`` files are skipped.
    """
    found = []
    for root, _dirs, files in os.walk(directory):
        rel_root = Path(root).relative_to(directory)
        found.extend(
            rel_root / name
            for name in files
            if name.endswith(".py") and name != "__init__.py"
        )
    # os.walk order depends on the filesystem; sort so the listing is stable
    # from one rerun to the next.
    return sorted(found)
def normalize_registry_key(path: Path) -> str:
    """Turn a module's relative path into its registry key.

    The file suffix is dropped, the path components are joined with ``.`` and
    the result is lower-cased, e.g. ``Sub/Tool.py`` -> ``"sub.tool"``.
    """
    # Join the path components instead of replacing os.sep in the string form,
    # so the key does not depend on which separator the path renders with.
    return ".".join(path.with_suffix("").parts).lower()
def load_module_description(module_path):
    """Return a short description for a module, or ``""`` if none is available.

    Lookup order:
      1. The ``"description"`` field of the module's entry in ``MODULE_REGISTRY``
         (entries are keyed by ``normalize_registry_key``). An entry without
         that field yields ``""``.
      2. The module file's own docstring, with whitespace collapsed to single
         spaces.

    Args:
        module_path: Path of the module relative to ``MODULES_DIR``.

    Returns:
        The description text, or an empty string.
    """
    module_key = normalize_registry_key(module_path)
    if MODULE_REGISTRY.exists():
        try:
            with open(MODULE_REGISTRY, encoding="utf-8") as regfile:
                registry = json.load(regfile)
        except (OSError, ValueError):
            # An unreadable or malformed registry must not take the sidebar
            # down; fall through to the docstring lookup instead.
            registry = {}
        entry = registry.get(module_key) if isinstance(registry, dict) else None
        if isinstance(entry, dict):
            return entry.get("description", "")

    full_path = MODULES_DIR / module_path
    if not full_path.exists():
        return ""
    try:
        with open(full_path, encoding="utf-8") as src:
            tree = ast.parse(src.read())
    except (OSError, ValueError, SyntaxError):
        # Unreadable or unparsable source: there is no docstring to show.
        return ""
    # ast handles one-line docstrings, text on the opening line, and leading
    # comments or a shebang, none of which a line-by-line scan gets right.
    doc = ast.get_docstring(tree)
    return " ".join(doc.split()) if doc else ""
def get_module_params(module_path):
    """Load the module's parameter file, if one sits next to it.

    The parameter file shares the module's directory and stem and carries the
    ``.osintmodule.json`` suffix (``tool.py`` -> ``tool.osintmodule.json``).

    Args:
        module_path: Path of the module relative to ``MODULES_DIR``.

    Returns:
        ``(params, path)`` with the parsed JSON and the file path as ``str``,
        or ``(None, None)`` when the file is missing or cannot be parsed.
    """
    param_path = (MODULES_DIR / module_path).with_suffix(".osintmodule.json")
    if not param_path.exists():
        return None, None
    try:
        with open(param_path, "r", encoding="utf-8") as f:
            return json.load(f), str(param_path)
    except (OSError, ValueError) as exc:
        # A broken parameter file should surface as a warning, not crash
        # every rerun of the app.
        st.warning(f"Could not read parameters from {param_path.name}: {exc}")
        return None, None
def run_module(module_path, params=None):
    """Import a discovered script from its file and call its ``main()``.

    Args:
        module_path: Path of the script relative to ``MODULES_DIR``.
        params: Optional parameters. They are passed as the single positional
            argument to ``main`` only when they are truthy and ``main``
            declares at least one parameter; otherwise ``main()`` is called
            with no arguments.

    Returns:
        None. Problems (missing file, missing ``main``, any exception raised
        while importing or running the module) are reported through Streamlit
        widgets and are not raised to the caller.
    """
    full_path = MODULES_DIR / module_path
    if not full_path.exists():
        st.error(f"Module file not found: {module_path}")
        return

    # Import name derived from the relative path. Only the file suffix is
    # removed, so a ".py" occurring elsewhere in the path is preserved.
    mod_name = "mod_" + "_".join(Path(module_path).with_suffix("").parts)
    try:
        spec = importlib.util.spec_from_file_location(mod_name, str(full_path))
        mod = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(mod)
        if not hasattr(mod, "main"):
            st.warning(f"{module_path} does not have a main() function.")
            return
        accepts_args = len(inspect.signature(mod.main).parameters) > 0
        if params and accepts_args:
            mod.main(params)
        else:
            mod.main()
    except Exception as e:
        st.error(f"Error running {module_path}: {e}")
        # st.exception takes the exception object and renders its traceback.
        st.exception(e)
def save_params_json(params, param_path):
    """Write ``params`` to ``param_path`` as 2-space-indented JSON.

    Any existing file at ``param_path`` is overwritten.

    Args:
        params: JSON-serializable parameters.
        param_path: Destination file path.

    Raises:
        TypeError: If ``params`` is not JSON-serializable. The existing file
            is left untouched in that case.
    """
    # Serialize first: json.dump streams into the already-truncated file, so a
    # failure part-way through would leave a corrupt parameter file behind.
    payload = json.dumps(params, indent=2)
    with open(param_path, "w", encoding="utf-8") as f:
        f.write(payload)
# One sidebar expander per discovered script: description, optional JSON
# parameter editor, a run button, and download buttons for matching result files.
py_modules = discover_py_modules(MODULES_DIR)
for rel_path in py_modules:
    mod_label = str(rel_path)
    mod_name = rel_path.stem
    desc = load_module_description(rel_path)
    with st.sidebar.expander(mod_label, expanded=False):
        if desc:
            st.info(desc)

        params, param_path = get_module_params(rel_path)
        param_input = None
        if params is not None:
            st.markdown("**Edit module parameters:**")
            param_str = st.text_area(
                "Parameters (JSON)",
                value=json.dumps(params, indent=2),
                key=f"params_{mod_label}",
                height=200,
            )
            try:
                param_input = json.loads(param_str)
            except json.JSONDecodeError as e:
                st.error(f"Invalid JSON: {e}")
            else:
                st.success("Valid JSON")

        if st.button(f"Run {mod_label}", key=f"run_{mod_label}"):
            st.write(f"## Running: {mod_label}")
            if desc:
                st.info(desc)
            # Persist the edited parameters before running so the module and
            # the file on disk agree.
            if param_input is not None and param_path:
                save_params_json(param_input, param_path)
                params = param_input
            run_module(rel_path, params)

            # Look in the module's own Results folder and in the shared one.
            # For scripts living in "Data" both candidates are the same
            # folder, so de-duplicate to avoid rendering every button twice.
            results_folders = []
            for candidate in (
                MODULES_DIR / rel_path.parent / "Results",
                MODULES_DIR / "Data" / "Results",
            ):
                if candidate not in results_folders:
                    results_folders.append(candidate)

            for results_dir in results_folders:
                if not results_dir.exists():
                    continue
                for result_file in sorted(results_dir.glob(f"{mod_name}*.*")):
                    if not result_file.is_file():
                        continue
                    st.download_button(
                        label=f"Download result: {result_file.name}",
                        data=result_file.read_bytes(),
                        file_name=result_file.name,
                        # Explicit key: the same file name can appear in both
                        # folders, which would otherwise collide as widget IDs.
                        key=f"dl_{mod_label}_{result_file}",
                    )
as st
-import importlib.util
-import os
-import json
-import tempfile
-from pathlib import Path
-
-# --- EXISTING PIPELINE IMPORTS (keep these as in original app) ---
-from modules.ingestion.ingest_data import run as ingest_run
-from modules.preprocessing.preprocess_data import run as preprocess_run
-from modules.ml_analysis.ml_analysis import run as ml_run
-from modules.correlation.correlate_ioc import run as correlate_run
-from modules.export.export_results import run as export_run
-
-st.set_page_config(page_title="Modular OSINT Pipeline", layout="wide")
-st.title("Modular OSINT Pipeline Dashboard")
-
-# --- PIPELINE WORKFLOW, unchanged ---
-def write_temp(data: dict) -> str:
- f = tempfile.NamedTemporaryFile(delete=False, suffix=".json")
- f.write(json.dumps(data).encode())
- f.close()
- return f.name
-
-uploaded = st.file_uploader("Upload initial OSINTModuleInput JSON", type=["json"])
-if uploaded:
- init_input = json.load(uploaded)
- st.session_state["input"] = init_input
-
-if "input" in st.session_state:
- st.markdown("### Initial Input")
- st.json(st.session_state["input"])
-
- col1, col2 = st.columns(2)
-
- with col1:
- if st.button("Run Ingestion"):
- path = write_temp(st.session_state["input"])
- out = ingest_run(path)
- st.session_state["ingest"] = json.loads(out.json())
- if "ingest" in st.session_state:
- st.markdown("#### Ingestion Output")
- st.json(st.session_state["ingest"])
-
- with col1:
- if st.button("Run Preprocessing"):
- prev = st.session_state.get("ingest", st.session_state["input"])
- path = write_temp(prev)
- out = preprocess_run(path)
- st.session_state["preprocess"] = json.loads(out.json())
- if "preprocess" in st.session_state:
- st.markdown("#### Preprocessing Output")
- st.json(st.session_state["preprocess"])
-
- with col2:
- if st.button(" Run ML Analysis"):
- prev = st.session_state.get("preprocess", st.session_state.get("ingest"))
- path = write_temp(prev)
- out = ml_run(path)
- st.session_state["ml"] = json.loads(out.json())
- if "ml" in st.session_state:
- st.markdown("#### ML Analysis Output")
- st.json(st.session_state["ml"])
-
- with col2:
- if st.button("Run Correlation"):
- prev = st.session_state.get("ml", st.session_state.get("preprocess"))
- path = write_temp(prev)
- out = correlate_run(path)
- st.session_state["correlate"] = json.loads(out.json())
- if "correlate" in st.session_state:
- st.markdown("#### Correlation Output")
- st.json(st.session_state["correlate"])
-
- if st.button("Run Export"):
- prev = st.session_state.get("correlate", st.session_state.get("ml"))
- path = write_temp(prev)
- out = export_run(path)
- st.session_state["export"] = json.loads(out.json())
- if "export" in st.session_state:
- st.markdown("#### Export Output")
- st.json(st.session_state["export"])
-
- # Export/download option for pipeline output
- st.download_button(
- label="Download Exported Results",
- data=json.dumps(st.session_state["export"], indent=2),
- file_name="osint_export.json",
- mime="application/json"
- )
-
-# --- STANDALONE MODULE LAUNCHER SECTION ---
-st.sidebar.header("Standalone Modules")
-
-MODULES_DIR = Path("Modules")
-MODULE_REGISTRY = MODULES_DIR / "module_registry.json"
-
-def discover_py_modules(directory):
- """List .py scripts in the given directory (non-recursive, excludes __init__.py)."""
- return [
- f for f in os.listdir(directory)
- if f.endswith(".py") and f != "__init__.py"
- ]
-
-def load_module_description(module_name):
- """Get description from registry or fallback to module docstring."""
- # Registry lookup
- if MODULE_REGISTRY.exists():
- with open(MODULE_REGISTRY) as regfile:
- registry = json.load(regfile)
- if module_name in registry:
- return registry[module_name].get("description", "")
- # Fallback: docstring from module file
- module_path = MODULES_DIR / f"{module_name}.py"
- if module_path.exists():
- with open(module_path) as f:
- first_line = f.readline()
- if first_line.startswith("\"\"\"") or first_line.startswith("'''"):
- docstring = first_line.strip().strip("\"'") + " "
- while True:
- l = f.readline()
- if not l or l.startswith("\"\"\"") or l.startswith("'''"):
- break
- docstring += l.strip() + " "
- return docstring.strip()
- return ""
-
-def get_module_params(module_name):
- """Load osintmodule.json file if present for the module."""
- param_path = MODULES_DIR / f"{module_name}.osintmodule.json"
- if param_path.exists():
- with open(param_path, "r") as f:
- return json.load(f), str(param_path)
- return None, None
-
-def run_module(module_name, params=None):
- """Dynamically import and run the main() function of a module, passing params if supported."""
- module_path = MODULES_DIR / f"{module_name}.py"
- if not module_path.exists():
- st.error(f"Module {module_name} not found.")
- return
- spec = importlib.util.spec_from_file_location(module_name, str(module_path))
- mod = importlib.util.module_from_spec(spec)
- spec.loader.exec_module(mod)
- if hasattr(mod, "main"):
- # Try to pass params if main() supports it
- import inspect
- sig = inspect.signature(mod.main)
- if params and len(sig.parameters) > 0:
- mod.main(params)
- else:
- mod.main()
- else:
- st.warning(f"{module_name} does not have a main() function.")
-
-def save_params_json(params, param_path):
- # Overwrites the osintmodule.json file
- with open(param_path, "w") as f:
- json.dump(params, f, indent=2)
-
-py_modules = discover_py_modules(MODULES_DIR)
-for mod in py_modules:
- mod_name = mod[:-3]
- desc = load_module_description(mod_name)
- with st.sidebar.expander(mod_name, expanded=False):
- if desc:
- st.info(desc)
-
- # Parameter editor (if osintmodule.json exists)
- params, param_path = get_module_params(mod_name)
- param_input = None
- if params is not None:
- st.markdown("**Edit module parameters:**")
- param_str = st.text_area(
- "Parameters (JSON)",
- value=json.dumps(params, indent=2),
- key=f"params_{mod_name}",
- height=200
- )
- try:
- param_input = json.loads(param_str)
- st.success("Valid JSON")
- except Exception as e:
- st.error(f"Invalid JSON: {e}")
- param_input = None
-
- if st.button(f"Run {mod_name}", key=f"run_{mod_name}"):
- st.write(f"## Running: {mod_name}")
- if desc:
- st.info(desc)
- # Save edited params if changed
- if param_input is not None and param_path:
- save_params_json(param_input, param_path)
- params = param_input
- # Run the module (with params if possible)
- run_module(mod_name, params)
- # Look for output file to export (if your modules save output)
- output_files = list((MODULES_DIR / "Data" / "Results").glob(f"{mod_name}*.*"))
- if output_files:
- for f in output_files:
- with open(f, "rb") as fo:
- st.download_button(
- label=f"Download result: {f.name}",
- data=fo,
- file_name=f.name
- )
+import streamlit as st
+import importlib.util
+import os
+import json
+import tempfile
+from pathlib import Path
+import inspect
+
+# --- EXISTING PIPELINE IMPORTS (keep these as in original app) ---
+from modules.ingestion.ingest_data import run as ingest_run
+from modules.preprocessing.preprocess_data import run as preprocess_run
+from modules.ml_analysis.ml_analysis import run as ml_run
+from modules.correlation.correlate_ioc import run as correlate_run
+from modules.export.export_results import run as export_run
+
+st.set_page_config(page_title="Modular OSINT Pipeline", layout="wide")
+st.title("๐Ÿš€ Modular OSINT Pipeline Dashboard")
+
+# --- PIPELINE WORKFLOW, unchanged ---
+def write_temp(data: dict) -> str:
+ f = tempfile.NamedTemporaryFile(delete=False, suffix=".json")
+ f.write(json.dumps(data).encode())
+ f.close()
+ return f.name
+
+uploaded = st.file_uploader("Upload initial OSINTModuleInput JSON", type=["json"])
+if uploaded:
+ init_input = json.load(uploaded)
+ st.session_state["input"] = init_input
+
+if "input" in st.session_state:
+ st.markdown("### ๐Ÿ” Initial Input")
+ st.json(st.session_state["input"])
+
+ col1, col2 = st.columns(2)
+
+ with col1:
+ if st.button("Run Ingestion"):
+ path = write_temp(st.session_state["input"])
+ out = ingest_run(path)
+ st.session_state["ingest"] = json.loads(out.json())
+ if "ingest" in st.session_state:
+ st.markdown("#### Ingestion Output")
+ st.json(st.session_state["ingest"])
+
+ with col1:
+ if st.button(" Run Preprocessing"):
+ prev = st.session_state.get("ingest", st.session_state["input"])
+ path = write_temp(prev)
+ out = preprocess_run(path)
+ st.session_state["preprocess"] = json.loads(out.json())
+ if "preprocess" in st.session_state:
+ st.markdown("#### Preprocessing Output")
+ st.json(st.session_state["preprocess"])
+
+ with col2:
+ if st.button("Run ML Analysis"):
+ prev = st.session_state.get("preprocess", st.session_state.get("ingest"))
+ path = write_temp(prev)
+ out = ml_run(path)
+ st.session_state["ml"] = json.loads(out.json())
+ if "ml" in st.session_state:
+ st.markdown("#### ML Analysis Output")
+ st.json(st.session_state["ml"])
+
+ with col2:
+ if st.button("Run Correlation"):
+ prev = st.session_state.get("ml", st.session_state.get("preprocess"))
+ path = write_temp(prev)
+ out = correlate_run(path)
+ st.session_state["correlate"] = json.loads(out.json())
+ if "correlate" in st.session_state:
+ st.markdown("#### Correlation Output")
+ st.json(st.session_state["correlate"])
+
+ if st.button("Run Export"):
+ prev = st.session_state.get("correlate", st.session_state.get("ml"))
+ path = write_temp(prev)
+ out = export_run(path)
+ st.session_state["export"] = json.loads(out.json())
+ if "export" in st.session_state:
+ st.markdown("#### Export Output")
+ st.json(st.session_state["export"])
+ st.download_button(
+ label="Download Exported Results",
+ data=json.dumps(st.session_state["export"], indent=2),
+ file_name="osint_export.json",
+ mime="application/json"
+ )
+
+# --- MULTI-DIRECTORY MODULE LAUNCHER SECTION ---
+st.sidebar.header("Standalone & Subdirectory Modules")
+
+MODULES_DIR = Path("Modules")
+MODULE_REGISTRY = MODULES_DIR / "module_registry.json"
+
+def discover_py_modules(directory):
+ """Recursively list .py scripts (excluding __init__.py) with their relative paths."""
+ py_modules = []
+ for root, dirs, files in os.walk(directory):
+ for f in files:
+ if f.endswith(".py") and f != "__init__.py":
+ rel_path = Path(root).relative_to(directory) / f
+ py_modules.append(rel_path)
+ return py_modules
+
+def load_module_description(module_path):
+ """Try to get description from registry or fallback to docstring."""
+ module_name = str(module_path.with_suffix('')).replace(os.sep, ".")
+ # Registry lookup (top-level modules only)
+ if MODULE_REGISTRY.exists():
+ with open(MODULE_REGISTRY) as regfile:
+ registry = json.load(regfile)
+ if module_name in registry:
+ return registry[module_name].get("description", "")
+ # Fallback: docstring from module file
+ full_path = MODULES_DIR / module_path
+ if full_path.exists():
+ with open(full_path) as f:
+ first_line = f.readline()
+ if first_line.startswith("\"\"\"") or first_line.startswith("'''"):
+ delimiter = first_line[:3]
+ docstring = ""
+ while True:
+ l = f.readline()
+ if not l or l.startswith(delimiter):
+ break
+ docstring += l.strip() + " "
+ return docstring.strip()
+ return ""
+
+def get_module_params(module_path):
+ """Load osintmodule.json file if present for the module (same name, same directory)."""
+ param_path = (MODULES_DIR / module_path).with_suffix('.osintmodule.json')
+ if param_path.exists():
+ with open(param_path, "r") as f:
+ return json.load(f), str(param_path)
+ return None, None
+
+def run_module(module_path, params=None):
+ """
+ Dynamically import and run the main() function for any discovered .py module,
+ passing params if the signature supports it.
+ """
+ full_path = MODULES_DIR / module_path
+ if not full_path.exists():
+ st.error(f"Module file not found: {module_path}")
+ return
+ mod_name = "mod_" + str(module_path).replace("/", "_").replace("\\", "_").replace(".py", "")
+ spec = importlib.util.spec_from_file_location(mod_name, str(full_path))
+ mod = importlib.util.module_from_spec(spec)
+ spec.loader.exec_module(mod)
+ if hasattr(mod, "main"):
+ sig = inspect.signature(mod.main)
+ if params and len(sig.parameters) > 0:
+ mod.main(params)
+ else:
+ mod.main()
+ else:
+ st.warning(f"{module_path} does not have a main() function.")
+
+def save_params_json(params, param_path):
+ with open(param_path, "w") as f:
+ json.dump(params, f, indent=2)
+
+py_modules = discover_py_modules(MODULES_DIR)
+for rel_path in py_modules:
+ mod_label = str(rel_path)
+ mod_name = rel_path.stem
+ desc = load_module_description(rel_path)
+ with st.sidebar.expander(mod_label, expanded=False):
+ if desc:
+ st.info(desc)
+ params, param_path = get_module_params(rel_path)
+ param_input = None
+ if params is not None:
+ st.markdown("**Edit module parameters:**")
+ param_str = st.text_area(
+ "Parameters (JSON)",
+ value=json.dumps(params, indent=2),
+ key=f"params_{mod_label}",
+ height=200,
+ )
+ try:
+ param_input = json.loads(param_str)
+ st.success("Valid JSON")
+ except Exception as e:
+ st.error(f"Invalid JSON: {e}")
+ param_input = None
+ if st.button(f"Run {mod_label}", key=f"run_{mod_label}"):
+ st.write(f"## Running: {mod_label}")
+ if desc:
+ st.info(desc)
+ if param_input is not None and param_path:
+ save_params_json(param_input, param_path)
+ params = param_input
+ run_module(rel_path, params)
+ # Try to find and offer downloads for any result files in this module's directory or a shared results folder
+ results_folders = [
+ (MODULES_DIR / rel_path.parent / "Results"),
+ (MODULES_DIR / "Data" / "Results"),
+ ]
+ for results_dir in results_folders:
+ if results_dir.exists():
+ for f in results_dir.glob(f"{mod_name}*.*"):
+ with open(f, "rb") as fo:
+ st.download_button(
+ label=f"Download result: {f.name}",
+ data=fo,
+ file_name=f.name,
+ )