minhvtt commited on
Commit
afb1117
·
verified ·
1 Parent(s): 6e0d2d8

Upload 2 files

Browse files
Files changed (2) hide show
  1. agent_chat_stream.py +111 -100
  2. tools_service.py +212 -183
agent_chat_stream.py CHANGED
@@ -1,100 +1,111 @@
1
- """
2
- Agent Chat Streaming Endpoint
3
- SSE-based real-time streaming for Sales & Feedback agents
4
- """
5
- from typing import AsyncGenerator
6
- from stream_utils import format_sse, EVENT_STATUS, EVENT_TOKEN, EVENT_DONE, EVENT_ERROR, EVENT_METADATA
7
- from datetime import datetime
8
-
9
-
10
- async def agent_chat_stream(
11
- request,
12
- agent_service,
13
- conversation_service
14
- ) -> AsyncGenerator[str, None]:
15
- """
16
- Stream agent responses in real-time (SSE format)
17
-
18
- Args:
19
- request: ChatRequest with message, session_id, mode, user_id
20
- agent_service: AgentService instance
21
- conversation_service: ConversationService instance
22
-
23
- Yields SSE events:
24
- - status: Processing updates
25
- - token: Text chunks
26
- - metadata: Session info
27
- - done: Completion signal
28
- - error: Error messages
29
- """
30
- try:
31
- # === SESSION MANAGEMENT ===
32
- session_id = request.session_id
33
- if not session_id:
34
- session_id = conversation_service.create_session(
35
- metadata={"user_agent": "api", "created_via": "agent_stream"},
36
- user_id=request.user_id
37
- )
38
- yield format_sse(EVENT_METADATA, {"session_id": session_id})
39
-
40
- # Get conversation history
41
- history = conversation_service.get_conversation_history(session_id)
42
-
43
- # Convert to messages format
44
- messages = []
45
- for h in history:
46
- messages.append({"role": h["role"], "content": h["content"]})
47
-
48
-
49
- # Determine mode
50
- mode = getattr(request, 'mode', 'sales') # Default to sales
51
-
52
- # === STATUS UPDATE ===
53
- if mode == 'feedback':
54
- yield format_sse(EVENT_STATUS, "Đang kiểm tra lịch sử sự kiện của bạn...")
55
- else:
56
- yield format_sse(EVENT_STATUS, "Đang vấn...")
57
-
58
- # === CALL AGENT ===
59
- result = await agent_service.chat(
60
- user_message=request.message,
61
- conversation_history=messages,
62
- mode=mode,
63
- user_id=request.user_id
64
- )
65
-
66
- agent_response = result["message"]
67
-
68
- # === STREAM RESPONSE TOKEN BY TOKEN ===
69
- # Simple character-by-character streaming
70
- chunk_size = 5 # Characters per chunk
71
- for i in range(0, len(agent_response), chunk_size):
72
- chunk = agent_response[i:i+chunk_size]
73
- yield format_sse(EVENT_TOKEN, chunk)
74
- # Small delay for smoother streaming
75
- import asyncio
76
- await asyncio.sleep(0.02)
77
-
78
- # === SAVE HISTORY ===
79
- conversation_service.add_message(
80
- session_id=session_id,
81
- role="user",
82
- content=request.message
83
- )
84
- conversation_service.add_message(
85
- session_id=session_id,
86
- role="assistant",
87
- content=agent_response
88
- )
89
-
90
- # === DONE ===
91
- yield format_sse(EVENT_DONE, {
92
- "session_id": session_id,
93
- "timestamp": datetime.utcnow().isoformat(),
94
- "mode": mode,
95
- "tool_calls": len(result.get("tool_calls", []))
96
- })
97
-
98
- except Exception as e:
99
- print(f"⚠️ Agent Stream Error: {e}")
100
- yield format_sse(EVENT_ERROR, str(e))
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Agent Chat Streaming Endpoint
3
+ SSE-based real-time streaming for Sales & Feedback agents
4
+ """
5
+ from typing import AsyncGenerator
6
+ from stream_utils import format_sse, EVENT_STATUS, EVENT_TOKEN, EVENT_DONE, EVENT_ERROR, EVENT_METADATA
7
+ from datetime import datetime
8
+
9
+
10
+ async def agent_chat_stream(
11
+ request,
12
+ agent_service,
13
+ conversation_service
14
+ ) -> AsyncGenerator[str, None]:
15
+ """
16
+ Stream agent responses in real-time (SSE format)
17
+
18
+ Args:
19
+ request: ChatRequest with message, session_id, mode, user_id
20
+ agent_service: AgentService instance
21
+ conversation_service: ConversationService instance
22
+
23
+ Yields SSE events:
24
+ - status: Processing updates
25
+ - token: Text chunks
26
+ - metadata: Session info
27
+ - done: Completion signal
28
+ - error: Error messages
29
+ """
30
+ try:
31
+ # === SESSION MANAGEMENT ===
32
+ session_id = request.session_id
33
+ if not session_id:
34
+ session_id = conversation_service.create_session(
35
+ metadata={"user_agent": "api", "created_via": "agent_stream"},
36
+ user_id=request.user_id
37
+ )
38
+ yield format_sse(EVENT_METADATA, {"session_id": session_id})
39
+
40
+ # Get conversation history
41
+ history = conversation_service.get_conversation_history(session_id)
42
+
43
+ # Convert to messages format
44
+ messages = []
45
+ for h in history:
46
+ messages.append({"role": h["role"], "content": h["content"]})
47
+
48
+
49
+ # Determine mode
50
+ mode = getattr(request, 'mode', 'sales') # Default to sales
51
+ user_id = getattr(request, 'user_id', None)
52
+ access_token = getattr(request, 'access_token', None)
53
+
54
+ # Debug logging
55
+ print(f"📋 Request Info:")
56
+ print(f" - Mode: {mode}")
57
+ print(f" - User ID: {user_id}")
58
+ print(f" - Access Token: {'✅ Present' if access_token else '❌ Missing'}")
59
+ if access_token:
60
+ print(f" - Token preview: {access_token[:20]}...")
61
+
62
+ # === STATUS UPDATE ===
63
+ if mode == 'feedback':
64
+ yield format_sse(EVENT_STATUS, "Đang kiểm tra lịch sử sự kiện của bạn...")
65
+ else:
66
+ yield format_sse(EVENT_STATUS, "Đang tư vấn...")
67
+
68
+ # === CALL AGENT ===
69
+ result = await agent_service.chat(
70
+ user_message=request.message,
71
+ conversation_history=messages,
72
+ mode=mode,
73
+ user_id=user_id,
74
+ access_token=access_token
75
+ )
76
+
77
+ agent_response = result["message"]
78
+
79
+ # === STREAM RESPONSE TOKEN BY TOKEN ===
80
+ # Simple character-by-character streaming
81
+ chunk_size = 5 # Characters per chunk
82
+ for i in range(0, len(agent_response), chunk_size):
83
+ chunk = agent_response[i:i+chunk_size]
84
+ yield format_sse(EVENT_TOKEN, chunk)
85
+ # Small delay for smoother streaming
86
+ import asyncio
87
+ await asyncio.sleep(0.02)
88
+
89
+ # === SAVE HISTORY ===
90
+ conversation_service.add_message(
91
+ session_id=session_id,
92
+ role="user",
93
+ content=request.message
94
+ )
95
+ conversation_service.add_message(
96
+ session_id=session_id,
97
+ role="assistant",
98
+ content=agent_response
99
+ )
100
+
101
+ # === DONE ===
102
+ yield format_sse(EVENT_DONE, {
103
+ "session_id": session_id,
104
+ "timestamp": datetime.utcnow().isoformat(),
105
+ "mode": mode,
106
+ "tool_calls": len(result.get("tool_calls", []))
107
+ })
108
+
109
+ except Exception as e:
110
+ print(f"⚠️ Agent Stream Error: {e}")
111
+ yield format_sse(EVENT_ERROR, str(e))
tools_service.py CHANGED
@@ -1,183 +1,212 @@
1
- """
2
- Tools Service for LLM Function Calling
3
- HuggingFace-compatible với prompt engineering
4
- """
5
- import httpx
6
- from typing import List, Dict, Any, Optional
7
- import json
8
- import asyncio
9
-
10
-
11
- class ToolsService:
12
- """
13
- Manages external API tools that LLM can call via prompt engineering
14
- """
15
-
16
- def __init__(self, base_url: str = "https://hoalacrent.io.vn/api/v0"):
17
- self.base_url = base_url
18
- self.client = httpx.AsyncClient(timeout=10.0)
19
-
20
- def get_tools_definition(self) -> List[Dict]:
21
- """
22
- Return list of tool definitions (OpenAI format style)
23
- Used for constructing system prompt
24
- """
25
- return [
26
- {
27
- "name": "search_events",
28
- "description": "Tìm kiếm sự kiện phù hợp theo từ khóa, vibe, hoặc thời gian.",
29
- "parameters": {
30
- "type": "object",
31
- "properties": {
32
- "query": {"type": "string", "description": "Từ khóa tìm kiếm (VD: 'nhạc rock', 'hài kịch')"},
33
- "vibe": {"type": "string", "description": "Vibe/Mood (VD: 'chill', 'sôi động', 'hẹn hò')"},
34
- "time": {"type": "string", "description": "Thời gian (VD: 'cuối tuần này', 'tối nay')"}
35
- }
36
- }
37
- },
38
- {
39
- "name": "get_event_details",
40
- "description": "Lấy thông tin chi tiết (giá, địa điểm, thời gian) của sự kiện.",
41
- "parameters": {
42
- "type": "object",
43
- "properties": {
44
- "event_id": {"type": "string", "description": "ID của sự kiện (MongoDB ID)"}
45
- },
46
- "required": ["event_id"]
47
- }
48
- },
49
- {
50
- "name": "get_purchased_events",
51
- "description": "Kiểm tra lịch sử các sự kiện user đã mua vé hoặc tham gia.",
52
- "parameters": {
53
- "type": "object",
54
- "properties": {
55
- "user_id": {"type": "string", "description": "ID của user"}
56
- },
57
- "required": ["user_id"]
58
- }
59
- },
60
- {
61
- "name": "save_feedback",
62
- "description": "Lưu đánh giá/feedback của user về sự kiện.",
63
- "parameters": {
64
- "type": "object",
65
- "properties": {
66
- "event_id": {"type": "string", "description": "ID sự kiện"},
67
- "rating": {"type": "integer", "description": "Số sao đánh giá (1-5)"},
68
- "comment": {"type": "string", "description": "Nội dung nhận xét"}
69
- },
70
- "required": ["event_id", "rating"]
71
- }
72
- },
73
- {
74
- "name": "save_lead",
75
- "description": "Lưu thông tin khách hàng quan tâm (Lead).",
76
- "parameters": {
77
- "type": "object",
78
- "properties": {
79
- "email": {"type": "string"},
80
- "phone": {"type": "string"},
81
- "interest": {"type": "string"}
82
- }
83
- }
84
- }
85
- ]
86
-
87
- async def execute_tool(self, tool_name: str, arguments: Dict, access_token: Optional[str] = None) -> Any:
88
- """
89
- Execute a tool by name with arguments
90
-
91
- Args:
92
- tool_name: Name of the tool
93
- arguments: Tool arguments
94
- access_token: JWT token for authenticated API calls
95
- """
96
- print(f"🔧 Executing Tool: {tool_name} with args: {arguments}")
97
-
98
- try:
99
- if tool_name == "get_event_details":
100
- return await self._get_event_details(arguments.get("event_id") or arguments.get("event_code"))
101
-
102
- elif tool_name == "get_purchased_events":
103
- return await self._get_purchased_events(
104
- arguments.get("user_id"),
105
- access_token=access_token # Pass access_token
106
- )
107
-
108
- elif tool_name == "save_feedback":
109
- return await self._save_feedback(
110
- arguments.get("event_id"),
111
- arguments.get("rating"),
112
- arguments.get("comment")
113
- )
114
-
115
- elif tool_name == "search_events":
116
- # Note: This usually requires RAG service, so we return a special signal
117
- # The Agent Service will handle RAG search
118
- return {"action": "run_rag_search", "query": arguments}
119
-
120
- elif tool_name == "save_lead":
121
- # Placeholder for lead saving
122
- return {"success": True, "message": "Lead saved successfully"}
123
-
124
- else:
125
- return {"error": f"Unknown tool: {tool_name}"}
126
-
127
- except Exception as e:
128
- print(f"⚠️ Tool Execution Error: {e}")
129
- return {"error": str(e)}
130
-
131
- async def _get_event_details(self, event_id: str) -> Dict:
132
- """Call API to get event details"""
133
- if not event_id:
134
- return {"error": "Missing event_id"}
135
-
136
- try:
137
- url = f"{self.base_url}/event/get-event-by-id"
138
-
139
- response = await self.client.get(url, params={"id": event_id})
140
- if response.status_code == 200:
141
- data = response.json()
142
- if data.get("success"):
143
- return data.get("data")
144
- return {"error": "Event not found", "details": response.text}
145
- except Exception as e:
146
- return {"error": str(e)}
147
-
148
- async def _get_purchased_events(self, user_id: str, access_token: Optional[str] = None) -> List[Dict]:
149
- """Call API to get purchased events for user (requires auth)"""
150
- if not user_id:
151
- return []
152
-
153
- try:
154
- url = f"{self.base_url}/event/get-purchase-event-by-user-id/{user_id}"
155
- print(f"🔍 Calling API: {url}")
156
-
157
- # Add Authorization header if access_token provided
158
- headers = {}
159
- if access_token:
160
- headers["Authorization"] = f"Bearer {access_token}"
161
- print(f"🔐 Using access_token for authentication")
162
-
163
- response = await self.client.get(url, headers=headers)
164
- if response.status_code == 200:
165
- data = response.json()
166
- # API returns {data: [...]}
167
- return data.get("data", [])
168
-
169
- print(f"⚠️ API Error: {response.status_code} - {response.text}")
170
- return []
171
- except Exception as e:
172
- print(f"⚠️ API Exception: {e}")
173
- return []
174
-
175
- async def _save_feedback(self, event_id: str, rating: int, comment: str) -> Dict:
176
- """Save feedback (Mock or Real API)"""
177
- # TODO: Implement real API call when available
178
- print(f"📝 Saving Feedback: Event={event_id}, Rating={rating}, Comment={comment}")
179
- return {"success": True, "message": "Feedback recorded"}
180
-
181
- async def close(self):
182
- """Close HTTP client"""
183
- await self.client.aclose()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Tools Service for LLM Function Calling
3
+ HuggingFace-compatible với prompt engineering
4
+ """
5
+ import httpx
6
+ from typing import List, Dict, Any, Optional
7
+ import json
8
+ import asyncio
9
+
10
+
11
+ class ToolsService:
12
+ """
13
+ Manages external API tools that LLM can call via prompt engineering
14
+ """
15
+
16
+ def __init__(self, base_url: str = "https://hoalacrent.io.vn/api/v0"):
17
+ self.base_url = base_url
18
+ self.client = httpx.AsyncClient(timeout=10.0)
19
+
20
+ def get_tools_definition(self) -> List[Dict]:
21
+ """
22
+ Return list of tool definitions (OpenAI format style)
23
+ Used for constructing system prompt
24
+ """
25
+ return [
26
+ {
27
+ "name": "search_events",
28
+ "description": "Tìm kiếm sự kiện phù hợp theo từ khóa, vibe, hoặc thời gian.",
29
+ "parameters": {
30
+ "type": "object",
31
+ "properties": {
32
+ "query": {"type": "string", "description": "Từ khóa tìm kiếm (VD: 'nhạc rock', 'hài kịch')"},
33
+ "vibe": {"type": "string", "description": "Vibe/Mood (VD: 'chill', 'sôi động', 'hẹn hò')"},
34
+ "time": {"type": "string", "description": "Thời gian (VD: 'cuối tuần này', 'tối nay')"}
35
+ }
36
+ }
37
+ },
38
+ {
39
+ "name": "get_event_details",
40
+ "description": "Lấy thông tin chi tiết (giá, địa điểm, thời gian) của sự kiện.",
41
+ "parameters": {
42
+ "type": "object",
43
+ "properties": {
44
+ "event_id": {"type": "string", "description": "ID của sự kiện (MongoDB ID)"}
45
+ },
46
+ "required": ["event_id"]
47
+ }
48
+ },
49
+ {
50
+ "name": "get_purchased_events",
51
+ "description": "Kiểm tra lịch sử các sự kiện user đã mua vé hoặc tham gia.",
52
+ "parameters": {
53
+ "type": "object",
54
+ "properties": {
55
+ "user_id": {"type": "string", "description": "ID của user"}
56
+ },
57
+ "required": ["user_id"]
58
+ }
59
+ },
60
+ {
61
+ "name": "save_feedback",
62
+ "description": "Lưu đánh giá/feedback của user về sự kiện.",
63
+ "parameters": {
64
+ "type": "object",
65
+ "properties": {
66
+ "event_id": {"type": "string", "description": "ID sự kiện"},
67
+ "rating": {"type": "integer", "description": "Số sao đánh giá (1-5)"},
68
+ "comment": {"type": "string", "description": "Nội dung nhận xét"}
69
+ },
70
+ "required": ["event_id", "rating"]
71
+ }
72
+ },
73
+ {
74
+ "name": "save_lead",
75
+ "description": "Lưu thông tin khách hàng quan tâm (Lead).",
76
+ "parameters": {
77
+ "type": "object",
78
+ "properties": {
79
+ "email": {"type": "string"},
80
+ "phone": {"type": "string"},
81
+ "interest": {"type": "string"}
82
+ }
83
+ }
84
+ }
85
+ ]
86
+
87
+ async def execute_tool(self, tool_name: str, arguments: Dict, access_token: Optional[str] = None) -> Any:
88
+ """
89
+ Execute a tool by name with arguments
90
+
91
+ Args:
92
+ tool_name: Name of the tool
93
+ arguments: Tool arguments
94
+ access_token: JWT token for authenticated API calls
95
+ """
96
+ print(f"\n🔧 ===== TOOL EXECUTION =====")
97
+ print(f"Tool: {tool_name}")
98
+ print(f"Arguments: {arguments}")
99
+ print(f"Access Token: {'✅ Present' if access_token else '❌ Missing'}")
100
+ if access_token:
101
+ print(f"Token preview: {access_token[:30]}...")
102
+
103
+ try:
104
+ if tool_name == "get_event_details":
105
+ return await self._get_event_details(arguments.get("event_id") or arguments.get("event_code"))
106
+
107
+ elif tool_name == "get_purchased_events":
108
+ print(f"→ Calling _get_purchased_events with:")
109
+ print(f" user_id: {arguments.get('user_id')}")
110
+ print(f" access_token: {'✅' if access_token else '❌'}")
111
+ return await self._get_purchased_events(
112
+ arguments.get("user_id"),
113
+ access_token=access_token # Pass access_token
114
+ )
115
+
116
+ elif tool_name == "save_feedback":
117
+ return await self._save_feedback(
118
+ arguments.get("event_id"),
119
+ arguments.get("rating"),
120
+ arguments.get("comment")
121
+ )
122
+
123
+ elif tool_name == "search_events":
124
+ # Note: This usually requires RAG service, so we return a special signal
125
+ # The Agent Service will handle RAG search
126
+ return {"action": "run_rag_search", "query": arguments}
127
+
128
+ elif tool_name == "save_lead":
129
+ # Placeholder for lead saving
130
+ return {"success": True, "message": "Lead saved successfully"}
131
+
132
+ else:
133
+ return {"error": f"Unknown tool: {tool_name}"}
134
+
135
+ except Exception as e:
136
+ print(f"⚠️ Tool Execution Error: {e}")
137
+ return {"error": str(e)}
138
+
139
+ async def _get_event_details(self, event_id: str) -> Dict:
140
+ """Call API to get event details"""
141
+ if not event_id:
142
+ return {"error": "Missing event_id"}
143
+
144
+ try:
145
+ url = f"{self.base_url}/event/get-event-by-id"
146
+
147
+ response = await self.client.get(url, params={"id": event_id})
148
+ if response.status_code == 200:
149
+ data = response.json()
150
+ if data.get("success"):
151
+ return data.get("data")
152
+ return {"error": "Event not found", "details": response.text}
153
+ except Exception as e:
154
+ return {"error": str(e)}
155
+
156
+ async def _get_purchased_events(self, user_id: str, access_token: Optional[str] = None) -> List[Dict]:
157
+ """Call API to get purchased events for user (requires auth)"""
158
+ print(f"\n🎫 ===== GET PURCHASED EVENTS =====")
159
+ print(f"User ID: {user_id}")
160
+ print(f"Access Token: {'✅ Present' if access_token else '❌ Missing'}")
161
+
162
+ if not user_id:
163
+ print("⚠️ No user_id provided, returning empty list")
164
+ return []
165
+
166
+ try:
167
+ url = f"{self.base_url}/event/get-purchase-event-by-user-id/{user_id}"
168
+ print(f"🔍 API URL: {url}")
169
+
170
+ # Add Authorization header if access_token provided
171
+ headers = {}
172
+ if access_token:
173
+ headers["Authorization"] = f"Bearer {access_token}"
174
+ print(f"🔐 Authorization Header Added:")
175
+ print(f" Bearer {access_token[:30]}...")
176
+ else:
177
+ print(f"⚠️ No access_token - calling API without auth")
178
+
179
+ print(f"📡 Headers: {headers}")
180
+ print(f"🚀 Calling API...")
181
+
182
+ response = await self.client.get(url, headers=headers)
183
+
184
+ print(f"📥 Response Status: {response.status_code}")
185
+ print(f"📦 Response Headers: {dict(response.headers)}")
186
+
187
+ if response.status_code == 200:
188
+ data = response.json()
189
+ print(f"✅ Success! Data keys: {list(data.keys())}")
190
+ events = data.get("data", [])
191
+ print(f"📊 Found {len(events)} purchased events")
192
+ return events
193
+ else:
194
+ print(f"❌ API Error: {response.status_code}")
195
+ print(f"Response body: {response.text[:500]}")
196
+ return []
197
+
198
+ except Exception as e:
199
+ print(f"⚠️ Exception in _get_purchased_events: {type(e).__name__}: {e}")
200
+ import traceback
201
+ traceback.print_exc()
202
+ return []
203
+
204
+ async def _save_feedback(self, event_id: str, rating: int, comment: str) -> Dict:
205
+ """Save feedback (Mock or Real API)"""
206
+ # TODO: Implement real API call when available
207
+ print(f"📝 Saving Feedback: Event={event_id}, Rating={rating}, Comment={comment}")
208
+ return {"success": True, "message": "Feedback recorded"}
209
+
210
+ async def close(self):
211
+ """Close HTTP client"""
212
+ await self.client.aclose()