package notification import ( "context" "github.com/apache/pulsar-client-go/pulsar" "github.com/chroma/chroma-coordinator/internal/model" "github.com/chroma/chroma-coordinator/internal/proto/coordinatorpb" "github.com/pingcap/log" "go.uber.org/zap" "google.golang.org/protobuf/proto" ) type Notifier interface { Notify(ctx context.Context, notifications []model.Notification) error } type PulsarNotifier struct { producer pulsar.Producer } var _ Notifier = &PulsarNotifier{} func NewPulsarNotifier(producer pulsar.Producer) *PulsarNotifier { return &PulsarNotifier{ producer: producer, } } func (p *PulsarNotifier) Notify(ctx context.Context, notifications []model.Notification) error { for _, notification := range notifications { notificationPb := coordinatorpb.Notification{ CollectionId: notification.CollectionID, Type: notification.Type, Status: notification.Status, } payload, err := proto.Marshal(¬ificationPb) if err != nil { log.Error("Failed to marshal notification", zap.Error(err)) return err } message := &pulsar.ProducerMessage{ Key: notification.CollectionID, Payload: payload, } // Since the number of notifications is small, we can send them synchronously // for now. This is easy to reason about hte order of notifications. // // As follow up optimizations, we can send them asynchronously in batches and // track failed messages. _, err = p.producer.Send(ctx, message) if err != nil { log.Error("Failed to send message", zap.Error(err)) return err } log.Info("Published message", zap.Any("message", message)) } return nil } type MemoryNotifier struct { queue []pulsar.ProducerMessage } var _ Notifier = &MemoryNotifier{} func NewMemoryNotifier() *MemoryNotifier { return &MemoryNotifier{ queue: make([]pulsar.ProducerMessage, 0), } } func (m *MemoryNotifier) Notify(ctx context.Context, notifications []model.Notification) error { for _, notification := range notifications { notificationPb := coordinatorpb.Notification{ CollectionId: notification.CollectionID, Type: notification.Type, Status: notification.Status, } payload, err := proto.Marshal(¬ificationPb) if err != nil { log.Error("Failed to marshal notification", zap.Error(err)) return err } message := pulsar.ProducerMessage{ Key: notification.CollectionID, Payload: payload, } m.queue = append(m.queue, message) log.Info("Published message", zap.Any("message", message)) } return nil }