Spaces:
Sleeping
Sleeping
| package notification | |
| import ( | |
| "context" | |
| "sync/atomic" | |
| "github.com/chroma/chroma-coordinator/internal/common" | |
| "github.com/chroma/chroma-coordinator/internal/model" | |
| "github.com/pingcap/log" | |
| "go.uber.org/zap" | |
| ) | |
| type NotificationProcessor interface { | |
| common.Component | |
| Process(ctx context.Context) error | |
| Trigger(ctx context.Context, triggerMsg TriggerMessage) | |
| } | |
| type SimpleNotificationProcessor struct { | |
| ctx context.Context | |
| store NotificationStore | |
| notifer Notifier | |
| channel chan TriggerMessage | |
| doneChannel chan bool | |
| running atomic.Bool | |
| } | |
| type TriggerMessage struct { | |
| Msg model.Notification | |
| ResultChan chan error | |
| } | |
| const triggerChannelSize = 1000 | |
| var _ NotificationProcessor = &SimpleNotificationProcessor{} | |
| func NewSimpleNotificationProcessor(ctx context.Context, store NotificationStore, notifier Notifier) *SimpleNotificationProcessor { | |
| return &SimpleNotificationProcessor{ | |
| ctx: ctx, | |
| store: store, | |
| notifer: notifier, | |
| channel: make(chan TriggerMessage, triggerChannelSize), | |
| doneChannel: make(chan bool), | |
| } | |
| } | |
| func (n *SimpleNotificationProcessor) Start() error { | |
| // During startup, first sending all pending notifications in the store to the notification topic | |
| log.Info("Starting notification processor") | |
| err := n.sendPendingNotifications(n.ctx) | |
| if err != nil { | |
| log.Error("Failed to send pending notifications", zap.Error(err)) | |
| return err | |
| } | |
| n.running.Store(true) | |
| go n.Process(n.ctx) | |
| return nil | |
| } | |
| func (n *SimpleNotificationProcessor) Stop() error { | |
| n.running.Store(false) | |
| n.doneChannel <- true | |
| return nil | |
| } | |
| func (n *SimpleNotificationProcessor) Process(ctx context.Context) error { | |
| log.Info("Waiting for new notifications") | |
| for { | |
| select { | |
| case triggerMsg := <-n.channel: | |
| msg := triggerMsg.Msg | |
| log.Info("Received notification", zap.Any("msg", msg)) | |
| running := n.running.Load() | |
| log.Info("Notification processor is running", zap.Bool("running", running)) | |
| // We need to block here until the notifications are sent successfully | |
| for running { | |
| // Check the notification store if this notification is already processed | |
| // If it is already processed, just return | |
| // If it is not processed, send notifications and remove from the store | |
| notifications, err := n.store.GetNotifications(ctx, msg.CollectionID) | |
| if err != nil { | |
| log.Error("Failed to get notifications", zap.Error(err)) | |
| triggerMsg.ResultChan <- err | |
| continue | |
| } | |
| if len(notifications) == 0 { | |
| log.Info("No pending notifications found") | |
| triggerMsg.ResultChan <- nil | |
| break | |
| } | |
| log.Info("Got notifications from notification store", zap.Any("notifications", notifications)) | |
| err = n.notifer.Notify(ctx, notifications) | |
| if err != nil { | |
| log.Error("Failed to send pending notifications", zap.Error(err)) | |
| } else { | |
| n.store.RemoveNotifications(ctx, notifications) | |
| log.Info("Rmove notifications from notification store", zap.Any("notifications", notifications)) | |
| triggerMsg.ResultChan <- nil | |
| break | |
| } | |
| } | |
| case <-n.doneChannel: | |
| log.Info("Stopping notification processor") | |
| return nil | |
| } | |
| } | |
| } | |
| func (n *SimpleNotificationProcessor) Trigger(ctx context.Context, triggerMsg TriggerMessage) { | |
| log.Info("Triggering notification", zap.Any("msg", triggerMsg.Msg)) | |
| if len(n.channel) == triggerChannelSize { | |
| log.Error("Notification channel is full, dropping notification", zap.Any("msg", triggerMsg.Msg)) | |
| triggerMsg.ResultChan <- nil | |
| return | |
| } | |
| n.channel <- triggerMsg | |
| } | |
| func (n *SimpleNotificationProcessor) sendPendingNotifications(ctx context.Context) error { | |
| notificationMap, err := n.store.GetAllPendingNotifications(ctx) | |
| if err != nil { | |
| log.Error("Failed to get all pending notifications", zap.Error(err)) | |
| return err | |
| } | |
| for collectionID, notifications := range notificationMap { | |
| log.Info("Sending pending notifications", zap.Any("collectionID", collectionID), zap.Any("notifications", notifications)) | |
| for { | |
| err = n.notifer.Notify(ctx, notifications) | |
| if err != nil { | |
| log.Error("Failed to send pending notifications", zap.Error(err)) | |
| } else { | |
| n.store.RemoveNotifications(ctx, notifications) | |
| break | |
| } | |
| } | |
| } | |
| return nil | |
| } | |