367 lines
8.4 KiB
Go
367 lines
8.4 KiB
Go
package debrid
|
|
|
|
import (
|
|
"cmp"
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/sirrobot01/decypharr/internal/config"
|
|
"github.com/sirrobot01/decypharr/internal/logger"
|
|
"github.com/sirrobot01/decypharr/internal/request"
|
|
"github.com/sirrobot01/decypharr/internal/utils"
|
|
"github.com/sirrobot01/decypharr/pkg/arr"
|
|
"github.com/sirrobot01/decypharr/pkg/debrid/common"
|
|
"github.com/sirrobot01/decypharr/pkg/debrid/providers/alldebrid"
|
|
"github.com/sirrobot01/decypharr/pkg/debrid/providers/debridlink"
|
|
"github.com/sirrobot01/decypharr/pkg/debrid/providers/realdebrid"
|
|
"github.com/sirrobot01/decypharr/pkg/debrid/providers/torbox"
|
|
debridStore "github.com/sirrobot01/decypharr/pkg/debrid/store"
|
|
"github.com/sirrobot01/decypharr/pkg/debrid/types"
|
|
"github.com/sirrobot01/decypharr/pkg/rclone"
|
|
"go.uber.org/ratelimit"
|
|
)
|
|
|
|
type Debrid struct {
|
|
cache *debridStore.Cache // Could be nil if not using WebDAV
|
|
client common.Client // HTTP client for making requests to the debrid service
|
|
}
|
|
|
|
func (de *Debrid) Client() common.Client {
|
|
return de.client
|
|
}
|
|
|
|
func (de *Debrid) Cache() *debridStore.Cache {
|
|
return de.cache
|
|
}
|
|
|
|
func (de *Debrid) Reset() {
|
|
if de.cache != nil {
|
|
de.cache.Reset()
|
|
}
|
|
}
|
|
|
|
type Storage struct {
|
|
debrids map[string]*Debrid
|
|
mu sync.RWMutex
|
|
lastUsed string
|
|
}
|
|
|
|
func NewStorage(rcManager *rclone.Manager) *Storage {
|
|
cfg := config.Get()
|
|
|
|
_logger := logger.Default()
|
|
|
|
debrids := make(map[string]*Debrid)
|
|
|
|
bindAddress := cfg.BindAddress
|
|
if bindAddress == "" {
|
|
bindAddress = "localhost"
|
|
}
|
|
webdavUrl := fmt.Sprintf("http://%s:%s%s/webdav", bindAddress, cfg.Port, cfg.URLBase)
|
|
|
|
for _, dc := range cfg.Debrids {
|
|
client, err := createDebridClient(dc)
|
|
if err != nil {
|
|
_logger.Error().Err(err).Str("Debrid", dc.Name).Msg("failed to connect to debrid client")
|
|
continue
|
|
}
|
|
var (
|
|
cache *debridStore.Cache
|
|
mounter *rclone.Mount
|
|
)
|
|
_log := client.Logger()
|
|
if dc.UseWebDav {
|
|
if cfg.Rclone.Enabled && rcManager != nil {
|
|
mounter = rclone.NewMount(dc.Name, dc.RcloneMountPath, webdavUrl, rcManager)
|
|
}
|
|
cache = debridStore.NewDebridCache(dc, client, mounter)
|
|
_log.Info().Msg("Debrid Service started with WebDAV")
|
|
} else {
|
|
_log.Info().Msg("Debrid Service started")
|
|
}
|
|
debrids[dc.Name] = &Debrid{
|
|
cache: cache,
|
|
client: client,
|
|
}
|
|
}
|
|
|
|
d := &Storage{
|
|
debrids: debrids,
|
|
lastUsed: "",
|
|
}
|
|
return d
|
|
}
|
|
|
|
func (d *Storage) Debrid(name string) *Debrid {
|
|
d.mu.RLock()
|
|
defer d.mu.RUnlock()
|
|
if debrid, exists := d.debrids[name]; exists {
|
|
return debrid
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (d *Storage) StartWorker(ctx context.Context) error {
|
|
if ctx == nil {
|
|
ctx = context.Background()
|
|
}
|
|
|
|
// Start syncAccounts worker
|
|
go d.syncAccountsWorker(ctx)
|
|
|
|
// Start bandwidth reset worker
|
|
go d.checkBandwidthWorker(ctx)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (d *Storage) checkBandwidthWorker(ctx context.Context) {
|
|
if ctx == nil {
|
|
ctx = context.Background()
|
|
}
|
|
ticker := time.NewTicker(30 * time.Minute)
|
|
go func() {
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-ticker.C:
|
|
d.checkAccountBandwidth()
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
func (d *Storage) checkAccountBandwidth() {
|
|
d.mu.Lock()
|
|
defer d.mu.Unlock()
|
|
|
|
for _, debrid := range d.debrids {
|
|
if debrid == nil || debrid.client == nil {
|
|
continue
|
|
}
|
|
accountManager := debrid.client.AccountManager()
|
|
if accountManager == nil {
|
|
continue
|
|
}
|
|
accountManager.CheckAndResetBandwidth()
|
|
}
|
|
}
|
|
|
|
func (d *Storage) syncAccountsWorker(ctx context.Context) {
|
|
if ctx == nil {
|
|
ctx = context.Background()
|
|
}
|
|
|
|
_ = d.syncAccounts()
|
|
ticker := time.NewTicker(5 * time.Minute)
|
|
go func() {
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-ticker.C:
|
|
_ = d.syncAccounts()
|
|
}
|
|
}
|
|
}()
|
|
|
|
}
|
|
|
|
func (d *Storage) syncAccounts() error {
|
|
d.mu.Lock()
|
|
defer d.mu.Unlock()
|
|
|
|
for name, debrid := range d.debrids {
|
|
if debrid == nil || debrid.client == nil {
|
|
continue
|
|
}
|
|
_log := debrid.client.Logger()
|
|
if err := debrid.client.SyncAccounts(); err != nil {
|
|
_log.Error().Err(err).Msgf("Failed to sync account for %s", name)
|
|
continue
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (d *Storage) Debrids() map[string]*Debrid {
|
|
d.mu.RLock()
|
|
defer d.mu.RUnlock()
|
|
debridsCopy := make(map[string]*Debrid)
|
|
for name, debrid := range d.debrids {
|
|
if debrid != nil {
|
|
debridsCopy[name] = debrid
|
|
}
|
|
}
|
|
return debridsCopy
|
|
}
|
|
|
|
func (d *Storage) Client(name string) common.Client {
|
|
d.mu.RLock()
|
|
defer d.mu.RUnlock()
|
|
if client, exists := d.debrids[name]; exists {
|
|
return client.client
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (d *Storage) Reset() {
|
|
d.mu.Lock()
|
|
defer d.mu.Unlock()
|
|
|
|
// Reset all debrid clients and caches
|
|
for _, debrid := range d.debrids {
|
|
if debrid != nil {
|
|
debrid.Reset()
|
|
}
|
|
}
|
|
|
|
// Reinitialize the debrids map
|
|
d.debrids = make(map[string]*Debrid)
|
|
d.lastUsed = ""
|
|
}
|
|
|
|
func (d *Storage) Clients() map[string]common.Client {
|
|
d.mu.RLock()
|
|
defer d.mu.RUnlock()
|
|
clientsCopy := make(map[string]common.Client)
|
|
for name, debrid := range d.debrids {
|
|
if debrid != nil && debrid.client != nil {
|
|
clientsCopy[name] = debrid.client
|
|
}
|
|
}
|
|
return clientsCopy
|
|
}
|
|
|
|
func (d *Storage) Caches() map[string]*debridStore.Cache {
|
|
d.mu.RLock()
|
|
defer d.mu.RUnlock()
|
|
cachesCopy := make(map[string]*debridStore.Cache)
|
|
for name, debrid := range d.debrids {
|
|
if debrid != nil && debrid.cache != nil {
|
|
cachesCopy[name] = debrid.cache
|
|
}
|
|
}
|
|
return cachesCopy
|
|
}
|
|
|
|
func (d *Storage) FilterClients(filter func(common.Client) bool) map[string]common.Client {
|
|
d.mu.Lock()
|
|
defer d.mu.Unlock()
|
|
filteredClients := make(map[string]common.Client)
|
|
for name, client := range d.debrids {
|
|
if client != nil && filter(client.client) {
|
|
filteredClients[name] = client.client
|
|
}
|
|
}
|
|
return filteredClients
|
|
}
|
|
|
|
func createDebridClient(dc config.Debrid) (common.Client, error) {
|
|
rateLimits := map[string]ratelimit.Limiter{}
|
|
|
|
mainRL := request.ParseRateLimit(dc.RateLimit)
|
|
repairRL := request.ParseRateLimit(cmp.Or(dc.RepairRateLimit, dc.RateLimit))
|
|
downloadRL := request.ParseRateLimit(cmp.Or(dc.DownloadRateLimit, dc.RateLimit))
|
|
|
|
rateLimits["main"] = mainRL
|
|
rateLimits["repair"] = repairRL
|
|
rateLimits["download"] = downloadRL
|
|
|
|
switch dc.Name {
|
|
case "realdebrid":
|
|
return realdebrid.New(dc, rateLimits)
|
|
case "torbox":
|
|
return torbox.New(dc, rateLimits)
|
|
case "debridlink":
|
|
return debridlink.New(dc, rateLimits)
|
|
case "alldebrid":
|
|
return alldebrid.New(dc, rateLimits)
|
|
default:
|
|
return realdebrid.New(dc, rateLimits)
|
|
}
|
|
}
|
|
|
|
func Process(ctx context.Context, store *Storage, selectedDebrid string, magnet *utils.Magnet, a *arr.Arr, action string, overrideDownloadUncached bool) (*types.Torrent, error) {
|
|
|
|
debridTorrent := &types.Torrent{
|
|
InfoHash: magnet.InfoHash,
|
|
Magnet: magnet,
|
|
Name: magnet.Name,
|
|
Arr: a,
|
|
Size: magnet.Size,
|
|
Files: make(map[string]types.File),
|
|
}
|
|
|
|
clients := store.FilterClients(func(c common.Client) bool {
|
|
if selectedDebrid != "" && c.Name() != selectedDebrid {
|
|
return false
|
|
}
|
|
return true
|
|
})
|
|
|
|
if len(clients) == 0 {
|
|
return nil, fmt.Errorf("no debrid clients available")
|
|
}
|
|
|
|
errs := make([]error, 0, len(clients))
|
|
|
|
// Override first, arr second, debrid third
|
|
|
|
if !overrideDownloadUncached && a.DownloadUncached != nil {
|
|
// Arr cached is set
|
|
overrideDownloadUncached = *a.DownloadUncached
|
|
}
|
|
|
|
for _, db := range clients {
|
|
_logger := db.Logger()
|
|
_logger.Info().
|
|
Str("Debrid", db.Name()).
|
|
Str("Arr", a.Name).
|
|
Str("Hash", debridTorrent.InfoHash).
|
|
Str("Name", debridTorrent.Name).
|
|
Str("Action", action).
|
|
Msg("Processing torrent")
|
|
|
|
// If debrid.DownloadUnached is true, it overrides everything
|
|
if db.GetDownloadUncached() || overrideDownloadUncached {
|
|
debridTorrent.DownloadUncached = true
|
|
}
|
|
|
|
dbt, err := db.SubmitMagnet(debridTorrent)
|
|
if err != nil || dbt == nil || dbt.Id == "" {
|
|
errs = append(errs, err)
|
|
continue
|
|
}
|
|
dbt.Arr = a
|
|
_logger.Info().Str("id", dbt.Id).Msgf("Torrent: %s submitted to %s", dbt.Name, db.Name())
|
|
store.lastUsed = db.Name()
|
|
|
|
torrent, err := db.CheckStatus(dbt)
|
|
if err != nil && torrent != nil && torrent.Id != "" {
|
|
// Delete the torrent if it was not downloaded
|
|
go func(id string) {
|
|
_ = db.DeleteTorrent(id)
|
|
}(torrent.Id)
|
|
}
|
|
if err != nil {
|
|
errs = append(errs, err)
|
|
continue
|
|
}
|
|
if torrent == nil {
|
|
errs = append(errs, fmt.Errorf("torrent %s returned nil after checking status", dbt.Name))
|
|
continue
|
|
}
|
|
return torrent, nil
|
|
}
|
|
if len(errs) == 0 {
|
|
return nil, fmt.Errorf("failed to process torrent: no clients available")
|
|
}
|
|
joinedErrors := errors.Join(errs...)
|
|
return nil, fmt.Errorf("failed to process torrent: %w", joinedErrors)
|
|
}
|