Improvements:

- An improvised caching for stats; using metadata on ls
- Integrated into the downloading system
- Fix minor bugs noticed
- Still experimental, sike
This commit is contained in:
Mukhtar Akere
2025-03-20 10:42:51 +01:00
parent 50c775ca74
commit 0c68364a6a
26 changed files with 715 additions and 636 deletions

View File

@@ -23,6 +23,7 @@ type Debrid struct {
DownloadUncached bool `json:"download_uncached"` DownloadUncached bool `json:"download_uncached"`
CheckCached bool `json:"check_cached"` CheckCached bool `json:"check_cached"`
RateLimit string `json:"rate_limit"` // 200/minute or 10/second RateLimit string `json:"rate_limit"` // 200/minute or 10/second
EnableWebDav bool `json:"enable_webdav"`
} }
type Proxy struct { type Proxy struct {
@@ -174,7 +175,7 @@ func validateQbitTorrent(config *QBitTorrent) error {
return errors.New("qbittorent download folder is required") return errors.New("qbittorent download folder is required")
} }
if _, err := os.Stat(config.DownloadFolder); os.IsNotExist(err) { if _, err := os.Stat(config.DownloadFolder); os.IsNotExist(err) {
return errors.New("qbittorent download folder does not exist") return fmt.Errorf("qbittorent download folder(%s) does not exist", config.DownloadFolder)
} }
return nil return nil
} }

View File

@@ -8,7 +8,7 @@ import (
"github.com/sirrobot01/debrid-blackhole/internal/logger" "github.com/sirrobot01/debrid-blackhole/internal/logger"
"github.com/sirrobot01/debrid-blackhole/internal/request" "github.com/sirrobot01/debrid-blackhole/internal/request"
"github.com/sirrobot01/debrid-blackhole/internal/utils" "github.com/sirrobot01/debrid-blackhole/internal/utils"
"github.com/sirrobot01/debrid-blackhole/pkg/debrid/torrent" "github.com/sirrobot01/debrid-blackhole/pkg/debrid/types"
"slices" "slices"
"time" "time"
@@ -47,7 +47,7 @@ func (ad *AllDebrid) IsAvailable(hashes []string) map[string]bool {
return result return result
} }
func (ad *AllDebrid) SubmitMagnet(torrent *torrent.Torrent) (*torrent.Torrent, error) { func (ad *AllDebrid) SubmitMagnet(torrent *types.Torrent) (*types.Torrent, error) {
url := fmt.Sprintf("%s/magnet/upload", ad.Host) url := fmt.Sprintf("%s/magnet/upload", ad.Host)
query := gourl.Values{} query := gourl.Values{}
query.Add("magnets[]", torrent.Magnet.Link) query.Add("magnets[]", torrent.Magnet.Link)
@@ -84,8 +84,8 @@ func getAlldebridStatus(statusCode int) string {
} }
} }
func flattenFiles(files []MagnetFile, parentPath string, index *int) map[string]torrent.File { func flattenFiles(files []MagnetFile, parentPath string, index *int) map[string]types.File {
result := make(map[string]torrent.File) result := make(map[string]types.File)
cfg := config.GetConfig() cfg := config.GetConfig()
@@ -123,7 +123,7 @@ func flattenFiles(files []MagnetFile, parentPath string, index *int) map[string]
} }
*index++ *index++
file := torrent.File{ file := types.File{
Id: strconv.Itoa(*index), Id: strconv.Itoa(*index),
Name: fileName, Name: fileName,
Size: f.Size, Size: f.Size,
@@ -136,7 +136,7 @@ func flattenFiles(files []MagnetFile, parentPath string, index *int) map[string]
return result return result
} }
func (ad *AllDebrid) UpdateTorrent(t *torrent.Torrent) error { func (ad *AllDebrid) UpdateTorrent(t *types.Torrent) error {
url := fmt.Sprintf("%s/magnet/status?id=%s", ad.Host, t.Id) url := fmt.Sprintf("%s/magnet/status?id=%s", ad.Host, t.Id)
req, _ := http.NewRequest(http.MethodGet, url, nil) req, _ := http.NewRequest(http.MethodGet, url, nil)
resp, err := ad.client.MakeRequest(req) resp, err := ad.client.MakeRequest(req)
@@ -172,7 +172,7 @@ func (ad *AllDebrid) UpdateTorrent(t *torrent.Torrent) error {
return nil return nil
} }
func (ad *AllDebrid) CheckStatus(torrent *torrent.Torrent, isSymlink bool) (*torrent.Torrent, error) { func (ad *AllDebrid) CheckStatus(torrent *types.Torrent, isSymlink bool) (*types.Torrent, error) {
for { for {
err := ad.UpdateTorrent(torrent) err := ad.UpdateTorrent(torrent)
@@ -204,7 +204,7 @@ func (ad *AllDebrid) CheckStatus(torrent *torrent.Torrent, isSymlink bool) (*tor
return torrent, nil return torrent, nil
} }
func (ad *AllDebrid) DeleteTorrent(torrent *torrent.Torrent) { func (ad *AllDebrid) DeleteTorrent(torrent *types.Torrent) {
url := fmt.Sprintf("%s/magnet/delete?id=%s", ad.Host, torrent.Id) url := fmt.Sprintf("%s/magnet/delete?id=%s", ad.Host, torrent.Id)
req, _ := http.NewRequest(http.MethodGet, url, nil) req, _ := http.NewRequest(http.MethodGet, url, nil)
_, err := ad.client.MakeRequest(req) _, err := ad.client.MakeRequest(req)
@@ -215,7 +215,7 @@ func (ad *AllDebrid) DeleteTorrent(torrent *torrent.Torrent) {
} }
} }
func (ad *AllDebrid) GenerateDownloadLinks(t *torrent.Torrent) error { func (ad *AllDebrid) GenerateDownloadLinks(t *types.Torrent) error {
for _, file := range t.Files { for _, file := range t.Files {
url := fmt.Sprintf("%s/link/unlock", ad.Host) url := fmt.Sprintf("%s/link/unlock", ad.Host)
query := gourl.Values{} query := gourl.Values{}
@@ -239,7 +239,7 @@ func (ad *AllDebrid) GenerateDownloadLinks(t *torrent.Torrent) error {
return nil return nil
} }
func (ad *AllDebrid) GetDownloadLink(t *torrent.Torrent, file *torrent.File) *torrent.File { func (ad *AllDebrid) GetDownloadLink(t *types.Torrent, file *types.File) *types.File {
url := fmt.Sprintf("%s/link/unlock", ad.Host) url := fmt.Sprintf("%s/link/unlock", ad.Host)
query := gourl.Values{} query := gourl.Values{}
query.Add("link", file.Link) query.Add("link", file.Link)
@@ -263,11 +263,11 @@ func (ad *AllDebrid) GetCheckCached() bool {
return ad.CheckCached return ad.CheckCached
} }
func (ad *AllDebrid) GetTorrents() ([]*torrent.Torrent, error) { func (ad *AllDebrid) GetTorrents() ([]*types.Torrent, error) {
return nil, nil return nil, nil
} }
func (ad *AllDebrid) GetDownloads() (map[string]torrent.DownloadLinks, error) { func (ad *AllDebrid) GetDownloads() (map[string]types.DownloadLinks, error) {
return nil, nil return nil, nil
} }
@@ -279,7 +279,7 @@ func (ad *AllDebrid) GetDownloadUncached() bool {
return ad.DownloadUncached return ad.DownloadUncached
} }
func (ad *AllDebrid) ConvertLinksToFiles(links []string) []torrent.File { func (ad *AllDebrid) ConvertLinksToFiles(links []string) []types.File {
return nil return nil
} }

View File

@@ -1,105 +0,0 @@
package debrid
import (
"fmt"
"github.com/sirrobot01/debrid-blackhole/internal/config"
"github.com/sirrobot01/debrid-blackhole/internal/utils"
"github.com/sirrobot01/debrid-blackhole/pkg/arr"
"github.com/sirrobot01/debrid-blackhole/pkg/debrid/alldebrid"
"github.com/sirrobot01/debrid-blackhole/pkg/debrid/debrid"
"github.com/sirrobot01/debrid-blackhole/pkg/debrid/debrid_link"
"github.com/sirrobot01/debrid-blackhole/pkg/debrid/engine"
"github.com/sirrobot01/debrid-blackhole/pkg/debrid/realdebrid"
"github.com/sirrobot01/debrid-blackhole/pkg/debrid/torbox"
"github.com/sirrobot01/debrid-blackhole/pkg/debrid/torrent"
)
// New constructs the debrid engine from configuration, creating one
// client per configured debrid service and logging each startup.
func New() *engine.Engine {
	cfg := config.GetConfig()
	clients := make([]debrid.Client, 0, len(cfg.Debrids))
	for _, dc := range cfg.Debrids {
		client := createDebridClient(dc)
		client.GetLogger().Info().Msg("Debrid Service started")
		clients = append(clients, client)
	}
	return &engine.Engine{
		Debrids:  clients,
		LastUsed: 0,
	}
}
// createDebridClient maps a configured service name to its client
// implementation. Unrecognized names fall back to realdebrid.
func createDebridClient(dc config.Debrid) debrid.Client {
	switch dc.Name {
	case "torbox":
		return torbox.New(dc)
	case "debridlink":
		return debrid_link.New(dc)
	case "alldebrid":
		return alldebrid.New(dc)
	default:
		// "realdebrid" and any unknown name both resolve here.
		return realdebrid.New(dc)
	}
}
// ProcessTorrent submits a magnet to the engine's debrid services,
// trying each client in order until one accepts the torrent.
//
// Precedence for DownloadUncached: explicit override first, then the
// arr-level setting, then the client's own default.
// Returns the torrent once the accepting client reports its status, or
// an aggregated error when every client fails or skips it.
func ProcessTorrent(d *engine.Engine, magnet *utils.Magnet, a *arr.Arr, isSymlink, overrideDownloadUncached bool) (*torrent.Torrent, error) {
	debridTorrent := &torrent.Torrent{
		InfoHash: magnet.InfoHash,
		Magnet:   magnet,
		Name:     magnet.Name,
		Arr:      a,
		Size:     magnet.Size,
		Files:    make(map[string]torrent.File),
	}
	errs := make([]error, 0)
	for index, db := range d.Debrids {
		logger := db.GetLogger()
		logger.Info().Msgf("Processing debrid: %s", db.GetName())
		// Override first, arr second, debrid third
		if overrideDownloadUncached {
			debridTorrent.DownloadUncached = true
		} else if a.DownloadUncached != nil {
			// Arr cached is set
			debridTorrent.DownloadUncached = *a.DownloadUncached
		} else {
			debridTorrent.DownloadUncached = db.GetDownloadUncached()
		}
		logger.Info().Msgf("Torrent Hash: %s", debridTorrent.InfoHash)
		if db.GetCheckCached() {
			hash, exists := db.IsAvailable([]string{debridTorrent.InfoHash})[debridTorrent.InfoHash]
			if !exists || !hash {
				logger.Info().Msgf("Torrent: %s is not cached", debridTorrent.Name)
				continue
			} else {
				logger.Info().Msgf("Torrent: %s is cached(or downloading)", debridTorrent.Name)
			}
		}
		dbt, err := db.SubmitMagnet(debridTorrent)
		if dbt != nil {
			dbt.Arr = a
		}
		if err != nil || dbt == nil || dbt.Id == "" {
			// Only record real errors: appending a nil err (dbt missing an Id
			// without an error) would render as "%!w(<nil>)" in the aggregate.
			if err != nil {
				errs = append(errs, err)
			}
			continue
		}
		logger.Info().Msgf("Torrent: %s(id=%s) submitted to %s", dbt.Name, dbt.Id, db.GetName())
		d.LastUsed = index
		return db.CheckStatus(dbt, isSymlink)
	}
	err := fmt.Errorf("failed to process torrent")
	for _, e := range errs {
		err = fmt.Errorf("%w\n%w", err, e)
	}
	return nil, err
}

View File

@@ -1,53 +1,50 @@
package webdav package debrid
import ( import (
"bufio" "bufio"
"context" "context"
"fmt" "fmt"
"github.com/dgraph-io/badger/v4"
"github.com/goccy/go-json" "github.com/goccy/go-json"
"github.com/rs/zerolog" "github.com/rs/zerolog"
"github.com/sirrobot01/debrid-blackhole/internal/logger" "github.com/sirrobot01/debrid-blackhole/internal/logger"
"github.com/sirrobot01/debrid-blackhole/internal/utils" "github.com/sirrobot01/debrid-blackhole/internal/utils"
"github.com/sirrobot01/debrid-blackhole/pkg/debrid/debrid" "github.com/sirrobot01/debrid-blackhole/pkg/debrid/types"
"os" "os"
"path/filepath" "path/filepath"
"runtime" "runtime"
"sort"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/sirrobot01/debrid-blackhole/internal/config" "github.com/sirrobot01/debrid-blackhole/internal/config"
"github.com/sirrobot01/debrid-blackhole/pkg/debrid/torrent"
) )
type DownloadLinkCache struct { type DownloadLinkCache struct {
Link string `json:"download_link"` Link string `json:"download_link"`
} }
type propfindResponse struct { type PropfindResponse struct {
data []byte Data []byte
ts time.Time GzippedData []byte
Ts time.Time
} }
type CachedTorrent struct { type CachedTorrent struct {
*torrent.Torrent *types.Torrent
LastRead time.Time `json:"last_read"` LastRead time.Time `json:"last_read"`
IsComplete bool `json:"is_complete"` IsComplete bool `json:"is_complete"`
} }
type Cache struct { type Cache struct {
dir string dir string
client debrid.Client client types.Client
db *badger.DB
logger zerolog.Logger logger zerolog.Logger
torrents map[string]*CachedTorrent // key: torrent.Id, value: *CachedTorrent torrents map[string]*CachedTorrent // key: torrent.Id, value: *CachedTorrent
torrentsNames map[string]*CachedTorrent // key: torrent.Name, value: torrent torrentsNames map[string]*CachedTorrent // key: torrent.Name, value: torrent
listings atomic.Value listings atomic.Value
downloadLinks map[string]string // key: file.Link, value: download link downloadLinks map[string]string // key: file.Link, value: download link
propfindResp sync.Map PropfindResp sync.Map
workers int workers int
@@ -63,13 +60,28 @@ type Cache struct {
downloadLinksMutex sync.Mutex // for downloadLinks downloadLinksMutex sync.Mutex // for downloadLinks
} }
type fileInfo struct {
name string
size int64
mode os.FileMode
modTime time.Time
isDir bool
}
func (fi *fileInfo) Name() string { return fi.name }
func (fi *fileInfo) Size() int64 { return fi.size }
func (fi *fileInfo) Mode() os.FileMode { return fi.mode }
func (fi *fileInfo) ModTime() time.Time { return fi.modTime }
func (fi *fileInfo) IsDir() bool { return fi.isDir }
func (fi *fileInfo) Sys() interface{} { return nil }
func (c *Cache) setTorrent(t *CachedTorrent) { func (c *Cache) setTorrent(t *CachedTorrent) {
c.torrentsMutex.Lock() c.torrentsMutex.Lock()
c.torrents[t.Id] = t c.torrents[t.Id] = t
c.torrentsNames[t.Name] = t c.torrentsNames[t.Name] = t
c.torrentsMutex.Unlock() c.torrentsMutex.Unlock()
go c.refreshListings() // This is concurrent safe tryLock(&c.listingRefreshMu, c.refreshListings)
go func() { go func() {
if err := c.SaveTorrent(t); err != nil { if err := c.SaveTorrent(t); err != nil {
@@ -78,36 +90,6 @@ func (c *Cache) setTorrent(t *CachedTorrent) {
}() }()
} }
func (c *Cache) refreshListings() {
// Copy the current torrents to avoid concurrent issues
c.torrentsMutex.RLock()
torrents := make([]string, 0, len(c.torrents))
for _, t := range c.torrents {
if t != nil && t.Torrent != nil {
torrents = append(torrents, t.Name)
}
}
c.torrentsMutex.RUnlock()
sort.Slice(torrents, func(i, j int) bool {
return torrents[i] < torrents[j]
})
files := make([]os.FileInfo, 0, len(torrents))
now := time.Now()
for _, t := range torrents {
files = append(files, &FileInfo{
name: t,
size: 0,
mode: 0755 | os.ModeDir,
modTime: now,
isDir: true,
})
}
// Atomic store of the complete ready-to-use slice
c.listings.Store(files)
}
func (c *Cache) GetListing() []os.FileInfo { func (c *Cache) GetListing() []os.FileInfo {
if v, ok := c.listings.Load().([]os.FileInfo); ok { if v, ok := c.listings.Load().([]os.FileInfo); ok {
return v return v
@@ -124,7 +106,7 @@ func (c *Cache) setTorrents(torrents map[string]*CachedTorrent) {
c.torrentsMutex.Unlock() c.torrentsMutex.Unlock()
go c.refreshListings() // This is concurrent safe tryLock(&c.listingRefreshMu, c.refreshListings)
go func() { go func() {
if err := c.SaveTorrents(); err != nil { if err := c.SaveTorrents(); err != nil {
@@ -149,31 +131,7 @@ func (c *Cache) GetTorrentNames() map[string]*CachedTorrent {
return c.torrentsNames return c.torrentsNames
} }
type Manager struct { func NewCache(client types.Client) *Cache {
caches map[string]*Cache
}
func NewCacheManager(clients []debrid.Client) *Manager {
m := &Manager{
caches: make(map[string]*Cache),
}
for _, client := range clients {
m.caches[client.GetName()] = NewCache(client)
}
return m
}
func (m *Manager) GetCaches() map[string]*Cache {
return m.caches
}
func (m *Manager) GetCache(debridName string) *Cache {
return m.caches[debridName]
}
func NewCache(client debrid.Client) *Cache {
cfg := config.GetConfig() cfg := config.GetConfig()
dbPath := filepath.Join(cfg.Path, "cache", client.GetName()) dbPath := filepath.Join(cfg.Path, "cache", client.GetName())
return &Cache{ return &Cache{
@@ -202,7 +160,7 @@ func (c *Cache) Start() error {
c.downloadLinksRefreshMu.Lock() c.downloadLinksRefreshMu.Lock()
defer c.downloadLinksRefreshMu.Unlock() defer c.downloadLinksRefreshMu.Unlock()
// This prevents the download links from being refreshed twice // This prevents the download links from being refreshed twice
c.refreshDownloadLinks() tryLock(&c.downloadLinksRefreshMu, c.refreshDownloadLinks)
}() }()
go func() { go func() {
@@ -216,9 +174,6 @@ func (c *Cache) Start() error {
} }
func (c *Cache) Close() error { func (c *Cache) Close() error {
if c.db != nil {
return c.db.Close()
}
return nil return nil
} }
@@ -327,7 +282,7 @@ func (c *Cache) Sync() error {
c.logger.Info().Msgf("Got %d torrents from %s", len(torrents), c.client.GetName()) c.logger.Info().Msgf("Got %d torrents from %s", len(torrents), c.client.GetName())
newTorrents := make([]*torrent.Torrent, 0) newTorrents := make([]*types.Torrent, 0)
idStore := make(map[string]bool, len(torrents)) idStore := make(map[string]bool, len(torrents))
for _, t := range torrents { for _, t := range torrents {
idStore[t.Id] = true idStore[t.Id] = true
@@ -368,12 +323,12 @@ func (c *Cache) Sync() error {
return nil return nil
} }
func (c *Cache) sync(torrents []*torrent.Torrent) error { func (c *Cache) sync(torrents []*types.Torrent) error {
// Calculate optimal workers - balance between CPU and IO // Calculate optimal workers - balance between CPU and IO
workers := runtime.NumCPU() * 50 // A more balanced multiplier for BadgerDB workers := runtime.NumCPU() * 50 // A more balanced multiplier for BadgerDB
// Create channels with appropriate buffering // Create channels with appropriate buffering
workChan := make(chan *torrent.Torrent, workers*2) workChan := make(chan *types.Torrent, workers*2)
// Use an atomic counter for progress tracking // Use an atomic counter for progress tracking
var processed int64 var processed int64
@@ -398,7 +353,7 @@ func (c *Cache) sync(torrents []*torrent.Torrent) error {
return // Channel closed, exit goroutine return // Channel closed, exit goroutine
} }
if err := c.processTorrent(t); err != nil { if err := c.ProcessTorrent(t, true); err != nil {
c.logger.Error().Err(err).Str("torrent", t.Name).Msg("sync error") c.logger.Error().Err(err).Str("torrent", t.Name).Msg("sync error")
atomic.AddInt64(&errorCount, 1) atomic.AddInt64(&errorCount, 1)
} }
@@ -435,11 +390,11 @@ func (c *Cache) sync(torrents []*torrent.Torrent) error {
return nil return nil
} }
func (c *Cache) processTorrent(t *torrent.Torrent) error { func (c *Cache) ProcessTorrent(t *types.Torrent, refreshRclone bool) error {
var err error if len(t.Files) == 0 {
err = c.client.UpdateTorrent(t) if err := c.client.UpdateTorrent(t); err != nil {
if err != nil { return fmt.Errorf("failed to update torrent: %w", err)
return fmt.Errorf("failed to get torrent files: %v", err) }
} }
ct := &CachedTorrent{ ct := &CachedTorrent{
@@ -448,6 +403,9 @@ func (c *Cache) processTorrent(t *torrent.Torrent) error {
IsComplete: len(t.Files) > 0, IsComplete: len(t.Files) > 0,
} }
c.setTorrent(ct) c.setTorrent(ct)
if err := c.RefreshRclone(); err != nil {
c.logger.Debug().Err(err).Msg("Failed to refresh rclone")
}
return nil return nil
} }
@@ -469,7 +427,7 @@ func (c *Cache) GetDownloadLink(torrentId, filename, fileLink string) string {
if ct.IsComplete { if ct.IsComplete {
return "" return ""
} }
ct = c.refreshTorrent(ct) // Refresh the torrent from the debrid service ct = c.refreshTorrent(ct) // Refresh the torrent from the debrid
if ct == nil { if ct == nil {
return "" return ""
} else { } else {
@@ -477,7 +435,7 @@ func (c *Cache) GetDownloadLink(torrentId, filename, fileLink string) string {
} }
} }
c.logger.Debug().Msgf("Getting download link for %s", ct.Name) c.logger.Trace().Msgf("Getting download link for %s", ct.Name)
f := c.client.GetDownloadLink(ct.Torrent, &file) f := c.client.GetDownloadLink(ct.Torrent, &file)
if f == nil { if f == nil {
return "" return ""
@@ -490,7 +448,7 @@ func (c *Cache) GetDownloadLink(torrentId, filename, fileLink string) string {
return f.DownloadLink return f.DownloadLink
} }
func (c *Cache) updateDownloadLink(file *torrent.File) { func (c *Cache) updateDownloadLink(file *types.File) {
c.downloadLinksMutex.Lock() c.downloadLinksMutex.Lock()
defer c.downloadLinksMutex.Unlock() defer c.downloadLinksMutex.Unlock()
c.downloadLinks[file.Link] = file.DownloadLink c.downloadLinks[file.Link] = file.DownloadLink
@@ -503,111 +461,10 @@ func (c *Cache) checkDownloadLink(link string) string {
return "" return ""
} }
func (c *Cache) refreshTorrent(t *CachedTorrent) *CachedTorrent { func (c *Cache) GetClient() types.Client {
_torrent := t.Torrent
err := c.client.UpdateTorrent(_torrent)
if err != nil {
c.logger.Debug().Msgf("Failed to get torrent files for %s: %v", t.Id, err)
return nil
}
if len(t.Files) == 0 {
return nil
}
ct := &CachedTorrent{
Torrent: _torrent,
LastRead: time.Now(),
IsComplete: len(t.Files) > 0,
}
c.setTorrent(ct)
return ct
}
func (c *Cache) refreshDownloadLinks() map[string]string {
c.downloadLinksMutex.Lock()
defer c.downloadLinksMutex.Unlock()
downloadLinks, err := c.client.GetDownloads()
if err != nil {
c.logger.Debug().Err(err).Msg("Failed to get download links")
return nil
}
for k, v := range downloadLinks {
c.downloadLinks[k] = v.DownloadLink
}
return c.downloadLinks
}
func (c *Cache) GetClient() debrid.Client {
return c.client return c.client
} }
func (c *Cache) refreshTorrents() {
c.torrentsMutex.RLock()
currentTorrents := c.torrents //
// Create a copy of the current torrents to avoid concurrent issues
torrents := make(map[string]string, len(currentTorrents)) // a mpa of id and name
for _, v := range currentTorrents {
torrents[v.Id] = v.Name
}
c.torrentsMutex.RUnlock()
// Get new torrents from the debrid service
debTorrents, err := c.client.GetTorrents()
if err != nil {
c.logger.Debug().Err(err).Msg("Failed to get torrents")
return
}
if len(debTorrents) == 0 {
// Maybe an error occurred
return
}
// Get the newly added torrents only
newTorrents := make([]*torrent.Torrent, 0)
idStore := make(map[string]bool, len(debTorrents))
for _, t := range debTorrents {
idStore[t.Id] = true
if _, ok := torrents[t.Id]; !ok {
newTorrents = append(newTorrents, t)
}
}
// Check for deleted torrents
deletedTorrents := make([]string, 0)
for id, _ := range torrents {
if _, ok := idStore[id]; !ok {
deletedTorrents = append(deletedTorrents, id)
}
}
if len(deletedTorrents) > 0 {
c.DeleteTorrent(deletedTorrents)
}
if len(newTorrents) == 0 {
return
}
c.logger.Info().Msgf("Found %d new torrents", len(newTorrents))
// No need for a complex sync process, just add the new torrents
wg := sync.WaitGroup{}
wg.Add(len(newTorrents))
for _, t := range newTorrents {
// processTorrent is concurrent safe
go func() {
defer wg.Done()
if err := c.processTorrent(t); err != nil {
c.logger.Info().Err(err).Msg("Failed to process torrent")
}
}()
}
wg.Wait()
}
func (c *Cache) DeleteTorrent(ids []string) { func (c *Cache) DeleteTorrent(ids []string) {
c.logger.Info().Msgf("Deleting %d torrents", len(ids)) c.logger.Info().Msgf("Deleting %d torrents", len(ids))
c.torrentsMutex.Lock() c.torrentsMutex.Lock()
@@ -628,25 +485,7 @@ func (c *Cache) removeFromDB(torrentId string) {
} }
} }
func (c *Cache) resetPropfindResponse() { func (c *Cache) OnRemove(torrentId string) {
// Right now, parents are hardcoded go c.DeleteTorrent([]string{torrentId})
parents := []string{"__all__", "torrents"} go tryLock(&c.listingRefreshMu, c.refreshListings)
// Reset only the parent directories
// Convert the parents to a keys
// This is a bit hacky, but it works
// Instead of deleting all the keys, we only delete the parent keys, e.g __all__/ or torrents/
keys := make([]string, 0, len(parents))
for _, p := range parents {
// Construct the key
// construct url
url := filepath.Join("/webdav/%s/%s", c.client.GetName(), p)
key0 := fmt.Sprintf("propfind:%s:0", url)
key1 := fmt.Sprintf("propfind:%s:1", url)
keys = append(keys, key0, key1)
}
// Delete the keys
for _, k := range keys {
c.propfindResp.Delete(k)
}
} }

View File

@@ -1,24 +1,86 @@
package debrid package debrid
import ( import (
"github.com/rs/zerolog" "fmt"
"github.com/sirrobot01/debrid-blackhole/pkg/debrid/torrent" "github.com/sirrobot01/debrid-blackhole/internal/config"
"github.com/sirrobot01/debrid-blackhole/internal/utils"
"github.com/sirrobot01/debrid-blackhole/pkg/arr"
"github.com/sirrobot01/debrid-blackhole/pkg/debrid/alldebrid"
"github.com/sirrobot01/debrid-blackhole/pkg/debrid/debrid_link"
"github.com/sirrobot01/debrid-blackhole/pkg/debrid/realdebrid"
"github.com/sirrobot01/debrid-blackhole/pkg/debrid/torbox"
"github.com/sirrobot01/debrid-blackhole/pkg/debrid/types"
) )
type Client interface { func createDebridClient(dc config.Debrid) types.Client {
SubmitMagnet(tr *torrent.Torrent) (*torrent.Torrent, error) switch dc.Name {
CheckStatus(tr *torrent.Torrent, isSymlink bool) (*torrent.Torrent, error) case "realdebrid":
GenerateDownloadLinks(tr *torrent.Torrent) error return realdebrid.New(dc)
GetDownloadLink(tr *torrent.Torrent, file *torrent.File) *torrent.File case "torbox":
ConvertLinksToFiles(links []string) []torrent.File return torbox.New(dc)
DeleteTorrent(tr *torrent.Torrent) case "debridlink":
IsAvailable(infohashes []string) map[string]bool return debrid_link.New(dc)
GetCheckCached() bool case "alldebrid":
GetDownloadUncached() bool return alldebrid.New(dc)
UpdateTorrent(torrent *torrent.Torrent) error default:
GetTorrents() ([]*torrent.Torrent, error) return realdebrid.New(dc)
GetName() string }
GetLogger() zerolog.Logger }
GetDownloadingStatus() []string
GetDownloads() (map[string]torrent.DownloadLinks, error) func ProcessTorrent(d *Engine, magnet *utils.Magnet, a *arr.Arr, isSymlink, overrideDownloadUncached bool) (*types.Torrent, error) {
debridTorrent := &types.Torrent{
InfoHash: magnet.InfoHash,
Magnet: magnet,
Name: magnet.Name,
Arr: a,
Size: magnet.Size,
Files: make(map[string]types.File),
}
errs := make([]error, 0)
for index, db := range d.Clients {
logger := db.GetLogger()
logger.Info().Msgf("Processing debrid: %s", db.GetName())
// Override first, arr second, debrid third
if overrideDownloadUncached {
debridTorrent.DownloadUncached = true
} else if a.DownloadUncached != nil {
// Arr cached is set
debridTorrent.DownloadUncached = *a.DownloadUncached
} else {
debridTorrent.DownloadUncached = db.GetDownloadUncached()
}
logger.Info().Msgf("Torrent Hash: %s", debridTorrent.InfoHash)
if db.GetCheckCached() {
hash, exists := db.IsAvailable([]string{debridTorrent.InfoHash})[debridTorrent.InfoHash]
if !exists || !hash {
logger.Info().Msgf("Torrent: %s is not cached", debridTorrent.Name)
continue
} else {
logger.Info().Msgf("Torrent: %s is cached(or downloading)", debridTorrent.Name)
}
}
dbt, err := db.SubmitMagnet(debridTorrent)
if dbt != nil {
dbt.Arr = a
}
if err != nil || dbt == nil || dbt.Id == "" {
errs = append(errs, err)
continue
}
logger.Info().Msgf("Torrent: %s(id=%s) submitted to %s", dbt.Name, dbt.Id, db.GetName())
d.LastUsed = index
return db.CheckStatus(dbt, isSymlink)
}
err := fmt.Errorf("failed to process torrent")
for _, e := range errs {
err = fmt.Errorf("%w\n%w", err, e)
}
return nil, err
} }

View File

@@ -0,0 +1,51 @@
package debrid
import (
"github.com/sirrobot01/debrid-blackhole/internal/config"
"github.com/sirrobot01/debrid-blackhole/pkg/debrid/types"
)
// Engine holds the configured debrid clients and their caches,
// keyed by the debrid service name from config.
type Engine struct {
	Clients  map[string]types.Client // key: debrid name (e.g. "realdebrid")
	Caches   map[string]*Cache       // per-client cache, same keys as Clients
	LastUsed string                  // name of the last client used; "" until set
}
// NewEngine builds an Engine from configuration, creating one client
// and one cache per configured debrid service.
func NewEngine() *Engine {
	cfg := config.GetConfig()
	eng := &Engine{
		Clients:  make(map[string]types.Client),
		Caches:   make(map[string]*Cache),
		LastUsed: "",
	}
	for _, dc := range cfg.Debrids {
		client := createDebridClient(dc)
		client.GetLogger().Info().Msg("Debrid Service started")
		eng.Clients[dc.Name] = client
		eng.Caches[dc.Name] = NewCache(client)
	}
	return eng
}
// Get returns the most recently used client; before any client has been
// used it falls back to an arbitrary configured client (nil when none).
func (d *Engine) Get() types.Client {
	if d.LastUsed != "" {
		return d.Clients[d.LastUsed]
	}
	for _, client := range d.Clients {
		return client
	}
	return nil
}
// GetByName returns the client registered under name, or nil if unknown.
func (d *Engine) GetByName(name string) types.Client {
	client := d.Clients[name]
	return client
}
// GetDebrids exposes the full name-to-client map.
func (d *Engine) GetDebrids() map[string]types.Client {
	clients := d.Clients
	return clients
}

10
pkg/debrid/debrid/misc.go Normal file
View File

@@ -0,0 +1,10 @@
package debrid
import "sync"
func tryLock(mu *sync.Mutex, f func()) {
if mu.TryLock() {
defer mu.Unlock()
f()
}
}

View File

@@ -0,0 +1,205 @@
package debrid
import (
"bytes"
"fmt"
"github.com/goccy/go-json"
"github.com/sirrobot01/debrid-blackhole/pkg/debrid/types"
"net/http"
"os"
"path"
"path/filepath"
"sort"
"sync"
"time"
)
// refreshListings rebuilds the sorted directory listing from the cached
// torrents, atomically publishes it, and invalidates the dependent
// caches (propfind responses and the rclone VFS).
func (c *Cache) refreshListings() {
	// Copy the current torrent names to avoid concurrent issues.
	c.torrentsMutex.RLock()
	names := make([]string, 0, len(c.torrents))
	for _, t := range c.torrents {
		if t != nil && t.Torrent != nil {
			names = append(names, t.Name)
		}
	}
	c.torrentsMutex.RUnlock()

	// sort.Strings is the idiomatic (and faster) form of sort.Slice
	// with a plain string comparator.
	sort.Strings(names)

	files := make([]os.FileInfo, 0, len(names))
	now := time.Now()
	for _, name := range names {
		files = append(files, &fileInfo{
			name:    name,
			size:    0,
			mode:    0755 | os.ModeDir,
			modTime: now,
			isDir:   true,
		})
	}
	// Atomic store of the complete ready-to-use slice.
	c.listings.Store(files)

	c.resetPropfindResponse()
	if err := c.RefreshRclone(); err != nil {
		c.logger.Debug().Err(err).Msg("Failed to refresh rclone")
	}
}
// refreshTorrents reconciles the local cache with the debrid service:
// newly added remote torrents are processed and cached, and torrents
// that disappeared remotely are evicted.
func (c *Cache) refreshTorrents() {
	// Snapshot the current torrents (a map of id -> name) under the read
	// lock to avoid concurrent issues.
	c.torrentsMutex.RLock()
	known := make(map[string]string, len(c.torrents))
	for _, v := range c.torrents {
		known[v.Id] = v.Name
	}
	c.torrentsMutex.RUnlock()

	// Get the current torrents from the debrid service.
	debTorrents, err := c.client.GetTorrents()
	if err != nil {
		c.logger.Debug().Err(err).Msg("Failed to get torrents")
		return
	}
	if len(debTorrents) == 0 {
		// Maybe an error occurred
		return
	}

	// Collect only the newly added torrents.
	newTorrents := make([]*types.Torrent, 0)
	idStore := make(map[string]bool, len(debTorrents))
	for _, t := range debTorrents {
		idStore[t.Id] = true
		if _, ok := known[t.Id]; !ok {
			newTorrents = append(newTorrents, t)
		}
	}

	// Check for deleted torrents.
	deletedTorrents := make([]string, 0)
	for id := range known {
		if _, ok := idStore[id]; !ok {
			deletedTorrents = append(deletedTorrents, id)
		}
	}
	if len(deletedTorrents) > 0 {
		c.DeleteTorrent(deletedTorrents)
	}
	if len(newTorrents) == 0 {
		return
	}
	c.logger.Info().Msgf("Found %d new torrents", len(newTorrents))

	// No need for a complex sync process, just add the new torrents.
	wg := sync.WaitGroup{}
	wg.Add(len(newTorrents))
	for _, t := range newTorrents {
		t := t // capture per-iteration value; shared otherwise before Go 1.22
		// ProcessTorrent is concurrent safe
		go func() {
			defer wg.Done()
			if err := c.ProcessTorrent(t, true); err != nil {
				c.logger.Info().Err(err).Msg("Failed to process torrent")
			}
		}()
	}
	wg.Wait()
}
// RefreshRclone asks the rclone RC server to refresh its VFS directory
// cache so newly added torrents become visible over the mount.
// NOTE(review): the RC endpoint is hardcoded — should come from config.
func (c *Cache) RefreshRclone() error {
	params := map[string]interface{}{
		"recursive": "false",
	}
	// Convert parameters to JSON
	jsonParams, err := json.Marshal(params)
	if err != nil {
		return err
	}
	// Create HTTP request
	url := "http://192.168.0.219:9990/vfs/refresh" // Switch to config
	req, err := http.NewRequest(http.MethodPost, url, bytes.NewBuffer(jsonParams))
	if err != nil {
		return err
	}
	// Set the appropriate headers
	req.Header.Set("Content-Type", "application/json")
	// Bounded client so a hung rclone instance cannot block this caller forever.
	client := &http.Client{Timeout: 30 * time.Second}
	resp, err := client.Do(req)
	if err != nil {
		return err
	}
	// The body must be closed or the underlying connection leaks.
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("failed to refresh rclone: %s", resp.Status)
	}
	return nil
}
// refreshTorrent re-fetches a single torrent from the debrid service
// and, when it has files, stores the refreshed copy in the cache.
// Returns nil when the update fails or the torrent has no files.
func (c *Cache) refreshTorrent(t *CachedTorrent) *CachedTorrent {
	tor := t.Torrent
	if err := c.client.UpdateTorrent(tor); err != nil {
		c.logger.Debug().Msgf("Failed to get torrent files for %s: %v", t.Id, err)
		return nil
	}
	if len(t.Files) == 0 {
		return nil
	}
	refreshed := &CachedTorrent{
		Torrent:    tor,
		LastRead:   time.Now(),
		IsComplete: len(t.Files) > 0,
	}
	c.setTorrent(refreshed)
	return refreshed
}
// refreshDownloadLinks re-fetches all generated download links from the
// debrid service and merges them into the local link cache.
func (c *Cache) refreshDownloadLinks() {
	c.downloadLinksMutex.Lock()
	defer c.downloadLinksMutex.Unlock()
	downloadLinks, err := c.client.GetDownloads()
	if err != nil {
		c.logger.Debug().Err(err).Msg("Failed to get download links")
		// Nothing to merge; previously fell through and ranged a nil map.
		return
	}
	for k, v := range downloadLinks {
		c.downloadLinks[k] = v.DownloadLink
	}
}
// resetPropfindResponse evicts the cached PROPFIND responses for the
// hardcoded parent directories so the next request re-renders them.
func (c *Cache) resetPropfindResponse() {
	// Right now, parents are hardcoded
	parents := []string{"__all__", "torrents"}
	// Instead of deleting every key, we only evict the parent keys,
	// e.g. __all__/ or torrents/ — a bit hacky, but it works.
	for _, parent := range parents {
		u := filepath.Join("/webdav", c.client.GetName(), parent)
		u = path.Clean(u)
		for _, depth := range []string{"0", "1"} {
			c.PropfindResp.Delete(fmt.Sprintf("propfind:%s:%s", u, depth))
		}
	}
}

View File

@@ -0,0 +1,35 @@
package debrid
import "time"
// Refresh starts the background workers that keep the listing and the
// download links up to date.
func (c *Cache) Refresh() error {
	// For now, we just want to refresh the listing and download links
	c.logger.Info().Msg("Starting cache refresh workers")
	for _, worker := range []func(){c.refreshDownloadLinksWorker, c.refreshTorrentsWorker} {
		go worker()
	}
	return nil
}
// refreshDownloadLinksWorker periodically refreshes the cached download
// links; tryLock skips a cycle if a refresh is already in flight.
// NOTE(review): this goroutine has no stop signal — consider a context.
func (c *Cache) refreshDownloadLinksWorker() {
	refreshTicker := time.NewTicker(40 * time.Minute)
	defer refreshTicker.Stop()
	// Ranging over the ticker channel is the idiomatic form of a
	// single-case for/select loop.
	for range refreshTicker.C {
		tryLock(&c.downloadLinksRefreshMu, c.refreshDownloadLinks)
	}
}
// refreshTorrentsWorker periodically reconciles the torrent cache with
// the debrid service; tryLock skips a cycle if one is already running.
// NOTE(review): this goroutine has no stop signal — consider a context.
func (c *Cache) refreshTorrentsWorker() {
	refreshTicker := time.NewTicker(5 * time.Second)
	defer refreshTicker.Stop()
	// Ranging over the ticker channel is the idiomatic form of a
	// single-case for/select loop.
	for range refreshTicker.C {
		tryLock(&c.torrentsRefreshMu, c.refreshTorrents)
	}
}

View File

@@ -9,7 +9,7 @@ import (
"github.com/sirrobot01/debrid-blackhole/internal/logger" "github.com/sirrobot01/debrid-blackhole/internal/logger"
"github.com/sirrobot01/debrid-blackhole/internal/request" "github.com/sirrobot01/debrid-blackhole/internal/request"
"github.com/sirrobot01/debrid-blackhole/internal/utils" "github.com/sirrobot01/debrid-blackhole/internal/utils"
"github.com/sirrobot01/debrid-blackhole/pkg/debrid/torrent" "github.com/sirrobot01/debrid-blackhole/pkg/debrid/types"
"slices" "slices"
"time" "time"
@@ -89,7 +89,7 @@ func (dl *DebridLink) IsAvailable(hashes []string) map[string]bool {
return result return result
} }
func (dl *DebridLink) UpdateTorrent(t *torrent.Torrent) error { func (dl *DebridLink) UpdateTorrent(t *types.Torrent) error {
url := fmt.Sprintf("%s/seedbox/list?ids=%s", dl.Host, t.Id) url := fmt.Sprintf("%s/seedbox/list?ids=%s", dl.Host, t.Id)
req, _ := http.NewRequest(http.MethodGet, url, nil) req, _ := http.NewRequest(http.MethodGet, url, nil)
resp, err := dl.client.MakeRequest(req) resp, err := dl.client.MakeRequest(req)
@@ -133,7 +133,7 @@ func (dl *DebridLink) UpdateTorrent(t *torrent.Torrent) error {
if !cfg.IsSizeAllowed(f.Size) { if !cfg.IsSizeAllowed(f.Size) {
continue continue
} }
file := torrent.File{ file := types.File{
Id: f.ID, Id: f.ID,
Name: f.Name, Name: f.Name,
Size: f.Size, Size: f.Size,
@@ -146,7 +146,7 @@ func (dl *DebridLink) UpdateTorrent(t *torrent.Torrent) error {
return nil return nil
} }
func (dl *DebridLink) SubmitMagnet(t *torrent.Torrent) (*torrent.Torrent, error) { func (dl *DebridLink) SubmitMagnet(t *types.Torrent) (*types.Torrent, error) {
url := fmt.Sprintf("%s/seedbox/add", dl.Host) url := fmt.Sprintf("%s/seedbox/add", dl.Host)
payload := map[string]string{"url": t.Magnet.Link} payload := map[string]string{"url": t.Magnet.Link}
jsonPayload, _ := json.Marshal(payload) jsonPayload, _ := json.Marshal(payload)
@@ -179,7 +179,7 @@ func (dl *DebridLink) SubmitMagnet(t *torrent.Torrent) (*torrent.Torrent, error)
t.MountPath = dl.MountPath t.MountPath = dl.MountPath
t.Debrid = dl.Name t.Debrid = dl.Name
for _, f := range data.Files { for _, f := range data.Files {
file := torrent.File{ file := types.File{
Id: f.ID, Id: f.ID,
Name: f.Name, Name: f.Name,
Size: f.Size, Size: f.Size,
@@ -194,7 +194,7 @@ func (dl *DebridLink) SubmitMagnet(t *torrent.Torrent) (*torrent.Torrent, error)
return t, nil return t, nil
} }
func (dl *DebridLink) CheckStatus(torrent *torrent.Torrent, isSymlink bool) (*torrent.Torrent, error) { func (dl *DebridLink) CheckStatus(torrent *types.Torrent, isSymlink bool) (*types.Torrent, error) {
for { for {
err := dl.UpdateTorrent(torrent) err := dl.UpdateTorrent(torrent)
if err != nil || torrent == nil { if err != nil || torrent == nil {
@@ -223,7 +223,7 @@ func (dl *DebridLink) CheckStatus(torrent *torrent.Torrent, isSymlink bool) (*to
return torrent, nil return torrent, nil
} }
func (dl *DebridLink) DeleteTorrent(torrent *torrent.Torrent) { func (dl *DebridLink) DeleteTorrent(torrent *types.Torrent) {
url := fmt.Sprintf("%s/seedbox/%s/remove", dl.Host, torrent.Id) url := fmt.Sprintf("%s/seedbox/%s/remove", dl.Host, torrent.Id)
req, _ := http.NewRequest(http.MethodDelete, url, nil) req, _ := http.NewRequest(http.MethodDelete, url, nil)
_, err := dl.client.MakeRequest(req) _, err := dl.client.MakeRequest(req)
@@ -234,15 +234,15 @@ func (dl *DebridLink) DeleteTorrent(torrent *torrent.Torrent) {
} }
} }
func (dl *DebridLink) GenerateDownloadLinks(t *torrent.Torrent) error { func (dl *DebridLink) GenerateDownloadLinks(t *types.Torrent) error {
return nil return nil
} }
func (dl *DebridLink) GetDownloads() (map[string]torrent.DownloadLinks, error) { func (dl *DebridLink) GetDownloads() (map[string]types.DownloadLinks, error) {
return nil, nil return nil, nil
} }
func (dl *DebridLink) GetDownloadLink(t *torrent.Torrent, file *torrent.File) *torrent.File { func (dl *DebridLink) GetDownloadLink(t *types.Torrent, file *types.File) *types.File {
return file return file
} }
@@ -280,10 +280,10 @@ func New(dc config.Debrid) *DebridLink {
} }
} }
func (dl *DebridLink) GetTorrents() ([]*torrent.Torrent, error) { func (dl *DebridLink) GetTorrents() ([]*types.Torrent, error) {
return nil, nil return nil, nil
} }
func (dl *DebridLink) ConvertLinksToFiles(links []string) []torrent.File { func (dl *DebridLink) ConvertLinksToFiles(links []string) []types.File {
return nil return nil
} }

View File

@@ -1,30 +0,0 @@
package engine
import (
"github.com/sirrobot01/debrid-blackhole/pkg/debrid/debrid"
)
// Engine holds the set of configured debrid clients and remembers which
// one was used last so callers can keep reusing it.
type Engine struct {
	Debrids  []debrid.Client // configured debrid provider clients
	LastUsed int             // index into Debrids of the last-used client
}
// Get returns the most recently used debrid client; with a fresh Engine
// (LastUsed == 0) that is the first configured client.
//
// NOTE(review): assumes at least one client is configured — an empty
// Debrids slice panics here, exactly as in the original.
func (d *Engine) Get() debrid.Client {
	// The former `if d.LastUsed == 0 { return d.Debrids[0] }` branch was
	// redundant: Debrids[LastUsed] already yields Debrids[0] in that case.
	return d.Debrids[d.LastUsed]
}
// GetByName returns the configured client whose GetName() matches name,
// or nil when no such client exists.
func (d *Engine) GetByName(name string) debrid.Client {
	for i := range d.Debrids {
		if c := d.Debrids[i]; c.GetName() == name {
			return c
		}
	}
	return nil
}
// GetDebrids returns all configured debrid clients. Note that the caller
// receives the internal slice itself, not a copy.
func (d *Engine) GetDebrids() []debrid.Client {
	return d.Debrids
}

View File

@@ -8,7 +8,7 @@ import (
"github.com/sirrobot01/debrid-blackhole/internal/logger" "github.com/sirrobot01/debrid-blackhole/internal/logger"
"github.com/sirrobot01/debrid-blackhole/internal/request" "github.com/sirrobot01/debrid-blackhole/internal/request"
"github.com/sirrobot01/debrid-blackhole/internal/utils" "github.com/sirrobot01/debrid-blackhole/internal/utils"
"github.com/sirrobot01/debrid-blackhole/pkg/debrid/torrent" "github.com/sirrobot01/debrid-blackhole/pkg/debrid/types"
"io" "io"
"net/http" "net/http"
gourl "net/url" gourl "net/url"
@@ -43,8 +43,8 @@ func (r *RealDebrid) GetLogger() zerolog.Logger {
// getTorrentFiles returns a list of torrent files from the torrent info // getTorrentFiles returns a list of torrent files from the torrent info
// validate is used to determine if the files should be validated // validate is used to determine if the files should be validated
// if validate is false, selected files will be returned // if validate is false, selected files will be returned
func getTorrentFiles(t *torrent.Torrent, data TorrentInfo, validate bool) map[string]torrent.File { func getTorrentFiles(t *types.Torrent, data TorrentInfo, validate bool) map[string]types.File {
files := make(map[string]torrent.File) files := make(map[string]types.File)
cfg := config.GetConfig() cfg := config.GetConfig()
idx := 0 idx := 0
for _, f := range data.Files { for _, f := range data.Files {
@@ -80,7 +80,7 @@ func getTorrentFiles(t *torrent.Torrent, data TorrentInfo, validate bool) map[st
continue continue
} }
file := torrent.File{ file := types.File{
Name: name, Name: name,
Path: name, Path: name,
Size: f.Bytes, Size: f.Bytes,
@@ -141,7 +141,7 @@ func (r *RealDebrid) IsAvailable(hashes []string) map[string]bool {
return result return result
} }
func (r *RealDebrid) SubmitMagnet(t *torrent.Torrent) (*torrent.Torrent, error) { func (r *RealDebrid) SubmitMagnet(t *types.Torrent) (*types.Torrent, error) {
url := fmt.Sprintf("%s/torrents/addMagnet", r.Host) url := fmt.Sprintf("%s/torrents/addMagnet", r.Host)
payload := gourl.Values{ payload := gourl.Values{
"magnet": {t.Magnet.Link}, "magnet": {t.Magnet.Link},
@@ -161,7 +161,7 @@ func (r *RealDebrid) SubmitMagnet(t *torrent.Torrent) (*torrent.Torrent, error)
return t, nil return t, nil
} }
func (r *RealDebrid) UpdateTorrent(t *torrent.Torrent) error { func (r *RealDebrid) UpdateTorrent(t *types.Torrent) error {
url := fmt.Sprintf("%s/torrents/info/%s", r.Host, t.Id) url := fmt.Sprintf("%s/torrents/info/%s", r.Host, t.Id)
req, _ := http.NewRequest(http.MethodGet, url, nil) req, _ := http.NewRequest(http.MethodGet, url, nil)
resp, err := r.client.MakeRequest(req) resp, err := r.client.MakeRequest(req)
@@ -173,7 +173,7 @@ func (r *RealDebrid) UpdateTorrent(t *torrent.Torrent) error {
if err != nil { if err != nil {
return err return err
} }
name := utils.RemoveExtension(data.OriginalFilename) name := utils.RemoveInvalidChars(data.OriginalFilename)
t.Name = name t.Name = name
t.Bytes = data.Bytes t.Bytes = data.Bytes
t.Folder = name t.Folder = name
@@ -190,7 +190,7 @@ func (r *RealDebrid) UpdateTorrent(t *torrent.Torrent) error {
return nil return nil
} }
func (r *RealDebrid) CheckStatus(t *torrent.Torrent, isSymlink bool) (*torrent.Torrent, error) { func (r *RealDebrid) CheckStatus(t *types.Torrent, isSymlink bool) (*types.Torrent, error) {
url := fmt.Sprintf("%s/torrents/info/%s", r.Host, t.Id) url := fmt.Sprintf("%s/torrents/info/%s", r.Host, t.Id)
req, _ := http.NewRequest(http.MethodGet, url, nil) req, _ := http.NewRequest(http.MethodGet, url, nil)
for { for {
@@ -204,7 +204,7 @@ func (r *RealDebrid) CheckStatus(t *torrent.Torrent, isSymlink bool) (*torrent.T
return t, err return t, err
} }
status := data.Status status := data.Status
name := utils.RemoveInvalidChars(data.OriginalFilename) name := utils.RemoveExtension(data.OriginalFilename)
t.Name = name // Important because some magnet changes the name t.Name = name // Important because some magnet changes the name
t.Folder = name t.Folder = name
t.Filename = data.Filename t.Filename = data.Filename
@@ -257,7 +257,7 @@ func (r *RealDebrid) CheckStatus(t *torrent.Torrent, isSymlink bool) (*torrent.T
return t, nil return t, nil
} }
func (r *RealDebrid) DeleteTorrent(torrent *torrent.Torrent) { func (r *RealDebrid) DeleteTorrent(torrent *types.Torrent) {
url := fmt.Sprintf("%s/torrents/delete/%s", r.Host, torrent.Id) url := fmt.Sprintf("%s/torrents/delete/%s", r.Host, torrent.Id)
req, _ := http.NewRequest(http.MethodDelete, url, nil) req, _ := http.NewRequest(http.MethodDelete, url, nil)
_, err := r.client.MakeRequest(req) _, err := r.client.MakeRequest(req)
@@ -268,7 +268,7 @@ func (r *RealDebrid) DeleteTorrent(torrent *torrent.Torrent) {
} }
} }
func (r *RealDebrid) GenerateDownloadLinks(t *torrent.Torrent) error { func (r *RealDebrid) GenerateDownloadLinks(t *types.Torrent) error {
url := fmt.Sprintf("%s/unrestrict/link/", r.Host) url := fmt.Sprintf("%s/unrestrict/link/", r.Host)
for _, f := range t.Files { for _, f := range t.Files {
if f.DownloadLink != "" { if f.DownloadLink != "" {
@@ -294,8 +294,8 @@ func (r *RealDebrid) GenerateDownloadLinks(t *torrent.Torrent) error {
return nil return nil
} }
func (r *RealDebrid) ConvertLinksToFiles(links []string) []torrent.File { func (r *RealDebrid) ConvertLinksToFiles(links []string) []types.File {
files := make([]torrent.File, 0) files := make([]types.File, 0)
for _, l := range links { for _, l := range links {
url := fmt.Sprintf("%s/unrestrict/link/", r.Host) url := fmt.Sprintf("%s/unrestrict/link/", r.Host)
payload := gourl.Values{ payload := gourl.Values{
@@ -310,7 +310,7 @@ func (r *RealDebrid) ConvertLinksToFiles(links []string) []torrent.File {
if err = json.Unmarshal(resp, &data); err != nil { if err = json.Unmarshal(resp, &data); err != nil {
continue continue
} }
files = append(files, torrent.File{ files = append(files, types.File{
Name: data.Filename, Name: data.Filename,
Size: data.Filesize, Size: data.Filesize,
Link: l, Link: l,
@@ -321,7 +321,7 @@ func (r *RealDebrid) ConvertLinksToFiles(links []string) []torrent.File {
return files return files
} }
func (r *RealDebrid) GetDownloadLink(t *torrent.Torrent, file *torrent.File) *torrent.File { func (r *RealDebrid) GetDownloadLink(t *types.Torrent, file *types.File) *types.File {
url := fmt.Sprintf("%s/unrestrict/link/", r.Host) url := fmt.Sprintf("%s/unrestrict/link/", r.Host)
payload := gourl.Values{ payload := gourl.Values{
"link": {file.Link}, "link": {file.Link},
@@ -344,9 +344,9 @@ func (r *RealDebrid) GetCheckCached() bool {
return r.CheckCached return r.CheckCached
} }
func (r *RealDebrid) getTorrents(offset int, limit int) (int, []*torrent.Torrent, error) { func (r *RealDebrid) getTorrents(offset int, limit int) (int, []*types.Torrent, error) {
url := fmt.Sprintf("%s/torrents?limit=%d", r.Host, limit) url := fmt.Sprintf("%s/torrents?limit=%d", r.Host, limit)
torrents := make([]*torrent.Torrent, 0) torrents := make([]*types.Torrent, 0)
if offset > 0 { if offset > 0 {
url = fmt.Sprintf("%s&offset=%d", url, offset) url = fmt.Sprintf("%s&offset=%d", url, offset)
} }
@@ -374,10 +374,13 @@ func (r *RealDebrid) getTorrents(offset int, limit int) (int, []*torrent.Torrent
} }
filenames := map[string]bool{} filenames := map[string]bool{}
for _, t := range data { for _, t := range data {
if t.Status != "downloaded" {
continue
}
if _, exists := filenames[t.Filename]; exists { if _, exists := filenames[t.Filename]; exists {
continue continue
} }
torrents = append(torrents, &torrent.Torrent{ torrents = append(torrents, &types.Torrent{
Id: t.Id, Id: t.Id,
Name: utils.RemoveInvalidChars(t.Filename), Name: utils.RemoveInvalidChars(t.Filename),
Bytes: t.Bytes, Bytes: t.Bytes,
@@ -386,7 +389,7 @@ func (r *RealDebrid) getTorrents(offset int, limit int) (int, []*torrent.Torrent
Filename: t.Filename, Filename: t.Filename,
OriginalFilename: t.Filename, OriginalFilename: t.Filename,
Links: t.Links, Links: t.Links,
Files: make(map[string]torrent.File), Files: make(map[string]types.File),
InfoHash: t.Hash, InfoHash: t.Hash,
Debrid: r.Name, Debrid: r.Name,
MountPath: r.MountPath, MountPath: r.MountPath,
@@ -395,7 +398,7 @@ func (r *RealDebrid) getTorrents(offset int, limit int) (int, []*torrent.Torrent
return totalItems, torrents, nil return totalItems, torrents, nil
} }
func (r *RealDebrid) GetTorrents() ([]*torrent.Torrent, error) { func (r *RealDebrid) GetTorrents() ([]*types.Torrent, error) {
limit := 5000 limit := 5000
// Get first batch and total count // Get first batch and total count
@@ -449,8 +452,8 @@ func (r *RealDebrid) GetTorrents() ([]*torrent.Torrent, error) {
return allTorrents, nil return allTorrents, nil
} }
func (r *RealDebrid) GetDownloads() (map[string]torrent.DownloadLinks, error) { func (r *RealDebrid) GetDownloads() (map[string]types.DownloadLinks, error) {
links := make(map[string]torrent.DownloadLinks) links := make(map[string]types.DownloadLinks)
offset := 0 offset := 0
limit := 5000 limit := 5000
for { for {
@@ -475,7 +478,7 @@ func (r *RealDebrid) GetDownloads() (map[string]torrent.DownloadLinks, error) {
return links, nil return links, nil
} }
func (r *RealDebrid) _getDownloads(offset int, limit int) ([]torrent.DownloadLinks, error) { func (r *RealDebrid) _getDownloads(offset int, limit int) ([]types.DownloadLinks, error) {
url := fmt.Sprintf("%s/downloads?limit=%d", r.Host, limit) url := fmt.Sprintf("%s/downloads?limit=%d", r.Host, limit)
if offset > 0 { if offset > 0 {
url = fmt.Sprintf("%s&offset=%d", url, offset) url = fmt.Sprintf("%s&offset=%d", url, offset)
@@ -489,9 +492,9 @@ func (r *RealDebrid) _getDownloads(offset int, limit int) ([]torrent.DownloadLin
if err = json.Unmarshal(resp, &data); err != nil { if err = json.Unmarshal(resp, &data); err != nil {
return nil, err return nil, err
} }
links := make([]torrent.DownloadLinks, 0) links := make([]types.DownloadLinks, 0)
for _, d := range data { for _, d := range data {
links = append(links, torrent.DownloadLinks{ links = append(links, types.DownloadLinks{
Filename: d.Filename, Filename: d.Filename,
Size: d.Filesize, Size: d.Filesize,
Link: d.Link, Link: d.Link,

View File

@@ -9,7 +9,7 @@ import (
"github.com/sirrobot01/debrid-blackhole/internal/logger" "github.com/sirrobot01/debrid-blackhole/internal/logger"
"github.com/sirrobot01/debrid-blackhole/internal/request" "github.com/sirrobot01/debrid-blackhole/internal/request"
"github.com/sirrobot01/debrid-blackhole/internal/utils" "github.com/sirrobot01/debrid-blackhole/internal/utils"
"github.com/sirrobot01/debrid-blackhole/pkg/debrid/torrent" "github.com/sirrobot01/debrid-blackhole/pkg/debrid/types"
"time" "time"
"mime/multipart" "mime/multipart"
@@ -93,7 +93,7 @@ func (tb *Torbox) IsAvailable(hashes []string) map[string]bool {
return result return result
} }
func (tb *Torbox) SubmitMagnet(torrent *torrent.Torrent) (*torrent.Torrent, error) { func (tb *Torbox) SubmitMagnet(torrent *types.Torrent) (*types.Torrent, error) {
url := fmt.Sprintf("%s/api/torrents/createtorrent", tb.Host) url := fmt.Sprintf("%s/api/torrents/createtorrent", tb.Host)
payload := &bytes.Buffer{} payload := &bytes.Buffer{}
writer := multipart.NewWriter(payload) writer := multipart.NewWriter(payload)
@@ -141,7 +141,7 @@ func getTorboxStatus(status string, finished bool) string {
} }
} }
func (tb *Torbox) UpdateTorrent(t *torrent.Torrent) error { func (tb *Torbox) UpdateTorrent(t *types.Torrent) error {
url := fmt.Sprintf("%s/api/torrents/mylist/?id=%s", tb.Host, t.Id) url := fmt.Sprintf("%s/api/torrents/mylist/?id=%s", tb.Host, t.Id)
req, _ := http.NewRequest(http.MethodGet, url, nil) req, _ := http.NewRequest(http.MethodGet, url, nil)
resp, err := tb.client.MakeRequest(req) resp, err := tb.client.MakeRequest(req)
@@ -180,7 +180,7 @@ func (tb *Torbox) UpdateTorrent(t *torrent.Torrent) error {
if !cfg.IsSizeAllowed(f.Size) { if !cfg.IsSizeAllowed(f.Size) {
continue continue
} }
file := torrent.File{ file := types.File{
Id: strconv.Itoa(f.Id), Id: strconv.Itoa(f.Id),
Name: fileName, Name: fileName,
Size: f.Size, Size: f.Size,
@@ -200,7 +200,7 @@ func (tb *Torbox) UpdateTorrent(t *torrent.Torrent) error {
return nil return nil
} }
func (tb *Torbox) CheckStatus(torrent *torrent.Torrent, isSymlink bool) (*torrent.Torrent, error) { func (tb *Torbox) CheckStatus(torrent *types.Torrent, isSymlink bool) (*types.Torrent, error) {
for { for {
err := tb.UpdateTorrent(torrent) err := tb.UpdateTorrent(torrent)
@@ -232,7 +232,7 @@ func (tb *Torbox) CheckStatus(torrent *torrent.Torrent, isSymlink bool) (*torren
return torrent, nil return torrent, nil
} }
func (tb *Torbox) DeleteTorrent(torrent *torrent.Torrent) { func (tb *Torbox) DeleteTorrent(torrent *types.Torrent) {
url := fmt.Sprintf("%s/api/torrents/controltorrent/%s", tb.Host, torrent.Id) url := fmt.Sprintf("%s/api/torrents/controltorrent/%s", tb.Host, torrent.Id)
payload := map[string]string{"torrent_id": torrent.Id, "action": "Delete"} payload := map[string]string{"torrent_id": torrent.Id, "action": "Delete"}
jsonPayload, _ := json.Marshal(payload) jsonPayload, _ := json.Marshal(payload)
@@ -245,7 +245,7 @@ func (tb *Torbox) DeleteTorrent(torrent *torrent.Torrent) {
} }
} }
func (tb *Torbox) GenerateDownloadLinks(t *torrent.Torrent) error { func (tb *Torbox) GenerateDownloadLinks(t *types.Torrent) error {
for _, file := range t.Files { for _, file := range t.Files {
url := fmt.Sprintf("%s/api/torrents/requestdl/", tb.Host) url := fmt.Sprintf("%s/api/torrents/requestdl/", tb.Host)
query := gourl.Values{} query := gourl.Values{}
@@ -273,7 +273,7 @@ func (tb *Torbox) GenerateDownloadLinks(t *torrent.Torrent) error {
return nil return nil
} }
func (tb *Torbox) GetDownloadLink(t *torrent.Torrent, file *torrent.File) *torrent.File { func (tb *Torbox) GetDownloadLink(t *types.Torrent, file *types.File) *types.File {
url := fmt.Sprintf("%s/api/torrents/requestdl/", tb.Host) url := fmt.Sprintf("%s/api/torrents/requestdl/", tb.Host)
query := gourl.Values{} query := gourl.Values{}
query.Add("torrent_id", t.Id) query.Add("torrent_id", t.Id)
@@ -306,7 +306,7 @@ func (tb *Torbox) GetCheckCached() bool {
return tb.CheckCached return tb.CheckCached
} }
func (tb *Torbox) GetTorrents() ([]*torrent.Torrent, error) { func (tb *Torbox) GetTorrents() ([]*types.Torrent, error) {
return nil, nil return nil, nil
} }
@@ -336,10 +336,10 @@ func New(dc config.Debrid) *Torbox {
} }
} }
func (tb *Torbox) ConvertLinksToFiles(links []string) []torrent.File { func (tb *Torbox) ConvertLinksToFiles(links []string) []types.File {
return nil return nil
} }
func (tb *Torbox) GetDownloads() (map[string]torrent.DownloadLinks, error) { func (tb *Torbox) GetDownloads() (map[string]types.DownloadLinks, error) {
return nil, nil return nil, nil
} }

View File

@@ -0,0 +1,23 @@
package types
import (
"github.com/rs/zerolog"
)
// Client is the provider-agnostic contract implemented by each debrid
// backend (RealDebrid, AllDebrid, Torbox, DebridLink, ...).
//
// NOTE(review): this is a wide interface; if consumers only depend on
// subsets, consider splitting it (torrent management vs. link handling).
type Client interface {
	// SubmitMagnet submits the torrent's magnet to the provider and
	// returns the torrent updated with provider-side details.
	SubmitMagnet(tr *Torrent) (*Torrent, error)
	// CheckStatus polls the provider for the torrent's state; isSymlink
	// selects the symlink-based processing flow.
	CheckStatus(tr *Torrent, isSymlink bool) (*Torrent, error)
	// GenerateDownloadLinks populates download links for the torrent's files.
	GenerateDownloadLinks(tr *Torrent) error
	// GetDownloadLink resolves the download link for a single file.
	GetDownloadLink(tr *Torrent, file *File) *File
	// ConvertLinksToFiles turns raw provider links into File entries.
	ConvertLinksToFiles(links []string) []File
	// DeleteTorrent removes the torrent from the provider account.
	DeleteTorrent(tr *Torrent)
	// IsAvailable reports, per infohash, whether it is cached at the provider.
	IsAvailable(infohashes []string) map[string]bool
	// GetCheckCached reports whether cache checks are enabled for this client.
	GetCheckCached() bool
	// GetDownloadUncached reports whether uncached downloads are allowed.
	GetDownloadUncached() bool
	// UpdateTorrent refreshes the torrent's fields from the provider.
	UpdateTorrent(torrent *Torrent) error
	// GetTorrents lists the torrents on the provider account.
	GetTorrents() ([]*Torrent, error)
	// GetName returns the configured provider name.
	GetName() string
	// GetLogger returns the client's logger.
	GetLogger() zerolog.Logger
	// GetDownloadingStatus returns the status strings that count as
	// "still downloading" for this provider.
	GetDownloadingStatus() []string
	// GetDownloads returns the account's download links keyed by link.
	GetDownloads() (map[string]DownloadLinks, error)
}

View File

@@ -1,4 +1,4 @@
package torrent package types
import ( import (
"fmt" "fmt"

View File

@@ -5,7 +5,7 @@ import (
"fmt" "fmt"
"github.com/cavaliergopher/grab/v3" "github.com/cavaliergopher/grab/v3"
"github.com/sirrobot01/debrid-blackhole/internal/utils" "github.com/sirrobot01/debrid-blackhole/internal/utils"
debrid "github.com/sirrobot01/debrid-blackhole/pkg/debrid/torrent" debrid "github.com/sirrobot01/debrid-blackhole/pkg/debrid/types"
"io" "io"
"net/http" "net/http"
"os" "os"
@@ -154,8 +154,13 @@ func (q *QBit) ProcessSymlink(torrent *Torrent) (string, error) {
torrentFolder = utils.RemoveExtension(torrentFolder) torrentFolder = utils.RemoveExtension(torrentFolder)
torrentRclonePath = rCloneBase // /mnt/rclone/magnets/ // Remove the filename since it's in the root folder torrentRclonePath = rCloneBase // /mnt/rclone/magnets/ // Remove the filename since it's in the root folder
} }
torrentSymlinkPath := filepath.Join(q.DownloadFolder, debridTorrent.Arr.Name, torrentFolder) // /mnt/symlinks/{category}/MyTVShow/ return q.createSymlinks(debridTorrent, torrentRclonePath, torrentFolder) // verify cos we're using external webdav
err = os.MkdirAll(torrentSymlinkPath, os.ModePerm) }
func (q *QBit) createSymlinks(debridTorrent *debrid.Torrent, rclonePath, torrentFolder string) (string, error) {
files := debridTorrent.Files
torrentSymlinkPath := filepath.Join(q.DownloadFolder, debridTorrent.Arr.Name, torrentFolder)
err := os.MkdirAll(torrentSymlinkPath, os.ModePerm)
if err != nil { if err != nil {
return "", fmt.Errorf("failed to create directory: %s: %v", torrentSymlinkPath, err) return "", fmt.Errorf("failed to create directory: %s: %v", torrentSymlinkPath, err)
} }
@@ -164,16 +169,16 @@ func (q *QBit) ProcessSymlink(torrent *Torrent) (string, error) {
for _, file := range files { for _, file := range files {
pending[file.Path] = file pending[file.Path] = file
} }
ticker := time.NewTicker(200 * time.Millisecond) ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop() defer ticker.Stop()
for len(pending) > 0 { for len(pending) > 0 {
<-ticker.C <-ticker.C
for path, file := range pending { for path, file := range pending {
fullFilePath := filepath.Join(torrentRclonePath, file.Path) fullFilePath := filepath.Join(rclonePath, file.Path)
if _, err := os.Stat(fullFilePath); !os.IsNotExist(err) { if _, err := os.Stat(fullFilePath); !os.IsNotExist(err) {
q.logger.Info().Msgf("File is ready: %s", file.Path) q.logger.Info().Msgf("File is ready: %s", file.Path)
q.createSymLink(torrentSymlinkPath, torrentRclonePath, file) q.createSymLink(torrentSymlinkPath, rclonePath, file)
delete(pending, path) delete(pending, path)
} }
} }

View File

@@ -3,12 +3,12 @@ package qbit
import ( import (
"fmt" "fmt"
"github.com/sirrobot01/debrid-blackhole/internal/utils" "github.com/sirrobot01/debrid-blackhole/internal/utils"
"github.com/sirrobot01/debrid-blackhole/pkg/debrid/debrid"
"github.com/sirrobot01/debrid-blackhole/pkg/service" "github.com/sirrobot01/debrid-blackhole/pkg/service"
"time" "time"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/sirrobot01/debrid-blackhole/pkg/arr" "github.com/sirrobot01/debrid-blackhole/pkg/arr"
"github.com/sirrobot01/debrid-blackhole/pkg/debrid"
) )
type ImportRequest struct { type ImportRequest struct {

View File

@@ -7,8 +7,8 @@ import (
"github.com/sirrobot01/debrid-blackhole/internal/request" "github.com/sirrobot01/debrid-blackhole/internal/request"
"github.com/sirrobot01/debrid-blackhole/internal/utils" "github.com/sirrobot01/debrid-blackhole/internal/utils"
"github.com/sirrobot01/debrid-blackhole/pkg/arr" "github.com/sirrobot01/debrid-blackhole/pkg/arr"
db "github.com/sirrobot01/debrid-blackhole/pkg/debrid" db "github.com/sirrobot01/debrid-blackhole/pkg/debrid/debrid"
debrid "github.com/sirrobot01/debrid-blackhole/pkg/debrid/torrent" debrid "github.com/sirrobot01/debrid-blackhole/pkg/debrid/types"
"github.com/sirrobot01/debrid-blackhole/pkg/service" "github.com/sirrobot01/debrid-blackhole/pkg/service"
"io" "io"
"mime/multipart" "mime/multipart"
@@ -74,13 +74,14 @@ func (q *QBit) Process(ctx context.Context, magnet *utils.Magnet, category strin
} }
func (q *QBit) ProcessFiles(torrent *Torrent, debridTorrent *debrid.Torrent, arr *arr.Arr, isSymlink bool) { func (q *QBit) ProcessFiles(torrent *Torrent, debridTorrent *debrid.Torrent, arr *arr.Arr, isSymlink bool) {
debridClient := service.GetDebrid().GetByName(debridTorrent.Debrid) svc := service.GetService()
client := svc.Debrid.GetByName(debridTorrent.Debrid)
for debridTorrent.Status != "downloaded" { for debridTorrent.Status != "downloaded" {
q.logger.Debug().Msgf("%s <- (%s) Download Progress: %.2f%%", debridTorrent.Debrid, debridTorrent.Name, debridTorrent.Progress) q.logger.Debug().Msgf("%s <- (%s) Download Progress: %.2f%%", debridTorrent.Debrid, debridTorrent.Name, debridTorrent.Progress)
dbT, err := debridClient.CheckStatus(debridTorrent, isSymlink) dbT, err := client.CheckStatus(debridTorrent, isSymlink)
if err != nil { if err != nil {
q.logger.Error().Msgf("Error checking status: %v", err) q.logger.Error().Msgf("Error checking status: %v", err)
go debridClient.DeleteTorrent(debridTorrent) go client.DeleteTorrent(debridTorrent)
q.MarkAsFailed(torrent) q.MarkAsFailed(torrent)
if err := arr.Refresh(); err != nil { if err := arr.Refresh(); err != nil {
q.logger.Error().Msgf("Error refreshing arr: %v", err) q.logger.Error().Msgf("Error refreshing arr: %v", err)
@@ -92,7 +93,7 @@ func (q *QBit) ProcessFiles(torrent *Torrent, debridTorrent *debrid.Torrent, arr
torrent = q.UpdateTorrentMin(torrent, debridTorrent) torrent = q.UpdateTorrentMin(torrent, debridTorrent)
// Exit the loop for downloading statuses to prevent memory buildup // Exit the loop for downloading statuses to prevent memory buildup
if !slices.Contains(debridClient.GetDownloadingStatus(), debridTorrent.Status) { if !slices.Contains(client.GetDownloadingStatus(), debridTorrent.Status) {
break break
} }
time.Sleep(time.Duration(q.RefreshInterval) * time.Second) time.Sleep(time.Duration(q.RefreshInterval) * time.Second)
@@ -102,14 +103,51 @@ func (q *QBit) ProcessFiles(torrent *Torrent, debridTorrent *debrid.Torrent, arr
err error err error
) )
debridTorrent.Arr = arr debridTorrent.Arr = arr
// File is done downloading at this stage
// Check if debrid supports webdav by checking cache
if isSymlink { if isSymlink {
torrentSymlinkPath, err = q.ProcessSymlink(torrent) // /mnt/symlinks/{category}/MyTVShow/ cache, ok := svc.Debrid.Caches[debridTorrent.Debrid]
if ok {
q.logger.Info().Msgf("Using internal webdav for %s", debridTorrent.Debrid)
// Use webdav to download the file
err := cache.ProcessTorrent(debridTorrent, true)
if err != nil {
return
}
rclonePath := filepath.Join(debridTorrent.MountPath, debridTorrent.Name)
// Check if folder exists here
if _, err := os.Stat(rclonePath); os.IsNotExist(err) {
q.logger.Debug().Msgf("Folder does not exist: %s", rclonePath)
// Check if torrent is in the listing
listing := cache.GetListing()
for _, t := range listing {
if t.Name() == debridTorrent.Name {
q.logger.Debug().Msgf("Torrent found in listing: %s", debridTorrent.Name)
}
}
// Check if torrent is in the webdav
if t := cache.GetTorrentByName(debridTorrent.Name); t == nil {
q.logger.Debug().Msgf("Torrent not found in webdav: %s", debridTorrent.Name)
}
}
torrentSymlinkPath, err = q.createSymlinks(debridTorrent, rclonePath, debridTorrent.Name)
} else {
// User is using either zurg or debrid webdav
torrentSymlinkPath, err = q.ProcessSymlink(torrent) // /mnt/symlinks/{category}/MyTVShow/
}
} else { } else {
torrentSymlinkPath, err = q.ProcessManualFile(torrent) torrentSymlinkPath, err = q.ProcessManualFile(torrent)
} }
if err != nil { if err != nil {
q.MarkAsFailed(torrent) q.MarkAsFailed(torrent)
go debridClient.DeleteTorrent(debridTorrent) go client.DeleteTorrent(debridTorrent)
q.logger.Info().Msgf("Error: %v", err) q.logger.Info().Msgf("Error: %v", err)
return return
} }

View File

@@ -2,7 +2,7 @@ package qbit
import ( import (
"fmt" "fmt"
"github.com/sirrobot01/debrid-blackhole/pkg/debrid/torrent" "github.com/sirrobot01/debrid-blackhole/pkg/debrid/types"
"sync" "sync"
) )
@@ -173,10 +173,10 @@ type TorrentCategory struct {
} }
type Torrent struct { type Torrent struct {
ID string `json:"id"` ID string `json:"id"`
DebridTorrent *torrent.Torrent `json:"-"` DebridTorrent *types.Torrent `json:"-"`
Debrid string `json:"debrid"` Debrid string `json:"debrid"`
TorrentPath string `json:"-"` TorrentPath string `json:"-"`
AddedOn int64 `json:"added_on,omitempty"` AddedOn int64 `json:"added_on,omitempty"`
AmountLeft int64 `json:"amount_left"` AmountLeft int64 `json:"amount_left"`

View File

@@ -10,7 +10,7 @@ import (
"github.com/sirrobot01/debrid-blackhole/internal/logger" "github.com/sirrobot01/debrid-blackhole/internal/logger"
"github.com/sirrobot01/debrid-blackhole/internal/request" "github.com/sirrobot01/debrid-blackhole/internal/request"
"github.com/sirrobot01/debrid-blackhole/pkg/arr" "github.com/sirrobot01/debrid-blackhole/pkg/arr"
"github.com/sirrobot01/debrid-blackhole/pkg/debrid/debrid" "github.com/sirrobot01/debrid-blackhole/pkg/debrid/types"
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
"net" "net"
"net/http" "net/http"
@@ -29,7 +29,7 @@ import (
type Repair struct { type Repair struct {
Jobs map[string]*Job Jobs map[string]*Job
arrs *arr.Storage arrs *arr.Storage
deb debrid.Client deb types.Client
duration time.Duration duration time.Duration
runOnStart bool runOnStart bool
ZurgURL string ZurgURL string

View File

@@ -2,8 +2,7 @@ package service
import ( import (
"github.com/sirrobot01/debrid-blackhole/pkg/arr" "github.com/sirrobot01/debrid-blackhole/pkg/arr"
"github.com/sirrobot01/debrid-blackhole/pkg/debrid" "github.com/sirrobot01/debrid-blackhole/pkg/debrid/debrid"
"github.com/sirrobot01/debrid-blackhole/pkg/debrid/engine"
"github.com/sirrobot01/debrid-blackhole/pkg/repair" "github.com/sirrobot01/debrid-blackhole/pkg/repair"
"sync" "sync"
) )
@@ -11,7 +10,7 @@ import (
type Service struct { type Service struct {
Repair *repair.Repair Repair *repair.Repair
Arr *arr.Storage Arr *arr.Storage
Debrid *engine.Engine Debrid *debrid.Engine
} }
var ( var (
@@ -22,7 +21,7 @@ var (
func New() *Service { func New() *Service {
once.Do(func() { once.Do(func() {
arrs := arr.NewStorage() arrs := arr.NewStorage()
deb := debrid.New() deb := debrid.NewEngine()
instance = &Service{ instance = &Service{
Repair: repair.New(arrs), Repair: repair.New(arrs),
Arr: arrs, Arr: arrs,
@@ -42,7 +41,7 @@ func GetService() *Service {
func Update() *Service { func Update() *Service {
arrs := arr.NewStorage() arrs := arr.NewStorage()
deb := debrid.New() deb := debrid.NewEngine()
instance = &Service{ instance = &Service{
Repair: repair.New(arrs), Repair: repair.New(arrs),
Arr: arrs, Arr: arrs,
@@ -51,6 +50,6 @@ func Update() *Service {
return instance return instance
} }
func GetDebrid() *engine.Engine { func GetDebrid() *debrid.Engine {
return GetService().Debrid return GetService().Debrid
} }

View File

@@ -2,6 +2,7 @@ package webdav
import ( import (
"fmt" "fmt"
"github.com/sirrobot01/debrid-blackhole/pkg/debrid/debrid"
"io" "io"
"net/http" "net/http"
"os" "os"
@@ -20,18 +21,19 @@ var sharedClient = &http.Client{
} }
type File struct { type File struct {
cache *Cache cache *debrid.Cache
fileId string fileId string
torrentId string torrentId string
size int64 size int64
offset int64 offset int64
isDir bool isDir bool
children []os.FileInfo children []os.FileInfo
reader io.ReadCloser reader io.ReadCloser
seekPending bool seekPending bool
content []byte content []byte
name string name string
metadataOnly bool
downloadLink string downloadLink string
link string link string
@@ -49,11 +51,12 @@ func (f *File) Close() error {
func (f *File) GetDownloadLink() string { func (f *File) GetDownloadLink() string {
// Check if we already have a final URL cached // Check if we already have a final URL cached
if f.downloadLink != "" {
if f.downloadLink != "" && isValidURL(f.downloadLink) {
return f.downloadLink return f.downloadLink
} }
downloadLink := f.cache.GetDownloadLink(f.torrentId, f.name, f.link) downloadLink := f.cache.GetDownloadLink(f.torrentId, f.name, f.link)
if downloadLink != "" { if downloadLink != "" && isValidURL(downloadLink) {
f.downloadLink = downloadLink f.downloadLink = downloadLink
return downloadLink return downloadLink
} }
@@ -65,6 +68,9 @@ func (f *File) Read(p []byte) (n int, err error) {
if f.isDir { if f.isDir {
return 0, os.ErrInvalid return 0, os.ErrInvalid
} }
if f.metadataOnly {
return 0, io.EOF
}
// If file content is preloaded, read from memory. // If file content is preloaded, read from memory.
if f.content != nil { if f.content != nil {

View File

@@ -2,11 +2,13 @@ package webdav
import ( import (
"bytes" "bytes"
"compress/gzip"
"context" "context"
"errors" "errors"
"fmt" "fmt"
"github.com/rs/zerolog" "github.com/rs/zerolog"
"github.com/sirrobot01/debrid-blackhole/pkg/debrid/torrent" "github.com/sirrobot01/debrid-blackhole/pkg/debrid/debrid"
"github.com/sirrobot01/debrid-blackhole/pkg/debrid/types"
"golang.org/x/net/webdav" "golang.org/x/net/webdav"
"html/template" "html/template"
"io" "io"
@@ -14,7 +16,7 @@ import (
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
"os" "os"
"path" path "path/filepath"
"slices" "slices"
"strings" "strings"
"sync" "sync"
@@ -24,7 +26,7 @@ import (
type Handler struct { type Handler struct {
Name string Name string
logger zerolog.Logger logger zerolog.Logger
cache *Cache cache *debrid.Cache
lastRefresh time.Time lastRefresh time.Time
refreshMutex sync.Mutex refreshMutex sync.Mutex
RootPath string RootPath string
@@ -33,7 +35,7 @@ type Handler struct {
ctx context.Context ctx context.Context
} }
func NewHandler(name string, cache *Cache, logger zerolog.Logger) *Handler { func NewHandler(name string, cache *debrid.Cache, logger zerolog.Logger) *Handler {
h := &Handler{ h := &Handler{
Name: name, Name: name,
cache: cache, cache: cache,
@@ -67,9 +69,7 @@ func (h *Handler) RemoveAll(ctx context.Context, name string) error {
if filename == "" { if filename == "" {
h.cache.GetClient().DeleteTorrent(cachedTorrent.Torrent) h.cache.GetClient().DeleteTorrent(cachedTorrent.Torrent)
go h.cache.refreshListings() h.cache.OnRemove(cachedTorrent.Id)
go h.cache.refreshTorrents()
go h.cache.resetPropfindResponse()
return nil return nil
} }
@@ -117,22 +117,29 @@ func (h *Handler) OpenFile(ctx context.Context, name string, flag int, perm os.F
name = path.Clean("/" + name) name = path.Clean("/" + name)
rootDir := h.getRootPath() rootDir := h.getRootPath()
metadataOnly := false
if ctx.Value("metadataOnly") != nil {
metadataOnly = true
}
// Fast path optimization with a map lookup instead of string comparisons // Fast path optimization with a map lookup instead of string comparisons
switch name { switch name {
case rootDir: case rootDir:
return &File{ return &File{
cache: h.cache, cache: h.cache,
isDir: true, isDir: true,
children: h.getParentFiles(), children: h.getParentFiles(),
name: "/", name: "/",
metadataOnly: metadataOnly,
}, nil }, nil
case path.Join(rootDir, "version.txt"): case path.Join(rootDir, "version.txt"):
return &File{ return &File{
cache: h.cache, cache: h.cache,
isDir: false, isDir: false,
content: []byte("v1.0.0"), content: []byte("v1.0.0"),
name: "version.txt", name: "version.txt",
size: int64(len("v1.0.0")), size: int64(len("v1.0.0")),
metadataOnly: metadataOnly,
}, nil }, nil
} }
@@ -145,11 +152,12 @@ func (h *Handler) OpenFile(ctx context.Context, name string, flag int, perm os.F
children := h.getTorrentsFolders() children := h.getTorrentsFolders()
return &File{ return &File{
cache: h.cache, cache: h.cache,
isDir: true, isDir: true,
children: children, children: children,
name: folderName, name: folderName,
size: 0, size: 0,
metadataOnly: metadataOnly,
}, nil }, nil
} }
@@ -168,12 +176,13 @@ func (h *Handler) OpenFile(ctx context.Context, name string, flag int, perm os.F
if len(parts) == 2 { if len(parts) == 2 {
// Torrent folder level // Torrent folder level
return &File{ return &File{
cache: h.cache, cache: h.cache,
torrentId: cachedTorrent.Id, torrentId: cachedTorrent.Id,
isDir: true, isDir: true,
children: h.getFileInfos(cachedTorrent.Torrent), children: h.getFileInfos(cachedTorrent.Torrent),
name: cachedTorrent.Name, name: cachedTorrent.Name,
size: cachedTorrent.Size, size: cachedTorrent.Size,
metadataOnly: metadataOnly,
}, nil }, nil
} }
@@ -189,6 +198,7 @@ func (h *Handler) OpenFile(ctx context.Context, name string, flag int, perm os.F
size: file.Size, size: file.Size,
link: file.Link, link: file.Link,
downloadLink: file.DownloadLink, downloadLink: file.DownloadLink,
metadataOnly: metadataOnly,
} }
return fi, nil return fi, nil
} }
@@ -207,7 +217,7 @@ func (h *Handler) Stat(ctx context.Context, name string) (os.FileInfo, error) {
return f.Stat() return f.Stat()
} }
func (h *Handler) getFileInfos(torrent *torrent.Torrent) []os.FileInfo { func (h *Handler) getFileInfos(torrent *types.Torrent) []os.FileInfo {
files := make([]os.FileInfo, 0, len(torrent.Files)) files := make([]os.FileInfo, 0, len(torrent.Files))
now := time.Now() now := time.Now()
for _, file := range torrent.Files { for _, file := range torrent.Files {
@@ -232,34 +242,28 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// Cache PROPFIND responses for a short time to reduce load. // Cache PROPFIND responses for a short time to reduce load.
if r.Method == "PROPFIND" { if r.Method == "PROPFIND" {
// Determine the Depth; default to "1" if not provided. // Determine the Depth; default to "1" if not provided.
// Set metadata only
ctx := context.WithValue(r.Context(), "metadataOnly", true)
r = r.WithContext(ctx)
cleanPath := path.Clean(r.URL.Path)
depth := r.Header.Get("Depth") depth := r.Header.Get("Depth")
if depth == "" { if depth == "" {
depth = "1" depth = "1"
} }
// Use both path and Depth header to form the cache key. // Use both path and Depth header to form the cache key.
cacheKey := fmt.Sprintf("propfind:%s:%s", r.URL.Path, depth) cacheKey := fmt.Sprintf("propfind:%s:%s", cleanPath, depth)
// Determine TTL based on the requested folder: // Determine TTL based on the requested folder:
// - If the path is exactly the parent folder (which changes frequently), // - If the path is exactly the parent folder (which changes frequently),
// use a short TTL. // use a short TTL.
// - Otherwise, for deeper (torrent folder) paths, use a longer TTL. // - Otherwise, for deeper (torrent folder) paths, use a longer TTL.
var ttl time.Duration ttl := 30 * time.Minute
if h.isParentPath(r.URL.Path) { if h.isParentPath(r.URL.Path) {
ttl = 10 * time.Second ttl = 20 * time.Second
} else {
ttl = 1 * time.Minute
} }
// Check if we have a cached response that hasn't expired. if served := h.serveFromCacheIfValid(w, r, cacheKey, ttl); served {
if cached, ok := h.cache.propfindResp.Load(cacheKey); ok { return
if respCache, ok := cached.(propfindResponse); ok {
if time.Since(respCache.ts) < ttl {
w.Header().Set("Content-Type", "application/xml; charset=utf-8")
w.Header().Set("Content-Length", fmt.Sprintf("%d", len(respCache.data)))
w.Write(respCache.data)
return
}
}
} }
// No valid cache entry; process the PROPFIND request. // No valid cache entry; process the PROPFIND request.
@@ -276,10 +280,22 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
handler.ServeHTTP(responseRecorder, r) handler.ServeHTTP(responseRecorder, r)
responseData := responseRecorder.Body.Bytes() responseData := responseRecorder.Body.Bytes()
// Store the new response in the cache. // Create compressed version
h.cache.propfindResp.Store(cacheKey, propfindResponse{ var gzippedData []byte
data: responseData, if len(responseData) > 0 {
ts: time.Now(), var buf bytes.Buffer
gzw := gzip.NewWriter(&buf)
if _, err := gzw.Write(responseData); err == nil {
if err := gzw.Close(); err == nil {
gzippedData = buf.Bytes()
}
}
}
h.cache.PropfindResp.Store(cacheKey, debrid.PropfindResponse{
Data: responseData,
GzippedData: gzippedData,
Ts: time.Now(),
}) })
// Forward the captured response to the client. // Forward the captured response to the client.
@@ -332,58 +348,6 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// Serve the file with the correct modification time. // Serve the file with the correct modification time.
// http.ServeContent automatically handles Range requests. // http.ServeContent automatically handles Range requests.
http.ServeContent(w, r, fileName, fi.ModTime(), rs) http.ServeContent(w, r, fileName, fi.ModTime(), rs)
// Set headers to indicate support for range requests and content type.
//fileName := fi.Name()
//w.Header().Set("Accept-Ranges", "bytes")
//w.Header().Set("Content-Type", getContentType(fileName))
//
//// If a Range header is provided, parse and handle partial content.
//rangeHeader := r.Header.Get("Range")
//if rangeHeader != "" {
// parts := strings.Split(strings.TrimPrefix(rangeHeader, "bytes="), "-")
// if len(parts) == 2 {
// start, startErr := strconv.ParseInt(parts[0], 10, 64)
// end := fi.Size() - 1
// if parts[1] != "" {
// var endErr error
// end, endErr = strconv.ParseInt(parts[1], 10, 64)
// if endErr != nil {
// end = fi.Size() - 1
// }
// }
//
// if startErr == nil && start < fi.Size() {
// if start > end {
// start, end = end, start
// }
// if end >= fi.Size() {
// end = fi.Size() - 1
// }
//
// contentLength := end - start + 1
// w.Header().Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", start, end, fi.Size()))
// w.Header().Set("Content-Length", fmt.Sprintf("%d", contentLength))
// w.WriteHeader(http.StatusPartialContent)
//
// // Attempt to cast to your concrete File type to call Seek.
// if file, ok := f.(*File); ok {
// _, err = file.Seek(start, io.SeekStart)
// if err != nil {
// h.logger.Error().Err(err).Msg("Failed to seek in file")
// http.Error(w, "Server Error", http.StatusInternalServerError)
// return
// }
//
// limitedReader := io.LimitReader(f, contentLength)
// h.ioCopy(limitedReader, w)
// return
// }
// }
// }
//}
//w.Header().Set("Content-Length", fmt.Sprintf("%d", fi.Size()))
//h.ioCopy(f, w)
return return
} }
@@ -433,13 +397,43 @@ func (h *Handler) isParentPath(_path string) bool {
rootPath := h.getRootPath() rootPath := h.getRootPath()
parents := h.getParentItems() parents := h.getParentItems()
for _, p := range parents { for _, p := range parents {
if _path == path.Join(rootPath, p) { if path.Clean(_path) == path.Clean(path.Join(rootPath, p)) {
return true return true
} }
} }
return false return false
} }
// serveFromCacheIfValid writes a previously cached PROPFIND response to w,
// provided an entry exists for cacheKey and is younger than ttl. Expired
// entries are evicted on sight. It reports whether a response was served.
func (h *Handler) serveFromCacheIfValid(w http.ResponseWriter, r *http.Request, cacheKey string, ttl time.Duration) bool {
	entry, found := h.cache.PropfindResp.Load(cacheKey)
	if !found {
		return false
	}
	resp, valid := entry.(debrid.PropfindResponse)
	if !valid {
		return false
	}
	if time.Since(resp.Ts) >= ttl {
		// Stale entry: drop it so the next request repopulates the cache.
		h.cache.PropfindResp.Delete(cacheKey)
		return false
	}

	w.Header().Set("Content-Type", "application/xml; charset=utf-8")
	body := resp.Data
	if acceptsGzip(r) && len(resp.GzippedData) > 0 {
		// Client accepts gzip and a pre-compressed copy exists; serve that.
		body = resp.GzippedData
		w.Header().Set("Content-Encoding", "gzip")
		w.Header().Set("Vary", "Accept-Encoding")
	}
	w.Header().Set("Content-Length", fmt.Sprintf("%d", len(body)))
	w.Write(body)
	return true
}
func (h *Handler) serveDirectory(w http.ResponseWriter, r *http.Request, file webdav.File) { func (h *Handler) serveDirectory(w http.ResponseWriter, r *http.Request, file webdav.File) {
var children []os.FileInfo var children []os.FileInfo
if f, ok := file.(*File); ok { if f, ok := file.(*File); ok {

View File

@@ -1,6 +1,10 @@
package webdav package webdav
import "strings" import (
"net/http"
"net/url"
"strings"
)
// getName: Returns the torrent name and filename from the path // getName: Returns the torrent name and filename from the path
// /webdav/alldebrid/__all__/TorrentName // /webdav/alldebrid/__all__/TorrentName
@@ -12,3 +16,13 @@ func getName(rootDir, path string) (string, string) {
} }
return parts[0], strings.Join(parts[1:], "/") return parts[0], strings.Join(parts[1:], "/")
} }
func acceptsGzip(r *http.Request) bool {
return strings.Contains(r.Header.Get("Accept-Encoding"), "gzip")
}
// isValidURL reports whether str is a usable absolute URL: it must parse
// cleanly and carry both a scheme and a host (relative paths and bare
// hostnames therefore fail).
func isValidURL(str string) bool {
	u, err := url.Parse(str)
	if err != nil {
		return false
	}
	return u.Scheme != "" && u.Host != ""
}

View File

@@ -20,9 +20,7 @@ func New() *WebDav {
w := &WebDav{ w := &WebDav{
Handlers: make([]*Handler, 0), Handlers: make([]*Handler, 0),
} }
debrids := svc.Debrid.GetDebrids() for name, c := range svc.Debrid.Caches {
cacheManager := NewCacheManager(debrids)
for name, c := range cacheManager.GetCaches() {
h := NewHandler(name, c, logger.NewLogger(fmt.Sprintf("%s-webdav", name))) h := NewHandler(name, c, logger.NewLogger(fmt.Sprintf("%s-webdav", name)))
w.Handlers = append(w.Handlers, h) w.Handlers = append(w.Handlers, h)
} }

View File

@@ -1,69 +0,0 @@
package webdav
import "time"
// Refresh starts the background workers that keep the cache fresh:
// directory listings, download links, and torrent metadata. The error
// return is always nil today; it is kept for interface stability.
func (c *Cache) Refresh() error {
	c.logger.Info().Msg("Starting cache refresh workers")
	for _, worker := range []func(){
		c.refreshListingWorker,
		c.refreshDownloadLinksWorker,
		c.refreshTorrentsWorker,
	} {
		go worker()
	}
	return nil
}
// refreshListingWorker rebuilds the directory listings every 10 seconds.
// If a previous refresh is still running it skips the tick instead of
// queueing another one. The goroutine runs for the lifetime of the
// process — NOTE(review): consider accepting a context so it can be stopped.
//
// Idiom fix: a single-case select inside a for loop is just a channel
// receive; range over the ticker channel instead (staticcheck S1000).
func (c *Cache) refreshListingWorker() {
	refreshTicker := time.NewTicker(10 * time.Second)
	defer refreshTicker.Stop()

	for range refreshTicker.C {
		if !c.listingRefreshMu.TryLock() {
			c.logger.Debug().Msg("Refresh already in progress")
			continue
		}
		func() {
			// Unlock even if refreshListings panics.
			defer c.listingRefreshMu.Unlock()
			c.refreshListings()
		}()
	}
}
// refreshDownloadLinksWorker re-fetches download links every 40 minutes.
// A tick is skipped when the previous refresh is still in flight. The
// goroutine runs for the lifetime of the process — NOTE(review): consider
// accepting a context so it can be stopped.
//
// Idiom fix: a single-case select inside a for loop is just a channel
// receive; range over the ticker channel instead (staticcheck S1000).
func (c *Cache) refreshDownloadLinksWorker() {
	refreshTicker := time.NewTicker(40 * time.Minute)
	defer refreshTicker.Stop()

	for range refreshTicker.C {
		if !c.downloadLinksRefreshMu.TryLock() {
			c.logger.Debug().Msg("Refresh already in progress")
			continue
		}
		func() {
			// Unlock even if refreshDownloadLinks panics.
			defer c.downloadLinksRefreshMu.Unlock()
			c.refreshDownloadLinks()
		}()
	}
}
// refreshTorrentsWorker re-syncs torrent metadata every 5 seconds,
// skipping the tick when a refresh is already running. The goroutine runs
// for the lifetime of the process — NOTE(review): consider accepting a
// context so it can be stopped.
//
// NOTE(review): this worker guards itself with listingRefreshMu, the same
// mutex used by refreshListingWorker — presumably to serialize torrent and
// listing refreshes against each other, but confirm; if they are meant to
// be independent, a dedicated mutex is needed. Behavior kept as-is here.
//
// Idiom fix: a single-case select inside a for loop is just a channel
// receive; range over the ticker channel instead (staticcheck S1000).
func (c *Cache) refreshTorrentsWorker() {
	refreshTicker := time.NewTicker(5 * time.Second)
	defer refreshTicker.Stop()

	for range refreshTicker.C {
		if !c.listingRefreshMu.TryLock() {
			c.logger.Debug().Msg("Refresh already in progress")
			continue
		}
		func() {
			// Unlock even if refreshTorrents panics.
			defer c.listingRefreshMu.Unlock()
			c.refreshTorrents()
		}()
	}
}