package mq import ( "sync" "github.com/alist-org/alist/v3/pkg/generic" ) type Message[T any] struct { Content T } type BasicConsumer[T any] func(Message[T]) type AllConsumer[T any] func([]Message[T]) type MQ[T any] interface { Publish(Message[T]) Consume(BasicConsumer[T]) ConsumeAll(AllConsumer[T]) Clear() Len() int } type inMemoryMQ[T any] struct { queue generic.Queue[Message[T]] sync.Mutex } func NewInMemoryMQ[T any]() MQ[T] { return &inMemoryMQ[T]{queue: *generic.NewQueue[Message[T]]()} } func (mq *inMemoryMQ[T]) Publish(msg Message[T]) { mq.Lock() defer mq.Unlock() mq.queue.Push(msg) } func (mq *inMemoryMQ[T]) Consume(consumer BasicConsumer[T]) { mq.Lock() defer mq.Unlock() for !mq.queue.IsEmpty() { consumer(mq.queue.Pop()) } } func (mq *inMemoryMQ[T]) ConsumeAll(consumer AllConsumer[T]) { mq.Lock() defer mq.Unlock() consumer(mq.queue.PopAll()) } func (mq *inMemoryMQ[T]) Clear() { mq.Lock() defer mq.Unlock() mq.queue.Clear() } func (mq *inMemoryMQ[T]) Len() int { return mq.queue.Len() }