Molion0000 committed on
Commit 442f485 · 1 Parent(s): e83bc35

Fix #1746: [openai.py logic for streaming complete]

Files changed (1):
  1. lightrag/llm/openai.py +24 -4
lightrag/llm/openai.py CHANGED
```diff
@@ -210,9 +210,16 @@ async def openai_complete_if_cache(
     async def inner():
         # Track if we've started iterating
         iteration_started = False
+        final_chunk_usage = None
+
         try:
             iteration_started = True
             async for chunk in response:
+                # Check if this chunk has usage information (final chunk)
+                if hasattr(chunk, "usage") and chunk.usage:
+                    final_chunk_usage = chunk.usage
+                    logger.debug(f"Received usage info in streaming chunk: {chunk.usage}")
+
                 # Check if choices exists and is not empty
                 if not hasattr(chunk, "choices") or not chunk.choices:
                     logger.warning(f"Received chunk without choices: {chunk}")
```
```diff
@@ -222,16 +229,29 @@ async def openai_complete_if_cache(
                 if not hasattr(chunk.choices[0], "delta") or not hasattr(
                     chunk.choices[0].delta, "content"
                 ):
-                    logger.warning(
-                        f"Received chunk without delta content: {chunk.choices[0]}"
-                    )
+                    # This might be the final chunk, continue to check for usage
                     continue
+
                 content = chunk.choices[0].delta.content
                 if content is None:
                     continue
                 if r"\u" in content:
                     content = safe_unicode_decode(content.encode("utf-8"))
+
                 yield content
+
+            # After streaming is complete, track token usage
+            if token_tracker and final_chunk_usage:
+                # Use actual usage from the API
+                token_counts = {
+                    "prompt_tokens": getattr(final_chunk_usage, "prompt_tokens", 0),
+                    "completion_tokens": getattr(final_chunk_usage, "completion_tokens", 0),
+                    "total_tokens": getattr(final_chunk_usage, "total_tokens", 0),
+                }
+                token_tracker.add_usage(token_counts)
+                logger.debug(f"Streaming token usage (from API): {token_counts}")
+            elif token_tracker:
+                logger.debug("No usage information available in streaming response")
         except Exception as e:
             logger.error(f"Error in stream response: {str(e)}")
             # Try to clean up resources if possible
```
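Dropping the `logger.warning` here is deliberate: once usage reporting is enabled, the final chunk legitimately arrives with no delta content, so warning on it was noise. The `token_tracker` object that receives the counts is not defined in this diff; a hypothetical stand-in, assuming only that `add_usage` accepts a dict with the three keys passed above:

```python
# Hypothetical stand-in for token_tracker; the real tracker lives elsewhere
# in LightRAG. All the diff above requires is an add_usage() method that
# accepts the dict of three counts it builds.
class TokenTracker:
    def __init__(self) -> None:
        self.prompt_tokens = 0
        self.completion_tokens = 0
        self.total_tokens = 0

    def add_usage(self, token_counts: dict) -> None:
        # Accumulate usage across calls; absent keys default to 0.
        self.prompt_tokens += token_counts.get("prompt_tokens", 0)
        self.completion_tokens += token_counts.get("completion_tokens", 0)
        self.total_tokens += token_counts.get("total_tokens", 0)
```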
```diff
@@ -451,4 +471,4 @@ async def openai_embed(
     response = await openai_async_client.embeddings.create(
         model=model, input=texts, encoding_format="float"
     )
-    return np.array([dp.embedding for dp in response.data])
+    return np.array([dp.embedding for dp in response.data])
```
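The last hunk removes and re-adds the `return` line of `openai_embed` with identical text, which usually indicates a whitespace or end-of-file-newline fix. For reference, a hypothetical call site; the signature is assumed from the snippet above (a list of texts plus a `model` keyword), and the 1536-dimension figure is illustrative:

```python
# Hypothetical call site for openai_embed; np.array over response.data
# yields one row per input text. Requires OPENAI_API_KEY in the environment.
import asyncio

from lightrag.llm.openai import openai_embed

async def main() -> None:
    vectors = await openai_embed(
        ["hello world", "goodbye world"],
        model="text-embedding-3-small",
    )
    print(vectors.shape)  # (2, embedding_dim), e.g. (2, 1536) for this model

asyncio.run(main())
```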
 