| from unittest.mock import MagicMock |
|
|
| import pytest |
|
|
| from messaging.rendering.telegram_markdown import ( |
| escape_md_v2, |
| escape_md_v2_code, |
| mdv2_bold, |
| mdv2_code_inline, |
| render_markdown_to_mdv2, |
| ) |
| from messaging.transcript import RenderCtx, TranscriptBuffer |
|
|
|
|
@pytest.fixture
def handler():
    """Provide a ``(platform, cli, store)`` tuple of unconfigured mocks.

    Returns:
        A 3-tuple of distinct ``MagicMock`` instances, in the order
        platform, cli, store.
    """
    # One fresh mock per slot, created in platform/cli/store order.
    mocks = [MagicMock() for _ in range(3)]
    return tuple(mocks)
|
|
|
|
def _ctx() -> RenderCtx:
    """Build the ``RenderCtx`` shared by the tests in this module.

    Every formatting hook is bound to the corresponding helper imported from
    ``messaging.rendering.telegram_markdown``.
    """
    hooks = {
        "bold": mdv2_bold,
        "code_inline": mdv2_code_inline,
        "escape_code": escape_md_v2_code,
        "escape_text": escape_md_v2,
        "render_markdown": render_markdown_to_mdv2,
    }
    return RenderCtx(**hooks)
|
|
|
|
def test_transcript_structure_and_order(handler):
    """Verify ordered transcript rendering (thinking/tool/subagent/text/error/status).

    Applies a fixed sequence of events to a ``TranscriptBuffer``, renders it
    once, and then checks that (1) every payload is present, (2) every
    section marker is present, and (3) the sections appear in the expected
    relative order.

    The ``handler`` fixture is requested but not used by the body.
    """
    # Markers are written as escapes so the file stays pure ASCII and cannot
    # be corrupted by reading the source with the wrong encoding.
    # NOTE(review): the emoji were reconstructed from mis-decoded UTF-8 bytes;
    # confirm them against the markers TranscriptBuffer.render really emits.
    status = "\u2705 *Complete*"  # white heavy check mark
    thinking_marker = "\U0001F4AD *Thinking*"  # thought balloon
    tool_marker = "\U0001F6E0 *Tool call:*"  # hammer and wrench
    subagent_marker = "\U0001F916 *Subagent:*"  # robot face
    error_marker = "\u26A0\uFE0F *Error:*"  # warning sign + variation selector
    content = escape_md_v2("Here is the file content.")

    t = TranscriptBuffer()

    # Thinking followed by a plain tool call.
    t.apply({"type": "thinking_chunk", "text": "Thinking process..."})
    t.apply(
        {"type": "tool_use", "id": "t1", "name": "list_files", "input": {"path": "."}}
    )

    # A "Task" tool call (asserted below to render as a subagent), a second
    # tool call issued before the Task result arrives, then the Task result.
    t.apply(
        {
            "type": "tool_use",
            "id": "task1",
            "name": "Task",
            "input": {"description": "Searching codebase..."},
        }
    )
    t.apply(
        {"type": "tool_use", "id": "t2", "name": "read_file", "input": {"path": "x.py"}}
    )
    t.apply({"type": "tool_result", "tool_use_id": "task1", "content": "done"})

    # Plain text followed by an error.
    t.apply({"type": "text_chunk", "text": "Here is the file content."})
    t.apply({"type": "error", "message": "Some error happened"})

    msg = t.render(_ctx(), limit_chars=3900, status=status)

    # Shown by pytest only when the test fails (or with -s).
    print(f"Generated Message:\n{msg}")

    # 1) Every payload made it into the rendered message.
    assert "Thinking process..." in msg
    assert "list_files" in msg
    assert "read_file" in msg
    assert "Searching codebase..." in msg
    assert content in msg
    assert "Some error happened" in msg
    assert status in msg

    # 2) Every section marker is present.
    assert thinking_marker in msg
    assert tool_marker in msg
    assert subagent_marker in msg
    assert error_marker in msg

    # 3) Relative order. str.index raises on a miss instead of returning -1,
    # which would otherwise sort "missing" before everything else.
    p_thinking = msg.index("Thinking process...")
    p_tool_call = msg.index(tool_marker)
    p_subagent = msg.index(subagent_marker)
    p_content = msg.index(content)
    p_errors = msg.index(error_marker)
    p_status = msg.index(status)

    assert p_thinking < p_tool_call, "Thinking should come before tool calls"
    assert p_tool_call < p_subagent, "Tool calls should come before subagent marker"
    assert p_subagent < p_content, "Subagent should come before Content"
    assert p_content < p_errors, "Content should come before Errors"
    assert p_errors < p_status, "Errors should come before Status"
|
|
|
|
def test_transcript_simple(handler):
    """Verify simple transcript with just text + status.

    Renders a buffer holding a single text chunk and checks that the escaped
    text and the status are present while the thinking and tool-call emoji
    are absent.

    The ``handler`` fixture is requested but not used by the body.
    """
    # Written as escapes so a wrong source encoding cannot silently turn the
    # negative assertions below into checks for unrelated characters.
    # NOTE(review): emoji reconstructed from mis-decoded UTF-8 bytes; confirm
    # against the markers TranscriptBuffer.render really emits.
    thinking_emoji = "\U0001F4AD"  # thought balloon
    tool_emoji = "\U0001F6E0"  # hammer and wrench

    t = TranscriptBuffer()
    t.apply({"type": "text_chunk", "text": "Simple message."})
    msg = t.render(_ctx(), limit_chars=3900, status="Ready")

    assert escape_md_v2("Simple message.") in msg
    assert "Ready" in msg
    assert thinking_emoji not in msg
    assert tool_emoji not in msg
|
|
|
|
def test_subagents_formatting(handler):
    """Verify subagent formatting (Task tool).

    Applies two ``Task`` tool calls (the first one also receives its result)
    and asserts that each description is rendered on a subagent line as
    inline code.

    The ``handler`` fixture is requested but not used by the body.
    """
    # Written as an escape so the marker survives a wrong source encoding.
    # NOTE(review): emoji reconstructed from mis-decoded UTF-8 bytes; confirm
    # against the marker TranscriptBuffer.render really emits.
    subagent_marker = "\U0001F916 *Subagent:*"  # robot face

    t = TranscriptBuffer()
    t.apply(
        {
            "type": "tool_use",
            "id": "task_1",
            "name": "Task",
            "input": {"description": "Task 1"},
        }
    )
    t.apply({"type": "tool_result", "tool_use_id": "task_1", "content": "done"})
    # The second Task is left without a result.
    t.apply(
        {
            "type": "tool_use",
            "id": "task_2",
            "name": "Task",
            "input": {"description": "Task 2"},
        }
    )

    msg = t.render(_ctx(), limit_chars=3900, status=None)

    assert f"{subagent_marker} `Task 1`" in msg
    assert f"{subagent_marker} `Task 2`" in msg
| assert "π€ *Subagent:* `Task 2`" in msg |
|
|