From 5bf1dab5e62ddef3717e92e898f6edfc6fac5912 Mon Sep 17 00:00:00 2001 From: Mukhtar Akere <32229538+sirrobot01@users.noreply.github.com> Date: Sat, 7 Jun 2025 17:23:41 +0100 Subject: [PATCH] Torrent Queuing for Botched torrent (#83) * Implement a queue for handling failed torrent * Add checks for getting slots * Few other cleanups, change some function names --- cmd/decypharr/main.go | 8 +- internal/config/config.go | 5 + .../{request/errors.go => utils/error.go} | 14 +- pkg/debrid/debrid.go | 166 ++++++++++-------- pkg/debrid/providers/alldebrid/alldebrid.go | 32 ++-- .../providers/debrid_link/debrid_link.go | 21 ++- pkg/debrid/providers/realdebrid/realdebrid.go | 110 ++++++++---- pkg/debrid/providers/realdebrid/types.go | 5 + pkg/debrid/providers/torbox/torbox.go | 25 +-- pkg/debrid/store/cache.go | 10 +- pkg/debrid/store/download_link.go | 7 +- pkg/debrid/store/misc.go | 2 +- pkg/debrid/store/repair.go | 3 +- pkg/debrid/types/client.go | 5 +- pkg/qbit/context.go | 2 +- pkg/qbit/qbit.go | 2 +- pkg/qbit/torrent.go | 4 +- pkg/repair/misc.go | 2 +- pkg/repair/repair.go | 6 +- pkg/server/debug.go | 18 +- pkg/server/webhook.go | 2 +- pkg/store/downloader.go | 10 +- pkg/store/request.go | 119 +++++++++++-- pkg/store/store.go | 20 ++- pkg/store/torrent.go | 143 ++++++++++++--- pkg/store/torrent_storage.go | 4 +- pkg/web/api.go | 42 ++--- pkg/web/web.go | 2 +- pkg/webdav/file.go | 2 +- pkg/webdav/webdav.go | 4 +- 30 files changed, 556 insertions(+), 239 deletions(-) rename internal/{request/errors.go => utils/error.go} (69%) diff --git a/cmd/decypharr/main.go b/cmd/decypharr/main.go index 4cbc57a..30d5f72 100644 --- a/cmd/decypharr/main.go +++ b/cmd/decypharr/main.go @@ -145,7 +145,7 @@ func startServices(ctx context.Context, wd *webdav.WebDav, srv *server.Server) e }) safeGo(func() error { - arr := store.GetStore().GetArr() + arr := store.Get().Arr() if arr == nil { return nil } @@ -154,7 +154,7 @@ func startServices(ctx context.Context, wd *webdav.WebDav, srv *server.Server) e if cfg := config.Get(); cfg.Repair.Enabled { safeGo(func() error { - repair := store.GetStore().GetRepair() + repair := store.Get().Repair() if repair != nil { if err := repair.Start(ctx); err != nil { _log.Error().Err(err).Msg("repair failed") @@ -164,6 +164,10 @@ func startServices(ctx context.Context, wd *webdav.WebDav, srv *server.Server) e }) } + safeGo(func() error { + return store.Get().StartQueueSchedule(ctx) + }) + go func() { wg.Wait() close(errChan) diff --git a/internal/config/config.go b/internal/config/config.go index 526d519..dbe079e 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -29,6 +29,7 @@ type Debrid struct { Proxy string `json:"proxy,omitempty"` UnpackRar bool `json:"unpack_rar,omitempty"` AddSamples bool `json:"add_samples,omitempty"` + MinimumFreeSlot int `json:"minimum_free_slot,omitempty"` // Minimum active pots to use this debrid UseWebDav bool `json:"use_webdav,omitempty"` WebDav @@ -384,3 +385,7 @@ func Reload() { instance = nil once = sync.Once{} } + +func DefaultFreeSlot() int { + return 10 +} diff --git a/internal/request/errors.go b/internal/utils/error.go similarity index 69% rename from internal/request/errors.go rename to internal/utils/error.go index be40eb0..840a13c 100644 --- a/internal/request/errors.go +++ b/internal/utils/error.go @@ -1,4 +1,6 @@ -package request +package utils + +import "errors" type HTTPError struct { StatusCode int @@ -33,3 +35,13 @@ var TorrentNotFoundError = &HTTPError{ Message: "Torrent not found", Code: "torrent_not_found", } + +var TooManyActiveDownloadsError = &HTTPError{ + StatusCode: 509, + Message: "Too many active downloads", + Code: "too_many_active_downloads", +} + +func IsTooManyActiveDownloadsError(err error) bool { + return errors.As(err, &TooManyActiveDownloadsError) +} diff --git a/pkg/debrid/debrid.go b/pkg/debrid/debrid.go index 0506181..98d3ae5 100644 --- a/pkg/debrid/debrid.go +++ b/pkg/debrid/debrid.go @@ -2,6 +2,7 @@ package debrid import ( "context" + "errors" "fmt" "github.com/sirrobot01/decypharr/internal/config" "github.com/sirrobot01/decypharr/internal/logger" @@ -13,25 +14,34 @@ import ( "github.com/sirrobot01/decypharr/pkg/debrid/providers/torbox" "github.com/sirrobot01/decypharr/pkg/debrid/store" "github.com/sirrobot01/decypharr/pkg/debrid/types" - "strings" "sync" ) +type Debrid struct { + cache *store.Cache // Could be nil if not using WebDAV + client types.Client // HTTP client for making requests to the debrid service +} + +func (de *Debrid) Client() types.Client { + return de.client +} + +func (de *Debrid) Cache() *store.Cache { + return de.cache +} + type Storage struct { - clients map[string]types.Client - clientsLock sync.Mutex - caches map[string]*store.Cache - cachesLock sync.Mutex - LastUsed string + debrids map[string]*Debrid + mu sync.RWMutex + lastUsed string } func NewStorage() *Storage { cfg := config.Get() - clients := make(map[string]types.Client) _logger := logger.Default() - caches := make(map[string]*store.Cache) + debrids := make(map[string]*Debrid) for _, dc := range cfg.Debrids { client, err := createDebridClient(dc) @@ -39,89 +49,100 @@ func NewStorage() *Storage { _logger.Error().Err(err).Str("Debrid", dc.Name).Msg("failed to connect to debrid client") continue } - _log := client.GetLogger() + var cache *store.Cache + _log := client.Logger() if dc.UseWebDav { - caches[dc.Name] = store.NewDebridCache(dc, client) + cache = store.NewDebridCache(dc, client) _log.Info().Msg("Debrid Service started with WebDAV") } else { _log.Info().Msg("Debrid Service started") } - clients[dc.Name] = client + debrids[dc.Name] = &Debrid{ + cache: cache, + client: client, + } } d := &Storage{ - clients: clients, - LastUsed: "", - caches: caches, + debrids: debrids, + lastUsed: "", } return d } -func (d *Storage) GetClient(name string) types.Client { - d.clientsLock.Lock() - defer d.clientsLock.Unlock() - client, exists := d.clients[name] - if !exists { - return nil +func (d *Storage) Debrid(name string) *Debrid { + d.mu.RLock() + defer d.mu.RUnlock() + if debrid, exists := d.debrids[name]; exists { + return debrid } - return client + return nil +} + +func (d *Storage) Debrids() map[string]*Debrid { + d.mu.RLock() + defer d.mu.RUnlock() + debridsCopy := make(map[string]*Debrid) + for name, debrid := range d.debrids { + if debrid != nil { + debridsCopy[name] = debrid + } + } + return debridsCopy +} + +func (d *Storage) Client(name string) types.Client { + d.mu.RLock() + defer d.mu.RUnlock() + if client, exists := d.debrids[name]; exists { + return client.client + } + return nil } func (d *Storage) Reset() { - d.clientsLock.Lock() - d.clients = make(map[string]types.Client) - d.clientsLock.Unlock() - - d.cachesLock.Lock() - d.caches = make(map[string]*store.Cache) - d.cachesLock.Unlock() - d.LastUsed = "" + d.mu.Lock() + d.debrids = make(map[string]*Debrid) + d.mu.Unlock() + d.lastUsed = "" } -func (d *Storage) GetClients() map[string]types.Client { - d.clientsLock.Lock() - defer d.clientsLock.Unlock() +func (d *Storage) Clients() map[string]types.Client { + d.mu.RLock() + defer d.mu.RUnlock() clientsCopy := make(map[string]types.Client) - for name, client := range d.clients { - clientsCopy[name] = client + for name, debrid := range d.debrids { + if debrid != nil && debrid.client != nil { + clientsCopy[name] = debrid.client + } } return clientsCopy } -func (d *Storage) GetCaches() map[string]*store.Cache { - d.clientsLock.Lock() - defer d.clientsLock.Unlock() +func (d *Storage) Caches() map[string]*store.Cache { + d.mu.RLock() + defer d.mu.RUnlock() cachesCopy := make(map[string]*store.Cache) - for name, cache := range d.caches { - cachesCopy[name] = cache + for name, debrid := range d.debrids { + if debrid != nil && debrid.cache != nil { + cachesCopy[name] = debrid.cache + } } return cachesCopy } func (d *Storage) FilterClients(filter func(types.Client) bool) map[string]types.Client { - d.clientsLock.Lock() - defer d.clientsLock.Unlock() + d.mu.Lock() + defer d.mu.Unlock() filteredClients := make(map[string]types.Client) - for name, client := range d.clients { - if filter(client) { - filteredClients[name] = client + for name, client := range d.debrids { + if client != nil && filter(client.client) { + filteredClients[name] = client.client } } return filteredClients } -func (d *Storage) FilterCaches(filter func(*store.Cache) bool) map[string]*store.Cache { - d.cachesLock.Lock() - defer d.cachesLock.Unlock() - filteredCaches := make(map[string]*store.Cache) - for name, cache := range d.caches { - if filter(cache) { - filteredCaches[name] = cache - } - } - return filteredCaches -} - func createDebridClient(dc config.Debrid) (types.Client, error) { switch dc.Name { case "realdebrid": @@ -137,7 +158,7 @@ func createDebridClient(dc config.Debrid) (types.Client, error) { } } -func ProcessTorrent(ctx context.Context, store *Storage, selectedDebrid string, magnet *utils.Magnet, a *arr.Arr, isSymlink, overrideDownloadUncached bool) (*types.Torrent, error) { +func Process(ctx context.Context, store *Storage, selectedDebrid string, magnet *utils.Magnet, a *arr.Arr, isSymlink, overrideDownloadUncached bool) (*types.Torrent, error) { debridTorrent := &types.Torrent{ InfoHash: magnet.InfoHash, @@ -149,7 +170,7 @@ func ProcessTorrent(ctx context.Context, store *Storage, selectedDebrid string, } clients := store.FilterClients(func(c types.Client) bool { - if selectedDebrid != "" && c.GetName() != selectedDebrid { + if selectedDebrid != "" && c.Name() != selectedDebrid { return false } return true @@ -173,9 +194,9 @@ func ProcessTorrent(ctx context.Context, store *Storage, selectedDebrid string, } for index, db := range clients { - _logger := db.GetLogger() + _logger := db.Logger() _logger.Info(). - Str("Debrid", db.GetName()). + Str("Debrid", db.Name()). Str("Arr", a.Name). Str("Hash", debridTorrent.InfoHash). Str("Name", debridTorrent.Name). @@ -191,8 +212,8 @@ func ProcessTorrent(ctx context.Context, store *Storage, selectedDebrid string, continue } dbt.Arr = a - _logger.Info().Str("id", dbt.Id).Msgf("Torrent: %s submitted to %s", dbt.Name, db.GetName()) - store.LastUsed = index + _logger.Info().Str("id", dbt.Id).Msgf("Torrent: %s submitted to %s", dbt.Name, db.Name()) + store.lastUsed = index torrent, err := db.CheckStatus(dbt, isSymlink) if err != nil && torrent != nil && torrent.Id != "" { @@ -201,18 +222,19 @@ func ProcessTorrent(ctx context.Context, store *Storage, selectedDebrid string, _ = db.DeleteTorrent(id) }(torrent.Id) } - return torrent, err + if err != nil { + errs = append(errs, err) + continue + } + if torrent == nil { + errs = append(errs, fmt.Errorf("torrent %s returned nil after checking status", dbt.Name)) + continue + } + return torrent, nil } if len(errs) == 0 { return nil, fmt.Errorf("failed to process torrent: no clients available") } - if len(errs) == 1 { - return nil, fmt.Errorf("failed to process torrent: %w", errs[0]) - } else { - errStrings := make([]string, 0, len(errs)) - for _, err := range errs { - errStrings = append(errStrings, err.Error()) - } - return nil, fmt.Errorf("failed to process torrent: %s", strings.Join(errStrings, ", ")) - } + joinedErrors := errors.Join(errs...) + return nil, fmt.Errorf("failed to process torrent: %w", joinedErrors) } diff --git a/pkg/debrid/providers/alldebrid/alldebrid.go b/pkg/debrid/providers/alldebrid/alldebrid.go index e870d68..a8abaa6 100644 --- a/pkg/debrid/providers/alldebrid/alldebrid.go +++ b/pkg/debrid/providers/alldebrid/alldebrid.go @@ -18,17 +18,18 @@ import ( ) type AllDebrid struct { - Name string + name string Host string `json:"host"` APIKey string accounts map[string]types.Account DownloadUncached bool client *request.Client - MountPath string - logger zerolog.Logger - checkCached bool - addSamples bool + MountPath string + logger zerolog.Logger + checkCached bool + addSamples bool + minimumFreeSlot int } func (ad *AllDebrid) GetProfile() (*types.Profile, error) { @@ -59,7 +60,7 @@ func New(dc config.Debrid) (*AllDebrid, error) { } } return &AllDebrid{ - Name: "alldebrid", + name: "alldebrid", Host: "http://api.alldebrid.com/v4.1", APIKey: dc.APIKey, accounts: accounts, @@ -69,14 +70,15 @@ func New(dc config.Debrid) (*AllDebrid, error) { logger: logger.New(dc.Name), checkCached: dc.CheckCached, addSamples: dc.AddSamples, + minimumFreeSlot: dc.MinimumFreeSlot, }, nil } -func (ad *AllDebrid) GetName() string { - return ad.Name +func (ad *AllDebrid) Name() string { + return ad.name } -func (ad *AllDebrid) GetLogger() zerolog.Logger { +func (ad *AllDebrid) Logger() zerolog.Logger { return ad.logger } @@ -204,7 +206,7 @@ func (ad *AllDebrid) GetTorrent(torrentId string) (*types.Torrent, error) { OriginalFilename: name, Files: make(map[string]types.File), InfoHash: data.Hash, - Debrid: ad.Name, + Debrid: ad.name, MountPath: ad.MountPath, Added: time.Unix(data.CompletionDate, 0).Format(time.RFC3339), } @@ -244,7 +246,7 @@ func (ad *AllDebrid) UpdateTorrent(t *types.Torrent) error { t.OriginalFilename = name t.Folder = name t.MountPath = ad.MountPath - t.Debrid = ad.Name + t.Debrid = ad.name t.Bytes = data.Size t.Seeders = data.Seeders t.Added = time.Unix(data.CompletionDate, 0).Format(time.RFC3339) @@ -406,7 +408,7 @@ func (ad *AllDebrid) GetTorrents() ([]*types.Torrent, error) { OriginalFilename: magnet.Filename, Files: make(map[string]types.File), InfoHash: magnet.Hash, - Debrid: ad.Name, + Debrid: ad.name, MountPath: ad.MountPath, Added: time.Unix(magnet.CompletionDate, 0).Format(time.RFC3339), }) @@ -444,3 +446,9 @@ func (ad *AllDebrid) ResetActiveDownloadKeys() { func (ad *AllDebrid) DeleteDownloadLink(linkId string) error { return nil } + +func (ad *AllDebrid) GetAvailableSlots() (int, error) { + // This function is a placeholder for AllDebrid + //TODO: Implement the logic to check available slots for AllDebrid + return 0, fmt.Errorf("GetAvailableSlots not implemented for AllDebrid") +} diff --git a/pkg/debrid/providers/debrid_link/debrid_link.go b/pkg/debrid/providers/debrid_link/debrid_link.go index e15a35c..ffa3311 100644 --- a/pkg/debrid/providers/debrid_link/debrid_link.go +++ b/pkg/debrid/providers/debrid_link/debrid_link.go @@ -18,7 +18,7 @@ import ( ) type DebridLink struct { - Name string + name string Host string `json:"host"` APIKey string accounts map[string]types.Account @@ -56,7 +56,7 @@ func New(dc config.Debrid) (*DebridLink, error) { } } return &DebridLink{ - Name: "debridlink", + name: "debridlink", Host: "https://debrid-link.com/api/v2", APIKey: dc.APIKey, accounts: accounts, @@ -73,11 +73,11 @@ func (dl *DebridLink) GetProfile() (*types.Profile, error) { return nil, nil } -func (dl *DebridLink) GetName() string { - return dl.Name +func (dl *DebridLink) Name() string { + return dl.name } -func (dl *DebridLink) GetLogger() zerolog.Logger { +func (dl *DebridLink) Logger() zerolog.Logger { return dl.logger } @@ -163,7 +163,7 @@ func (dl *DebridLink) GetTorrent(torrentId string) (*types.Torrent, error) { Filename: name, OriginalFilename: name, MountPath: dl.MountPath, - Debrid: dl.Name, + Debrid: dl.name, Added: time.Unix(t.Created, 0).Format(time.RFC3339), } cfg := config.Get() @@ -288,7 +288,7 @@ func (dl *DebridLink) SubmitMagnet(t *types.Torrent) (*types.Torrent, error) { t.Filename = name t.OriginalFilename = name t.MountPath = dl.MountPath - t.Debrid = dl.Name + t.Debrid = dl.name t.Added = time.Unix(data.Created, 0).Format(time.RFC3339) for _, f := range data.Files { file := types.File{ @@ -428,7 +428,7 @@ func (dl *DebridLink) getTorrents(page, perPage int) ([]*types.Torrent, error) { OriginalFilename: t.Name, InfoHash: t.HashString, Files: make(map[string]types.File), - Debrid: dl.Name, + Debrid: dl.name, MountPath: dl.MountPath, Added: time.Unix(t.Created, 0).Format(time.RFC3339), } @@ -476,3 +476,8 @@ func (dl *DebridLink) ResetActiveDownloadKeys() { func (dl *DebridLink) DeleteDownloadLink(linkId string) error { return nil } + +func (dl *DebridLink) GetAvailableSlots() (int, error) { + //TODO: Implement the logic to check available slots for DebridLink + return 0, fmt.Errorf("GetAvailableSlots not implemented for DebridLink") +} diff --git a/pkg/debrid/providers/realdebrid/realdebrid.go b/pkg/debrid/providers/realdebrid/realdebrid.go index 261188d..297a075 100644 --- a/pkg/debrid/providers/realdebrid/realdebrid.go +++ b/pkg/debrid/providers/realdebrid/realdebrid.go @@ -25,7 +25,7 @@ import ( ) type RealDebrid struct { - Name string + name string Host string `json:"host"` APIKey string @@ -41,10 +41,12 @@ type RealDebrid struct { logger zerolog.Logger UnpackRar bool - rarSemaphore chan struct{} - checkCached bool - addSamples bool - Profile *types.Profile + rarSemaphore chan struct{} + checkCached bool + addSamples bool + Profile *types.Profile + minimumFreeSlot int // Minimum number of active pots to maintain (used for cached stuffs, etc.) + } func New(dc config.Debrid) (*RealDebrid, error) { @@ -71,7 +73,7 @@ func New(dc config.Debrid) (*RealDebrid, error) { } r := &RealDebrid{ - Name: "realdebrid", + name: "realdebrid", Host: "https://api.real-debrid.com/rest/1.0", APIKey: dc.APIKey, accounts: accounts, @@ -98,6 +100,7 @@ func New(dc config.Debrid) (*RealDebrid, error) { rarSemaphore: make(chan struct{}, 2), checkCached: dc.CheckCached, addSamples: dc.AddSamples, + minimumFreeSlot: dc.MinimumFreeSlot, } if _, err := r.GetProfile(); err != nil { @@ -107,11 +110,11 @@ func New(dc config.Debrid) (*RealDebrid, error) { } } -func (r *RealDebrid) GetName() string { - return r.Name +func (r *RealDebrid) Name() string { + return r.name } -func (r *RealDebrid) GetLogger() zerolog.Logger { +func (r *RealDebrid) Logger() zerolog.Logger { return r.logger } @@ -337,15 +340,30 @@ func (r *RealDebrid) addTorrent(t *types.Torrent) (*types.Torrent, error) { return nil, err } req.Header.Add("Content-Type", "application/x-bittorrent") - resp, err := r.client.MakeRequest(req) + resp, err := r.client.Do(req) if err != nil { return nil, err } - if err = json.Unmarshal(resp, &data); err != nil { + if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated { + // Handle multiple_downloads + + if resp.StatusCode == 509 { + return nil, utils.TooManyActiveDownloadsError + } + + bodyBytes, _ := io.ReadAll(resp.Body) + return nil, fmt.Errorf("realdebrid API error: Status: %d || Body: %s", resp.StatusCode, string(bodyBytes)) + } + defer resp.Body.Close() + bodyBytes, err := io.ReadAll(resp.Body) + if err != nil { + return nil, fmt.Errorf("reading response body: %w", err) + } + if err = json.Unmarshal(bodyBytes, &data); err != nil { return nil, err } t.Id = data.Id - t.Debrid = r.Name + t.Debrid = r.name t.MountPath = r.MountPath return t, nil } @@ -357,15 +375,30 @@ func (r *RealDebrid) addMagnet(t *types.Torrent) (*types.Torrent, error) { } var data AddMagnetSchema req, _ := http.NewRequest(http.MethodPost, url, strings.NewReader(payload.Encode())) - resp, err := r.client.MakeRequest(req) + resp, err := r.client.Do(req) if err != nil { return nil, err } - if err = json.Unmarshal(resp, &data); err != nil { + if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated { + // Handle multiple_downloads + + if resp.StatusCode == 509 { + return nil, utils.TooManyActiveDownloadsError + } + + bodyBytes, _ := io.ReadAll(resp.Body) + return nil, fmt.Errorf("realdebrid API error: Status: %d || Body: %s", resp.StatusCode, string(bodyBytes)) + } + defer resp.Body.Close() + bodyBytes, err := io.ReadAll(resp.Body) + if err != nil { + return nil, fmt.Errorf("reading response body: %w", err) + } + if err = json.Unmarshal(bodyBytes, &data); err != nil { return nil, err } t.Id = data.Id - t.Debrid = r.Name + t.Debrid = r.name t.MountPath = r.MountPath return t, nil } @@ -384,7 +417,7 @@ func (r *RealDebrid) GetTorrent(torrentId string) (*types.Torrent, error) { } if resp.StatusCode != http.StatusOK { if resp.StatusCode == http.StatusNotFound { - return nil, request.TorrentNotFoundError + return nil, utils.TorrentNotFoundError } return nil, fmt.Errorf("realdebrid API error: Status: %d || Body: %s", resp.StatusCode, string(bodyBytes)) } @@ -406,7 +439,7 @@ func (r *RealDebrid) GetTorrent(torrentId string) (*types.Torrent, error) { Filename: data.Filename, OriginalFilename: data.OriginalFilename, Links: data.Links, - Debrid: r.Name, + Debrid: r.name, MountPath: r.MountPath, } t.Files = r.getTorrentFiles(t, data) // Get selected files @@ -427,7 +460,7 @@ func (r *RealDebrid) UpdateTorrent(t *types.Torrent) error { } if resp.StatusCode != http.StatusOK { if resp.StatusCode == http.StatusNotFound { - return request.TorrentNotFoundError + return utils.TorrentNotFoundError } return fmt.Errorf("realdebrid API error: Status: %d || Body: %s", resp.StatusCode, string(bodyBytes)) } @@ -447,7 +480,7 @@ func (r *RealDebrid) UpdateTorrent(t *types.Torrent) error { t.OriginalFilename = data.OriginalFilename t.Links = data.Links t.MountPath = r.MountPath - t.Debrid = r.Name + t.Debrid = r.name t.Added = data.Added t.Files, _ = r.getSelectedFiles(t, data) // Get selected files @@ -478,7 +511,7 @@ func (r *RealDebrid) CheckStatus(t *types.Torrent, isSymlink bool) (*types.Torre t.Seeders = data.Seeders t.Links = data.Links t.Status = status - t.Debrid = r.Name + t.Debrid = r.name t.MountPath = r.MountPath if status == "waiting_files_selection" { t.Files = r.getTorrentFiles(t, data) @@ -499,6 +532,9 @@ func (r *RealDebrid) CheckStatus(t *types.Torrent, isSymlink bool) (*types.Torre return t, err } if res.StatusCode != http.StatusNoContent { + if res.StatusCode == 509 { + return nil, utils.TooManyActiveDownloadsError + } return t, fmt.Errorf("realdebrid API error: Status: %d", res.StatusCode) } } else if status == "downloaded" { @@ -593,7 +629,7 @@ func (r *RealDebrid) CheckLink(link string) error { return err } if resp.StatusCode == http.StatusNotFound { - return request.HosterUnavailableError // File has been removed + return utils.HosterUnavailableError // File has been removed } return nil } @@ -622,17 +658,17 @@ func (r *RealDebrid) _getDownloadLink(file *types.File) (*types.DownloadLink, er } switch data.ErrorCode { case 19: - return nil, request.HosterUnavailableError // File has been removed + return nil, utils.HosterUnavailableError // File has been removed case 23: - return nil, request.TrafficExceededError + return nil, utils.TrafficExceededError case 24: - return nil, request.HosterUnavailableError // Link has been nerfed + return nil, utils.HosterUnavailableError // Link has been nerfed case 34: - return nil, request.TrafficExceededError // traffic exceeded + return nil, utils.TrafficExceededError // traffic exceeded case 35: - return nil, request.HosterUnavailableError + return nil, utils.HosterUnavailableError case 36: - return nil, request.TrafficExceededError // traffic exceeded + return nil, utils.TrafficExceededError // traffic exceeded default: return nil, fmt.Errorf("realdebrid API error: Status: %d || Code: %d", resp.StatusCode, data.ErrorCode) } @@ -674,7 +710,7 @@ func (r *RealDebrid) GetDownloadLink(t *types.Torrent, file *types.File) (*types downloadLink, err := r._getDownloadLink(file) retries := 0 if err != nil { - if errors.Is(err, request.TrafficExceededError) { + if errors.Is(err, utils.TrafficExceededError) { // Retries generating retries = 5 } else { @@ -688,7 +724,7 @@ func (r *RealDebrid) GetDownloadLink(t *types.Torrent, file *types.File) (*types if err == nil { return downloadLink, nil } - if !errors.Is(err, request.TrafficExceededError) { + if !errors.Is(err, utils.TrafficExceededError) { return nil, err } // Add a delay before retrying @@ -750,7 +786,7 @@ func (r *RealDebrid) getTorrents(offset int, limit int) (int, []*types.Torrent, Links: t.Links, Files: make(map[string]types.File), InfoHash: t.Hash, - Debrid: r.Name, + Debrid: r.name, MountPath: r.MountPath, Added: t.Added.Format(time.RFC3339), }) @@ -941,3 +977,17 @@ func (r *RealDebrid) GetProfile() (*types.Profile, error) { } return profile, nil } + +func (r *RealDebrid) GetAvailableSlots() (int, error) { + url := fmt.Sprintf("%s/torrents/activeCount", r.Host) + req, _ := http.NewRequest(http.MethodGet, url, nil) + resp, err := r.client.MakeRequest(req) + if err != nil { + return 0, nil + } + var data AvailableSlotsResponse + if json.Unmarshal(resp, &data) != nil { + return 0, fmt.Errorf("error unmarshalling available slots response: %w", err) + } + return data.TotalSlots - data.ActiveSlots - r.minimumFreeSlot, nil // Ensure we maintain minimum active pots +} diff --git a/pkg/debrid/providers/realdebrid/types.go b/pkg/debrid/providers/realdebrid/types.go index 5195e60..f276e7d 100644 --- a/pkg/debrid/providers/realdebrid/types.go +++ b/pkg/debrid/providers/realdebrid/types.go @@ -151,3 +151,8 @@ type profileResponse struct { Premium int `json:"premium"` Expiration time.Time `json:"expiration"` } + +type AvailableSlotsResponse struct { + ActiveSlots int `json:"nb"` + TotalSlots int `json:"limit"` +} diff --git a/pkg/debrid/providers/torbox/torbox.go b/pkg/debrid/providers/torbox/torbox.go index 7f22280..337d598 100644 --- a/pkg/debrid/providers/torbox/torbox.go +++ b/pkg/debrid/providers/torbox/torbox.go @@ -24,7 +24,7 @@ import ( ) type Torbox struct { - Name string + name string Host string `json:"host"` APIKey string accounts map[string]types.Account @@ -67,7 +67,7 @@ func New(dc config.Debrid) (*Torbox, error) { } return &Torbox{ - Name: "torbox", + name: "torbox", Host: "https://api.torbox.app/v1", APIKey: dc.APIKey, accounts: accounts, @@ -80,11 +80,11 @@ func New(dc config.Debrid) (*Torbox, error) { }, nil } -func (tb *Torbox) GetName() string { - return tb.Name +func (tb *Torbox) Name() string { + return tb.name } -func (tb *Torbox) GetLogger() zerolog.Logger { +func (tb *Torbox) Logger() zerolog.Logger { return tb.logger } @@ -166,7 +166,7 @@ func (tb *Torbox) SubmitMagnet(torrent *types.Torrent) (*types.Torrent, error) { torrentId := strconv.Itoa(dt.Id) torrent.Id = torrentId torrent.MountPath = tb.MountPath - torrent.Debrid = tb.Name + torrent.Debrid = tb.name return torrent, nil } @@ -215,7 +215,7 @@ func (tb *Torbox) GetTorrent(torrentId string) (*types.Torrent, error) { Filename: data.Name, OriginalFilename: data.Name, MountPath: tb.MountPath, - Debrid: tb.Name, + Debrid: tb.name, Files: make(map[string]types.File), Added: data.CreatedAt.Format(time.RFC3339), } @@ -250,7 +250,7 @@ func (tb *Torbox) GetTorrent(torrentId string) (*types.Torrent, error) { } t.OriginalFilename = strings.Split(cleanPath, "/")[0] - t.Debrid = tb.Name + t.Debrid = tb.name return t, nil } @@ -279,7 +279,7 @@ func (tb *Torbox) UpdateTorrent(t *types.Torrent) error { t.Filename = name t.OriginalFilename = name t.MountPath = tb.MountPath - t.Debrid = tb.Name + t.Debrid = tb.name cfg := config.Get() for _, f := range data.Files { fileName := filepath.Base(f.Name) @@ -311,7 +311,7 @@ func (tb *Torbox) UpdateTorrent(t *types.Torrent) error { } t.OriginalFilename = strings.Split(cleanPath, "/")[0] - t.Debrid = tb.Name + t.Debrid = tb.name return nil } @@ -470,3 +470,8 @@ func (tb *Torbox) ResetActiveDownloadKeys() { func (tb *Torbox) DeleteDownloadLink(linkId string) error { return nil } + +func (tb *Torbox) GetAvailableSlots() (int, error) { + //TODO: Implement the logic to check available slots for Torbox + return 0, fmt.Errorf("not implemented") +} diff --git a/pkg/debrid/store/cache.go b/pkg/debrid/store/cache.go index 0412645..5edb3f5 100644 --- a/pkg/debrid/store/cache.go +++ b/pkg/debrid/store/cache.go @@ -143,7 +143,7 @@ func NewDebridCache(dc config.Debrid, client types.Client) *Cache { customFolders = append(customFolders, name) } - _log := logger.New(fmt.Sprintf("%s-webdav", client.GetName())) + _log := logger.New(fmt.Sprintf("%s-webdav", client.Name())) c := &Cache{ dir: filepath.Join(cfg.Path, "cache", dc.Name), // path to save cache files @@ -248,7 +248,7 @@ func (c *Cache) Start(ctx context.Context) error { go c.repairWorker(ctx) cfg := config.Get() - name := c.client.GetName() + name := c.client.Name() addr := cfg.BindAddress + ":" + cfg.Port + cfg.URLBase + "webdav/" + name + "/" c.logger.Info().Msgf("%s WebDav server running at %s", name, addr) @@ -379,7 +379,7 @@ func (c *Cache) Sync(ctx context.Context) error { totalTorrents := len(torrents) - c.logger.Info().Msgf("%d torrents found from %s", totalTorrents, c.client.GetName()) + c.logger.Info().Msgf("%d torrents found from %s", totalTorrents, c.client.Name()) newTorrents := make([]*types.Torrent, 0) idStore := make(map[string]struct{}, totalTorrents) @@ -719,7 +719,7 @@ func (c *Cache) Add(t *types.Torrent) error { } -func (c *Cache) GetClient() types.Client { +func (c *Cache) Client() types.Client { return c.client } @@ -866,6 +866,6 @@ func (c *Cache) RemoveFile(torrentId string, filename string) error { return nil } -func (c *Cache) GetLogger() zerolog.Logger { +func (c *Cache) Logger() zerolog.Logger { return c.logger } diff --git a/pkg/debrid/store/download_link.go b/pkg/debrid/store/download_link.go index a404d27..951cb1d 100644 --- a/pkg/debrid/store/download_link.go +++ b/pkg/debrid/store/download_link.go @@ -3,12 +3,11 @@ package store import ( "errors" "fmt" + "github.com/sirrobot01/decypharr/internal/utils" "github.com/sirrobot01/decypharr/pkg/debrid/types" "sync" "time" - - "github.com/sirrobot01/decypharr/internal/request" ) type linkCache struct { @@ -146,7 +145,7 @@ func (c *Cache) fetchDownloadLink(torrentName, filename, fileLink string) (strin c.logger.Trace().Msgf("Getting download link for %s(%s)", filename, file.Link) downloadLink, err := c.client.GetDownloadLink(ct.Torrent, &file) if err != nil { - if errors.Is(err, request.HosterUnavailableError) { + if errors.Is(err, utils.HosterUnavailableError) { newCt, err := c.reInsertTorrent(ct) if err != nil { return "", fmt.Errorf("failed to reinsert torrent: %w", err) @@ -166,7 +165,7 @@ func (c *Cache) fetchDownloadLink(torrentName, filename, fileLink string) (strin } c.updateDownloadLink(downloadLink) return "", nil - } else if errors.Is(err, request.TrafficExceededError) { + } else if errors.Is(err, utils.TrafficExceededError) { // This is likely a fair usage limit error return "", err } else { diff --git a/pkg/debrid/store/misc.go b/pkg/debrid/store/misc.go index d0c089b..7908187 100644 --- a/pkg/debrid/store/misc.go +++ b/pkg/debrid/store/misc.go @@ -28,7 +28,7 @@ func mergeFiles(torrents ...CachedTorrent) map[string]types.File { func (c *Cache) GetIngests() ([]types.IngestData, error) { torrents := c.GetTorrents() - debridName := c.client.GetName() + debridName := c.client.Name() var ingests []types.IngestData for _, torrent := range torrents { ingests = append(ingests, types.IngestData{ diff --git a/pkg/debrid/store/repair.go b/pkg/debrid/store/repair.go index 9234995..8fbdb04 100644 --- a/pkg/debrid/store/repair.go +++ b/pkg/debrid/store/repair.go @@ -4,7 +4,6 @@ import ( "context" "errors" "fmt" - "github.com/sirrobot01/decypharr/internal/request" "github.com/sirrobot01/decypharr/internal/utils" "github.com/sirrobot01/decypharr/pkg/debrid/types" "sync" @@ -98,7 +97,7 @@ func (c *Cache) GetBrokenFiles(t *CachedTorrent, filenames []string) []string { } else { // Check if file.Link not in the downloadLink Cache if err := c.client.CheckLink(f.Link); err != nil { - if errors.Is(err, request.HosterUnavailableError) { + if errors.Is(err, utils.HosterUnavailableError) { brokenFiles = append(brokenFiles, f.Name) } } diff --git a/pkg/debrid/types/client.go b/pkg/debrid/types/client.go index f9d967b..61b4f2b 100644 --- a/pkg/debrid/types/client.go +++ b/pkg/debrid/types/client.go @@ -16,8 +16,8 @@ type Client interface { UpdateTorrent(torrent *Torrent) error GetTorrent(torrentId string) (*Torrent, error) GetTorrents() ([]*Torrent, error) - GetName() string - GetLogger() zerolog.Logger + Name() string + Logger() zerolog.Logger GetDownloadingStatus() []string GetDownloads() (map[string]DownloadLink, error) CheckLink(link string) error @@ -26,4 +26,5 @@ type Client interface { ResetActiveDownloadKeys() DeleteDownloadLink(linkId string) error GetProfile() (*Profile, error) + GetAvailableSlots() (int, error) } diff --git a/pkg/qbit/context.go b/pkg/qbit/context.go index 893cc76..e6b941a 100644 --- a/pkg/qbit/context.go +++ b/pkg/qbit/context.go @@ -82,7 +82,7 @@ func (q *QBit) authContext(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { host, token, err := decodeAuthHeader(r.Header.Get("Authorization")) category := getCategory(r.Context()) - arrs := store.GetStore().GetArr() + arrs := store.Get().Arr() // Check if arr exists a := arrs.Get(category) if a == nil { diff --git a/pkg/qbit/qbit.go b/pkg/qbit/qbit.go index 04c92a4..77a4334 100644 --- a/pkg/qbit/qbit.go +++ b/pkg/qbit/qbit.go @@ -25,7 +25,7 @@ func New() *QBit { Password: cfg.Password, DownloadFolder: cfg.DownloadFolder, Categories: cfg.Categories, - storage: store.GetStore().GetTorrentStorage(), + storage: store.Get().Torrents(), logger: logger.New("qbit"), } } diff --git a/pkg/qbit/torrent.go b/pkg/qbit/torrent.go index 482a299..c3f37de 100644 --- a/pkg/qbit/torrent.go +++ b/pkg/qbit/torrent.go @@ -18,7 +18,7 @@ func (q *QBit) addMagnet(ctx context.Context, url string, arr *arr.Arr, debrid s if err != nil { return fmt.Errorf("error parsing magnet link: %w", err) } - _store := store.GetStore() + _store := store.Get() importReq := store.NewImportRequest(debrid, q.DownloadFolder, magnet, arr, isSymlink, false, "", store.ImportTypeQBitTorrent) @@ -37,7 +37,7 @@ func (q *QBit) addTorrent(ctx context.Context, fileHeader *multipart.FileHeader, if err != nil { return fmt.Errorf("error reading file: %s \n %w", fileHeader.Filename, err) } - _store := store.GetStore() + _store := store.Get() importReq := store.NewImportRequest(debrid, q.DownloadFolder, magnet, arr, isSymlink, false, "", store.ImportTypeQBitTorrent) err = _store.AddTorrent(ctx, importReq) if err != nil { diff --git a/pkg/repair/misc.go b/pkg/repair/misc.go index f2d2b64..d946a98 100644 --- a/pkg/repair/misc.go +++ b/pkg/repair/misc.go @@ -159,7 +159,7 @@ func (r *Repair) findDebridForPath(dir string, clients map[string]types.Client) } if filepath.Clean(mountPath) == filepath.Clean(dir) { - debridName := client.GetName() + debridName := client.Name() // Cache the result r.cacheMutex.Lock() diff --git a/pkg/repair/repair.go b/pkg/repair/repair.go index 453fb29..9a6daf8 100644 --- a/pkg/repair/repair.go +++ b/pkg/repair/repair.go @@ -228,7 +228,7 @@ func (r *Repair) newJob(arrsNames []string, mediaIDs []string) *Job { func (r *Repair) preRunChecks() error { if r.useWebdav { - caches := r.deb.GetCaches() + caches := r.deb.Caches() if len(caches) == 0 { return fmt.Errorf("no caches found") } @@ -639,13 +639,13 @@ func (r *Repair) getZurgBrokenFiles(job *Job, media arr.Content) []arr.ContentFi func (r *Repair) getWebdavBrokenFiles(job *Job, media arr.Content) []arr.ContentFile { // Use internal webdav setup to check file availability - caches := r.deb.GetCaches() + caches := r.deb.Caches() if len(caches) == 0 { r.logger.Info().Msg("No caches found. Can't use webdav") return nil } - clients := r.deb.GetClients() + clients := r.deb.Clients() if len(clients) == 0 { r.logger.Info().Msg("No clients found. Can't use webdav") return nil diff --git a/pkg/server/debug.go b/pkg/server/debug.go index 2b20b9a..4204e5a 100644 --- a/pkg/server/debug.go +++ b/pkg/server/debug.go @@ -12,13 +12,13 @@ import ( func (s *Server) handleIngests(w http.ResponseWriter, r *http.Request) { ingests := make([]debridTypes.IngestData, 0) - _store := store.GetStore() - debrids := _store.GetDebrid() + _store := store.Get() + debrids := _store.Debrid() if debrids == nil { http.Error(w, "Debrid service is not enabled", http.StatusInternalServerError) return } - for _, cache := range debrids.GetCaches() { + for _, cache := range debrids.Caches() { if cache == nil { s.logger.Error().Msg("Debrid cache is nil, skipping") continue @@ -42,15 +42,15 @@ func (s *Server) handleIngestsByDebrid(w http.ResponseWriter, r *http.Request) { return } - _store := store.GetStore() - debrids := _store.GetDebrid() + _store := store.Get() + debrids := _store.Debrid() if debrids == nil { http.Error(w, "Debrid service is not enabled", http.StatusInternalServerError) return } - caches := debrids.GetCaches() + caches := debrids.Caches() cache, exists := caches[debridName] if !exists { @@ -92,13 +92,13 @@ func (s *Server) handleStats(w http.ResponseWriter, r *http.Request) { "go_version": runtime.Version(), } - debrids := store.GetStore().GetDebrid() + debrids := store.Get().Debrid() if debrids == nil { request.JSONResponse(w, stats, http.StatusOK) return } - clients := debrids.GetClients() - caches := debrids.GetCaches() + clients := debrids.Clients() + caches := debrids.Caches() profiles := make([]*debridTypes.Profile, 0) for debridName, client := range clients { profile, err := client.GetProfile() diff --git a/pkg/server/webhook.go b/pkg/server/webhook.go index bc81ccb..fde99e9 100644 --- a/pkg/server/webhook.go +++ b/pkg/server/webhook.go @@ -38,7 +38,7 @@ func (s *Server) handleTautulli(w http.ResponseWriter, r *http.Request) { http.Error(w, "Invalid ID", http.StatusBadRequest) return } - repair := store.GetStore().GetRepair() + repair := store.Get().Repair() mediaId := cmp.Or(payload.TmdbID, payload.TvdbID) diff --git a/pkg/store/downloader.go b/pkg/store/downloader.go index 29692f3..dc2244c 100644 --- a/pkg/store/downloader.go +++ b/pkg/store/downloader.go @@ -13,7 +13,7 @@ import ( "github.com/sirrobot01/decypharr/internal/utils" ) -func Download(client *grab.Client, url, filename string, byterange *[2]int64, progressCallback func(int64, int64)) error { +func grabber(client *grab.Client, url, filename string, byterange *[2]int64, progressCallback func(int64, int64)) error { req, err := grab.NewRequest(filename, url) if err != nil { return err @@ -56,7 +56,7 @@ Loop: return resp.Err() } -func (s *Store) ProcessManualFile(torrent *Torrent) (string, error) { +func (s *Store) processDownload(torrent *Torrent) (string, error) { debridTorrent := torrent.DebridTorrent s.logger.Info().Msgf("Downloading %d files...", len(debridTorrent.Files)) torrentPath := filepath.Join(torrent.SavePath, utils.RemoveExtension(debridTorrent.OriginalFilename)) @@ -96,7 +96,7 @@ func (s *Store) downloadFiles(torrent *Torrent, parent string) { if totalSize > 0 { debridTorrent.Progress = float64(debridTorrent.SizeDownloaded) / float64(totalSize) * 100 } - s.UpdateTorrentMin(torrent, debridTorrent) + s.partialTorrentUpdate(torrent, debridTorrent) } client := &grab.Client{ UserAgent: "Decypharr[QBitTorrent]", @@ -119,7 +119,7 @@ func (s *Store) downloadFiles(torrent *Torrent, parent string) { defer func() { <-s.downloadSemaphore }() filename := file.Name - err := Download( + err := grabber( client, file.DownloadLink.DownloadLink, filepath.Join(parent, filename), @@ -151,7 +151,7 @@ func (s *Store) downloadFiles(torrent *Torrent, parent string) { s.logger.Info().Msgf("Downloaded all files for %s", debridTorrent.Name) } -func (s *Store) ProcessSymlink(torrent *Torrent) (string, error) { +func (s *Store) processSymlink(torrent *Torrent) (string, error) { debridTorrent := torrent.DebridTorrent files := debridTorrent.Files if len(files) == 0 { diff --git a/pkg/store/request.go b/pkg/store/request.go index 80439e0..86f0bca 100644 --- a/pkg/store/request.go +++ b/pkg/store/request.go @@ -2,13 +2,16 @@ package store import ( "bytes" + "context" "encoding/json" + "fmt" "github.com/sirrobot01/decypharr/internal/request" "github.com/sirrobot01/decypharr/internal/utils" "github.com/sirrobot01/decypharr/pkg/arr" debridTypes "github.com/sirrobot01/decypharr/pkg/debrid/types" "net/http" "net/url" + "sync" "time" ) @@ -19,23 +22,9 @@ const ( ImportTypeAPI ImportType = "api" ) -func NewImportRequest(debrid string, downloadFolder string, magnet *utils.Magnet, arr *arr.Arr, isSymlink, downloadUncached bool, callBackUrl string, importType ImportType) *ImportRequest { - return &ImportRequest{ - Status: "started", - DownloadFolder: downloadFolder, - Debrid: debrid, - Magnet: magnet, - Arr: arr, - IsSymlink: isSymlink, - DownloadUncached: downloadUncached, - CallBackUrl: callBackUrl, - Type: importType, - } -} - type ImportRequest struct { DownloadFolder string `json:"downloadFolder"` - Debrid string `json:"debrid"` + SelectedDebrid string `json:"debrid"` Magnet *utils.Magnet `json:"magnet"` Arr *arr.Arr `json:"arr"` IsSymlink bool `json:"isSymlink"` @@ -50,6 +39,20 @@ type ImportRequest struct { Async bool `json:"async"` } +func NewImportRequest(debrid string, downloadFolder string, magnet *utils.Magnet, arr *arr.Arr, isSymlink, downloadUncached bool, callBackUrl string, importType ImportType) *ImportRequest { + return &ImportRequest{ + Status: "started", + DownloadFolder: downloadFolder, + SelectedDebrid: debrid, + Magnet: magnet, + Arr: arr, + IsSymlink: isSymlink, + DownloadUncached: downloadUncached, + CallBackUrl: callBackUrl, + Type: importType, + } +} + type importResponse struct { Status string `json:"status"` CompletedAt time.Time `json:"completedAt"` @@ -101,3 +104,89 @@ func (i *ImportRequest) markAsCompleted(torrent *Torrent, debridTorrent *debridT i.CompletedAt = time.Now() i.sendCallback(torrent, debridTorrent) } + +type ImportQueue struct { + queue map[string]chan *ImportRequest // Map to hold queues for different debrid services + mu sync.RWMutex // Mutex to protect access to the queue map + ctx context.Context + cancel context.CancelFunc + capacity int // Capacity of each channel in the queue +} + +func NewImportQueue(ctx context.Context, capacity int) *ImportQueue { + ctx, cancel := context.WithCancel(ctx) + return &ImportQueue{ + queue: make(map[string]chan *ImportRequest), + ctx: ctx, + cancel: cancel, + capacity: capacity, + } +} + +func (iq *ImportQueue) Push(req *ImportRequest) error { + if req == nil { + return fmt.Errorf("import request cannot be nil") + } + + iq.mu.Lock() + defer iq.mu.Unlock() + + if _, exists := iq.queue[req.SelectedDebrid]; !exists { + iq.queue[req.SelectedDebrid] = make(chan *ImportRequest, iq.capacity) // Create a new channel for the debrid service + } + + select { + case iq.queue[req.SelectedDebrid] <- req: + return nil + case <-iq.ctx.Done(): + return fmt.Errorf("retry queue is shutting down") + } +} + +func (iq *ImportQueue) TryPop(selectedDebrid string) (*ImportRequest, error) { + iq.mu.RLock() + defer iq.mu.RUnlock() + + if ch, exists := iq.queue[selectedDebrid]; exists { + select { + case req := <-ch: + return req, nil + case <-iq.ctx.Done(): + return nil, fmt.Errorf("queue is shutting down") + default: + return nil, fmt.Errorf("no import request available for %s", selectedDebrid) + } + } + return nil, fmt.Errorf("no queue exists for %s", selectedDebrid) +} + +func (iq *ImportQueue) Size(selectedDebrid string) int { + iq.mu.RLock() + defer iq.mu.RUnlock() + + if ch, exists := iq.queue[selectedDebrid]; exists { + return len(ch) + } + return 0 +} + +func (iq *ImportQueue) Close() { + iq.cancel() + iq.mu.Lock() + defer iq.mu.Unlock() + + for _, ch := range iq.queue { + // Drain remaining items before closing + for { + select { + case <-ch: + // Discard remaining items + default: + close(ch) + goto nextChannel + } + } + nextChannel: + } + iq.queue = make(map[string]chan *ImportRequest) +} diff --git a/pkg/store/store.go b/pkg/store/store.go index d404152..4492ffc 100644 --- a/pkg/store/store.go +++ b/pkg/store/store.go @@ -2,6 +2,7 @@ package store import ( "cmp" + "context" "github.com/rs/zerolog" "github.com/sirrobot01/decypharr/internal/config" "github.com/sirrobot01/decypharr/internal/logger" @@ -16,6 +17,7 @@ type Store struct { repair *repair.Repair arr *arr.Storage debrid *debrid.Storage + importsQueue *ImportQueue // Queued import requests(probably from too_many_active_downloads) torrents *TorrentStorage logger zerolog.Logger refreshInterval time.Duration @@ -28,8 +30,8 @@ var ( once sync.Once ) -// GetStore returns the singleton instance -func GetStore() *Store { +// Get returns the singleton instance +func Get() *Store { once.Do(func() { arrs := arr.NewStorage() deb := debrid.NewStorage() @@ -45,6 +47,7 @@ func GetStore() *Store { refreshInterval: time.Duration(cmp.Or(qbitCfg.RefreshInterval, 10)) * time.Minute, skipPreCache: qbitCfg.SkipPreCache, downloadSemaphore: make(chan struct{}, cmp.Or(qbitCfg.MaxDownloads, 5)), + importsQueue: NewImportQueue(context.Background(), 1000), } }) return instance @@ -55,21 +58,26 @@ func Reset() { if instance.debrid != nil { instance.debrid.Reset() } + + if instance.importsQueue != nil { + instance.importsQueue.Close() + } + close(instance.downloadSemaphore) } once = sync.Once{} instance = nil } -func (s *Store) GetArr() *arr.Storage { +func (s *Store) Arr() *arr.Storage { return s.arr } -func (s *Store) GetDebrid() *debrid.Storage { +func (s *Store) Debrid() *debrid.Storage { return s.debrid } -func (s *Store) GetRepair() *repair.Repair { +func (s *Store) Repair() *repair.Repair { return s.repair } -func (s *Store) GetTorrentStorage() *TorrentStorage { +func (s *Store) Torrents() *TorrentStorage { return s.torrents } diff --git a/pkg/store/torrent.go b/pkg/store/torrent.go index 419c28b..22798b6 100644 --- a/pkg/store/torrent.go +++ b/pkg/store/torrent.go @@ -3,6 +3,7 @@ package store import ( "cmp" "context" + "errors" "fmt" "github.com/sirrobot01/decypharr/internal/request" "github.com/sirrobot01/decypharr/internal/utils" @@ -15,22 +16,125 @@ import ( func (s *Store) AddTorrent(ctx context.Context, importReq *ImportRequest) error { torrent := createTorrentFromMagnet(importReq) - debridTorrent, err := debridTypes.ProcessTorrent(ctx, s.debrid, importReq.Debrid, importReq.Magnet, importReq.Arr, importReq.IsSymlink, importReq.DownloadUncached) - if err != nil || debridTorrent == nil { - if err == nil { - err = fmt.Errorf("failed to process torrent") + debridTorrent, err := debridTypes.Process(ctx, s.debrid, importReq.SelectedDebrid, importReq.Magnet, importReq.Arr, importReq.IsSymlink, importReq.DownloadUncached) + + if err != nil { + var httpErr *utils.HTTPError + if ok := errors.As(err, &httpErr); ok { + switch httpErr.Code { + case "too_many_active_downloads": + // Handle too much active downloads error + s.logger.Warn().Msgf("Too many active downloads for %s, adding to queue", importReq.Magnet.Name) + err := s.addToQueue(importReq) + if err != nil { + s.logger.Error().Err(err).Msgf("Failed to add %s to queue", importReq.Magnet.Name) + return err + } + torrent.State = "queued" + default: + // Unhandled error, return it, caller logs it + return err + } + } else { + // Unhandled error, return it, caller logs it + return err } - // This error is returned immediately to the user(no need for callback) - return err } - torrent = s.UpdateTorrentMin(torrent, debridTorrent) + torrent = s.partialTorrentUpdate(torrent, debridTorrent) s.torrents.AddOrUpdate(torrent) go s.processFiles(torrent, debridTorrent, importReq) // We can send async for file processing not to delay the response return nil } +func (s *Store) addToQueue(importReq *ImportRequest) error { + if importReq.Magnet == nil { + return fmt.Errorf("magnet is required") + } + + if importReq.Arr == nil { + return fmt.Errorf("arr is required") + } + + importReq.Status = "queued" + importReq.CompletedAt = time.Time{} + importReq.Error = nil + err := s.importsQueue.Push(importReq) + if err != nil { + return err + } + return nil +} + +func (s *Store) processFromQueue(ctx context.Context, selectedDebrid string) error { + // Pop the next import request from the queue + importReq, err := s.importsQueue.TryPop(selectedDebrid) + if err != nil { + return err + } + if importReq == nil { + return nil + } + return s.AddTorrent(ctx, importReq) +} + +func (s *Store) StartQueueSchedule(ctx context.Context) error { + + s.trackAvailableSlots(ctx) // Initial tracking of available slots + + ticker := time.NewTicker(time.Minute) + + for { + select { + case <-ctx.Done(): + return nil + case <-ticker.C: + s.trackAvailableSlots(ctx) + } + } +} + +func (s *Store) trackAvailableSlots(ctx context.Context) { + // This function tracks the available slots for each debrid client + availableSlots := make(map[string]int) + + for name, deb := range s.debrid.Debrids() { + slots, err := deb.Client().GetAvailableSlots() + if err != nil { + continue + } + availableSlots[name] = slots + } + + for name, slots := range availableSlots { + if s.importsQueue.Size(name) <= 0 { + continue + } + s.logger.Debug().Msgf("Available slots for %s: %d", name, slots) + // If slots are available, process the next import request from the queue + for slots > 0 { + select { + case <-ctx.Done(): + return // Exit if context is done + default: + if err := s.processFromQueue(ctx, name); err != nil { + s.logger.Error().Err(err).Msg("Error processing from queue") + return // Exit on error + } + slots-- // Decrease the available slots after processing + } + } + } +} + func (s *Store) processFiles(torrent *Torrent, debridTorrent *types.Torrent, importReq *ImportRequest) { - client := s.debrid.GetClient(debridTorrent.Debrid) + + if debridTorrent == nil { + // Early return if debridTorrent is nil + return + } + + deb := s.debrid.Debrid(debridTorrent.Debrid) + client := deb.Client() downloadingStatuses := client.GetDownloadingStatus() _arr := importReq.Arr for debridTorrent.Status != "downloaded" { @@ -53,7 +157,7 @@ func (s *Store) processFiles(torrent *Torrent, debridTorrent *types.Torrent, imp } debridTorrent = dbT - torrent = s.UpdateTorrentMin(torrent, debridTorrent) + torrent = s.partialTorrentUpdate(torrent, debridTorrent) // Exit the loop for downloading statuses to prevent memory buildup if debridTorrent.Status == "downloaded" || !utils.Contains(downloadingStatuses, debridTorrent.Status) { @@ -71,9 +175,8 @@ func (s *Store) processFiles(torrent *Torrent, debridTorrent *types.Torrent, imp // Check if debrid supports webdav by checking cache timer := time.Now() if importReq.IsSymlink { - caches := s.debrid.GetCaches() - cache, useWebdav := caches[debridTorrent.Debrid] - if useWebdav { + cache := deb.Cache() + if cache != nil { s.logger.Info().Msgf("Using internal webdav for %s", debridTorrent.Debrid) // Use webdav to download the file @@ -91,10 +194,10 @@ func (s *Store) processFiles(torrent *Torrent, debridTorrent *types.Torrent, imp } else { // User is using either zurg or debrid webdav - torrentSymlinkPath, err = s.ProcessSymlink(torrent) // /mnt/symlinks/{category}/MyTVShow/ + torrentSymlinkPath, err = s.processSymlink(torrent) // /mnt/symlinks/{category}/MyTVShow/ } } else { - torrentSymlinkPath, err = s.ProcessManualFile(torrent) + torrentSymlinkPath, err = s.processDownload(torrent) } if err != nil { s.markTorrentAsFailed(torrent) @@ -106,7 +209,7 @@ func (s *Store) processFiles(torrent *Torrent, debridTorrent *types.Torrent, imp return } torrent.TorrentPath = torrentSymlinkPath - s.UpdateTorrent(torrent, debridTorrent) + s.updateTorrent(torrent, debridTorrent) s.logger.Info().Msgf("Adding %s took %s", debridTorrent.Name, time.Since(timer)) go importReq.markAsCompleted(torrent, debridTorrent) // Mark the import request as completed, send callback if needed @@ -129,7 +232,7 @@ func (s *Store) markTorrentAsFailed(t *Torrent) *Torrent { return t } -func (s *Store) UpdateTorrentMin(t *Torrent, debridTorrent *types.Torrent) *Torrent { +func (s *Store) partialTorrentUpdate(t *Torrent, debridTorrent *types.Torrent) *Torrent { if debridTorrent == nil { return t } @@ -170,17 +273,17 @@ func (s *Store) UpdateTorrentMin(t *Torrent, debridTorrent *types.Torrent) *Torr return t } -func (s *Store) UpdateTorrent(t *Torrent, debridTorrent *types.Torrent) *Torrent { +func (s *Store) updateTorrent(t *Torrent, debridTorrent *types.Torrent) *Torrent { if debridTorrent == nil { return t } - if debridClient := s.debrid.GetClients()[debridTorrent.Debrid]; debridClient != nil { + if debridClient := s.debrid.Clients()[debridTorrent.Debrid]; debridClient != nil { if debridTorrent.Status != "downloaded" { _ = debridClient.UpdateTorrent(debridTorrent) } } - t = s.UpdateTorrentMin(t, debridTorrent) + t = s.partialTorrentUpdate(t, debridTorrent) t.ContentPath = t.TorrentPath + string(os.PathSeparator) if t.IsReady() { @@ -200,7 +303,7 @@ func (s *Store) UpdateTorrent(t *Torrent, debridTorrent *types.Torrent) *Torrent s.torrents.Update(t) return t } - updatedT := s.UpdateTorrent(t, debridTorrent) + updatedT := s.updateTorrent(t, debridTorrent) t = updatedT case <-time.After(10 * time.Minute): // Add a timeout diff --git a/pkg/store/torrent_storage.go b/pkg/store/torrent_storage.go index c6f9b8f..e55dfab 100644 --- a/pkg/store/torrent_storage.go +++ b/pkg/store/torrent_storage.go @@ -184,7 +184,7 @@ func (ts *TorrentStorage) Delete(hash, category string, removeFromDebrid bool) { return } if removeFromDebrid && torrent.ID != "" && torrent.Debrid != "" { - dbClient := GetStore().debrid.GetClient(torrent.Debrid) + dbClient := Get().debrid.Client(torrent.Debrid) if dbClient != nil { _ = dbClient.DeleteTorrent(torrent.ID) } @@ -238,7 +238,7 @@ func (ts *TorrentStorage) DeleteMultiple(hashes []string, removeFromDebrid bool) } }() - clients := GetStore().debrid.GetClients() + clients := Get().debrid.Clients() go func() { for id, debrid := range toDelete { diff --git a/pkg/web/api.go b/pkg/web/api.go index a9f6aba..9c53e4c 100644 --- a/pkg/web/api.go +++ b/pkg/web/api.go @@ -17,8 +17,8 @@ import ( ) func (wb *Web) handleGetArrs(w http.ResponseWriter, r *http.Request) { - _store := store.GetStore() - request.JSONResponse(w, _store.GetArr().GetAll(), http.StatusOK) + _store := store.Get() + request.JSONResponse(w, _store.Arr().GetAll(), http.StatusOK) } func (wb *Web) handleAddContent(w http.ResponseWriter, r *http.Request) { @@ -27,7 +27,7 @@ func (wb *Web) handleAddContent(w http.ResponseWriter, r *http.Request) { http.Error(w, err.Error(), http.StatusBadRequest) return } - _store := store.GetStore() + _store := store.Get() results := make([]*store.ImportRequest, 0) errs := make([]string, 0) @@ -43,7 +43,7 @@ func (wb *Web) handleAddContent(w http.ResponseWriter, r *http.Request) { downloadUncached := r.FormValue("downloadUncached") == "true" - _arr := _store.GetArr().Get(arrName) + _arr := _store.Arr().Get(arrName) if _arr == nil { _arr = arr.New(arrName, "", "", false, false, &downloadUncached) } @@ -66,6 +66,7 @@ func (wb *Web) handleAddContent(w http.ResponseWriter, r *http.Request) { importReq := store.NewImportRequest(debridName, downloadFolder, magnet, _arr, !notSymlink, downloadUncached, callbackUrl, store.ImportTypeAPI) if err := _store.AddTorrent(ctx, importReq); err != nil { + wb.logger.Error().Err(err).Str("url", url).Msg("Failed to add torrent") errs = append(errs, fmt.Sprintf("URL %s: %v", url, err)) continue } @@ -91,6 +92,7 @@ func (wb *Web) handleAddContent(w http.ResponseWriter, r *http.Request) { importReq := store.NewImportRequest(debridName, downloadFolder, magnet, _arr, !notSymlink, downloadUncached, callbackUrl, store.ImportTypeAPI) err = _store.AddTorrent(ctx, importReq) if err != nil { + wb.logger.Error().Err(err).Str("file", fileHeader.Filename).Msg("Failed to add torrent") errs = append(errs, fmt.Sprintf("File %s: %v", fileHeader.Filename, err)) continue } @@ -114,12 +116,12 @@ func (wb *Web) handleRepairMedia(w http.ResponseWriter, r *http.Request) { return } - _store := store.GetStore() + _store := store.Get() var arrs []string if req.ArrName != "" { - _arr := _store.GetArr().Get(req.ArrName) + _arr := _store.Arr().Get(req.ArrName) if _arr == nil { http.Error(w, "No Arrs found to repair", http.StatusNotFound) return @@ -129,7 +131,7 @@ func (wb *Web) handleRepairMedia(w http.ResponseWriter, r *http.Request) { if req.Async { go func() { - if err := _store.GetRepair().AddJob(arrs, req.MediaIds, req.AutoProcess, false); err != nil { + if err := _store.Repair().AddJob(arrs, req.MediaIds, req.AutoProcess, false); err != nil { wb.logger.Error().Err(err).Msg("Failed to repair media") } }() @@ -137,7 +139,7 @@ func (wb *Web) handleRepairMedia(w http.ResponseWriter, r *http.Request) { return } - if err := _store.GetRepair().AddJob([]string{req.ArrName}, req.MediaIds, req.AutoProcess, false); err != nil { + if err := _store.Repair().AddJob([]string{req.ArrName}, req.MediaIds, req.AutoProcess, false); err != nil { http.Error(w, fmt.Sprintf("Failed to repair: %v", err), http.StatusInternalServerError) return } @@ -181,8 +183,8 @@ func (wb *Web) handleDeleteTorrents(w http.ResponseWriter, r *http.Request) { func (wb *Web) handleGetConfig(w http.ResponseWriter, r *http.Request) { cfg := config.Get() arrCfgs := make([]config.Arr, 0) - _store := store.GetStore() - for _, a := range _store.GetArr().GetAll() { + _store := store.Get() + for _, a := range _store.Arr().GetAll() { arrCfgs = append(arrCfgs, config.Arr{ Host: a.Host, Name: a.Name, @@ -237,8 +239,8 @@ func (wb *Web) handleUpdateConfig(w http.ResponseWriter, r *http.Request) { } // Update Arrs through the service - _store := store.GetStore() - _arr := _store.GetArr() + _store := store.Get() + _arr := _store.Arr() _arr.Clear() // Clear existing arrs for _, a := range updatedConfig.Arrs { @@ -270,8 +272,8 @@ func (wb *Web) handleUpdateConfig(w http.ResponseWriter, r *http.Request) { } func (wb *Web) handleGetRepairJobs(w http.ResponseWriter, r *http.Request) { - _store := store.GetStore() - request.JSONResponse(w, _store.GetRepair().GetJobs(), http.StatusOK) + _store := store.Get() + request.JSONResponse(w, _store.Repair().GetJobs(), http.StatusOK) } func (wb *Web) handleProcessRepairJob(w http.ResponseWriter, r *http.Request) { @@ -280,8 +282,8 @@ func (wb *Web) handleProcessRepairJob(w http.ResponseWriter, r *http.Request) { http.Error(w, "No job ID provided", http.StatusBadRequest) return } - _store := store.GetStore() - if err := _store.GetRepair().ProcessJob(id); err != nil { + _store := store.Get() + if err := _store.Repair().ProcessJob(id); err != nil { wb.logger.Error().Err(err).Msg("Failed to process repair job") } w.WriteHeader(http.StatusOK) @@ -301,8 +303,8 @@ func (wb *Web) handleDeleteRepairJob(w http.ResponseWriter, r *http.Request) { return } - _store := store.GetStore() - _store.GetRepair().DeleteJobs(req.IDs) + _store := store.Get() + _store.Repair().DeleteJobs(req.IDs) w.WriteHeader(http.StatusOK) } @@ -312,8 +314,8 @@ func (wb *Web) handleStopRepairJob(w http.ResponseWriter, r *http.Request) { http.Error(w, "No job ID provided", http.StatusBadRequest) return } - _store := store.GetStore() - if err := _store.GetRepair().StopJob(id); err != nil { + _store := store.Get() + if err := _store.Repair().StopJob(id); err != nil { wb.logger.Error().Err(err).Msg("Failed to stop repair job") http.Error(w, "Failed to stop job: "+err.Error(), http.StatusInternalServerError) return diff --git a/pkg/web/web.go b/pkg/web/web.go index aa20c58..3c24d87 100644 --- a/pkg/web/web.go +++ b/pkg/web/web.go @@ -79,6 +79,6 @@ func New() *Web { logger: logger.New("ui"), templates: templates, cookie: cookieStore, - torrents: store.GetStore().GetTorrentStorage(), + torrents: store.Get().Torrents(), } } diff --git a/pkg/webdav/file.go b/pkg/webdav/file.go index 3833b5f..2d6bc7a 100644 --- a/pkg/webdav/file.go +++ b/pkg/webdav/file.go @@ -86,7 +86,7 @@ func (f *File) getDownloadByteRange() (*[2]int64, error) { func (f *File) stream() (*http.Response, error) { client := sharedClient - _log := f.cache.GetLogger() + _log := f.cache.Logger() downloadLink, err := f.getDownloadLink() if err != nil { diff --git a/pkg/webdav/webdav.go b/pkg/webdav/webdav.go index 7259234..6a52b6b 100644 --- a/pkg/webdav/webdav.go +++ b/pkg/webdav/webdav.go @@ -95,8 +95,8 @@ func New() *WebDav { Handlers: make([]*Handler, 0), URLBase: urlBase, } - for name, c := range store.GetStore().GetDebrid().GetCaches() { - h := NewHandler(name, urlBase, c, c.GetLogger()) + for name, c := range store.Get().Debrid().Caches() { + h := NewHandler(name, urlBase, c, c.Logger()) w.Handlers = append(w.Handlers, h) } return w