factorstudios committed on
Commit
8b5393c
·
verified ·
1 Parent(s): 14ccf3f

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +26 -61
app.py CHANGED
@@ -29,15 +29,11 @@ HF_STATE_FILE = "processing_state_transcriptions.json"
29
  LOCAL_STATE_FOLDER = Path(".state")
30
  LOCAL_STATE_FOLDER.mkdir(exist_ok=True)
31
 
32
- # Processing configuration
33
- MAX_UPLOADS_BEFORE_PAUSE = 120 # Pause uploading after 120 files
34
- UPLOAD_PAUSE_ENABLED = True
35
-
36
  # Directory within the HF dataset where the audio files are located
37
  AUDIO_FILE_PREFIX = "audio/"
38
 
39
  WHISPER_SERVERS = [
40
- f"https://eliasishere-makeitfr-mineo-{i}.hf.space/transcribe" for i in range(1, 21)
41
  ]
42
 
43
  # Temporary storage for audio files
@@ -61,11 +57,6 @@ class WhisperServer:
61
  """Files per second"""
62
  return self.total_processed / self.total_time if self.total_time > 0 else 0
63
 
64
- def assign_file(self, file_index: int):
65
- """Assign a file index to this server"""
66
- self.is_processing = True
67
- self.current_file_index = file_index
68
-
69
  def release(self):
70
  """Release the server for a new file"""
71
  self.is_processing = False
@@ -73,42 +64,37 @@ class WhisperServer:
73
 
74
  # Global state for whisper servers
75
  servers = [WhisperServer(url) for url in WHISPER_SERVERS]
76
- server_lock = asyncio.Lock() # Lock for thread-safe server state access
77
 
78
  # --- Progress and State Management Functions ---
79
 
80
  def load_progress() -> Dict:
81
- """Loads the local processing progress from the JSON file."""
82
  default_structure = {
83
  "last_processed_index": 0,
84
- "processed_files": {}, # {index: repo_path}
85
- "file_list": [], # Full list of all zip files found in the dataset
86
  "uploaded_count": 0
87
  }
88
  if PROGRESS_FILE.exists():
89
  try:
90
  with PROGRESS_FILE.open('r') as f:
91
  data = json.load(f)
92
- # Ensure all keys exist
93
  for key, value in default_structure.items():
94
  if key not in data:
95
  data[key] = value
96
  return data
97
  except json.JSONDecodeError:
98
- print(f"[{FLOW_ID}] WARNING: Progress file is corrupted. Starting fresh.")
99
-
100
  return default_structure
101
 
102
  def save_progress(progress_data: Dict):
103
- """Saves the local processing progress to the JSON file."""
104
  try:
105
  with PROGRESS_FILE.open('w') as f:
106
  json.dump(progress_data, f, indent=4)
107
  except Exception as e:
108
- print(f"[{FLOW_ID}] CRITICAL ERROR: Could not save progress to {PROGRESS_FILE}: {e}")
109
 
110
  def load_json_state(file_path: str, default_value: Dict[str, Any]) -> Dict[str, Any]:
111
- """Load state from JSON file with migration logic for new structure."""
112
  if os.path.exists(file_path):
113
  try:
114
  with open(file_path, "r") as f:
@@ -118,17 +104,15 @@ def load_json_state(file_path: str, default_value: Dict[str, Any]) -> Dict[str,
118
  if "next_download_index" not in data:
119
  data["next_download_index"] = 0
120
  return data
121
- except json.JSONDecodeError:
122
- print(f"[{FLOW_ID}] WARNING: Corrupted state file: {file_path}")
123
  return default_value
124
 
125
  def save_json_state(file_path: str, data: Dict[str, Any]):
126
- """Save state to JSON file"""
127
  with open(file_path, "w") as f:
128
  json.dump(data, f, indent=2)
129
 
130
  async def download_hf_state() -> Dict[str, Any]:
131
- """Downloads the state file from Hugging Face or returns a default state."""
132
  local_path = LOCAL_STATE_FOLDER / HF_STATE_FILE
133
  default_state = {"next_download_index": 0, "file_states": {}}
134
  try:
@@ -142,11 +126,10 @@ async def download_hf_state() -> Dict[str, Any]:
142
  )
143
  return load_json_state(str(local_path), default_state)
144
  except Exception as e:
145
- print(f"[{FLOW_ID}] Failed to download state file: {str(e)}. Using local/default.")
146
  return load_json_state(str(local_path), default_state)
147
 
148
  async def upload_hf_state(state: Dict[str, Any]) -> bool:
149
- """Uploads the state file to Hugging Face."""
150
  local_path = LOCAL_STATE_FOLDER / HF_STATE_FILE
151
  try:
152
  save_json_state(str(local_path), state)
@@ -162,8 +145,6 @@ async def upload_hf_state(state: Dict[str, Any]) -> bool:
162
  print(f"[{FLOW_ID}] Failed to upload state file: {str(e)}")
163
  return False
164
 
165
- # --- Hugging Face Utility Functions ---
166
-
167
  async def get_audio_file_list(progress_data: Dict) -> List[str]:
168
  if progress_data.get('file_list'):
169
  return progress_data['file_list']
@@ -201,7 +182,6 @@ async def transcribe_with_server(server: WhisperServer, wav_path: Path) -> Optio
201
  return None
202
 
203
  async def process_file_task(wav_file: str, state: Dict, progress: Dict):
204
- # Find an available server
205
  server = None
206
  while server is None:
207
  async with server_lock:
@@ -214,11 +194,10 @@ async def process_file_task(wav_file: str, state: Dict, progress: Dict):
214
  await asyncio.sleep(1)
215
 
216
  try:
217
- wav_filename = Path(wav_file).name
218
- wav_path = TEMP_DIR / wav_filename
219
-
220
- # Download
221
- hf_hub_download(
222
  repo_id=HF_AUDIO_DATASET_ID,
223
  filename=wav_file,
224
  repo_type="dataset",
@@ -226,12 +205,15 @@ async def process_file_task(wav_file: str, state: Dict, progress: Dict):
226
  local_dir_use_symlinks=False,
227
  token=HF_TOKEN
228
  )
 
229
 
 
 
 
230
  # Transcribe
231
  result = await transcribe_with_server(server, wav_path)
232
 
233
  if result:
234
- # Upload transcription result to HF
235
  json_filename = Path(wav_file).with_suffix('.json').name
236
  json_content = json.dumps(result, indent=2, ensure_ascii=False).encode('utf-8')
237
 
@@ -251,6 +233,7 @@ async def process_file_task(wav_file: str, state: Dict, progress: Dict):
251
  state["file_states"][wav_file] = "failed_transcription"
252
  print(f"[{FLOW_ID}] ❌ Failed: {wav_file}")
253
 
 
254
  if wav_path.exists():
255
  wav_path.unlink()
256
 
@@ -270,7 +253,6 @@ async def main_processing_loop():
270
  file_list = await get_audio_file_list(progress)
271
 
272
  if not file_list:
273
- print(f"[{FLOW_ID}] File list empty, retrying in 60s...")
274
  await asyncio.sleep(60)
275
  continue
276
 
@@ -288,16 +270,13 @@ async def main_processing_loop():
288
  # 1. Handpick failed_transcription files
289
  failed_files = [f for f, s in state.get("file_states", {}).items() if s == "failed_transcription"]
290
 
291
- # 2. Also check for new files based on next_download_index
292
  next_idx = state.get("next_download_index", 0)
293
- # We take a larger chunk to allow for more skipping without re-fetching the list
294
- new_files_chunk = file_list[next_idx:next_idx + 500]
295
 
296
- # Combine: Prioritize failed files, then add new ones
297
  files_to_check = failed_files + [f for f in new_files_chunk if f not in state["file_states"]]
298
 
299
  if not files_to_check:
300
- print(f"[{FLOW_ID}] No files to process. Sleeping...")
301
  await asyncio.sleep(60)
302
  continue
303
 
@@ -310,71 +289,57 @@ async def main_processing_loop():
310
  expected_json_name = Path(f).with_suffix('.json').name
311
 
312
  if expected_json_name in existing_json_files:
313
- # Mark locally but DO NOT upload yet
314
  if state["file_states"].get(f) != "processed":
315
  state["file_states"][f] = "processed"
316
  state_changed_locally = True
317
-
318
- # Update next_download_index if it's a new file
319
  if f in new_files_chunk:
320
  current_idx = file_list.index(f)
321
  if current_idx >= state.get("next_download_index", 0):
322
  state["next_download_index"] = current_idx + 1
323
  continue
324
 
325
- # If we reach here, we found an UNPROCESSED file
326
  print(f"[{FLOW_ID}] Found unprocessed file: {f}")
327
 
328
- # Before processing, if we have local changes (skips), upload the state once
329
  if state_changed_locally:
330
- print(f"[{FLOW_ID}] Synchronizing skipped files to HF state before processing...")
331
  await upload_hf_state(state)
332
  state_changed_locally = False
333
 
334
  files_to_process.append(f)
335
-
336
- # Once we find an unprocessed file, we stop the skip-scan and start processing
337
- # This ensures we process files as soon as we find them
338
  break
339
 
340
- # If we scanned everything and only found skips, upload the state once at the end
341
  if state_changed_locally and not files_to_process:
342
- print(f"[{FLOW_ID}] Uploading final batch of skips to HF state...")
343
  await upload_hf_state(state)
344
 
345
  if not files_to_process:
346
- # If we only found skips, the loop will restart and check the next chunk
347
  continue
348
 
349
- print(f"[{FLOW_ID}] Processing batch of {len(files_to_process)} unprocessed files...")
350
-
351
- # Process the found unprocessed file(s)
352
- # (In this logic, it's usually just 1 file at a time to ensure frequent state updates)
353
  batch_size = len(servers)
354
  for i in range(0, len(files_to_process), batch_size):
355
  batch = files_to_process[i:i + batch_size]
356
  tasks = [process_file_task(f, state, progress) for f in batch]
357
  await asyncio.gather(*tasks)
358
 
359
- # Update next_download_index
360
  for f in batch:
361
  if f in file_list:
362
  current_idx = file_list.index(f)
363
  if current_idx >= state.get("next_download_index", 0):
364
  state["next_download_index"] = current_idx + 1
365
 
366
- # Save and upload state after processing the unprocessed file
367
  await upload_hf_state(state)
368
  save_progress(progress)
369
 
370
- await asyncio.sleep(2) # Short sleep before looking for the next unprocessed file
371
 
372
  except Exception as e:
373
  print(f"[{FLOW_ID}] Error in main loop: {e}")
374
  await asyncio.sleep(60)
375
 
376
  # --- FastAPI App ---
377
-
378
  app = FastAPI(title=f"Flow Server {FLOW_ID} API")
379
 
380
  @app.on_event("startup")
 
29
  LOCAL_STATE_FOLDER = Path(".state")
30
  LOCAL_STATE_FOLDER.mkdir(exist_ok=True)
31
 
 
 
 
 
32
  # Directory within the HF dataset where the audio files are located
33
  AUDIO_FILE_PREFIX = "audio/"
34
 
35
  WHISPER_SERVERS = [
36
+ f"https://makeitfr-mineo-{i}.hf.space/transcribe" for i in range(1, 21)
37
  ]
38
 
39
  # Temporary storage for audio files
 
57
  """Files per second"""
58
  return self.total_processed / self.total_time if self.total_time > 0 else 0
59
 
 
 
 
 
 
60
  def release(self):
61
  """Release the server for a new file"""
62
  self.is_processing = False
 
64
 
65
  # Global state for whisper servers
66
  servers = [WhisperServer(url) for url in WHISPER_SERVERS]
67
+ server_lock = asyncio.Lock()
68
 
69
  # --- Progress and State Management Functions ---
70
 
71
  def load_progress() -> Dict:
 
72
  default_structure = {
73
  "last_processed_index": 0,
74
+ "processed_files": {},
75
+ "file_list": [],
76
  "uploaded_count": 0
77
  }
78
  if PROGRESS_FILE.exists():
79
  try:
80
  with PROGRESS_FILE.open('r') as f:
81
  data = json.load(f)
 
82
  for key, value in default_structure.items():
83
  if key not in data:
84
  data[key] = value
85
  return data
86
  except json.JSONDecodeError:
87
+ print(f"[{FLOW_ID}] WARNING: Progress file is corrupted.")
 
88
  return default_structure
89
 
90
  def save_progress(progress_data: Dict):
 
91
  try:
92
  with PROGRESS_FILE.open('w') as f:
93
  json.dump(progress_data, f, indent=4)
94
  except Exception as e:
95
+ print(f"[{FLOW_ID}] CRITICAL ERROR: Could not save progress: {e}")
96
 
97
  def load_json_state(file_path: str, default_value: Dict[str, Any]) -> Dict[str, Any]:
 
98
  if os.path.exists(file_path):
99
  try:
100
  with open(file_path, "r") as f:
 
104
  if "next_download_index" not in data:
105
  data["next_download_index"] = 0
106
  return data
107
+ except Exception:
108
+ pass
109
  return default_value
110
 
111
  def save_json_state(file_path: str, data: Dict[str, Any]):
 
112
  with open(file_path, "w") as f:
113
  json.dump(data, f, indent=2)
114
 
115
  async def download_hf_state() -> Dict[str, Any]:
 
116
  local_path = LOCAL_STATE_FOLDER / HF_STATE_FILE
117
  default_state = {"next_download_index": 0, "file_states": {}}
118
  try:
 
126
  )
127
  return load_json_state(str(local_path), default_state)
128
  except Exception as e:
129
+ print(f"[{FLOW_ID}] Failed to download state file: {str(e)}")
130
  return load_json_state(str(local_path), default_state)
131
 
132
  async def upload_hf_state(state: Dict[str, Any]) -> bool:
 
133
  local_path = LOCAL_STATE_FOLDER / HF_STATE_FILE
134
  try:
135
  save_json_state(str(local_path), state)
 
145
  print(f"[{FLOW_ID}] Failed to upload state file: {str(e)}")
146
  return False
147
 
 
 
148
  async def get_audio_file_list(progress_data: Dict) -> List[str]:
149
  if progress_data.get('file_list'):
150
  return progress_data['file_list']
 
182
  return None
183
 
184
  async def process_file_task(wav_file: str, state: Dict, progress: Dict):
 
185
  server = None
186
  while server is None:
187
  async with server_lock:
 
194
  await asyncio.sleep(1)
195
 
196
  try:
197
+ # FIX: Ensure we use the correct local path for the downloaded file
198
+ # hf_hub_download returns the absolute path to the downloaded file
199
+ print(f"[{FLOW_ID}] Downloading {wav_file}...")
200
+ downloaded_path_str = hf_hub_download(
 
201
  repo_id=HF_AUDIO_DATASET_ID,
202
  filename=wav_file,
203
  repo_type="dataset",
 
205
  local_dir_use_symlinks=False,
206
  token=HF_TOKEN
207
  )
208
+ wav_path = Path(downloaded_path_str)
209
 
210
+ if not wav_path.exists():
211
+ raise FileNotFoundError(f"Downloaded file not found at {wav_path}")
212
+
213
  # Transcribe
214
  result = await transcribe_with_server(server, wav_path)
215
 
216
  if result:
 
217
  json_filename = Path(wav_file).with_suffix('.json').name
218
  json_content = json.dumps(result, indent=2, ensure_ascii=False).encode('utf-8')
219
 
 
233
  state["file_states"][wav_file] = "failed_transcription"
234
  print(f"[{FLOW_ID}] ❌ Failed: {wav_file}")
235
 
236
+ # Cleanup
237
  if wav_path.exists():
238
  wav_path.unlink()
239
 
 
253
  file_list = await get_audio_file_list(progress)
254
 
255
  if not file_list:
 
256
  await asyncio.sleep(60)
257
  continue
258
 
 
270
  # 1. Handpick failed_transcription files
271
  failed_files = [f for f, s in state.get("file_states", {}).items() if s == "failed_transcription"]
272
 
273
+ # 2. Check for new files
274
  next_idx = state.get("next_download_index", 0)
275
+ new_files_chunk = file_list[next_idx:next_idx + 1000]
 
276
 
 
277
  files_to_check = failed_files + [f for f in new_files_chunk if f not in state["file_states"]]
278
 
279
  if not files_to_check:
 
280
  await asyncio.sleep(60)
281
  continue
282
 
 
289
  expected_json_name = Path(f).with_suffix('.json').name
290
 
291
  if expected_json_name in existing_json_files:
 
292
  if state["file_states"].get(f) != "processed":
293
  state["file_states"][f] = "processed"
294
  state_changed_locally = True
 
 
295
  if f in new_files_chunk:
296
  current_idx = file_list.index(f)
297
  if current_idx >= state.get("next_download_index", 0):
298
  state["next_download_index"] = current_idx + 1
299
  continue
300
 
301
+ # Found an UNPROCESSED file
302
  print(f"[{FLOW_ID}] Found unprocessed file: {f}")
303
 
304
+ # Upload skipped state before processing
305
  if state_changed_locally:
306
+ print(f"[{FLOW_ID}] Synchronizing skipped files to HF state...")
307
  await upload_hf_state(state)
308
  state_changed_locally = False
309
 
310
  files_to_process.append(f)
 
 
 
311
  break
312
 
 
313
  if state_changed_locally and not files_to_process:
314
+ print(f"[{FLOW_ID}] Uploading final batch of skips...")
315
  await upload_hf_state(state)
316
 
317
  if not files_to_process:
 
318
  continue
319
 
320
+ # Process the unprocessed file
 
 
 
321
  batch_size = len(servers)
322
  for i in range(0, len(files_to_process), batch_size):
323
  batch = files_to_process[i:i + batch_size]
324
  tasks = [process_file_task(f, state, progress) for f in batch]
325
  await asyncio.gather(*tasks)
326
 
 
327
  for f in batch:
328
  if f in file_list:
329
  current_idx = file_list.index(f)
330
  if current_idx >= state.get("next_download_index", 0):
331
  state["next_download_index"] = current_idx + 1
332
 
 
333
  await upload_hf_state(state)
334
  save_progress(progress)
335
 
336
+ await asyncio.sleep(2)
337
 
338
  except Exception as e:
339
  print(f"[{FLOW_ID}] Error in main loop: {e}")
340
  await asyncio.sleep(60)
341
 
342
  # --- FastAPI App ---
 
343
  app = FastAPI(title=f"Flow Server {FLOW_ID} API")
344
 
345
  @app.on_event("startup")