diff --git a/internal/config/misc.go b/internal/config/misc.go index c18e6bf..6bbaaea 100644 --- a/internal/config/misc.go +++ b/internal/config/misc.go @@ -36,12 +36,12 @@ func getDefaultExtensions() []string { } // Remove duplicates - seen := make(map[string]bool) + seen := make(map[string]struct{}) var unique []string for _, ext := range allExts { - if !seen[ext] { - seen[ext] = true + if _, ok := seen[ext]; !ok { + seen[ext] = struct{}{} unique = append(unique, ext) } } diff --git a/internal/request/request.go b/internal/request/request.go index 3000db8..34cbbba 100644 --- a/internal/request/request.go +++ b/internal/request/request.go @@ -59,7 +59,7 @@ type Client struct { maxRetries int timeout time.Duration skipTLSVerify bool - retryableStatus map[int]bool + retryableStatus map[int]struct{} logger zerolog.Logger proxy string @@ -139,7 +139,7 @@ func WithTransport(transport *http.Transport) ClientOption { func WithRetryableStatus(statusCodes ...int) ClientOption { return func(c *Client) { for _, code := range statusCodes { - c.retryableStatus[code] = true + c.retryableStatus[code] = struct{}{} } } } @@ -250,7 +250,7 @@ func (c *Client) Do(req *http.Request) (*http.Response, error) { } // Check if the status code is retryable - if !c.retryableStatus[resp.StatusCode] || attempt == c.maxRetries { + if _, ok := c.retryableStatus[resp.StatusCode]; !ok || attempt == c.maxRetries { return resp, nil } @@ -314,12 +314,12 @@ func New(options ...ClientOption) *Client { client := &Client{ maxRetries: 3, skipTLSVerify: true, - retryableStatus: map[int]bool{ - http.StatusTooManyRequests: true, - http.StatusInternalServerError: true, - http.StatusBadGateway: true, - http.StatusServiceUnavailable: true, - http.StatusGatewayTimeout: true, + retryableStatus: map[int]struct{}{ + http.StatusTooManyRequests: struct{}{}, + http.StatusInternalServerError: struct{}{}, + http.StatusBadGateway: struct{}{}, + http.StatusServiceUnavailable: struct{}{}, + http.StatusGatewayTimeout: struct{}{}, }, logger: logger.New("request"), timeout: 60 * time.Second, @@ -341,11 +341,32 @@ func New(options ...ClientOption) *Client { // Check if transport was set by WithTransport option if client.client.Transport == nil { - // No custom transport provided, create the default one transport := &http.Transport{ TLSClientConfig: &tls.Config{ InsecureSkipVerify: client.skipTLSVerify, }, + // Connection pooling + MaxIdleConns: 100, + MaxIdleConnsPerHost: 50, + MaxConnsPerHost: 100, + + // Timeouts + IdleConnTimeout: 90 * time.Second, + TLSHandshakeTimeout: 10 * time.Second, + ResponseHeaderTimeout: 10 * time.Second, + ExpectContinueTimeout: 1 * time.Second, + + // TCP keep-alive + DialContext: (&net.Dialer{ + Timeout: 30 * time.Second, + KeepAlive: 30 * time.Second, + }).DialContext, + + // Enable HTTP/2 + ForceAttemptHTTP2: true, + + // Disable compression to save CPU + DisableCompression: false, } // Configure proxy if needed diff --git a/pkg/debrid/alldebrid/alldebrid.go b/pkg/debrid/alldebrid/alldebrid.go index 38cac4a..63560b9 100644 --- a/pkg/debrid/alldebrid/alldebrid.go +++ b/pkg/debrid/alldebrid/alldebrid.go @@ -3,6 +3,7 @@ package alldebrid import ( "fmt" "github.com/goccy/go-json" + "github.com/puzpuzpuz/xsync/v3" "github.com/rs/zerolog" "github.com/sirrobot01/debrid-blackhole/internal/config" "github.com/sirrobot01/debrid-blackhole/internal/logger" @@ -19,13 +20,12 @@ import ( ) type AllDebrid struct { - Name string - Host string `json:"host"` - APIKey string - DownloadKeys []string - ActiveDownloadKeys []string - DownloadUncached bool - client *request.Client + Name string + Host string `json:"host"` + APIKey string + DownloadKeys *xsync.MapOf[string, types.Account] + DownloadUncached bool + client *request.Client MountPath string logger zerolog.Logger @@ -45,11 +45,21 @@ func New(dc config.Debrid) *AllDebrid { request.WithRateLimiter(rl), request.WithProxy(dc.Proxy), ) + + accounts := xsync.NewMapOf[string, types.Account]() + for idx, key := range dc.DownloadAPIKeys { + id := strconv.Itoa(idx) + accounts.Store(id, types.Account{ + Name: key, + ID: id, + Token: key, + }) + } return &AllDebrid{ Name: "alldebrid", Host: dc.Host, APIKey: dc.APIKey, - DownloadKeys: dc.DownloadAPIKeys, + DownloadKeys: accounts, DownloadUncached: dc.DownloadUncached, client: client, MountPath: dc.Folder, @@ -252,13 +262,14 @@ func (ad *AllDebrid) GenerateDownloadLinks(t *types.Torrent) error { for _, file := range t.Files { go func(file types.File) { defer wg.Done() - link, err := ad.GetDownloadLink(t, &file) + link, accountId, err := ad.GetDownloadLink(t, &file) if err != nil { errCh <- err return } file.DownloadLink = link file.Generated = time.Now() + file.AccountId = accountId if link == "" { errCh <- fmt.Errorf("error getting download links %w", err) return @@ -287,7 +298,7 @@ func (ad *AllDebrid) GenerateDownloadLinks(t *types.Torrent) error { return nil } -func (ad *AllDebrid) GetDownloadLink(t *types.Torrent, file *types.File) (string, error) { +func (ad *AllDebrid) GetDownloadLink(t *types.Torrent, file *types.File) (string, string, error) { url := fmt.Sprintf("%s/link/unlock", ad.Host) query := gourl.Values{} query.Add("link", file.Link) @@ -295,17 +306,17 @@ func (ad *AllDebrid) GetDownloadLink(t *types.Torrent, file *types.File) (string req, _ := http.NewRequest(http.MethodGet, url, nil) resp, err := ad.client.MakeRequest(req) if err != nil { - return "", err + return "", "", err } var data DownloadLink if err = json.Unmarshal(resp, &data); err != nil { - return "", err + return "", "", err } link := data.Data.Link if link == "" { - return "", fmt.Errorf("error getting download links %s", data.Error.Message) + return "", "", fmt.Errorf("error getting download links %s", data.Error.Message) } - return link, nil + return link, "0", nil } func (ad *AllDebrid) GetCheckCached() bool { @@ -364,9 +375,9 @@ func (ad *AllDebrid) GetMountPath() string { return ad.MountPath } -func (ad *AllDebrid) RemoveActiveDownloadKey() { +func (ad *AllDebrid) DisableAccount(accountId string) { } func (ad *AllDebrid) ResetActiveDownloadKeys() { - ad.ActiveDownloadKeys = ad.DownloadKeys + } diff --git a/pkg/debrid/debrid/cache.go b/pkg/debrid/debrid/cache.go index f5be995..71ef238 100644 --- a/pkg/debrid/debrid/cache.go +++ b/pkg/debrid/debrid/cache.go @@ -46,7 +46,7 @@ type CachedTorrent struct { type downloadLinkCache struct { Link string - KeyIndex int + AccountId string ExpiresAt time.Time } @@ -79,7 +79,7 @@ type Cache struct { // repair repairChan chan RepairRequest - repairsInProgress *xsync.MapOf[string, bool] + repairsInProgress *xsync.MapOf[string, struct{}] // config workers int @@ -128,7 +128,7 @@ func New(dc config.Debrid, client types.Client) *Cache { PropfindResp: xsync.NewMapOf[string, PropfindResponse](), folderNaming: WebDavFolderNaming(dc.FolderNaming), autoExpiresLinksAfter: autoExpiresLinksAfter, - repairsInProgress: xsync.NewMapOf[string, bool](), + repairsInProgress: xsync.NewMapOf[string, struct{}](), saveSemaphore: make(chan struct{}, 50), ctx: context.Background(), } @@ -238,9 +238,6 @@ func (c *Cache) load() (map[string]*CachedTorrent, error) { ct.AddedOn = addedOn ct.IsComplete = true results.Store(ct.Id, &ct) - } else { - // Delete the file if it's not complete - _ = os.Remove(filePath) } } } @@ -284,9 +281,9 @@ func (c *Cache) Sync() error { c.logger.Info().Msgf("Got %d torrents from %s", len(torrents), c.client.GetName()) newTorrents := make([]*types.Torrent, 0) - idStore := make(map[string]bool, len(torrents)) + idStore := make(map[string]struct{}, len(torrents)) for _, t := range torrents { - idStore[t.Id] = true + idStore[t.Id] = struct{}{} if _, ok := cachedTorrents[t.Id]; !ok { newTorrents = append(newTorrents, t) } @@ -622,7 +619,7 @@ func (c *Cache) GetDownloadLink(torrentId, filename, fileLink string) string { } c.logger.Trace().Msgf("Getting download link for %s", filename) - downloadLink, err := c.client.GetDownloadLink(ct.Torrent, &file) + downloadLink, accountId, err := c.client.GetDownloadLink(ct.Torrent, &file) if err != nil { if errors.Is(err, request.HosterUnavailableError) { c.logger.Debug().Err(err).Msgf("Hoster is unavailable. Triggering repair for %s", ct.Name) @@ -634,7 +631,7 @@ func (c *Cache) GetDownloadLink(torrentId, filename, fileLink string) string { c.logger.Debug().Msgf("Reinserted torrent %s", ct.Name) file = ct.Files[filename] // Retry getting the download link - downloadLink, err = c.client.GetDownloadLink(ct.Torrent, &file) + downloadLink, accountId, err = c.client.GetDownloadLink(ct.Torrent, &file) if err != nil { c.logger.Debug().Err(err).Msgf("Failed to get download link for %s", file.Link) return "" @@ -645,9 +642,10 @@ func (c *Cache) GetDownloadLink(torrentId, filename, fileLink string) string { } file.DownloadLink = downloadLink file.Generated = time.Now() + file.AccountId = accountId ct.Files[filename] = file go func() { - c.updateDownloadLink(file.Link, downloadLink, 0) + c.updateDownloadLink(file.Link, downloadLink, accountId) c.setTorrent(ct) }() return file.DownloadLink @@ -660,10 +658,11 @@ func (c *Cache) GetDownloadLink(torrentId, filename, fileLink string) string { } file.DownloadLink = downloadLink file.Generated = time.Now() + file.AccountId = accountId ct.Files[filename] = file go func() { - c.updateDownloadLink(file.Link, downloadLink, 0) + c.updateDownloadLink(file.Link, downloadLink, file.AccountId) c.setTorrent(ct) }() return file.DownloadLink @@ -674,7 +673,7 @@ func (c *Cache) GenerateDownloadLinks(t *CachedTorrent) { c.logger.Error().Err(err).Msg("Failed to generate download links") } for _, file := range t.Files { - c.updateDownloadLink(file.Link, file.DownloadLink, 0) + c.updateDownloadLink(file.Link, file.DownloadLink, file.AccountId) } c.SaveTorrent(t) @@ -702,11 +701,11 @@ func (c *Cache) AddTorrent(t *types.Torrent) error { } -func (c *Cache) updateDownloadLink(link, downloadLink string, keyIndex int) { +func (c *Cache) updateDownloadLink(link, downloadLink string, accountId string) { c.downloadLinks.Store(link, downloadLinkCache{ Link: downloadLink, ExpiresAt: time.Now().Add(c.autoExpiresLinksAfter), - KeyIndex: keyIndex, + AccountId: accountId, }) } @@ -719,16 +718,17 @@ func (c *Cache) checkDownloadLink(link string) string { return "" } -func (c *Cache) RemoveDownloadLink(link string) { - c.downloadLinks.Delete(link) -} - -func (c *Cache) MarkDownloadLinkAsInvalid(downloadLink, reason string) { +func (c *Cache) MarkDownloadLinkAsInvalid(link, downloadLink, reason string) { c.invalidDownloadLinks.Store(downloadLink, reason) // Remove the download api key from active if reason == "bandwidth_exceeded" { - c.client.RemoveActiveDownloadKey() + if dl, ok := c.downloadLinks.Load(link); ok { + if dl.AccountId != "" && dl.Link == downloadLink { + c.client.DisableAccount(dl.AccountId) + } + } } + c.downloadLinks.Delete(link) // Remove the download link from cache } func (c *Cache) IsDownloadLinkInvalid(downloadLink string) bool { @@ -745,6 +745,8 @@ func (c *Cache) GetClient() types.Client { func (c *Cache) DeleteTorrent(id string) error { c.logger.Info().Msgf("Deleting torrent %s", id) + c.torrentsRefreshMu.Lock() + defer c.torrentsRefreshMu.Unlock() if t, ok := c.torrents.Load(id); ok { _ = c.client.DeleteTorrent(id) // SKip error handling, we don't care if it fails @@ -769,9 +771,21 @@ func (c *Cache) DeleteTorrents(ids []string) { } func (c *Cache) removeFromDB(torrentId string) { + // Moves the torrent file to the trash filePath := filepath.Join(c.dir, torrentId+".json") - if err := os.Remove(filePath); err != nil { - c.logger.Debug().Err(err).Msgf("Failed to remove file: %s", filePath) + + // Check if the file exists + if _, err := os.Stat(filePath); errors.Is(err, os.ErrNotExist) { + return + } + + // Move the file to the trash + trashPath := filepath.Join(c.dir, "trash", torrentId+".json") + if err := os.MkdirAll(filepath.Dir(trashPath), 0755); err != nil { + return + } + if err := os.Rename(filePath, trashPath); err != nil { + return } } diff --git a/pkg/debrid/debrid/refresh.go b/pkg/debrid/debrid/refresh.go index e7d78c1..539e59e 100644 --- a/pkg/debrid/debrid/refresh.go +++ b/pkg/debrid/debrid/refresh.go @@ -93,9 +93,9 @@ func (c *Cache) refreshTorrents() { // Get the newly added torrents only _newTorrents := make([]*types.Torrent, 0) - idStore := make(map[string]bool, len(debTorrents)) + idStore := make(map[string]struct{}, len(debTorrents)) for _, t := range debTorrents { - idStore[t.Id] = true + idStore[t.Id] = struct{}{} if _, ok := torrents[t.Id]; !ok { _newTorrents = append(_newTorrents, t) } @@ -244,6 +244,6 @@ func (c *Cache) refreshDownloadLinks() { } } - c.logger.Trace().Msgf("Refreshed %d download links", len(downloadLinks)) + c.logger.Debug().Msgf("Refreshed %d download links", len(downloadLinks)) } diff --git a/pkg/debrid/debrid/repair.go b/pkg/debrid/debrid/repair.go index d0f05a1..4832258 100644 --- a/pkg/debrid/debrid/repair.go +++ b/pkg/debrid/debrid/repair.go @@ -66,7 +66,7 @@ func (c *Cache) repairWorker() { } // Mark as in progress - c.repairsInProgress.Store(torrentId, true) + c.repairsInProgress.Store(torrentId, struct{}{}) c.logger.Debug().Str("torrentId", req.TorrentID).Msg("Received repair request") // Get the torrent from the cache diff --git a/pkg/debrid/debrid/worker.go b/pkg/debrid/debrid/worker.go index 71b8d90..b1d159a 100644 --- a/pkg/debrid/debrid/worker.go +++ b/pkg/debrid/debrid/worker.go @@ -59,7 +59,7 @@ func (c *Cache) resetInvalidLinksWorker() { timer := time.NewTimer(initialWait) defer timer.Stop() - c.logger.Debug().Msgf("Scheduled invalid links reset at %s (in %s)", next.Format("2006-01-02 15:04:05 MST"), initialWait) + c.logger.Debug().Msgf("Scheduled Links Reset at %s (in %s)", next.Format("2006-01-02 15:04:05 MST"), initialWait) // Wait for the first execution <-timer.C diff --git a/pkg/debrid/debrid_link/debrid_link.go b/pkg/debrid/debrid_link/debrid_link.go index a635a6b..378eddc 100644 --- a/pkg/debrid/debrid_link/debrid_link.go +++ b/pkg/debrid/debrid_link/debrid_link.go @@ -4,6 +4,7 @@ import ( "bytes" "fmt" "github.com/goccy/go-json" + "github.com/puzpuzpuz/xsync/v3" "github.com/rs/zerolog" "github.com/sirrobot01/debrid-blackhole/internal/config" "github.com/sirrobot01/debrid-blackhole/internal/logger" @@ -11,6 +12,7 @@ import ( "github.com/sirrobot01/debrid-blackhole/internal/utils" "github.com/sirrobot01/debrid-blackhole/pkg/debrid/types" "slices" + "strconv" "time" "net/http" @@ -18,13 +20,12 @@ import ( ) type DebridLink struct { - Name string - Host string `json:"host"` - APIKey string - DownloadKeys []string - ActiveDownloadKeys []string - DownloadUncached bool - client *request.Client + Name string + Host string `json:"host"` + APIKey string + DownloadKeys *xsync.MapOf[string, types.Account] + DownloadUncached bool + client *request.Client MountPath string logger zerolog.Logger @@ -244,8 +245,8 @@ func (dl *DebridLink) GetDownloads() (map[string]types.DownloadLinks, error) { return nil, nil } -func (dl *DebridLink) GetDownloadLink(t *types.Torrent, file *types.File) (string, error) { - return file.DownloadLink, nil +func (dl *DebridLink) GetDownloadLink(t *types.Torrent, file *types.File) (string, string, error) { + return file.DownloadLink, "0", nil } func (dl *DebridLink) GetDownloadingStatus() []string { @@ -274,11 +275,21 @@ func New(dc config.Debrid) *DebridLink { request.WithRateLimiter(rl), request.WithProxy(dc.Proxy), ) + + accounts := xsync.NewMapOf[string, types.Account]() + for idx, key := range dc.DownloadAPIKeys { + id := strconv.Itoa(idx) + accounts.Store(id, types.Account{ + Name: key, + ID: id, + Token: key, + }) + } return &DebridLink{ Name: "debridlink", Host: dc.Host, APIKey: dc.APIKey, - DownloadKeys: dc.DownloadAPIKeys, + DownloadKeys: accounts, DownloadUncached: dc.DownloadUncached, client: client, MountPath: dc.Folder, @@ -369,9 +380,8 @@ func (dl *DebridLink) GetMountPath() string { return dl.MountPath } -func (dl *DebridLink) RemoveActiveDownloadKey() { +func (dl *DebridLink) DisableAccount(accountId string) { } func (dl *DebridLink) ResetActiveDownloadKeys() { - dl.ActiveDownloadKeys = dl.DownloadKeys } diff --git a/pkg/debrid/realdebrid/realdebrid.go b/pkg/debrid/realdebrid/realdebrid.go index 66276a1..73f249e 100644 --- a/pkg/debrid/realdebrid/realdebrid.go +++ b/pkg/debrid/realdebrid/realdebrid.go @@ -4,6 +4,7 @@ import ( "errors" "fmt" "github.com/goccy/go-json" + "github.com/puzpuzpuz/xsync/v3" "github.com/rs/zerolog" "github.com/sirrobot01/debrid-blackhole/internal/config" "github.com/sirrobot01/debrid-blackhole/internal/logger" @@ -15,6 +16,7 @@ import ( gourl "net/url" "path/filepath" "slices" + "sort" "strconv" "strings" "sync" @@ -25,13 +27,12 @@ type RealDebrid struct { Name string Host string `json:"host"` - APIKey string - DownloadKeys []string // This is used for bandwidth - ActiveDownloadKeys []string // This is used for active downloads api keys - downloadKeysMux sync.Mutex + APIKey string + DownloadKeys *xsync.MapOf[string, types.Account] // index | Account DownloadUncached bool client *request.Client + downloadClient *request.Client MountPath string logger zerolog.Logger @@ -45,8 +46,24 @@ func New(dc config.Debrid) *RealDebrid { "Authorization": fmt.Sprintf("Bearer %s", dc.APIKey), } _log := logger.New(dc.Name) - client := request.New( - request.WithHeaders(headers), + + accounts := xsync.NewMapOf[string, types.Account]() + firstDownloadKey := dc.DownloadAPIKeys[0] + for idx, key := range dc.DownloadAPIKeys { + id := strconv.Itoa(idx) + accounts.Store(id, types.Account{ + Name: key, + ID: id, + Token: key, + }) + } + + downloadHeaders := map[string]string{ + "Authorization": fmt.Sprintf("Bearer %s", firstDownloadKey), + } + + downloadClient := request.New( + request.WithHeaders(downloadHeaders), request.WithRateLimiter(rl), request.WithLogger(_log), request.WithMaxRetries(5), @@ -54,17 +71,27 @@ func New(dc config.Debrid) *RealDebrid { request.WithProxy(dc.Proxy), request.WithStatusCooldown(447, 10*time.Second), // 447 is a fair use error ) + + client := request.New( + request.WithHeaders(headers), + request.WithRateLimiter(rl), + request.WithLogger(_log), + request.WithMaxRetries(5), + request.WithRetryableStatus(429), + request.WithProxy(dc.Proxy), + ) + return &RealDebrid{ - Name: "realdebrid", - Host: dc.Host, - APIKey: dc.APIKey, - DownloadKeys: dc.DownloadAPIKeys, - ActiveDownloadKeys: dc.DownloadAPIKeys, - DownloadUncached: dc.DownloadUncached, - client: client, - MountPath: dc.Folder, - logger: logger.New(dc.Name), - CheckCached: dc.CheckCached, + Name: "realdebrid", + Host: dc.Host, + APIKey: dc.APIKey, + DownloadKeys: accounts, + DownloadUncached: dc.DownloadUncached, + client: client, + downloadClient: downloadClient, + MountPath: dc.Folder, + logger: logger.New(dc.Name), + CheckCached: dc.CheckCached, } } @@ -319,13 +346,14 @@ func (r *RealDebrid) GenerateDownloadLinks(t *types.Torrent) error { go func(file types.File) { defer wg.Done() - link, err := r.GetDownloadLink(t, &file) + link, accountId, err := r.GetDownloadLink(t, &file) if err != nil { errCh <- err return } file.DownloadLink = link + file.AccountId = accountId filesCh <- file }(f) } @@ -375,7 +403,7 @@ func (r *RealDebrid) _getDownloadLink(file *types.File) (string, error) { "link": {file.Link}, } req, _ := http.NewRequest(http.MethodPost, url, strings.NewReader(payload.Encode())) - resp, err := r.client.Do(req) + resp, err := r.downloadClient.Do(req) if err != nil { return "", err } @@ -398,7 +426,9 @@ func (r *RealDebrid) _getDownloadLink(file *types.File) (string, error) { case 19: return "", request.HosterUnavailableError // File has been removed case 36: - return "", request.TrafficExceededError // File has been nerfed + return "", request.TrafficExceededError // traffic exceeded + case 34: + return "", request.TrafficExceededError // traffic exceeded default: return "", fmt.Errorf("realdebrid API error: Status: %d || Code: %d", resp.StatusCode, data.ErrorCode) } @@ -415,29 +445,47 @@ func (r *RealDebrid) _getDownloadLink(file *types.File) (string, error) { } -func (r *RealDebrid) GetDownloadLink(t *types.Torrent, file *types.File) (string, error) { - if len(r.ActiveDownloadKeys) < 1 { +func (r *RealDebrid) GetDownloadLink(t *types.Torrent, file *types.File) (string, string, error) { + defer r.downloadClient.SetHeader("Authorization", fmt.Sprintf("Bearer %s", r.APIKey)) + var ( + downloadLink string + accountId string + err error + ) + accounts := r.getActiveAccounts() + if len(accounts) < 1 { // No active download keys. It's likely that the key has reached bandwidth limit - return "", fmt.Errorf("no active download keys") + return "", "", fmt.Errorf("no active download keys") } - defer r.client.SetHeader("Authorization", fmt.Sprintf("Bearer %s", r.APIKey)) - for idx := range r.ActiveDownloadKeys { - r.client.SetHeader("Authorization", fmt.Sprintf("Bearer %s", r.ActiveDownloadKeys[idx])) - link, err := r._getDownloadLink(file) - if err == nil && link != "" { - return link, nil - } + for _, account := range accounts { + r.downloadClient.SetHeader("Authorization", fmt.Sprintf("Bearer %s", account.Token)) + downloadLink, err = r._getDownloadLink(file) if err != nil { if errors.Is(err, request.TrafficExceededError) { - // Retry with the next API key continue - } else { - return "", err } + // If the error is not traffic exceeded, skip generating the link with a new key + return "", "", err + } else { + // If we successfully generated a link, break the loop + accountId = account.ID + file.AccountId = accountId + break } + } - // If we reach here, it means all API keys have been exhausted - return "", fmt.Errorf("all API keys have been exhausted") + if downloadLink != "" { + // If we successfully generated a link, return it + return downloadLink, accountId, nil + } + // If we reach here, it means all keys are disabled or traffic exceeded + if err != nil { + if errors.Is(err, request.TrafficExceededError) { + return "", "", request.TrafficExceededError + } + return "", "", fmt.Errorf("error generating download link: %v", err) + } + return "", "", fmt.Errorf("error generating download link: %v", err) } func (r *RealDebrid) GetCheckCached() bool { @@ -476,7 +524,7 @@ func (r *RealDebrid) getTorrents(offset int, limit int) (int, []*types.Torrent, if err = json.Unmarshal(body, &data); err != nil { return 0, nil, err } - filenames := map[string]bool{} + filenames := map[string]struct{}{} for _, t := range data { if t.Status != "downloaded" { continue @@ -499,6 +547,7 @@ func (r *RealDebrid) getTorrents(offset int, limit int) (int, []*types.Torrent, MountPath: r.MountPath, Added: t.Added.Format(time.RFC3339), }) + filenames[t.Filename] = struct{}{} } return totalItems, torrents, nil } @@ -546,13 +595,14 @@ func (r *RealDebrid) GetDownloads() (map[string]types.DownloadLinks, error) { links := make(map[string]types.DownloadLinks) offset := 0 limit := 1000 - if len(r.ActiveDownloadKeys) < 1 { + + accounts := r.getActiveAccounts() + + if len(accounts) < 1 { // No active download keys. It's likely that the key has reached bandwidth limit return nil, fmt.Errorf("no active download keys") } - r.client.SetHeader("Authorization", fmt.Sprintf("Bearer %s", r.ActiveDownloadKeys[0])) - // Reset to the API key - defer r.client.SetHeader("Authorization", fmt.Sprintf("Bearer %s", r.APIKey)) + r.downloadClient.SetHeader("Authorization", fmt.Sprintf("Bearer %s", accounts[0].Token)) for { dl, err := r._getDownloads(offset, limit) if err != nil { @@ -581,7 +631,7 @@ func (r *RealDebrid) _getDownloads(offset int, limit int) ([]types.DownloadLinks url = fmt.Sprintf("%s&offset=%d", url, offset) } req, _ := http.NewRequest(http.MethodGet, url, nil) - resp, err := r.client.MakeRequest(req) + resp, err := r.downloadClient.MakeRequest(req) if err != nil { return nil, err } @@ -616,16 +666,33 @@ func (r *RealDebrid) GetMountPath() string { return r.MountPath } -func (r *RealDebrid) RemoveActiveDownloadKey() { - r.downloadKeysMux.Lock() - defer r.downloadKeysMux.Unlock() - if len(r.ActiveDownloadKeys) > 0 { - r.ActiveDownloadKeys = r.ActiveDownloadKeys[1:] +func (r *RealDebrid) DisableAccount(accountId string) { + if value, ok := r.DownloadKeys.Load(accountId); ok { + value.Disabled = true + r.DownloadKeys.Store(accountId, value) + r.logger.Info().Msgf("Disabled account Index: %s", value.ID) } } func (r *RealDebrid) ResetActiveDownloadKeys() { - r.downloadKeysMux.Lock() - defer r.downloadKeysMux.Unlock() - r.ActiveDownloadKeys = r.DownloadKeys + r.DownloadKeys.Range(func(key string, value types.Account) bool { + value.Disabled = false + r.DownloadKeys.Store(key, value) + return true + }) +} + +func (r *RealDebrid) getActiveAccounts() []types.Account { + accounts := make([]types.Account, 0) + r.DownloadKeys.Range(func(key string, value types.Account) bool { + if value.Disabled { + return true + } + accounts = append(accounts, value) + return true + }) + sort.Slice(accounts, func(i, j int) bool { + return accounts[i].ID < accounts[j].ID + }) + return accounts } diff --git a/pkg/debrid/torbox/torbox.go b/pkg/debrid/torbox/torbox.go index e938bb8..51959ef 100644 --- a/pkg/debrid/torbox/torbox.go +++ b/pkg/debrid/torbox/torbox.go @@ -4,6 +4,7 @@ import ( "bytes" "fmt" "github.com/goccy/go-json" + "github.com/puzpuzpuz/xsync/v3" "github.com/rs/zerolog" "github.com/sirrobot01/debrid-blackhole/internal/config" "github.com/sirrobot01/debrid-blackhole/internal/logger" @@ -22,13 +23,12 @@ import ( ) type Torbox struct { - Name string - Host string `json:"host"` - APIKey string - DownloadKeys []string - ActiveDownloadKeys []string - DownloadUncached bool - client *request.Client + Name string + Host string `json:"host"` + APIKey string + DownloadKeys *xsync.MapOf[string, types.Account] + DownloadUncached bool + client *request.Client MountPath string logger zerolog.Logger @@ -49,11 +49,21 @@ func New(dc config.Debrid) *Torbox { request.WithProxy(dc.Proxy), ) + accounts := xsync.NewMapOf[string, types.Account]() + for idx, key := range dc.DownloadAPIKeys { + id := strconv.Itoa(idx) + accounts.Store(id, types.Account{ + Name: key, + ID: id, + Token: key, + }) + } + return &Torbox{ Name: "torbox", Host: dc.Host, APIKey: dc.APIKey, - DownloadKeys: dc.DownloadAPIKeys, + DownloadKeys: accounts, DownloadUncached: dc.DownloadUncached, client: client, MountPath: dc.Folder, @@ -281,12 +291,13 @@ func (tb *Torbox) GenerateDownloadLinks(t *types.Torrent) error { for _, file := range t.Files { go func() { defer wg.Done() - link, err := tb.GetDownloadLink(t, &file) + link, accountId, err := tb.GetDownloadLink(t, &file) if err != nil { errCh <- err return } file.DownloadLink = link + file.AccountId = accountId filesCh <- file }() } @@ -313,7 +324,7 @@ func (tb *Torbox) GenerateDownloadLinks(t *types.Torrent) error { return nil } -func (tb *Torbox) GetDownloadLink(t *types.Torrent, file *types.File) (string, error) { +func (tb *Torbox) GetDownloadLink(t *types.Torrent, file *types.File) (string, string, error) { url := fmt.Sprintf("%s/api/torrents/requestdl/", tb.Host) query := gourl.Values{} query.Add("torrent_id", t.Id) @@ -323,17 +334,17 @@ func (tb *Torbox) GetDownloadLink(t *types.Torrent, file *types.File) (string, e req, _ := http.NewRequest(http.MethodGet, url, nil) resp, err := tb.client.MakeRequest(req) if err != nil { - return "", err + return "", "", err } var data DownloadLinksResponse if err = json.Unmarshal(resp, &data); err != nil { - return "", err + return "", "", err } if data.Data == nil { - return "", fmt.Errorf("error getting download links") + return "", "", fmt.Errorf("error getting download links") } link := *data.Data - return link, nil + return link, "0", nil } func (tb *Torbox) GetDownloadingStatus() []string { @@ -364,9 +375,9 @@ func (tb *Torbox) GetMountPath() string { return tb.MountPath } -func (tb *Torbox) RemoveActiveDownloadKey() { +func (tb *Torbox) DisableAccount(accountId string) { } func (tb *Torbox) ResetActiveDownloadKeys() { - tb.ActiveDownloadKeys = tb.DownloadKeys + } diff --git a/pkg/debrid/types/client.go b/pkg/debrid/types/client.go index ebb38d1..ee96b5e 100644 --- a/pkg/debrid/types/client.go +++ b/pkg/debrid/types/client.go @@ -8,7 +8,7 @@ type Client interface { SubmitMagnet(tr *Torrent) (*Torrent, error) CheckStatus(tr *Torrent, isSymlink bool) (*Torrent, error) GenerateDownloadLinks(tr *Torrent) error - GetDownloadLink(tr *Torrent, file *File) (string, error) + GetDownloadLink(tr *Torrent, file *File) (string, string, error) DeleteTorrent(torrentId string) error IsAvailable(infohashes []string) map[string]bool GetCheckCached() bool @@ -21,6 +21,6 @@ type Client interface { GetDownloads() (map[string]DownloadLinks, error) CheckLink(link string) error GetMountPath() string - RemoveActiveDownloadKey() + DisableAccount(string) ResetActiveDownloadKeys() } diff --git a/pkg/debrid/types/torrent.go b/pkg/debrid/types/torrent.go index ab2026b..2c28bc0 100644 --- a/pkg/debrid/types/torrent.go +++ b/pkg/debrid/types/torrent.go @@ -78,6 +78,7 @@ type File struct { Path string `json:"path"` Link string `json:"link"` DownloadLink string `json:"download_link"` + AccountId string `json:"account_id"` Generated time.Time `json:"generated"` } @@ -118,3 +119,10 @@ func (t *Torrent) GetFile(id string) *File { } return nil } + +type Account struct { + ID string `json:"id"` + Disabled bool `json:"disabled"` + Name string `json:"name"` + Token string `json:"token"` +} diff --git a/pkg/webdav/file.go b/pkg/webdav/file.go index 003070c..42bbbcf 100644 --- a/pkg/webdav/file.go +++ b/pkg/webdav/file.go @@ -82,7 +82,7 @@ func (f *File) stream() (*http.Response, error) { downloadLink = f.getDownloadLink() // Uses the first API key if downloadLink == "" { - _log.Error().Msgf("Failed to get download link for %s", f.name) + _log.Error().Msgf("Failed to get download link for %s. Empty download link", f.name) return nil, fmt.Errorf("failed to get download link") } @@ -100,6 +100,7 @@ func (f *File) stream() (*http.Response, error) { if err != nil { return resp, fmt.Errorf("HTTP request error: %w", err) } + if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusPartialContent { closeResp := func() { @@ -110,7 +111,7 @@ func (f *File) stream() (*http.Response, error) { if resp.StatusCode == http.StatusServiceUnavailable { closeResp() // Read the body to consume the response - f.cache.MarkDownloadLinkAsInvalid(downloadLink, "bandwidth_exceeded") + f.cache.MarkDownloadLinkAsInvalid(f.link, downloadLink, "bandwidth_exceeded") // Retry with a different API key if it's available return f.stream() @@ -118,8 +119,7 @@ func (f *File) stream() (*http.Response, error) { closeResp() // Mark download link as not found // Regenerate a new download link - f.cache.MarkDownloadLinkAsInvalid(downloadLink, "link_not_found") - f.cache.RemoveDownloadLink(f.link) // Remove the link from the cache + f.cache.MarkDownloadLinkAsInvalid(f.link, downloadLink, "link_not_found") // Generate a new download link downloadLink = f.getDownloadLink() if downloadLink == "" { @@ -141,7 +141,7 @@ func (f *File) stream() (*http.Response, error) { if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusPartialContent { closeResp() // Read the body to consume the response - f.cache.MarkDownloadLinkAsInvalid(downloadLink, "link_not_found") + f.cache.MarkDownloadLinkAsInvalid(f.link, downloadLink, "link_not_found") return resp, fmt.Errorf("link not found") } return resp, nil