- Add more rclone supports
- Add rclone log viewer - Add more stats to Stats page - Fix some minor bugs
This commit is contained in:
+37
-42
@@ -3,6 +3,8 @@ package store
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/go-co-op/gocron/v2"
|
||||
"github.com/sirrobot01/decypharr/internal/utils"
|
||||
"time"
|
||||
)
|
||||
|
||||
@@ -25,58 +27,51 @@ func (s *Store) addToQueue(importReq *ImportRequest) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Store) StartQueueSchedule(ctx context.Context) error {
|
||||
// Start the slots processing in a separate goroutine
|
||||
go func() {
|
||||
if err := s.processSlotsQueue(ctx); err != nil {
|
||||
s.logger.Error().Err(err).Msg("Error processing slots queue")
|
||||
}
|
||||
}()
|
||||
func (s *Store) StartQueueWorkers(ctx context.Context) error {
|
||||
// This function is responsible for starting the scheduled tasks
|
||||
|
||||
// Start the remove stalled torrents processing in a separate goroutine
|
||||
go func() {
|
||||
if err := s.processRemoveStalledTorrents(ctx); err != nil {
|
||||
s.logger.Error().Err(err).Msg("Error processing remove stalled torrents")
|
||||
}
|
||||
}()
|
||||
if ctx == nil {
|
||||
ctx = context.Background()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
s.scheduler.RemoveByTags("decypharr-store")
|
||||
|
||||
func (s *Store) processSlotsQueue(ctx context.Context) error {
|
||||
s.trackAvailableSlots(ctx) // Initial tracking of available slots
|
||||
|
||||
ticker := time.NewTicker(30 * time.Second)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil
|
||||
case <-ticker.C:
|
||||
if jd, err := utils.ConvertToJobDef("30s"); err != nil {
|
||||
s.logger.Error().Err(err).Msg("Failed to convert slots tracking interval to job definition")
|
||||
} else {
|
||||
// Schedule the job
|
||||
if _, err := s.scheduler.NewJob(jd, gocron.NewTask(func() {
|
||||
s.trackAvailableSlots(ctx)
|
||||
}), gocron.WithContext(ctx)); err != nil {
|
||||
s.logger.Error().Err(err).Msg("Failed to create slots tracking job")
|
||||
} else {
|
||||
s.logger.Trace().Msgf("Download link refresh job scheduled for every %s", "30s")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Store) processRemoveStalledTorrents(ctx context.Context) error {
|
||||
if s.removeStalledAfter <= 0 {
|
||||
return nil // No need to remove stalled torrents if the duration is not set
|
||||
}
|
||||
|
||||
ticker := time.NewTicker(time.Minute)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil
|
||||
case <-ticker.C:
|
||||
if err := s.removeStalledTorrents(ctx); err != nil {
|
||||
s.logger.Error().Err(err).Msg("Error removing stalled torrents")
|
||||
if s.removeStalledAfter > 0 {
|
||||
// Stalled torrents removal job
|
||||
if jd, err := utils.ConvertToJobDef("1m"); err != nil {
|
||||
s.logger.Error().Err(err).Msg("Failed to convert remove stalled torrents interval to job definition")
|
||||
} else {
|
||||
// Schedule the job
|
||||
if _, err := s.scheduler.NewJob(jd, gocron.NewTask(func() {
|
||||
err := s.removeStalledTorrents(ctx)
|
||||
if err != nil {
|
||||
s.logger.Error().Err(err).Msg("Failed to process remove stalled torrents")
|
||||
}
|
||||
}), gocron.WithContext(ctx)); err != nil {
|
||||
s.logger.Error().Err(err).Msg("Failed to create remove stalled torrents job")
|
||||
} else {
|
||||
s.logger.Trace().Msgf("Remove stalled torrents job scheduled for every %s", "1m")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Start the scheduler
|
||||
s.scheduler.Start()
|
||||
s.logger.Debug().Msg("Store worker started")
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Store) trackAvailableSlots(ctx context.Context) {
|
||||
|
||||
+18
-1
@@ -3,6 +3,7 @@ package store
|
||||
import (
|
||||
"cmp"
|
||||
"context"
|
||||
"github.com/go-co-op/gocron/v2"
|
||||
"github.com/rs/zerolog"
|
||||
"github.com/sirrobot01/decypharr/internal/config"
|
||||
"github.com/sirrobot01/decypharr/internal/logger"
|
||||
@@ -26,6 +27,7 @@ type Store struct {
|
||||
skipPreCache bool
|
||||
downloadSemaphore chan struct{}
|
||||
removeStalledAfter time.Duration // Duration after which stalled torrents are removed
|
||||
scheduler gocron.Scheduler
|
||||
}
|
||||
|
||||
var (
|
||||
@@ -49,6 +51,11 @@ func Get() *Store {
|
||||
arrs := arr.NewStorage()
|
||||
deb := debrid.NewStorage(rcManager)
|
||||
|
||||
scheduler, err := gocron.NewScheduler(gocron.WithLocation(time.Local), gocron.WithGlobalJobOptions(gocron.WithTags("decypharr-store")))
|
||||
if err != nil {
|
||||
scheduler, _ = gocron.NewScheduler(gocron.WithGlobalJobOptions(gocron.WithTags("decypharr-store")))
|
||||
}
|
||||
|
||||
instance = &Store{
|
||||
repair: repair.New(arrs, deb),
|
||||
arr: arrs,
|
||||
@@ -56,10 +63,11 @@ func Get() *Store {
|
||||
rcloneManager: rcManager,
|
||||
torrents: newTorrentStorage(cfg.TorrentsFile()),
|
||||
logger: logger.Default(), // Use default logger [decypharr]
|
||||
refreshInterval: time.Duration(cmp.Or(qbitCfg.RefreshInterval, 10)) * time.Minute,
|
||||
refreshInterval: time.Duration(cmp.Or(qbitCfg.RefreshInterval, 30)) * time.Second,
|
||||
skipPreCache: qbitCfg.SkipPreCache,
|
||||
downloadSemaphore: make(chan struct{}, cmp.Or(qbitCfg.MaxDownloads, 5)),
|
||||
importsQueue: NewImportQueue(context.Background(), 1000),
|
||||
scheduler: scheduler,
|
||||
}
|
||||
if cfg.RemoveStalledAfter != "" {
|
||||
removeStalledAfter, err := time.ParseDuration(cfg.RemoveStalledAfter)
|
||||
@@ -88,6 +96,11 @@ func Reset() {
|
||||
// Close the semaphore channel to
|
||||
close(instance.downloadSemaphore)
|
||||
}
|
||||
|
||||
if instance.scheduler != nil {
|
||||
_ = instance.scheduler.StopJobs()
|
||||
_ = instance.scheduler.Shutdown()
|
||||
}
|
||||
}
|
||||
once = sync.Once{}
|
||||
instance = nil
|
||||
@@ -108,3 +121,7 @@ func (s *Store) Torrents() *TorrentStorage {
|
||||
func (s *Store) RcloneManager() *rclone.Manager {
|
||||
return s.rcloneManager
|
||||
}
|
||||
|
||||
func (s *Store) Scheduler() gocron.Scheduler {
|
||||
return s.scheduler
|
||||
}
|
||||
|
||||
@@ -0,0 +1,44 @@
|
||||
package store
|
||||
|
||||
import "context"
|
||||
|
||||
func (s *Store) StartWorkers(ctx context.Context) {
|
||||
if ctx == nil {
|
||||
ctx = context.Background()
|
||||
}
|
||||
|
||||
// Start debrid workers
|
||||
if err := s.Debrid().StartWorker(ctx); err != nil {
|
||||
s.logger.Error().Err(err).Msg("Failed to start debrid worker")
|
||||
} else {
|
||||
s.logger.Debug().Msg("Started debrid worker")
|
||||
}
|
||||
|
||||
// Cache workers
|
||||
for _, cache := range s.Debrid().Caches() {
|
||||
if cache == nil {
|
||||
continue
|
||||
}
|
||||
go func() {
|
||||
if err := cache.StartWorker(ctx); err != nil {
|
||||
s.logger.Error().Err(err).Msg("Failed to start debrid cache worker")
|
||||
} else {
|
||||
s.logger.Debug().Msgf("Started debrid cache worker for %s", cache.GetConfig().Name)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// Store queue workers
|
||||
if err := s.StartQueueWorkers(ctx); err != nil {
|
||||
s.logger.Error().Err(err).Msg("Failed to start store worker")
|
||||
} else {
|
||||
s.logger.Debug().Msg("Started store worker")
|
||||
}
|
||||
|
||||
// Arr workers
|
||||
if err := s.Arr().StartWorker(ctx); err != nil {
|
||||
s.logger.Error().Err(err).Msg("Failed to start Arr worker")
|
||||
} else {
|
||||
s.logger.Debug().Msg("Started Arr worker")
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user