Update stream client; Add repair strategy

This commit is contained in:
Mukhtar Akere
2025-07-01 04:42:33 +01:00
parent a4ee0973cc
commit 8d092615db
5 changed files with 119 additions and 33 deletions

View File

@@ -12,6 +12,13 @@ import (
"sync" "sync"
) )
type RepairStrategy string
const (
RepairStrategyPerFile RepairStrategy = "per_file"
RepairStrategyPerTorrent RepairStrategy = "per_torrent"
)
var ( var (
instance *Config instance *Config
once sync.Once once sync.Once
@@ -67,6 +74,7 @@ type Repair struct {
UseWebDav bool `json:"use_webdav,omitempty"` UseWebDav bool `json:"use_webdav,omitempty"`
Workers int `json:"workers,omitempty"` Workers int `json:"workers,omitempty"`
ReInsert bool `json:"reinsert,omitempty"` ReInsert bool `json:"reinsert,omitempty"`
Strategy RepairStrategy `json:"strategy,omitempty"`
} }
type Auth struct { type Auth struct {
@@ -352,6 +360,11 @@ func (c *Config) setDefaults() {
c.URLBase += "/" c.URLBase += "/"
} }
// Set repair defaults
if c.Repair.Strategy == "" {
c.Repair.Strategy = RepairStrategyPerTorrent
}
// Load the auth file // Load the auth file
c.Auth = c.GetAuth() c.Auth = c.GetAuth()
} }

View File

@@ -115,8 +115,10 @@ func (a *Arr) Validate() error {
if err != nil { if err != nil {
return err return err
} }
if resp.StatusCode != http.StatusOK { defer resp.Body.Close()
return fmt.Errorf("arr test failed: %s", resp.Status) // If response is not 200 or 404(this is the case for Lidarr, etc), return an error
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusNotFound {
return fmt.Errorf("failed to validate arr %s: %s", a.Name, resp.Status)
} }
return nil return nil
} }

View File

@@ -4,6 +4,7 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"github.com/sirrobot01/decypharr/internal/config"
"github.com/sirrobot01/decypharr/internal/utils" "github.com/sirrobot01/decypharr/internal/utils"
"github.com/sirrobot01/decypharr/pkg/debrid/types" "github.com/sirrobot01/decypharr/pkg/debrid/types"
"sync" "sync"
@@ -60,6 +61,7 @@ func (c *Cache) markAsSuccessfullyReinserted(torrentId string) {
func (c *Cache) GetBrokenFiles(t *CachedTorrent, filenames []string) []string { func (c *Cache) GetBrokenFiles(t *CachedTorrent, filenames []string) []string {
files := make(map[string]types.File) files := make(map[string]types.File)
repairStrategy := config.Get().Repair.Strategy
brokenFiles := make([]string, 0) brokenFiles := make([]string, 0)
if len(filenames) > 0 { if len(filenames) > 0 {
for name, f := range t.Files { for name, f := range t.Files {
@@ -93,6 +95,10 @@ func (c *Cache) GetBrokenFiles(t *CachedTorrent, filenames []string) []string {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
// Use a mutex to protect brokenFiles slice and torrent-wide failure flag
var mu sync.Mutex
torrentWideFailed := false
wg.Add(len(files)) wg.Add(len(files))
for _, f := range files { for _, f := range files {
@@ -106,14 +112,33 @@ func (c *Cache) GetBrokenFiles(t *CachedTorrent, filenames []string) []string {
} }
if f.Link == "" { if f.Link == "" {
cancel() mu.Lock()
if repairStrategy == config.RepairStrategyPerTorrent {
torrentWideFailed = true
mu.Unlock()
cancel() // Signal all other goroutines to stop
return
} else {
// per_file strategy - only mark this file as broken
brokenFiles = append(brokenFiles, f.Name)
}
mu.Unlock()
return return
} }
if err := c.client.CheckLink(f.Link); err != nil { if err := c.client.CheckLink(f.Link); err != nil {
if errors.Is(err, utils.HosterUnavailableError) { if errors.Is(err, utils.HosterUnavailableError) {
mu.Lock()
if repairStrategy == config.RepairStrategyPerTorrent {
torrentWideFailed = true
mu.Unlock()
cancel() // Signal all other goroutines to stop cancel() // Signal all other goroutines to stop
return return
} else {
// per_file strategy - only mark this file as broken
brokenFiles = append(brokenFiles, f.Name)
}
mu.Unlock()
} }
} }
}(f) }(f)
@@ -121,12 +146,14 @@ func (c *Cache) GetBrokenFiles(t *CachedTorrent, filenames []string) []string {
wg.Wait() wg.Wait()
// If context was cancelled, mark all files as broken // Handle the result based on strategy
if ctx.Err() != nil { if repairStrategy == config.RepairStrategyPerTorrent && torrentWideFailed {
// Mark all files as broken for per_torrent strategy
for _, f := range files { for _, f := range files {
brokenFiles = append(brokenFiles, f.Name) brokenFiles = append(brokenFiles, f.Name)
} }
} }
// For per_file strategy, brokenFiles already contains only the broken ones
// Try to reinsert the torrent if it's broken // Try to reinsert the torrent if it's broken
if len(brokenFiles) > 0 && t.Torrent != nil { if len(brokenFiles) > 0 && t.Torrent != nil {

View File

@@ -337,6 +337,14 @@
</div> </div>
</div> </div>
<div class="row"> <div class="row">
<div class="col-md-4 mb-3">
<label class="form-label" for="repair.strategy">Repair Strategy</label>
<select class="form-select" name="repair.strategy" id="repair.strategy">
<option value="per_torrent" selected>Per Torrent</option>
<option value="per_file">Per File</option>
</select>
<small class="form-text text-muted">How to handle repairs, per torrent or per file</small>
</div>
<div class="col-md-4 mb-3"> <div class="col-md-4 mb-3">
<div class="form-check"> <div class="form-check">
<input type="checkbox" class="form-check-input" name="repair.use_webdav" <input type="checkbox" class="form-check-input" name="repair.use_webdav"
@@ -1226,6 +1234,7 @@
enabled: document.querySelector('[name="repair.enabled"]').checked, enabled: document.querySelector('[name="repair.enabled"]').checked,
interval: document.querySelector('[name="repair.interval"]').value, interval: document.querySelector('[name="repair.interval"]').value,
zurg_url: document.querySelector('[name="repair.zurg_url"]').value, zurg_url: document.querySelector('[name="repair.zurg_url"]').value,
strategy: document.querySelector('[name="repair.strategy"]').value,
workers: parseInt(document.querySelector('[name="repair.workers"]').value), workers: parseInt(document.querySelector('[name="repair.workers"]').value),
use_webdav: document.querySelector('[name="repair.use_webdav"]').checked, use_webdav: document.querySelector('[name="repair.use_webdav"]').checked,
auto_process: document.querySelector('[name="repair.auto_process"]').checked auto_process: document.querySelector('[name="repair.auto_process"]').checked

View File

@@ -15,14 +15,16 @@ import (
var sharedClient = &http.Client{ var sharedClient = &http.Client{
Transport: &http.Transport{ Transport: &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
MaxIdleConns: 100, MaxIdleConns: 200,
MaxIdleConnsPerHost: 20, MaxIdleConnsPerHost: 50,
MaxConnsPerHost: 50, MaxConnsPerHost: 100,
IdleConnTimeout: 90 * time.Second, IdleConnTimeout: 300 * time.Second,
TLSHandshakeTimeout: 10 * time.Second, TLSHandshakeTimeout: 15 * time.Second,
ResponseHeaderTimeout: 30 * time.Second, ResponseHeaderTimeout: 60 * time.Second,
ExpectContinueTimeout: 1 * time.Second, ExpectContinueTimeout: 2 * time.Second,
DisableKeepAlives: false, DisableKeepAlives: false,
WriteBufferSize: 64 * 1024,
ReadBufferSize: 64 * 1024,
}, },
Timeout: 0, Timeout: 0,
} }
@@ -143,7 +145,7 @@ func (f *File) StreamResponse(w http.ResponseWriter, r *http.Request) error {
} }
func (f *File) streamWithRetry(w http.ResponseWriter, r *http.Request, retryCount int) error { func (f *File) streamWithRetry(w http.ResponseWriter, r *http.Request, retryCount int) error {
const maxRetries = 0 const maxRetries = 3
_log := f.cache.Logger() _log := f.cache.Logger()
// Get download link (with caching optimization) // Get download link (with caching optimization)
@@ -311,6 +313,11 @@ func (f *File) handleRangeRequest(upstreamReq *http.Request, r *http.Request, w
start, end := ranges[0].start, ranges[0].end start, end := ranges[0].start, ranges[0].end
if byteRange != nil { if byteRange != nil {
// Add bounds checking to prevent overflow
if start > f.size-byteRange[0] || end > f.size-byteRange[0] {
w.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", f.size))
return -1 // Invalid range after offset
}
start += byteRange[0] start += byteRange[0]
end += byteRange[0] end += byteRange[0]
} }
@@ -321,9 +328,11 @@ func (f *File) handleRangeRequest(upstreamReq *http.Request, r *http.Request, w
func (f *File) streamVideoOptimized(w http.ResponseWriter, src io.Reader) error { func (f *File) streamVideoOptimized(w http.ResponseWriter, src io.Reader) error {
// Use larger buffer for video streaming (better throughput) // Use larger buffer for video streaming (better throughput)
buf := make([]byte, 64*1024) // 64KB buffer buf := make([]byte, 256*1024) // 256KB buffer for better performance
flushInterval := 8 * 1024 // Flush every 8KB for responsive streaming
var totalWritten int
// First chunk optimization - send immediately for faster start // Preread first chunk for immediate response
n, err := src.Read(buf) n, err := src.Read(buf)
if err != nil && err != io.EOF { if err != nil && err != io.EOF {
if isClientDisconnection(err) { if isClientDisconnection(err) {
@@ -341,6 +350,7 @@ func (f *File) streamVideoOptimized(w http.ResponseWriter, src io.Reader) error
} }
return &streamError{Err: writeErr, StatusCode: 0} return &streamError{Err: writeErr, StatusCode: 0}
} }
totalWritten += n
// Flush immediately for faster video start // Flush immediately for faster video start
if flusher, ok := w.(http.Flusher); ok { if flusher, ok := w.(http.Flusher); ok {
@@ -352,16 +362,41 @@ func (f *File) streamVideoOptimized(w http.ResponseWriter, src io.Reader) error
return nil return nil
} }
// Continue with optimized copy for remaining data // Stream remaining data with periodic flushing
_, err = io.CopyBuffer(w, src, buf) for {
n, err := src.Read(buf)
if n > 0 {
_, writeErr := w.Write(buf[:n])
if writeErr != nil {
if isClientDisconnection(writeErr) {
return &streamError{Err: writeErr, StatusCode: 0, IsClientDisconnection: true}
}
return &streamError{Err: writeErr, StatusCode: 0}
}
totalWritten += n
// Flush periodically to maintain streaming performance
if totalWritten%flushInterval == 0 {
if flusher, ok := w.(http.Flusher); ok {
flusher.Flush()
}
}
}
if err != nil { if err != nil {
if err == io.EOF {
// Final flush
if flusher, ok := w.(http.Flusher); ok {
flusher.Flush()
}
return nil
}
if isClientDisconnection(err) { if isClientDisconnection(err) {
return &streamError{Err: err, StatusCode: 0, IsClientDisconnection: true} return &streamError{Err: err, StatusCode: 0, IsClientDisconnection: true}
} }
return &streamError{Err: err, StatusCode: 0} return &streamError{Err: err, StatusCode: 0}
} }
}
return nil
} }
/* /*