Updste repair
This commit is contained in:
@@ -919,6 +919,7 @@ func (r *RealDebrid) GetProfile() (*types.Profile, error) {
|
|||||||
Expiration: data.Expiration,
|
Expiration: data.Expiration,
|
||||||
Type: data.Type,
|
Type: data.Type,
|
||||||
}
|
}
|
||||||
|
r.Profile = profile
|
||||||
return profile, nil
|
return profile, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -570,6 +570,10 @@ func (c *Cache) GetTorrentByName(name string) *CachedTorrent {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *Cache) GetTorrentsName() map[string]CachedTorrent {
|
||||||
|
return c.torrents.getAllByName()
|
||||||
|
}
|
||||||
|
|
||||||
func (c *Cache) GetTorrent(torrentId string) *CachedTorrent {
|
func (c *Cache) GetTorrent(torrentId string) *CachedTorrent {
|
||||||
if torrent, ok := c.torrents.getByID(torrentId); ok {
|
if torrent, ok := c.torrents.getByID(torrentId); ok {
|
||||||
return &torrent
|
return &torrent
|
||||||
|
|||||||
@@ -293,6 +293,16 @@ func (tc *torrentCache) getAllCount() int {
|
|||||||
return len(tc.torrents.byID)
|
return len(tc.torrents.byID)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (tc *torrentCache) getAllByName() map[string]CachedTorrent {
|
||||||
|
tc.torrents.RLock()
|
||||||
|
defer tc.torrents.RUnlock()
|
||||||
|
results := make(map[string]CachedTorrent, len(tc.torrents.byName))
|
||||||
|
for name, torrent := range tc.torrents.byName {
|
||||||
|
results[name] = torrent
|
||||||
|
}
|
||||||
|
return results
|
||||||
|
}
|
||||||
|
|
||||||
func (tc *torrentCache) getIdMaps() map[string]struct{} {
|
func (tc *torrentCache) getIdMaps() map[string]struct{} {
|
||||||
tc.torrents.RLock()
|
tc.torrents.RLock()
|
||||||
defer tc.torrents.RUnlock()
|
defer tc.torrents.RUnlock()
|
||||||
|
|||||||
@@ -103,11 +103,17 @@ func (r *Repair) checkTorrentFiles(torrentPath string, files []arr.ContentFile,
|
|||||||
r.logger.Debug().Msgf("No cache found for %s. Skipping", debridName)
|
r.logger.Debug().Msgf("No cache found for %s. Skipping", debridName)
|
||||||
return files // Return all files as broken if no cache found
|
return files // Return all files as broken if no cache found
|
||||||
}
|
}
|
||||||
|
tor, ok := r.torrentsMap.Load(debridName)
|
||||||
|
if !ok {
|
||||||
|
r.logger.Debug().Msgf("Could not find torrents for %s. Skipping", debridName)
|
||||||
|
}
|
||||||
|
|
||||||
|
torrentsMap := tor.(map[string]store.CachedTorrent)
|
||||||
|
|
||||||
// Check if torrent exists
|
// Check if torrent exists
|
||||||
torrentName := filepath.Clean(filepath.Base(torrentPath))
|
torrentName := filepath.Clean(filepath.Base(torrentPath))
|
||||||
torrent := cache.GetTorrentByName(torrentName)
|
torrent, ok := torrentsMap[torrentName]
|
||||||
if torrent == nil {
|
if !ok {
|
||||||
r.logger.Debug().Msgf("No torrent found for %s. Skipping", torrentName)
|
r.logger.Debug().Msgf("No torrent found for %s. Skipping", torrentName)
|
||||||
return files // Return all files as broken if torrent not found
|
return files // Return all files as broken if torrent not found
|
||||||
}
|
}
|
||||||
@@ -118,7 +124,7 @@ func (r *Repair) checkTorrentFiles(torrentPath string, files []arr.ContentFile,
|
|||||||
filePaths[i] = file.TargetPath
|
filePaths[i] = file.TargetPath
|
||||||
}
|
}
|
||||||
|
|
||||||
brokenFilePaths := cache.GetBrokenFiles(torrent, filePaths)
|
brokenFilePaths := cache.GetBrokenFiles(&torrent, filePaths)
|
||||||
if len(brokenFilePaths) > 0 {
|
if len(brokenFilePaths) > 0 {
|
||||||
r.logger.Debug().Msgf("%d broken files found in %s", len(brokenFilePaths), torrentName)
|
r.logger.Debug().Msgf("%d broken files found in %s", len(brokenFilePaths), torrentName)
|
||||||
|
|
||||||
@@ -141,15 +147,9 @@ func (r *Repair) checkTorrentFiles(torrentPath string, files []arr.ContentFile,
|
|||||||
|
|
||||||
func (r *Repair) findDebridForPath(dir string, clients map[string]types.Client) string {
|
func (r *Repair) findDebridForPath(dir string, clients map[string]types.Client) string {
|
||||||
// Check cache first
|
// Check cache first
|
||||||
r.cacheMutex.RLock()
|
if debridName, exists := r.debridPathCache.Load(dir); exists {
|
||||||
if r.debridPathCache == nil {
|
return debridName.(string)
|
||||||
r.debridPathCache = make(map[string]string)
|
|
||||||
}
|
}
|
||||||
if debridName, exists := r.debridPathCache[dir]; exists {
|
|
||||||
r.cacheMutex.RUnlock()
|
|
||||||
return debridName
|
|
||||||
}
|
|
||||||
r.cacheMutex.RUnlock()
|
|
||||||
|
|
||||||
// Find debrid client
|
// Find debrid client
|
||||||
for _, client := range clients {
|
for _, client := range clients {
|
||||||
@@ -162,18 +162,14 @@ func (r *Repair) findDebridForPath(dir string, clients map[string]types.Client)
|
|||||||
debridName := client.Name()
|
debridName := client.Name()
|
||||||
|
|
||||||
// Cache the result
|
// Cache the result
|
||||||
r.cacheMutex.Lock()
|
r.debridPathCache.Store(dir, debridName)
|
||||||
r.debridPathCache[dir] = debridName
|
|
||||||
r.cacheMutex.Unlock()
|
|
||||||
|
|
||||||
return debridName
|
return debridName
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Cache empty result to avoid repeated lookups
|
// Cache empty result to avoid repeated lookups
|
||||||
r.cacheMutex.Lock()
|
r.debridPathCache.Store(dir, "")
|
||||||
r.debridPathCache[dir] = ""
|
|
||||||
r.cacheMutex.Unlock()
|
|
||||||
|
|
||||||
return ""
|
return ""
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -41,8 +41,8 @@ type Repair struct {
|
|||||||
workers int
|
workers int
|
||||||
scheduler gocron.Scheduler
|
scheduler gocron.Scheduler
|
||||||
|
|
||||||
debridPathCache map[string]string // Cache for path -> debrid name mapping
|
debridPathCache sync.Map // debridPath:debridName cache.Emptied after each run
|
||||||
cacheMutex sync.RWMutex
|
torrentsMap sync.Map //debridName: map[string]*store.CacheTorrent. Emptied after each run
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -214,6 +214,27 @@ func (r *Repair) newJob(arrsNames []string, mediaIDs []string) *Job {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// initRun initializes the repair run, setting up necessary configurations, checks and caches
|
||||||
|
func (r *Repair) initRun(ctx context.Context) {
|
||||||
|
if r.useWebdav {
|
||||||
|
// Webdav use is enabled, initialize debrid torrent caches
|
||||||
|
caches := r.deb.Caches()
|
||||||
|
if len(caches) == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
for name, cache := range caches {
|
||||||
|
r.torrentsMap.Store(name, cache.GetTorrentsName())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// // onComplete is called when the repair job is completed
|
||||||
|
func (r *Repair) onComplete() {
|
||||||
|
// Set the cache maps to nil
|
||||||
|
r.torrentsMap = sync.Map{} // Clear the torrent map
|
||||||
|
r.debridPathCache = sync.Map{}
|
||||||
|
}
|
||||||
|
|
||||||
func (r *Repair) preRunChecks() error {
|
func (r *Repair) preRunChecks() error {
|
||||||
|
|
||||||
if r.useWebdav {
|
if r.useWebdav {
|
||||||
@@ -271,6 +292,7 @@ func (r *Repair) AddJob(arrsNames []string, mediaIDs []string, autoProcess, recu
|
|||||||
job.CompletedAt = time.Now()
|
job.CompletedAt = time.Now()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
r.onComplete() // Clear caches and maps after job completion
|
||||||
}()
|
}()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@@ -313,6 +335,9 @@ func (r *Repair) repair(job *Job) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Initialize the run
|
||||||
|
r.initRun(job.ctx)
|
||||||
|
|
||||||
// Use a mutex to protect concurrent access to brokenItems
|
// Use a mutex to protect concurrent access to brokenItems
|
||||||
var mu sync.Mutex
|
var mu sync.Mutex
|
||||||
brokenItems := map[string][]arr.ContentFile{}
|
brokenItems := map[string][]arr.ContentFile{}
|
||||||
@@ -475,16 +500,17 @@ func (r *Repair) repairArr(job *Job, _arr string, tmdbId string) ([]arr.ContentF
|
|||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, m := range media {
|
go func() {
|
||||||
select {
|
defer close(workerChan)
|
||||||
case <-job.ctx.Done():
|
for _, m := range media {
|
||||||
break
|
select {
|
||||||
default:
|
case <-job.ctx.Done():
|
||||||
workerChan <- m
|
return
|
||||||
|
case workerChan <- m:
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}()
|
||||||
|
|
||||||
close(workerChan)
|
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
if len(brokenItems) == 0 {
|
if len(brokenItems) == 0 {
|
||||||
r.logger.Info().Msgf("No broken items found for %s", a.Name)
|
r.logger.Info().Msgf("No broken items found for %s", a.Name)
|
||||||
|
|||||||
Reference in New Issue
Block a user