gzdaniel committed on
Commit
bd1057c
·
1 Parent(s): c875336

Fix streaming problem for OpenAI

Browse files
Files changed (1) hide show
  1. lightrag/llm/openai.py +55 -39
lightrag/llm/openai.py CHANGED
@@ -177,28 +177,32 @@ async def openai_complete_if_cache(
177
  logger.debug("===== Sending Query to LLM =====")
178
 
179
  try:
180
- async with openai_async_client:
181
- if "response_format" in kwargs:
182
- response = await openai_async_client.beta.chat.completions.parse(
183
- model=model, messages=messages, **kwargs
184
- )
185
- else:
186
- response = await openai_async_client.chat.completions.create(
187
- model=model, messages=messages, **kwargs
188
- )
189
  except APIConnectionError as e:
190
  logger.error(f"OpenAI API Connection Error: {e}")
 
191
  raise
192
  except RateLimitError as e:
193
  logger.error(f"OpenAI API Rate Limit Error: {e}")
 
194
  raise
195
  except APITimeoutError as e:
196
  logger.error(f"OpenAI API Timeout Error: {e}")
 
197
  raise
198
  except Exception as e:
199
  logger.error(
200
  f"OpenAI API Call Failed,\nModel: {model},\nParams: {kwargs}, Got: {e}"
201
  )
 
202
  raise
203
 
204
  if hasattr(response, "__aiter__"):
@@ -243,6 +247,8 @@ async def openai_complete_if_cache(
243
  logger.warning(
244
  f"Failed to close stream response: {close_error}"
245
  )
 
 
246
  raise
247
  finally:
248
  # Ensure resources are released even if no exception occurs
@@ -258,40 +264,50 @@ async def openai_complete_if_cache(
258
  logger.warning(
259
  f"Failed to close stream response in finally block: {close_error}"
260
  )
 
 
261
 
262
  return inner()
263
 
264
  else:
265
- if (
266
- not response
267
- or not response.choices
268
- or not hasattr(response.choices[0], "message")
269
- or not hasattr(response.choices[0].message, "content")
270
- ):
271
- logger.error("Invalid response from OpenAI API")
272
- raise InvalidResponseError("Invalid response from OpenAI API")
273
-
274
- content = response.choices[0].message.content
275
-
276
- if not content or content.strip() == "":
277
- logger.error("Received empty content from OpenAI API")
278
- raise InvalidResponseError("Received empty content from OpenAI API")
279
-
280
- if r"\u" in content:
281
- content = safe_unicode_decode(content.encode("utf-8"))
282
-
283
- if token_tracker and hasattr(response, "usage"):
284
- token_counts = {
285
- "prompt_tokens": getattr(response.usage, "prompt_tokens", 0),
286
- "completion_tokens": getattr(response.usage, "completion_tokens", 0),
287
- "total_tokens": getattr(response.usage, "total_tokens", 0),
288
- }
289
- token_tracker.add_usage(token_counts)
290
-
291
- logger.debug(f"Response content len: {len(content)}")
292
- verbose_debug(f"Response: {response}")
293
-
294
- return content
 
 
 
 
 
 
 
 
295
 
296
 
297
  async def openai_complete(
 
177
  logger.debug("===== Sending Query to LLM =====")
178
 
179
  try:
180
+ # Don't use async with context manager, use client directly
181
+ if "response_format" in kwargs:
182
+ response = await openai_async_client.beta.chat.completions.parse(
183
+ model=model, messages=messages, **kwargs
184
+ )
185
+ else:
186
+ response = await openai_async_client.chat.completions.create(
187
+ model=model, messages=messages, **kwargs
188
+ )
189
  except APIConnectionError as e:
190
  logger.error(f"OpenAI API Connection Error: {e}")
191
+ await openai_async_client.close() # Ensure client is closed
192
  raise
193
  except RateLimitError as e:
194
  logger.error(f"OpenAI API Rate Limit Error: {e}")
195
+ await openai_async_client.close() # Ensure client is closed
196
  raise
197
  except APITimeoutError as e:
198
  logger.error(f"OpenAI API Timeout Error: {e}")
199
+ await openai_async_client.close() # Ensure client is closed
200
  raise
201
  except Exception as e:
202
  logger.error(
203
  f"OpenAI API Call Failed,\nModel: {model},\nParams: {kwargs}, Got: {e}"
204
  )
205
+ await openai_async_client.close() # Ensure client is closed
206
  raise
207
 
208
  if hasattr(response, "__aiter__"):
 
247
  logger.warning(
248
  f"Failed to close stream response: {close_error}"
249
  )
250
+ # Ensure client is closed in case of exception
251
+ await openai_async_client.close()
252
  raise
253
  finally:
254
  # Ensure resources are released even if no exception occurs
 
264
  logger.warning(
265
  f"Failed to close stream response in finally block: {close_error}"
266
  )
267
+ # Note: We don't close the client here for streaming responses
268
+ # The client will be closed by the caller after streaming is complete
269
 
270
  return inner()
271
 
272
  else:
273
+ try:
274
+ if (
275
+ not response
276
+ or not response.choices
277
+ or not hasattr(response.choices[0], "message")
278
+ or not hasattr(response.choices[0].message, "content")
279
+ ):
280
+ logger.error("Invalid response from OpenAI API")
281
+ await openai_async_client.close() # Ensure client is closed
282
+ raise InvalidResponseError("Invalid response from OpenAI API")
283
+
284
+ content = response.choices[0].message.content
285
+
286
+ if not content or content.strip() == "":
287
+ logger.error("Received empty content from OpenAI API")
288
+ await openai_async_client.close() # Ensure client is closed
289
+ raise InvalidResponseError("Received empty content from OpenAI API")
290
+
291
+ if r"\u" in content:
292
+ content = safe_unicode_decode(content.encode("utf-8"))
293
+
294
+ if token_tracker and hasattr(response, "usage"):
295
+ token_counts = {
296
+ "prompt_tokens": getattr(response.usage, "prompt_tokens", 0),
297
+ "completion_tokens": getattr(
298
+ response.usage, "completion_tokens", 0
299
+ ),
300
+ "total_tokens": getattr(response.usage, "total_tokens", 0),
301
+ }
302
+ token_tracker.add_usage(token_counts)
303
+
304
+ logger.debug(f"Response content len: {len(content)}")
305
+ verbose_debug(f"Response: {response}")
306
+
307
+ return content
308
+ finally:
309
+ # Ensure client is closed in all cases for non-streaming responses
310
+ await openai_async_client.close()
311
 
312
 
313
  async def openai_complete(