From 109d0a0c1c6c831022d73a67017fb0108d870790 Mon Sep 17 00:00:00 2001 From: Mukhtar Akere Date: Sat, 17 May 2025 21:23:43 +0100 Subject: [PATCH] - Add cancellation context - Other bug fixes --- cmd/decypharr/main.go | 3 +- pkg/debrid/debrid/cache.go | 19 +++++++------ pkg/debrid/debrid/debrid.go | 6 ++-- pkg/debrid/debrid/refresh.go | 32 ++++++++++++++++++++++ pkg/debrid/debrid/repair.go | 53 ++++++++++++++++++++++-------------- pkg/debrid/debrid/worker.go | 12 ++++---- pkg/qbit/torrent.go | 2 +- pkg/service/service.go | 24 ++++------------ 8 files changed, 91 insertions(+), 60 deletions(-) diff --git a/cmd/decypharr/main.go b/cmd/decypharr/main.go index 8b17d87..f7b8579 100644 --- a/cmd/decypharr/main.go +++ b/cmd/decypharr/main.go @@ -72,6 +72,7 @@ func Start(ctx context.Context) error { // Reload configuration config.Reload() + service.Reset() // Start services again with new context go func() { @@ -123,7 +124,7 @@ func startServices(ctx context.Context) error { fmt.Printf(asciiArt, version.GetInfo(), cfg.LogLevel) - svc := service.New() + svc := service.GetService() _qbit := qbit.New() _webdav := webdav.New() diff --git a/pkg/debrid/debrid/cache.go b/pkg/debrid/debrid/cache.go index 17f75f0..4d702f3 100644 --- a/pkg/debrid/debrid/cache.go +++ b/pkg/debrid/debrid/cache.go @@ -131,12 +131,13 @@ func New(dc config.Debrid, client types.Client) *Cache { customFolders = append(customFolders, name) } + _log := logger.New(fmt.Sprintf("%s-webdav", client.GetName())) c := &Cache{ dir: filepath.Join(cfg.Path, "cache", dc.Name), // path to save cache files torrents: newTorrentCache(dirFilters), client: client, - logger: logger.New(fmt.Sprintf("%s-webdav", client.GetName())), + logger: _log, workers: dc.Workers, downloadLinks: newDownloadLinkCache(), torrentRefreshInterval: dc.TorrentsRefreshInterval, @@ -171,12 +172,9 @@ func (c *Cache) Start(ctx context.Context) error { c.refreshDownloadLinks() }() - go func() { - err := c.StartSchedule() - if err != nil { - c.logger.Error().Err(err).Msg("Failed to start cache worker") - } - }() + if err := c.StartSchedule(); err != nil { + c.logger.Error().Err(err).Msg("Failed to start cache worker") + } c.repairChan = make(chan RepairRequest, 100) go c.repairWorker() @@ -273,7 +271,12 @@ func (c *Cache) load() (map[string]CachedTorrent, error) { // Feed work to workers for _, file := range jsonFiles { - workChan <- file + select { + case <-c.ctx.Done(): + break // Context cancelled + default: + workChan <- file + } } // Signal workers that no more work is coming diff --git a/pkg/debrid/debrid/debrid.go b/pkg/debrid/debrid/debrid.go index fb9171b..2a4a0a2 100644 --- a/pkg/debrid/debrid/debrid.go +++ b/pkg/debrid/debrid/debrid.go @@ -54,14 +54,12 @@ func ProcessTorrent(d *Engine, magnet *utils.Magnet, a *arr.Arr, isSymlink, over for index, db := range d.Clients { logger := db.GetLogger() - logger.Info().Msgf("Processing debrid: %s", db.GetName()) + logger.Info().Str("Debrid", db.GetName()).Str("Hash", debridTorrent.InfoHash).Msg("Processing torrent") if !overrideDownloadUncached && a.DownloadUncached == nil { debridTorrent.DownloadUncached = db.GetDownloadUncached() } - logger.Info().Msgf("Torrent Hash: %s", debridTorrent.InfoHash) - //if db.GetCheckCached() { // hash, exists := db.IsAvailable([]string{debridTorrent.InfoHash})[debridTorrent.InfoHash] // if !exists || !hash { @@ -78,7 +76,7 @@ func ProcessTorrent(d *Engine, magnet *utils.Magnet, a *arr.Arr, isSymlink, over continue } dbt.Arr = a - logger.Info().Msgf("Torrent: %s(id=%s) submitted to %s", dbt.Name, dbt.Id, db.GetName()) + logger.Info().Str("id", dbt.Id).Msgf("Torrent: %s submitted to %s", dbt.Name, db.GetName()) d.LastUsed = index torrent, err := db.CheckStatus(dbt, isSymlink) diff --git a/pkg/debrid/debrid/refresh.go b/pkg/debrid/debrid/refresh.go index e0b9861..af54993 100644 --- a/pkg/debrid/debrid/refresh.go +++ b/pkg/debrid/debrid/refresh.go @@ -47,6 +47,12 @@ func (c *Cache) refreshTorrents() { return } + select { + case <-c.ctx.Done(): + return + default: + } + // Get all torrents from the debrid service debTorrents, err := c.client.GetTorrents() if err != nil { @@ -123,6 +129,12 @@ func (c *Cache) refreshTorrents() { func (c *Cache) refreshRclone() error { cfg := c.config + select { + case <-c.ctx.Done(): + return nil + default: + } + if cfg.RcUrl == "" { return nil } @@ -196,6 +208,18 @@ func (c *Cache) refreshRclone() error { } func (c *Cache) refreshTorrent(torrentId string) *CachedTorrent { + + if torrentId == "" { + c.logger.Error().Msg("Torrent ID is empty") + return nil + } + + select { + case <-c.ctx.Done(): + return nil + default: + } + torrent, err := c.client.GetTorrent(torrentId) if err != nil { c.logger.Error().Err(err).Msgf("Failed to get torrent %s", torrentId) @@ -224,9 +248,17 @@ func (c *Cache) refreshDownloadLinks() { return } + select { + case <-c.ctx.Done(): + return + default: + } + downloadLinks, err := c.client.GetDownloads() + if err != nil { c.logger.Error().Err(err).Msg("Failed to get download links") + return } for k, v := range downloadLinks { // if link is generated in the last 24 hours, add it to cache diff --git a/pkg/debrid/debrid/repair.go b/pkg/debrid/debrid/repair.go index 3eeb309..3151516 100644 --- a/pkg/debrid/debrid/repair.go +++ b/pkg/debrid/debrid/repair.go @@ -123,30 +123,43 @@ func (c *Cache) IsTorrentBroken(t *CachedTorrent, filenames []string) bool { } func (c *Cache) repairWorker() { - // This watches a channel for torrents to repair - for req := range c.repairChan { - torrentId := req.TorrentID - c.logger.Debug().Str("torrentId", req.TorrentID).Msg("Received repair request") + // This watches a channel for torrents to repair and can be cancelled via context + for { + select { + case <-c.ctx.Done(): + // Context was cancelled, exit the goroutine + return - // Get the torrent from the cache - cachedTorrent := c.GetTorrent(torrentId) - if cachedTorrent == nil { - c.logger.Warn().Str("torrentId", torrentId).Msg("Torrent not found in cache") - continue - } + case req, ok := <-c.repairChan: + // Channel was closed + if !ok { + c.logger.Debug().Msg("Repair channel closed, shutting down worker") + return + } - switch req.Type { - case RepairTypeReinsert: - c.logger.Debug().Str("torrentId", torrentId).Msg("Reinserting torrent") - if _, err := c.reInsertTorrent(cachedTorrent); err != nil { - c.logger.Error().Err(err).Str("torrentId", cachedTorrent.Id).Msg("Failed to reinsert torrent") + torrentId := req.TorrentID + c.logger.Debug().Str("torrentId", req.TorrentID).Msg("Received repair request") + + // Get the torrent from the cache + cachedTorrent := c.GetTorrent(torrentId) + if cachedTorrent == nil { + c.logger.Warn().Str("torrentId", torrentId).Msg("Torrent not found in cache") continue } - case RepairTypeDelete: - c.logger.Debug().Str("torrentId", torrentId).Msg("Deleting torrent") - if err := c.DeleteTorrent(torrentId); err != nil { - c.logger.Error().Err(err).Str("torrentId", torrentId).Msg("Failed to delete torrent") - continue + + switch req.Type { + case RepairTypeReinsert: + c.logger.Debug().Str("torrentId", torrentId).Msg("Reinserting torrent") + if _, err := c.reInsertTorrent(cachedTorrent); err != nil { + c.logger.Error().Err(err).Str("torrentId", cachedTorrent.Id).Msg("Failed to reinsert torrent") + continue + } + case RepairTypeDelete: + c.logger.Debug().Str("torrentId", torrentId).Msg("Deleting torrent") + if err := c.DeleteTorrent(torrentId); err != nil { + c.logger.Error().Err(err).Str("torrentId", torrentId).Msg("Failed to delete torrent") + continue + } } } } diff --git a/pkg/debrid/debrid/worker.go b/pkg/debrid/debrid/worker.go index 12369e7..8602144 100644 --- a/pkg/debrid/debrid/worker.go +++ b/pkg/debrid/debrid/worker.go @@ -1,32 +1,30 @@ package debrid import ( - "context" "github.com/sirrobot01/decypharr/internal/utils" "time" ) func (c *Cache) StartSchedule() error { // For now, we just want to refresh the listing and download links - ctx := context.Background() - if _, err := utils.ScheduleJob(ctx, c.downloadLinksRefreshInterval, nil, c.refreshDownloadLinks); err != nil { + if _, err := utils.ScheduleJob(c.ctx, c.downloadLinksRefreshInterval, nil, c.refreshDownloadLinks); err != nil { c.logger.Error().Err(err).Msg("Failed to add download link refresh job") } else { - c.logger.Debug().Msgf("Download link refresh job scheduled every %s", c.downloadLinksRefreshInterval) + c.logger.Debug().Msgf("Download link refresh job scheduled for every %s", c.downloadLinksRefreshInterval) } - if _, err := utils.ScheduleJob(ctx, c.torrentRefreshInterval, nil, c.refreshTorrents); err != nil { + if _, err := utils.ScheduleJob(c.ctx, c.torrentRefreshInterval, nil, c.refreshTorrents); err != nil { c.logger.Error().Err(err).Msg("Failed to add torrent refresh job") } else { - c.logger.Debug().Msgf("Torrent refresh job scheduled every %s", c.torrentRefreshInterval) + c.logger.Debug().Msgf("Torrent refresh job scheduled for every %s", c.torrentRefreshInterval) } // Schedule the reset invalid links job // This job will run every at 00:00 CET // and reset the invalid links in the cache cet, _ := time.LoadLocation("CET") - if _, err := utils.ScheduleJob(ctx, "00:00", cet, c.resetInvalidLinks); err != nil { + if _, err := utils.ScheduleJob(c.ctx, "00:00", cet, c.resetInvalidLinks); err != nil { c.logger.Error().Err(err).Msg("Failed to add reset invalid links job") } diff --git a/pkg/qbit/torrent.go b/pkg/qbit/torrent.go index c935398..6d1bca4 100644 --- a/pkg/qbit/torrent.go +++ b/pkg/qbit/torrent.go @@ -125,7 +125,7 @@ func (q *QBit) ProcessFiles(torrent *Torrent, debridTorrent *debridTypes.Torrent rclonePath := filepath.Join(debridTorrent.MountPath, cache.GetTorrentFolder(debridTorrent)) // /mnt/remote/realdebrid/MyTVShow torrentFolderNoExt := utils.RemoveExtension(debridTorrent.Name) torrentSymlinkPath, err = q.createSymlinksWebdav(debridTorrent, rclonePath, torrentFolderNoExt) // /mnt/symlinks/{category}/MyTVShow/ - q.logger.Debug().Msgf("Adding %s took %s", debridTorrent.Name, time.Since(timer)) + q.logger.Info().Msgf("Adding %s took %s", debridTorrent.Name, time.Since(timer)) } else { // User is using either zurg or debrid webdav diff --git a/pkg/service/service.go b/pkg/service/service.go index 5830598..e5c1cc3 100644 --- a/pkg/service/service.go +++ b/pkg/service/service.go @@ -18,8 +18,8 @@ var ( once sync.Once ) -func New() *Service { - once = sync.Once{} +// GetService returns the singleton instance +func GetService() *Service { once.Do(func() { arrs := arr.NewStorage() deb := debrid.NewEngine() @@ -32,23 +32,9 @@ func New() *Service { return instance } -// GetService returns the singleton instance -func GetService() *Service { - if instance == nil { - instance = New() - } - return instance -} - -func Update() *Service { - arrs := arr.NewStorage() - deb := debrid.NewEngine() - instance = &Service{ - Repair: repair.New(arrs, deb), - Arr: arrs, - Debrid: deb, - } - return instance +func Reset() { + once = sync.Once{} + instance = nil } func GetDebrid() *debrid.Engine {