package errgroup import ( "context" "fmt" "sync" "sync/atomic" "github.com/avast/retry-go" ) type token struct{} type Group struct { cancel func(error) ctx context.Context opts []retry.Option success uint64 wg sync.WaitGroup sem chan token } func NewGroupWithContext(ctx context.Context, limit int, retryOpts ...retry.Option) (*Group, context.Context) { ctx, cancel := context.WithCancelCause(ctx) return (&Group{cancel: cancel, ctx: ctx, opts: append(retryOpts, retry.Context(ctx))}).SetLimit(limit), ctx } func (g *Group) done() { if g.sem != nil { <-g.sem } g.wg.Done() atomic.AddUint64(&g.success, 1) } func (g *Group) Wait() error { g.wg.Wait() return context.Cause(g.ctx) } func (g *Group) Go(f func(ctx context.Context) error) { if g.sem != nil { g.sem <- token{} } g.wg.Add(1) go func() { defer g.done() if err := retry.Do(func() error { return f(g.ctx) }, g.opts...); err != nil { g.cancel(err) } }() } func (g *Group) TryGo(f func(ctx context.Context) error) bool { if g.sem != nil { select { case g.sem <- token{}: default: return false } } g.wg.Add(1) go func() { defer g.done() if err := retry.Do(func() error { return f(g.ctx) }, g.opts...); err != nil { g.cancel(err) } }() return true } func (g *Group) SetLimit(n int) *Group { if len(g.sem) != 0 { panic(fmt.Errorf("errgroup: modify limit while %v goroutines in the group are still active", len(g.sem))) } if n > 0 { g.sem = make(chan token, n) } else { g.sem = nil } return g } func (g *Group) Success() uint64 { return atomic.LoadUint64(&g.success) } func (g *Group) Err() error { return context.Cause(g.ctx) }