- Add support for multiple api keys
- Fix minor bugs, removes goroutine mem leaks
This commit is contained in:
Mukhtar Akere
2025-03-28 23:44:21 +01:00
parent f9bc7ad914
commit dc2301eb98
24 changed files with 419 additions and 369 deletions

4
go.mod
View File

@@ -1,6 +1,6 @@
module github.com/sirrobot01/debrid-blackhole
go 1.23
go 1.23.0
toolchain go1.23.2
@@ -19,7 +19,7 @@ require (
github.com/valyala/fastjson v1.6.4
golang.org/x/crypto v0.33.0
golang.org/x/net v0.35.0
golang.org/x/sync v0.11.0
golang.org/x/sync v0.12.0
golang.org/x/time v0.8.0
gopkg.in/natefinch/lumberjack.v2 v2.2.1
)

4
go.sum
View File

@@ -249,8 +249,8 @@ golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.11.0 h1:GGz8+XQP4FvTTrjZPzNKTMFtSXH80RAzG+5ghFPgK9w=
golang.org/x/sync v0.11.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sync v0.12.0 h1:MHc5BpPuC30uJk597Ri8TV3CNZcTLu6B6z4lJy+g6Jw=
golang.org/x/sync v0.12.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=

View File

@@ -64,6 +64,7 @@ type Repair struct {
ZurgURL string `json:"zurg_url"`
AutoProcess bool `json:"auto_process"`
UseWebDav bool `json:"use_webdav"`
Workers int `json:"workers"`
}
type Auth struct {

View File

@@ -16,6 +16,12 @@ var HosterUnavailableError = &HTTPError{
Code: "hoster_unavailable",
}
var TrafficExceededError = &HTTPError{
StatusCode: 503,
Message: "Traffic exceeded",
Code: "traffic_exceeded",
}
var ErrLinkBroken = &HTTPError{
StatusCode: 404,
Message: "File is unavailable",

View File

@@ -3,7 +3,6 @@ package request
import (
"bytes"
"compress/gzip"
"context"
"crypto/tls"
"fmt"
"github.com/goccy/go-json"
@@ -67,9 +66,16 @@ func (c *Client) WithMaxRetries(retries int) *Client {
}
// WithTimeout sets the request timeout
func (c *Client) WithTimeout(timeout time.Duration) *Client {
func WithTimeout(timeout time.Duration) ClientOption {
return func(c *Client) {
c.timeout = timeout
return c
}
}
func WithRedirectPolicy(policy func(req *http.Request, via []*http.Request) error) ClientOption {
return func(c *Client) {
c.client.CheckRedirect = policy
}
}
// WithRateLimiter sets a rate limiter
@@ -84,14 +90,19 @@ func (c *Client) WithHeaders(headers map[string]string) *Client {
return c
}
func (c *Client) SetHeader(key, value string) {
c.headers[key] = value
}
func (c *Client) WithLogger(logger zerolog.Logger) *Client {
c.logger = logger
return c
}
func (c *Client) WithTransport(transport *http.Transport) *Client {
func WithTransport(transport *http.Transport) ClientOption {
return func(c *Client) {
c.client.Transport = transport
return c
}
}
// WithRetryableStatus adds status codes that should trigger a retry
@@ -128,15 +139,6 @@ func (c *Client) Do(req *http.Request) (*http.Response, error) {
req.Body.Close()
}
// Apply timeout to the request context if not already present
if c.timeout > 0 {
var cancel context.CancelFunc
ctx := req.Context()
ctx, cancel = context.WithTimeout(ctx, c.timeout)
defer cancel()
req = req.WithContext(ctx)
}
backoff := time.Millisecond * 500
var resp *http.Response
@@ -226,6 +228,15 @@ func (c *Client) MakeRequest(req *http.Request) ([]byte, error) {
return bodyBytes, nil
}
func (c *Client) Get(url string) (*http.Response, error) {
req, err := http.NewRequest(http.MethodGet, url, nil)
if err != nil {
return nil, fmt.Errorf("creating GET request: %w", err)
}
return c.Do(req)
}
// New creates a new HTTP client with the specified options
func New(options ...ClientOption) *Client {
client := &Client{
@@ -239,6 +250,7 @@ func New(options ...ClientOption) *Client {
http.StatusGatewayTimeout: true,
},
logger: logger.NewLogger("request"),
timeout: 60 * time.Second,
}
// Apply options

View File

@@ -8,6 +8,7 @@ import (
"encoding/hex"
"fmt"
"github.com/anacrolix/torrent/metainfo"
"github.com/sirrobot01/debrid-blackhole/internal/request"
"io"
"log"
"net/http"
@@ -198,9 +199,7 @@ func GetInfohashFromURL(url string) (string, error) {
var magnetLink string
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
client := &http.Client{
Timeout: 30 * time.Second,
CheckRedirect: func(req *http.Request, via []*http.Request) error {
redirectFunc := func(req *http.Request, via []*http.Request) error {
if len(via) >= 3 {
return fmt.Errorf("stopped after 3 redirects")
}
@@ -210,8 +209,11 @@ func GetInfohashFromURL(url string) (string, error) {
return http.ErrUseLastResponse
}
return nil
},
}
client := request.New(
request.WithTimeout(30*time.Second),
request.WithRedirectPolicy(redirectFunc),
)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
return "", err

11
main.go
View File

@@ -9,7 +9,10 @@ import (
"log"
"net/http"
_ "net/http/pprof" // registers pprof handlers
"os"
"os/signal"
"runtime/debug"
"syscall"
)
func main() {
@@ -35,10 +38,14 @@ func main() {
if err := config.SetConfigPath(configPath); err != nil {
log.Fatal(err)
}
config.GetConfig()
ctx := context.Background()
// Create a context that's cancelled on SIGINT/SIGTERM
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer stop()
if err := decypharr.Start(ctx); err != nil {
log.Fatal(err)
}
}

View File

@@ -2,13 +2,13 @@ package arr
import (
"bytes"
"crypto/tls"
"fmt"
"github.com/goccy/go-json"
"github.com/sirrobot01/debrid-blackhole/internal/config"
"github.com/sirrobot01/debrid-blackhole/internal/request"
"io"
"net/http"
"strconv"
"strings"
"sync"
"time"
@@ -32,7 +32,7 @@ type Arr struct {
Cleanup bool `json:"cleanup"`
SkipRepair bool `json:"skip_repair"`
DownloadUncached *bool `json:"download_uncached"`
client *http.Client
client *request.Client
}
func New(name, host, token string, cleanup, skipRepair bool, downloadUncached *bool) *Arr {
@@ -44,12 +44,7 @@ func New(name, host, token string, cleanup, skipRepair bool, downloadUncached *b
Cleanup: cleanup,
SkipRepair: skipRepair,
DownloadUncached: downloadUncached,
client: &http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
Proxy: http.ProxyFromEnvironment,
},
},
client: request.New(),
}
}
@@ -77,12 +72,7 @@ func (a *Arr) Request(method, endpoint string, payload interface{}) (*http.Respo
req.Header.Set("Content-Type", "application/json")
req.Header.Set("X-Api-Key", a.Token)
if a.client == nil {
a.client = &http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
Proxy: http.ProxyFromEnvironment,
},
}
a.client = request.New()
}
var resp *http.Response
@@ -179,3 +169,21 @@ func (as *Storage) GetAll() []*Arr {
}
return arrs
}
func (a *Arr) Refresh() error {
payload := struct {
Name string `json:"name"`
}{
Name: "RefreshMonitoredDownloads",
}
resp, err := a.Request(http.MethodPost, "api/v3/command", payload)
if err == nil && resp != nil {
statusOk := strconv.Itoa(resp.StatusCode)[0] == '2'
if statusOk {
return nil
}
}
return fmt.Errorf("failed to refresh: %v", err)
}

View File

@@ -61,28 +61,34 @@ func (a *Arr) GetMedia(mediaId string) ([]Content, error) {
if err != nil {
continue
}
defer resp.Body.Close()
var ct Content
var seriesFiles []seriesFile
episodeFileIDMap := make(map[int]int)
func() {
defer resp.Body.Close()
if err = json.NewDecoder(resp.Body).Decode(&seriesFiles); err != nil {
continue
return
}
ct := Content{
ct = Content{
Title: d.Title,
Id: d.Id,
}
}()
resp, err = a.Request(http.MethodGet, fmt.Sprintf("api/v3/episode?seriesId=%d", d.Id), nil)
if err != nil {
continue
}
func() {
defer resp.Body.Close()
var episodes []episode
if err = json.NewDecoder(resp.Body).Decode(&episodes); err != nil {
continue
return
}
episodeFileIDMap := make(map[int]int)
for _, e := range episodes {
episodeFileIDMap[e.EpisodeFileID] = e.Id
}
}()
files := make([]ContentFile, 0)
for _, file := range seriesFiles {
eId, ok := episodeFileIDMap[file.Id]
@@ -128,15 +134,16 @@ func GetMovies(a *Arr, tvId string) ([]Content, error) {
}
contents := make([]Content, 0)
for _, movie := range movies {
if movie.MovieFile.Id == 0 || movie.MovieFile.Path == "" {
// Skip movies without files
continue
}
ct := Content{
Title: movie.Title,
Id: movie.Id,
}
files := make([]ContentFile, 0)
if movie.MovieFile.Id == 0 || movie.MovieFile.Path == "" {
// Skip movies without files
continue
}
files = append(files, ContentFile{
FileId: movie.MovieFile.Id,
Id: movie.Id,

View File

@@ -1,25 +0,0 @@
package arr
import (
"fmt"
"net/http"
"strconv"
)
func (a *Arr) Refresh() error {
payload := struct {
Name string `json:"name"`
}{
Name: "RefreshMonitoredDownloads",
}
resp, err := a.Request(http.MethodPost, "api/v3/command", payload)
if err == nil && resp != nil {
statusOk := strconv.Itoa(resp.StatusCode)[0] == '2'
if statusOk {
return nil
}
}
return fmt.Errorf("failed to refresh: %v", err)
}

View File

@@ -1,31 +0,0 @@
package arr
import (
"github.com/goccy/go-json"
"net/http"
url2 "net/url"
)
type TMDBResponse struct {
Page int `json:"page"`
Results []struct {
ID int `json:"id"`
Name string `json:"name"`
MediaType string `json:"media_type"`
PosterPath string `json:"poster_path"`
} `json:"results"`
}
func SearchTMDB(term string) (*TMDBResponse, error) {
resp, err := http.Get("https://api.themoviedb.org/3/search/multi?api_key=key&query=" + url2.QueryEscape(term))
if err != nil {
return nil, err
}
var data *TMDBResponse
if err = json.NewDecoder(resp.Body).Decode(&data); err != nil {
return nil, err
}
return data, nil
}

View File

@@ -1,5 +0,0 @@
package arr
func Readfile(path string) error {
return nil
}

View File

@@ -10,6 +10,7 @@ import (
"github.com/sirrobot01/debrid-blackhole/internal/utils"
"github.com/sirrobot01/debrid-blackhole/pkg/debrid/types"
"slices"
"strings"
"time"
"net/http"
@@ -22,6 +23,7 @@ type AllDebrid struct {
Name string
Host string `json:"host"`
APIKey string
ExtraAPIKeys []string
DownloadUncached bool
client *request.Client
@@ -319,8 +321,14 @@ func (ad *AllDebrid) GetMountPath() string {
func New(dc config.Debrid) *AllDebrid {
rl := request.ParseRateLimit(dc.RateLimit)
apiKeys := strings.Split(dc.APIKey, ",")
extraKeys := make([]string, 0)
if len(apiKeys) > 1 {
extraKeys = apiKeys[1:]
}
mainKey := apiKeys[0]
headers := map[string]string{
"Authorization": fmt.Sprintf("Bearer %s", dc.APIKey),
"Authorization": fmt.Sprintf("Bearer %s", mainKey),
}
_log := logger.NewLogger(dc.Name)
client := request.New().
@@ -329,7 +337,8 @@ func New(dc config.Debrid) *AllDebrid {
return &AllDebrid{
Name: "alldebrid",
Host: dc.Host,
APIKey: dc.APIKey,
APIKey: mainKey,
ExtraAPIKeys: extraKeys,
DownloadUncached: dc.DownloadUncached,
client: client,
MountPath: dc.Folder,

View File

@@ -82,6 +82,7 @@ type Cache struct {
torrentsRefreshMu sync.RWMutex // for refreshing torrents
saveSemaphore chan struct{}
ctx context.Context
}
func NewCache(dc config.Debrid, client types.Client) *Cache {
@@ -98,13 +99,17 @@ func NewCache(dc config.Debrid, client types.Client) *Cache {
if err != nil {
autoExpiresLinksAfter = time.Hour * 24
}
workers := runtime.NumCPU() * 50
if dc.Workers > 0 {
workers = dc.Workers
}
return &Cache{
dir: filepath.Join(cfg.Path, "cache", dc.Name), // path to save cache files
torrents: xsync.NewMapOf[string, *CachedTorrent](),
torrentsNames: xsync.NewMapOf[string, *CachedTorrent](),
client: client,
logger: logger.NewLogger(fmt.Sprintf("%s-webdav", client.GetName())),
workers: 200,
workers: workers,
downloadLinks: xsync.NewMapOf[string, downloadLinkCache](),
torrentRefreshInterval: torrentRefreshInterval,
downloadLinksRefreshInterval: downloadLinksRefreshInterval,
@@ -113,6 +118,7 @@ func NewCache(dc config.Debrid, client types.Client) *Cache {
autoExpiresLinksAfter: autoExpiresLinksAfter,
repairsInProgress: xsync.NewMapOf[string, bool](),
saveSemaphore: make(chan struct{}, 10),
ctx: context.Background(),
}
}
@@ -159,10 +165,11 @@ func (c *Cache) GetListing() []os.FileInfo {
return nil
}
func (c *Cache) Start() error {
func (c *Cache) Start(ctx context.Context) error {
if err := os.MkdirAll(c.dir, 0755); err != nil {
return fmt.Errorf("failed to create cache directory: %w", err)
}
c.ctx = ctx
if err := c.Sync(); err != nil {
return fmt.Errorf("failed to sync cache: %w", err)
@@ -378,25 +385,19 @@ func (c *Cache) Sync() error {
}
func (c *Cache) sync(torrents []*types.Torrent) error {
// Calculate optimal workers - balance between CPU and IO
workers := runtime.NumCPU() * 50 // A more balanced multiplier for BadgerDB
// Create channels with appropriate buffering
workChan := make(chan *types.Torrent, workers*2)
workChan := make(chan *types.Torrent, min(1000, len(torrents)))
// Use an atomic counter for progress tracking
var processed int64
var errorCount int64
// Create a context with cancellation in case of critical errors
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Create a wait group for workers
var wg sync.WaitGroup
// Start workers
for i := 0; i < workers; i++ {
for i := 0; i < c.workers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
@@ -418,7 +419,7 @@ func (c *Cache) sync(torrents []*types.Torrent) error {
c.logger.Info().Msgf("Progress: %d/%d torrents processed", count, len(torrents))
}
case <-ctx.Done():
case <-c.ctx.Done():
return // Context cancelled, exit goroutine
}
}
@@ -430,7 +431,7 @@ func (c *Cache) sync(torrents []*types.Torrent) error {
select {
case workChan <- t:
// Work sent successfully
case <-ctx.Done():
case <-c.ctx.Done():
break // Context cancelled
}
}

View File

@@ -1,12 +1,10 @@
package debrid
import (
"context"
"fmt"
"github.com/sirrobot01/debrid-blackhole/internal/config"
"github.com/sirrobot01/debrid-blackhole/internal/request"
"github.com/sirrobot01/debrid-blackhole/pkg/debrid/types"
"golang.org/x/sync/errgroup"
"io"
"net/http"
"os"
@@ -14,6 +12,7 @@ import (
"slices"
"sort"
"strings"
"sync"
"time"
)
@@ -62,7 +61,8 @@ func (c *Cache) refreshListings() {
}
// Atomic store of the complete ready-to-use slice
c.listings.Store(files)
c.resetPropfindResponse()
//c.resetPropfindResponse()
_ = c.RefreshXml()
if err := c.RefreshRclone(); err != nil {
c.logger.Debug().Err(err).Msg("Failed to refresh rclone")
}
@@ -136,7 +136,7 @@ func (c *Cache) refreshTorrents() {
newTorrents := make([]*types.Torrent, 0)
for _, t := range _newTorrents {
if !slices.Contains(deletedTorrents, t.Id) {
newTorrents = append(newTorrents, t) // <-- FIXED: Use newTorrents
newTorrents = append(newTorrents, t)
}
}
@@ -149,28 +149,40 @@ func (c *Cache) refreshTorrents() {
}
c.logger.Info().Msgf("Found %d new torrents", len(newTorrents))
g, ctx := errgroup.WithContext(context.Background())
for _, t := range newTorrents {
t := t
g.Go(func() error {
workChan := make(chan *types.Torrent, min(100, len(newTorrents)))
errChan := make(chan error, len(newTorrents))
var wg sync.WaitGroup
for i := 0; i < c.workers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for t := range workChan {
select {
case <-ctx.Done():
return ctx.Err()
case <-c.ctx.Done():
return
default:
}
if err := c.ProcessTorrent(t, true); err != nil {
return err
c.logger.Debug().Err(err).Msgf("Failed to process new torrent %s", t.Id)
errChan <- err
}
return nil
})
}
}()
}
if err := g.Wait(); err != nil {
c.logger.Debug().Err(err).Msg("Failed to process new torrents")
for _, t := range newTorrents {
select {
case <-c.ctx.Done():
break
default:
workChan <- t
}
}
close(workChan)
wg.Wait()
c.logger.Debug().Msgf("Processed %d new torrents", len(newTorrents))
}
func (c *Cache) RefreshRclone() error {

View File

@@ -42,7 +42,7 @@ func (c *Cache) IsTorrentBroken(t *CachedTorrent, filenames []string) bool {
} else {
// Check if file.Link not in the downloadLink Cache
if err := c.client.CheckLink(f.Link); err != nil {
if errors.Is(err, request.ErrLinkBroken) {
if errors.Is(err, request.HosterUnavailableError) {
isBroken = true
break
} else {

View File

@@ -18,7 +18,7 @@ func (c *Cache) RefreshXml() error {
return fmt.Errorf("failed to refresh XML for %s: %v", parent, err)
}
}
c.logger.Debug().Msgf("Refreshed XML cache for %s", c.client.GetName())
c.logger.Trace().Msgf("Refreshed XML cache for %s", c.client.GetName())
return nil
}

View File

@@ -21,6 +21,7 @@ type DebridLink struct {
Name string
Host string `json:"host"`
APIKey string
ExtraAPIKeys []string
DownloadUncached bool
client *request.Client
@@ -260,8 +261,14 @@ func (dl *DebridLink) GetDownloadUncached() bool {
func New(dc config.Debrid) *DebridLink {
rl := request.ParseRateLimit(dc.RateLimit)
apiKeys := strings.Split(dc.APIKey, ",")
extraKeys := make([]string, 0)
if len(apiKeys) > 1 {
extraKeys = apiKeys[1:]
}
mainKey := apiKeys[0]
headers := map[string]string{
"Authorization": fmt.Sprintf("Bearer %s", dc.APIKey),
"Authorization": fmt.Sprintf("Bearer %s", mainKey),
"Content-Type": "application/json",
}
_log := logger.NewLogger(dc.Name)
@@ -271,7 +278,8 @@ func New(dc config.Debrid) *DebridLink {
return &DebridLink{
Name: "debridlink",
Host: dc.Host,
APIKey: dc.APIKey,
APIKey: mainKey,
ExtraAPIKeys: extraKeys,
DownloadUncached: dc.DownloadUncached,
client: client,
MountPath: dc.Folder,

View File

@@ -23,7 +23,10 @@ import (
type RealDebrid struct {
Name string
Host string `json:"host"`
APIKey string
ExtraAPIKeys []string // This is used for bandwidth
DownloadUncached bool
client *request.Client
@@ -262,7 +265,6 @@ func (r *RealDebrid) DeleteTorrent(torrentId string) {
}
func (r *RealDebrid) GenerateDownloadLinks(t *types.Torrent) error {
url := fmt.Sprintf("%s/unrestrict/link/", r.Host)
filesCh := make(chan types.File, len(t.Files))
errCh := make(chan error, len(t.Files))
@@ -273,32 +275,13 @@ func (r *RealDebrid) GenerateDownloadLinks(t *types.Torrent) error {
go func(file types.File) {
defer wg.Done()
payload := gourl.Values{"link": {file.Link}}
req, err := http.NewRequest(http.MethodPost, url, strings.NewReader(payload.Encode()))
link, err := r.GetDownloadLink(t, &file)
if err != nil {
errCh <- err
return
}
resp, err := r.client.Do(req)
if err != nil {
errCh <- err
return
}
if resp.StatusCode == http.StatusServiceUnavailable {
errCh <- request.HosterUnavailableError
return
}
defer resp.Body.Close()
b, err := io.ReadAll(resp.Body)
var data UnrestrictResponse
if err = json.Unmarshal(b, &data); err != nil {
errCh <- err
return
}
file.DownloadLink = data.Download
file.DownloadLink = link
filesCh <- file
}(f)
}
@@ -337,12 +320,12 @@ func (r *RealDebrid) CheckLink(link string) error {
return err
}
if resp.StatusCode == http.StatusNotFound {
return request.ErrLinkBroken // File has been removed
return request.HosterUnavailableError // File has been removed
}
return nil
}
func (r *RealDebrid) GetDownloadLink(t *types.Torrent, file *types.File) (string, error) {
func (r *RealDebrid) _getDownloadLink(file *types.File) (string, error) {
url := fmt.Sprintf("%s/unrestrict/link/", r.Host)
payload := gourl.Values{
"link": {file.Link},
@@ -352,8 +335,25 @@ func (r *RealDebrid) GetDownloadLink(t *types.Torrent, file *types.File) (string
if err != nil {
return "", err
}
if resp.StatusCode == http.StatusServiceUnavailable {
return "", request.HosterUnavailableError
if resp.StatusCode != http.StatusOK {
// Read the response body to get the error message
b, err := io.ReadAll(resp.Body)
if err != nil {
return "", err
}
var data ErrorResponse
if err = json.Unmarshal(b, &data); err != nil {
return "", err
}
switch data.ErrorCode {
case 23:
return "", request.TrafficExceededError
case 24:
return "", request.HosterUnavailableError // Link has been nerfed
default:
return "", fmt.Errorf("realdebrid API error: %d", resp.StatusCode)
}
}
defer resp.Body.Close()
b, err := io.ReadAll(resp.Body)
@@ -365,6 +365,23 @@ func (r *RealDebrid) GetDownloadLink(t *types.Torrent, file *types.File) (string
return "", err
}
return data.Download, nil
}
func (r *RealDebrid) GetDownloadLink(t *types.Torrent, file *types.File) (string, error) {
link, err := r._getDownloadLink(file)
if err == nil {
return link, nil
}
for _, key := range r.ExtraAPIKeys {
r.client.SetHeader("Authorization", fmt.Sprintf("Bearer %s", key))
if link, err := r._getDownloadLink(file); err == nil {
return link, nil
}
}
// Reset to main API key
r.client.SetHeader("Authorization", fmt.Sprintf("Bearer %s", r.APIKey))
return "", err
}
func (r *RealDebrid) GetCheckCached() bool {
@@ -431,7 +448,7 @@ func (r *RealDebrid) getTorrents(offset int, limit int) (int, []*types.Torrent,
}
func (r *RealDebrid) GetTorrents() ([]*types.Torrent, error) {
limit := 5000
limit := 1000
// Get first batch and total count
totalItems, firstBatch, err := r.getTorrents(0, limit)
@@ -472,7 +489,7 @@ func (r *RealDebrid) GetTorrents() ([]*types.Torrent, error) {
func (r *RealDebrid) GetDownloads() (map[string]types.DownloadLinks, error) {
links := make(map[string]types.DownloadLinks)
offset := 0
limit := 5000
limit := 1000
for {
dl, err := r._getDownloads(offset, limit)
if err != nil {
@@ -538,8 +555,14 @@ func (r *RealDebrid) GetMountPath() string {
func New(dc config.Debrid) *RealDebrid {
rl := request.ParseRateLimit(dc.RateLimit)
apiKeys := strings.Split(dc.APIKey, ",")
extraKeys := make([]string, 0)
if len(apiKeys) > 1 {
extraKeys = apiKeys[1:]
}
mainKey := apiKeys[0]
headers := map[string]string{
"Authorization": fmt.Sprintf("Bearer %s", dc.APIKey),
"Authorization": fmt.Sprintf("Bearer %s", mainKey),
}
_log := logger.NewLogger(dc.Name)
client := request.New().
@@ -550,7 +573,8 @@ func New(dc config.Debrid) *RealDebrid {
return &RealDebrid{
Name: "realdebrid",
Host: dc.Host,
APIKey: dc.APIKey,
APIKey: mainKey,
ExtraAPIKeys: extraKeys,
DownloadUncached: dc.DownloadUncached,
client: client,
MountPath: dc.Folder,

View File

@@ -134,3 +134,8 @@ type DownloadsResponse struct {
Streamable int `json:"streamable"`
Generated time.Time `json:"generated"`
}
type ErrorResponse struct {
Error string `json:"error"`
ErrorCode int `json:"error_code"`
}

View File

@@ -26,6 +26,7 @@ type Torbox struct {
Name string
Host string `json:"host"`
APIKey string
ExtraAPIKeys []string
DownloadUncached bool
client *request.Client
@@ -34,6 +35,35 @@ type Torbox struct {
CheckCached bool
}
func New(dc config.Debrid) *Torbox {
rl := request.ParseRateLimit(dc.RateLimit)
apiKeys := strings.Split(dc.APIKey, ",")
extraKeys := make([]string, 0)
if len(apiKeys) > 1 {
extraKeys = apiKeys[1:]
}
mainKey := apiKeys[0]
headers := map[string]string{
"Authorization": fmt.Sprintf("Bearer %s", mainKey),
}
_log := logger.NewLogger(dc.Name)
client := request.New().
WithHeaders(headers).
WithRateLimiter(rl).WithLogger(_log)
return &Torbox{
Name: "torbox",
Host: dc.Host,
APIKey: mainKey,
ExtraAPIKeys: extraKeys,
DownloadUncached: dc.DownloadUncached,
client: client,
MountPath: dc.Folder,
logger: _log,
CheckCached: dc.CheckCached,
}
}
func (tb *Torbox) GetName() string {
return tb.Name
}
@@ -312,28 +342,6 @@ func (tb *Torbox) GetDownloadUncached() bool {
return tb.DownloadUncached
}
func New(dc config.Debrid) *Torbox {
rl := request.ParseRateLimit(dc.RateLimit)
headers := map[string]string{
"Authorization": fmt.Sprintf("Bearer %s", dc.APIKey),
}
_log := logger.NewLogger(dc.Name)
client := request.New().
WithHeaders(headers).
WithRateLimiter(rl).WithLogger(_log)
return &Torbox{
Name: "torbox",
Host: dc.Host,
APIKey: dc.APIKey,
DownloadUncached: dc.DownloadUncached,
client: client,
MountPath: dc.Folder,
logger: _log,
CheckCached: dc.CheckCached,
}
}
func (tb *Torbox) GetDownloads() (map[string]types.DownloadLinks, error) {
return nil, nil
}

View File

@@ -1,13 +1,12 @@
package qbit
import (
"crypto/tls"
"fmt"
"github.com/cavaliergopher/grab/v3"
"github.com/sirrobot01/debrid-blackhole/internal/request"
"github.com/sirrobot01/debrid-blackhole/internal/utils"
debrid "github.com/sirrobot01/debrid-blackhole/pkg/debrid/types"
"io"
"net/http"
"os"
"path/filepath"
"sync"
@@ -92,16 +91,9 @@ func (q *QBit) downloadFiles(torrent *Torrent, parent string) {
}
q.UpdateTorrentMin(torrent, debridTorrent)
}
tr := &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
Proxy: http.ProxyFromEnvironment,
}
client := &grab.Client{
UserAgent: "qBitTorrent",
HTTPClient: &http.Client{
Transport: tr,
},
HTTPClient: request.New(request.WithTimeout(0)),
}
for _, file := range debridTorrent.Files {
if file.DownloadLink == "" {

View File

@@ -16,13 +16,11 @@ import (
"net/http"
"net/url"
"os"
"os/signal"
"path/filepath"
"runtime"
"sort"
"strings"
"sync"
"syscall"
"time"
)
@@ -38,6 +36,8 @@ type Repair struct {
autoProcess bool
logger zerolog.Logger
filename string
workers int
ctx context.Context
}
func New(arrs *arr.Storage, engine *debrid.Engine) *Repair {
@@ -46,6 +46,10 @@ func New(arrs *arr.Storage, engine *debrid.Engine) *Repair {
if err != nil {
duration = time.Hour * 24
}
workers := runtime.NumCPU() * 20
if cfg.Repair.Workers > 0 {
workers = cfg.Repair.Workers
}
r := &Repair{
arrs: arrs,
logger: logger.NewLogger("repair"),
@@ -56,6 +60,8 @@ func New(arrs *arr.Storage, engine *debrid.Engine) *Repair {
autoProcess: cfg.Repair.AutoProcess,
filename: filepath.Join(cfg.Path, "repair.json"),
deb: engine,
workers: workers,
ctx: context.Background(),
}
if r.ZurgURL != "" {
r.IsZurg = true
@@ -66,6 +72,44 @@ func New(arrs *arr.Storage, engine *debrid.Engine) *Repair {
return r
}
func (r *Repair) Start(ctx context.Context) error {
cfg := config.GetConfig()
r.ctx = ctx
if r.runOnStart {
r.logger.Info().Msgf("Running initial repair")
go func() {
if err := r.AddJob([]string{}, []string{}, r.autoProcess, true); err != nil {
r.logger.Error().Err(err).Msg("Error running initial repair")
}
}()
}
ticker := time.NewTicker(r.duration)
defer ticker.Stop()
r.logger.Info().Msgf("Starting repair worker with %v interval", r.duration)
for {
select {
case <-r.ctx.Done():
r.logger.Info().Msg("Repair worker stopped")
return nil
case t := <-ticker.C:
r.logger.Info().Msgf("Running repair at %v", t.Format("15:04:05"))
if err := r.AddJob([]string{}, []string{}, r.autoProcess, true); err != nil {
r.logger.Error().Err(err).Msg("Error running repair")
}
// If using time-of-day schedule, reset the ticker for next day
if strings.Contains(cfg.Repair.Interval, ":") {
ticker.Reset(r.duration)
}
r.logger.Info().Msgf("Next scheduled repair at %v", t.Add(r.duration).Format("15:04:05"))
}
}
}
type JobStatus string
const (
@@ -196,9 +240,10 @@ func (r *Repair) AddJob(arrsNames []string, mediaIDs []string, autoProcess, recu
job.Recurrent = recurrent
r.reset(job)
r.Jobs[key] = job
r.saveToFile()
go r.saveToFile()
go func() {
if err := r.repair(job); err != nil {
r.logger.Error().Err(err).Msg("Error running repair")
r.logger.Error().Err(err).Msg("Error running repair")
job.FailedAt = time.Now()
job.Error = err.Error()
@@ -215,18 +260,19 @@ func (r *Repair) repair(job *Job) error {
return err
}
// Create a new error group with context
g, ctx := errgroup.WithContext(context.Background())
g.SetLimit(4)
// Use a mutex to protect concurrent access to brokenItems
var mu sync.Mutex
brokenItems := map[string][]arr.ContentFile{}
g, ctx := errgroup.WithContext(r.ctx)
for _, a := range job.Arrs {
a := a // Capture range variable
g.Go(func() error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
var items []arr.ContentFile
var err error
@@ -238,13 +284,6 @@ func (r *Repair) repair(job *Job) error {
}
} else {
for _, id := range job.MediaIDs {
// Check if any other goroutine has failed
select {
case <-ctx.Done():
return ctx.Err()
default:
}
someItems, err := r.repairArr(job, a, id)
if err != nil {
r.logger.Error().Err(err).Msgf("Error repairing %s with ID %s", a, id)
@@ -313,66 +352,6 @@ func (r *Repair) repair(job *Job) error {
return nil
}
func (r *Repair) Start(ctx context.Context) error {
ctx, stop := signal.NotifyContext(ctx, os.Interrupt, syscall.SIGTERM)
defer stop()
cfg := config.GetConfig()
if r.runOnStart {
r.logger.Info().Msgf("Running initial repair")
go func() {
if err := r.AddJob([]string{}, []string{}, r.autoProcess, true); err != nil {
r.logger.Error().Err(err).Msg("Error running initial repair")
}
}()
}
ticker := time.NewTicker(r.duration)
defer ticker.Stop()
r.logger.Info().Msgf("Starting repair worker with %v interval", r.duration)
for {
select {
case <-ctx.Done():
r.logger.Info().Msg("Repair worker stopped")
return nil
case t := <-ticker.C:
r.logger.Info().Msgf("Running repair at %v", t.Format("15:04:05"))
if err := r.AddJob([]string{}, []string{}, r.autoProcess, true); err != nil {
r.logger.Error().Err(err).Msg("Error running repair")
}
// If using time-of-day schedule, reset the ticker for next day
if strings.Contains(cfg.Repair.Interval, ":") {
ticker.Reset(r.duration)
}
r.logger.Info().Msgf("Next scheduled repair at %v", t.Add(r.duration).Format("15:04:05"))
}
}
}
func (r *Repair) getUniquePaths(media arr.Content) map[string]string {
// Use zurg setup to check file availability with zurg
// This reduces bandwidth usage significantly
uniqueParents := make(map[string]string)
files := media.Files
for _, file := range files {
target := getSymlinkTarget(file.Path)
if target != "" {
file.IsSymlink = true
dir, f := filepath.Split(target)
parent := filepath.Base(filepath.Clean(dir))
// Set target path folder/file.mkv
file.TargetPath = f
uniqueParents[parent] = target
}
}
return uniqueParents
}
func (r *Repair) repairArr(j *Job, _arr string, tmdbId string) ([]arr.ContentFile, error) {
brokenItems := make([]arr.ContentFile, 0)
a := r.arrs.Get(_arr)
@@ -395,25 +374,21 @@ func (r *Repair) repairArr(j *Job, _arr string, tmdbId string) ([]arr.ContentFil
return brokenItems, nil
}
// Create a new error group
g, ctx := errgroup.WithContext(context.Background())
// Limit concurrent goroutines
g.SetLimit(10)
// Mutex for brokenItems
var mu sync.Mutex
var wg sync.WaitGroup
workerChan := make(chan arr.Content, min(len(media), r.workers))
for _, m := range media {
m := m // Create a new variable scoped to the loop iteration
g.Go(func() error {
// Check if context was canceled
for i := 0; i < r.workers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for m := range workerChan {
select {
case <-ctx.Done():
return ctx.Err()
case <-r.ctx.Done():
return
default:
}
items := r.getBrokenFiles(m)
if items != nil {
r.logger.Debug().Msgf("Found %d broken files for %s", len(items), m.Title)
@@ -421,7 +396,6 @@ func (r *Repair) repairArr(j *Job, _arr string, tmdbId string) ([]arr.ContentFil
r.logger.Info().Msgf("Auto processing %d broken items for %s", len(items), m.Title)
// Delete broken items
if err := a.DeleteFiles(items); err != nil {
r.logger.Debug().Msgf("Failed to delete broken items for %s: %v", m.Title, err)
}
@@ -436,18 +410,50 @@ func (r *Repair) repairArr(j *Job, _arr string, tmdbId string) ([]arr.ContentFil
brokenItems = append(brokenItems, items...)
mu.Unlock()
}
return nil
})
}
}()
}
if err := g.Wait(); err != nil {
return brokenItems, err
for _, m := range media {
select {
case <-r.ctx.Done():
break
default:
workerChan <- m
}
}
close(workerChan)
wg.Wait()
if len(brokenItems) == 0 {
r.logger.Info().Msgf("No broken items found for %s", a.Name)
return brokenItems, nil
}
r.logger.Info().Msgf("Repair completed for %s. %d broken items found", a.Name, len(brokenItems))
return brokenItems, nil
}
func (r *Repair) getUniquePaths(media arr.Content) map[string]string {
// Use zurg setup to check file availability with zurg
// This reduces bandwidth usage significantly
uniqueParents := make(map[string]string)
files := media.Files
for _, file := range files {
target := getSymlinkTarget(file.Path)
if target != "" {
file.IsSymlink = true
dir, f := filepath.Split(target)
parent := filepath.Base(filepath.Clean(dir))
// Set target path folder/file.mkv
file.TargetPath = f
uniqueParents[parent] = target
}
}
return uniqueParents
}
func (r *Repair) isMediaAccessible(m arr.Content) bool {
files := m.Files
if len(files) == 0 {
@@ -516,16 +522,14 @@ func (r *Repair) getZurgBrokenFiles(media arr.Content) []arr.ContentFile {
brokenFiles := make([]arr.ContentFile, 0)
uniqueParents := collectFiles(media)
client := &http.Client{
Timeout: 0,
Transport: &http.Transport{
tr := &http.Transport{
TLSHandshakeTimeout: 60 * time.Second,
DialContext: (&net.Dialer{
Timeout: 20 * time.Second,
KeepAlive: 30 * time.Second,
}).DialContext,
},
}
client := request.New(request.WithTimeout(0), request.WithTransport(tr))
// Access zurg url + symlink folder + first file(encoded)
for parent, f := range uniqueParents {
r.logger.Debug().Msgf("Checking %s", parent)
@@ -619,8 +623,7 @@ func (r *Repair) getWebdavBrokenFiles(media arr.Content) []arr.ContentFile {
torrentName := filepath.Clean(filepath.Base(torrentPath))
torrent := cache.GetTorrentByName(torrentName)
if torrent == nil {
r.logger.Debug().Msgf("Torrent not found for %s. Marking as broken", torrentName)
brokenFiles = append(brokenFiles, f...)
r.logger.Debug().Msgf("No torrent found for %s. Skipping", torrentName)
continue
}
files := make([]string, 0)
@@ -692,14 +695,20 @@ func (r *Repair) ProcessJob(id string) error {
return nil
}
// Create a new error group
g := new(errgroup.Group)
g.SetLimit(runtime.NumCPU() * 4)
g, ctx := errgroup.WithContext(r.ctx)
g.SetLimit(r.workers)
for arrName, items := range brokenItems {
items := items
arrName := arrName
g.Go(func() error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
a := r.arrs.Get(arrName)
if a == nil {
r.logger.Error().Msgf("Arr %s not found", arrName)
@@ -779,5 +788,5 @@ func (r *Repair) DeleteJobs(ids []string) {
}
}
}
r.saveToFile()
go r.saveToFile()
}

View File

@@ -69,7 +69,7 @@ func (wd *WebDav) Start(ctx context.Context) error {
wg.Add(1)
go func(h *Handler) {
defer wg.Done()
if err := h.cache.Start(); err != nil {
if err := h.cache.Start(ctx); err != nil {
select {
case errChan <- err:
default: