CodeWeaver / DYNAMIC_PARALLEL_SEARCH.md
ㅅㅎㅇ
Initial commit for Hugging Face Spaces
ea80cdc

A newer version of the Gradio SDK is available: 6.14.0

Upgrade

Dynamic Parallel Search for Multiple Independent Questions

개요

CodeWeaver Phase 4는 다중 독립 질문을 Send API로 동적 병렬 처리하여, 각 질문마다 독립적인 검색 파이프라인을 실행합니다.

핵심 철학

"기존 그래프를 100% 재사용하되, 질문 개수만큼 복제해서 병렬 실행한다"

  • 기존 코드 재사용률: ~95%
  • 새로운 노드: 5개 추가
  • 새로운 edge 함수: 1개 추가 (fanout_multi_questions)
  • 수정된 노드: 2개 수정 (create_plan, generate_answer)

주요 기능

1. 자동 질문 유형 감지

create_plan_node가 질문을 분석하여 3가지 케이스로 분류:

Case 1: single_topic

  • 정의: 하나의 주제를 다각도로 묻는 경우
  • 예시: "Spring Security JWT 인증 구현 방법"
  • 서브질문: ["개념", "구현", "예제"] (답변 섹션 구조용)
  • 실행: 기존 그래프 1회 (검색은 원본 질문으로)

Case 2: multiple_questions

  • 정의: 서로 무관한 독립 질문 (최대 2개)
  • 예시: "JWT가 뭐야? CORS는?"
  • 서브질문: ["JWT가 뭐야?", "CORS는?"] (각각 별도 검색)
  • 실행: Send API로 기존 그래프 2회 병렬 실행

Case 3: too_many

  • 정의: 질문 3개 이상
  • 예시: "JWT? CORS? Docker?"
  • 실행: 친절한 에러 메시지 표시, 대화 계속 가능
  • 하드 가드: LLM 분류와 무관하게 물음표 개수(3개 이상) 또는 질문 후보 개수(3개 이상)로 결정론적 차단

2. 질문 개수 제한

비용 및 품질 관리를 위해 최대 2개 질문으로 제한:

입력: "JWT? CORS? Docker? Redis?"
처리: too_many 케이스 → 에러 메시지
안내: "하나의 주제로 통합" 또는 "2개만 선택" 권장

3. Send API 동적 복제

중요: LangGraph에서 List[Send]는 노드 반환값이 아니라 conditional edge 함수 반환값으로만 사용됩니다.

# initiate_dynamic_search_node: state 준비만 (dict 반환)
def initiate_dynamic_search_node(state: AgentState) -> dict:
    return {"intermediate_steps": [...]}  # Send 반환 안 함!

# fanout_multi_questions: conditional edge 함수 (List[Send] 반환)
def fanout_multi_questions(state: AgentState) -> List[Send]:
    sends = []
    for i, question in enumerate(["JWT가 뭐야?", "CORS는?"]):
        child_state = state.model_copy(deep=True)
        child_state.user_question = question
        child_state.is_multi_question = True
        # ... 메타데이터 설정 ...
        sends.append(Send("run_single_question_worker", child_state))
    return sends

# run_single_question_worker: 내부 서브그래프 실행
# 각 Send는 독립적으로 내부 그래프를 실행:
# analyze → cache → classify → search(×3) → collect → eval → subgraph → generate
# → multi_answers에 결과 추가

4. Reducer 자동 Fan-in (Reset 기능 포함)

# State 정의 (커스텀 reducer 사용)
multi_answers: Annotated[List[Dict[str, Any]], merge_multi_answers] = []

# merge_multi_answers reducer:
# - 기본 동작: old + new (병렬 worker에서 답변을 동시에 append)
# - 리셋 동작: new의 첫 원소가 {"__token__": "__RESET_MULTI_ANS__"}이면
#   old를 버리고 new[1:]로 교체 (이전 턴 누적 방지)

# run_single_question_worker 1이 리턴:
{"multi_answers": [{"index": 0, "question": "JWT가 뭐야?", "answer": "..."}]}

# run_single_question_worker 2가 리턴:
{"multi_answers": [{"index": 1, "question": "CORS는?", "answer": "..."}]}

# LangGraph Reducer가 자동 병합:
state.multi_answers = [
    {"index": 0, ...},
    {"index": 1, ...}
]

# combine_answers_node가 이를 통합 Markdown으로 변환

그래프 흐름

graph TD
    START[START] --> plan[create_plan]
    
    plan -->|single_topic| analyze[analyze_question]
    plan -->|multiple_questions 2개| dynamic[initiate_dynamic_search]
    plan -->|too_many 3+| tooMany[handle_too_many_questions]
    
    tooMany --> END
    
    analyze --> cache[check_cache]
    cache -->|hit| returnCache[return_cached_answer]
    cache -->|miss| classify[classify_intent]
    
    returnCache --> END
    
    classify --> searchSO[search_stackoverflow]
    classify --> searchGH[search_github]
    classify --> searchDocs[search_official_docs]
    
    searchSO --> collect[collect_results]
    searchGH --> collect
    searchDocs --> collect
    
    collect --> eval[evaluate_results]
    
    eval -->|needs_refinement| refine[refine_search]
    eval -->|sufficient| filterNode[filter_and_score]
    
    refine --> classify
    
    filterNode --> summarize[summarize_results]
    summarize --> generate[generate_answer]
    
    generate -->|is_multi_question| combine[combine_answers]
    generate -->|single_topic| END
    
    combine --> END
    
    dynamic --> fanout[fanout_multi_questions<br/>conditional edge]
    fanout -.Send Q1.-> worker1[run_single_question_worker<br/>내부 서브그래프]
    fanout -.Send Q2.-> worker2[run_single_question_worker<br/>내부 서브그래프]
    worker1 --> combine
    worker2 --> combine

흐름 설명

Single Topic (기존 동작 유지)

START → create_plan (case: single_topic)
      → analyze → cache → classify → search(×3) → collect → eval → subgraph → generate → END

Multiple Questions (신규)

START → create_plan (case: multiple_questions)
      → initiate_dynamic_search (state 준비)
      → fanout_multi_questions (conditional edge)
          ├─ Send("run_single_question_worker", Q1) → [내부 서브그래프 전체 파이프라인] → multi_answers[0]
          └─ Send("run_single_question_worker", Q2) → [내부 서브그래프 전체 파이프라인] → multi_answers[1]
      → combine_answers (자동 fan-in) → END

Too Many (신규)

START → create_plan (case: too_many)
      → handle_too_many_questions → END
(사용자는 즉시 다시 질문 가능)

구현 상세

State 확장

# src/agent/state.py

class AgentState(BaseModel):
    # ... 기존 필드 ...
    
    # Phase 4: Dynamic Parallel Search
    is_multi_question: bool = False
    sub_question_index: int = 0
    sub_question_text: Optional[str] = None
    original_multi_question: Optional[str] = None
    multi_answers: Annotated[List[Dict[str, Any]], merge_multi_answers] = []

새로운 노드 (5개)

1. create_plan_node (수정)

  • 위치: src/agent/nodes.py 라인 206
  • 역할: 질문 유형 및 개수 판단
  • 변경:
    • case 필드 추가 (single_topic/multiple_questions/too_many)
    • 하드 가드 추가: _hard_guard_too_many 함수로 3개 이상 질문 결정론적 차단
      • 물음표 개수(3개 이상) 또는 질문 후보 개수(3개 이상) 감지
      • LLM 분류와 무관하게 too_many로 강제

2. handle_too_many_questions_node (신규)

  • 위치: src/agent/nodes.py 라인 1068
  • 역할: 3개 이상 질문 시 안내 메시지
  • 특징: 대화 종료하지 않음 (즉시 재질문 가능)

3. initiate_dynamic_search_node (신규)

  • 위치: src/agent/nodes.py 라인 1092
  • 역할: 다중 질문 처리 진입점, state 준비
  • 핵심: dict만 반환 (Send는 반환하지 않음)

4. fanout_multi_questions (신규 - Edge 함수)

  • 위치: src/agent/nodes.py 라인 1110
  • 역할: conditional edge 함수로 List[Send] 반환
  • 핵심: 각 서브 질문을 run_single_question_worker로 Send

5. run_single_question_worker_node (신규)

  • 위치: src/agent/nodes.py 라인 1306
  • 역할: 내부 서브그래프를 실행하여 state 충돌 방지
  • 핵심:
    • 독립된 단일 질문 그래프를 내부에서 실행
    • outer graph의 scalar state 채널 충돌 방지
    • 결과를 multi_answers reducer에만 추가

6. combine_answers_node (신규)

  • 위치: src/agent/nodes.py 라인 1168
  • 역할: multi_answers를 통합 Markdown 포맷으로 변환
  • 특징: 자동 fan-in (모든 Send 완료 대기)

수정된 노드 (1개)

generate_answer_node (5줄 추가)

  • 위치: src/agent/nodes.py 라인 726
  • 추가 내용:
# 기존 로직 마지막에 추가
if state.is_multi_question:
    updates["multi_answers"] = [{
        "index": state.sub_question_index,
        "question": state.sub_question_text,
        "answer": final_answer
    }]

그래프 재구성

# src/agent/graph.py

# 1. START 진입점 변경
graph.add_edge(START, "create_plan")  # 기존: analyze_question

# 2. create_plan 후 분기 추가
graph.add_conditional_edges(
    "create_plan",
    route_after_plan,
    {
        "analyze_question": "analyze_question",
        "initiate_dynamic_search": "initiate_dynamic_search",
        "handle_too_many_questions": "handle_too_many_questions"
    }
)

# 3. initiate_dynamic_search 후 fan-out
graph.add_conditional_edges(
    "initiate_dynamic_search",
    fanout_multi_questions,  # List[Send] 반환
)

# 4. run_single_question_worker 후 fan-in
graph.add_edge("run_single_question_worker", "combine_answers")

# 5. generate_answer 후 분기 추가
graph.add_conditional_edges(
    "generate_answer",
    route_after_generate,
    {
        "combine_answers": "combine_answers",
        END: END
    }
)

사용 예시

예시 1: 단일 주제 (기존 동작)

from CodeWeaver.src.agent.graph import create_agent
from langchain_core.messages import HumanMessage

agent = create_agent()

result = await agent.ainvoke({
    "user_question": "React hooks 완벽 가이드",
    "messages": [HumanMessage(content="React hooks 완벽 가이드")]
})

# 결과
# plan.case: "single_topic"
# plan.sub_questions: ["hooks란", "주요 hooks", "실무 패턴"]
# 흐름: 기존 그래프 1회 실행
# 출력: 일반 답변 형식

예시 2: 다중 독립 질문 (신규)

result = await agent.ainvoke({
    "user_question": "JWT가 뭐야? CORS 에러는 어떻게 해결해?",
    "messages": [HumanMessage(content="JWT가 뭐야? CORS 에러는 어떻게 해결해?")]
})

# 결과
# plan.case: "multiple_questions"
# plan.sub_questions: ["JWT가 뭐야?", "CORS 에러는 어떻게 해결해?"]
# 흐름: Send API로 그래프 2회 병렬 실행
# 출력:

출력 예시:

# 다중 질문 답변

원본 질문: JWT가 뭐야? CORS 에러는 어떻게 해결해?

---

## 1. JWT가 뭐야?

JWT(JSON Web Token)는 인증 정보를 안전하게 전송하기 위한...

[상세 답변...]

---

## 2. CORS 에러는 어떻게 해결해?

CORS(Cross-Origin Resource Sharing) 에러는...

[상세 답변...]

예시 3: 질문 3개 이상

result = await agent.ainvoke({
    "user_question": "JWT? CORS? Docker?",
    "messages": [HumanMessage(content="JWT? CORS? Docker?")]
})

# 결과
# plan.case: "too_many"
# 출력:

출력 예시:

죄송합니다. 한 번에 최대 2개의 질문까지만 처리할 수 있습니다.

다음 중 하나를 선택해서 다시 질문해 주세요:

1. **하나의 주제로 통합해서 질문**
   예: "JWT 인증과 CORS 설정을 함께 구현하는 방법"

2. **가장 중요한 2개 질문만 선택**
   예: "JWT가 뭐야? 내 코드에 어떻게 적용해?"

3. **질문을 나눠서 순차적으로 질문**
   예: 먼저 "JWT가 뭐야?" 질문 → 답변 확인 → 다음 질문

어떻게 도와드릴까요?

테스트

테스트 파일은 프로젝트 루트에 있습니다. (삭제됨 - 필요시 재생성)

테스트 시나리오

  1. 단일 주제: "Spring Security JWT 인증 구현 방법"

    • 기존 그래프 1회 실행
    • multi_answers 비어있음
    • 일반 답변 형식
  2. 다중 질문 2개: "JWT가 뭐야? CORS는?"

    • Send API로 그래프 2회 병렬 실행
    • multi_answers에 2개 항목
    • 섹션 구분된 통합 답변
  3. 질문 3개 이상: "JWT? CORS? Docker?"

    • handle_too_many_questions로 분기
    • 친절한 에러 메시지
    • 대화 계속 가능
  4. 엣지 케이스: "JWT? CORS? Docker? Redis?"

    • 하드 가드로 무조건 too_many 차단 (물음표 4개 감지)
    • LLM 분류와 무관하게 차단 보장

성능 고려사항

병렬 실행

  • 단일 주제: 3개 검색 노드 병렬 (기존)
  • 다중 질문 (2개): 2×3=6개 검색 노드 병렬
  • LangGraph Send API가 자동 병렬화 관리

비용 관리

  • 질문 개수 제한: 최대 2개
  • 검색 결과 개수: 소스당 3-5개
  • 다중 질문 시 의도 분류 생략 (기본값 "learning" 사용)

캐싱

  • 단일 주제: 전체 답변 캐시 ✅
  • 다중 질문: 각 서브 질문 답변 개별 캐시 ✅
    • Q1 답변 → Q1 질문으로 캐시
    • Q2 답변 → Q2 질문으로 캐시
  • 다음번 동일 질문 시 개별 캐시 히트 가능

기술적 핵심

1. Send API 패턴 (Conditional Edge 함수 사용)

# ❌ 잘못된 방법: 노드에서 Send 반환
def initiate_dynamic_search_node(state):
    return [Send(...), Send(...)]  # 에러 발생!

# ✅ 올바른 방법: conditional edge 함수에서 Send 반환
def fanout_multi_questions(state: AgentState) -> List[Send]:
    sends = []
    for i, question in enumerate(sub_questions):
        child_state = state.model_copy(deep=True)
        child_state.user_question = question
        sends.append(Send("run_single_question_worker", child_state))
    return sends

# 그래프 설정
graph.add_conditional_edges(
    "initiate_dynamic_search",
    fanout_multi_questions,  # List[Send] 반환
)

# LangGraph가 자동으로:
# 1. 두 Send를 병렬 실행
# 2. 각 Send의 모든 노드 실행 대기
# 3. 다음 공통 노드로 이동 (combine_answers)

2. Reducer 자동 병합 (Reset 기능 포함)

# State 정의 (커스텀 reducer)
multi_answers: Annotated[List[Dict[str, Any]], merge_multi_answers] = []

# merge_multi_answers reducer:
def merge_multi_answers(old: List[Dict], new: List[Dict]) -> List[Dict]:
    if not new:
        return old
    # Reset 토큰 체크
    if new[0].get("__token__") == "__RESET_MULTI_ANS__":
        return new[1:]  # 이전 턴 누적 방지
    return old + new  # 기본 병합

# create_plan_node에서 매 실행 시작 시 리셋:
updates["multi_answers"] = [{"__token__": "__RESET_MULTI_ANS__"}]

# 병렬 실행 시:
# [Q1_answer] + [Q2_answer] = [Q1_answer, Q2_answer]

3. Fan-in 보장

# 모든 검색 노드가 collect_results로 연결
graph.add_edge("search_stackoverflow", "collect_results")
graph.add_edge("search_github", "collect_results")
graph.add_edge("search_official_docs", "collect_results")

# LangGraph가 자동으로:
# 1. 3개 검색 모두 완료 대기
# 2. collect_results 1회만 실행

코드 변경 요약

파일별 변경사항

파일 추가 수정 삭제
state.py 5 필드, 1 reducer 함수 - -
nodes.py 5 노드 + 1 edge 함수 (~300줄) 2 노드 (create_plan 하드 가드 추가, generate_answer 5줄) -
graph.py 3 routing 함수, 엣지 재구성 build_agent_graph -

총 변경량: ~350줄 추가, ~100줄 수정

재사용률

  • 기존 노드 재사용: 12/16 (75%)
  • 기존 로직 재사용: ~95% (검색, 평가, 필터링, 요약 등)
  • 새로운 개념: Send API + Reducer만

LangGraph 공식 가이드라인 준수

✅ Graph API

  • StateGraph 사용
  • Pydantic BaseModel state
  • START/END 명시

✅ Workflows + Agents

  • Send API로 동적 병렬화
  • Conditional edges로 라우팅
  • Fan-out/Fan-in 패턴

✅ Thinking in LangGraph

  • 노드는 순수 함수 (한 가지 일만)
  • State는 불변 업데이트
  • Reducer로 병합 자동화

한계 및 향후 개선

현재 한계

  1. 질문 개수 제한: 최대 2개

    • 비용 vs 품질 트레이드오프
    • 향후 3-4개로 확장 가능
  2. 캐싱 전략: 통합 답변은 캐시 안 됨

    • 각 서브 질문은 개별 캐시됨
    • 동일한 다중 질문 재입력 시 개별 캐시 히트
  3. Refinement 루프: 다중 질문에서도 각각 독립적으로 작동

    • 한 질문 refine 시 다른 질문에 영향 없음

향후 개선 방향

  1. 더 많은 질문 지원: 3-4개까지 확장
  2. 혼합 질문 감지: "JWT가 뭐야? 그걸 Spring에 적용하려면?" (순차 의존)
  3. 스트리밍 답변: 각 서브 질문 완료 즉시 스트리밍
  4. 우선순위: 중요도에 따라 질문 순서 조정

참고 자료

문의

구현 관련 질문이나 버그 리포트는 이슈로 등록해주세요.