| import aiosqlite |
| import os |
| from datetime import datetime, timedelta |
| from app.core.config import settings |
| from custom_logger import logger_config as logger |
|
|
| async def insert_task(task_id: str, filename: str, text: str, voice: str, speed: float, status: str, hide_from_ui: int): |
| async with aiosqlite.connect(settings.DATABASE_FILE) as db: |
| await db.execute('''INSERT INTO tasks |
| (id, filename, text, voice, speed, status, created_at, hide_from_ui) |
| VALUES (?, ?, ?, ?, ?, ?, ?, ?)''', |
| (task_id, filename, text, voice, speed, status, datetime.now().isoformat(), hide_from_ui)) |
| await db.commit() |
| logger.debug(f"Inserted task {task_id} into database.") |
|
|
| async def update_status(task_id: str, status: str, output_file: str = None, error: str = None): |
| async with aiosqlite.connect(settings.DATABASE_FILE) as db: |
| if status == 'completed': |
| await db.execute('''UPDATE tasks |
| SET status = ?, output_file = ?, processed_at = ?, progress = 100, progress_text = 'Completed' |
| WHERE id = ?''', |
| (status, output_file, datetime.now().isoformat(), task_id)) |
| logger.info(f"Task ID {task_id} marked as completed.") |
| elif status == 'failed': |
| await db.execute('''UPDATE tasks |
| SET status = ?, error = ?, processed_at = ?, progress_text = 'Failed' |
| WHERE id = ?''', |
| (status, str(error), datetime.now().isoformat(), task_id)) |
| logger.error(f"Task ID {task_id} marked as failed. Error: {error}") |
| else: |
| await db.execute('UPDATE tasks SET status = ? WHERE id = ?', (status, task_id)) |
| logger.debug(f"Task ID {task_id} status updated to {status}.") |
| await db.commit() |
|
|
| async def update_progress(task_id: str, progress: int, progress_text: str = None): |
| async with aiosqlite.connect(settings.DATABASE_FILE) as db: |
| await db.execute('UPDATE tasks SET progress = ?, progress_text = ? WHERE id = ?', |
| (progress, progress_text, task_id)) |
| await db.commit() |
| logger.debug(f"Task ID {task_id} progress updated to {progress}% ({progress_text}).") |
|
|
| async def get_next_not_started(): |
| async with aiosqlite.connect(settings.DATABASE_FILE) as db: |
| db.row_factory = aiosqlite.Row |
| async with db.execute('''SELECT * FROM tasks |
| WHERE status = 'not_started' |
| ORDER BY created_at ASC |
| LIMIT 1''') as cursor: |
| return await cursor.fetchone() |
|
|
| async def cleanup_old_entries(): |
| try: |
| async with aiosqlite.connect(settings.DATABASE_FILE) as db: |
| db.row_factory = aiosqlite.Row |
| cutoff_date = (datetime.now() - timedelta(days=settings.RETENTION_DAYS)).isoformat() |
| |
| async with db.execute('''SELECT id, output_file FROM tasks |
| WHERE created_at < ?''', (cutoff_date,)) as cursor: |
| old_entries = await cursor.fetchall() |
| |
| if old_entries: |
| deleted_files = 0 |
| deleted_rows = 0 |
| |
| for entry in old_entries: |
| output_file = entry['output_file'] |
| if output_file: |
| filepath = os.path.join(settings.UPLOAD_FOLDER, output_file) |
| if os.path.exists(filepath): |
| try: |
| os.remove(filepath) |
| deleted_files += 1 |
| except Exception as e: |
| logger.warning(f"Failed to delete old file {filepath}: {e}") |
| |
| async with db.execute('''DELETE FROM tasks WHERE created_at < ?''', (cutoff_date,)) as cursor: |
| deleted_rows = cursor.rowcount |
| await db.commit() |
| |
| if deleted_rows > 0 or deleted_files > 0: |
| logger.info(f"Cleanup: Deleted {deleted_rows} old entries and {deleted_files} audio files") |
| except Exception as e: |
| logger.error(f"Cleanup error: {e}") |
|
|
| async def get_average_processing_time(): |
| async with aiosqlite.connect(settings.DATABASE_FILE) as db: |
| db.row_factory = aiosqlite.Row |
| async with db.execute('''SELECT created_at, processed_at FROM tasks |
| WHERE status = 'completed' AND processed_at IS NOT NULL |
| ORDER BY processed_at DESC LIMIT 20''') as cursor: |
| completed_rows = await cursor.fetchall() |
| |
| if not completed_rows: |
| return 30.0 |
| |
| total_seconds = 0 |
| count = 0 |
| for r in completed_rows: |
| try: |
| created = datetime.fromisoformat(r['created_at']) |
| processed = datetime.fromisoformat(r['processed_at']) |
| duration = (processed - created).total_seconds() |
| if duration > 0: |
| total_seconds += duration |
| count += 1 |
| except: |
| continue |
| |
| return total_seconds / count if count > 0 else 30.0 |
|
|
| async def get_all_tasks(): |
| async with aiosqlite.connect(settings.DATABASE_FILE) as db: |
| db.row_factory = aiosqlite.Row |
| |
| avg_time = await get_average_processing_time() |
| |
| async with db.execute('''SELECT id FROM tasks |
| WHERE status = 'not_started' |
| ORDER BY created_at ASC''') as cursor: |
| queue_ids = [row['id'] for row in await cursor.fetchall()] |
| |
| async with db.execute('''SELECT COUNT(*) as count FROM tasks WHERE status = 'processing' ''') as cursor: |
| row = await cursor.fetchone() |
| processing_count = row['count'] |
| |
| async with db.execute('SELECT * FROM tasks WHERE hide_from_ui = 0 OR hide_from_ui IS NULL ORDER BY created_at DESC') as cursor: |
| rows = await cursor.fetchall() |
| |
| return rows, queue_ids, processing_count, avg_time |
|
|
| async def get_task_by_id(task_id: str): |
| async with aiosqlite.connect(settings.DATABASE_FILE) as db: |
| db.row_factory = aiosqlite.Row |
| async with db.execute('SELECT * FROM tasks WHERE id = ?', (task_id,)) as cursor: |
| row = await cursor.fetchone() |
| |
| if not row: |
| return None |
| |
| queue_position = None |
| estimated_start_seconds = None |
| |
| if row['status'] == 'not_started': |
| avg_time = await get_average_processing_time() |
| |
| async with db.execute('''SELECT COUNT(*) as position FROM tasks |
| WHERE status = 'not_started' AND created_at < ?''', |
| (row['created_at'],)) as cursor: |
| position_row = await cursor.fetchone() |
| queue_position = position_row['position'] + 1 |
| |
| async with db.execute('''SELECT COUNT(*) as count FROM tasks WHERE status = 'processing' ''') as cursor: |
| count_row = await cursor.fetchone() |
| processing_count = count_row['count'] |
| |
| tasks_ahead = queue_position - 1 + processing_count |
| estimated_start_seconds = round(tasks_ahead * avg_time) |
| |
| return row, queue_position, estimated_start_seconds |
|
|