Move to per-torrent repair; Fix issues issues with adding torrents

This commit is contained in:
Mukhtar Akere
2025-06-23 18:54:52 +01:00
parent 54c421a480
commit 817051589e
6 changed files with 100 additions and 73 deletions

View File

@@ -40,12 +40,10 @@ func RemoveInvalidChars(value string) string {
} }
func RemoveExtension(value string) string { func RemoveExtension(value string) string {
loc := mediaRegex.FindStringIndex(value) if loc := mediaRegex.FindStringIndex(value); loc != nil {
if loc != nil {
return value[:loc[0]] return value[:loc[0]]
} else {
return value
} }
return value
} }
func IsMediaFile(path string) bool { func IsMediaFile(path string) bool {

View File

@@ -514,9 +514,9 @@ func (c *Cache) setTorrent(t CachedTorrent, callback func(torrent CachedTorrent)
updatedTorrent.Files = mergedFiles updatedTorrent.Files = mergedFiles
} }
c.torrents.set(torrentName, t, updatedTorrent) c.torrents.set(torrentName, t, updatedTorrent)
c.SaveTorrent(t) go c.SaveTorrent(t)
if callback != nil { if callback != nil {
callback(updatedTorrent) go callback(updatedTorrent)
} }
} }
@@ -702,6 +702,7 @@ func (c *Cache) ProcessTorrent(t *types.Torrent) error {
func (c *Cache) Add(t *types.Torrent) error { func (c *Cache) Add(t *types.Torrent) error {
if len(t.Files) == 0 { if len(t.Files) == 0 {
c.logger.Warn().Msgf("Torrent %s has no files to add. Refreshing", t.Id)
if err := c.client.UpdateTorrent(t); err != nil { if err := c.client.UpdateTorrent(t); err != nil {
return fmt.Errorf("failed to update torrent: %w", err) return fmt.Errorf("failed to update torrent: %w", err)
} }

View File

@@ -137,66 +137,74 @@ func (c *Cache) refreshRclone() error {
} }
client := &http.Client{ client := &http.Client{
Timeout: 60 * time.Second, Timeout: 30 * time.Second,
Transport: &http.Transport{ Transport: &http.Transport{
MaxIdleConns: 10, MaxIdleConns: 10,
IdleConnTimeout: 60 * time.Second, IdleConnTimeout: 30 * time.Second,
DisableCompression: false, DisableCompression: false,
MaxIdleConnsPerHost: 5, MaxIdleConnsPerHost: 5,
}, },
} }
// Create form data // Create form data
data := "" data := c.buildRcloneRequestData()
if err := c.sendRcloneRequest(client, "vfs/forget", data); err != nil {
c.logger.Error().Err(err).Msg("Failed to send rclone vfs/forget request")
}
if err := c.sendRcloneRequest(client, "vfs/refresh", data); err != nil {
c.logger.Error().Err(err).Msg("Failed to send rclone vfs/refresh request")
}
return nil
}
func (c *Cache) buildRcloneRequestData() string {
cfg := c.config
dirs := strings.FieldsFunc(cfg.RcRefreshDirs, func(r rune) bool { dirs := strings.FieldsFunc(cfg.RcRefreshDirs, func(r rune) bool {
return r == ',' || r == '&' return r == ',' || r == '&'
}) })
if len(dirs) == 0 { if len(dirs) == 0 {
data = "dir=__all__" return "dir=__all__"
} else { }
for index, dir := range dirs {
if dir != "" { var data strings.Builder
if index == 0 { for index, dir := range dirs {
data += "dir=" + dir if dir != "" {
} else { if index == 0 {
data += "&dir" + fmt.Sprint(index+1) + "=" + dir data.WriteString("dir=" + dir)
} } else {
data.WriteString("&dir" + fmt.Sprint(index+1) + "=" + dir)
} }
} }
} }
return data.String()
}
sendRequest := func(endpoint string) error { func (c *Cache) sendRcloneRequest(client *http.Client, endpoint, data string) error {
req, err := http.NewRequest("POST", fmt.Sprintf("%s/%s", cfg.RcUrl, endpoint), strings.NewReader(data)) req, err := http.NewRequest("POST", fmt.Sprintf("%s/%s", c.config.RcUrl, endpoint), strings.NewReader(data))
if err != nil { if err != nil {
return err
}
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
if cfg.RcUser != "" && cfg.RcPass != "" {
req.SetBasicAuth(cfg.RcUser, cfg.RcPass)
}
resp, err := client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode != 200 {
body, _ := io.ReadAll(io.LimitReader(resp.Body, 1024))
return fmt.Errorf("failed to perform %s: %s - %s", endpoint, resp.Status, string(body))
}
_, _ = io.Copy(io.Discard, resp.Body)
return nil
}
if err := sendRequest("vfs/forget"); err != nil {
return err
}
if err := sendRequest("vfs/refresh"); err != nil {
return err return err
} }
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
if c.config.RcUser != "" && c.config.RcPass != "" {
req.SetBasicAuth(c.config.RcUser, c.config.RcPass)
}
resp, err := client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode != 200 {
body, _ := io.ReadAll(io.LimitReader(resp.Body, 1024))
return fmt.Errorf("failed to perform %s: %s - %s", endpoint, resp.Status, string(body))
}
_, _ = io.Copy(io.Discard, resp.Body)
return nil return nil
} }

View File

@@ -89,23 +89,31 @@ func (c *Cache) GetBrokenFiles(t *CachedTorrent, filenames []string) []string {
} }
files = t.Files files = t.Files
var wg sync.WaitGroup var wg sync.WaitGroup
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
wg.Add(len(files)) wg.Add(len(files))
for _, f := range files { for _, f := range files {
// Check if file link is still missing
go func(f types.File) { go func(f types.File) {
defer wg.Done() defer wg.Done()
select {
case <-ctx.Done():
return
default:
}
if f.Link == "" { if f.Link == "" {
brokenFiles = append(brokenFiles, f.Name) cancel()
} else { return
// Check if file.Link not in the downloadLink Cache }
if err := c.client.CheckLink(f.Link); err != nil {
if errors.Is(err, utils.HosterUnavailableError) { if err := c.client.CheckLink(f.Link); err != nil {
brokenFiles = append(brokenFiles, f.Name) if errors.Is(err, utils.HosterUnavailableError) {
} cancel() // Signal all other goroutines to stop
return
} }
} }
}(f) }(f)
@@ -113,6 +121,13 @@ func (c *Cache) GetBrokenFiles(t *CachedTorrent, filenames []string) []string {
wg.Wait() wg.Wait()
// If context was cancelled, mark all files as broken
if ctx.Err() != nil {
for _, f := range files {
brokenFiles = append(brokenFiles, f.Name)
}
}
// Try to reinsert the torrent if it's broken // Try to reinsert the torrent if it's broken
if len(brokenFiles) > 0 && t.Torrent != nil { if len(brokenFiles) > 0 && t.Torrent != nil {
// Check if the torrent is already in progress // Check if the torrent is already in progress

View File

@@ -171,17 +171,18 @@ func (tc *torrentCache) refreshListing() {
wg.Add(1) // for all listing wg.Add(1) // for all listing
go func() { go func() {
defer wg.Done()
listing := make([]os.FileInfo, len(all)) listing := make([]os.FileInfo, len(all))
for i, sf := range all { for i, sf := range all {
listing[i] = &fileInfo{sf.id, sf.name, sf.size, 0755 | os.ModeDir, sf.modTime, true} listing[i] = &fileInfo{sf.id, sf.name, sf.size, 0755 | os.ModeDir, sf.modTime, true}
} }
tc.listing.Store(listing) tc.listing.Store(listing)
}() }()
wg.Done()
wg.Add(1) wg.Add(1)
// For __bad__ // For __bad__
go func() { go func() {
defer wg.Done()
listing := make([]os.FileInfo, 0) listing := make([]os.FileInfo, 0)
for _, sf := range all { for _, sf := range all {
if sf.bad { if sf.bad {
@@ -203,7 +204,6 @@ func (tc *torrentCache) refreshListing() {
} }
tc.folders.Unlock() tc.folders.Unlock()
}() }()
wg.Done()
now := time.Now() now := time.Now()
wg.Add(len(tc.directoriesFilters)) // for each directory filter wg.Add(len(tc.directoriesFilters)) // for each directory filter

View File

@@ -57,6 +57,8 @@ func (s *Store) processFiles(torrent *Torrent, debridTorrent *types.Torrent, imp
client := deb.Client() client := deb.Client()
downloadingStatuses := client.GetDownloadingStatus() downloadingStatuses := client.GetDownloadingStatus()
_arr := importReq.Arr _arr := importReq.Arr
backoff := time.NewTimer(s.refreshInterval)
defer backoff.Stop()
for debridTorrent.Status != "downloaded" { for debridTorrent.Status != "downloaded" {
s.logger.Debug().Msgf("%s <- (%s) Download Progress: %.2f%%", debridTorrent.Debrid, debridTorrent.Name, debridTorrent.Progress) s.logger.Debug().Msgf("%s <- (%s) Download Progress: %.2f%%", debridTorrent.Debrid, debridTorrent.Name, debridTorrent.Progress)
dbT, err := client.CheckStatus(debridTorrent) dbT, err := client.CheckStatus(debridTorrent)
@@ -83,10 +85,12 @@ func (s *Store) processFiles(torrent *Torrent, debridTorrent *types.Torrent, imp
if debridTorrent.Status == "downloaded" || !utils.Contains(downloadingStatuses, debridTorrent.Status) { if debridTorrent.Status == "downloaded" || !utils.Contains(downloadingStatuses, debridTorrent.Status) {
break break
} }
if !utils.Contains(client.GetDownloadingStatus(), debridTorrent.Status) { select {
break case <-backoff.C:
// Increase interval gradually, cap at max
nextInterval := min(s.refreshInterval*2, 30*time.Second)
backoff.Reset(nextInterval)
} }
time.Sleep(s.refreshInterval)
} }
var torrentSymlinkPath string var torrentSymlinkPath string
var err error var err error
@@ -96,15 +100,15 @@ func (s *Store) processFiles(torrent *Torrent, debridTorrent *types.Torrent, imp
timer := time.Now() timer := time.Now()
onFailed := func(err error) { onFailed := func(err error) {
if err != nil { s.markTorrentAsFailed(torrent)
s.markTorrentAsFailed(torrent) go func() {
go func() { if deleteErr := client.DeleteTorrent(debridTorrent.Id); deleteErr != nil {
_ = client.DeleteTorrent(debridTorrent.Id) s.logger.Warn().Err(deleteErr).Msgf("Failed to delete torrent %s", debridTorrent.Id)
}() }
s.logger.Error().Err(err).Msgf("Error occured while processing torrent %s", debridTorrent.Name) }()
importReq.markAsFailed(err, torrent, debridTorrent) s.logger.Error().Err(err).Msgf("Error occured while processing torrent %s", debridTorrent.Name)
return importReq.markAsFailed(err, torrent, debridTorrent)
} return
} }
onSuccess := func(torrentSymlinkPath string) { onSuccess := func(torrentSymlinkPath string) {
@@ -118,7 +122,9 @@ func (s *Store) processFiles(torrent *Torrent, debridTorrent *types.Torrent, imp
s.logger.Error().Msgf("Error sending discord message: %v", err) s.logger.Error().Msgf("Error sending discord message: %v", err)
} }
}() }()
_arr.Refresh() go func() {
_arr.Refresh()
}()
} }
switch importReq.Action { switch importReq.Action {
@@ -137,7 +143,6 @@ func (s *Store) processFiles(torrent *Torrent, debridTorrent *types.Torrent, imp
rclonePath := filepath.Join(debridTorrent.MountPath, cache.GetTorrentFolder(debridTorrent)) // /mnt/remote/realdebrid/MyTVShow rclonePath := filepath.Join(debridTorrent.MountPath, cache.GetTorrentFolder(debridTorrent)) // /mnt/remote/realdebrid/MyTVShow
torrentFolderNoExt := utils.RemoveExtension(debridTorrent.Name) torrentFolderNoExt := utils.RemoveExtension(debridTorrent.Name)
torrentSymlinkPath, err = s.createSymlinksWebdav(torrent, debridTorrent, rclonePath, torrentFolderNoExt) // /mnt/symlinks/{category}/MyTVShow/ torrentSymlinkPath, err = s.createSymlinksWebdav(torrent, debridTorrent, rclonePath, torrentFolderNoExt) // /mnt/symlinks/{category}/MyTVShow/
} else { } else {
// User is using either zurg or debrid webdav // User is using either zurg or debrid webdav
torrentSymlinkPath, err = s.processSymlink(torrent, debridTorrent) // /mnt/symlinks/{category}/MyTVShow/ torrentSymlinkPath, err = s.processSymlink(torrent, debridTorrent) // /mnt/symlinks/{category}/MyTVShow/