| package model |
|
|
| import ( |
| "errors" |
| "sync" |
| "time" |
|
|
| "github.com/QuantumNous/new-api/common" |
|
|
| "github.com/bytedance/gopkg/util/gopool" |
| "gorm.io/gorm" |
| ) |
|
|
// Batch update types. Each value doubles as the index into
// batchUpdateStores and batchUpdateLocks, so the order must stay stable
// and BatchUpdateTypeCount must remain the last entry.
const (
	// BatchUpdateTypeUserQuota entries are flushed via increaseUserQuota.
	BatchUpdateTypeUserQuota = iota
	// BatchUpdateTypeTokenQuota entries are flushed via increaseTokenQuota.
	BatchUpdateTypeTokenQuota
	// BatchUpdateTypeUsedQuota entries are flushed via updateUserUsedQuota.
	BatchUpdateTypeUsedQuota
	// BatchUpdateTypeChannelUsedQuota entries are flushed via updateChannelUsedQuota.
	BatchUpdateTypeChannelUsedQuota
	// BatchUpdateTypeRequestCount entries are flushed via updateUserRequestCount.
	BatchUpdateTypeRequestCount
	// BatchUpdateTypeCount is the number of batch update types, not a type itself.
	BatchUpdateTypeCount
)
|
|
| var batchUpdateStores []map[int]int |
| var batchUpdateLocks []sync.Mutex |
|
|
| func init() { |
| for i := 0; i < BatchUpdateTypeCount; i++ { |
| batchUpdateStores = append(batchUpdateStores, make(map[int]int)) |
| batchUpdateLocks = append(batchUpdateLocks, sync.Mutex{}) |
| } |
| } |
|
|
| func InitBatchUpdater() { |
| gopool.Go(func() { |
| for { |
| time.Sleep(time.Duration(common.BatchUpdateInterval) * time.Second) |
| batchUpdate() |
| } |
| }) |
| } |
|
|
| func addNewRecord(type_ int, id int, value int) { |
| batchUpdateLocks[type_].Lock() |
| defer batchUpdateLocks[type_].Unlock() |
| if _, ok := batchUpdateStores[type_][id]; !ok { |
| batchUpdateStores[type_][id] = value |
| } else { |
| batchUpdateStores[type_][id] += value |
| } |
| } |
|
|
| func batchUpdate() { |
| |
| hasData := false |
| for i := 0; i < BatchUpdateTypeCount; i++ { |
| batchUpdateLocks[i].Lock() |
| if len(batchUpdateStores[i]) > 0 { |
| hasData = true |
| batchUpdateLocks[i].Unlock() |
| break |
| } |
| batchUpdateLocks[i].Unlock() |
| } |
|
|
| if !hasData { |
| return |
| } |
|
|
| common.SysLog("batch update started") |
| for i := 0; i < BatchUpdateTypeCount; i++ { |
| batchUpdateLocks[i].Lock() |
| store := batchUpdateStores[i] |
| batchUpdateStores[i] = make(map[int]int) |
| batchUpdateLocks[i].Unlock() |
| |
| for key, value := range store { |
| switch i { |
| case BatchUpdateTypeUserQuota: |
| err := increaseUserQuota(key, value) |
| if err != nil { |
| common.SysLog("failed to batch update user quota: " + err.Error()) |
| } |
| case BatchUpdateTypeTokenQuota: |
| err := increaseTokenQuota(key, value) |
| if err != nil { |
| common.SysLog("failed to batch update token quota: " + err.Error()) |
| } |
| case BatchUpdateTypeUsedQuota: |
| updateUserUsedQuota(key, value) |
| case BatchUpdateTypeRequestCount: |
| updateUserRequestCount(key, value) |
| case BatchUpdateTypeChannelUsedQuota: |
| updateChannelUsedQuota(key, value) |
| } |
| } |
| } |
| common.SysLog("batch update finished") |
| } |
|
|
| func RecordExist(err error) (bool, error) { |
| if err == nil { |
| return true, nil |
| } |
| if errors.Is(err, gorm.ErrRecordNotFound) { |
| return false, nil |
| } |
| return false, err |
| } |
|
|
| func shouldUpdateRedis(fromDB bool, err error) bool { |
| return common.RedisEnabled && fromDB && err == nil |
| } |
|
|