| | """ |
| | Thinking Module for MiniMind Max2 |
| | Interleaved and Sequential Thinking for complex reasoning and tool interactions. |
| | """ |
| |
|
| | from dataclasses import dataclass, field |
| | from typing import List, Optional, Dict, Any, Callable, Tuple, Generator |
| | from enum import Enum |
| | import time |
| | import json |
| | import re |
| |
|
| |
|
class ThinkingMode(Enum):
    """Enumeration of the thinking strategies selectable on the engine.

    The member values are the lowercase string names of the modes.
    """

    # Reason between tool interactions (see InterleavedThinking).
    INTERLEAVED = "interleaved"

    # Plan every step first, then execute them in order (see SequentialThinking).
    SEQUENTIAL = "sequential"

    # No dedicated branch exists in ThinkingEngine.think; it falls through
    # to the interleaved path.
    STREAMING = "streaming"

    # Runs the interleaved path but the returned "thinking" trace is None.
    HIDDEN = "hidden"
| |
|
| |
|
@dataclass
class ThinkingStep:
    """Record describing one unit of a thinking trace.

    Only ``step_id`` and ``content`` are required; every other field has a
    default so callers can fill in just what applies to the step.
    """

    # Sequence number assigned by the producer of the step.
    step_id: int
    # Human-readable text of the step.
    content: str
    # Category label; this file uses "reasoning", "tool_use", "reflection"
    # and "conclusion".
    step_type: str = "reasoning"
    # Confidence score attached to the step by its producer.
    confidence: float = 1.0
    # Timing in milliseconds; overwritten by ThinkingContext.add_step.
    duration_ms: int = 0
    # Description of the tool invocation for tool steps, otherwise None.
    tool_call: Optional[Dict[str, Any]] = None
    # Whatever the tool returned for tool steps, otherwise None.
    tool_result: Optional[Any] = None
    # True when the step carries the final answer.
    is_final: bool = False
| |
|
| |
|
@dataclass
class ThinkingConfig:
    """Tunable settings shared by the thinking engines in this module."""

    # Default strategy used by ThinkingEngine.think when no mode is passed.
    mode: ThinkingMode = ThinkingMode.INTERLEAVED
    # ThinkingContext.can_continue stops once this many steps are recorded.
    max_thinking_steps: int = 10
    # ThinkingEngine.evaluate_response rejects responses scoring below this.
    min_confidence_threshold: float = 0.7
    # The three switches below are not read by the code visible in this file.
    enable_self_reflection: bool = True
    enable_step_verification: bool = True
    show_thinking_to_user: bool = False
    # ThinkingContext.can_continue stops once this many milliseconds elapsed.
    thinking_budget_ms: int = 30000

    # Markup used when a thinking trace is rendered as text.
    think_start: str = "<Thinking>"
    think_end: str = "</Thinking>"
    step_marker: str = "<step>"
    reflect_marker: str = "<reflect>"
    conclude_marker: str = "<conclude>"
| |
|
| |
|
class ThinkingContext:
    """Maintains context across thinking steps.

    Tracks the recorded steps, the tool-call history and the timing of one
    thinking session. Call ``start()`` to begin (or restart) a session.
    """

    def __init__(self, config: "ThinkingConfig"):
        """Create an idle context bound to ``config``.

        Args:
            config: Settings object; ``max_thinking_steps`` and
                ``thinking_budget_ms`` are read by ``can_continue``.
        """
        self.config = config
        self.steps: List["ThinkingStep"] = []
        self.tool_history: List[Dict[str, Any]] = []
        # Wall-clock timestamp of the last start(); 0 until started.
        self.start_time: float = 0
        self.total_tokens: int = 0
        self.current_confidence: float = 1.0
        # Monotonic reference for elapsed-time math; None until start().
        self._started_at: Optional[float] = None
        # Elapsed milliseconds at the moment the previous step was recorded.
        self._last_step_mark_ms: int = 0

    def start(self):
        """Start thinking session, clearing all per-session state."""
        self.start_time = time.time()
        self._started_at = time.monotonic()
        self._last_step_mark_ms = 0
        self.steps = []
        self.tool_history = []
        # Counters are per-session too, so they restart with the session.
        self.total_tokens = 0
        self.current_confidence = 1.0

    def elapsed_ms(self) -> int:
        """Get elapsed time in milliseconds since ``start()``.

        Returns:
            Whole milliseconds since the session started, or 0 when the
            session has not been started yet.
        """
        if self._started_at is None:
            # Without this guard the result would be "time since the epoch".
            return 0
        # A monotonic clock keeps the budget immune to system clock changes.
        return int((time.monotonic() - self._started_at) * 1000)

    def can_continue(self) -> bool:
        """Check if thinking can continue.

        Returns:
            False once the step limit is reached or the time budget is
            exceeded, True otherwise.
        """
        if len(self.steps) >= self.config.max_thinking_steps:
            return False
        if self.elapsed_ms() > self.config.thinking_budget_ms:
            return False
        return True

    def add_step(self, step: "ThinkingStep"):
        """Add a thinking step, stamping it with its own duration.

        ``step.duration_ms`` is overwritten with the time elapsed since the
        previous step was recorded (or since ``start()`` for the first one).
        """
        now_ms = self.elapsed_ms()
        step.duration_ms = now_ms - self._last_step_mark_ms
        self._last_step_mark_ms = now_ms
        self.steps.append(step)

    def add_tool_call(self, tool_name: str, arguments: Dict[str, Any], result: Any):
        """Record a tool call together with the current step count."""
        self.tool_history.append({
            "tool": tool_name,
            "arguments": arguments,
            "result": result,
            "step": len(self.steps),
        })

    def get_summary(self) -> str:
        """Get summary of thinking process.

        Returns:
            One line per step; content longer than 100 characters is cut to
            100 characters and marked with a trailing "...".
        """
        lines = []
        for number, step in enumerate(self.steps, start=1):
            preview = step.content[:100]
            if len(step.content) > 100:
                # Only flag truncation when something was actually cut off.
                preview += "..."
            lines.append(f"Step {number} ({step.step_type}): {preview}")
        return "\n".join(lines)

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary.

        Returns:
            Dict with the serialized steps, the tool history, the elapsed
            time so far and the number of steps.
        """
        return {
            "steps": [
                {
                    "id": s.step_id,
                    "content": s.content,
                    "type": s.step_type,
                    "confidence": s.confidence,
                    "duration_ms": s.duration_ms,
                    "is_final": s.is_final,
                }
                for s in self.steps
            ],
            "tool_history": self.tool_history,
            "total_time_ms": self.elapsed_ms(),
            "total_steps": len(self.steps),
        }
| |
|
| |
|
class InterleavedThinking:
    """
    Interleaved Thinking: Reason between each tool interaction.
    Enables the model to adapt strategy based on intermediate results.

    The thought/decision helpers in this class are placeholders that return
    fixed values; ``self.model`` is stored but not consulted by them.
    """

    def __init__(
        self,
        model,
        tool_registry,
        config: Optional["ThinkingConfig"] = None,
    ):
        """Store collaborators.

        Args:
            model: Stored on the instance as ``self.model``.
            tool_registry: Object exposing ``execute(name, **kwargs)``, or
                None when no tools are available.
            config: Settings; a default interleaved config is created when
                omitted.
        """
        self.model = model
        self.tools = tool_registry
        self.config = config or ThinkingConfig(mode=ThinkingMode.INTERLEAVED)

    def think_and_act(
        self,
        query: str,
        context: Optional["ThinkingContext"] = None,
    ) -> Generator["ThinkingStep", None, str]:
        """
        Think and act in interleaved fashion.

        The passed (or newly created) context is restarted, so any steps it
        already held are discarded.

        Yields:
            ThinkingStep objects as thinking progresses. The last yielded
            step always has ``is_final=True`` and carries the answer, also
            when the step or time budget runs out.

        Returns:
            Final answer string
        """
        ctx = context if context is not None else ThinkingContext(self.config)
        ctx.start()

        step_id = 0
        current_state = query

        while ctx.can_continue():
            # 1. Think about the current state.
            thinking = self._generate_thought(current_state, ctx)
            step = ThinkingStep(
                step_id=step_id,
                content=thinking["thought"],
                step_type="reasoning",
                confidence=thinking.get("confidence", 0.9),
            )
            ctx.add_step(step)
            yield step
            step_id += 1

            # 2. Decide what to do with that thought.
            action = self._decide_action(thinking, ctx)
            action_type = action["type"]

            if action_type == "answer":
                final_step = ThinkingStep(
                    step_id=step_id,
                    content=action["content"],
                    step_type="conclusion",
                    is_final=True,
                )
                ctx.add_step(final_step)
                yield final_step
                return action["content"]

            if action_type == "tool_call":
                tool_name = action["tool"]
                tool_args = action.get("arguments") or {}

                result = self._run_tool(tool_name, tool_args)
                ctx.add_tool_call(tool_name, tool_args, result)

                tool_step = ThinkingStep(
                    step_id=step_id,
                    content=f"Called {tool_name}",
                    step_type="tool_use",
                    tool_call={"name": tool_name, "args": tool_args},
                    tool_result=result,
                )
                ctx.add_step(tool_step)
                yield tool_step
                step_id += 1

                # Feed the observation into the next round of reasoning.
                current_state = f"{current_state}\n\nTool result: {result}"

            elif action_type == "reflect":
                reflect_step = ThinkingStep(
                    step_id=step_id,
                    content=action["content"],
                    step_type="reflection",
                )
                ctx.add_step(reflect_step)
                yield reflect_step
                step_id += 1

            # Any other action type records nothing; reasoning simply
            # continues until the budget in ``ctx`` is used up.

        # Budget exhausted. The answer is also yielded as a final step so
        # that callers iterating with a plain ``for`` loop (which cannot see
        # a generator's return value) still receive it.
        final_answer = self._generate_final_answer(ctx)
        final_step = ThinkingStep(
            step_id=step_id,
            content=final_answer,
            step_type="conclusion",
            is_final=True,
        )
        ctx.add_step(final_step)
        yield final_step
        return final_answer

    def _run_tool(self, tool_name: str, tool_args: Dict[str, Any]) -> Any:
        """Execute one tool call, turning any failure into an error string."""
        if self.tools is None:
            return "Error: no tool registry configured"
        try:
            return self.tools.execute(tool_name, **tool_args)
        except Exception as e:
            # Deliberately broad: a failing tool becomes an observation for
            # the next reasoning step instead of aborting the session.
            return f"Error: {str(e)}"

    def _generate_thought(
        self,
        state: str,
        context: "ThinkingContext",
    ) -> Dict[str, Any]:
        """Generate a thought about current state.

        Placeholder: echoes the first 100 characters of ``state`` with a
        fixed confidence of 0.85.
        """
        return {
            "thought": f"Analyzing: {state[:100]}...",
            "confidence": 0.85,
            "next_action": "continue",
        }

    def _decide_action(
        self,
        thinking: Dict[str, Any],
        context: "ThinkingContext",
    ) -> Dict[str, Any]:
        """Decide next action based on thinking.

        Placeholder policy: answer immediately above 0.95 confidence,
        otherwise call the "search" tool until three tool calls have been
        recorded, then answer.
        """
        if thinking.get("confidence", 0) > 0.95:
            return {"type": "answer", "content": "Final answer based on analysis"}

        if len(context.tool_history) < 3:
            return {
                "type": "tool_call",
                "tool": "search",
                "arguments": {"query": "relevant information"},
            }

        return {"type": "answer", "content": "Answer after tool use"}

    def _generate_final_answer(self, context: "ThinkingContext") -> str:
        """Generate final answer from context (used when the budget runs out)."""
        return f"Based on {len(context.steps)} thinking steps and {len(context.tool_history)} tool calls: [Final Answer]"
| |
|
| |
|
class SequentialThinking:
    """
    Sequential Thinking: Plan all steps before execution.
    Best for well-defined tasks with predictable steps.

    The planner in this class is a placeholder that always returns the same
    four-step plan; ``self.model`` is stored but not consulted by it.
    """

    def __init__(
        self,
        model,
        tool_registry,
        config: Optional["ThinkingConfig"] = None,
    ):
        """Store collaborators.

        Args:
            model: Stored on the instance as ``self.model``.
            tool_registry: Object exposing ``execute(name, **kwargs)``, or
                None when no tools are available.
            config: Settings; a default sequential config is created when
                omitted.
        """
        self.model = model
        self.tools = tool_registry
        self.config = config or ThinkingConfig(mode=ThinkingMode.SEQUENTIAL)

    def plan_and_execute(
        self,
        query: str,
    ) -> Tuple[List[Dict[str, Any]], str]:
        """
        Plan all steps then execute sequentially.

        Returns:
            Tuple of (execution_log, final_answer)
        """
        plan = self._generate_plan(query)

        execution_log: List[Dict[str, Any]] = []
        # Results of earlier steps, keyed "step_1", "step_2", ...
        context: Dict[str, Any] = {}

        for step in plan:
            result = self._execute_step(step, context)
            execution_log.append({
                "step": step,
                "result": result,
            })
            context[f"step_{len(execution_log)}"] = result

        final_answer = self._synthesize_answer(query, execution_log)

        return execution_log, final_answer

    def _generate_plan(self, query: str) -> List[Dict[str, Any]]:
        """Generate execution plan.

        The search step carries the user's query so the tool is not invoked
        with an empty search string.
        """
        return [
            {"action": "analyze", "description": "Understand the query"},
            {"action": "search", "description": "Gather information", "query": query},
            {"action": "synthesize", "description": "Combine findings"},
            {"action": "answer", "description": "Formulate response"},
        ]

    def _execute_step(
        self,
        step: Dict[str, Any],
        context: Dict[str, Any],
    ) -> Any:
        """Execute a single step.

        Args:
            step: Plan entry; ``action`` selects what to do and ``query`` is
                forwarded to the search tool.
            context: Results of earlier steps (currently not consulted).

        Returns:
            The tool result for a search step when a registry is configured,
            an ``"Error: ..."`` string if the tool raised, otherwise an
            ``"Executed: <action>"`` marker.
        """
        action = step.get("action", "")

        if action == "search" and self.tools is not None:
            try:
                return self.tools.execute("search", query=step.get("query", ""))
            except Exception as e:
                # Deliberately broad, mirroring InterleavedThinking: a tool
                # failure is logged as the step result, not raised.
                return f"Error: {str(e)}"

        return f"Executed: {action}"

    def _synthesize_answer(
        self,
        query: str,
        execution_log: List[Dict[str, Any]],
    ) -> str:
        """Synthesize final answer from execution log."""
        return f"Answer to '{query}' based on {len(execution_log)} execution steps"
| |
|
| |
|
class ThinkingEngine:
    """
    Unified thinking engine supporting multiple modes.
    """

    def __init__(
        self,
        model,
        tool_registry=None,
        config: Optional["ThinkingConfig"] = None,
    ):
        """Build the engine and its two strategy objects.

        Args:
            model: Passed through to the strategy objects.
            tool_registry: Optional tool registry passed through as well.
            config: Settings; a default ``ThinkingConfig`` is used when
                omitted.
        """
        self.model = model
        self.tools = tool_registry
        self.config = config or ThinkingConfig()

        # Hand the resolved config to both strategies so the engine and its
        # strategies always share one settings object.
        self.interleaved = InterleavedThinking(model, tool_registry, self.config)
        self.sequential = SequentialThinking(model, tool_registry, self.config)

    def think(
        self,
        query: str,
        mode: Optional["ThinkingMode"] = None,
        stream: bool = False,
    ) -> Dict[str, Any]:
        """
        Main thinking interface.

        Args:
            query: User query
            mode: Thinking mode (uses config default if None)
            stream: Accepted for interface compatibility; the current
                implementation collects all steps before returning

        Returns:
            Dictionary with answer and thinking trace
        """
        mode = mode or self.config.mode

        if mode == ThinkingMode.SEQUENTIAL:
            return self._run_sequential(query)
        if mode == ThinkingMode.HIDDEN:
            return self._run_hidden(query)
        # INTERLEAVED, and every mode without a dedicated runner.
        return self._run_interleaved(query, stream)

    def _run_interleaved(self, query: str, stream: bool) -> Dict[str, Any]:
        """Run interleaved thinking and collect the full trace.

        The generator is driven by hand so that its return value (the final
        answer) is captured even when no step is flagged ``is_final``.
        """
        context = ThinkingContext(self.config)
        steps = []
        final_answer = ""

        step_iter = iter(self.interleaved.think_and_act(query, context))
        while True:
            try:
                step = next(step_iter)
            except StopIteration as finished:
                if finished.value is not None:
                    final_answer = finished.value
                break
            steps.append(step)
            if step.is_final:
                final_answer = step.content

        return {
            "answer": final_answer,
            "thinking": self._format_thinking(steps),
            "context": context.to_dict(),
        }

    def _run_sequential(self, query: str) -> Dict[str, Any]:
        """Run sequential thinking."""
        execution_log, answer = self.sequential.plan_and_execute(query)

        return {
            "answer": answer,
            "plan": execution_log,
            "thinking": self._format_plan_thinking(execution_log),
        }

    def _run_hidden(self, query: str) -> Dict[str, Any]:
        """Run thinking but hide trace."""
        result = self._run_interleaved(query, False)
        return {
            "answer": result["answer"],
            "thinking": None,
        }

    def _format_thinking(self, steps: List["ThinkingStep"]) -> str:
        """Format thinking steps for display.

        Steps with an unrecognized ``step_type`` are omitted from the text.
        """
        cfg = self.config
        lines = [cfg.think_start]

        for step in steps:
            if step.step_type == "reasoning":
                lines.append(f"{cfg.step_marker} {step.content}")
            elif step.step_type == "reflection":
                lines.append(f"{cfg.reflect_marker} {step.content}")
            elif step.step_type == "tool_use":
                # tool_call is optional on a step; do not crash on None.
                tool_name = (step.tool_call or {}).get("name", "unknown")
                lines.append(f"[Tool: {tool_name}] → {step.tool_result}")
            elif step.step_type == "conclusion":
                lines.append(f"{cfg.conclude_marker} {step.content}")

        lines.append(cfg.think_end)
        return "\n".join(lines)

    def _format_plan_thinking(self, execution_log: List[Dict[str, Any]]) -> str:
        """Format sequential plan execution."""
        cfg = self.config
        lines = [cfg.think_start]

        for number, entry in enumerate(execution_log, start=1):
            step = entry["step"]
            result = entry["result"]
            lines.append(f"{cfg.step_marker} Step {number}: {step.get('description', '')}")
            lines.append(f" Result: {result}")

        lines.append(cfg.think_end)
        return "\n".join(lines)

    def evaluate_response(
        self,
        query: str,
        response: str,
    ) -> Dict[str, Any]:
        """
        Evaluate a response before presenting to user.
        Can reject or warn based on content.

        Starts from a confidence of 0.9, subtracts 0.2 for responses shorter
        than 10 characters and 0.1 per uncertainty phrase found (matched
        case-insensitively as substrings).

        Args:
            query: User query (not used by the current heuristics)
            response: Candidate response text

        Returns:
            Dict with ``approved``, ``confidence``, ``warnings`` and
            ``suggestions``; ``approved`` is False when the confidence is
            below ``config.min_confidence_threshold``.
        """
        warnings = []
        suggestions = []
        penalty = 0.0

        if len(response) < 10:
            warnings.append("Response is very short")
            penalty += 0.2

        uncertainty_markers = ["I'm not sure", "I don't know", "maybe", "perhaps"]
        lowered = response.lower()
        for marker in uncertainty_markers:
            if marker.lower() in lowered:
                warnings.append(f"Contains uncertainty: '{marker}'")
                penalty += 0.1

        # Round so binary float drift (e.g. 0.7000000000000001) cannot leak
        # into the reported score or flip the threshold comparison.
        confidence = max(0.0, round(0.9 - penalty, 2))

        approved = True
        if confidence < self.config.min_confidence_threshold:
            approved = False
            suggestions.append("Consider gathering more information")

        return {
            "approved": approved,
            "confidence": confidence,
            "warnings": warnings,
            "suggestions": suggestions,
        }
| |
|
| |
|
class MultilingualThinking:
    """
    Multilingual response capability with native thinking.

    Wraps a thinking engine and prefixes each query with an instruction to
    think and respond in the chosen language.
    """

    LANGUAGE_PROMPTS = {
        "en": "Think and respond in English.",
        "zh": "用中文思考和回答。",
        "es": "Piensa y responde en español.",
        "fr": "Réfléchis et réponds en français.",
        "de": "Denke und antworte auf Deutsch.",
        "ja": "日本語で考えて答えてください。",
        "ko": "한국어로 생각하고 답하세요.",
        "ar": "فكر وأجب بالعربية.",
        "ru": "Думай и отвечай по-русски.",
        "pt": "Pense e responda em português.",
    }

    # Script probes in priority order. Kana and Hangul are tested before the
    # CJK ideograph block because Japanese (kanji) and Korean (hanja) text
    # can contain ideographs too; testing ideographs first would label such
    # text as Chinese.
    _SCRIPT_PATTERNS = (
        ("ja", re.compile(r'[\u3040-\u309f\u30a0-\u30ff]')),
        ("ko", re.compile(r'[\uac00-\ud7af]')),
        ("zh", re.compile(r'[\u4e00-\u9fff]')),
        ("ar", re.compile(r'[\u0600-\u06ff]')),
        ("ru", re.compile(r'[\u0400-\u04ff]')),
    )

    def __init__(self, thinking_engine: "ThinkingEngine"):
        """Wrap ``thinking_engine``; its ``think`` method is called per query."""
        self.engine = thinking_engine

    def detect_language(self, text: str) -> str:
        """Detect language of input text from the scripts it contains.

        Only script-based detection is performed, so Latin-script languages
        (including "es", "fr", "de" and "pt") are all reported as "en".

        Args:
            text: Text to inspect

        Returns:
            One of "ja", "ko", "zh", "ar", "ru", or "en" as the fallback
        """
        for lang, pattern in self._SCRIPT_PATTERNS:
            if pattern.search(text):
                return lang
        return "en"

    def think_multilingual(
        self,
        query: str,
        target_language: Optional[str] = None,
    ) -> Dict[str, Any]:
        """
        Think in target language natively.

        Args:
            query: User query
            target_language: Target language code (auto-detect if None)

        Returns:
            Response with thinking in target language. The "language" key
            holds the requested or detected code; codes without an entry in
            LANGUAGE_PROMPTS fall back to the English instruction.
        """
        lang = target_language or self.detect_language(query)
        lang_prompt = self.LANGUAGE_PROMPTS.get(lang, self.LANGUAGE_PROMPTS["en"])

        # Prepend the language instruction to the user's query.
        augmented_query = f"{lang_prompt}\n\n{query}"

        result = self.engine.think(augmented_query)
        result["language"] = lang

        return result
| |
|