Spaces:
Sleeping
Sleeping
File size: 8,568 Bytes
fc5fa78 e7b58b1 fc5fa78 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 |
"""
RESTful API for Federated Learning Server
Handles client registration, model updates, and coordination
"""
from flask import Flask, request, jsonify
import logging
import threading
import time
from typing import Dict, Any, List
from ..server.coordinator import FederatedCoordinator
from ..utils.metrics import calculate_model_similarity
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class FederatedAPI:
    """Flask-based HTTP front end for a federated learning coordinator.

    Routes registered by ``_setup_routes``:

    * ``GET  /health``          - liveness probe with client/round counters
    * ``POST /register``        - register a client with the coordinator
    * ``POST /get_model``       - return the current global model weights
    * ``POST /submit_update``   - pass a client's model update to the coordinator
    * ``GET  /training_status`` - report round and client counters
    * ``POST /rag/query``       - placeholder; RAG is not implemented yet
    * ``POST /predict``         - run one prediction with the global model

    Every POST route expects a JSON object body. A missing, malformed or
    non-object body is treated as an empty object, so such requests are
    rejected with a 400 by the normal required-field checks rather than
    surfacing as a 500.
    """

    # Number of input features expected by the /predict model.
    PREDICT_INPUT_DIM = 32

    def __init__(self, coordinator: "FederatedCoordinator", host: str = "0.0.0.0", port: int = 8080) -> None:
        """Create the Flask app and register all routes.

        Args:
            coordinator: Coordinator that every route delegates to.
            host: Interface the server binds to when started.
            port: TCP port the server listens on when started.
        """
        self.app = Flask(__name__)
        self.coordinator = coordinator
        self.host = host
        self.port = port
        self._setup_routes()

    @staticmethod
    def _as_json_object(body: Any) -> Dict[str, Any]:
        """Return *body* when it is a dict, otherwise an empty dict."""
        return body if isinstance(body, dict) else {}

    def _read_json(self) -> Dict[str, Any]:
        """Return the current request's body as a JSON object.

        ``silent=True`` makes Flask return ``None`` instead of raising when
        the body is absent, malformed or not sent as JSON; that case, and any
        non-object JSON value, is mapped to an empty dict.
        """
        return self._as_json_object(request.get_json(silent=True))

    @staticmethod
    def _parse_features(features: Any, expected_len: int) -> Any:
        """Validate a feature vector received by ``/predict``.

        Args:
            features: Value of the ``features`` field of the request body.
            expected_len: Required number of elements.

        Returns:
            The features as a list of ``float`` when *features* is a list of
            exactly *expected_len* elements that all convert with ``float()``;
            ``None`` otherwise.
        """
        if not isinstance(features, list) or len(features) != expected_len:
            return None
        try:
            return [float(value) for value in features]
        except (TypeError, ValueError, OverflowError):
            # Nested containers, None, non-numeric strings, or integers too
            # large for a float: report as invalid input, not a server error.
            return None

    @staticmethod
    def _build_predict_model(input_dim: int):
        """Build the dense network used by ``/predict``.

        TensorFlow is imported here rather than at module level, so it is
        only loaded once a prediction is actually requested.

        NOTE(review): the layer layout presumably has to match the model the
        clients train (the original comment says "same as client") - verify
        against the client code.
        """
        import tensorflow as tf

        # No compile() call: the model is only used for inference, which does
        # not need an optimizer or a loss.
        return tf.keras.Sequential([
            tf.keras.layers.Input(shape=(input_dim,)),
            tf.keras.layers.Dense(128, activation='relu'),
            tf.keras.layers.Dense(64, activation='relu'),
            tf.keras.layers.Dense(1),
        ])

    @staticmethod
    def _server_error(context: str, exc: Exception):
        """Log *exc* with its traceback and build the JSON 500 response.

        Must be called from inside an ``except`` block so that
        ``logger.exception`` can attach the active traceback.
        """
        logger.exception("%s: %s", context, exc)
        return jsonify({'error': str(exc)}), 500

    def _setup_routes(self) -> None:
        """Setup API routes"""

        @self.app.route('/health', methods=['GET'])
        def health_check():
            """Health check endpoint"""
            return jsonify({
                'status': 'healthy',
                'timestamp': time.time(),
                'active_clients': len(self.coordinator.clients),
                'current_round': getattr(self.coordinator, 'current_round', 0),
            })

        @self.app.route('/register', methods=['POST'])
        def register_client():
            """Register a new client"""
            try:
                data = self._read_json()
                client_id = data.get('client_id')
                if not client_id:
                    return jsonify({'error': 'client_id is required'}), 400

                client_info = data.get('client_info', {})
                if not self.coordinator.register_client(client_id, client_info):
                    return jsonify({'error': 'Registration failed'}), 400

                return jsonify({
                    'status': 'registered',
                    'client_id': client_id,
                    'server_config': self.coordinator.get_client_config(),
                })
            except Exception as e:
                return self._server_error("Error registering client", e)

        @self.app.route('/get_model', methods=['POST'])
        def get_global_model():
            """Get the current global model"""
            try:
                client_id = self._read_json().get('client_id')
                if not client_id or client_id not in self.coordinator.clients:
                    return jsonify({'error': 'Invalid client_id'}), 400

                return jsonify({
                    'model_weights': self.coordinator.get_global_model(),
                    'round': getattr(self.coordinator, 'current_round', 0),
                    'timestamp': time.time(),
                })
            except Exception as e:
                return self._server_error("Error getting global model", e)

        @self.app.route('/submit_update', methods=['POST'])
        def submit_model_update():
            """Submit a model update from client"""
            try:
                data = self._read_json()
                client_id = data.get('client_id')
                model_weights = data.get('model_weights')
                training_metrics = data.get('metrics', {})

                if not client_id or not model_weights:
                    return jsonify({'error': 'client_id and model_weights are required'}), 400
                if client_id not in self.coordinator.clients:
                    return jsonify({'error': 'Client not registered'}), 400

                # Store the update
                self.coordinator.receive_model_update(client_id, model_weights, training_metrics)
                return jsonify({
                    'status': 'update_received',
                    'client_id': client_id,
                    'timestamp': time.time(),
                })
            except Exception as e:
                return self._server_error("Error submitting model update", e)

        @self.app.route('/training_status', methods=['GET'])
        def get_training_status():
            """Get current training status"""
            try:
                federated_cfg = self.coordinator.config.get('federated', {})
                return jsonify({
                    'current_round': getattr(self.coordinator, 'current_round', 0),
                    'total_rounds': federated_cfg.get('num_rounds', 10),
                    'active_clients': len(self.coordinator.clients),
                    'clients_ready': len(getattr(self.coordinator, 'client_updates', {})),
                    'min_clients': federated_cfg.get('min_clients', 2),
                    'training_active': getattr(self.coordinator, 'training_active', False),
                })
            except Exception as e:
                return self._server_error("Error getting training status", e)

        @self.app.route('/rag/query', methods=['POST'])
        def rag_query():
            """Handle RAG queries"""
            try:
                query = self._read_json().get('query')
                if not query:
                    return jsonify({'error': 'query is required'}), 400

                # This will be implemented when we integrate RAG
                return jsonify({
                    'response': 'RAG functionality coming soon',
                    'query': query,
                    'timestamp': time.time(),
                })
            except Exception as e:
                return self._server_error("Error processing RAG query", e)

        @self.app.route('/predict', methods=['POST'])
        def predict():
            """Predict using the current global model."""
            try:
                input_dim = self.PREDICT_INPUT_DIM
                features = self._parse_features(self._read_json().get('features'), input_dim)
                if features is None:
                    return jsonify({'error': f'features must be a list of {input_dim} floats'}), 400

                # Get global model weights
                model_weights = self.coordinator.get_global_model()
                if model_weights is None:
                    return jsonify({'error': 'Global model not available yet'}), 503

                import numpy as np

                # The model is rebuilt for every request and loaded with the
                # weights returned by the coordinator for this request.
                model = self._build_predict_model(input_dim)
                model.set_weights([np.array(w) for w in model_weights])

                # Prepare a single-row batch and predict; verbose=0 keeps
                # Keras from printing a progress bar for each request.
                x = np.array(features, dtype=np.float32).reshape(1, -1)
                pred = model.predict(x, verbose=0)
                return jsonify({'prediction': float(pred[0, 0])})
            except Exception as e:
                return self._server_error("Error in prediction endpoint", e)

    def run(self, debug: bool = False) -> None:
        """Run the API server in the calling thread (blocks until it stops).

        Args:
            debug: Passed through to Flask's ``app.run``.
        """
        logger.info(f"Starting Federated API server on {self.host}:{self.port}")
        self.app.run(host=self.host, port=self.port, debug=debug, threaded=True)

    def run_threaded(self, debug: bool = False) -> threading.Thread:
        """Run the API server in a separate daemon thread.

        Args:
            debug: Passed through to Flask's ``app.run``.

        Returns:
            The started daemon thread running the server.
        """
        def run_server():
            # With debug=True Flask would otherwise enable the reloader, which
            # installs signal handlers and therefore only works in the main
            # thread; disable it explicitly for the background server.
            self.app.run(
                host=self.host,
                port=self.port,
                debug=debug,
                threaded=True,
                use_reloader=False,
            )

        thread = threading.Thread(target=run_server, daemon=True)
        thread.start()
        logger.info(f"Federated API server started in background on {self.host}:{self.port}")
        return thread
|