- Deprecate proxy

- Add Proxy for each debrid
- Add support for multiple-API keys
- Use internal http.Client for streaming
- Bug fixes etc
This commit is contained in:
Mukhtar Akere
2025-04-08 17:30:24 +01:00
parent 4659cd4273
commit 4b5e18df94
23 changed files with 788 additions and 599 deletions

View File

@@ -20,7 +20,7 @@ RUN --mount=type=cache,target=/go/pkg/mod \
CGO_ENABLED=0 GOOS=$TARGETOS GOARCH=$TARGETARCH \
go build -trimpath \
-ldflags="-w -s -X github.com/sirrobot01/debrid-blackhole/pkg/version.Version=${VERSION} -X github.com/sirrobot01/debrid-blackhole/pkg/version.Channel=${CHANNEL}" \
-o /blackhole
-o /decypharr
# Build healthcheck (optimized)
RUN --mount=type=cache,target=/go/pkg/mod \
@@ -43,12 +43,12 @@ FROM gcr.io/distroless/static-debian12:nonroot
LABEL version = "${VERSION}-${CHANNEL}"
LABEL org.opencontainers.image.source = "https://github.com/sirrobot01/debrid-blackhole"
LABEL org.opencontainers.image.title = "debrid-blackhole"
LABEL org.opencontainers.image.title = "decypharr"
LABEL org.opencontainers.image.authors = "sirrobot01"
LABEL org.opencontainers.image.documentation = "https://github.com/sirrobot01/debrid-blackhole/blob/main/README.md"
# Copy binaries
COPY --from=builder --chown=nonroot:nonroot /blackhole /usr/bin/blackhole
COPY --from=builder --chown=nonroot:nonroot /decypharr /usr/bin/decypharr
COPY --from=builder --chown=nonroot:nonroot /healthcheck /usr/bin/healthcheck
# Copy pre-made directory structure
@@ -63,4 +63,4 @@ USER nonroot:nonroot
HEALTHCHECK CMD ["/usr/bin/healthcheck"]
CMD ["/usr/bin/blackhole", "--config", "/app"]
CMD ["/usr/bin/decypharr", "--config", "/app"]

View File

@@ -21,7 +21,6 @@ This is an implementation of QbitTorrent with a **Multiple Debrid service suppor
- [Proxy Config](#proxy-config)
- [Qbittorrent Config](#qbittorrent-config)
- [Arrs Config](#arrs-config)
- [Proxy](#proxy)
- [Repair Worker](#repair-worker)
- [WebDAV](#webdav)
- [WebDAV Config](#webdav-config)
@@ -131,12 +130,6 @@ This is the default config file. You can create a `config.json` file in the root
"folder": "/mnt/remote/realdebrid/__all__/"
}
],
"proxy": {
"enabled": false,
"port": "8100",
"username": "username",
"password": "password"
},
"qbittorrent": {
"port": "8282",
"download_folder": "/mnt/symlinks/",
@@ -172,6 +165,7 @@ Full config are [here](doc/config.full.json)
- The `name` key is the name of the debrid provider
- The `host` key is the API endpoint of the debrid provider
- The `api_key` key is the API key of the debrid provider. This can be comma separated for multiple API keys
- The `download_api_keys` key is the API key of the debrid provider. By default, this is the same as the `api_key` key. This is used to download the torrents. This is an array of API keys. This is useful for those using multiple api keys. The API keys are used to download the torrents.
- The `folder` key is the folder where your debrid folder is mounted(webdav, rclone, zurg etc). e.g `data/realdebrid/torrents/`, `/media/remote/alldebrid/magnets/`
- The `rate_limit` key is the rate limit of the debrid provider(null by default)
- The `download_uncached` bool key is used to download uncached torrents(disabled by default)
@@ -189,14 +183,6 @@ The `repair` key is used to enable the repair worker
- The `zurg_url` is the url of the zurg server. Typically `http://localhost:9999` or `http://zurg:9999`
- The `auto_process` is used to automatically process the repair worker. This will delete broken symlinks and re-search for missing files
##### Proxy Config
- The `enabled` key is used to enable the proxy
- The `port` key is the port the proxy will listen on
- The `log_level` key is used to set the log level of the proxy. The default value is `info`
- The `username` and `password` keys are used for basic authentication
- The `cached_only` means only cached torrents will be returned
##### Qbittorrent Config
- The `port` key is the port the qBittorrent will listen on
- The `download_folder` is the folder where the torrents will be downloaded. e.g `/media/symlinks/`
@@ -250,14 +236,6 @@ You can use the webdav server with media players like Infuse, VidHub or mount it
- The `rc_url`, `rc_user`, `rc_pass` keys are used to trigger a vfs refresh on your rclone. This speeds up the process of getting the files. This is useful for rclone users. T
### Proxy
#### **Note**: Proxy has stopped working for Real Debrid, Debrid Link, and All Debrid. It still works for Torbox. This is due to the changes in the API of the Debrid Providers.
The proxy is useful in filtering out un-cached Debrid torrents.
The proxy is a simple HTTP proxy that requires basic authentication. The proxy can be enabled by setting the `proxy.enabled` to `true` in the config file.
The proxy listens on the port `8181` by default. The username and password can be set in the config file.
### Changelog
- View the [CHANGELOG.md](CHANGELOG.md) for the latest changes

View File

@@ -5,13 +5,17 @@
"host": "https://api.real-debrid.com/rest/1.0",
"api_key": "realdebrid_key",
"folder": "/mnt/remote/realdebrid/__all__/",
"proxy": "",
"rate_limit": "250/minute",
"download_uncached": false,
"check_cached": false,
"use_webdav": true,
"torrents_refresh_interval": "15s",
"folder_naming": "original_no_ext",
"auto_expire_links_after": "3d"
"auto_expire_links_after": "3d",
"rc_url": "http://your-ip-address:9990",
"rc_user": "your_rclone_rc_user",
"rc_pass": "your_rclone_rc_pass"
},
{
"name": "torbox",
@@ -41,14 +45,6 @@
"check_cached": false
}
],
"proxy": {
"enabled": true,
"port": "8100",
"log_level": "info",
"username": "username",
"password": "password",
"cached_only": true
},
"max_cache_size": 1000,
"qbittorrent": {
"port": "8282",
@@ -95,14 +91,5 @@
"max_file_size": "",
"allowed_file_types": [],
"use_auth": false,
"discord_webhook_url": "https://discord.com/api/webhooks/...",
"webdav": {
"torrents_refresh_interval": "15s",
"download_links_refresh_interval": "1h",
"folder_naming": "original",
"auto_expire_links_after": "24h",
"rc_url": "http://your-ip-address:9990",
"rc_user": "your_rclone_rc_user",
"rc_pass": "your_rclone_rc_pass"
}
"discord_webhook_url": "https://discord.com/api/webhooks/..."
}

Binary file not shown.

Before

Width:  |  Height:  |  Size: 185 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 156 KiB

After

Width:  |  Height:  |  Size: 188 KiB

View File

@@ -7,7 +7,6 @@ import (
"github.com/goccy/go-json"
"os"
"path/filepath"
"strings"
"sync"
)
@@ -18,14 +17,15 @@ var (
)
type Debrid struct {
Name string `json:"name"`
Host string `json:"host"`
APIKey string `json:"api_key"`
DownloadAPIKeys string `json:"download_api_keys"`
Folder string `json:"folder"`
DownloadUncached bool `json:"download_uncached"`
CheckCached bool `json:"check_cached"`
RateLimit string `json:"rate_limit"` // 200/minute or 10/second
Name string `json:"name"`
Host string `json:"host"`
APIKey string `json:"api_key"`
DownloadAPIKeys []string `json:"download_api_keys"`
Folder string `json:"folder"`
DownloadUncached bool `json:"download_uncached"`
CheckCached bool `json:"check_cached"`
RateLimit string `json:"rate_limit"` // 200/minute or 10/second
Proxy string `json:"proxy"`
UseWebDav bool `json:"use_webdav"`
WebDav
@@ -192,15 +192,15 @@ func validateDebrids(debrids []Debrid) error {
return nil
}
func validateQbitTorrent(config *QBitTorrent) error {
if config.DownloadFolder == "" {
return errors.New("qbittorent download folder is required")
}
if _, err := os.Stat(config.DownloadFolder); os.IsNotExist(err) {
return fmt.Errorf("qbittorent download folder(%s) does not exist", config.DownloadFolder)
}
return nil
}
//func validateQbitTorrent(config *QBitTorrent) error {
// if config.DownloadFolder == "" {
// return errors.New("qbittorent download folder is required")
// }
// if _, err := os.Stat(config.DownloadFolder); os.IsNotExist(err) {
// return fmt.Errorf("qbittorent download folder(%s) does not exist", config.DownloadFolder)
// }
// return nil
//}
func validateConfig(config *Config) error {
// Run validations concurrently
@@ -299,15 +299,9 @@ func (c *Config) NeedsSetup() bool {
func (c *Config) updateDebrid(d Debrid) Debrid {
downloadAPIKeys := strings.Split(d.DownloadAPIKeys, ",")
newApiKeys := make([]string, 0, len(downloadAPIKeys))
for _, key := range downloadAPIKeys {
key = strings.TrimSpace(key)
if key != "" {
newApiKeys = append(newApiKeys, key)
}
if len(d.DownloadAPIKeys) == 0 {
d.DownloadAPIKeys = append(d.DownloadAPIKeys, d.APIKey)
}
d.DownloadAPIKeys = strings.Join(newApiKeys, ",")
if !d.UseWebDav {
return d

View File

@@ -3,15 +3,18 @@ package request
import (
"bytes"
"compress/gzip"
"context"
"crypto/tls"
"fmt"
"github.com/goccy/go-json"
"github.com/rs/zerolog"
"github.com/sirrobot01/debrid-blackhole/internal/logger"
"golang.org/x/net/proxy"
"golang.org/x/time/rate"
"io"
"math"
"math/rand"
"net"
"net/http"
"net/url"
"regexp"
@@ -52,11 +55,13 @@ type Client struct {
client *http.Client
rateLimiter *rate.Limiter
headers map[string]string
headersMu sync.RWMutex
maxRetries int
timeout time.Duration
skipTLSVerify bool
retryableStatus map[int]bool
logger zerolog.Logger
proxy string
}
// WithMaxRetries sets the maximum number of retry attempts
@@ -89,12 +94,16 @@ func WithRateLimiter(rl *rate.Limiter) ClientOption {
// WithHeaders sets default headers
func WithHeaders(headers map[string]string) ClientOption {
return func(c *Client) {
c.headersMu.Lock()
c.headers = headers
c.headersMu.Unlock()
}
}
func (c *Client) SetHeader(key, value string) {
c.headersMu.Lock()
c.headers[key] = value
c.headersMu.Unlock()
}
func WithLogger(logger zerolog.Logger) ClientOption {
@@ -118,6 +127,12 @@ func WithRetryableStatus(statusCodes ...int) ClientOption {
}
}
func WithProxy(proxyURL string) ClientOption {
return func(c *Client) {
c.proxy = proxyURL
}
}
// doRequest performs a single HTTP request with rate limiting
func (c *Client) doRequest(req *http.Request) (*http.Response, error) {
if c.rateLimiter != nil {
@@ -154,11 +169,13 @@ func (c *Client) Do(req *http.Request) (*http.Response, error) {
}
// Apply headers
c.headersMu.RLock()
if c.headers != nil {
for key, value := range c.headers {
req.Header.Set(key, value)
}
}
c.headersMu.RUnlock()
resp, err = c.doRequest(req)
if err != nil {
@@ -256,25 +273,67 @@ func New(options ...ClientOption) *Client {
},
logger: logger.New("request"),
timeout: 60 * time.Second,
proxy: "",
headers: make(map[string]string), // Initialize headers map
}
// Apply options
// default http client
client.client = &http.Client{
Timeout: client.timeout,
}
// Apply options before configuring transport
for _, option := range options {
option(client)
}
// Create transport
transport := &http.Transport{
TLSClientConfig: &tls.Config{
InsecureSkipVerify: client.skipTLSVerify,
},
Proxy: http.ProxyFromEnvironment,
}
// Check if transport was set by WithTransport option
if client.client.Transport == nil {
// No custom transport provided, create the default one
transport := &http.Transport{
TLSClientConfig: &tls.Config{
InsecureSkipVerify: client.skipTLSVerify,
},
}
// Create HTTP client
client.client = &http.Client{
Transport: transport,
Timeout: client.timeout,
// Configure proxy if needed
if client.proxy != "" {
if strings.HasPrefix(client.proxy, "socks5://") {
// Handle SOCKS5 proxy
socksURL, err := url.Parse(client.proxy)
if err != nil {
client.logger.Error().Msgf("Failed to parse SOCKS5 proxy URL: %v", err)
} else {
auth := &proxy.Auth{}
if socksURL.User != nil {
auth.User = socksURL.User.Username()
password, _ := socksURL.User.Password()
auth.Password = password
}
dialer, err := proxy.SOCKS5("tcp", socksURL.Host, auth, proxy.Direct)
if err != nil {
client.logger.Error().Msgf("Failed to create SOCKS5 dialer: %v", err)
} else {
transport.DialContext = func(ctx context.Context, network, addr string) (net.Conn, error) {
return dialer.Dial(network, addr)
}
}
}
} else {
proxyURL, err := url.Parse(client.proxy)
if err != nil {
client.logger.Error().Msgf("Failed to parse proxy URL: %v", err)
} else {
transport.Proxy = http.ProxyURL(proxyURL)
}
}
} else {
transport.Proxy = http.ProxyFromEnvironment
}
// Set the transport to the client
client.client.Transport = transport
}
return client

View File

@@ -11,7 +11,7 @@ var (
MUSICMATCH = "(?i)(\\.)(mp2|mp3|m4a|m4b|m4p|ogg|oga|opus|wma|wav|wv|flac|ape|aif|aiff|aifc)$"
)
var SAMPLEMATCH = `(?i)(^|[\\/]|\s|[(])(?:sample|trailer|thumb|special|extras?)s?([\s._\-/)]|$)`
var SAMPLEMATCH = `(?i)(^|[\\/])(sample|trailer|thumb|special|extras?)s?([\s._-]|$|/)|(\(sample\))|(-\s*sample)`
func RegexMatch(regex string, value string) bool {
re := regexp.MustCompile(regex)

View File

@@ -14,7 +14,6 @@ import (
"path/filepath"
"slices"
"strconv"
"strings"
"sync"
"time"
)
@@ -23,7 +22,7 @@ type AllDebrid struct {
Name string
Host string `json:"host"`
APIKey string
ExtraAPIKeys []string
DownloadKeys []string
DownloadUncached bool
client *request.Client
@@ -34,26 +33,22 @@ type AllDebrid struct {
func New(dc config.Debrid) *AllDebrid {
rl := request.ParseRateLimit(dc.RateLimit)
apiKeys := strings.Split(dc.APIKey, ",")
extraKeys := make([]string, 0)
if len(apiKeys) > 1 {
extraKeys = apiKeys[1:]
}
mainKey := apiKeys[0]
headers := map[string]string{
"Authorization": fmt.Sprintf("Bearer %s", mainKey),
"Authorization": fmt.Sprintf("Bearer %s", dc.APIKey),
}
_log := logger.New(dc.Name)
client := request.New(
request.WithHeaders(headers),
request.WithLogger(_log),
request.WithRateLimiter(rl),
request.WithProxy(dc.Proxy),
)
return &AllDebrid{
Name: "alldebrid",
Host: dc.Host,
APIKey: mainKey,
ExtraAPIKeys: extraKeys,
APIKey: dc.APIKey,
DownloadKeys: dc.DownloadAPIKeys,
DownloadUncached: dc.DownloadUncached,
client: client,
MountPath: dc.Folder,
@@ -256,7 +251,7 @@ func (ad *AllDebrid) GenerateDownloadLinks(t *types.Torrent) error {
for _, file := range t.Files {
go func(file types.File) {
defer wg.Done()
link, err := ad.GetDownloadLink(t, &file)
link, err := ad.GetDownloadLink(t, &file, 0)
if err != nil {
errCh <- err
return
@@ -291,7 +286,7 @@ func (ad *AllDebrid) GenerateDownloadLinks(t *types.Torrent) error {
return nil
}
func (ad *AllDebrid) GetDownloadLink(t *types.Torrent, file *types.File) (string, error) {
func (ad *AllDebrid) GetDownloadLink(t *types.Torrent, file *types.File, index int) (string, error) {
url := fmt.Sprintf("%s/link/unlock", ad.Host)
query := gourl.Values{}
query.Add("link", file.Link)
@@ -367,3 +362,7 @@ func (ad *AllDebrid) CheckLink(link string) error {
func (ad *AllDebrid) GetMountPath() string {
return ad.MountPath
}
func (ad *AllDebrid) GetDownloadKeys() []string {
return ad.DownloadKeys
}

View File

@@ -46,6 +46,7 @@ type CachedTorrent struct {
type downloadLinkCache struct {
Link string
KeyIndex int
ExpiresAt time.Time
}
@@ -68,12 +69,13 @@ type Cache struct {
client types.Client
logger zerolog.Logger
torrents *xsync.MapOf[string, *CachedTorrent] // key: torrent.Id, value: *CachedTorrent
torrentsNames *xsync.MapOf[string, *CachedTorrent] // key: torrent.Name, value: torrent
listings atomic.Value
downloadLinks *xsync.MapOf[string, downloadLinkCache]
PropfindResp *xsync.MapOf[string, PropfindResponse]
folderNaming WebDavFolderNaming
torrents *xsync.MapOf[string, *CachedTorrent] // key: torrent.Id, value: *CachedTorrent
torrentsNames *xsync.MapOf[string, *CachedTorrent] // key: torrent.Name, value: torrent
listings atomic.Value
downloadLinks *xsync.MapOf[string, downloadLinkCache]
invalidDownloadLinks *xsync.MapOf[string, string]
PropfindResp *xsync.MapOf[string, PropfindResponse]
folderNaming WebDavFolderNaming
// repair
repairChan chan RepairRequest
@@ -116,6 +118,7 @@ func New(dc config.Debrid, client types.Client) *Cache {
dir: filepath.Join(cfg.Path, "cache", dc.Name), // path to save cache files
torrents: xsync.NewMapOf[string, *CachedTorrent](),
torrentsNames: xsync.NewMapOf[string, *CachedTorrent](),
invalidDownloadLinks: xsync.NewMapOf[string, string](),
client: client,
logger: logger.New(fmt.Sprintf("%s-webdav", client.GetName())),
workers: workers,
@@ -161,6 +164,8 @@ func (c *Cache) Start(ctx context.Context) error {
func (c *Cache) load() (map[string]*CachedTorrent, error) {
torrents := make(map[string]*CachedTorrent)
var results sync.Map
if err := os.MkdirAll(c.dir, 0755); err != nil {
return torrents, fmt.Errorf("failed to create cache directory: %w", err)
}
@@ -170,54 +175,102 @@ func (c *Cache) load() (map[string]*CachedTorrent, error) {
return torrents, fmt.Errorf("failed to read cache directory: %w", err)
}
now := time.Now()
// Get only json files
var jsonFiles []os.DirEntry
for _, file := range files {
fileName := file.Name()
if file.IsDir() || filepath.Ext(fileName) != ".json" {
continue
}
filePath := filepath.Join(c.dir, fileName)
data, err := os.ReadFile(filePath)
if err != nil {
c.logger.Debug().Err(err).Msgf("Failed to read file: %s", filePath)
continue
}
var ct CachedTorrent
if err := json.Unmarshal(data, &ct); err != nil {
c.logger.Debug().Err(err).Msgf("Failed to unmarshal file: %s", filePath)
continue
}
isComplete := true
if len(ct.Files) != 0 {
// Check if all files are valid, if not, delete the file.json and remove from cache.
for _, f := range ct.Files {
if !f.IsValid() {
isComplete = false
break
}
}
if isComplete {
addedOn, err := time.Parse(time.RFC3339, ct.Added)
if err != nil {
addedOn = now
}
ct.AddedOn = addedOn
ct.IsComplete = true
torrents[ct.Id] = &ct
} else {
// Delete the file if it's not complete
_ = os.Remove(filePath)
}
if !file.IsDir() && filepath.Ext(file.Name()) == ".json" {
jsonFiles = append(jsonFiles, file)
}
}
if len(jsonFiles) == 0 {
return torrents, nil
}
// Create channels with appropriate buffering
workChan := make(chan os.DirEntry, min(c.workers, len(jsonFiles)))
// Create a wait group for workers
var wg sync.WaitGroup
// Start workers
for i := 0; i < c.workers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
now := time.Now()
for {
file, ok := <-workChan
if !ok {
return // Channel closed, exit goroutine
}
fileName := file.Name()
filePath := filepath.Join(c.dir, fileName)
data, err := os.ReadFile(filePath)
if err != nil {
c.logger.Debug().Err(err).Msgf("Failed to read file: %s", filePath)
continue
}
var ct CachedTorrent
if err := json.Unmarshal(data, &ct); err != nil {
c.logger.Debug().Err(err).Msgf("Failed to unmarshal file: %s", filePath)
continue
}
isComplete := true
if len(ct.Files) != 0 {
// Check if all files are valid, if not, delete the file.json and remove from cache.
for _, f := range ct.Files {
if f.Link == "" {
isComplete = false
break
}
}
if isComplete {
addedOn, err := time.Parse(time.RFC3339, ct.Added)
if err != nil {
addedOn = now
}
ct.AddedOn = addedOn
ct.IsComplete = true
results.Store(ct.Id, &ct)
} else {
// Delete the file if it's not complete
_ = os.Remove(filePath)
}
}
}
}()
}
// Feed work to workers
for _, file := range jsonFiles {
workChan <- file
}
// Signal workers that no more work is coming
close(workChan)
// Wait for all workers to complete
wg.Wait()
// Convert sync.Map to regular map
results.Range(func(key, value interface{}) bool {
id, _ := key.(string)
torrent, _ := value.(*CachedTorrent)
torrents[id] = torrent
return true
})
return torrents, nil
}
func (c *Cache) Sync() error {
defer c.logger.Info().Msg("WebDav server sync complete")
cachedTorrents, err := c.load()
if err != nil {
c.logger.Debug().Err(err).Msg("Failed to load cache")
@@ -486,38 +539,45 @@ func (c *Cache) saveTorrent(id string, data []byte) {
}
func (c *Cache) ProcessTorrent(t *types.Torrent, refreshRclone bool) error {
if len(t.Files) == 0 {
isComplete := func(files map[string]types.File) bool {
_complete := len(files) > 0
for _, file := range files {
if file.Link == "" {
_complete = false
break
}
}
return _complete
}
if !isComplete(t.Files) {
if err := c.client.UpdateTorrent(t); err != nil {
return fmt.Errorf("failed to update torrent: %w", err)
}
}
// Validate each file in the torrent
isComplete := true
for _, file := range t.Files {
if file.Link == "" {
isComplete = false
continue
}
}
if !isComplete {
c.logger.Debug().Msgf("Torrent %s is not complete, missing link for file %s. Triggering a reinsert", t.Id)
if err := c.ReInsertTorrent(t); err != nil {
c.logger.Error().Err(err).Msgf("Failed to reinsert torrent %s", t.Id)
return fmt.Errorf("failed to reinsert torrent: %w", err)
}
}
if !isComplete(t.Files) {
c.logger.Debug().Msgf("Torrent %s is still not complete. Triggering a reinsert(disabled)", t.Id)
//ct, err := c.reInsertTorrent(t)
//if err != nil {
// c.logger.Debug().Err(err).Msgf("Failed to reinsert torrent %s", t.Id)
// return err
//}
//c.logger.Debug().Msgf("Reinserted torrent %s", ct.Id)
addedOn, err := time.Parse(time.RFC3339, t.Added)
if err != nil {
addedOn = time.Now()
} else {
addedOn, err := time.Parse(time.RFC3339, t.Added)
if err != nil {
addedOn = time.Now()
}
ct := &CachedTorrent{
Torrent: t,
IsComplete: len(t.Files) > 0,
AddedOn: addedOn,
}
c.setTorrent(ct)
}
ct := &CachedTorrent{
Torrent: t,
IsComplete: len(t.Files) > 0,
AddedOn: addedOn,
}
c.setTorrent(ct)
if refreshRclone {
c.refreshListings()
@@ -525,7 +585,7 @@ func (c *Cache) ProcessTorrent(t *types.Torrent, refreshRclone bool) error {
return nil
}
func (c *Cache) GetDownloadLink(torrentId, filename, fileLink string) string {
func (c *Cache) GetDownloadLink(torrentId, filename, fileLink string, index int) string {
// Check link cache
if dl := c.checkDownloadLink(fileLink); dl != "" {
@@ -548,44 +608,60 @@ func (c *Cache) GetDownloadLink(torrentId, filename, fileLink string) string {
}
}
// If file.Link is still empty, return
if file.Link == "" {
c.logger.Debug().Msgf("File link is empty for %s. Release is probably nerfed", filename)
// Try to reinsert the torrent?
ct, err := c.reInsertTorrent(ct.Torrent)
if err != nil {
c.logger.Debug().Err(err).Msgf("Failed to reinsert torrent %s", ct.Name)
return ""
}
file = ct.Files[filename]
c.logger.Debug().Msgf("Reinserted torrent %s", ct.Name)
}
c.logger.Trace().Msgf("Getting download link for %s", filename)
downloadLink, err := c.client.GetDownloadLink(ct.Torrent, &file)
downloadLink, err := c.client.GetDownloadLink(ct.Torrent, &file, index)
if err != nil {
if errors.Is(err, request.HosterUnavailableError) {
c.logger.Debug().Err(err).Msgf("Hoster is unavailable for %s/%s", ct.Name, filename)
c.logger.Debug().Err(err).Msgf("Hoster is unavailable. Triggering repair for %s", ct.Name)
ct, err := c.reInsertTorrent(ct.Torrent)
if err != nil {
c.logger.Debug().Err(err).Msgf("Failed to reinsert torrent %s", ct.Name)
return ""
}
c.logger.Debug().Msgf("Reinserted torrent %s", ct.Name)
file = ct.Files[filename]
// Retry getting the download link
downloadLink, err = c.client.GetDownloadLink(ct.Torrent, &file, index)
if err != nil {
c.logger.Debug().Err(err).Msgf("Failed to get download link for %s", file.Link)
return ""
}
if downloadLink == "" {
c.logger.Debug().Msgf("Download link is empty for %s", file.Link)
return ""
}
file.DownloadLink = downloadLink
file.Generated = time.Now()
ct.Files[filename] = file
go func() {
c.updateDownloadLink(file.Link, downloadLink, 0)
c.setTorrent(ct)
}()
return file.DownloadLink
} else {
c.logger.Debug().Err(err).Msgf("Failed to get download link for %s", file.Link)
return ""
// This code is commented iut due to the fact that if a torrent link is uncached, it's likely that we can't redownload it again
// Do not attempt to repair the torrent if the hoster is unavailable
// Check link here??
//c.logger.Debug().Err(err).Msgf("Hoster is unavailable. Triggering repair for %s", ct.Name)
//if err := c.repairTorrent(ct); err != nil {
// c.logger.Error().Err(err).Msgf("Failed to trigger repair for %s", ct.Name)
// return ""
//}
//// Generate download link for the file then
//f := ct.Files[filename]
//downloadLink, _ = c.client.GetDownloadLink(ct.Torrent, &f)
//f.DownloadLink = downloadLink
//file.Generated = time.Now()
//ct.Files[filename] = f
//c.updateDownloadLink(file.Link, downloadLink)
//
//go func() {
// go c.setTorrent(ct)
//}()
//
//return downloadLink // Gets download link in the next pass
}
c.logger.Debug().Err(err).Msgf("Failed to get download link for :%s", file.Link)
return ""
}
file.DownloadLink = downloadLink
file.Generated = time.Now()
ct.Files[filename] = file
go func() {
c.updateDownloadLink(file.Link, downloadLink)
c.updateDownloadLink(file.Link, downloadLink, 0)
c.setTorrent(ct)
}()
return file.DownloadLink
@@ -596,7 +672,7 @@ func (c *Cache) GenerateDownloadLinks(t *CachedTorrent) {
c.logger.Error().Err(err).Msg("Failed to generate download links")
}
for _, file := range t.Files {
c.updateDownloadLink(file.Link, file.DownloadLink)
c.updateDownloadLink(file.Link, file.DownloadLink, 0)
}
c.SaveTorrent(t)
@@ -624,22 +700,39 @@ func (c *Cache) AddTorrent(t *types.Torrent) error {
}
func (c *Cache) updateDownloadLink(link, downloadLink string) {
func (c *Cache) updateDownloadLink(link, downloadLink string, keyIndex int) {
c.downloadLinks.Store(link, downloadLinkCache{
Link: downloadLink,
ExpiresAt: time.Now().Add(c.autoExpiresLinksAfter), // Expires in 24 hours
ExpiresAt: time.Now().Add(c.autoExpiresLinksAfter),
KeyIndex: keyIndex,
})
}
func (c *Cache) checkDownloadLink(link string) string {
if dl, ok := c.downloadLinks.Load(link); ok {
if dl.ExpiresAt.After(time.Now()) {
if dl.ExpiresAt.After(time.Now()) && !c.IsDownloadLinkInvalid(dl.Link) {
return dl.Link
}
}
return ""
}
func (c *Cache) RemoveDownloadLink(link string) {
c.downloadLinks.Delete(link)
}
func (c *Cache) MarkDownloadLinkAsInvalid(downloadLink, reason string) {
c.invalidDownloadLinks.Store(downloadLink, reason)
}
func (c *Cache) IsDownloadLinkInvalid(downloadLink string) bool {
if reason, ok := c.invalidDownloadLinks.Load(downloadLink); ok {
c.logger.Debug().Msgf("Download link %s is invalid: %s", downloadLink, reason)
return true
}
return false
}
func (c *Cache) GetClient() types.Client {
return c.client
}
@@ -688,3 +781,7 @@ func (c *Cache) OnRemove(torrentId string) {
func (c *Cache) GetLogger() zerolog.Logger {
return c.logger
}
func (c *Cache) TotalDownloadKeys() int {
return len(c.client.GetDownloadKeys())
}

View File

@@ -8,7 +8,6 @@ import (
"io"
"net/http"
"os"
"path/filepath"
"slices"
"sort"
"strings"
@@ -67,29 +66,6 @@ func (c *Cache) refreshListings() {
}
}
func (c *Cache) resetPropfindResponse() {
// Right now, parents are hardcoded
parents := []string{"__all__", "torrents"}
// Reset only the parent directories
// Convert the parents to a keys
// This is a bit hacky, but it works
// Instead of deleting all the keys, we only delete the parent keys, e.g __all__/ or torrents/
keys := make([]string, 0, len(parents))
for _, p := range parents {
// Construct the key
// construct url
url := filepath.Clean(filepath.Join("/webdav", c.client.GetName(), p))
key0 := fmt.Sprintf("propfind:%s:0", url)
key1 := fmt.Sprintf("propfind:%s:1", url)
keys = append(keys, key0, key1)
}
// Delete the keys
for _, k := range keys {
c.PropfindResp.Delete(k)
}
}
func (c *Cache) refreshTorrents() {
if c.torrentsRefreshMu.TryLock() {
defer c.torrentsRefreshMu.Unlock()
@@ -127,7 +103,7 @@ func (c *Cache) refreshTorrents() {
// Check for deleted torrents
deletedTorrents := make([]string, 0)
for id, _ := range torrents {
for id := range torrents {
if _, ok := idStore[id]; !ok {
deletedTorrents = append(deletedTorrents, id)
}
@@ -264,8 +240,7 @@ func (c *Cache) refreshDownloadLinks() {
ExpiresAt: v.Generated.Add(c.autoExpiresLinksAfter - timeSince),
})
} else {
//c.downloadLinks.Delete(k) don't delete, just log
c.logger.Trace().Msgf("Download link for %s expired", k)
c.downloadLinks.Delete(k)
}
}

View File

@@ -3,6 +3,7 @@ package debrid
import (
"errors"
"fmt"
"github.com/puzpuzpuz/xsync/v3"
"github.com/sirrobot01/debrid-blackhole/internal/request"
"github.com/sirrobot01/debrid-blackhole/internal/utils"
"github.com/sirrobot01/debrid-blackhole/pkg/debrid/types"
@@ -35,6 +36,8 @@ func (c *Cache) IsTorrentBroken(t *CachedTorrent, filenames []string) bool {
}
}
files = t.Files
for _, f := range files {
// Check if file link is still missing
if f.Link == "" {
@@ -46,11 +49,7 @@ func (c *Cache) IsTorrentBroken(t *CachedTorrent, filenames []string) bool {
if errors.Is(err, request.HosterUnavailableError) {
isBroken = true
break
} else {
// This might just be a temporary error
}
} else {
// Generate a new download link?
}
}
}
@@ -59,70 +58,48 @@ func (c *Cache) IsTorrentBroken(t *CachedTorrent, filenames []string) bool {
func (c *Cache) repairWorker() {
// This watches a channel for torrents to repair
for {
select {
case req := <-c.repairChan:
torrentId := req.TorrentID
if _, inProgress := c.repairsInProgress.Load(torrentId); inProgress {
c.logger.Debug().Str("torrentId", torrentId).Msg("Skipping duplicate repair request")
continue
}
// Mark as in progress
c.repairsInProgress.Store(torrentId, true)
c.logger.Debug().Str("torrentId", req.TorrentID).Msg("Received repair request")
// Get the torrent from the cache
cachedTorrent, ok := c.torrents.Load(torrentId)
if !ok || cachedTorrent == nil {
c.logger.Warn().Str("torrentId", torrentId).Msg("Torrent not found in cache")
continue
}
switch req.Type {
case RepairTypeReinsert:
c.logger.Debug().Str("torrentId", torrentId).Msg("Reinserting torrent")
c.reInsertTorrent(cachedTorrent)
case RepairTypeDelete:
c.logger.Debug().Str("torrentId", torrentId).Msg("Deleting torrent")
if err := c.DeleteTorrent(torrentId); err != nil {
c.logger.Error().Err(err).Str("torrentId", torrentId).Msg("Failed to delete torrent")
continue
}
}
c.repairsInProgress.Delete(torrentId)
for req := range c.repairChan {
torrentId := req.TorrentID
if _, inProgress := c.repairsInProgress.Load(torrentId); inProgress {
c.logger.Debug().Str("torrentId", torrentId).Msg("Skipping duplicate repair request")
continue
}
// Mark as in progress
c.repairsInProgress.Store(torrentId, true)
c.logger.Debug().Str("torrentId", req.TorrentID).Msg("Received repair request")
// Get the torrent from the cache
cachedTorrent, ok := c.torrents.Load(torrentId)
if !ok || cachedTorrent == nil {
c.logger.Warn().Str("torrentId", torrentId).Msg("Torrent not found in cache")
continue
}
switch req.Type {
case RepairTypeReinsert:
c.logger.Debug().Str("torrentId", torrentId).Msg("Reinserting torrent")
var err error
cachedTorrent, err = c.reInsertTorrent(cachedTorrent.Torrent)
if err != nil {
c.logger.Error().Err(err).Str("torrentId", cachedTorrent.Id).Msg("Failed to reinsert torrent")
continue
}
case RepairTypeDelete:
c.logger.Debug().Str("torrentId", torrentId).Msg("Deleting torrent")
if err := c.DeleteTorrent(torrentId); err != nil {
c.logger.Error().Err(err).Str("torrentId", torrentId).Msg("Failed to delete torrent")
continue
}
}
c.repairsInProgress.Delete(torrentId)
}
}
func (c *Cache) reInsertTorrent(t *CachedTorrent) {
// Reinsert the torrent into the cache
c.torrents.Store(t.Id, t)
c.logger.Debug().Str("torrentId", t.Id).Msg("Reinserted torrent into cache")
}
func (c *Cache) submitForRepair(repairType RepairType, torrentId, fileName string) {
// Submitting a torrent for repair.Not used yet
// Check if already in progress before even submitting
if _, inProgress := c.repairsInProgress.Load(torrentId); inProgress {
c.logger.Debug().Str("torrentID", torrentId).Msg("Repair already in progress")
return
}
select {
case c.repairChan <- RepairRequest{TorrentID: torrentId, FileName: fileName}:
c.logger.Debug().Str("torrentID", torrentId).Msg("Submitted for repair")
default:
c.logger.Warn().Str("torrentID", torrentId).Msg("Repair channel full, skipping repair request")
}
}
func (c *Cache) ReInsertTorrent(torrent *types.Torrent) error {
func (c *Cache) reInsertTorrent(torrent *types.Torrent) (*CachedTorrent, error) {
// Check if Magnet is not empty, if empty, reconstruct the magnet
if _, ok := c.repairsInProgress.Load(torrent.Id); ok {
return fmt.Errorf("repair already in progress for torrent %s", torrent.Id)
return nil, fmt.Errorf("repair already in progress for torrent %s", torrent.Id)
}
if torrent.Magnet == nil {
@@ -130,8 +107,12 @@ func (c *Cache) ReInsertTorrent(torrent *types.Torrent) error {
}
oldID := torrent.Id
defer c.repairsInProgress.Delete(oldID)
defer c.DeleteTorrent(oldID)
defer func() {
err := c.DeleteTorrent(oldID)
if err != nil {
c.logger.Error().Err(err).Str("torrentId", oldID).Msg("Failed to delete old torrent")
}
}()
// Submit the magnet to the debrid service
torrent.Id = ""
@@ -139,12 +120,12 @@ func (c *Cache) ReInsertTorrent(torrent *types.Torrent) error {
torrent, err = c.client.SubmitMagnet(torrent)
if err != nil {
// Remove the old torrent from the cache and debrid service
return fmt.Errorf("failed to submit magnet: %w", err)
return nil, fmt.Errorf("failed to submit magnet: %w", err)
}
// Check if the torrent was submitted
if torrent == nil || torrent.Id == "" {
return fmt.Errorf("failed to submit magnet: empty torrent")
return nil, fmt.Errorf("failed to submit magnet: empty torrent")
}
torrent.DownloadUncached = false // Set to false, avoid re-downloading
torrent, err = c.client.CheckStatus(torrent, true)
@@ -152,24 +133,22 @@ func (c *Cache) ReInsertTorrent(torrent *types.Torrent) error {
// Torrent is likely in progress
_ = c.DeleteTorrent(torrent.Id)
return fmt.Errorf("failed to check status: %w", err)
return nil, fmt.Errorf("failed to check status: %w", err)
}
if torrent == nil {
return fmt.Errorf("failed to check status: empty torrent")
}
for _, file := range torrent.Files {
if file.Link == "" {
c.logger.Debug().Msgf("Torrent %s is still not complete, missing link for file %s.", torrent.Name, file.Name)
// Delete the torrent from the cache
_ = c.DeleteTorrent(torrent.Id)
return fmt.Errorf("torrent %s is still not complete, missing link for file %s", torrent.Name, file.Name)
}
return nil, fmt.Errorf("failed to check status: empty torrent")
}
// Update the torrent in the cache
addedOn, err := time.Parse(time.RFC3339, torrent.Added)
for _, f := range torrent.Files {
if f.Link == "" {
// Delete the new torrent
_ = c.DeleteTorrent(torrent.Id)
return nil, fmt.Errorf("failed to reinsert torrent: empty link")
}
}
if err != nil {
addedOn = time.Now()
}
@@ -180,5 +159,17 @@ func (c *Cache) ReInsertTorrent(torrent *types.Torrent) error {
}
c.setTorrent(ct)
c.refreshListings()
return ct, nil
}
func (c *Cache) refreshDownloadLink(link string) error {
// A generated download link has being limited
// Generate a new one with other API keys
// Temporarily remove the old one
return nil
}
func (c *Cache) resetDownloadLinks() {
c.invalidDownloadLinks = xsync.NewMapOf[string, string]()
c.downloadLinks = xsync.NewMapOf[string, downloadLinkCache]()
}

View File

@@ -13,11 +13,8 @@ func (c *Cache) refreshDownloadLinksWorker() {
refreshTicker := time.NewTicker(c.downloadLinksRefreshInterval)
defer refreshTicker.Stop()
for {
select {
case <-refreshTicker.C:
c.refreshDownloadLinks()
}
for range refreshTicker.C {
c.refreshDownloadLinks()
}
}
@@ -25,10 +22,7 @@ func (c *Cache) refreshTorrentsWorker() {
refreshTicker := time.NewTicker(c.torrentRefreshInterval)
defer refreshTicker.Stop()
for {
select {
case <-refreshTicker.C:
c.refreshTorrents()
}
for range refreshTicker.C {
c.refreshTorrents()
}
}

View File

@@ -21,7 +21,7 @@ type DebridLink struct {
Name string
Host string `json:"host"`
APIKey string
ExtraAPIKeys []string
DownloadKeys []string
DownloadUncached bool
client *request.Client
@@ -243,7 +243,7 @@ func (dl *DebridLink) GetDownloads() (map[string]types.DownloadLinks, error) {
return nil, nil
}
func (dl *DebridLink) GetDownloadLink(t *types.Torrent, file *types.File) (string, error) {
func (dl *DebridLink) GetDownloadLink(t *types.Torrent, file *types.File, index int) (string, error) {
return file.DownloadLink, nil
}
@@ -261,14 +261,9 @@ func (dl *DebridLink) GetDownloadUncached() bool {
func New(dc config.Debrid) *DebridLink {
rl := request.ParseRateLimit(dc.RateLimit)
apiKeys := strings.Split(dc.APIKey, ",")
extraKeys := make([]string, 0)
if len(apiKeys) > 1 {
extraKeys = apiKeys[1:]
}
mainKey := apiKeys[0]
headers := map[string]string{
"Authorization": fmt.Sprintf("Bearer %s", mainKey),
"Authorization": fmt.Sprintf("Bearer %s", dc.APIKey),
"Content-Type": "application/json",
}
_log := logger.New(dc.Name)
@@ -276,12 +271,13 @@ func New(dc config.Debrid) *DebridLink {
request.WithHeaders(headers),
request.WithLogger(_log),
request.WithRateLimiter(rl),
request.WithProxy(dc.Proxy),
)
return &DebridLink{
Name: "debridlink",
Host: dc.Host,
APIKey: mainKey,
ExtraAPIKeys: extraKeys,
APIKey: dc.APIKey,
DownloadKeys: dc.DownloadAPIKeys,
DownloadUncached: dc.DownloadUncached,
client: client,
MountPath: dc.Folder,
@@ -371,3 +367,7 @@ func (dl *DebridLink) CheckLink(link string) error {
func (dl *DebridLink) GetMountPath() string {
return dl.MountPath
}
func (dl *DebridLink) GetDownloadKeys() []string {
return dl.DownloadKeys
}

View File

@@ -1,6 +1,7 @@
package realdebrid
import (
"errors"
"fmt"
"github.com/goccy/go-json"
"github.com/rs/zerolog"
@@ -25,7 +26,7 @@ type RealDebrid struct {
Host string `json:"host"`
APIKey string
ExtraAPIKeys []string // This is used for bandwidth
DownloadKeys []string // This is used for bandwidth
DownloadUncached bool
client *request.Client
@@ -43,45 +44,58 @@ func (r *RealDebrid) GetLogger() zerolog.Logger {
return r.logger
}
func getSelectedFiles(t *types.Torrent, data TorrentInfo) map[string]types.File {
selectedFiles := make([]types.File, 0)
for _, f := range data.Files {
if f.Selected == 1 {
name := filepath.Base(f.Path)
file := types.File{
Name: name,
Path: name,
Size: f.Bytes,
Id: strconv.Itoa(f.ID),
}
selectedFiles = append(selectedFiles, file)
}
}
files := make(map[string]types.File)
for index, f := range selectedFiles {
if index >= len(data.Links) {
break
}
f.Link = data.Links[index]
files[f.Name] = f
}
return files
}
// getTorrentFiles returns a list of torrent files from the torrent info
// validate is used to determine if the files should be validated
// if validate is false, selected files will be returned
func getTorrentFiles(t *types.Torrent, data TorrentInfo, validate bool) map[string]types.File {
func getTorrentFiles(t *types.Torrent, data TorrentInfo) map[string]types.File {
files := make(map[string]types.File)
cfg := config.Get()
idx := 0
for _, f := range data.Files {
name := filepath.Base(f.Path)
if validate {
if utils.IsSampleFile(f.Path) {
// Skip sample files
continue
}
if !cfg.IsAllowedFile(name) {
continue
}
if !cfg.IsSizeAllowed(f.Bytes) {
continue
}
} else {
if f.Selected == 0 {
continue
}
if utils.IsSampleFile(f.Path) {
// Skip sample files
continue
}
fileId := f.ID
_link := ""
if len(data.Links) > idx {
_link = data.Links[idx]
if !cfg.IsAllowedFile(name) {
continue
}
if !cfg.IsSizeAllowed(f.Bytes) {
continue
}
file := types.File{
Name: name,
Path: name,
Size: f.Bytes,
Id: strconv.Itoa(fileId),
Link: _link,
Id: strconv.Itoa(f.ID),
}
files[name] = file
idx++
@@ -182,7 +196,7 @@ func (r *RealDebrid) UpdateTorrent(t *types.Torrent) error {
t.MountPath = r.MountPath
t.Debrid = r.Name
t.Added = data.Added
t.Files = getTorrentFiles(t, data, false) // Get selected files
t.Files = getSelectedFiles(t, data) // Get selected files
return nil
}
@@ -213,7 +227,7 @@ func (r *RealDebrid) CheckStatus(t *types.Torrent, isSymlink bool) (*types.Torre
t.Debrid = r.Name
t.MountPath = r.MountPath
if status == "waiting_files_selection" {
t.Files = getTorrentFiles(t, data, true)
t.Files = getTorrentFiles(t, data)
if len(t.Files) == 0 {
return t, fmt.Errorf("no video files found")
}
@@ -231,7 +245,7 @@ func (r *RealDebrid) CheckStatus(t *types.Torrent, isSymlink bool) (*types.Torre
return t, err
}
} else if status == "downloaded" {
t.Files = getTorrentFiles(t, data, false) // Get selected files
t.Files = getSelectedFiles(t, data) // Get selected files
r.logger.Info().Msgf("Torrent: %s downloaded to RD", t.Name)
if !isSymlink {
err = r.GenerateDownloadLinks(t)
@@ -273,7 +287,7 @@ func (r *RealDebrid) GenerateDownloadLinks(t *types.Torrent) error {
go func(file types.File) {
defer wg.Done()
link, err := r.GetDownloadLink(t, &file)
link, err := r.GetDownloadLink(t, &file, 0)
if err != nil {
errCh <- err
return
@@ -333,6 +347,7 @@ func (r *RealDebrid) _getDownloadLink(file *types.File) (string, error) {
if err != nil {
return "", err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
// Read the response body to get the error message
b, err := io.ReadAll(resp.Body)
@@ -348,12 +363,12 @@ func (r *RealDebrid) _getDownloadLink(file *types.File) (string, error) {
return "", request.TrafficExceededError
case 24:
return "", request.HosterUnavailableError // Link has been nerfed
case 19:
return "", request.HosterUnavailableError // File has been removed
default:
return "", fmt.Errorf("realdebrid API error: %d", resp.StatusCode)
}
}
defer resp.Body.Close()
b, err := io.ReadAll(resp.Body)
if err != nil {
return "", err
@@ -366,20 +381,28 @@ func (r *RealDebrid) _getDownloadLink(file *types.File) (string, error) {
}
func (r *RealDebrid) GetDownloadLink(t *types.Torrent, file *types.File) (string, error) {
func (r *RealDebrid) GetDownloadLink(t *types.Torrent, file *types.File, index int) (string, error) {
if index >= len(r.DownloadKeys) {
// Reset to first key
index = 0
}
r.client.SetHeader("Authorization", fmt.Sprintf("Bearer %s", r.DownloadKeys[index]))
link, err := r._getDownloadLink(file)
if err == nil {
if err == nil && link != "" {
return link, nil
}
for _, key := range r.ExtraAPIKeys {
r.client.SetHeader("Authorization", fmt.Sprintf("Bearer %s", key))
if link, err := r._getDownloadLink(file); err == nil {
return link, nil
if err != nil && errors.Is(err, request.HosterUnavailableError) {
// Try the next key
if index+1 < len(r.DownloadKeys) {
return r.GetDownloadLink(t, file, index+1)
}
}
// Reset to main API key
// If we reach here, it means we have tried all keys
// and none of them worked, or the error is not related to the keys
// Reset to the first key
r.client.SetHeader("Authorization", fmt.Sprintf("Bearer %s", r.APIKey))
return "", err
return link, err
}
func (r *RealDebrid) GetCheckCached() bool {
@@ -553,11 +576,7 @@ func (r *RealDebrid) GetMountPath() string {
func New(dc config.Debrid) *RealDebrid {
rl := request.ParseRateLimit(dc.RateLimit)
apiKeys := strings.Split(dc.DownloadAPIKeys, ",")
extraKeys := make([]string, 0)
if len(apiKeys) > 1 {
extraKeys = apiKeys[1:]
}
headers := map[string]string{
"Authorization": fmt.Sprintf("Bearer %s", dc.APIKey),
}
@@ -568,12 +587,13 @@ func New(dc config.Debrid) *RealDebrid {
request.WithLogger(_log),
request.WithMaxRetries(5),
request.WithRetryableStatus(429),
request.WithProxy(dc.Proxy),
)
return &RealDebrid{
Name: "realdebrid",
Host: dc.Host,
APIKey: dc.APIKey,
ExtraAPIKeys: extraKeys,
DownloadKeys: dc.DownloadAPIKeys,
DownloadUncached: dc.DownloadUncached,
client: client,
MountPath: dc.Folder,
@@ -581,3 +601,7 @@ func New(dc config.Debrid) *RealDebrid {
CheckCached: dc.CheckCached,
}
}
func (r *RealDebrid) GetDownloadKeys() []string {
return r.DownloadKeys
}

View File

@@ -25,7 +25,7 @@ type Torbox struct {
Name string
Host string `json:"host"`
APIKey string
ExtraAPIKeys []string
DownloadKeys []string
DownloadUncached bool
client *request.Client
@@ -36,27 +36,23 @@ type Torbox struct {
func New(dc config.Debrid) *Torbox {
rl := request.ParseRateLimit(dc.RateLimit)
apiKeys := strings.Split(dc.APIKey, ",")
extraKeys := make([]string, 0)
if len(apiKeys) > 1 {
extraKeys = apiKeys[1:]
}
mainKey := apiKeys[0]
headers := map[string]string{
"Authorization": fmt.Sprintf("Bearer %s", mainKey),
"Authorization": fmt.Sprintf("Bearer %s", dc.APIKey),
}
_log := logger.New(dc.Name)
client := request.New(
request.WithHeaders(headers),
request.WithRateLimiter(rl),
request.WithLogger(_log),
request.WithProxy(dc.Proxy),
)
return &Torbox{
Name: "torbox",
Host: dc.Host,
APIKey: mainKey,
ExtraAPIKeys: extraKeys,
APIKey: dc.APIKey,
DownloadKeys: dc.DownloadAPIKeys,
DownloadUncached: dc.DownloadUncached,
client: client,
MountPath: dc.Folder,
@@ -284,7 +280,7 @@ func (tb *Torbox) GenerateDownloadLinks(t *types.Torrent) error {
for _, file := range t.Files {
go func() {
defer wg.Done()
link, err := tb.GetDownloadLink(t, &file)
link, err := tb.GetDownloadLink(t, &file, 0)
if err != nil {
errCh <- err
return
@@ -316,7 +312,7 @@ func (tb *Torbox) GenerateDownloadLinks(t *types.Torrent) error {
return nil
}
func (tb *Torbox) GetDownloadLink(t *types.Torrent, file *types.File) (string, error) {
func (tb *Torbox) GetDownloadLink(t *types.Torrent, file *types.File, index int) (string, error) {
url := fmt.Sprintf("%s/api/torrents/requestdl/", tb.Host)
query := gourl.Values{}
query.Add("torrent_id", t.Id)
@@ -366,3 +362,7 @@ func (tb *Torbox) CheckLink(link string) error {
func (tb *Torbox) GetMountPath() string {
return tb.MountPath
}
func (tb *Torbox) GetDownloadKeys() []string {
return tb.DownloadKeys
}

View File

@@ -8,7 +8,7 @@ type Client interface {
SubmitMagnet(tr *Torrent) (*Torrent, error)
CheckStatus(tr *Torrent, isSymlink bool) (*Torrent, error)
GenerateDownloadLinks(tr *Torrent) error
GetDownloadLink(tr *Torrent, file *File) (string, error)
GetDownloadLink(tr *Torrent, file *File, index int) (string, error)
DeleteTorrent(torrentId string) error
IsAvailable(infohashes []string) map[string]bool
GetCheckCached() bool
@@ -21,4 +21,5 @@ type Client interface {
GetDownloads() (map[string]DownloadLinks, error)
CheckLink(link string) error
GetMountPath() string
GetDownloadKeys() []string
}

View File

@@ -321,7 +321,7 @@ func (p *Proxy) Start(ctx context.Context) error {
proxy.OnRequest(goproxy.ReqHostMatches(regexp.MustCompile("^.443$"))).HandleConnect(goproxy.AlwaysMitm)
proxy.OnResponse(
UrlMatches(regexp.MustCompile("^.*/api\\?t=(search|tvsearch|movie)(&.*)?$")),
UrlMatches(regexp.MustCompile(`^.*/api\?t=(search|tvsearch|movie)(&.*)?$`)),
goproxy.StatusCodeIs(http.StatusOK, http.StatusAccepted)).DoFunc(
func(resp *http.Response, ctx *goproxy.ProxyCtx) *http.Response {
return p.ProcessResponse(resp)

View File

@@ -225,14 +225,15 @@ func (q *QBit) preCacheFile(name string, filePaths []string) error {
for _, filePath := range filePaths {
func() {
file, err := os.Open(filePath)
defer file.Close()
defer func(file *os.File) {
_ = file.Close()
}(file)
if err != nil {
return
}
// Pre-cache the file header (first 256KB) using 16KB chunks.
q.readSmallChunks(file, 0, 256*1024, 16*1024)
q.readSmallChunks(file, 1024*1024, 64*1024, 16*1024)
return
}()
}
@@ -264,5 +265,4 @@ func (q *QBit) readSmallChunks(file *os.File, startPos int64, totalToRead int, c
bytesRemaining -= n
}
return
}

View File

@@ -1,149 +1,159 @@
package repair
import (
"context"
"fmt"
"github.com/sirrobot01/debrid-blackhole/internal/request"
"golang.org/x/sync/errgroup"
"runtime"
"sync"
"time"
)
//func (r *Repair) clean(job *Job) error {
// // Create a new error group
// g, ctx := errgroup.WithContext(context.Background())
//
// uniqueItems := make(map[string]string)
// mu := sync.Mutex{}
//
// // Limit concurrent goroutines
// g.SetLimit(10)
//
// for _, a := range job.Arrs {
// a := a // Capture range variable
// g.Go(func() error {
// // Check if context was canceled
// select {
// case <-ctx.Done():
// return ctx.Err()
// default:
// }
//
// items, err := r.cleanArr(job, a, "")
// if err != nil {
// r.logger.Error().Err(err).Msgf("Error cleaning %s", a)
// return err
// }
//
// // Safely append the found items to the shared slice
// if len(items) > 0 {
// mu.Lock()
// for k, v := range items {
// uniqueItems[k] = v
// }
// mu.Unlock()
// }
//
// return nil
// })
// }
//
// if err := g.Wait(); err != nil {
// return err
// }
//
// if len(uniqueItems) == 0 {
// job.CompletedAt = time.Now()
// job.Status = JobCompleted
//
// go func() {
// if err := request.SendDiscordMessage("repair_clean_complete", "success", job.discordContext()); err != nil {
// r.logger.Error().Msgf("Error sending discord message: %v", err)
// }
// }()
//
// return nil
// }
//
// cache := r.deb.Caches["realdebrid"]
// if cache == nil {
// return fmt.Errorf("cache not found")
// }
// torrents := cache.GetTorrents()
//
// dangling := make([]string, 0)
// for _, t := range torrents {
// if _, ok := uniqueItems[t.Name]; !ok {
// dangling = append(dangling, t.Id)
// }
// }
//
// r.logger.Info().Msgf("Found %d delapitated items", len(dangling))
//
// if len(dangling) == 0 {
// job.CompletedAt = time.Now()
// job.Status = JobCompleted
// return nil
// }
//
// client := r.deb.Clients["realdebrid"]
// if client == nil {
// return fmt.Errorf("client not found")
// }
// for _, id := range dangling {
// err := client.DeleteTorrent(id)
// if err != nil {
// return err
// }
// }
//
// return nil
//}
//
//func (r *Repair) cleanArr(j *Job, _arr string, tmdbId string) (map[string]string, error) {
// uniqueItems := make(map[string]string)
// a := r.arrs.Get(_arr)
//
// r.logger.Info().Msgf("Starting repair for %s", a.Name)
// media, err := a.GetMedia(tmdbId)
// if err != nil {
// r.logger.Info().Msgf("Failed to get %s media: %v", a.Name, err)
// return uniqueItems, err
// }
//
// // Create a new error group
// g, ctx := errgroup.WithContext(context.Background())
//
// mu := sync.Mutex{}
//
// // Limit concurrent goroutines
// g.SetLimit(runtime.NumCPU() * 4)
//
// for _, m := range media {
// m := m // Create a new variable scoped to the loop iteration
// g.Go(func() error {
// // Check if context was canceled
// select {
// case <-ctx.Done():
// return ctx.Err()
// default:
// }
//
// u := r.getUniquePaths(m)
// for k, v := range u {
// mu.Lock()
// uniqueItems[k] = v
// mu.Unlock()
// }
// return nil
// })
// }
//
// if err := g.Wait(); err != nil {
// return uniqueItems, err
// }
//
// r.logger.Info().Msgf("Repair completed for %s. %d unique items", a.Name, len(uniqueItems))
// return uniqueItems, nil
//}
func (r *Repair) clean(job *Job) error {
// Create a new error group
g, ctx := errgroup.WithContext(context.Background())
uniqueItems := make(map[string]string)
mu := sync.Mutex{}
// Limit concurrent goroutines
g.SetLimit(10)
for _, a := range job.Arrs {
a := a // Capture range variable
g.Go(func() error {
// Check if context was canceled
select {
case <-ctx.Done():
return ctx.Err()
default:
}
items, err := r.cleanArr(job, a, "")
if err != nil {
r.logger.Error().Err(err).Msgf("Error cleaning %s", a)
return err
}
// Safely append the found items to the shared slice
if len(items) > 0 {
mu.Lock()
for k, v := range items {
uniqueItems[k] = v
}
mu.Unlock()
}
return nil
})
}
if err := g.Wait(); err != nil {
return err
}
if len(uniqueItems) == 0 {
job.CompletedAt = time.Now()
job.Status = JobCompleted
go func() {
if err := request.SendDiscordMessage("repair_clean_complete", "success", job.discordContext()); err != nil {
r.logger.Error().Msgf("Error sending discord message: %v", err)
}
}()
return nil
}
cache := r.deb.Caches["realdebrid"]
if cache == nil {
return fmt.Errorf("cache not found")
}
torrents := cache.GetTorrents()
dangling := make([]string, 0)
for _, t := range torrents {
if _, ok := uniqueItems[t.Name]; !ok {
dangling = append(dangling, t.Id)
}
}
r.logger.Info().Msgf("Found %d delapitated items", len(dangling))
if len(dangling) == 0 {
job.CompletedAt = time.Now()
job.Status = JobCompleted
return nil
}
client := r.deb.Clients["realdebrid"]
if client == nil {
return fmt.Errorf("client not found")
}
for _, id := range dangling {
err := client.DeleteTorrent(id)
if err != nil {
return err
}
}
return nil
}
func (r *Repair) cleanArr(j *Job, _arr string, tmdbId string) (map[string]string, error) {
uniqueItems := make(map[string]string)
a := r.arrs.Get(_arr)
r.logger.Info().Msgf("Starting repair for %s", a.Name)
media, err := a.GetMedia(tmdbId)
if err != nil {
r.logger.Info().Msgf("Failed to get %s media: %v", a.Name, err)
return uniqueItems, err
}
// Create a new error group
g, ctx := errgroup.WithContext(context.Background())
mu := sync.Mutex{}
// Limit concurrent goroutines
g.SetLimit(runtime.NumCPU() * 4)
for _, m := range media {
m := m // Create a new variable scoped to the loop iteration
g.Go(func() error {
// Check if context was canceled
select {
case <-ctx.Done():
return ctx.Err()
default:
}
u := r.getUniquePaths(m)
for k, v := range u {
mu.Lock()
uniqueItems[k] = v
mu.Unlock()
}
return nil
})
}
if err := g.Wait(); err != nil {
return uniqueItems, err
}
r.logger.Info().Msgf("Repair completed for %s. %d unique items", a.Name, len(uniqueItems))
return uniqueItems, nil
}
//func (r *Repair) getUniquePaths(media arr.Content) map[string]string {
// // Use zurg setup to check file availability with zurg
// // This reduces bandwidth usage significantly
//
// uniqueParents := make(map[string]string)
// files := media.Files
// for _, file := range files {
// target := getSymlinkTarget(file.Path)
// if target != "" {
// file.IsSymlink = true
// dir, f := filepath.Split(target)
// parent := filepath.Base(filepath.Clean(dir))
// // Set target path folder/file.mkv
// file.TargetPath = f
// uniqueParents[parent] = target
// }
// }
// return uniqueParents
//}

View File

@@ -434,26 +434,6 @@ func (r *Repair) repairArr(j *Job, _arr string, tmdbId string) ([]arr.ContentFil
return brokenItems, nil
}
func (r *Repair) getUniquePaths(media arr.Content) map[string]string {
// Use zurg setup to check file availability with zurg
// This reduces bandwidth usage significantly
uniqueParents := make(map[string]string)
files := media.Files
for _, file := range files {
target := getSymlinkTarget(file.Path)
if target != "" {
file.IsSymlink = true
dir, f := filepath.Split(target)
parent := filepath.Base(filepath.Clean(dir))
// Set target path folder/file.mkv
file.TargetPath = f
uniqueParents[parent] = target
}
}
return uniqueParents
}
func (r *Repair) isMediaAccessible(m arr.Content) bool {
files := m.Files
if len(files) == 0 {
@@ -758,7 +738,7 @@ func (r *Repair) saveToFile() {
if err != nil {
r.logger.Debug().Err(err).Msg("Failed to marshal jobs")
}
err = os.WriteFile(r.filename, data, 0644)
_ = os.WriteFile(r.filename, data, 0644)
}
func (r *Repair) loadFromFile() {

View File

@@ -3,13 +3,20 @@ package webdav
import (
"crypto/tls"
"fmt"
"github.com/sirrobot01/debrid-blackhole/internal/request"
"github.com/sirrobot01/debrid-blackhole/pkg/debrid/debrid"
"io"
"net/http"
"os"
"sync"
"time"
)
var (
sdClient *request.Client
once sync.Once
)
var sharedClient = &http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
@@ -26,6 +33,32 @@ var sharedClient = &http.Client{
Timeout: 60 * time.Second,
}
func getClient() *request.Client {
once.Do(func() {
var transport = &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
Proxy: http.ProxyFromEnvironment,
MaxIdleConns: 100,
MaxIdleConnsPerHost: 20,
MaxConnsPerHost: 50,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ResponseHeaderTimeout: 30 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
DisableKeepAlives: false,
}
sdClient = request.New(
request.WithTransport(transport),
request.WithTimeout(30*time.Second),
request.WithHeaders(map[string]string{
"Accept": "*/*",
"Connection": "keep-alive",
}),
)
})
return sdClient
}
type File struct {
cache *debrid.Cache
fileId string
@@ -47,6 +80,8 @@ type File struct {
link string
}
// You can not download this file because you have exceeded your traffic on this hoster
// File interface implementations for File
func (f *File) Close() error {
@@ -57,21 +92,113 @@ func (f *File) Close() error {
return nil
}
func (f *File) GetDownloadLink() string {
func (f *File) getDownloadLink(index int) string {
// Check if we already have a final URL cached
if f.downloadLink != "" && isValidURL(f.downloadLink) {
return f.downloadLink
}
downloadLink := f.cache.GetDownloadLink(f.torrentId, f.name, f.link)
downloadLink := f.cache.GetDownloadLink(f.torrentId, f.name, f.link, index)
if downloadLink != "" && isValidURL(downloadLink) {
f.downloadLink = downloadLink
return downloadLink
}
return ""
}
func (f *File) stream(index int) (*http.Response, error) {
client := sharedClient // Might be replaced with the custom client
_log := f.cache.GetLogger()
var (
err error
downloadLink string
)
downloadLink = f.getDownloadLink(index) // Uses the first API key
if downloadLink == "" {
_log.Error().Msgf("Failed to get download link for %s", f.name)
return nil, fmt.Errorf("failed to get download link")
}
req, err := http.NewRequest("GET", downloadLink, nil)
if err != nil {
_log.Error().Msgf("Failed to create HTTP request: %s", err)
return nil, fmt.Errorf("failed to create HTTP request: %w", err)
}
if f.offset > 0 {
req.Header.Set("Range", fmt.Sprintf("bytes=%d-", f.offset))
}
resp, err := client.Do(req)
if err != nil {
return resp, fmt.Errorf("HTTP request error: %w", err)
}
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusPartialContent {
closeResp := func() {
io.Copy(io.Discard, resp.Body)
resp.Body.Close()
}
if resp.StatusCode == http.StatusServiceUnavailable {
closeResp()
// Read the body to consume the response
f.cache.MarkDownloadLinkAsInvalid(downloadLink, "bandwidth limit reached")
// Generate a new download link
if index < f.cache.TotalDownloadKeys()-1 {
// Retry with a different download key
_log.Debug().Str("link", downloadLink).
Str("torrentId", f.torrentId).
Str("fileId", f.fileId).
Msgf("Bandwidth limit reached, retrying with another API key, attempt %d", index+1)
return f.stream(index + 1) // Retry with the next download key
} else {
// No more download keys available, return an error
_log.Error().Msgf("Bandwidth limit reached for all download keys")
return nil, fmt.Errorf("bandwidth_limit_exceeded")
}
} else if resp.StatusCode == http.StatusNotFound {
closeResp()
// Mark download link as not found
// Regenerate a new download link
f.cache.MarkDownloadLinkAsInvalid(downloadLink, "link_not_found")
f.cache.RemoveDownloadLink(f.link) // Remove the link from the cache
// Generate a new download link
downloadLink = f.getDownloadLink(index)
if downloadLink == "" {
_log.Error().Msgf("Failed to get download link for %s", f.name)
return nil, fmt.Errorf("failed to get download link")
}
req, err = http.NewRequest("GET", downloadLink, nil)
if err != nil {
return nil, fmt.Errorf("failed to create HTTP request: %w", err)
}
if f.offset > 0 {
req.Header.Set("Range", fmt.Sprintf("bytes=%d-", f.offset))
}
resp, err = client.Do(req)
if err != nil {
return resp, fmt.Errorf("HTTP request error: %w", err)
}
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusPartialContent {
closeResp()
// Read the body to consume the response
f.cache.MarkDownloadLinkAsInvalid(downloadLink, "link_not_found")
return resp, fmt.Errorf("link not found")
}
return resp, nil
} else {
closeResp()
return resp, fmt.Errorf("unexpected HTTP status: %d", resp.StatusCode)
}
}
return resp, nil
}
func (f *File) Read(p []byte) (n int, err error) {
if f.isDir {
return 0, os.ErrInvalid
@@ -96,39 +223,15 @@ func (f *File) Read(p []byte) (n int, err error) {
f.reader = nil
}
downloadLink := f.GetDownloadLink()
if downloadLink == "" {
return 0, io.EOF
}
req, err := http.NewRequest("GET", downloadLink, nil)
// Make the request to get the file
resp, err := f.stream(0)
if err != nil {
return 0, fmt.Errorf("failed to create HTTP request: %w", err)
return 0, err
}
if resp == nil {
return 0, fmt.Errorf("failed to get response")
}
if f.offset > 0 {
req.Header.Set("Range", fmt.Sprintf("bytes=%d-", f.offset))
}
// Set headers as needed
req.Header.Set("Connection", "keep-alive")
req.Header.Set("Accept", "*/*")
resp, err := sharedClient.Do(req)
if err != nil {
return 0, fmt.Errorf("HTTP request error: %w", err)
}
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusPartialContent {
resp.Body.Close()
_log := f.cache.GetLogger()
_log.Debug().
Str("downloadLink", downloadLink).
Str("link", f.link).
Str("torrentId", f.torrentId).
Msgf("Unexpected HTTP status: %d", resp.StatusCode)
return 0, fmt.Errorf("unexpected HTTP status: %d", resp.StatusCode)
}
f.reader = resp.Body
f.seekPending = false
}

View File

@@ -18,17 +18,14 @@ import (
path "path/filepath"
"slices"
"strings"
"sync"
"time"
)
type Handler struct {
Name string
logger zerolog.Logger
cache *debrid.Cache
lastRefresh time.Time
refreshMutex sync.Mutex
RootPath string
Name string
logger zerolog.Logger
cache *debrid.Cache
RootPath string
}
func NewHandler(name string, cache *debrid.Cache, logger zerolog.Logger) *Handler {
@@ -299,11 +296,11 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Vary", "Accept-Encoding")
w.Header().Set("Content-Length", fmt.Sprintf("%d", len(gzippedData)))
w.WriteHeader(responseRecorder.Code)
w.Write(gzippedData)
_, _ = w.Write(gzippedData)
} else {
w.Header().Set("Content-Length", fmt.Sprintf("%d", len(responseData)))
w.WriteHeader(responseRecorder.Code)
w.Write(responseData)
_, _ = w.Write(responseData)
}
return
}
@@ -422,11 +419,11 @@ func (h *Handler) serveFromCacheIfValid(w http.ResponseWriter, r *http.Request,
w.Header().Set("Vary", "Accept-Encoding")
w.Header().Set("Content-Length", fmt.Sprintf("%d", len(respCache.GzippedData)))
w.WriteHeader(http.StatusOK)
w.Write(respCache.GzippedData)
_, _ = w.Write(respCache.GzippedData)
} else {
w.Header().Set("Content-Length", fmt.Sprintf("%d", len(respCache.Data)))
w.WriteHeader(http.StatusOK)
w.Write(respCache.Data)
_, _ = w.Write(respCache.Data)
}
return true
}