Fix repair bug; fix torrent refreshes

This commit is contained in:
Mukhtar Akere
2025-04-19 10:41:45 +01:00
parent 52877107c9
commit dc8ee3d150
9 changed files with 150 additions and 67 deletions

View File

@@ -61,6 +61,6 @@ EXPOSE 8181 8282
VOLUME ["/app"]
USER nonroot:nonroot
HEALTHCHECK --retries=3 CMD ["/usr/bin/healthcheck", "--config", "/app"]
HEALTHCHECK --interval=5s --timeout=3s --retries=10 CMD ["/usr/bin/healthcheck", "--config", "/app"]
CMD ["/usr/bin/decypharr", "--config", "/app"]

View File

@@ -121,6 +121,7 @@ func WithTransport(transport *http.Transport) ClientOption {
// WithRetryableStatus adds status codes that should trigger a retry
func WithRetryableStatus(statusCodes ...int) ClientOption {
return func(c *Client) {
c.retryableStatus = make(map[int]struct{}) // reset the map
for _, code := range statusCodes {
c.retryableStatus[code] = struct{}{}
}

View File

@@ -342,14 +342,13 @@ func (c *Cache) sync(torrents []*types.Torrent) error {
return // Channel closed, exit goroutine
}
if err := c.ProcessTorrent(t, false); err != nil {
if err := c.ProcessTorrent(t); err != nil {
c.logger.Error().Err(err).Str("torrent", t.Name).Msg("sync error")
atomic.AddInt64(&errorCount, 1)
}
count := atomic.AddInt64(&processed, 1)
if count%1000 == 0 {
c.refreshListings()
c.logger.Info().Msgf("Progress: %d/%d torrents processed", count, len(torrents))
}
@@ -376,7 +375,7 @@ func (c *Cache) sync(torrents []*types.Torrent) error {
// Wait for all workers to complete
wg.Wait()
c.refreshListings()
c.RefreshListings(true) // final refresh
c.logger.Info().Msgf("Sync complete: %d torrents processed, %d errors", len(torrents), errorCount)
return nil
}
@@ -412,7 +411,7 @@ func (c *Cache) setTorrents(torrents map[string]*CachedTorrent) {
c.torrentsNames.Store(c.GetTorrentFolder(t.Torrent), t)
}
c.refreshListings()
c.RefreshListings(true)
c.SaveTorrents()
}
@@ -532,7 +531,7 @@ func (c *Cache) saveTorrent(id string, data []byte) {
}
}
func (c *Cache) ProcessTorrent(t *types.Torrent, refreshRclone bool) error {
func (c *Cache) ProcessTorrent(t *types.Torrent) error {
isComplete := func(files map[string]types.File) bool {
_complete := len(files) > 0
@@ -572,10 +571,6 @@ func (c *Cache) ProcessTorrent(t *types.Torrent, refreshRclone bool) error {
}
c.setTorrent(ct)
}
if refreshRclone {
c.refreshListings()
}
return nil
}
@@ -685,7 +680,7 @@ func (c *Cache) AddTorrent(t *types.Torrent) error {
AddedOn: addedOn,
}
c.setTorrent(ct)
c.refreshListings()
c.RefreshListings(true)
go c.GenerateDownloadLinks(ct)
return nil
@@ -755,7 +750,7 @@ func (c *Cache) DeleteTorrent(id string) error {
c.torrents.Delete(id)
c.torrentsNames.Delete(c.GetTorrentFolder(t.Torrent))
c.removeFromDB(id)
c.refreshListings()
c.RefreshListings(true)
}
return nil
}
@@ -769,7 +764,7 @@ func (c *Cache) DeleteTorrents(ids []string) {
c.removeFromDB(id)
}
}
c.refreshListings()
c.RefreshListings(true)
}
func (c *Cache) removeFromDB(torrentId string) {

View File

@@ -3,7 +3,6 @@ package debrid
import (
"fmt"
"github.com/sirrobot01/decypharr/internal/config"
"github.com/sirrobot01/decypharr/internal/request"
"github.com/sirrobot01/decypharr/internal/utils"
"github.com/sirrobot01/decypharr/pkg/debrid/types"
"io"
@@ -30,13 +29,13 @@ func (fi *fileInfo) ModTime() time.Time { return fi.modTime }
func (fi *fileInfo) IsDir() bool { return fi.isDir }
func (fi *fileInfo) Sys() interface{} { return nil }
func (c *Cache) refreshListings() {
func (c *Cache) RefreshListings(refreshRclone bool) {
if c.listingRefreshMu.TryLock() {
defer c.listingRefreshMu.Unlock()
} else {
return
}
// COpy the torrents to a string|time map
// Copy the torrents to a string|time map
torrentsTime := make(map[string]time.Time, c.torrents.Size())
torrents := make([]string, 0, c.torrents.Size())
c.torrentsNames.Range(func(key string, value *CachedTorrent) bool {
@@ -60,7 +59,15 @@ func (c *Cache) refreshListings() {
}
// Atomic store of the complete ready-to-use slice
c.listings.Store(files)
_ = c.refreshXml()
if err := c.RefreshParentXml(); err != nil {
c.logger.Debug().Err(err).Msg("Failed to refresh XML")
}
if refreshRclone {
if err := c.RefreshRclone(); err != nil {
c.logger.Trace().Err(err).Msg("Failed to refresh rclone") // silent error
}
}
}
func (c *Cache) refreshTorrents() {
@@ -118,7 +125,7 @@ func (c *Cache) refreshTorrents() {
return
default:
}
if err := c.ProcessTorrent(t, true); err != nil {
if err := c.ProcessTorrent(t); err != nil {
c.logger.Error().Err(err).Msgf("Failed to process new torrent %s", t.Id)
errChan <- err
}
@@ -137,11 +144,12 @@ func (c *Cache) refreshTorrents() {
close(workChan)
wg.Wait()
c.RefreshListings(true)
c.logger.Debug().Msgf("Processed %d new torrents", len(newTorrents))
}
func (c *Cache) RefreshRclone() error {
client := request.Default()
cfg := config.Get().WebDav
if cfg.RcUrl == "" {
@@ -163,6 +171,8 @@ func (c *Cache) RefreshRclone() error {
forgetReq.Header.Set("Content-Type", "application/x-www-form-urlencoded")
// Send the request
client := &http.Client{}
client.Timeout = 10 * time.Second
forgetResp, err := client.Do(forgetReq)
if err != nil {
return err
@@ -173,6 +183,26 @@ func (c *Cache) RefreshRclone() error {
body, _ := io.ReadAll(forgetResp.Body)
return fmt.Errorf("failed to forget rclone: %s - %s", forgetResp.Status, string(body))
}
// Run vfs/refresh
refreshReq, err := http.NewRequest("POST", fmt.Sprintf("%s/vfs/refresh", cfg.RcUrl), strings.NewReader(data))
if err != nil {
return err
}
if cfg.RcUser != "" && cfg.RcPass != "" {
refreshReq.SetBasicAuth(cfg.RcUser, cfg.RcPass)
}
refreshReq.Header.Set("Content-Type", "application/x-www-form-urlencoded")
refreshResp, err := client.Do(refreshReq)
if err != nil {
return err
}
defer refreshResp.Body.Close()
if refreshResp.StatusCode != 200 {
body, _ := io.ReadAll(refreshResp.Body)
return fmt.Errorf("failed to refresh rclone: %s - %s", refreshResp.Status, string(body))
}
return nil
}

View File

@@ -58,14 +58,12 @@ func (c *Cache) IsTorrentBroken(t *CachedTorrent, filenames []string) bool {
// Try to reinsert the torrent if it's broken
if cfg.Repair.ReInsert && isBroken && t.Torrent != nil {
// Check if the torrent is already in progress
if _, inProgress := c.repairsInProgress.Load(t.Torrent.Id); !inProgress {
if err := c.reInsertTorrent(t); err != nil {
c.logger.Error().Err(err).Str("torrentId", t.Torrent.Id).Msg("Failed to reinsert torrent")
return true
} else {
c.logger.Debug().Str("torrentId", t.Torrent.Id).Msg("Reinserted torrent")
return false
}
if err := c.reInsertTorrent(t); err != nil {
c.logger.Error().Err(err).Str("torrentId", t.Torrent.Id).Msg("Failed to reinsert torrent")
return true
} else {
c.logger.Debug().Str("torrentId", t.Torrent.Id).Msg("Reinserted torrent")
return false
}
}
@@ -76,13 +74,6 @@ func (c *Cache) repairWorker() {
// This watches a channel for torrents to repair
for req := range c.repairChan {
torrentId := req.TorrentID
if _, inProgress := c.repairsInProgress.Load(torrentId); inProgress {
c.logger.Debug().Str("torrentId", torrentId).Msg("Skipping duplicate repair request")
continue
}
// Mark as in progress
c.repairsInProgress.Store(torrentId, struct{}{})
c.logger.Debug().Str("torrentId", req.TorrentID).Msg("Received repair request")
// Get the torrent from the cache
@@ -106,29 +97,23 @@ func (c *Cache) repairWorker() {
continue
}
}
c.repairsInProgress.Delete(torrentId)
}
}
func (c *Cache) reInsertTorrent(ct *CachedTorrent) error {
// Check if Magnet is not empty, if empty, reconstruct the magnet
torrent := ct.Torrent
if _, ok := c.repairsInProgress.Load(torrent.Id); ok {
oldID := torrent.Id // Store the old ID
if _, ok := c.repairsInProgress.Load(oldID); ok {
return fmt.Errorf("repair already in progress for torrent %s", torrent.Id)
}
c.repairsInProgress.Store(oldID, struct{}{})
defer c.repairsInProgress.Delete(oldID)
if torrent.Magnet == nil {
torrent.Magnet = utils.ConstructMagnet(torrent.InfoHash, torrent.Name)
}
oldID := torrent.Id
defer func() {
err := c.DeleteTorrent(oldID)
if err != nil {
c.logger.Error().Err(err).Str("torrentId", oldID).Msg("Failed to delete old torrent")
}
}()
// Submit the magnet to the debrid service
torrent.Id = ""
var err error
@@ -167,11 +152,20 @@ func (c *Cache) reInsertTorrent(ct *CachedTorrent) error {
if err != nil {
addedOn = time.Now()
}
// We can safely delete the old torrent here
if oldID != "" {
if err := c.DeleteTorrent(oldID); err != nil {
return fmt.Errorf("failed to delete old torrent: %w", err)
}
}
ct.Torrent = torrent
ct.IsComplete = len(torrent.Files) > 0
ct.AddedOn = addedOn
c.setTorrent(ct)
c.refreshListings()
c.RefreshListings(true)
c.logger.Debug().Str("torrentId", torrent.Id).Msg("Reinserted torrent")
return nil
}

View File

@@ -32,30 +32,24 @@ func (c *Cache) resetPropfindResponse() error {
for _, k := range keys {
c.PropfindResp.Delete(k)
}
if err := c.RefreshRclone(); err != nil {
c.logger.Trace().Err(err).Msg("Failed to refresh rclone") // silent error
}
c.logger.Trace().Msgf("Reset XML cache for %s", c.client.GetName())
return nil
}
func (c *Cache) refreshXml() error {
func (c *Cache) RefreshParentXml() error {
parents := []string{"__all__", "torrents"}
torrents := c.GetListing()
clientName := c.client.GetName()
for _, parent := range parents {
if err := c.refreshParentXml(torrents, parent); err != nil {
if err := c.refreshParentXml(torrents, clientName, parent); err != nil {
return fmt.Errorf("failed to refresh XML for %s: %v", parent, err)
}
}
if err := c.RefreshRclone(); err != nil {
c.logger.Trace().Err(err).Msg("Failed to refresh rclone") // silent error
}
c.logger.Trace().Msgf("Refreshed XML cache for %s", c.client.GetName())
return nil
}
func (c *Cache) refreshParentXml(torrents []os.FileInfo, parent string) error {
func (c *Cache) refreshParentXml(torrents []os.FileInfo, clientName, parent string) error {
// Define the WebDAV namespace
davNS := "DAV:"
@@ -70,7 +64,7 @@ func (c *Cache) refreshParentXml(torrents []os.FileInfo, parent string) error {
currentTime := time.Now().UTC().Format(http.TimeFormat)
// Add the parent directory
baseUrl := path.Clean(fmt.Sprintf("/webdav/%s/%s", c.client.GetName(), parent))
baseUrl := path.Clean(fmt.Sprintf("/webdav/%s/%s", clientName, parent))
parentPath := fmt.Sprintf("%s/", baseUrl)
addDirectoryResponse(multistatus, parentPath, parent, currentTime)
@@ -79,7 +73,7 @@ func (c *Cache) refreshParentXml(torrents []os.FileInfo, parent string) error {
name := torrent.Name()
// Note the path structure change - parent first, then torrent name
torrentPath := fmt.Sprintf("/webdav/%s/%s/%s/",
c.client.GetName(),
clientName,
parent,
name,
)

View File

@@ -149,6 +149,70 @@ func (q *QBit) ProcessSymlink(torrent *Torrent) (string, error) {
return q.createSymlinks(debridTorrent, torrentRclonePath, torrentFolder) // verify cos we're using external webdav
}
func (q *QBit) createSymlinksWebdav(debridTorrent *debrid.Torrent, rclonePath, torrentFolder string) (string, error) {
files := debridTorrent.Files
symlinkPath := filepath.Join(q.DownloadFolder, debridTorrent.Arr.Name, torrentFolder) // /mnt/symlinks/{category}/MyTVShow/
err := os.MkdirAll(symlinkPath, os.ModePerm)
if err != nil {
return "", fmt.Errorf("failed to create directory: %s: %v", symlinkPath, err)
}
remainingFiles := make(map[string]debrid.File)
for _, file := range files {
remainingFiles[file.Name] = file
}
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
timeout := time.After(30 * time.Minute)
filePaths := make([]string, 0, len(files))
for len(remainingFiles) > 0 {
select {
case <-ticker.C:
entries, err := os.ReadDir(rclonePath)
if err != nil {
continue
}
// Check which files exist in this batch
for _, entry := range entries {
if file, exists := remainingFiles[entry.Name()]; exists {
fullFilePath := filepath.Join(rclonePath, file.Name)
fileSymlinkPath := filepath.Join(symlinkPath, file.Name)
if err := os.Symlink(fullFilePath, fileSymlinkPath); err != nil {
q.logger.Debug().Msgf("Failed to create symlink: %s: %v", fileSymlinkPath, err)
} else {
q.logger.Info().Msgf("File is ready: %s", file.Name)
filePaths = append(filePaths, fileSymlinkPath)
delete(remainingFiles, file.Name)
}
}
}
case <-timeout:
q.logger.Warn().Msgf("Timeout waiting for files, %d files still pending", len(remainingFiles))
return symlinkPath, fmt.Errorf("timeout waiting for files")
}
}
if q.SkipPreCache {
return symlinkPath, nil
}
go func() {
if err := q.preCacheFile(debridTorrent.Name, filePaths); err != nil {
q.logger.Error().Msgf("Failed to pre-cache file: %s", err)
} else {
q.logger.Debug().Msgf("Pre-cached %d files", len(filePaths))
}
}() // Pre-cache the files in the background
// Pre-cache the first 256KB and 1MB of the file
return symlinkPath, nil
}
func (q *QBit) createSymlinks(debridTorrent *debrid.Torrent, rclonePath, torrentFolder string) (string, error) {
files := debridTorrent.Files
symlinkPath := filepath.Join(q.DownloadFolder, debridTorrent.Arr.Name, torrentFolder) // /mnt/symlinks/{category}/MyTVShow/
@@ -220,15 +284,6 @@ func (q *QBit) getTorrentPath(rclonePath string, debridTorrent *debrid.Torrent)
}
}
func (q *QBit) createSymLink(torrentFileMountPath, filePath string) string {
err := os.Symlink(torrentFileMountPath, filePath)
if err != nil {
// It's okay if the symlink already exists
q.logger.Debug().Msgf("Failed to create symlink: %s: %v", filePath, err)
}
return filePath
}
func (q *QBit) preCacheFile(name string, filePaths []string) error {
q.logger.Trace().Msgf("Pre-caching file: %s", name)
if len(filePaths) == 0 {

View File

@@ -126,7 +126,9 @@ func (q *QBit) ProcessFiles(torrent *Torrent, debridTorrent *debrid.Torrent, arr
}
rclonePath := filepath.Join(debridTorrent.MountPath, cache.GetTorrentFolder(debridTorrent)) // /mnt/remote/realdebrid/MyTVShow
torrentFolderNoExt := utils.RemoveExtension(debridTorrent.Name)
torrentSymlinkPath, err = q.createSymlinks(debridTorrent, rclonePath, torrentFolderNoExt) // /mnt/symlinks/{category}/MyTVShow/
timer := time.Now()
torrentSymlinkPath, err = q.createSymlinksWebdav(debridTorrent, rclonePath, torrentFolderNoExt) // /mnt/symlinks/{category}/MyTVShow/
q.logger.Debug().Msgf("Symlink creation took %s", time.Since(timer))
} else {
// User is using either zurg or debrid webdav

View File

@@ -258,7 +258,19 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// - Otherwise, for deeper (torrent folder) paths, use a longer TTL.
ttl := 1 * time.Minute
if h.isParentPath(r.URL.Path) {
// __all__ or torrents folder
// Manually build the xml
ttl = 30 * time.Second
if served := h.serveFromCacheIfValid(w, r, cacheKey, ttl); served {
return
}
// Refresh the parent XML
h.cache.RefreshListings(false)
// Check again if the cache is valid
// If not, we will use the default WebDAV handler
if served := h.serveFromCacheIfValid(w, r, cacheKey, ttl); served {
return
}
}
if served := h.serveFromCacheIfValid(w, r, cacheKey, ttl); served {