Spaces:
Runtime error
Runtime error
package errgroup | |
import ( | |
"context" | |
"fmt" | |
"sync" | |
"sync/atomic" | |
"github.com/avast/retry-go" | |
) | |
type token struct{} | |
type Group struct { | |
cancel func(error) | |
ctx context.Context | |
opts []retry.Option | |
success uint64 | |
wg sync.WaitGroup | |
sem chan token | |
} | |
func NewGroupWithContext(ctx context.Context, limit int, retryOpts ...retry.Option) (*Group, context.Context) { | |
ctx, cancel := context.WithCancelCause(ctx) | |
return (&Group{cancel: cancel, ctx: ctx, opts: append(retryOpts, retry.Context(ctx))}).SetLimit(limit), ctx | |
} | |
func (g *Group) done() { | |
if g.sem != nil { | |
<-g.sem | |
} | |
g.wg.Done() | |
atomic.AddUint64(&g.success, 1) | |
} | |
func (g *Group) Wait() error { | |
g.wg.Wait() | |
return context.Cause(g.ctx) | |
} | |
func (g *Group) Go(f func(ctx context.Context) error) { | |
if g.sem != nil { | |
g.sem <- token{} | |
} | |
g.wg.Add(1) | |
go func() { | |
defer g.done() | |
if err := retry.Do(func() error { return f(g.ctx) }, g.opts...); err != nil { | |
g.cancel(err) | |
} | |
}() | |
} | |
func (g *Group) TryGo(f func(ctx context.Context) error) bool { | |
if g.sem != nil { | |
select { | |
case g.sem <- token{}: | |
default: | |
return false | |
} | |
} | |
g.wg.Add(1) | |
go func() { | |
defer g.done() | |
if err := retry.Do(func() error { return f(g.ctx) }, g.opts...); err != nil { | |
g.cancel(err) | |
} | |
}() | |
return true | |
} | |
func (g *Group) SetLimit(n int) *Group { | |
if len(g.sem) != 0 { | |
panic(fmt.Errorf("errgroup: modify limit while %v goroutines in the group are still active", len(g.sem))) | |
} | |
if n > 0 { | |
g.sem = make(chan token, n) | |
} else { | |
g.sem = nil | |
} | |
return g | |
} | |
func (g *Group) Success() uint64 { | |
return atomic.LoadUint64(&g.success) | |
} | |
func (g *Group) Err() error { | |
return context.Cause(g.ctx) | |
} | |