diff --git a/internal/config/config.go b/internal/config/config.go index 84bca76..13750a4 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -12,6 +12,13 @@ import ( "sync" ) +type RepairStrategy string + +const ( + RepairStrategyPerFile RepairStrategy = "per_file" + RepairStrategyPerTorrent RepairStrategy = "per_torrent" +) + var ( instance *Config once sync.Once @@ -60,13 +67,14 @@ type Arr struct { } type Repair struct { - Enabled bool `json:"enabled,omitempty"` - Interval string `json:"interval,omitempty"` - ZurgURL string `json:"zurg_url,omitempty"` - AutoProcess bool `json:"auto_process,omitempty"` - UseWebDav bool `json:"use_webdav,omitempty"` - Workers int `json:"workers,omitempty"` - ReInsert bool `json:"reinsert,omitempty"` + Enabled bool `json:"enabled,omitempty"` + Interval string `json:"interval,omitempty"` + ZurgURL string `json:"zurg_url,omitempty"` + AutoProcess bool `json:"auto_process,omitempty"` + UseWebDav bool `json:"use_webdav,omitempty"` + Workers int `json:"workers,omitempty"` + ReInsert bool `json:"reinsert,omitempty"` + Strategy RepairStrategy `json:"strategy,omitempty"` } type Auth struct { @@ -352,6 +360,11 @@ func (c *Config) setDefaults() { c.URLBase += "/" } + // Set repair defaults + if c.Repair.Strategy == "" { + c.Repair.Strategy = RepairStrategyPerTorrent + } + // Load the auth file c.Auth = c.GetAuth() } diff --git a/pkg/arr/arr.go b/pkg/arr/arr.go index 9b77cec..5934d5f 100644 --- a/pkg/arr/arr.go +++ b/pkg/arr/arr.go @@ -115,8 +115,10 @@ func (a *Arr) Validate() error { if err != nil { return err } - if resp.StatusCode != http.StatusOK { - return fmt.Errorf("arr test failed: %s", resp.Status) + defer resp.Body.Close() + // If response is not 200 or 404(this is the case for Lidarr, etc), return an error + if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusNotFound { + return fmt.Errorf("failed to validate arr %s: %s", a.Name, resp.Status) } return nil } diff --git a/pkg/debrid/store/repair.go b/pkg/debrid/store/repair.go index b807f27..dccf0d8 100644 --- a/pkg/debrid/store/repair.go +++ b/pkg/debrid/store/repair.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "github.com/sirrobot01/decypharr/internal/config" "github.com/sirrobot01/decypharr/internal/utils" "github.com/sirrobot01/decypharr/pkg/debrid/types" "sync" @@ -60,6 +61,7 @@ func (c *Cache) markAsSuccessfullyReinserted(torrentId string) { func (c *Cache) GetBrokenFiles(t *CachedTorrent, filenames []string) []string { files := make(map[string]types.File) + repairStrategy := config.Get().Repair.Strategy brokenFiles := make([]string, 0) if len(filenames) > 0 { for name, f := range t.Files { @@ -93,6 +95,10 @@ func (c *Cache) GetBrokenFiles(t *CachedTorrent, filenames []string) []string { ctx, cancel := context.WithCancel(context.Background()) defer cancel() + // Use a mutex to protect brokenFiles slice and torrent-wide failure flag + var mu sync.Mutex + torrentWideFailed := false + wg.Add(len(files)) for _, f := range files { @@ -106,14 +112,33 @@ func (c *Cache) GetBrokenFiles(t *CachedTorrent, filenames []string) []string { } if f.Link == "" { - cancel() + mu.Lock() + if repairStrategy == config.RepairStrategyPerTorrent { + torrentWideFailed = true + mu.Unlock() + cancel() // Signal all other goroutines to stop + return + } else { + // per_file strategy - only mark this file as broken + brokenFiles = append(brokenFiles, f.Name) + } + mu.Unlock() return } if err := c.client.CheckLink(f.Link); err != nil { if errors.Is(err, utils.HosterUnavailableError) { - cancel() // Signal all other goroutines to stop - return + mu.Lock() + if repairStrategy == config.RepairStrategyPerTorrent { + torrentWideFailed = true + mu.Unlock() + cancel() // Signal all other goroutines to stop + return + } else { + // per_file strategy - only mark this file as broken + brokenFiles = append(brokenFiles, f.Name) + } + mu.Unlock() } } }(f) @@ -121,12 +146,14 @@ func (c *Cache) GetBrokenFiles(t *CachedTorrent, filenames []string) []string { wg.Wait() - // If context was cancelled, mark all files as broken - if ctx.Err() != nil { + // Handle the result based on strategy + if repairStrategy == config.RepairStrategyPerTorrent && torrentWideFailed { + // Mark all files as broken for per_torrent strategy for _, f := range files { brokenFiles = append(brokenFiles, f.Name) } } + // For per_file strategy, brokenFiles already contains only the broken ones // Try to reinsert the torrent if it's broken if len(brokenFiles) > 0 && t.Torrent != nil { diff --git a/pkg/web/templates/config.html b/pkg/web/templates/config.html index eb1999e..45dad9e 100644 --- a/pkg/web/templates/config.html +++ b/pkg/web/templates/config.html @@ -337,6 +337,14 @@
+
+ + + How to handle repairs, per torrent or per file +
f.size-byteRange[0] || end > f.size-byteRange[0] { + w.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", f.size)) + return -1 // Invalid range after offset + } start += byteRange[0] end += byteRange[0] } @@ -321,9 +328,11 @@ func (f *File) handleRangeRequest(upstreamReq *http.Request, r *http.Request, w func (f *File) streamVideoOptimized(w http.ResponseWriter, src io.Reader) error { // Use larger buffer for video streaming (better throughput) - buf := make([]byte, 64*1024) // 64KB buffer + buf := make([]byte, 256*1024) // 256KB buffer for better performance + flushInterval := 8 * 1024 // Flush every 8KB for responsive streaming + var totalWritten int - // First chunk optimization - send immediately for faster start + // Preread first chunk for immediate response n, err := src.Read(buf) if err != nil && err != io.EOF { if isClientDisconnection(err) { @@ -341,6 +350,7 @@ func (f *File) streamVideoOptimized(w http.ResponseWriter, src io.Reader) error } return &streamError{Err: writeErr, StatusCode: 0} } + totalWritten += n // Flush immediately for faster video start if flusher, ok := w.(http.Flusher); ok { @@ -352,16 +362,41 @@ func (f *File) streamVideoOptimized(w http.ResponseWriter, src io.Reader) error return nil } - // Continue with optimized copy for remaining data - _, err = io.CopyBuffer(w, src, buf) - if err != nil { - if isClientDisconnection(err) { - return &streamError{Err: err, StatusCode: 0, IsClientDisconnection: true} - } - return &streamError{Err: err, StatusCode: 0} - } + // Stream remaining data with periodic flushing + for { + n, err := src.Read(buf) + if n > 0 { + _, writeErr := w.Write(buf[:n]) + if writeErr != nil { + if isClientDisconnection(writeErr) { + return &streamError{Err: writeErr, StatusCode: 0, IsClientDisconnection: true} + } + return &streamError{Err: writeErr, StatusCode: 0} + } + totalWritten += n - return nil + // Flush periodically to maintain streaming performance + if totalWritten%flushInterval == 0 { + if flusher, ok := w.(http.Flusher); ok { + flusher.Flush() + } + } + } + + if err != nil { + if err == io.EOF { + // Final flush + if flusher, ok := w.(http.Flusher); ok { + flusher.Flush() + } + return nil + } + if isClientDisconnection(err) { + return &streamError{Err: err, StatusCode: 0, IsClientDisconnection: true} + } + return &streamError{Err: err, StatusCode: 0} + } + } } /*