- Revamp decypharr arch \n

- Add callback_ur, download_folder to addContent API \n
- Fix few bugs \n
- More declarative UI keywords
- Speed up repairs
- Few other improvements/bug fixes
This commit is contained in:
Mukhtar Akere
2025-06-02 12:57:36 +01:00
parent 1cd09239f9
commit 9c6c44d785
67 changed files with 1726 additions and 1464 deletions

View File

@@ -3,6 +3,7 @@ package repair
import (
"context"
"encoding/json"
"errors"
"fmt"
"github.com/go-co-op/gocron/v2"
"github.com/google/uuid"
@@ -12,7 +13,7 @@ import (
"github.com/sirrobot01/decypharr/internal/request"
"github.com/sirrobot01/decypharr/internal/utils"
"github.com/sirrobot01/decypharr/pkg/arr"
"github.com/sirrobot01/decypharr/pkg/debrid/debrid"
"github.com/sirrobot01/decypharr/pkg/debrid"
"golang.org/x/sync/errgroup"
"net"
"net/http"
@@ -29,7 +30,7 @@ import (
type Repair struct {
Jobs map[string]*Job
arrs *arr.Storage
deb *debrid.Engine
deb *debrid.Storage
interval string
runOnStart bool
ZurgURL string
@@ -40,7 +41,10 @@ type Repair struct {
filename string
workers int
scheduler gocron.Scheduler
ctx context.Context
debridPathCache map[string]string // Cache for path -> debrid name mapping
cacheMutex sync.RWMutex
ctx context.Context
}
type JobStatus string
@@ -51,6 +55,7 @@ const (
JobFailed JobStatus = "failed"
JobCompleted JobStatus = "completed"
JobProcessing JobStatus = "processing"
JobCancelled JobStatus = "cancelled"
)
type Job struct {
@@ -66,9 +71,12 @@ type Job struct {
Recurrent bool `json:"recurrent"`
Error string `json:"error"`
cancelFunc context.CancelFunc
ctx context.Context
}
func New(arrs *arr.Storage, engine *debrid.Engine) *Repair {
func New(arrs *arr.Storage, engine *debrid.Storage) *Repair {
cfg := config.Get()
workers := runtime.NumCPU() * 20
if cfg.Repair.Workers > 0 {
@@ -220,7 +228,8 @@ func (r *Repair) newJob(arrsNames []string, mediaIDs []string) *Job {
func (r *Repair) preRunChecks() error {
if r.useWebdav {
if len(r.deb.Caches) == 0 {
caches := r.deb.GetCaches()
if len(caches) == 0 {
return fmt.Errorf("no caches found")
}
return nil
@@ -254,21 +263,59 @@ func (r *Repair) AddJob(arrsNames []string, mediaIDs []string, autoProcess, recu
job.AutoProcess = autoProcess
job.Recurrent = recurrent
r.reset(job)
job.ctx, job.cancelFunc = context.WithCancel(r.ctx)
r.Jobs[key] = job
go r.saveToFile()
go func() {
if err := r.repair(job); err != nil {
r.logger.Error().Err(err).Msg("Error running repair")
r.logger.Error().Err(err).Msg("Error running repair")
job.FailedAt = time.Now()
job.Error = err.Error()
job.Status = JobFailed
job.CompletedAt = time.Now()
if !errors.Is(job.ctx.Err(), context.Canceled) {
job.FailedAt = time.Now()
job.Error = err.Error()
job.Status = JobFailed
job.CompletedAt = time.Now()
} else {
job.FailedAt = time.Now()
job.Error = err.Error()
job.Status = JobFailed
job.CompletedAt = time.Now()
}
}
}()
return nil
}
func (r *Repair) StopJob(id string) error {
job := r.GetJob(id)
if job == nil {
return fmt.Errorf("job %s not found", id)
}
// Check if job can be stopped
if job.Status != JobStarted && job.Status != JobProcessing {
return fmt.Errorf("job %s cannot be stopped (status: %s)", id, job.Status)
}
// Cancel the job
if job.cancelFunc != nil {
job.cancelFunc()
r.logger.Info().Msgf("Job %s cancellation requested", id)
go func() {
if job.Status == JobStarted || job.Status == JobProcessing {
job.Status = JobCancelled
job.CompletedAt = time.Now()
job.Error = "Job was cancelled by user"
r.saveToFile()
}
}()
return nil
}
return fmt.Errorf("job %s cannot be cancelled", id)
}
func (r *Repair) repair(job *Job) error {
defer r.saveToFile()
if err := r.preRunChecks(); err != nil {
@@ -278,7 +325,7 @@ func (r *Repair) repair(job *Job) error {
// Use a mutex to protect concurrent access to brokenItems
var mu sync.Mutex
brokenItems := map[string][]arr.ContentFile{}
g, ctx := errgroup.WithContext(r.ctx)
g, ctx := errgroup.WithContext(job.ctx)
for _, a := range job.Arrs {
a := a // Capture range variable
@@ -321,6 +368,14 @@ func (r *Repair) repair(job *Job) error {
// Wait for all goroutines to complete and check for errors
if err := g.Wait(); err != nil {
// Check if j0b was canceled
if errors.Is(ctx.Err(), context.Canceled) {
job.Status = JobCancelled
job.CompletedAt = time.Now()
job.Error = "Job was cancelled"
return fmt.Errorf("job cancelled")
}
job.FailedAt = time.Now()
job.Error = err.Error()
job.Status = JobFailed
@@ -367,7 +422,7 @@ func (r *Repair) repair(job *Job) error {
return nil
}
func (r *Repair) repairArr(j *Job, _arr string, tmdbId string) ([]arr.ContentFile, error) {
func (r *Repair) repairArr(job *Job, _arr string, tmdbId string) ([]arr.ContentFile, error) {
brokenItems := make([]arr.ContentFile, 0)
a := r.arrs.Get(_arr)
@@ -384,7 +439,7 @@ func (r *Repair) repairArr(j *Job, _arr string, tmdbId string) ([]arr.ContentFil
return brokenItems, nil
}
// Check first media to confirm mounts are accessible
if !r.isMediaAccessible(media[0]) {
if !r.isMediaAccessible(media) {
r.logger.Info().Msgf("Skipping repair. Parent directory not accessible for. Check your mounts")
return brokenItems, nil
}
@@ -400,14 +455,14 @@ func (r *Repair) repairArr(j *Job, _arr string, tmdbId string) ([]arr.ContentFil
defer wg.Done()
for m := range workerChan {
select {
case <-r.ctx.Done():
case <-job.ctx.Done():
return
default:
}
items := r.getBrokenFiles(m)
items := r.getBrokenFiles(job, m)
if items != nil {
r.logger.Debug().Msgf("Found %d broken files for %s", len(items), m.Title)
if j.AutoProcess {
if job.AutoProcess {
r.logger.Info().Msgf("Auto processing %d broken items for %s", len(items), m.Title)
// Delete broken items
@@ -431,7 +486,7 @@ func (r *Repair) repairArr(j *Job, _arr string, tmdbId string) ([]arr.ContentFil
for _, m := range media {
select {
case <-r.ctx.Done():
case <-job.ctx.Done():
break
default:
workerChan <- m
@@ -449,43 +504,49 @@ func (r *Repair) repairArr(j *Job, _arr string, tmdbId string) ([]arr.ContentFil
return brokenItems, nil
}
func (r *Repair) isMediaAccessible(m arr.Content) bool {
files := m.Files
// isMediaAccessible checks if the mounts are accessible
func (r *Repair) isMediaAccessible(media []arr.Content) bool {
firstMedia := media[0]
for _, m := range media {
if len(m.Files) > 0 {
firstMedia = m
break
}
}
files := firstMedia.Files
if len(files) == 0 {
return false
}
firstFile := files[0]
r.logger.Debug().Msgf("Checking parent directory for %s", firstFile.Path)
//if _, err := os.Stat(firstFile.Path); os.IsNotExist(err) {
// r.logger.Debug().Msgf("Parent directory not accessible for %s", firstFile.Path)
// return false
//}
// Check symlink parent directory
symlinkPath := getSymlinkTarget(firstFile.Path)
r.logger.Debug().Msgf("Checking symlink parent directory for %s", symlinkPath)
parentSymlink := ""
if symlinkPath != "" {
parentSymlink := filepath.Dir(filepath.Dir(symlinkPath)) // /mnt/zurg/torrents/movie/movie.mkv -> /mnt/zurg/torrents
parentSymlink = filepath.Dir(filepath.Dir(symlinkPath)) // /mnt/zurg/torrents/movie/movie.mkv -> /mnt/zurg/torrents
}
if parentSymlink != "" {
if _, err := os.Stat(parentSymlink); os.IsNotExist(err) {
return false
}
return true
}
return true
return false
}
func (r *Repair) getBrokenFiles(media arr.Content) []arr.ContentFile {
func (r *Repair) getBrokenFiles(job *Job, media arr.Content) []arr.ContentFile {
if r.useWebdav {
return r.getWebdavBrokenFiles(media)
return r.getWebdavBrokenFiles(job, media)
} else if r.IsZurg {
return r.getZurgBrokenFiles(media)
return r.getZurgBrokenFiles(job, media)
} else {
return r.getFileBrokenFiles(media)
return r.getFileBrokenFiles(job, media)
}
}
func (r *Repair) getFileBrokenFiles(media arr.Content) []arr.ContentFile {
func (r *Repair) getFileBrokenFiles(job *Job, media arr.Content) []arr.ContentFile {
// This checks symlink target, try to get read a tiny bit of the file
brokenFiles := make([]arr.ContentFile, 0)
@@ -510,7 +571,7 @@ func (r *Repair) getFileBrokenFiles(media arr.Content) []arr.ContentFile {
return brokenFiles
}
func (r *Repair) getZurgBrokenFiles(media arr.Content) []arr.ContentFile {
func (r *Repair) getZurgBrokenFiles(job *Job, media arr.Content) []arr.ContentFile {
// Use zurg setup to check file availability with zurg
// This reduces bandwidth usage significantly
@@ -550,12 +611,17 @@ func (r *Repair) getZurgBrokenFiles(media arr.Content) []arr.ContentFile {
}
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
r.logger.Debug().Msgf("Failed to get download url for %s", fullURL)
resp.Body.Close()
if err := resp.Body.Close(); err != nil {
return nil
}
brokenFiles = append(brokenFiles, file)
continue
}
downloadUrl := resp.Request.URL.String()
resp.Body.Close()
if err := resp.Body.Close(); err != nil {
return nil
}
if downloadUrl != "" {
r.logger.Trace().Msgf("Found download url: %s", downloadUrl)
} else {
@@ -573,16 +639,16 @@ func (r *Repair) getZurgBrokenFiles(media arr.Content) []arr.ContentFile {
return brokenFiles
}
func (r *Repair) getWebdavBrokenFiles(media arr.Content) []arr.ContentFile {
func (r *Repair) getWebdavBrokenFiles(job *Job, media arr.Content) []arr.ContentFile {
// Use internal webdav setup to check file availability
caches := r.deb.Caches
caches := r.deb.GetCaches()
if len(caches) == 0 {
r.logger.Info().Msg("No caches found. Can't use webdav")
return nil
}
clients := r.deb.Clients
clients := r.deb.GetClients()
if len(clients) == 0 {
r.logger.Info().Msg("No clients found. Can't use webdav")
return nil
@@ -590,58 +656,36 @@ func (r *Repair) getWebdavBrokenFiles(media arr.Content) []arr.ContentFile {
brokenFiles := make([]arr.ContentFile, 0)
uniqueParents := collectFiles(media)
for torrentPath, f := range uniqueParents {
r.logger.Debug().Msgf("Checking %s", torrentPath)
// Get the debrid first
dir := filepath.Dir(torrentPath)
debridName := ""
for _, client := range clients {
mountPath := client.GetMountPath()
if mountPath == "" {
continue
var brokenFilesMutex sync.Mutex
var wg sync.WaitGroup
// Limit concurrent torrent checks
semaphore := make(chan struct{}, min(len(uniqueParents), 30)) // Limit to 5 concurrent checks
for torrentPath, files := range uniqueParents {
wg.Add(1)
go func(torrentPath string, files []arr.ContentFile) {
defer wg.Done()
semaphore <- struct{}{} // Acquire
defer func() { <-semaphore }() // Release
select {
case <-job.ctx.Done():
return
default:
}
if filepath.Clean(mountPath) == filepath.Clean(dir) {
debridName = client.GetName()
break
}
}
if debridName == "" {
r.logger.Debug().Msgf("No debrid found for %s. Skipping", torrentPath)
continue
}
cache, ok := caches[debridName]
if !ok {
r.logger.Debug().Msgf("No cache found for %s. Skipping", debridName)
continue
}
// Check if torrent exists
torrentName := filepath.Clean(filepath.Base(torrentPath))
torrent := cache.GetTorrentByName(torrentName)
if torrent == nil {
r.logger.Debug().Msgf("No torrent found for %s. Skipping", torrentName)
brokenFiles = append(brokenFiles, f...)
continue
}
files := make([]string, 0)
for _, file := range f {
files = append(files, file.TargetPath)
}
brokenFilesForTorrent := r.checkTorrentFiles(torrentPath, files, clients, caches)
_brokenFiles := cache.GetBrokenFiles(torrent, files)
totalBrokenFiles := len(_brokenFiles)
if totalBrokenFiles > 0 {
r.logger.Debug().Msgf("%d broken files found in %s", totalBrokenFiles, torrentName)
for _, contentFile := range f {
if utils.Contains(_brokenFiles, contentFile.TargetPath) {
brokenFiles = append(brokenFiles, contentFile)
}
if len(brokenFilesForTorrent) > 0 {
brokenFilesMutex.Lock()
brokenFiles = append(brokenFiles, brokenFilesForTorrent...)
brokenFilesMutex.Unlock()
}
}
}(torrentPath, files)
}
wg.Wait()
if len(brokenFiles) == 0 {
r.logger.Debug().Msgf("No broken files found for %s", media.Title)
return nil
}
r.logger.Debug().Msgf("%d broken files found for %s", len(brokenFiles), media.Title)
@@ -696,7 +740,11 @@ func (r *Repair) ProcessJob(id string) error {
return nil
}
g, ctx := errgroup.WithContext(r.ctx)
if job.ctx == nil || job.ctx.Err() != nil {
job.ctx, job.cancelFunc = context.WithCancel(r.ctx)
}
g, ctx := errgroup.WithContext(job.ctx)
g.SetLimit(r.workers)
for arrName, items := range brokenItems {