jackkuo's picture
fix: track PNGs with LFS
aa98b19
from typing import Any, Dict, List, Callable, Optional
from langchain_core.messages import BaseMessage
from langchain_core.runnables import RunnableConfig
from langgraph.graph.state import CompiledStateGraph
import uuid
def random_uuid():
return str(uuid.uuid4())
async def astream_graph(
graph: CompiledStateGraph,
inputs: dict,
config: Optional[RunnableConfig] = None,
node_names: List[str] = [],
callback: Optional[Callable] = None,
stream_mode: str = "messages",
include_subgraphs: bool = False,
) -> Dict[str, Any]:
"""
LangGraph์˜ ์‹คํ–‰ ๊ฒฐ๊ณผ๋ฅผ ๋น„๋™๊ธฐ์ ์œผ๋กœ ์ŠคํŠธ๋ฆฌ๋ฐํ•˜๊ณ  ์ง์ ‘ ์ถœ๋ ฅํ•˜๋Š” ํ•จ์ˆ˜์ž…๋‹ˆ๋‹ค.
Args:
graph (CompiledStateGraph): ์‹คํ–‰ํ•  ์ปดํŒŒ์ผ๋œ LangGraph ๊ฐ์ฒด
inputs (dict): ๊ทธ๋ž˜ํ”„์— ์ „๋‹ฌํ•  ์ž…๋ ฅ๊ฐ’ ๋”•์…”๋„ˆ๋ฆฌ
config (Optional[RunnableConfig]): ์‹คํ–‰ ์„ค์ • (์„ ํƒ์ )
node_names (List[str], optional): ์ถœ๋ ฅํ•  ๋…ธ๋“œ ์ด๋ฆ„ ๋ชฉ๋ก. ๊ธฐ๋ณธ๊ฐ’์€ ๋นˆ ๋ฆฌ์ŠคํŠธ
callback (Optional[Callable], optional): ๊ฐ ์ฒญํฌ ์ฒ˜๋ฆฌ๋ฅผ ์œ„ํ•œ ์ฝœ๋ฐฑ ํ•จ์ˆ˜. ๊ธฐ๋ณธ๊ฐ’์€ None
์ฝœ๋ฐฑ ํ•จ์ˆ˜๋Š” {"node": str, "content": Any} ํ˜•ํƒœ์˜ ๋”•์…”๋„ˆ๋ฆฌ๋ฅผ ์ธ์ž๋กœ ๋ฐ›์Šต๋‹ˆ๋‹ค.
stream_mode (str, optional): ์ŠคํŠธ๋ฆฌ๋ฐ ๋ชจ๋“œ ("messages" ๋˜๋Š” "updates"). ๊ธฐ๋ณธ๊ฐ’์€ "messages"
include_subgraphs (bool, optional): ์„œ๋ธŒ๊ทธ๋ž˜ํ”„ ํฌํ•จ ์—ฌ๋ถ€. ๊ธฐ๋ณธ๊ฐ’์€ False
Returns:
Dict[str, Any]: ์ตœ์ข… ๊ฒฐ๊ณผ (์„ ํƒ์ )
"""
config = config or {}
final_result = {}
def format_namespace(namespace):
return namespace[-1].split(":")[0] if len(namespace) > 0 else "root graph"
prev_node = ""
if stream_mode == "messages":
async for chunk_msg, metadata in graph.astream(
inputs, config, stream_mode=stream_mode
):
curr_node = metadata["langgraph_node"]
final_result = {
"node": curr_node,
"content": chunk_msg,
"metadata": metadata,
}
# node_names๊ฐ€ ๋น„์–ด์žˆ๊ฑฐ๋‚˜ ํ˜„์žฌ ๋…ธ๋“œ๊ฐ€ node_names์— ์žˆ๋Š” ๊ฒฝ์šฐ์—๋งŒ ์ฒ˜๋ฆฌ
if not node_names or curr_node in node_names:
# ์ฝœ๋ฐฑ ํ•จ์ˆ˜๊ฐ€ ์žˆ๋Š” ๊ฒฝ์šฐ ์‹คํ–‰
if callback:
result = callback({"node": curr_node, "content": chunk_msg})
if hasattr(result, "__await__"):
await result
# ์ฝœ๋ฐฑ์ด ์—†๋Š” ๊ฒฝ์šฐ ๊ธฐ๋ณธ ์ถœ๋ ฅ
else:
# ๋…ธ๋“œ๊ฐ€ ๋ณ€๊ฒฝ๋œ ๊ฒฝ์šฐ์—๋งŒ ๊ตฌ๋ถ„์„  ์ถœ๋ ฅ
if curr_node != prev_node:
print("\n" + "=" * 50)
print(f"๐Ÿ”„ Node: \033[1;36m{curr_node}\033[0m ๐Ÿ”„")
print("- " * 25)
# Claude/Anthropic ๋ชจ๋ธ์˜ ํ† ํฐ ์ฒญํฌ ์ฒ˜๋ฆฌ - ํ•ญ์ƒ ํ…์ŠคํŠธ๋งŒ ์ถ”์ถœ
if hasattr(chunk_msg, "content"):
# ๋ฆฌ์ŠคํŠธ ํ˜•ํƒœ์˜ content (Anthropic/Claude ์Šคํƒ€์ผ)
if isinstance(chunk_msg.content, list):
for item in chunk_msg.content:
if isinstance(item, dict) and "text" in item:
print(item["text"], end="", flush=True)
# ๋ฌธ์ž์—ด ํ˜•ํƒœ์˜ content
elif isinstance(chunk_msg.content, str):
print(chunk_msg.content, end="", flush=True)
# ๊ทธ ์™ธ ํ˜•ํƒœ์˜ chunk_msg ์ฒ˜๋ฆฌ
else:
print(chunk_msg, end="", flush=True)
prev_node = curr_node
elif stream_mode == "updates":
# ์—๋Ÿฌ ์ˆ˜์ •: ์–ธํŒจํ‚น ๋ฐฉ์‹ ๋ณ€๊ฒฝ
# REACT ์—์ด์ „ํŠธ ๋“ฑ ์ผ๋ถ€ ๊ทธ๋ž˜ํ”„์—์„œ๋Š” ๋‹จ์ผ ๋”•์…”๋„ˆ๋ฆฌ๋งŒ ๋ฐ˜ํ™˜ํ•จ
async for chunk in graph.astream(
inputs, config, stream_mode=stream_mode, subgraphs=include_subgraphs
):
# ๋ฐ˜ํ™˜ ํ˜•์‹์— ๋”ฐ๋ผ ์ฒ˜๋ฆฌ ๋ฐฉ๋ฒ• ๋ถ„๊ธฐ
if isinstance(chunk, tuple) and len(chunk) == 2:
# ๊ธฐ์กด ์˜ˆ์ƒ ํ˜•์‹: (namespace, chunk_dict)
namespace, node_chunks = chunk
else:
# ๋‹จ์ผ ๋”•์…”๋„ˆ๋ฆฌ๋งŒ ๋ฐ˜ํ™˜ํ•˜๋Š” ๊ฒฝ์šฐ (REACT ์—์ด์ „ํŠธ ๋“ฑ)
namespace = [] # ๋นˆ ๋„ค์ž„์ŠคํŽ˜์ด์Šค (๋ฃจํŠธ ๊ทธ๋ž˜ํ”„)
node_chunks = chunk # chunk ์ž์ฒด๊ฐ€ ๋…ธ๋“œ ์ฒญํฌ ๋”•์…”๋„ˆ๋ฆฌ
# ๋”•์…”๋„ˆ๋ฆฌ์ธ์ง€ ํ™•์ธํ•˜๊ณ  ํ•ญ๋ชฉ ์ฒ˜๋ฆฌ
if isinstance(node_chunks, dict):
for node_name, node_chunk in node_chunks.items():
final_result = {
"node": node_name,
"content": node_chunk,
"namespace": namespace,
}
# node_names๊ฐ€ ๋น„์–ด์žˆ์ง€ ์•Š์€ ๊ฒฝ์šฐ์—๋งŒ ํ•„ํ„ฐ๋ง
if len(node_names) > 0 and node_name not in node_names:
continue
# ์ฝœ๋ฐฑ ํ•จ์ˆ˜๊ฐ€ ์žˆ๋Š” ๊ฒฝ์šฐ ์‹คํ–‰
if callback is not None:
result = callback({"node": node_name, "content": node_chunk})
if hasattr(result, "__await__"):
await result
# ์ฝœ๋ฐฑ์ด ์—†๋Š” ๊ฒฝ์šฐ ๊ธฐ๋ณธ ์ถœ๋ ฅ
else:
# ๋…ธ๋“œ๊ฐ€ ๋ณ€๊ฒฝ๋œ ๊ฒฝ์šฐ์—๋งŒ ๊ตฌ๋ถ„์„  ์ถœ๋ ฅ (messages ๋ชจ๋“œ์™€ ๋™์ผํ•˜๊ฒŒ)
if node_name != prev_node:
print("\n" + "=" * 50)
print(f"๐Ÿ”„ Node: \033[1;36m{node_name}\033[0m ๐Ÿ”„")
print("- " * 25)
# ๋…ธ๋“œ์˜ ์ฒญํฌ ๋ฐ์ดํ„ฐ ์ถœ๋ ฅ - ํ…์ŠคํŠธ ์ค‘์‹ฌ์œผ๋กœ ์ฒ˜๋ฆฌ
if isinstance(node_chunk, dict):
for k, v in node_chunk.items():
if isinstance(v, BaseMessage):
# BaseMessage์˜ content ์†์„ฑ์ด ํ…์ŠคํŠธ๋‚˜ ๋ฆฌ์ŠคํŠธ์ธ ๊ฒฝ์šฐ๋ฅผ ์ฒ˜๋ฆฌ
if hasattr(v, "content"):
if isinstance(v.content, list):
for item in v.content:
if (
isinstance(item, dict)
and "text" in item
):
print(
item["text"], end="", flush=True
)
else:
print(v.content, end="", flush=True)
else:
v.pretty_print()
elif isinstance(v, list):
for list_item in v:
if isinstance(list_item, BaseMessage):
if hasattr(list_item, "content"):
if isinstance(list_item.content, list):
for item in list_item.content:
if (
isinstance(item, dict)
and "text" in item
):
print(
item["text"],
end="",
flush=True,
)
else:
print(
list_item.content,
end="",
flush=True,
)
else:
list_item.pretty_print()
elif (
isinstance(list_item, dict)
and "text" in list_item
):
print(list_item["text"], end="", flush=True)
else:
print(list_item, end="", flush=True)
elif isinstance(v, dict) and "text" in v:
print(v["text"], end="", flush=True)
else:
print(v, end="", flush=True)
elif node_chunk is not None:
if hasattr(node_chunk, "__iter__") and not isinstance(
node_chunk, str
):
for item in node_chunk:
if isinstance(item, dict) and "text" in item:
print(item["text"], end="", flush=True)
else:
print(item, end="", flush=True)
else:
print(node_chunk, end="", flush=True)
# ๊ตฌ๋ถ„์„ ์„ ์—ฌ๊ธฐ์„œ ์ถœ๋ ฅํ•˜์ง€ ์•Š์Œ (messages ๋ชจ๋“œ์™€ ๋™์ผํ•˜๊ฒŒ)
prev_node = node_name
else:
# ๋”•์…”๋„ˆ๋ฆฌ๊ฐ€ ์•„๋‹Œ ๊ฒฝ์šฐ ์ „์ฒด ์ฒญํฌ ์ถœ๋ ฅ
print("\n" + "=" * 50)
print(f"๐Ÿ”„ Raw output ๐Ÿ”„")
print("- " * 25)
print(node_chunks, end="", flush=True)
# ๊ตฌ๋ถ„์„ ์„ ์—ฌ๊ธฐ์„œ ์ถœ๋ ฅํ•˜์ง€ ์•Š์Œ
final_result = {"content": node_chunks}
else:
raise ValueError(
f"Invalid stream_mode: {stream_mode}. Must be 'messages' or 'updates'."
)
# ํ•„์š”์— ๋”ฐ๋ผ ์ตœ์ข… ๊ฒฐ๊ณผ ๋ฐ˜ํ™˜
return final_result
async def ainvoke_graph(
graph: CompiledStateGraph,
inputs: dict,
config: Optional[RunnableConfig] = None,
node_names: List[str] = [],
callback: Optional[Callable] = None,
include_subgraphs: bool = True,
) -> Dict[str, Any]:
"""
LangGraph ์•ฑ์˜ ์‹คํ–‰ ๊ฒฐ๊ณผ๋ฅผ ๋น„๋™๊ธฐ์ ์œผ๋กœ ์ŠคํŠธ๋ฆฌ๋ฐํ•˜์—ฌ ์ถœ๋ ฅํ•˜๋Š” ํ•จ์ˆ˜์ž…๋‹ˆ๋‹ค.
Args:
graph (CompiledStateGraph): ์‹คํ–‰ํ•  ์ปดํŒŒ์ผ๋œ LangGraph ๊ฐ์ฒด
inputs (dict): ๊ทธ๋ž˜ํ”„์— ์ „๋‹ฌํ•  ์ž…๋ ฅ๊ฐ’ ๋”•์…”๋„ˆ๋ฆฌ
config (Optional[RunnableConfig]): ์‹คํ–‰ ์„ค์ • (์„ ํƒ์ )
node_names (List[str], optional): ์ถœ๋ ฅํ•  ๋…ธ๋“œ ์ด๋ฆ„ ๋ชฉ๋ก. ๊ธฐ๋ณธ๊ฐ’์€ ๋นˆ ๋ฆฌ์ŠคํŠธ
callback (Optional[Callable], optional): ๊ฐ ์ฒญํฌ ์ฒ˜๋ฆฌ๋ฅผ ์œ„ํ•œ ์ฝœ๋ฐฑ ํ•จ์ˆ˜. ๊ธฐ๋ณธ๊ฐ’์€ None
์ฝœ๋ฐฑ ํ•จ์ˆ˜๋Š” {"node": str, "content": Any} ํ˜•ํƒœ์˜ ๋”•์…”๋„ˆ๋ฆฌ๋ฅผ ์ธ์ž๋กœ ๋ฐ›์Šต๋‹ˆ๋‹ค.
include_subgraphs (bool, optional): ์„œ๋ธŒ๊ทธ๋ž˜ํ”„ ํฌํ•จ ์—ฌ๋ถ€. ๊ธฐ๋ณธ๊ฐ’์€ True
Returns:
Dict[str, Any]: ์ตœ์ข… ๊ฒฐ๊ณผ (๋งˆ์ง€๋ง‰ ๋…ธ๋“œ์˜ ์ถœ๋ ฅ)
"""
config = config or {}
final_result = {}
def format_namespace(namespace):
return namespace[-1].split(":")[0] if len(namespace) > 0 else "root graph"
# subgraphs ๋งค๊ฐœ๋ณ€์ˆ˜๋ฅผ ํ†ตํ•ด ์„œ๋ธŒ๊ทธ๋ž˜ํ”„์˜ ์ถœ๋ ฅ๋„ ํฌํ•จ
async for chunk in graph.astream(
inputs, config, stream_mode="updates", subgraphs=include_subgraphs
):
# ๋ฐ˜ํ™˜ ํ˜•์‹์— ๋”ฐ๋ผ ์ฒ˜๋ฆฌ ๋ฐฉ๋ฒ• ๋ถ„๊ธฐ
if isinstance(chunk, tuple) and len(chunk) == 2:
# ๊ธฐ์กด ์˜ˆ์ƒ ํ˜•์‹: (namespace, chunk_dict)
namespace, node_chunks = chunk
else:
# ๋‹จ์ผ ๋”•์…”๋„ˆ๋ฆฌ๋งŒ ๋ฐ˜ํ™˜ํ•˜๋Š” ๊ฒฝ์šฐ (REACT ์—์ด์ „ํŠธ ๋“ฑ)
namespace = [] # ๋นˆ ๋„ค์ž„์ŠคํŽ˜์ด์Šค (๋ฃจํŠธ ๊ทธ๋ž˜ํ”„)
node_chunks = chunk # chunk ์ž์ฒด๊ฐ€ ๋…ธ๋“œ ์ฒญํฌ ๋”•์…”๋„ˆ๋ฆฌ
# ๋”•์…”๋„ˆ๋ฆฌ์ธ์ง€ ํ™•์ธํ•˜๊ณ  ํ•ญ๋ชฉ ์ฒ˜๋ฆฌ
if isinstance(node_chunks, dict):
for node_name, node_chunk in node_chunks.items():
final_result = {
"node": node_name,
"content": node_chunk,
"namespace": namespace,
}
# node_names๊ฐ€ ๋น„์–ด์žˆ์ง€ ์•Š์€ ๊ฒฝ์šฐ์—๋งŒ ํ•„ํ„ฐ๋ง
if node_names and node_name not in node_names:
continue
# ์ฝœ๋ฐฑ ํ•จ์ˆ˜๊ฐ€ ์žˆ๋Š” ๊ฒฝ์šฐ ์‹คํ–‰
if callback is not None:
result = callback({"node": node_name, "content": node_chunk})
# ์ฝ”๋ฃจํ‹ด์ธ ๊ฒฝ์šฐ await
if hasattr(result, "__await__"):
await result
# ์ฝœ๋ฐฑ์ด ์—†๋Š” ๊ฒฝ์šฐ ๊ธฐ๋ณธ ์ถœ๋ ฅ
else:
print("\n" + "=" * 50)
formatted_namespace = format_namespace(namespace)
if formatted_namespace == "root graph":
print(f"๐Ÿ”„ Node: \033[1;36m{node_name}\033[0m ๐Ÿ”„")
else:
print(
f"๐Ÿ”„ Node: \033[1;36m{node_name}\033[0m in [\033[1;33m{formatted_namespace}\033[0m] ๐Ÿ”„"
)
print("- " * 25)
# ๋…ธ๋“œ์˜ ์ฒญํฌ ๋ฐ์ดํ„ฐ ์ถœ๋ ฅ
if isinstance(node_chunk, dict):
for k, v in node_chunk.items():
if isinstance(v, BaseMessage):
v.pretty_print()
elif isinstance(v, list):
for list_item in v:
if isinstance(list_item, BaseMessage):
list_item.pretty_print()
else:
print(list_item)
elif isinstance(v, dict):
for node_chunk_key, node_chunk_value in v.items():
print(f"{node_chunk_key}:\n{node_chunk_value}")
else:
print(f"\033[1;32m{k}\033[0m:\n{v}")
elif node_chunk is not None:
if hasattr(node_chunk, "__iter__") and not isinstance(
node_chunk, str
):
for item in node_chunk:
print(item)
else:
print(node_chunk)
print("=" * 50)
else:
# ๋”•์…”๋„ˆ๋ฆฌ๊ฐ€ ์•„๋‹Œ ๊ฒฝ์šฐ ์ „์ฒด ์ฒญํฌ ์ถœ๋ ฅ
print("\n" + "=" * 50)
print(f"๐Ÿ”„ Raw output ๐Ÿ”„")
print("- " * 25)
print(node_chunks)
print("=" * 50)
final_result = {"content": node_chunks}
# ์ตœ์ข… ๊ฒฐ๊ณผ ๋ฐ˜ํ™˜
return final_result