From 22dae9efad59978c44e7a4fb740d7485f6c8765f Mon Sep 17 00:00:00 2001 From: Mukhtar Akere Date: Wed, 17 Sep 2025 23:30:45 +0100 Subject: [PATCH] Add a new worker that checks if an account is opened --- pkg/debrid/account/account.go | 48 ++++++++++++++++++ pkg/debrid/account/manager.go | 56 +++++++++++++++------ pkg/debrid/debrid.go | 53 +++++++++++++++++-- pkg/debrid/providers/alldebrid/alldebrid.go | 2 +- pkg/webdav/file.go | 3 -- pkg/webdav/misc.go | 9 ---- 6 files changed, 139 insertions(+), 32 deletions(-) diff --git a/pkg/debrid/account/account.go b/pkg/debrid/account/account.go index b37229b..04fa4e2 100644 --- a/pkg/debrid/account/account.go +++ b/pkg/debrid/account/account.go @@ -1,6 +1,8 @@ package account import ( + "fmt" + "net/http" "sync/atomic" "github.com/puzpuzpuz/xsync/v4" @@ -17,6 +19,9 @@ type Account struct { TrafficUsed atomic.Int64 `json:"traffic_used"` // Traffic used in bytes Username string `json:"username"` // Username for the account httpClient *request.Client + + // Account reactivation tracking + DisableCount atomic.Int32 `json:"disable_count"` } func (a *Account) Equals(other *Account) bool { @@ -69,3 +74,46 @@ func (a *Account) StoreDownloadLinks(dls map[string]*types.DownloadLink) { a.StoreDownloadLink(*dl) } } + +// MarkDisabled marks the account as disabled and increments the disable count +func (a *Account) MarkDisabled() { + a.Disabled.Store(true) + a.DisableCount.Add(1) +} + +func (a *Account) Reset() { + a.DisableCount.Store(0) + a.Disabled.Store(false) +} + +func (a *Account) CheckBandwidth() error { + // Get a one of the download links to check if the account is still valid + downloadLink := "" + a.links.Range(func(key string, dl types.DownloadLink) bool { + if dl.DownloadLink != "" { + downloadLink = dl.DownloadLink + return false + } + return true + }) + if downloadLink == "" { + return fmt.Errorf("no download link found") + } + + // Let's check the download link status + req, err := http.NewRequest(http.MethodGet, downloadLink, nil) + if err != nil { + return err + } + // Use a simple client + client := http.DefaultClient + resp, err := client.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusPartialContent { + return fmt.Errorf("account check failed with status code %d", resp.StatusCode) + } + return nil +} diff --git a/pkg/debrid/account/manager.go b/pkg/debrid/account/manager.go index fb44817..643b715 100644 --- a/pkg/debrid/account/manager.go +++ b/pkg/debrid/account/manager.go @@ -14,16 +14,22 @@ import ( "go.uber.org/ratelimit" ) +const ( + MaxDisableCount = 3 +) + type Manager struct { debrid string current atomic.Pointer[Account] accounts *xsync.Map[string, *Account] + logger zerolog.Logger } func NewManager(debridConf config.Debrid, downloadRL ratelimit.Limiter, logger zerolog.Logger) *Manager { m := &Manager{ debrid: debridConf.Name, accounts: xsync.NewMap[string, *Account](), + logger: logger, } var firstAccount *Account @@ -95,8 +101,16 @@ func (m *Manager) Current() *Account { // Slow path - find new current account activeAccounts := m.Active() if len(activeAccounts) == 0 { - m.current.Store(nil) - return nil + // No active accounts left, try to use disabled ones + m.logger.Warn().Str("debrid", m.debrid).Msg("No active accounts available, all accounts are disabled") + allAccounts := m.All() + if len(allAccounts) == 0 { + m.logger.Error().Str("debrid", m.debrid).Msg("No accounts configured") + m.current.Store(nil) + return nil + } + m.current.Store(allAccounts[0]) + return allAccounts[0] } newCurrent := activeAccounts[0] @@ -104,16 +118,12 @@ func (m *Manager) Current() *Account { return newCurrent } -func (m *Manager) setCurrent(account *Account) { - m.current.Store(account) -} - func (m *Manager) Disable(account *Account) { if account == nil { return } - account.Disabled.Store(true) + account.MarkDisabled() // If we're disabling the current account, it will be replaced // on the next Current() call - no need to proactively update @@ -131,7 +141,7 @@ func (m *Manager) Disable(account *Account) { func (m *Manager) Reset() { m.accounts.Range(func(key string, acc *Account) bool { - acc.Disabled.Store(false) + acc.Reset() return true }) @@ -144,12 +154,6 @@ func (m *Manager) Reset() { } } -func (m *Manager) Update(account *Account) { - if account != nil { - m.accounts.Store(account.Token, account) - } -} - func (m *Manager) GetAccount(token string) (*Account, error) { if token == "" { return nil, fmt.Errorf("token cannot be empty") @@ -209,3 +213,27 @@ func (m *Manager) Stats() []map[string]any { } return stats } + +func (m *Manager) CheckAndResetBandwidth() { + found := false + m.accounts.Range(func(key string, acc *Account) bool { + if acc.Disabled.Load() && acc.DisableCount.Load() < MaxDisableCount { + if err := acc.CheckBandwidth(); err == nil { + acc.Disabled.Store(false) + found = true + m.logger.Info().Str("debrid", m.debrid).Str("token", utils.Mask(acc.Token)).Msg("Re-activated disabled account") + } else { + m.logger.Debug().Err(err).Str("debrid", m.debrid).Str("token", utils.Mask(acc.Token)).Msg("Account still disabled") + } + } + return true + }) + if found { + // If we re-activated any account, reset current to first active + activeAccounts := m.Active() + if len(activeAccounts) > 0 { + m.current.Store(activeAccounts[0]) + } + + } +} diff --git a/pkg/debrid/debrid.go b/pkg/debrid/debrid.go index 5e75bd2..dd43646 100644 --- a/pkg/debrid/debrid.go +++ b/pkg/debrid/debrid.go @@ -109,11 +109,54 @@ func (d *Storage) StartWorker(ctx context.Context) error { ctx = context.Background() } - // Start all debrid syncAccounts - // Runs every 1m - if err := d.syncAccounts(); err != nil { - return err + // Start syncAccounts worker + go d.syncAccountsWorker(ctx) + + // Start bandwidth reset worker + go d.checkBandwidthWorker(ctx) + + return nil +} + +func (d *Storage) checkBandwidthWorker(ctx context.Context) { + if ctx == nil { + ctx = context.Background() } + ticker := time.NewTicker(30 * time.Minute) + go func() { + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + d.checkAccountBandwidth() + } + } + }() +} + +func (d *Storage) checkAccountBandwidth() { + d.mu.Lock() + defer d.mu.Unlock() + + for _, debrid := range d.debrids { + if debrid == nil || debrid.client == nil { + continue + } + accountManager := debrid.client.AccountManager() + if accountManager == nil { + continue + } + accountManager.CheckAndResetBandwidth() + } +} + +func (d *Storage) syncAccountsWorker(ctx context.Context) { + if ctx == nil { + ctx = context.Background() + } + + _ = d.syncAccounts() ticker := time.NewTicker(5 * time.Minute) go func() { for { @@ -125,7 +168,7 @@ func (d *Storage) StartWorker(ctx context.Context) error { } } }() - return nil + } func (d *Storage) syncAccounts() error { diff --git a/pkg/debrid/providers/alldebrid/alldebrid.go b/pkg/debrid/providers/alldebrid/alldebrid.go index 94a426e..7e53447 100644 --- a/pkg/debrid/providers/alldebrid/alldebrid.go +++ b/pkg/debrid/providers/alldebrid/alldebrid.go @@ -104,7 +104,7 @@ func (ad *AllDebrid) SubmitMagnet(torrent *types.Torrent) (*types.Torrent, error } magnets := data.Data.Magnets if len(magnets) == 0 { - return nil, fmt.Errorf("error adding torrent") + return nil, fmt.Errorf("error adding torrent. No magnets returned") } magnet := magnets[0] torrentId := strconv.Itoa(magnet.ID) diff --git a/pkg/webdav/file.go b/pkg/webdav/file.go index a5ca60a..e0ea316 100644 --- a/pkg/webdav/file.go +++ b/pkg/webdav/file.go @@ -101,7 +101,6 @@ func (f *File) getDownloadByteRange() (*[2]int64, error) { return byteRange, nil } -// setVideoStreamingHeaders sets the necessary headers for video streaming // It returns error and a boolean indicating if the request is a range request func (f *File) servePreloadedContent(w http.ResponseWriter, r *http.Request) error { content := f.content @@ -156,8 +155,6 @@ func (f *File) streamWithRetry(w http.ResponseWriter, r *http.Request, networkRe return &streamError{Err: err, StatusCode: http.StatusInternalServerError} } - setVideoStreamingHeaders(upstreamReq) - isRangeRequest := f.handleRangeRequest(upstreamReq, r, w) if isRangeRequest == -1 { return &streamError{Err: fmt.Errorf("invalid range"), StatusCode: http.StatusRequestedRangeNotSatisfiable} diff --git a/pkg/webdav/misc.go b/pkg/webdav/misc.go index 97a9091..9de243d 100644 --- a/pkg/webdav/misc.go +++ b/pkg/webdav/misc.go @@ -212,12 +212,3 @@ func parseRange(s string, size int64) ([]httpRange, error) { } return ranges, nil } - -func setVideoStreamingHeaders(req *http.Request) { - // Request optimizations for faster response - req.Header.Set("Accept", "*/*") - req.Header.Set("Accept-Encoding", "identity") - req.Header.Set("Connection", "keep-alive") - req.Header.Set("User-Agent", "VideoStream/1.0") - req.Header.Set("Priority", "u=1") -}