- Re-enable refresh torrents

- Fix issues with re-inserts etc
- Fix getting torrents and updating
This commit is contained in:
Mukhtar Akere
2025-05-12 16:07:47 +01:00
parent 9de7cfd73b
commit 7c1bb52793
8 changed files with 108 additions and 121 deletions

View File

@@ -39,9 +39,16 @@ const (
type CachedTorrent struct {
*types.Torrent
AddedOn time.Time `json:"added_on"`
IsComplete bool `json:"is_complete"`
DuplicateIds []string `json:"duplicate_ids"`
AddedOn time.Time `json:"added_on"`
IsComplete bool `json:"is_complete"`
}
func (c CachedTorrent) copy() CachedTorrent {
return CachedTorrent{
Torrent: c.Torrent,
AddedOn: c.AddedOn,
IsComplete: c.IsComplete,
}
}
type RepairType string
@@ -176,9 +183,9 @@ func (c *Cache) Start(ctx context.Context) error {
return nil
}
func (c *Cache) load() (map[string]*CachedTorrent, error) {
torrents := make(map[string]*CachedTorrent)
var results sync.Map
func (c *Cache) load() (map[string]CachedTorrent, error) {
torrents := make(map[string]CachedTorrent)
mu := sync.Mutex{}
if err := os.MkdirAll(c.dir, 0755); err != nil {
return torrents, fmt.Errorf("failed to create cache directory: %w", err)
@@ -254,7 +261,9 @@ func (c *Cache) load() (map[string]*CachedTorrent, error) {
ct.IsComplete = true
ct.Files = fs
ct.Name = path.Clean(ct.Name)
results.Store(ct.Id, &ct)
mu.Lock()
torrents[ct.Id] = ct
mu.Unlock()
}
}
}
@@ -272,14 +281,6 @@ func (c *Cache) load() (map[string]*CachedTorrent, error) {
// Wait for all workers to complete
wg.Wait()
// Convert sync.Map to regular map
results.Range(func(key, value interface{}) bool {
id, _ := key.(string)
torrent, _ := value.(*CachedTorrent)
torrents[id] = torrent
return true
})
return torrents, nil
}
@@ -295,12 +296,17 @@ func (c *Cache) Sync() error {
return fmt.Errorf("failed to sync torrents: %v", err)
}
c.logger.Info().Msgf("Got %d torrents from %s", len(torrents), c.client.GetName())
totalTorrents := len(torrents)
c.logger.Info().Msgf("Got %d torrents from %s", totalTorrents, c.client.GetName())
newTorrents := make([]*types.Torrent, 0)
idStore := make(map[string]string, len(torrents))
idStore := make(map[string]struct{}, totalTorrents)
for _, t := range torrents {
idStore[t.Id] = t.Added
if _, exists := idStore[t.Id]; exists {
c.logger.Debug().Msgf("Torrent %s already exists in cache", t.Id)
}
idStore[t.Id] = struct{}{}
if _, ok := cachedTorrents[t.Id]; !ok {
newTorrents = append(newTorrents, t)
}
@@ -309,10 +315,6 @@ func (c *Cache) Sync() error {
// Check for deleted torrents
deletedTorrents := make([]string, 0)
for _, t := range cachedTorrents {
t.Added = idStore[t.Id]
if addedOn, err := time.Parse(time.RFC3339, t.Added); err == nil {
t.AddedOn = addedOn
}
if _, ok := idStore[t.Id]; !ok {
deletedTorrents = append(deletedTorrents, t.Id)
}
@@ -424,43 +426,34 @@ func (c *Cache) GetTorrentFolder(torrent *types.Torrent) string {
}
}
func (c *Cache) setTorrent(t *CachedTorrent, callback func(torrent *CachedTorrent)) {
func (c *Cache) setTorrent(t CachedTorrent, callback func(torrent CachedTorrent)) {
torrentName := c.GetTorrentFolder(t.Torrent)
torrentId := t.Id
updatedTorrent := t.copy()
if o, ok := c.torrents.getByName(torrentName); ok && o.Id != t.Id {
// If another torrent with the same name exists, merge the files, if the same file exists,
// keep the one with the most recent added date
// Save the most recent torrent
mergedFiles := mergeFiles(t, o) // Useful for merging files across multiple torrents, while keeping the most recent
if o.AddedOn.After(t.AddedOn) {
// Swap the new torrent to "become" the old one
t = o
}
t.Files = mergedFiles
mergedFiles := mergeFiles(o, updatedTorrent) // Useful for merging files across multiple torrents, while keeping the most recent
updatedTorrent.Files = mergedFiles
}
c.torrents.set(torrentId, torrentName, t)
c.SaveTorrent(t)
c.torrents.set(torrentName, t, updatedTorrent)
c.SaveTorrent(updatedTorrent)
if callback != nil {
callback(t)
callback(updatedTorrent)
}
}
func (c *Cache) setTorrents(torrents map[string]*CachedTorrent, callback func()) {
func (c *Cache) setTorrents(torrents map[string]CachedTorrent, callback func()) {
for _, t := range torrents {
torrentName := c.GetTorrentFolder(t.Torrent)
torrentId := t.Id
updatedTorrent := t.copy()
if o, ok := c.torrents.getByName(torrentName); ok && o.Id != t.Id {
// Save the most recent torrent
mergedFiles := mergeFiles(t, o) // Useful for merging files across multiple torrents, while keeping the most recent
if o.AddedOn.After(t.AddedOn) {
t = o
}
t.Files = mergedFiles
mergedFiles := mergeFiles(o, updatedTorrent)
updatedTorrent.Files = mergedFiles
}
c.torrents.set(torrentId, torrentName, t)
c.torrents.set(torrentName, t, updatedTorrent)
}
c.SaveTorrents()
if callback != nil {
@@ -492,20 +485,20 @@ func (c *Cache) Close() error {
return nil
}
func (c *Cache) GetTorrents() map[string]*CachedTorrent {
func (c *Cache) GetTorrents() map[string]CachedTorrent {
return c.torrents.getAll()
}
func (c *Cache) GetTorrentByName(name string) *CachedTorrent {
if torrent, ok := c.torrents.getByName(name); ok {
return torrent
return &torrent
}
return nil
}
func (c *Cache) GetTorrent(torrentId string) *CachedTorrent {
if torrent, ok := c.torrents.getByID(torrentId); ok {
return torrent
return &torrent
}
return nil
}
@@ -517,7 +510,7 @@ func (c *Cache) SaveTorrents() {
}
}
func (c *Cache) SaveTorrent(ct *CachedTorrent) {
func (c *Cache) SaveTorrent(ct CachedTorrent) {
marshaled, err := json.MarshalIndent(ct, "", " ")
if err != nil {
c.logger.Error().Err(err).Msgf("Failed to marshal torrent: %s", ct.Id)
@@ -617,12 +610,12 @@ func (c *Cache) ProcessTorrent(t *types.Torrent) error {
if err != nil {
addedOn = time.Now()
}
ct := &CachedTorrent{
ct := CachedTorrent{
Torrent: t,
IsComplete: len(t.Files) > 0,
AddedOn: addedOn,
}
c.setTorrent(ct, func(tor *CachedTorrent) {
c.setTorrent(ct, func(tor CachedTorrent) {
c.listingDebouncer.Call(false)
})
}
@@ -639,12 +632,12 @@ func (c *Cache) AddTorrent(t *types.Torrent) error {
if err != nil {
addedOn = time.Now()
}
ct := &CachedTorrent{
ct := CachedTorrent{
Torrent: t,
IsComplete: len(t.Files) > 0,
AddedOn: addedOn,
}
c.setTorrent(ct, func(tor *CachedTorrent) {
c.setTorrent(ct, func(tor CachedTorrent) {
c.RefreshListings(true)
})
go c.GenerateDownloadLinks(ct)
@@ -688,7 +681,7 @@ func (c *Cache) validateAndDeleteTorrents(torrents []string) {
// It also handles torrents with the same name but different IDs
func (c *Cache) deleteTorrent(id string, removeFromDebrid bool) bool {
if torrentName, ok := c.torrents.getByIDName(id); ok {
if torrent, ok := c.torrents.getByID(id); ok {
c.torrents.removeId(id) // Delete id from cache
defer func() {
c.removeFromDB(id)
@@ -697,6 +690,8 @@ func (c *Cache) deleteTorrent(id string, removeFromDebrid bool) bool {
}
}() // defer delete from debrid
torrentName := torrent.Name
if t, ok := c.torrents.getByName(torrentName); ok {
newFiles := map[string]types.File{}
@@ -716,7 +711,7 @@ func (c *Cache) deleteTorrent(id string, removeFromDebrid bool) bool {
t.Files = newFiles
newId = cmp.Or(newId, t.Id)
t.Id = newId
c.setTorrent(t, func(tor *CachedTorrent) {
c.setTorrent(t, func(tor CachedTorrent) {
c.RefreshListings(false)
})
}

View File

@@ -125,7 +125,7 @@ func (c *Cache) fetchDownloadLink(torrentName, filename, fileLink string) (strin
return downloadLink.DownloadLink, nil
}
func (c *Cache) GenerateDownloadLinks(t *CachedTorrent) {
func (c *Cache) GenerateDownloadLinks(t CachedTorrent) {
if err := c.client.GenerateDownloadLinks(t.Torrent); err != nil {
c.logger.Error().Err(err).Msg("Failed to generate download links")
return

View File

@@ -10,7 +10,7 @@ import (
// This is useful for deduplicating files across multiple torrents.
// The order of the torrents is determined by the AddedOn time, with the earliest added torrent first.
// If a file with the same name exists in multiple torrents, the last one will be used.
func mergeFiles(torrents ...*CachedTorrent) map[string]types.File {
func mergeFiles(torrents ...CachedTorrent) map[string]types.File {
merged := make(map[string]types.File)
// order torrents by added time

View File

@@ -70,8 +70,9 @@ func (c *Cache) refreshTorrents() {
}
}
// Validate the torrents are truly deleted, then remove them from the cache too
go c.validateAndDeleteTorrents(deletedTorrents)
if len(deletedTorrents) > 0 {
go c.validateAndDeleteTorrents(deletedTorrents)
}
newTorrents := make([]*types.Torrent, 0)
cachedIdsMaps := c.torrents.getIdMaps()
@@ -85,6 +86,8 @@ func (c *Cache) refreshTorrents() {
return
}
c.logger.Trace().Msgf("Found %d new torrents", len(newTorrents))
workChan := make(chan *types.Torrent, min(100, len(newTorrents)))
errChan := make(chan error, len(newTorrents))
var wg sync.WaitGroup
@@ -200,16 +203,16 @@ func (c *Cache) refreshTorrent(torrentId string) *CachedTorrent {
if err != nil {
addedOn = time.Now()
}
ct := &CachedTorrent{
ct := CachedTorrent{
Torrent: torrent,
AddedOn: addedOn,
IsComplete: len(torrent.Files) > 0,
}
c.setTorrent(ct, func(torrent *CachedTorrent) {
c.setTorrent(ct, func(torrent CachedTorrent) {
go c.listingDebouncer.Call(true)
})
return ct
return &ct
}
func (c *Cache) refreshDownloadLinks() {

View File

@@ -194,15 +194,17 @@ func (c *Cache) reInsertTorrent(ct *CachedTorrent) (*CachedTorrent, error) {
}
}
// Set torrent to newTorrent
ct = &CachedTorrent{
newCt := CachedTorrent{
Torrent: newTorrent,
AddedOn: addedOn,
IsComplete: len(newTorrent.Files) > 0,
}
c.setTorrent(ct, func(torrent *CachedTorrent) {
c.setTorrent(newCt, func(torrent CachedTorrent) {
c.listingDebouncer.Call(true)
})
ct = &newCt // Update ct to point to the new torrent
// We can safely delete the old torrent here
if oldID != "" {
if err := c.DeleteTorrent(oldID); err != nil {

View File

@@ -41,8 +41,8 @@ type directoryFilter struct {
type torrentCache struct {
mu sync.RWMutex
byID map[string]string
byName map[string]*CachedTorrent
byID map[string]CachedTorrent
byName map[string]CachedTorrent
listing atomic.Value
folderListing map[string][]os.FileInfo
folderListingMu sync.RWMutex
@@ -59,8 +59,8 @@ type sortableFile struct {
func newTorrentCache(dirFilters map[string][]directoryFilter) *torrentCache {
tc := &torrentCache{
byID: make(map[string]string),
byName: make(map[string]*CachedTorrent),
byID: make(map[string]CachedTorrent),
byName: make(map[string]CachedTorrent),
folderListing: make(map[string][]os.FileInfo),
directoriesFilters: dirFilters,
}
@@ -70,36 +70,36 @@ func newTorrentCache(dirFilters map[string][]directoryFilter) *torrentCache {
return tc
}
func (tc *torrentCache) getByID(id string) (*CachedTorrent, bool) {
func (tc *torrentCache) getByID(id string) (CachedTorrent, bool) {
tc.mu.RLock()
defer tc.mu.RUnlock()
torrent, exists := tc.byID[id]
if !exists {
return nil, false
}
t, ok := tc.byName[torrent]
return t, ok
return torrent, exists
}
func (tc *torrentCache) getByIDName(id string) (string, bool) {
tc.mu.RLock()
defer tc.mu.RUnlock()
name, exists := tc.byID[id]
return name, exists
}
func (tc *torrentCache) getByName(name string) (*CachedTorrent, bool) {
func (tc *torrentCache) getByName(name string) (CachedTorrent, bool) {
tc.mu.RLock()
defer tc.mu.RUnlock()
torrent, exists := tc.byName[name]
return torrent, exists
}
func (tc *torrentCache) set(id, name string, torrent *CachedTorrent) {
func (tc *torrentCache) set(name string, torrent, newTorrent CachedTorrent) {
tc.mu.Lock()
// Set the id first
tc.byID[newTorrent.Id] = torrent // This is the unadulterated torrent
tc.byName[name] = newTorrent // This is likely the modified torrent
tc.mu.Unlock()
tc.sortNeeded.Store(true)
}
func (tc *torrentCache) setMany(torrents map[string]CachedTorrent) {
tc.mu.Lock()
defer tc.mu.Unlock()
tc.byID[id] = name
tc.byName[name] = torrent
for id, torrent := range torrents {
tc.byID[id] = torrent
tc.byName[torrent.Name] = torrent
}
tc.sortNeeded.Store(true)
}
@@ -227,10 +227,10 @@ func (tc *torrentCache) torrentMatchDirectory(filters []directoryFilter, file so
return true
}
func (tc *torrentCache) getAll() map[string]*CachedTorrent {
func (tc *torrentCache) getAll() map[string]CachedTorrent {
tc.mu.RLock()
defer tc.mu.RUnlock()
result := make(map[string]*CachedTorrent)
result := make(map[string]CachedTorrent)
for name, torrent := range tc.byName {
result[name] = torrent
}
@@ -247,12 +247,12 @@ func (tc *torrentCache) getAllIDs() []string {
return ids
}
func (tc *torrentCache) getIdMaps() map[string]string {
func (tc *torrentCache) getIdMaps() map[string]struct{} {
tc.mu.RLock()
defer tc.mu.RUnlock()
res := make(map[string]string)
for id, name := range tc.byID {
res[id] = name
res := make(map[string]struct{}, len(tc.byID))
for id, _ := range tc.byID {
res[id] = struct{}{}
}
return res
}

View File

@@ -17,13 +17,13 @@ func (c *Cache) StartSchedule() error {
c.logger.Trace().Msgf("Next download link refresh job: %s", t.Format("2006-01-02 15:04:05"))
}
//torrentJob, err := utils.ScheduleJob(ctx, c.torrentRefreshInterval, nil, c.refreshTorrents)
//if err != nil {
// c.logger.Error().Err(err).Msg("Failed to add torrent refresh job")
//}
//if t, err := torrentJob.NextRun(); err == nil {
// c.logger.Trace().Msgf("Next torrent refresh job: %s", t.Format("2006-01-02 15:04:05"))
//}
torrentJob, err := utils.ScheduleJob(ctx, c.torrentRefreshInterval, nil, c.refreshTorrents)
if err != nil {
c.logger.Error().Err(err).Msg("Failed to add torrent refresh job")
}
if t, err := torrentJob.NextRun(); err == nil {
c.logger.Trace().Msgf("Next torrent refresh job: %s", t.Format("2006-01-02 15:04:05"))
}
// Schedule the reset invalid links job
// This job will run every 24 hours

View File

@@ -621,9 +621,6 @@ func (r *RealDebrid) getTorrents(offset int, limit int) (int, []*types.Torrent,
if t.Status != "downloaded" {
continue
}
if _, exists := filenames[t.Filename]; exists {
continue
}
torrents = append(torrents, &types.Torrent{
Id: t.Id,
Name: t.Filename,
@@ -648,32 +645,22 @@ func (r *RealDebrid) GetTorrents() ([]*types.Torrent, error) {
limit := 5000
// Get first batch and total count
totalItems, firstBatch, err := r.getTorrents(0, limit)
if err != nil {
return nil, err
}
allTorrents := firstBatch
// Calculate remaining requests
remaining := totalItems - len(firstBatch)
if remaining <= 0 {
return allTorrents, nil
}
// Prepare for concurrent fetching
allTorrents := make([]*types.Torrent, 0)
var fetchError error
// Calculate how many more requests we need
batchCount := (remaining + limit - 1) / limit // ceiling division
for i := 1; i <= batchCount; i++ {
_, batch, err := r.getTorrents(i*limit, limit)
offset := 0
for {
// Fetch next batch of torrents
_, torrents, err := r.getTorrents(offset, limit)
if err != nil {
fetchError = err
continue
break
}
allTorrents = append(allTorrents, batch...)
totalTorrents := len(torrents)
if totalTorrents == 0 {
break
}
allTorrents = append(allTorrents, torrents...)
offset += totalTorrents
}
if fetchError != nil {