- Cleanup webdav

- Include a re-insert fature for botched torrents
- Other minor bug fixes
This commit is contained in:
Mukhtar Akere
2025-03-31 06:11:04 +01:00
parent cf28f42db4
commit face86e151
18 changed files with 191 additions and 137 deletions

View File

@@ -207,15 +207,14 @@ func (ad *AllDebrid) CheckStatus(torrent *types.Torrent, isSymlink bool) (*types
return torrent, nil
}
func (ad *AllDebrid) DeleteTorrent(torrentId string) {
func (ad *AllDebrid) DeleteTorrent(torrentId string) error {
url := fmt.Sprintf("%s/magnet/delete?id=%s", ad.Host, torrentId)
req, _ := http.NewRequest(http.MethodGet, url, nil)
_, err := ad.client.MakeRequest(req)
if err == nil {
ad.logger.Info().Msgf("Torrent: %s deleted", torrentId)
} else {
ad.logger.Info().Msgf("Error deleting torrent: %s", err)
if _, err := ad.client.MakeRequest(req); err != nil {
return err
}
ad.logger.Info().Msgf("Torrent %s deleted from AD", torrentId)
return nil
}
func (ad *AllDebrid) GenerateDownloadLinks(t *types.Torrent) error {

View File

@@ -16,6 +16,7 @@ import (
"os"
"path/filepath"
"runtime"
"strconv"
"sync"
"sync/atomic"
"time"
@@ -48,7 +49,15 @@ type downloadLinkCache struct {
ExpiresAt time.Time
}
type RepairType string
const (
RepairTypeReinsert RepairType = "reinsert"
RepairTypeDelete RepairType = "delete"
)
type RepairRequest struct {
Type RepairType
TorrentID string
Priority int
FileName string
@@ -85,7 +94,7 @@ type Cache struct {
ctx context.Context
}
func NewCache(dc config.Debrid, client types.Client) *Cache {
func New(dc config.Debrid, client types.Client) *Cache {
cfg := config.GetConfig()
torrentRefreshInterval, err := time.ParseDuration(dc.TorrentsRefreshInterval)
if err != nil {
@@ -122,6 +131,34 @@ func NewCache(dc config.Debrid, client types.Client) *Cache {
}
}
func (c *Cache) Start(ctx context.Context) error {
if err := os.MkdirAll(c.dir, 0755); err != nil {
return fmt.Errorf("failed to create cache directory: %w", err)
}
c.ctx = ctx
if err := c.Sync(); err != nil {
return fmt.Errorf("failed to sync cache: %w", err)
}
// initial download links
go func() {
c.refreshDownloadLinks()
}()
go func() {
err := c.Refresh()
if err != nil {
c.logger.Error().Err(err).Msg("Failed to start cache refresh worker")
}
}()
c.repairChan = make(chan RepairRequest, 100)
go c.repairWorker()
return nil
}
func (c *Cache) GetTorrentFolder(torrent *types.Torrent) string {
switch c.folderNaming {
case WebDavUseFileName:
@@ -165,34 +202,6 @@ func (c *Cache) GetListing() []os.FileInfo {
return nil
}
func (c *Cache) Start(ctx context.Context) error {
if err := os.MkdirAll(c.dir, 0755); err != nil {
return fmt.Errorf("failed to create cache directory: %w", err)
}
c.ctx = ctx
if err := c.Sync(); err != nil {
return fmt.Errorf("failed to sync cache: %w", err)
}
// initial download links
go func() {
c.refreshDownloadLinks()
}()
go func() {
err := c.Refresh()
if err != nil {
c.logger.Error().Err(err).Msg("Failed to start cache refresh worker")
}
}()
c.repairChan = make(chan RepairRequest, 100)
go c.repairWorker()
return nil
}
func (c *Cache) Close() error {
return nil
}
@@ -226,27 +235,27 @@ func (c *Cache) load() (map[string]*CachedTorrent, error) {
c.logger.Debug().Err(err).Msgf("Failed to unmarshal file: %s", filePath)
continue
}
isComplete := true
if len(ct.Files) != 0 {
// We can assume the torrent is complete
// Make sure no file has a duplicate link
linkStore := make(map[string]bool)
for _, f := range ct.Files {
if _, ok := linkStore[f.Link]; ok {
// Duplicate link, refresh the torrent
ct = *c.refreshTorrent(&ct)
break
} else {
linkStore[f.Link] = true
if f.Link == "" {
c.logger.Debug().Msgf("Torrent %s is not complete, missing link for file %s", ct.Id, f.Name)
isComplete = false
continue
}
}
addedOn, err := time.Parse(time.RFC3339, ct.Added)
if err != nil {
addedOn = now
if isComplete {
addedOn, err := time.Parse(time.RFC3339, ct.Added)
if err != nil {
addedOn = now
}
ct.AddedOn = addedOn
ct.IsComplete = true
torrents[ct.Id] = &ct
}
ct.AddedOn = addedOn
ct.IsComplete = true
torrents[ct.Id] = &ct
}
}
@@ -305,14 +314,26 @@ func (c *Cache) saveTorrent(ct *CachedTorrent) {
fileName := ct.Torrent.Id + ".json"
filePath := filepath.Join(c.dir, fileName)
tmpFile := filePath + ".tmp"
// Use a unique temporary filename for concurrent safety
tmpFile := filePath + ".tmp." + strconv.FormatInt(time.Now().UnixNano(), 10)
f, err := os.Create(tmpFile)
if err != nil {
c.logger.Debug().Err(err).Msgf("Failed to create file: %s", tmpFile)
return
}
defer f.Close()
// Track if we've closed the file
fileClosed := false
defer func() {
// Only close if not already closed
if !fileClosed {
_ = f.Close()
}
// Clean up the temp file if it still exists and rename failed
_ = os.Remove(tmpFile)
}()
w := bufio.NewWriter(f)
if _, err := w.Write(data); err != nil {
@@ -322,12 +343,17 @@ func (c *Cache) saveTorrent(ct *CachedTorrent) {
if err := w.Flush(); err != nil {
c.logger.Debug().Err(err).Msgf("Failed to flush data: %s", tmpFile)
return
}
// Close the file before renaming
_ = f.Close()
fileClosed = true
if err := os.Rename(tmpFile, filePath); err != nil {
c.logger.Debug().Err(err).Msgf("Failed to rename file: %s", tmpFile)
return
}
return
}
func (c *Cache) Sync() error {
@@ -453,6 +479,13 @@ func (c *Cache) ProcessTorrent(t *types.Torrent, refreshRclone bool) error {
return fmt.Errorf("failed to update torrent: %w", err)
}
}
// Validate each file in the torrent
for _, file := range t.Files {
if file.Link == "" {
c.logger.Debug().Msgf("Torrent %s is not complete, missing link for file %s. Triggering a reinsert", t.Id, file.Name)
c.reinsertTorrent(t)
}
}
addedOn, err := time.Parse(time.RFC3339, t.Added)
if err != nil {
@@ -586,15 +619,20 @@ func (c *Cache) GetClient() types.Client {
return c.client
}
func (c *Cache) DeleteTorrent(id string) {
func (c *Cache) DeleteTorrent(id string) error {
c.logger.Info().Msgf("Deleting torrent %s", id)
if t, ok := c.torrents.Load(id); ok {
err := c.client.DeleteTorrent(id)
if err != nil {
return err
}
c.torrents.Delete(id)
c.torrentsNames.Delete(c.GetTorrentFolder(t.Torrent))
c.removeFromDB(id)
c.refreshListings()
}
return nil
}
func (c *Cache) DeleteTorrents(ids []string) {
@@ -618,8 +656,11 @@ func (c *Cache) removeFromDB(torrentId string) {
func (c *Cache) OnRemove(torrentId string) {
c.logger.Debug().Msgf("OnRemove triggered for %s", torrentId)
c.DeleteTorrent(torrentId)
c.refreshListings()
err := c.DeleteTorrent(torrentId)
if err != nil {
c.logger.Error().Err(err).Msgf("Failed to delete torrent: %s", torrentId)
return
}
}
func (c *Cache) GetLogger() zerolog.Logger {

View File

@@ -21,7 +21,7 @@ func NewEngine() *Engine {
client := createDebridClient(dc)
logger := client.GetLogger()
if dc.UseWebDav {
caches[dc.Name] = NewCache(dc, client)
caches[dc.Name] = New(dc, client)
logger.Info().Msg("Debrid Service started with WebDAV")
} else {
logger.Info().Msg("Debrid Service started")

View File

@@ -270,6 +270,6 @@ func (c *Cache) refreshDownloadLinks() {
}
}
c.logger.Debug().Msgf("Refreshed %d download links", len(downloadLinks))
c.logger.Trace().Msgf("Refreshed %d download links", len(downloadLinks))
}

View File

@@ -7,6 +7,7 @@ import (
"github.com/sirrobot01/debrid-blackhole/internal/utils"
"github.com/sirrobot01/debrid-blackhole/pkg/debrid/types"
"slices"
"time"
)
func (c *Cache) IsTorrentBroken(t *CachedTorrent, filenames []string) bool {
@@ -58,8 +59,6 @@ func (c *Cache) IsTorrentBroken(t *CachedTorrent, filenames []string) bool {
func (c *Cache) repairWorker() {
// This watches a channel for torrents to repair
c.logger.Info().Msg("Starting repair worker")
for {
select {
case req := <-c.repairChan:
@@ -80,24 +79,30 @@ func (c *Cache) repairWorker() {
continue
}
// Check if torrent is broken
if c.IsTorrentBroken(cachedTorrent, nil) {
c.logger.Info().Str("torrentId", torrentId).Msg("Repairing broken torrent")
// Repair torrent
if err := c.repairTorrent(cachedTorrent); err != nil {
c.logger.Error().Err(err).Str("torrentId", torrentId).Msg("Failed to repair torrent")
} else {
c.logger.Info().Str("torrentId", torrentId).Msg("Torrent repaired")
switch req.Type {
case RepairTypeReinsert:
c.logger.Debug().Str("torrentId", torrentId).Msg("Reinserting torrent")
c.reInsertTorrent(cachedTorrent)
case RepairTypeDelete:
c.logger.Debug().Str("torrentId", torrentId).Msg("Deleting torrent")
if err := c.DeleteTorrent(torrentId); err != nil {
c.logger.Error().Err(err).Str("torrentId", torrentId).Msg("Failed to delete torrent")
continue
}
} else {
c.logger.Debug().Str("torrentId", torrentId).Msg("Torrent is not broken")
}
c.repairsInProgress.Delete(torrentId)
}
}
}
func (c *Cache) SubmitForRepair(torrentId, fileName string) {
func (c *Cache) reInsertTorrent(t *CachedTorrent) {
// Reinsert the torrent into the cache
c.torrents.Store(t.Id, t)
c.logger.Debug().Str("torrentId", t.Id).Msg("Reinserted torrent into cache")
}
func (c *Cache) submitForRepair(repairType RepairType, torrentId, fileName string) {
// Submitting a torrent for repair.Not used yet
// Check if already in progress before even submitting
@@ -114,17 +119,14 @@ func (c *Cache) SubmitForRepair(torrentId, fileName string) {
}
}
func (c *Cache) repairTorrent(t *CachedTorrent) error {
func (c *Cache) reinsertTorrent(torrent *types.Torrent) error {
// Check if Magnet is not empty, if empty, reconstruct the magnet
if _, inProgress := c.repairsInProgress.Load(t.Id); inProgress {
c.logger.Debug().Str("torrentID", t.Id).Msg("Repair already in progress")
return nil
if _, ok := c.repairsInProgress.Load(torrent.Id); ok {
return fmt.Errorf("repair already in progress for torrent %s", torrent.Id)
}
torrent := t.Torrent
if torrent.Magnet == nil {
torrent.Magnet = utils.ConstructMagnet(t.InfoHash, t.Name)
torrent.Magnet = utils.ConstructMagnet(torrent.InfoHash, torrent.Name)
}
oldID := torrent.Id
@@ -146,12 +148,21 @@ func (c *Cache) repairTorrent(t *CachedTorrent) error {
return fmt.Errorf("failed to check status: %w", err)
}
c.client.DeleteTorrent(oldID) // delete the old torrent
c.DeleteTorrent(oldID) // Remove from listings
if err := c.DeleteTorrent(oldID); err != nil {
return fmt.Errorf("failed to delete old torrent: %w", err)
}
// Update the torrent in the cache
t.Torrent = torrent
c.setTorrent(t)
addedOn, err := time.Parse(time.RFC3339, torrent.Added)
if err != nil {
addedOn = time.Now()
}
ct := &CachedTorrent{
Torrent: torrent,
IsComplete: len(torrent.Files) > 0,
AddedOn: addedOn,
}
c.setTorrent(ct)
c.refreshListings()
c.repairsInProgress.Delete(oldID)

View File

@@ -4,7 +4,6 @@ import "time"
func (c *Cache) Refresh() error {
// For now, we just want to refresh the listing and download links
c.logger.Info().Msg("Starting cache refresh workers")
go c.refreshDownloadLinksWorker()
go c.refreshTorrentsWorker()
return nil

View File

@@ -224,15 +224,14 @@ func (dl *DebridLink) CheckStatus(torrent *types.Torrent, isSymlink bool) (*type
return torrent, nil
}
func (dl *DebridLink) DeleteTorrent(torrentId string) {
func (dl *DebridLink) DeleteTorrent(torrentId string) error {
url := fmt.Sprintf("%s/seedbox/%s/remove", dl.Host, torrentId)
req, _ := http.NewRequest(http.MethodDelete, url, nil)
_, err := dl.client.MakeRequest(req)
if err == nil {
dl.logger.Info().Msgf("Torrent: %s deleted", torrentId)
} else {
dl.logger.Info().Msgf("Error deleting torrent: %s", err)
if _, err := dl.client.MakeRequest(req); err != nil {
return err
}
dl.logger.Info().Msgf("Torrent: %s deleted from DebridLink", torrentId)
return nil
}
func (dl *DebridLink) GenerateDownloadLinks(t *types.Torrent) error {

View File

@@ -253,15 +253,14 @@ func (r *RealDebrid) CheckStatus(t *types.Torrent, isSymlink bool) (*types.Torre
return t, nil
}
func (r *RealDebrid) DeleteTorrent(torrentId string) {
func (r *RealDebrid) DeleteTorrent(torrentId string) error {
url := fmt.Sprintf("%s/torrents/delete/%s", r.Host, torrentId)
req, _ := http.NewRequest(http.MethodDelete, url, nil)
_, err := r.client.MakeRequest(req)
if err == nil {
r.logger.Info().Msgf("Torrent: %s deleted", torrentId)
} else {
r.logger.Info().Msgf("Error deleting torrent: %s", err)
if _, err := r.client.MakeRequest(req); err != nil {
return err
}
r.logger.Info().Msgf("Torrent: %s deleted from RD", torrentId)
return nil
}
func (r *RealDebrid) GenerateDownloadLinks(t *types.Torrent) error {
@@ -555,14 +554,13 @@ func (r *RealDebrid) GetMountPath() string {
func New(dc config.Debrid) *RealDebrid {
rl := request.ParseRateLimit(dc.RateLimit)
apiKeys := strings.Split(dc.APIKey, ",")
apiKeys := strings.Split(dc.DownloadAPIKeys, ",")
extraKeys := make([]string, 0)
if len(apiKeys) > 1 {
extraKeys = apiKeys[1:]
}
mainKey := apiKeys[0]
headers := map[string]string{
"Authorization": fmt.Sprintf("Bearer %s", mainKey),
"Authorization": fmt.Sprintf("Bearer %s", dc.APIKey),
}
_log := logger.NewLogger(dc.Name)
client := request.New().
@@ -573,7 +571,7 @@ func New(dc config.Debrid) *RealDebrid {
return &RealDebrid{
Name: "realdebrid",
Host: dc.Host,
APIKey: mainKey,
APIKey: dc.APIKey,
ExtraAPIKeys: extraKeys,
DownloadUncached: dc.DownloadUncached,
client: client,

View File

@@ -262,17 +262,16 @@ func (tb *Torbox) CheckStatus(torrent *types.Torrent, isSymlink bool) (*types.To
return torrent, nil
}
func (tb *Torbox) DeleteTorrent(torrentId string) {
func (tb *Torbox) DeleteTorrent(torrentId string) error {
url := fmt.Sprintf("%s/api/torrents/controltorrent/%s", tb.Host, torrentId)
payload := map[string]string{"torrent_id": torrentId, "action": "Delete"}
jsonPayload, _ := json.Marshal(payload)
req, _ := http.NewRequest(http.MethodDelete, url, bytes.NewBuffer(jsonPayload))
_, err := tb.client.MakeRequest(req)
if err == nil {
tb.logger.Info().Msgf("Torrent: %s deleted", torrentId)
} else {
tb.logger.Info().Msgf("Error deleting torrent: %s", err)
if _, err := tb.client.MakeRequest(req); err != nil {
return err
}
tb.logger.Info().Msgf("Torrent %s deleted from Torbox", torrentId)
return nil
}
func (tb *Torbox) GenerateDownloadLinks(t *types.Torrent) error {

View File

@@ -9,7 +9,7 @@ type Client interface {
CheckStatus(tr *Torrent, isSymlink bool) (*Torrent, error)
GenerateDownloadLinks(tr *Torrent) error
GetDownloadLink(tr *Torrent, file *File) (string, error)
DeleteTorrent(torrentId string)
DeleteTorrent(torrentId string) error
IsAvailable(infohashes []string) map[string]bool
GetCheckCached() bool
GetDownloadUncached() bool