Add a new worker that checks if an account is opened
Some checks failed
Release Docker Build / docker (push) Has been cancelled
GoReleaser / goreleaser (push) Has been cancelled

This commit is contained in:
Mukhtar Akere
2025-09-17 23:30:45 +01:00
parent 3f0870cd1c
commit 22dae9efad
6 changed files with 139 additions and 32 deletions

View File

@@ -1,6 +1,8 @@
package account
import (
"fmt"
"net/http"
"sync/atomic"
"github.com/puzpuzpuz/xsync/v4"
@@ -17,6 +19,9 @@ type Account struct {
TrafficUsed atomic.Int64 `json:"traffic_used"` // Traffic used in bytes
Username string `json:"username"` // Username for the account
httpClient *request.Client
// Account reactivation tracking
DisableCount atomic.Int32 `json:"disable_count"`
}
func (a *Account) Equals(other *Account) bool {
@@ -69,3 +74,46 @@ func (a *Account) StoreDownloadLinks(dls map[string]*types.DownloadLink) {
a.StoreDownloadLink(*dl)
}
}
// MarkDisabled marks the account as disabled and increments the disable count
func (a *Account) MarkDisabled() {
a.Disabled.Store(true)
a.DisableCount.Add(1)
}
func (a *Account) Reset() {
a.DisableCount.Store(0)
a.Disabled.Store(false)
}
func (a *Account) CheckBandwidth() error {
// Get a one of the download links to check if the account is still valid
downloadLink := ""
a.links.Range(func(key string, dl types.DownloadLink) bool {
if dl.DownloadLink != "" {
downloadLink = dl.DownloadLink
return false
}
return true
})
if downloadLink == "" {
return fmt.Errorf("no download link found")
}
// Let's check the download link status
req, err := http.NewRequest(http.MethodGet, downloadLink, nil)
if err != nil {
return err
}
// Use a simple client
client := http.DefaultClient
resp, err := client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusPartialContent {
return fmt.Errorf("account check failed with status code %d", resp.StatusCode)
}
return nil
}

View File

@@ -14,16 +14,22 @@ import (
"go.uber.org/ratelimit"
)
const (
MaxDisableCount = 3
)
type Manager struct {
debrid string
current atomic.Pointer[Account]
accounts *xsync.Map[string, *Account]
logger zerolog.Logger
}
func NewManager(debridConf config.Debrid, downloadRL ratelimit.Limiter, logger zerolog.Logger) *Manager {
m := &Manager{
debrid: debridConf.Name,
accounts: xsync.NewMap[string, *Account](),
logger: logger,
}
var firstAccount *Account
@@ -95,8 +101,16 @@ func (m *Manager) Current() *Account {
// Slow path - find new current account
activeAccounts := m.Active()
if len(activeAccounts) == 0 {
m.current.Store(nil)
return nil
// No active accounts left, try to use disabled ones
m.logger.Warn().Str("debrid", m.debrid).Msg("No active accounts available, all accounts are disabled")
allAccounts := m.All()
if len(allAccounts) == 0 {
m.logger.Error().Str("debrid", m.debrid).Msg("No accounts configured")
m.current.Store(nil)
return nil
}
m.current.Store(allAccounts[0])
return allAccounts[0]
}
newCurrent := activeAccounts[0]
@@ -104,16 +118,12 @@ func (m *Manager) Current() *Account {
return newCurrent
}
func (m *Manager) setCurrent(account *Account) {
m.current.Store(account)
}
func (m *Manager) Disable(account *Account) {
if account == nil {
return
}
account.Disabled.Store(true)
account.MarkDisabled()
// If we're disabling the current account, it will be replaced
// on the next Current() call - no need to proactively update
@@ -131,7 +141,7 @@ func (m *Manager) Disable(account *Account) {
func (m *Manager) Reset() {
m.accounts.Range(func(key string, acc *Account) bool {
acc.Disabled.Store(false)
acc.Reset()
return true
})
@@ -144,12 +154,6 @@ func (m *Manager) Reset() {
}
}
func (m *Manager) Update(account *Account) {
if account != nil {
m.accounts.Store(account.Token, account)
}
}
func (m *Manager) GetAccount(token string) (*Account, error) {
if token == "" {
return nil, fmt.Errorf("token cannot be empty")
@@ -209,3 +213,27 @@ func (m *Manager) Stats() []map[string]any {
}
return stats
}
func (m *Manager) CheckAndResetBandwidth() {
found := false
m.accounts.Range(func(key string, acc *Account) bool {
if acc.Disabled.Load() && acc.DisableCount.Load() < MaxDisableCount {
if err := acc.CheckBandwidth(); err == nil {
acc.Disabled.Store(false)
found = true
m.logger.Info().Str("debrid", m.debrid).Str("token", utils.Mask(acc.Token)).Msg("Re-activated disabled account")
} else {
m.logger.Debug().Err(err).Str("debrid", m.debrid).Str("token", utils.Mask(acc.Token)).Msg("Account still disabled")
}
}
return true
})
if found {
// If we re-activated any account, reset current to first active
activeAccounts := m.Active()
if len(activeAccounts) > 0 {
m.current.Store(activeAccounts[0])
}
}
}

View File

@@ -109,11 +109,54 @@ func (d *Storage) StartWorker(ctx context.Context) error {
ctx = context.Background()
}
// Start all debrid syncAccounts
// Runs every 1m
if err := d.syncAccounts(); err != nil {
return err
// Start syncAccounts worker
go d.syncAccountsWorker(ctx)
// Start bandwidth reset worker
go d.checkBandwidthWorker(ctx)
return nil
}
func (d *Storage) checkBandwidthWorker(ctx context.Context) {
if ctx == nil {
ctx = context.Background()
}
ticker := time.NewTicker(30 * time.Minute)
go func() {
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
d.checkAccountBandwidth()
}
}
}()
}
func (d *Storage) checkAccountBandwidth() {
d.mu.Lock()
defer d.mu.Unlock()
for _, debrid := range d.debrids {
if debrid == nil || debrid.client == nil {
continue
}
accountManager := debrid.client.AccountManager()
if accountManager == nil {
continue
}
accountManager.CheckAndResetBandwidth()
}
}
func (d *Storage) syncAccountsWorker(ctx context.Context) {
if ctx == nil {
ctx = context.Background()
}
_ = d.syncAccounts()
ticker := time.NewTicker(5 * time.Minute)
go func() {
for {
@@ -125,7 +168,7 @@ func (d *Storage) StartWorker(ctx context.Context) error {
}
}
}()
return nil
}
func (d *Storage) syncAccounts() error {

View File

@@ -104,7 +104,7 @@ func (ad *AllDebrid) SubmitMagnet(torrent *types.Torrent) (*types.Torrent, error
}
magnets := data.Data.Magnets
if len(magnets) == 0 {
return nil, fmt.Errorf("error adding torrent")
return nil, fmt.Errorf("error adding torrent. No magnets returned")
}
magnet := magnets[0]
torrentId := strconv.Itoa(magnet.ID)

View File

@@ -101,7 +101,6 @@ func (f *File) getDownloadByteRange() (*[2]int64, error) {
return byteRange, nil
}
// setVideoStreamingHeaders sets the necessary headers for video streaming
// It returns error and a boolean indicating if the request is a range request
func (f *File) servePreloadedContent(w http.ResponseWriter, r *http.Request) error {
content := f.content
@@ -156,8 +155,6 @@ func (f *File) streamWithRetry(w http.ResponseWriter, r *http.Request, networkRe
return &streamError{Err: err, StatusCode: http.StatusInternalServerError}
}
setVideoStreamingHeaders(upstreamReq)
isRangeRequest := f.handleRangeRequest(upstreamReq, r, w)
if isRangeRequest == -1 {
return &streamError{Err: fmt.Errorf("invalid range"), StatusCode: http.StatusRequestedRangeNotSatisfiable}

View File

@@ -212,12 +212,3 @@ func parseRange(s string, size int64) ([]httpRange, error) {
}
return ranges, nil
}
func setVideoStreamingHeaders(req *http.Request) {
// Request optimizations for faster response
req.Header.Set("Accept", "*/*")
req.Header.Set("Accept-Encoding", "identity")
req.Header.Set("Connection", "keep-alive")
req.Header.Set("User-Agent", "VideoStream/1.0")
req.Header.Set("Priority", "u=1")
}