Spaces:
Sleeping
Sleeping
package grpccoordinator | |
import ( | |
"context" | |
"errors" | |
"time" | |
"github.com/apache/pulsar-client-go/pulsar" | |
"github.com/chroma/chroma-coordinator/internal/coordinator" | |
"github.com/chroma/chroma-coordinator/internal/grpccoordinator/grpcutils" | |
"github.com/chroma/chroma-coordinator/internal/memberlist_manager" | |
"github.com/chroma/chroma-coordinator/internal/metastore/db/dao" | |
"github.com/chroma/chroma-coordinator/internal/metastore/db/dbcore" | |
"github.com/chroma/chroma-coordinator/internal/notification" | |
"github.com/chroma/chroma-coordinator/internal/proto/coordinatorpb" | |
"github.com/chroma/chroma-coordinator/internal/utils" | |
"github.com/pingcap/log" | |
"go.uber.org/zap" | |
"google.golang.org/grpc" | |
"google.golang.org/grpc/health" | |
"gorm.io/gorm" | |
) | |
type Config struct { | |
// GrpcConfig config | |
GrpcConfig *grpcutils.GrpcConfig | |
// System catalog provider | |
SystemCatalogProvider string | |
// MetaTable config | |
Username string | |
Password string | |
Address string | |
Port int | |
DBName string | |
MaxIdleConns int | |
MaxOpenConns int | |
// Notification config | |
NotificationStoreProvider string | |
NotifierProvider string | |
NotificationTopic string | |
// Pulsar config | |
PulsarAdminURL string | |
PulsarURL string | |
PulsarTenant string | |
PulsarNamespace string | |
// Kubernetes config | |
KubernetesNamespace string | |
WorkerMemberlistName string | |
// Assignment policy config can be "simple" or "rendezvous" | |
AssignmentPolicy string | |
// Watcher config | |
WatchInterval time.Duration | |
// Config for testing | |
Testing bool | |
} | |
// Server wraps Coordinator with GRPC services. | |
// | |
// When Testing is set to true, the GRPC services will not be intialzed. This is | |
// convenient for end-to-end property based testing. | |
type Server struct { | |
coordinatorpb.UnimplementedSysDBServer | |
coordinator coordinator.ICoordinator | |
grpcServer grpcutils.GrpcServer | |
healthServer *health.Server | |
} | |
func New(config Config) (*Server, error) { | |
if config.SystemCatalogProvider == "memory" { | |
return NewWithGrpcProvider(config, grpcutils.Default, nil) | |
} else if config.SystemCatalogProvider == "database" { | |
dBConfig := dbcore.DBConfig{ | |
Username: config.Username, | |
Password: config.Password, | |
Address: config.Address, | |
Port: config.Port, | |
DBName: config.DBName, | |
MaxIdleConns: config.MaxIdleConns, | |
MaxOpenConns: config.MaxOpenConns, | |
} | |
db, err := dbcore.Connect(dBConfig) | |
if err != nil { | |
return nil, err | |
} | |
return NewWithGrpcProvider(config, grpcutils.Default, db) | |
} else { | |
return nil, errors.New("invalid system catalog provider, only memory and database are supported") | |
} | |
} | |
func NewWithGrpcProvider(config Config, provider grpcutils.GrpcProvider, db *gorm.DB) (*Server, error) { | |
ctx := context.Background() | |
s := &Server{ | |
healthServer: health.NewServer(), | |
} | |
var assignmentPolicy coordinator.CollectionAssignmentPolicy | |
if config.AssignmentPolicy == "simple" { | |
log.Info("Using simple assignment policy") | |
assignmentPolicy = coordinator.NewSimpleAssignmentPolicy(config.PulsarTenant, config.PulsarNamespace) | |
} else if config.AssignmentPolicy == "rendezvous" { | |
log.Info("Using rendezvous assignment policy") | |
err := utils.CreateTopics(config.PulsarAdminURL, config.PulsarTenant, config.PulsarNamespace, coordinator.Topics[:]) | |
if err != nil { | |
log.Error("Failed to create topics", zap.Error(err)) | |
return nil, err | |
} | |
assignmentPolicy = coordinator.NewRendezvousAssignmentPolicy(config.PulsarTenant, config.PulsarNamespace) | |
} else { | |
return nil, errors.New("invalid assignment policy, only simple and rendezvous are supported") | |
} | |
var notificationStore notification.NotificationStore | |
if config.NotificationStoreProvider == "memory" { | |
log.Info("Using memory notification store") | |
notificationStore = notification.NewMemoryNotificationStore() | |
} else if config.NotificationStoreProvider == "database" { | |
txnImpl := dbcore.NewTxImpl() | |
metaDomain := dao.NewMetaDomain() | |
notificationStore = notification.NewDatabaseNotificationStore(txnImpl, metaDomain) | |
} else { | |
return nil, errors.New("invalid notification store provider, only memory and database are supported") | |
} | |
var notifier notification.Notifier | |
var client pulsar.Client | |
var producer pulsar.Producer | |
if config.NotifierProvider == "memory" { | |
log.Info("Using memory notifier") | |
notifier = notification.NewMemoryNotifier() | |
} else if config.NotifierProvider == "pulsar" { | |
log.Info("Using pulsar notifier") | |
pulsarNotifier, pulsarClient, pulsarProducer, err := createPulsarNotifer(config.PulsarURL, config.NotificationTopic) | |
notifier = pulsarNotifier | |
client = pulsarClient | |
producer = pulsarProducer | |
if err != nil { | |
log.Error("Failed to create pulsar notifier", zap.Error(err)) | |
return nil, err | |
} | |
} else { | |
return nil, errors.New("invalid notifier provider, only memory and pulsar are supported") | |
} | |
if client != nil { | |
defer client.Close() | |
} | |
if producer != nil { | |
defer producer.Close() | |
} | |
coordinator, err := coordinator.NewCoordinator(ctx, assignmentPolicy, db, notificationStore, notifier) | |
if err != nil { | |
return nil, err | |
} | |
s.coordinator = coordinator | |
s.coordinator.Start() | |
if !config.Testing { | |
memberlist_manager, err := createMemberlistManager(config) | |
if err != nil { | |
return nil, err | |
} | |
// Start the memberlist manager | |
err = memberlist_manager.Start() | |
if err != nil { | |
return nil, err | |
} | |
s.grpcServer, err = provider.StartGrpcServer("coordinator", config.GrpcConfig, func(registrar grpc.ServiceRegistrar) { | |
coordinatorpb.RegisterSysDBServer(registrar, s) | |
}) | |
if err != nil { | |
return nil, err | |
} | |
} | |
return s, nil | |
} | |
func createMemberlistManager(config Config) (*memberlist_manager.MemberlistManager, error) { | |
// TODO: Make this configuration | |
log.Info("Starting memberlist manager") | |
memberlist_name := config.WorkerMemberlistName | |
namespace := config.KubernetesNamespace | |
clientset, err := utils.GetKubernetesInterface() | |
if err != nil { | |
return nil, err | |
} | |
dynamicClient, err := utils.GetKubernetesDynamicInterface() | |
if err != nil { | |
return nil, err | |
} | |
nodeWatcher := memberlist_manager.NewKubernetesWatcher(clientset, namespace, "worker", config.WatchInterval) | |
memberlistStore := memberlist_manager.NewCRMemberlistStore(dynamicClient, namespace, memberlist_name) | |
memberlist_manager := memberlist_manager.NewMemberlistManager(nodeWatcher, memberlistStore) | |
return memberlist_manager, nil | |
} | |
func createPulsarNotifer(pulsarURL string, notificationTopic string) (*notification.PulsarNotifier, pulsar.Client, pulsar.Producer, error) { | |
client, err := pulsar.NewClient(pulsar.ClientOptions{ | |
URL: pulsarURL, | |
}) | |
if err != nil { | |
log.Error("Failed to create pulsar client", zap.Error(err)) | |
return nil, nil, nil, err | |
} | |
producer, err := client.CreateProducer(pulsar.ProducerOptions{ | |
Topic: notificationTopic, | |
}) | |
if err != nil { | |
log.Error("Failed to create producer", zap.Error(err)) | |
return nil, nil, nil, err | |
} | |
notifier := notification.NewPulsarNotifier(producer) | |
return notifier, client, producer, nil | |
} | |
func (s *Server) Close() error { | |
s.healthServer.Shutdown() | |
s.coordinator.Stop() | |
return nil | |
} | |