Spaces:
Running
Running
| import os | |
| import re | |
| import json | |
| import hashlib | |
| import datetime | |
| from typing import List, Dict, Any, Tuple | |
# Constants

# Default and hard upper bound for how many .jsonl files a single import call
# processes (see import_jsonl_data).
MAX_FILES_DEFAULT = 200
MAX_FILES_UPPER_BOUND = 1000

# Path patterns that mark a file or directory as likely to contain secrets.
# Each keyword must be delimited by a path separator, one of "_", ".", "-",
# or the start/end of the string, so words that merely contain a keyword
# (e.g. "secretary") are not flagged.
SENSITIVE_PATH_PATTERNS = [
    # The optional plural "s" must come *before* the delimiter group;
    # otherwise a "secrets/" directory component is never matched.
    re.compile(r"(^|[\\/_.-])secrets?([\\/_.-]|$)", re.IGNORECASE),
    re.compile(r"(^|[\\/_.-])credentials?([\\/_.-]|$)", re.IGNORECASE),
    re.compile(r"(^|[\\/_.-])private[_-]?key([\\/_.-]|$)", re.IGNORECASE),
    # ".env" and variants such as ".env.local" as the final path component.
    re.compile(r"(^|[\\/])\.env(\.[\w-]+)?$", re.IGNORECASE),
    re.compile(r"(^|[\\/_.-])id_rsa([\\/_.-]|$)", re.IGNORECASE),
    # Token files must start a path component (e.g. "/auth_token.json").
    re.compile(r"(^|[\\/])auth[_-]?token([\\/_.-]|$)", re.IGNORECASE),
    re.compile(r"(^|[\\/])bearer[_-]?token([\\/_.-]|$)", re.IGNORECASE),
    re.compile(r"(^|[\\/])access[_-]?token([\\/_.-]|$)", re.IGNORECASE),
    re.compile(r"(^|[\\/])api[_-]?token([\\/_.-]|$)", re.IGNORECASE),
]

# Heuristics for "lesson"-like sentences in transcripts: an imperative or
# advisory cue word followed by 10-200 characters, ending at ".", "!" or a
# newline.
LESSON_PATTERNS = [
    re.compile(r"\b(always|never|don'?t|do not|make sure|remember to|note:|caveat:|warning:)\b[^.\n]{10,200}[.!\n]", re.IGNORECASE),
    re.compile(r"\b(prefer|avoid)\s[^.\n]{10,200}[.!\n]", re.IGNORECASE),
]
def generate_id(prefix: str) -> str:
    """Return a fresh random identifier: ``<prefix>_`` plus 32 hex digits.

    The suffix comes from ``uuid.uuid4()``, so two calls never share an ID
    in practice and the result is not reproducible.
    """
    import uuid

    random_hex = uuid.uuid4().hex
    return f"{prefix}_{random_hex}"
def fingerprint_id(prefix: str, content: str) -> str:
    """Return a deterministic ID for *content*.

    Leading/trailing whitespace is ignored. The ID is ``<prefix>_`` followed
    by the first 32 hex digits of the SHA-256 of the stripped UTF-8 text.
    """
    normalized = content.strip().encode("utf-8")
    digest = hashlib.sha256(normalized).hexdigest()
    return f"{prefix}_{digest[:32]}"
def is_sensitive(path: str) -> bool:
    """Return True when *path* matches any pattern in SENSITIVE_PATH_PATTERNS."""
    for pattern in SENSITIVE_PATH_PATTERNS:
        if pattern.search(path):
            return True
    return False
def derive_project(cwd: str) -> str:
    """Return the last non-empty component of *cwd*, or "unknown".

    Both "/" and "\\" count as separators, so POSIX and Windows paths are
    handled alike; an empty or missing *cwd* yields "unknown".
    """
    segments = re.split(r'[\\/]', cwd) if cwd else []
    for segment in reversed(segments):
        if segment:
            return segment
    return "unknown"
def to_text(content: Any) -> str:
    """Flatten message content into plain text.

    A string is returned unchanged. A list is treated as content blocks: the
    ``text`` of every ``{"type": "text", "text": <str>}`` block is joined with
    newlines and other blocks are ignored. Any other value yields "".
    """
    if isinstance(content, str):
        return content
    if isinstance(content, list):
        return "\n".join(
            block["text"]
            for block in content
            if isinstance(block, dict)
            and block.get("type") == "text"
            and isinstance(block.get("text"), str)
        )
    return ""
def extract_tool_uses(content: Any) -> List[Dict[str, Any]]:
    """Collect the ``tool_use`` blocks of a message's content list.

    Each result has ``id`` (default ""), ``name`` (default "unknown") and
    ``input`` (default None). Non-list content yields an empty list.
    """
    if not isinstance(content, list):
        return []
    return [
        {
            "id": block.get("id", ""),
            "name": block.get("name", "unknown"),
            "input": block.get("input"),
        }
        for block in content
        if isinstance(block, dict) and block.get("type") == "tool_use"
    ]
def extract_tool_results(content: Any) -> List[Dict[str, Any]]:
    """Collect the ``tool_result`` blocks of a message's content list.

    Each result has ``toolUseId`` (from ``tool_use_id``, default ""),
    ``output`` (from ``content``) and ``isError``. ``isError`` is True only
    when ``is_error`` is exactly True. Non-list content yields an empty list.
    """
    if not isinstance(content, list):
        return []
    results = []
    for block in content:
        if not isinstance(block, dict) or block.get("type") != "tool_result":
            continue
        results.append({
            "toolUseId": block.get("tool_use_id", ""),
            "output": block.get("content"),
            "isError": block.get("is_error") is True,
        })
    return results
def parse_jsonl_text(text: str, fallback_session_id: str = None) -> Dict[str, Any]:
    """Parse a Claude Code JSONL transcript into a session summary.

    Every non-blank line is decoded as JSON. Lines that fail to decode, or
    that do not hold a JSON object, are skipped. Entries whose ``type`` and
    ``message.role`` are both "user", or both "assistant", become observations:

    * user entries carrying ``tool_result`` blocks -> one ``post_tool_use``
      (or ``post_tool_failure``) observation per result, labelled with the
      tool name from an earlier ``tool_use`` block with the same id, if any;
    * other user entries with non-blank text -> a ``prompt_submit`` observation;
    * assistant entries -> a ``stop`` observation for non-blank text, plus one
      ``pre_tool_use`` observation per ``tool_use`` block.

    Args:
        text: Raw file contents.
        fallback_session_id: Session ID used when no entry carries a
            ``sessionId``. When this is also falsy, a fresh ``sess_`` ID is
            generated.

    Returns:
        A dict with ``sessionId``, ``project`` (last component of the first
        ``cwd`` seen), ``cwd`` (falls back to the process working directory),
        ``startedAt`` / ``endedAt`` (first / last entry timestamp; entries
        without one are stamped with the current UTC time) and
        ``observations``.
    """
    entries: List[Dict[str, Any]] = []
    for line in text.split("\n"):
        if not line.strip():
            continue
        try:
            decoded = json.loads(line)
        except (ValueError, RecursionError):
            # Malformed (or pathologically nested) line: skip it, keep the rest.
            continue
        if isinstance(decoded, dict):
            entries.append(decoded)

    def new_observation(ts: Any, hook_type: str, raw: Any, **fields: Any) -> Dict[str, Any]:
        # "sessionId" is filled in after the whole file has been scanned; the
        # key is created here so the record keeps a stable key order.
        obs: Dict[str, Any] = {
            "id": generate_id("obs"),
            "sessionId": None,
            "timestamp": ts,
            "hookType": hook_type,
        }
        obs.update(fields)
        obs["raw"] = raw
        return obs

    session_id = ""
    cwd = ""
    first_ts = ""
    last_ts = ""
    observations: List[Dict[str, Any]] = []
    # tool_use id -> tool name, so later tool results can name their tool.
    tool_names: Dict[str, Any] = {}

    for entry in entries:
        if entry.get("sessionId") and not session_id:
            session_id = entry["sessionId"]
        if entry.get("cwd") and not cwd:
            cwd = entry["cwd"]
        ts = entry.get("timestamp") or datetime.datetime.utcnow().isoformat() + "Z"
        if not first_ts:
            first_ts = ts
        last_ts = ts

        msg = entry.get("message")
        if not isinstance(msg, dict):
            # A missing or non-object "message" carries nothing to extract
            # (and would break the .get() calls below).
            msg = {}
        role = msg.get("role")
        content = msg.get("content")
        entry_type = entry.get("type")

        if entry_type == "user" and role == "user":
            tool_results = extract_tool_results(content)
            if tool_results:
                for result in tool_results:
                    tool_use_id = result["toolUseId"]
                    tool_name = tool_names.get(tool_use_id) if isinstance(tool_use_id, str) else None
                    observations.append(new_observation(
                        ts,
                        "post_tool_failure" if result["isError"] else "post_tool_use",
                        entry,
                        toolName=tool_name,
                        toolInput={"toolUseId": tool_use_id},
                        toolOutput=result["output"],
                    ))
            else:
                prompt = to_text(content)
                if prompt.strip():
                    observations.append(new_observation(ts, "prompt_submit", entry, userPrompt=prompt))
        elif entry_type == "assistant" and role == "assistant":
            response = to_text(content)
            tools = extract_tool_uses(content)
            if response.strip():
                observations.append(new_observation(ts, "stop", entry, assistantResponse=response))
            for tool in tools:
                if isinstance(tool["id"], str) and tool["id"]:
                    tool_names[tool["id"]] = tool["name"]
                observations.append(new_observation(
                    ts,
                    "pre_tool_use",
                    {"toolUseId": tool["id"], "entry": entry},
                    toolName=tool["name"],
                    toolInput=tool["input"],
                ))

    effective_session_id = session_id or fallback_session_id or generate_id("sess")
    # Every observation belongs to the single effective session, including
    # those emitted before the first "sessionId" was seen.
    for obs in observations:
        obs["sessionId"] = effective_session_id

    now_iso = datetime.datetime.utcnow().isoformat() + "Z"
    return {
        "sessionId": effective_session_id,
        "project": derive_project(cwd),
        "cwd": cwd or os.getcwd(),
        "startedAt": first_ts or now_iso,
        "endedAt": last_ts or now_iso,
        "observations": observations
    }
def _extract_lesson_snippets(texts: List[str], max_texts: int = 200, max_matches: int = 40) -> List[str]:
    """Return unique lesson-like snippets found in *texts*, in first-seen order.

    Only the first *max_texts* texts are scanned, and scanning stops once
    *max_matches* unique snippets have been collected. Whitespace runs are
    collapsed, snippets outside 20-220 characters are dropped, and
    uniqueness is case-insensitive (the first spelling wins).
    """
    found: Dict[str, str] = {}
    for text in texts[:max_texts]:
        for pattern in LESSON_PATTERNS:
            for match in pattern.finditer(text):
                if len(found) >= max_matches:
                    return list(found.values())
                snippet = re.sub(r"\s+", " ", match.group(0)).strip()
                if 20 <= len(snippet) <= 220:
                    found.setdefault(snippet.lower(), snippet)
    return list(found.values())


def derive_crystal_and_lessons(
    kv,
    session_id: str,
    project: str,
    raw_obs: List[Dict[str, Any]],
    compressed: List[Dict[str, Any]],
    first_prompt: str = None
) -> None:
    """Store lessons and a per-session "crystal" summary derived from a session.

    Lessons are sentences in assistant responses and user prompts that match
    LESSON_PATTERNS (at most 20 per session). Each lesson's ID is
    fingerprinted from its lower-cased text, so finding the same sentence in
    another session reinforces the existing record instead of duplicating
    it. Re-processing a session that already contributed a lesson does not
    reinforce it again.

    The crystal (ID fingerprinted from *session_id*) records a narrative,
    up to 8 titles of non-"conversation" compressed observations, up to
    20 affected files (both in first-seen order) and the stored lesson IDs.

    KV writes are best effort: a failing write is reported on stdout and
    skipped. Does nothing when *raw_obs* is empty.

    Args:
        kv: Store exposing ``get(namespace, key)`` / ``set(namespace, key, value)``.
        session_id: Session the observations belong to.
        project: Project name recorded on lessons and the crystal.
        raw_obs: Raw observations; ``assistantResponse`` / ``userPrompt`` are read.
        compressed: Compressed observations; ``files``, ``type``, ``title`` and
            ``narrative`` are read.
        first_prompt: Optional first user prompt; used as lesson context and
            as the crystal narrative (first 300 characters).
    """
    from functions import KV

    if not raw_obs:
        return
    created_at = datetime.datetime.utcnow().isoformat() + "Z"

    # Insertion-ordered dicts instead of sets: the lists stored below are
    # truncated, and set iteration order for strings changes between
    # interpreter runs (hash randomization), which made crystals unstable.
    files: Dict[Any, None] = {}
    outcomes: Dict[Any, None] = {}
    for c in compressed:
        for f in c.get("files") or []:
            files.setdefault(f, None)
        if c.get("type") and c.get("type") != "conversation" and c.get("title"):
            outcomes.setdefault(c["title"], None)

    assistant_texts = [
        r["assistantResponse"] for r in raw_obs
        if isinstance(r.get("assistantResponse"), str) and r["assistantResponse"].strip()
    ]
    user_prompts = [
        r["userPrompt"] for r in raw_obs
        if isinstance(r.get("userPrompt"), str) and r["userPrompt"].strip()
    ]

    lesson_ids: List[str] = []
    for content in _extract_lesson_snippets(assistant_texts + user_prompts)[:20]:
        lesson_id = fingerprint_id("lesson", content.lower())
        try:
            existing = kv.get(KV.lessons, lesson_id)
            if existing:
                sources = existing.get("sourceIds") or []
                tags = existing.get("tags") or []
                if session_id not in sources:
                    # Only a session that has not contributed this lesson before
                    # counts as a reinforcement; re-importing the same session
                    # must not inflate the count.
                    sources = sources + [session_id]
                    existing["reinforcements"] = (existing.get("reinforcements") or 0) + 1
                    existing["lastReinforcedAt"] = created_at
                if "auto-import" not in tags:
                    tags = tags + ["auto-import"]
                existing["sourceIds"] = sources
                existing["tags"] = tags
                existing["updatedAt"] = created_at
                kv.set(KV.lessons, lesson_id, existing)
            else:
                lesson = {
                    "id": lesson_id,
                    "content": content,
                    "context": first_prompt or project,
                    "confidence": 0.4,
                    "reinforcements": 0,
                    "source": "consolidation",
                    "sourceIds": [session_id],
                    "project": project,
                    "tags": ["auto-import"],
                    "createdAt": created_at,
                    "updatedAt": created_at,
                    "decayRate": 0.05
                }
                kv.set(KV.lessons, lesson_id, lesson)
            lesson_ids.append(lesson_id)
        except Exception as e:
            # Best effort: an unstorable lesson is reported and left out of the crystal.
            print(f"[import-jsonl] Failed to store lesson {lesson_id}: {e}")

    crystal_id = fingerprint_id("crystal", session_id)
    if first_prompt:
        narrative_preview = first_prompt[:300]
    else:
        previews = [
            p for p in (c.get("narrative") or c.get("title") for c in compressed[:5]) if p
        ]
        narrative_preview = (" · ".join(previews))[:300]

    try:
        existing_crystal = kv.get(KV.crystals, crystal_id) or {}
        crystal = {
            "id": crystal_id,
            "narrative": narrative_preview or f"Session {session_id[:12]} ({len(raw_obs)} observations)",
            "keyOutcomes": list(outcomes)[:8],
            "filesAffected": list(files)[:20],
            "lessons": lesson_ids,
            # Preserve fields owned by earlier writers of this crystal.
            "sourceActionIds": existing_crystal.get("sourceActionIds", []),
            "sessionId": session_id,
            "project": project,
            "createdAt": existing_crystal.get("createdAt", created_at)
        }
        kv.set(KV.crystals, crystal_id, crystal)
    except Exception as e:
        # Best effort, like the lesson writes above.
        print(f"[import-jsonl] Failed to store crystal {crystal_id}: {e}")
def find_jsonl_files(root: str, limit: int = 200) -> Tuple[List[str], bool, int, bool]:
    """Walk *root* and collect up to *limit* ``.jsonl`` file paths.

    Hidden directories (names starting with ".") and symlinked directories
    are not descended into, and symlinked files are ignored. Directories and
    files are visited in sorted name order, so when more files exist than
    *limit*, the same subset is returned on every run.

    The walk stops after examining ``max(limit * 50, 50000)`` file names,
    which bounds the cost on very large trees.

    Returns:
        ``(paths, truncated, discovered, traversal_capped)``:

        * ``discovered`` counts every qualifying file seen, including those
          beyond *limit*;
        * ``truncated`` is True when files were left out or the walk was capped;
        * ``traversal_capped`` is True when the file-name budget was used up.
    """
    out: List[str] = []
    discovered = 0
    walked = 0
    traversal_cap = max(limit * 50, 50000)
    for dirpath, dirnames, filenames in os.walk(root):
        if walked >= traversal_cap:
            break
        # Prune in place so os.walk skips hidden and symlinked directories;
        # sorting makes the visiting order (and thus the result) deterministic.
        dirnames[:] = sorted(
            d for d in dirnames
            if not d.startswith('.') and not os.path.islink(os.path.join(dirpath, d))
        )
        for name in sorted(filenames):
            walked += 1
            if walked >= traversal_cap:
                break
            if not name.endswith(".jsonl"):
                continue
            full = os.path.join(dirpath, name)
            if os.path.islink(full):
                continue
            discovered += 1
            if len(out) < limit:
                out.append(full)
    traversal_capped = walked >= traversal_cap
    truncated = discovered > len(out) or traversal_capped
    return out, truncated, discovered, traversal_capped
def _jsonl_first_prompt(observations: List[Dict[str, Any]]):
    """Return the first non-blank ``userPrompt``, whitespace-collapsed and cut to 200 chars, or None."""
    for obs in observations:
        prompt = obs.get("userPrompt")
        if isinstance(prompt, str) and prompt.strip():
            return re.sub(r"\s+", " ", prompt).strip()[:200]
    return None


def _jsonl_upsert_session(kv, sessions_ns, parsed: Dict[str, Any], first_prompt) -> None:
    """Create the session record for *parsed*, or merge this import into an existing one.

    Merging adds the new observation count, moves ``endedAt`` forward,
    turns an "active" status into "completed", adds the "jsonl-import" tag
    and fills in a missing ``firstPrompt`` / ``id``.
    """
    session_id = parsed["sessionId"]
    new_count = len(parsed["observations"])
    existing = kv.get(sessions_ns, session_id)
    if not existing:
        kv.set(sessions_ns, session_id, {
            "id": session_id,
            "project": parsed["project"],
            "cwd": parsed["cwd"],
            "startedAt": parsed["startedAt"],
            "endedAt": parsed["endedAt"],
            "status": "completed",
            "observationCount": new_count,
            "tags": ["jsonl-import"],
            "firstPrompt": first_prompt
        })
        return
    # "or" fallbacks tolerate fields that are present but null.
    existing["observationCount"] = (existing.get("observationCount") or 0) + new_count
    if parsed["endedAt"] > (existing.get("endedAt") or ""):
        existing["endedAt"] = parsed["endedAt"]
    if existing.get("status") == "active":
        existing["status"] = "completed"
    existing_tags = existing.get("tags") or []
    if "jsonl-import" not in existing_tags:
        existing["tags"] = existing_tags + ["jsonl-import"]
    if not existing.get("firstPrompt") and first_prompt:
        existing["firstPrompt"] = first_prompt
    if not existing.get("id"):
        existing["id"] = session_id
    kv.set(sessions_ns, session_id, existing)


def import_jsonl_data(kv, path: str = None, max_files: int = None) -> Dict[str, Any]:
    """Import Claude Code JSONL transcripts into *kv*.

    *path* may be a single ``.jsonl`` file or a directory to search (default
    ``~/.claude/projects``). Sensitive-looking paths, symlinks and missing
    paths are rejected. For every transcript that yields at least one
    observation, this function:

    * stores or merges a session record;
    * stores one synthetic compressed observation per raw observation and adds
      it to the BM25 and vector indexes;
    * derives lessons and a crystal.

    Afterwards the index persistence is saved, and an audit entry and a
    version commit are attempted (best effort).

    Args:
        kv: Key-value store (used via ``get`` / ``set`` / ``commit_version``).
        path: File or directory to import; ``~`` is expanded.
        max_files: Maximum number of files to process. Non-positive or
            non-integer values fall back to MAX_FILES_DEFAULT, and the value
            is capped at MAX_FILES_UPPER_BOUND.

    Returns:
        ``{"success": False, "error": ...}`` for a rejected path; otherwise a
        summary dict. ``imported`` counts the files that produced a stored
        session. Skipped, unreadable and observation-less files are not
        counted.
    """
    import functions
    from functions import KV, build_synthetic_compression

    default_root = os.path.expanduser(os.path.join("~", ".claude", "projects"))
    abs_path = os.path.abspath(os.path.expanduser(path or default_root))
    if is_sensitive(abs_path):
        return {"success": False, "error": "refusing to process sensitive-looking path"}
    if os.path.islink(abs_path):
        return {"success": False, "error": "symlinks are not supported"}
    if not os.path.exists(abs_path):
        return {"success": False, "error": "path not found"}

    # bool is a subclass of int; don't treat True as "1 file".
    valid_max = isinstance(max_files, int) and not isinstance(max_files, bool) and max_files > 0
    limit = min(max_files if valid_max else MAX_FILES_DEFAULT, MAX_FILES_UPPER_BOUND)

    files: List[str] = []
    truncated = False
    discovered = 0
    traversal_capped = False
    if os.path.isdir(abs_path):
        files, truncated, discovered, traversal_capped = find_jsonl_files(abs_path, limit)
    elif os.path.isfile(abs_path) and abs_path.endswith(".jsonl"):
        files = [abs_path]
        discovered = 1
    else:
        return {"success": False, "error": "path must be a .jsonl file or directory"}

    if not files:
        return {
            "success": True,
            "imported": 0,
            "sessionIds": [],
            "observations": 0,
            "discovered": discovered,
            "truncated": truncated,
            "traversalCapped": traversal_capped,
            "maxFiles": limit,
            "maxFilesUpperBound": MAX_FILES_UPPER_BOUND
        }

    session_ids: List[str] = []
    observation_count = 0
    for file in files:
        if is_sensitive(file) or os.path.islink(file):
            continue
        try:
            with open(file, "r", encoding="utf-8", errors="ignore") as f:
                text = f.read()
        except OSError as e:
            print(f"[import-jsonl] Failed to read {file}: {e}")
            continue

        parsed = parse_jsonl_text(text, generate_id("sess"))
        if not parsed["observations"]:
            continue
        session_id = parsed["sessionId"]
        first_prompt = _jsonl_first_prompt(parsed["observations"])
        _jsonl_upsert_session(kv, KV.sessions, parsed, first_prompt)

        compressed = []
        for obs in parsed["observations"]:
            synthetic = build_synthetic_compression(obs)
            compressed.append(synthetic)
            kv.set(KV.observations(session_id), obs["id"], synthetic)
            # Resolve the index objects through the module at use time so a
            # rebound module-level index is picked up.
            functions._bm25_index.add(synthetic)
            comb_text = synthetic["title"] + " " + (synthetic.get("narrative") or "")
            functions.vector_index_add_guarded(
                synthetic["id"], synthetic["sessionId"], comb_text,
                {"kind": "synthetic", "logId": synthetic["id"]},
            )
        observation_count += len(parsed["observations"])
        session_ids.append(session_id)

        derive_crystal_and_lessons(
            kv,
            session_id,
            parsed["project"],
            parsed["observations"],
            compressed,
            first_prompt
        )

    # Save the updated index persistence state.
    if functions._index_persistence:
        try:
            functions._index_persistence.save()
        except Exception as e:
            print(f"[import-jsonl] Warning saving index persistence: {e}")

    # Audit trail (best effort: log_audit may be unavailable).
    try:
        from functions import log_audit
        log_audit(kv, "import", "mem::replay::import-jsonl", f"Imported {len(session_ids)} sessions: {','.join(session_ids[:3])}...")
    except Exception:
        pass

    # Version commit, when the store supports it (e.g. Dolt); best effort.
    try:
        kv.commit_version(f"Import {len(session_ids)} Claude Code sessions from JSONL", "system")
    except Exception:
        pass

    return {
        "success": True,
        "imported": len(session_ids),
        "sessionIds": session_ids,
        "observations": observation_count,
        "discovered": discovered,
        "truncated": truncated,
        "traversalCapped": traversal_capped,
        "maxFiles": limit,
        "maxFilesUpperBound": MAX_FILES_UPPER_BOUND
    }
def kind_from_hook(obs: Dict[str, Any]) -> str:
    """Map an observation's ``hookType`` to a timeline event kind.

    ``stop`` becomes "response" only when ``assistantResponse`` is truthy,
    and "hook" otherwise. Unknown or missing hook types map to "hook".
    """
    hook_type = obs.get("hookType")
    if not isinstance(hook_type, str):
        return "hook"
    if hook_type == "stop":
        return "response" if obs.get("assistantResponse") else "hook"
    kinds = {
        "session_start": "session_start",
        "session_end": "session_end",
        "prompt_submit": "prompt",
        "pre_tool_use": "tool_call",
        "post_tool_use": "tool_result",
        "post_tool_failure": "tool_error",
    }
    return kinds.get(hook_type, "hook")
def label_for(obs: Dict[str, Any], kind: str) -> str:
    """Return a short human-readable label for a timeline event.

    Prompt and response labels show the text itself (or a generic fallback).
    Text longer than 80 characters is cut to 79 characters plus "…". Tool
    events read "<toolName> ▸ call|result|error", using "tool" when the name
    is missing. Unrecognised kinds fall back to ``hookType`` (or "").
    """
    if kind == "prompt" or kind == "response":
        if kind == "prompt":
            text = obs.get("userPrompt") or "User prompt"
        else:
            text = obs.get("assistantResponse") or "Assistant response"
        if len(text) > 80:
            return text[:79] + "…"
        return text
    for tool_kind, verb in (("tool_call", "call"), ("tool_result", "result"), ("tool_error", "error")):
        if kind == tool_kind:
            tool = obs.get("toolName") or "tool"
            return f"{tool} ▸ {verb}"
    if kind == "session_start":
        return "Session start"
    if kind == "session_end":
        return "Session end"
    return obs.get("hookType") or ""
def estimate_duration_ms(event: Dict[str, Any]) -> int:
    """Estimate an event's playback duration in milliseconds.

    The estimate assumes 40 characters per second over the event body plus
    its tool input and output. Non-string tool values are measured by their
    JSON encoding, and empty or falsy values count as zero characters. The
    result is clamped to the range 300-20000 ms; an event with no content
    gets the 300 ms floor.
    """
    total_chars = len(event.get("body") or "")
    for key in ("toolInput", "toolOutput"):
        value = event.get(key) or ""
        total_chars += len(value) if isinstance(value, str) else len(json.dumps(value))
    if not total_chars:
        return 300
    return max(300, min(20000, round(total_chars / 40 * 1000)))
def _timestamp_to_ms(ts: Any):
    """Parse an ISO-8601 timestamp into epoch milliseconds; return None when it can't be parsed.

    ``dateutil.parser.isoparse`` is used when that package is installed.
    Otherwise the standard library's ``datetime.fromisoformat`` is used, with
    a trailing "Z" rewritten to "+00:00". Naive timestamps are treated as UTC,
    matching the "...Z" UTC timestamps generated elsewhere in this module.
    """
    if not isinstance(ts, str) or not ts:
        return None
    try:
        from dateutil.parser import isoparse
    except ImportError:
        isoparse = None
    try:
        if isoparse is not None:
            dt = isoparse(ts)
        else:
            text = ts[:-1] + "+00:00" if ts.endswith(("Z", "z")) else ts
            dt = datetime.datetime.fromisoformat(text)
        if dt.tzinfo is None:
            dt = dt.replace(tzinfo=datetime.timezone.utc)
        return dt.timestamp() * 1000
    except (ValueError, OverflowError, TypeError):
        return None


def project_timeline(observations: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Project observations onto a replayable timeline.

    Observations are ordered by their ``timestamp`` string. Each event gets
    ``offsetMs``, measured from the first timestamp that parses, and an
    estimated ``durationMs``. When no timestamp parses, when every
    observation shares one timestamp, or for an individual unparseable
    timestamp, the offset is instead the running sum of previous estimated
    durations, so events are laid end to end.

    Returns:
        A dict with ``sessionId``, ``startedAt`` / ``endedAt`` (first and last
        sorted timestamps), ``totalDurationMs`` (end of the last event),
        ``eventCount`` and ``events``.
    """
    if not observations:
        now = datetime.datetime.utcnow().isoformat() + "Z"
        return {
            "sessionId": "",
            "startedAt": now,
            "endedAt": now,
            "totalDurationMs": 0,
            "eventCount": 0,
            "events": []
        }

    # "or ''" so a null timestamp sorts first instead of raising TypeError.
    sorted_obs = sorted(observations, key=lambda o: o.get("timestamp") or "")
    started_at = sorted_obs[0].get("timestamp", "")
    parsed_ms = [_timestamp_to_ms(o.get("timestamp")) for o in sorted_obs]
    # Anchor on the first timestamp that actually parses. Anchoring on an
    # unparseable one (previously treated as epoch 0) turned every real
    # offset into milliseconds since 1970.
    start_ms = next((ms for ms in parsed_ms if ms is not None), None)
    all_same_ts = all(o.get("timestamp") == started_at for o in sorted_obs)

    events = []
    synthetic_offset = 0
    for obs, obs_ms in zip(sorted_obs, parsed_ms):
        kind = kind_from_hook(obs)
        if kind == "prompt":
            body = obs.get("userPrompt")
        elif kind == "response":
            body = obs.get("assistantResponse")
        else:
            body = None
        if all_same_ts or start_ms is None or obs_ms is None:
            offset_ms = synthetic_offset
        else:
            offset_ms = int(max(0, obs_ms - start_ms))
        event = {
            "id": obs.get("id"),
            "sessionId": obs.get("sessionId"),
            "ts": obs.get("timestamp"),
            "offsetMs": offset_ms,
            "durationMs": 0,
            "kind": kind,
            "label": label_for(obs, kind),
            "body": body,
            "toolName": obs.get("toolName"),
            "toolInput": obs.get("toolInput"),
            "toolOutput": obs.get("toolOutput")
        }
        event["durationMs"] = estimate_duration_ms(event)
        events.append(event)
        synthetic_offset += event["durationMs"]

    last = events[-1]
    return {
        "sessionId": sorted_obs[0].get("sessionId"),
        "startedAt": started_at,
        "endedAt": sorted_obs[-1].get("timestamp"),
        "totalDurationMs": last["offsetMs"] + last["durationMs"],
        "eventCount": len(events),
        "events": events
    }