package task import ( "github.com/alist-org/alist/v3/pkg/generic_sync" "github.com/alist-org/alist/v3/pkg/utils" "github.com/pkg/errors" log "github.com/sirupsen/logrus" ) type Manager[K comparable] struct { curID K workerC chan struct{} updateID func(*K) tasks generic_sync.MapOf[K, *Task[K]] } func (tm *Manager[K]) Submit(task *Task[K]) K { if tm.updateID != nil { tm.updateID(&tm.curID) task.ID = tm.curID } tm.tasks.Store(task.ID, task) tm.do(task) return task.ID } func (tm *Manager[K]) do(task *Task[K]) { go func() { log.Debugf("task [%s] waiting for worker", task.Name) select { case <-tm.workerC: log.Debugf("task [%s] starting", task.Name) task.run() log.Debugf("task [%s] ended", task.Name) case <-task.Ctx.Done(): log.Debugf("task [%s] canceled", task.Name) return } // return worker tm.workerC <- struct{}{} }() } func (tm *Manager[K]) GetAll() []*Task[K] { return tm.tasks.Values() } func (tm *Manager[K]) Get(tid K) (*Task[K], bool) { return tm.tasks.Load(tid) } func (tm *Manager[K]) MustGet(tid K) *Task[K] { task, _ := tm.Get(tid) return task } func (tm *Manager[K]) Retry(tid K) error { t, ok := tm.Get(tid) if !ok { return errors.WithStack(ErrTaskNotFound) } tm.do(t) return nil } func (tm *Manager[K]) Cancel(tid K) error { t, ok := tm.Get(tid) if !ok { return errors.WithStack(ErrTaskNotFound) } t.Cancel() return nil } func (tm *Manager[K]) Remove(tid K) error { t, ok := tm.Get(tid) if !ok { return errors.WithStack(ErrTaskNotFound) } if !t.Done() { return errors.WithStack(ErrTaskRunning) } tm.tasks.Delete(tid) return nil } // RemoveAll removes all tasks from the manager, this maybe shouldn't be used // because the task maybe still running. func (tm *Manager[K]) RemoveAll() { tm.tasks.Clear() } func (tm *Manager[K]) RemoveByStates(states ...string) { tasks := tm.GetAll() for _, task := range tasks { if utils.SliceContains(states, task.GetState()) { _ = tm.Remove(task.ID) } } } func (tm *Manager[K]) GetByStates(states ...string) []*Task[K] { var tasks []*Task[K] tm.tasks.Range(func(key K, value *Task[K]) bool { if utils.SliceContains(states, value.GetState()) { tasks = append(tasks, value) } return true }) return tasks } func (tm *Manager[K]) ListUndone() []*Task[K] { return tm.GetByStates(PENDING, RUNNING, CANCELING) } func (tm *Manager[K]) ListDone() []*Task[K] { return tm.GetByStates(SUCCEEDED, CANCELED, ERRORED) } func (tm *Manager[K]) ClearDone() { tm.RemoveByStates(SUCCEEDED, CANCELED, ERRORED) } func (tm *Manager[K]) ClearSucceeded() { tm.RemoveByStates(SUCCEEDED) } func (tm *Manager[K]) RawTasks() *generic_sync.MapOf[K, *Task[K]] { return &tm.tasks } func NewTaskManager[K comparable](maxWorker int, updateID ...func(*K)) *Manager[K] { tm := &Manager[K]{ tasks: generic_sync.MapOf[K, *Task[K]]{}, workerC: make(chan struct{}, maxWorker), } for i := 0; i < maxWorker; i++ { tm.workerC <- struct{}{} } if len(updateID) > 0 { tm.updateID = updateID[0] } return tm }