142 lines
3.8 KiB
Go
142 lines
3.8 KiB
Go
package wire
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"github.com/go-co-op/gocron/v2"
|
|
"github.com/sirrobot01/decypharr/internal/utils"
|
|
"time"
|
|
)
|
|
|
|
func (s *Store) addToQueue(importReq *ImportRequest) error {
|
|
if importReq.Magnet == nil {
|
|
return fmt.Errorf("magnet is required")
|
|
}
|
|
|
|
if importReq.Arr == nil {
|
|
return fmt.Errorf("arr is required")
|
|
}
|
|
|
|
importReq.Status = "queued"
|
|
importReq.CompletedAt = time.Time{}
|
|
importReq.Error = nil
|
|
err := s.importsQueue.Push(importReq)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *Store) StartQueueWorkers(ctx context.Context) error {
|
|
// This function is responsible for starting the scheduled tasks
|
|
if ctx == nil {
|
|
ctx = context.Background()
|
|
}
|
|
|
|
s.scheduler.RemoveByTags("decypharr-store")
|
|
|
|
if jd, err := utils.ConvertToJobDef("30s"); err != nil {
|
|
s.logger.Error().Err(err).Msg("Failed to convert slots tracking interval to job definition")
|
|
} else {
|
|
// Schedule the job
|
|
if _, err := s.scheduler.NewJob(jd, gocron.NewTask(func() {
|
|
s.trackAvailableSlots(ctx)
|
|
}), gocron.WithContext(ctx)); err != nil {
|
|
s.logger.Error().Err(err).Msg("Failed to create slots tracking job")
|
|
} else {
|
|
s.logger.Trace().Msgf("Slots tracking job scheduled for every %s", "30s")
|
|
}
|
|
}
|
|
|
|
if s.removeStalledAfter > 0 {
|
|
// Stalled torrents removal job
|
|
if jd, err := utils.ConvertToJobDef("1m"); err != nil {
|
|
s.logger.Error().Err(err).Msg("Failed to convert remove stalled torrents interval to job definition")
|
|
} else {
|
|
// Schedule the job
|
|
if _, err := s.scheduler.NewJob(jd, gocron.NewTask(func() {
|
|
err := s.removeStalledTorrents(ctx)
|
|
if err != nil {
|
|
s.logger.Error().Err(err).Msg("Failed to process remove stalled torrents")
|
|
}
|
|
}), gocron.WithContext(ctx)); err != nil {
|
|
s.logger.Error().Err(err).Msg("Failed to create remove stalled torrents job")
|
|
} else {
|
|
s.logger.Trace().Msgf("Remove stalled torrents job scheduled for every %s", "1m")
|
|
}
|
|
}
|
|
}
|
|
|
|
// Start the scheduler
|
|
s.scheduler.Start()
|
|
s.logger.Debug().Msg("Store worker started")
|
|
return nil
|
|
}
|
|
|
|
func (s *Store) trackAvailableSlots(ctx context.Context) {
|
|
// This function tracks the available slots for each debrid client
|
|
availableSlots := make(map[string]int)
|
|
|
|
for name, deb := range s.debrid.Debrids() {
|
|
slots, err := deb.Client().GetAvailableSlots()
|
|
if err != nil {
|
|
continue
|
|
}
|
|
availableSlots[name] = slots
|
|
}
|
|
|
|
if len(availableSlots) == 0 {
|
|
s.logger.Debug().Msg("No debrid clients available or no slots found")
|
|
return // No debrid clients or slots available, nothing to process
|
|
}
|
|
|
|
if s.importsQueue.Size() <= 0 {
|
|
// Queue is empty, no need to process
|
|
return
|
|
}
|
|
|
|
for name, slots := range availableSlots {
|
|
s.logger.Debug().Msgf("Available slots for %s: %d", name, slots)
|
|
// If slots are available, process the next import request from the queue
|
|
for slots > 0 {
|
|
select {
|
|
case <-ctx.Done():
|
|
return // Exit if context is done
|
|
default:
|
|
if err := s.processFromQueue(ctx); err != nil {
|
|
s.logger.Error().Err(err).Msg("Error processing from queue")
|
|
return // Exit on error
|
|
}
|
|
slots-- // Decrease the available slots after processing
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *Store) processFromQueue(ctx context.Context) error {
|
|
// Pop the next import request from the queue
|
|
importReq, err := s.importsQueue.Pop()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if importReq == nil {
|
|
return nil
|
|
}
|
|
return s.AddTorrent(ctx, importReq)
|
|
}
|
|
|
|
func (s *Store) removeStalledTorrents(ctx context.Context) error {
|
|
// This function checks for stalled torrents and removes them
|
|
stalledTorrents := s.torrents.GetStalledTorrents(s.removeStalledAfter)
|
|
if len(stalledTorrents) == 0 {
|
|
return nil // No stalled torrents to remove
|
|
}
|
|
|
|
for _, torrent := range stalledTorrents {
|
|
s.logger.Warn().Msgf("Removing stalled torrent: %s", torrent.Name)
|
|
s.torrents.Delete(torrent.Hash, torrent.Category, true) // Remove from store and delete from debrid
|
|
}
|
|
|
|
return nil
|
|
}
|