walidsobhie-code commited on
Commit
2531078
·
1 Parent(s): aeb76d4

feat: Add FileEditTool, GrepTool, GlobTool (Tier 1 RTMP tools)

Browse files

- FileEditTool: insert, delete, replace operations with regex and backups
- GrepTool: regex search with context lines and file filtering
- GlobTool: pattern matching (**/*.py style) with exclusions

src/tools/file_edit.py ADDED
@@ -0,0 +1,255 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """FileEditTool - Intelligent file editing for Stack 2.9"""
2
+
3
+ import json
4
+ import os
5
+ import re
6
+ import shutil
7
+ from datetime import datetime
8
+ from pathlib import Path
9
+ from typing import Any, Dict, List, Optional, Tuple
10
+
11
+ from .base import BaseTool, ToolResult
12
+ from .registry import tool_registry
13
+
14
+ # Protected paths that should not be edited
15
+ PROTECTED_PATHS = {
16
+ '/etc', '/usr', '/bin', '/sbin', '/lib', '/System',
17
+ 'C:\\Windows', 'C:\\Program Files', 'C:\\Program Files (x86)',
18
+ '/System', '/Library', '/Applications'
19
+ }
20
+
21
+ BACKUP_DIR = Path.home() / ".stack-2.9" / "backups"
22
+
23
+
24
+ def _is_protected_path(path: str) -> bool:
25
+ """Check if path is protected."""
26
+ abs_path = str(Path(path).resolve())
27
+ for protected in PROTECTED_PATHS:
28
+ if abs_path.startswith(protected):
29
+ return True
30
+ return False
31
+
32
+
33
+ def _create_backup(content: str, file_path: str) -> str:
34
+ """Create backup of file content."""
35
+ BACKUP_DIR.mkdir(parents=True, exist_ok=True)
36
+ timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
37
+ backup_name = f"{Path(file_path).name}.{timestamp}.bak"
38
+ backup_path = BACKUP_DIR / backup_name
39
+ backup_path.write_text(content)
40
+ return str(backup_path)
41
+
42
+
43
+ class FileEditInsertTool(BaseTool):
44
+ """Insert content at specific line or after a pattern."""
45
+
46
+ name = "file_edit_insert"
47
+ description = "Insert content at a specific line number or after a pattern"
48
+
49
+ input_schema = {
50
+ "type": "object",
51
+ "properties": {
52
+ "path": {"type": "string", "description": "File path to edit"},
53
+ "content": {"type": "string", "description": "Content to insert"},
54
+ "line": {"type": "number", "description": "Line number to insert at (1-indexed)"},
55
+ "after_pattern": {"type": "string", "description": "Or insert after this regex pattern"},
56
+ "create_backup": {"type": "boolean", "default": True}
57
+ },
58
+ "required": ["path", "content"]
59
+ }
60
+
61
+ async def execute(self, path: str, content: str, line: Optional[int] = None, after_pattern: Optional[str] = None, create_backup: bool = True) -> ToolResult:
62
+ """Insert content into file."""
63
+ if _is_protected_path(path):
64
+ return ToolResult(success=False, error="Cannot edit protected path")
65
+
66
+ if not Path(path).exists():
67
+ return ToolResult(success=False, error=f"File not found: {path}")
68
+
69
+ original_content = Path(path).read_text()
70
+ lines = original_content.split('\n')
71
+
72
+ if after_pattern:
73
+ pattern = re.compile(after_pattern)
74
+ for i, l in enumerate(lines):
75
+ if pattern.search(l):
76
+ lines.insert(i + 1, content)
77
+ break
78
+ else:
79
+ return ToolResult(success=False, error=f"Pattern not found: {after_pattern}")
80
+ elif line:
81
+ if line < 1 or line > len(lines) + 1:
82
+ return ToolResult(success=False, error=f"Line {line} out of range (1-{len(lines) + 1})")
83
+ lines.insert(line - 1, content)
84
+ else:
85
+ return ToolResult(success=False, error="Must specify line or after_pattern")
86
+
87
+ if create_backup:
88
+ backup_path = _create_backup(original_content, path)
89
+
90
+ new_content = '\n'.join(lines)
91
+ Path(path).write_text(new_content)
92
+
93
+ return ToolResult(success=True, data={
94
+ "path": path,
95
+ "operation": "insert",
96
+ "line": line or "after_pattern",
97
+ "backup": backup_path if create_backup else None
98
+ })
99
+
100
+
101
+ class FileEditDeleteTool(BaseTool):
102
+ """Delete lines from a file."""
103
+
104
+ name = "file_edit_delete"
105
+ description = "Delete lines from a file"
106
+
107
+ input_schema = {
108
+ "type": "object",
109
+ "properties": {
110
+ "path": {"type": "string", "description": "File path"},
111
+ "line_start": {"type": "number", "description": "Start line (1-indexed)"},
112
+ "line_end": {"type": "number", "description": "End line (1-indexed, inclusive)"},
113
+ "pattern": {"type": "string", "description": "Or delete lines matching this regex"},
114
+ "create_backup": {"type": "boolean", "default": True}
115
+ },
116
+ "required": ["path"]
117
+ }
118
+
119
+ async def execute(self, path: str, line_start: Optional[int] = None, line_end: Optional[int] = None, pattern: Optional[str] = None, create_backup: bool = True) -> ToolResult:
120
+ """Delete lines from file."""
121
+ if _is_protected_path(path):
122
+ return ToolResult(success=False, error="Cannot edit protected path")
123
+
124
+ if not Path(path).exists():
125
+ return ToolResult(success=False, error=f"File not found: {path}")
126
+
127
+ original_content = Path(path).read_text()
128
+ lines = original_content.split('\n')
129
+
130
+ if pattern:
131
+ regex = re.compile(pattern)
132
+ deleted = [i for i, l in enumerate(lines) if regex.search(l)]
133
+ lines = [l for i, l in enumerate(lines) if i not in deleted]
134
+ deleted_count = len(deleted)
135
+ else:
136
+ if line_start is None or line_end is None:
137
+ return ToolResult(success=False, error="Must specify line_start/line_end or pattern")
138
+
139
+ if line_start < 1 or line_end > len(lines) or line_start > line_end:
140
+ return ToolResult(success=False, error=f"Invalid line range ({line_start}-{line_end})")
141
+
142
+ deleted_count = line_end - line_start + 1
143
+ lines = lines[:line_start - 1] + lines[line_end:]
144
+
145
+ if create_backup:
146
+ backup_path = _create_backup(original_content, path)
147
+
148
+ new_content = '\n'.join(lines)
149
+ Path(path).write_text(new_content)
150
+
151
+ return ToolResult(success=True, data={
152
+ "path": path,
153
+ "operation": "delete",
154
+ "lines_deleted": deleted_count,
155
+ "backup": backup_path if create_backup else None
156
+ })
157
+
158
+
159
+ class FileEditReplaceTool(BaseTool):
160
+ """Replace content using regex."""
161
+
162
+ name = "file_edit_replace"
163
+ description = "Replace content using regex pattern"
164
+
165
+ input_schema = {
166
+ "type": "object",
167
+ "properties": {
168
+ "path": {"type": "string", "description": "File path"},
169
+ "pattern": {"type": "string", "description": "Regex pattern to find"},
170
+ "replacement": {"type": "string", "description": "Replacement text"},
171
+ "replace_all": {"type": "boolean", "default": False, "description": "Replace all occurrences"},
172
+ "case_sensitive": {"type": "boolean", "default": True},
173
+ "create_backup": {"type": "boolean", "default": True}
174
+ },
175
+ "required": ["path", "pattern", "replacement"]
176
+ }
177
+
178
+ async def execute(self, path: str, pattern: str, replacement: str, replace_all: bool = False, case_sensitive: bool = True, create_backup: bool = True) -> ToolResult:
179
+ """Replace content in file."""
180
+ if _is_protected_path(path):
181
+ return ToolResult(success=False, error="Cannot edit protected path")
182
+
183
+ if not Path(path).exists():
184
+ return ToolResult(success=False, error=f"File not found: {path}")
185
+
186
+ original_content = Path(path).read_text()
187
+
188
+ flags = 0 if case_sensitive else re.IGNORECASE
189
+ regex = re.compile(pattern, flags)
190
+
191
+ if replace_all:
192
+ new_content, count = regex.subn(replacement, original_content)
193
+ else:
194
+ match = regex.search(original_content)
195
+ if not match:
196
+ return ToolResult(success=False, error=f"Pattern not found: {pattern}")
197
+ new_content = original_content[:match.start()] + replacement + original_content[match.end():]
198
+ count = 1
199
+
200
+ if create_backup:
201
+ backup_path = _create_backup(original_content, path)
202
+
203
+ Path(path).write_text(new_content)
204
+
205
+ return ToolResult(success=True, data={
206
+ "path": path,
207
+ "operation": "replace",
208
+ "replacements": count,
209
+ "backup": backup_path if create_backup else None
210
+ })
211
+
212
+
213
+ class FileEditTool(BaseTool):
214
+ """Main file editing tool with multiple operations."""
215
+
216
+ name = "file_edit"
217
+ description = "Edit files with insert, delete, replace operations"
218
+
219
+ input_schema = {
220
+ "type": "object",
221
+ "properties": {
222
+ "path": {"type": "string", "description": "File path"},
223
+ "operation": {
224
+ "type": "string",
225
+ "enum": ["insert", "delete", "replace"],
226
+ "description": "Operation type"
227
+ },
228
+ "content": {"type": "string", "description": "Content for insert/replace"},
229
+ "line_start": {"type": "number", "description": "Start line for delete"},
230
+ "line_end": {"type": "number", "description": "End line for delete"},
231
+ "pattern": {"type": "string", "description": "Regex pattern"},
232
+ "replacement": {"type": "string", "description": "Replacement text"},
233
+ "replace_all": {"type": "boolean", "default": False},
234
+ "create_backup": {"type": "boolean", "default": True}
235
+ },
236
+ "required": ["path", "operation"]
237
+ }
238
+
239
+ async def execute(self, path: str, operation: str, content: Optional[str] = None, line_start: Optional[int] = None, line_end: Optional[int] = None, pattern: Optional[str] = None, replacement: Optional[str] = None, replace_all: bool = False, create_backup: bool = True) -> ToolResult:
240
+ """Execute file edit operation."""
241
+ if operation == "insert" and content:
242
+ return await FileEditInsertTool().execute(path, content, line=line_start, after_pattern=pattern, create_backup=create_backup)
243
+ elif operation == "delete":
244
+ return await FileEditDeleteTool().execute(path, line_start=line_start, line_end=line_end, pattern=pattern, create_backup=create_backup)
245
+ elif operation == "replace" and pattern and replacement:
246
+ return await FileEditReplaceTool().execute(path, pattern, replacement, replace_all=replace_all, create_backup=create_backup)
247
+ else:
248
+ return ToolResult(success=False, error=f"Invalid operation or missing parameters: {operation}")
249
+
250
+
251
+ # Register tools
252
+ tool_registry.register(FileEditInsertTool())
253
+ tool_registry.register(FileEditDeleteTool())
254
+ tool_registry.register(FileEditReplaceTool())
255
+ tool_registry.register(FileEditTool())
src/tools/glob_tool.py ADDED
@@ -0,0 +1,223 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """GlobTool - File pattern matching for Stack 2.9"""
2
+
3
+ import fnmatch
4
+ import os
5
+ from pathlib import Path
6
+ from typing import Any, Dict, List, Optional
7
+
8
+ from .base import BaseTool, ToolResult
9
+ from .registry import tool_registry
10
+
11
+ # Default exclusions
12
+ DEFAULT_EXCLUDES = {
13
+ '.git', '.svn', '.hg', '__pycache__', 'node_modules', '.venv', 'venv',
14
+ 'env', '.idea', '.vscode', '.DS_Store', '*.pyc', '*.pyo', '*.so',
15
+ '*.dylib', '.cache', '.pytest_cache', '.mypy_cache', 'dist', 'build',
16
+ '*.egg-info', '.tox', '.nox'
17
+ }
18
+
19
+
20
+ def _should_exclude(path: Path, exclude_patterns: List[str]) -> bool:
21
+ """Check if path should be excluded."""
22
+ name = path.name
23
+
24
+ # Check default exclusions
25
+ if name in DEFAULT_EXCLUDES:
26
+ return True
27
+
28
+ # Check custom patterns
29
+ for pattern in exclude_patterns:
30
+ if fnmatch.fnmatch(name, pattern) or fnmatch.fnmatch(str(path), pattern):
31
+ return True
32
+
33
+ return False
34
+
35
+
36
+ def _glob_pattern_to_regex(pattern: str) -> str:
37
+ """Convert glob pattern to regex."""
38
+ # Handle ** for recursive matching
39
+ regex_parts = []
40
+ i = 0
41
+ while i < len(pattern):
42
+ c = pattern[i]
43
+ if c == '*':
44
+ if i + 1 < len(pattern) and pattern[i + 1] == '*':
45
+ # ** matches everything including /
46
+ regex_parts.append('.*')
47
+ i += 2
48
+ else:
49
+ # * matches everything except /
50
+ regex_parts.append('[^/]*')
51
+ i += 1
52
+ elif c == '?':
53
+ regex_parts.append('.')
54
+ i += 1
55
+ elif c == '[':
56
+ # Character class
57
+ j = i + 1
58
+ if j < len(pattern) and pattern[j] == '!':
59
+ regex_parts.append('[^')
60
+ j += 1
61
+ else:
62
+ regex_parts.append('[')
63
+ while j < len(pattern) and pattern[j] != ']':
64
+ regex_parts.append(re.escape(pattern[j]))
65
+ j += 1
66
+ regex_parts.append(']')
67
+ i = j + 1
68
+ else:
69
+ regex_parts.append(re.escape(c))
70
+ i += 1
71
+
72
+ return ''.join(regex_parts)
73
+
74
+
75
+ def _match_glob(path: Path, pattern: str) -> bool:
76
+ """Check if path matches glob pattern."""
77
+ import re
78
+
79
+ # Normalize pattern - handle **/*.py style patterns
80
+ if pattern.startswith('**/'):
81
+ # Recursive pattern
82
+ regex_pattern = _glob_pattern_to_regex(pattern)
83
+ regex = re.compile(regex_pattern)
84
+ return bool(regex.match(str(path))) or bool(regex.match(path.name))
85
+ elif '**' in pattern:
86
+ regex_pattern = _glob_pattern_to_regex(pattern)
87
+ regex = re.compile(regex_pattern)
88
+ return bool(regex.match(str(path)))
89
+ else:
90
+ # Simple pattern
91
+ return fnmatch.fnmatch(path.name, pattern) or fnmatch.fnmatch(str(path), pattern)
92
+
93
+
94
+ class GlobTool(BaseTool):
95
+ """Find files matching glob patterns."""
96
+
97
+ name = "glob"
98
+ description = "Find files matching glob patterns"
99
+
100
+ input_schema = {
101
+ "type": "object",
102
+ "properties": {
103
+ "pattern": {"type": "string", "description": "Glob pattern (e.g., **/*.py, *.js)"},
104
+ "base_path": {"type": "string", "description": "Base directory to search"},
105
+ "exclude": {"type": "array", "items": {"type": "string"}, "description": "Patterns to exclude"},
106
+ "max_results": {"type": "number", "default": 1000, "description": "Maximum results"},
107
+ "files_only": {"type": "boolean", "default": True, "description": "Only return files"}
108
+ },
109
+ "required": ["pattern"]
110
+ }
111
+
112
+ async def execute(self, pattern: str, base_path: Optional[str] = None, exclude: Optional[List[str]] = None, max_results: int = 1000, files_only: bool = True) -> ToolResult:
113
+ """Find files matching pattern."""
114
+ if base_path:
115
+ search_path = Path(base_path)
116
+ else:
117
+ search_path = Path.cwd()
118
+
119
+ if not search_path.exists():
120
+ return ToolResult(success=False, error=f"Path not found: {search_path}")
121
+
122
+ exclude_patterns = exclude or []
123
+ matches = []
124
+ visited_dirs = set()
125
+
126
+ def search_dir(dir_path: Path, depth: int = 0):
127
+ """Recursively search directory."""
128
+ if str(dir_path) in visited_dirs:
129
+ return
130
+ visited_dirs.add(str(dir_path))
131
+
132
+ try:
133
+ for item in dir_path.iterdir():
134
+ if _should_exclude(item, exclude_patterns):
135
+ continue
136
+
137
+ if item.is_file():
138
+ if _match_glob(item, pattern):
139
+ matches.append(str(item))
140
+ if len(matches) >= max_results:
141
+ return True
142
+ elif item.is_dir():
143
+ # Handle ** pattern
144
+ if '**' in pattern:
145
+ search_dir(item, depth + 1)
146
+ elif depth < 20: # Limit depth for non-** patterns
147
+ search_dir(item, depth + 1)
148
+ except PermissionError:
149
+ pass
150
+
151
+ return False
152
+
153
+ search_dir(search_path)
154
+
155
+ return ToolResult(success=True, data={
156
+ "pattern": pattern,
157
+ "base_path": str(search_path),
158
+ "matches": matches,
159
+ "count": len(matches),
160
+ "truncated": len(matches) >= max_results
161
+ })
162
+
163
+
164
+ class GlobListTool(BaseTool):
165
+ """List all files in directory with optional filtering."""
166
+
167
+ name = "glob_list"
168
+ description = "List files in directory with optional pattern filter"
169
+
170
+ input_schema = {
171
+ "type": "object",
172
+ "properties": {
173
+ "path": {"type": "string", "description": "Directory path"},
174
+ "pattern": {"type": "string", "description": "Optional pattern filter"},
175
+ "recursive": {"type": "boolean", "default": False, "description": "Recursive listing"},
176
+ "max_results": {"type": "number", "default": 500}
177
+ },
178
+ "required": ["path"]
179
+ }
180
+
181
+ async def execute(self, path: str, pattern: Optional[str] = None, recursive: bool = False, max_results: int = 500) -> ToolResult:
182
+ """List directory contents."""
183
+ dir_path = Path(path)
184
+ if not dir_path.exists():
185
+ return ToolResult(success=False, error=f"Path not found: {path}")
186
+
187
+ matches = []
188
+
189
+ def search_dir(d: Path, depth: int = 0):
190
+ if len(matches) >= max_results:
191
+ return
192
+
193
+ try:
194
+ for item in d.iterdir():
195
+ if item.name.startswith('.'):
196
+ continue
197
+
198
+ if pattern:
199
+ if _match_glob(item, pattern):
200
+ matches.append(str(item))
201
+ else:
202
+ matches.append(str(item))
203
+
204
+ if item.is_dir() and recursive and depth < 10:
205
+ search_dir(item, depth + 1)
206
+
207
+ if len(matches) >= max_results:
208
+ return
209
+ except PermissionError:
210
+ pass
211
+
212
+ search_dir(dir_path)
213
+
214
+ return ToolResult(success=True, data={
215
+ "path": str(dir_path),
216
+ "files": matches,
217
+ "count": len(matches)
218
+ })
219
+
220
+
221
+ # Register tools
222
+ tool_registry.register(GlobTool())
223
+ tool_registry.register(GlobListTool())
src/tools/grep_tool.py ADDED
@@ -0,0 +1,163 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """GrepTool - Enhanced code search for Stack 2.9"""
2
+
3
+ import os
4
+ import re
5
+ from pathlib import Path
6
+ from typing import Any, Dict, List, Optional
7
+
8
+ from .base import BaseTool, ToolResult
9
+ from .registry import tool_registry
10
+
11
+
12
+ class GrepTool(BaseTool):
13
+ """Search files using regex patterns."""
14
+
15
+ name = "grep"
16
+ description = "Search for regex patterns in files"
17
+
18
+ input_schema = {
19
+ "type": "object",
20
+ "properties": {
21
+ "pattern": {"type": "string", "description": "Regex pattern to search"},
22
+ "path": {"type": "string", "description": "Directory or file path to search"},
23
+ "recursive": {"type": "boolean", "default": True, "description": "Search recursively"},
24
+ "case_sensitive": {"type": "boolean", "default": True},
25
+ "context_lines": {"type": "number", "default": 0, "description": "Lines of context before/after"},
26
+ "file_pattern": {"type": "string", "description": "Filter by file pattern (e.g., *.py, *.js)"},
27
+ "max_results": {"type": "number", "default": 1000, "description": "Maximum matches to return"}
28
+ },
29
+ "required": ["pattern", "path"]
30
+ }
31
+
32
+ async def execute(self, pattern: str, path: str, recursive: bool = True, case_sensitive: bool = True, context_lines: int = 0, file_pattern: Optional[str] = None, max_results: int = 1000) -> ToolResult:
33
+ """Search for pattern in files."""
34
+ search_path = Path(path)
35
+ if not search_path.exists():
36
+ return ToolResult(success=False, error=f"Path not found: {path}")
37
+
38
+ try:
39
+ regex = re.compile(pattern, 0 if case_sensitive else re.IGNORECASE)
40
+ except re.error as e:
41
+ return ToolResult(success=False, error=f"Invalid regex: {e}")
42
+
43
+ matches = []
44
+ visited_dirs = set()
45
+
46
+ def matches_pattern(text: str) -> List[str]:
47
+ """Find all matches in text with line numbers."""
48
+ results = []
49
+ for i, line in enumerate(text.split('\n'), 1):
50
+ if regex.search(line):
51
+ results.append((i, line.rstrip()))
52
+ return results
53
+
54
+ def search_file(file_path: Path):
55
+ """Search a single file."""
56
+ if file_pattern:
57
+ if not any(file_path.match(p) for p in file_pattern.split(',')):
58
+ return
59
+
60
+ try:
61
+ content = file_path.read_text(errors='ignore')
62
+ except Exception:
63
+ return
64
+
65
+ for line_num, line_text in matches_pattern(content):
66
+ match_entry = {
67
+ "file": str(file_path),
68
+ "line": line_num,
69
+ "text": line_text
70
+ }
71
+
72
+ if context_lines > 0:
73
+ lines = content.split('\n')
74
+ start = max(0, line_num - context_lines - 1)
75
+ end = min(len(lines), line_num + context_lines)
76
+ match_entry["context"] = {
77
+ "before": lines[start:line_num - 1],
78
+ "after": lines[line_num:end]
79
+ }
80
+
81
+ matches.append(match_entry)
82
+
83
+ if len(matches) >= max_results:
84
+ return True # Stop searching
85
+ return False
86
+
87
+ def walk_dir(dir_path: Path):
88
+ """Walk directory tree."""
89
+ if str(dir_path) in visited_dirs:
90
+ return
91
+ visited_dirs.add(str(dir_path))
92
+
93
+ try:
94
+ for item in dir_path.iterdir():
95
+ # Skip hidden and common ignore directories
96
+ if item.name.startswith('.') or item.name in ('__pycache__', 'node_modules', '.git', 'venv', 'env'):
97
+ continue
98
+
99
+ if item.is_file():
100
+ if search_file(item):
101
+ return True
102
+ elif item.is_dir() and recursive:
103
+ if walk_dir(item):
104
+ return True
105
+ except PermissionError:
106
+ pass
107
+ return False
108
+
109
+ walk_dir(search_path)
110
+
111
+ return ToolResult(success=True, data={
112
+ "pattern": pattern,
113
+ "path": str(path),
114
+ "matches": matches,
115
+ "count": len(matches),
116
+ "truncated": len(matches) >= max_results
117
+ })
118
+
119
+
120
+ class GrepCountTool(BaseTool):
121
+ """Count occurrences of pattern in files."""
122
+
123
+ name = "grep_count"
124
+ description = "Count occurrences of pattern in files"
125
+
126
+ input_schema = {
127
+ "type": "object",
128
+ "properties": {
129
+ "pattern": {"type": "string", "description": "Regex pattern"},
130
+ "path": {"type": "string", "description": "Directory or file path"},
131
+ "recursive": {"type": "boolean", "default": True},
132
+ "case_sensitive": {"type": "boolean", "default": True},
133
+ "file_pattern": {"type": "string", "description": "Filter by file pattern"}
134
+ },
135
+ "required": ["pattern", "path"]
136
+ }
137
+
138
+ async def execute(self, pattern: str, path: str, recursive: bool = True, case_sensitive: bool = True, file_pattern: Optional[str] = None) -> ToolResult:
139
+ """Count pattern matches."""
140
+ grep_result = await GrepTool().execute(
141
+ pattern=pattern,
142
+ path=path,
143
+ recursive=recursive,
144
+ case_sensitive=case_sensitive,
145
+ file_pattern=file_pattern,
146
+ max_results=100000
147
+ )
148
+
149
+ counts = {}
150
+ for match in grep_result.data.get("matches", []):
151
+ file_path = match["file"]
152
+ counts[file_path] = counts.get(file_path, 0) + 1
153
+
154
+ return ToolResult(success=True, data={
155
+ "pattern": pattern,
156
+ "total_matches": len(matches) if (matches := grep_result.data.get("matches", [])) else 0,
157
+ "by_file": counts
158
+ })
159
+
160
+
161
+ # Register tools
162
+ tool_registry.register(GrepTool())
163
+ tool_registry.register(GrepCountTool())