- Fix bandwidth limit error

- Add cooldowns for fair usage limit bug
- Fix repair bugs
This commit is contained in:
Mukhtar Akere
2025-04-09 20:00:06 +01:00
parent 92177b150b
commit a357897222
11 changed files with 205 additions and 156 deletions

View File

@@ -62,6 +62,23 @@ type Client struct {
retryableStatus map[int]bool retryableStatus map[int]bool
logger zerolog.Logger logger zerolog.Logger
proxy string proxy string
// cooldown
statusCooldowns map[int]time.Duration
statusCooldownsMu sync.RWMutex
lastStatusTime map[int]time.Time
lastStatusTimeMu sync.RWMutex
}
func WithStatusCooldown(statusCode int, cooldown time.Duration) ClientOption {
return func(c *Client) {
c.statusCooldownsMu.Lock()
if c.statusCooldowns == nil {
c.statusCooldowns = make(map[int]time.Duration)
}
c.statusCooldowns[statusCode] = cooldown
c.statusCooldownsMu.Unlock()
}
} }
// WithMaxRetries sets the maximum number of retry attempts // WithMaxRetries sets the maximum number of retry attempts
@@ -177,7 +194,40 @@ func (c *Client) Do(req *http.Request) (*http.Response, error) {
} }
c.headersMu.RUnlock() c.headersMu.RUnlock()
if attempt > 0 && resp != nil {
c.statusCooldownsMu.RLock()
cooldown, exists := c.statusCooldowns[resp.StatusCode]
c.statusCooldownsMu.RUnlock()
if exists {
c.lastStatusTimeMu.RLock()
lastTime, timeExists := c.lastStatusTime[resp.StatusCode]
c.lastStatusTimeMu.RUnlock()
if timeExists {
elapsed := time.Since(lastTime)
if elapsed < cooldown {
// We need to wait longer for this status code
waitTime := cooldown - elapsed
select {
case <-req.Context().Done():
return nil, req.Context().Err()
case <-time.After(waitTime):
// Continue after waiting
}
}
}
}
}
resp, err = c.doRequest(req) resp, err = c.doRequest(req)
if err == nil {
c.lastStatusTimeMu.Lock()
c.lastStatusTime[resp.StatusCode] = time.Now()
c.lastStatusTimeMu.Unlock()
}
if err != nil { if err != nil {
// Check if this is a network error that might be worth retrying // Check if this is a network error that might be worth retrying
if attempt < c.maxRetries { if attempt < c.maxRetries {
@@ -271,10 +321,12 @@ func New(options ...ClientOption) *Client {
http.StatusServiceUnavailable: true, http.StatusServiceUnavailable: true,
http.StatusGatewayTimeout: true, http.StatusGatewayTimeout: true,
}, },
logger: logger.New("request"), logger: logger.New("request"),
timeout: 60 * time.Second, timeout: 60 * time.Second,
proxy: "", proxy: "",
headers: make(map[string]string), // Initialize headers map headers: make(map[string]string), // Initialize headers map
statusCooldowns: make(map[int]time.Duration),
lastStatusTime: make(map[int]time.Time),
} }
// default http client // default http client

View File

@@ -19,12 +19,13 @@ import (
) )
type AllDebrid struct { type AllDebrid struct {
Name string Name string
Host string `json:"host"` Host string `json:"host"`
APIKey string APIKey string
DownloadKeys []string DownloadKeys []string
DownloadUncached bool ActiveDownloadKeys []string
client *request.Client DownloadUncached bool
client *request.Client
MountPath string MountPath string
logger zerolog.Logger logger zerolog.Logger
@@ -251,7 +252,7 @@ func (ad *AllDebrid) GenerateDownloadLinks(t *types.Torrent) error {
for _, file := range t.Files { for _, file := range t.Files {
go func(file types.File) { go func(file types.File) {
defer wg.Done() defer wg.Done()
link, err := ad.GetDownloadLink(t, &file, 0) link, err := ad.GetDownloadLink(t, &file)
if err != nil { if err != nil {
errCh <- err errCh <- err
return return
@@ -286,7 +287,7 @@ func (ad *AllDebrid) GenerateDownloadLinks(t *types.Torrent) error {
return nil return nil
} }
func (ad *AllDebrid) GetDownloadLink(t *types.Torrent, file *types.File, index int) (string, error) { func (ad *AllDebrid) GetDownloadLink(t *types.Torrent, file *types.File) (string, error) {
url := fmt.Sprintf("%s/link/unlock", ad.Host) url := fmt.Sprintf("%s/link/unlock", ad.Host)
query := gourl.Values{} query := gourl.Values{}
query.Add("link", file.Link) query.Add("link", file.Link)
@@ -363,6 +364,9 @@ func (ad *AllDebrid) GetMountPath() string {
return ad.MountPath return ad.MountPath
} }
func (ad *AllDebrid) GetDownloadKeys() []string { func (ad *AllDebrid) RemoveActiveDownloadKey() {
return ad.DownloadKeys }
func (ad *AllDebrid) ResetActiveDownloadKeys() {
ad.ActiveDownloadKeys = ad.DownloadKeys
} }

View File

@@ -585,7 +585,7 @@ func (c *Cache) ProcessTorrent(t *types.Torrent, refreshRclone bool) error {
return nil return nil
} }
func (c *Cache) GetDownloadLink(torrentId, filename, fileLink string, index int) string { func (c *Cache) GetDownloadLink(torrentId, filename, fileLink string) string {
// Check link cache // Check link cache
if dl := c.checkDownloadLink(fileLink); dl != "" { if dl := c.checkDownloadLink(fileLink); dl != "" {
@@ -622,7 +622,7 @@ func (c *Cache) GetDownloadLink(torrentId, filename, fileLink string, index int)
} }
c.logger.Trace().Msgf("Getting download link for %s", filename) c.logger.Trace().Msgf("Getting download link for %s", filename)
downloadLink, err := c.client.GetDownloadLink(ct.Torrent, &file, index) downloadLink, err := c.client.GetDownloadLink(ct.Torrent, &file)
if err != nil { if err != nil {
if errors.Is(err, request.HosterUnavailableError) { if errors.Is(err, request.HosterUnavailableError) {
c.logger.Debug().Err(err).Msgf("Hoster is unavailable. Triggering repair for %s", ct.Name) c.logger.Debug().Err(err).Msgf("Hoster is unavailable. Triggering repair for %s", ct.Name)
@@ -634,7 +634,7 @@ func (c *Cache) GetDownloadLink(torrentId, filename, fileLink string, index int)
c.logger.Debug().Msgf("Reinserted torrent %s", ct.Name) c.logger.Debug().Msgf("Reinserted torrent %s", ct.Name)
file = ct.Files[filename] file = ct.Files[filename]
// Retry getting the download link // Retry getting the download link
downloadLink, err = c.client.GetDownloadLink(ct.Torrent, &file, index) downloadLink, err = c.client.GetDownloadLink(ct.Torrent, &file)
if err != nil { if err != nil {
c.logger.Debug().Err(err).Msgf("Failed to get download link for %s", file.Link) c.logger.Debug().Err(err).Msgf("Failed to get download link for %s", file.Link)
return "" return ""
@@ -651,6 +651,8 @@ func (c *Cache) GetDownloadLink(torrentId, filename, fileLink string, index int)
c.setTorrent(ct) c.setTorrent(ct)
}() }()
return file.DownloadLink return file.DownloadLink
} else if errors.Is(err, request.TrafficExceededError) {
// This is likely a fair usage limit error
} else { } else {
c.logger.Debug().Err(err).Msgf("Failed to get download link for %s", file.Link) c.logger.Debug().Err(err).Msgf("Failed to get download link for %s", file.Link)
return "" return ""
@@ -723,6 +725,10 @@ func (c *Cache) RemoveDownloadLink(link string) {
func (c *Cache) MarkDownloadLinkAsInvalid(downloadLink, reason string) { func (c *Cache) MarkDownloadLinkAsInvalid(downloadLink, reason string) {
c.invalidDownloadLinks.Store(downloadLink, reason) c.invalidDownloadLinks.Store(downloadLink, reason)
// Remove the download api key from active
if reason == "bandwidth_exceeded" {
c.client.RemoveActiveDownloadKey()
}
} }
func (c *Cache) IsDownloadLinkInvalid(downloadLink string) bool { func (c *Cache) IsDownloadLinkInvalid(downloadLink string) bool {
@@ -781,7 +787,3 @@ func (c *Cache) OnRemove(torrentId string) {
func (c *Cache) GetLogger() zerolog.Logger { func (c *Cache) GetLogger() zerolog.Logger {
return c.logger return c.logger
} }
func (c *Cache) TotalDownloadKeys() int {
return len(c.client.GetDownloadKeys())
}

View File

@@ -162,13 +162,7 @@ func (c *Cache) reInsertTorrent(torrent *types.Torrent) (*CachedTorrent, error)
return ct, nil return ct, nil
} }
func (c *Cache) refreshDownloadLink(link string) error {
// A generated download link has being limited
// Generate a new one with other API keys
// Temporarily remove the old one
return nil
}
func (c *Cache) resetInvalidLinks() { func (c *Cache) resetInvalidLinks() {
c.invalidDownloadLinks = xsync.NewMapOf[string, string]() c.invalidDownloadLinks = xsync.NewMapOf[string, string]()
c.client.ResetActiveDownloadKeys() // Reset the active download keys
} }

View File

@@ -4,7 +4,7 @@ import "time"
func (c *Cache) Refresh() error { func (c *Cache) Refresh() error {
// For now, we just want to refresh the listing and download links // For now, we just want to refresh the listing and download links
go c.refreshDownloadLinksWorker() //go c.refreshDownloadLinksWorker()
go c.refreshTorrentsWorker() go c.refreshTorrentsWorker()
go c.resetInvalidLinksWorker() go c.resetInvalidLinksWorker()
return nil return nil

View File

@@ -18,12 +18,13 @@ import (
) )
type DebridLink struct { type DebridLink struct {
Name string Name string
Host string `json:"host"` Host string `json:"host"`
APIKey string APIKey string
DownloadKeys []string DownloadKeys []string
DownloadUncached bool ActiveDownloadKeys []string
client *request.Client DownloadUncached bool
client *request.Client
MountPath string MountPath string
logger zerolog.Logger logger zerolog.Logger
@@ -243,7 +244,7 @@ func (dl *DebridLink) GetDownloads() (map[string]types.DownloadLinks, error) {
return nil, nil return nil, nil
} }
func (dl *DebridLink) GetDownloadLink(t *types.Torrent, file *types.File, index int) (string, error) { func (dl *DebridLink) GetDownloadLink(t *types.Torrent, file *types.File) (string, error) {
return file.DownloadLink, nil return file.DownloadLink, nil
} }
@@ -368,6 +369,9 @@ func (dl *DebridLink) GetMountPath() string {
return dl.MountPath return dl.MountPath
} }
func (dl *DebridLink) GetDownloadKeys() []string { func (dl *DebridLink) RemoveActiveDownloadKey() {
return dl.DownloadKeys }
func (dl *DebridLink) ResetActiveDownloadKeys() {
dl.ActiveDownloadKeys = dl.DownloadKeys
} }

View File

@@ -25,8 +25,10 @@ type RealDebrid struct {
Name string Name string
Host string `json:"host"` Host string `json:"host"`
APIKey string APIKey string
DownloadKeys []string // This is used for bandwidth DownloadKeys []string // This is used for bandwidth
ActiveDownloadKeys []string // This is used for active downloads api keys
downloadKeysMux sync.Mutex
DownloadUncached bool DownloadUncached bool
client *request.Client client *request.Client
@@ -36,6 +38,36 @@ type RealDebrid struct {
CheckCached bool CheckCached bool
} }
func New(dc config.Debrid) *RealDebrid {
rl := request.ParseRateLimit(dc.RateLimit)
headers := map[string]string{
"Authorization": fmt.Sprintf("Bearer %s", dc.APIKey),
}
_log := logger.New(dc.Name)
client := request.New(
request.WithHeaders(headers),
request.WithRateLimiter(rl),
request.WithLogger(_log),
request.WithMaxRetries(5),
request.WithRetryableStatus(429, 447),
request.WithProxy(dc.Proxy),
request.WithStatusCooldown(447, 10*time.Second), // 447 is a fair use error
)
return &RealDebrid{
Name: "realdebrid",
Host: dc.Host,
APIKey: dc.APIKey,
DownloadKeys: dc.DownloadAPIKeys,
ActiveDownloadKeys: dc.DownloadAPIKeys,
DownloadUncached: dc.DownloadUncached,
client: client,
MountPath: dc.Folder,
logger: logger.New(dc.Name),
CheckCached: dc.CheckCached,
}
}
func (r *RealDebrid) GetName() string { func (r *RealDebrid) GetName() string {
return r.Name return r.Name
} }
@@ -287,7 +319,7 @@ func (r *RealDebrid) GenerateDownloadLinks(t *types.Torrent) error {
go func(file types.File) { go func(file types.File) {
defer wg.Done() defer wg.Done()
link, err := r.GetDownloadLink(t, &file, 0) link, err := r.GetDownloadLink(t, &file)
if err != nil { if err != nil {
errCh <- err errCh <- err
return return
@@ -365,8 +397,10 @@ func (r *RealDebrid) _getDownloadLink(file *types.File) (string, error) {
return "", request.HosterUnavailableError // Link has been nerfed return "", request.HosterUnavailableError // Link has been nerfed
case 19: case 19:
return "", request.HosterUnavailableError // File has been removed return "", request.HosterUnavailableError // File has been removed
case 36:
return "", request.TrafficExceededError // File has been nerfed
default: default:
return "", fmt.Errorf("realdebrid API error: %d", resp.StatusCode) return "", fmt.Errorf("realdebrid API error: Status: %d || Code: %d", resp.StatusCode, data.ErrorCode)
} }
} }
b, err := io.ReadAll(resp.Body) b, err := io.ReadAll(resp.Body)
@@ -381,28 +415,29 @@ func (r *RealDebrid) _getDownloadLink(file *types.File) (string, error) {
} }
func (r *RealDebrid) GetDownloadLink(t *types.Torrent, file *types.File, index int) (string, error) { func (r *RealDebrid) GetDownloadLink(t *types.Torrent, file *types.File) (string, error) {
if len(r.ActiveDownloadKeys) < 1 {
if index >= len(r.DownloadKeys) { // No active download keys. It's likely that the key has reached bandwidth limit
// Reset to first key return "", fmt.Errorf("no active download keys")
index = 0
} }
r.client.SetHeader("Authorization", fmt.Sprintf("Bearer %s", r.DownloadKeys[index])) defer r.client.SetHeader("Authorization", fmt.Sprintf("Bearer %s", r.APIKey))
link, err := r._getDownloadLink(file) for idx := range r.ActiveDownloadKeys {
if err == nil && link != "" { r.client.SetHeader("Authorization", fmt.Sprintf("Bearer %s", r.ActiveDownloadKeys[idx]))
return link, nil link, err := r._getDownloadLink(file)
} if err == nil && link != "" {
if err != nil && errors.Is(err, request.HosterUnavailableError) { return link, nil
// Try the next key }
if index+1 < len(r.DownloadKeys) { if err != nil {
return r.GetDownloadLink(t, file, index+1) if errors.Is(err, request.TrafficExceededError) {
// Retry with the next API key
continue
} else {
return "", err
}
} }
} }
// If we reach here, it means we have tried all keys // If we reach here, it means all API keys have been exhausted
// and none of them worked, or the error is not related to the keys return "", fmt.Errorf("all API keys have been exhausted")
// Reset to the first key
r.client.SetHeader("Authorization", fmt.Sprintf("Bearer %s", r.APIKey))
return link, err
} }
func (r *RealDebrid) GetCheckCached() bool { func (r *RealDebrid) GetCheckCached() bool {
@@ -511,6 +546,13 @@ func (r *RealDebrid) GetDownloads() (map[string]types.DownloadLinks, error) {
links := make(map[string]types.DownloadLinks) links := make(map[string]types.DownloadLinks)
offset := 0 offset := 0
limit := 1000 limit := 1000
if len(r.ActiveDownloadKeys) < 1 {
// No active download keys. It's likely that the key has reached bandwidth limit
return nil, fmt.Errorf("no active download keys")
}
r.client.SetHeader("Authorization", fmt.Sprintf("Bearer %s", r.ActiveDownloadKeys[0]))
// Reset to the API key
defer r.client.SetHeader("Authorization", fmt.Sprintf("Bearer %s", r.APIKey))
for { for {
dl, err := r._getDownloads(offset, limit) dl, err := r._getDownloads(offset, limit)
if err != nil { if err != nil {
@@ -574,34 +616,16 @@ func (r *RealDebrid) GetMountPath() string {
return r.MountPath return r.MountPath
} }
func New(dc config.Debrid) *RealDebrid { func (r *RealDebrid) RemoveActiveDownloadKey() {
rl := request.ParseRateLimit(dc.RateLimit) r.downloadKeysMux.Lock()
defer r.downloadKeysMux.Unlock()
headers := map[string]string{ if len(r.ActiveDownloadKeys) > 0 {
"Authorization": fmt.Sprintf("Bearer %s", dc.APIKey), r.ActiveDownloadKeys = r.ActiveDownloadKeys[1:]
}
_log := logger.New(dc.Name)
client := request.New(
request.WithHeaders(headers),
request.WithRateLimiter(rl),
request.WithLogger(_log),
request.WithMaxRetries(5),
request.WithRetryableStatus(429),
request.WithProxy(dc.Proxy),
)
return &RealDebrid{
Name: "realdebrid",
Host: dc.Host,
APIKey: dc.APIKey,
DownloadKeys: dc.DownloadAPIKeys,
DownloadUncached: dc.DownloadUncached,
client: client,
MountPath: dc.Folder,
logger: logger.New(dc.Name),
CheckCached: dc.CheckCached,
} }
} }
func (r *RealDebrid) GetDownloadKeys() []string { func (r *RealDebrid) ResetActiveDownloadKeys() {
return r.DownloadKeys r.downloadKeysMux.Lock()
defer r.downloadKeysMux.Unlock()
r.ActiveDownloadKeys = r.DownloadKeys
} }

View File

@@ -22,12 +22,13 @@ import (
) )
type Torbox struct { type Torbox struct {
Name string Name string
Host string `json:"host"` Host string `json:"host"`
APIKey string APIKey string
DownloadKeys []string DownloadKeys []string
DownloadUncached bool ActiveDownloadKeys []string
client *request.Client DownloadUncached bool
client *request.Client
MountPath string MountPath string
logger zerolog.Logger logger zerolog.Logger
@@ -280,7 +281,7 @@ func (tb *Torbox) GenerateDownloadLinks(t *types.Torrent) error {
for _, file := range t.Files { for _, file := range t.Files {
go func() { go func() {
defer wg.Done() defer wg.Done()
link, err := tb.GetDownloadLink(t, &file, 0) link, err := tb.GetDownloadLink(t, &file)
if err != nil { if err != nil {
errCh <- err errCh <- err
return return
@@ -312,7 +313,7 @@ func (tb *Torbox) GenerateDownloadLinks(t *types.Torrent) error {
return nil return nil
} }
func (tb *Torbox) GetDownloadLink(t *types.Torrent, file *types.File, index int) (string, error) { func (tb *Torbox) GetDownloadLink(t *types.Torrent, file *types.File) (string, error) {
url := fmt.Sprintf("%s/api/torrents/requestdl/", tb.Host) url := fmt.Sprintf("%s/api/torrents/requestdl/", tb.Host)
query := gourl.Values{} query := gourl.Values{}
query.Add("torrent_id", t.Id) query.Add("torrent_id", t.Id)
@@ -363,6 +364,9 @@ func (tb *Torbox) GetMountPath() string {
return tb.MountPath return tb.MountPath
} }
func (tb *Torbox) GetDownloadKeys() []string { func (tb *Torbox) RemoveActiveDownloadKey() {
return tb.DownloadKeys }
func (tb *Torbox) ResetActiveDownloadKeys() {
tb.ActiveDownloadKeys = tb.DownloadKeys
} }

View File

@@ -8,7 +8,7 @@ type Client interface {
SubmitMagnet(tr *Torrent) (*Torrent, error) SubmitMagnet(tr *Torrent) (*Torrent, error)
CheckStatus(tr *Torrent, isSymlink bool) (*Torrent, error) CheckStatus(tr *Torrent, isSymlink bool) (*Torrent, error)
GenerateDownloadLinks(tr *Torrent) error GenerateDownloadLinks(tr *Torrent) error
GetDownloadLink(tr *Torrent, file *File, index int) (string, error) GetDownloadLink(tr *Torrent, file *File) (string, error)
DeleteTorrent(torrentId string) error DeleteTorrent(torrentId string) error
IsAvailable(infohashes []string) map[string]bool IsAvailable(infohashes []string) map[string]bool
GetCheckCached() bool GetCheckCached() bool
@@ -21,5 +21,6 @@ type Client interface {
GetDownloads() (map[string]DownloadLinks, error) GetDownloads() (map[string]DownloadLinks, error)
CheckLink(link string) error CheckLink(link string) error
GetMountPath() string GetMountPath() string
GetDownloadKeys() []string RemoveActiveDownloadKey()
ResetActiveDownloadKeys()
} }

View File

@@ -747,13 +747,21 @@ func (r *Repair) loadFromFile() {
r.Jobs = make(map[string]*Job) r.Jobs = make(map[string]*Job)
return return
} }
jobs := make(map[string]*Job) _jobs := make(map[string]*Job)
err = json.Unmarshal(data, &jobs) err = json.Unmarshal(data, &_jobs)
if err != nil { if err != nil {
r.logger.Trace().Err(err).Msg("Failed to unmarshal jobs; resetting") r.logger.Trace().Err(err).Msg("Failed to unmarshal jobs; resetting")
r.Jobs = make(map[string]*Job) r.Jobs = make(map[string]*Job)
return return
} }
jobs := make(map[string]*Job)
for k, v := range _jobs {
if v.Status != JobPending {
// Skip jobs that are not pending processing due to reboot
continue
}
jobs[k] = v
}
r.Jobs = jobs r.Jobs = jobs
} }

View File

@@ -3,20 +3,13 @@ package webdav
import ( import (
"crypto/tls" "crypto/tls"
"fmt" "fmt"
"github.com/sirrobot01/debrid-blackhole/internal/request"
"github.com/sirrobot01/debrid-blackhole/pkg/debrid/debrid" "github.com/sirrobot01/debrid-blackhole/pkg/debrid/debrid"
"io" "io"
"net/http" "net/http"
"os" "os"
"sync"
"time" "time"
) )
var (
sdClient *request.Client
once sync.Once
)
var sharedClient = &http.Client{ var sharedClient = &http.Client{
Transport: &http.Transport{ Transport: &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
@@ -33,32 +26,6 @@ var sharedClient = &http.Client{
Timeout: 60 * time.Second, Timeout: 60 * time.Second,
} }
func getClient() *request.Client {
once.Do(func() {
var transport = &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
Proxy: http.ProxyFromEnvironment,
MaxIdleConns: 100,
MaxIdleConnsPerHost: 20,
MaxConnsPerHost: 50,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ResponseHeaderTimeout: 30 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
DisableKeepAlives: false,
}
sdClient = request.New(
request.WithTransport(transport),
request.WithTimeout(30*time.Second),
request.WithHeaders(map[string]string{
"Accept": "*/*",
"Connection": "keep-alive",
}),
)
})
return sdClient
}
type File struct { type File struct {
cache *debrid.Cache cache *debrid.Cache
fileId string fileId string
@@ -92,20 +59,20 @@ func (f *File) Close() error {
return nil return nil
} }
func (f *File) getDownloadLink(index int) string { func (f *File) getDownloadLink() string {
// Check if we already have a final URL cached // Check if we already have a final URL cached
if f.downloadLink != "" && isValidURL(f.downloadLink) { if f.downloadLink != "" && isValidURL(f.downloadLink) {
return f.downloadLink return f.downloadLink
} }
downloadLink := f.cache.GetDownloadLink(f.torrentId, f.name, f.link, index) downloadLink := f.cache.GetDownloadLink(f.torrentId, f.name, f.link)
if downloadLink != "" && isValidURL(downloadLink) { if downloadLink != "" && isValidURL(downloadLink) {
return downloadLink return downloadLink
} }
return "" return ""
} }
func (f *File) stream(index int) (*http.Response, error) { func (f *File) stream() (*http.Response, error) {
client := sharedClient // Might be replaced with the custom client client := sharedClient // Might be replaced with the custom client
_log := f.cache.GetLogger() _log := f.cache.GetLogger()
var ( var (
@@ -113,7 +80,7 @@ func (f *File) stream(index int) (*http.Response, error) {
downloadLink string downloadLink string
) )
downloadLink = f.getDownloadLink(index) // Uses the first API key downloadLink = f.getDownloadLink() // Uses the first API key
if downloadLink == "" { if downloadLink == "" {
_log.Error().Msgf("Failed to get download link for %s", f.name) _log.Error().Msgf("Failed to get download link for %s", f.name)
return nil, fmt.Errorf("failed to get download link") return nil, fmt.Errorf("failed to get download link")
@@ -136,27 +103,16 @@ func (f *File) stream(index int) (*http.Response, error) {
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusPartialContent { if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusPartialContent {
closeResp := func() { closeResp := func() {
io.Copy(io.Discard, resp.Body) _, _ = io.Copy(io.Discard, resp.Body)
resp.Body.Close() resp.Body.Close()
} }
if resp.StatusCode == http.StatusServiceUnavailable { if resp.StatusCode == http.StatusServiceUnavailable {
closeResp() closeResp()
// Read the body to consume the response // Read the body to consume the response
f.cache.MarkDownloadLinkAsInvalid(downloadLink, "bandwidth limit reached") f.cache.MarkDownloadLinkAsInvalid(downloadLink, "bandwidth_exceeded")
// Generate a new download link // Retry with a different API key if it's available
if index < f.cache.TotalDownloadKeys()-1 { return f.stream()
// Retry with a different download key
_log.Debug().Str("link", downloadLink).
Str("torrentId", f.torrentId).
Str("fileId", f.fileId).
Msgf("Bandwidth limit reached, retrying with another API key, attempt %d", index+1)
return f.stream(index + 1) // Retry with the next download key
} else {
// No more download keys available, return an error
_log.Error().Msgf("Bandwidth limit reached for all download keys")
return nil, fmt.Errorf("bandwidth_limit_exceeded")
}
} else if resp.StatusCode == http.StatusNotFound { } else if resp.StatusCode == http.StatusNotFound {
closeResp() closeResp()
@@ -165,7 +121,7 @@ func (f *File) stream(index int) (*http.Response, error) {
f.cache.MarkDownloadLinkAsInvalid(downloadLink, "link_not_found") f.cache.MarkDownloadLinkAsInvalid(downloadLink, "link_not_found")
f.cache.RemoveDownloadLink(f.link) // Remove the link from the cache f.cache.RemoveDownloadLink(f.link) // Remove the link from the cache
// Generate a new download link // Generate a new download link
downloadLink = f.getDownloadLink(index) downloadLink = f.getDownloadLink()
if downloadLink == "" { if downloadLink == "" {
_log.Error().Msgf("Failed to get download link for %s", f.name) _log.Error().Msgf("Failed to get download link for %s", f.name)
return nil, fmt.Errorf("failed to get download link") return nil, fmt.Errorf("failed to get download link")
@@ -224,7 +180,7 @@ func (f *File) Read(p []byte) (n int, err error) {
} }
// Make the request to get the file // Make the request to get the file
resp, err := f.stream(0) resp, err := f.stream()
if err != nil { if err != nil {
return 0, err return 0, err
} }