diff --git a/doc/config.full.json b/doc/config.full.json index 8ef74a3..f3eddeb 100644 --- a/doc/config.full.json +++ b/doc/config.full.json @@ -95,6 +95,8 @@ "webdav": { "torrents_refresh_interval": "15s", "download_links_refresh_interval": "1h", + "folder_naming": "original", + "auto_expire_links_after": "24h", "rc_url": "http://192.168.0.219:9990", "rc_user": "your_rclone_rc_user", "rc_pass": "your_rclone_rc_pass" diff --git a/internal/config/config.go b/internal/config/config.go index 042877a..d8a8554 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -31,6 +31,7 @@ type Debrid struct { DownloadLinksRefreshInterval string `json:"downloads_refresh_interval"` TorrentRefreshWorkers int `json:"torrent_refresh_workers"` WebDavFolderNaming string `json:"webdav_folder_naming"` + AutoExpireLinksAfter string `json:"auto_expire_links_after"` } type Proxy struct { @@ -67,6 +68,7 @@ type Repair struct { RunOnStart bool `json:"run_on_start"` ZurgURL string `json:"zurg_url"` AutoProcess bool `json:"auto_process"` + UseWebDav bool `json:"use_webdav"` } type Auth struct { @@ -78,6 +80,7 @@ type WebDav struct { TorrentsRefreshInterval string `json:"torrents_refresh_interval"` DownloadLinksRefreshInterval string `json:"download_links_refresh_interval"` Workers int `json:"workers"` + AutoExpireLinksAfter string `json:"auto_expire_links_after"` // Folder FolderNaming string `json:"folder_naming"` @@ -322,5 +325,8 @@ func (c *Config) GetDebridWebDav(d Debrid) Debrid { if d.WebDavFolderNaming == "" { d.WebDavFolderNaming = cmp.Or(c.WebDav.FolderNaming, "original_no_ext") } + if d.AutoExpireLinksAfter == "" { + d.AutoExpireLinksAfter = cmp.Or(c.WebDav.AutoExpireLinksAfter, "24h") + } return d } diff --git a/internal/request/errors.go b/internal/request/errors.go new file mode 100644 index 0000000..37ad6e4 --- /dev/null +++ b/internal/request/errors.go @@ -0,0 +1,23 @@ +package request + +type HTTPError struct { + StatusCode int + Message string + Code string +} 
+ +func (e *HTTPError) Error() string { + return e.Message +} + +var HosterUnavailableError = &HTTPError{ + StatusCode: 503, + Message: "Hoster is unavailable", + Code: "hoster_unavailable", +} + +var ErrLinkBroken = &HTTPError{ + StatusCode: 404, + Message: "File is unavailable", + Code: "file_unavailable", +} diff --git a/internal/utils/magnet.go b/internal/utils/magnet.go index accad90..17d14bc 100644 --- a/internal/utils/magnet.go +++ b/internal/utils/magnet.go @@ -233,3 +233,15 @@ func GetInfohashFromURL(url string) (string, error) { infoHash := hash.HexString() return infoHash, nil } + +func ConstructMagnet(infoHash, name string) *Magnet { + // Create a magnet link from the infohash and name + name = url.QueryEscape(strings.TrimSpace(name)) + magnetUri := fmt.Sprintf("magnet:?xt=urn:btih:%s&dn=%s", infoHash, name) + return &Magnet{ + InfoHash: infoHash, + Name: name, + Size: 0, + Link: magnetUri, + } +} diff --git a/pkg/debrid/alldebrid/alldebrid.go b/pkg/debrid/alldebrid/alldebrid.go index a24b63d..5ff8339 100644 --- a/pkg/debrid/alldebrid/alldebrid.go +++ b/pkg/debrid/alldebrid/alldebrid.go @@ -309,6 +309,14 @@ func (ad *AllDebrid) GetDownloadUncached() bool { return ad.DownloadUncached } +func (ad *AllDebrid) CheckLink(link string) error { + return nil +} + +func (ad *AllDebrid) GetMountPath() string { + return ad.MountPath +} + func New(dc config.Debrid) *AllDebrid { rl := request.ParseRateLimit(dc.RateLimit) headers := map[string]string{ diff --git a/pkg/debrid/debrid/cache.go b/pkg/debrid/debrid/cache.go index fdabe2d..7f5c207 100644 --- a/pkg/debrid/debrid/cache.go +++ b/pkg/debrid/debrid/cache.go @@ -3,12 +3,14 @@ package debrid import ( "bufio" "context" + "errors" "fmt" "github.com/goccy/go-json" "github.com/puzpuzpuz/xsync/v3" "github.com/rs/zerolog" "github.com/sirrobot01/debrid-blackhole/internal/config" "github.com/sirrobot01/debrid-blackhole/internal/logger" + "github.com/sirrobot01/debrid-blackhole/internal/request" 
"github.com/sirrobot01/debrid-blackhole/internal/utils" "github.com/sirrobot01/debrid-blackhole/pkg/debrid/types" "os" @@ -39,31 +41,43 @@ type CachedTorrent struct { IsComplete bool `json:"is_complete"` } +type downloadLinkCache struct { + Link string + ExpiresAt time.Time +} + +type RepairRequest struct { + TorrentID string + Priority int + FileName string +} + type Cache struct { dir string client types.Client logger zerolog.Logger - torrents map[string]*CachedTorrent // key: torrent.Id, value: *CachedTorrent - torrentsNames map[string]*CachedTorrent // key: torrent.Name, value: torrent + torrents *xsync.MapOf[string, *CachedTorrent] // key: torrent.Id, value: *CachedTorrent + torrentsNames *xsync.MapOf[string, *CachedTorrent] // key: torrent.Name, value: torrent listings atomic.Value - downloadLinks map[string]string // key: file.Link, value: download link + downloadLinks *xsync.MapOf[string, downloadLinkCache] PropfindResp *xsync.MapOf[string, PropfindResponse] folderNaming WebDavFolderNaming + // repair + repairChan chan RepairRequest + repairsInProgress *xsync.MapOf[string, bool] + // config workers int torrentRefreshInterval time.Duration downloadLinksRefreshInterval time.Duration + autoExpiresLinksAfter time.Duration // refresh mutex listingRefreshMu sync.RWMutex // for refreshing torrents downloadLinksRefreshMu sync.RWMutex // for refreshing download links torrentsRefreshMu sync.RWMutex // for refreshing torrents - - // Data Mutexes - torrentsMutex sync.RWMutex // for torrents and torrentsNames - downloadLinksMutex sync.Mutex // for downloadLinks } func NewCache(dc config.Debrid, client types.Client) *Cache { @@ -76,37 +90,41 @@ func NewCache(dc config.Debrid, client types.Client) *Cache { if err != nil { downloadLinksRefreshInterval = time.Minute * 40 } + autoExpiresLinksAfter, err := time.ParseDuration(dc.AutoExpireLinksAfter) + if err != nil { + autoExpiresLinksAfter = time.Hour * 24 + } return &Cache{ dir: filepath.Join(cfg.Path, "cache", dc.Name), // 
path to save cache files - torrents: make(map[string]*CachedTorrent), - torrentsNames: make(map[string]*CachedTorrent), + torrents: xsync.NewMapOf[string, *CachedTorrent](), + torrentsNames: xsync.NewMapOf[string, *CachedTorrent](), client: client, logger: logger.NewLogger(fmt.Sprintf("%s-cache", client.GetName())), workers: 200, - downloadLinks: make(map[string]string), + downloadLinks: xsync.NewMapOf[string, downloadLinkCache](), torrentRefreshInterval: torrentRefreshInterval, downloadLinksRefreshInterval: downloadLinksRefreshInterval, PropfindResp: xsync.NewMapOf[string, PropfindResponse](), folderNaming: WebDavFolderNaming(dc.WebDavFolderNaming), + autoExpiresLinksAfter: autoExpiresLinksAfter, + repairsInProgress: xsync.NewMapOf[string, bool](), } } func (c *Cache) GetTorrentFolder(torrent *types.Torrent) string { - folderName := torrent.Name + folderName := torrent.Filename if c.folderNaming == WebDavUseID { folderName = torrent.Id } else if c.folderNaming == WebDavUseOriginalNameNoExt { - folderName = utils.RemoveExtension(torrent.Name) + folderName = utils.RemoveExtension(folderName) } return folderName } func (c *Cache) setTorrent(t *CachedTorrent) { - c.torrentsMutex.Lock() - c.torrents[t.Id] = t + c.torrents.Store(t.Id, t) - c.torrentsNames[c.GetTorrentFolder(t.Torrent)] = t - c.torrentsMutex.Unlock() + c.torrentsNames.Store(c.GetTorrentFolder(t.Torrent), t) go func() { if err := c.SaveTorrent(t); err != nil { @@ -116,14 +134,11 @@ func (c *Cache) setTorrent(t *CachedTorrent) { } func (c *Cache) setTorrents(torrents map[string]*CachedTorrent) { - c.torrentsMutex.Lock() for _, t := range torrents { - c.torrents[t.Id] = t - c.torrentsNames[c.GetTorrentFolder(t.Torrent)] = t + c.torrents.Store(t.Id, t) + c.torrentsNames.Store(c.GetTorrentFolder(t.Torrent), t) } - c.torrentsMutex.Unlock() - c.refreshListings() go func() { @@ -140,22 +155,6 @@ func (c *Cache) GetListing() []os.FileInfo { return nil } -func (c *Cache) GetTorrents() map[string]*CachedTorrent { - 
c.torrentsMutex.RLock() - defer c.torrentsMutex.RUnlock() - result := make(map[string]*CachedTorrent, len(c.torrents)) - for k, v := range c.torrents { - result[k] = v - } - return result -} - -func (c *Cache) GetTorrentNames() map[string]*CachedTorrent { - c.torrentsMutex.RLock() - defer c.torrentsMutex.RUnlock() - return c.torrentsNames -} - func (c *Cache) Start() error { if err := os.MkdirAll(c.dir, 0755); err != nil { return fmt.Errorf("failed to create cache directory: %w", err) @@ -167,10 +166,6 @@ func (c *Cache) Start() error { // initial download links go func() { - // lock download refresh mutex - c.downloadLinksRefreshMu.Lock() - defer c.downloadLinksRefreshMu.Unlock() - // This prevents the download links from being refreshed twice c.refreshDownloadLinks() }() @@ -181,6 +176,9 @@ func (c *Cache) Start() error { } }() + c.repairChan = make(chan RepairRequest, 100) + go c.repairWorker() + return nil } @@ -239,28 +237,36 @@ func (c *Cache) load() (map[string]*CachedTorrent, error) { return torrents, nil } +func (c *Cache) GetTorrents() map[string]*CachedTorrent { + torrents := make(map[string]*CachedTorrent) + c.torrents.Range(func(key string, value *CachedTorrent) bool { + torrents[key] = value + return true + }) + return torrents +} + func (c *Cache) GetTorrent(id string) *CachedTorrent { - c.torrentsMutex.RLock() - defer c.torrentsMutex.RUnlock() - if t, ok := c.torrents[id]; ok { + if t, ok := c.torrents.Load(id); ok { return t } return nil } func (c *Cache) GetTorrentByName(name string) *CachedTorrent { - if t, ok := c.GetTorrentNames()[name]; ok { + if t, ok := c.torrentsNames.Load(name); ok { return t } return nil } func (c *Cache) SaveTorrents() error { - for _, ct := range c.GetTorrents() { - if err := c.SaveTorrent(ct); err != nil { - return err + c.torrents.Range(func(key string, value *CachedTorrent) bool { + if err := c.SaveTorrent(value); err != nil { + c.logger.Debug().Err(err).Msgf("Failed to save torrent %s", key) } - } + return true + }) 
return nil } @@ -383,6 +389,7 @@ func (c *Cache) sync(torrents []*types.Torrent) error { count := atomic.AddInt64(&processed, 1) if count%1000 == 0 { + c.refreshListings() c.logger.Info().Msgf("Progress: %d/%d torrents processed", count, len(torrents)) } @@ -448,9 +455,6 @@ func (c *Cache) GetDownloadLink(torrentId, filename, fileLink string) string { if file.Link == "" { // file link is empty, refresh the torrent to get restricted links - if ct.IsComplete { - return "" - } ct = c.refreshTorrent(ct) // Refresh the torrent from the debrid if ct == nil { return "" @@ -458,17 +462,40 @@ func (c *Cache) GetDownloadLink(torrentId, filename, fileLink string) string { file = ct.Files[filename] } } - c.logger.Trace().Msgf("Getting download link for %s", ct.Name) - link, err := c.client.GetDownloadLink(ct.Torrent, &file) + + c.logger.Trace().Msgf("Getting download link for %s", filename) + downloadLink, err := c.client.GetDownloadLink(ct.Torrent, &file) if err != nil { - c.logger.Error().Err(err).Msg("Failed to get download link") + if errors.Is(err, request.HosterUnavailableError) { + // Check link here?? + c.logger.Debug().Err(err).Msgf("Hoster is unavailable. 
Triggering repair for %s", ct.Name) + if err := c.repairTorrent(ct); err != nil { + c.logger.Error().Err(err).Msgf("Failed to trigger repair for %s", ct.Name) + return "" + } + // Generate download link for the file then + f := ct.Files[filename] + downloadLink, _ = c.client.GetDownloadLink(ct.Torrent, &f) + f.DownloadLink = downloadLink + file.Generated = time.Now() + ct.Files[filename] = f + c.updateDownloadLink(file.Link, downloadLink) + + go func() { + go c.setTorrent(ct) + }() + + return downloadLink // Gets download link in the next pass + } + + c.logger.Debug().Err(err).Msgf("Failed to get download link for :%s", file.Link) return "" } - file.DownloadLink = link + file.DownloadLink = downloadLink file.Generated = time.Now() ct.Files[filename] = file - go c.updateDownloadLink(file) + go c.updateDownloadLink(file.Link, downloadLink) go c.setTorrent(ct) return file.DownloadLink } @@ -478,7 +505,7 @@ func (c *Cache) GenerateDownloadLinks(t *CachedTorrent) { c.logger.Error().Err(err).Msg("Failed to generate download links") } for _, file := range t.Files { - c.updateDownloadLink(file) + c.updateDownloadLink(file.Link, file.DownloadLink) } go func() { @@ -506,15 +533,18 @@ func (c *Cache) AddTorrent(t *types.Torrent) error { } -func (c *Cache) updateDownloadLink(file types.File) { - c.downloadLinksMutex.Lock() - defer c.downloadLinksMutex.Unlock() - c.downloadLinks[file.Link] = file.DownloadLink +func (c *Cache) updateDownloadLink(link, downloadLink string) { + c.downloadLinks.Store(link, downloadLinkCache{ + Link: downloadLink, + ExpiresAt: time.Now().Add(c.autoExpiresLinksAfter), // Expires in 24 hours + }) } func (c *Cache) checkDownloadLink(link string) string { - if dl, ok := c.downloadLinks[link]; ok { - return dl + if dl, ok := c.downloadLinks.Load(link); ok { + if dl.ExpiresAt.After(time.Now()) { + return dl.Link + } } return "" } @@ -525,26 +555,21 @@ func (c *Cache) GetClient() types.Client { func (c *Cache) DeleteTorrent(id string) { 
c.logger.Info().Msgf("Deleting torrent %s", id) - c.torrentsMutex.Lock() - defer c.torrentsMutex.Unlock() - if t, ok := c.torrents[id]; ok { - delete(c.torrents, id) - delete(c.torrentsNames, t.Name) - - c.removeFromDB(id) + if t, ok := c.torrents.Load(id); ok { + c.torrents.Delete(id) + c.torrentsNames.Delete(c.GetTorrentFolder(t.Torrent)) + go c.removeFromDB(id) c.refreshListings() } } func (c *Cache) DeleteTorrents(ids []string) { c.logger.Info().Msgf("Deleting %d torrents", len(ids)) - c.torrentsMutex.Lock() - defer c.torrentsMutex.Unlock() for _, id := range ids { - if t, ok := c.torrents[id]; ok { - delete(c.torrents, id) - delete(c.torrentsNames, c.GetTorrentFolder(t.Torrent)) + if t, ok := c.torrents.Load(id); ok { + c.torrents.Delete(id) + c.torrentsNames.Delete(c.GetTorrentFolder(t.Torrent)) go c.removeFromDB(id) } } diff --git a/pkg/debrid/debrid/refresh.go b/pkg/debrid/debrid/refresh.go index 66d3db1..34c498e 100644 --- a/pkg/debrid/debrid/refresh.go +++ b/pkg/debrid/debrid/refresh.go @@ -8,6 +8,7 @@ import ( "io" "net/http" "os" + "path/filepath" "slices" "sort" "strings" @@ -37,12 +38,11 @@ func (c *Cache) refreshListings() { return } // Copy the current torrents to avoid concurrent issues - c.torrentsMutex.RLock() - torrents := make([]string, 0, len(c.torrentsNames)) - for k, _ := range c.torrentsNames { - torrents = append(torrents, k) - } - c.torrentsMutex.RUnlock() + torrents := make([]string, 0, c.torrentsNames.Size()) + c.torrentsNames.Range(func(key string, value *CachedTorrent) bool { + torrents = append(torrents, key) + return true + }) sort.Slice(torrents, func(i, j int) bool { return torrents[i] < torrents[j] @@ -61,26 +61,47 @@ func (c *Cache) refreshListings() { } // Atomic store of the complete ready-to-use slice c.listings.Store(files) - _ = c.RefreshXml() + c.resetPropfindResponse() if err := c.RefreshRclone(); err != nil { c.logger.Debug().Err(err).Msg("Failed to refresh rclone") } } +func (c *Cache) resetPropfindResponse() { + // 
Right now, parents are hardcoded + parents := []string{"__all__", "torrents"} + // Reset only the parent directories + // Convert the parents to a keys + // This is a bit hacky, but it works + // Instead of deleting all the keys, we only delete the parent keys, e.g __all__/ or torrents/ + keys := make([]string, 0, len(parents)) + for _, p := range parents { + // Construct the key + // construct url + url := filepath.Clean(filepath.Join("/webdav", c.client.GetName(), p)) + key0 := fmt.Sprintf("propfind:%s:0", url) + key1 := fmt.Sprintf("propfind:%s:1", url) + keys = append(keys, key0, key1) + } + + // Delete the keys + for _, k := range keys { + c.PropfindResp.Delete(k) + } +} + func (c *Cache) refreshTorrents() { if c.torrentsRefreshMu.TryLock() { defer c.torrentsRefreshMu.Unlock() } else { return } - c.torrentsMutex.RLock() - currentTorrents := c.torrents // // Create a copy of the current torrents to avoid concurrent issues - torrents := make(map[string]string, len(currentTorrents)) // a mpa of id and name - for _, v := range currentTorrents { - torrents[v.Id] = v.Name - } - c.torrentsMutex.RUnlock() + torrents := make(map[string]string, c.torrents.Size()) // a mpa of id and name + c.torrents.Range(func(key string, t *CachedTorrent) bool { + torrents[t.Id] = t.Name + return true + }) // Get new torrents from the debrid service debTorrents, err := c.client.GetTorrents() @@ -206,14 +227,25 @@ func (c *Cache) refreshDownloadLinks() { } else { return } - c.downloadLinksMutex.Lock() - defer c.downloadLinksMutex.Unlock() downloadLinks, err := c.client.GetDownloads() if err != nil { c.logger.Debug().Err(err).Msg("Failed to get download links") } for k, v := range downloadLinks { - c.downloadLinks[k] = v.DownloadLink + // if link is generated in the last 24 hours, add it to cache + timeSince := time.Since(v.Generated) + if timeSince < c.autoExpiresLinksAfter { + c.downloadLinks.Store(k, downloadLinkCache{ + Link: v.DownloadLink, + ExpiresAt: 
v.Generated.Add(c.autoExpiresLinksAfter - timeSince), + }) + } else { + //c.downloadLinks.Delete(k) don't delete, just log + c.logger.Trace().Msgf("Download link for %s expired", k) + } } + + c.logger.Debug().Msgf("Refreshed %d download links", len(downloadLinks)) + } diff --git a/pkg/debrid/debrid/repair.go b/pkg/debrid/debrid/repair.go new file mode 100644 index 0000000..be1085b --- /dev/null +++ b/pkg/debrid/debrid/repair.go @@ -0,0 +1,166 @@ +package debrid + +import ( + "errors" + "fmt" + "github.com/sirrobot01/debrid-blackhole/internal/request" + "github.com/sirrobot01/debrid-blackhole/internal/utils" + "github.com/sirrobot01/debrid-blackhole/pkg/debrid/types" + "slices" +) + +func (c *Cache) IsTorrentBroken(t *CachedTorrent, filenames []string) bool { + // Check torrent files + + isBroken := false + files := make(map[string]types.File) + if len(filenames) > 0 { + for name, f := range t.Files { + if slices.Contains(filenames, name) { + files[name] = f + } + } + } else { + files = t.Files + } + + // Check empty links + for _, f := range files { + // Check if file is missing + if f.Link == "" { + // refresh torrent and then break + t = c.refreshTorrent(t) + break + } + } + + for _, f := range files { + // Check if file link is still missing + if f.Link == "" { + isBroken = true + break + } else { + // Check if file.Link not in the downloadLink Cache + if _, ok := c.downloadLinks.Load(f.Link); !ok { + // File not in cache + // Check link + if err := c.client.CheckLink(f.Link); err != nil { + if errors.Is(err, request.ErrLinkBroken) { + isBroken = true + break + } else { + // This might just be a temporary error + } + } else { + // Generate a new download link? 
+ } + } else { + // Link is in cache + // We might skip checking for now, it seems rd removes uncached links + } + } + } + return isBroken +} + +func (c *Cache) repairWorker() { + // This watches a channel for torrents to repair + c.logger.Info().Msg("Starting repair worker") + + for { + select { + case req := <-c.repairChan: + torrentId := req.TorrentID + if _, inProgress := c.repairsInProgress.Load(torrentId); inProgress { + c.logger.Debug().Str("torrentId", torrentId).Msg("Skipping duplicate repair request") + continue + } + + // Mark as in progress + c.repairsInProgress.Store(torrentId, true) + c.logger.Debug().Str("torrentId", req.TorrentID).Msg("Received repair request") + + // Get the torrent from the cache + cachedTorrent, ok := c.torrents.Load(torrentId) + if !ok || cachedTorrent == nil { + c.logger.Warn().Str("torrentId", torrentId).Msg("Torrent not found in cache") + continue + } + + // Check if torrent is broken + if c.IsTorrentBroken(cachedTorrent, nil) { + c.logger.Info().Str("torrentId", torrentId).Msg("Repairing broken torrent") + // Repair torrent + if err := c.repairTorrent(cachedTorrent); err != nil { + c.logger.Error().Err(err).Str("torrentId", torrentId).Msg("Failed to repair torrent") + } else { + c.logger.Info().Str("torrentId", torrentId).Msg("Torrent repaired") + } + } else { + c.logger.Debug().Str("torrentId", torrentId).Msg("Torrent is not broken") + } + c.repairsInProgress.Delete(torrentId) + } + } +} + +func (c *Cache) SubmitForRepair(torrentId, fileName string) { + // Submitting a torrent for repair.Not used yet + + // Check if already in progress before even submitting + if _, inProgress := c.repairsInProgress.Load(torrentId); inProgress { + c.logger.Debug().Str("torrentID", torrentId).Msg("Repair already in progress") + return + } + + select { + case c.repairChan <- RepairRequest{TorrentID: torrentId, FileName: fileName}: + c.logger.Debug().Str("torrentID", torrentId).Msg("Submitted for repair") + default: + 
c.logger.Warn().Str("torrentID", torrentId).Msg("Repair channel full, skipping repair request") + } +} + +func (c *Cache) repairTorrent(t *CachedTorrent) error { + // Check if Magnet is not empty, if empty, reconstruct the magnet + + if _, inProgress := c.repairsInProgress.Load(t.Id); inProgress { + c.logger.Debug().Str("torrentID", t.Id).Msg("Repair already in progress") + return nil + } + + torrent := t.Torrent + if torrent.Magnet == nil { + torrent.Magnet = utils.ConstructMagnet(t.InfoHash, t.Name) + } + + oldID := torrent.Id + + // Submit the magnet to the debrid service + torrent.Id = "" + var err error + torrent, err = c.client.SubmitMagnet(torrent) + if err != nil { + return fmt.Errorf("failed to submit magnet: %w", err) + } + + // Check if the torrent was submitted + if torrent == nil || torrent.Id == "" { + return fmt.Errorf("failed to submit magnet: empty torrent") + } + torrent, err = c.client.CheckStatus(torrent, true) + if err != nil { + return fmt.Errorf("failed to check status: %w", err) + } + + c.client.DeleteTorrent(oldID) // delete the old torrent + c.DeleteTorrent(oldID) // Remove from listings + + // Update the torrent in the cache + t.Torrent = torrent + c.setTorrent(t) + c.refreshListings() + + c.repairsInProgress.Delete(oldID) + return nil +} diff --git a/pkg/debrid/debrid_link/debrid_link.go b/pkg/debrid/debrid_link/debrid_link.go index ae5bb3c..1dc5797 100644 --- a/pkg/debrid/debrid_link/debrid_link.go +++ b/pkg/debrid/debrid_link/debrid_link.go @@ -353,3 +353,11 @@ func (dl *DebridLink) getTorrents(page, perPage int) ([]*types.Torrent, error) { } return torrents, nil } + +func (dl *DebridLink) CheckLink(link string) error { + return nil +} + +func (dl *DebridLink) GetMountPath() string { + return dl.MountPath +} diff --git a/pkg/debrid/realdebrid/realdebrid.go b/pkg/debrid/realdebrid/realdebrid.go index 7467ef0..d57b076 100644 --- a/pkg/debrid/realdebrid/realdebrid.go +++ b/pkg/debrid/realdebrid/realdebrid.go @@ -17,7 +17,6 @@ import ( 
"strconv" "strings" "sync" - "time" ) type RealDebrid struct { @@ -167,7 +166,7 @@ func (r *RealDebrid) UpdateTorrent(t *types.Torrent) error { if err != nil { return err } - t.Name = data.OriginalFilename + t.Name = data.Filename t.Bytes = data.Bytes t.Folder = data.OriginalFilename t.Progress = data.Progress @@ -262,41 +261,105 @@ func (r *RealDebrid) DeleteTorrent(torrentId string) { func (r *RealDebrid) GenerateDownloadLinks(t *types.Torrent) error { url := fmt.Sprintf("%s/unrestrict/link/", r.Host) - files := make(map[string]types.File) + filesCh := make(chan types.File, len(t.Files)) + errCh := make(chan error, len(t.Files)) + + var wg sync.WaitGroup + for _, f := range t.Files { - payload := gourl.Values{ - "link": {f.Link}, - } - req, _ := http.NewRequest(http.MethodPost, url, strings.NewReader(payload.Encode())) - resp, err := r.client.MakeRequest(req) - if err != nil { - fmt.Println(err) - return err - } - var data UnrestrictResponse - if err = json.Unmarshal(resp, &data); err != nil { - return err - } - f.DownloadLink = data.Download - f.Generated = time.Now() - files[f.Name] = f + wg.Add(1) + go func(file types.File) { + defer wg.Done() + + payload := gourl.Values{"link": {file.Link}} + req, err := http.NewRequest(http.MethodPost, url, strings.NewReader(payload.Encode())) + if err != nil { + errCh <- err + return + } + + resp, err := r.client.Do(req) + if err != nil { + errCh <- err + return + } + if resp.StatusCode == http.StatusServiceUnavailable { + errCh <- request.HosterUnavailableError + return + } + defer resp.Body.Close() + b, err := io.ReadAll(resp.Body) + + var data UnrestrictResponse + if err = json.Unmarshal(b, &data); err != nil { + errCh <- err + return + } + + file.DownloadLink = data.Download + filesCh <- file + }(f) } + + go func() { + wg.Wait() + close(filesCh) + close(errCh) + }() + + // Collect results + files := make(map[string]types.File, len(t.Files)) + for file := range filesCh { + files[file.Name] = file + } + + // Check for 
errors + for err := range errCh { + if err != nil { + return err // Return the first error encountered + } + } + t.Files = files return nil } +func (r *RealDebrid) CheckLink(link string) error { + url := fmt.Sprintf("%s/unrestrict/check", r.Host) + payload := gourl.Values{ + "link": {link}, + } + req, _ := http.NewRequest(http.MethodPost, url, strings.NewReader(payload.Encode())) + resp, err := r.client.Do(req) + if err != nil { + return err + } + if resp.StatusCode == http.StatusNotFound { + return request.ErrLinkBroken // File has been removed + } + return nil +} + func (r *RealDebrid) GetDownloadLink(t *types.Torrent, file *types.File) (string, error) { url := fmt.Sprintf("%s/unrestrict/link/", r.Host) payload := gourl.Values{ "link": {file.Link}, } req, _ := http.NewRequest(http.MethodPost, url, strings.NewReader(payload.Encode())) - resp, err := r.client.MakeRequest(req) + resp, err := r.client.Do(req) + if err != nil { + return "", err + } + if resp.StatusCode == http.StatusServiceUnavailable { + return "", request.HosterUnavailableError + } + defer resp.Body.Close() + b, err := io.ReadAll(resp.Body) if err != nil { return "", err } var data UnrestrictResponse - if err = json.Unmarshal(resp, &data); err != nil { + if err = json.Unmarshal(b, &data); err != nil { return "", err } return data.Download, nil @@ -348,7 +411,7 @@ func (r *RealDebrid) getTorrents(offset int, limit int) (int, []*types.Torrent, } torrents = append(torrents, &types.Torrent{ Id: t.Id, - Name: utils.RemoveInvalidChars(t.Filename), // This changes when we get the files + Name: t.Filename, Bytes: t.Bytes, Progress: t.Progress, Status: t.Status, @@ -481,6 +544,10 @@ func (r *RealDebrid) GetDownloadUncached() bool { return r.DownloadUncached } +func (r *RealDebrid) GetMountPath() string { + return r.MountPath +} + func New(dc config.Debrid) *RealDebrid { rl := request.ParseRateLimit(dc.RateLimit) headers := map[string]string{ @@ -489,7 +556,9 @@ func New(dc config.Debrid) *RealDebrid { _log 
:= logger.NewLogger(dc.Name) client := request.New(). WithHeaders(headers). - WithRateLimiter(rl).WithLogger(_log) + WithRateLimiter(rl).WithLogger(_log). + WithMaxRetries(5). + WithRetryableStatus(429) return &RealDebrid{ Name: "realdebrid", Host: dc.Host, diff --git a/pkg/debrid/torbox/torbox.go b/pkg/debrid/torbox/torbox.go index 91ff0ba..ba8c81e 100644 --- a/pkg/debrid/torbox/torbox.go +++ b/pkg/debrid/torbox/torbox.go @@ -337,3 +337,11 @@ func New(dc config.Debrid) *Torbox { func (tb *Torbox) GetDownloads() (map[string]types.DownloadLinks, error) { return nil, nil } + +func (tb *Torbox) CheckLink(link string) error { + return nil +} + +func (tb *Torbox) GetMountPath() string { + return tb.MountPath +} diff --git a/pkg/debrid/types/debrid.go b/pkg/debrid/types/debrid.go index 7bfb873..a734cb1 100644 --- a/pkg/debrid/types/debrid.go +++ b/pkg/debrid/types/debrid.go @@ -19,4 +19,6 @@ type Client interface { GetLogger() zerolog.Logger GetDownloadingStatus() []string GetDownloads() (map[string]DownloadLinks, error) + CheckLink(link string) error + GetMountPath() string } diff --git a/pkg/repair/clean.go b/pkg/repair/clean.go new file mode 100644 index 0000000..bd40968 --- /dev/null +++ b/pkg/repair/clean.go @@ -0,0 +1,146 @@ +package repair + +import ( + "context" + "fmt" + "github.com/sirrobot01/debrid-blackhole/internal/request" + "golang.org/x/sync/errgroup" + "runtime" + "sync" + "time" +) + +func (r *Repair) clean(job *Job) error { + // Create a new error group + g, ctx := errgroup.WithContext(context.Background()) + + uniqueItems := make(map[string]string) + mu := sync.Mutex{} + + // Limit concurrent goroutines + g.SetLimit(runtime.NumCPU() * 4) + + for _, a := range job.Arrs { + a := a // Capture range variable + g.Go(func() error { + // Check if context was canceled + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + items, err := r.cleanArr(job, a, "") + if err != nil { + r.logger.Error().Err(err).Msgf("Error cleaning %s", a) + return err 
+ } + + // Safely append the found items to the shared slice + if len(items) > 0 { + mu.Lock() + for k, v := range items { + uniqueItems[k] = v + } + mu.Unlock() + } + + return nil + }) + } + + if err := g.Wait(); err != nil { + return err + } + + if len(uniqueItems) == 0 { + job.CompletedAt = time.Now() + job.Status = JobCompleted + + go func() { + if err := request.SendDiscordMessage("repair_clean_complete", "success", job.discordContext()); err != nil { + r.logger.Error().Msgf("Error sending discord message: %v", err) + } + }() + + return nil + } + + cache := r.deb.Caches["realdebrid"] + if cache == nil { + return fmt.Errorf("cache not found") + } + torrents := cache.GetTorrents() + + dangling := make([]string, 0) + for _, t := range torrents { + if _, ok := uniqueItems[t.Name]; !ok { + dangling = append(dangling, t.Id) + } + } + + r.logger.Info().Msgf("Found %d delapitated items", len(dangling)) + + if len(dangling) == 0 { + job.CompletedAt = time.Now() + job.Status = JobCompleted + return nil + } + + client := r.deb.Clients["realdebrid"] + if client == nil { + return fmt.Errorf("client not found") + } + for _, id := range dangling { + client.DeleteTorrent(id) + } + + return nil +} + +func (r *Repair) cleanArr(j *Job, _arr string, tmdbId string) (map[string]string, error) { + uniqueItems := make(map[string]string) + a := r.arrs.Get(_arr) + + r.logger.Info().Msgf("Starting repair for %s", a.Name) + media, err := a.GetMedia(tmdbId) + if err != nil { + r.logger.Info().Msgf("Failed to get %s media: %v", a.Name, err) + return uniqueItems, err + } + + // Create a new error group + g, ctx := errgroup.WithContext(context.Background()) + + mu := sync.Mutex{} + + // Limit concurrent goroutines + g.SetLimit(runtime.NumCPU() * 4) + + for _, m := range media { + m := m // Create a new variable scoped to the loop iteration + g.Go(func() error { + // Check if context was canceled + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + u := r.getUniquePaths(m) + 
for k, v := range u { + mu.Lock() + uniqueItems[k] = v + mu.Unlock() + } + return nil + }) + } + + if err := g.Wait(); err != nil { + return uniqueItems, err + } + + r.logger.Info().Msgf("Repair completed for %s. %d unique items", a.Name, len(uniqueItems)) + return uniqueItems, nil +} diff --git a/pkg/repair/misc.go b/pkg/repair/misc.go index 31b281d..6cd0137 100644 --- a/pkg/repair/misc.go +++ b/pkg/repair/misc.go @@ -2,6 +2,7 @@ package repair import ( "fmt" + "github.com/sirrobot01/debrid-blackhole/pkg/arr" "os" "path/filepath" "strconv" @@ -129,3 +130,20 @@ func checkFileStart(filePath string) error { } return nil } + +func collectFiles(media arr.Content) map[string][]arr.ContentFile { + uniqueParents := make(map[string][]arr.ContentFile) + files := media.Files + for _, file := range files { + target := getSymlinkTarget(file.Path) + if target != "" { + file.IsSymlink = true + dir, f := filepath.Split(target) + torrentNamePath := filepath.Clean(dir) + // Set target path folder/file.mkv + file.TargetPath = f + uniqueParents[torrentNamePath] = append(uniqueParents[torrentNamePath], file) + } + } + return uniqueParents +} diff --git a/pkg/repair/repair.go b/pkg/repair/repair.go index 6c38cbf..7f1e7d2 100644 --- a/pkg/repair/repair.go +++ b/pkg/repair/repair.go @@ -34,6 +34,7 @@ type Repair struct { runOnStart bool ZurgURL string IsZurg bool + useWebdav bool autoProcess bool logger zerolog.Logger filename string @@ -51,6 +52,7 @@ func New(arrs *arr.Storage, engine *debrid.Engine) *Repair { duration: duration, runOnStart: cfg.Repair.RunOnStart, ZurgURL: cfg.Repair.ZurgURL, + useWebdav: cfg.Repair.UseWebDav, autoProcess: cfg.Repair.AutoProcess, filename: filepath.Join(cfg.Path, "repair.json"), deb: engine, @@ -157,6 +159,13 @@ func (r *Repair) newJob(arrsNames []string, mediaIDs []string) *Job { } func (r *Repair) preRunChecks() error { + + if r.useWebdav { + if len(r.deb.Caches) == 0 { + return fmt.Errorf("no caches found") + } + } + // Check if zurg url is reachable 
if !r.IsZurg { return nil @@ -362,141 +371,6 @@ func (r *Repair) getUniquePaths(media arr.Content) map[string]string { return uniqueParents } -func (r *Repair) clean(job *Job) error { - // Create a new error group - g, ctx := errgroup.WithContext(context.Background()) - - uniqueItems := make(map[string]string) - mu := sync.Mutex{} - - // Limit concurrent goroutines - g.SetLimit(runtime.NumCPU() * 4) - - for _, a := range job.Arrs { - a := a // Capture range variable - g.Go(func() error { - // Check if context was canceled - select { - case <-ctx.Done(): - return ctx.Err() - default: - } - - items, err := r.cleanArr(job, a, "") - if err != nil { - r.logger.Error().Err(err).Msgf("Error cleaning %s", a) - return err - } - - // Safely append the found items to the shared slice - if len(items) > 0 { - mu.Lock() - for k, v := range items { - uniqueItems[k] = v - } - mu.Unlock() - } - - return nil - }) - } - - if err := g.Wait(); err != nil { - return err - } - - if len(uniqueItems) == 0 { - job.CompletedAt = time.Now() - job.Status = JobCompleted - - go func() { - if err := request.SendDiscordMessage("repair_clean_complete", "success", job.discordContext()); err != nil { - r.logger.Error().Msgf("Error sending discord message: %v", err) - } - }() - - return nil - } - - cache := r.deb.Caches["realdebrid"] - if cache == nil { - return fmt.Errorf("cache not found") - } - torrents := cache.GetTorrents() - - dangling := make([]string, 0) - for _, t := range torrents { - if _, ok := uniqueItems[t.Name]; !ok { - dangling = append(dangling, t.Id) - } - } - - r.logger.Info().Msgf("Found %d delapitated items", len(dangling)) - - if len(dangling) == 0 { - job.CompletedAt = time.Now() - job.Status = JobCompleted - return nil - } - - client := r.deb.Clients["realdebrid"] - if client == nil { - return fmt.Errorf("client not found") - } - for _, id := range dangling { - client.DeleteTorrent(id) - } - - return nil -} - -func (r *Repair) cleanArr(j *Job, _arr string, tmdbId string) 
(map[string]string, error) { - uniqueItems := make(map[string]string) - a := r.arrs.Get(_arr) - - r.logger.Info().Msgf("Starting repair for %s", a.Name) - media, err := a.GetMedia(tmdbId) - if err != nil { - r.logger.Info().Msgf("Failed to get %s media: %v", a.Name, err) - return uniqueItems, err - } - - // Create a new error group - g, ctx := errgroup.WithContext(context.Background()) - - mu := sync.Mutex{} - - // Limit concurrent goroutines - g.SetLimit(runtime.NumCPU() * 4) - - for _, m := range media { - m := m // Create a new variable scoped to the loop iteration - g.Go(func() error { - // Check if context was canceled - select { - case <-ctx.Done(): - return ctx.Err() - default: - } - - u := r.getUniquePaths(m) - for k, v := range u { - mu.Lock() - uniqueItems[k] = v - mu.Unlock() - } - return nil - }) - } - - if err := g.Wait(); err != nil { - return uniqueItems, err - } - - r.logger.Info().Msgf("Repair completed for %s. %d unique items", a.Name, len(uniqueItems)) - return uniqueItems, nil -} - func (r *Repair) repairArr(j *Job, _arr string, tmdbId string) ([]arr.ContentFile, error) { brokenItems := make([]arr.ContentFile, 0) a := r.arrs.Get(_arr) @@ -598,7 +472,9 @@ func (r *Repair) isMediaAccessible(m arr.Content) bool { func (r *Repair) getBrokenFiles(media arr.Content) []arr.ContentFile { - if r.IsZurg { + if r.useWebdav { + return r.getWebdavBrokenFiles(media) + } else if r.IsZurg { return r.getZurgBrokenFiles(media) } else { return r.getFileBrokenFiles(media) @@ -610,17 +486,7 @@ func (r *Repair) getFileBrokenFiles(media arr.Content) []arr.ContentFile { brokenFiles := make([]arr.ContentFile, 0) - uniqueParents := make(map[string][]arr.ContentFile) - files := media.Files - for _, file := range files { - target := getSymlinkTarget(file.Path) - if target != "" { - file.IsSymlink = true - dir, _ := filepath.Split(target) - parent := filepath.Base(filepath.Clean(dir)) - uniqueParents[parent] = append(uniqueParents[parent], file) - } - } + uniqueParents := 
collectFiles(media) for parent, f := range uniqueParents { // Check stat @@ -646,19 +512,7 @@ func (r *Repair) getZurgBrokenFiles(media arr.Content) []arr.ContentFile { // This reduces bandwidth usage significantly brokenFiles := make([]arr.ContentFile, 0) - uniqueParents := make(map[string][]arr.ContentFile) - files := media.Files - for _, file := range files { - target := getSymlinkTarget(file.Path) - if target != "" { - file.IsSymlink = true - dir, f := filepath.Split(target) - parent := filepath.Base(filepath.Clean(dir)) - // Set target path folder/file.mkv - file.TargetPath = f - uniqueParents[parent] = append(uniqueParents[parent], file) - } - } + uniqueParents := collectFiles(media) client := &http.Client{ Timeout: 0, Transport: &http.Transport{ @@ -672,9 +526,9 @@ func (r *Repair) getZurgBrokenFiles(media arr.Content) []arr.ContentFile { // Access zurg url + symlink folder + first file(encoded) for parent, f := range uniqueParents { r.logger.Debug().Msgf("Checking %s", parent) - encodedParent := url.PathEscape(parent) + torrentName := url.PathEscape(filepath.Base(parent)) encodedFile := url.PathEscape(f[0].TargetPath) - fullURL := fmt.Sprintf("%s/http/__all__/%s/%s", r.ZurgURL, encodedParent, encodedFile) + fullURL := fmt.Sprintf("%s/http/__all__/%s/%s", r.ZurgURL, torrentName, encodedFile) // Check file stat first if _, err := os.Stat(f[0].Path); os.IsNotExist(err) { r.logger.Debug().Msgf("Broken symlink found: %s", fullURL) @@ -715,6 +569,76 @@ func (r *Repair) getZurgBrokenFiles(media arr.Content) []arr.ContentFile { return brokenFiles } +func (r *Repair) getWebdavBrokenFiles(media arr.Content) []arr.ContentFile { + // Use internal webdav setup to check file availability + + caches := r.deb.Caches + if len(caches) == 0 { + r.logger.Info().Msg("No caches found. Can't use webdav") + return nil + } + + clients := r.deb.Clients + if len(clients) == 0 { + r.logger.Info().Msg("No clients found. 
Can't use webdav") + return nil + } + + brokenFiles := make([]arr.ContentFile, 0) + uniqueParents := collectFiles(media) + // For each torrent folder: resolve the debrid mount that owns it, then check file availability via that debrid's cache + for torrentPath, f := range uniqueParents { + r.logger.Debug().Msgf("Checking %s", torrentPath) + // Resolve which debrid provider's mount contains this path + dir := filepath.Dir(torrentPath) + debridName := "" + for _, client := range clients { + mountPath := client.GetMountPath() + if mountPath == "" { + continue + } + if filepath.Clean(mountPath) == filepath.Clean(dir) { + debridName = client.GetName() + break + } + } + if debridName == "" { + r.logger.Debug().Msgf("No debrid found for %s. Skipping", torrentPath) + continue + } + cache, ok := caches[debridName] + if !ok { + r.logger.Debug().Msgf("No cache found for %s. Skipping", debridName) + continue + } + // Check if torrent exists + torrentName := filepath.Clean(filepath.Base(torrentPath)) + torrent := cache.GetTorrentByName(torrentName) + if torrent == nil { + r.logger.Debug().Msgf("No torrent found for %s. Skipping", torrentName) + continue + } + files := make([]string, 0) + for _, file := range f { + files = append(files, file.TargetPath) + } + + if cache.IsTorrentBroken(torrent, files) { + r.logger.Debug().Msgf("[webdav] Broken symlink found: %s", torrentPath) + // TODO(review): decide whether the broken torrent should be deleted from the debrid here + brokenFiles = append(brokenFiles, f...) 
+ continue + } + + } + if len(brokenFiles) == 0 { + r.logger.Debug().Msgf("No broken files found for %s", media.Title) + return nil + } + r.logger.Debug().Msgf("%d broken files found for %s", len(brokenFiles), media.Title) + return brokenFiles +} + func (r *Repair) GetJob(id string) *Job { for _, job := range r.Jobs { if job.ID == id { diff --git a/pkg/webdav/file.go b/pkg/webdav/file.go index 20b775c..66fc4ab 100644 --- a/pkg/webdav/file.go +++ b/pkg/webdav/file.go @@ -1,6 +1,7 @@ package webdav import ( + "crypto/tls" "fmt" "github.com/sirrobot01/debrid-blackhole/pkg/debrid/debrid" "io" @@ -11,13 +12,8 @@ import ( var sharedClient = &http.Client{ Transport: &http.Transport{ - MaxIdleConns: 100, - IdleConnTimeout: 90 * time.Second, - ResponseHeaderTimeout: 30 * time.Second, - ExpectContinueTimeout: 1 * time.Second, - DisableCompression: false, // Enable compression for faster transfers - DisableKeepAlives: false, - Proxy: http.ProxyFromEnvironment, + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + Proxy: http.ProxyFromEnvironment, }, Timeout: 0, } @@ -92,7 +88,7 @@ func (f *File) Read(p []byte) (n int, err error) { downloadLink := f.GetDownloadLink() if downloadLink == "" { - return 0, fmt.Errorf("failed to get download link for file") + return 0, io.EOF } req, err := http.NewRequest("GET", downloadLink, nil) diff --git a/pkg/webdav/handler.go b/pkg/webdav/handler.go index 939d9af..9e5ed09 100644 --- a/pkg/webdav/handler.go +++ b/pkg/webdav/handler.go @@ -191,7 +191,6 @@ func (h *Handler) OpenFile(ctx context.Context, name string, flag int, perm os.F name: file.Name, size: file.Size, link: file.Link, - downloadLink: file.DownloadLink, metadataOnly: metadataOnly, } return fi, nil diff --git a/pkg/webdav/webdav.go b/pkg/webdav/webdav.go index f3469a4..c815955 100644 --- a/pkg/webdav/webdav.go +++ b/pkg/webdav/webdav.go @@ -115,7 +115,6 @@ func (wd *WebDav) setupRootHandler(r chi.Router) { func (wd *WebDav) commonMiddleware(next http.Handler) http.Handler { 
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.Header().Set("DAV", "1, 2") - w.Header().Set("Cache-Control", "max-age=3600") w.Header().Set("Allow", "OPTIONS, PROPFIND, GET, HEAD, POST, PUT, DELETE, MKCOL, PROPPATCH, COPY, MOVE, LOCK, UNLOCK") w.Header().Set("Access-Control-Allow-Origin", "*") w.Header().Set("Access-Control-Allow-Methods", "OPTIONS, PROPFIND, GET, HEAD, POST, PUT, DELETE, MKCOL, PROPPATCH, COPY, MOVE, LOCK, UNLOCK")