From a539aa53bdd3c4020698a4d66d65984694a44325 Mon Sep 17 00:00:00 2001 From: Mukhtar Akere Date: Sat, 14 Jun 2025 16:09:28 +0100 Subject: [PATCH] - Speed up repairs when checking links \n - Remove run on start for repairs since it causes issues \n - Add support for arr-specific debrid - Support for queuing system - Support for no-op when sending torrents to debrid --- docs/docs/configuration/index.md | 3 +- docs/docs/features/repair-worker.md | 2 - internal/config/config.go | 2 +- pkg/arr/arr.go | 6 +- pkg/debrid/debrid.go | 5 +- pkg/debrid/providers/alldebrid/alldebrid.go | 11 +- .../providers/debrid_link/debrid_link.go | 9 +- pkg/debrid/providers/realdebrid/misc.go | 1 + pkg/debrid/providers/realdebrid/realdebrid.go | 76 ++++----- pkg/debrid/providers/torbox/torbox.go | 10 +- pkg/debrid/store/download_link.go | 9 +- pkg/debrid/store/repair.go | 25 +-- pkg/debrid/types/client.go | 2 +- pkg/qbit/context.go | 2 +- pkg/qbit/http.go | 14 +- pkg/qbit/torrent.go | 22 +-- pkg/qbit/types.go | 11 -- pkg/repair/repair.go | 11 -- pkg/store/downloader.go | 11 +- pkg/store/misc.go | 2 +- pkg/store/request.go | 151 ++++++++++++------ pkg/store/torrent.go | 131 ++++++++++----- pkg/store/torrent_storage.go | 98 +++--------- pkg/store/types.go | 88 ++++++++++ pkg/web/api.go | 9 +- pkg/web/templates/config.html | 20 +-- pkg/web/templates/download.html | 31 ++-- pkg/webdav/file.go | 21 +-- 28 files changed, 428 insertions(+), 355 deletions(-) create mode 100644 pkg/debrid/providers/realdebrid/misc.go create mode 100644 pkg/store/types.go diff --git a/docs/docs/configuration/index.md b/docs/docs/configuration/index.md index 77b156b..3215985 100644 --- a/docs/docs/configuration/index.md +++ b/docs/docs/configuration/index.md @@ -23,8 +23,7 @@ Here's a minimal configuration to get started: }, "repair": { "enabled": false, - "interval": "12h", - "run_on_start": false + "interval": "12h" }, "use_auth": false, "log_level": "info" diff --git a/docs/docs/features/repair-worker.md b/docs/docs/features/repair-worker.md index 3d62400..a291822 100644 --- a/docs/docs/features/repair-worker.md +++ b/docs/docs/features/repair-worker.md @@ -19,7 +19,6 @@ To enable and configure the Repair Worker, add the following to your `config.jso "repair": { "enabled": true, "interval": "12h", - "run_on_start": false, "use_webdav": false, "zurg_url": "http://localhost:9999", "auto_process": true @@ -30,7 +29,6 @@ To enable and configure the Repair Worker, add the following to your `config.jso - `enabled`: Set to `true` to enable the Repair Worker. - `interval`: The time interval for the Repair Worker to run (e.g., `12h`, `1d`). -- `run_on_start`: If set to `true`, the Repair Worker will run immediately after Decypharr starts. - `use_webdav`: If set to `true`, the Repair Worker will use WebDAV for file operations. - `zurg_url`: The URL for the Zurg service (if using). - `auto_process`: If set to `true`, the Repair Worker will automatically process files that it finds issues with. diff --git a/internal/config/config.go b/internal/config/config.go index d84401d..329531e 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -53,12 +53,12 @@ type Arr struct { Cleanup bool `json:"cleanup,omitempty"` SkipRepair bool `json:"skip_repair,omitempty"` DownloadUncached *bool `json:"download_uncached,omitempty"` + SelectedDebrid string `json:"selected_debrid,omitempty"` } type Repair struct { Enabled bool `json:"enabled,omitempty"` Interval string `json:"interval,omitempty"` - RunOnStart bool `json:"run_on_start,omitempty"` ZurgURL string `json:"zurg_url,omitempty"` AutoProcess bool `json:"auto_process,omitempty"` UseWebDav bool `json:"use_webdav,omitempty"` diff --git a/pkg/arr/arr.go b/pkg/arr/arr.go index 4b90efd..c9ee2b2 100644 --- a/pkg/arr/arr.go +++ b/pkg/arr/arr.go @@ -34,10 +34,11 @@ type Arr struct { Cleanup bool `json:"cleanup"` SkipRepair bool `json:"skip_repair"` DownloadUncached *bool `json:"download_uncached"` + SelectedDebrid string `json:"selected_debrid,omitempty"` // The debrid service selected for this arr client *request.Client } -func New(name, host, token string, cleanup, skipRepair bool, downloadUncached *bool) *Arr { +func New(name, host, token string, cleanup, skipRepair bool, downloadUncached *bool, selectedDebrid string) *Arr { return &Arr{ Name: name, Host: host, @@ -47,6 +48,7 @@ func New(name, host, token string, cleanup, skipRepair bool, downloadUncached *b SkipRepair: skipRepair, DownloadUncached: downloadUncached, client: request.New(), + SelectedDebrid: selectedDebrid, } } @@ -145,7 +147,7 @@ func NewStorage() *Storage { arrs := make(map[string]*Arr) for _, a := range config.Get().Arrs { name := a.Name - arrs[name] = New(name, a.Host, a.Token, a.Cleanup, a.SkipRepair, a.DownloadUncached) + arrs[name] = New(name, a.Host, a.Token, a.Cleanup, a.SkipRepair, a.DownloadUncached, a.SelectedDebrid) } return &Storage{ Arrs: arrs, diff --git a/pkg/debrid/debrid.go b/pkg/debrid/debrid.go index 98d3ae5..e7762ae 100644 --- a/pkg/debrid/debrid.go +++ b/pkg/debrid/debrid.go @@ -158,7 +158,7 @@ func createDebridClient(dc config.Debrid) (types.Client, error) { } } -func Process(ctx context.Context, store *Storage, selectedDebrid string, magnet *utils.Magnet, a *arr.Arr, isSymlink, overrideDownloadUncached bool) (*types.Torrent, error) { +func Process(ctx context.Context, store *Storage, selectedDebrid string, magnet *utils.Magnet, a *arr.Arr, action string, overrideDownloadUncached bool) (*types.Torrent, error) { debridTorrent := &types.Torrent{ InfoHash: magnet.InfoHash, @@ -200,6 +200,7 @@ func Process(ctx context.Context, store *Storage, selectedDebrid string, magnet Str("Arr", a.Name). Str("Hash", debridTorrent.InfoHash). Str("Name", debridTorrent.Name). + Str("Action", action). Msg("Processing torrent") if !overrideDownloadUncached && a.DownloadUncached == nil { @@ -215,7 +216,7 @@ func Process(ctx context.Context, store *Storage, selectedDebrid string, magnet _logger.Info().Str("id", dbt.Id).Msgf("Torrent: %s submitted to %s", dbt.Name, db.Name()) store.lastUsed = index - torrent, err := db.CheckStatus(dbt, isSymlink) + torrent, err := db.CheckStatus(dbt) if err != nil && torrent != nil && torrent.Id != "" { // Delete the torrent if it was not downloaded go func(id string) { diff --git a/pkg/debrid/providers/alldebrid/alldebrid.go b/pkg/debrid/providers/alldebrid/alldebrid.go index e635992..c41748f 100644 --- a/pkg/debrid/providers/alldebrid/alldebrid.go +++ b/pkg/debrid/providers/alldebrid/alldebrid.go @@ -259,7 +259,7 @@ func (ad *AllDebrid) UpdateTorrent(t *types.Torrent) error { return nil } -func (ad *AllDebrid) CheckStatus(torrent *types.Torrent, isSymlink bool) (*types.Torrent, error) { +func (ad *AllDebrid) CheckStatus(torrent *types.Torrent) (*types.Torrent, error) { for { err := ad.UpdateTorrent(torrent) @@ -269,13 +269,7 @@ func (ad *AllDebrid) CheckStatus(torrent *types.Torrent, isSymlink bool) (*types status := torrent.Status if status == "downloaded" { ad.logger.Info().Msgf("Torrent: %s downloaded", torrent.Name) - if !isSymlink { - - if err = ad.GetFileDownloadLinks(torrent); err != nil { - return torrent, err - } - } - break + return torrent, nil } else if utils.Contains(ad.GetDownloadingStatus(), status) { if !torrent.DownloadUncached { return torrent, fmt.Errorf("torrent: %s not cached", torrent.Name) @@ -288,7 +282,6 @@ func (ad *AllDebrid) CheckStatus(torrent *types.Torrent, isSymlink bool) (*types } } - return torrent, nil } func (ad *AllDebrid) DeleteTorrent(torrentId string) error { diff --git a/pkg/debrid/providers/debrid_link/debrid_link.go b/pkg/debrid/providers/debrid_link/debrid_link.go index a109495..98df6d1 100644 --- a/pkg/debrid/providers/debrid_link/debrid_link.go +++ b/pkg/debrid/providers/debrid_link/debrid_link.go @@ -316,7 +316,7 @@ func (dl *DebridLink) SubmitMagnet(t *types.Torrent) (*types.Torrent, error) { return t, nil } -func (dl *DebridLink) CheckStatus(torrent *types.Torrent, isSymlink bool) (*types.Torrent, error) { +func (dl *DebridLink) CheckStatus(torrent *types.Torrent) (*types.Torrent, error) { for { err := dl.UpdateTorrent(torrent) if err != nil || torrent == nil { @@ -325,11 +325,7 @@ func (dl *DebridLink) CheckStatus(torrent *types.Torrent, isSymlink bool) (*type status := torrent.Status if status == "downloaded" { dl.logger.Info().Msgf("Torrent: %s downloaded", torrent.Name) - - if err = dl.GetFileDownloadLinks(torrent); err != nil { - return torrent, err - } - break + return torrent, nil } else if utils.Contains(dl.GetDownloadingStatus(), status) { if !torrent.DownloadUncached { return torrent, fmt.Errorf("torrent: %s not cached", torrent.Name) @@ -342,7 +338,6 @@ func (dl *DebridLink) CheckStatus(torrent *types.Torrent, isSymlink bool) (*type } } - return torrent, nil } func (dl *DebridLink) DeleteTorrent(torrentId string) error { diff --git a/pkg/debrid/providers/realdebrid/misc.go b/pkg/debrid/providers/realdebrid/misc.go new file mode 100644 index 0000000..c127ea7 --- /dev/null +++ b/pkg/debrid/providers/realdebrid/misc.go @@ -0,0 +1 @@ +package realdebrid diff --git a/pkg/debrid/providers/realdebrid/realdebrid.go b/pkg/debrid/providers/realdebrid/realdebrid.go index ff102fa..28354c4 100644 --- a/pkg/debrid/providers/realdebrid/realdebrid.go +++ b/pkg/debrid/providers/realdebrid/realdebrid.go @@ -468,7 +468,7 @@ func (r *RealDebrid) UpdateTorrent(t *types.Torrent) error { return nil } -func (r *RealDebrid) CheckStatus(t *types.Torrent, isSymlink bool) (*types.Torrent, error) { +func (r *RealDebrid) CheckStatus(t *types.Torrent) (*types.Torrent, error) { url := fmt.Sprintf("%s/torrents/info/%s", r.Host, t.Id) req, _ := http.NewRequest(http.MethodGet, url, nil) for { @@ -525,12 +525,7 @@ func (r *RealDebrid) CheckStatus(t *types.Torrent, isSymlink bool) (*types.Torre } r.logger.Info().Msgf("Torrent: %s downloaded to RD", t.Name) - if !isSymlink { - if err = r.GetFileDownloadLinks(t); err != nil { - return t, err - } - } - break + return t, nil } else if utils.Contains(r.GetDownloadingStatus(), status) { if !t.DownloadUncached { return t, fmt.Errorf("torrent: %s not cached", t.Name) @@ -541,7 +536,6 @@ func (r *RealDebrid) CheckStatus(t *types.Torrent, isSymlink bool) (*types.Torre } } - return t, nil } func (r *RealDebrid) DeleteTorrent(torrentId string) error { @@ -555,63 +549,55 @@ func (r *RealDebrid) DeleteTorrent(torrentId string) error { } func (r *RealDebrid) GetFileDownloadLinks(t *types.Torrent) error { - filesCh := make(chan types.File, len(t.Files)) - errCh := make(chan error, len(t.Files)) - linksCh := make(chan *types.DownloadLink) - var wg sync.WaitGroup - wg.Add(len(t.Files)) - for _, f := range t.Files { + var mu sync.Mutex + var firstErr error + + files := make(map[string]types.File) + links := make(map[string]*types.DownloadLink) + + _files := t.GetFiles() + wg.Add(len(_files)) + + for _, f := range _files { go func(file types.File) { defer wg.Done() link, err := r.GetDownloadLink(t, &file) if err != nil { - errCh <- err + mu.Lock() + if firstErr == nil { + firstErr = err + } + mu.Unlock() return } if link == nil { - errCh <- fmt.Errorf("realdebrid API error: download link not found for file %s", file.Name) + mu.Lock() + if firstErr == nil { + firstErr = fmt.Errorf("realdebrid API error: download link not found for file %s", file.Name) + } + mu.Unlock() return } - linksCh <- link + file.DownloadLink = link - filesCh <- file + + mu.Lock() + files[file.Name] = file + links[link.Link] = link + mu.Unlock() }(f) } - go func() { - wg.Wait() - close(filesCh) - close(linksCh) - close(errCh) - }() + wg.Wait() - // Collect results - files := make(map[string]types.File, len(t.Files)) - for file := range filesCh { - files[file.Name] = file - } - - // Collect download links - links := make(map[string]*types.DownloadLink) - for link := range linksCh { - if link == nil { - continue - } - links[link.Link] = link + if firstErr != nil { + return firstErr } // Add links to cache r.accounts.SetDownloadLinks(links) - - // Check for errors - for err := range errCh { - if err != nil { - return err // Return the first error encountered - } - } - t.Files = files return nil } diff --git a/pkg/debrid/providers/torbox/torbox.go b/pkg/debrid/providers/torbox/torbox.go index 8d7e2ef..7d346a1 100644 --- a/pkg/debrid/providers/torbox/torbox.go +++ b/pkg/debrid/providers/torbox/torbox.go @@ -312,7 +312,7 @@ func (tb *Torbox) UpdateTorrent(t *types.Torrent) error { return nil } -func (tb *Torbox) CheckStatus(torrent *types.Torrent, isSymlink bool) (*types.Torrent, error) { +func (tb *Torbox) CheckStatus(torrent *types.Torrent) (*types.Torrent, error) { for { err := tb.UpdateTorrent(torrent) @@ -322,12 +322,7 @@ func (tb *Torbox) CheckStatus(torrent *types.Torrent, isSymlink bool) (*types.To status := torrent.Status if status == "downloaded" { tb.logger.Info().Msgf("Torrent: %s downloaded", torrent.Name) - if !isSymlink { - if err = tb.GetFileDownloadLinks(torrent); err != nil { - return torrent, err - } - } - break + return torrent, nil } else if utils.Contains(tb.GetDownloadingStatus(), status) { if !torrent.DownloadUncached { return torrent, fmt.Errorf("torrent: %s not cached", torrent.Name) @@ -340,7 +335,6 @@ func (tb *Torbox) CheckStatus(torrent *types.Torrent, isSymlink bool) (*types.To } } - return torrent, nil } func (tb *Torbox) DeleteTorrent(torrentId string) error { diff --git a/pkg/debrid/store/download_link.go b/pkg/debrid/store/download_link.go index 19c57d6..0f73891 100644 --- a/pkg/debrid/store/download_link.go +++ b/pkg/debrid/store/download_link.go @@ -34,8 +34,6 @@ func (c *Cache) GetDownloadLink(torrentName, filename, fileLink string) (string, // Check link cache if dl, err := c.checkDownloadLink(fileLink); dl != "" && err == nil { return dl, nil - } else { - c.logger.Trace().Msgf("Download link check failed: %v", err) } if req, inFlight := c.downloadLinkRequests.Load(fileLink); inFlight { @@ -54,6 +52,13 @@ func (c *Cache) GetDownloadLink(torrentName, filename, fileLink string) (string, c.downloadLinkRequests.Delete(fileLink) return "", err } + + if dl == nil || dl.DownloadLink == "" { + err = fmt.Errorf("download link is empty for %s in torrent %s", filename, torrentName) + req.Complete("", err) + c.downloadLinkRequests.Delete(fileLink) + return "", err + } req.Complete(dl.DownloadLink, err) c.downloadLinkRequests.Delete(fileLink) return dl.DownloadLink, err diff --git a/pkg/debrid/store/repair.go b/pkg/debrid/store/repair.go index 201109e..53cd0ab 100644 --- a/pkg/debrid/store/repair.go +++ b/pkg/debrid/store/repair.go @@ -90,18 +90,25 @@ func (c *Cache) GetBrokenFiles(t *CachedTorrent, filenames []string) []string { files = t.Files + var wg sync.WaitGroup + + wg.Add(len(files)) + for _, f := range files { // Check if file link is still missing - if f.Link == "" { - brokenFiles = append(brokenFiles, f.Name) - } else { - // Check if file.Link not in the downloadLink Cache - if err := c.client.CheckLink(f.Link); err != nil { - if errors.Is(err, utils.HosterUnavailableError) { - brokenFiles = append(brokenFiles, f.Name) + go func(f types.File) { + defer wg.Done() + if f.Link == "" { + brokenFiles = append(brokenFiles, f.Name) + } else { + // Check if file.Link not in the downloadLink Cache + if err := c.client.CheckLink(f.Link); err != nil { + if errors.Is(err, utils.HosterUnavailableError) { + brokenFiles = append(brokenFiles, f.Name) + } } } - } + }(f) } // Try to reinsert the torrent if it's broken @@ -202,7 +209,7 @@ func (c *Cache) reInsertTorrent(ct *CachedTorrent) (*CachedTorrent, error) { return ct, fmt.Errorf("failed to submit magnet: empty torrent") } newTorrent.DownloadUncached = false // Set to false, avoid re-downloading - newTorrent, err = c.client.CheckStatus(newTorrent, true) + newTorrent, err = c.client.CheckStatus(newTorrent) if err != nil { if newTorrent != nil && newTorrent.Id != "" { // Delete the torrent if it was not downloaded diff --git a/pkg/debrid/types/client.go b/pkg/debrid/types/client.go index 8dfef25..fdcc2cd 100644 --- a/pkg/debrid/types/client.go +++ b/pkg/debrid/types/client.go @@ -6,7 +6,7 @@ import ( type Client interface { SubmitMagnet(tr *Torrent) (*Torrent, error) - CheckStatus(tr *Torrent, isSymlink bool) (*Torrent, error) + CheckStatus(tr *Torrent) (*Torrent, error) GetFileDownloadLinks(tr *Torrent) error GetDownloadLink(tr *Torrent, file *File) (*DownloadLink, error) DeleteTorrent(torrentId string) error diff --git a/pkg/qbit/context.go b/pkg/qbit/context.go index e6b941a..e68f6a2 100644 --- a/pkg/qbit/context.go +++ b/pkg/qbit/context.go @@ -87,7 +87,7 @@ func (q *QBit) authContext(next http.Handler) http.Handler { a := arrs.Get(category) if a == nil { downloadUncached := false - a = arr.New(category, "", "", false, false, &downloadUncached) + a = arr.New(category, "", "", false, false, &downloadUncached, "") } if err == nil { host = strings.TrimSpace(host) diff --git a/pkg/qbit/http.go b/pkg/qbit/http.go index f87ea58..a73468a 100644 --- a/pkg/qbit/http.go +++ b/pkg/qbit/http.go @@ -88,12 +88,15 @@ func (q *QBit) handleTorrentsAdd(w http.ResponseWriter, r *http.Request) { return } - isSymlink := strings.ToLower(r.FormValue("sequentialDownload")) != "true" + action := "symlink" + if strings.ToLower(r.FormValue("sequentialDownload")) != "true" { + action = "download" + } debridName := r.FormValue("debrid") category := r.FormValue("category") _arr := getArr(ctx) if _arr == nil { - _arr = arr.New(category, "", "", false, false, nil) + _arr = arr.New(category, "", "", false, false, nil, "") } atleastOne := false @@ -104,7 +107,7 @@ func (q *QBit) handleTorrentsAdd(w http.ResponseWriter, r *http.Request) { urlList = append(urlList, strings.TrimSpace(u)) } for _, url := range urlList { - if err := q.addMagnet(ctx, url, _arr, debridName, isSymlink); err != nil { + if err := q.addMagnet(ctx, url, _arr, debridName, action); err != nil { q.logger.Error().Err(err).Msgf("Error adding magnet") http.Error(w, err.Error(), http.StatusBadRequest) return @@ -117,7 +120,7 @@ func (q *QBit) handleTorrentsAdd(w http.ResponseWriter, r *http.Request) { if r.MultipartForm != nil && r.MultipartForm.File != nil { if files := r.MultipartForm.File["torrents"]; len(files) > 0 { for _, fileHeader := range files { - if err := q.addTorrent(ctx, fileHeader, _arr, debridName, isSymlink); err != nil { + if err := q.addTorrent(ctx, fileHeader, _arr, debridName, action); err != nil { q.logger.Error().Err(err).Msgf("Error adding torrent") http.Error(w, err.Error(), http.StatusBadRequest) return @@ -241,8 +244,7 @@ func (q *QBit) handleTorrentFiles(w http.ResponseWriter, r *http.Request) { if torrent == nil { return } - files := q.getTorrentFiles(torrent) - request.JSONResponse(w, files, http.StatusOK) + request.JSONResponse(w, torrent.Files, http.StatusOK) } func (q *QBit) handleSetCategory(w http.ResponseWriter, r *http.Request) { diff --git a/pkg/qbit/torrent.go b/pkg/qbit/torrent.go index c3f37de..fb40586 100644 --- a/pkg/qbit/torrent.go +++ b/pkg/qbit/torrent.go @@ -13,14 +13,14 @@ import ( ) // All torrent-related helpers goes here -func (q *QBit) addMagnet(ctx context.Context, url string, arr *arr.Arr, debrid string, isSymlink bool) error { +func (q *QBit) addMagnet(ctx context.Context, url string, arr *arr.Arr, debrid string, action string) error { magnet, err := utils.GetMagnetFromUrl(url) if err != nil { return fmt.Errorf("error parsing magnet link: %w", err) } _store := store.Get() - importReq := store.NewImportRequest(debrid, q.DownloadFolder, magnet, arr, isSymlink, false, "", store.ImportTypeQBitTorrent) + importReq := store.NewImportRequest(debrid, q.DownloadFolder, magnet, arr, action, false, "", store.ImportTypeQBitTorrent) err = _store.AddTorrent(ctx, importReq) if err != nil { @@ -29,7 +29,7 @@ func (q *QBit) addMagnet(ctx context.Context, url string, arr *arr.Arr, debrid s return nil } -func (q *QBit) addTorrent(ctx context.Context, fileHeader *multipart.FileHeader, arr *arr.Arr, debrid string, isSymlink bool) error { +func (q *QBit) addTorrent(ctx context.Context, fileHeader *multipart.FileHeader, arr *arr.Arr, debrid string, action string) error { file, _ := fileHeader.Open() defer file.Close() var reader io.Reader = file @@ -38,7 +38,7 @@ func (q *QBit) addTorrent(ctx context.Context, fileHeader *multipart.FileHeader, return fmt.Errorf("error reading file: %s \n %w", fileHeader.Filename, err) } _store := store.Get() - importReq := store.NewImportRequest(debrid, q.DownloadFolder, magnet, arr, isSymlink, false, "", store.ImportTypeQBitTorrent) + importReq := store.NewImportRequest(debrid, q.DownloadFolder, magnet, arr, action, false, "", store.ImportTypeQBitTorrent) err = _store.AddTorrent(ctx, importReq) if err != nil { return fmt.Errorf("failed to process torrent: %w", err) @@ -83,20 +83,6 @@ func (q *QBit) GetTorrentProperties(t *store.Torrent) *TorrentProperties { } } -func (q *QBit) getTorrentFiles(t *store.Torrent) []*TorrentFile { - files := make([]*TorrentFile, 0) - if t.DebridTorrent == nil { - return files - } - for _, file := range t.DebridTorrent.GetFiles() { - files = append(files, &TorrentFile{ - Name: file.Path, - Size: file.Size, - }) - } - return files -} - func (q *QBit) setTorrentTags(t *store.Torrent, tags []string) bool { torrentTags := strings.Split(t.Tags, ",") for _, tag := range tags { diff --git a/pkg/qbit/types.go b/pkg/qbit/types.go index 719e75a..8e95508 100644 --- a/pkg/qbit/types.go +++ b/pkg/qbit/types.go @@ -202,17 +202,6 @@ type TorrentProperties struct { UpSpeedAvg int `json:"up_speed_avg,omitempty"` } -type TorrentFile struct { - Index int `json:"index,omitempty"` - Name string `json:"name,omitempty"` - Size int64 `json:"size,omitempty"` - Progress int `json:"progress,omitempty"` - Priority int `json:"priority,omitempty"` - IsSeed bool `json:"is_seed,omitempty"` - PieceRange []int `json:"piece_range,omitempty"` - Availability float64 `json:"availability,omitempty"` -} - func getAppPreferences() *AppPreferences { preferences := &AppPreferences{ AddTrackers: "", diff --git a/pkg/repair/repair.go b/pkg/repair/repair.go index 9a6daf8..fa5d98f 100644 --- a/pkg/repair/repair.go +++ b/pkg/repair/repair.go @@ -32,7 +32,6 @@ type Repair struct { arrs *arr.Storage deb *debrid.Storage interval string - runOnStart bool ZurgURL string IsZurg bool useWebdav bool @@ -86,7 +85,6 @@ func New(arrs *arr.Storage, engine *debrid.Storage) *Repair { arrs: arrs, logger: logger.New("repair"), interval: cfg.Repair.Interval, - runOnStart: cfg.Repair.RunOnStart, ZurgURL: cfg.Repair.ZurgURL, useWebdav: cfg.Repair.UseWebDav, autoProcess: cfg.Repair.AutoProcess, @@ -121,15 +119,6 @@ func (r *Repair) Reset() { } func (r *Repair) Start(ctx context.Context) error { - //r.ctx = ctx - if r.runOnStart { - r.logger.Info().Msgf("Running initial repair") - go func() { - if err := r.AddJob([]string{}, []string{}, r.autoProcess, true); err != nil { - r.logger.Error().Err(err).Msg("Error running initial repair") - } - }() - } r.scheduler, _ = gocron.NewScheduler(gocron.WithLocation(time.Local)) diff --git a/pkg/store/downloader.go b/pkg/store/downloader.go index dc2244c..8510863 100644 --- a/pkg/store/downloader.go +++ b/pkg/store/downloader.go @@ -56,8 +56,7 @@ Loop: return resp.Err() } -func (s *Store) processDownload(torrent *Torrent) (string, error) { - debridTorrent := torrent.DebridTorrent +func (s *Store) processDownload(torrent *Torrent, debridTorrent *types.Torrent) (string, error) { s.logger.Info().Msgf("Downloading %d files...", len(debridTorrent.Files)) torrentPath := filepath.Join(torrent.SavePath, utils.RemoveExtension(debridTorrent.OriginalFilename)) torrentPath = utils.RemoveInvalidChars(torrentPath) @@ -66,12 +65,11 @@ func (s *Store) processDownload(torrent *Torrent) (string, error) { // add the previous error to the error and return return "", fmt.Errorf("failed to create directory: %s: %v", torrentPath, err) } - s.downloadFiles(torrent, torrentPath) + s.downloadFiles(torrent, debridTorrent, torrentPath) return torrentPath, nil } -func (s *Store) downloadFiles(torrent *Torrent, parent string) { - debridTorrent := torrent.DebridTorrent +func (s *Store) downloadFiles(torrent *Torrent, debridTorrent *types.Torrent, parent string) { var wg sync.WaitGroup totalSize := int64(0) @@ -151,8 +149,7 @@ func (s *Store) downloadFiles(torrent *Torrent, parent string) { s.logger.Info().Msgf("Downloaded all files for %s", debridTorrent.Name) } -func (s *Store) processSymlink(torrent *Torrent) (string, error) { - debridTorrent := torrent.DebridTorrent +func (s *Store) processSymlink(torrent *Torrent, debridTorrent *types.Torrent) (string, error) { files := debridTorrent.Files if len(files) == 0 { return "", fmt.Errorf("no video files found") diff --git a/pkg/store/misc.go b/pkg/store/misc.go index 630e269..892815f 100644 --- a/pkg/store/misc.go +++ b/pkg/store/misc.go @@ -10,7 +10,7 @@ func createTorrentFromMagnet(req *ImportRequest) *Torrent { magnet := req.Magnet arrName := req.Arr.Name torrent := &Torrent{ - ID: "", + ID: req.Id, Hash: strings.ToLower(magnet.InfoHash), Name: magnet.Name, Size: magnet.Size, diff --git a/pkg/store/request.go b/pkg/store/request.go index 86f0bca..dc1b38f 100644 --- a/pkg/store/request.go +++ b/pkg/store/request.go @@ -2,9 +2,11 @@ package store import ( "bytes" + "cmp" "context" "encoding/json" "fmt" + "github.com/google/uuid" "github.com/sirrobot01/decypharr/internal/request" "github.com/sirrobot01/decypharr/internal/utils" "github.com/sirrobot01/decypharr/pkg/arr" @@ -23,11 +25,12 @@ const ( ) type ImportRequest struct { + Id string `json:"id"` DownloadFolder string `json:"downloadFolder"` SelectedDebrid string `json:"debrid"` Magnet *utils.Magnet `json:"magnet"` Arr *arr.Arr `json:"arr"` - IsSymlink bool `json:"isSymlink"` + Action string `json:"action"` DownloadUncached bool `json:"downloadUncached"` CallBackUrl string `json:"callBackUrl"` @@ -39,14 +42,15 @@ type ImportRequest struct { Async bool `json:"async"` } -func NewImportRequest(debrid string, downloadFolder string, magnet *utils.Magnet, arr *arr.Arr, isSymlink, downloadUncached bool, callBackUrl string, importType ImportType) *ImportRequest { +func NewImportRequest(debrid string, downloadFolder string, magnet *utils.Magnet, arr *arr.Arr, action string, downloadUncached bool, callBackUrl string, importType ImportType) *ImportRequest { return &ImportRequest{ + Id: uuid.New().String(), Status: "started", DownloadFolder: downloadFolder, - SelectedDebrid: debrid, + SelectedDebrid: cmp.Or(arr.SelectedDebrid, debrid), // Use debrid from arr if available Magnet: magnet, Arr: arr, - IsSymlink: isSymlink, + Action: action, DownloadUncached: downloadUncached, CallBackUrl: callBackUrl, Type: importType, @@ -106,21 +110,22 @@ func (i *ImportRequest) markAsCompleted(torrent *Torrent, debridTorrent *debridT } type ImportQueue struct { - queue map[string]chan *ImportRequest // Map to hold queues for different debrid services - mu sync.RWMutex // Mutex to protect access to the queue map - ctx context.Context - cancel context.CancelFunc - capacity int // Capacity of each channel in the queue + queue []*ImportRequest + mu sync.RWMutex + ctx context.Context + cancel context.CancelFunc + cond *sync.Cond // For blocking operations } func NewImportQueue(ctx context.Context, capacity int) *ImportQueue { ctx, cancel := context.WithCancel(ctx) - return &ImportQueue{ - queue: make(map[string]chan *ImportRequest), - ctx: ctx, - cancel: cancel, - capacity: capacity, + iq := &ImportQueue{ + queue: make([]*ImportRequest, 0, capacity), + ctx: ctx, + cancel: cancel, } + iq.cond = sync.NewCond(&iq.mu) + return iq } func (iq *ImportQueue) Push(req *ImportRequest) error { @@ -131,62 +136,104 @@ func (iq *ImportQueue) Push(req *ImportRequest) error { iq.mu.Lock() defer iq.mu.Unlock() - if _, exists := iq.queue[req.SelectedDebrid]; !exists { - iq.queue[req.SelectedDebrid] = make(chan *ImportRequest, iq.capacity) // Create a new channel for the debrid service + select { + case <-iq.ctx.Done(): + return fmt.Errorf("queue is shutting down") + default: } + if len(iq.queue) >= cap(iq.queue) { + return fmt.Errorf("queue is full") + } + + iq.queue = append(iq.queue, req) + iq.cond.Signal() // Wake up any waiting Pop() + return nil +} + +func (iq *ImportQueue) Pop() (*ImportRequest, error) { + iq.mu.Lock() + defer iq.mu.Unlock() + select { - case iq.queue[req.SelectedDebrid] <- req: - return nil case <-iq.ctx.Done(): - return fmt.Errorf("retry queue is shutting down") + return nil, fmt.Errorf("queue is shutting down") + default: } + + if len(iq.queue) == 0 { + return nil, fmt.Errorf("no import requests available") + } + + req := iq.queue[0] + iq.queue = iq.queue[1:] + return req, nil } -func (iq *ImportQueue) TryPop(selectedDebrid string) (*ImportRequest, error) { - iq.mu.RLock() - defer iq.mu.RUnlock() +// Delete specific request by ID +func (iq *ImportQueue) Delete(requestID string) bool { + iq.mu.Lock() + defer iq.mu.Unlock() - if ch, exists := iq.queue[selectedDebrid]; exists { - select { - case req := <-ch: - return req, nil - case <-iq.ctx.Done(): - return nil, fmt.Errorf("queue is shutting down") - default: - return nil, fmt.Errorf("no import request available for %s", selectedDebrid) + for i, req := range iq.queue { + if req.Id == requestID { + // Remove from slice + iq.queue = append(iq.queue[:i], iq.queue[i+1:]...) + return true } } - return nil, fmt.Errorf("no queue exists for %s", selectedDebrid) + return false } -func (iq *ImportQueue) Size(selectedDebrid string) int { +// DeleteWhere requests matching a condition +func (iq *ImportQueue) DeleteWhere(predicate func(*ImportRequest) bool) int { + iq.mu.Lock() + defer iq.mu.Unlock() + + deleted := 0 + for i := len(iq.queue) - 1; i >= 0; i-- { + if predicate(iq.queue[i]) { + iq.queue = append(iq.queue[:i], iq.queue[i+1:]...) + deleted++ + } + } + return deleted +} + +// Find request without removing it +func (iq *ImportQueue) Find(requestID string) *ImportRequest { iq.mu.RLock() defer iq.mu.RUnlock() - if ch, exists := iq.queue[selectedDebrid]; exists { - return len(ch) + for _, req := range iq.queue { + if req.Id == requestID { + return req + } } - return 0 + return nil +} + +func (iq *ImportQueue) Size() int { + iq.mu.RLock() + defer iq.mu.RUnlock() + return len(iq.queue) +} + +func (iq *ImportQueue) IsEmpty() bool { + return iq.Size() == 0 +} + +// List all requests (copy to avoid race conditions) +func (iq *ImportQueue) List() []*ImportRequest { + iq.mu.RLock() + defer iq.mu.RUnlock() + + result := make([]*ImportRequest, len(iq.queue)) + copy(result, iq.queue) + return result } func (iq *ImportQueue) Close() { iq.cancel() - iq.mu.Lock() - defer iq.mu.Unlock() - - for _, ch := range iq.queue { - // Drain remaining items before closing - for { - select { - case <-ch: - // Discard remaining items - default: - close(ch) - goto nextChannel - } - } - nextChannel: - } - iq.queue = make(map[string]chan *ImportRequest) + iq.cond.Broadcast() } diff --git a/pkg/store/torrent.go b/pkg/store/torrent.go index 22798b6..aec09e0 100644 --- a/pkg/store/torrent.go +++ b/pkg/store/torrent.go @@ -16,7 +16,7 @@ import ( func (s *Store) AddTorrent(ctx context.Context, importReq *ImportRequest) error { torrent := createTorrentFromMagnet(importReq) - debridTorrent, err := debridTypes.Process(ctx, s.debrid, importReq.SelectedDebrid, importReq.Magnet, importReq.Arr, importReq.IsSymlink, importReq.DownloadUncached) + debridTorrent, err := debridTypes.Process(ctx, s.debrid, importReq.SelectedDebrid, importReq.Magnet, importReq.Arr, importReq.Action, importReq.DownloadUncached) if err != nil { var httpErr *utils.HTTPError @@ -25,8 +25,8 @@ func (s *Store) AddTorrent(ctx context.Context, importReq *ImportRequest) error case "too_many_active_downloads": // Handle too much active downloads error s.logger.Warn().Msgf("Too many active downloads for %s, adding to queue", importReq.Magnet.Name) - err := s.addToQueue(importReq) - if err != nil { + + if err := s.addToQueue(importReq); err != nil { s.logger.Error().Err(err).Msgf("Failed to add %s to queue", importReq.Magnet.Name) return err } @@ -65,9 +65,9 @@ func (s *Store) addToQueue(importReq *ImportRequest) error { return nil } -func (s *Store) processFromQueue(ctx context.Context, selectedDebrid string) error { +func (s *Store) processFromQueue(ctx context.Context) error { // Pop the next import request from the queue - importReq, err := s.importsQueue.TryPop(selectedDebrid) + importReq, err := s.importsQueue.Pop() if err != nil { return err } @@ -105,10 +105,13 @@ func (s *Store) trackAvailableSlots(ctx context.Context) { availableSlots[name] = slots } + if s.importsQueue.Size() <= 0 { + // Queue is empty, no need to process + return + } + for name, slots := range availableSlots { - if s.importsQueue.Size(name) <= 0 { - continue - } + s.logger.Debug().Msgf("Available slots for %s: %d", name, slots) // If slots are available, process the next import request from the queue for slots > 0 { @@ -116,7 +119,7 @@ func (s *Store) trackAvailableSlots(ctx context.Context) { case <-ctx.Done(): return // Exit if context is done default: - if err := s.processFromQueue(ctx, name); err != nil { + if err := s.processFromQueue(ctx); err != nil { s.logger.Error().Err(err).Msg("Error processing from queue") return // Exit on error } @@ -139,7 +142,7 @@ func (s *Store) processFiles(torrent *Torrent, debridTorrent *types.Torrent, imp _arr := importReq.Arr for debridTorrent.Status != "downloaded" { s.logger.Debug().Msgf("%s <- (%s) Download Progress: %.2f%%", debridTorrent.Debrid, debridTorrent.Name, debridTorrent.Progress) - dbT, err := client.CheckStatus(debridTorrent, importReq.IsSymlink) + dbT, err := client.CheckStatus(debridTorrent) if err != nil { if dbT != nil && dbT.Id != "" { // Delete the torrent if it was not downloaded @@ -174,17 +177,43 @@ func (s *Store) processFiles(torrent *Torrent, debridTorrent *types.Torrent, imp // Check if debrid supports webdav by checking cache timer := time.Now() - if importReq.IsSymlink { + + onFailed := func(err error) { + if err != nil { + s.markTorrentAsFailed(torrent) + go func() { + _ = client.DeleteTorrent(debridTorrent.Id) + }() + s.logger.Error().Err(err).Msgf("Error occured while processing torrent %s", debridTorrent.Name) + importReq.markAsFailed(err, torrent, debridTorrent) + return + } + } + + onSuccess := func(torrentSymlinkPath string) { + torrent.TorrentPath = torrentSymlinkPath + s.updateTorrent(torrent, debridTorrent) + s.logger.Info().Msgf("Adding %s took %s", debridTorrent.Name, time.Since(timer)) + + go importReq.markAsCompleted(torrent, debridTorrent) // Mark the import request as completed, send callback if needed + go func() { + if err := request.SendDiscordMessage("download_complete", "success", torrent.discordContext()); err != nil { + s.logger.Error().Msgf("Error sending discord message: %v", err) + } + }() + _arr.Refresh() + } + + switch importReq.Action { + case "symlink": + // Symlink action, we will create a symlink to the torrent + s.logger.Debug().Msgf("Post-Download Action: Symlink") cache := deb.Cache() if cache != nil { s.logger.Info().Msgf("Using internal webdav for %s", debridTorrent.Debrid) - // Use webdav to download the file - if err := cache.Add(debridTorrent); err != nil { - s.logger.Error().Msgf("Error adding torrent to cache: %v", err) - s.markTorrentAsFailed(torrent) - importReq.markAsFailed(err, torrent, debridTorrent) + onFailed(err) return } @@ -194,31 +223,45 @@ func (s *Store) processFiles(torrent *Torrent, debridTorrent *types.Torrent, imp } else { // User is using either zurg or debrid webdav - torrentSymlinkPath, err = s.processSymlink(torrent) // /mnt/symlinks/{category}/MyTVShow/ + torrentSymlinkPath, err = s.processSymlink(torrent, debridTorrent) // /mnt/symlinks/{category}/MyTVShow/ } - } else { - torrentSymlinkPath, err = s.processDownload(torrent) - } - if err != nil { - s.markTorrentAsFailed(torrent) - go func() { - _ = client.DeleteTorrent(debridTorrent.Id) - }() - s.logger.Error().Err(err).Msgf("Error occured while processing torrent %s", debridTorrent.Name) - importReq.markAsFailed(err, torrent, debridTorrent) + if err != nil { + onFailed(err) + return + } + if torrentSymlinkPath == "" { + err = fmt.Errorf("symlink path is empty for %s", debridTorrent.Name) + onFailed(err) + } + onSuccess(torrentSymlinkPath) return - } - torrent.TorrentPath = torrentSymlinkPath - s.updateTorrent(torrent, debridTorrent) - s.logger.Info().Msgf("Adding %s took %s", debridTorrent.Name, time.Since(timer)) - - go importReq.markAsCompleted(torrent, debridTorrent) // Mark the import request as completed, send callback if needed - go func() { - if err := request.SendDiscordMessage("download_complete", "success", torrent.discordContext()); err != nil { - s.logger.Error().Msgf("Error sending discord message: %v", err) + case "download": + // Download action, we will download the torrent to the specified folder + // Generate download links + s.logger.Debug().Msgf("Post-Download Action: Download") + if err := client.GetFileDownloadLinks(debridTorrent); err != nil { + onFailed(err) + return } - }() - _arr.Refresh() + s.logger.Debug().Msgf("Download Post-Download Action") + torrentSymlinkPath, err = s.processDownload(torrent, debridTorrent) + if err != nil { + onFailed(err) + return + } + if torrentSymlinkPath == "" { + err = fmt.Errorf("download path is empty for %s", debridTorrent.Name) + onFailed(err) + return + } + onSuccess(torrentSymlinkPath) + case "none": + s.logger.Debug().Msgf("Post-Download Action: None") + // No action, just update the torrent and mark it as completed + onSuccess(torrent.TorrentPath) + default: + // Action is none, do nothing, fallthrough + } } func (s *Store) markTorrentAsFailed(t *Torrent) *Torrent { @@ -253,10 +296,18 @@ func (s *Store) partialTorrentUpdate(t *Torrent, debridTorrent *types.Torrent) * if speed != 0 { eta = int((totalSize - sizeCompleted) / speed) } - t.ID = debridTorrent.Id + files := make([]*File, 0, len(debridTorrent.Files)) + for index, file := range debridTorrent.GetFiles() { + files = append(files, &File{ + Index: index, + Name: file.Path, + Size: file.Size, + }) + } + t.DebridID = debridTorrent.Id t.Name = debridTorrent.Name t.AddedOn = addedOn.Unix() - t.DebridTorrent = debridTorrent + t.Files = files t.Debrid = debridTorrent.Debrid t.Size = totalSize t.Completed = sizeCompleted diff --git a/pkg/store/torrent_storage.go b/pkg/store/torrent_storage.go index e55dfab..2b36ada 100644 --- a/pkg/store/torrent_storage.go +++ b/pkg/store/torrent_storage.go @@ -3,7 +3,6 @@ package store import ( "encoding/json" "fmt" - "github.com/sirrobot01/decypharr/pkg/debrid/types" "os" "sort" "sync" @@ -183,10 +182,18 @@ func (ts *TorrentStorage) Delete(hash, category string, removeFromDebrid bool) { if torrent == nil { return } - if removeFromDebrid && torrent.ID != "" && torrent.Debrid != "" { - dbClient := Get().debrid.Client(torrent.Debrid) + st := Get() + // Check if torrent is queued for download + + if torrent.State == "queued" && torrent.ID != "" { + // Remove the torrent from the import queue if it exists + st.importsQueue.Delete(torrent.ID) + } + + if removeFromDebrid && torrent.DebridID != "" && torrent.Debrid != "" { + dbClient := st.debrid.Client(torrent.Debrid) if dbClient != nil { - _ = dbClient.DeleteTorrent(torrent.ID) + _ = dbClient.DeleteTorrent(torrent.DebridID) } } @@ -212,14 +219,21 @@ func (ts *TorrentStorage) DeleteMultiple(hashes []string, removeFromDebrid bool) defer ts.mu.Unlock() toDelete := make(map[string]string) + st := Get() + for _, hash := range hashes { for key, torrent := range ts.torrents { if torrent == nil { continue } + + if torrent.State == "queued" && torrent.ID != "" { + // Remove the torrent from the import queue if it exists + st.importsQueue.Delete(torrent.ID) + } if torrent.Hash == hash { - if removeFromDebrid && torrent.ID != "" && torrent.Debrid != "" { - toDelete[torrent.ID] = torrent.Debrid + if removeFromDebrid && torrent.DebridID != "" && torrent.Debrid != "" { + toDelete[torrent.DebridID] = torrent.Debrid } delete(ts.torrents, key) if torrent.ContentPath != "" { @@ -238,7 +252,7 @@ func (ts *TorrentStorage) DeleteMultiple(hashes []string, removeFromDebrid bool) } }() - clients := Get().debrid.Clients() + clients := st.debrid.Clients() go func() { for id, debrid := range toDelete { @@ -274,73 +288,3 @@ func (ts *TorrentStorage) Reset() { defer ts.mu.Unlock() ts.torrents = make(Torrents) } - -type Torrent struct { - ID string `json:"id"` - Debrid string `json:"debrid"` - TorrentPath string `json:"-"` - DebridTorrent *types.Torrent `json:"-"` - - AddedOn int64 `json:"added_on,omitempty"` - AmountLeft int64 `json:"amount_left"` - AutoTmm bool `json:"auto_tmm"` - Availability float64 `json:"availability,omitempty"` - Category string `json:"category,omitempty"` - Completed int64 `json:"completed"` - CompletionOn int `json:"completion_on,omitempty"` - ContentPath string `json:"content_path"` - DlLimit int `json:"dl_limit"` - Dlspeed int64 `json:"dlspeed"` - Downloaded int64 `json:"downloaded"` - DownloadedSession int64 `json:"downloaded_session"` - Eta int `json:"eta"` - FlPiecePrio bool `json:"f_l_piece_prio,omitempty"` - ForceStart bool `json:"force_start,omitempty"` - Hash string `json:"hash"` - LastActivity int64 `json:"last_activity,omitempty"` - MagnetUri string `json:"magnet_uri,omitempty"` - MaxRatio int `json:"max_ratio,omitempty"` - MaxSeedingTime int `json:"max_seeding_time,omitempty"` - Name string `json:"name,omitempty"` - NumComplete int `json:"num_complete,omitempty"` - NumIncomplete int `json:"num_incomplete,omitempty"` - NumLeechs int `json:"num_leechs,omitempty"` - NumSeeds int `json:"num_seeds,omitempty"` - Priority int `json:"priority,omitempty"` - Progress float64 `json:"progress"` - Ratio int `json:"ratio,omitempty"` - RatioLimit int `json:"ratio_limit,omitempty"` - SavePath string `json:"save_path"` - SeedingTimeLimit int `json:"seeding_time_limit,omitempty"` - SeenComplete int64 `json:"seen_complete,omitempty"` - SeqDl bool `json:"seq_dl"` - Size int64 `json:"size,omitempty"` - State string `json:"state,omitempty"` - SuperSeeding bool `json:"super_seeding"` - Tags string `json:"tags,omitempty"` - TimeActive int `json:"time_active,omitempty"` - TotalSize int64 `json:"total_size,omitempty"` - Tracker string `json:"tracker,omitempty"` - UpLimit int64 `json:"up_limit,omitempty"` - Uploaded int64 `json:"uploaded,omitempty"` - UploadedSession int64 `json:"uploaded_session,omitempty"` - Upspeed int64 `json:"upspeed,omitempty"` - Source string `json:"source,omitempty"` - - sync.Mutex -} - -func (t *Torrent) IsReady() bool { - return (t.AmountLeft <= 0 || t.Progress == 1) && t.TorrentPath != "" -} - -func (t *Torrent) discordContext() string { - format := ` - **Name:** %s - **Arr:** %s - **Hash:** %s - **MagnetURI:** %s - **Debrid:** %s - ` - return fmt.Sprintf(format, t.Name, t.Category, t.Hash, t.MagnetUri, t.Debrid) -} diff --git a/pkg/store/types.go b/pkg/store/types.go new file mode 100644 index 0000000..ba9200f --- /dev/null +++ b/pkg/store/types.go @@ -0,0 +1,88 @@ +package store + +import ( + "fmt" + "sync" +) + +type File struct { + Index int `json:"index,omitempty"` + Name string `json:"name,omitempty"` + Size int64 `json:"size,omitempty"` + Progress int `json:"progress,omitempty"` + Priority int `json:"priority,omitempty"` + IsSeed bool `json:"is_seed,omitempty"` + PieceRange []int `json:"piece_range,omitempty"` + Availability float64 `json:"availability,omitempty"` +} + +type Torrent struct { + ID string `json:"id"` + DebridID string `json:"debrid_id"` + Debrid string `json:"debrid"` + TorrentPath string `json:"-"` + Files []*File `json:"files,omitempty"` + + AddedOn int64 `json:"added_on,omitempty"` + AmountLeft int64 `json:"amount_left"` + AutoTmm bool `json:"auto_tmm"` + Availability float64 `json:"availability,omitempty"` + Category string `json:"category,omitempty"` + Completed int64 `json:"completed"` + CompletionOn int `json:"completion_on,omitempty"` + ContentPath string `json:"content_path"` + DlLimit int `json:"dl_limit"` + Dlspeed int64 `json:"dlspeed"` + Downloaded int64 `json:"downloaded"` + DownloadedSession int64 `json:"downloaded_session"` + Eta int `json:"eta"` + FlPiecePrio bool `json:"f_l_piece_prio,omitempty"` + ForceStart bool `json:"force_start,omitempty"` + Hash string `json:"hash"` + LastActivity int64 `json:"last_activity,omitempty"` + MagnetUri string `json:"magnet_uri,omitempty"` + MaxRatio int `json:"max_ratio,omitempty"` + MaxSeedingTime int `json:"max_seeding_time,omitempty"` + Name string `json:"name,omitempty"` + NumComplete int `json:"num_complete,omitempty"` + NumIncomplete int `json:"num_incomplete,omitempty"` + NumLeechs int `json:"num_leechs,omitempty"` + NumSeeds int `json:"num_seeds,omitempty"` + Priority int `json:"priority,omitempty"` + Progress float64 `json:"progress"` + Ratio int `json:"ratio,omitempty"` + RatioLimit int `json:"ratio_limit,omitempty"` + SavePath string `json:"save_path"` + SeedingTimeLimit int `json:"seeding_time_limit,omitempty"` + SeenComplete int64 `json:"seen_complete,omitempty"` + SeqDl bool `json:"seq_dl"` + Size int64 `json:"size,omitempty"` + State string `json:"state,omitempty"` + SuperSeeding bool `json:"super_seeding"` + Tags string `json:"tags,omitempty"` + TimeActive int `json:"time_active,omitempty"` + TotalSize int64 `json:"total_size,omitempty"` + Tracker string `json:"tracker,omitempty"` + UpLimit int64 `json:"up_limit,omitempty"` + Uploaded int64 `json:"uploaded,omitempty"` + UploadedSession int64 `json:"uploaded_session,omitempty"` + Upspeed int64 `json:"upspeed,omitempty"` + Source string `json:"source,omitempty"` + + sync.Mutex +} + +func (t *Torrent) IsReady() bool { + return (t.AmountLeft <= 0 || t.Progress == 1) && t.TorrentPath != "" +} + +func (t *Torrent) discordContext() string { + format := ` + **Name:** %s + **Arr:** %s + **Hash:** %s + **MagnetURI:** %s + **Debrid:** %s + ` + return fmt.Sprintf(format, t.Name, t.Category, t.Hash, t.MagnetUri, t.Debrid) +} diff --git a/pkg/web/api.go b/pkg/web/api.go index 9c53e4c..c62adf9 100644 --- a/pkg/web/api.go +++ b/pkg/web/api.go @@ -33,7 +33,7 @@ func (wb *Web) handleAddContent(w http.ResponseWriter, r *http.Request) { errs := make([]string, 0) arrName := r.FormValue("arr") - notSymlink := r.FormValue("notSymlink") == "true" + action := r.FormValue("action") debridName := r.FormValue("debrid") callbackUrl := r.FormValue("callbackUrl") downloadFolder := r.FormValue("downloadFolder") @@ -45,7 +45,7 @@ func (wb *Web) handleAddContent(w http.ResponseWriter, r *http.Request) { _arr := _store.Arr().Get(arrName) if _arr == nil { - _arr = arr.New(arrName, "", "", false, false, &downloadUncached) + _arr = arr.New(arrName, "", "", false, false, &downloadUncached, "") } // Handle URLs @@ -64,7 +64,7 @@ func (wb *Web) handleAddContent(w http.ResponseWriter, r *http.Request) { continue } - importReq := store.NewImportRequest(debridName, downloadFolder, magnet, _arr, !notSymlink, downloadUncached, callbackUrl, store.ImportTypeAPI) + importReq := store.NewImportRequest(debridName, downloadFolder, magnet, _arr, action, downloadUncached, callbackUrl, store.ImportTypeAPI) if err := _store.AddTorrent(ctx, importReq); err != nil { wb.logger.Error().Err(err).Str("url", url).Msg("Failed to add torrent") errs = append(errs, fmt.Sprintf("URL %s: %v", url, err)) @@ -89,7 +89,7 @@ func (wb *Web) handleAddContent(w http.ResponseWriter, r *http.Request) { continue } - importReq := store.NewImportRequest(debridName, downloadFolder, magnet, _arr, !notSymlink, downloadUncached, callbackUrl, store.ImportTypeAPI) + importReq := store.NewImportRequest(debridName, downloadFolder, magnet, _arr, action, downloadUncached, callbackUrl, store.ImportTypeAPI) err = _store.AddTorrent(ctx, importReq) if err != nil { wb.logger.Error().Err(err).Str("file", fileHeader.Filename).Msg("Failed to add torrent") @@ -251,6 +251,7 @@ func (wb *Web) handleUpdateConfig(w http.ResponseWriter, r *http.Request) { Cleanup: a.Cleanup, SkipRepair: a.SkipRepair, DownloadUncached: a.DownloadUncached, + SelectedDebrid: a.SelectedDebrid, }) } currentConfig.Arrs = updatedConfig.Arrs diff --git a/pkg/web/templates/config.html b/pkg/web/templates/config.html index 9760612..341ea71 100644 --- a/pkg/web/templates/config.html +++ b/pkg/web/templates/config.html @@ -279,13 +279,6 @@ Use Internal Webdav for repair(make sure webdav is enabled in the debrid section -
-
- - -
- Run repair on startup -
@@ -650,6 +643,15 @@
+
+ +
@@ -1068,7 +1070,6 @@ repair: { enabled: document.querySelector('[name="repair.enabled"]').checked, interval: document.querySelector('[name="repair.interval"]').value, - run_on_start: document.querySelector('[name="repair.run_on_start"]').checked, zurg_url: document.querySelector('[name="repair.zurg_url"]').value, workers: parseInt(document.querySelector('[name="repair.workers"]').value), use_webdav: document.querySelector('[name="repair.use_webdav"]').checked, @@ -1149,7 +1150,8 @@ token: document.querySelector(`[name="arr[${i}].token"]`).value, cleanup: document.querySelector(`[name="arr[${i}].cleanup"]`).checked, skip_repair: document.querySelector(`[name="arr[${i}].skip_repair"]`).checked, - download_uncached: document.querySelector(`[name="arr[${i}].download_uncached"]`).checked + download_uncached: document.querySelector(`[name="arr[${i}].download_uncached"]`).checked, + selectedDebrid: document.querySelector(`[name="arr[${i}].selected_debrid"]`).value }; if (arr.name && arr.host) { diff --git a/pkg/web/templates/download.html b/pkg/web/templates/download.html index bf3ab58..74e2d14 100644 --- a/pkg/web/templates/download.html +++ b/pkg/web/templates/download.html @@ -18,12 +18,21 @@
-
+
+ + + Choose how to handle the added torrent (Default to symlinks) +
+
Default is your qbittorent download_folder
-
+
Optional, leave empty if not using Arr @@ -45,12 +54,6 @@ {{ end }}
-
-
- - -
-
@@ -74,21 +77,21 @@ document.addEventListener('DOMContentLoaded', () => { const loadSavedDownloadOptions = () => { const savedCategory = localStorage.getItem('downloadCategory'); - const savedSymlink = localStorage.getItem('downloadSymlink'); + const savedAction = localStorage.getItem('downloadAction'); const savedDownloadUncached = localStorage.getItem('downloadUncached'); document.getElementById('arr').value = savedCategory || ''; - document.getElementById('isSymlink').checked = savedSymlink === 'true'; + document.getElementById('downloadAction').value = savedAction || 'symlink'; document.getElementById('downloadUncached').checked = savedDownloadUncached === 'true'; document.getElementById('downloadFolder').value = localStorage.getItem('downloadFolder') || downloadFolder || ''; }; const saveCurrentDownloadOptions = () => { const arr = document.getElementById('arr').value; - const isSymlink = document.getElementById('isSymlink').checked; + const downloadAction = document.getElementById('downloadAction').value; const downloadUncached = document.getElementById('downloadUncached').checked; const downloadFolder = document.getElementById('downloadFolder').value; localStorage.setItem('downloadCategory', arr); - localStorage.setItem('downloadSymlink', isSymlink.toString()); + localStorage.setItem('downloadAction', downloadAction); localStorage.setItem('downloadUncached', downloadUncached.toString()); localStorage.setItem('downloadFolder', downloadFolder); }; @@ -136,7 +139,7 @@ formData.append('arr', document.getElementById('arr').value); formData.append('downloadFolder', document.getElementById('downloadFolder').value); - formData.append('notSymlink', document.getElementById('isSymlink').checked); + formData.append('action', document.getElementById('downloadAction').value); formData.append('downloadUncached', document.getElementById('downloadUncached').checked); formData.append('debrid', document.getElementById('debrid') ? document.getElementById('debrid').value : ''); @@ -168,7 +171,7 @@ // Save the download options to local storage when they change document.getElementById('arr').addEventListener('change', saveCurrentDownloadOptions); - document.getElementById('isSymlink').addEventListener('change', saveCurrentDownloadOptions); + document.getElementById('downloadAction').addEventListener('change', saveCurrentDownloadOptions); // Read the URL parameters for a magnet link and add it to the download queue if found const urlParams = new URLSearchParams(window.location.search); diff --git a/pkg/webdav/file.go b/pkg/webdav/file.go index 2d6bc7a..06612b0 100644 --- a/pkg/webdav/file.go +++ b/pkg/webdav/file.go @@ -127,7 +127,7 @@ func (f *File) stream() (*http.Response, error) { if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusPartialContent { f.downloadLink = "" - cleanupResp := func() { + cleanupResp := func(resp *http.Response) { if resp.Body != nil { _, _ = io.Copy(io.Discard, resp.Body) resp.Body.Close() @@ -138,7 +138,7 @@ func (f *File) stream() (*http.Response, error) { case http.StatusServiceUnavailable: // Read the body to check for specific error messages body, readErr := io.ReadAll(resp.Body) - resp.Body.Close() + cleanupResp(resp) if readErr != nil { _log.Trace().Msgf("Failed to read response body: %v", readErr) @@ -156,10 +156,10 @@ func (f *File) stream() (*http.Response, error) { return nil, fmt.Errorf("service unavailable: %s", bodyStr) case http.StatusNotFound: - cleanupResp() + cleanupResp(resp) // Mark download link as not found // Regenerate a new download link - _log.Trace().Msgf("File not found (404) for %s. Marking link as invalid and regenerating", f.name) + _log.Trace().Msgf("Link not found (404) for %s. Marking link as invalid and regenerating", f.name) f.cache.MarkDownloadLinkAsInvalid(f.link, downloadLink, "link_not_found") // Generate a new download link downloadLink, err := f.getDownloadLink() @@ -191,16 +191,9 @@ func (f *File) stream() (*http.Response, error) { } if newResp.StatusCode != http.StatusOK && newResp.StatusCode != http.StatusPartialContent { - cleanupBody := func() { - if newResp.Body != nil { - _, _ = io.Copy(io.Discard, newResp.Body) - newResp.Body.Close() - } - } - - cleanupBody() + cleanupResp(newResp) _log.Trace().Msgf("Regenerated link also failed with status %d", newResp.StatusCode) - f.cache.MarkDownloadLinkAsInvalid(f.link, downloadLink, "link_not_found") + f.cache.MarkDownloadLinkAsInvalid(f.link, downloadLink, newResp.Status) return nil, fmt.Errorf("failed with status code %d even after link regeneration", newResp.StatusCode) } @@ -208,7 +201,7 @@ func (f *File) stream() (*http.Response, error) { default: body, _ := io.ReadAll(resp.Body) - resp.Body.Close() + cleanupResp(resp) _log.Trace().Msgf("Unexpected status code %d for %s: %s", resp.StatusCode, f.name, string(body)) return nil, fmt.Errorf("unexpected status code %d: %s", resp.StatusCode, string(body))