Performance improvements; import speedup

This commit is contained in:
Mukhtar Akere
2025-04-03 11:24:30 +01:00
parent 7d954052ae
commit 4659cd4273
16 changed files with 273 additions and 192 deletions

View File

@@ -32,6 +32,7 @@ RUN --mount=type=cache,target=/go/pkg/mod \
# Stage 2: Create directory structure
FROM alpine:3.19 as dirsetup
RUN mkdir -p /app/logs && \
mkdir -p /app/cache && \
chmod 777 /app/logs && \
touch /app/logs/decypharr.log && \
chmod 666 /app/logs/decypharr.log

View File

@@ -218,7 +218,10 @@ This is particularly useful if you want to use the Repair tool without using Qbi
The repair worker is a simple worker that checks for missing files in the Arrs(Sonarr, Radarr, etc). It's particularly useful for files either deleted by the Debrid provider or files with bad symlinks.
**Note**: If you're using zurg, set the `zurg_url` under repair config. This will speed up the repair process, exponentially.
**Notes**
- For those using the webdav server, set the `use_webdav` key to `true` in the debrid provider config. This will speed up the repair process, exponentially.
- For those using zurg, set the `zurg_url` under repair config. This will speed up the repair process, exponentially.
- Search for broken symlinks/files
- Search for missing files

View File

@@ -86,7 +86,8 @@
"enabled": false,
"interval": "12h",
"run_on_start": false,
"zurg_url": "http://zurg:9999",
"zurg_url": "",
"use_webdav": false,
"auto_process": false
},
"log_level": "info",

View File

@@ -11,7 +11,7 @@ var (
MUSICMATCH = "(?i)(\\.)(mp2|mp3|m4a|m4b|m4p|ogg|oga|opus|wma|wav|wv|flac|ape|aif|aiff|aifc)$"
)
var SAMPLEMATCH = `(?i)(^|[\\/]|\s|[._-])(sample|trailer|thumb|special|extras?)s?(\s|[._-]|$|/)`
var SAMPLEMATCH = `(?i)(^|[\\/]|\s|[(])(?:sample|trailer|thumb|special|extras?)s?([\s._\-/)]|$)`
func RegexMatch(regex string, value string) bool {
re := regexp.MustCompile(regex)

12
main.go
View File

@@ -5,7 +5,10 @@ import (
"flag"
"github.com/sirrobot01/debrid-blackhole/cmd/decypharr"
"github.com/sirrobot01/debrid-blackhole/internal/config"
"github.com/sirrobot01/debrid-blackhole/pkg/version"
"log"
"net/http"
_ "net/http/pprof" // registers pprof handlers
"os"
"os/signal"
"runtime/debug"
@@ -19,6 +22,15 @@ func main() {
debug.PrintStack()
}
}()
if version.GetInfo().Channel == "dev" {
log.Println("Running in dev mode")
go func() {
if err := http.ListenAndServe(":6060", nil); err != nil {
log.Fatalf("pprof server failed: %v", err)
}
}()
}
var configPath string
flag.StringVar(&configPath, "config", "/data", "path to the data folder")
flag.Parse()

View File

@@ -137,6 +137,10 @@ func (a *Arr) CleanupQueue() error {
isMessedUp = true
break
}
if strings.Contains(m.Title, "One or more episodes expected in this release were not imported or missing from the release") {
isMessedUp = true
break
}
}
}
}

View File

@@ -228,7 +228,7 @@ func (ad *AllDebrid) CheckStatus(torrent *types.Torrent, isSymlink bool) (*types
}
// Break out of the loop if the torrent is downloading.
// This is necessary to prevent infinite loop since we moved to sync downloading and async processing
break
return torrent, nil
} else {
return torrent, fmt.Errorf("torrent: %s has error", torrent.Name)
}

View File

@@ -126,7 +126,7 @@ func New(dc config.Debrid, client types.Client) *Cache {
folderNaming: WebDavFolderNaming(dc.FolderNaming),
autoExpiresLinksAfter: autoExpiresLinksAfter,
repairsInProgress: xsync.NewMapOf[string, bool](),
saveSemaphore: make(chan struct{}, 10),
saveSemaphore: make(chan struct{}, 50),
ctx: context.Background(),
}
}
@@ -159,53 +159,6 @@ func (c *Cache) Start(ctx context.Context) error {
return nil
}
func (c *Cache) GetTorrentFolder(torrent *types.Torrent) string {
switch c.folderNaming {
case WebDavUseFileName:
return torrent.Filename
case WebDavUseOriginalName:
return torrent.OriginalFilename
case WebDavUseFileNameNoExt:
return utils.RemoveExtension(torrent.Filename)
case WebDavUseOriginalNameNoExt:
return utils.RemoveExtension(torrent.OriginalFilename)
case WebDavUseID:
return torrent.Id
default:
return torrent.Filename
}
}
func (c *Cache) setTorrent(t *CachedTorrent) {
c.torrents.Store(t.Id, t)
c.torrentsNames.Store(c.GetTorrentFolder(t.Torrent), t)
c.SaveTorrent(t)
}
func (c *Cache) setTorrents(torrents map[string]*CachedTorrent) {
for _, t := range torrents {
c.torrents.Store(t.Id, t)
c.torrentsNames.Store(c.GetTorrentFolder(t.Torrent), t)
}
c.refreshListings()
c.SaveTorrents()
}
func (c *Cache) GetListing() []os.FileInfo {
if v, ok := c.listings.Load().([]os.FileInfo); ok {
return v
}
return nil
}
func (c *Cache) Close() error {
return nil
}
func (c *Cache) load() (map[string]*CachedTorrent, error) {
torrents := make(map[string]*CachedTorrent)
if err := os.MkdirAll(c.dir, 0755); err != nil {
@@ -238,13 +191,11 @@ func (c *Cache) load() (map[string]*CachedTorrent, error) {
}
isComplete := true
if len(ct.Files) != 0 {
// We can assume the torrent is complete
// Check if all files are valid, if not, delete the file.json and remove from cache.
for _, f := range ct.Files {
if f.Link == "" {
c.logger.Debug().Msgf("Torrent %s is not complete, missing link for file %s", ct.Id, f.Name)
if !f.IsValid() {
isComplete = false
continue
break
}
}
if isComplete {
@@ -255,6 +206,9 @@ func (c *Cache) load() (map[string]*CachedTorrent, error) {
ct.AddedOn = addedOn
ct.IsComplete = true
torrents[ct.Id] = &ct
} else {
// Delete the file if it's not complete
_ = os.Remove(filePath)
}
}
@@ -263,100 +217,6 @@ func (c *Cache) load() (map[string]*CachedTorrent, error) {
return torrents, nil
}
func (c *Cache) GetTorrents() map[string]*CachedTorrent {
torrents := make(map[string]*CachedTorrent)
c.torrents.Range(func(key string, value *CachedTorrent) bool {
torrents[key] = value
return true
})
return torrents
}
func (c *Cache) GetTorrent(id string) *CachedTorrent {
if t, ok := c.torrents.Load(id); ok {
return t
}
return nil
}
func (c *Cache) GetTorrentByName(name string) *CachedTorrent {
if t, ok := c.torrentsNames.Load(name); ok {
return t
}
return nil
}
func (c *Cache) SaveTorrents() {
c.torrents.Range(func(key string, value *CachedTorrent) bool {
c.SaveTorrent(value)
return true
})
}
func (c *Cache) SaveTorrent(ct *CachedTorrent) {
// Try to acquire semaphore without blocking
select {
case c.saveSemaphore <- struct{}{}:
go func() {
defer func() { <-c.saveSemaphore }()
c.saveTorrent(ct)
}()
default:
go c.saveTorrent(ct) // If the semaphore is full, just run the save in the background
}
}
func (c *Cache) saveTorrent(ct *CachedTorrent) {
data, err := json.MarshalIndent(ct, "", " ")
if err != nil {
c.logger.Debug().Err(err).Msgf("Failed to marshal torrent: %s", ct.Id)
return
}
fileName := ct.Torrent.Id + ".json"
filePath := filepath.Join(c.dir, fileName)
// Use a unique temporary filename for concurrent safety
tmpFile := filePath + ".tmp." + strconv.FormatInt(time.Now().UnixNano(), 10)
f, err := os.Create(tmpFile)
if err != nil {
c.logger.Debug().Err(err).Msgf("Failed to create file: %s", tmpFile)
return
}
// Track if we've closed the file
fileClosed := false
defer func() {
// Only close if not already closed
if !fileClosed {
_ = f.Close()
}
// Clean up the temp file if it still exists and rename failed
_ = os.Remove(tmpFile)
}()
w := bufio.NewWriter(f)
if _, err := w.Write(data); err != nil {
c.logger.Debug().Err(err).Msgf("Failed to write data: %s", tmpFile)
return
}
if err := w.Flush(); err != nil {
c.logger.Debug().Err(err).Msgf("Failed to flush data: %s", tmpFile)
return
}
// Close the file before renaming
_ = f.Close()
fileClosed = true
if err := os.Rename(tmpFile, filePath); err != nil {
c.logger.Debug().Err(err).Msgf("Failed to rename file: %s", tmpFile)
return
}
}
func (c *Cache) Sync() error {
cachedTorrents, err := c.load()
if err != nil {
@@ -414,7 +274,7 @@ func (c *Cache) Sync() error {
func (c *Cache) sync(torrents []*types.Torrent) error {
// Create channels with appropriate buffering
workChan := make(chan *types.Torrent, min(1000, len(torrents)))
workChan := make(chan *types.Torrent, min(c.workers, len(torrents)))
// Use an atomic counter for progress tracking
var processed int64
@@ -474,6 +334,157 @@ func (c *Cache) sync(torrents []*types.Torrent) error {
return nil
}
func (c *Cache) GetTorrentFolder(torrent *types.Torrent) string {
switch c.folderNaming {
case WebDavUseFileName:
return torrent.Filename
case WebDavUseOriginalName:
return torrent.OriginalFilename
case WebDavUseFileNameNoExt:
return utils.RemoveExtension(torrent.Filename)
case WebDavUseOriginalNameNoExt:
return utils.RemoveExtension(torrent.OriginalFilename)
case WebDavUseID:
return torrent.Id
default:
return torrent.Filename
}
}
func (c *Cache) setTorrent(t *CachedTorrent) {
c.torrents.Store(t.Id, t)
c.torrentsNames.Store(c.GetTorrentFolder(t.Torrent), t)
c.SaveTorrent(t)
}
func (c *Cache) setTorrents(torrents map[string]*CachedTorrent) {
for _, t := range torrents {
c.torrents.Store(t.Id, t)
c.torrentsNames.Store(c.GetTorrentFolder(t.Torrent), t)
}
c.refreshListings()
c.SaveTorrents()
}
func (c *Cache) GetListing() []os.FileInfo {
if v, ok := c.listings.Load().([]os.FileInfo); ok {
return v
}
return nil
}
func (c *Cache) Close() error {
return nil
}
func (c *Cache) GetTorrents() map[string]*CachedTorrent {
torrents := make(map[string]*CachedTorrent)
c.torrents.Range(func(key string, value *CachedTorrent) bool {
torrents[key] = value
return true
})
return torrents
}
func (c *Cache) GetTorrent(id string) *CachedTorrent {
if t, ok := c.torrents.Load(id); ok {
return t
}
return nil
}
func (c *Cache) GetTorrentByName(name string) *CachedTorrent {
if t, ok := c.torrentsNames.Load(name); ok {
return t
}
return nil
}
func (c *Cache) SaveTorrents() {
c.torrents.Range(func(key string, value *CachedTorrent) bool {
c.SaveTorrent(value)
return true
})
}
func (c *Cache) SaveTorrent(ct *CachedTorrent) {
marshaled, err := json.MarshalIndent(ct, "", " ")
if err != nil {
c.logger.Debug().Err(err).Msgf("Failed to marshal torrent: %s", ct.Id)
return
}
// Store just the essential info needed for the file operation
saveInfo := struct {
id string
jsonData []byte
}{
id: ct.Torrent.Id,
jsonData: marshaled,
}
// Try to acquire semaphore without blocking
select {
case c.saveSemaphore <- struct{}{}:
go func() {
defer func() { <-c.saveSemaphore }()
c.saveTorrent(saveInfo.id, saveInfo.jsonData)
}()
default:
c.saveTorrent(saveInfo.id, saveInfo.jsonData)
}
}
func (c *Cache) saveTorrent(id string, data []byte) {
fileName := id + ".json"
filePath := filepath.Join(c.dir, fileName)
// Use a unique temporary filename for concurrent safety
tmpFile := filePath + ".tmp." + strconv.FormatInt(time.Now().UnixNano(), 10)
f, err := os.Create(tmpFile)
if err != nil {
c.logger.Debug().Err(err).Msgf("Failed to create file: %s", tmpFile)
return
}
// Track if we've closed the file
fileClosed := false
defer func() {
// Only close if not already closed
if !fileClosed {
_ = f.Close()
}
// Clean up the temp file if it still exists and rename failed
_ = os.Remove(tmpFile)
}()
w := bufio.NewWriter(f)
if _, err := w.Write(data); err != nil {
c.logger.Debug().Err(err).Msgf("Failed to write data: %s", tmpFile)
return
}
if err := w.Flush(); err != nil {
c.logger.Debug().Err(err).Msgf("Failed to flush data: %s", tmpFile)
return
}
// Close the file before renaming
_ = f.Close()
fileClosed = true
if err := os.Rename(tmpFile, filePath); err != nil {
c.logger.Debug().Err(err).Msgf("Failed to rename file: %s", tmpFile)
return
}
}
func (c *Cache) ProcessTorrent(t *types.Torrent, refreshRclone bool) error {
if len(t.Files) == 0 {
if err := c.client.UpdateTorrent(t); err != nil {
@@ -481,15 +492,21 @@ func (c *Cache) ProcessTorrent(t *types.Torrent, refreshRclone bool) error {
}
}
// Validate each file in the torrent
isComplete := true
for _, file := range t.Files {
if file.Link == "" {
c.logger.Debug().Msgf("Torrent %s is not complete, missing link for file %s. Triggering a reinsert", t.Id, file.Name)
isComplete = false
continue
}
}
if !isComplete {
c.logger.Debug().Msgf("Torrent %s is not complete, missing link for file %s. Triggering a reinsert", t.Id)
if err := c.ReInsertTorrent(t); err != nil {
c.logger.Error().Err(err).Msgf("Failed to reinsert torrent %s", t.Id)
return fmt.Errorf("failed to reinsert torrent: %w", err)
}
}
}
addedOn, err := time.Parse(time.RFC3339, t.Added)
if err != nil {
@@ -535,6 +552,8 @@ func (c *Cache) GetDownloadLink(torrentId, filename, fileLink string) string {
downloadLink, err := c.client.GetDownloadLink(ct.Torrent, &file)
if err != nil {
if errors.Is(err, request.HosterUnavailableError) {
c.logger.Debug().Err(err).Msgf("Hoster is unavailable for %s/%s", ct.Name, filename)
return ""
// This code is commented iut due to the fact that if a torrent link is uncached, it's likely that we can't redownload it again
// Do not attempt to repair the torrent if the hoster is unavailable
// Check link here??
@@ -565,8 +584,10 @@ func (c *Cache) GetDownloadLink(torrentId, filename, fileLink string) string {
file.Generated = time.Now()
ct.Files[filename] = file
go c.updateDownloadLink(file.Link, downloadLink)
go c.setTorrent(ct)
go func() {
c.updateDownloadLink(file.Link, downloadLink)
c.setTorrent(ct)
}()
return file.DownloadLink
}

View File

@@ -63,7 +63,7 @@ func (c *Cache) refreshListings() {
c.listings.Store(files)
_ = c.refreshXml()
if err := c.RefreshRclone(); err != nil {
c.logger.Debug().Err(err).Msg("Failed to refresh rclone")
c.logger.Trace().Err(err).Msg("Failed to refresh rclone") // silent error
}
}

View File

@@ -131,6 +131,7 @@ func (c *Cache) ReInsertTorrent(torrent *types.Torrent) error {
oldID := torrent.Id
defer c.repairsInProgress.Delete(oldID)
defer c.DeleteTorrent(oldID)
// Submit the magnet to the debrid service
torrent.Id = ""
@@ -138,7 +139,6 @@ func (c *Cache) ReInsertTorrent(torrent *types.Torrent) error {
torrent, err = c.client.SubmitMagnet(torrent)
if err != nil {
// Remove the old torrent from the cache and debrid service
_ = c.DeleteTorrent(oldID)
return fmt.Errorf("failed to submit magnet: %w", err)
}
@@ -150,21 +150,24 @@ func (c *Cache) ReInsertTorrent(torrent *types.Torrent) error {
torrent, err = c.client.CheckStatus(torrent, true)
if err != nil && torrent != nil {
// Torrent is likely in progress
// Delete the old and new torrent
_ = c.DeleteTorrent(oldID)
_ = c.DeleteTorrent(torrent.Id)
return fmt.Errorf("failed to check status: %w", err)
}
if err := c.DeleteTorrent(oldID); err != nil {
return fmt.Errorf("failed to delete old torrent: %w", err)
}
if torrent == nil {
return fmt.Errorf("failed to check status: empty torrent")
}
for _, file := range torrent.Files {
if file.Link == "" {
c.logger.Debug().Msgf("Torrent %s is still not complete, missing link for file %s.", torrent.Name, file.Name)
// Delete the torrent from the cache
_ = c.DeleteTorrent(torrent.Id)
return fmt.Errorf("torrent %s is still not complete, missing link for file %s", torrent.Name, file.Name)
}
}
// Update the torrent in the cache
addedOn, err := time.Parse(time.RFC3339, torrent.Added)
if err != nil {

View File

@@ -215,7 +215,7 @@ func (dl *DebridLink) CheckStatus(torrent *types.Torrent, isSymlink bool) (*type
}
// Break out of the loop if the torrent is downloading.
// This is necessary to prevent infinite loop since we moved to sync downloading and async processing
break
return torrent, nil
} else {
return torrent, fmt.Errorf("torrent: %s has error", torrent.Name)
}

View File

@@ -244,6 +244,7 @@ func (r *RealDebrid) CheckStatus(t *types.Torrent, isSymlink bool) (*types.Torre
if !t.DownloadUncached {
return t, fmt.Errorf("torrent: %s not cached", t.Name)
}
return t, nil
} else {
return t, fmt.Errorf("torrent: %s has error: %s", t.Name, status)
}

View File

@@ -254,7 +254,7 @@ func (tb *Torbox) CheckStatus(torrent *types.Torrent, isSymlink bool) (*types.To
}
// Break out of the loop if the torrent is downloading.
// This is necessary to prevent infinite loop since we moved to sync downloading and async processing
break
return torrent, nil
} else {
return torrent, fmt.Errorf("torrent: %s has error", torrent.Name)
}

View File

@@ -2,6 +2,7 @@ package types
import (
"fmt"
"github.com/sirrobot01/debrid-blackhole/internal/config"
"github.com/sirrobot01/debrid-blackhole/internal/logger"
"github.com/sirrobot01/debrid-blackhole/internal/utils"
"github.com/sirrobot01/debrid-blackhole/pkg/arr"
@@ -80,6 +81,26 @@ type File struct {
Generated time.Time `json:"generated"`
}
func (f *File) IsValid() bool {
cfg := config.Get()
name := filepath.Base(f.Path)
if utils.IsSampleFile(f.Path) {
return false
}
if !cfg.IsAllowedFile(name) {
return false
}
if !cfg.IsSizeAllowed(f.Size) {
return false
}
if f.Link == "" {
return false
}
return true
}
func (t *Torrent) Cleanup(remove bool) {
if remove {
err := os.Remove(t.Filename)

View File

@@ -163,6 +163,7 @@ func (q *QBit) createSymlinks(debridTorrent *debrid.Torrent, rclonePath, torrent
}
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
filePaths := make([]string, 0, len(pending))
for len(pending) > 0 {
<-ticker.C
@@ -170,11 +171,24 @@ func (q *QBit) createSymlinks(debridTorrent *debrid.Torrent, rclonePath, torrent
fullFilePath := filepath.Join(rclonePath, file.Path)
if _, err := os.Stat(fullFilePath); !os.IsNotExist(err) {
q.logger.Info().Msgf("File is ready: %s", file.Path)
q.createSymLink(torrentSymlinkPath, rclonePath, file)
_filePath := q.createSymLink(torrentSymlinkPath, rclonePath, file)
filePaths = append(filePaths, _filePath)
delete(pending, path)
}
}
}
if q.SkipPreCache {
return torrentSymlinkPath, nil
}
go func() {
if err := q.preCacheFile(debridTorrent.Name, filePaths); err != nil {
q.logger.Error().Msgf("Failed to pre-cache file: %s", err)
}
}() // Pre-cache the files in the background
// Pre-cache the first 256KB and 1MB of the file
return torrentSymlinkPath, nil
}
@@ -189,7 +203,7 @@ func (q *QBit) getTorrentPath(rclonePath string, debridTorrent *debrid.Torrent)
}
}
func (q *QBit) createSymLink(path string, torrentMountPath string, file debrid.File) {
func (q *QBit) createSymLink(path string, torrentMountPath string, file debrid.File) string {
// Combine the directory and filename to form a full path
fullPath := filepath.Join(path, file.Name) // /mnt/symlinks/{category}/MyTVShow/MyTVShow.S01E01.720p.mkv
@@ -200,28 +214,27 @@ func (q *QBit) createSymLink(path string, torrentMountPath string, file debrid.F
// It's okay if the symlink already exists
q.logger.Debug().Msgf("Failed to create symlink: %s: %v", fullPath, err)
}
if q.SkipPreCache {
return torrentFilePath
}
func (q *QBit) preCacheFile(name string, filePaths []string) error {
q.logger.Trace().Msgf("Pre-caching file: %s", name)
if len(filePaths) == 0 {
return fmt.Errorf("no file paths provided")
}
for _, filePath := range filePaths {
func() {
file, err := os.Open(filePath)
defer file.Close()
if err != nil {
return
}
go func() {
err := q.preCacheFile(torrentFilePath)
if err != nil {
q.logger.Debug().Msgf("Failed to pre-cache file: %s: %v", torrentFilePath, err)
}
}()
}
func (q *QBit) preCacheFile(filePath string) error {
q.logger.Trace().Msgf("Pre-caching file: %s", filePath)
file, err := os.Open(filePath)
if err != nil {
return fmt.Errorf("error opening file: %v", err)
}
defer file.Close()
// Pre-cache the file header (first 256KB) using 16KB chunks.
q.readSmallChunks(file, 0, 256*1024, 16*1024)
q.readSmallChunks(file, 1024*1024, 64*1024, 16*1024)
return
}()
}
return nil
}

View File

@@ -124,8 +124,9 @@ func (q *QBit) ProcessFiles(torrent *Torrent, debridTorrent *debrid.Torrent, arr
q.MarkAsFailed(torrent)
return
}
rclonePath := filepath.Join(debridTorrent.MountPath, cache.GetTorrentFolder(debridTorrent))
torrentSymlinkPath, err = q.createSymlinks(debridTorrent, rclonePath, debridTorrent.Name)
rclonePath := filepath.Join(debridTorrent.MountPath, cache.GetTorrentFolder(debridTorrent)) // /mnt/remote/realdebrid/MyTVShow
torrentFolderNoExt := utils.RemoveExtension(debridTorrent.Name)
torrentSymlinkPath, err = q.createSymlinks(debridTorrent, rclonePath, torrentFolderNoExt) // /mnt/symlinks/{category}/MyTVShow/
} else {
// User is using either zurg or debrid webdav