package notification import ( "context" "sync/atomic" "github.com/chroma/chroma-coordinator/internal/common" "github.com/chroma/chroma-coordinator/internal/model" "github.com/pingcap/log" "go.uber.org/zap" ) type NotificationProcessor interface { common.Component Process(ctx context.Context) error Trigger(ctx context.Context, triggerMsg TriggerMessage) } type SimpleNotificationProcessor struct { ctx context.Context store NotificationStore notifer Notifier channel chan TriggerMessage doneChannel chan bool running atomic.Bool } type TriggerMessage struct { Msg model.Notification ResultChan chan error } const triggerChannelSize = 1000 var _ NotificationProcessor = &SimpleNotificationProcessor{} func NewSimpleNotificationProcessor(ctx context.Context, store NotificationStore, notifier Notifier) *SimpleNotificationProcessor { return &SimpleNotificationProcessor{ ctx: ctx, store: store, notifer: notifier, channel: make(chan TriggerMessage, triggerChannelSize), doneChannel: make(chan bool), } } func (n *SimpleNotificationProcessor) Start() error { // During startup, first sending all pending notifications in the store to the notification topic log.Info("Starting notification processor") err := n.sendPendingNotifications(n.ctx) if err != nil { log.Error("Failed to send pending notifications", zap.Error(err)) return err } n.running.Store(true) go n.Process(n.ctx) return nil } func (n *SimpleNotificationProcessor) Stop() error { n.running.Store(false) n.doneChannel <- true return nil } func (n *SimpleNotificationProcessor) Process(ctx context.Context) error { log.Info("Waiting for new notifications") for { select { case triggerMsg := <-n.channel: msg := triggerMsg.Msg log.Info("Received notification", zap.Any("msg", msg)) running := n.running.Load() log.Info("Notification processor is running", zap.Bool("running", running)) // We need to block here until the notifications are sent successfully for running { // Check the notification store if this notification is already processed // If it is already processed, just return // If it is not processed, send notifications and remove from the store notifications, err := n.store.GetNotifications(ctx, msg.CollectionID) if err != nil { log.Error("Failed to get notifications", zap.Error(err)) triggerMsg.ResultChan <- err continue } if len(notifications) == 0 { log.Info("No pending notifications found") triggerMsg.ResultChan <- nil break } log.Info("Got notifications from notification store", zap.Any("notifications", notifications)) err = n.notifer.Notify(ctx, notifications) if err != nil { log.Error("Failed to send pending notifications", zap.Error(err)) } else { n.store.RemoveNotifications(ctx, notifications) log.Info("Rmove notifications from notification store", zap.Any("notifications", notifications)) triggerMsg.ResultChan <- nil break } } case <-n.doneChannel: log.Info("Stopping notification processor") return nil } } } func (n *SimpleNotificationProcessor) Trigger(ctx context.Context, triggerMsg TriggerMessage) { log.Info("Triggering notification", zap.Any("msg", triggerMsg.Msg)) if len(n.channel) == triggerChannelSize { log.Error("Notification channel is full, dropping notification", zap.Any("msg", triggerMsg.Msg)) triggerMsg.ResultChan <- nil return } n.channel <- triggerMsg } func (n *SimpleNotificationProcessor) sendPendingNotifications(ctx context.Context) error { notificationMap, err := n.store.GetAllPendingNotifications(ctx) if err != nil { log.Error("Failed to get all pending notifications", zap.Error(err)) return err } for collectionID, notifications := range notificationMap { log.Info("Sending pending notifications", zap.Any("collectionID", collectionID), zap.Any("notifications", notifications)) for { err = n.notifer.Notify(ctx, notifications) if err != nil { log.Error("Failed to send pending notifications", zap.Error(err)) } else { n.store.RemoveNotifications(ctx, notifications) break } } } return nil }