Add repair and download rate limit
This commit is contained in:
3
go.mod
3
go.mod
@@ -14,16 +14,17 @@ require (
|
||||
github.com/robfig/cron/v3 v3.0.1
|
||||
github.com/rs/zerolog v1.33.0
|
||||
github.com/stanNthe5/stringbuf v0.0.3
|
||||
go.uber.org/ratelimit v0.3.1
|
||||
golang.org/x/crypto v0.33.0
|
||||
golang.org/x/net v0.35.0
|
||||
golang.org/x/sync v0.12.0
|
||||
golang.org/x/time v0.8.0
|
||||
gopkg.in/natefinch/lumberjack.v2 v2.2.1
|
||||
)
|
||||
|
||||
require (
|
||||
github.com/anacrolix/missinggo v1.3.0 // indirect
|
||||
github.com/anacrolix/missinggo/v2 v2.7.3 // indirect
|
||||
github.com/benbjohnson/clock v1.3.0 // indirect
|
||||
github.com/bradfitz/iter v0.0.0-20191230175014-e8f45d346db8 // indirect
|
||||
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
|
||||
github.com/google/go-cmp v0.6.0 // indirect
|
||||
|
||||
8
go.sum
8
go.sum
@@ -36,6 +36,8 @@ github.com/anacrolix/tagflag v1.1.0/go.mod h1:Scxs9CV10NQatSmbyjqmqmeQNwGzlNe0CM
|
||||
github.com/anacrolix/torrent v1.55.0 h1:s9yh/YGdPmbN9dTa+0Inh2dLdrLQRvEAj1jdFW/Hdd8=
|
||||
github.com/anacrolix/torrent v1.55.0/go.mod h1:sBdZHBSZNj4de0m+EbYg7vvs/G/STubxu/GzzNbojsE=
|
||||
github.com/apache/thrift v0.12.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
|
||||
github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A=
|
||||
github.com/benbjohnson/clock v1.3.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
|
||||
github.com/benbjohnson/immutable v0.2.0/go.mod h1:uc6OHo6PN2++n98KHLxW8ef4W42ylHiQSENghE1ezxI=
|
||||
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
|
||||
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
|
||||
@@ -216,8 +218,12 @@ github.com/willf/bitset v1.1.10/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPy
|
||||
go.opencensus.io v0.20.1/go.mod h1:6WKK9ahsWS3RSO+PY9ZHZUfv2irvY6gN279GOPZjmmk=
|
||||
go.opencensus.io v0.20.2/go.mod h1:6WKK9ahsWS3RSO+PY9ZHZUfv2irvY6gN279GOPZjmmk=
|
||||
go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
|
||||
go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
|
||||
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
|
||||
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
|
||||
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
|
||||
go.uber.org/ratelimit v0.3.1 h1:K4qVE+byfv/B3tC+4nYWP7v/6SimcO7HzHekoMNBma0=
|
||||
go.uber.org/ratelimit v0.3.1/go.mod h1:6euWsTB6U/Nb3X++xEUXA8ciPJvr19Q/0h1+oDcJhRk=
|
||||
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
|
||||
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
|
||||
golang.org/x/crypto v0.33.0 h1:IOBPskki6Lysi0lo9qQvbxiQ+FvsCC/YWOecCHAixus=
|
||||
@@ -266,8 +272,6 @@ golang.org/x/sys v0.30.0 h1:QjkSwP/36a20jFYWkSue1YwXzLmsV5Gfq7Eiy72C1uc=
|
||||
golang.org/x/sys v0.30.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
|
||||
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
|
||||
golang.org/x/time v0.8.0 h1:9i3RxcPv3PZnitoVGMPDKZSq1xW1gK1Xy3ArNOGZfEg=
|
||||
golang.org/x/time v0.8.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
|
||||
golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
|
||||
@@ -19,17 +19,19 @@ var (
|
||||
)
|
||||
|
||||
type Debrid struct {
|
||||
Name string `json:"name,omitempty"`
|
||||
APIKey string `json:"api_key,omitempty"`
|
||||
DownloadAPIKeys []string `json:"download_api_keys,omitempty"`
|
||||
Folder string `json:"folder,omitempty"`
|
||||
DownloadUncached bool `json:"download_uncached,omitempty"`
|
||||
CheckCached bool `json:"check_cached,omitempty"`
|
||||
RateLimit string `json:"rate_limit,omitempty"` // 200/minute or 10/second
|
||||
Proxy string `json:"proxy,omitempty"`
|
||||
UnpackRar bool `json:"unpack_rar,omitempty"`
|
||||
AddSamples bool `json:"add_samples,omitempty"`
|
||||
MinimumFreeSlot int `json:"minimum_free_slot,omitempty"` // Minimum active pots to use this debrid
|
||||
Name string `json:"name,omitempty"`
|
||||
APIKey string `json:"api_key,omitempty"`
|
||||
DownloadAPIKeys []string `json:"download_api_keys,omitempty"`
|
||||
Folder string `json:"folder,omitempty"`
|
||||
DownloadUncached bool `json:"download_uncached,omitempty"`
|
||||
CheckCached bool `json:"check_cached,omitempty"`
|
||||
RateLimit string `json:"rate_limit,omitempty"` // 200/minute or 10/second
|
||||
RepairRateLimit string `json:"repair_rate_limit,omitempty"`
|
||||
DownloadRateLimit string `json:"download_rate_limit,omitempty"`
|
||||
Proxy string `json:"proxy,omitempty"`
|
||||
UnpackRar bool `json:"unpack_rar,omitempty"`
|
||||
AddSamples bool `json:"add_samples,omitempty"`
|
||||
MinimumFreeSlot int `json:"minimum_free_slot,omitempty"` // Minimum active pots to use this debrid
|
||||
|
||||
UseWebDav bool `json:"use_webdav,omitempty"`
|
||||
WebDav
|
||||
|
||||
@@ -9,10 +9,9 @@ import (
|
||||
"fmt"
|
||||
"github.com/rs/zerolog"
|
||||
"github.com/sirrobot01/decypharr/internal/logger"
|
||||
"go.uber.org/ratelimit"
|
||||
"golang.org/x/net/proxy"
|
||||
"golang.org/x/time/rate"
|
||||
"io"
|
||||
"math"
|
||||
"math/rand"
|
||||
"net"
|
||||
"net/http"
|
||||
@@ -52,7 +51,7 @@ type ClientOption func(*Client)
|
||||
// Client represents an HTTP client with additional capabilities
|
||||
type Client struct {
|
||||
client *http.Client
|
||||
rateLimiter *rate.Limiter
|
||||
rateLimiter ratelimit.Limiter
|
||||
headers map[string]string
|
||||
headersMu sync.RWMutex
|
||||
maxRetries int
|
||||
@@ -84,7 +83,7 @@ func WithRedirectPolicy(policy func(req *http.Request, via []*http.Request) erro
|
||||
}
|
||||
|
||||
// WithRateLimiter sets a rate limiter
|
||||
func WithRateLimiter(rl *rate.Limiter) ClientOption {
|
||||
func WithRateLimiter(rl ratelimit.Limiter) ClientOption {
|
||||
return func(c *Client) {
|
||||
c.rateLimiter = rl
|
||||
}
|
||||
@@ -136,9 +135,11 @@ func WithProxy(proxyURL string) ClientOption {
|
||||
// doRequest performs a single HTTP request with rate limiting
|
||||
func (c *Client) doRequest(req *http.Request) (*http.Response, error) {
|
||||
if c.rateLimiter != nil {
|
||||
err := c.rateLimiter.Wait(req.Context())
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("rate limiter wait: %w", err)
|
||||
select {
|
||||
case <-req.Context().Done():
|
||||
return nil, req.Context().Err()
|
||||
default:
|
||||
c.rateLimiter.Take()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -339,7 +340,10 @@ func New(options ...ClientOption) *Client {
|
||||
return client
|
||||
}
|
||||
|
||||
func ParseRateLimit(rateStr string) *rate.Limiter {
|
||||
func ParseRateLimit(rateStr string) ratelimit.Limiter {
|
||||
if rateStr == "" {
|
||||
return nil
|
||||
}
|
||||
parts := strings.SplitN(rateStr, "/", 2)
|
||||
if len(parts) != 2 {
|
||||
return nil
|
||||
@@ -351,23 +355,21 @@ func ParseRateLimit(rateStr string) *rate.Limiter {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Set slack size to 10%
|
||||
slackSize := count / 10
|
||||
|
||||
// normalize unit
|
||||
unit := strings.ToLower(strings.TrimSpace(parts[1]))
|
||||
unit = strings.TrimSuffix(unit, "s")
|
||||
burstSize := int(math.Ceil(float64(count) * 0.1))
|
||||
if burstSize < 1 {
|
||||
burstSize = 1
|
||||
}
|
||||
if burstSize > count {
|
||||
burstSize = count
|
||||
}
|
||||
switch unit {
|
||||
case "minute", "min":
|
||||
return rate.NewLimiter(rate.Limit(float64(count)/60.0), burstSize)
|
||||
return ratelimit.New(count, ratelimit.Per(time.Minute), ratelimit.WithSlack(slackSize))
|
||||
case "second", "sec":
|
||||
return rate.NewLimiter(rate.Limit(float64(count)), burstSize)
|
||||
return ratelimit.New(count, ratelimit.Per(time.Second), ratelimit.WithSlack(slackSize))
|
||||
case "hour", "hr":
|
||||
return rate.NewLimiter(rate.Limit(float64(count)/3600.0), burstSize)
|
||||
return ratelimit.New(count, ratelimit.Per(time.Hour), ratelimit.WithSlack(slackSize))
|
||||
case "day", "d":
|
||||
return ratelimit.New(count, ratelimit.Per(24*time.Hour), ratelimit.WithSlack(slackSize))
|
||||
default:
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -33,6 +33,7 @@ type RealDebrid struct {
|
||||
DownloadUncached bool
|
||||
client *request.Client
|
||||
downloadClient *request.Client
|
||||
repairClient *request.Client
|
||||
autoExpiresLinksAfter time.Duration
|
||||
|
||||
MountPath string
|
||||
@@ -49,6 +50,8 @@ type RealDebrid struct {
|
||||
|
||||
func New(dc config.Debrid) (*RealDebrid, error) {
|
||||
rl := request.ParseRateLimit(dc.RateLimit)
|
||||
repairRl := request.ParseRateLimit(dc.RepairRateLimit)
|
||||
downloadRl := request.ParseRateLimit(dc.DownloadRateLimit)
|
||||
|
||||
headers := map[string]string{
|
||||
"Authorization": fmt.Sprintf("Bearer %s", dc.APIKey),
|
||||
@@ -77,11 +80,20 @@ func New(dc config.Debrid) (*RealDebrid, error) {
|
||||
request.WithProxy(dc.Proxy),
|
||||
),
|
||||
downloadClient: request.New(
|
||||
request.WithRateLimiter(downloadRl),
|
||||
request.WithLogger(_log),
|
||||
request.WithMaxRetries(10),
|
||||
request.WithRetryableStatus(429, 447, 502),
|
||||
request.WithProxy(dc.Proxy),
|
||||
),
|
||||
repairClient: request.New(
|
||||
request.WithRateLimiter(repairRl),
|
||||
request.WithHeaders(headers),
|
||||
request.WithLogger(_log),
|
||||
request.WithMaxRetries(4),
|
||||
request.WithRetryableStatus(429, 502),
|
||||
request.WithProxy(dc.Proxy),
|
||||
),
|
||||
MountPath: dc.Folder,
|
||||
logger: logger.New(dc.Name),
|
||||
rarSemaphore: make(chan struct{}, 2),
|
||||
@@ -608,7 +620,7 @@ func (r *RealDebrid) CheckLink(link string) error {
|
||||
"link": {link},
|
||||
}
|
||||
req, _ := http.NewRequest(http.MethodPost, url, strings.NewReader(payload.Encode()))
|
||||
resp, err := r.client.Do(req)
|
||||
resp, err := r.repairClient.Do(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -670,35 +670,17 @@ func (r *Repair) getWebdavBrokenFiles(job *Job, media arr.Content) []arr.Content
|
||||
|
||||
brokenFiles := make([]arr.ContentFile, 0)
|
||||
uniqueParents := collectFiles(media)
|
||||
var brokenFilesMutex sync.Mutex
|
||||
var wg sync.WaitGroup
|
||||
|
||||
// Limit concurrent torrent checks
|
||||
semaphore := make(chan struct{}, min(len(uniqueParents), 30)) // Limit to 5 concurrent checks
|
||||
for torrentPath, files := range uniqueParents {
|
||||
wg.Add(1)
|
||||
go func(torrentPath string, files []arr.ContentFile) {
|
||||
defer wg.Done()
|
||||
semaphore <- struct{}{} // Acquire
|
||||
defer func() { <-semaphore }() // Release
|
||||
|
||||
select {
|
||||
case <-job.ctx.Done():
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
brokenFilesForTorrent := r.checkTorrentFiles(torrentPath, files, clients, caches)
|
||||
|
||||
if len(brokenFilesForTorrent) > 0 {
|
||||
brokenFilesMutex.Lock()
|
||||
brokenFiles = append(brokenFiles, brokenFilesForTorrent...)
|
||||
brokenFilesMutex.Unlock()
|
||||
}
|
||||
}(torrentPath, files)
|
||||
select {
|
||||
case <-job.ctx.Done():
|
||||
return brokenFiles
|
||||
default:
|
||||
}
|
||||
brokenFilesForTorrent := r.checkTorrentFiles(torrentPath, files, clients, caches)
|
||||
if len(brokenFilesForTorrent) > 0 {
|
||||
brokenFiles = append(brokenFiles, brokenFilesForTorrent...)
|
||||
}
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
if len(brokenFiles) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user