Merge branch 'beta'
Some checks failed
GoReleaser / goreleaser (push) Has been cancelled
Release Docker Build / docker (push) Has been cancelled

This commit is contained in:
Mukhtar Akere
2025-06-26 21:15:22 +01:00
21 changed files with 245 additions and 463 deletions

View File

@@ -72,5 +72,5 @@ body:
label: Trace Logs have been provided as applicable
description: Trace logs are **generally required** for all bug reports and must contain `trace`. Info/debug logs (which contain neither `debug` nor `trace` level detail) are not valid for bug reports
options:
- label: I have read and followed the steps in the wiki link above and provided the required trace logs - the logs contain `trace` - that are relevant and show this issue.
- label: I have read and followed the steps in the documentation link and provided the required trace logs - the logs contain `trace` - that are relevant and show this issue.
required: true

View File

@@ -62,6 +62,11 @@ Create a `config.json` file in `/opt/decypharr/` with your Decypharr configurati
```
### Docker Compose Setup
- Check your current user and group IDs by running `id -u` and `id -g` in your terminal. You can use these values to set the `PUID` and `PGID` environment variables in the Docker Compose file.
- You should also set `user` to your user ID and group ID in the Docker Compose file to ensure proper file permissions.
Create a `docker-compose.yml` file with the following content:
```yaml
@@ -69,11 +74,14 @@ services:
decypharr:
image: cy01/blackhole:latest
container_name: decypharr
user: "${PUID:-1000}:${PGID:-1000}"
volumes:
- /mnt/:/mnt:rslave
- /opt/decypharr/:/app
environment:
- UMASK=002
- PUID=1000 # Replace with your user ID
- PGID=1000 # Replace with your group ID
ports:
- "8282:8282/tcp"
restart: unless-stopped

3
go.mod
View File

@@ -14,16 +14,17 @@ require (
github.com/robfig/cron/v3 v3.0.1
github.com/rs/zerolog v1.33.0
github.com/stanNthe5/stringbuf v0.0.3
go.uber.org/ratelimit v0.3.1
golang.org/x/crypto v0.33.0
golang.org/x/net v0.35.0
golang.org/x/sync v0.12.0
golang.org/x/time v0.8.0
gopkg.in/natefinch/lumberjack.v2 v2.2.1
)
require (
github.com/anacrolix/missinggo v1.3.0 // indirect
github.com/anacrolix/missinggo/v2 v2.7.3 // indirect
github.com/benbjohnson/clock v1.3.0 // indirect
github.com/bradfitz/iter v0.0.0-20191230175014-e8f45d346db8 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/google/go-cmp v0.6.0 // indirect

8
go.sum
View File

@@ -36,6 +36,8 @@ github.com/anacrolix/tagflag v1.1.0/go.mod h1:Scxs9CV10NQatSmbyjqmqmeQNwGzlNe0CM
github.com/anacrolix/torrent v1.55.0 h1:s9yh/YGdPmbN9dTa+0Inh2dLdrLQRvEAj1jdFW/Hdd8=
github.com/anacrolix/torrent v1.55.0/go.mod h1:sBdZHBSZNj4de0m+EbYg7vvs/G/STubxu/GzzNbojsE=
github.com/apache/thrift v0.12.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A=
github.com/benbjohnson/clock v1.3.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/benbjohnson/immutable v0.2.0/go.mod h1:uc6OHo6PN2++n98KHLxW8ef4W42ylHiQSENghE1ezxI=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
@@ -216,8 +218,12 @@ github.com/willf/bitset v1.1.10/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPy
go.opencensus.io v0.20.1/go.mod h1:6WKK9ahsWS3RSO+PY9ZHZUfv2irvY6gN279GOPZjmmk=
go.opencensus.io v0.20.2/go.mod h1:6WKK9ahsWS3RSO+PY9ZHZUfv2irvY6gN279GOPZjmmk=
go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
go.uber.org/ratelimit v0.3.1 h1:K4qVE+byfv/B3tC+4nYWP7v/6SimcO7HzHekoMNBma0=
go.uber.org/ratelimit v0.3.1/go.mod h1:6euWsTB6U/Nb3X++xEUXA8ciPJvr19Q/0h1+oDcJhRk=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.33.0 h1:IOBPskki6Lysi0lo9qQvbxiQ+FvsCC/YWOecCHAixus=
@@ -266,8 +272,6 @@ golang.org/x/sys v0.30.0 h1:QjkSwP/36a20jFYWkSue1YwXzLmsV5Gfq7Eiy72C1uc=
golang.org/x/sys v0.30.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/time v0.8.0 h1:9i3RxcPv3PZnitoVGMPDKZSq1xW1gK1Xy3ArNOGZfEg=
golang.org/x/time v0.8.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=

View File

@@ -19,17 +19,19 @@ var (
)
type Debrid struct {
Name string `json:"name,omitempty"`
APIKey string `json:"api_key,omitempty"`
DownloadAPIKeys []string `json:"download_api_keys,omitempty"`
Folder string `json:"folder,omitempty"`
DownloadUncached bool `json:"download_uncached,omitempty"`
CheckCached bool `json:"check_cached,omitempty"`
RateLimit string `json:"rate_limit,omitempty"` // 200/minute or 10/second
Proxy string `json:"proxy,omitempty"`
UnpackRar bool `json:"unpack_rar,omitempty"`
AddSamples bool `json:"add_samples,omitempty"`
MinimumFreeSlot int `json:"minimum_free_slot,omitempty"` // Minimum active pots to use this debrid
Name string `json:"name,omitempty"`
APIKey string `json:"api_key,omitempty"`
DownloadAPIKeys []string `json:"download_api_keys,omitempty"`
Folder string `json:"folder,omitempty"`
DownloadUncached bool `json:"download_uncached,omitempty"`
CheckCached bool `json:"check_cached,omitempty"`
RateLimit string `json:"rate_limit,omitempty"` // 200/minute or 10/second
RepairRateLimit string `json:"repair_rate_limit,omitempty"`
DownloadRateLimit string `json:"download_rate_limit,omitempty"`
Proxy string `json:"proxy,omitempty"`
UnpackRar bool `json:"unpack_rar,omitempty"`
AddSamples bool `json:"add_samples,omitempty"`
MinimumFreeSlot int `json:"minimum_free_slot,omitempty"` // Minimum active pots to use this debrid
UseWebDav bool `json:"use_webdav,omitempty"`
WebDav

View File

@@ -9,10 +9,9 @@ import (
"fmt"
"github.com/rs/zerolog"
"github.com/sirrobot01/decypharr/internal/logger"
"go.uber.org/ratelimit"
"golang.org/x/net/proxy"
"golang.org/x/time/rate"
"io"
"math"
"math/rand"
"net"
"net/http"
@@ -52,7 +51,7 @@ type ClientOption func(*Client)
// Client represents an HTTP client with additional capabilities
type Client struct {
client *http.Client
rateLimiter *rate.Limiter
rateLimiter ratelimit.Limiter
headers map[string]string
headersMu sync.RWMutex
maxRetries int
@@ -84,7 +83,7 @@ func WithRedirectPolicy(policy func(req *http.Request, via []*http.Request) erro
}
// WithRateLimiter sets a rate limiter
func WithRateLimiter(rl *rate.Limiter) ClientOption {
func WithRateLimiter(rl ratelimit.Limiter) ClientOption {
return func(c *Client) {
c.rateLimiter = rl
}
@@ -136,9 +135,11 @@ func WithProxy(proxyURL string) ClientOption {
// doRequest performs a single HTTP request with rate limiting
func (c *Client) doRequest(req *http.Request) (*http.Response, error) {
if c.rateLimiter != nil {
err := c.rateLimiter.Wait(req.Context())
if err != nil {
return nil, fmt.Errorf("rate limiter wait: %w", err)
select {
case <-req.Context().Done():
return nil, req.Context().Err()
default:
c.rateLimiter.Take()
}
}
@@ -339,7 +340,10 @@ func New(options ...ClientOption) *Client {
return client
}
func ParseRateLimit(rateStr string) *rate.Limiter {
func ParseRateLimit(rateStr string) ratelimit.Limiter {
if rateStr == "" {
return nil
}
parts := strings.SplitN(rateStr, "/", 2)
if len(parts) != 2 {
return nil
@@ -351,23 +355,21 @@ func ParseRateLimit(rateStr string) *rate.Limiter {
return nil
}
// Set slack size to 10%
slackSize := count / 10
// normalize unit
unit := strings.ToLower(strings.TrimSpace(parts[1]))
unit = strings.TrimSuffix(unit, "s")
burstSize := int(math.Ceil(float64(count) * 0.1))
if burstSize < 1 {
burstSize = 1
}
if burstSize > count {
burstSize = count
}
switch unit {
case "minute", "min":
return rate.NewLimiter(rate.Limit(float64(count)/60.0), burstSize)
return ratelimit.New(count, ratelimit.Per(time.Minute), ratelimit.WithSlack(slackSize))
case "second", "sec":
return rate.NewLimiter(rate.Limit(float64(count)), burstSize)
return ratelimit.New(count, ratelimit.Per(time.Second), ratelimit.WithSlack(slackSize))
case "hour", "hr":
return rate.NewLimiter(rate.Limit(float64(count)/3600.0), burstSize)
return ratelimit.New(count, ratelimit.Per(time.Hour), ratelimit.WithSlack(slackSize))
case "day", "d":
return ratelimit.New(count, ratelimit.Per(24*time.Hour), ratelimit.WithSlack(slackSize))
default:
return nil
}

View File

@@ -40,12 +40,10 @@ func RemoveInvalidChars(value string) string {
}
func RemoveExtension(value string) string {
loc := mediaRegex.FindStringIndex(value)
if loc != nil {
if loc := mediaRegex.FindStringIndex(value); loc != nil {
return value[:loc[0]]
} else {
return value
}
return value
}
func IsMediaFile(path string) bool {

View File

@@ -2,6 +2,7 @@ package realdebrid
import (
"bytes"
"cmp"
"encoding/json"
"errors"
"fmt"
@@ -33,6 +34,7 @@ type RealDebrid struct {
DownloadUncached bool
client *request.Client
downloadClient *request.Client
repairClient *request.Client
autoExpiresLinksAfter time.Duration
MountPath string
@@ -49,6 +51,8 @@ type RealDebrid struct {
func New(dc config.Debrid) (*RealDebrid, error) {
rl := request.ParseRateLimit(dc.RateLimit)
repairRl := request.ParseRateLimit(cmp.Or(dc.RepairRateLimit, dc.RateLimit))
downloadRl := request.ParseRateLimit(cmp.Or(dc.DownloadRateLimit, dc.RateLimit))
headers := map[string]string{
"Authorization": fmt.Sprintf("Bearer %s", dc.APIKey),
@@ -77,11 +81,20 @@ func New(dc config.Debrid) (*RealDebrid, error) {
request.WithProxy(dc.Proxy),
),
downloadClient: request.New(
request.WithRateLimiter(downloadRl),
request.WithLogger(_log),
request.WithMaxRetries(10),
request.WithRetryableStatus(429, 447, 502),
request.WithProxy(dc.Proxy),
),
repairClient: request.New(
request.WithRateLimiter(repairRl),
request.WithHeaders(headers),
request.WithLogger(_log),
request.WithMaxRetries(4),
request.WithRetryableStatus(429, 502),
request.WithProxy(dc.Proxy),
),
MountPath: dc.Folder,
logger: logger.New(dc.Name),
rarSemaphore: make(chan struct{}, 2),
@@ -608,7 +621,7 @@ func (r *RealDebrid) CheckLink(link string) error {
"link": {link},
}
req, _ := http.NewRequest(http.MethodPost, url, strings.NewReader(payload.Encode()))
resp, err := r.client.Do(req)
resp, err := r.repairClient.Do(req)
if err != nil {
return err
}
@@ -621,7 +634,7 @@ func (r *RealDebrid) CheckLink(link string) error {
func (r *RealDebrid) _getDownloadLink(file *types.File) (*types.DownloadLink, error) {
url := fmt.Sprintf("%s/unrestrict/link/", r.Host)
_link := file.Link
if strings.HasPrefix(_link, "https://real-debrid.com/d/") {
if strings.HasPrefix(file.Link, "https://real-debrid.com/d/") && len(file.Link) > 39 {
_link = file.Link[0:39]
}
payload := gourl.Values{

View File

@@ -514,9 +514,9 @@ func (c *Cache) setTorrent(t CachedTorrent, callback func(torrent CachedTorrent)
updatedTorrent.Files = mergedFiles
}
c.torrents.set(torrentName, t, updatedTorrent)
c.SaveTorrent(t)
go c.SaveTorrent(t)
if callback != nil {
callback(updatedTorrent)
go callback(updatedTorrent)
}
}
@@ -702,6 +702,7 @@ func (c *Cache) ProcessTorrent(t *types.Torrent) error {
func (c *Cache) Add(t *types.Torrent) error {
if len(t.Files) == 0 {
c.logger.Warn().Msgf("Torrent %s has no files to add. Refreshing", t.Id)
if err := c.client.UpdateTorrent(t); err != nil {
return fmt.Errorf("failed to update torrent: %w", err)
}

View File

@@ -146,57 +146,65 @@ func (c *Cache) refreshRclone() error {
},
}
// Create form data
data := ""
data := c.buildRcloneRequestData()
if err := c.sendRcloneRequest(client, "vfs/forget", data); err != nil {
c.logger.Error().Err(err).Msg("Failed to send rclone vfs/forget request")
}
if err := c.sendRcloneRequest(client, "vfs/refresh", data); err != nil {
c.logger.Error().Err(err).Msg("Failed to send rclone vfs/refresh request")
}
return nil
}
func (c *Cache) buildRcloneRequestData() string {
cfg := c.config
dirs := strings.FieldsFunc(cfg.RcRefreshDirs, func(r rune) bool {
return r == ',' || r == '&'
})
if len(dirs) == 0 {
data = "dir=__all__"
} else {
for index, dir := range dirs {
if dir != "" {
if index == 0 {
data += "dir=" + dir
} else {
data += "&dir" + fmt.Sprint(index+1) + "=" + dir
}
return "dir=__all__"
}
var data strings.Builder
for index, dir := range dirs {
if dir != "" {
if index == 0 {
data.WriteString("dir=" + dir)
} else {
data.WriteString("&dir" + fmt.Sprint(index+1) + "=" + dir)
}
}
}
return data.String()
}
sendRequest := func(endpoint string) error {
req, err := http.NewRequest("POST", fmt.Sprintf("%s/%s", cfg.RcUrl, endpoint), strings.NewReader(data))
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
if cfg.RcUser != "" && cfg.RcPass != "" {
req.SetBasicAuth(cfg.RcUser, cfg.RcPass)
}
resp, err := client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode != 200 {
body, _ := io.ReadAll(io.LimitReader(resp.Body, 1024))
return fmt.Errorf("failed to perform %s: %s - %s", endpoint, resp.Status, string(body))
}
_, _ = io.Copy(io.Discard, resp.Body)
return nil
}
if err := sendRequest("vfs/forget"); err != nil {
return err
}
if err := sendRequest("vfs/refresh"); err != nil {
// sendRcloneRequest POSTs the given form-encoded data to an rclone RC
// endpoint (e.g. "vfs/refresh") and returns an error on transport failure
// or any non-200 response. On success the body is drained so the HTTP
// transport can reuse the underlying connection.
func (c *Cache) sendRcloneRequest(client *http.Client, endpoint, data string) error {
	target := fmt.Sprintf("%s/%s", c.config.RcUrl, endpoint)
	request, err := http.NewRequest("POST", target, strings.NewReader(data))
	if err != nil {
		return err
	}
	request.Header.Set("Content-Type", "application/x-www-form-urlencoded")
	// Basic auth is optional; attach it only when both credentials are set.
	if c.config.RcUser != "" && c.config.RcPass != "" {
		request.SetBasicAuth(c.config.RcUser, c.config.RcPass)
	}
	response, err := client.Do(request)
	if err != nil {
		return err
	}
	defer response.Body.Close()
	if response.StatusCode != 200 {
		// Include up to 1 KiB of the response body for diagnostics.
		snippet, _ := io.ReadAll(io.LimitReader(response.Body, 1024))
		return fmt.Errorf("failed to perform %s: %s - %s", endpoint, response.Status, string(snippet))
	}
	// Drain any remaining body bytes to keep the connection reusable.
	_, _ = io.Copy(io.Discard, response.Body)
	return nil
}

View File

@@ -89,23 +89,31 @@ func (c *Cache) GetBrokenFiles(t *CachedTorrent, filenames []string) []string {
}
files = t.Files
var wg sync.WaitGroup
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
wg.Add(len(files))
for _, f := range files {
// Check if file link is still missing
go func(f types.File) {
defer wg.Done()
select {
case <-ctx.Done():
return
default:
}
if f.Link == "" {
brokenFiles = append(brokenFiles, f.Name)
} else {
// Check if file.Link not in the downloadLink Cache
if err := c.client.CheckLink(f.Link); err != nil {
if errors.Is(err, utils.HosterUnavailableError) {
brokenFiles = append(brokenFiles, f.Name)
}
cancel()
return
}
if err := c.client.CheckLink(f.Link); err != nil {
if errors.Is(err, utils.HosterUnavailableError) {
cancel() // Signal all other goroutines to stop
return
}
}
}(f)
@@ -113,6 +121,13 @@ func (c *Cache) GetBrokenFiles(t *CachedTorrent, filenames []string) []string {
wg.Wait()
// If context was cancelled, mark all files as broken
if ctx.Err() != nil {
for _, f := range files {
brokenFiles = append(brokenFiles, f.Name)
}
}
// Try to reinsert the torrent if it's broken
if len(brokenFiles) > 0 && t.Torrent != nil {
// Check if the torrent is already in progress

View File

@@ -171,17 +171,18 @@ func (tc *torrentCache) refreshListing() {
wg.Add(1) // for all listing
go func() {
defer wg.Done()
listing := make([]os.FileInfo, len(all))
for i, sf := range all {
listing[i] = &fileInfo{sf.id, sf.name, sf.size, 0755 | os.ModeDir, sf.modTime, true}
}
tc.listing.Store(listing)
}()
wg.Done()
wg.Add(1)
// For __bad__
go func() {
defer wg.Done()
listing := make([]os.FileInfo, 0)
for _, sf := range all {
if sf.bad {
@@ -203,7 +204,6 @@ func (tc *torrentCache) refreshListing() {
}
tc.folders.Unlock()
}()
wg.Done()
now := time.Now()
wg.Add(len(tc.directoriesFilters)) // for each directory filter

View File

@@ -18,7 +18,7 @@ func NewAccounts(debridConf config.Debrid) *Accounts {
if token == "" {
continue
}
account := newAccount(token, idx)
account := newAccount(debridConf.Name, token, idx)
accounts = append(accounts, account)
}
@@ -33,6 +33,7 @@ func NewAccounts(debridConf config.Debrid) *Accounts {
}
type Account struct {
Debrid string // e.g., "realdebrid", "torbox", etc.
Order int
Disabled bool
Token string
@@ -176,30 +177,31 @@ func (a *Accounts) SetDownloadLinks(links map[string]*DownloadLink) {
a.Current().setLinks(links)
}
func newAccount(token string, index int) *Account {
func newAccount(debridName, token string, index int) *Account {
return &Account{
Token: token,
Order: index,
links: make(map[string]*DownloadLink),
Debrid: debridName,
Token: token,
Order: index,
links: make(map[string]*DownloadLink),
}
}
// getLink returns the cached download link for fileLink, keyed by the
// provider-normalized form of the link (see sliceFileLink), and whether it
// was present. Safe for concurrent use via the account's RWMutex.
//
// NOTE: this span was a diff-merge artifact that kept both the old
// fileLink[0:39] lookup and the new sliceFileLink lookup; only the new
// line is kept here.
func (a *Account) getLink(fileLink string) (*DownloadLink, bool) {
	a.mu.RLock()
	defer a.mu.RUnlock()
	dl, ok := a.links[a.sliceFileLink(fileLink)]
	return dl, ok
}
// setLink stores dl under the provider-normalized form of fileLink
// (see sliceFileLink). Safe for concurrent use via the account's mutex.
func (a *Account) setLink(fileLink string, dl *DownloadLink) {
	a.mu.Lock()
	defer a.mu.Unlock()
	a.links[a.sliceFileLink(fileLink)] = dl
}
// deleteLink removes the cached entry for the provider-normalized form of
// fileLink (see sliceFileLink). Safe for concurrent use via the mutex.
func (a *Account) deleteLink(fileLink string) {
	a.mu.Lock()
	defer a.mu.Unlock()
	delete(a.links, a.sliceFileLink(fileLink))
}
func (a *Account) resetDownloadLinks() {
a.mu.Lock()
@@ -225,6 +227,17 @@ func (a *Account) setLinks(links map[string]*DownloadLink) {
// Expired, continue
continue
}
a.links[dl.Link[0:39]] = dl
a.links[a.sliceFileLink(dl.Link)] = dl
}
}
// sliceFileLink normalizes a file link for use as a cache key.
// Real-Debrid links are keyed by their first 39 characters (links shorter
// than that are returned unchanged); links for every other provider are
// returned as-is.
func (a *Account) sliceFileLink(fileLink string) string {
	if a.Debrid == "realdebrid" && len(fileLink) >= 39 {
		return fileLink[:39]
	}
	return fileLink
}

View File

@@ -75,26 +75,6 @@ type Job struct {
ctx context.Context
}
func (j *Job) getUnprocessedBrokenItems() map[string][]arr.ContentFile {
items := make(map[string][]arr.ContentFile)
for arrName, files := range j.BrokenItems {
if len(files) == 0 {
continue // Skip empty file lists
}
items[arrName] = make([]arr.ContentFile, 0, len(files))
for _, file := range files {
if file.Path != "" && file.TargetPath != "" && !file.Processed {
items[arrName] = append(items[arrName], file)
}
}
}
if len(items) == 0 {
return nil // Return nil if no unprocessed items found
}
return items
}
func New(arrs *arr.Storage, engine *debrid.Storage) *Repair {
cfg := config.Get()
workers := runtime.NumCPU() * 20
@@ -690,35 +670,17 @@ func (r *Repair) getWebdavBrokenFiles(job *Job, media arr.Content) []arr.Content
brokenFiles := make([]arr.ContentFile, 0)
uniqueParents := collectFiles(media)
var brokenFilesMutex sync.Mutex
var wg sync.WaitGroup
// Limit concurrent torrent checks
semaphore := make(chan struct{}, min(len(uniqueParents), 30)) // Limit to 5 concurrent checks
for torrentPath, files := range uniqueParents {
wg.Add(1)
go func(torrentPath string, files []arr.ContentFile) {
defer wg.Done()
semaphore <- struct{}{} // Acquire
defer func() { <-semaphore }() // Release
select {
case <-job.ctx.Done():
return
default:
}
brokenFilesForTorrent := r.checkTorrentFiles(torrentPath, files, clients, caches)
if len(brokenFilesForTorrent) > 0 {
brokenFilesMutex.Lock()
brokenFiles = append(brokenFiles, brokenFilesForTorrent...)
brokenFilesMutex.Unlock()
}
}(torrentPath, files)
select {
case <-job.ctx.Done():
return brokenFiles
default:
}
brokenFilesForTorrent := r.checkTorrentFiles(torrentPath, files, clients, caches)
if len(brokenFilesForTorrent) > 0 {
brokenFiles = append(brokenFiles, brokenFilesForTorrent...)
}
}
wg.Wait()
if len(brokenFiles) == 0 {
return nil
}
@@ -765,7 +727,7 @@ func (r *Repair) ProcessJob(id string) error {
return fmt.Errorf("job %s already failed", id)
}
brokenItems := job.getUnprocessedBrokenItems()
brokenItems := job.BrokenItems
if len(brokenItems) == 0 {
r.logger.Info().Msgf("No broken items found for job %s", id)
job.CompletedAt = time.Now()
@@ -773,144 +735,63 @@ func (r *Repair) ProcessJob(id string) error {
return nil
}
r.logger.Info().Msgf("Processing job %s with %d broken items", id, len(brokenItems))
go r.processJob(job, brokenItems)
return nil
}
func (r *Repair) processJob(job *Job, brokenItems map[string][]arr.ContentFile) {
if job.ctx == nil || job.ctx.Err() != nil {
job.ctx, job.cancelFunc = context.WithCancel(r.ctx)
}
errs := make([]error, 0)
processedCount := 0
g, ctx := errgroup.WithContext(job.ctx)
g.SetLimit(r.workers)
for arrName, items := range brokenItems {
select {
case <-job.ctx.Done():
r.logger.Info().Msgf("Job %s cancelled", job.ID)
job.Status = JobCancelled
job.CompletedAt = time.Now()
job.Error = "Job was cancelled"
return
default:
// Continue processing
}
items := items
arrName := arrName
g.Go(func() error {
a := r.arrs.Get(arrName)
if a == nil {
errs = append(errs, fmt.Errorf("arr %s not found", arrName))
continue
}
select {
case <-ctx.Done():
return ctx.Err()
default:
}
if err := a.DeleteFiles(items); err != nil {
errs = append(errs, fmt.Errorf("failed to delete broken items for %s: %w", arrName, err))
continue
}
// Search for missing items
if err := a.SearchMissing(items); err != nil {
errs = append(errs, fmt.Errorf("failed to search missing items for %s: %w", arrName, err))
continue
}
processedCount += len(items)
// Mark this item as processed
for i := range items {
items[i].Processed = true
}
job.BrokenItems[arrName] = items
a := r.arrs.Get(arrName)
if a == nil {
r.logger.Error().Msgf("Arr %s not found", arrName)
return nil
}
if err := a.DeleteFiles(items); err != nil {
r.logger.Error().Err(err).Msgf("Failed to delete broken items for %s", arrName)
return nil
}
// Search for missing items
if err := a.SearchMissing(items); err != nil {
r.logger.Error().Err(err).Msgf("Failed to search missing items for %s", arrName)
return nil
}
return nil
})
}
// Update job status to in-progress
job.Status = JobProcessing
r.saveToFile()
if len(errs) > 0 {
errMsg := fmt.Sprintf("Job %s encountered errors: %v", job.ID, errs)
job.Error = errMsg
job.FailedAt = time.Now()
job.Status = JobFailed
r.logger.Error().Msg(errMsg)
go func() {
if err := request.SendDiscordMessage("repair_failed", "error", job.discordContext()); err != nil {
r.logger.Error().Msgf("Error sending discord message: %v", err)
}
}()
return
}
remainingItems := job.getUnprocessedBrokenItems()
if len(remainingItems) == 0 {
// All items processed, mark job as completed
job.CompletedAt = time.Now()
job.Status = JobCompleted
r.logger.Info().Msgf("Job %s completed successfully (all items processed)", job.ID)
go func() {
if err := request.SendDiscordMessage("repair_complete", "success", job.discordContext()); err != nil {
r.logger.Error().Msgf("Error sending discord message: %v", err)
}
}()
} else {
// Some items still remain, keep job as pending
job.Status = JobPending
r.logger.Info().Msgf("Job %s: processed %d selected items successfully, %d items remaining", job.ID, processedCount, len(remainingItems))
go func() {
if err := request.SendDiscordMessage("repair_partial_complete", "info", job.discordContext()); err != nil {
r.logger.Error().Msgf("Error sending discord message: %v", err)
}
}()
}
r.saveToFile()
}
// ProcessJobItems processes the selected items for a job
// selectedItems is the map of arr names to the list of file IDs to process
func (r *Repair) ProcessJobItems(id string, selectedItems map[string][]int) error {
job := r.GetJob(id)
if job == nil {
return fmt.Errorf("job %s not found", id)
}
if job.Status != JobPending {
return fmt.Errorf("job %s not pending", id)
}
if job.StartedAt.IsZero() {
return fmt.Errorf("job %s not started", id)
}
if !job.CompletedAt.IsZero() {
return fmt.Errorf("job %s already completed", id)
}
if !job.FailedAt.IsZero() {
return fmt.Errorf("job %s already failed", id)
}
brokenItems := job.getUnprocessedBrokenItems()
validatedItems := make(map[string][]arr.ContentFile)
for arrName, selectedItemsList := range selectedItems {
if jobItems, exists := brokenItems[arrName]; exists {
validItems := make([]arr.ContentFile, 0, len(selectedItemsList))
for _, item := range selectedItemsList {
// Find the item in the job items
for _, jobItem := range jobItems {
if jobItem.FileId == item {
validItems = append(validItems, jobItem)
break
}
}
}
if len(validItems) > 0 {
validatedItems[arrName] = validItems
}
// Launch a goroutine to wait for completion and update the job
go func() {
if err := g.Wait(); err != nil {
job.FailedAt = time.Now()
job.Error = err.Error()
job.CompletedAt = time.Now()
job.Status = JobFailed
r.logger.Error().Err(err).Msgf("Job %s failed", id)
} else {
job.CompletedAt = time.Now()
job.Status = JobCompleted
r.logger.Info().Msgf("Job %s completed successfully", id)
}
}
if len(validatedItems) == 0 {
return fmt.Errorf("no valid items found for job %s", id)
}
job.Status = JobProcessing
r.saveToFile()
go r.processJob(job, validatedItems)
r.saveToFile()
}()
return nil
}

View File

@@ -103,10 +103,5 @@ func (s *Server) getLogs(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Expires", "0")
// Stream the file
_, err = io.Copy(w, file)
if err != nil {
s.logger.Error().Err(err).Msg("Error streaming log file")
http.Error(w, "Error streaming log file", http.StatusInternalServerError)
return
}
_, _ = io.Copy(w, file)
}

View File

@@ -57,6 +57,8 @@ func (s *Store) processFiles(torrent *Torrent, debridTorrent *types.Torrent, imp
client := deb.Client()
downloadingStatuses := client.GetDownloadingStatus()
_arr := importReq.Arr
backoff := time.NewTimer(s.refreshInterval)
defer backoff.Stop()
for debridTorrent.Status != "downloaded" {
s.logger.Debug().Msgf("%s <- (%s) Download Progress: %.2f%%", debridTorrent.Debrid, debridTorrent.Name, debridTorrent.Progress)
dbT, err := client.CheckStatus(debridTorrent)
@@ -83,10 +85,12 @@ func (s *Store) processFiles(torrent *Torrent, debridTorrent *types.Torrent, imp
if debridTorrent.Status == "downloaded" || !utils.Contains(downloadingStatuses, debridTorrent.Status) {
break
}
if !utils.Contains(client.GetDownloadingStatus(), debridTorrent.Status) {
break
select {
case <-backoff.C:
// Increase interval gradually, cap at max
nextInterval := min(s.refreshInterval*2, 30*time.Second)
backoff.Reset(nextInterval)
}
time.Sleep(s.refreshInterval)
}
var torrentSymlinkPath string
var err error
@@ -96,15 +100,15 @@ func (s *Store) processFiles(torrent *Torrent, debridTorrent *types.Torrent, imp
timer := time.Now()
onFailed := func(err error) {
if err != nil {
s.markTorrentAsFailed(torrent)
go func() {
_ = client.DeleteTorrent(debridTorrent.Id)
}()
s.logger.Error().Err(err).Msgf("Error occured while processing torrent %s", debridTorrent.Name)
importReq.markAsFailed(err, torrent, debridTorrent)
return
}
s.markTorrentAsFailed(torrent)
go func() {
if deleteErr := client.DeleteTorrent(debridTorrent.Id); deleteErr != nil {
s.logger.Warn().Err(deleteErr).Msgf("Failed to delete torrent %s", debridTorrent.Id)
}
}()
s.logger.Error().Err(err).Msgf("Error occured while processing torrent %s", debridTorrent.Name)
importReq.markAsFailed(err, torrent, debridTorrent)
return
}
onSuccess := func(torrentSymlinkPath string) {
@@ -118,7 +122,9 @@ func (s *Store) processFiles(torrent *Torrent, debridTorrent *types.Torrent, imp
s.logger.Error().Msgf("Error sending discord message: %v", err)
}
}()
_arr.Refresh()
go func() {
_arr.Refresh()
}()
}
switch importReq.Action {
@@ -137,7 +143,6 @@ func (s *Store) processFiles(torrent *Torrent, debridTorrent *types.Torrent, imp
rclonePath := filepath.Join(debridTorrent.MountPath, cache.GetTorrentFolder(debridTorrent)) // /mnt/remote/realdebrid/MyTVShow
torrentFolderNoExt := utils.RemoveExtension(debridTorrent.Name)
torrentSymlinkPath, err = s.createSymlinksWebdav(torrent, debridTorrent, rclonePath, torrentFolderNoExt) // /mnt/symlinks/{category}/MyTVShow/
} else {
// User is using either zurg or debrid webdav
torrentSymlinkPath, err = s.processSymlink(torrent, debridTorrent) // /mnt/symlinks/{category}/MyTVShow/

View File

@@ -326,28 +326,6 @@ func (wb *Web) handleProcessRepairJob(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}
func (wb *Web) handleProcessRepairJobItems(w http.ResponseWriter, r *http.Request) {
id := chi.URLParam(r, "id")
if id == "" {
http.Error(w, "No job ID provided", http.StatusBadRequest)
return
}
var req struct {
Items map[string][]int `json:"items"`
}
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, "Invalid request body: "+err.Error(), http.StatusBadRequest)
return
}
_store := store.Get()
if err := _store.Repair().ProcessJobItems(id, req.Items); err != nil {
wb.logger.Error().Err(err).Msg("Failed to process repair job items")
http.Error(w, "Failed to process job items: "+err.Error(), http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusOK)
}
func (wb *Web) handleDeleteRepairJob(w http.ResponseWriter, r *http.Request) {
// Read ids from body
var req struct {

View File

@@ -28,7 +28,6 @@ func (wb *Web) Routes() http.Handler {
r.Post("/repair", wb.handleRepairMedia)
r.Get("/repair/jobs", wb.handleGetRepairJobs)
r.Post("/repair/jobs/{id}/process", wb.handleProcessRepairJob)
r.Post("/repair/jobs/{id}/process-items", wb.handleProcessRepairJobItems)
r.Post("/repair/jobs/{id}/stop", wb.handleStopRepairJob)
r.Delete("/repair/jobs", wb.handleDeleteRepairJob)
r.Get("/torrents", wb.handleGetTorrents)

View File

@@ -1005,6 +1005,9 @@
if (config.max_file_size) {
document.querySelector('[name="max_file_size"]').value = config.max_file_size;
}
if (config.remove_stalled_after) {
document.querySelector('[name="remove_stalled_after"]').value = config.remove_stalled_after;
}
if (config.discord_webhook_url) {
document.querySelector('[name="discord_webhook_url"]').value = config.discord_webhook_url;
}

View File

@@ -130,13 +130,7 @@
<h6 class="mb-0">
Broken Items
<span class="badge bg-secondary" id="totalItemsCount">0</span>
<span class="badge bg-primary" id="selectedItemsCount">0 selected</span>
</h6>
<div class="d-flex gap-2">
<button class="btn btn-sm btn-primary" id="processSelectedItemsBtn" disabled>
<i class="bi bi-play-fill me-1"></i>Process Selected
</button>
</div>
</div>
<!-- Filters and Search -->
@@ -171,11 +165,6 @@
<table class="table table-sm table-striped table-hover">
<thead class="sticky-top">
<tr>
<th style="width: 40px;">
<div class="form-check">
<input class="form-check-input" type="checkbox" id="selectAllItemsTable">
</div>
</th>
<th>Arr</th>
<th>Path</th>
<th style="width: 100px;">Type</th>
@@ -294,7 +283,7 @@
if (!response.ok) throw new Error(await response.text());
createToast('Repair process initiated successfully!');
loadJobs(1); // Refresh jobs after submission
await loadJobs(1); // Refresh jobs after submission
} catch (error) {
createToast(`Error starting repair: ${error.message}`, 'error');
} finally {
@@ -459,9 +448,8 @@
document.querySelectorAll('#jobsPagination a[data-page]').forEach(link => {
link.addEventListener('click', (e) => {
e.preventDefault();
const newPage = parseInt(e.currentTarget.dataset.page);
currentPage = newPage;
renderJobsTable(newPage);
currentPage = parseInt(e.currentTarget.dataset.page);
renderJobsTable(currentPage);
});
});
@@ -526,7 +514,6 @@
// modal functions
function processItemsData(brokenItems) {
const items = [];
let itemId = 0;
for (const [arrName, itemsArray] of Object.entries(brokenItems)) {
if (itemsArray && itemsArray.length > 0) {
@@ -601,51 +588,15 @@
row.dataset.itemId = item.id;
row.innerHTML = `
<td>
<div class="form-check">
<input class="form-check-input item-checkbox"
type="checkbox"
value="${item.id}"
${selectedItems.has(item.id) ? 'checked' : ''}>
</div>
</td>
<td><span class="badge bg-info">${item.arr}</span></td>
<td><small class="text-muted" title="${item.path}">${item.path}</small></td>
<td><span class="badge ${item.type === 'movie' ? 'bg-primary' : item.type === 'tv' ? 'bg-success' : 'bg-secondary'}">${item.type}</span></td>
<td><small>${formatFileSize(item.size)}</small></td>
`;
// Make row clickable to toggle selection
row.addEventListener('click', (e) => {
if (e.target.type !== 'checkbox') {
const checkbox = row.querySelector('.item-checkbox');
checkbox.checked = !checkbox.checked;
checkbox.dispatchEvent(new Event('change'));
}
});
tableBody.appendChild(row);
}
// Add event listeners to checkboxes
document.querySelectorAll('.item-checkbox').forEach(checkbox => {
checkbox.addEventListener('change', (e) => {
const itemId = parseInt(e.target.value);
const row = e.target.closest('tr');
if (e.target.checked) {
selectedItems.add(itemId);
row.classList.add('selected');
} else {
selectedItems.delete(itemId);
row.classList.remove('selected');
}
updateItemsStats();
updateSelectAllStates();
});
});
// Create pagination
if (totalPages > 1) {
const prevLi = document.createElement('li');
@@ -674,45 +625,18 @@
document.querySelectorAll('#itemsPagination a[data-items-page]').forEach(link => {
link.addEventListener('click', (e) => {
e.preventDefault();
const newPage = parseInt(e.currentTarget.dataset.itemsPage);
currentItemsPage = newPage;
currentItemsPage = parseInt(e.currentTarget.dataset.itemsPage);
renderBrokenItemsTable();
});
});
updateSelectAllStates();
}
function updateItemsStats() {
document.getElementById('totalItemsCount').textContent = allBrokenItems.length;
document.getElementById('selectedItemsCount').textContent = `${selectedItems.size} selected`;
const processSelectedBtn = document.getElementById('processSelectedItemsBtn');
processSelectedBtn.disabled = selectedItems.size === 0;
// Update footer stats
const footerStats = document.getElementById('modalFooterStats');
footerStats.textContent = `Total: ${allBrokenItems.length} | Filtered: ${filteredItems.length} | Selected: ${selectedItems.size}`;
}
function updateSelectAllStates() {
const selectAllTableCheckbox = document.getElementById('selectAllItemsTable');
const visibleCheckboxes = document.querySelectorAll('.item-checkbox');
const checkedVisible = document.querySelectorAll('.item-checkbox:checked');
if (visibleCheckboxes.length === 0) {
selectAllTableCheckbox.indeterminate = false;
selectAllTableCheckbox.checked = false;
} else if (checkedVisible.length === visibleCheckboxes.length) {
selectAllTableCheckbox.indeterminate = false;
selectAllTableCheckbox.checked = true;
} else if (checkedVisible.length > 0) {
selectAllTableCheckbox.indeterminate = true;
selectAllTableCheckbox.checked = false;
} else {
selectAllTableCheckbox.indeterminate = false;
selectAllTableCheckbox.checked = false;
}
footerStats.textContent = `Total: ${allBrokenItems.length} | Filtered: ${filteredItems.length}`;
}
function populateArrFilter() {
@@ -728,62 +652,6 @@
});
}
document.getElementById('selectAllItemsTable').addEventListener('change', (e) => {
const visibleCheckboxes = document.querySelectorAll('.item-checkbox');
visibleCheckboxes.forEach(checkbox => {
const itemId = parseInt(checkbox.value);
const row = checkbox.closest('tr');
if (e.target.checked) {
selectedItems.add(itemId);
checkbox.checked = true;
row.classList.add('selected');
} else {
selectedItems.delete(itemId);
checkbox.checked = false;
row.classList.remove('selected');
}
});
updateItemsStats();
});
document.getElementById('processSelectedItemsBtn').addEventListener('click', async () => {
if (selectedItems.size === 0) return;
const selectedItemsData = allBrokenItems.filter(item => selectedItems.has(item.id));
// Group by arr
const itemsByArr = {};
selectedItemsData.forEach(item => {
if (!itemsByArr[item.arr]) {
itemsByArr[item.arr] = [];
}
itemsByArr[item.arr].push(item.id);
});
console.log(itemsByArr);
try {
const response = await fetcher(`/api/repair/jobs/${currentJob.id}/process-items`, {
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
body: JSON.stringify({ items: itemsByArr })
});
if (!response.ok) throw new Error(await response.text());
createToast(`Processing ${selectedItems.size} selected items`);
// Close modal and refresh jobs
const modal = bootstrap.Modal.getInstance(document.getElementById('jobDetailsModal'));
modal.hide();
loadJobs(currentPage);
} catch (error) {
createToast(`Error processing selected items: ${error.message}`, 'error');
}
});
// Filter event listeners
document.getElementById('itemSearchInput').addEventListener('input', applyFilters);
document.getElementById('arrFilterSelect').addEventListener('change', applyFilters);

View File

@@ -238,17 +238,5 @@ func setVideoResponseHeaders(w http.ResponseWriter, resp *http.Response, isRange
w.Header().Set("Content-Range", contentRange)
}
// Video streaming optimizations
w.Header().Set("Accept-Ranges", "bytes") // Enable seeking
w.Header().Set("Connection", "keep-alive") // Keep connection open
// Prevent buffering in proxies/CDNs
w.Header().Set("X-Accel-Buffering", "no") // Nginx
w.Header().Set("Proxy-Buffering", "off") // General proxy
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Access-Control-Allow-Headers", "Range")
w.Header().Set("Access-Control-Expose-Headers", "Content-Length, Content-Range")
w.WriteHeader(resp.StatusCode)
}