Rewrote account switching, fix some minor bugs here and there

This commit is contained in:
Mukhtar Akere
2025-09-16 21:15:24 +01:00
parent 76f5b85313
commit 30b2db06e7
34 changed files with 945 additions and 866 deletions

View File

@@ -7,8 +7,6 @@ import (
"crypto/tls"
"errors"
"fmt"
"github.com/sirrobot01/decypharr/internal/request"
"github.com/sirrobot01/decypharr/pkg/rclone"
"net/http"
"os"
"path"
@@ -20,6 +18,10 @@ import (
"sync/atomic"
"time"
"github.com/puzpuzpuz/xsync/v4"
"github.com/sirrobot01/decypharr/pkg/debrid/common"
"github.com/sirrobot01/decypharr/pkg/rclone"
"github.com/sirrobot01/decypharr/pkg/debrid/types"
"encoding/json"
@@ -43,20 +45,6 @@ const (
WebdavUseHash WebDavFolderNaming = "infohash"
)
var streamingTransport = &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
MaxIdleConns: 200,
MaxIdleConnsPerHost: 100,
MaxConnsPerHost: 200,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ResponseHeaderTimeout: 60 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
DisableKeepAlives: true,
ForceAttemptHTTP2: false,
TLSNextProto: make(map[string]func(string, *tls.Conn) http.RoundTripper),
}
type CachedTorrent struct {
*types.Torrent
AddedOn time.Time `json:"added_on"`
@@ -89,17 +77,17 @@ type RepairRequest struct {
type Cache struct {
dir string
client types.Client
client common.Client
logger zerolog.Logger
torrents *torrentCache
invalidDownloadLinks sync.Map
folderNaming WebDavFolderNaming
torrents *torrentCache
folderNaming WebDavFolderNaming
listingDebouncer *utils.Debouncer[bool]
// monitors
repairRequest sync.Map
failedToReinsert sync.Map
invalidDownloadLinks *xsync.Map[string, string]
repairRequest *xsync.Map[string, *reInsertRequest]
failedToReinsert *xsync.Map[string, struct{}]
// repair
repairChan chan RepairRequest
@@ -127,7 +115,7 @@ type Cache struct {
httpClient *http.Client
}
func NewDebridCache(dc config.Debrid, client types.Client, mounter *rclone.Mount) *Cache {
func NewDebridCache(dc config.Debrid, client common.Client, mounter *rclone.Mount) *Cache {
cfg := config.Get()
cet, err := time.LoadLocation("CET")
if err != nil {
@@ -170,11 +158,13 @@ func NewDebridCache(dc config.Debrid, client types.Client, mounter *rclone.Mount
}
_log := logger.New(fmt.Sprintf("%s-webdav", client.Name()))
if dc.Proxy != "" {
transport := &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
TLSHandshakeTimeout: 10 * time.Second,
ResponseHeaderTimeout: 30 * time.Second,
MaxIdleConns: 10,
MaxIdleConnsPerHost: 2,
}
transport := streamingTransport
request.SetProxy(transport, dc.Proxy)
httpClient := &http.Client{
Transport: transport,
Timeout: 0,
@@ -198,8 +188,12 @@ func NewDebridCache(dc config.Debrid, client types.Client, mounter *rclone.Mount
customFolders: customFolders,
mounter: mounter,
ready: make(chan struct{}),
httpClient: httpClient,
ready: make(chan struct{}),
httpClient: httpClient,
invalidDownloadLinks: xsync.NewMap[string, string](),
repairRequest: xsync.NewMap[string, *reInsertRequest](),
failedToReinsert: xsync.NewMap[string, struct{}](),
repairChan: make(chan RepairRequest, 100), // Initialize the repair channel, max 100 requests buffered
}
c.listingDebouncer = utils.NewDebouncer[bool](100*time.Millisecond, func(refreshRclone bool) {
@@ -250,9 +244,9 @@ func (c *Cache) Reset() {
c.torrents.reset()
// 3. Clear any sync.Maps
c.invalidDownloadLinks = sync.Map{}
c.repairRequest = sync.Map{}
c.failedToReinsert = sync.Map{}
c.invalidDownloadLinks = xsync.NewMap[string, string]()
c.repairRequest = xsync.NewMap[string, *reInsertRequest]()
c.failedToReinsert = xsync.NewMap[string, struct{}]()
// 5. Rebuild the listing debouncer
c.listingDebouncer = utils.NewDebouncer[bool](
@@ -285,7 +279,6 @@ func (c *Cache) Start(ctx context.Context) error {
// initial download links
go c.refreshDownloadLinks(ctx)
c.repairChan = make(chan RepairRequest, 100) // Initialize the repair channel, max 100 requests buffered
go c.repairWorker(ctx)
cfg := config.Get()
@@ -777,7 +770,7 @@ func (c *Cache) Add(t *types.Torrent) error {
}
func (c *Cache) Client() types.Client {
func (c *Cache) Client() common.Client {
return c.client
}

View File

@@ -3,6 +3,7 @@ package store
import (
"errors"
"fmt"
"github.com/sirrobot01/decypharr/internal/utils"
"github.com/sirrobot01/decypharr/pkg/debrid/types"
)
@@ -30,43 +31,44 @@ func (r *downloadLinkRequest) Wait() (string, error) {
return r.result, r.err
}
func (c *Cache) GetDownloadLink(torrentName, filename, fileLink string) (string, error) {
func (c *Cache) GetDownloadLink(torrentName, filename, fileLink string) (types.DownloadLink, error) {
// Check link cache
if dl, err := c.checkDownloadLink(fileLink); dl != "" && err == nil {
if dl, err := c.checkDownloadLink(fileLink); err == nil && !dl.Empty() {
return dl, nil
}
dl, err := c.fetchDownloadLink(torrentName, filename, fileLink)
if err != nil {
return "", err
return types.DownloadLink{}, err
}
if dl == nil || dl.DownloadLink == "" {
if dl.Empty() {
err = fmt.Errorf("download link is empty for %s in torrent %s", filename, torrentName)
return "", err
return types.DownloadLink{}, err
}
return dl.DownloadLink, err
return dl, err
}
func (c *Cache) fetchDownloadLink(torrentName, filename, fileLink string) (*types.DownloadLink, error) {
func (c *Cache) fetchDownloadLink(torrentName, filename, fileLink string) (types.DownloadLink, error) {
emptyDownloadLink := types.DownloadLink{}
ct := c.GetTorrentByName(torrentName)
if ct == nil {
return nil, fmt.Errorf("torrent not found")
return emptyDownloadLink, fmt.Errorf("torrent not found")
}
file, ok := ct.GetFile(filename)
if !ok {
return nil, fmt.Errorf("file %s not found in torrent %s", filename, torrentName)
return emptyDownloadLink, fmt.Errorf("file %s not found in torrent %s", filename, torrentName)
}
if file.Link == "" {
// file link is empty, refresh the torrent to get restricted links
ct = c.refreshTorrent(file.TorrentId) // Refresh the torrent from the debrid
if ct == nil {
return nil, fmt.Errorf("failed to refresh torrent")
return emptyDownloadLink, fmt.Errorf("failed to refresh torrent")
} else {
file, ok = ct.GetFile(filename)
if !ok {
return nil, fmt.Errorf("file %s not found in refreshed torrent %s", filename, torrentName)
return emptyDownloadLink, fmt.Errorf("file %s not found in refreshed torrent %s", filename, torrentName)
}
}
}
@@ -76,56 +78,53 @@ func (c *Cache) fetchDownloadLink(torrentName, filename, fileLink string) (*type
// Try to reinsert the torrent?
newCt, err := c.reInsertTorrent(ct)
if err != nil {
return nil, fmt.Errorf("failed to reinsert torrent. %w", err)
return emptyDownloadLink, fmt.Errorf("failed to reinsert torrent. %w", err)
}
ct = newCt
file, ok = ct.GetFile(filename)
if !ok {
return nil, fmt.Errorf("file %s not found in reinserted torrent %s", filename, torrentName)
return emptyDownloadLink, fmt.Errorf("file %s not found in reinserted torrent %s", filename, torrentName)
}
}
c.logger.Trace().Msgf("Getting download link for %s(%s)", filename, file.Link)
downloadLink, account, err := c.client.GetDownloadLink(ct.Torrent, &file)
downloadLink, err := c.client.GetDownloadLink(ct.Torrent, &file)
if err != nil {
if errors.Is(err, utils.HosterUnavailableError) {
c.logger.Trace().
Str("account", account.Username).
Str("token", utils.Mask(downloadLink.Token)).
Str("filename", filename).
Str("torrent_id", ct.Id).
Msg("Hoster unavailable, attempting to reinsert torrent")
newCt, err := c.reInsertTorrent(ct)
if err != nil {
return nil, fmt.Errorf("failed to reinsert torrent: %w", err)
return emptyDownloadLink, fmt.Errorf("failed to reinsert torrent: %w", err)
}
ct = newCt
file, ok = ct.GetFile(filename)
if !ok {
return nil, fmt.Errorf("file %s not found in reinserted torrent %s", filename, torrentName)
return emptyDownloadLink, fmt.Errorf("file %s not found in reinserted torrent %s", filename, torrentName)
}
// Retry getting the download link
downloadLink, account, err = c.client.GetDownloadLink(ct.Torrent, &file)
downloadLink, err = c.client.GetDownloadLink(ct.Torrent, &file)
if err != nil {
return nil, fmt.Errorf("retry failed to get download link: %w", err)
return emptyDownloadLink, fmt.Errorf("retry failed to get download link: %w", err)
}
if downloadLink == nil {
return nil, fmt.Errorf("download link is empty after retry")
if downloadLink.Empty() {
return emptyDownloadLink, fmt.Errorf("download link is empty after retry")
}
return nil, nil
return emptyDownloadLink, fmt.Errorf("download link is empty after retry")
} else if errors.Is(err, utils.TrafficExceededError) {
// This is likely a fair usage limit error
return nil, err
return emptyDownloadLink, err
} else {
return nil, fmt.Errorf("failed to get download link: %w", err)
return emptyDownloadLink, fmt.Errorf("failed to get download link: %w", err)
}
}
if downloadLink == nil {
return nil, fmt.Errorf("download link is empty")
if downloadLink.Empty() {
return emptyDownloadLink, fmt.Errorf("download link is empty")
}
// Set link to cache
go c.client.Accounts().SetDownloadLink(account, downloadLink)
return downloadLink, nil
}
@@ -136,28 +135,33 @@ func (c *Cache) GetFileDownloadLinks(t CachedTorrent) {
}
}
func (c *Cache) checkDownloadLink(link string) (string, error) {
dl, _, err := c.client.Accounts().GetDownloadLink(link)
func (c *Cache) checkDownloadLink(link string) (types.DownloadLink, error) {
dl, err := c.client.AccountManager().GetDownloadLink(link)
if err != nil {
return "", err
return dl, err
}
if !c.downloadLinkIsInvalid(dl.DownloadLink) {
return dl.DownloadLink, nil
return dl, nil
}
return "", fmt.Errorf("download link not found for %s", link)
return types.DownloadLink{}, fmt.Errorf("download link not found for %s", link)
}
func (c *Cache) MarkDownloadLinkAsInvalid(link, downloadLink, reason string) {
c.invalidDownloadLinks.Store(downloadLink, reason)
func (c *Cache) MarkDownloadLinkAsInvalid(downloadLink types.DownloadLink, reason string) {
c.invalidDownloadLinks.Store(downloadLink.DownloadLink, reason)
// Remove the download api key from active
if reason == "bandwidth_exceeded" {
// Disable the account
account, err := c.client.Accounts().GetAccountFromLink(link)
accountManager := c.client.AccountManager()
account, err := accountManager.GetAccount(downloadLink.Token)
if err != nil {
c.logger.Error().Err(err).Str("token", utils.Mask(downloadLink.Token)).Msg("Failed to get account to disable")
return
}
c.client.Accounts().Disable(account)
if account == nil {
c.logger.Error().Str("token", utils.Mask(downloadLink.Token)).Msg("Account not found to disable")
return
}
accountManager.Disable(account)
}
}
@@ -179,5 +183,10 @@ func (c *Cache) GetDownloadByteRange(torrentName, filename string) (*[2]int64, e
}
func (c *Cache) GetTotalActiveDownloadLinks() int {
return c.client.Accounts().GetLinksCount()
total := 0
allAccounts := c.client.AccountManager().Active()
for _, acc := range allAccounts {
total += acc.DownloadLinksCount()
}
return total
}

View File

@@ -249,5 +249,5 @@ func (c *Cache) refreshDownloadLinks(ctx context.Context) {
return
}
c.logger.Debug().Msgf("Refreshed download %d links", c.client.Accounts().GetLinksCount())
c.logger.Debug().Msgf("Refreshed download links")
}

View File

@@ -4,11 +4,13 @@ import (
"context"
"errors"
"fmt"
"sync"
"time"
"github.com/puzpuzpuz/xsync/v4"
"github.com/sirrobot01/decypharr/internal/config"
"github.com/sirrobot01/decypharr/internal/utils"
"github.com/sirrobot01/decypharr/pkg/debrid/types"
"sync"
"time"
)
type reInsertRequest struct {
@@ -219,8 +221,7 @@ func (c *Cache) reInsertTorrent(ct *CachedTorrent) (*CachedTorrent, error) {
if _, ok := c.failedToReinsert.Load(oldID); ok {
return ct, fmt.Errorf("can't retry re-insert for %s", torrent.Id)
}
if reqI, inFlight := c.repairRequest.Load(oldID); inFlight {
req := reqI.(*reInsertRequest)
if req, inFlight := c.repairRequest.Load(oldID); inFlight {
c.logger.Debug().Msgf("Waiting for existing reinsert request to complete for torrent %s", oldID)
return req.Wait()
}
@@ -305,9 +306,8 @@ func (c *Cache) reInsertTorrent(ct *CachedTorrent) (*CachedTorrent, error) {
func (c *Cache) resetInvalidLinks(ctx context.Context) {
c.logger.Debug().Msgf("Resetting accounts")
c.invalidDownloadLinks = sync.Map{}
c.client.Accounts().Reset() // Reset the active download keys
c.invalidDownloadLinks = xsync.NewMap[string, string]()
c.client.AccountManager().Reset() // Reset the active download keys
// Refresh the download links
c.refreshDownloadLinks(ctx)
}