- Add remove stalled torrent

- Few cleanup
This commit is contained in:
Mukhtar Akere
2025-06-15 22:46:07 +01:00
parent 7cf25f53e7
commit 8d87c602b9
8 changed files with 216 additions and 118 deletions

View File

@@ -10,6 +10,7 @@ import (
"runtime" "runtime"
"strings" "strings"
"sync" "sync"
"time"
) )
var ( var (
@@ -77,19 +78,20 @@ type Config struct {
URLBase string `json:"url_base,omitempty"` URLBase string `json:"url_base,omitempty"`
Port string `json:"port,omitempty"` Port string `json:"port,omitempty"`
LogLevel string `json:"log_level,omitempty"` LogLevel string `json:"log_level,omitempty"`
Debrids []Debrid `json:"debrids,omitempty"` Debrids []Debrid `json:"debrids,omitempty"`
QBitTorrent QBitTorrent `json:"qbittorrent,omitempty"` QBitTorrent QBitTorrent `json:"qbittorrent,omitempty"`
Arrs []Arr `json:"arrs,omitempty"` Arrs []Arr `json:"arrs,omitempty"`
Repair Repair `json:"repair,omitempty"` Repair Repair `json:"repair,omitempty"`
WebDav WebDav `json:"webdav,omitempty"` WebDav WebDav `json:"webdav,omitempty"`
AllowedExt []string `json:"allowed_file_types,omitempty"` AllowedExt []string `json:"allowed_file_types,omitempty"`
MinFileSize string `json:"min_file_size,omitempty"` // Minimum file size to download, 10MB, 1GB, etc MinFileSize string `json:"min_file_size,omitempty"` // Minimum file size to download, 10MB, 1GB, etc
MaxFileSize string `json:"max_file_size,omitempty"` // Maximum file size to download (0 means no limit) MaxFileSize string `json:"max_file_size,omitempty"` // Maximum file size to download (0 means no limit)
Path string `json:"-"` // Path to save the config file Path string `json:"-"` // Path to save the config file
UseAuth bool `json:"use_auth,omitempty"` UseAuth bool `json:"use_auth,omitempty"`
Auth *Auth `json:"-"` Auth *Auth `json:"-"`
DiscordWebhook string `json:"discord_webhook_url,omitempty"` DiscordWebhook string `json:"discord_webhook_url,omitempty"`
RemoveStalledAfter time.Duration `json:"remove_stalled_after,omitempty"`
} }
func (c *Config) JsonFile() string { func (c *Config) JsonFile() string {

View File

@@ -108,7 +108,7 @@ func (q *QBit) handleTorrentsAdd(w http.ResponseWriter, r *http.Request) {
} }
for _, url := range urlList { for _, url := range urlList {
if err := q.addMagnet(ctx, url, _arr, debridName, action); err != nil { if err := q.addMagnet(ctx, url, _arr, debridName, action); err != nil {
q.logger.Error().Err(err).Msgf("Error adding magnet") q.logger.Debug().Err(err).Msgf("Error adding magnet")
http.Error(w, err.Error(), http.StatusBadRequest) http.Error(w, err.Error(), http.StatusBadRequest)
return return
} }
@@ -121,7 +121,7 @@ func (q *QBit) handleTorrentsAdd(w http.ResponseWriter, r *http.Request) {
if files := r.MultipartForm.File["torrents"]; len(files) > 0 { if files := r.MultipartForm.File["torrents"]; len(files) > 0 {
for _, fileHeader := range files { for _, fileHeader := range files {
if err := q.addTorrent(ctx, fileHeader, _arr, debridName, action); err != nil { if err := q.addTorrent(ctx, fileHeader, _arr, debridName, action); err != nil {
q.logger.Error().Err(err).Msgf("Error adding torrent") q.logger.Debug().Err(err).Msgf("Error adding torrent")
http.Error(w, err.Error(), http.StatusBadRequest) http.Error(w, err.Error(), http.StatusBadRequest)
return return
} }

143
pkg/store/queue.go Normal file
View File

@@ -0,0 +1,143 @@
package store
import (
"context"
"fmt"
"time"
)
func (s *Store) addToQueue(importReq *ImportRequest) error {
if importReq.Magnet == nil {
return fmt.Errorf("magnet is required")
}
if importReq.Arr == nil {
return fmt.Errorf("arr is required")
}
importReq.Status = "queued"
importReq.CompletedAt = time.Time{}
importReq.Error = nil
err := s.importsQueue.Push(importReq)
if err != nil {
return err
}
return nil
}
func (s *Store) StartQueueSchedule(ctx context.Context) error {
// Start the slots processing in a separate goroutine
go func() {
if err := s.processSlotsQueue(ctx); err != nil {
s.logger.Error().Err(err).Msg("Error processing slots queue")
}
}()
// Start the remove stalled torrents processing in a separate goroutine
go func() {
if err := s.processRemoveStalledTorrents(ctx); err != nil {
s.logger.Error().Err(err).Msg("Error processing remove stalled torrents")
}
}()
return nil
}
func (s *Store) processSlotsQueue(ctx context.Context) error {
s.trackAvailableSlots(ctx) // Initial tracking of available slots
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return nil
case <-ticker.C:
s.trackAvailableSlots(ctx)
}
}
}
func (s *Store) processRemoveStalledTorrents(ctx context.Context) error {
if s.removeStalledAfter <= 0 {
return nil // No need to remove stalled torrents if the duration is not set
}
ticker := time.NewTicker(time.Minute)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return nil
case <-ticker.C:
if err := s.removeStalledTorrents(ctx); err != nil {
s.logger.Error().Err(err).Msg("Error removing stalled torrents")
}
}
}
}
func (s *Store) trackAvailableSlots(ctx context.Context) {
// This function tracks the available slots for each debrid client
availableSlots := make(map[string]int)
for name, deb := range s.debrid.Debrids() {
slots, err := deb.Client().GetAvailableSlots()
if err != nil {
continue
}
availableSlots[name] = slots
}
if s.importsQueue.Size() <= 0 {
// Queue is empty, no need to process
return
}
for name, slots := range availableSlots {
s.logger.Debug().Msgf("Available slots for %s: %d", name, slots)
// If slots are available, process the next import request from the queue
for slots > 0 {
select {
case <-ctx.Done():
return // Exit if context is done
default:
if err := s.processFromQueue(ctx); err != nil {
s.logger.Error().Err(err).Msg("Error processing from queue")
return // Exit on error
}
slots-- // Decrease the available slots after processing
}
}
}
}
func (s *Store) processFromQueue(ctx context.Context) error {
// Pop the next import request from the queue
importReq, err := s.importsQueue.Pop()
if err != nil {
return err
}
if importReq == nil {
return nil
}
return s.AddTorrent(ctx, importReq)
}
func (s *Store) removeStalledTorrents(ctx context.Context) error {
// This function checks for stalled torrents and removes them
stalledTorrents := s.torrents.GetStalledTorrents(s.removeStalledAfter)
if len(stalledTorrents) == 0 {
return nil // No stalled torrents to remove
}
for _, torrent := range stalledTorrents {
s.logger.Warn().Msgf("Removing stalled torrent: %s", torrent.Name)
s.torrents.Delete(torrent.Hash, torrent.Category, true) // Remove from store and delete from debrid
}
return nil
}

View File

@@ -14,15 +14,16 @@ import (
) )
type Store struct { type Store struct {
repair *repair.Repair repair *repair.Repair
arr *arr.Storage arr *arr.Storage
debrid *debrid.Storage debrid *debrid.Storage
importsQueue *ImportQueue // Queued import requests(probably from too_many_active_downloads) importsQueue *ImportQueue // Queued import requests(probably from too_many_active_downloads)
torrents *TorrentStorage torrents *TorrentStorage
logger zerolog.Logger logger zerolog.Logger
refreshInterval time.Duration refreshInterval time.Duration
skipPreCache bool skipPreCache bool
downloadSemaphore chan struct{} downloadSemaphore chan struct{}
removeStalledAfter time.Duration // Duration after which stalled torrents are removed
} }
var ( var (
@@ -39,15 +40,16 @@ func Get() *Store {
qbitCfg := cfg.QBitTorrent qbitCfg := cfg.QBitTorrent
instance = &Store{ instance = &Store{
repair: repair.New(arrs, deb), repair: repair.New(arrs, deb),
arr: arrs, arr: arrs,
debrid: deb, debrid: deb,
torrents: newTorrentStorage(cfg.TorrentsFile()), torrents: newTorrentStorage(cfg.TorrentsFile()),
logger: logger.Default(), // Use default logger [decypharr] logger: logger.Default(), // Use default logger [decypharr]
refreshInterval: time.Duration(cmp.Or(qbitCfg.RefreshInterval, 10)) * time.Minute, refreshInterval: time.Duration(cmp.Or(qbitCfg.RefreshInterval, 10)) * time.Minute,
skipPreCache: qbitCfg.SkipPreCache, skipPreCache: qbitCfg.SkipPreCache,
downloadSemaphore: make(chan struct{}, cmp.Or(qbitCfg.MaxDownloads, 5)), downloadSemaphore: make(chan struct{}, cmp.Or(qbitCfg.MaxDownloads, 5)),
importsQueue: NewImportQueue(context.Background(), 1000), importsQueue: NewImportQueue(context.Background(), 1000),
removeStalledAfter: cfg.RemoveStalledAfter,
} }
}) })
return instance return instance

View File

@@ -46,89 +46,6 @@ func (s *Store) AddTorrent(ctx context.Context, importReq *ImportRequest) error
return nil return nil
} }
func (s *Store) addToQueue(importReq *ImportRequest) error {
if importReq.Magnet == nil {
return fmt.Errorf("magnet is required")
}
if importReq.Arr == nil {
return fmt.Errorf("arr is required")
}
importReq.Status = "queued"
importReq.CompletedAt = time.Time{}
importReq.Error = nil
err := s.importsQueue.Push(importReq)
if err != nil {
return err
}
return nil
}
func (s *Store) processFromQueue(ctx context.Context) error {
// Pop the next import request from the queue
importReq, err := s.importsQueue.Pop()
if err != nil {
return err
}
if importReq == nil {
return nil
}
return s.AddTorrent(ctx, importReq)
}
func (s *Store) StartQueueSchedule(ctx context.Context) error {
s.trackAvailableSlots(ctx) // Initial tracking of available slots
ticker := time.NewTicker(time.Minute)
for {
select {
case <-ctx.Done():
return nil
case <-ticker.C:
s.trackAvailableSlots(ctx)
}
}
}
func (s *Store) trackAvailableSlots(ctx context.Context) {
// This function tracks the available slots for each debrid client
availableSlots := make(map[string]int)
for name, deb := range s.debrid.Debrids() {
slots, err := deb.Client().GetAvailableSlots()
if err != nil {
continue
}
availableSlots[name] = slots
}
if s.importsQueue.Size() <= 0 {
// Queue is empty, no need to process
return
}
for name, slots := range availableSlots {
s.logger.Debug().Msgf("Available slots for %s: %d", name, slots)
// If slots are available, process the next import request from the queue
for slots > 0 {
select {
case <-ctx.Done():
return // Exit if context is done
default:
if err := s.processFromQueue(ctx); err != nil {
s.logger.Error().Err(err).Msg("Error processing from queue")
return // Exit on error
}
slots-- // Decrease the available slots after processing
}
}
}
}
func (s *Store) processFiles(torrent *Torrent, debridTorrent *types.Torrent, importReq *ImportRequest) { func (s *Store) processFiles(torrent *Torrent, debridTorrent *types.Torrent, importReq *ImportRequest) {
if debridTorrent == nil { if debridTorrent == nil {
@@ -310,6 +227,7 @@ func (s *Store) partialTorrentUpdate(t *Torrent, debridTorrent *types.Torrent) *
t.Debrid = debridTorrent.Debrid t.Debrid = debridTorrent.Debrid
t.Size = totalSize t.Size = totalSize
t.Completed = sizeCompleted t.Completed = sizeCompleted
t.NumSeeds = debridTorrent.Seeders
t.Downloaded = sizeCompleted t.Downloaded = sizeCompleted
t.DownloadedSession = sizeCompleted t.DownloadedSession = sizeCompleted
t.Uploaded = sizeCompleted t.Uploaded = sizeCompleted

View File

@@ -6,6 +6,7 @@ import (
"os" "os"
"sort" "sort"
"sync" "sync"
"time"
) )
func keyPair(hash, category string) string { func keyPair(hash, category string) string {
@@ -288,3 +289,22 @@ func (ts *TorrentStorage) Reset() {
defer ts.mu.Unlock() defer ts.mu.Unlock()
ts.torrents = make(Torrents) ts.torrents = make(Torrents)
} }
// GetStalledTorrents returns a list of torrents that are stalled
// A torrent is considered stalled if it has no seeds, no progress, and has been downloading for longer than removeStalledAfter
// The torrent must have a DebridID and be in the "downloading" state
func (ts *TorrentStorage) GetStalledTorrents(removeAfter time.Duration) []*Torrent {
ts.mu.RLock()
defer ts.mu.RUnlock()
stalled := make([]*Torrent, 0)
currentTime := time.Now()
for _, torrent := range ts.torrents {
if torrent.DebridID != "" && torrent.State == "downloading" && torrent.NumSeeds == 0 && torrent.Progress == 0 {
addedOn := time.Unix(torrent.AddedOn, 0)
if currentTime.Sub(addedOn) > removeAfter {
stalled = append(stalled, torrent)
}
}
}
return stalled
}

View File

@@ -214,6 +214,7 @@ func (wb *Web) handleUpdateConfig(w http.ResponseWriter, r *http.Request) {
currentConfig.LogLevel = updatedConfig.LogLevel currentConfig.LogLevel = updatedConfig.LogLevel
currentConfig.MinFileSize = updatedConfig.MinFileSize currentConfig.MinFileSize = updatedConfig.MinFileSize
currentConfig.MaxFileSize = updatedConfig.MaxFileSize currentConfig.MaxFileSize = updatedConfig.MaxFileSize
currentConfig.RemoveStalledAfter = updatedConfig.RemoveStalledAfter
currentConfig.AllowedExt = updatedConfig.AllowedExt currentConfig.AllowedExt = updatedConfig.AllowedExt
currentConfig.DiscordWebhook = updatedConfig.DiscordWebhook currentConfig.DiscordWebhook = updatedConfig.DiscordWebhook

View File

@@ -142,7 +142,7 @@
</div> </div>
</div> </div>
</div> </div>
<div class="col-md-6 mt-3"> <div class="col-md-4 mt-3">
<div class="form-group"> <div class="form-group">
<label for="minFileSize">Minimum File Size</label> <label for="minFileSize">Minimum File Size</label>
<input type="text" <input type="text"
@@ -153,7 +153,7 @@
<small class="form-text text-muted">Minimum file size to download (Empty for no limit)</small> <small class="form-text text-muted">Minimum file size to download (Empty for no limit)</small>
</div> </div>
</div> </div>
<div class="col-md-6 mt-3"> <div class="col-md-4 mt-3">
<div class="form-group"> <div class="form-group">
<label for="maxFileSize">Maximum File Size</label> <label for="maxFileSize">Maximum File Size</label>
<input type="text" <input type="text"
@@ -164,6 +164,17 @@
<small class="form-text text-muted">Maximum file size to download (Empty for no limit)</small> <small class="form-text text-muted">Maximum file size to download (Empty for no limit)</small>
</div> </div>
</div> </div>
<div class="col-md-4 mt-3">
<div class="form-group">
<label for="removeStalledAfter">Remove Stalled Torrents After</label>
<input type="text"
class="form-control"
id="removeStalledAfter"
name="remove_stalled_after"
placeholder="e.g., 1m, 30s, 1h">
<small class="form-text text-muted">Remove torrents that have been stalled for this duration</small>
</div>
</div>
</div> </div>
</div> </div>
<div class="mt-4 d-flex justify-content-end"> <div class="mt-4 d-flex justify-content-end">
@@ -1056,6 +1067,7 @@
allowed_file_types: document.getElementById('allowedExtensions').value.split(',').map(ext => ext.trim()).filter(Boolean), allowed_file_types: document.getElementById('allowedExtensions').value.split(',').map(ext => ext.trim()).filter(Boolean),
min_file_size: document.getElementById('minFileSize').value, min_file_size: document.getElementById('minFileSize').value,
max_file_size: document.getElementById('maxFileSize').value, max_file_size: document.getElementById('maxFileSize').value,
remove_stalled_after: document.getElementById('removeStalledAfter').value,
url_base: document.getElementById('urlBase').value, url_base: document.getElementById('urlBase').value,
bind_address: document.getElementById('bindAddress').value, bind_address: document.getElementById('bindAddress').value,
port: document.getElementById('port').value, port: document.getElementById('port').value,