File size: 8,748 Bytes
be069c2 db88504 be069c2 db88504 be069c2 db88504 be069c2 db88504 be069c2 db88504 be069c2 db88504 be069c2 db88504 be069c2 96bf186 be069c2 db88504 be069c2 db88504 be069c2 db88504 be069c2 db88504 be069c2 db88504 be069c2 024bf4a be069c2 024bf4a db88504 024bf4a db88504 be069c2 db88504 be069c2 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 |
import asyncio
import gc
import itertools
import logging
import os
import queue
import threading
import time
from collections import deque
from dataclasses import dataclass
from datetime import datetime, time as dt_time
from typing import Dict, List, Optional

import numpy as np
import torch
# Module-wide logging: configure once at import time, then use a named logger.
_LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
logging.basicConfig(level=logging.INFO, format=_LOG_FORMAT)
logger = logging.getLogger(__name__)
@dataclass
class JobRequest:
    """Represents a single voice conversion request."""
    id: str                 # Unique identifier for the job
    audio_data: np.ndarray  # Input audio data
    model_name: str         # Name of the RVC model to use
    priority: int = 1       # Priority level (1-5, 5 being highest)
    # Fix: annotation was `float = None`; None is a valid (defaulted) value.
    timestamp: Optional[float] = None  # When the job was submitted (epoch seconds)

    def __post_init__(self):
        # Default the timestamp to "now" when the caller did not supply one.
        if self.timestamp is None:
            self.timestamp = time.time()
class ModelCache:
    """Manages cached models with an LRU eviction policy (thread-safe)."""

    def __init__(self, max_models: int = 3):
        self.max_models = max_models            # capacity before eviction kicks in
        self.models: Dict[str, torch.nn.Module] = {}
        self.model_usage: deque = deque()       # LRU order: leftmost = least recently used
        self.lock = threading.Lock()

    def get_model(self, model_name: str) -> Optional[torch.nn.Module]:
        """Return the cached model and mark it most-recently-used, or None on miss."""
        with self.lock:
            if model_name in self.models:
                # Move the name to the MRU end of the usage deque.
                self.model_usage.remove(model_name)
                self.model_usage.append(model_name)
                return self.models[model_name]
            return None

    def add_model(self, model_name: str, model: torch.nn.Module):
        """Insert or replace a model, evicting the least recently used if full.

        Fix: re-adding an existing name previously appended a duplicate usage
        entry; a later eviction could then pop a name whose model was already
        deleted and raise KeyError. Replacements now refresh the LRU position
        instead of duplicating it.
        """
        with self.lock:
            if model_name in self.models:
                # Replacement: keep exactly one usage entry, refreshed below.
                self.model_usage.remove(model_name)
            elif len(self.models) >= self.max_models:
                # Evict least recently used model.
                lru_model = self.model_usage.popleft()
                del self.models[lru_model]
                # Force garbage collection to free GPU memory.
                gc.collect()
                if torch.cuda.is_available():
                    torch.cuda.empty_cache()
            self.models[model_name] = model
            self.model_usage.append(model_name)
class JobQueue:
    """Manages a prioritized job queue with simple rate limiting."""

    def __init__(self, max_size: int = 100):
        self.queue = queue.PriorityQueue(maxsize=max_size)
        self.processing: Dict[str, "JobRequest"] = {}  # jobs currently in flight, by id
        self.lock = threading.Lock()
        self.last_processed = time.time()
        self.rate_limit = 1.0  # Minimum seconds between jobs
        # Fix: monotonic tiebreaker. Without it, two jobs with equal priority
        # and equal timestamp fall through to comparing JobRequest objects,
        # which raises TypeError inside the PriorityQueue.
        self._tiebreak = itertools.count()

    def add_job(self, job: "JobRequest") -> bool:
        """Add a job to the queue; returns False if the queue is full."""
        try:
            # Priority tuple: (priority reversed, timestamp, counter, job).
            # Lower number = higher priority; counter breaks exact ties.
            self.queue.put(
                (6 - job.priority, job.timestamp, next(self._tiebreak), job),
                block=False,
            )
            logger.info(f"Added job {job.id} to queue. Priority: {job.priority}")
            return True
        except queue.Full:
            logger.warning("Queue is full, job rejected")
            return False

    def get_next_job(self) -> Optional["JobRequest"]:
        """Pop the highest-priority job, or None when rate-limited or empty."""
        if time.time() - self.last_processed < self.rate_limit:
            return None
        try:
            _, _, _, job = self.queue.get(block=False)
            with self.lock:
                self.processing[job.id] = job
                self.last_processed = time.time()
            return job
        except queue.Empty:
            return None
class RVCService:
    """Main service class for RVC processing.

    Owns the model cache and job queue, and runs an asyncio worker loop on a
    dedicated daemon thread.
    """

    def __init__(self):
        self.model_cache = ModelCache(max_models=3)
        self.job_queue = JobQueue(max_size=100)
        self.is_running = False
        self.worker_thread = None
        # Operating hours (24-hour format).
        self.start_time = dt_time(9, 0)  # 9:00 AM
        self.end_time = dt_time(0, 0)    # 12:00 AM (midnight)

    def within_operating_hours(self) -> bool:
        """Check if the current time is within operating hours.

        Always returns True for now (development mode); the commented logic
        below is the intended production check, including overnight windows.
        """
        current_time = datetime.now().time()
        # TODO: Implement proper operating hours check for production:
        # if self.start_time <= self.end_time:
        #     return self.start_time <= current_time <= self.end_time
        # else:  # Handles overnight operation (e.g., 9 AM to 12 AM)
        #     return current_time >= self.start_time or current_time <= self.end_time
        return True

    async def process_audio(self, job: "JobRequest") -> Optional[np.ndarray]:
        """Process a single audio conversion job; returns output or None on error."""
        try:
            # Get or load model.
            model = self.model_cache.get_model(job.model_name)
            if model is None:
                logger.info(f"Loading model {job.model_name}")
                # Here you would load your RVC model:
                # model = load_rvc_model(job.model_name)
                # Fix: only cache a successfully loaded model. The original
                # unconditionally cached `model` while it was still None,
                # poisoning the LRU cache for this model name.
                if model is not None:
                    self.model_cache.add_model(job.model_name, model)
            # Process audio under mixed precision.
            # NOTE(review): torch.cuda.amp.autocast is deprecated in newer
            # torch in favor of torch.amp.autocast('cuda') — kept as-is here.
            with torch.cuda.amp.autocast():
                # Your RVC processing logic here:
                # output = model.convert_voice(job.audio_data)
                output = job.audio_data  # Placeholder
            return output
        except Exception as e:
            logger.error(f"Error processing job {job.id}: {str(e)}")
            return None

    async def worker_loop(self):
        """Main worker loop processing jobs from the queue until stopped."""
        while self.is_running:
            try:
                # Check operating hours before pulling work.
                if not self.within_operating_hours():
                    logger.info("Outside operating hours, worker sleeping...")
                    await asyncio.sleep(300)  # Check every 5 minutes
                    continue
                # Get next job (None when rate-limited or queue empty).
                job = self.job_queue.get_next_job()
                if job is None:
                    await asyncio.sleep(0.1)  # Prevent busy waiting
                    continue
                logger.info(f"Processing job {job.id}")
                output = await self.process_audio(job)
                if output is not None:
                    logger.info(f"Successfully processed job {job.id}")
                else:
                    logger.error(f"Failed to process job {job.id}")
                # Cleanup: drop the job from the in-flight table either way.
                with self.job_queue.lock:
                    self.job_queue.processing.pop(job.id, None)
            except Exception as e:
                logger.error(f"Worker error: {str(e)}")
                await asyncio.sleep(1)  # Prevent rapid error loops

    def start(self):
        """Start the service worker on a background daemon thread (idempotent)."""
        if not self.is_running:
            self.is_running = True
            # Create a dedicated event loop for the worker thread.
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)

            def run_worker():
                loop.run_until_complete(self.worker_loop())

            self.worker_thread = threading.Thread(target=run_worker)
            self.worker_thread.daemon = True
            self.worker_thread.start()
            logger.info("RVC Service started")

    def stop(self):
        """Signal the worker loop to exit; the daemon thread winds down on its own."""
        self.is_running = False
        logger.info("RVC Service stopping...")

    async def submit_job(self, audio_data: np.ndarray, model_name: str, priority: int = 1) -> Optional[str]:
        """Submit a new job; returns its id, or None if the queue rejected it.

        Fix: the return annotation was `str` even though None is returned when
        the queue is full — now Optional[str].
        """
        job_id = f"job_{int(time.time())}_{id(audio_data)}"
        job = JobRequest(
            id=job_id,
            audio_data=audio_data,
            model_name=model_name,
            priority=priority,
        )
        if self.job_queue.add_job(job):
            return job_id
        return None
# Memory management utilities
def cleanup_gpu_memory():
    """Force cleanup of unreachable Python objects and cached GPU memory.

    Fix: only touch the CUDA allocator when CUDA is available, matching the
    guard used by monitor_gpu_memory (safe no-op behavior on CPU-only hosts).
    """
    gc.collect()
    if torch.cuda.is_available():
        torch.cuda.empty_cache()
def monitor_gpu_memory():
    """Log current GPU memory usage; silently does nothing without CUDA."""
    if not torch.cuda.is_available():
        return
    mebibyte = 1024 ** 2
    allocated = torch.cuda.memory_allocated() / mebibyte
    reserved = torch.cuda.memory_reserved() / mebibyte
    logger.info(f"GPU Memory: {allocated:.2f}MB allocated, {reserved:.2f}MB reserved")