fix bugs; move to gocron for scheduled jobs

This commit is contained in:
Mukhtar Akere
2025-04-21 23:23:35 +01:00
parent a27c5dd491
commit 32935ce3aa
15 changed files with 308 additions and 248 deletions
+50 -53
View File
@@ -5,6 +5,7 @@ import (
"context"
"errors"
"fmt"
"github.com/go-co-op/gocron/v2"
"github.com/goccy/go-json"
"github.com/puzpuzpuz/xsync/v3"
"github.com/rs/zerolog"
@@ -82,51 +83,52 @@ type Cache struct {
repairsInProgress *xsync.MapOf[string, struct{}]
// config
workers int
torrentRefreshInterval time.Duration
downloadLinksRefreshInterval time.Duration
autoExpiresLinksAfter time.Duration
workers int
torrentRefreshInterval string
downloadLinksRefreshInterval string
autoExpiresLinksAfter string
autoExpiresLinksAfterDuration time.Duration
// refresh mutex
listingRefreshMu sync.RWMutex // for refreshing torrents
downloadLinksRefreshMu sync.RWMutex // for refreshing download links
torrentsRefreshMu sync.RWMutex // for refreshing torrents
scheduler gocron.Scheduler
saveSemaphore chan struct{}
ctx context.Context
}
func New(dc config.Debrid, client types.Client) *Cache {
cfg := config.Get()
torrentRefreshInterval, err := time.ParseDuration(dc.TorrentsRefreshInterval)
if err != nil {
torrentRefreshInterval = time.Second * 15
}
downloadLinksRefreshInterval, err := time.ParseDuration(dc.DownloadLinksRefreshInterval)
if err != nil {
downloadLinksRefreshInterval = time.Minute * 40
}
autoExpiresLinksAfter, err := time.ParseDuration(dc.AutoExpireLinksAfter)
if err != nil {
autoExpiresLinksAfter = time.Hour * 24
cet, _ := time.LoadLocation("CET")
s, _ := gocron.NewScheduler(gocron.WithLocation(cet))
autoExpiresLinksAfter, _ := time.ParseDuration(dc.AutoExpireLinksAfter)
if autoExpiresLinksAfter == 0 {
autoExpiresLinksAfter = 24 * time.Hour
}
return &Cache{
dir: path.Join(cfg.Path, "cache", dc.Name), // path to save cache files
torrents: xsync.NewMapOf[string, *CachedTorrent](),
torrentsNames: xsync.NewMapOf[string, *CachedTorrent](),
invalidDownloadLinks: xsync.NewMapOf[string, string](),
client: client,
logger: logger.New(fmt.Sprintf("%s-webdav", client.GetName())),
workers: dc.Workers,
downloadLinks: xsync.NewMapOf[string, downloadLinkCache](),
torrentRefreshInterval: torrentRefreshInterval,
downloadLinksRefreshInterval: downloadLinksRefreshInterval,
PropfindResp: xsync.NewMapOf[string, PropfindResponse](),
folderNaming: WebDavFolderNaming(dc.FolderNaming),
autoExpiresLinksAfter: autoExpiresLinksAfter,
repairsInProgress: xsync.NewMapOf[string, struct{}](),
saveSemaphore: make(chan struct{}, 50),
ctx: context.Background(),
dir: path.Join(cfg.Path, "cache", dc.Name), // path to save cache files
torrents: xsync.NewMapOf[string, *CachedTorrent](),
torrentsNames: xsync.NewMapOf[string, *CachedTorrent](),
invalidDownloadLinks: xsync.NewMapOf[string, string](),
client: client,
logger: logger.New(fmt.Sprintf("%s-webdav", client.GetName())),
workers: dc.Workers,
downloadLinks: xsync.NewMapOf[string, downloadLinkCache](),
torrentRefreshInterval: dc.TorrentsRefreshInterval,
downloadLinksRefreshInterval: dc.DownloadLinksRefreshInterval,
PropfindResp: xsync.NewMapOf[string, PropfindResponse](),
folderNaming: WebDavFolderNaming(dc.FolderNaming),
autoExpiresLinksAfter: dc.AutoExpireLinksAfter,
autoExpiresLinksAfterDuration: autoExpiresLinksAfter,
repairsInProgress: xsync.NewMapOf[string, struct{}](),
saveSemaphore: make(chan struct{}, 50),
ctx: context.Background(),
scheduler: s,
}
}
@@ -146,9 +148,9 @@ func (c *Cache) Start(ctx context.Context) error {
}()
go func() {
err := c.Refresh()
err := c.StartSchedule()
if err != nil {
c.logger.Error().Err(err).Msg("Failed to start cache refresh worker")
c.logger.Error().Err(err).Msg("Failed to start cache worker")
}
}()
@@ -584,16 +586,16 @@ func (c *Cache) ProcessTorrent(t *types.Torrent) error {
return nil
}
func (c *Cache) GetDownloadLink(torrentId, filename, fileLink string) string {
func (c *Cache) GetDownloadLink(torrentId, filename, fileLink string) (string, error) {
// Check link cache
if dl := c.checkDownloadLink(fileLink); dl != "" {
return dl
return dl, nil
}
ct := c.GetTorrent(torrentId)
if ct == nil {
return ""
return "", fmt.Errorf("torrent not found: %s", torrentId)
}
file := ct.Files[filename]
@@ -601,7 +603,7 @@ func (c *Cache) GetDownloadLink(torrentId, filename, fileLink string) string {
// file link is empty, refresh the torrent to get restricted links
ct = c.refreshTorrent(ct) // Refresh the torrent from the debrid
if ct == nil {
return ""
return "", fmt.Errorf("failed to refresh torrent: %s", torrentId)
} else {
file = ct.Files[filename]
}
@@ -613,23 +615,21 @@ func (c *Cache) GetDownloadLink(torrentId, filename, fileLink string) string {
// Try to reinsert the torrent?
newCt, err := c.reInsertTorrent(ct)
if err != nil {
c.logger.Error().Err(err).Msgf("Failed to reinsert torrent %s", ct.Name)
return ""
return "", fmt.Errorf("failed to reinsert torrent: %s. %w", ct.Name, err)
}
ct = newCt
file = ct.Files[filename]
c.logger.Debug().Msgf("Reinserted torrent %s", ct.Name)
}
c.logger.Trace().Msgf("Getting download link for %s", filename)
c.logger.Trace().Msgf("Getting download link for %s(%s)", filename, file.Link)
downloadLink, err := c.client.GetDownloadLink(ct.Torrent, &file)
if err != nil {
if errors.Is(err, request.HosterUnavailableError) {
c.logger.Error().Err(err).Msgf("Hoster is unavailable. Triggering repair for %s", ct.Name)
newCt, err := c.reInsertTorrent(ct)
if err != nil {
c.logger.Error().Err(err).Msgf("Failed to reinsert torrent %s", ct.Name)
return ""
return "", fmt.Errorf("failed to reinsert torrent: %w", err)
}
ct = newCt
c.logger.Debug().Msgf("Reinserted torrent %s", ct.Name)
@@ -637,30 +637,26 @@ func (c *Cache) GetDownloadLink(torrentId, filename, fileLink string) string {
// Retry getting the download link
downloadLink, err = c.client.GetDownloadLink(ct.Torrent, &file)
if err != nil {
c.logger.Error().Err(err).Msgf("Failed to get download link for %s", file.Link)
return ""
return "", err
}
if downloadLink == nil {
c.logger.Debug().Msgf("Download link is empty for %s", file.Link)
return ""
return "", fmt.Errorf("download link is empty for %s", file.Link)
}
c.updateDownloadLink(downloadLink)
return downloadLink.DownloadLink
return "", nil
} else if errors.Is(err, request.TrafficExceededError) {
// This is likely a fair usage limit error
c.logger.Error().Err(err).Msgf("Traffic exceeded for %s", ct.Name)
} else {
c.logger.Error().Err(err).Msgf("Failed to get download link for %s", file.Link)
return ""
return "", fmt.Errorf("failed to get download link: %w", err)
}
}
if downloadLink == nil {
c.logger.Debug().Msgf("Download link is empty for %s", file.Link)
return ""
return "", fmt.Errorf("download link is empty for %s", file.Link)
}
c.updateDownloadLink(downloadLink)
return downloadLink.DownloadLink
return downloadLink.DownloadLink, nil
}
func (c *Cache) GenerateDownloadLinks(t *CachedTorrent) {
@@ -700,10 +696,11 @@ func (c *Cache) AddTorrent(t *types.Torrent) error {
}
func (c *Cache) updateDownloadLink(dl *types.DownloadLink) {
expiresAt, _ := time.ParseDuration(c.autoExpiresLinksAfter)
c.downloadLinks.Store(dl.Link, downloadLinkCache{
Id: dl.Id,
Link: dl.DownloadLink,
ExpiresAt: time.Now().Add(c.autoExpiresLinksAfter),
ExpiresAt: time.Now().Add(expiresAt),
AccountId: dl.AccountId,
})
}
+2 -2
View File
@@ -241,12 +241,12 @@ func (c *Cache) refreshDownloadLinks() {
for k, v := range downloadLinks {
// if link is generated in the last 24 hours, add it to cache
timeSince := time.Since(v.Generated)
if timeSince < c.autoExpiresLinksAfter {
if timeSince < c.autoExpiresLinksAfterDuration {
c.downloadLinks.Store(k, downloadLinkCache{
Id: v.Id,
AccountId: v.AccountId,
Link: v.DownloadLink,
ExpiresAt: v.Generated.Add(c.autoExpiresLinksAfter - timeSince),
ExpiresAt: v.Generated.Add(c.autoExpiresLinksAfterDuration - timeSince),
})
} else {
c.downloadLinks.Delete(k)
+2 -2
View File
@@ -133,8 +133,8 @@ func (c *Cache) reInsertTorrent(ct *CachedTorrent) (*CachedTorrent, error) {
torrent.DownloadUncached = false // Set to false, avoid re-downloading
torrent, err = c.client.CheckStatus(torrent, true)
if err != nil && torrent != nil {
// Torrent is likely in progress
_ = c.DeleteTorrent(torrent.Id)
// Torrent is likely uncached, delete it
_ = c.client.DeleteTorrent(torrent.Id) // Delete the newly added un-cached torrent
return ct, fmt.Errorf("failed to check status: %w", err)
}
if torrent == nil {
+58 -89
View File
@@ -1,109 +1,78 @@
package debrid
import "time"
import (
"context"
"github.com/sirrobot01/decypharr/internal/utils"
"time"
)
func (c *Cache) Refresh() error {
func (c *Cache) StartSchedule() error {
// For now, we just want to refresh the listing and download links
go c.refreshDownloadLinksWorker()
go c.refreshTorrentsWorker()
go c.resetInvalidLinksWorker()
go c.cleanupWorker()
return nil
}
func (c *Cache) refreshDownloadLinksWorker() {
refreshTicker := time.NewTicker(c.downloadLinksRefreshInterval)
defer refreshTicker.Stop()
for range refreshTicker.C {
c.refreshDownloadLinks()
}
}
func (c *Cache) refreshTorrentsWorker() {
refreshTicker := time.NewTicker(c.torrentRefreshInterval)
defer refreshTicker.Stop()
for range refreshTicker.C {
c.refreshTorrents()
}
}
func (c *Cache) resetInvalidLinksWorker() {
// Calculate time until next 00:00 CET
now := time.Now()
loc, err := time.LoadLocation("CET")
ctx := context.Background()
downloadLinkJob, err := utils.ScheduleJob(ctx, c.downloadLinksRefreshInterval, nil, c.refreshDownloadLinks)
if err != nil {
// Fallback if CET timezone can't be loaded
c.logger.Error().Err(err).Msg("Failed to load CET timezone, using local time")
loc = time.Local
c.logger.Error().Err(err).Msg("Failed to add download link refresh job")
}
if t, err := downloadLinkJob.NextRun(); err == nil {
c.logger.Trace().Msgf("Next download link refresh job: %s", t.Format("2006-01-02 15:04:05"))
}
nowInCET := now.In(loc)
next := time.Date(
nowInCET.Year(),
nowInCET.Month(),
nowInCET.Day(),
0, 0, 0, 0,
loc,
)
// If it's already past 12:00 CET today, schedule for tomorrow
if nowInCET.After(next) {
next = next.Add(24 * time.Hour)
torrentJob, err := utils.ScheduleJob(ctx, c.torrentRefreshInterval, nil, c.refreshTorrents)
if err != nil {
c.logger.Error().Err(err).Msg("Failed to add torrent refresh job")
}
if t, err := torrentJob.NextRun(); err == nil {
c.logger.Trace().Msgf("Next torrent refresh job: %s", t.Format("2006-01-02 15:04:05"))
}
// Duration until next 12:00 CET
initialWait := next.Sub(nowInCET)
// Set up initial timer
timer := time.NewTimer(initialWait)
defer timer.Stop()
c.logger.Debug().Msgf("Scheduled Links Reset at %s (in %s)", next.Format("2006-01-02 15:04:05 MST"), initialWait)
// Wait for the first execution
<-timer.C
c.resetInvalidLinks()
// Now set up the daily ticker
refreshTicker := time.NewTicker(24 * time.Hour)
defer refreshTicker.Stop()
for range refreshTicker.C {
c.resetInvalidLinks()
// Schedule the reset invalid links job
// This job will run every 24 hours
// and reset the invalid links in the cache
cet, _ := time.LoadLocation("CET")
resetLinksJob, err := utils.ScheduleJob(ctx, "00:00", cet, c.resetInvalidLinks)
if err != nil {
c.logger.Error().Err(err).Msg("Failed to add reset invalid links job")
}
if t, err := resetLinksJob.NextRun(); err == nil {
c.logger.Trace().Msgf("Next reset invalid download links job at: %s", t.Format("2006-01-02 15:04:05"))
}
// Schedule the cleanup job
cleanupJob, err := utils.ScheduleJob(ctx, "1h", nil, c.cleanupWorker)
if err != nil {
c.logger.Error().Err(err).Msg("Failed to add cleanup job")
}
if t, err := cleanupJob.NextRun(); err == nil {
c.logger.Trace().Msgf("Next cleanup job at: %s", t.Format("2006-01-02 15:04:05"))
}
return nil
}
func (c *Cache) cleanupWorker() {
// Cleanup every hour
// Removes deleted torrents from the cache
torrents, err := c.client.GetTorrents()
if err != nil {
c.logger.Error().Err(err).Msg("Failed to get torrents")
return
}
ticker := time.NewTicker(1 * time.Hour)
idStore := make(map[string]struct{})
for _, t := range torrents {
idStore[t.Id] = struct{}{}
}
for range ticker.C {
torrents, err := c.client.GetTorrents()
if err != nil {
c.logger.Error().Err(err).Msg("Failed to get torrents")
continue
deletedTorrents := make([]string, 0)
c.torrents.Range(func(key string, _ *CachedTorrent) bool {
if _, exists := idStore[key]; !exists {
deletedTorrents = append(deletedTorrents, key)
}
return true
})
idStore := make(map[string]struct{})
for _, t := range torrents {
idStore[t.Id] = struct{}{}
}
deletedTorrents := make([]string, 0)
c.torrents.Range(func(key string, _ *CachedTorrent) bool {
if _, exists := idStore[key]; !exists {
deletedTorrents = append(deletedTorrents, key)
}
return true
})
if len(deletedTorrents) > 0 {
c.DeleteTorrents(deletedTorrents)
c.logger.Info().Msgf("Deleted %d torrents", len(deletedTorrents))
}
if len(deletedTorrents) > 0 {
c.DeleteTorrents(deletedTorrents)
c.logger.Info().Msgf("Deleted %d torrents", len(deletedTorrents))
}
}
+31 -43
View File
@@ -450,16 +450,18 @@ func (r *RealDebrid) _getDownloadLink(file *types.File) (*types.DownloadLink, er
return nil, err
}
switch data.ErrorCode {
case 19:
return nil, request.HosterUnavailableError // File has been removed
case 23:
return nil, request.TrafficExceededError
case 24:
return nil, request.HosterUnavailableError // Link has been nerfed
case 19:
return nil, request.HosterUnavailableError // File has been removed
case 36:
return nil, request.TrafficExceededError // traffic exceeded
case 34:
return nil, request.TrafficExceededError // traffic exceeded
case 35:
return nil, request.HosterUnavailableError
case 36:
return nil, request.TrafficExceededError // traffic exceeded
default:
return nil, fmt.Errorf("realdebrid API error: Status: %d || Code: %d", resp.StatusCode, data.ErrorCode)
}
@@ -489,48 +491,36 @@ func (r *RealDebrid) GetDownloadLink(t *types.Torrent, file *types.File) (*types
if r.currentDownloadKey == "" {
// If no download key is set, use the first one
r.DownloadKeys.Range(func(key string, value types.Account) bool {
if !value.Disabled {
r.currentDownloadKey = value.Token
return false
}
return true
})
accounts := r.getActiveAccounts()
if len(accounts) < 1 {
// No active download keys. It's likely that the key has reached bandwidth limit
return nil, fmt.Errorf("no active download keys")
}
r.currentDownloadKey = accounts[0].Token
}
r.downloadClient.SetHeader("Authorization", fmt.Sprintf("Bearer %s", r.currentDownloadKey))
downloadLink, err := r._getDownloadLink(file)
retries := 0
if err != nil {
accountsFunc := func() (*types.DownloadLink, error) {
accounts := r.getActiveAccounts()
var err error
if len(accounts) < 1 {
// No active download keys. It's likely that the key has reached bandwidth limit
return nil, fmt.Errorf("no active download keys")
}
for _, account := range accounts {
r.downloadClient.SetHeader("Authorization", fmt.Sprintf("Bearer %s", account.Token))
downloadLink, err := r._getDownloadLink(file)
if err != nil {
if errors.Is(err, request.TrafficExceededError) {
continue
}
// If the error is not traffic exceeded, skip generating the link with a new key
return nil, err
} else {
// If we successfully generated a link, break the loop
downloadLink.AccountId = account.ID
return downloadLink, nil
}
}
// If we reach here, it means all keys have been exhausted
if errors.Is(err, request.TrafficExceededError) {
return nil, request.TrafficExceededError
}
return nil, fmt.Errorf("failed to generate download link: %w", err)
if errors.Is(err, request.TrafficExceededError) {
// Retries generating
retries = 4
} else {
// If the error is not traffic exceeded, return the error
return nil, err
}
return accountsFunc()
}
for retries > 0 {
downloadLink, err = r._getDownloadLink(file)
if err == nil {
return downloadLink, nil
}
if !errors.Is(err, request.TrafficExceededError) {
return nil, err
}
// Add a delay before retrying
time.Sleep(5 * time.Second)
}
return downloadLink, nil
}
@@ -718,11 +708,9 @@ func (r *RealDebrid) DisableAccount(accountId string) {
r.logger.Info().Msgf("Cannot disable last account: %s", accountId)
return
}
r.currentDownloadKey = ""
if value, ok := r.DownloadKeys.Load(accountId); ok {
value.Disabled = true
if value.Token == r.currentDownloadKey {
r.currentDownloadKey = ""
}
r.DownloadKeys.Store(accountId, value)
r.logger.Info().Msgf("Disabled account Index: %s", value.ID)
}