Fix issues with dupliacte names; other minor bug fixes
This commit is contained in:
@@ -2,6 +2,7 @@ package debrid
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"bufio"
|
"bufio"
|
||||||
|
"cmp"
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
@@ -319,8 +320,7 @@ func (c *Cache) Sync() error {
|
|||||||
c.logger.Info().Msgf("Found %d deleted torrents", len(deletedTorrents))
|
c.logger.Info().Msgf("Found %d deleted torrents", len(deletedTorrents))
|
||||||
for _, id := range deletedTorrents {
|
for _, id := range deletedTorrents {
|
||||||
if _, ok := cachedTorrents[id]; ok {
|
if _, ok := cachedTorrents[id]; ok {
|
||||||
delete(cachedTorrents, id)
|
c.validateAndDeleteTorrents(deletedTorrents)
|
||||||
c.removeFromDB(id)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -422,6 +422,7 @@ func (c *Cache) GetTorrentFolder(torrent *types.Torrent) string {
|
|||||||
|
|
||||||
func (c *Cache) setTorrent(t *CachedTorrent) {
|
func (c *Cache) setTorrent(t *CachedTorrent) {
|
||||||
torrentKey := c.GetTorrentFolder(t.Torrent)
|
torrentKey := c.GetTorrentFolder(t.Torrent)
|
||||||
|
c.torrents.Store(t.Id, torrentKey) // Store the torrent id with the folder name(we might change the id after, hence why it's stored here)
|
||||||
if o, ok := c.torrentsNames.Load(torrentKey); ok && o.Id != t.Id {
|
if o, ok := c.torrentsNames.Load(torrentKey); ok && o.Id != t.Id {
|
||||||
// If another torrent with the same name exists, merge the files, if the same file exists,
|
// If another torrent with the same name exists, merge the files, if the same file exists,
|
||||||
// keep the one with the most recent added date
|
// keep the one with the most recent added date
|
||||||
@@ -434,7 +435,6 @@ func (c *Cache) setTorrent(t *CachedTorrent) {
|
|||||||
t.Files = mergedFiles
|
t.Files = mergedFiles
|
||||||
|
|
||||||
}
|
}
|
||||||
c.torrents.Store(t.Id, torrentKey)
|
|
||||||
c.torrentsNames.Store(torrentKey, t)
|
c.torrentsNames.Store(torrentKey, t)
|
||||||
c.SaveTorrent(t)
|
c.SaveTorrent(t)
|
||||||
}
|
}
|
||||||
@@ -442,6 +442,7 @@ func (c *Cache) setTorrent(t *CachedTorrent) {
|
|||||||
func (c *Cache) setTorrents(torrents map[string]*CachedTorrent) {
|
func (c *Cache) setTorrents(torrents map[string]*CachedTorrent) {
|
||||||
for _, t := range torrents {
|
for _, t := range torrents {
|
||||||
torrentKey := c.GetTorrentFolder(t.Torrent)
|
torrentKey := c.GetTorrentFolder(t.Torrent)
|
||||||
|
c.torrents.Store(t.Id, torrentKey)
|
||||||
if o, ok := c.torrentsNames.Load(torrentKey); ok && o.Id != t.Id {
|
if o, ok := c.torrentsNames.Load(torrentKey); ok && o.Id != t.Id {
|
||||||
// Save the most recent torrent
|
// Save the most recent torrent
|
||||||
if o.AddedOn.After(t.AddedOn) {
|
if o.AddedOn.After(t.AddedOn) {
|
||||||
@@ -450,7 +451,6 @@ func (c *Cache) setTorrents(torrents map[string]*CachedTorrent) {
|
|||||||
mergedFiles := mergeFiles(t, o)
|
mergedFiles := mergeFiles(t, o)
|
||||||
t.Files = mergedFiles
|
t.Files = mergedFiles
|
||||||
}
|
}
|
||||||
c.torrents.Store(t.Id, torrentKey)
|
|
||||||
c.torrentsNames.Store(torrentKey, t)
|
c.torrentsNames.Store(torrentKey, t)
|
||||||
}
|
}
|
||||||
c.RefreshListings(false)
|
c.RefreshListings(false)
|
||||||
@@ -649,6 +649,26 @@ func (c *Cache) DeleteTorrent(id string) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *Cache) validateAndDeleteTorrents(torrents []string) {
|
||||||
|
wg := sync.WaitGroup{}
|
||||||
|
for _, torrent := range torrents {
|
||||||
|
wg.Add(1)
|
||||||
|
go func(t string) {
|
||||||
|
defer wg.Done()
|
||||||
|
// Check if torrent is truly deleted
|
||||||
|
if a, err := c.client.GetTorrent(t); err != nil {
|
||||||
|
c.deleteTorrent(t, false) // Since it's removed from debrid already
|
||||||
|
} else {
|
||||||
|
c.logger.Trace().Msgf("Torrent %s is still present", t)
|
||||||
|
}
|
||||||
|
}(torrent)
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
c.RefreshListings(true)
|
||||||
|
}
|
||||||
|
|
||||||
|
// deleteTorrent deletes the torrent from the cache and debrid service
|
||||||
|
// It also handles torrents with the same name but different IDs
|
||||||
func (c *Cache) deleteTorrent(id string, removeFromDebrid bool) bool {
|
func (c *Cache) deleteTorrent(id string, removeFromDebrid bool) bool {
|
||||||
|
|
||||||
if torrentName, ok := c.torrents.Load(id); ok {
|
if torrentName, ok := c.torrents.Load(id); ok {
|
||||||
@@ -663,10 +683,12 @@ func (c *Cache) deleteTorrent(id string, removeFromDebrid bool) bool {
|
|||||||
if t, ok := c.torrentsNames.Load(torrentName); ok {
|
if t, ok := c.torrentsNames.Load(torrentName); ok {
|
||||||
|
|
||||||
newFiles := map[string]types.File{}
|
newFiles := map[string]types.File{}
|
||||||
newId := t.Id
|
newId := ""
|
||||||
for _, file := range t.Files {
|
for _, file := range t.Files {
|
||||||
if file.TorrentId != "" && file.TorrentId != id {
|
if file.TorrentId != "" && file.TorrentId != id {
|
||||||
|
if newId == "" && file.TorrentId != "" {
|
||||||
newId = file.TorrentId
|
newId = file.TorrentId
|
||||||
|
}
|
||||||
newFiles[file.Name] = file
|
newFiles[file.Name] = file
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -675,6 +697,7 @@ func (c *Cache) deleteTorrent(id string, removeFromDebrid bool) bool {
|
|||||||
c.torrentsNames.Delete(torrentName)
|
c.torrentsNames.Delete(torrentName)
|
||||||
} else {
|
} else {
|
||||||
t.Files = newFiles
|
t.Files = newFiles
|
||||||
|
newId = cmp.Or(newId, t.Id)
|
||||||
t.Id = newId
|
t.Id = newId
|
||||||
c.setTorrent(t)
|
c.setTorrent(t)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -59,7 +59,7 @@ func (c *Cache) GetDownloadLink(torrentName, filename, fileLink string) (string,
|
|||||||
func (c *Cache) fetchDownloadLink(torrentName, filename, fileLink string) (string, error) {
|
func (c *Cache) fetchDownloadLink(torrentName, filename, fileLink string) (string, error) {
|
||||||
ct := c.GetTorrentByName(torrentName)
|
ct := c.GetTorrentByName(torrentName)
|
||||||
if ct == nil {
|
if ct == nil {
|
||||||
return "", fmt.Errorf("torrent not found: %s", torrentName)
|
return "", fmt.Errorf("torrent not found")
|
||||||
}
|
}
|
||||||
file := ct.Files[filename]
|
file := ct.Files[filename]
|
||||||
|
|
||||||
@@ -67,7 +67,7 @@ func (c *Cache) fetchDownloadLink(torrentName, filename, fileLink string) (strin
|
|||||||
// file link is empty, refresh the torrent to get restricted links
|
// file link is empty, refresh the torrent to get restricted links
|
||||||
ct = c.refreshTorrent(file.TorrentId) // Refresh the torrent from the debrid
|
ct = c.refreshTorrent(file.TorrentId) // Refresh the torrent from the debrid
|
||||||
if ct == nil {
|
if ct == nil {
|
||||||
return "", fmt.Errorf("failed to refresh torrent: %s", torrentName)
|
return "", fmt.Errorf("failed to refresh torrent")
|
||||||
} else {
|
} else {
|
||||||
file = ct.Files[filename]
|
file = ct.Files[filename]
|
||||||
}
|
}
|
||||||
@@ -75,11 +75,10 @@ func (c *Cache) fetchDownloadLink(torrentName, filename, fileLink string) (strin
|
|||||||
|
|
||||||
// If file.Link is still empty, return
|
// If file.Link is still empty, return
|
||||||
if file.Link == "" {
|
if file.Link == "" {
|
||||||
c.logger.Debug().Msgf("File link is empty for %s. Release is probably nerfed", filename)
|
|
||||||
// Try to reinsert the torrent?
|
// Try to reinsert the torrent?
|
||||||
newCt, err := c.reInsertTorrent(ct)
|
newCt, err := c.reInsertTorrent(ct)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", fmt.Errorf("failed to reinsert torrent: %s. %w", ct.Name, err)
|
return "", fmt.Errorf("failed to reinsert torrent. %w", err)
|
||||||
}
|
}
|
||||||
ct = newCt
|
ct = newCt
|
||||||
file = ct.Files[filename]
|
file = ct.Files[filename]
|
||||||
@@ -103,19 +102,19 @@ func (c *Cache) fetchDownloadLink(torrentName, filename, fileLink string) (strin
|
|||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
if downloadLink == nil {
|
if downloadLink == nil {
|
||||||
return "", fmt.Errorf("download link is empty for %s", file.Link)
|
return "", fmt.Errorf("download link is empty for")
|
||||||
}
|
}
|
||||||
c.updateDownloadLink(downloadLink)
|
c.updateDownloadLink(downloadLink)
|
||||||
return "", nil
|
return "", nil
|
||||||
} else if errors.Is(err, request.TrafficExceededError) {
|
} else if errors.Is(err, request.TrafficExceededError) {
|
||||||
// This is likely a fair usage limit error
|
// This is likely a fair usage limit error
|
||||||
c.logger.Error().Err(err).Msgf("Traffic exceeded for %s", ct.Name)
|
return "", err
|
||||||
} else {
|
} else {
|
||||||
return "", fmt.Errorf("failed to get download link: %w", err)
|
return "", fmt.Errorf("failed to get download link: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if downloadLink == nil {
|
if downloadLink == nil {
|
||||||
return "", fmt.Errorf("download link is empty for %s", file.Link)
|
return "", fmt.Errorf("download link is empty")
|
||||||
}
|
}
|
||||||
c.updateDownloadLink(downloadLink)
|
c.updateDownloadLink(downloadLink)
|
||||||
return downloadLink.DownloadLink, nil
|
return downloadLink.DownloadLink, nil
|
||||||
@@ -124,6 +123,7 @@ func (c *Cache) fetchDownloadLink(torrentName, filename, fileLink string) (strin
|
|||||||
func (c *Cache) GenerateDownloadLinks(t *CachedTorrent) {
|
func (c *Cache) GenerateDownloadLinks(t *CachedTorrent) {
|
||||||
if err := c.client.GenerateDownloadLinks(t.Torrent); err != nil {
|
if err := c.client.GenerateDownloadLinks(t.Torrent); err != nil {
|
||||||
c.logger.Error().Err(err).Msg("Failed to generate download links")
|
c.logger.Error().Err(err).Msg("Failed to generate download links")
|
||||||
|
return
|
||||||
}
|
}
|
||||||
for _, file := range t.Files {
|
for _, file := range t.Files {
|
||||||
if file.DownloadLink != nil {
|
if file.DownloadLink != nil {
|
||||||
@@ -131,7 +131,6 @@ func (c *Cache) GenerateDownloadLinks(t *CachedTorrent) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
c.SaveTorrent(t)
|
c.SaveTorrent(t)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -95,9 +95,17 @@ func (c *Cache) refreshTorrents() {
|
|||||||
currentTorrentIds[t.Id] = struct{}{}
|
currentTorrentIds[t.Id] = struct{}{}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Because of how fast AddTorrent is, a torrent might be added before we check
|
// Let's implement deleting torrents removed from debrid
|
||||||
// So let's disable the deletion of torrents for now
|
deletedTorrents := make([]string, 0)
|
||||||
// Deletion now moved to the cleanupWorker
|
c.torrents.Range(func(key string, _ string) bool {
|
||||||
|
if _, exists := currentTorrentIds[key]; !exists {
|
||||||
|
deletedTorrents = append(deletedTorrents, key)
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
|
||||||
|
// Validate the torrents are truly deleted, then remove them from the cache too
|
||||||
|
go c.validateAndDeleteTorrents(deletedTorrents)
|
||||||
|
|
||||||
newTorrents := make([]*types.Torrent, 0)
|
newTorrents := make([]*types.Torrent, 0)
|
||||||
for _, t := range debTorrents {
|
for _, t := range debTorrents {
|
||||||
|
|||||||
@@ -9,6 +9,7 @@ import (
|
|||||||
"github.com/sirrobot01/decypharr/internal/utils"
|
"github.com/sirrobot01/decypharr/internal/utils"
|
||||||
"github.com/sirrobot01/decypharr/pkg/debrid/types"
|
"github.com/sirrobot01/decypharr/pkg/debrid/types"
|
||||||
"slices"
|
"slices"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -215,4 +216,5 @@ func (c *Cache) reInsertTorrent(ct *CachedTorrent) (*CachedTorrent, error) {
|
|||||||
func (c *Cache) resetInvalidLinks() {
|
func (c *Cache) resetInvalidLinks() {
|
||||||
c.invalidDownloadLinks = xsync.NewMapOf[string, string]()
|
c.invalidDownloadLinks = xsync.NewMapOf[string, string]()
|
||||||
c.client.ResetActiveDownloadKeys() // Reset the active download keys
|
c.client.ResetActiveDownloadKeys() // Reset the active download keys
|
||||||
|
c.failedToReinsert = sync.Map{} // Reset the failed to reinsert map
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -37,42 +37,5 @@ func (c *Cache) StartSchedule() error {
|
|||||||
c.logger.Trace().Msgf("Next reset invalid download links job at: %s", t.Format("2006-01-02 15:04:05"))
|
c.logger.Trace().Msgf("Next reset invalid download links job at: %s", t.Format("2006-01-02 15:04:05"))
|
||||||
}
|
}
|
||||||
|
|
||||||
// Schedule the cleanup job
|
|
||||||
|
|
||||||
cleanupJob, err := utils.ScheduleJob(ctx, "1h", nil, c.cleanupWorker)
|
|
||||||
if err != nil {
|
|
||||||
c.logger.Error().Err(err).Msg("Failed to add cleanup job")
|
|
||||||
}
|
|
||||||
if t, err := cleanupJob.NextRun(); err == nil {
|
|
||||||
c.logger.Trace().Msgf("Next cleanup job at: %s", t.Format("2006-01-02 15:04:05"))
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Cache) cleanupWorker() {
|
|
||||||
// Cleanup every hour
|
|
||||||
torrents, err := c.client.GetTorrents()
|
|
||||||
if err != nil {
|
|
||||||
c.logger.Error().Err(err).Msg("Failed to get torrents")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
idStore := make(map[string]struct{})
|
|
||||||
for _, t := range torrents {
|
|
||||||
idStore[t.Id] = struct{}{}
|
|
||||||
}
|
|
||||||
|
|
||||||
deletedTorrents := make([]string, 0)
|
|
||||||
c.torrents.Range(func(key string, _ string) bool {
|
|
||||||
if _, exists := idStore[key]; !exists {
|
|
||||||
deletedTorrents = append(deletedTorrents, key)
|
|
||||||
}
|
|
||||||
return true
|
|
||||||
})
|
|
||||||
|
|
||||||
if len(deletedTorrents) > 0 {
|
|
||||||
c.DeleteTorrents(deletedTorrents)
|
|
||||||
c.logger.Info().Msgf("Deleted %d torrents", len(deletedTorrents))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|||||||
@@ -47,7 +47,6 @@ func (c *Cache) RefreshParentXml() error {
|
|||||||
return fmt.Errorf("failed to refresh XML for %s: %v", parent, err)
|
return fmt.Errorf("failed to refresh XML for %s: %v", parent, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
c.logger.Trace().Msgf("Refreshed XML cache for %s", c.client.GetName())
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -71,7 +71,7 @@ func (f *File) getDownloadLink() (string, error) {
|
|||||||
f.downloadLink = downloadLink
|
f.downloadLink = downloadLink
|
||||||
return downloadLink, nil
|
return downloadLink, nil
|
||||||
}
|
}
|
||||||
return "", fmt.Errorf("download link not found")
|
return "", os.ErrNotExist
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *File) stream() (*http.Response, error) {
|
func (f *File) stream() (*http.Response, error) {
|
||||||
@@ -203,7 +203,7 @@ func (f *File) Read(p []byte) (n int, err error) {
|
|||||||
// Make the request to get the file
|
// Make the request to get the file
|
||||||
resp, err := f.stream()
|
resp, err := f.stream()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, io.EOF
|
||||||
}
|
}
|
||||||
if resp == nil {
|
if resp == nil {
|
||||||
return 0, io.EOF
|
return 0, io.EOF
|
||||||
@@ -216,10 +216,7 @@ func (f *File) Read(p []byte) (n int, err error) {
|
|||||||
n, err = f.reader.Read(p)
|
n, err = f.reader.Read(p)
|
||||||
f.offset += int64(n)
|
f.offset += int64(n)
|
||||||
|
|
||||||
if err == io.EOF {
|
if err != nil {
|
||||||
f.reader.Close()
|
|
||||||
f.reader = nil
|
|
||||||
} else if err != nil {
|
|
||||||
f.reader.Close()
|
f.reader.Close()
|
||||||
f.reader = nil
|
f.reader = nil
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -361,6 +361,27 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if r.Method == "HEAD" {
|
||||||
|
f, err := h.OpenFile(r.Context(), r.URL.Path, os.O_RDONLY, 0)
|
||||||
|
if err != nil {
|
||||||
|
h.logger.Error().Err(err).Str("path", r.URL.Path).Msg("Failed to open file")
|
||||||
|
http.NotFound(w, r)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer f.Close()
|
||||||
|
|
||||||
|
fi, err := f.Stat()
|
||||||
|
if err != nil {
|
||||||
|
h.logger.Error().Err(err).Msg("Failed to stat file")
|
||||||
|
http.Error(w, "Server Error", http.StatusInternalServerError)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
w.Header().Set("Content-Type", getContentType(fi.Name()))
|
||||||
|
w.Header().Set("Content-Length", fmt.Sprintf("%d", fi.Size()))
|
||||||
|
w.WriteHeader(http.StatusOK)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
// Fallback: for other methods, use the standard WebDAV handler.
|
// Fallback: for other methods, use the standard WebDAV handler.
|
||||||
handler := &webdav.Handler{
|
handler := &webdav.Handler{
|
||||||
FileSystem: h,
|
FileSystem: h,
|
||||||
|
|||||||
Reference in New Issue
Block a user