- Download Link fix
- reinsert fix
This commit is contained in:
Mukhtar Akere
2025-04-23 16:38:55 +01:00
parent 1a4db69b20
commit 267430e6fb
15 changed files with 358 additions and 236 deletions

View File

@@ -27,3 +27,9 @@ var ErrLinkBroken = &HTTPError{
Message: "File is unavailable", Message: "File is unavailable",
Code: "file_unavailable", Code: "file_unavailable",
} }
var TorrentNotFoundError = &HTTPError{
StatusCode: 404,
Message: "Torrent not found",
Code: "torrent_not_found",
}

View File

@@ -232,12 +232,14 @@ func (ad *AllDebrid) CheckStatus(torrent *types.Torrent, isSymlink bool) (*types
break break
} else if slices.Contains(ad.GetDownloadingStatus(), status) { } else if slices.Contains(ad.GetDownloadingStatus(), status) {
if !torrent.DownloadUncached { if !torrent.DownloadUncached {
_ = ad.DeleteTorrent(torrent.Id)
return torrent, fmt.Errorf("torrent: %s not cached", torrent.Name) return torrent, fmt.Errorf("torrent: %s not cached", torrent.Name)
} }
// Break out of the loop if the torrent is downloading. // Break out of the loop if the torrent is downloading.
// This is necessary to prevent infinite loop since we moved to sync downloading and async processing // This is necessary to prevent infinite loop since we moved to sync downloading and async processing
return torrent, nil return torrent, nil
} else { } else {
_ = ad.DeleteTorrent(torrent.Id)
return torrent, fmt.Errorf("torrent: %s has error", torrent.Name) return torrent, fmt.Errorf("torrent: %s has error", torrent.Name)
} }

View File

@@ -11,7 +11,6 @@ import (
"github.com/rs/zerolog" "github.com/rs/zerolog"
"github.com/sirrobot01/decypharr/internal/config" "github.com/sirrobot01/decypharr/internal/config"
"github.com/sirrobot01/decypharr/internal/logger" "github.com/sirrobot01/decypharr/internal/logger"
"github.com/sirrobot01/decypharr/internal/request"
"github.com/sirrobot01/decypharr/internal/utils" "github.com/sirrobot01/decypharr/internal/utils"
"github.com/sirrobot01/decypharr/pkg/debrid/types" "github.com/sirrobot01/decypharr/pkg/debrid/types"
"os" "os"
@@ -42,8 +41,9 @@ type PropfindResponse struct {
type CachedTorrent struct { type CachedTorrent struct {
*types.Torrent *types.Torrent
AddedOn time.Time `json:"added_on"` AddedOn time.Time `json:"added_on"`
IsComplete bool `json:"is_complete"` IsComplete bool `json:"is_complete"`
DuplicateIds []string `json:"duplicate_ids"`
} }
type downloadLinkCache struct { type downloadLinkCache struct {
@@ -72,7 +72,7 @@ type Cache struct {
client types.Client client types.Client
logger zerolog.Logger logger zerolog.Logger
torrents *xsync.MapOf[string, *CachedTorrent] // key: torrent.Id, value: *CachedTorrent torrents *xsync.MapOf[string, string] // key: torrent.Id, value: {torrent_folder_name}
torrentsNames *xsync.MapOf[string, *CachedTorrent] // key: torrent.Name, value: torrent torrentsNames *xsync.MapOf[string, *CachedTorrent] // key: torrent.Name, value: torrent
listings atomic.Value listings atomic.Value
downloadLinks *xsync.MapOf[string, downloadLinkCache] downloadLinks *xsync.MapOf[string, downloadLinkCache]
@@ -80,15 +80,18 @@ type Cache struct {
PropfindResp *xsync.MapOf[string, PropfindResponse] PropfindResp *xsync.MapOf[string, PropfindResponse]
folderNaming WebDavFolderNaming folderNaming WebDavFolderNaming
// monitors
repairRequest sync.Map
failedToReinsert sync.Map
downloadLinkRequests sync.Map
// repair // repair
repairChan chan RepairRequest repairChan chan RepairRequest
repairsInProgress *xsync.MapOf[string, struct{}]
// config // config
workers int workers int
torrentRefreshInterval string torrentRefreshInterval string
downloadLinksRefreshInterval string downloadLinksRefreshInterval string
autoExpiresLinksAfter string
autoExpiresLinksAfterDuration time.Duration autoExpiresLinksAfterDuration time.Duration
// refresh mutex // refresh mutex
@@ -107,13 +110,13 @@ func New(dc config.Debrid, client types.Client) *Cache {
cet, _ := time.LoadLocation("CET") cet, _ := time.LoadLocation("CET")
s, _ := gocron.NewScheduler(gocron.WithLocation(cet)) s, _ := gocron.NewScheduler(gocron.WithLocation(cet))
autoExpiresLinksAfter, _ := time.ParseDuration(dc.AutoExpireLinksAfter) autoExpiresLinksAfter, err := time.ParseDuration(dc.AutoExpireLinksAfter)
if autoExpiresLinksAfter == 0 { if autoExpiresLinksAfter == 0 || err != nil {
autoExpiresLinksAfter = 24 * time.Hour autoExpiresLinksAfter = 48 * time.Hour
} }
return &Cache{ return &Cache{
dir: path.Join(cfg.Path, "cache", dc.Name), // path to save cache files dir: path.Join(cfg.Path, "cache", dc.Name), // path to save cache files
torrents: xsync.NewMapOf[string, *CachedTorrent](), torrents: xsync.NewMapOf[string, string](),
torrentsNames: xsync.NewMapOf[string, *CachedTorrent](), torrentsNames: xsync.NewMapOf[string, *CachedTorrent](),
invalidDownloadLinks: xsync.NewMapOf[string, string](), invalidDownloadLinks: xsync.NewMapOf[string, string](),
client: client, client: client,
@@ -124,13 +127,10 @@ func New(dc config.Debrid, client types.Client) *Cache {
downloadLinksRefreshInterval: dc.DownloadLinksRefreshInterval, downloadLinksRefreshInterval: dc.DownloadLinksRefreshInterval,
PropfindResp: xsync.NewMapOf[string, PropfindResponse](), PropfindResp: xsync.NewMapOf[string, PropfindResponse](),
folderNaming: WebDavFolderNaming(dc.FolderNaming), folderNaming: WebDavFolderNaming(dc.FolderNaming),
autoExpiresLinksAfter: dc.AutoExpireLinksAfter,
autoExpiresLinksAfterDuration: autoExpiresLinksAfter, autoExpiresLinksAfterDuration: autoExpiresLinksAfter,
repairsInProgress: xsync.NewMapOf[string, struct{}](),
saveSemaphore: make(chan struct{}, 50), saveSemaphore: make(chan struct{}, 50),
ctx: context.Background(), ctx: context.Background(),
scheduler: s,
scheduler: s,
} }
} }
@@ -410,28 +410,28 @@ func (c *Cache) GetTorrentFolder(torrent *types.Torrent) string {
} }
func (c *Cache) setTorrent(t *CachedTorrent) { func (c *Cache) setTorrent(t *CachedTorrent) {
c.torrents.Store(t.Id, t)
torrentKey := c.GetTorrentFolder(t.Torrent) torrentKey := c.GetTorrentFolder(t.Torrent)
if o, ok := c.torrentsNames.Load(torrentKey); ok { if o, ok := c.torrentsNames.Load(torrentKey); ok && t.Id != o.Id {
// If another torrent with the same name exists, merge the files, if the same file exists, // If another torrent with the same name exists, merge the files, if the same file exists,
// keep the one with the most recent added date // keep the one with the most recent added date
mergedFiles := mergeFiles(t, o) mergedFiles := mergeFiles(t, o)
t.Files = mergedFiles t.Files = mergedFiles
} }
c.torrents.Store(t.Id, torrentKey)
c.torrentsNames.Store(torrentKey, t) c.torrentsNames.Store(torrentKey, t)
c.SaveTorrent(t) c.SaveTorrent(t)
} }
func (c *Cache) setTorrents(torrents map[string]*CachedTorrent) { func (c *Cache) setTorrents(torrents map[string]*CachedTorrent) {
for _, t := range torrents { for _, t := range torrents {
c.torrents.Store(t.Id, t)
torrentKey := c.GetTorrentFolder(t.Torrent) torrentKey := c.GetTorrentFolder(t.Torrent)
if o, ok := c.torrentsNames.Load(torrentKey); ok { if o, ok := c.torrentsNames.Load(torrentKey); ok && t.Id != o.Id {
// Save the most recent torrent // Save the most recent torrent
mergedFiles := mergeFiles(t, o) mergedFiles := mergeFiles(t, o)
t.Files = mergedFiles t.Files = mergedFiles
} }
c.torrents.Store(t.Id, torrentKey)
c.torrentsNames.Store(torrentKey, t) c.torrentsNames.Store(torrentKey, t)
} }
@@ -452,20 +452,13 @@ func (c *Cache) Close() error {
func (c *Cache) GetTorrents() map[string]*CachedTorrent { func (c *Cache) GetTorrents() map[string]*CachedTorrent {
torrents := make(map[string]*CachedTorrent) torrents := make(map[string]*CachedTorrent)
c.torrents.Range(func(key string, value *CachedTorrent) bool { c.torrentsNames.Range(func(key string, value *CachedTorrent) bool {
torrents[key] = value torrents[key] = value
return true return true
}) })
return torrents return torrents
} }
func (c *Cache) GetTorrent(id string) *CachedTorrent {
if t, ok := c.torrents.Load(id); ok {
return t
}
return nil
}
func (c *Cache) GetTorrentByName(name string) *CachedTorrent { func (c *Cache) GetTorrentByName(name string) *CachedTorrent {
if t, ok := c.torrentsNames.Load(name); ok { if t, ok := c.torrentsNames.Load(name); ok {
return t return t
@@ -473,8 +466,18 @@ func (c *Cache) GetTorrentByName(name string) *CachedTorrent {
return nil return nil
} }
func (c *Cache) GetTorrent(torrentId string) *CachedTorrent {
if name, ok := c.torrents.Load(torrentId); ok {
if t, ok := c.torrentsNames.Load(name); ok {
return t
}
return nil
}
return nil
}
func (c *Cache) SaveTorrents() { func (c *Cache) SaveTorrents() {
c.torrents.Range(func(key string, value *CachedTorrent) bool { c.torrentsNames.Range(func(key string, value *CachedTorrent) bool {
c.SaveTorrent(value) c.SaveTorrent(value)
return true return true
}) })
@@ -590,93 +593,6 @@ func (c *Cache) ProcessTorrent(t *types.Torrent) error {
return nil return nil
} }
func (c *Cache) GetDownloadLink(torrentId, filename, fileLink string) (string, error) {
// Check link cache
if dl := c.checkDownloadLink(fileLink); dl != "" {
return dl, nil
}
ct := c.GetTorrent(torrentId)
if ct == nil {
return "", fmt.Errorf("torrent not found: %s", torrentId)
}
file := ct.Files[filename]
if file.Link == "" {
// file link is empty, refresh the torrent to get restricted links
ct = c.refreshTorrent(ct) // Refresh the torrent from the debrid
if ct == nil {
return "", fmt.Errorf("failed to refresh torrent: %s", torrentId)
} else {
file = ct.Files[filename]
}
}
// If file.Link is still empty, return
if file.Link == "" {
c.logger.Debug().Msgf("File link is empty for %s. Release is probably nerfed", filename)
// Try to reinsert the torrent?
newCt, err := c.reInsertTorrent(ct)
if err != nil {
return "", fmt.Errorf("failed to reinsert torrent: %s. %w", ct.Name, err)
}
ct = newCt
file = ct.Files[filename]
c.logger.Debug().Msgf("Reinserted torrent %s", ct.Name)
}
c.logger.Trace().Msgf("Getting download link for %s(%s)", filename, file.Link)
downloadLink, err := c.client.GetDownloadLink(ct.Torrent, &file)
if err != nil {
if errors.Is(err, request.HosterUnavailableError) {
c.logger.Error().Err(err).Msgf("Hoster is unavailable. Triggering repair for %s", ct.Name)
newCt, err := c.reInsertTorrent(ct)
if err != nil {
return "", fmt.Errorf("failed to reinsert torrent: %w", err)
}
ct = newCt
c.logger.Debug().Msgf("Reinserted torrent %s", ct.Name)
file = ct.Files[filename]
// Retry getting the download link
downloadLink, err = c.client.GetDownloadLink(ct.Torrent, &file)
if err != nil {
return "", err
}
if downloadLink == nil {
return "", fmt.Errorf("download link is empty for %s", file.Link)
}
c.updateDownloadLink(downloadLink)
return "", nil
} else if errors.Is(err, request.TrafficExceededError) {
// This is likely a fair usage limit error
c.logger.Error().Err(err).Msgf("Traffic exceeded for %s", ct.Name)
} else {
return "", fmt.Errorf("failed to get download link: %w", err)
}
}
if downloadLink == nil {
return "", fmt.Errorf("download link is empty for %s", file.Link)
}
c.updateDownloadLink(downloadLink)
return downloadLink.DownloadLink, nil
}
func (c *Cache) GenerateDownloadLinks(t *CachedTorrent) {
if err := c.client.GenerateDownloadLinks(t.Torrent); err != nil {
c.logger.Error().Err(err).Msg("Failed to generate download links")
}
for _, file := range t.Files {
if file.DownloadLink != nil {
c.updateDownloadLink(file.DownloadLink)
}
}
c.SaveTorrent(t)
}
func (c *Cache) AddTorrent(t *types.Torrent) error { func (c *Cache) AddTorrent(t *types.Torrent) error {
if len(t.Files) == 0 { if len(t.Files) == 0 {
if err := c.client.UpdateTorrent(t); err != nil { if err := c.client.UpdateTorrent(t); err != nil {
@@ -699,84 +615,60 @@ func (c *Cache) AddTorrent(t *types.Torrent) error {
} }
func (c *Cache) updateDownloadLink(dl *types.DownloadLink) {
expiresAt, _ := time.ParseDuration(c.autoExpiresLinksAfter)
c.downloadLinks.Store(dl.Link, downloadLinkCache{
Id: dl.Id,
Link: dl.DownloadLink,
ExpiresAt: time.Now().Add(expiresAt),
AccountId: dl.AccountId,
})
}
func (c *Cache) checkDownloadLink(link string) string {
if dl, ok := c.downloadLinks.Load(link); ok {
if dl.ExpiresAt.After(time.Now()) && !c.IsDownloadLinkInvalid(dl.Link) {
return dl.Link
}
}
return ""
}
func (c *Cache) MarkDownloadLinkAsInvalid(link, downloadLink, reason string) {
c.invalidDownloadLinks.Store(downloadLink, reason)
// Remove the download api key from active
if reason == "bandwidth_exceeded" {
if dl, ok := c.downloadLinks.Load(link); ok {
if dl.AccountId != "" && dl.Link == downloadLink {
c.client.DisableAccount(dl.AccountId)
}
}
}
c.removeDownloadLink(link)
}
func (c *Cache) removeDownloadLink(link string) {
if dl, ok := c.downloadLinks.Load(link); ok {
// Delete dl from cache
c.downloadLinks.Delete(link)
// Delete dl from debrid
if dl.Id != "" {
_ = c.client.DeleteDownloadLink(dl.Id)
}
}
}
func (c *Cache) IsDownloadLinkInvalid(downloadLink string) bool {
if reason, ok := c.invalidDownloadLinks.Load(downloadLink); ok {
c.logger.Debug().Msgf("Download link %s is invalid: %s", downloadLink, reason)
return true
}
return false
}
func (c *Cache) GetClient() types.Client { func (c *Cache) GetClient() types.Client {
return c.client return c.client
} }
func (c *Cache) DeleteTorrent(id string) error { func (c *Cache) DeleteTorrent(id string) error {
c.logger.Info().Msgf("Deleting torrent %s", id) c.logger.Info().Msgf("Deleting torrent %s from cache", id)
c.torrentsRefreshMu.Lock() c.torrentsRefreshMu.Lock()
defer c.torrentsRefreshMu.Unlock() defer c.torrentsRefreshMu.Unlock()
if t, ok := c.torrents.Load(id); ok { if c.deleteTorrent(id, true) {
_ = c.client.DeleteTorrent(id) // SKip error handling, we don't care if it fails
c.torrents.Delete(id)
c.torrentsNames.Delete(c.GetTorrentFolder(t.Torrent))
c.removeFromDB(id)
c.RefreshListings(true) c.RefreshListings(true)
c.logger.Info().Msgf("Torrent %s deleted successfully", id)
return nil
} }
return nil return nil
} }
func (c *Cache) deleteTorrent(id string, removeFromDebrid bool) bool {
if torrentName, ok := c.torrents.Load(id); ok {
c.torrents.Delete(id) // Delete from id cache
defer func() {
c.removeFromDB(id)
if removeFromDebrid {
_ = c.client.DeleteTorrent(id) // Skip error handling, we don't care if it fails
}
}() // defer delete from debrid
if t, ok := c.torrentsNames.Load(torrentName); ok {
newFiles := map[string]types.File{}
newId := t.Id
for _, file := range t.Files {
if file.TorrentId != "" && file.TorrentId != id {
newFiles[file.Name] = file
}
}
if len(newFiles) == 0 {
// Delete the torrent since no files are left
c.torrentsNames.Delete(torrentName)
} else {
t.Files = newFiles
t.Id = newId
c.setTorrent(t)
}
}
return true
}
return false
}
func (c *Cache) DeleteTorrents(ids []string) { func (c *Cache) DeleteTorrents(ids []string) {
c.logger.Info().Msgf("Deleting %d torrents", len(ids)) c.logger.Info().Msgf("Deleting %d torrents", len(ids))
for _, id := range ids { for _, id := range ids {
if t, ok := c.torrents.Load(id); ok { _ = c.deleteTorrent(id, true)
c.torrents.Delete(id)
c.torrentsNames.Delete(c.GetTorrentFolder(t.Torrent))
c.removeFromDB(id)
}
} }
c.RefreshListings(true) c.RefreshListings(true)
} }

View File

@@ -0,0 +1,186 @@
package debrid
import (
"errors"
"fmt"
"github.com/sirrobot01/decypharr/internal/request"
"github.com/sirrobot01/decypharr/pkg/debrid/types"
"time"
)
type downloadLinkRequest struct {
result string
err error
done chan struct{}
}
func newDownloadLinkRequest() *downloadLinkRequest {
return &downloadLinkRequest{
done: make(chan struct{}),
}
}
func (r *downloadLinkRequest) Complete(result string, err error) {
r.result = result
r.err = err
close(r.done)
}
func (r *downloadLinkRequest) Wait() (string, error) {
<-r.done
return r.result, r.err
}
func (c *Cache) GetDownloadLink(torrentName, filename, fileLink string) (string, error) {
// Check link cache
if dl := c.checkDownloadLink(fileLink); dl != "" {
return dl, nil
}
if req, inFlight := c.downloadLinkRequests.Load(fileLink); inFlight {
// Wait for the other request to complete and use its result
result := req.(*downloadLinkRequest)
return result.Wait()
}
// Create a new request object
req := newDownloadLinkRequest()
c.downloadLinkRequests.Store(fileLink, req)
downloadLink, err := c.fetchDownloadLink(torrentName, filename, fileLink)
// Complete the request and remove it from the map
req.Complete(downloadLink, err)
c.downloadLinkRequests.Delete(fileLink)
return downloadLink, err
}
func (c *Cache) fetchDownloadLink(torrentName, filename, fileLink string) (string, error) {
ct := c.GetTorrentByName(torrentName)
if ct == nil {
return "", fmt.Errorf("torrent not found: %s", torrentName)
}
file := ct.Files[filename]
if file.Link == "" {
// file link is empty, refresh the torrent to get restricted links
ct = c.refreshTorrent(ct) // Refresh the torrent from the debrid
if ct == nil {
return "", fmt.Errorf("failed to refresh torrent: %s", torrentName)
} else {
file = ct.Files[filename]
}
}
// If file.Link is still empty, return
if file.Link == "" {
c.logger.Debug().Msgf("File link is empty for %s. Release is probably nerfed", filename)
// Try to reinsert the torrent?
newCt, err := c.reInsertTorrent(ct)
if err != nil {
return "", fmt.Errorf("failed to reinsert torrent: %s. %w", ct.Name, err)
}
ct = newCt
file = ct.Files[filename]
c.logger.Debug().Str("name", ct.Name).Str("id", ct.Id).Msgf("Reinserted torrent")
}
c.logger.Trace().Msgf("Getting download link for %s(%s)", filename, file.Link)
downloadLink, err := c.client.GetDownloadLink(ct.Torrent, &file)
if err != nil {
if errors.Is(err, request.HosterUnavailableError) {
newCt, err := c.reInsertTorrent(ct)
if err != nil {
return "", fmt.Errorf("failed to reinsert torrent: %w", err)
}
ct = newCt
file = ct.Files[filename]
c.logger.Debug().Str("name", ct.Name).Str("id", ct.Id).Msgf("Reinserted torrent")
// Retry getting the download link
downloadLink, err = c.client.GetDownloadLink(ct.Torrent, &file)
if err != nil {
return "", err
}
if downloadLink == nil {
return "", fmt.Errorf("download link is empty for %s", file.Link)
}
c.updateDownloadLink(downloadLink)
return "", nil
} else if errors.Is(err, request.TrafficExceededError) {
// This is likely a fair usage limit error
c.logger.Error().Err(err).Msgf("Traffic exceeded for %s", ct.Name)
} else {
return "", fmt.Errorf("failed to get download link: %w", err)
}
}
if downloadLink == nil {
return "", fmt.Errorf("download link is empty for %s", file.Link)
}
c.updateDownloadLink(downloadLink)
return downloadLink.DownloadLink, nil
}
func (c *Cache) GenerateDownloadLinks(t *CachedTorrent) {
if err := c.client.GenerateDownloadLinks(t.Torrent); err != nil {
c.logger.Error().Err(err).Msg("Failed to generate download links")
}
for _, file := range t.Files {
if file.DownloadLink != nil {
c.updateDownloadLink(file.DownloadLink)
}
}
c.SaveTorrent(t)
}
func (c *Cache) updateDownloadLink(dl *types.DownloadLink) {
c.downloadLinks.Store(dl.Link, downloadLinkCache{
Id: dl.Id,
Link: dl.DownloadLink,
ExpiresAt: time.Now().Add(c.autoExpiresLinksAfterDuration),
AccountId: dl.AccountId,
})
}
func (c *Cache) checkDownloadLink(link string) string {
if dl, ok := c.downloadLinks.Load(link); ok {
if dl.ExpiresAt.After(time.Now()) && !c.IsDownloadLinkInvalid(dl.Link) {
return dl.Link
}
}
return ""
}
func (c *Cache) MarkDownloadLinkAsInvalid(link, downloadLink, reason string) {
c.invalidDownloadLinks.Store(downloadLink, reason)
// Remove the download api key from active
if reason == "bandwidth_exceeded" {
if dl, ok := c.downloadLinks.Load(link); ok {
if dl.AccountId != "" && dl.Link == downloadLink {
c.client.DisableAccount(dl.AccountId)
}
}
}
c.removeDownloadLink(link)
}
func (c *Cache) removeDownloadLink(link string) {
if dl, ok := c.downloadLinks.Load(link); ok {
// Delete dl from cache
c.downloadLinks.Delete(link)
// Delete dl from debrid
if dl.Id != "" {
_ = c.client.DeleteDownloadLink(dl.Id)
}
}
}
func (c *Cache) IsDownloadLinkInvalid(downloadLink string) bool {
if reason, ok := c.invalidDownloadLinks.Load(downloadLink); ok {
c.logger.Debug().Msgf("Download link %s is invalid: %s", downloadLink, reason)
return true
}
return false
}

View File

@@ -1,8 +1,10 @@
package debrid package debrid
import ( import (
"errors"
"fmt" "fmt"
"github.com/sirrobot01/decypharr/internal/config" "github.com/sirrobot01/decypharr/internal/config"
"github.com/sirrobot01/decypharr/internal/request"
"github.com/sirrobot01/decypharr/internal/utils" "github.com/sirrobot01/decypharr/internal/utils"
"github.com/sirrobot01/decypharr/pkg/debrid/types" "github.com/sirrobot01/decypharr/pkg/debrid/types"
"io" "io"
@@ -207,7 +209,16 @@ func (c *Cache) refreshTorrent(t *CachedTorrent) *CachedTorrent {
_torrent := t.Torrent _torrent := t.Torrent
err := c.client.UpdateTorrent(_torrent) err := c.client.UpdateTorrent(_torrent)
if err != nil { if err != nil {
c.logger.Debug().Msgf("Failed to get torrent files for %s: %v", t.Id, err) if errors.Is(err, request.TorrentNotFoundError) {
c.logger.Trace().Msgf("Torrent %s not found. Removing from cache", _torrent.Id)
err := c.DeleteTorrent(_torrent.Id)
if err != nil {
c.logger.Error().Err(err).Msgf("Failed to delete torrent %s from cache", _torrent.Id)
return nil
}
return nil
}
c.logger.Debug().Err(err).Msgf("Failed to get torrent files for %s", t.Id)
return nil return nil
} }
if len(t.Files) == 0 { if len(t.Files) == 0 {

View File

@@ -12,6 +12,29 @@ import (
"time" "time"
) )
type reInsertRequest struct {
result *CachedTorrent
err error
done chan struct{}
}
func newReInsertRequest() *reInsertRequest {
return &reInsertRequest{
done: make(chan struct{}),
}
}
func (r *reInsertRequest) Complete(result *CachedTorrent, err error) {
r.result = result
r.err = err
close(r.done)
}
func (r *reInsertRequest) Wait() (*CachedTorrent, error) {
<-r.done
return r.result, r.err
}
func (c *Cache) IsTorrentBroken(t *CachedTorrent, filenames []string) bool { func (c *Cache) IsTorrentBroken(t *CachedTorrent, filenames []string) bool {
// Check torrent files // Check torrent files
@@ -80,8 +103,8 @@ func (c *Cache) repairWorker() {
c.logger.Debug().Str("torrentId", req.TorrentID).Msg("Received repair request") c.logger.Debug().Str("torrentId", req.TorrentID).Msg("Received repair request")
// Get the torrent from the cache // Get the torrent from the cache
cachedTorrent, ok := c.torrents.Load(torrentId) cachedTorrent := c.GetTorrent(torrentId)
if !ok || cachedTorrent == nil { if cachedTorrent == nil {
c.logger.Warn().Str("torrentId", torrentId).Msg("Torrent not found in cache") c.logger.Warn().Str("torrentId", torrentId).Msg("Torrent not found in cache")
continue continue
} }
@@ -107,11 +130,21 @@ func (c *Cache) reInsertTorrent(ct *CachedTorrent) (*CachedTorrent, error) {
// Check if Magnet is not empty, if empty, reconstruct the magnet // Check if Magnet is not empty, if empty, reconstruct the magnet
torrent := ct.Torrent torrent := ct.Torrent
oldID := torrent.Id // Store the old ID oldID := torrent.Id // Store the old ID
if _, ok := c.repairsInProgress.Load(oldID); ok { if _, ok := c.failedToReinsert.Load(oldID); ok {
return ct, fmt.Errorf("repair already in progress for torrent %s", torrent.Id) return ct, fmt.Errorf("can't retry re-insert for %s", torrent.Id)
} }
c.repairsInProgress.Store(oldID, struct{}{}) if reqI, inFlight := c.repairRequest.Load(oldID); inFlight {
defer c.repairsInProgress.Delete(oldID) req := reqI.(*reInsertRequest)
c.logger.Debug().Msgf("Waiting for existing reinsert request to complete for torrent %s", oldID)
return req.Wait()
}
req := newReInsertRequest()
c.repairRequest.Store(oldID, req)
// Make sure we clean up even if there's a panic
defer func() {
c.repairRequest.Delete(oldID)
}()
if torrent.Magnet == nil { if torrent.Magnet == nil {
torrent.Magnet = utils.ConstructMagnet(torrent.InfoHash, torrent.Name) torrent.Magnet = utils.ConstructMagnet(torrent.InfoHash, torrent.Name)
@@ -122,24 +155,24 @@ func (c *Cache) reInsertTorrent(ct *CachedTorrent) (*CachedTorrent, error) {
var err error var err error
torrent, err = c.client.SubmitMagnet(torrent) torrent, err = c.client.SubmitMagnet(torrent)
if err != nil { if err != nil {
c.failedToReinsert.Store(oldID, struct{}{})
// Remove the old torrent from the cache and debrid service // Remove the old torrent from the cache and debrid service
return ct, fmt.Errorf("failed to submit magnet: %w", err) return ct, fmt.Errorf("failed to submit magnet: %w", err)
} }
// Check if the torrent was submitted // Check if the torrent was submitted
if torrent == nil || torrent.Id == "" { if torrent == nil || torrent.Id == "" {
c.failedToReinsert.Store(oldID, struct{}{})
return ct, fmt.Errorf("failed to submit magnet: empty torrent") return ct, fmt.Errorf("failed to submit magnet: empty torrent")
} }
torrent.DownloadUncached = false // Set to false, avoid re-downloading torrent.DownloadUncached = false // Set to false, avoid re-downloading
torrent, err = c.client.CheckStatus(torrent, true) torrent, err = c.client.CheckStatus(torrent, true)
if err != nil && torrent != nil { if err != nil && torrent != nil {
// Torrent is likely uncached, delete it c.failedToReinsert.Store(oldID, struct{}{})
if err := c.client.DeleteTorrent(torrent.Id); err != nil {
c.logger.Error().Err(err).Str("torrentId", torrent.Id).Msg("Failed to delete torrent")
} // Delete the newly added un-cached torrent
return ct, fmt.Errorf("failed to check status: %w", err) return ct, fmt.Errorf("failed to check status: %w", err)
} }
if torrent == nil { if torrent == nil {
c.failedToReinsert.Store(oldID, struct{}{})
return ct, fmt.Errorf("failed to check status: empty torrent") return ct, fmt.Errorf("failed to check status: empty torrent")
} }
@@ -150,18 +183,10 @@ func (c *Cache) reInsertTorrent(ct *CachedTorrent) (*CachedTorrent, error) {
} }
for _, f := range torrent.Files { for _, f := range torrent.Files {
if f.Link == "" { if f.Link == "" {
// Delete the new torrent c.failedToReinsert.Store(oldID, struct{}{})
_ = c.DeleteTorrent(torrent.Id)
return ct, fmt.Errorf("failed to reinsert torrent: empty link") return ct, fmt.Errorf("failed to reinsert torrent: empty link")
} }
} }
// We can safely delete the old torrent here
if oldID != "" {
if err := c.DeleteTorrent(oldID); err != nil {
return ct, fmt.Errorf("failed to delete old torrent: %w", err)
}
}
ct = &CachedTorrent{ ct = &CachedTorrent{
Torrent: torrent, Torrent: torrent,
AddedOn: addedOn, AddedOn: addedOn,
@@ -169,7 +194,16 @@ func (c *Cache) reInsertTorrent(ct *CachedTorrent) (*CachedTorrent, error) {
} }
c.setTorrent(ct) c.setTorrent(ct)
c.RefreshListings(true) c.RefreshListings(true)
c.logger.Debug().Str("torrentId", torrent.Id).Msg("Reinserted torrent")
// We can safely delete the old torrent here
if oldID != "" {
if err := c.DeleteTorrent(oldID); err != nil {
return ct, fmt.Errorf("failed to delete old torrent: %w", err)
}
}
req.Complete(ct, err)
c.failedToReinsert.Delete(oldID) // Delete the old torrent from the failed list
return ct, nil return ct, nil
} }

View File

@@ -64,7 +64,7 @@ func (c *Cache) cleanupWorker() {
} }
deletedTorrents := make([]string, 0) deletedTorrents := make([]string, 0)
c.torrents.Range(func(key string, _ *CachedTorrent) bool { c.torrents.Range(func(key string, _ string) bool {
if _, exists := idStore[key]; !exists { if _, exists := idStore[key]; !exists {
deletedTorrents = append(deletedTorrents, key) deletedTorrents = append(deletedTorrents, key)
} }

View File

@@ -227,12 +227,14 @@ func (dl *DebridLink) CheckStatus(torrent *types.Torrent, isSymlink bool) (*type
break break
} else if slices.Contains(dl.GetDownloadingStatus(), status) { } else if slices.Contains(dl.GetDownloadingStatus(), status) {
if !torrent.DownloadUncached { if !torrent.DownloadUncached {
_ = dl.DeleteTorrent(torrent.Id)
return torrent, fmt.Errorf("torrent: %s not cached", torrent.Name) return torrent, fmt.Errorf("torrent: %s not cached", torrent.Name)
} }
// Break out of the loop if the torrent is downloading. // Break out of the loop if the torrent is downloading.
// This is necessary to prevent infinite loop since we moved to sync downloading and async processing // This is necessary to prevent infinite loop since we moved to sync downloading and async processing
return torrent, nil return torrent, nil
} else { } else {
_ = dl.DeleteTorrent(torrent.Id)
return torrent, fmt.Errorf("torrent: %s has error", torrent.Name) return torrent, fmt.Errorf("torrent: %s has error", torrent.Name)
} }

View File

@@ -266,12 +266,23 @@ func (r *RealDebrid) addMagnet(t *types.Torrent) (*types.Torrent, error) {
func (r *RealDebrid) UpdateTorrent(t *types.Torrent) error { func (r *RealDebrid) UpdateTorrent(t *types.Torrent) error {
url := fmt.Sprintf("%s/torrents/info/%s", r.Host, t.Id) url := fmt.Sprintf("%s/torrents/info/%s", r.Host, t.Id)
req, _ := http.NewRequest(http.MethodGet, url, nil) req, _ := http.NewRequest(http.MethodGet, url, nil)
resp, err := r.client.MakeRequest(req) resp, err := r.client.Do(req)
if err != nil { if err != nil {
return err return err
} }
defer resp.Body.Close()
bodyBytes, err := io.ReadAll(resp.Body)
if err != nil {
return fmt.Errorf("reading response body: %w", err)
}
if resp.StatusCode != http.StatusOK {
if resp.StatusCode == http.StatusNotFound {
return request.TorrentNotFoundError
}
return fmt.Errorf("realdebrid API error: Status: %d || Body: %s", resp.StatusCode, string(bodyBytes))
}
var data torrentInfo var data torrentInfo
err = json.Unmarshal(resp, &data) err = json.Unmarshal(bodyBytes, &data)
if err != nil { if err != nil {
return err return err
} }
@@ -348,10 +359,12 @@ func (r *RealDebrid) CheckStatus(t *types.Torrent, isSymlink bool) (*types.Torre
break break
} else if slices.Contains(r.GetDownloadingStatus(), status) { } else if slices.Contains(r.GetDownloadingStatus(), status) {
if !t.DownloadUncached { if !t.DownloadUncached {
_ = r.DeleteTorrent(t.Id)
return t, fmt.Errorf("torrent: %s not cached", t.Name) return t, fmt.Errorf("torrent: %s not cached", t.Name)
} }
return t, nil return t, nil
} else { } else {
_ = r.DeleteTorrent(t.Id)
return t, fmt.Errorf("torrent: %s has error: %s", t.Name, status) return t, fmt.Errorf("torrent: %s has error: %s", t.Name, status)
} }

View File

@@ -259,12 +259,14 @@ func (tb *Torbox) CheckStatus(torrent *types.Torrent, isSymlink bool) (*types.To
break break
} else if slices.Contains(tb.GetDownloadingStatus(), status) { } else if slices.Contains(tb.GetDownloadingStatus(), status) {
if !torrent.DownloadUncached { if !torrent.DownloadUncached {
_ = tb.DeleteTorrent(torrent.Id)
return torrent, fmt.Errorf("torrent: %s not cached", torrent.Name) return torrent, fmt.Errorf("torrent: %s not cached", torrent.Name)
} }
// Break out of the loop if the torrent is downloading. // Break out of the loop if the torrent is downloading.
// This is necessary to prevent infinite loop since we moved to sync downloading and async processing // This is necessary to prevent infinite loop since we moved to sync downloading and async processing
return torrent, nil return torrent, nil
} else { } else {
_ = tb.DeleteTorrent(torrent.Id)
return torrent, fmt.Errorf("torrent: %s has error", torrent.Name) return torrent, fmt.Errorf("torrent: %s has error", torrent.Name)
} }

View File

@@ -1,7 +1,6 @@
package qbit package qbit
import ( import (
"fmt"
"github.com/sirrobot01/decypharr/internal/utils" "github.com/sirrobot01/decypharr/internal/utils"
"github.com/sirrobot01/decypharr/pkg/debrid/debrid" "github.com/sirrobot01/decypharr/pkg/debrid/debrid"
"github.com/sirrobot01/decypharr/pkg/service" "github.com/sirrobot01/decypharr/pkg/service"
@@ -71,16 +70,7 @@ func (i *ImportRequest) Process(q *QBit) (err error) {
svc := service.GetService() svc := service.GetService()
torrent := createTorrentFromMagnet(i.Magnet, i.Arr.Name, "manual") torrent := createTorrentFromMagnet(i.Magnet, i.Arr.Name, "manual")
debridTorrent, err := debrid.ProcessTorrent(svc.Debrid, i.Magnet, i.Arr, i.IsSymlink, i.DownloadUncached) debridTorrent, err := debrid.ProcessTorrent(svc.Debrid, i.Magnet, i.Arr, i.IsSymlink, i.DownloadUncached)
if err != nil || debridTorrent == nil { if err != nil {
if debridTorrent != nil {
dbClient := service.GetDebrid().GetClient(debridTorrent.Debrid)
go func() {
_ = dbClient.DeleteTorrent(debridTorrent.Id)
}()
}
if err == nil {
err = fmt.Errorf("failed to process torrent")
}
return err return err
} }
torrent = q.UpdateTorrentMin(torrent, debridTorrent) torrent = q.UpdateTorrentMin(torrent, debridTorrent)

View File

@@ -58,12 +58,6 @@ func (q *QBit) Process(ctx context.Context, magnet *utils.Magnet, category strin
isSymlink := ctx.Value("isSymlink").(bool) isSymlink := ctx.Value("isSymlink").(bool)
debridTorrent, err := db.ProcessTorrent(svc.Debrid, magnet, a, isSymlink, false) debridTorrent, err := db.ProcessTorrent(svc.Debrid, magnet, a, isSymlink, false)
if err != nil || debridTorrent == nil { if err != nil || debridTorrent == nil {
if debridTorrent != nil {
dbClient := service.GetDebrid().GetClient(debridTorrent.Debrid)
go func() {
_ = dbClient.DeleteTorrent(debridTorrent.Id)
}()
}
if err == nil { if err == nil {
err = fmt.Errorf("failed to process torrent") err = fmt.Errorf("failed to process torrent")
} }
@@ -83,12 +77,6 @@ func (q *QBit) ProcessFiles(torrent *Torrent, debridTorrent *debrid.Torrent, arr
dbT, err := client.CheckStatus(debridTorrent, isSymlink) dbT, err := client.CheckStatus(debridTorrent, isSymlink)
if err != nil { if err != nil {
q.logger.Error().Msgf("Error checking status: %v", err) q.logger.Error().Msgf("Error checking status: %v", err)
go func() {
err := client.DeleteTorrent(debridTorrent.Id)
if err != nil {
q.logger.Error().Msgf("Error deleting torrent: %v", err)
}
}()
q.MarkAsFailed(torrent) q.MarkAsFailed(torrent)
if err := arr.Refresh(); err != nil { if err := arr.Refresh(); err != nil {
q.logger.Error().Msgf("Error refreshing arr: %v", err) q.logger.Error().Msgf("Error refreshing arr: %v", err)
@@ -141,10 +129,7 @@ func (q *QBit) ProcessFiles(torrent *Torrent, debridTorrent *debrid.Torrent, arr
if err != nil { if err != nil {
q.MarkAsFailed(torrent) q.MarkAsFailed(torrent)
go func() { go func() {
err := client.DeleteTorrent(debridTorrent.Id) _ = client.DeleteTorrent(debridTorrent.Id)
if err != nil {
q.logger.Error().Msgf("Error deleting torrent: %v", err)
}
}() }()
q.logger.Info().Msgf("Error: %v", err) q.logger.Info().Msgf("Error: %v", err)
return return

View File

@@ -27,9 +27,9 @@ var sharedClient = &http.Client{
} }
type File struct { type File struct {
cache *debrid.Cache cache *debrid.Cache
fileId string fileId string
torrentId string torrentName string
modTime time.Time modTime time.Time
@@ -63,7 +63,7 @@ func (f *File) getDownloadLink() (string, error) {
if f.downloadLink != "" && isValidURL(f.downloadLink) { if f.downloadLink != "" && isValidURL(f.downloadLink) {
return f.downloadLink, nil return f.downloadLink, nil
} }
downloadLink, err := f.cache.GetDownloadLink(f.torrentId, f.name, f.link) downloadLink, err := f.cache.GetDownloadLink(f.torrentName, f.name, f.link)
if err != nil { if err != nil {
return "", err return "", err
} }
@@ -84,6 +84,7 @@ func (f *File) stream() (*http.Response, error) {
downloadLink, err = f.getDownloadLink() downloadLink, err = f.getDownloadLink()
if err != nil { if err != nil {
_log.Trace().Msgf("Failed to get download link for %s. %s", f.name, err) _log.Trace().Msgf("Failed to get download link for %s. %s", f.name, err)
return nil, io.EOF return nil, io.EOF
} }

View File

@@ -2,7 +2,6 @@ package webdav
import ( import (
"bytes" "bytes"
"cmp"
"context" "context"
"fmt" "fmt"
"github.com/rs/zerolog" "github.com/rs/zerolog"
@@ -171,7 +170,7 @@ func (h *Handler) OpenFile(ctx context.Context, name string, flag int, perm os.F
// Torrent folder level // Torrent folder level
return &File{ return &File{
cache: h.cache, cache: h.cache,
torrentId: cachedTorrent.Id, torrentName: torrentName,
isDir: true, isDir: true,
children: h.getFileInfos(cachedTorrent.Torrent), children: h.getFileInfos(cachedTorrent.Torrent),
name: cachedTorrent.Name, name: cachedTorrent.Name,
@@ -186,7 +185,7 @@ func (h *Handler) OpenFile(ctx context.Context, name string, flag int, perm os.F
if file, ok := cachedTorrent.Files[filename]; ok { if file, ok := cachedTorrent.Files[filename]; ok {
return &File{ return &File{
cache: h.cache, cache: h.cache,
torrentId: cmp.Or(file.TorrentId, cachedTorrent.Id), torrentName: torrentName,
fileId: file.Id, fileId: file.Id,
isDir: false, isDir: false,
name: file.Name, name: file.Name,

View File

@@ -65,7 +65,6 @@ func cleanUpQueues() {
if !a.Cleanup { if !a.Cleanup {
continue continue
} }
_logger.Trace().Msgf("Cleaning up queue for %s", a.Name)
if err := a.CleanupQueue(); err != nil { if err := a.CleanupQueue(); err != nil {
_logger.Error().Err(err).Msg("Error cleaning up queue") _logger.Error().Err(err).Msg("Error cleaning up queue")
} }