|
|
""" |
|
|
Simple example of using the forensic agent with streaming output. |
|
|
""" |
|
|
|
|
|
import sys |
|
|
from pathlib import Path |
|
|
|
|
|
|
|
|
# Put the directory two levels above this file at the front of the import
# search path so the ``src`` package imported below can be found.
# ``resolve()`` makes the path absolute first: when ``__file__`` is a bare
# relative name (e.g. the script is launched from its own directory on
# Python < 3.9), ``Path("example.py").parent.parent`` collapses to ``.``
# and the wrong directory would be inserted.
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
|
|
|
|
|
from src.agents import ForensicAgent |
|
|
|
|
|
|
|
|
def _render_stream_event(event, tool_calls_seen):
    """Print a single event dict produced by ``agent.analyze_stream``.

    Args:
        event: Mapping with a ``'type'`` key; the other keys read here are
            ``'content'``, ``'tool_name'``, ``'tool_args'``,
            ``'tool_result'`` and ``'final_result'``. Events of any other
            type are ignored.
        tool_calls_seen: Set of tool names whose call banner has already
            been printed. It is updated in place so each tool is announced
            only once per run.
    """
    rule = "=" * 60
    event_type = event.get('type')

    if event_type == 'status':
        print(f"\n{event.get('content', '')}")
        sys.stdout.flush()

    elif event_type == 'tool_call':
        tool_name = event.get('tool_name', 'unknown')
        if tool_name in tool_calls_seen:
            # Banner for this tool was already shown; stay quiet.
            return
        print(f"\n{rule}")
        print(f"🔧 [TOOL CALL] {event.get('content', '')}")
        tool_args = event.get('tool_args', {})
        if tool_args:
            print(f" Arguments: {tool_args}")
        print(rule)
        sys.stdout.flush()
        tool_calls_seen.add(tool_name)

    elif event_type == 'tool_result':
        print(f"\n{rule}")
        print(f"✅ [TOOL RESULT] {event.get('content', '')}")
        tool_result = event.get('tool_result', '')
        if tool_result:
            # Stringify once; reused for both the preview and its length test.
            result_text = str(tool_result)
            suffix = '...' if len(result_text) > 150 else ''
            print(f" Full result: {tool_result}")
            print(f" Preview: {result_text[:150]}{suffix}")
        print(f"{rule}\n")
        sys.stdout.flush()

    elif event_type == 'llm_chunk':
        # Echo model text as it arrives, without adding line breaks.
        print(event.get('content', ''), end='', flush=True)

    elif event_type == 'final':
        # ``or {}`` also covers an event whose 'final_result' is None.
        final_result = event.get('final_result') or {}
        tool_usage = final_result.get('tool_usage')
        print("\n\n" + rule)
        print("ANALYSIS COMPLETE")
        print(rule)
        print(f"\nTools used: {', '.join(tool_usage) if tool_usage else 'None'}")
        print("\n" + rule)


def main():
    """Run the forensic agent on the image path given on the command line.

    Command-line arguments (read from ``sys.argv``):
        --tools       pass ``use_tools=True`` to the agent.
        --no-stream   call ``agent.analyze`` instead of iterating
                      ``agent.analyze_stream``.
        <image_path>  first argument that is not one of the flags above.

    Returns:
        0 when the analysis finished, 1 when it raised an error. Earlier
        versions returned ``None`` implicitly; callers that ignore the
        return value are unaffected.

    Raises:
        SystemExit: with status 1 when no image path was supplied.
    """
    use_tools = "--tools" in sys.argv
    use_streaming = "--no-stream" not in sys.argv

    # Everything that is not a recognised flag is a positional argument;
    # the first one is taken as the image path.
    keyword_args = {"--tools", "--no-stream"}
    non_keyword_args = [arg for arg in sys.argv[1:] if arg not in keyword_args]

    if not non_keyword_args:
        print("Error: Please provide an image path as an argument.")
        print("Usage: python example.py [--tools] [--no-stream] <image_path>")
        sys.exit(1)

    image_path = non_keyword_args[0]

    print("Initializing forensic agent...")
    agent = ForensicAgent(llm_model="gpt-5.1", temperature=None, reasoning_effort="medium")

    mode = "full (with tools)" if use_tools else "vision-only (no tools)"
    stream_mode = "with streaming" if use_streaming else "without streaming"
    print(f"\nAnalyzing image: {image_path} [{mode}, {stream_mode}]")
    print("-" * 60)

    try:
        if use_streaming:
            print("\n[STREAMING MODE]\n")
            tool_calls_seen = set()
            for event in agent.analyze_stream(image_path, use_tools=use_tools):
                _render_stream_event(event, tool_calls_seen)
        else:
            result = agent.analyze(image_path, use_tools=use_tools)

            print("\nAnalysis Results:")
            print("=" * 60)
            print(result['conclusion'])
            print("\nTools used:", result['tool_usage'] or "None")

    except FileNotFoundError:
        print(f"\nError: Image file not found: {image_path}")
        # The path comes from the command line, so point the user there.
        print("Please pass the path to an existing image file as the argument.")
        return 1
    except Exception as e:
        # Top-level boundary of the script: report the error with a full
        # traceback instead of letting it propagate.
        print(f"\nError: {e}")
        import traceback
        traceback.print_exc()
        return 1

    return 0
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Forward main()'s return value as the process exit status.
    # sys.exit(None) exits with status 0, so a main() that returns nothing
    # still reports success, while an integer return becomes the status.
    sys.exit(main())
|
|
|
|
|
|
|
|
|