- Add cancellation context

- Other bug fixes
This commit is contained in:
Mukhtar Akere
2025-05-17 21:23:43 +01:00
parent 35a74d8dba
commit 109d0a0c1c
8 changed files with 91 additions and 60 deletions

View File

@@ -131,12 +131,13 @@ func New(dc config.Debrid, client types.Client) *Cache {
customFolders = append(customFolders, name)
}
_log := logger.New(fmt.Sprintf("%s-webdav", client.GetName()))
c := &Cache{
dir: filepath.Join(cfg.Path, "cache", dc.Name), // path to save cache files
torrents: newTorrentCache(dirFilters),
client: client,
logger: logger.New(fmt.Sprintf("%s-webdav", client.GetName())),
logger: _log,
workers: dc.Workers,
downloadLinks: newDownloadLinkCache(),
torrentRefreshInterval: dc.TorrentsRefreshInterval,
@@ -171,12 +172,9 @@ func (c *Cache) Start(ctx context.Context) error {
c.refreshDownloadLinks()
}()
go func() {
err := c.StartSchedule()
if err != nil {
c.logger.Error().Err(err).Msg("Failed to start cache worker")
}
}()
if err := c.StartSchedule(); err != nil {
c.logger.Error().Err(err).Msg("Failed to start cache worker")
}
c.repairChan = make(chan RepairRequest, 100)
go c.repairWorker()
@@ -273,7 +271,12 @@ func (c *Cache) load() (map[string]CachedTorrent, error) {
// Feed work to workers
for _, file := range jsonFiles {
workChan <- file
select {
case <-c.ctx.Done():
break // Context cancelled
default:
workChan <- file
}
}
// Signal workers that no more work is coming

View File

@@ -54,14 +54,12 @@ func ProcessTorrent(d *Engine, magnet *utils.Magnet, a *arr.Arr, isSymlink, over
for index, db := range d.Clients {
logger := db.GetLogger()
logger.Info().Msgf("Processing debrid: %s", db.GetName())
logger.Info().Str("Debrid", db.GetName()).Str("Hash", debridTorrent.InfoHash).Msg("Processing torrent")
if !overrideDownloadUncached && a.DownloadUncached == nil {
debridTorrent.DownloadUncached = db.GetDownloadUncached()
}
logger.Info().Msgf("Torrent Hash: %s", debridTorrent.InfoHash)
//if db.GetCheckCached() {
// hash, exists := db.IsAvailable([]string{debridTorrent.InfoHash})[debridTorrent.InfoHash]
// if !exists || !hash {
@@ -78,7 +76,7 @@ func ProcessTorrent(d *Engine, magnet *utils.Magnet, a *arr.Arr, isSymlink, over
continue
}
dbt.Arr = a
logger.Info().Msgf("Torrent: %s(id=%s) submitted to %s", dbt.Name, dbt.Id, db.GetName())
logger.Info().Str("id", dbt.Id).Msgf("Torrent: %s submitted to %s", dbt.Name, db.GetName())
d.LastUsed = index
torrent, err := db.CheckStatus(dbt, isSymlink)

View File

@@ -47,6 +47,12 @@ func (c *Cache) refreshTorrents() {
return
}
select {
case <-c.ctx.Done():
return
default:
}
// Get all torrents from the debrid service
debTorrents, err := c.client.GetTorrents()
if err != nil {
@@ -123,6 +129,12 @@ func (c *Cache) refreshTorrents() {
func (c *Cache) refreshRclone() error {
cfg := c.config
select {
case <-c.ctx.Done():
return nil
default:
}
if cfg.RcUrl == "" {
return nil
}
@@ -196,6 +208,18 @@ func (c *Cache) refreshRclone() error {
}
func (c *Cache) refreshTorrent(torrentId string) *CachedTorrent {
if torrentId == "" {
c.logger.Error().Msg("Torrent ID is empty")
return nil
}
select {
case <-c.ctx.Done():
return nil
default:
}
torrent, err := c.client.GetTorrent(torrentId)
if err != nil {
c.logger.Error().Err(err).Msgf("Failed to get torrent %s", torrentId)
@@ -224,9 +248,17 @@ func (c *Cache) refreshDownloadLinks() {
return
}
select {
case <-c.ctx.Done():
return
default:
}
downloadLinks, err := c.client.GetDownloads()
if err != nil {
c.logger.Error().Err(err).Msg("Failed to get download links")
return
}
for k, v := range downloadLinks {
// if link is generated in the last 24 hours, add it to cache

View File

@@ -123,30 +123,43 @@ func (c *Cache) IsTorrentBroken(t *CachedTorrent, filenames []string) bool {
}
func (c *Cache) repairWorker() {
// This watches a channel for torrents to repair
for req := range c.repairChan {
torrentId := req.TorrentID
c.logger.Debug().Str("torrentId", req.TorrentID).Msg("Received repair request")
// This watches a channel for torrents to repair and can be cancelled via context
for {
select {
case <-c.ctx.Done():
// Context was cancelled, exit the goroutine
return
// Get the torrent from the cache
cachedTorrent := c.GetTorrent(torrentId)
if cachedTorrent == nil {
c.logger.Warn().Str("torrentId", torrentId).Msg("Torrent not found in cache")
continue
}
case req, ok := <-c.repairChan:
// Channel was closed
if !ok {
c.logger.Debug().Msg("Repair channel closed, shutting down worker")
return
}
switch req.Type {
case RepairTypeReinsert:
c.logger.Debug().Str("torrentId", torrentId).Msg("Reinserting torrent")
if _, err := c.reInsertTorrent(cachedTorrent); err != nil {
c.logger.Error().Err(err).Str("torrentId", cachedTorrent.Id).Msg("Failed to reinsert torrent")
torrentId := req.TorrentID
c.logger.Debug().Str("torrentId", req.TorrentID).Msg("Received repair request")
// Get the torrent from the cache
cachedTorrent := c.GetTorrent(torrentId)
if cachedTorrent == nil {
c.logger.Warn().Str("torrentId", torrentId).Msg("Torrent not found in cache")
continue
}
case RepairTypeDelete:
c.logger.Debug().Str("torrentId", torrentId).Msg("Deleting torrent")
if err := c.DeleteTorrent(torrentId); err != nil {
c.logger.Error().Err(err).Str("torrentId", torrentId).Msg("Failed to delete torrent")
continue
switch req.Type {
case RepairTypeReinsert:
c.logger.Debug().Str("torrentId", torrentId).Msg("Reinserting torrent")
if _, err := c.reInsertTorrent(cachedTorrent); err != nil {
c.logger.Error().Err(err).Str("torrentId", cachedTorrent.Id).Msg("Failed to reinsert torrent")
continue
}
case RepairTypeDelete:
c.logger.Debug().Str("torrentId", torrentId).Msg("Deleting torrent")
if err := c.DeleteTorrent(torrentId); err != nil {
c.logger.Error().Err(err).Str("torrentId", torrentId).Msg("Failed to delete torrent")
continue
}
}
}
}

View File

@@ -1,32 +1,30 @@
package debrid
import (
"context"
"github.com/sirrobot01/decypharr/internal/utils"
"time"
)
func (c *Cache) StartSchedule() error {
// For now, we just want to refresh the listing and download links
ctx := context.Background()
if _, err := utils.ScheduleJob(ctx, c.downloadLinksRefreshInterval, nil, c.refreshDownloadLinks); err != nil {
if _, err := utils.ScheduleJob(c.ctx, c.downloadLinksRefreshInterval, nil, c.refreshDownloadLinks); err != nil {
c.logger.Error().Err(err).Msg("Failed to add download link refresh job")
} else {
c.logger.Debug().Msgf("Download link refresh job scheduled every %s", c.downloadLinksRefreshInterval)
c.logger.Debug().Msgf("Download link refresh job scheduled for every %s", c.downloadLinksRefreshInterval)
}
if _, err := utils.ScheduleJob(ctx, c.torrentRefreshInterval, nil, c.refreshTorrents); err != nil {
if _, err := utils.ScheduleJob(c.ctx, c.torrentRefreshInterval, nil, c.refreshTorrents); err != nil {
c.logger.Error().Err(err).Msg("Failed to add torrent refresh job")
} else {
c.logger.Debug().Msgf("Torrent refresh job scheduled every %s", c.torrentRefreshInterval)
c.logger.Debug().Msgf("Torrent refresh job scheduled for every %s", c.torrentRefreshInterval)
}
// Schedule the reset invalid links job
// This job will run every at 00:00 CET
// and reset the invalid links in the cache
cet, _ := time.LoadLocation("CET")
if _, err := utils.ScheduleJob(ctx, "00:00", cet, c.resetInvalidLinks); err != nil {
if _, err := utils.ScheduleJob(c.ctx, "00:00", cet, c.resetInvalidLinks); err != nil {
c.logger.Error().Err(err).Msg("Failed to add reset invalid links job")
}