package grpccoordinator import ( "context" "errors" "time" "github.com/apache/pulsar-client-go/pulsar" "github.com/chroma/chroma-coordinator/internal/coordinator" "github.com/chroma/chroma-coordinator/internal/grpccoordinator/grpcutils" "github.com/chroma/chroma-coordinator/internal/memberlist_manager" "github.com/chroma/chroma-coordinator/internal/metastore/db/dao" "github.com/chroma/chroma-coordinator/internal/metastore/db/dbcore" "github.com/chroma/chroma-coordinator/internal/notification" "github.com/chroma/chroma-coordinator/internal/proto/coordinatorpb" "github.com/chroma/chroma-coordinator/internal/utils" "github.com/pingcap/log" "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/health" "gorm.io/gorm" ) type Config struct { // GrpcConfig config GrpcConfig *grpcutils.GrpcConfig // System catalog provider SystemCatalogProvider string // MetaTable config Username string Password string Address string Port int DBName string MaxIdleConns int MaxOpenConns int // Notification config NotificationStoreProvider string NotifierProvider string NotificationTopic string // Pulsar config PulsarAdminURL string PulsarURL string PulsarTenant string PulsarNamespace string // Kubernetes config KubernetesNamespace string WorkerMemberlistName string // Assignment policy config can be "simple" or "rendezvous" AssignmentPolicy string // Watcher config WatchInterval time.Duration // Config for testing Testing bool } // Server wraps Coordinator with GRPC services. // // When Testing is set to true, the GRPC services will not be intialzed. This is // convenient for end-to-end property based testing. type Server struct { coordinatorpb.UnimplementedSysDBServer coordinator coordinator.ICoordinator grpcServer grpcutils.GrpcServer healthServer *health.Server } func New(config Config) (*Server, error) { if config.SystemCatalogProvider == "memory" { return NewWithGrpcProvider(config, grpcutils.Default, nil) } else if config.SystemCatalogProvider == "database" { dBConfig := dbcore.DBConfig{ Username: config.Username, Password: config.Password, Address: config.Address, Port: config.Port, DBName: config.DBName, MaxIdleConns: config.MaxIdleConns, MaxOpenConns: config.MaxOpenConns, } db, err := dbcore.Connect(dBConfig) if err != nil { return nil, err } return NewWithGrpcProvider(config, grpcutils.Default, db) } else { return nil, errors.New("invalid system catalog provider, only memory and database are supported") } } func NewWithGrpcProvider(config Config, provider grpcutils.GrpcProvider, db *gorm.DB) (*Server, error) { ctx := context.Background() s := &Server{ healthServer: health.NewServer(), } var assignmentPolicy coordinator.CollectionAssignmentPolicy if config.AssignmentPolicy == "simple" { log.Info("Using simple assignment policy") assignmentPolicy = coordinator.NewSimpleAssignmentPolicy(config.PulsarTenant, config.PulsarNamespace) } else if config.AssignmentPolicy == "rendezvous" { log.Info("Using rendezvous assignment policy") err := utils.CreateTopics(config.PulsarAdminURL, config.PulsarTenant, config.PulsarNamespace, coordinator.Topics[:]) if err != nil { log.Error("Failed to create topics", zap.Error(err)) return nil, err } assignmentPolicy = coordinator.NewRendezvousAssignmentPolicy(config.PulsarTenant, config.PulsarNamespace) } else { return nil, errors.New("invalid assignment policy, only simple and rendezvous are supported") } var notificationStore notification.NotificationStore if config.NotificationStoreProvider == "memory" { log.Info("Using memory notification store") notificationStore = notification.NewMemoryNotificationStore() } else if config.NotificationStoreProvider == "database" { txnImpl := dbcore.NewTxImpl() metaDomain := dao.NewMetaDomain() notificationStore = notification.NewDatabaseNotificationStore(txnImpl, metaDomain) } else { return nil, errors.New("invalid notification store provider, only memory and database are supported") } var notifier notification.Notifier var client pulsar.Client var producer pulsar.Producer if config.NotifierProvider == "memory" { log.Info("Using memory notifier") notifier = notification.NewMemoryNotifier() } else if config.NotifierProvider == "pulsar" { log.Info("Using pulsar notifier") pulsarNotifier, pulsarClient, pulsarProducer, err := createPulsarNotifer(config.PulsarURL, config.NotificationTopic) notifier = pulsarNotifier client = pulsarClient producer = pulsarProducer if err != nil { log.Error("Failed to create pulsar notifier", zap.Error(err)) return nil, err } } else { return nil, errors.New("invalid notifier provider, only memory and pulsar are supported") } if client != nil { defer client.Close() } if producer != nil { defer producer.Close() } coordinator, err := coordinator.NewCoordinator(ctx, assignmentPolicy, db, notificationStore, notifier) if err != nil { return nil, err } s.coordinator = coordinator s.coordinator.Start() if !config.Testing { memberlist_manager, err := createMemberlistManager(config) if err != nil { return nil, err } // Start the memberlist manager err = memberlist_manager.Start() if err != nil { return nil, err } s.grpcServer, err = provider.StartGrpcServer("coordinator", config.GrpcConfig, func(registrar grpc.ServiceRegistrar) { coordinatorpb.RegisterSysDBServer(registrar, s) }) if err != nil { return nil, err } } return s, nil } func createMemberlistManager(config Config) (*memberlist_manager.MemberlistManager, error) { // TODO: Make this configuration log.Info("Starting memberlist manager") memberlist_name := config.WorkerMemberlistName namespace := config.KubernetesNamespace clientset, err := utils.GetKubernetesInterface() if err != nil { return nil, err } dynamicClient, err := utils.GetKubernetesDynamicInterface() if err != nil { return nil, err } nodeWatcher := memberlist_manager.NewKubernetesWatcher(clientset, namespace, "worker", config.WatchInterval) memberlistStore := memberlist_manager.NewCRMemberlistStore(dynamicClient, namespace, memberlist_name) memberlist_manager := memberlist_manager.NewMemberlistManager(nodeWatcher, memberlistStore) return memberlist_manager, nil } func createPulsarNotifer(pulsarURL string, notificationTopic string) (*notification.PulsarNotifier, pulsar.Client, pulsar.Producer, error) { client, err := pulsar.NewClient(pulsar.ClientOptions{ URL: pulsarURL, }) if err != nil { log.Error("Failed to create pulsar client", zap.Error(err)) return nil, nil, nil, err } producer, err := client.CreateProducer(pulsar.ProducerOptions{ Topic: notificationTopic, }) if err != nil { log.Error("Failed to create producer", zap.Error(err)) return nil, nil, nil, err } notifier := notification.NewPulsarNotifier(producer) return notifier, client, producer, nil } func (s *Server) Close() error { s.healthServer.Shutdown() s.coordinator.Stop() return nil }