mashrur950 commited on
Commit
1ba61d9
·
1 Parent(s): 14eb81d

validation error resolve

Browse files
Files changed (3) hide show
  1. chat/geocoding.py +83 -4
  2. chat/tools.py +94 -18
  3. proxy.py +7 -0
chat/geocoding.py CHANGED
@@ -6,10 +6,18 @@ Handles address validation with Google Maps API and smart mock fallback
6
  import os
7
  import googlemaps
8
  import logging
 
 
9
  from typing import Dict, Optional
10
 
11
  logger = logging.getLogger(__name__)
12
 
 
 
 
 
 
 
13
  # Common city coordinates for mock geocoding
14
  CITY_COORDINATES = {
15
  "san francisco": (37.7749, -122.4194),
@@ -47,8 +55,13 @@ class GeocodingService:
47
  self.gmaps_client = None
48
  else:
49
  try:
50
- self.gmaps_client = googlemaps.Client(key=self.google_maps_key)
51
- logger.info("Geocoding: Using Google Maps API")
 
 
 
 
 
52
  except Exception as e:
53
  logger.error(f"Failed to initialize Google Maps client: {e}")
54
  self.use_mock = True
@@ -56,7 +69,8 @@ class GeocodingService:
56
 
57
  def geocode(self, address: str) -> Dict:
58
  """
59
- Geocode address, using mock if API unavailable
 
60
 
61
  Args:
62
  address: Street address to geocode
@@ -73,6 +87,35 @@ class GeocodingService:
73
  logger.error(f"Google Maps API failed: {e}, falling back to mock")
74
  return self._geocode_mock(address)
75
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
76
  def _geocode_google(self, address: str) -> Dict:
77
  """Real Google Maps API geocoding"""
78
  try:
@@ -128,7 +171,8 @@ class GeocodingService:
128
 
129
  def reverse_geocode(self, lat: float, lng: float) -> Dict:
130
  """
131
- Reverse geocode coordinates to address
 
132
 
133
  Args:
134
  lat: Latitude
@@ -146,6 +190,41 @@ class GeocodingService:
146
  logger.error(f"Google Maps reverse geocoding failed: {e}, falling back to mock")
147
  return self._reverse_geocode_mock(lat, lng)
148
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
149
  def _reverse_geocode_google(self, lat: float, lng: float) -> Dict:
150
  """Real Google Maps API reverse geocoding"""
151
  try:
 
6
  import os
7
  import googlemaps
8
  import logging
9
+ import asyncio
10
+ from concurrent.futures import ThreadPoolExecutor
11
  from typing import Dict, Optional
12
 
13
  logger = logging.getLogger(__name__)
14
 
15
+ # Thread pool for running blocking geocoding calls
16
+ _geocoding_executor = ThreadPoolExecutor(max_workers=3, thread_name_prefix="geocoding")
17
+
18
+ # Timeout for geocoding operations (seconds)
19
+ GEOCODING_TIMEOUT = 10
20
+
21
  # Common city coordinates for mock geocoding
22
  CITY_COORDINATES = {
23
  "san francisco": (37.7749, -122.4194),
 
55
  self.gmaps_client = None
56
  else:
57
  try:
58
+ # Configure with timeout to prevent hanging
59
+ self.gmaps_client = googlemaps.Client(
60
+ key=self.google_maps_key,
61
+ timeout=GEOCODING_TIMEOUT, # 10 second timeout
62
+ retry_timeout=GEOCODING_TIMEOUT # Total retry timeout
63
+ )
64
+ logger.info("Geocoding: Using Google Maps API (timeout: 10s)")
65
  except Exception as e:
66
  logger.error(f"Failed to initialize Google Maps client: {e}")
67
  self.use_mock = True
 
69
 
70
  def geocode(self, address: str) -> Dict:
71
  """
72
+ Geocode address, using mock if API unavailable.
73
+ This is a synchronous method with built-in timeout.
74
 
75
  Args:
76
  address: Street address to geocode
 
87
  logger.error(f"Google Maps API failed: {e}, falling back to mock")
88
  return self._geocode_mock(address)
89
 
90
+ async def geocode_async(self, address: str) -> Dict:
91
+ """
92
+ Async-safe geocoding that runs blocking calls in a thread pool.
93
+ Use this in async contexts to prevent blocking the event loop.
94
+
95
+ Args:
96
+ address: Street address to geocode
97
+
98
+ Returns:
99
+ Dict with keys: lat, lng, formatted_address, confidence
100
+ """
101
+ if self.use_mock:
102
+ return self._geocode_mock(address)
103
+
104
+ loop = asyncio.get_event_loop()
105
+ try:
106
+ # Run blocking geocode in thread pool with timeout
107
+ result = await asyncio.wait_for(
108
+ loop.run_in_executor(_geocoding_executor, self._geocode_google, address),
109
+ timeout=GEOCODING_TIMEOUT + 2 # Extra buffer beyond client timeout
110
+ )
111
+ return result
112
+ except asyncio.TimeoutError:
113
+ logger.error(f"Geocoding timed out for address: {address}, falling back to mock")
114
+ return self._geocode_mock(address)
115
+ except Exception as e:
116
+ logger.error(f"Async geocoding failed: {e}, falling back to mock")
117
+ return self._geocode_mock(address)
118
+
119
  def _geocode_google(self, address: str) -> Dict:
120
  """Real Google Maps API geocoding"""
121
  try:
 
171
 
172
  def reverse_geocode(self, lat: float, lng: float) -> Dict:
173
  """
174
+ Reverse geocode coordinates to address.
175
+ This is a synchronous method with built-in timeout.
176
 
177
  Args:
178
  lat: Latitude
 
190
  logger.error(f"Google Maps reverse geocoding failed: {e}, falling back to mock")
191
  return self._reverse_geocode_mock(lat, lng)
192
 
193
+ async def reverse_geocode_async(self, lat: float, lng: float) -> Dict:
194
+ """
195
+ Async-safe reverse geocoding that runs blocking calls in a thread pool.
196
+ Use this in async contexts to prevent blocking the event loop.
197
+
198
+ Args:
199
+ lat: Latitude
200
+ lng: Longitude
201
+
202
+ Returns:
203
+ Dict with keys: address, city, state, country, formatted_address
204
+ """
205
+ if self.use_mock:
206
+ return self._reverse_geocode_mock(lat, lng)
207
+
208
+ loop = asyncio.get_event_loop()
209
+ try:
210
+ # Run blocking reverse_geocode in thread pool with timeout
211
+ result = await asyncio.wait_for(
212
+ loop.run_in_executor(
213
+ _geocoding_executor,
214
+ self._reverse_geocode_google,
215
+ lat,
216
+ lng
217
+ ),
218
+ timeout=GEOCODING_TIMEOUT + 2
219
+ )
220
+ return result
221
+ except asyncio.TimeoutError:
222
+ logger.error(f"Reverse geocoding timed out for ({lat}, {lng}), falling back to mock")
223
+ return self._reverse_geocode_mock(lat, lng)
224
+ except Exception as e:
225
+ logger.error(f"Async reverse geocoding failed: {e}, falling back to mock")
226
+ return self._reverse_geocode_mock(lat, lng)
227
+
228
  def _reverse_geocode_google(self, lat: float, lng: float) -> Dict:
229
  """Real Google Maps API reverse geocoding"""
230
  try:
chat/tools.py CHANGED
@@ -8,19 +8,70 @@ import sys
8
  from pathlib import Path
9
  from datetime import datetime, timedelta
10
  import logging
 
11
 
12
  # Add parent directory to path
13
  sys.path.insert(0, str(Path(__file__).parent.parent))
14
 
15
  from database.connection import execute_write, execute_query, get_db_connection
16
- from chat.geocoding import GeocodingService
17
  from psycopg2.extras import RealDictCursor
18
 
19
  logger = logging.getLogger(__name__)
20
 
 
 
 
21
  # Initialize geocoding service
22
  geocoding_service = GeocodingService()
23
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
24
  # Tool schemas for Claude
25
  TOOLS_SCHEMA = [
26
  {
@@ -625,7 +676,8 @@ def handle_geocode_address(tool_input: dict) -> dict:
625
 
626
  logger.info(f"Geocoding address: {address}")
627
 
628
- result = geocoding_service.geocode(address)
 
629
 
630
  return {
631
  "success": True,
@@ -828,8 +880,8 @@ def _location_to_latlng(location: str) -> dict:
828
  except ValueError:
829
  pass # Not valid coordinates, treat as address
830
 
831
- # Geocode the address
832
- geocoded = geocoding_service.geocode(location)
833
  return {
834
  "latitude": geocoded["lat"],
835
  "longitude": geocoded["lng"]
@@ -1034,9 +1086,9 @@ def _calculate_route_routes_api(origin: str, destination: str, mode: str, altern
1034
  speed_intervals = travel_advisory.get("speedReadingIntervals", [])
1035
  has_traffic_data = len(speed_intervals) > 0
1036
 
1037
- # Get origin and destination addresses (geocode if needed)
1038
- origin_geocoded = geocoding_service.geocode(origin)
1039
- dest_geocoded = geocoding_service.geocode(destination)
1040
 
1041
  # Build enhanced response with vehicle-specific data
1042
  response_data = {
@@ -1178,10 +1230,10 @@ def _calculate_route_mock(origin: str, destination: str, mode: str) -> dict:
1178
  import math
1179
  from datetime import datetime
1180
 
1181
- # Try to geocode both locations to get coordinates
1182
  try:
1183
- origin_geocoded = geocoding_service.geocode(origin)
1184
- dest_geocoded = geocoding_service.geocode(destination)
1185
 
1186
  origin_lat = origin_geocoded["lat"]
1187
  origin_lng = origin_geocoded["lng"]
@@ -1510,6 +1562,24 @@ def handle_create_driver(tool_input: dict, user_id: str = None) -> dict:
1510
  skills_raw = tool_input.get("skills", [])
1511
  skills = list(skills_raw) if skills_raw else []
1512
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1513
  status = tool_input.get("status", "active")
1514
 
1515
  # Validate ALL required fields (name, vehicle_type, current_lat, current_lng)
@@ -1651,12 +1721,10 @@ def handle_update_order(tool_input: dict, user_id: str = None) -> dict:
1651
  current_assigned_driver = existing[0].get("assigned_driver_id")
1652
 
1653
  # Auto-geocode if delivery address is updated without coordinates
 
1654
  if "delivery_address" in tool_input and ("delivery_lat" not in tool_input or "delivery_lng" not in tool_input):
1655
- from chat.geocoding import GeocodingService
1656
- geocoding_service = GeocodingService()
1657
-
1658
  try:
1659
- geocode_result = geocoding_service.geocode(tool_input["delivery_address"])
1660
  tool_input["delivery_lat"] = geocode_result["lat"]
1661
  tool_input["delivery_lng"] = geocode_result["lng"]
1662
  logger.info(f"Auto-geocoded delivery address: {geocode_result['formatted_address']}")
@@ -3263,12 +3331,11 @@ def handle_get_driver_details(tool_input: dict, user_id: str = None) -> dict:
3263
  skills = []
3264
 
3265
  # Reverse geocode location to get address
 
3266
  location_address = None
3267
  if row['current_lat'] and row['current_lng']:
3268
  try:
3269
- from chat.geocoding import GeocodingService
3270
- geocoding_service = GeocodingService()
3271
- reverse_result = geocoding_service.reverse_geocode(
3272
  float(row['current_lat']),
3273
  float(row['current_lng'])
3274
  )
@@ -4273,7 +4340,16 @@ Analyze ALL parameters comprehensively:
4273
 
4274
  **IMPORTANT:** Return ONLY valid JSON. Do not include markdown formatting, code blocks, or explanatory text outside the JSON."""
4275
 
4276
- response = model.generate_content(prompt)
 
 
 
 
 
 
 
 
 
4277
  response_text = response.text.strip()
4278
 
4279
  # Clean response (remove markdown code blocks if present)
 
8
  from pathlib import Path
9
  from datetime import datetime, timedelta
10
  import logging
11
+ from concurrent.futures import ThreadPoolExecutor, TimeoutError as FuturesTimeoutError
12
 
13
  # Add parent directory to path
14
  sys.path.insert(0, str(Path(__file__).parent.parent))
15
 
16
  from database.connection import execute_write, execute_query, get_db_connection
17
+ from chat.geocoding import GeocodingService, GEOCODING_TIMEOUT
18
  from psycopg2.extras import RealDictCursor
19
 
20
  logger = logging.getLogger(__name__)
21
 
22
+ # Thread pool for running blocking operations (geocoding, external APIs)
23
+ _blocking_executor = ThreadPoolExecutor(max_workers=5, thread_name_prefix="blocking_ops")
24
+
25
  # Initialize geocoding service
26
  geocoding_service = GeocodingService()
27
 
28
+
29
+ def safe_geocode(address: str) -> dict:
30
+ """
31
+ Thread-safe geocoding with timeout protection.
32
+ Runs geocoding in a thread pool to prevent blocking the main event loop.
33
+
34
+ Args:
35
+ address: Address to geocode
36
+
37
+ Returns:
38
+ Geocoding result dict
39
+ """
40
+ try:
41
+ future = _blocking_executor.submit(geocoding_service.geocode, address)
42
+ result = future.result(timeout=GEOCODING_TIMEOUT + 2)
43
+ return result
44
+ except FuturesTimeoutError:
45
+ logger.error(f"Geocoding timed out for: {address}, using mock fallback")
46
+ return geocoding_service._geocode_mock(address)
47
+ except Exception as e:
48
+ logger.error(f"Geocoding failed for {address}: {e}, using mock fallback")
49
+ return geocoding_service._geocode_mock(address)
50
+
51
+
52
+ def safe_reverse_geocode(lat: float, lng: float) -> dict:
53
+ """
54
+ Thread-safe reverse geocoding with timeout protection.
55
+ Runs reverse geocoding in a thread pool to prevent blocking.
56
+
57
+ Args:
58
+ lat: Latitude
59
+ lng: Longitude
60
+
61
+ Returns:
62
+ Reverse geocoding result dict
63
+ """
64
+ try:
65
+ future = _blocking_executor.submit(geocoding_service.reverse_geocode, lat, lng)
66
+ result = future.result(timeout=GEOCODING_TIMEOUT + 2)
67
+ return result
68
+ except FuturesTimeoutError:
69
+ logger.error(f"Reverse geocoding timed out for ({lat}, {lng}), using mock fallback")
70
+ return geocoding_service._reverse_geocode_mock(lat, lng)
71
+ except Exception as e:
72
+ logger.error(f"Reverse geocoding failed for ({lat}, {lng}): {e}, using mock fallback")
73
+ return geocoding_service._reverse_geocode_mock(lat, lng)
74
+
75
  # Tool schemas for Claude
76
  TOOLS_SCHEMA = [
77
  {
 
676
 
677
  logger.info(f"Geocoding address: {address}")
678
 
679
+ # Use safe_geocode with timeout protection
680
+ result = safe_geocode(address)
681
 
682
  return {
683
  "success": True,
 
880
  except ValueError:
881
  pass # Not valid coordinates, treat as address
882
 
883
+ # Geocode the address using safe version with timeout
884
+ geocoded = safe_geocode(location)
885
  return {
886
  "latitude": geocoded["lat"],
887
  "longitude": geocoded["lng"]
 
1086
  speed_intervals = travel_advisory.get("speedReadingIntervals", [])
1087
  has_traffic_data = len(speed_intervals) > 0
1088
 
1089
+ # Get origin and destination addresses (geocode if needed) - use safe version with timeout
1090
+ origin_geocoded = safe_geocode(origin)
1091
+ dest_geocoded = safe_geocode(destination)
1092
 
1093
  # Build enhanced response with vehicle-specific data
1094
  response_data = {
 
1230
  import math
1231
  from datetime import datetime
1232
 
1233
+ # Try to geocode both locations to get coordinates (using safe version)
1234
  try:
1235
+ origin_geocoded = safe_geocode(origin)
1236
+ dest_geocoded = safe_geocode(destination)
1237
 
1238
  origin_lat = origin_geocoded["lat"]
1239
  origin_lng = origin_geocoded["lng"]
 
1562
  skills_raw = tool_input.get("skills", [])
1563
  skills = list(skills_raw) if skills_raw else []
1564
 
1565
+ # Define valid skills
1566
+ VALID_SKILLS = [
1567
+ "refrigerated",
1568
+ "medical_certified",
1569
+ "fragile_handler",
1570
+ "overnight",
1571
+ "express_delivery"
1572
+ ]
1573
+
1574
+ # Validate skills
1575
+ if skills:
1576
+ invalid_skills = [s for s in skills if s not in VALID_SKILLS]
1577
+ if invalid_skills:
1578
+ return {
1579
+ "success": False,
1580
+ "error": f"Invalid skills: {invalid_skills}. Valid skills are: {VALID_SKILLS}"
1581
+ }
1582
+
1583
  status = tool_input.get("status", "active")
1584
 
1585
  # Validate ALL required fields (name, vehicle_type, current_lat, current_lng)
 
1721
  current_assigned_driver = existing[0].get("assigned_driver_id")
1722
 
1723
  # Auto-geocode if delivery address is updated without coordinates
1724
+ # Use safe_geocode with timeout protection
1725
  if "delivery_address" in tool_input and ("delivery_lat" not in tool_input or "delivery_lng" not in tool_input):
 
 
 
1726
  try:
1727
+ geocode_result = safe_geocode(tool_input["delivery_address"])
1728
  tool_input["delivery_lat"] = geocode_result["lat"]
1729
  tool_input["delivery_lng"] = geocode_result["lng"]
1730
  logger.info(f"Auto-geocoded delivery address: {geocode_result['formatted_address']}")
 
3331
  skills = []
3332
 
3333
  # Reverse geocode location to get address
3334
+ # Use safe_reverse_geocode with timeout protection
3335
  location_address = None
3336
  if row['current_lat'] and row['current_lng']:
3337
  try:
3338
+ reverse_result = safe_reverse_geocode(
 
 
3339
  float(row['current_lat']),
3340
  float(row['current_lng'])
3341
  )
 
4340
 
4341
  **IMPORTANT:** Return ONLY valid JSON. Do not include markdown formatting, code blocks, or explanatory text outside the JSON."""
4342
 
4343
+ # Call Gemini with timeout protection (30 seconds max)
4344
+ try:
4345
+ future = _blocking_executor.submit(model.generate_content, prompt)
4346
+ response = future.result(timeout=30)
4347
+ except FuturesTimeoutError:
4348
+ logger.error("Gemini AI call timed out after 30 seconds")
4349
+ return {
4350
+ "success": False,
4351
+ "error": "AI assignment timed out. Please try auto_assign_order instead."
4352
+ }
4353
  response_text = response.text.strip()
4354
 
4355
  # Clean response (remove markdown code blocks if present)
proxy.py CHANGED
@@ -16,6 +16,7 @@ import asyncio
16
  import logging
17
  import os
18
  from aiohttp import web, ClientSession, ClientTimeout
 
19
  from urllib.parse import urlencode, parse_qs, urlparse, urlunparse
20
  import sys
21
 
@@ -169,6 +170,12 @@ async def proxy_handler(request):
169
  headers=dict(resp.headers)
170
  )
171
 
 
 
 
 
 
 
172
  except Exception as e:
173
  import traceback
174
  error_details = traceback.format_exc()
 
16
  import logging
17
  import os
18
  from aiohttp import web, ClientSession, ClientTimeout
19
+ from aiohttp.client_exceptions import ClientConnectionResetError
20
  from urllib.parse import urlencode, parse_qs, urlparse, urlunparse
21
  import sys
22
 
 
170
  headers=dict(resp.headers)
171
  )
172
 
173
+ except (ClientConnectionResetError, ConnectionResetError) as e:
174
+ # Client disconnected - this is normal for SSE connections
175
+ # Log at DEBUG level to reduce noise
176
+ logger.debug(f"[SSE] Client disconnected: {e}")
177
+ return web.Response(text="Client disconnected", status=499)
178
+
179
  except Exception as e:
180
  import traceback
181
  error_details = traceback.format_exc()