- Add limits to the number of goroutines

- Add goroutine stats to logs
- Fix issues with repair
This commit is contained in:
Mukhtar Akere
2025-03-27 08:24:40 +01:00
parent 7bd38736b1
commit d49fbea60f
11 changed files with 163 additions and 139 deletions

View File

@@ -3,6 +3,7 @@ package decypharr
import (
"context"
"fmt"
"github.com/rs/zerolog"
"github.com/sirrobot01/debrid-blackhole/internal/config"
"github.com/sirrobot01/debrid-blackhole/internal/logger"
"github.com/sirrobot01/debrid-blackhole/pkg/proxy"
@@ -14,12 +15,26 @@ import (
"github.com/sirrobot01/debrid-blackhole/pkg/webdav"
"github.com/sirrobot01/debrid-blackhole/pkg/worker"
"os"
"runtime"
"runtime/debug"
"strconv"
"sync"
"syscall"
"time"
)
// monitorGoroutines logs the current goroutine count at debug level once per
// interval. It never returns, so it must be launched on its own goroutine.
// NOTE(review): there is no cancellation hook — the ticker and this loop live
// for the lifetime of the process; consider accepting a context.Context if it
// ever needs to be stopped cleanly.
func monitorGoroutines(interval time.Duration, _log zerolog.Logger) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	// Range directly over the ticker channel instead of a single-case
	// select inside a for loop (staticcheck S1000).
	for range ticker.C {
		_log.Debug().Msgf("Current goroutines: %d", runtime.NumGoroutine())
	}
}
func Start(ctx context.Context) error {
if umaskStr := os.Getenv("UMASK"); umaskStr != "" {
@@ -106,6 +121,11 @@ func Start(ctx context.Context) error {
})
}
safeGo(func() error {
monitorGoroutines(1*time.Minute, _log)
return nil
})
go func() {
wg.Wait()
close(errChan)

View File

@@ -1,8 +1,10 @@
package arr
import (
"context"
"fmt"
"github.com/goccy/go-json"
"golang.org/x/sync/errgroup"
"net/http"
"strconv"
"strings"
@@ -155,20 +157,32 @@ func (a *Arr) searchSonarr(files []ContentFile) error {
id := fmt.Sprintf("%d-%d", f.Id, f.SeasonNumber)
ids[id] = nil
}
errs := make(chan error, len(ids))
g, ctx := errgroup.WithContext(context.Background())
// Limit concurrent goroutines
g.SetLimit(10)
for id := range ids {
go func() {
id := id
g.Go(func() error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
parts := strings.Split(id, "-")
if len(parts) != 2 {
return
return fmt.Errorf("invalid id: %s", id)
}
seriesId, err := strconv.Atoi(parts[0])
if err != nil {
return
return err
}
seasonNumber, err := strconv.Atoi(parts[1])
if err != nil {
return
return err
}
payload := sonarrSearch{
Name: "SeasonSearch",
@@ -177,20 +191,16 @@ func (a *Arr) searchSonarr(files []ContentFile) error {
}
resp, err := a.Request(http.MethodPost, "api/v3/command", payload)
if err != nil {
errs <- fmt.Errorf("failed to automatic search: %v", err)
return
return fmt.Errorf("failed to automatic search: %v", err)
}
if resp.StatusCode >= 300 || resp.StatusCode < 200 {
errs <- fmt.Errorf("failed to automatic search. Status Code: %s", resp.Status)
return
return fmt.Errorf("failed to automatic search. Status Code: %s", resp.Status)
}
}()
return nil
})
}
for range ids {
err := <-errs
if err != nil {
return err
}
if err := g.Wait(); err != nil {
return err
}
return nil
}

View File

@@ -24,9 +24,11 @@ import (
type WebDavFolderNaming string
const (
WebDavUseFileName WebDavFolderNaming = "filename"
WebDavUseOriginalName WebDavFolderNaming = "original"
WebDavUseID WebDavFolderNaming = "use_id"
WebDavUseFileNameNoExt WebDavFolderNaming = "filename_no_ext"
WebDavUseOriginalNameNoExt WebDavFolderNaming = "original_no_ext"
WebDavUseID WebDavFolderNaming = "id"
)
type PropfindResponse struct {
@@ -78,6 +80,8 @@ type Cache struct {
listingRefreshMu sync.RWMutex // for refreshing torrents
downloadLinksRefreshMu sync.RWMutex // for refreshing download links
torrentsRefreshMu sync.RWMutex // for refreshing torrents
saveSemaphore chan struct{}
}
func NewCache(dc config.Debrid, client types.Client) *Cache {
@@ -99,7 +103,7 @@ func NewCache(dc config.Debrid, client types.Client) *Cache {
torrents: xsync.NewMapOf[string, *CachedTorrent](),
torrentsNames: xsync.NewMapOf[string, *CachedTorrent](),
client: client,
logger: logger.NewLogger(fmt.Sprintf("%s-cache", client.GetName())),
logger: logger.NewLogger(fmt.Sprintf("%s-webdav", client.GetName())),
workers: 200,
downloadLinks: xsync.NewMapOf[string, downloadLinkCache](),
torrentRefreshInterval: torrentRefreshInterval,
@@ -108,17 +112,25 @@ func NewCache(dc config.Debrid, client types.Client) *Cache {
folderNaming: WebDavFolderNaming(dc.WebDavFolderNaming),
autoExpiresLinksAfter: autoExpiresLinksAfter,
repairsInProgress: xsync.NewMapOf[string, bool](),
saveSemaphore: make(chan struct{}, 10),
}
}
func (c *Cache) GetTorrentFolder(torrent *types.Torrent) string {
folderName := torrent.Filename
if c.folderNaming == WebDavUseID {
folderName = torrent.Id
} else if c.folderNaming == WebDavUseOriginalNameNoExt {
folderName = utils.RemoveExtension(folderName)
switch c.folderNaming {
case WebDavUseFileName:
return torrent.Filename
case WebDavUseOriginalName:
return torrent.OriginalFilename
case WebDavUseFileNameNoExt:
return utils.RemoveExtension(torrent.Filename)
case WebDavUseOriginalNameNoExt:
return utils.RemoveExtension(torrent.OriginalFilename)
case WebDavUseID:
return torrent.Id
default:
return torrent.Filename
}
return folderName
}
func (c *Cache) setTorrent(t *CachedTorrent) {
@@ -126,11 +138,7 @@ func (c *Cache) setTorrent(t *CachedTorrent) {
c.torrentsNames.Store(c.GetTorrentFolder(t.Torrent), t)
go func() {
if err := c.SaveTorrent(t); err != nil {
c.logger.Debug().Err(err).Msgf("Failed to save torrent %s", t.Id)
}
}()
c.SaveTorrent(t)
}
func (c *Cache) setTorrents(torrents map[string]*CachedTorrent) {
@@ -141,11 +149,7 @@ func (c *Cache) setTorrents(torrents map[string]*CachedTorrent) {
c.refreshListings()
go func() {
if err := c.SaveTorrents(); err != nil {
c.logger.Debug().Err(err).Msgf("Failed to save torrents")
}
}()
c.SaveTorrents()
}
func (c *Cache) GetListing() []os.FileInfo {
@@ -260,20 +264,31 @@ func (c *Cache) GetTorrentByName(name string) *CachedTorrent {
return nil
}
func (c *Cache) SaveTorrents() error {
func (c *Cache) SaveTorrents() {
c.torrents.Range(func(key string, value *CachedTorrent) bool {
if err := c.SaveTorrent(value); err != nil {
c.logger.Debug().Err(err).Msgf("Failed to save torrent %s", key)
}
c.SaveTorrent(value)
return true
})
return nil
}
func (c *Cache) SaveTorrent(ct *CachedTorrent) error {
// SaveTorrent persists ct to disk via saveTorrent, bounding the number of
// concurrent background saves with c.saveSemaphore (a buffered channel used
// as a counting semaphore; capacity 10 per NewCache).
func (c *Cache) SaveTorrent(ct *CachedTorrent) {
	// Try to acquire semaphore without blocking
	select {
	case c.saveSemaphore <- struct{}{}:
		// Slot acquired: save asynchronously so the caller is not blocked.
		go func() {
			defer func() { <-c.saveSemaphore }() // release the slot when done
			c.saveTorrent(ct)
		}()
	default:
		// Semaphore full: fall back to a synchronous save on the caller's
		// goroutine, which keeps the total number of save goroutines bounded.
		c.saveTorrent(ct)
	}
}
func (c *Cache) saveTorrent(ct *CachedTorrent) {
data, err := json.MarshalIndent(ct, "", " ")
if err != nil {
return fmt.Errorf("failed to marshal torrent: %w", err)
c.logger.Debug().Err(err).Msgf("Failed to marshal torrent: %s", ct.Id)
return
}
fileName := ct.Torrent.Id + ".json"
@@ -282,20 +297,25 @@ func (c *Cache) SaveTorrent(ct *CachedTorrent) error {
f, err := os.Create(tmpFile)
if err != nil {
return fmt.Errorf("failed to create temp file: %w", err)
c.logger.Debug().Err(err).Msgf("Failed to create file: %s", tmpFile)
return
}
defer f.Close()
w := bufio.NewWriter(f)
if _, err := w.Write(data); err != nil {
return fmt.Errorf("failed to write data: %w", err)
c.logger.Debug().Err(err).Msgf("Failed to write data: %s", tmpFile)
return
}
if err := w.Flush(); err != nil {
return fmt.Errorf("failed to flush data: %w", err)
c.logger.Debug().Err(err).Msgf("Failed to flush data: %s", tmpFile)
}
return os.Rename(tmpFile, filePath)
if err := os.Rename(tmpFile, filePath); err != nil {
c.logger.Debug().Err(err).Msgf("Failed to rename file: %s", tmpFile)
}
return
}
func (c *Cache) Sync() error {
@@ -508,11 +528,7 @@ func (c *Cache) GenerateDownloadLinks(t *CachedTorrent) {
c.updateDownloadLink(file.Link, file.DownloadLink)
}
go func() {
if err := c.SaveTorrent(t); err != nil {
c.logger.Debug().Err(err).Msgf("Failed to save torrent %s", t.Id)
}
}()
c.SaveTorrent(t)
}
func (c *Cache) AddTorrent(t *types.Torrent) error {
@@ -559,7 +575,7 @@ func (c *Cache) DeleteTorrent(id string) {
if t, ok := c.torrents.Load(id); ok {
c.torrents.Delete(id)
c.torrentsNames.Delete(c.GetTorrentFolder(t.Torrent))
go c.removeFromDB(id)
c.removeFromDB(id)
c.refreshListings()
}
}
@@ -570,7 +586,7 @@ func (c *Cache) DeleteTorrents(ids []string) {
if t, ok := c.torrents.Load(id); ok {
c.torrents.Delete(id)
c.torrentsNames.Delete(c.GetTorrentFolder(t.Torrent))
go c.removeFromDB(id)
c.removeFromDB(id)
}
}
c.refreshListings()
@@ -585,6 +601,10 @@ func (c *Cache) removeFromDB(torrentId string) {
func (c *Cache) OnRemove(torrentId string) {
c.logger.Debug().Msgf("OnRemove triggered for %s", torrentId)
go c.DeleteTorrent(torrentId)
go c.refreshListings()
c.DeleteTorrent(torrentId)
c.refreshListings()
}
// GetLogger returns the cache's named zerolog logger so callers (e.g. the
// WebDAV handler construction) can reuse it instead of creating a new one.
func (c *Cache) GetLogger() zerolog.Logger {
	return c.logger
}

View File

@@ -1,10 +1,12 @@
package debrid
import (
"context"
"fmt"
"github.com/sirrobot01/debrid-blackhole/internal/config"
"github.com/sirrobot01/debrid-blackhole/internal/request"
"github.com/sirrobot01/debrid-blackhole/pkg/debrid/types"
"golang.org/x/sync/errgroup"
"io"
"net/http"
"os"
@@ -12,7 +14,6 @@ import (
"slices"
"sort"
"strings"
"sync"
"time"
)
@@ -148,20 +149,28 @@ func (c *Cache) refreshTorrents() {
}
c.logger.Info().Msgf("Found %d new torrents", len(newTorrents))
// No need for a complex sync process, just add the new torrents
wg := sync.WaitGroup{}
wg.Add(len(newTorrents))
g, ctx := errgroup.WithContext(context.Background())
for _, t := range newTorrents {
// ProcessTorrent is concurrent safe
go func() {
defer wg.Done()
if err := c.ProcessTorrent(t, true); err != nil {
c.logger.Info().Err(err).Msg("Failed to process torrent")
t := t
g.Go(func() error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
}()
if err := c.ProcessTorrent(t, true); err != nil {
return err
}
return nil
})
}
wg.Wait()
if err := g.Wait(); err != nil {
c.logger.Debug().Err(err).Msg("Failed to process new torrents")
}
}
func (c *Cache) RefreshRclone() error {

View File

@@ -41,22 +41,15 @@ func (c *Cache) IsTorrentBroken(t *CachedTorrent, filenames []string) bool {
break
} else {
// Check if file.Link not in the downloadLink Cache
if _, ok := c.downloadLinks.Load(f.Link); !ok {
// File not in cache
// Check link
if err := c.client.CheckLink(f.Link); err != nil {
if errors.Is(err, request.ErrLinkBroken) {
isBroken = true
break
} else {
// This might just be a temporary error
}
if err := c.client.CheckLink(f.Link); err != nil {
if errors.Is(err, request.ErrLinkBroken) {
isBroken = true
break
} else {
// Generate a new download link?
// This might just be a temporary error
}
} else {
// Link is in cache
// We might skip checking for now, it seems rd removes uncached links
// Generate a new download link?
}
}
}

View File

@@ -445,35 +445,20 @@ func (r *RealDebrid) GetTorrents() ([]*types.Torrent, error) {
}
// Prepare for concurrent fetching
var wg sync.WaitGroup
var mu sync.Mutex
var fetchError error
// Calculate how many more requests we need
batchCount := (remaining + limit - 1) / limit // ceiling division
for i := 1; i <= batchCount; i++ {
wg.Add(1)
go func(batchOffset int) {
defer wg.Done()
_, batch, err := r.getTorrents(batchOffset, limit)
if err != nil {
mu.Lock()
fetchError = err
mu.Unlock()
return
}
mu.Lock()
allTorrents = append(allTorrents, batch...)
mu.Unlock()
}(i * limit)
_, batch, err := r.getTorrents(i*limit, limit)
if err != nil {
fetchError = err
continue
}
allTorrents = append(allTorrents, batch...)
}
// Wait for all fetches to complete
wg.Wait()
if fetchError != nil {
return nil, fetchError
}

View File

@@ -164,6 +164,7 @@ func (r *Repair) preRunChecks() error {
if len(r.deb.Caches) == 0 {
return fmt.Errorf("no caches found")
}
return nil
}
// Check if zurg url is reachable
@@ -195,10 +196,9 @@ func (r *Repair) AddJob(arrsNames []string, mediaIDs []string, autoProcess, recu
job.Recurrent = recurrent
r.reset(job)
r.Jobs[key] = job
go r.saveToFile()
r.saveToFile()
go func() {
if err := r.repair(job); err != nil {
r.logger.Error().Err(err).Msg("Error running repair")
r.logger.Error().Err(err).Msg("Error running repair")
job.FailedAt = time.Now()
job.Error = err.Error()
@@ -453,9 +453,10 @@ func (r *Repair) isMediaAccessible(m arr.Content) bool {
}
firstFile := files[0]
r.logger.Debug().Msgf("Checking parent directory for %s", firstFile.Path)
if _, err := os.Stat(firstFile.Path); os.IsNotExist(err) {
return false
}
//if _, err := os.Stat(firstFile.Path); os.IsNotExist(err) {
// r.logger.Debug().Msgf("Parent directory not accessible for %s", firstFile.Path)
// return false
//}
// Check symlink parent directory
symlinkPath := getSymlinkTarget(firstFile.Path)
@@ -597,6 +598,7 @@ func (r *Repair) getWebdavBrokenFiles(media arr.Content) []arr.ContentFile {
if mountPath == "" {
continue
}
if filepath.Clean(mountPath) == filepath.Clean(dir) {
debridName = client.GetName()
break
@@ -615,7 +617,8 @@ func (r *Repair) getWebdavBrokenFiles(media arr.Content) []arr.ContentFile {
torrentName := filepath.Clean(filepath.Base(torrentPath))
torrent := cache.GetTorrentByName(torrentName)
if torrent == nil {
r.logger.Debug().Msgf("No torrent found for %s. Skipping", torrentName)
r.logger.Debug().Msgf("Torrent not found for %s. Marking as broken", torrentName)
brokenFiles = append(brokenFiles, f...)
continue
}
files := make([]string, 0)
@@ -774,5 +777,5 @@ func (r *Repair) DeleteJobs(ids []string) {
}
}
}
go r.saveToFile()
r.saveToFile()
}

View File

@@ -268,6 +268,9 @@
} else if (job.status === 'pending') {
status = 'Pending';
statusClass = 'text-warning';
} else if (job.status === "processing") {
status = 'Processing';
statusClass = 'text-info';
}
row.innerHTML = `
@@ -486,6 +489,9 @@
} else if (job.status === 'pending') {
status = 'Pending';
statusClass = 'text-warning';
} else if (job.status === "processing") {
status = 'Processing';
statusClass = 'text-info';
}
document.getElementById('modalJobStatus').innerHTML = `<span class="${statusClass}">${status}</span>`;

View File

@@ -12,10 +12,18 @@ import (
var sharedClient = &http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
Proxy: http.ProxyFromEnvironment,
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
Proxy: http.ProxyFromEnvironment,
MaxIdleConns: 100,
MaxIdleConnsPerHost: 20,
MaxConnsPerHost: 50,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ResponseHeaderTimeout: 30 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
DisableKeepAlives: false,
},
Timeout: 0,
Timeout: 60 * time.Second,
}
type File struct {

View File

@@ -4,7 +4,6 @@ import (
"context"
"fmt"
"github.com/go-chi/chi/v5"
"github.com/sirrobot01/debrid-blackhole/internal/logger"
"github.com/sirrobot01/debrid-blackhole/pkg/service"
"html/template"
"net/http"
@@ -23,7 +22,7 @@ func New() *WebDav {
ready: make(chan struct{}),
}
for name, c := range svc.Debrid.Caches {
h := NewHandler(name, c, logger.NewLogger(fmt.Sprintf("%s-webdav", name)))
h := NewHandler(name, c, c.GetLogger())
w.Handlers = append(w.Handlers, h)
}
return w

View File

@@ -37,24 +37,6 @@ func Start(ctx context.Context) error {
return nil
}
//func arrRefreshWorker(ctx context.Context, cfg *config.Config) {
// // Start Arr Refresh Worker
// _logger := getLogger()
// _logger.Debug().Msg("Refresh Worker started")
// refreshCtx := context.WithValue(ctx, "worker", "refresh")
// refreshTicker := time.NewTicker(time.Duration(cfg.QBitTorrent.RefreshInterval) * time.Second)
//
// for {
// select {
// case <-refreshCtx.Done():
// _logger.Debug().Msg("Refresh Worker stopped")
// return
// case <-refreshTicker.C:
// refreshArrs()
// }
// }
//}
func cleanUpQueuesWorker(ctx context.Context, cfg *config.Config) {
// Start Clean up Queues Worker
_logger := getLogger()
@@ -80,17 +62,6 @@ func cleanUpQueuesWorker(ctx context.Context, cfg *config.Config) {
}
}
//func refreshArrs() {
// for _, a := range service.GetService().Arr.GetAll() {
// err := a.Refresh()
// if err != nil {
// _logger := getLogger()
// _logger.Debug().Err(err).Msg("Error refreshing arr")
// return
// }
// }
//}
func cleanUpQueues() {
// Clean up queues
_logger := getLogger()