| |
| |
| package watcher |
|
|
| import ( |
| "context" |
| "strings" |
| "sync" |
| "time" |
|
|
| "github.com/fsnotify/fsnotify" |
| "github.com/router-for-me/CLIProxyAPI/v6/internal/config" |
| "gopkg.in/yaml.v3" |
|
|
| sdkAuth "github.com/router-for-me/CLIProxyAPI/v6/sdk/auth" |
| coreauth "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/auth" |
| log "github.com/sirupsen/logrus" |
| ) |
|
|
| |
// storePersister is the optional capability NewWatcher probes for on the
// configured token store. A store that implements it can persist the
// configuration and individual auth files.
type storePersister interface {
	// PersistConfig persists the configuration.
	PersistConfig(ctx context.Context) error

	// PersistAuthFiles persists the auth files at the given paths; message
	// accompanies the operation (presumably a change description — confirm
	// against the store implementation).
	PersistAuthFiles(ctx context.Context, message string, paths ...string) error
}
|
|
// authDirProvider is the optional capability NewWatcher probes for on the
// configured token store: a store implementing it reports the auth directory
// it uses.
type authDirProvider interface {
	// AuthDir returns the store's auth directory. NewWatcher ignores values
	// that are empty after trimming whitespace.
	AuthDir() string
}
|
|
| |
| type Watcher struct { |
| configPath string |
| authDir string |
| config *config.Config |
| clientsMutex sync.RWMutex |
| configReloadMu sync.Mutex |
| configReloadTimer *time.Timer |
| reloadCallback func(*config.Config) |
| watcher *fsnotify.Watcher |
| lastAuthHashes map[string]string |
| lastRemoveTimes map[string]time.Time |
| lastConfigHash string |
| authQueue chan<- AuthUpdate |
| currentAuths map[string]*coreauth.Auth |
| runtimeAuths map[string]*coreauth.Auth |
| dispatchMu sync.Mutex |
| dispatchCond *sync.Cond |
| pendingUpdates map[string]AuthUpdate |
| pendingOrder []string |
| dispatchCancel context.CancelFunc |
| storePersister storePersister |
| mirroredAuthDir string |
| oldConfigYaml []byte |
| } |
|
|
| |
// AuthUpdateAction names the kind of change carried by an AuthUpdate.
type AuthUpdateAction string

// Recognized AuthUpdateAction values.
const (
	// AuthUpdateActionAdd is the "add" action.
	AuthUpdateActionAdd AuthUpdateAction = "add"
	// AuthUpdateActionModify is the "modify" action.
	AuthUpdateActionModify AuthUpdateAction = "modify"
	// AuthUpdateActionDelete is the "delete" action.
	AuthUpdateActionDelete AuthUpdateAction = "delete"
)
|
|
| |
| type AuthUpdate struct { |
| Action AuthUpdateAction |
| ID string |
| Auth *coreauth.Auth |
| } |
|
|
// Timing constants for the watcher. Their consumers are not in this part of
// the file, so the purposes below are inferred from the names — confirm
// against the event-handling code.
const (
	// replaceCheckDelay: presumably the pause before re-checking a file that
	// may have been replaced rather than removed.
	replaceCheckDelay = 50 * time.Millisecond
	// configReloadDebounce: presumably the quiet period before a config
	// change triggers a reload.
	configReloadDebounce = 150 * time.Millisecond
	// authRemoveDebounceWindow: presumably the window within which repeated
	// auth-file removals are collapsed.
	authRemoveDebounceWindow = time.Second
)
|
|
| |
| func NewWatcher(configPath, authDir string, reloadCallback func(*config.Config)) (*Watcher, error) { |
| watcher, errNewWatcher := fsnotify.NewWatcher() |
| if errNewWatcher != nil { |
| return nil, errNewWatcher |
| } |
| w := &Watcher{ |
| configPath: configPath, |
| authDir: authDir, |
| reloadCallback: reloadCallback, |
| watcher: watcher, |
| lastAuthHashes: make(map[string]string), |
| } |
| w.dispatchCond = sync.NewCond(&w.dispatchMu) |
| if store := sdkAuth.GetTokenStore(); store != nil { |
| if persister, ok := store.(storePersister); ok { |
| w.storePersister = persister |
| log.Debug("persistence-capable token store detected; watcher will propagate persisted changes") |
| } |
| if provider, ok := store.(authDirProvider); ok { |
| if fixed := strings.TrimSpace(provider.AuthDir()); fixed != "" { |
| w.mirroredAuthDir = fixed |
| log.Debugf("mirrored auth directory locked to %s", fixed) |
| } |
| } |
| } |
| return w, nil |
| } |
|
|
| |
| func (w *Watcher) Start(ctx context.Context) error { |
| return w.start(ctx) |
| } |
|
|
| |
| func (w *Watcher) Stop() error { |
| w.stopDispatch() |
| w.stopConfigReloadTimer() |
| return w.watcher.Close() |
| } |
|
|
| |
| func (w *Watcher) SetConfig(cfg *config.Config) { |
| w.clientsMutex.Lock() |
| defer w.clientsMutex.Unlock() |
| w.config = cfg |
| w.oldConfigYaml, _ = yaml.Marshal(cfg) |
| } |
|
|
| |
| func (w *Watcher) SetAuthUpdateQueue(queue chan<- AuthUpdate) { |
| w.setAuthUpdateQueue(queue) |
| } |
|
|
| |
| |
| |
| func (w *Watcher) DispatchRuntimeAuthUpdate(update AuthUpdate) bool { |
| return w.dispatchRuntimeAuthUpdate(update) |
| } |
|
|
| |
| func (w *Watcher) SnapshotCoreAuths() []*coreauth.Auth { |
| w.clientsMutex.RLock() |
| cfg := w.config |
| w.clientsMutex.RUnlock() |
| return snapshotCoreAuths(cfg, w.authDir) |
| } |
|
|