Torrent Queuing for Botched torrent (#83)
* Implement a queue for handling failed torrent * Add checks for getting slots * Few other cleanups, change some function names
This commit is contained in:
@@ -13,7 +13,7 @@ import (
|
||||
"github.com/sirrobot01/decypharr/internal/utils"
|
||||
)
|
||||
|
||||
func Download(client *grab.Client, url, filename string, byterange *[2]int64, progressCallback func(int64, int64)) error {
|
||||
func grabber(client *grab.Client, url, filename string, byterange *[2]int64, progressCallback func(int64, int64)) error {
|
||||
req, err := grab.NewRequest(filename, url)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -56,7 +56,7 @@ Loop:
|
||||
return resp.Err()
|
||||
}
|
||||
|
||||
func (s *Store) ProcessManualFile(torrent *Torrent) (string, error) {
|
||||
func (s *Store) processDownload(torrent *Torrent) (string, error) {
|
||||
debridTorrent := torrent.DebridTorrent
|
||||
s.logger.Info().Msgf("Downloading %d files...", len(debridTorrent.Files))
|
||||
torrentPath := filepath.Join(torrent.SavePath, utils.RemoveExtension(debridTorrent.OriginalFilename))
|
||||
@@ -96,7 +96,7 @@ func (s *Store) downloadFiles(torrent *Torrent, parent string) {
|
||||
if totalSize > 0 {
|
||||
debridTorrent.Progress = float64(debridTorrent.SizeDownloaded) / float64(totalSize) * 100
|
||||
}
|
||||
s.UpdateTorrentMin(torrent, debridTorrent)
|
||||
s.partialTorrentUpdate(torrent, debridTorrent)
|
||||
}
|
||||
client := &grab.Client{
|
||||
UserAgent: "Decypharr[QBitTorrent]",
|
||||
@@ -119,7 +119,7 @@ func (s *Store) downloadFiles(torrent *Torrent, parent string) {
|
||||
defer func() { <-s.downloadSemaphore }()
|
||||
filename := file.Name
|
||||
|
||||
err := Download(
|
||||
err := grabber(
|
||||
client,
|
||||
file.DownloadLink.DownloadLink,
|
||||
filepath.Join(parent, filename),
|
||||
@@ -151,7 +151,7 @@ func (s *Store) downloadFiles(torrent *Torrent, parent string) {
|
||||
s.logger.Info().Msgf("Downloaded all files for %s", debridTorrent.Name)
|
||||
}
|
||||
|
||||
func (s *Store) ProcessSymlink(torrent *Torrent) (string, error) {
|
||||
func (s *Store) processSymlink(torrent *Torrent) (string, error) {
|
||||
debridTorrent := torrent.DebridTorrent
|
||||
files := debridTorrent.Files
|
||||
if len(files) == 0 {
|
||||
|
||||
+104
-15
@@ -2,13 +2,16 @@ package store
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/sirrobot01/decypharr/internal/request"
|
||||
"github.com/sirrobot01/decypharr/internal/utils"
|
||||
"github.com/sirrobot01/decypharr/pkg/arr"
|
||||
debridTypes "github.com/sirrobot01/decypharr/pkg/debrid/types"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
@@ -19,23 +22,9 @@ const (
|
||||
ImportTypeAPI ImportType = "api"
|
||||
)
|
||||
|
||||
func NewImportRequest(debrid string, downloadFolder string, magnet *utils.Magnet, arr *arr.Arr, isSymlink, downloadUncached bool, callBackUrl string, importType ImportType) *ImportRequest {
|
||||
return &ImportRequest{
|
||||
Status: "started",
|
||||
DownloadFolder: downloadFolder,
|
||||
Debrid: debrid,
|
||||
Magnet: magnet,
|
||||
Arr: arr,
|
||||
IsSymlink: isSymlink,
|
||||
DownloadUncached: downloadUncached,
|
||||
CallBackUrl: callBackUrl,
|
||||
Type: importType,
|
||||
}
|
||||
}
|
||||
|
||||
type ImportRequest struct {
|
||||
DownloadFolder string `json:"downloadFolder"`
|
||||
Debrid string `json:"debrid"`
|
||||
SelectedDebrid string `json:"debrid"`
|
||||
Magnet *utils.Magnet `json:"magnet"`
|
||||
Arr *arr.Arr `json:"arr"`
|
||||
IsSymlink bool `json:"isSymlink"`
|
||||
@@ -50,6 +39,20 @@ type ImportRequest struct {
|
||||
Async bool `json:"async"`
|
||||
}
|
||||
|
||||
func NewImportRequest(debrid string, downloadFolder string, magnet *utils.Magnet, arr *arr.Arr, isSymlink, downloadUncached bool, callBackUrl string, importType ImportType) *ImportRequest {
|
||||
return &ImportRequest{
|
||||
Status: "started",
|
||||
DownloadFolder: downloadFolder,
|
||||
SelectedDebrid: debrid,
|
||||
Magnet: magnet,
|
||||
Arr: arr,
|
||||
IsSymlink: isSymlink,
|
||||
DownloadUncached: downloadUncached,
|
||||
CallBackUrl: callBackUrl,
|
||||
Type: importType,
|
||||
}
|
||||
}
|
||||
|
||||
type importResponse struct {
|
||||
Status string `json:"status"`
|
||||
CompletedAt time.Time `json:"completedAt"`
|
||||
@@ -101,3 +104,89 @@ func (i *ImportRequest) markAsCompleted(torrent *Torrent, debridTorrent *debridT
|
||||
i.CompletedAt = time.Now()
|
||||
i.sendCallback(torrent, debridTorrent)
|
||||
}
|
||||
|
||||
type ImportQueue struct {
|
||||
queue map[string]chan *ImportRequest // Map to hold queues for different debrid services
|
||||
mu sync.RWMutex // Mutex to protect access to the queue map
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
capacity int // Capacity of each channel in the queue
|
||||
}
|
||||
|
||||
func NewImportQueue(ctx context.Context, capacity int) *ImportQueue {
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
return &ImportQueue{
|
||||
queue: make(map[string]chan *ImportRequest),
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
capacity: capacity,
|
||||
}
|
||||
}
|
||||
|
||||
func (iq *ImportQueue) Push(req *ImportRequest) error {
|
||||
if req == nil {
|
||||
return fmt.Errorf("import request cannot be nil")
|
||||
}
|
||||
|
||||
iq.mu.Lock()
|
||||
defer iq.mu.Unlock()
|
||||
|
||||
if _, exists := iq.queue[req.SelectedDebrid]; !exists {
|
||||
iq.queue[req.SelectedDebrid] = make(chan *ImportRequest, iq.capacity) // Create a new channel for the debrid service
|
||||
}
|
||||
|
||||
select {
|
||||
case iq.queue[req.SelectedDebrid] <- req:
|
||||
return nil
|
||||
case <-iq.ctx.Done():
|
||||
return fmt.Errorf("retry queue is shutting down")
|
||||
}
|
||||
}
|
||||
|
||||
func (iq *ImportQueue) TryPop(selectedDebrid string) (*ImportRequest, error) {
|
||||
iq.mu.RLock()
|
||||
defer iq.mu.RUnlock()
|
||||
|
||||
if ch, exists := iq.queue[selectedDebrid]; exists {
|
||||
select {
|
||||
case req := <-ch:
|
||||
return req, nil
|
||||
case <-iq.ctx.Done():
|
||||
return nil, fmt.Errorf("queue is shutting down")
|
||||
default:
|
||||
return nil, fmt.Errorf("no import request available for %s", selectedDebrid)
|
||||
}
|
||||
}
|
||||
return nil, fmt.Errorf("no queue exists for %s", selectedDebrid)
|
||||
}
|
||||
|
||||
func (iq *ImportQueue) Size(selectedDebrid string) int {
|
||||
iq.mu.RLock()
|
||||
defer iq.mu.RUnlock()
|
||||
|
||||
if ch, exists := iq.queue[selectedDebrid]; exists {
|
||||
return len(ch)
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
func (iq *ImportQueue) Close() {
|
||||
iq.cancel()
|
||||
iq.mu.Lock()
|
||||
defer iq.mu.Unlock()
|
||||
|
||||
for _, ch := range iq.queue {
|
||||
// Drain remaining items before closing
|
||||
for {
|
||||
select {
|
||||
case <-ch:
|
||||
// Discard remaining items
|
||||
default:
|
||||
close(ch)
|
||||
goto nextChannel
|
||||
}
|
||||
}
|
||||
nextChannel:
|
||||
}
|
||||
iq.queue = make(map[string]chan *ImportRequest)
|
||||
}
|
||||
|
||||
+14
-6
@@ -2,6 +2,7 @@ package store
|
||||
|
||||
import (
|
||||
"cmp"
|
||||
"context"
|
||||
"github.com/rs/zerolog"
|
||||
"github.com/sirrobot01/decypharr/internal/config"
|
||||
"github.com/sirrobot01/decypharr/internal/logger"
|
||||
@@ -16,6 +17,7 @@ type Store struct {
|
||||
repair *repair.Repair
|
||||
arr *arr.Storage
|
||||
debrid *debrid.Storage
|
||||
importsQueue *ImportQueue // Queued import requests(probably from too_many_active_downloads)
|
||||
torrents *TorrentStorage
|
||||
logger zerolog.Logger
|
||||
refreshInterval time.Duration
|
||||
@@ -28,8 +30,8 @@ var (
|
||||
once sync.Once
|
||||
)
|
||||
|
||||
// GetStore returns the singleton instance
|
||||
func GetStore() *Store {
|
||||
// Get returns the singleton instance
|
||||
func Get() *Store {
|
||||
once.Do(func() {
|
||||
arrs := arr.NewStorage()
|
||||
deb := debrid.NewStorage()
|
||||
@@ -45,6 +47,7 @@ func GetStore() *Store {
|
||||
refreshInterval: time.Duration(cmp.Or(qbitCfg.RefreshInterval, 10)) * time.Minute,
|
||||
skipPreCache: qbitCfg.SkipPreCache,
|
||||
downloadSemaphore: make(chan struct{}, cmp.Or(qbitCfg.MaxDownloads, 5)),
|
||||
importsQueue: NewImportQueue(context.Background(), 1000),
|
||||
}
|
||||
})
|
||||
return instance
|
||||
@@ -55,21 +58,26 @@ func Reset() {
|
||||
if instance.debrid != nil {
|
||||
instance.debrid.Reset()
|
||||
}
|
||||
|
||||
if instance.importsQueue != nil {
|
||||
instance.importsQueue.Close()
|
||||
}
|
||||
|
||||
close(instance.downloadSemaphore)
|
||||
}
|
||||
once = sync.Once{}
|
||||
instance = nil
|
||||
}
|
||||
|
||||
func (s *Store) GetArr() *arr.Storage {
|
||||
func (s *Store) Arr() *arr.Storage {
|
||||
return s.arr
|
||||
}
|
||||
func (s *Store) GetDebrid() *debrid.Storage {
|
||||
func (s *Store) Debrid() *debrid.Storage {
|
||||
return s.debrid
|
||||
}
|
||||
func (s *Store) GetRepair() *repair.Repair {
|
||||
func (s *Store) Repair() *repair.Repair {
|
||||
return s.repair
|
||||
}
|
||||
func (s *Store) GetTorrentStorage() *TorrentStorage {
|
||||
func (s *Store) Torrents() *TorrentStorage {
|
||||
return s.torrents
|
||||
}
|
||||
|
||||
+123
-20
@@ -3,6 +3,7 @@ package store
|
||||
import (
|
||||
"cmp"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/sirrobot01/decypharr/internal/request"
|
||||
"github.com/sirrobot01/decypharr/internal/utils"
|
||||
@@ -15,22 +16,125 @@ import (
|
||||
|
||||
func (s *Store) AddTorrent(ctx context.Context, importReq *ImportRequest) error {
|
||||
torrent := createTorrentFromMagnet(importReq)
|
||||
debridTorrent, err := debridTypes.ProcessTorrent(ctx, s.debrid, importReq.Debrid, importReq.Magnet, importReq.Arr, importReq.IsSymlink, importReq.DownloadUncached)
|
||||
if err != nil || debridTorrent == nil {
|
||||
if err == nil {
|
||||
err = fmt.Errorf("failed to process torrent")
|
||||
debridTorrent, err := debridTypes.Process(ctx, s.debrid, importReq.SelectedDebrid, importReq.Magnet, importReq.Arr, importReq.IsSymlink, importReq.DownloadUncached)
|
||||
|
||||
if err != nil {
|
||||
var httpErr *utils.HTTPError
|
||||
if ok := errors.As(err, &httpErr); ok {
|
||||
switch httpErr.Code {
|
||||
case "too_many_active_downloads":
|
||||
// Handle too much active downloads error
|
||||
s.logger.Warn().Msgf("Too many active downloads for %s, adding to queue", importReq.Magnet.Name)
|
||||
err := s.addToQueue(importReq)
|
||||
if err != nil {
|
||||
s.logger.Error().Err(err).Msgf("Failed to add %s to queue", importReq.Magnet.Name)
|
||||
return err
|
||||
}
|
||||
torrent.State = "queued"
|
||||
default:
|
||||
// Unhandled error, return it, caller logs it
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
// Unhandled error, return it, caller logs it
|
||||
return err
|
||||
}
|
||||
// This error is returned immediately to the user(no need for callback)
|
||||
return err
|
||||
}
|
||||
torrent = s.UpdateTorrentMin(torrent, debridTorrent)
|
||||
torrent = s.partialTorrentUpdate(torrent, debridTorrent)
|
||||
s.torrents.AddOrUpdate(torrent)
|
||||
go s.processFiles(torrent, debridTorrent, importReq) // We can send async for file processing not to delay the response
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Store) addToQueue(importReq *ImportRequest) error {
|
||||
if importReq.Magnet == nil {
|
||||
return fmt.Errorf("magnet is required")
|
||||
}
|
||||
|
||||
if importReq.Arr == nil {
|
||||
return fmt.Errorf("arr is required")
|
||||
}
|
||||
|
||||
importReq.Status = "queued"
|
||||
importReq.CompletedAt = time.Time{}
|
||||
importReq.Error = nil
|
||||
err := s.importsQueue.Push(importReq)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Store) processFromQueue(ctx context.Context, selectedDebrid string) error {
|
||||
// Pop the next import request from the queue
|
||||
importReq, err := s.importsQueue.TryPop(selectedDebrid)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if importReq == nil {
|
||||
return nil
|
||||
}
|
||||
return s.AddTorrent(ctx, importReq)
|
||||
}
|
||||
|
||||
func (s *Store) StartQueueSchedule(ctx context.Context) error {
|
||||
|
||||
s.trackAvailableSlots(ctx) // Initial tracking of available slots
|
||||
|
||||
ticker := time.NewTicker(time.Minute)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil
|
||||
case <-ticker.C:
|
||||
s.trackAvailableSlots(ctx)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Store) trackAvailableSlots(ctx context.Context) {
|
||||
// This function tracks the available slots for each debrid client
|
||||
availableSlots := make(map[string]int)
|
||||
|
||||
for name, deb := range s.debrid.Debrids() {
|
||||
slots, err := deb.Client().GetAvailableSlots()
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
availableSlots[name] = slots
|
||||
}
|
||||
|
||||
for name, slots := range availableSlots {
|
||||
if s.importsQueue.Size(name) <= 0 {
|
||||
continue
|
||||
}
|
||||
s.logger.Debug().Msgf("Available slots for %s: %d", name, slots)
|
||||
// If slots are available, process the next import request from the queue
|
||||
for slots > 0 {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return // Exit if context is done
|
||||
default:
|
||||
if err := s.processFromQueue(ctx, name); err != nil {
|
||||
s.logger.Error().Err(err).Msg("Error processing from queue")
|
||||
return // Exit on error
|
||||
}
|
||||
slots-- // Decrease the available slots after processing
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Store) processFiles(torrent *Torrent, debridTorrent *types.Torrent, importReq *ImportRequest) {
|
||||
client := s.debrid.GetClient(debridTorrent.Debrid)
|
||||
|
||||
if debridTorrent == nil {
|
||||
// Early return if debridTorrent is nil
|
||||
return
|
||||
}
|
||||
|
||||
deb := s.debrid.Debrid(debridTorrent.Debrid)
|
||||
client := deb.Client()
|
||||
downloadingStatuses := client.GetDownloadingStatus()
|
||||
_arr := importReq.Arr
|
||||
for debridTorrent.Status != "downloaded" {
|
||||
@@ -53,7 +157,7 @@ func (s *Store) processFiles(torrent *Torrent, debridTorrent *types.Torrent, imp
|
||||
}
|
||||
|
||||
debridTorrent = dbT
|
||||
torrent = s.UpdateTorrentMin(torrent, debridTorrent)
|
||||
torrent = s.partialTorrentUpdate(torrent, debridTorrent)
|
||||
|
||||
// Exit the loop for downloading statuses to prevent memory buildup
|
||||
if debridTorrent.Status == "downloaded" || !utils.Contains(downloadingStatuses, debridTorrent.Status) {
|
||||
@@ -71,9 +175,8 @@ func (s *Store) processFiles(torrent *Torrent, debridTorrent *types.Torrent, imp
|
||||
// Check if debrid supports webdav by checking cache
|
||||
timer := time.Now()
|
||||
if importReq.IsSymlink {
|
||||
caches := s.debrid.GetCaches()
|
||||
cache, useWebdav := caches[debridTorrent.Debrid]
|
||||
if useWebdav {
|
||||
cache := deb.Cache()
|
||||
if cache != nil {
|
||||
s.logger.Info().Msgf("Using internal webdav for %s", debridTorrent.Debrid)
|
||||
|
||||
// Use webdav to download the file
|
||||
@@ -91,10 +194,10 @@ func (s *Store) processFiles(torrent *Torrent, debridTorrent *types.Torrent, imp
|
||||
|
||||
} else {
|
||||
// User is using either zurg or debrid webdav
|
||||
torrentSymlinkPath, err = s.ProcessSymlink(torrent) // /mnt/symlinks/{category}/MyTVShow/
|
||||
torrentSymlinkPath, err = s.processSymlink(torrent) // /mnt/symlinks/{category}/MyTVShow/
|
||||
}
|
||||
} else {
|
||||
torrentSymlinkPath, err = s.ProcessManualFile(torrent)
|
||||
torrentSymlinkPath, err = s.processDownload(torrent)
|
||||
}
|
||||
if err != nil {
|
||||
s.markTorrentAsFailed(torrent)
|
||||
@@ -106,7 +209,7 @@ func (s *Store) processFiles(torrent *Torrent, debridTorrent *types.Torrent, imp
|
||||
return
|
||||
}
|
||||
torrent.TorrentPath = torrentSymlinkPath
|
||||
s.UpdateTorrent(torrent, debridTorrent)
|
||||
s.updateTorrent(torrent, debridTorrent)
|
||||
s.logger.Info().Msgf("Adding %s took %s", debridTorrent.Name, time.Since(timer))
|
||||
|
||||
go importReq.markAsCompleted(torrent, debridTorrent) // Mark the import request as completed, send callback if needed
|
||||
@@ -129,7 +232,7 @@ func (s *Store) markTorrentAsFailed(t *Torrent) *Torrent {
|
||||
return t
|
||||
}
|
||||
|
||||
func (s *Store) UpdateTorrentMin(t *Torrent, debridTorrent *types.Torrent) *Torrent {
|
||||
func (s *Store) partialTorrentUpdate(t *Torrent, debridTorrent *types.Torrent) *Torrent {
|
||||
if debridTorrent == nil {
|
||||
return t
|
||||
}
|
||||
@@ -170,17 +273,17 @@ func (s *Store) UpdateTorrentMin(t *Torrent, debridTorrent *types.Torrent) *Torr
|
||||
return t
|
||||
}
|
||||
|
||||
func (s *Store) UpdateTorrent(t *Torrent, debridTorrent *types.Torrent) *Torrent {
|
||||
func (s *Store) updateTorrent(t *Torrent, debridTorrent *types.Torrent) *Torrent {
|
||||
if debridTorrent == nil {
|
||||
return t
|
||||
}
|
||||
|
||||
if debridClient := s.debrid.GetClients()[debridTorrent.Debrid]; debridClient != nil {
|
||||
if debridClient := s.debrid.Clients()[debridTorrent.Debrid]; debridClient != nil {
|
||||
if debridTorrent.Status != "downloaded" {
|
||||
_ = debridClient.UpdateTorrent(debridTorrent)
|
||||
}
|
||||
}
|
||||
t = s.UpdateTorrentMin(t, debridTorrent)
|
||||
t = s.partialTorrentUpdate(t, debridTorrent)
|
||||
t.ContentPath = t.TorrentPath + string(os.PathSeparator)
|
||||
|
||||
if t.IsReady() {
|
||||
@@ -200,7 +303,7 @@ func (s *Store) UpdateTorrent(t *Torrent, debridTorrent *types.Torrent) *Torrent
|
||||
s.torrents.Update(t)
|
||||
return t
|
||||
}
|
||||
updatedT := s.UpdateTorrent(t, debridTorrent)
|
||||
updatedT := s.updateTorrent(t, debridTorrent)
|
||||
t = updatedT
|
||||
|
||||
case <-time.After(10 * time.Minute): // Add a timeout
|
||||
|
||||
@@ -184,7 +184,7 @@ func (ts *TorrentStorage) Delete(hash, category string, removeFromDebrid bool) {
|
||||
return
|
||||
}
|
||||
if removeFromDebrid && torrent.ID != "" && torrent.Debrid != "" {
|
||||
dbClient := GetStore().debrid.GetClient(torrent.Debrid)
|
||||
dbClient := Get().debrid.Client(torrent.Debrid)
|
||||
if dbClient != nil {
|
||||
_ = dbClient.DeleteTorrent(torrent.ID)
|
||||
}
|
||||
@@ -238,7 +238,7 @@ func (ts *TorrentStorage) DeleteMultiple(hashes []string, removeFromDebrid bool)
|
||||
}
|
||||
}()
|
||||
|
||||
clients := GetStore().debrid.GetClients()
|
||||
clients := Get().debrid.Clients()
|
||||
|
||||
go func() {
|
||||
for id, debrid := range toDelete {
|
||||
|
||||
Reference in New Issue
Block a user