15 Commits

Author SHA1 Message Date
Mukhtar Akere
fa6920f94a Merge branch 'beta'
Some checks failed
GoReleaser / goreleaser (push) Has been cancelled
Release Docker Build / docker (push) Has been cancelled
2025-07-09 05:14:39 +01:00
Mukhtar Akere
dba5604d79 fix refresh rclone http client 2025-07-07 00:08:48 +01:00
iPromKnight
f656b7e4e2 feat: Allow deleting all __bad__ with a single button (#98) 2025-07-04 20:13:12 +01:00
Mukhtar Akere
c7b07137c5 Fix repair bug 2025-07-03 23:36:30 +01:00
Mukhtar Akere
c0aa4eaeba Fix modtime bug 2025-07-02 01:17:31 +01:00
Mukhtar Akere
2c90e518aa fix playback issues 2025-07-01 16:10:23 +01:00
Mukhtar Akere
dec7d93272 fix streaming 2025-07-01 15:28:19 +01:00
Mukhtar Akere
8d092615db Update stream client; Add repair strategy 2025-07-01 04:42:33 +01:00
iPromKnight
a4ee0973cc fix: AllDebrid webdav compatibility, and uncached downloads (#97) 2025-07-01 04:10:21 +01:00
Mukhtar Akere
ab12610346 Merge branch 'beta'
Some checks failed
GoReleaser / goreleaser (push) Has been cancelled
Release Docker Build / docker (push) Has been cancelled
2025-06-26 21:15:22 +01:00
Mukhtar Akere
1d19be9013 hotfix repair html table 2025-06-26 07:31:12 +01:00
Mukhtar Akere
cee0e20fe1 hotfix repair and download rate limit 2025-06-26 06:08:50 +01:00
Mukhtar Akere
a3e698e04f Add repair and download rate limit 2025-06-26 05:45:20 +01:00
Mukhtar Akere
e123a2fd5e Hotfix issues with 1.0.3 2025-06-26 03:51:28 +01:00
Mukhtar Akere
817051589e Move to per-torrent repair; Fix issues with adding torrents 2025-06-23 18:54:52 +01:00
29 changed files with 489 additions and 576 deletions

View File

@@ -72,5 +72,5 @@ body:
label: Trace Logs have been provided as applicable label: Trace Logs have been provided as applicable
description: Trace logs are **generally required** and are not optional for all bug reports and contain `trace`. Info logs are invalid for bug reports and do not contain `debug` nor `trace` description: Trace logs are **generally required** and are not optional for all bug reports and contain `trace`. Info logs are invalid for bug reports and do not contain `debug` nor `trace`
options: options:
- label: I have read and followed the steps in the wiki link above and provided the required trace logs - the logs contain `trace` - that are relevant and show this issue. - label: I have read and followed the steps in the documentation link and provided the required trace logs - the logs contain `trace` - that are relevant and show this issue.
required: true required: true

View File

@@ -62,6 +62,11 @@ Create a `config.json` file in `/opt/decypharr/` with your Decypharr configurati
``` ```
### Docker Compose Setup
- Check your current user and group IDs by running `id -u` and `id -g` in your terminal. You can use these values to set the `PUID` and `PGID` environment variables in the Docker Compose file.
- You should also set `user` to your user ID and group ID in the Docker Compose file to ensure proper file permissions.
Create a `docker-compose.yml` file with the following content: Create a `docker-compose.yml` file with the following content:
```yaml ```yaml
@@ -69,11 +74,14 @@ services:
decypharr: decypharr:
image: cy01/blackhole:latest image: cy01/blackhole:latest
container_name: decypharr container_name: decypharr
user: "${PUID:-1000}:${PGID:-1000}"
volumes: volumes:
- /mnt/:/mnt:rslave - /mnt/:/mnt:rslave
- /opt/decypharr/:/app - /opt/decypharr/:/app
environment: environment:
- UMASK=002 - UMASK=002
- PUID=1000 # Replace with your user ID
- PGID=1000 # Replace with your group ID
ports: ports:
- "8282:8282/tcp" - "8282:8282/tcp"
restart: unless-stopped restart: unless-stopped

3
go.mod
View File

@@ -14,16 +14,17 @@ require (
github.com/robfig/cron/v3 v3.0.1 github.com/robfig/cron/v3 v3.0.1
github.com/rs/zerolog v1.33.0 github.com/rs/zerolog v1.33.0
github.com/stanNthe5/stringbuf v0.0.3 github.com/stanNthe5/stringbuf v0.0.3
go.uber.org/ratelimit v0.3.1
golang.org/x/crypto v0.33.0 golang.org/x/crypto v0.33.0
golang.org/x/net v0.35.0 golang.org/x/net v0.35.0
golang.org/x/sync v0.12.0 golang.org/x/sync v0.12.0
golang.org/x/time v0.8.0
gopkg.in/natefinch/lumberjack.v2 v2.2.1 gopkg.in/natefinch/lumberjack.v2 v2.2.1
) )
require ( require (
github.com/anacrolix/missinggo v1.3.0 // indirect github.com/anacrolix/missinggo v1.3.0 // indirect
github.com/anacrolix/missinggo/v2 v2.7.3 // indirect github.com/anacrolix/missinggo/v2 v2.7.3 // indirect
github.com/benbjohnson/clock v1.3.0 // indirect
github.com/bradfitz/iter v0.0.0-20191230175014-e8f45d346db8 // indirect github.com/bradfitz/iter v0.0.0-20191230175014-e8f45d346db8 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/google/go-cmp v0.6.0 // indirect github.com/google/go-cmp v0.6.0 // indirect

8
go.sum
View File

@@ -36,6 +36,8 @@ github.com/anacrolix/tagflag v1.1.0/go.mod h1:Scxs9CV10NQatSmbyjqmqmeQNwGzlNe0CM
github.com/anacrolix/torrent v1.55.0 h1:s9yh/YGdPmbN9dTa+0Inh2dLdrLQRvEAj1jdFW/Hdd8= github.com/anacrolix/torrent v1.55.0 h1:s9yh/YGdPmbN9dTa+0Inh2dLdrLQRvEAj1jdFW/Hdd8=
github.com/anacrolix/torrent v1.55.0/go.mod h1:sBdZHBSZNj4de0m+EbYg7vvs/G/STubxu/GzzNbojsE= github.com/anacrolix/torrent v1.55.0/go.mod h1:sBdZHBSZNj4de0m+EbYg7vvs/G/STubxu/GzzNbojsE=
github.com/apache/thrift v0.12.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ= github.com/apache/thrift v0.12.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A=
github.com/benbjohnson/clock v1.3.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/benbjohnson/immutable v0.2.0/go.mod h1:uc6OHo6PN2++n98KHLxW8ef4W42ylHiQSENghE1ezxI= github.com/benbjohnson/immutable v0.2.0/go.mod h1:uc6OHo6PN2++n98KHLxW8ef4W42ylHiQSENghE1ezxI=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
@@ -216,8 +218,12 @@ github.com/willf/bitset v1.1.10/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPy
go.opencensus.io v0.20.1/go.mod h1:6WKK9ahsWS3RSO+PY9ZHZUfv2irvY6gN279GOPZjmmk= go.opencensus.io v0.20.1/go.mod h1:6WKK9ahsWS3RSO+PY9ZHZUfv2irvY6gN279GOPZjmmk=
go.opencensus.io v0.20.2/go.mod h1:6WKK9ahsWS3RSO+PY9ZHZUfv2irvY6gN279GOPZjmmk= go.opencensus.io v0.20.2/go.mod h1:6WKK9ahsWS3RSO+PY9ZHZUfv2irvY6gN279GOPZjmmk=
go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
go.uber.org/ratelimit v0.3.1 h1:K4qVE+byfv/B3tC+4nYWP7v/6SimcO7HzHekoMNBma0=
go.uber.org/ratelimit v0.3.1/go.mod h1:6euWsTB6U/Nb3X++xEUXA8ciPJvr19Q/0h1+oDcJhRk=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.33.0 h1:IOBPskki6Lysi0lo9qQvbxiQ+FvsCC/YWOecCHAixus= golang.org/x/crypto v0.33.0 h1:IOBPskki6Lysi0lo9qQvbxiQ+FvsCC/YWOecCHAixus=
@@ -266,8 +272,6 @@ golang.org/x/sys v0.30.0 h1:QjkSwP/36a20jFYWkSue1YwXzLmsV5Gfq7Eiy72C1uc=
golang.org/x/sys v0.30.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.30.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/time v0.8.0 h1:9i3RxcPv3PZnitoVGMPDKZSq1xW1gK1Xy3ArNOGZfEg=
golang.org/x/time v0.8.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=

View File

@@ -12,6 +12,13 @@ import (
"sync" "sync"
) )
type RepairStrategy string
const (
RepairStrategyPerFile RepairStrategy = "per_file"
RepairStrategyPerTorrent RepairStrategy = "per_torrent"
)
var ( var (
instance *Config instance *Config
once sync.Once once sync.Once
@@ -19,17 +26,19 @@ var (
) )
type Debrid struct { type Debrid struct {
Name string `json:"name,omitempty"` Name string `json:"name,omitempty"`
APIKey string `json:"api_key,omitempty"` APIKey string `json:"api_key,omitempty"`
DownloadAPIKeys []string `json:"download_api_keys,omitempty"` DownloadAPIKeys []string `json:"download_api_keys,omitempty"`
Folder string `json:"folder,omitempty"` Folder string `json:"folder,omitempty"`
DownloadUncached bool `json:"download_uncached,omitempty"` DownloadUncached bool `json:"download_uncached,omitempty"`
CheckCached bool `json:"check_cached,omitempty"` CheckCached bool `json:"check_cached,omitempty"`
RateLimit string `json:"rate_limit,omitempty"` // 200/minute or 10/second RateLimit string `json:"rate_limit,omitempty"` // 200/minute or 10/second
Proxy string `json:"proxy,omitempty"` RepairRateLimit string `json:"repair_rate_limit,omitempty"`
UnpackRar bool `json:"unpack_rar,omitempty"` DownloadRateLimit string `json:"download_rate_limit,omitempty"`
AddSamples bool `json:"add_samples,omitempty"` Proxy string `json:"proxy,omitempty"`
MinimumFreeSlot int `json:"minimum_free_slot,omitempty"` // Minimum active pots to use this debrid UnpackRar bool `json:"unpack_rar,omitempty"`
AddSamples bool `json:"add_samples,omitempty"`
MinimumFreeSlot int `json:"minimum_free_slot,omitempty"` // Minimum active pots to use this debrid
UseWebDav bool `json:"use_webdav,omitempty"` UseWebDav bool `json:"use_webdav,omitempty"`
WebDav WebDav
@@ -58,13 +67,14 @@ type Arr struct {
} }
type Repair struct { type Repair struct {
Enabled bool `json:"enabled,omitempty"` Enabled bool `json:"enabled,omitempty"`
Interval string `json:"interval,omitempty"` Interval string `json:"interval,omitempty"`
ZurgURL string `json:"zurg_url,omitempty"` ZurgURL string `json:"zurg_url,omitempty"`
AutoProcess bool `json:"auto_process,omitempty"` AutoProcess bool `json:"auto_process,omitempty"`
UseWebDav bool `json:"use_webdav,omitempty"` UseWebDav bool `json:"use_webdav,omitempty"`
Workers int `json:"workers,omitempty"` Workers int `json:"workers,omitempty"`
ReInsert bool `json:"reinsert,omitempty"` ReInsert bool `json:"reinsert,omitempty"`
Strategy RepairStrategy `json:"strategy,omitempty"`
} }
type Auth struct { type Auth struct {
@@ -350,6 +360,11 @@ func (c *Config) setDefaults() {
c.URLBase += "/" c.URLBase += "/"
} }
// Set repair defaults
if c.Repair.Strategy == "" {
c.Repair.Strategy = RepairStrategyPerTorrent
}
// Load the auth file // Load the auth file
c.Auth = c.GetAuth() c.Auth = c.GetAuth()
} }

View File

@@ -9,10 +9,9 @@ import (
"fmt" "fmt"
"github.com/rs/zerolog" "github.com/rs/zerolog"
"github.com/sirrobot01/decypharr/internal/logger" "github.com/sirrobot01/decypharr/internal/logger"
"go.uber.org/ratelimit"
"golang.org/x/net/proxy" "golang.org/x/net/proxy"
"golang.org/x/time/rate"
"io" "io"
"math"
"math/rand" "math/rand"
"net" "net"
"net/http" "net/http"
@@ -52,7 +51,7 @@ type ClientOption func(*Client)
// Client represents an HTTP client with additional capabilities // Client represents an HTTP client with additional capabilities
type Client struct { type Client struct {
client *http.Client client *http.Client
rateLimiter *rate.Limiter rateLimiter ratelimit.Limiter
headers map[string]string headers map[string]string
headersMu sync.RWMutex headersMu sync.RWMutex
maxRetries int maxRetries int
@@ -84,7 +83,7 @@ func WithRedirectPolicy(policy func(req *http.Request, via []*http.Request) erro
} }
// WithRateLimiter sets a rate limiter // WithRateLimiter sets a rate limiter
func WithRateLimiter(rl *rate.Limiter) ClientOption { func WithRateLimiter(rl ratelimit.Limiter) ClientOption {
return func(c *Client) { return func(c *Client) {
c.rateLimiter = rl c.rateLimiter = rl
} }
@@ -136,9 +135,11 @@ func WithProxy(proxyURL string) ClientOption {
// doRequest performs a single HTTP request with rate limiting // doRequest performs a single HTTP request with rate limiting
func (c *Client) doRequest(req *http.Request) (*http.Response, error) { func (c *Client) doRequest(req *http.Request) (*http.Response, error) {
if c.rateLimiter != nil { if c.rateLimiter != nil {
err := c.rateLimiter.Wait(req.Context()) select {
if err != nil { case <-req.Context().Done():
return nil, fmt.Errorf("rate limiter wait: %w", err) return nil, req.Context().Err()
default:
c.rateLimiter.Take()
} }
} }
@@ -339,7 +340,10 @@ func New(options ...ClientOption) *Client {
return client return client
} }
func ParseRateLimit(rateStr string) *rate.Limiter { func ParseRateLimit(rateStr string) ratelimit.Limiter {
if rateStr == "" {
return nil
}
parts := strings.SplitN(rateStr, "/", 2) parts := strings.SplitN(rateStr, "/", 2)
if len(parts) != 2 { if len(parts) != 2 {
return nil return nil
@@ -351,23 +355,21 @@ func ParseRateLimit(rateStr string) *rate.Limiter {
return nil return nil
} }
// Set slack size to 10%
slackSize := count / 10
// normalize unit // normalize unit
unit := strings.ToLower(strings.TrimSpace(parts[1])) unit := strings.ToLower(strings.TrimSpace(parts[1]))
unit = strings.TrimSuffix(unit, "s") unit = strings.TrimSuffix(unit, "s")
burstSize := int(math.Ceil(float64(count) * 0.1))
if burstSize < 1 {
burstSize = 1
}
if burstSize > count {
burstSize = count
}
switch unit { switch unit {
case "minute", "min": case "minute", "min":
return rate.NewLimiter(rate.Limit(float64(count)/60.0), burstSize) return ratelimit.New(count, ratelimit.Per(time.Minute), ratelimit.WithSlack(slackSize))
case "second", "sec": case "second", "sec":
return rate.NewLimiter(rate.Limit(float64(count)), burstSize) return ratelimit.New(count, ratelimit.Per(time.Second), ratelimit.WithSlack(slackSize))
case "hour", "hr": case "hour", "hr":
return rate.NewLimiter(rate.Limit(float64(count)/3600.0), burstSize) return ratelimit.New(count, ratelimit.Per(time.Hour), ratelimit.WithSlack(slackSize))
case "day", "d":
return ratelimit.New(count, ratelimit.Per(24*time.Hour), ratelimit.WithSlack(slackSize))
default: default:
return nil return nil
} }

View File

@@ -40,12 +40,10 @@ func RemoveInvalidChars(value string) string {
} }
func RemoveExtension(value string) string { func RemoveExtension(value string) string {
loc := mediaRegex.FindStringIndex(value) if loc := mediaRegex.FindStringIndex(value); loc != nil {
if loc != nil {
return value[:loc[0]] return value[:loc[0]]
} else {
return value
} }
return value
} }
func IsMediaFile(path string) bool { func IsMediaFile(path string) bool {

View File

@@ -115,8 +115,10 @@ func (a *Arr) Validate() error {
if err != nil { if err != nil {
return err return err
} }
if resp.StatusCode != http.StatusOK { defer resp.Body.Close()
return fmt.Errorf("arr test failed: %s", resp.Status) // If response is not 200 or 404(this is the case for Lidarr, etc), return an error
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusNotFound {
return fmt.Errorf("failed to validate arr %s: %s", a.Name, resp.Status)
} }
return nil return nil
} }

View File

@@ -309,7 +309,7 @@ func (ad *AllDebrid) GetFileDownloadLinks(t *types.Torrent) error {
errCh <- err errCh <- err
return return
} }
if link != nil { if link == nil {
errCh <- fmt.Errorf("download link is empty") errCh <- fmt.Errorf("download link is empty")
return return
} }

View File

@@ -1,5 +1,10 @@
package alldebrid package alldebrid
import (
"encoding/json"
"fmt"
)
type errorResponse struct { type errorResponse struct {
Code string `json:"code"` Code string `json:"code"`
Message string `json:"message"` Message string `json:"message"`
@@ -32,6 +37,8 @@ type magnetInfo struct {
Files []MagnetFile `json:"files"` Files []MagnetFile `json:"files"`
} }
type Magnets []magnetInfo
type TorrentInfoResponse struct { type TorrentInfoResponse struct {
Status string `json:"status"` Status string `json:"status"`
Data struct { Data struct {
@@ -43,7 +50,7 @@ type TorrentInfoResponse struct {
type TorrentsListResponse struct { type TorrentsListResponse struct {
Status string `json:"status"` Status string `json:"status"`
Data struct { Data struct {
Magnets []magnetInfo `json:"magnets"` Magnets Magnets `json:"magnets"`
} `json:"data"` } `json:"data"`
Error *errorResponse `json:"error"` Error *errorResponse `json:"error"`
} }
@@ -81,3 +88,27 @@ type DownloadLink struct {
} `json:"data"` } `json:"data"`
Error *errorResponse `json:"error"` Error *errorResponse `json:"error"`
} }
// UnmarshalJSON implements custom unmarshaling for Magnets type
// It can handle both an array of magnetInfo objects or a map with string keys.
// If the input is an array, it will be unmarshaled directly into the Magnets slice.
// If the input is a map, it will extract the values and append them to the Magnets slice.
// If the input is neither, it will return an error.
func (m *Magnets) UnmarshalJSON(data []byte) error {
// Try to unmarshal as array
var arr []magnetInfo
if err := json.Unmarshal(data, &arr); err == nil {
*m = arr
return nil
}
// Try to unmarshal as map
var obj map[string]magnetInfo
if err := json.Unmarshal(data, &obj); err == nil {
for _, v := range obj {
*m = append(*m, v)
}
return nil
}
return fmt.Errorf("magnets: unsupported JSON format")
}

View File

@@ -2,6 +2,7 @@ package realdebrid
import ( import (
"bytes" "bytes"
"cmp"
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
@@ -33,6 +34,7 @@ type RealDebrid struct {
DownloadUncached bool DownloadUncached bool
client *request.Client client *request.Client
downloadClient *request.Client downloadClient *request.Client
repairClient *request.Client
autoExpiresLinksAfter time.Duration autoExpiresLinksAfter time.Duration
MountPath string MountPath string
@@ -49,6 +51,8 @@ type RealDebrid struct {
func New(dc config.Debrid) (*RealDebrid, error) { func New(dc config.Debrid) (*RealDebrid, error) {
rl := request.ParseRateLimit(dc.RateLimit) rl := request.ParseRateLimit(dc.RateLimit)
repairRl := request.ParseRateLimit(cmp.Or(dc.RepairRateLimit, dc.RateLimit))
downloadRl := request.ParseRateLimit(cmp.Or(dc.DownloadRateLimit, dc.RateLimit))
headers := map[string]string{ headers := map[string]string{
"Authorization": fmt.Sprintf("Bearer %s", dc.APIKey), "Authorization": fmt.Sprintf("Bearer %s", dc.APIKey),
@@ -77,11 +81,20 @@ func New(dc config.Debrid) (*RealDebrid, error) {
request.WithProxy(dc.Proxy), request.WithProxy(dc.Proxy),
), ),
downloadClient: request.New( downloadClient: request.New(
request.WithRateLimiter(downloadRl),
request.WithLogger(_log), request.WithLogger(_log),
request.WithMaxRetries(10), request.WithMaxRetries(10),
request.WithRetryableStatus(429, 447, 502), request.WithRetryableStatus(429, 447, 502),
request.WithProxy(dc.Proxy), request.WithProxy(dc.Proxy),
), ),
repairClient: request.New(
request.WithRateLimiter(repairRl),
request.WithHeaders(headers),
request.WithLogger(_log),
request.WithMaxRetries(4),
request.WithRetryableStatus(429, 502),
request.WithProxy(dc.Proxy),
),
MountPath: dc.Folder, MountPath: dc.Folder,
logger: logger.New(dc.Name), logger: logger.New(dc.Name),
rarSemaphore: make(chan struct{}, 2), rarSemaphore: make(chan struct{}, 2),
@@ -608,7 +621,7 @@ func (r *RealDebrid) CheckLink(link string) error {
"link": {link}, "link": {link},
} }
req, _ := http.NewRequest(http.MethodPost, url, strings.NewReader(payload.Encode())) req, _ := http.NewRequest(http.MethodPost, url, strings.NewReader(payload.Encode()))
resp, err := r.client.Do(req) resp, err := r.repairClient.Do(req)
if err != nil { if err != nil {
return err return err
} }
@@ -621,7 +634,7 @@ func (r *RealDebrid) CheckLink(link string) error {
func (r *RealDebrid) _getDownloadLink(file *types.File) (*types.DownloadLink, error) { func (r *RealDebrid) _getDownloadLink(file *types.File) (*types.DownloadLink, error) {
url := fmt.Sprintf("%s/unrestrict/link/", r.Host) url := fmt.Sprintf("%s/unrestrict/link/", r.Host)
_link := file.Link _link := file.Link
if strings.HasPrefix(_link, "https://real-debrid.com/d/") { if strings.HasPrefix(file.Link, "https://real-debrid.com/d/") && len(file.Link) > 39 {
_link = file.Link[0:39] _link = file.Link[0:39]
} }
payload := gourl.Values{ payload := gourl.Values{

View File

@@ -514,9 +514,9 @@ func (c *Cache) setTorrent(t CachedTorrent, callback func(torrent CachedTorrent)
updatedTorrent.Files = mergedFiles updatedTorrent.Files = mergedFiles
} }
c.torrents.set(torrentName, t, updatedTorrent) c.torrents.set(torrentName, t, updatedTorrent)
c.SaveTorrent(t) go c.SaveTorrent(t)
if callback != nil { if callback != nil {
callback(updatedTorrent) go callback(updatedTorrent)
} }
} }
@@ -702,6 +702,7 @@ func (c *Cache) ProcessTorrent(t *types.Torrent) error {
func (c *Cache) Add(t *types.Torrent) error { func (c *Cache) Add(t *types.Torrent) error {
if len(t.Files) == 0 { if len(t.Files) == 0 {
c.logger.Warn().Msgf("Torrent %s has no files to add. Refreshing", t.Id)
if err := c.client.UpdateTorrent(t); err != nil { if err := c.client.UpdateTorrent(t); err != nil {
return fmt.Errorf("failed to update torrent: %w", err) return fmt.Errorf("failed to update torrent: %w", err)
} }

View File

@@ -136,67 +136,67 @@ func (c *Cache) refreshRclone() error {
return nil return nil
} }
client := &http.Client{ client := http.DefaultClient
Timeout: 60 * time.Second,
Transport: &http.Transport{
MaxIdleConns: 10,
IdleConnTimeout: 60 * time.Second,
DisableCompression: false,
MaxIdleConnsPerHost: 5,
},
}
// Create form data // Create form data
data := "" data := c.buildRcloneRequestData()
if err := c.sendRcloneRequest(client, "vfs/forget", data); err != nil {
c.logger.Error().Err(err).Msg("Failed to send rclone vfs/forget request")
}
if err := c.sendRcloneRequest(client, "vfs/refresh", data); err != nil {
c.logger.Error().Err(err).Msg("Failed to send rclone vfs/refresh request")
}
return nil
}
func (c *Cache) buildRcloneRequestData() string {
cfg := c.config
dirs := strings.FieldsFunc(cfg.RcRefreshDirs, func(r rune) bool { dirs := strings.FieldsFunc(cfg.RcRefreshDirs, func(r rune) bool {
return r == ',' || r == '&' return r == ',' || r == '&'
}) })
if len(dirs) == 0 { if len(dirs) == 0 {
data = "dir=__all__" return "dir=__all__"
} else { }
for index, dir := range dirs {
if dir != "" { var data strings.Builder
if index == 0 { for index, dir := range dirs {
data += "dir=" + dir if dir != "" {
} else { if index == 0 {
data += "&dir" + fmt.Sprint(index+1) + "=" + dir data.WriteString("dir=" + dir)
} } else {
data.WriteString("&dir" + fmt.Sprint(index+1) + "=" + dir)
} }
} }
} }
return data.String()
}
sendRequest := func(endpoint string) error { func (c *Cache) sendRcloneRequest(client *http.Client, endpoint, data string) error {
req, err := http.NewRequest("POST", fmt.Sprintf("%s/%s", cfg.RcUrl, endpoint), strings.NewReader(data)) req, err := http.NewRequest("POST", fmt.Sprintf("%s/%s", c.config.RcUrl, endpoint), strings.NewReader(data))
if err != nil { if err != nil {
return err
}
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
if cfg.RcUser != "" && cfg.RcPass != "" {
req.SetBasicAuth(cfg.RcUser, cfg.RcPass)
}
resp, err := client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode != 200 {
body, _ := io.ReadAll(io.LimitReader(resp.Body, 1024))
return fmt.Errorf("failed to perform %s: %s - %s", endpoint, resp.Status, string(body))
}
_, _ = io.Copy(io.Discard, resp.Body)
return nil
}
if err := sendRequest("vfs/forget"); err != nil {
return err
}
if err := sendRequest("vfs/refresh"); err != nil {
return err return err
} }
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
if c.config.RcUser != "" && c.config.RcPass != "" {
req.SetBasicAuth(c.config.RcUser, c.config.RcPass)
}
resp, err := client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode != 200 {
body, _ := io.ReadAll(io.LimitReader(resp.Body, 1024))
return fmt.Errorf("failed to perform %s: %s - %s", endpoint, resp.Status, string(body))
}
_, _ = io.Copy(io.Discard, resp.Body)
return nil return nil
} }

View File

@@ -4,6 +4,7 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"github.com/sirrobot01/decypharr/internal/config"
"github.com/sirrobot01/decypharr/internal/utils" "github.com/sirrobot01/decypharr/internal/utils"
"github.com/sirrobot01/decypharr/pkg/debrid/types" "github.com/sirrobot01/decypharr/pkg/debrid/types"
"sync" "sync"
@@ -60,6 +61,7 @@ func (c *Cache) markAsSuccessfullyReinserted(torrentId string) {
func (c *Cache) GetBrokenFiles(t *CachedTorrent, filenames []string) []string { func (c *Cache) GetBrokenFiles(t *CachedTorrent, filenames []string) []string {
files := make(map[string]types.File) files := make(map[string]types.File)
repairStrategy := config.Get().Repair.Strategy
brokenFiles := make([]string, 0) brokenFiles := make([]string, 0)
if len(filenames) > 0 { if len(filenames) > 0 {
for name, f := range t.Files { for name, f := range t.Files {
@@ -89,23 +91,54 @@ func (c *Cache) GetBrokenFiles(t *CachedTorrent, filenames []string) []string {
} }
files = t.Files files = t.Files
var wg sync.WaitGroup var wg sync.WaitGroup
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Use a mutex to protect brokenFiles slice and torrent-wide failure flag
var mu sync.Mutex
torrentWideFailed := false
wg.Add(len(files)) wg.Add(len(files))
for _, f := range files { for _, f := range files {
// Check if file link is still missing
go func(f types.File) { go func(f types.File) {
defer wg.Done() defer wg.Done()
select {
case <-ctx.Done():
return
default:
}
if f.Link == "" { if f.Link == "" {
brokenFiles = append(brokenFiles, f.Name) mu.Lock()
} else { if repairStrategy == config.RepairStrategyPerTorrent {
// Check if file.Link not in the downloadLink Cache torrentWideFailed = true
if err := c.client.CheckLink(f.Link); err != nil { mu.Unlock()
if errors.Is(err, utils.HosterUnavailableError) { cancel() // Signal all other goroutines to stop
return
} else {
// per_file strategy - only mark this file as broken
brokenFiles = append(brokenFiles, f.Name)
}
mu.Unlock()
return
}
if err := c.client.CheckLink(f.Link); err != nil {
if errors.Is(err, utils.HosterUnavailableError) {
mu.Lock()
if repairStrategy == config.RepairStrategyPerTorrent {
torrentWideFailed = true
mu.Unlock()
cancel() // Signal all other goroutines to stop
return
} else {
// per_file strategy - only mark this file as broken
brokenFiles = append(brokenFiles, f.Name) brokenFiles = append(brokenFiles, f.Name)
} }
mu.Unlock()
} }
} }
}(f) }(f)
@@ -113,6 +146,15 @@ func (c *Cache) GetBrokenFiles(t *CachedTorrent, filenames []string) []string {
wg.Wait() wg.Wait()
// Handle the result based on strategy
if repairStrategy == config.RepairStrategyPerTorrent && torrentWideFailed {
// Mark all files as broken for per_torrent strategy
for _, f := range files {
brokenFiles = append(brokenFiles, f.Name)
}
}
// For per_file strategy, brokenFiles already contains only the broken ones
// Try to reinsert the torrent if it's broken // Try to reinsert the torrent if it's broken
if len(brokenFiles) > 0 && t.Torrent != nil { if len(brokenFiles) > 0 && t.Torrent != nil {
// Check if the torrent is already in progress // Check if the torrent is already in progress

View File

@@ -171,17 +171,18 @@ func (tc *torrentCache) refreshListing() {
wg.Add(1) // for all listing wg.Add(1) // for all listing
go func() { go func() {
defer wg.Done()
listing := make([]os.FileInfo, len(all)) listing := make([]os.FileInfo, len(all))
for i, sf := range all { for i, sf := range all {
listing[i] = &fileInfo{sf.id, sf.name, sf.size, 0755 | os.ModeDir, sf.modTime, true} listing[i] = &fileInfo{sf.id, sf.name, sf.size, 0755 | os.ModeDir, sf.modTime, true}
} }
tc.listing.Store(listing) tc.listing.Store(listing)
}() }()
wg.Done()
wg.Add(1) wg.Add(1)
// For __bad__ // For __bad__
go func() { go func() {
defer wg.Done()
listing := make([]os.FileInfo, 0) listing := make([]os.FileInfo, 0)
for _, sf := range all { for _, sf := range all {
if sf.bad { if sf.bad {
@@ -203,7 +204,6 @@ func (tc *torrentCache) refreshListing() {
} }
tc.folders.Unlock() tc.folders.Unlock()
}() }()
wg.Done()
now := time.Now() now := time.Now()
wg.Add(len(tc.directoriesFilters)) // for each directory filter wg.Add(len(tc.directoriesFilters)) // for each directory filter

View File

@@ -18,7 +18,7 @@ func NewAccounts(debridConf config.Debrid) *Accounts {
if token == "" { if token == "" {
continue continue
} }
account := newAccount(token, idx) account := newAccount(debridConf.Name, token, idx)
accounts = append(accounts, account) accounts = append(accounts, account)
} }
@@ -33,6 +33,7 @@ func NewAccounts(debridConf config.Debrid) *Accounts {
} }
type Account struct { type Account struct {
Debrid string // e.g., "realdebrid", "torbox", etc.
Order int Order int
Disabled bool Disabled bool
Token string Token string
@@ -176,30 +177,31 @@ func (a *Accounts) SetDownloadLinks(links map[string]*DownloadLink) {
a.Current().setLinks(links) a.Current().setLinks(links)
} }
func newAccount(token string, index int) *Account { func newAccount(debridName, token string, index int) *Account {
return &Account{ return &Account{
Token: token, Debrid: debridName,
Order: index, Token: token,
links: make(map[string]*DownloadLink), Order: index,
links: make(map[string]*DownloadLink),
} }
} }
func (a *Account) getLink(fileLink string) (*DownloadLink, bool) { func (a *Account) getLink(fileLink string) (*DownloadLink, bool) {
a.mu.RLock() a.mu.RLock()
defer a.mu.RUnlock() defer a.mu.RUnlock()
dl, ok := a.links[fileLink[0:39]] dl, ok := a.links[a.sliceFileLink(fileLink)]
return dl, ok return dl, ok
} }
func (a *Account) setLink(fileLink string, dl *DownloadLink) { func (a *Account) setLink(fileLink string, dl *DownloadLink) {
a.mu.Lock() a.mu.Lock()
defer a.mu.Unlock() defer a.mu.Unlock()
a.links[fileLink[0:39]] = dl a.links[a.sliceFileLink(fileLink)] = dl
} }
func (a *Account) deleteLink(fileLink string) { func (a *Account) deleteLink(fileLink string) {
a.mu.Lock() a.mu.Lock()
defer a.mu.Unlock() defer a.mu.Unlock()
delete(a.links, fileLink[0:39]) delete(a.links, a.sliceFileLink(fileLink))
} }
func (a *Account) resetDownloadLinks() { func (a *Account) resetDownloadLinks() {
a.mu.Lock() a.mu.Lock()
@@ -225,6 +227,17 @@ func (a *Account) setLinks(links map[string]*DownloadLink) {
// Expired, continue // Expired, continue
continue continue
} }
a.links[dl.Link[0:39]] = dl a.links[a.sliceFileLink(dl.Link)] = dl
} }
} }
// slice download link
func (a *Account) sliceFileLink(fileLink string) string {
if a.Debrid != "realdebrid" {
return fileLink
}
if len(fileLink) < 39 {
return fileLink
}
return fileLink[0:39]
}

View File

@@ -88,6 +88,8 @@ func collectFiles(media arr.Content) map[string][]arr.ContentFile {
func (r *Repair) checkTorrentFiles(torrentPath string, files []arr.ContentFile, clients map[string]types.Client, caches map[string]*store.Cache) []arr.ContentFile { func (r *Repair) checkTorrentFiles(torrentPath string, files []arr.ContentFile, clients map[string]types.Client, caches map[string]*store.Cache) []arr.ContentFile {
brokenFiles := make([]arr.ContentFile, 0) brokenFiles := make([]arr.ContentFile, 0)
emptyFiles := make([]arr.ContentFile, 0)
r.logger.Debug().Msgf("Checking %s", torrentPath) r.logger.Debug().Msgf("Checking %s", torrentPath)
// Get the debrid client // Get the debrid client
@@ -95,17 +97,18 @@ func (r *Repair) checkTorrentFiles(torrentPath string, files []arr.ContentFile,
debridName := r.findDebridForPath(dir, clients) debridName := r.findDebridForPath(dir, clients)
if debridName == "" { if debridName == "" {
r.logger.Debug().Msgf("No debrid found for %s. Skipping", torrentPath) r.logger.Debug().Msgf("No debrid found for %s. Skipping", torrentPath)
return files // Return all files as broken if no debrid found return emptyFiles
} }
cache, ok := caches[debridName] cache, ok := caches[debridName]
if !ok { if !ok {
r.logger.Debug().Msgf("No cache found for %s. Skipping", debridName) r.logger.Debug().Msgf("No cache found for %s. Skipping", debridName)
return files // Return all files as broken if no cache found return emptyFiles
} }
tor, ok := r.torrentsMap.Load(debridName) tor, ok := r.torrentsMap.Load(debridName)
if !ok { if !ok {
r.logger.Debug().Msgf("Could not find torrents for %s. Skipping", debridName) r.logger.Debug().Msgf("Could not find torrents for %s. Skipping", debridName)
return emptyFiles
} }
torrentsMap := tor.(map[string]store.CachedTorrent) torrentsMap := tor.(map[string]store.CachedTorrent)
@@ -114,8 +117,9 @@ func (r *Repair) checkTorrentFiles(torrentPath string, files []arr.ContentFile,
torrentName := filepath.Clean(filepath.Base(torrentPath)) torrentName := filepath.Clean(filepath.Base(torrentPath))
torrent, ok := torrentsMap[torrentName] torrent, ok := torrentsMap[torrentName]
if !ok { if !ok {
r.logger.Debug().Msgf("No torrent found for %s. Skipping", torrentName) r.logger.Debug().Msgf("Can't find torrent %s in %s. Marking as broken", torrentName, debridName)
return files // Return all files as broken if torrent not found // Return all files as broken
return files
} }
// Batch check files // Batch check files

View File

@@ -75,26 +75,6 @@ type Job struct {
ctx context.Context ctx context.Context
} }
func (j *Job) getUnprocessedBrokenItems() map[string][]arr.ContentFile {
items := make(map[string][]arr.ContentFile)
for arrName, files := range j.BrokenItems {
if len(files) == 0 {
continue // Skip empty file lists
}
items[arrName] = make([]arr.ContentFile, 0, len(files))
for _, file := range files {
if file.Path != "" && file.TargetPath != "" && !file.Processed {
items[arrName] = append(items[arrName], file)
}
}
}
if len(items) == 0 {
return nil // Return nil if no unprocessed items found
}
return items
}
func New(arrs *arr.Storage, engine *debrid.Storage) *Repair { func New(arrs *arr.Storage, engine *debrid.Storage) *Repair {
cfg := config.Get() cfg := config.Get()
workers := runtime.NumCPU() * 20 workers := runtime.NumCPU() * 20
@@ -690,35 +670,17 @@ func (r *Repair) getWebdavBrokenFiles(job *Job, media arr.Content) []arr.Content
brokenFiles := make([]arr.ContentFile, 0) brokenFiles := make([]arr.ContentFile, 0)
uniqueParents := collectFiles(media) uniqueParents := collectFiles(media)
var brokenFilesMutex sync.Mutex
var wg sync.WaitGroup
// Limit concurrent torrent checks
semaphore := make(chan struct{}, min(len(uniqueParents), 30)) // Limit to 5 concurrent checks
for torrentPath, files := range uniqueParents { for torrentPath, files := range uniqueParents {
wg.Add(1) select {
go func(torrentPath string, files []arr.ContentFile) { case <-job.ctx.Done():
defer wg.Done() return brokenFiles
semaphore <- struct{}{} // Acquire default:
defer func() { <-semaphore }() // Release }
brokenFilesForTorrent := r.checkTorrentFiles(torrentPath, files, clients, caches)
select { if len(brokenFilesForTorrent) > 0 {
case <-job.ctx.Done(): brokenFiles = append(brokenFiles, brokenFilesForTorrent...)
return }
default:
}
brokenFilesForTorrent := r.checkTorrentFiles(torrentPath, files, clients, caches)
if len(brokenFilesForTorrent) > 0 {
brokenFilesMutex.Lock()
brokenFiles = append(brokenFiles, brokenFilesForTorrent...)
brokenFilesMutex.Unlock()
}
}(torrentPath, files)
} }
wg.Wait()
if len(brokenFiles) == 0 { if len(brokenFiles) == 0 {
return nil return nil
} }
@@ -765,7 +727,7 @@ func (r *Repair) ProcessJob(id string) error {
return fmt.Errorf("job %s already failed", id) return fmt.Errorf("job %s already failed", id)
} }
brokenItems := job.getUnprocessedBrokenItems() brokenItems := job.BrokenItems
if len(brokenItems) == 0 { if len(brokenItems) == 0 {
r.logger.Info().Msgf("No broken items found for job %s", id) r.logger.Info().Msgf("No broken items found for job %s", id)
job.CompletedAt = time.Now() job.CompletedAt = time.Now()
@@ -773,144 +735,63 @@ func (r *Repair) ProcessJob(id string) error {
return nil return nil
} }
r.logger.Info().Msgf("Processing job %s with %d broken items", id, len(brokenItems))
go r.processJob(job, brokenItems)
return nil
}
func (r *Repair) processJob(job *Job, brokenItems map[string][]arr.ContentFile) {
if job.ctx == nil || job.ctx.Err() != nil { if job.ctx == nil || job.ctx.Err() != nil {
job.ctx, job.cancelFunc = context.WithCancel(r.ctx) job.ctx, job.cancelFunc = context.WithCancel(r.ctx)
} }
errs := make([]error, 0) g, ctx := errgroup.WithContext(job.ctx)
processedCount := 0 g.SetLimit(r.workers)
for arrName, items := range brokenItems { for arrName, items := range brokenItems {
select { items := items
case <-job.ctx.Done(): arrName := arrName
r.logger.Info().Msgf("Job %s cancelled", job.ID) g.Go(func() error {
job.Status = JobCancelled
job.CompletedAt = time.Now()
job.Error = "Job was cancelled"
return
default:
// Continue processing
}
a := r.arrs.Get(arrName) select {
if a == nil { case <-ctx.Done():
errs = append(errs, fmt.Errorf("arr %s not found", arrName)) return ctx.Err()
continue default:
} }
if err := a.DeleteFiles(items); err != nil { a := r.arrs.Get(arrName)
errs = append(errs, fmt.Errorf("failed to delete broken items for %s: %w", arrName, err)) if a == nil {
continue r.logger.Error().Msgf("Arr %s not found", arrName)
} return nil
// Search for missing items }
if err := a.SearchMissing(items); err != nil {
errs = append(errs, fmt.Errorf("failed to search missing items for %s: %w", arrName, err)) if err := a.DeleteFiles(items); err != nil {
continue r.logger.Error().Err(err).Msgf("Failed to delete broken items for %s", arrName)
} return nil
processedCount += len(items) }
// Mark this item as processed // Search for missing items
for i := range items { if err := a.SearchMissing(items); err != nil {
items[i].Processed = true r.logger.Error().Err(err).Msgf("Failed to search missing items for %s", arrName)
} return nil
job.BrokenItems[arrName] = items }
return nil
})
} }
// Update job status to in-progress // Update job status to in-progress
job.Status = JobProcessing job.Status = JobProcessing
r.saveToFile() r.saveToFile()
if len(errs) > 0 { // Launch a goroutine to wait for completion and update the job
errMsg := fmt.Sprintf("Job %s encountered errors: %v", job.ID, errs) go func() {
job.Error = errMsg if err := g.Wait(); err != nil {
job.FailedAt = time.Now() job.FailedAt = time.Now()
job.Status = JobFailed job.Error = err.Error()
r.logger.Error().Msg(errMsg) job.CompletedAt = time.Now()
go func() { job.Status = JobFailed
if err := request.SendDiscordMessage("repair_failed", "error", job.discordContext()); err != nil { r.logger.Error().Err(err).Msgf("Job %s failed", id)
r.logger.Error().Msgf("Error sending discord message: %v", err) } else {
} job.CompletedAt = time.Now()
}() job.Status = JobCompleted
return r.logger.Info().Msgf("Job %s completed successfully", id)
}
remainingItems := job.getUnprocessedBrokenItems()
if len(remainingItems) == 0 {
// All items processed, mark job as completed
job.CompletedAt = time.Now()
job.Status = JobCompleted
r.logger.Info().Msgf("Job %s completed successfully (all items processed)", job.ID)
go func() {
if err := request.SendDiscordMessage("repair_complete", "success", job.discordContext()); err != nil {
r.logger.Error().Msgf("Error sending discord message: %v", err)
}
}()
} else {
// Some items still remain, keep job as pending
job.Status = JobPending
r.logger.Info().Msgf("Job %s: processed %d selected items successfully, %d items remaining", job.ID, processedCount, len(remainingItems))
go func() {
if err := request.SendDiscordMessage("repair_partial_complete", "info", job.discordContext()); err != nil {
r.logger.Error().Msgf("Error sending discord message: %v", err)
}
}()
}
r.saveToFile()
}
// ProcessJobItems processes the selected items for a job
// selectedItems is the map of arr names to the list of file IDs to process
func (r *Repair) ProcessJobItems(id string, selectedItems map[string][]int) error {
job := r.GetJob(id)
if job == nil {
return fmt.Errorf("job %s not found", id)
}
if job.Status != JobPending {
return fmt.Errorf("job %s not pending", id)
}
if job.StartedAt.IsZero() {
return fmt.Errorf("job %s not started", id)
}
if !job.CompletedAt.IsZero() {
return fmt.Errorf("job %s already completed", id)
}
if !job.FailedAt.IsZero() {
return fmt.Errorf("job %s already failed", id)
}
brokenItems := job.getUnprocessedBrokenItems()
validatedItems := make(map[string][]arr.ContentFile)
for arrName, selectedItemsList := range selectedItems {
if jobItems, exists := brokenItems[arrName]; exists {
validItems := make([]arr.ContentFile, 0, len(selectedItemsList))
for _, item := range selectedItemsList {
// Find the item in the job items
for _, jobItem := range jobItems {
if jobItem.FileId == item {
validItems = append(validItems, jobItem)
break
}
}
}
if len(validItems) > 0 {
validatedItems[arrName] = validItems
}
} }
}
if len(validatedItems) == 0 {
return fmt.Errorf("no valid items found for job %s", id)
}
job.Status = JobProcessing r.saveToFile()
r.saveToFile() }()
go r.processJob(job, validatedItems)
return nil return nil
} }

View File

@@ -103,10 +103,5 @@ func (s *Server) getLogs(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Expires", "0") w.Header().Set("Expires", "0")
// Stream the file // Stream the file
_, err = io.Copy(w, file) _, _ = io.Copy(w, file)
if err != nil {
s.logger.Error().Err(err).Msg("Error streaming log file")
http.Error(w, "Error streaming log file", http.StatusInternalServerError)
return
}
} }

View File

@@ -9,6 +9,7 @@ import (
"github.com/sirrobot01/decypharr/internal/utils" "github.com/sirrobot01/decypharr/internal/utils"
debridTypes "github.com/sirrobot01/decypharr/pkg/debrid" debridTypes "github.com/sirrobot01/decypharr/pkg/debrid"
"github.com/sirrobot01/decypharr/pkg/debrid/types" "github.com/sirrobot01/decypharr/pkg/debrid/types"
"math"
"os" "os"
"path/filepath" "path/filepath"
"time" "time"
@@ -57,6 +58,8 @@ func (s *Store) processFiles(torrent *Torrent, debridTorrent *types.Torrent, imp
client := deb.Client() client := deb.Client()
downloadingStatuses := client.GetDownloadingStatus() downloadingStatuses := client.GetDownloadingStatus()
_arr := importReq.Arr _arr := importReq.Arr
backoff := time.NewTimer(s.refreshInterval)
defer backoff.Stop()
for debridTorrent.Status != "downloaded" { for debridTorrent.Status != "downloaded" {
s.logger.Debug().Msgf("%s <- (%s) Download Progress: %.2f%%", debridTorrent.Debrid, debridTorrent.Name, debridTorrent.Progress) s.logger.Debug().Msgf("%s <- (%s) Download Progress: %.2f%%", debridTorrent.Debrid, debridTorrent.Name, debridTorrent.Progress)
dbT, err := client.CheckStatus(debridTorrent) dbT, err := client.CheckStatus(debridTorrent)
@@ -83,10 +86,12 @@ func (s *Store) processFiles(torrent *Torrent, debridTorrent *types.Torrent, imp
if debridTorrent.Status == "downloaded" || !utils.Contains(downloadingStatuses, debridTorrent.Status) { if debridTorrent.Status == "downloaded" || !utils.Contains(downloadingStatuses, debridTorrent.Status) {
break break
} }
if !utils.Contains(client.GetDownloadingStatus(), debridTorrent.Status) { select {
break case <-backoff.C:
// Increase interval gradually, cap at max
nextInterval := min(s.refreshInterval*2, 30*time.Second)
backoff.Reset(nextInterval)
} }
time.Sleep(s.refreshInterval)
} }
var torrentSymlinkPath string var torrentSymlinkPath string
var err error var err error
@@ -96,15 +101,15 @@ func (s *Store) processFiles(torrent *Torrent, debridTorrent *types.Torrent, imp
timer := time.Now() timer := time.Now()
onFailed := func(err error) { onFailed := func(err error) {
if err != nil { s.markTorrentAsFailed(torrent)
s.markTorrentAsFailed(torrent) go func() {
go func() { if deleteErr := client.DeleteTorrent(debridTorrent.Id); deleteErr != nil {
_ = client.DeleteTorrent(debridTorrent.Id) s.logger.Warn().Err(deleteErr).Msgf("Failed to delete torrent %s", debridTorrent.Id)
}() }
s.logger.Error().Err(err).Msgf("Error occured while processing torrent %s", debridTorrent.Name) }()
importReq.markAsFailed(err, torrent, debridTorrent) s.logger.Error().Err(err).Msgf("Error occured while processing torrent %s", debridTorrent.Name)
return importReq.markAsFailed(err, torrent, debridTorrent)
} return
} }
onSuccess := func(torrentSymlinkPath string) { onSuccess := func(torrentSymlinkPath string) {
@@ -118,7 +123,9 @@ func (s *Store) processFiles(torrent *Torrent, debridTorrent *types.Torrent, imp
s.logger.Error().Msgf("Error sending discord message: %v", err) s.logger.Error().Msgf("Error sending discord message: %v", err)
} }
}() }()
_arr.Refresh() go func() {
_arr.Refresh()
}()
} }
switch importReq.Action { switch importReq.Action {
@@ -137,7 +144,6 @@ func (s *Store) processFiles(torrent *Torrent, debridTorrent *types.Torrent, imp
rclonePath := filepath.Join(debridTorrent.MountPath, cache.GetTorrentFolder(debridTorrent)) // /mnt/remote/realdebrid/MyTVShow rclonePath := filepath.Join(debridTorrent.MountPath, cache.GetTorrentFolder(debridTorrent)) // /mnt/remote/realdebrid/MyTVShow
torrentFolderNoExt := utils.RemoveExtension(debridTorrent.Name) torrentFolderNoExt := utils.RemoveExtension(debridTorrent.Name)
torrentSymlinkPath, err = s.createSymlinksWebdav(torrent, debridTorrent, rclonePath, torrentFolderNoExt) // /mnt/symlinks/{category}/MyTVShow/ torrentSymlinkPath, err = s.createSymlinksWebdav(torrent, debridTorrent, rclonePath, torrentFolderNoExt) // /mnt/symlinks/{category}/MyTVShow/
} else { } else {
// User is using either zurg or debrid webdav // User is using either zurg or debrid webdav
torrentSymlinkPath, err = s.processSymlink(torrent, debridTorrent) // /mnt/symlinks/{category}/MyTVShow/ torrentSymlinkPath, err = s.processSymlink(torrent, debridTorrent) // /mnt/symlinks/{category}/MyTVShow/
@@ -202,6 +208,9 @@ func (s *Store) partialTorrentUpdate(t *Torrent, debridTorrent *types.Torrent) *
} }
totalSize := debridTorrent.Bytes totalSize := debridTorrent.Bytes
progress := (cmp.Or(debridTorrent.Progress, 0.0)) / 100.0 progress := (cmp.Or(debridTorrent.Progress, 0.0)) / 100.0
if math.IsNaN(progress) || math.IsInf(progress, 0) {
progress = 0
}
sizeCompleted := int64(float64(totalSize) * progress) sizeCompleted := int64(float64(totalSize) * progress)
var speed int64 var speed int64

View File

@@ -326,28 +326,6 @@ func (wb *Web) handleProcessRepairJob(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)
} }
func (wb *Web) handleProcessRepairJobItems(w http.ResponseWriter, r *http.Request) {
id := chi.URLParam(r, "id")
if id == "" {
http.Error(w, "No job ID provided", http.StatusBadRequest)
return
}
var req struct {
Items map[string][]int `json:"items"`
}
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, "Invalid request body: "+err.Error(), http.StatusBadRequest)
return
}
_store := store.Get()
if err := _store.Repair().ProcessJobItems(id, req.Items); err != nil {
wb.logger.Error().Err(err).Msg("Failed to process repair job items")
http.Error(w, "Failed to process job items: "+err.Error(), http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusOK)
}
func (wb *Web) handleDeleteRepairJob(w http.ResponseWriter, r *http.Request) { func (wb *Web) handleDeleteRepairJob(w http.ResponseWriter, r *http.Request) {
// Read ids from body // Read ids from body
var req struct { var req struct {

View File

@@ -28,7 +28,6 @@ func (wb *Web) Routes() http.Handler {
r.Post("/repair", wb.handleRepairMedia) r.Post("/repair", wb.handleRepairMedia)
r.Get("/repair/jobs", wb.handleGetRepairJobs) r.Get("/repair/jobs", wb.handleGetRepairJobs)
r.Post("/repair/jobs/{id}/process", wb.handleProcessRepairJob) r.Post("/repair/jobs/{id}/process", wb.handleProcessRepairJob)
r.Post("/repair/jobs/{id}/process-items", wb.handleProcessRepairJobItems)
r.Post("/repair/jobs/{id}/stop", wb.handleStopRepairJob) r.Post("/repair/jobs/{id}/stop", wb.handleStopRepairJob)
r.Delete("/repair/jobs", wb.handleDeleteRepairJob) r.Delete("/repair/jobs", wb.handleDeleteRepairJob)
r.Get("/torrents", wb.handleGetTorrents) r.Get("/torrents", wb.handleGetTorrents)

View File

@@ -337,6 +337,14 @@
</div> </div>
</div> </div>
<div class="row"> <div class="row">
<div class="col-md-4 mb-3">
<label class="form-label" for="repair.strategy">Repair Strategy</label>
<select class="form-select" name="repair.strategy" id="repair.strategy">
<option value="per_torrent" selected>Per Torrent</option>
<option value="per_file">Per File</option>
</select>
<small class="form-text text-muted">How to handle repairs, per torrent or per file</small>
</div>
<div class="col-md-4 mb-3"> <div class="col-md-4 mb-3">
<div class="form-check"> <div class="form-check">
<input type="checkbox" class="form-check-input" name="repair.use_webdav" <input type="checkbox" class="form-check-input" name="repair.use_webdav"
@@ -1005,6 +1013,9 @@
if (config.max_file_size) { if (config.max_file_size) {
document.querySelector('[name="max_file_size"]').value = config.max_file_size; document.querySelector('[name="max_file_size"]').value = config.max_file_size;
} }
if (config.remove_stalled_after) {
document.querySelector('[name="remove_stalled_after"]').value = config.remove_stalled_after;
}
if (config.discord_webhook_url) { if (config.discord_webhook_url) {
document.querySelector('[name="discord_webhook_url"]').value = config.discord_webhook_url; document.querySelector('[name="discord_webhook_url"]').value = config.discord_webhook_url;
} }
@@ -1223,6 +1234,7 @@
enabled: document.querySelector('[name="repair.enabled"]').checked, enabled: document.querySelector('[name="repair.enabled"]').checked,
interval: document.querySelector('[name="repair.interval"]').value, interval: document.querySelector('[name="repair.interval"]').value,
zurg_url: document.querySelector('[name="repair.zurg_url"]').value, zurg_url: document.querySelector('[name="repair.zurg_url"]').value,
strategy: document.querySelector('[name="repair.strategy"]').value,
workers: parseInt(document.querySelector('[name="repair.workers"]').value), workers: parseInt(document.querySelector('[name="repair.workers"]').value),
use_webdav: document.querySelector('[name="repair.use_webdav"]').checked, use_webdav: document.querySelector('[name="repair.use_webdav"]').checked,
auto_process: document.querySelector('[name="repair.auto_process"]').checked auto_process: document.querySelector('[name="repair.auto_process"]').checked

View File

@@ -130,13 +130,7 @@
<h6 class="mb-0"> <h6 class="mb-0">
Broken Items Broken Items
<span class="badge bg-secondary" id="totalItemsCount">0</span> <span class="badge bg-secondary" id="totalItemsCount">0</span>
<span class="badge bg-primary" id="selectedItemsCount">0 selected</span>
</h6> </h6>
<div class="d-flex gap-2">
<button class="btn btn-sm btn-primary" id="processSelectedItemsBtn" disabled>
<i class="bi bi-play-fill me-1"></i>Process Selected
</button>
</div>
</div> </div>
<!-- Filters and Search --> <!-- Filters and Search -->
@@ -171,11 +165,6 @@
<table class="table table-sm table-striped table-hover"> <table class="table table-sm table-striped table-hover">
<thead class="sticky-top"> <thead class="sticky-top">
<tr> <tr>
<th style="width: 40px;">
<div class="form-check">
<input class="form-check-input" type="checkbox" id="selectAllItemsTable">
</div>
</th>
<th>Arr</th> <th>Arr</th>
<th>Path</th> <th>Path</th>
<th style="width: 100px;">Type</th> <th style="width: 100px;">Type</th>
@@ -294,7 +283,7 @@
if (!response.ok) throw new Error(await response.text()); if (!response.ok) throw new Error(await response.text());
createToast('Repair process initiated successfully!'); createToast('Repair process initiated successfully!');
loadJobs(1); // Refresh jobs after submission await loadJobs(1); // Refresh jobs after submission
} catch (error) { } catch (error) {
createToast(`Error starting repair: ${error.message}`, 'error'); createToast(`Error starting repair: ${error.message}`, 'error');
} finally { } finally {
@@ -459,9 +448,8 @@
document.querySelectorAll('#jobsPagination a[data-page]').forEach(link => { document.querySelectorAll('#jobsPagination a[data-page]').forEach(link => {
link.addEventListener('click', (e) => { link.addEventListener('click', (e) => {
e.preventDefault(); e.preventDefault();
const newPage = parseInt(e.currentTarget.dataset.page); currentPage = parseInt(e.currentTarget.dataset.page);
currentPage = newPage; renderJobsTable(currentPage);
renderJobsTable(newPage);
}); });
}); });
@@ -526,7 +514,6 @@
// modal functions // modal functions
function processItemsData(brokenItems) { function processItemsData(brokenItems) {
const items = []; const items = [];
let itemId = 0;
for (const [arrName, itemsArray] of Object.entries(brokenItems)) { for (const [arrName, itemsArray] of Object.entries(brokenItems)) {
if (itemsArray && itemsArray.length > 0) { if (itemsArray && itemsArray.length > 0) {
@@ -601,51 +588,15 @@
row.dataset.itemId = item.id; row.dataset.itemId = item.id;
row.innerHTML = ` row.innerHTML = `
<td>
<div class="form-check">
<input class="form-check-input item-checkbox"
type="checkbox"
value="${item.id}"
${selectedItems.has(item.id) ? 'checked' : ''}>
</div>
</td>
<td><span class="badge bg-info">${item.arr}</span></td> <td><span class="badge bg-info">${item.arr}</span></td>
<td><small class="text-muted" title="${item.path}">${item.path}</small></td> <td><small class="text-muted" title="${item.path}">${item.path}</small></td>
<td><span class="badge ${item.type === 'movie' ? 'bg-primary' : item.type === 'tv' ? 'bg-success' : 'bg-secondary'}">${item.type}</span></td> <td><span class="badge ${item.type === 'movie' ? 'bg-primary' : item.type === 'tv' ? 'bg-success' : 'bg-secondary'}">${item.type}</span></td>
<td><small>${formatFileSize(item.size)}</small></td> <td><small>${formatFileSize(item.size)}</small></td>
`; `;
// Make row clickable to toggle selection
row.addEventListener('click', (e) => {
if (e.target.type !== 'checkbox') {
const checkbox = row.querySelector('.item-checkbox');
checkbox.checked = !checkbox.checked;
checkbox.dispatchEvent(new Event('change'));
}
});
tableBody.appendChild(row); tableBody.appendChild(row);
} }
// Add event listeners to checkboxes
document.querySelectorAll('.item-checkbox').forEach(checkbox => {
checkbox.addEventListener('change', (e) => {
const itemId = parseInt(e.target.value);
const row = e.target.closest('tr');
if (e.target.checked) {
selectedItems.add(itemId);
row.classList.add('selected');
} else {
selectedItems.delete(itemId);
row.classList.remove('selected');
}
updateItemsStats();
updateSelectAllStates();
});
});
// Create pagination // Create pagination
if (totalPages > 1) { if (totalPages > 1) {
const prevLi = document.createElement('li'); const prevLi = document.createElement('li');
@@ -674,45 +625,18 @@
document.querySelectorAll('#itemsPagination a[data-items-page]').forEach(link => { document.querySelectorAll('#itemsPagination a[data-items-page]').forEach(link => {
link.addEventListener('click', (e) => { link.addEventListener('click', (e) => {
e.preventDefault(); e.preventDefault();
const newPage = parseInt(e.currentTarget.dataset.itemsPage); currentItemsPage = parseInt(e.currentTarget.dataset.itemsPage);
currentItemsPage = newPage;
renderBrokenItemsTable(); renderBrokenItemsTable();
}); });
}); });
updateSelectAllStates();
} }
function updateItemsStats() { function updateItemsStats() {
document.getElementById('totalItemsCount').textContent = allBrokenItems.length; document.getElementById('totalItemsCount').textContent = allBrokenItems.length;
document.getElementById('selectedItemsCount').textContent = `${selectedItems.size} selected`;
const processSelectedBtn = document.getElementById('processSelectedItemsBtn');
processSelectedBtn.disabled = selectedItems.size === 0;
// Update footer stats // Update footer stats
const footerStats = document.getElementById('modalFooterStats'); const footerStats = document.getElementById('modalFooterStats');
footerStats.textContent = `Total: ${allBrokenItems.length} | Filtered: ${filteredItems.length} | Selected: ${selectedItems.size}`; footerStats.textContent = `Total: ${allBrokenItems.length} | Filtered: ${filteredItems.length}`;
}
function updateSelectAllStates() {
const selectAllTableCheckbox = document.getElementById('selectAllItemsTable');
const visibleCheckboxes = document.querySelectorAll('.item-checkbox');
const checkedVisible = document.querySelectorAll('.item-checkbox:checked');
if (visibleCheckboxes.length === 0) {
selectAllTableCheckbox.indeterminate = false;
selectAllTableCheckbox.checked = false;
} else if (checkedVisible.length === visibleCheckboxes.length) {
selectAllTableCheckbox.indeterminate = false;
selectAllTableCheckbox.checked = true;
} else if (checkedVisible.length > 0) {
selectAllTableCheckbox.indeterminate = true;
selectAllTableCheckbox.checked = false;
} else {
selectAllTableCheckbox.indeterminate = false;
selectAllTableCheckbox.checked = false;
}
} }
function populateArrFilter() { function populateArrFilter() {
@@ -728,62 +652,6 @@
}); });
} }
document.getElementById('selectAllItemsTable').addEventListener('change', (e) => {
const visibleCheckboxes = document.querySelectorAll('.item-checkbox');
visibleCheckboxes.forEach(checkbox => {
const itemId = parseInt(checkbox.value);
const row = checkbox.closest('tr');
if (e.target.checked) {
selectedItems.add(itemId);
checkbox.checked = true;
row.classList.add('selected');
} else {
selectedItems.delete(itemId);
checkbox.checked = false;
row.classList.remove('selected');
}
});
updateItemsStats();
});
document.getElementById('processSelectedItemsBtn').addEventListener('click', async () => {
if (selectedItems.size === 0) return;
const selectedItemsData = allBrokenItems.filter(item => selectedItems.has(item.id));
// Group by arr
const itemsByArr = {};
selectedItemsData.forEach(item => {
if (!itemsByArr[item.arr]) {
itemsByArr[item.arr] = [];
}
itemsByArr[item.arr].push(item.id);
});
console.log(itemsByArr);
try {
const response = await fetcher(`/api/repair/jobs/${currentJob.id}/process-items`, {
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
body: JSON.stringify({ items: itemsByArr })
});
if (!response.ok) throw new Error(await response.text());
createToast(`Processing ${selectedItems.size} selected items`);
// Close modal and refresh jobs
const modal = bootstrap.Modal.getInstance(document.getElementById('jobDetailsModal'));
modal.hide();
loadJobs(currentPage);
} catch (error) {
createToast(`Error processing selected items: ${error.message}`, 'error');
}
});
// Filter event listeners // Filter event listeners
document.getElementById('itemSearchInput').addEventListener('input', applyFilters); document.getElementById('itemSearchInput').addEventListener('input', applyFilters);
document.getElementById('arrFilterSelect').addEventListener('change', applyFilters); document.getElementById('arrFilterSelect').addEventListener('change', applyFilters);

View File

@@ -12,19 +12,24 @@ import (
"github.com/sirrobot01/decypharr/pkg/debrid/store" "github.com/sirrobot01/decypharr/pkg/debrid/store"
) )
var streamingTransport = &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
MaxIdleConns: 200,
MaxIdleConnsPerHost: 100,
MaxConnsPerHost: 200,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ResponseHeaderTimeout: 60 * time.Second, // give the upstream a minute to send headers
ExpectContinueTimeout: 1 * time.Second,
DisableKeepAlives: true, // close after each request
ForceAttemptHTTP2: false, // dont speak HTTP/2
// this line is what truly blocks HTTP/2:
TLSNextProto: make(map[string]func(string, *tls.Conn) http.RoundTripper),
}
var sharedClient = &http.Client{ var sharedClient = &http.Client{
Transport: &http.Transport{ Transport: streamingTransport,
TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, Timeout: 0,
MaxIdleConns: 100,
MaxIdleConnsPerHost: 20,
MaxConnsPerHost: 50,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ResponseHeaderTimeout: 30 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
DisableKeepAlives: false,
},
Timeout: 0,
} }
type streamError struct { type streamError struct {
@@ -143,7 +148,7 @@ func (f *File) StreamResponse(w http.ResponseWriter, r *http.Request) error {
} }
func (f *File) streamWithRetry(w http.ResponseWriter, r *http.Request, retryCount int) error { func (f *File) streamWithRetry(w http.ResponseWriter, r *http.Request, retryCount int) error {
const maxRetries = 0 const maxRetries = 3
_log := f.cache.Logger() _log := f.cache.Logger()
// Get download link (with caching optimization) // Get download link (with caching optimization)
@@ -192,8 +197,47 @@ func (f *File) streamWithRetry(w http.ResponseWriter, r *http.Request, retryCoun
setVideoResponseHeaders(w, resp, isRangeRequest == 1) setVideoResponseHeaders(w, resp, isRangeRequest == 1)
// Stream with optimized buffering for video return f.streamBuffer(w, resp.Body)
return f.streamVideoOptimized(w, resp.Body) }
func (f *File) streamBuffer(w http.ResponseWriter, src io.Reader) error {
flusher, ok := w.(http.Flusher)
if !ok {
return fmt.Errorf("response does not support flushing")
}
smallBuf := make([]byte, 64*1024) // 64 KB
if n, err := src.Read(smallBuf); n > 0 {
if _, werr := w.Write(smallBuf[:n]); werr != nil {
return werr
}
flusher.Flush()
} else if err != nil && err != io.EOF {
return err
}
buf := make([]byte, 256*1024) // 256 KB
for {
n, readErr := src.Read(buf)
if n > 0 {
if _, writeErr := w.Write(buf[:n]); writeErr != nil {
if isClientDisconnection(writeErr) {
return &streamError{Err: writeErr, StatusCode: 0, IsClientDisconnection: true}
}
return writeErr
}
flusher.Flush()
}
if readErr != nil {
if readErr == io.EOF {
return nil
}
if isClientDisconnection(readErr) {
return &streamError{Err: readErr, StatusCode: 0, IsClientDisconnection: true}
}
return readErr
}
}
} }
func (f *File) handleUpstream(resp *http.Response, retryCount, maxRetries int) (shouldRetry bool, err error) { func (f *File) handleUpstream(resp *http.Response, retryCount, maxRetries int) (shouldRetry bool, err error) {
@@ -319,51 +363,6 @@ func (f *File) handleRangeRequest(upstreamReq *http.Request, r *http.Request, w
return 1 // Valid range request return 1 // Valid range request
} }
// streamVideoOptimized streams src to w using a single 64KB scratch buffer.
// The first chunk is written (and flushed, when supported) as soon as it is
// read so playback can start quickly; the rest of the body is handed to
// io.CopyBuffer. I/O failures are returned as *streamError, with client
// disconnections flagged so callers can avoid pointless retries.
func (f *File) streamVideoOptimized(w http.ResponseWriter, src io.Reader) error {
	// wrap converts an I/O error into the streamError callers expect,
	// marking client drops explicitly.
	wrap := func(e error) error {
		if isClientDisconnection(e) {
			return &streamError{Err: e, StatusCode: 0, IsClientDisconnection: true}
		}
		return &streamError{Err: e, StatusCode: 0}
	}

	chunk := make([]byte, 64*1024) // 64KB buffer, reused by CopyBuffer below

	// Read and push out the first chunk immediately for a fast start.
	count, readErr := src.Read(chunk)
	if readErr != nil && readErr != io.EOF {
		return wrap(readErr)
	}
	if count > 0 {
		if _, werr := w.Write(chunk[:count]); werr != nil {
			return wrap(werr)
		}
		// Flush right away when the writer supports it.
		if fl, ok := w.(http.Flusher); ok {
			fl.Flush()
		}
	}
	if readErr == io.EOF {
		return nil
	}

	// Stream the remainder, reusing the same scratch buffer.
	if _, cerr := io.CopyBuffer(w, src, chunk); cerr != nil {
		return wrap(cerr)
	}
	return nil
}
/* /*
These are the methods that implement the os.File interface for the File type. These are the methods that implement the os.File interface for the File type.
Only Stat and ReadDir are used Only Stat and ReadDir are used

View File

@@ -22,6 +22,8 @@ import (
"github.com/sirrobot01/decypharr/pkg/version" "github.com/sirrobot01/decypharr/pkg/version"
) )
const DeleteAllBadTorrentKey = "DELETE_ALL_BAD_TORRENTS"
type Handler struct { type Handler struct {
Name string Name string
logger zerolog.Logger logger zerolog.Logger
@@ -180,7 +182,7 @@ func (h *Handler) getChildren(name string) []os.FileInfo {
if len(parts) == 2 && utils.Contains(h.getParentItems(), parts[0]) { if len(parts) == 2 && utils.Contains(h.getParentItems(), parts[0]) {
torrentName := parts[1] torrentName := parts[1]
if t := h.cache.GetTorrentByName(torrentName); t != nil { if t := h.cache.GetTorrentByName(torrentName); t != nil {
return h.getFileInfos(t.Torrent) return h.getFileInfos(t)
} }
} }
return nil return nil
@@ -267,10 +269,9 @@ func (h *Handler) Stat(ctx context.Context, name string) (os.FileInfo, error) {
return f.Stat() return f.Stat()
} }
func (h *Handler) getFileInfos(torrent *types.Torrent) []os.FileInfo { func (h *Handler) getFileInfos(torrent *store.CachedTorrent) []os.FileInfo {
torrentFiles := torrent.GetFiles() torrentFiles := torrent.GetFiles()
files := make([]os.FileInfo, 0, len(torrentFiles)) files := make([]os.FileInfo, 0, len(torrentFiles))
now := time.Now()
// Sort by file name since the order is lost when using the map // Sort by file name since the order is lost when using the map
sortedFiles := make([]*types.File, 0, len(torrentFiles)) sortedFiles := make([]*types.File, 0, len(torrentFiles))
@@ -286,7 +287,7 @@ func (h *Handler) getFileInfos(torrent *types.Torrent) []os.FileInfo {
name: file.Name, name: file.Name,
size: file.Size, size: file.Size,
mode: 0644, mode: 0644,
modTime: now, modTime: torrent.AddedOn,
isDir: false, isDir: false,
}) })
} }
@@ -309,7 +310,7 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
h.handlePropfind(w, r) h.handlePropfind(w, r)
return return
case "DELETE": case "DELETE":
if err := h.handleIDDelete(w, r); err == nil { if err := h.handleDelete(w, r); err == nil {
return return
} }
// fallthrough to default // fallthrough to default
@@ -388,21 +389,23 @@ func (h *Handler) serveDirectory(w http.ResponseWriter, r *http.Request, file we
// Prepare template data // Prepare template data
data := struct { data := struct {
Path string Path string
ParentPath string ParentPath string
ShowParent bool ShowParent bool
Children []os.FileInfo Children []os.FileInfo
URLBase string URLBase string
IsBadPath bool IsBadPath bool
CanDelete bool CanDelete bool
DeleteAllBadTorrentKey string
}{ }{
Path: cleanPath, Path: cleanPath,
ParentPath: parentPath, ParentPath: parentPath,
ShowParent: showParent, ShowParent: showParent,
Children: children, Children: children,
URLBase: h.URLBase, URLBase: h.URLBase,
IsBadPath: isBadPath, IsBadPath: isBadPath,
CanDelete: canDelete, CanDelete: canDelete,
DeleteAllBadTorrentKey: DeleteAllBadTorrentKey,
} }
w.Header().Set("Content-Type", "text/html; charset=utf-8") w.Header().Set("Content-Type", "text/html; charset=utf-8")
@@ -535,8 +538,8 @@ func (h *Handler) handleOptions(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)
} }
// handleDelete deletes a torrent from using id // handleDelete deletes a torrent by id, or all bad torrents if the id is DeleteAllBadTorrentKey
func (h *Handler) handleIDDelete(w http.ResponseWriter, r *http.Request) error { func (h *Handler) handleDelete(w http.ResponseWriter, r *http.Request) error {
cleanPath := path.Clean(r.URL.Path) // Remove any leading slashes cleanPath := path.Clean(r.URL.Path) // Remove any leading slashes
_, torrentId := path.Split(cleanPath) _, torrentId := path.Split(cleanPath)
@@ -544,7 +547,15 @@ func (h *Handler) handleIDDelete(w http.ResponseWriter, r *http.Request) error {
return os.ErrNotExist return os.ErrNotExist
} }
cachedTorrent := h.cache.GetTorrent(torrentId) if torrentId == DeleteAllBadTorrentKey {
return h.handleDeleteAll(w)
}
return h.handleDeleteById(w, torrentId)
}
func (h *Handler) handleDeleteById(w http.ResponseWriter, tId string) error {
cachedTorrent := h.cache.GetTorrent(tId)
if cachedTorrent == nil { if cachedTorrent == nil {
return os.ErrNotExist return os.ErrNotExist
} }
@@ -553,3 +564,22 @@ func (h *Handler) handleIDDelete(w http.ResponseWriter, r *http.Request) error {
w.WriteHeader(http.StatusNoContent) w.WriteHeader(http.StatusNoContent)
return nil return nil
} }
// handleDeleteAll removes every torrent currently listed under __bad__.
// It responds 404 when the listing is empty and 204 once removals have
// been issued; a nil return tells the caller the request was handled.
func (h *Handler) handleDeleteAll(w http.ResponseWriter) error {
	entries := h.cache.GetListing("__bad__")
	if len(entries) == 0 {
		http.Error(w, "No bad torrents to delete", http.StatusNotFound)
		return nil
	}
	for _, entry := range entries {
		// Listing names look like "<torrent name>||<detail>"; only the
		// part before the separator identifies the torrent.
		name, _, _ := strings.Cut(entry.Name(), "||")
		if t := h.cache.GetTorrentByName(strings.TrimSpace(name)); t != nil {
			h.cache.OnRemove(t.Id)
		}
	}
	w.WriteHeader(http.StatusNoContent)
	return nil
}

View File

@@ -56,7 +56,7 @@ type entry struct {
func filesToXML(urlPath string, fi os.FileInfo, children []os.FileInfo) stringbuf.StringBuf { func filesToXML(urlPath string, fi os.FileInfo, children []os.FileInfo) stringbuf.StringBuf {
now := time.Now().UTC().Format("2006-01-02T15:04:05.000-07:00") now := time.Now().UTC().Format(time.RFC3339)
entries := make([]entry, 0, len(children)+1) entries := make([]entry, 0, len(children)+1)
// Add the current file itself // Add the current file itself
@@ -65,7 +65,7 @@ func filesToXML(urlPath string, fi os.FileInfo, children []os.FileInfo) stringbu
escName: xmlEscape(fi.Name()), escName: xmlEscape(fi.Name()),
isDir: fi.IsDir(), isDir: fi.IsDir(),
size: fi.Size(), size: fi.Size(),
modTime: fi.ModTime().Format("2006-01-02T15:04:05.000-07:00"), modTime: fi.ModTime().Format(time.RFC3339),
}) })
for _, info := range children { for _, info := range children {
@@ -81,7 +81,7 @@ func filesToXML(urlPath string, fi os.FileInfo, children []os.FileInfo) stringbu
escName: xmlEscape(nm), escName: xmlEscape(nm),
isDir: info.IsDir(), isDir: info.IsDir(),
size: info.Size(), size: info.Size(),
modTime: info.ModTime().Format("2006-01-02T15:04:05.000-07:00"), modTime: info.ModTime().Format(time.RFC3339),
}) })
} }
@@ -238,17 +238,5 @@ func setVideoResponseHeaders(w http.ResponseWriter, resp *http.Response, isRange
w.Header().Set("Content-Range", contentRange) w.Header().Set("Content-Range", contentRange)
} }
// Video streaming optimizations
w.Header().Set("Accept-Ranges", "bytes") // Enable seeking
w.Header().Set("Connection", "keep-alive") // Keep connection open
// Prevent buffering in proxies/CDNs
w.Header().Set("X-Accel-Buffering", "no") // Nginx
w.Header().Set("Proxy-Buffering", "off") // General proxy
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Access-Control-Allow-Headers", "Range")
w.Header().Set("Access-Control-Expose-Headers", "Content-Length, Content-Range")
w.WriteHeader(resp.StatusCode) w.WriteHeader(resp.StatusCode)
} }

View File

@@ -55,7 +55,6 @@ func (h *Handler) handlePropfind(w http.ResponseWriter, r *http.Request) {
rawEntries = append(rawEntries, h.getChildren(cleanPath)...) rawEntries = append(rawEntries, h.getChildren(cleanPath)...)
} }
now := time.Now().UTC().Format("2006-01-02T15:04:05.000-07:00")
entries := make([]entry, 0, len(rawEntries)+1) entries := make([]entry, 0, len(rawEntries)+1)
// Add the current file itself // Add the current file itself
entries = append(entries, entry{ entries = append(entries, entry{
@@ -63,7 +62,7 @@ func (h *Handler) handlePropfind(w http.ResponseWriter, r *http.Request) {
escName: xmlEscape(fi.Name()), escName: xmlEscape(fi.Name()),
isDir: fi.IsDir(), isDir: fi.IsDir(),
size: fi.Size(), size: fi.Size(),
modTime: fi.ModTime().Format("2006-01-02T15:04:05.000-07:00"), modTime: fi.ModTime().Format(time.RFC3339),
}) })
for _, info := range rawEntries { for _, info := range rawEntries {
@@ -79,7 +78,7 @@ func (h *Handler) handlePropfind(w http.ResponseWriter, r *http.Request) {
escName: xmlEscape(nm), escName: xmlEscape(nm),
isDir: info.IsDir(), isDir: info.IsDir(),
size: info.Size(), size: info.Size(),
modTime: info.ModTime().Format("2006-01-02T15:04:05.000-07:00"), modTime: info.ModTime().Format(time.RFC3339),
}) })
} }
@@ -108,7 +107,7 @@ func (h *Handler) handlePropfind(w http.ResponseWriter, r *http.Request) {
} }
_, _ = sb.WriteString(`<d:getlastmodified>`) _, _ = sb.WriteString(`<d:getlastmodified>`)
_, _ = sb.WriteString(now) _, _ = sb.WriteString(e.modTime)
_, _ = sb.WriteString(`</d:getlastmodified>`) _, _ = sb.WriteString(`</d:getlastmodified>`)
_, _ = sb.WriteString(`<d:displayname>`) _, _ = sb.WriteString(`<d:displayname>`)

View File

@@ -106,6 +106,19 @@
</li> </li>
{{- end}} {{- end}}
{{$isBadPath := hasSuffix .Path "__bad__"}} {{$isBadPath := hasSuffix .Path "__bad__"}}
{{- if and $isBadPath (gt (len .Children) 0) }}
<li>
<span class="file-number">&nbsp;</span>
<span class="file-name">&nbsp;</span>
<span class="file-info">&nbsp;</span>
<button
class="delete-btn"
id="delete-all-btn"
data-name="{{.DeleteAllBadTorrentKey}}">
Delete All
</button>
</li>
{{- end}}
{{- range $i, $file := .Children}} {{- range $i, $file := .Children}}
<li class="{{if $isBadPath}}disabled{{end}}"> <li class="{{if $isBadPath}}disabled{{end}}">
<a {{ if not $isBadPath}}href="{{urlpath (printf "%s/%s" $.Path $file.Name)}}"{{end}}> <a {{ if not $isBadPath}}href="{{urlpath (printf "%s/%s" $.Path $file.Name)}}"{{end}}>
@@ -118,7 +131,7 @@
</a> </a>
{{- if and $.CanDelete }} {{- if and $.CanDelete }}
<button <button
class="delete-btn" class="delete-btn delete-with-id-btn"
data-name="{{$file.Name}}" data-name="{{$file.Name}}"
data-path="{{printf "%s/%s" $.Path $file.ID}}"> data-path="{{printf "%s/%s" $.Path $file.ID}}">
Delete Delete
@@ -128,7 +141,7 @@
{{- end}} {{- end}}
</ul> </ul>
<script> <script>
document.querySelectorAll('.delete-btn').forEach(btn=>{ document.querySelectorAll('.delete-with-id-btn').forEach(btn=>{
btn.addEventListener('click', ()=>{ btn.addEventListener('click', ()=>{
let p = btn.getAttribute('data-path'); let p = btn.getAttribute('data-path');
let name = btn.getAttribute('data-name'); let name = btn.getAttribute('data-name');
@@ -137,6 +150,14 @@
.then(_=>location.reload()); .then(_=>location.reload());
}); });
}); });
// Wire up the "Delete All" button. The template only renders it on the
// __bad__ listing when children exist, so guard against the element being
// absent — otherwise addEventListener throws a TypeError on every other page.
const deleteAllButton = document.getElementById('delete-all-btn');
if (deleteAllButton) {
    deleteAllButton.addEventListener('click', () => {
        const target = deleteAllButton.getAttribute('data-name');
        if (!confirm('Delete all entries marked Bad?')) return;
        fetch(target, { method: 'DELETE' })
            .then(_ => location.reload());
    });
}
</script> </script>
</body> </body>
</html> </html>