From 8d87c602b95b5726c18e75c3e3a6fec280bf22c9 Mon Sep 17 00:00:00 2001 From: Mukhtar Akere Date: Sun, 15 Jun 2025 22:46:07 +0100 Subject: [PATCH] - Add remove stalled torrent - Few cleanup --- internal/config/config.go | 28 +++---- pkg/qbit/http.go | 4 +- pkg/store/queue.go | 143 ++++++++++++++++++++++++++++++++++ pkg/store/store.go | 38 ++++----- pkg/store/torrent.go | 84 +------------------- pkg/store/torrent_storage.go | 20 +++++ pkg/web/api.go | 1 + pkg/web/templates/config.html | 16 +++- 8 files changed, 216 insertions(+), 118 deletions(-) create mode 100644 pkg/store/queue.go diff --git a/internal/config/config.go b/internal/config/config.go index 329531e..f0c10db 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -10,6 +10,7 @@ import ( "runtime" "strings" "sync" + "time" ) var ( @@ -77,19 +78,20 @@ type Config struct { URLBase string `json:"url_base,omitempty"` Port string `json:"port,omitempty"` - LogLevel string `json:"log_level,omitempty"` - Debrids []Debrid `json:"debrids,omitempty"` - QBitTorrent QBitTorrent `json:"qbittorrent,omitempty"` - Arrs []Arr `json:"arrs,omitempty"` - Repair Repair `json:"repair,omitempty"` - WebDav WebDav `json:"webdav,omitempty"` - AllowedExt []string `json:"allowed_file_types,omitempty"` - MinFileSize string `json:"min_file_size,omitempty"` // Minimum file size to download, 10MB, 1GB, etc - MaxFileSize string `json:"max_file_size,omitempty"` // Maximum file size to download (0 means no limit) - Path string `json:"-"` // Path to save the config file - UseAuth bool `json:"use_auth,omitempty"` - Auth *Auth `json:"-"` - DiscordWebhook string `json:"discord_webhook_url,omitempty"` + LogLevel string `json:"log_level,omitempty"` + Debrids []Debrid `json:"debrids,omitempty"` + QBitTorrent QBitTorrent `json:"qbittorrent,omitempty"` + Arrs []Arr `json:"arrs,omitempty"` + Repair Repair `json:"repair,omitempty"` + WebDav WebDav `json:"webdav,omitempty"` + AllowedExt []string `json:"allowed_file_types,omitempty"` + MinFileSize string `json:"min_file_size,omitempty"` // Minimum file size to download, 10MB, 1GB, etc + MaxFileSize string `json:"max_file_size,omitempty"` // Maximum file size to download (0 means no limit) + Path string `json:"-"` // Path to save the config file + UseAuth bool `json:"use_auth,omitempty"` + Auth *Auth `json:"-"` + DiscordWebhook string `json:"discord_webhook_url,omitempty"` + RemoveStalledAfter time.Duration `json:"remove_stalled_after,omitempty"` } func (c *Config) JsonFile() string { diff --git a/pkg/qbit/http.go b/pkg/qbit/http.go index 409677c..210c523 100644 --- a/pkg/qbit/http.go +++ b/pkg/qbit/http.go @@ -108,7 +108,7 @@ func (q *QBit) handleTorrentsAdd(w http.ResponseWriter, r *http.Request) { } for _, url := range urlList { if err := q.addMagnet(ctx, url, _arr, debridName, action); err != nil { - q.logger.Error().Err(err).Msgf("Error adding magnet") + q.logger.Debug().Err(err).Msgf("Error adding magnet") http.Error(w, err.Error(), http.StatusBadRequest) return } @@ -121,7 +121,7 @@ func (q *QBit) handleTorrentsAdd(w http.ResponseWriter, r *http.Request) { if files := r.MultipartForm.File["torrents"]; len(files) > 0 { for _, fileHeader := range files { if err := q.addTorrent(ctx, fileHeader, _arr, debridName, action); err != nil { - q.logger.Error().Err(err).Msgf("Error adding torrent") + q.logger.Debug().Err(err).Msgf("Error adding torrent") http.Error(w, err.Error(), http.StatusBadRequest) return } diff --git a/pkg/store/queue.go b/pkg/store/queue.go new file mode 100644 index 0000000..f4d573e --- /dev/null +++ b/pkg/store/queue.go @@ -0,0 +1,143 @@ +package store + +import ( + "context" + "fmt" + "time" +) + +func (s *Store) addToQueue(importReq *ImportRequest) error { + if importReq.Magnet == nil { + return fmt.Errorf("magnet is required") + } + + if importReq.Arr == nil { + return fmt.Errorf("arr is required") + } + + importReq.Status = "queued" + importReq.CompletedAt = time.Time{} + importReq.Error = nil + err := s.importsQueue.Push(importReq) + if err != nil { + return err + } + return nil +} + +func (s *Store) StartQueueSchedule(ctx context.Context) error { + // Start the slots processing in a separate goroutine + go func() { + if err := s.processSlotsQueue(ctx); err != nil { + s.logger.Error().Err(err).Msg("Error processing slots queue") + } + }() + + // Start the remove stalled torrents processing in a separate goroutine + go func() { + if err := s.processRemoveStalledTorrents(ctx); err != nil { + s.logger.Error().Err(err).Msg("Error processing remove stalled torrents") + } + }() + + return nil +} + +func (s *Store) processSlotsQueue(ctx context.Context) error { + s.trackAvailableSlots(ctx) // Initial tracking of available slots + + ticker := time.NewTicker(30 * time.Second) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return nil + case <-ticker.C: + s.trackAvailableSlots(ctx) + } + } +} + +func (s *Store) processRemoveStalledTorrents(ctx context.Context) error { + if s.removeStalledAfter <= 0 { + return nil // No need to remove stalled torrents if the duration is not set + } + + ticker := time.NewTicker(time.Minute) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return nil + case <-ticker.C: + if err := s.removeStalledTorrents(ctx); err != nil { + s.logger.Error().Err(err).Msg("Error removing stalled torrents") + } + } + } +} + +func (s *Store) trackAvailableSlots(ctx context.Context) { + // This function tracks the available slots for each debrid client + availableSlots := make(map[string]int) + + for name, deb := range s.debrid.Debrids() { + slots, err := deb.Client().GetAvailableSlots() + if err != nil { + continue + } + availableSlots[name] = slots + } + + if s.importsQueue.Size() <= 0 { + // Queue is empty, no need to process + return + } + + for name, slots := range availableSlots { + + s.logger.Debug().Msgf("Available slots for %s: %d", name, slots) + // If slots are available, process the next import request from the queue + for slots > 0 { + select { + case <-ctx.Done(): + return // Exit if context is done + default: + if err := s.processFromQueue(ctx); err != nil { + s.logger.Error().Err(err).Msg("Error processing from queue") + return // Exit on error + } + slots-- // Decrease the available slots after processing + } + } + } +} + +func (s *Store) processFromQueue(ctx context.Context) error { + // Pop the next import request from the queue + importReq, err := s.importsQueue.Pop() + if err != nil { + return err + } + if importReq == nil { + return nil + } + return s.AddTorrent(ctx, importReq) +} + +func (s *Store) removeStalledTorrents(ctx context.Context) error { + // This function checks for stalled torrents and removes them + stalledTorrents := s.torrents.GetStalledTorrents(s.removeStalledAfter) + if len(stalledTorrents) == 0 { + return nil // No stalled torrents to remove + } + + for _, torrent := range stalledTorrents { + s.logger.Warn().Msgf("Removing stalled torrent: %s", torrent.Name) + s.torrents.Delete(torrent.Hash, torrent.Category, true) // Remove from store and delete from debrid + } + + return nil +} diff --git a/pkg/store/store.go b/pkg/store/store.go index 4492ffc..99226c3 100644 --- a/pkg/store/store.go +++ b/pkg/store/store.go @@ -14,15 +14,16 @@ import ( ) type Store struct { - repair *repair.Repair - arr *arr.Storage - debrid *debrid.Storage - importsQueue *ImportQueue // Queued import requests(probably from too_many_active_downloads) - torrents *TorrentStorage - logger zerolog.Logger - refreshInterval time.Duration - skipPreCache bool - downloadSemaphore chan struct{} + repair *repair.Repair + arr *arr.Storage + debrid *debrid.Storage + importsQueue *ImportQueue // Queued import requests(probably from too_many_active_downloads) + torrents *TorrentStorage + logger zerolog.Logger + refreshInterval time.Duration + skipPreCache bool + downloadSemaphore chan struct{} + removeStalledAfter time.Duration // Duration after which stalled torrents are removed } var ( @@ -39,15 +40,16 @@ func Get() *Store { qbitCfg := cfg.QBitTorrent instance = &Store{ - repair: repair.New(arrs, deb), - arr: arrs, - debrid: deb, - torrents: newTorrentStorage(cfg.TorrentsFile()), - logger: logger.Default(), // Use default logger [decypharr] - refreshInterval: time.Duration(cmp.Or(qbitCfg.RefreshInterval, 10)) * time.Minute, - skipPreCache: qbitCfg.SkipPreCache, - downloadSemaphore: make(chan struct{}, cmp.Or(qbitCfg.MaxDownloads, 5)), - importsQueue: NewImportQueue(context.Background(), 1000), + repair: repair.New(arrs, deb), + arr: arrs, + debrid: deb, + torrents: newTorrentStorage(cfg.TorrentsFile()), + logger: logger.Default(), // Use default logger [decypharr] + refreshInterval: time.Duration(cmp.Or(qbitCfg.RefreshInterval, 10)) * time.Minute, + skipPreCache: qbitCfg.SkipPreCache, + downloadSemaphore: make(chan struct{}, cmp.Or(qbitCfg.MaxDownloads, 5)), + importsQueue: NewImportQueue(context.Background(), 1000), + removeStalledAfter: cfg.RemoveStalledAfter, } }) return instance diff --git a/pkg/store/torrent.go b/pkg/store/torrent.go index db73285..17612d7 100644 --- a/pkg/store/torrent.go +++ b/pkg/store/torrent.go @@ -46,89 +46,6 @@ func (s *Store) AddTorrent(ctx context.Context, importReq *ImportRequest) error return nil } -func (s *Store) addToQueue(importReq *ImportRequest) error { - if importReq.Magnet == nil { - return fmt.Errorf("magnet is required") - } - - if importReq.Arr == nil { - return fmt.Errorf("arr is required") - } - - importReq.Status = "queued" - importReq.CompletedAt = time.Time{} - importReq.Error = nil - err := s.importsQueue.Push(importReq) - if err != nil { - return err - } - return nil -} - -func (s *Store) processFromQueue(ctx context.Context) error { - // Pop the next import request from the queue - importReq, err := s.importsQueue.Pop() - if err != nil { - return err - } - if importReq == nil { - return nil - } - return s.AddTorrent(ctx, importReq) -} - -func (s *Store) StartQueueSchedule(ctx context.Context) error { - - s.trackAvailableSlots(ctx) // Initial tracking of available slots - - ticker := time.NewTicker(time.Minute) - - for { - select { - case <-ctx.Done(): - return nil - case <-ticker.C: - s.trackAvailableSlots(ctx) - } - } -} - -func (s *Store) trackAvailableSlots(ctx context.Context) { - // This function tracks the available slots for each debrid client - availableSlots := make(map[string]int) - - for name, deb := range s.debrid.Debrids() { - slots, err := deb.Client().GetAvailableSlots() - if err != nil { - continue - } - availableSlots[name] = slots - } - - if s.importsQueue.Size() <= 0 { - // Queue is empty, no need to process - return - } - - for name, slots := range availableSlots { - - s.logger.Debug().Msgf("Available slots for %s: %d", name, slots) - // If slots are available, process the next import request from the queue - for slots > 0 { - select { - case <-ctx.Done(): - return // Exit if context is done - default: - if err := s.processFromQueue(ctx); err != nil { - s.logger.Error().Err(err).Msg("Error processing from queue") - return // Exit on error - } - slots-- // Decrease the available slots after processing - } - } - } -} - func (s *Store) processFiles(torrent *Torrent, debridTorrent *types.Torrent, importReq *ImportRequest) { if debridTorrent == nil { @@ -310,6 +227,7 @@ func (s *Store) partialTorrentUpdate(t *Torrent, debridTorrent *types.Torrent) * t.Debrid = debridTorrent.Debrid t.Size = totalSize t.Completed = sizeCompleted + t.NumSeeds = debridTorrent.Seeders t.Downloaded = sizeCompleted t.DownloadedSession = sizeCompleted t.Uploaded = sizeCompleted diff --git a/pkg/store/torrent_storage.go b/pkg/store/torrent_storage.go index 2b36ada..e228c48 100644 --- a/pkg/store/torrent_storage.go +++ b/pkg/store/torrent_storage.go @@ -6,6 +6,7 @@ import ( "os" "sort" "sync" + "time" ) func keyPair(hash, category string) string { @@ -288,3 +289,22 @@ func (ts *TorrentStorage) Reset() { defer ts.mu.Unlock() ts.torrents = make(Torrents) } + +// GetStalledTorrents returns a list of torrents that are stalled +// A torrent is considered stalled if it has no seeds, no progress, and has been downloading for longer than removeStalledAfter +// The torrent must have a DebridID and be in the "downloading" state +func (ts *TorrentStorage) GetStalledTorrents(removeAfter time.Duration) []*Torrent { + ts.mu.RLock() + defer ts.mu.RUnlock() + stalled := make([]*Torrent, 0) + currentTime := time.Now() + for _, torrent := range ts.torrents { + if torrent.DebridID != "" && torrent.State == "downloading" && torrent.NumSeeds == 0 && torrent.Progress == 0 { + addedOn := time.Unix(torrent.AddedOn, 0) + if currentTime.Sub(addedOn) > removeAfter { + stalled = append(stalled, torrent) + } + } + } + return stalled +} diff --git a/pkg/web/api.go b/pkg/web/api.go index c62adf9..6fc3a29 100644 --- a/pkg/web/api.go +++ b/pkg/web/api.go @@ -214,6 +214,7 @@ func (wb *Web) handleUpdateConfig(w http.ResponseWriter, r *http.Request) { currentConfig.LogLevel = updatedConfig.LogLevel currentConfig.MinFileSize = updatedConfig.MinFileSize currentConfig.MaxFileSize = updatedConfig.MaxFileSize + currentConfig.RemoveStalledAfter = updatedConfig.RemoveStalledAfter currentConfig.AllowedExt = updatedConfig.AllowedExt currentConfig.DiscordWebhook = updatedConfig.DiscordWebhook diff --git a/pkg/web/templates/config.html b/pkg/web/templates/config.html index 341ea71..47eb6d0 100644 --- a/pkg/web/templates/config.html +++ b/pkg/web/templates/config.html @@ -142,7 +142,7 @@ -
+
Minimum file size to download (Empty for no limit)
-
+
Maximum file size to download (Empty for no limit)
+
+
+ + + Remove torrents that have been stalled for this duration +
+
@@ -1056,6 +1067,7 @@ allowed_file_types: document.getElementById('allowedExtensions').value.split(',').map(ext => ext.trim()).filter(Boolean), min_file_size: document.getElementById('minFileSize').value, max_file_size: document.getElementById('maxFileSize').value, + remove_stalled_after: document.getElementById('removeStalledAfter').value, url_base: document.getElementById('urlBase').value, bind_address: document.getElementById('bindAddress').value, port: document.getElementById('port').value,