Spaces:
Runtime error
Runtime error
| # ================================================================ | |
| # === INT STRATEGIC BRAIN v4 — Enemy Analysis & Tab Integration === | |
| # ================================================================ | |
| # Paste this block inside your app. For UI, insert the Gradio tab block | |
| # within your existing `with gr.Blocks(...) as demo:` where other tabs appear. | |
| # ================================================================ | |
| import math | |
| import random | |
| import datetime | |
| import json | |
| from collections import Counter | |
| # ------------------------- | |
| # INTBrainV4 core extension | |
| # ------------------------- | |
| class INTBrainV4: | |
| def __init__(self, base=None, memory_file="int_brain_v4.json"): | |
| self.base = base | |
| self.memory_file = memory_file | |
| self.seed = 42 | |
| random.seed(self.seed) | |
| def _now(self): | |
| return datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC") | |
| def infer_intent(self, text): | |
| """Lightweight intent inference (doctrinal, non-operational).""" | |
| t = text.lower() | |
| intents = [] | |
| if any(k in t for k in ["recon","observe","surveil","watch"]): | |
| intents.append("Reconnaissance / Surveillance") | |
| if any(k in t for k in ["attack","ambush","assault","blast","gunfire","strike"]): | |
| intents.append("Direct Attack / Ambush") | |
| if any(k in t for k in ["probe","test","drill","feint"]): | |
| intents.append("Probe / Feint") | |
| if any(k in t for k in ["disrupt","sabotage","deny service","block"]): | |
| intents.append("Disruption / Sabotage") | |
| if not intents: | |
| intents.append("Unknown / Recon Required") | |
| return intents | |
| def extract_order_of_battle(self, text): | |
| """Very simple OOB extractor: counts actors, platforms, activity tokens.""" | |
| tokens = [w.strip(".,:;()[]\"'").lower() for w in text.split()] | |
| counts = Counter(tokens) | |
| # heuristic features to present | |
| features = { | |
| "actors_mentioned": sorted(set([w for w in tokens if len(w)>3]) )[:12], | |
| "mention_count": sum(v for k,v in counts.items() if len(k)>3), | |
| } | |
| return features | |
| def cluster_ttp(self, reports, threshold=0.33): | |
| """Group short TTP-like fragments into rough clusters (lexical heuristic).""" | |
| clusters = [] | |
| for r in reports: | |
| placed = False | |
| for c in clusters: | |
| if len(set(r.split()) & set(c["centroid"].split()))/max(1,len(set(r.split())|set(c["centroid"].split()))) > threshold: | |
| c["members"].append(r) | |
| # pick longest as centroid | |
| c["centroid"] = max(c["members"], key=lambda s: len(s.split())) | |
| placed = True | |
| break | |
| if not placed: | |
| clusters.append({"centroid": r, "members": [r]}) | |
| clusters.sort(key=lambda c: len(c["members"]), reverse=True) | |
| return clusters | |
| def enemy_analysis(self, text, extra_reports=None): | |
| """ | |
| High-level enemy analysis (doctrinal). Returns JSON-friendly dict: | |
| - inferred intent | |
| - OOB features | |
| - TTP clusters | |
| - quick doctrinal assessment (risk_level) | |
| """ | |
| res = {"timestamp": self._now(), "input": text} | |
| res["intent"] = self.infer_intent(text) | |
| res["oob"] = self.extract_order_of_battle(text) | |
| ttp_fragments = [] | |
| if extra_reports: | |
| for er in extra_reports: | |
| if isinstance(er, str): | |
| ttp_fragments.extend([s.strip() for s in er.splitlines() if s.strip()]) | |
| # include short sentences from text as potential TTP fragments | |
| ttp_fragments.extend([s.strip() for s in text.split(".") if s.strip()]) | |
| ttp_fragments = list(dict.fromkeys(ttp_fragments))[:60] | |
| res["ttp_clusters"] = self.cluster_ttp(ttp_fragments) | |
| # Simple, safe risk-level heuristic | |
| t = text.lower() | |
| if any(k in t for k in ["mass", "large", "suicide", "23 terrorists", "dozen", "multiple attackers", "armored"]): | |
| risk = "High" | |
| elif any(k in t for k in ["attack","ambush","firefight","assault","blast","gunfire"]): | |
| risk = "Medium-High" | |
| elif any(k in t for k in ["threat","probe","suspicious","riot","breach"]): | |
| risk = "Medium" | |
| else: | |
| risk = "Low" | |
| res["risk_level"] = risk | |
| # Human-friendly doctrinal blurb (non-operational) | |
| res["doctrinal_summary"] = ( | |
| f"At {res['timestamp']}: Inferred intent(s): {', '.join(res['intent'])}. " | |
| f"Risk level (doctrinal heuristic): {res['risk_level']}. " | |
| f"Top OOB markers: {', '.join(res['oob'].get('actors_mentioned',[])[:6]) or '[none]'}. " | |
| "TTP clusters and reasoning provided for planning-level assessment only." | |
| ) | |
| return res | |
| # wrapper instance and runner | |
| INTBrainV4 = INTBrainV4(base=globals().get("INTStrategicBrainV3", None)) | |
| def run_int_brain_v4(text, extra_reports=None): | |
| try: | |
| out = INTBrainV4.enemy_analysis(text, extra_reports=extra_reports or []) | |
| return out | |
| except Exception as e: | |
| return {"error": str(e), "timestamp": datetime.datetime.utcnow().isoformat()} | |
| # ------------------------- | |
| # UI: Gradio Tab block (paste inside your build_interface demo) | |
| # ------------------------- | |
| # Insert this block inside your with gr.Blocks(...) as demo: where other `with gr.Tab(...)` are defined. | |
| with gr.Tab("Enemy Analysis"): | |
| gr.Markdown(""" | |
| ### Enemy Analysis (Doctrinal) | |
| Paste reports, sightings, or chatter. The system produces a doctrinal, non-operational | |
| Enemy Analysis: inferred intent, order-of-battle markers, TTP clusters, and a planning-level risk indicator. | |
| """) | |
| enemy_input = gr.Textbox(label="Paste Enemy-Related Text / Logs / Reports", lines=6, | |
| placeholder="e.g., '23 attackers attacked convoy at 0300, RPGs observed, vehicles spotted near waypoint X'") | |
| extra_reports = gr.Textbox(label="Optional: Additional short reports (one per line)", lines=4, | |
| placeholder="Paste supporting notes or HUMINT fragments (optional).") | |
| enemy_run = gr.Button("Run Enemy Analysis") | |
| enemy_out_json = gr.JSON(label="Enemy Analysis (structured)") # shows JSON | |
| enemy_out_text = gr.Textbox(label="Enemy Analysis (human-friendly)", lines=12, interactive=False) | |
| def _enemy_handler(text, extras): | |
| txt = (text or "").strip() | |
| if not txt: | |
| return {"error":"No input provided."}, "No input provided." | |
| try: | |
| extra_list = [s for s in (extras or "").splitlines() if s.strip()] | |
| analysis = run_int_brain_v4(txt, extra_reports=extra_list) | |
| # Derive a polished, non-actionable brief if enrich available | |
| brief = None | |
| try: | |
| prompt = ( | |
| "You are a doctrinal security planner. Produce a concise, non-operational planning brief " | |
| "from this structured enemy analysis. Keep it high-level and non-actionable.\n\n" | |
| + json.dumps(analysis, indent=2, ensure_ascii=False) | |
| ) | |
| brief = enrich_with_llm(prompt, context_title="Enemy Analysis Brief") if "enrich_with_llm" in globals() else None | |
| except Exception: | |
| brief = None | |
| human = brief if brief else analysis.get("doctrinal_summary", "[No summary available]") | |
| # Register with director shared state for later fusion | |
| try: | |
| update_shared_state("Enemy Analysis", { | |
| "input": txt[:800], | |
| "summary": analysis.get("doctrinal_summary", "")[:800], | |
| "risk_level": analysis.get("risk_level", "Unknown"), | |
| "timestamp": analysis.get("timestamp", "") | |
| }) | |
| except Exception: | |
| pass | |
| return analysis, human | |
| except Exception as e: | |
| return {"error": str(e)}, f"[Error] {e}" | |
| enemy_run.click(_enemy_handler, inputs=[enemy_input, extra_reports], outputs=[enemy_out_json, enemy_out_text]) | |
| # ------------------------- | |
| # Director hook snippet (small) - safe to paste before your Director button handler | |
| # ------------------------- | |
| def director_collect_and_summarize(extra_context=""): | |
| """ | |
| Lightweight wrapper: aggregates shared_reports_state, calls LLM if available, | |
| and returns doctrinal consolidated brief (non-operational). | |
| """ | |
| try: | |
| reports = shared_reports_state.copy() if isinstance(shared_reports_state, dict) else {} | |
| if not reports: | |
| return "⚠️ No departmental reports available yet." | |
| compiled = "DIRECTOR CONSOLIDATED (DOCTRINAL)\nGenerated: " + datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC") + "\n\n" | |
| for dept, r in reports.items(): | |
| compiled += f"--- {dept} ---\n" | |
| if isinstance(r, dict): | |
| compiled += f"Risk: {r.get('risk_level','Unknown')}\nSummary: {r.get('summary','')[:600]}\n\n" | |
| else: | |
| compiled += str(r)[:800] + "\n\n" | |
| # Optional LLM polish (safe, non-operational) | |
| try: | |
| if "enrich_with_llm" in globals(): | |
| prompt = "Polish the following doctrinal consolidated report. Keep it non-operational.\n\n" + compiled + "\n\nContext: " + (extra_context or "") | |
| polished = enrich_with_llm(prompt, context_title="Director Consolidated Brief") | |
| return polished or compiled | |
| except Exception: | |
| pass | |
| return compiled | |
| except Exception as e: | |
| return f"[Director aggregation error] {e}" | |
| # ================================================================ | |
| # === END OF INT v4 Enemy Analysis block | |
| # ================================================================ | |