diff --git a/Dockerfile b/Dockerfile index 0c1c539..07969cf 100644 --- a/Dockerfile +++ b/Dockerfile @@ -20,7 +20,7 @@ RUN --mount=type=cache,target=/go/pkg/mod \ CGO_ENABLED=0 GOOS=$TARGETOS GOARCH=$TARGETARCH \ go build -trimpath \ -ldflags="-w -s -X github.com/sirrobot01/debrid-blackhole/pkg/version.Version=${VERSION} -X github.com/sirrobot01/debrid-blackhole/pkg/version.Channel=${CHANNEL}" \ - -o /blackhole + -o /decypharr # Build healthcheck (optimized) RUN --mount=type=cache,target=/go/pkg/mod \ @@ -43,12 +43,12 @@ FROM gcr.io/distroless/static-debian12:nonroot LABEL version = "${VERSION}-${CHANNEL}" LABEL org.opencontainers.image.source = "https://github.com/sirrobot01/debrid-blackhole" -LABEL org.opencontainers.image.title = "debrid-blackhole" +LABEL org.opencontainers.image.title = "decypharr" LABEL org.opencontainers.image.authors = "sirrobot01" LABEL org.opencontainers.image.documentation = "https://github.com/sirrobot01/debrid-blackhole/blob/main/README.md" # Copy binaries -COPY --from=builder --chown=nonroot:nonroot /blackhole /usr/bin/blackhole +COPY --from=builder --chown=nonroot:nonroot /decypharr /usr/bin/decypharr COPY --from=builder --chown=nonroot:nonroot /healthcheck /usr/bin/healthcheck # Copy pre-made directory structure @@ -63,4 +63,4 @@ USER nonroot:nonroot HEALTHCHECK CMD ["/usr/bin/healthcheck"] -CMD ["/usr/bin/blackhole", "--config", "/app"] \ No newline at end of file +CMD ["/usr/bin/decypharr", "--config", "/app"] \ No newline at end of file diff --git a/README.md b/README.md index f792f86..309336b 100644 --- a/README.md +++ b/README.md @@ -21,7 +21,6 @@ This is an implementation of QbitTorrent with a **Multiple Debrid service suppor - [Proxy Config](#proxy-config) - [Qbittorrent Config](#qbittorrent-config) - [Arrs Config](#arrs-config) -- [Proxy](#proxy) - [Repair Worker](#repair-worker) - [WebDAV](#webdav) - [WebDAV Config](#webdav-config) @@ -131,12 +130,6 @@ This is the default config file. You can create a `config.json` file in the root "folder": "/mnt/remote/realdebrid/__all__/" } ], - "proxy": { - "enabled": false, - "port": "8100", - "username": "username", - "password": "password" - }, "qbittorrent": { "port": "8282", "download_folder": "/mnt/symlinks/", @@ -172,6 +165,7 @@ Full config are [here](doc/config.full.json) - The `name` key is the name of the debrid provider - The `host` key is the API endpoint of the debrid provider - The `api_key` key is the API key of the debrid provider. This can be comma separated for multiple API keys +- The `download_api_keys` key is the API key of the debrid provider. By default, this is the same as the `api_key` key. This is used to download the torrents. This is an array of API keys. This is useful for those using multiple api keys. The API keys are used to download the torrents. - The `folder` key is the folder where your debrid folder is mounted(webdav, rclone, zurg etc). e.g `data/realdebrid/torrents/`, `/media/remote/alldebrid/magnets/` - The `rate_limit` key is the rate limit of the debrid provider(null by default) - The `download_uncached` bool key is used to download uncached torrents(disabled by default) @@ -189,14 +183,6 @@ The `repair` key is used to enable the repair worker - The `zurg_url` is the url of the zurg server. Typically `http://localhost:9999` or `http://zurg:9999` - The `auto_process` is used to automatically process the repair worker. This will delete broken symlinks and re-search for missing files -##### Proxy Config -- The `enabled` key is used to enable the proxy -- The `port` key is the port the proxy will listen on -- The `log_level` key is used to set the log level of the proxy. The default value is `info` -- The `username` and `password` keys are used for basic authentication -- The `cached_only` means only cached torrents will be returned - - ##### Qbittorrent Config - The `port` key is the port the qBittorrent will listen on - The `download_folder` is the folder where the torrents will be downloaded. e.g `/media/symlinks/` @@ -250,14 +236,6 @@ You can use the webdav server with media players like Infuse, VidHub or mount it - The `rc_url`, `rc_user`, `rc_pass` keys are used to trigger a vfs refresh on your rclone. This speeds up the process of getting the files. This is useful for rclone users. T -### Proxy - -#### **Note**: Proxy has stopped working for Real Debrid, Debrid Link, and All Debrid. It still works for Torbox. This is due to the changes in the API of the Debrid Providers. - -The proxy is useful in filtering out un-cached Debrid torrents. -The proxy is a simple HTTP proxy that requires basic authentication. The proxy can be enabled by setting the `proxy.enabled` to `true` in the config file. -The proxy listens on the port `8181` by default. The username and password can be set in the config file. - ### Changelog - View the [CHANGELOG.md](CHANGELOG.md) for the latest changes diff --git a/doc/config.full.json b/doc/config.full.json index af85f3f..fa256c7 100644 --- a/doc/config.full.json +++ b/doc/config.full.json @@ -5,13 +5,17 @@ "host": "https://api.real-debrid.com/rest/1.0", "api_key": "realdebrid_key", "folder": "/mnt/remote/realdebrid/__all__/", + "proxy": "", "rate_limit": "250/minute", "download_uncached": false, "check_cached": false, "use_webdav": true, "torrents_refresh_interval": "15s", "folder_naming": "original_no_ext", - "auto_expire_links_after": "3d" + "auto_expire_links_after": "3d", + "rc_url": "http://your-ip-address:9990", + "rc_user": "your_rclone_rc_user", + "rc_pass": "your_rclone_rc_pass" }, { "name": "torbox", @@ -41,14 +45,6 @@ "check_cached": false } ], - "proxy": { - "enabled": true, - "port": "8100", - "log_level": "info", - "username": "username", - "password": "password", - "cached_only": true - }, "max_cache_size": 1000, "qbittorrent": { "port": "8282", @@ -95,14 +91,5 @@ "max_file_size": "", "allowed_file_types": [], "use_auth": false, - "discord_webhook_url": "https://discord.com/api/webhooks/...", - "webdav": { - "torrents_refresh_interval": "15s", - "download_links_refresh_interval": "1h", - "folder_naming": "original", - "auto_expire_links_after": "24h", - "rc_url": "http://your-ip-address:9990", - "rc_user": "your_rclone_rc_user", - "rc_pass": "your_rclone_rc_pass" - } + "discord_webhook_url": "https://discord.com/api/webhooks/..." } \ No newline at end of file diff --git a/doc/download.png b/doc/download.png deleted file mode 100644 index 25af9fe..0000000 Binary files a/doc/download.png and /dev/null differ diff --git a/doc/main.png b/doc/main.png index 7f7abf5..0510a66 100644 Binary files a/doc/main.png and b/doc/main.png differ diff --git a/internal/config/config.go b/internal/config/config.go index e8fd386..5d10f0e 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -7,7 +7,6 @@ import ( "github.com/goccy/go-json" "os" "path/filepath" - "strings" "sync" ) @@ -18,14 +17,15 @@ var ( ) type Debrid struct { - Name string `json:"name"` - Host string `json:"host"` - APIKey string `json:"api_key"` - DownloadAPIKeys string `json:"download_api_keys"` - Folder string `json:"folder"` - DownloadUncached bool `json:"download_uncached"` - CheckCached bool `json:"check_cached"` - RateLimit string `json:"rate_limit"` // 200/minute or 10/second + Name string `json:"name"` + Host string `json:"host"` + APIKey string `json:"api_key"` + DownloadAPIKeys []string `json:"download_api_keys"` + Folder string `json:"folder"` + DownloadUncached bool `json:"download_uncached"` + CheckCached bool `json:"check_cached"` + RateLimit string `json:"rate_limit"` // 200/minute or 10/second + Proxy string `json:"proxy"` UseWebDav bool `json:"use_webdav"` WebDav @@ -192,15 +192,15 @@ func validateDebrids(debrids []Debrid) error { return nil } -func validateQbitTorrent(config *QBitTorrent) error { - if config.DownloadFolder == "" { - return errors.New("qbittorent download folder is required") - } - if _, err := os.Stat(config.DownloadFolder); os.IsNotExist(err) { - return fmt.Errorf("qbittorent download folder(%s) does not exist", config.DownloadFolder) - } - return nil -} +//func validateQbitTorrent(config *QBitTorrent) error { +// if config.DownloadFolder == "" { +// return errors.New("qbittorent download folder is required") +// } +// if _, err := os.Stat(config.DownloadFolder); os.IsNotExist(err) { +// return fmt.Errorf("qbittorent download folder(%s) does not exist", config.DownloadFolder) +// } +// return nil +//} func validateConfig(config *Config) error { // Run validations concurrently @@ -299,15 +299,9 @@ func (c *Config) NeedsSetup() bool { func (c *Config) updateDebrid(d Debrid) Debrid { - downloadAPIKeys := strings.Split(d.DownloadAPIKeys, ",") - newApiKeys := make([]string, 0, len(downloadAPIKeys)) - for _, key := range downloadAPIKeys { - key = strings.TrimSpace(key) - if key != "" { - newApiKeys = append(newApiKeys, key) - } + if len(d.DownloadAPIKeys) == 0 { + d.DownloadAPIKeys = append(d.DownloadAPIKeys, d.APIKey) } - d.DownloadAPIKeys = strings.Join(newApiKeys, ",") if !d.UseWebDav { return d diff --git a/internal/request/request.go b/internal/request/request.go index 9f51d4e..199f4e5 100644 --- a/internal/request/request.go +++ b/internal/request/request.go @@ -3,15 +3,18 @@ package request import ( "bytes" "compress/gzip" + "context" "crypto/tls" "fmt" "github.com/goccy/go-json" "github.com/rs/zerolog" "github.com/sirrobot01/debrid-blackhole/internal/logger" + "golang.org/x/net/proxy" "golang.org/x/time/rate" "io" "math" "math/rand" + "net" "net/http" "net/url" "regexp" @@ -52,11 +55,13 @@ type Client struct { client *http.Client rateLimiter *rate.Limiter headers map[string]string + headersMu sync.RWMutex maxRetries int timeout time.Duration skipTLSVerify bool retryableStatus map[int]bool logger zerolog.Logger + proxy string } // WithMaxRetries sets the maximum number of retry attempts @@ -89,12 +94,16 @@ func WithRateLimiter(rl *rate.Limiter) ClientOption { // WithHeaders sets default headers func WithHeaders(headers map[string]string) ClientOption { return func(c *Client) { + c.headersMu.Lock() c.headers = headers + c.headersMu.Unlock() } } func (c *Client) SetHeader(key, value string) { + c.headersMu.Lock() c.headers[key] = value + c.headersMu.Unlock() } func WithLogger(logger zerolog.Logger) ClientOption { @@ -118,6 +127,12 @@ func WithRetryableStatus(statusCodes ...int) ClientOption { } } +func WithProxy(proxyURL string) ClientOption { + return func(c *Client) { + c.proxy = proxyURL + } +} + // doRequest performs a single HTTP request with rate limiting func (c *Client) doRequest(req *http.Request) (*http.Response, error) { if c.rateLimiter != nil { @@ -154,11 +169,13 @@ func (c *Client) Do(req *http.Request) (*http.Response, error) { } // Apply headers + c.headersMu.RLock() if c.headers != nil { for key, value := range c.headers { req.Header.Set(key, value) } } + c.headersMu.RUnlock() resp, err = c.doRequest(req) if err != nil { @@ -256,25 +273,67 @@ func New(options ...ClientOption) *Client { }, logger: logger.New("request"), timeout: 60 * time.Second, + proxy: "", + headers: make(map[string]string), // Initialize headers map } - // Apply options + // default http client + client.client = &http.Client{ + Timeout: client.timeout, + } + + // Apply options before configuring transport for _, option := range options { option(client) } - // Create transport - transport := &http.Transport{ - TLSClientConfig: &tls.Config{ - InsecureSkipVerify: client.skipTLSVerify, - }, - Proxy: http.ProxyFromEnvironment, - } + // Check if transport was set by WithTransport option + if client.client.Transport == nil { + // No custom transport provided, create the default one + transport := &http.Transport{ + TLSClientConfig: &tls.Config{ + InsecureSkipVerify: client.skipTLSVerify, + }, + } - // Create HTTP client - client.client = &http.Client{ - Transport: transport, - Timeout: client.timeout, + // Configure proxy if needed + if client.proxy != "" { + if strings.HasPrefix(client.proxy, "socks5://") { + // Handle SOCKS5 proxy + socksURL, err := url.Parse(client.proxy) + if err != nil { + client.logger.Error().Msgf("Failed to parse SOCKS5 proxy URL: %v", err) + } else { + auth := &proxy.Auth{} + if socksURL.User != nil { + auth.User = socksURL.User.Username() + password, _ := socksURL.User.Password() + auth.Password = password + } + + dialer, err := proxy.SOCKS5("tcp", socksURL.Host, auth, proxy.Direct) + if err != nil { + client.logger.Error().Msgf("Failed to create SOCKS5 dialer: %v", err) + } else { + transport.DialContext = func(ctx context.Context, network, addr string) (net.Conn, error) { + return dialer.Dial(network, addr) + } + } + } + } else { + proxyURL, err := url.Parse(client.proxy) + if err != nil { + client.logger.Error().Msgf("Failed to parse proxy URL: %v", err) + } else { + transport.Proxy = http.ProxyURL(proxyURL) + } + } + } else { + transport.Proxy = http.ProxyFromEnvironment + } + + // Set the transport to the client + client.client.Transport = transport } return client diff --git a/internal/utils/regex.go b/internal/utils/regex.go index cffe521..44f56c6 100644 --- a/internal/utils/regex.go +++ b/internal/utils/regex.go @@ -11,7 +11,7 @@ var ( MUSICMATCH = "(?i)(\\.)(mp2|mp3|m4a|m4b|m4p|ogg|oga|opus|wma|wav|wv|flac|ape|aif|aiff|aifc)$" ) -var SAMPLEMATCH = `(?i)(^|[\\/]|\s|[(])(?:sample|trailer|thumb|special|extras?)s?([\s._\-/)]|$)` +var SAMPLEMATCH = `(?i)(^|[\\/])(sample|trailer|thumb|special|extras?)s?([\s._-]|$|/)|(\(sample\))|(-\s*sample)` func RegexMatch(regex string, value string) bool { re := regexp.MustCompile(regex) diff --git a/pkg/debrid/alldebrid/alldebrid.go b/pkg/debrid/alldebrid/alldebrid.go index 0cb879c..6ddb922 100644 --- a/pkg/debrid/alldebrid/alldebrid.go +++ b/pkg/debrid/alldebrid/alldebrid.go @@ -14,7 +14,6 @@ import ( "path/filepath" "slices" "strconv" - "strings" "sync" "time" ) @@ -23,7 +22,7 @@ type AllDebrid struct { Name string Host string `json:"host"` APIKey string - ExtraAPIKeys []string + DownloadKeys []string DownloadUncached bool client *request.Client @@ -34,26 +33,22 @@ type AllDebrid struct { func New(dc config.Debrid) *AllDebrid { rl := request.ParseRateLimit(dc.RateLimit) - apiKeys := strings.Split(dc.APIKey, ",") - extraKeys := make([]string, 0) - if len(apiKeys) > 1 { - extraKeys = apiKeys[1:] - } - mainKey := apiKeys[0] + headers := map[string]string{ - "Authorization": fmt.Sprintf("Bearer %s", mainKey), + "Authorization": fmt.Sprintf("Bearer %s", dc.APIKey), } _log := logger.New(dc.Name) client := request.New( request.WithHeaders(headers), request.WithLogger(_log), request.WithRateLimiter(rl), + request.WithProxy(dc.Proxy), ) return &AllDebrid{ Name: "alldebrid", Host: dc.Host, - APIKey: mainKey, - ExtraAPIKeys: extraKeys, + APIKey: dc.APIKey, + DownloadKeys: dc.DownloadAPIKeys, DownloadUncached: dc.DownloadUncached, client: client, MountPath: dc.Folder, @@ -256,7 +251,7 @@ func (ad *AllDebrid) GenerateDownloadLinks(t *types.Torrent) error { for _, file := range t.Files { go func(file types.File) { defer wg.Done() - link, err := ad.GetDownloadLink(t, &file) + link, err := ad.GetDownloadLink(t, &file, 0) if err != nil { errCh <- err return @@ -291,7 +286,7 @@ func (ad *AllDebrid) GenerateDownloadLinks(t *types.Torrent) error { return nil } -func (ad *AllDebrid) GetDownloadLink(t *types.Torrent, file *types.File) (string, error) { +func (ad *AllDebrid) GetDownloadLink(t *types.Torrent, file *types.File, index int) (string, error) { url := fmt.Sprintf("%s/link/unlock", ad.Host) query := gourl.Values{} query.Add("link", file.Link) @@ -367,3 +362,7 @@ func (ad *AllDebrid) CheckLink(link string) error { func (ad *AllDebrid) GetMountPath() string { return ad.MountPath } + +func (ad *AllDebrid) GetDownloadKeys() []string { + return ad.DownloadKeys +} diff --git a/pkg/debrid/debrid/cache.go b/pkg/debrid/debrid/cache.go index 579b521..c21de31 100644 --- a/pkg/debrid/debrid/cache.go +++ b/pkg/debrid/debrid/cache.go @@ -46,6 +46,7 @@ type CachedTorrent struct { type downloadLinkCache struct { Link string + KeyIndex int ExpiresAt time.Time } @@ -68,12 +69,13 @@ type Cache struct { client types.Client logger zerolog.Logger - torrents *xsync.MapOf[string, *CachedTorrent] // key: torrent.Id, value: *CachedTorrent - torrentsNames *xsync.MapOf[string, *CachedTorrent] // key: torrent.Name, value: torrent - listings atomic.Value - downloadLinks *xsync.MapOf[string, downloadLinkCache] - PropfindResp *xsync.MapOf[string, PropfindResponse] - folderNaming WebDavFolderNaming + torrents *xsync.MapOf[string, *CachedTorrent] // key: torrent.Id, value: *CachedTorrent + torrentsNames *xsync.MapOf[string, *CachedTorrent] // key: torrent.Name, value: torrent + listings atomic.Value + downloadLinks *xsync.MapOf[string, downloadLinkCache] + invalidDownloadLinks *xsync.MapOf[string, string] + PropfindResp *xsync.MapOf[string, PropfindResponse] + folderNaming WebDavFolderNaming // repair repairChan chan RepairRequest @@ -116,6 +118,7 @@ func New(dc config.Debrid, client types.Client) *Cache { dir: filepath.Join(cfg.Path, "cache", dc.Name), // path to save cache files torrents: xsync.NewMapOf[string, *CachedTorrent](), torrentsNames: xsync.NewMapOf[string, *CachedTorrent](), + invalidDownloadLinks: xsync.NewMapOf[string, string](), client: client, logger: logger.New(fmt.Sprintf("%s-webdav", client.GetName())), workers: workers, @@ -161,6 +164,8 @@ func (c *Cache) Start(ctx context.Context) error { func (c *Cache) load() (map[string]*CachedTorrent, error) { torrents := make(map[string]*CachedTorrent) + var results sync.Map + if err := os.MkdirAll(c.dir, 0755); err != nil { return torrents, fmt.Errorf("failed to create cache directory: %w", err) } @@ -170,54 +175,102 @@ func (c *Cache) load() (map[string]*CachedTorrent, error) { return torrents, fmt.Errorf("failed to read cache directory: %w", err) } - now := time.Now() + // Get only json files + var jsonFiles []os.DirEntry for _, file := range files { - fileName := file.Name() - if file.IsDir() || filepath.Ext(fileName) != ".json" { - continue - } - - filePath := filepath.Join(c.dir, fileName) - data, err := os.ReadFile(filePath) - if err != nil { - c.logger.Debug().Err(err).Msgf("Failed to read file: %s", filePath) - continue - } - - var ct CachedTorrent - if err := json.Unmarshal(data, &ct); err != nil { - c.logger.Debug().Err(err).Msgf("Failed to unmarshal file: %s", filePath) - continue - } - isComplete := true - if len(ct.Files) != 0 { - // Check if all files are valid, if not, delete the file.json and remove from cache. - for _, f := range ct.Files { - if !f.IsValid() { - isComplete = false - break - } - } - if isComplete { - addedOn, err := time.Parse(time.RFC3339, ct.Added) - if err != nil { - addedOn = now - } - ct.AddedOn = addedOn - ct.IsComplete = true - torrents[ct.Id] = &ct - } else { - // Delete the file if it's not complete - _ = os.Remove(filePath) - } - + if !file.IsDir() && filepath.Ext(file.Name()) == ".json" { + jsonFiles = append(jsonFiles, file) } } + if len(jsonFiles) == 0 { + return torrents, nil + } + + // Create channels with appropriate buffering + workChan := make(chan os.DirEntry, min(c.workers, len(jsonFiles))) + + // Create a wait group for workers + var wg sync.WaitGroup + + // Start workers + for i := 0; i < c.workers; i++ { + wg.Add(1) + go func() { + defer wg.Done() + now := time.Now() + + for { + file, ok := <-workChan + if !ok { + return // Channel closed, exit goroutine + } + + fileName := file.Name() + filePath := filepath.Join(c.dir, fileName) + data, err := os.ReadFile(filePath) + if err != nil { + c.logger.Debug().Err(err).Msgf("Failed to read file: %s", filePath) + continue + } + + var ct CachedTorrent + if err := json.Unmarshal(data, &ct); err != nil { + c.logger.Debug().Err(err).Msgf("Failed to unmarshal file: %s", filePath) + continue + } + + isComplete := true + if len(ct.Files) != 0 { + // Check if all files are valid, if not, delete the file.json and remove from cache. + for _, f := range ct.Files { + if f.Link == "" { + isComplete = false + break + } + } + + if isComplete { + addedOn, err := time.Parse(time.RFC3339, ct.Added) + if err != nil { + addedOn = now + } + ct.AddedOn = addedOn + ct.IsComplete = true + results.Store(ct.Id, &ct) + } else { + // Delete the file if it's not complete + _ = os.Remove(filePath) + } + } + } + }() + } + + // Feed work to workers + for _, file := range jsonFiles { + workChan <- file + } + + // Signal workers that no more work is coming + close(workChan) + + // Wait for all workers to complete + wg.Wait() + + // Convert sync.Map to regular map + results.Range(func(key, value interface{}) bool { + id, _ := key.(string) + torrent, _ := value.(*CachedTorrent) + torrents[id] = torrent + return true + }) + return torrents, nil } func (c *Cache) Sync() error { + defer c.logger.Info().Msg("WebDav server sync complete") cachedTorrents, err := c.load() if err != nil { c.logger.Debug().Err(err).Msg("Failed to load cache") @@ -486,38 +539,45 @@ func (c *Cache) saveTorrent(id string, data []byte) { } func (c *Cache) ProcessTorrent(t *types.Torrent, refreshRclone bool) error { - if len(t.Files) == 0 { + + isComplete := func(files map[string]types.File) bool { + _complete := len(files) > 0 + for _, file := range files { + if file.Link == "" { + _complete = false + break + } + } + return _complete + } + + if !isComplete(t.Files) { if err := c.client.UpdateTorrent(t); err != nil { return fmt.Errorf("failed to update torrent: %w", err) } } - // Validate each file in the torrent - isComplete := true - for _, file := range t.Files { - if file.Link == "" { - isComplete = false - continue - } - } - if !isComplete { - c.logger.Debug().Msgf("Torrent %s is not complete, missing link for file %s. Triggering a reinsert", t.Id) - if err := c.ReInsertTorrent(t); err != nil { - c.logger.Error().Err(err).Msgf("Failed to reinsert torrent %s", t.Id) - return fmt.Errorf("failed to reinsert torrent: %w", err) - } - } + if !isComplete(t.Files) { + c.logger.Debug().Msgf("Torrent %s is still not complete. Triggering a reinsert(disabled)", t.Id) + //ct, err := c.reInsertTorrent(t) + //if err != nil { + // c.logger.Debug().Err(err).Msgf("Failed to reinsert torrent %s", t.Id) + // return err + //} + //c.logger.Debug().Msgf("Reinserted torrent %s", ct.Id) - addedOn, err := time.Parse(time.RFC3339, t.Added) - if err != nil { - addedOn = time.Now() + } else { + addedOn, err := time.Parse(time.RFC3339, t.Added) + if err != nil { + addedOn = time.Now() + } + ct := &CachedTorrent{ + Torrent: t, + IsComplete: len(t.Files) > 0, + AddedOn: addedOn, + } + c.setTorrent(ct) } - ct := &CachedTorrent{ - Torrent: t, - IsComplete: len(t.Files) > 0, - AddedOn: addedOn, - } - c.setTorrent(ct) if refreshRclone { c.refreshListings() @@ -525,7 +585,7 @@ func (c *Cache) ProcessTorrent(t *types.Torrent, refreshRclone bool) error { return nil } -func (c *Cache) GetDownloadLink(torrentId, filename, fileLink string) string { +func (c *Cache) GetDownloadLink(torrentId, filename, fileLink string, index int) string { // Check link cache if dl := c.checkDownloadLink(fileLink); dl != "" { @@ -548,44 +608,60 @@ func (c *Cache) GetDownloadLink(torrentId, filename, fileLink string) string { } } + // If file.Link is still empty, return + if file.Link == "" { + c.logger.Debug().Msgf("File link is empty for %s. Release is probably nerfed", filename) + // Try to reinsert the torrent? + ct, err := c.reInsertTorrent(ct.Torrent) + if err != nil { + c.logger.Debug().Err(err).Msgf("Failed to reinsert torrent %s", ct.Name) + return "" + } + file = ct.Files[filename] + c.logger.Debug().Msgf("Reinserted torrent %s", ct.Name) + } + c.logger.Trace().Msgf("Getting download link for %s", filename) - downloadLink, err := c.client.GetDownloadLink(ct.Torrent, &file) + downloadLink, err := c.client.GetDownloadLink(ct.Torrent, &file, index) if err != nil { if errors.Is(err, request.HosterUnavailableError) { - c.logger.Debug().Err(err).Msgf("Hoster is unavailable for %s/%s", ct.Name, filename) + c.logger.Debug().Err(err).Msgf("Hoster is unavailable. Triggering repair for %s", ct.Name) + ct, err := c.reInsertTorrent(ct.Torrent) + if err != nil { + c.logger.Debug().Err(err).Msgf("Failed to reinsert torrent %s", ct.Name) + return "" + } + c.logger.Debug().Msgf("Reinserted torrent %s", ct.Name) + file = ct.Files[filename] + // Retry getting the download link + downloadLink, err = c.client.GetDownloadLink(ct.Torrent, &file, index) + if err != nil { + c.logger.Debug().Err(err).Msgf("Failed to get download link for %s", file.Link) + return "" + } + if downloadLink == "" { + c.logger.Debug().Msgf("Download link is empty for %s", file.Link) + return "" + } + file.DownloadLink = downloadLink + file.Generated = time.Now() + ct.Files[filename] = file + go func() { + c.updateDownloadLink(file.Link, downloadLink, 0) + c.setTorrent(ct) + }() + return file.DownloadLink + } else { + c.logger.Debug().Err(err).Msgf("Failed to get download link for %s", file.Link) return "" - // This code is commented iut due to the fact that if a torrent link is uncached, it's likely that we can't redownload it again - // Do not attempt to repair the torrent if the hoster is unavailable - // Check link here?? - //c.logger.Debug().Err(err).Msgf("Hoster is unavailable. Triggering repair for %s", ct.Name) - //if err := c.repairTorrent(ct); err != nil { - // c.logger.Error().Err(err).Msgf("Failed to trigger repair for %s", ct.Name) - // return "" - //} - //// Generate download link for the file then - //f := ct.Files[filename] - //downloadLink, _ = c.client.GetDownloadLink(ct.Torrent, &f) - //f.DownloadLink = downloadLink - //file.Generated = time.Now() - //ct.Files[filename] = f - //c.updateDownloadLink(file.Link, downloadLink) - // - //go func() { - // go c.setTorrent(ct) - //}() - // - //return downloadLink // Gets download link in the next pass } - - c.logger.Debug().Err(err).Msgf("Failed to get download link for :%s", file.Link) - return "" } file.DownloadLink = downloadLink file.Generated = time.Now() ct.Files[filename] = file go func() { - c.updateDownloadLink(file.Link, downloadLink) + c.updateDownloadLink(file.Link, downloadLink, 0) c.setTorrent(ct) }() return file.DownloadLink @@ -596,7 +672,7 @@ func (c *Cache) GenerateDownloadLinks(t *CachedTorrent) { c.logger.Error().Err(err).Msg("Failed to generate download links") } for _, file := range t.Files { - c.updateDownloadLink(file.Link, file.DownloadLink) + c.updateDownloadLink(file.Link, file.DownloadLink, 0) } c.SaveTorrent(t) @@ -624,22 +700,39 @@ func (c *Cache) AddTorrent(t *types.Torrent) error { } -func (c *Cache) updateDownloadLink(link, downloadLink string) { +func (c *Cache) updateDownloadLink(link, downloadLink string, keyIndex int) { c.downloadLinks.Store(link, downloadLinkCache{ Link: downloadLink, - ExpiresAt: time.Now().Add(c.autoExpiresLinksAfter), // Expires in 24 hours + ExpiresAt: time.Now().Add(c.autoExpiresLinksAfter), + KeyIndex: keyIndex, }) } func (c *Cache) checkDownloadLink(link string) string { if dl, ok := c.downloadLinks.Load(link); ok { - if dl.ExpiresAt.After(time.Now()) { + if dl.ExpiresAt.After(time.Now()) && !c.IsDownloadLinkInvalid(dl.Link) { return dl.Link } } return "" } +func (c *Cache) RemoveDownloadLink(link string) { + c.downloadLinks.Delete(link) +} + +func (c *Cache) MarkDownloadLinkAsInvalid(downloadLink, reason string) { + c.invalidDownloadLinks.Store(downloadLink, reason) +} + +func (c *Cache) IsDownloadLinkInvalid(downloadLink string) bool { + if reason, ok := c.invalidDownloadLinks.Load(downloadLink); ok { + c.logger.Debug().Msgf("Download link %s is invalid: %s", downloadLink, reason) + return true + } + return false +} + func (c *Cache) GetClient() types.Client { return c.client } @@ -688,3 +781,7 @@ func (c *Cache) OnRemove(torrentId string) { func (c *Cache) GetLogger() zerolog.Logger { return c.logger } + +func (c *Cache) TotalDownloadKeys() int { + return len(c.client.GetDownloadKeys()) +} diff --git a/pkg/debrid/debrid/refresh.go b/pkg/debrid/debrid/refresh.go index 89ab585..e7d78c1 100644 --- a/pkg/debrid/debrid/refresh.go +++ b/pkg/debrid/debrid/refresh.go @@ -8,7 +8,6 @@ import ( "io" "net/http" "os" - "path/filepath" "slices" "sort" "strings" @@ -67,29 +66,6 @@ func (c *Cache) refreshListings() { } } -func (c *Cache) resetPropfindResponse() { - // Right now, parents are hardcoded - parents := []string{"__all__", "torrents"} - // Reset only the parent directories - // Convert the parents to a keys - // This is a bit hacky, but it works - // Instead of deleting all the keys, we only delete the parent keys, e.g __all__/ or torrents/ - keys := make([]string, 0, len(parents)) - for _, p := range parents { - // Construct the key - // construct url - url := filepath.Clean(filepath.Join("/webdav", c.client.GetName(), p)) - key0 := fmt.Sprintf("propfind:%s:0", url) - key1 := fmt.Sprintf("propfind:%s:1", url) - keys = append(keys, key0, key1) - } - - // Delete the keys - for _, k := range keys { - c.PropfindResp.Delete(k) - } -} - func (c *Cache) refreshTorrents() { if c.torrentsRefreshMu.TryLock() { defer c.torrentsRefreshMu.Unlock() @@ -127,7 +103,7 @@ func (c *Cache) refreshTorrents() { // Check for deleted torrents deletedTorrents := make([]string, 0) - for id, _ := range torrents { + for id := range torrents { if _, ok := idStore[id]; !ok { deletedTorrents = append(deletedTorrents, id) } @@ -264,8 +240,7 @@ func (c *Cache) refreshDownloadLinks() { ExpiresAt: v.Generated.Add(c.autoExpiresLinksAfter - timeSince), }) } else { - //c.downloadLinks.Delete(k) don't delete, just log - c.logger.Trace().Msgf("Download link for %s expired", k) + c.downloadLinks.Delete(k) } } diff --git a/pkg/debrid/debrid/repair.go b/pkg/debrid/debrid/repair.go index 9c5fbea..276a1fd 100644 --- a/pkg/debrid/debrid/repair.go +++ b/pkg/debrid/debrid/repair.go @@ -3,6 +3,7 @@ package debrid import ( "errors" "fmt" + "github.com/puzpuzpuz/xsync/v3" "github.com/sirrobot01/debrid-blackhole/internal/request" "github.com/sirrobot01/debrid-blackhole/internal/utils" "github.com/sirrobot01/debrid-blackhole/pkg/debrid/types" @@ -35,6 +36,8 @@ func (c *Cache) IsTorrentBroken(t *CachedTorrent, filenames []string) bool { } } + files = t.Files + for _, f := range files { // Check if file link is still missing if f.Link == "" { @@ -46,11 +49,7 @@ func (c *Cache) IsTorrentBroken(t *CachedTorrent, filenames []string) bool { if errors.Is(err, request.HosterUnavailableError) { isBroken = true break - } else { - // This might just be a temporary error } - } else { - // Generate a new download link? } } } @@ -59,70 +58,48 @@ func (c *Cache) IsTorrentBroken(t *CachedTorrent, filenames []string) bool { func (c *Cache) repairWorker() { // This watches a channel for torrents to repair - for { - select { - case req := <-c.repairChan: - torrentId := req.TorrentID - if _, inProgress := c.repairsInProgress.Load(torrentId); inProgress { - c.logger.Debug().Str("torrentId", torrentId).Msg("Skipping duplicate repair request") - continue - } - - // Mark as in progress - c.repairsInProgress.Store(torrentId, true) - c.logger.Debug().Str("torrentId", req.TorrentID).Msg("Received repair request") - - // Get the torrent from the cache - cachedTorrent, ok := c.torrents.Load(torrentId) - if !ok || cachedTorrent == nil { - c.logger.Warn().Str("torrentId", torrentId).Msg("Torrent not found in cache") - continue - } - - switch req.Type { - case RepairTypeReinsert: - c.logger.Debug().Str("torrentId", torrentId).Msg("Reinserting torrent") - c.reInsertTorrent(cachedTorrent) - case RepairTypeDelete: - c.logger.Debug().Str("torrentId", torrentId).Msg("Deleting torrent") - if err := c.DeleteTorrent(torrentId); err != nil { - c.logger.Error().Err(err).Str("torrentId", torrentId).Msg("Failed to delete torrent") - continue - } - - } - c.repairsInProgress.Delete(torrentId) + for req := range c.repairChan { + torrentId := req.TorrentID + if _, inProgress := c.repairsInProgress.Load(torrentId); inProgress { + c.logger.Debug().Str("torrentId", torrentId).Msg("Skipping duplicate repair request") + continue } + + // Mark as in progress + c.repairsInProgress.Store(torrentId, true) + c.logger.Debug().Str("torrentId", req.TorrentID).Msg("Received repair request") + + // Get the torrent from the cache + cachedTorrent, ok := c.torrents.Load(torrentId) + if !ok || cachedTorrent == nil { + c.logger.Warn().Str("torrentId", torrentId).Msg("Torrent not found in cache") + continue + } + + switch req.Type { + case RepairTypeReinsert: + c.logger.Debug().Str("torrentId", torrentId).Msg("Reinserting torrent") + var err error + cachedTorrent, err = c.reInsertTorrent(cachedTorrent.Torrent) + if err != nil { + c.logger.Error().Err(err).Str("torrentId", cachedTorrent.Id).Msg("Failed to reinsert torrent") + continue + } + case RepairTypeDelete: + c.logger.Debug().Str("torrentId", torrentId).Msg("Deleting torrent") + if err := c.DeleteTorrent(torrentId); err != nil { + c.logger.Error().Err(err).Str("torrentId", torrentId).Msg("Failed to delete torrent") + continue + } + } + c.repairsInProgress.Delete(torrentId) } } -func (c *Cache) reInsertTorrent(t *CachedTorrent) { - // Reinsert the torrent into the cache - c.torrents.Store(t.Id, t) - c.logger.Debug().Str("torrentId", t.Id).Msg("Reinserted torrent into cache") -} - -func (c *Cache) submitForRepair(repairType RepairType, torrentId, fileName string) { - // Submitting a torrent for repair.Not used yet - - // Check if already in progress before even submitting - if _, inProgress := c.repairsInProgress.Load(torrentId); inProgress { - c.logger.Debug().Str("torrentID", torrentId).Msg("Repair already in progress") - return - } - - select { - case c.repairChan <- RepairRequest{TorrentID: torrentId, FileName: fileName}: - c.logger.Debug().Str("torrentID", torrentId).Msg("Submitted for repair") - default: - c.logger.Warn().Str("torrentID", torrentId).Msg("Repair channel full, skipping repair request") - } -} - -func (c *Cache) ReInsertTorrent(torrent *types.Torrent) error { +func (c *Cache) reInsertTorrent(torrent *types.Torrent) (*CachedTorrent, error) { // Check if Magnet is not empty, if empty, reconstruct the magnet if _, ok := c.repairsInProgress.Load(torrent.Id); ok { - return fmt.Errorf("repair already in progress for torrent %s", torrent.Id) + return nil, fmt.Errorf("repair already in progress for torrent %s", torrent.Id) } if torrent.Magnet == nil { @@ -130,8 +107,12 @@ func (c *Cache) ReInsertTorrent(torrent *types.Torrent) error { } oldID := torrent.Id - defer c.repairsInProgress.Delete(oldID) - defer c.DeleteTorrent(oldID) + defer func() { + err := c.DeleteTorrent(oldID) + if err != nil { + c.logger.Error().Err(err).Str("torrentId", oldID).Msg("Failed to delete old torrent") + } + }() // Submit the magnet to the debrid service torrent.Id = "" @@ -139,12 +120,12 @@ func (c *Cache) ReInsertTorrent(torrent *types.Torrent) error { torrent, err = c.client.SubmitMagnet(torrent) if err != nil { // Remove the old torrent from the cache and debrid service - return fmt.Errorf("failed to submit magnet: %w", err) + return nil, fmt.Errorf("failed to submit magnet: %w", err) } // Check if the torrent was submitted if torrent == nil || torrent.Id == "" { - return fmt.Errorf("failed to submit magnet: empty torrent") + return nil, fmt.Errorf("failed to submit magnet: empty torrent") } torrent.DownloadUncached = false // Set to false, avoid re-downloading torrent, err = c.client.CheckStatus(torrent, true) @@ -152,24 +133,22 @@ func (c *Cache) ReInsertTorrent(torrent *types.Torrent) error { // Torrent is likely in progress _ = c.DeleteTorrent(torrent.Id) - return fmt.Errorf("failed to check status: %w", err) + return nil, fmt.Errorf("failed to check status: %w", err) } if torrent == nil { - return fmt.Errorf("failed to check status: empty torrent") - } - - for _, file := range torrent.Files { - if file.Link == "" { - c.logger.Debug().Msgf("Torrent %s is still not complete, missing link for file %s.", torrent.Name, file.Name) - // Delete the torrent from the cache - _ = c.DeleteTorrent(torrent.Id) - return fmt.Errorf("torrent %s is still not complete, missing link for file %s", torrent.Name, file.Name) - } + return nil, fmt.Errorf("failed to check status: empty torrent") } // Update the torrent in the cache addedOn, err := time.Parse(time.RFC3339, torrent.Added) + for _, f := range torrent.Files { + if f.Link == "" { + // Delete the new torrent + _ = c.DeleteTorrent(torrent.Id) + return nil, fmt.Errorf("failed to reinsert torrent: empty link") + } + } if err != nil { addedOn = time.Now() } @@ -180,5 +159,17 @@ func (c *Cache) ReInsertTorrent(torrent *types.Torrent) error { } c.setTorrent(ct) c.refreshListings() + return ct, nil +} + +func (c *Cache) refreshDownloadLink(link string) error { + // A generated download link has being limited + // Generate a new one with other API keys + // Temporarily remove the old one return nil } + +func (c *Cache) resetDownloadLinks() { + c.invalidDownloadLinks = xsync.NewMapOf[string, string]() + c.downloadLinks = xsync.NewMapOf[string, downloadLinkCache]() +} diff --git a/pkg/debrid/debrid/worker.go b/pkg/debrid/debrid/worker.go index 8423159..b4c6c3f 100644 --- a/pkg/debrid/debrid/worker.go +++ b/pkg/debrid/debrid/worker.go @@ -13,11 +13,8 @@ func (c *Cache) refreshDownloadLinksWorker() { refreshTicker := time.NewTicker(c.downloadLinksRefreshInterval) defer refreshTicker.Stop() - for { - select { - case <-refreshTicker.C: - c.refreshDownloadLinks() - } + for range refreshTicker.C { + c.refreshDownloadLinks() } } @@ -25,10 +22,7 @@ func (c *Cache) refreshTorrentsWorker() { refreshTicker := time.NewTicker(c.torrentRefreshInterval) defer refreshTicker.Stop() - for { - select { - case <-refreshTicker.C: - c.refreshTorrents() - } + for range refreshTicker.C { + c.refreshTorrents() } } diff --git a/pkg/debrid/debrid_link/debrid_link.go b/pkg/debrid/debrid_link/debrid_link.go index a9bbffa..17c820a 100644 --- a/pkg/debrid/debrid_link/debrid_link.go +++ b/pkg/debrid/debrid_link/debrid_link.go @@ -21,7 +21,7 @@ type DebridLink struct { Name string Host string `json:"host"` APIKey string - ExtraAPIKeys []string + DownloadKeys []string DownloadUncached bool client *request.Client @@ -243,7 +243,7 @@ func (dl *DebridLink) GetDownloads() (map[string]types.DownloadLinks, error) { return nil, nil } -func (dl *DebridLink) GetDownloadLink(t *types.Torrent, file *types.File) (string, error) { +func (dl *DebridLink) GetDownloadLink(t *types.Torrent, file *types.File, index int) (string, error) { return file.DownloadLink, nil } @@ -261,14 +261,9 @@ func (dl *DebridLink) GetDownloadUncached() bool { func New(dc config.Debrid) *DebridLink { rl := request.ParseRateLimit(dc.RateLimit) - apiKeys := strings.Split(dc.APIKey, ",") - extraKeys := make([]string, 0) - if len(apiKeys) > 1 { - extraKeys = apiKeys[1:] - } - mainKey := apiKeys[0] + headers := map[string]string{ - "Authorization": fmt.Sprintf("Bearer %s", mainKey), + "Authorization": fmt.Sprintf("Bearer %s", dc.APIKey), "Content-Type": "application/json", } _log := logger.New(dc.Name) @@ -276,12 +271,13 @@ func New(dc config.Debrid) *DebridLink { request.WithHeaders(headers), request.WithLogger(_log), request.WithRateLimiter(rl), + request.WithProxy(dc.Proxy), ) return &DebridLink{ Name: "debridlink", Host: dc.Host, - APIKey: mainKey, - ExtraAPIKeys: extraKeys, + APIKey: dc.APIKey, + DownloadKeys: dc.DownloadAPIKeys, DownloadUncached: dc.DownloadUncached, client: client, MountPath: dc.Folder, @@ -371,3 +367,7 @@ func (dl *DebridLink) CheckLink(link string) error { func (dl *DebridLink) GetMountPath() string { return dl.MountPath } + +func (dl *DebridLink) GetDownloadKeys() []string { + return dl.DownloadKeys +} diff --git a/pkg/debrid/realdebrid/realdebrid.go b/pkg/debrid/realdebrid/realdebrid.go index 48c266e..038c6ad 100644 --- a/pkg/debrid/realdebrid/realdebrid.go +++ b/pkg/debrid/realdebrid/realdebrid.go @@ -1,6 +1,7 @@ package realdebrid import ( + "errors" "fmt" "github.com/goccy/go-json" "github.com/rs/zerolog" @@ -25,7 +26,7 @@ type RealDebrid struct { Host string `json:"host"` APIKey string - ExtraAPIKeys []string // This is used for bandwidth + DownloadKeys []string // This is used for bandwidth DownloadUncached bool client *request.Client @@ -43,45 +44,58 @@ func (r *RealDebrid) GetLogger() zerolog.Logger { return r.logger } +func getSelectedFiles(t *types.Torrent, data TorrentInfo) map[string]types.File { + selectedFiles := make([]types.File, 0) + for _, f := range data.Files { + if f.Selected == 1 { + name := filepath.Base(f.Path) + file := types.File{ + Name: name, + Path: name, + Size: f.Bytes, + Id: strconv.Itoa(f.ID), + } + selectedFiles = append(selectedFiles, file) + } + } + files := make(map[string]types.File) + for index, f := range selectedFiles { + if index >= len(data.Links) { + break + } + f.Link = data.Links[index] + files[f.Name] = f + } + return files +} + // getTorrentFiles returns a list of torrent files from the torrent info // validate is used to determine if the files should be validated // if validate is false, selected files will be returned -func getTorrentFiles(t *types.Torrent, data TorrentInfo, validate bool) map[string]types.File { +func getTorrentFiles(t *types.Torrent, data TorrentInfo) map[string]types.File { files := make(map[string]types.File) cfg := config.Get() idx := 0 + for _, f := range data.Files { name := filepath.Base(f.Path) - if validate { - if utils.IsSampleFile(f.Path) { - // Skip sample files - continue - } - - if !cfg.IsAllowedFile(name) { - continue - } - if !cfg.IsSizeAllowed(f.Bytes) { - continue - } - } else { - if f.Selected == 0 { - continue - } + if utils.IsSampleFile(f.Path) { + // Skip sample files + continue } - fileId := f.ID - _link := "" - if len(data.Links) > idx { - _link = data.Links[idx] + if !cfg.IsAllowedFile(name) { + continue + } + if !cfg.IsSizeAllowed(f.Bytes) { + continue } file := types.File{ Name: name, Path: name, Size: f.Bytes, - Id: strconv.Itoa(fileId), - Link: _link, + Id: strconv.Itoa(f.ID), } files[name] = file idx++ @@ -182,7 +196,7 @@ func (r *RealDebrid) UpdateTorrent(t *types.Torrent) error { t.MountPath = r.MountPath t.Debrid = r.Name t.Added = data.Added - t.Files = getTorrentFiles(t, data, false) // Get selected files + t.Files = getSelectedFiles(t, data) // Get selected files return nil } @@ -213,7 +227,7 @@ func (r *RealDebrid) CheckStatus(t *types.Torrent, isSymlink bool) (*types.Torre t.Debrid = r.Name t.MountPath = r.MountPath if status == "waiting_files_selection" { - t.Files = getTorrentFiles(t, data, true) + t.Files = getTorrentFiles(t, data) if len(t.Files) == 0 { return t, fmt.Errorf("no video files found") } @@ -231,7 +245,7 @@ func (r *RealDebrid) CheckStatus(t *types.Torrent, isSymlink bool) (*types.Torre return t, err } } else if status == "downloaded" { - t.Files = getTorrentFiles(t, data, false) // Get selected files + t.Files = getSelectedFiles(t, data) // Get selected files r.logger.Info().Msgf("Torrent: %s downloaded to RD", t.Name) if !isSymlink { err = r.GenerateDownloadLinks(t) @@ -273,7 +287,7 @@ func (r *RealDebrid) GenerateDownloadLinks(t *types.Torrent) error { go func(file types.File) { defer wg.Done() - link, err := r.GetDownloadLink(t, &file) + link, err := r.GetDownloadLink(t, &file, 0) if err != nil { errCh <- err return @@ -333,6 +347,7 @@ func (r *RealDebrid) _getDownloadLink(file *types.File) (string, error) { if err != nil { return "", err } + defer resp.Body.Close() if resp.StatusCode != http.StatusOK { // Read the response body to get the error message b, err := io.ReadAll(resp.Body) @@ -348,12 +363,12 @@ func (r *RealDebrid) _getDownloadLink(file *types.File) (string, error) { return "", request.TrafficExceededError case 24: return "", request.HosterUnavailableError // Link has been nerfed + case 19: + return "", request.HosterUnavailableError // File has been removed default: return "", fmt.Errorf("realdebrid API error: %d", resp.StatusCode) } - } - defer resp.Body.Close() b, err := io.ReadAll(resp.Body) if err != nil { return "", err @@ -366,20 +381,28 @@ func (r *RealDebrid) _getDownloadLink(file *types.File) (string, error) { } -func (r *RealDebrid) GetDownloadLink(t *types.Torrent, file *types.File) (string, error) { +func (r *RealDebrid) GetDownloadLink(t *types.Torrent, file *types.File, index int) (string, error) { + + if index >= len(r.DownloadKeys) { + // Reset to first key + index = 0 + } + r.client.SetHeader("Authorization", fmt.Sprintf("Bearer %s", r.DownloadKeys[index])) link, err := r._getDownloadLink(file) - if err == nil { + if err == nil && link != "" { return link, nil } - for _, key := range r.ExtraAPIKeys { - r.client.SetHeader("Authorization", fmt.Sprintf("Bearer %s", key)) - if link, err := r._getDownloadLink(file); err == nil { - return link, nil + if err != nil && errors.Is(err, request.HosterUnavailableError) { + // Try the next key + if index+1 < len(r.DownloadKeys) { + return r.GetDownloadLink(t, file, index+1) } } - // Reset to main API key + // If we reach here, it means we have tried all keys + // and none of them worked, or the error is not related to the keys + // Reset to the first key r.client.SetHeader("Authorization", fmt.Sprintf("Bearer %s", r.APIKey)) - return "", err + return link, err } func (r *RealDebrid) GetCheckCached() bool { @@ -553,11 +576,7 @@ func (r *RealDebrid) GetMountPath() string { func New(dc config.Debrid) *RealDebrid { rl := request.ParseRateLimit(dc.RateLimit) - apiKeys := strings.Split(dc.DownloadAPIKeys, ",") - extraKeys := make([]string, 0) - if len(apiKeys) > 1 { - extraKeys = apiKeys[1:] - } + headers := map[string]string{ "Authorization": fmt.Sprintf("Bearer %s", dc.APIKey), } @@ -568,12 +587,13 @@ func New(dc config.Debrid) *RealDebrid { request.WithLogger(_log), request.WithMaxRetries(5), request.WithRetryableStatus(429), + request.WithProxy(dc.Proxy), ) return &RealDebrid{ Name: "realdebrid", Host: dc.Host, APIKey: dc.APIKey, - ExtraAPIKeys: extraKeys, + DownloadKeys: dc.DownloadAPIKeys, DownloadUncached: dc.DownloadUncached, client: client, MountPath: dc.Folder, @@ -581,3 +601,7 @@ func New(dc config.Debrid) *RealDebrid { CheckCached: dc.CheckCached, } } + +func (r *RealDebrid) GetDownloadKeys() []string { + return r.DownloadKeys +} diff --git a/pkg/debrid/torbox/torbox.go b/pkg/debrid/torbox/torbox.go index 1ea4060..6b2955b 100644 --- a/pkg/debrid/torbox/torbox.go +++ b/pkg/debrid/torbox/torbox.go @@ -25,7 +25,7 @@ type Torbox struct { Name string Host string `json:"host"` APIKey string - ExtraAPIKeys []string + DownloadKeys []string DownloadUncached bool client *request.Client @@ -36,27 +36,23 @@ type Torbox struct { func New(dc config.Debrid) *Torbox { rl := request.ParseRateLimit(dc.RateLimit) - apiKeys := strings.Split(dc.APIKey, ",") - extraKeys := make([]string, 0) - if len(apiKeys) > 1 { - extraKeys = apiKeys[1:] - } - mainKey := apiKeys[0] + headers := map[string]string{ - "Authorization": fmt.Sprintf("Bearer %s", mainKey), + "Authorization": fmt.Sprintf("Bearer %s", dc.APIKey), } _log := logger.New(dc.Name) client := request.New( request.WithHeaders(headers), request.WithRateLimiter(rl), request.WithLogger(_log), + request.WithProxy(dc.Proxy), ) return &Torbox{ Name: "torbox", Host: dc.Host, - APIKey: mainKey, - ExtraAPIKeys: extraKeys, + APIKey: dc.APIKey, + DownloadKeys: dc.DownloadAPIKeys, DownloadUncached: dc.DownloadUncached, client: client, MountPath: dc.Folder, @@ -284,7 +280,7 @@ func (tb *Torbox) GenerateDownloadLinks(t *types.Torrent) error { for _, file := range t.Files { go func() { defer wg.Done() - link, err := tb.GetDownloadLink(t, &file) + link, err := tb.GetDownloadLink(t, &file, 0) if err != nil { errCh <- err return @@ -316,7 +312,7 @@ func (tb *Torbox) GenerateDownloadLinks(t *types.Torrent) error { return nil } -func (tb *Torbox) GetDownloadLink(t *types.Torrent, file *types.File) (string, error) { +func (tb *Torbox) GetDownloadLink(t *types.Torrent, file *types.File, index int) (string, error) { url := fmt.Sprintf("%s/api/torrents/requestdl/", tb.Host) query := gourl.Values{} query.Add("torrent_id", t.Id) @@ -366,3 +362,7 @@ func (tb *Torbox) CheckLink(link string) error { func (tb *Torbox) GetMountPath() string { return tb.MountPath } + +func (tb *Torbox) GetDownloadKeys() []string { + return tb.DownloadKeys +} diff --git a/pkg/debrid/types/debrid.go b/pkg/debrid/types/debrid.go index ffb7dc7..c2f5edb 100644 --- a/pkg/debrid/types/debrid.go +++ b/pkg/debrid/types/debrid.go @@ -8,7 +8,7 @@ type Client interface { SubmitMagnet(tr *Torrent) (*Torrent, error) CheckStatus(tr *Torrent, isSymlink bool) (*Torrent, error) GenerateDownloadLinks(tr *Torrent) error - GetDownloadLink(tr *Torrent, file *File) (string, error) + GetDownloadLink(tr *Torrent, file *File, index int) (string, error) DeleteTorrent(torrentId string) error IsAvailable(infohashes []string) map[string]bool GetCheckCached() bool @@ -21,4 +21,5 @@ type Client interface { GetDownloads() (map[string]DownloadLinks, error) CheckLink(link string) error GetMountPath() string + GetDownloadKeys() []string } diff --git a/pkg/proxy/proxy.go b/pkg/proxy/proxy.go index 4a4f088..3d46e58 100644 --- a/pkg/proxy/proxy.go +++ b/pkg/proxy/proxy.go @@ -321,7 +321,7 @@ func (p *Proxy) Start(ctx context.Context) error { proxy.OnRequest(goproxy.ReqHostMatches(regexp.MustCompile("^.443$"))).HandleConnect(goproxy.AlwaysMitm) proxy.OnResponse( - UrlMatches(regexp.MustCompile("^.*/api\\?t=(search|tvsearch|movie)(&.*)?$")), + UrlMatches(regexp.MustCompile(`^.*/api\?t=(search|tvsearch|movie)(&.*)?$`)), goproxy.StatusCodeIs(http.StatusOK, http.StatusAccepted)).DoFunc( func(resp *http.Response, ctx *goproxy.ProxyCtx) *http.Response { return p.ProcessResponse(resp) diff --git a/pkg/qbit/downloader.go b/pkg/qbit/downloader.go index c664b5b..d872b92 100644 --- a/pkg/qbit/downloader.go +++ b/pkg/qbit/downloader.go @@ -225,14 +225,15 @@ func (q *QBit) preCacheFile(name string, filePaths []string) error { for _, filePath := range filePaths { func() { file, err := os.Open(filePath) - defer file.Close() + defer func(file *os.File) { + _ = file.Close() + }(file) if err != nil { return } // Pre-cache the file header (first 256KB) using 16KB chunks. q.readSmallChunks(file, 0, 256*1024, 16*1024) q.readSmallChunks(file, 1024*1024, 64*1024, 16*1024) - return }() } @@ -264,5 +265,4 @@ func (q *QBit) readSmallChunks(file *os.File, startPos int64, totalToRead int, c bytesRemaining -= n } - return } diff --git a/pkg/repair/clean.go b/pkg/repair/clean.go index d6a117d..d992322 100644 --- a/pkg/repair/clean.go +++ b/pkg/repair/clean.go @@ -1,149 +1,159 @@ package repair -import ( - "context" - "fmt" - "github.com/sirrobot01/debrid-blackhole/internal/request" - "golang.org/x/sync/errgroup" - "runtime" - "sync" - "time" -) +//func (r *Repair) clean(job *Job) error { +// // Create a new error group +// g, ctx := errgroup.WithContext(context.Background()) +// +// uniqueItems := make(map[string]string) +// mu := sync.Mutex{} +// +// // Limit concurrent goroutines +// g.SetLimit(10) +// +// for _, a := range job.Arrs { +// a := a // Capture range variable +// g.Go(func() error { +// // Check if context was canceled +// select { +// case <-ctx.Done(): +// return ctx.Err() +// default: +// } +// +// items, err := r.cleanArr(job, a, "") +// if err != nil { +// r.logger.Error().Err(err).Msgf("Error cleaning %s", a) +// return err +// } +// +// // Safely append the found items to the shared slice +// if len(items) > 0 { +// mu.Lock() +// for k, v := range items { +// uniqueItems[k] = v +// } +// mu.Unlock() +// } +// +// return nil +// }) +// } +// +// if err := g.Wait(); err != nil { +// return err +// } +// +// if len(uniqueItems) == 0 { +// job.CompletedAt = time.Now() +// job.Status = JobCompleted +// +// go func() { +// if err := request.SendDiscordMessage("repair_clean_complete", "success", job.discordContext()); err != nil { +// r.logger.Error().Msgf("Error sending discord message: %v", err) +// } +// }() +// +// return nil +// } +// +// cache := r.deb.Caches["realdebrid"] +// if cache == nil { +// return fmt.Errorf("cache not found") +// } +// torrents := cache.GetTorrents() +// +// dangling := make([]string, 0) +// for _, t := range torrents { +// if _, ok := uniqueItems[t.Name]; !ok { +// dangling = append(dangling, t.Id) +// } +// } +// +// r.logger.Info().Msgf("Found %d delapitated items", len(dangling)) +// +// if len(dangling) == 0 { +// job.CompletedAt = time.Now() +// job.Status = JobCompleted +// return nil +// } +// +// client := r.deb.Clients["realdebrid"] +// if client == nil { +// return fmt.Errorf("client not found") +// } +// for _, id := range dangling { +// err := client.DeleteTorrent(id) +// if err != nil { +// return err +// } +// } +// +// return nil +//} +// +//func (r *Repair) cleanArr(j *Job, _arr string, tmdbId string) (map[string]string, error) { +// uniqueItems := make(map[string]string) +// a := r.arrs.Get(_arr) +// +// r.logger.Info().Msgf("Starting repair for %s", a.Name) +// media, err := a.GetMedia(tmdbId) +// if err != nil { +// r.logger.Info().Msgf("Failed to get %s media: %v", a.Name, err) +// return uniqueItems, err +// } +// +// // Create a new error group +// g, ctx := errgroup.WithContext(context.Background()) +// +// mu := sync.Mutex{} +// +// // Limit concurrent goroutines +// g.SetLimit(runtime.NumCPU() * 4) +// +// for _, m := range media { +// m := m // Create a new variable scoped to the loop iteration +// g.Go(func() error { +// // Check if context was canceled +// select { +// case <-ctx.Done(): +// return ctx.Err() +// default: +// } +// +// u := r.getUniquePaths(m) +// for k, v := range u { +// mu.Lock() +// uniqueItems[k] = v +// mu.Unlock() +// } +// return nil +// }) +// } +// +// if err := g.Wait(); err != nil { +// return uniqueItems, err +// } +// +// r.logger.Info().Msgf("Repair completed for %s. %d unique items", a.Name, len(uniqueItems)) +// return uniqueItems, nil +//} -func (r *Repair) clean(job *Job) error { - // Create a new error group - g, ctx := errgroup.WithContext(context.Background()) - - uniqueItems := make(map[string]string) - mu := sync.Mutex{} - - // Limit concurrent goroutines - g.SetLimit(10) - - for _, a := range job.Arrs { - a := a // Capture range variable - g.Go(func() error { - // Check if context was canceled - select { - case <-ctx.Done(): - return ctx.Err() - default: - } - - items, err := r.cleanArr(job, a, "") - if err != nil { - r.logger.Error().Err(err).Msgf("Error cleaning %s", a) - return err - } - - // Safely append the found items to the shared slice - if len(items) > 0 { - mu.Lock() - for k, v := range items { - uniqueItems[k] = v - } - mu.Unlock() - } - - return nil - }) - } - - if err := g.Wait(); err != nil { - return err - } - - if len(uniqueItems) == 0 { - job.CompletedAt = time.Now() - job.Status = JobCompleted - - go func() { - if err := request.SendDiscordMessage("repair_clean_complete", "success", job.discordContext()); err != nil { - r.logger.Error().Msgf("Error sending discord message: %v", err) - } - }() - - return nil - } - - cache := r.deb.Caches["realdebrid"] - if cache == nil { - return fmt.Errorf("cache not found") - } - torrents := cache.GetTorrents() - - dangling := make([]string, 0) - for _, t := range torrents { - if _, ok := uniqueItems[t.Name]; !ok { - dangling = append(dangling, t.Id) - } - } - - r.logger.Info().Msgf("Found %d delapitated items", len(dangling)) - - if len(dangling) == 0 { - job.CompletedAt = time.Now() - job.Status = JobCompleted - return nil - } - - client := r.deb.Clients["realdebrid"] - if client == nil { - return fmt.Errorf("client not found") - } - for _, id := range dangling { - err := client.DeleteTorrent(id) - if err != nil { - return err - } - } - - return nil -} - -func (r *Repair) cleanArr(j *Job, _arr string, tmdbId string) (map[string]string, error) { - uniqueItems := make(map[string]string) - a := r.arrs.Get(_arr) - - r.logger.Info().Msgf("Starting repair for %s", a.Name) - media, err := a.GetMedia(tmdbId) - if err != nil { - r.logger.Info().Msgf("Failed to get %s media: %v", a.Name, err) - return uniqueItems, err - } - - // Create a new error group - g, ctx := errgroup.WithContext(context.Background()) - - mu := sync.Mutex{} - - // Limit concurrent goroutines - g.SetLimit(runtime.NumCPU() * 4) - - for _, m := range media { - m := m // Create a new variable scoped to the loop iteration - g.Go(func() error { - // Check if context was canceled - select { - case <-ctx.Done(): - return ctx.Err() - default: - } - - u := r.getUniquePaths(m) - for k, v := range u { - mu.Lock() - uniqueItems[k] = v - mu.Unlock() - } - return nil - }) - } - - if err := g.Wait(); err != nil { - return uniqueItems, err - } - - r.logger.Info().Msgf("Repair completed for %s. %d unique items", a.Name, len(uniqueItems)) - return uniqueItems, nil -} +//func (r *Repair) getUniquePaths(media arr.Content) map[string]string { +// // Use zurg setup to check file availability with zurg +// // This reduces bandwidth usage significantly +// +// uniqueParents := make(map[string]string) +// files := media.Files +// for _, file := range files { +// target := getSymlinkTarget(file.Path) +// if target != "" { +// file.IsSymlink = true +// dir, f := filepath.Split(target) +// parent := filepath.Base(filepath.Clean(dir)) +// // Set target path folder/file.mkv +// file.TargetPath = f +// uniqueParents[parent] = target +// } +// } +// return uniqueParents +//} diff --git a/pkg/repair/repair.go b/pkg/repair/repair.go index c9f8ffe..b5300a6 100644 --- a/pkg/repair/repair.go +++ b/pkg/repair/repair.go @@ -434,26 +434,6 @@ func (r *Repair) repairArr(j *Job, _arr string, tmdbId string) ([]arr.ContentFil return brokenItems, nil } -func (r *Repair) getUniquePaths(media arr.Content) map[string]string { - // Use zurg setup to check file availability with zurg - // This reduces bandwidth usage significantly - - uniqueParents := make(map[string]string) - files := media.Files - for _, file := range files { - target := getSymlinkTarget(file.Path) - if target != "" { - file.IsSymlink = true - dir, f := filepath.Split(target) - parent := filepath.Base(filepath.Clean(dir)) - // Set target path folder/file.mkv - file.TargetPath = f - uniqueParents[parent] = target - } - } - return uniqueParents -} - func (r *Repair) isMediaAccessible(m arr.Content) bool { files := m.Files if len(files) == 0 { @@ -758,7 +738,7 @@ func (r *Repair) saveToFile() { if err != nil { r.logger.Debug().Err(err).Msg("Failed to marshal jobs") } - err = os.WriteFile(r.filename, data, 0644) + _ = os.WriteFile(r.filename, data, 0644) } func (r *Repair) loadFromFile() { diff --git a/pkg/webdav/file.go b/pkg/webdav/file.go index e2032cc..533990b 100644 --- a/pkg/webdav/file.go +++ b/pkg/webdav/file.go @@ -3,13 +3,20 @@ package webdav import ( "crypto/tls" "fmt" + "github.com/sirrobot01/debrid-blackhole/internal/request" "github.com/sirrobot01/debrid-blackhole/pkg/debrid/debrid" "io" "net/http" "os" + "sync" "time" ) +var ( + sdClient *request.Client + once sync.Once +) + var sharedClient = &http.Client{ Transport: &http.Transport{ TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, @@ -26,6 +33,32 @@ var sharedClient = &http.Client{ Timeout: 60 * time.Second, } +func getClient() *request.Client { + once.Do(func() { + var transport = &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + Proxy: http.ProxyFromEnvironment, + MaxIdleConns: 100, + MaxIdleConnsPerHost: 20, + MaxConnsPerHost: 50, + IdleConnTimeout: 90 * time.Second, + TLSHandshakeTimeout: 10 * time.Second, + ResponseHeaderTimeout: 30 * time.Second, + ExpectContinueTimeout: 1 * time.Second, + DisableKeepAlives: false, + } + sdClient = request.New( + request.WithTransport(transport), + request.WithTimeout(30*time.Second), + request.WithHeaders(map[string]string{ + "Accept": "*/*", + "Connection": "keep-alive", + }), + ) + }) + return sdClient +} + type File struct { cache *debrid.Cache fileId string @@ -47,6 +80,8 @@ type File struct { link string } +// You can not download this file because you have exceeded your traffic on this hoster + // File interface implementations for File func (f *File) Close() error { @@ -57,21 +92,113 @@ func (f *File) Close() error { return nil } -func (f *File) GetDownloadLink() string { +func (f *File) getDownloadLink(index int) string { // Check if we already have a final URL cached if f.downloadLink != "" && isValidURL(f.downloadLink) { return f.downloadLink } - downloadLink := f.cache.GetDownloadLink(f.torrentId, f.name, f.link) + downloadLink := f.cache.GetDownloadLink(f.torrentId, f.name, f.link, index) if downloadLink != "" && isValidURL(downloadLink) { - f.downloadLink = downloadLink return downloadLink } - return "" } +func (f *File) stream(index int) (*http.Response, error) { + client := sharedClient // Might be replaced with the custom client + _log := f.cache.GetLogger() + var ( + err error + downloadLink string + ) + + downloadLink = f.getDownloadLink(index) // Uses the first API key + if downloadLink == "" { + _log.Error().Msgf("Failed to get download link for %s", f.name) + return nil, fmt.Errorf("failed to get download link") + } + + req, err := http.NewRequest("GET", downloadLink, nil) + if err != nil { + _log.Error().Msgf("Failed to create HTTP request: %s", err) + return nil, fmt.Errorf("failed to create HTTP request: %w", err) + } + + if f.offset > 0 { + req.Header.Set("Range", fmt.Sprintf("bytes=%d-", f.offset)) + } + + resp, err := client.Do(req) + if err != nil { + return resp, fmt.Errorf("HTTP request error: %w", err) + } + if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusPartialContent { + + closeResp := func() { + io.Copy(io.Discard, resp.Body) + resp.Body.Close() + } + + if resp.StatusCode == http.StatusServiceUnavailable { + closeResp() + // Read the body to consume the response + f.cache.MarkDownloadLinkAsInvalid(downloadLink, "bandwidth limit reached") + // Generate a new download link + if index < f.cache.TotalDownloadKeys()-1 { + // Retry with a different download key + _log.Debug().Str("link", downloadLink). + Str("torrentId", f.torrentId). + Str("fileId", f.fileId). + Msgf("Bandwidth limit reached, retrying with another API key, attempt %d", index+1) + return f.stream(index + 1) // Retry with the next download key + } else { + // No more download keys available, return an error + _log.Error().Msgf("Bandwidth limit reached for all download keys") + return nil, fmt.Errorf("bandwidth_limit_exceeded") + } + + } else if resp.StatusCode == http.StatusNotFound { + closeResp() + // Mark download link as not found + // Regenerate a new download link + f.cache.MarkDownloadLinkAsInvalid(downloadLink, "link_not_found") + f.cache.RemoveDownloadLink(f.link) // Remove the link from the cache + // Generate a new download link + downloadLink = f.getDownloadLink(index) + if downloadLink == "" { + _log.Error().Msgf("Failed to get download link for %s", f.name) + return nil, fmt.Errorf("failed to get download link") + } + req, err = http.NewRequest("GET", downloadLink, nil) + if err != nil { + return nil, fmt.Errorf("failed to create HTTP request: %w", err) + } + if f.offset > 0 { + req.Header.Set("Range", fmt.Sprintf("bytes=%d-", f.offset)) + } + + resp, err = client.Do(req) + if err != nil { + return resp, fmt.Errorf("HTTP request error: %w", err) + } + if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusPartialContent { + closeResp() + // Read the body to consume the response + f.cache.MarkDownloadLinkAsInvalid(downloadLink, "link_not_found") + return resp, fmt.Errorf("link not found") + } + return resp, nil + + } else { + closeResp() + return resp, fmt.Errorf("unexpected HTTP status: %d", resp.StatusCode) + } + + } + return resp, nil +} + func (f *File) Read(p []byte) (n int, err error) { if f.isDir { return 0, os.ErrInvalid @@ -96,39 +223,15 @@ func (f *File) Read(p []byte) (n int, err error) { f.reader = nil } - downloadLink := f.GetDownloadLink() - if downloadLink == "" { - return 0, io.EOF - } - - req, err := http.NewRequest("GET", downloadLink, nil) + // Make the request to get the file + resp, err := f.stream(0) if err != nil { - return 0, fmt.Errorf("failed to create HTTP request: %w", err) + return 0, err + } + if resp == nil { + return 0, fmt.Errorf("failed to get response") } - if f.offset > 0 { - req.Header.Set("Range", fmt.Sprintf("bytes=%d-", f.offset)) - } - - // Set headers as needed - req.Header.Set("Connection", "keep-alive") - req.Header.Set("Accept", "*/*") - - resp, err := sharedClient.Do(req) - if err != nil { - return 0, fmt.Errorf("HTTP request error: %w", err) - } - - if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusPartialContent { - resp.Body.Close() - _log := f.cache.GetLogger() - _log.Debug(). - Str("downloadLink", downloadLink). - Str("link", f.link). - Str("torrentId", f.torrentId). - Msgf("Unexpected HTTP status: %d", resp.StatusCode) - return 0, fmt.Errorf("unexpected HTTP status: %d", resp.StatusCode) - } f.reader = resp.Body f.seekPending = false } diff --git a/pkg/webdav/handler.go b/pkg/webdav/handler.go index 46ae8dc..e2ea416 100644 --- a/pkg/webdav/handler.go +++ b/pkg/webdav/handler.go @@ -18,17 +18,14 @@ import ( path "path/filepath" "slices" "strings" - "sync" "time" ) type Handler struct { - Name string - logger zerolog.Logger - cache *debrid.Cache - lastRefresh time.Time - refreshMutex sync.Mutex - RootPath string + Name string + logger zerolog.Logger + cache *debrid.Cache + RootPath string } func NewHandler(name string, cache *debrid.Cache, logger zerolog.Logger) *Handler { @@ -299,11 +296,11 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { w.Header().Set("Vary", "Accept-Encoding") w.Header().Set("Content-Length", fmt.Sprintf("%d", len(gzippedData))) w.WriteHeader(responseRecorder.Code) - w.Write(gzippedData) + _, _ = w.Write(gzippedData) } else { w.Header().Set("Content-Length", fmt.Sprintf("%d", len(responseData))) w.WriteHeader(responseRecorder.Code) - w.Write(responseData) + _, _ = w.Write(responseData) } return } @@ -422,11 +419,11 @@ func (h *Handler) serveFromCacheIfValid(w http.ResponseWriter, r *http.Request, w.Header().Set("Vary", "Accept-Encoding") w.Header().Set("Content-Length", fmt.Sprintf("%d", len(respCache.GzippedData))) w.WriteHeader(http.StatusOK) - w.Write(respCache.GzippedData) + _, _ = w.Write(respCache.GzippedData) } else { w.Header().Set("Content-Length", fmt.Sprintf("%d", len(respCache.Data))) w.WriteHeader(http.StatusOK) - w.Write(respCache.Data) + _, _ = w.Write(respCache.Data) } return true }