- Refractor code

- Add a better logging for 429 when streaming
- Fix minor issues
This commit is contained in:
Mukhtar Akere
2025-04-01 06:37:10 +01:00
parent 8bf164451c
commit 7d954052ae
28 changed files with 214 additions and 179 deletions
+73 -51
View File
@@ -9,14 +9,14 @@ import (
"github.com/sirrobot01/debrid-blackhole/internal/request"
"github.com/sirrobot01/debrid-blackhole/internal/utils"
"github.com/sirrobot01/debrid-blackhole/pkg/debrid/types"
"slices"
"strings"
"time"
"net/http"
gourl "net/url"
"path/filepath"
"slices"
"strconv"
"strings"
"sync"
"time"
)
type AllDebrid struct {
@@ -32,6 +32,36 @@ type AllDebrid struct {
CheckCached bool
}
func New(dc config.Debrid) *AllDebrid {
rl := request.ParseRateLimit(dc.RateLimit)
apiKeys := strings.Split(dc.APIKey, ",")
extraKeys := make([]string, 0)
if len(apiKeys) > 1 {
extraKeys = apiKeys[1:]
}
mainKey := apiKeys[0]
headers := map[string]string{
"Authorization": fmt.Sprintf("Bearer %s", mainKey),
}
_log := logger.New(dc.Name)
client := request.New(
request.WithHeaders(headers),
request.WithLogger(_log),
request.WithRateLimiter(rl),
)
return &AllDebrid{
Name: "alldebrid",
Host: dc.Host,
APIKey: mainKey,
ExtraAPIKeys: extraKeys,
DownloadUncached: dc.DownloadUncached,
client: client,
MountPath: dc.Folder,
logger: logger.New(dc.Name),
CheckCached: dc.CheckCached,
}
}
func (ad *AllDebrid) GetName() string {
return ad.Name
}
@@ -89,7 +119,7 @@ func getAlldebridStatus(statusCode int) string {
func flattenFiles(files []MagnetFile, parentPath string, index *int) map[string]types.File {
result := make(map[string]types.File)
cfg := config.GetConfig()
cfg := config.Get()
for _, f := range files {
currentPath := f.Name
@@ -218,26 +248,46 @@ func (ad *AllDebrid) DeleteTorrent(torrentId string) error {
}
func (ad *AllDebrid) GenerateDownloadLinks(t *types.Torrent) error {
for _, file := range t.Files {
url := fmt.Sprintf("%s/link/unlock", ad.Host)
query := gourl.Values{}
query.Add("link", file.Link)
url += "?" + query.Encode()
req, _ := http.NewRequest(http.MethodGet, url, nil)
resp, err := ad.client.MakeRequest(req)
if err != nil {
return err
}
var data DownloadLink
if err = json.Unmarshal(resp, &data); err != nil {
return err
}
link := data.Data.Link
file.DownloadLink = link
file.Generated = time.Now()
t.Files[file.Name] = file
filesCh := make(chan types.File, len(t.Files))
errCh := make(chan error, len(t.Files))
var wg sync.WaitGroup
wg.Add(len(t.Files))
for _, file := range t.Files {
go func(file types.File) {
defer wg.Done()
link, err := ad.GetDownloadLink(t, &file)
if err != nil {
errCh <- err
return
}
file.DownloadLink = link
file.Generated = time.Now()
if link == "" {
errCh <- fmt.Errorf("error getting download links %w", err)
return
}
filesCh <- file
}(file)
}
go func() {
wg.Wait()
close(filesCh)
close(errCh)
}()
files := make(map[string]types.File, len(t.Files))
for file := range filesCh {
files[file.Name] = file
}
// Check for errors
for err := range errCh {
if err != nil {
return err // Return the first error encountered
}
}
t.Files = files
return nil
}
@@ -317,31 +367,3 @@ func (ad *AllDebrid) CheckLink(link string) error {
func (ad *AllDebrid) GetMountPath() string {
return ad.MountPath
}
func New(dc config.Debrid) *AllDebrid {
rl := request.ParseRateLimit(dc.RateLimit)
apiKeys := strings.Split(dc.APIKey, ",")
extraKeys := make([]string, 0)
if len(apiKeys) > 1 {
extraKeys = apiKeys[1:]
}
mainKey := apiKeys[0]
headers := map[string]string{
"Authorization": fmt.Sprintf("Bearer %s", mainKey),
}
_log := logger.NewLogger(dc.Name)
client := request.New().
WithHeaders(headers).
WithRateLimiter(rl).WithLogger(_log)
return &AllDebrid{
Name: "alldebrid",
Host: dc.Host,
APIKey: mainKey,
ExtraAPIKeys: extraKeys,
DownloadUncached: dc.DownloadUncached,
client: client,
MountPath: dc.Folder,
logger: logger.NewLogger(dc.Name),
CheckCached: dc.CheckCached,
}
}
+6 -5
View File
@@ -95,7 +95,7 @@ type Cache struct {
}
func New(dc config.Debrid, client types.Client) *Cache {
cfg := config.GetConfig()
cfg := config.Get()
torrentRefreshInterval, err := time.ParseDuration(dc.TorrentsRefreshInterval)
if err != nil {
torrentRefreshInterval = time.Second * 15
@@ -117,7 +117,7 @@ func New(dc config.Debrid, client types.Client) *Cache {
torrents: xsync.NewMapOf[string, *CachedTorrent](),
torrentsNames: xsync.NewMapOf[string, *CachedTorrent](),
client: client,
logger: logger.NewLogger(fmt.Sprintf("%s-webdav", client.GetName())),
logger: logger.New(fmt.Sprintf("%s-webdav", client.GetName())),
workers: workers,
downloadLinks: xsync.NewMapOf[string, downloadLinkCache](),
torrentRefreshInterval: torrentRefreshInterval,
@@ -219,11 +219,12 @@ func (c *Cache) load() (map[string]*CachedTorrent, error) {
now := time.Now()
for _, file := range files {
if file.IsDir() || filepath.Ext(file.Name()) != ".json" {
fileName := file.Name()
if file.IsDir() || filepath.Ext(fileName) != ".json" {
continue
}
filePath := filepath.Join(c.dir, file.Name())
filePath := filepath.Join(c.dir, fileName)
data, err := os.ReadFile(filePath)
if err != nil {
c.logger.Debug().Err(err).Msgf("Failed to read file: %s", filePath)
@@ -301,7 +302,7 @@ func (c *Cache) SaveTorrent(ct *CachedTorrent) {
c.saveTorrent(ct)
}()
default:
c.saveTorrent(ct)
go c.saveTorrent(ct) // If the semaphore is full, just run the save in the background
}
}
+1 -1
View File
@@ -12,7 +12,7 @@ type Engine struct {
}
func NewEngine() *Engine {
cfg := config.GetConfig()
cfg := config.Get()
clients := make(map[string]types.Client)
caches := make(map[string]*Cache)
+2 -3
View File
@@ -61,8 +61,7 @@ func (c *Cache) refreshListings() {
}
// Atomic store of the complete ready-to-use slice
c.listings.Store(files)
//c.resetPropfindResponse()
_ = c.RefreshXml()
_ = c.refreshXml()
if err := c.RefreshRclone(); err != nil {
c.logger.Debug().Err(err).Msg("Failed to refresh rclone")
}
@@ -187,7 +186,7 @@ func (c *Cache) refreshTorrents() {
func (c *Cache) RefreshRclone() error {
client := request.Default()
cfg := config.GetConfig().WebDav
cfg := config.Get().WebDav
if cfg.RcUrl == "" {
return nil
+2 -1
View File
@@ -130,7 +130,6 @@ func (c *Cache) ReInsertTorrent(torrent *types.Torrent) error {
}
oldID := torrent.Id
defer c.repairsInProgress.Delete(oldID)
// Submit the magnet to the debrid service
@@ -138,6 +137,8 @@ func (c *Cache) ReInsertTorrent(torrent *types.Torrent) error {
var err error
torrent, err = c.client.SubmitMagnet(torrent)
if err != nil {
// Remove the old torrent from the cache and debrid service
_ = c.DeleteTorrent(oldID)
return fmt.Errorf("failed to submit magnet: %w", err)
}
+1 -1
View File
@@ -10,7 +10,7 @@ import (
"time"
)
func (c *Cache) RefreshXml() error {
func (c *Cache) refreshXml() error {
parents := []string{"__all__", "torrents"}
torrents := c.GetListing()
for _, parent := range parents {
+10 -7
View File
@@ -129,7 +129,7 @@ func (dl *DebridLink) UpdateTorrent(t *types.Torrent) error {
t.Seeders = data.PeersConnected
t.Filename = name
t.OriginalFilename = name
cfg := config.GetConfig()
cfg := config.Get()
for _, f := range data.Files {
if !cfg.IsSizeAllowed(f.Size) {
continue
@@ -235,6 +235,7 @@ func (dl *DebridLink) DeleteTorrent(torrentId string) error {
}
func (dl *DebridLink) GenerateDownloadLinks(t *types.Torrent) error {
// Download links are already generated
return nil
}
@@ -270,10 +271,12 @@ func New(dc config.Debrid) *DebridLink {
"Authorization": fmt.Sprintf("Bearer %s", mainKey),
"Content-Type": "application/json",
}
_log := logger.NewLogger(dc.Name)
client := request.New().
WithHeaders(headers).
WithRateLimiter(rl).WithLogger(_log)
_log := logger.New(dc.Name)
client := request.New(
request.WithHeaders(headers),
request.WithLogger(_log),
request.WithRateLimiter(rl),
)
return &DebridLink{
Name: "debridlink",
Host: dc.Host,
@@ -282,7 +285,7 @@ func New(dc config.Debrid) *DebridLink {
DownloadUncached: dc.DownloadUncached,
client: client,
MountPath: dc.Folder,
logger: logger.NewLogger(dc.Name),
logger: logger.New(dc.Name),
CheckCached: dc.CheckCached,
}
}
@@ -341,7 +344,7 @@ func (dl *DebridLink) getTorrents(page, perPage int) ([]*types.Torrent, error) {
Debrid: dl.Name,
MountPath: dl.MountPath,
}
cfg := config.GetConfig()
cfg := config.Get()
for _, f := range t.Files {
if !cfg.IsSizeAllowed(f.Size) {
continue
+12 -11
View File
@@ -48,7 +48,7 @@ func (r *RealDebrid) GetLogger() zerolog.Logger {
// if validate is false, selected files will be returned
func getTorrentFiles(t *types.Torrent, data TorrentInfo, validate bool) map[string]types.File {
files := make(map[string]types.File)
cfg := config.GetConfig()
cfg := config.Get()
idx := 0
for _, f := range data.Files {
name := filepath.Base(f.Path)
@@ -267,9 +267,8 @@ func (r *RealDebrid) GenerateDownloadLinks(t *types.Torrent) error {
errCh := make(chan error, len(t.Files))
var wg sync.WaitGroup
wg.Add(len(t.Files))
for _, f := range t.Files {
wg.Add(1)
go func(file types.File) {
defer wg.Done()
@@ -446,7 +445,7 @@ func (r *RealDebrid) getTorrents(offset int, limit int) (int, []*types.Torrent,
}
func (r *RealDebrid) GetTorrents() ([]*types.Torrent, error) {
limit := 1000
limit := 5000
// Get first batch and total count
totalItems, firstBatch, err := r.getTorrents(0, limit)
@@ -561,12 +560,14 @@ func New(dc config.Debrid) *RealDebrid {
headers := map[string]string{
"Authorization": fmt.Sprintf("Bearer %s", dc.APIKey),
}
_log := logger.NewLogger(dc.Name)
client := request.New().
WithHeaders(headers).
WithRateLimiter(rl).WithLogger(_log).
WithMaxRetries(5).
WithRetryableStatus(429)
_log := logger.New(dc.Name)
client := request.New(
request.WithHeaders(headers),
request.WithRateLimiter(rl),
request.WithLogger(_log),
request.WithMaxRetries(5),
request.WithRetryableStatus(429),
)
return &RealDebrid{
Name: "realdebrid",
Host: dc.Host,
@@ -575,7 +576,7 @@ func New(dc config.Debrid) *RealDebrid {
DownloadUncached: dc.DownloadUncached,
client: client,
MountPath: dc.Folder,
logger: logger.NewLogger(dc.Name),
logger: logger.New(dc.Name),
CheckCached: dc.CheckCached,
}
}
+43 -29
View File
@@ -10,8 +10,6 @@ import (
"github.com/sirrobot01/debrid-blackhole/internal/request"
"github.com/sirrobot01/debrid-blackhole/internal/utils"
"github.com/sirrobot01/debrid-blackhole/pkg/debrid/types"
"time"
"mime/multipart"
"net/http"
gourl "net/url"
@@ -20,6 +18,7 @@ import (
"slices"
"strconv"
"strings"
"sync"
)
type Torbox struct {
@@ -46,10 +45,12 @@ func New(dc config.Debrid) *Torbox {
headers := map[string]string{
"Authorization": fmt.Sprintf("Bearer %s", mainKey),
}
_log := logger.NewLogger(dc.Name)
client := request.New().
WithHeaders(headers).
WithRateLimiter(rl).WithLogger(_log)
_log := logger.New(dc.Name)
client := request.New(
request.WithHeaders(headers),
request.WithRateLimiter(rl),
request.WithLogger(_log),
)
return &Torbox{
Name: "torbox",
@@ -196,7 +197,7 @@ func (tb *Torbox) UpdateTorrent(t *types.Torrent) error {
t.OriginalFilename = name
t.MountPath = tb.MountPath
t.Debrid = tb.Name
cfg := config.GetConfig()
cfg := config.Get()
for _, f := range data.Files {
fileName := filepath.Base(f.Name)
if utils.IsSampleFile(f.AbsolutePath) {
@@ -275,30 +276,43 @@ func (tb *Torbox) DeleteTorrent(torrentId string) error {
}
func (tb *Torbox) GenerateDownloadLinks(t *types.Torrent) error {
filesCh := make(chan types.File, len(t.Files))
errCh := make(chan error, len(t.Files))
var wg sync.WaitGroup
wg.Add(len(t.Files))
for _, file := range t.Files {
url := fmt.Sprintf("%s/api/torrents/requestdl/", tb.Host)
query := gourl.Values{}
query.Add("torrent_id", t.Id)
query.Add("token", tb.APIKey)
query.Add("file_id", file.Id)
url += "?" + query.Encode()
req, _ := http.NewRequest(http.MethodGet, url, nil)
resp, err := tb.client.MakeRequest(req)
if err != nil {
return err
}
var data DownloadLinksResponse
if err = json.Unmarshal(resp, &data); err != nil {
return err
}
if data.Data == nil {
return fmt.Errorf("error getting download links")
}
link := *data.Data
file.DownloadLink = link
file.Generated = time.Now()
t.Files[file.Name] = file
go func() {
defer wg.Done()
link, err := tb.GetDownloadLink(t, &file)
if err != nil {
errCh <- err
return
}
file.DownloadLink = link
filesCh <- file
}()
}
go func() {
wg.Wait()
close(filesCh)
close(errCh)
}()
// Collect results
files := make(map[string]types.File, len(t.Files))
for file := range filesCh {
files[file.Name] = file
}
// Check for errors
for err := range errCh {
if err != nil {
return err // Return the first error encountered
}
}
t.Files = files
return nil
}