From 8d494fc27740d45531f42ade1ceb0488a910b54c Mon Sep 17 00:00:00 2001 From: Mukhtar Akere Date: Fri, 21 Mar 2025 04:10:16 +0100 Subject: [PATCH] Update repair; fix minor bugs with namings --- go.mod | 1 + go.sum | 2 + internal/logger/logger.go | 3 + internal/request/request.go | 19 ++ pkg/debrid/alldebrid/alldebrid.go | 6 +- pkg/debrid/debrid/cache.go | 5 +- pkg/debrid/debrid/refresh.go | 28 +-- pkg/debrid/debrid/{workers.go => worker.go} | 0 pkg/debrid/debrid/xml.go | 118 +++++++++++ pkg/debrid/debrid_link/debrid_link.go | 6 +- pkg/debrid/realdebrid/realdebrid.go | 14 +- pkg/debrid/torbox/torbox.go | 8 +- pkg/debrid/types/debrid.go | 2 +- pkg/qbit/import.go | 2 +- pkg/qbit/torrent.go | 28 +-- pkg/repair/repair.go | 218 ++++++++++++++++++-- pkg/service/service.go | 4 +- pkg/web/server.go | 25 ++- pkg/web/web/repair.html | 14 +- pkg/webdav/file.go | 77 +++++-- pkg/webdav/handler.go | 26 +-- 21 files changed, 455 insertions(+), 151 deletions(-) rename pkg/debrid/debrid/{workers.go => worker.go} (100%) create mode 100644 pkg/debrid/debrid/xml.go diff --git a/go.mod b/go.mod index 852c34c..b8852ee 100644 --- a/go.mod +++ b/go.mod @@ -24,6 +24,7 @@ require ( require ( github.com/anacrolix/missinggo v1.3.0 // indirect github.com/anacrolix/missinggo/v2 v2.7.3 // indirect + github.com/beevik/etree v1.5.0 // indirect github.com/bradfitz/iter v0.0.0-20191230175014-e8f45d346db8 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect diff --git a/go.sum b/go.sum index 32e6b7a..72b0544 100644 --- a/go.sum +++ b/go.sum @@ -36,6 +36,8 @@ github.com/anacrolix/tagflag v1.1.0/go.mod h1:Scxs9CV10NQatSmbyjqmqmeQNwGzlNe0CM github.com/anacrolix/torrent v1.55.0 h1:s9yh/YGdPmbN9dTa+0Inh2dLdrLQRvEAj1jdFW/Hdd8= github.com/anacrolix/torrent v1.55.0/go.mod h1:sBdZHBSZNj4de0m+EbYg7vvs/G/STubxu/GzzNbojsE= github.com/apache/thrift v0.12.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ= +github.com/beevik/etree v1.5.0 h1:iaQZFSDS+3kYZiGoc9uKeOkUY3nYMXOKLl6KIJxiJWs= +github.com/beevik/etree v1.5.0/go.mod h1:gPNJNaBGVZ9AwsidazFZyygnd+0pAU38N4D+WemwKNs= github.com/benbjohnson/immutable v0.2.0/go.mod h1:uc6OHo6PN2++n98KHLxW8ef4W42ylHiQSENghE1ezxI= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= diff --git a/internal/logger/logger.go b/internal/logger/logger.go index 5ad3229..0a6b18a 100644 --- a/internal/logger/logger.go +++ b/internal/logger/logger.go @@ -73,6 +73,7 @@ func NewLogger(prefix string) zerolog.Logger { Level(zerolog.InfoLevel) // Set the log level + level = strings.ToLower(level) switch level { case "debug": logger = logger.Level(zerolog.DebugLevel) @@ -82,6 +83,8 @@ func NewLogger(prefix string) zerolog.Logger { logger = logger.Level(zerolog.WarnLevel) case "error": logger = logger.Level(zerolog.ErrorLevel) + case "trace": + logger = logger.Level(zerolog.TraceLevel) } return logger } diff --git a/internal/request/request.go b/internal/request/request.go index d59cdac..2643d94 100644 --- a/internal/request/request.go +++ b/internal/request/request.go @@ -2,6 +2,7 @@ package request import ( "bytes" + "compress/gzip" "context" "crypto/tls" "fmt" @@ -288,3 +289,21 @@ func JSONResponse(w http.ResponseWriter, data interface{}, code int) { return } } + +func Gzip(body []byte) []byte { + + var b bytes.Buffer + if len(body) == 0 { + return nil + } + gz := gzip.NewWriter(&b) + _, err := gz.Write(body) + if err != nil { + return nil + } + err = gz.Close() + if err != nil { + return nil + } + return b.Bytes() +} diff --git a/pkg/debrid/alldebrid/alldebrid.go b/pkg/debrid/alldebrid/alldebrid.go index c1f55bb..0ee2708 100644 --- a/pkg/debrid/alldebrid/alldebrid.go +++ b/pkg/debrid/alldebrid/alldebrid.go @@ -204,12 +204,12 @@ func (ad *AllDebrid) CheckStatus(torrent *types.Torrent, isSymlink bool) (*types return torrent, nil } -func (ad *AllDebrid) DeleteTorrent(torrent *types.Torrent) { - url := fmt.Sprintf("%s/magnet/delete?id=%s", ad.Host, torrent.Id) +func (ad *AllDebrid) DeleteTorrent(torrentId string) { + url := fmt.Sprintf("%s/magnet/delete?id=%s", ad.Host, torrentId) req, _ := http.NewRequest(http.MethodGet, url, nil) _, err := ad.client.MakeRequest(req) if err == nil { - ad.logger.Info().Msgf("Torrent: %s deleted", torrent.Name) + ad.logger.Info().Msgf("Torrent: %s deleted", torrentId) } else { ad.logger.Info().Msgf("Error deleting torrent: %s", err) } diff --git a/pkg/debrid/debrid/cache.go b/pkg/debrid/debrid/cache.go index abcc804..be51e9d 100644 --- a/pkg/debrid/debrid/cache.go +++ b/pkg/debrid/debrid/cache.go @@ -208,7 +208,7 @@ func (c *Cache) load() (map[string]*CachedTorrent, error) { if len(ct.Files) != 0 { // We can assume the torrent is complete ct.IsComplete = true - ct.Torrent.Name = utils.RemoveExtension(ct.Torrent.Filename) // Update the name + ct.Torrent.Name = utils.RemoveExtension(ct.Torrent.OriginalFilename) // Update the name torrents[ct.Id] = &ct } } @@ -403,9 +403,6 @@ func (c *Cache) ProcessTorrent(t *types.Torrent, refreshRclone bool) error { IsComplete: len(t.Files) > 0, } c.setTorrent(ct) - if err := c.RefreshRclone(); err != nil { - c.logger.Debug().Err(err).Msg("Failed to refresh rclone") - } return nil } diff --git a/pkg/debrid/debrid/refresh.go b/pkg/debrid/debrid/refresh.go index abe0b39..07d9752 100644 --- a/pkg/debrid/debrid/refresh.go +++ b/pkg/debrid/debrid/refresh.go @@ -7,8 +7,6 @@ import ( "github.com/sirrobot01/debrid-blackhole/pkg/debrid/types" "net/http" "os" - "path" - "path/filepath" "sort" "sync" "time" @@ -42,7 +40,7 @@ func (c *Cache) refreshListings() { } // Atomic store of the complete ready-to-use slice c.listings.Store(files) - c.resetPropfindResponse() + _ = c.RefreshXml() if err := c.RefreshRclone(); err != nil { c.logger.Debug().Err(err).Msg("Failed to refresh rclone") } @@ -179,27 +177,3 @@ func (c *Cache) refreshDownloadLinks() { c.downloadLinks[k] = v.DownloadLink } } - -func (c *Cache) resetPropfindResponse() { - // Right now, parents are hardcoded - parents := []string{"__all__", "torrents"} - // Reset only the parent directories - // Convert the parents to a keys - // This is a bit hacky, but it works - // Instead of deleting all the keys, we only delete the parent keys, e.g __all__/ or torrents/ - keys := make([]string, 0, len(parents)) - for _, p := range parents { - // Construct the key - // construct url - url := filepath.Join("/webdav", c.client.GetName(), p) - url = path.Clean(url) - key0 := fmt.Sprintf("propfind:%s:0", url) - key1 := fmt.Sprintf("propfind:%s:1", url) - keys = append(keys, key0, key1) - } - - // Delete the keys - for _, k := range keys { - c.PropfindResp.Delete(k) - } -} diff --git a/pkg/debrid/debrid/workers.go b/pkg/debrid/debrid/worker.go similarity index 100% rename from pkg/debrid/debrid/workers.go rename to pkg/debrid/debrid/worker.go diff --git a/pkg/debrid/debrid/xml.go b/pkg/debrid/debrid/xml.go new file mode 100644 index 0000000..be3938b --- /dev/null +++ b/pkg/debrid/debrid/xml.go @@ -0,0 +1,118 @@ +package debrid + +import ( + "fmt" + "github.com/beevik/etree" + "github.com/sirrobot01/debrid-blackhole/internal/request" + "net/http" + "net/url" + path "path/filepath" + "time" +) + +func (c *Cache) RefreshXml() error { + parents := []string{"__all__", "torrents"} + for _, parent := range parents { + if err := c.refreshParentXml(parent); err != nil { + return fmt.Errorf("failed to refresh XML for %s: %v", parent, err) + } + } + + c.logger.Debug().Msgf("Refreshed XML cache for %s", c.client.GetName()) + return nil +} + +func (c *Cache) refreshParentXml(parent string) error { + // Define the WebDAV namespace + davNS := "DAV:" + + // Create the root multistatus element + doc := etree.NewDocument() + doc.CreateProcInst("xml", `version="1.0" encoding="UTF-8"`) + + multistatus := doc.CreateElement("D:multistatus") + multistatus.CreateAttr("xmlns:D", davNS) + + // Get the current timestamp in RFC1123 format (WebDAV format) + currentTime := time.Now().UTC().Format(http.TimeFormat) + + // Add the parent directory + parentPath := fmt.Sprintf("/webdav/%s/%s/", c.client.GetName(), parent) + addDirectoryResponse(multistatus, parentPath, parent, currentTime) + + // Add torrents to the XML + torrents := c.GetListing() + for _, torrent := range torrents { + torrentName := torrent.Name() + torrentPath := fmt.Sprintf("/webdav/%s/%s/%s/", + c.client.GetName(), + url.PathEscape(torrentName), + parent, + ) + + addDirectoryResponse(multistatus, torrentPath, torrentName, currentTime) + } + + // Convert to XML string + xmlData, err := doc.WriteToBytes() + if err != nil { + return fmt.Errorf("failed to generate XML: %v", err) + } + + // Store in cache + // Construct the keys + baseUrl := path.Clean(fmt.Sprintf("/webdav/%s/%s", c.client.GetName())) + key0 := fmt.Sprintf("propfind:%s:0", baseUrl) + key1 := fmt.Sprintf("propfind:%s:1", baseUrl) + + res := PropfindResponse{ + Data: xmlData, + GzippedData: request.Gzip(xmlData), + Ts: time.Now(), + } + c.PropfindResp.Store(key0, res) + c.PropfindResp.Store(key1, res) + return nil +} + +func addDirectoryResponse(multistatus *etree.Element, href, displayName, modTime string) *etree.Element { + responseElem := multistatus.CreateElement("D:response") + + // Add href + hrefElem := responseElem.CreateElement("D:href") + hrefElem.SetText(href) + + // Add propstat + propstatElem := responseElem.CreateElement("D:propstat") + + // Add prop + propElem := propstatElem.CreateElement("D:prop") + + // Add resource type (collection = directory) + resourceTypeElem := propElem.CreateElement("D:resourcetype") + resourceTypeElem.CreateElement("D:collection") + + // Add display name + displayNameElem := propElem.CreateElement("D:displayname") + displayNameElem.SetText(displayName) + + // Add last modified time + lastModElem := propElem.CreateElement("D:getlastmodified") + lastModElem.SetText(modTime) + + // Add supported lock + lockElem := propElem.CreateElement("D:supportedlock") + lockEntryElem := lockElem.CreateElement("D:lockentry") + + lockScopeElem := lockEntryElem.CreateElement("D:lockscope") + lockScopeElem.CreateElement("D:exclusive") + + lockTypeElem := lockEntryElem.CreateElement("D:locktype") + lockTypeElem.CreateElement("D:write") + + // Add status + statusElem := propstatElem.CreateElement("D:status") + statusElem.SetText("HTTP/1.1 200 OK") + + return responseElem +} diff --git a/pkg/debrid/debrid_link/debrid_link.go b/pkg/debrid/debrid_link/debrid_link.go index fd19172..8027be4 100644 --- a/pkg/debrid/debrid_link/debrid_link.go +++ b/pkg/debrid/debrid_link/debrid_link.go @@ -223,12 +223,12 @@ func (dl *DebridLink) CheckStatus(torrent *types.Torrent, isSymlink bool) (*type return torrent, nil } -func (dl *DebridLink) DeleteTorrent(torrent *types.Torrent) { - url := fmt.Sprintf("%s/seedbox/%s/remove", dl.Host, torrent.Id) +func (dl *DebridLink) DeleteTorrent(torrentId string) { + url := fmt.Sprintf("%s/seedbox/%s/remove", dl.Host, torrentId) req, _ := http.NewRequest(http.MethodDelete, url, nil) _, err := dl.client.MakeRequest(req) if err == nil { - dl.logger.Info().Msgf("Torrent: %s deleted", torrent.Name) + dl.logger.Info().Msgf("Torrent: %s deleted", torrentId) } else { dl.logger.Info().Msgf("Error deleting torrent: %s", err) } diff --git a/pkg/debrid/realdebrid/realdebrid.go b/pkg/debrid/realdebrid/realdebrid.go index fa94275..8838ab9 100644 --- a/pkg/debrid/realdebrid/realdebrid.go +++ b/pkg/debrid/realdebrid/realdebrid.go @@ -173,7 +173,7 @@ func (r *RealDebrid) UpdateTorrent(t *types.Torrent) error { if err != nil { return err } - name := utils.RemoveInvalidChars(data.OriginalFilename) + name := utils.RemoveExtension(data.OriginalFilename) t.Name = name t.Bytes = data.Bytes t.Folder = name @@ -182,7 +182,7 @@ func (r *RealDebrid) UpdateTorrent(t *types.Torrent) error { t.Speed = data.Speed t.Seeders = data.Seeders t.Filename = data.Filename - t.OriginalFilename = data.OriginalFilename + t.OriginalFilename = name t.Links = data.Links t.MountPath = r.MountPath t.Debrid = r.Name @@ -208,7 +208,7 @@ func (r *RealDebrid) CheckStatus(t *types.Torrent, isSymlink bool) (*types.Torre t.Name = name // Important because some magnet changes the name t.Folder = name t.Filename = data.Filename - t.OriginalFilename = data.OriginalFilename + t.OriginalFilename = name t.Bytes = data.Bytes t.Progress = data.Progress t.Speed = data.Speed @@ -257,12 +257,12 @@ func (r *RealDebrid) CheckStatus(t *types.Torrent, isSymlink bool) (*types.Torre return t, nil } -func (r *RealDebrid) DeleteTorrent(torrent *types.Torrent) { - url := fmt.Sprintf("%s/torrents/delete/%s", r.Host, torrent.Id) +func (r *RealDebrid) DeleteTorrent(torrentId string) { + url := fmt.Sprintf("%s/torrents/delete/%s", r.Host, torrentId) req, _ := http.NewRequest(http.MethodDelete, url, nil) _, err := r.client.MakeRequest(req) if err == nil { - r.logger.Info().Msgf("Torrent: %s deleted", torrent.Name) + r.logger.Info().Msgf("Torrent: %s deleted", torrentId) } else { r.logger.Info().Msgf("Error deleting torrent: %s", err) } @@ -382,7 +382,7 @@ func (r *RealDebrid) getTorrents(offset int, limit int) (int, []*types.Torrent, } torrents = append(torrents, &types.Torrent{ Id: t.Id, - Name: utils.RemoveInvalidChars(t.Filename), + Name: utils.RemoveInvalidChars(t.Filename), // This changes when we get the files Bytes: t.Bytes, Progress: t.Progress, Status: t.Status, diff --git a/pkg/debrid/torbox/torbox.go b/pkg/debrid/torbox/torbox.go index ce83f2d..9c5e31e 100644 --- a/pkg/debrid/torbox/torbox.go +++ b/pkg/debrid/torbox/torbox.go @@ -232,14 +232,14 @@ func (tb *Torbox) CheckStatus(torrent *types.Torrent, isSymlink bool) (*types.To return torrent, nil } -func (tb *Torbox) DeleteTorrent(torrent *types.Torrent) { - url := fmt.Sprintf("%s/api/torrents/controltorrent/%s", tb.Host, torrent.Id) - payload := map[string]string{"torrent_id": torrent.Id, "action": "Delete"} +func (tb *Torbox) DeleteTorrent(torrentId string) { + url := fmt.Sprintf("%s/api/torrents/controltorrent/%s", tb.Host, torrentId) + payload := map[string]string{"torrent_id": torrentId, "action": "Delete"} jsonPayload, _ := json.Marshal(payload) req, _ := http.NewRequest(http.MethodDelete, url, bytes.NewBuffer(jsonPayload)) _, err := tb.client.MakeRequest(req) if err == nil { - tb.logger.Info().Msgf("Torrent: %s deleted", torrent.Name) + tb.logger.Info().Msgf("Torrent: %s deleted", torrentId) } else { tb.logger.Info().Msgf("Error deleting torrent: %s", err) } diff --git a/pkg/debrid/types/debrid.go b/pkg/debrid/types/debrid.go index 58b599a..e1adf4d 100644 --- a/pkg/debrid/types/debrid.go +++ b/pkg/debrid/types/debrid.go @@ -10,7 +10,7 @@ type Client interface { GenerateDownloadLinks(tr *Torrent) error GetDownloadLink(tr *Torrent, file *File) *File ConvertLinksToFiles(links []string) []File - DeleteTorrent(tr *Torrent) + DeleteTorrent(torrentId string) IsAvailable(infohashes []string) map[string]bool GetCheckCached() bool GetDownloadUncached() bool diff --git a/pkg/qbit/import.go b/pkg/qbit/import.go index 243a99e..724220e 100644 --- a/pkg/qbit/import.go +++ b/pkg/qbit/import.go @@ -78,7 +78,7 @@ func (i *ImportRequest) Process(q *QBit) (err error) { if err != nil || debridTorrent == nil { if debridTorrent != nil { dbClient := service.GetDebrid().GetByName(debridTorrent.Debrid) - go dbClient.DeleteTorrent(debridTorrent) + go dbClient.DeleteTorrent(debridTorrent.Id) } if err == nil { err = fmt.Errorf("failed to process torrent") diff --git a/pkg/qbit/torrent.go b/pkg/qbit/torrent.go index 67c855e..b1cdc8c 100644 --- a/pkg/qbit/torrent.go +++ b/pkg/qbit/torrent.go @@ -60,7 +60,7 @@ func (q *QBit) Process(ctx context.Context, magnet *utils.Magnet, category strin if err != nil || debridTorrent == nil { if debridTorrent != nil { dbClient := service.GetDebrid().GetByName(debridTorrent.Debrid) - go dbClient.DeleteTorrent(debridTorrent) + go dbClient.DeleteTorrent(debridTorrent.Id) } if err == nil { err = fmt.Errorf("failed to process torrent") @@ -81,7 +81,7 @@ func (q *QBit) ProcessFiles(torrent *Torrent, debridTorrent *debrid.Torrent, arr dbT, err := client.CheckStatus(debridTorrent, isSymlink) if err != nil { q.logger.Error().Msgf("Error checking status: %v", err) - go client.DeleteTorrent(debridTorrent) + go client.DeleteTorrent(debridTorrent.Id) q.MarkAsFailed(torrent) if err := arr.Refresh(); err != nil { q.logger.Error().Msgf("Error refreshing arr: %v", err) @@ -116,26 +116,10 @@ func (q *QBit) ProcessFiles(torrent *Torrent, debridTorrent *debrid.Torrent, arr if err != nil { return } - rclonePath := filepath.Join(debridTorrent.MountPath, debridTorrent.Name) - - // Check if folder exists here - if _, err := os.Stat(rclonePath); os.IsNotExist(err) { - q.logger.Debug().Msgf("Folder does not exist: %s", rclonePath) - - // Check if torrent is in the listing - listing := cache.GetListing() - for _, t := range listing { - if t.Name() == debridTorrent.Name { - q.logger.Debug().Msgf("Torrent found in listing: %s", debridTorrent.Name) - } - } - - // Check if torrent is in the webdav - if t := cache.GetTorrentByName(debridTorrent.Name); t == nil { - q.logger.Debug().Msgf("Torrent not found in webdav: %s", debridTorrent.Name) - } + if err := cache.RefreshRclone(); err != nil { + q.logger.Trace().Msgf("Error refreshing rclone: %v", err) } - + rclonePath := filepath.Join(debridTorrent.MountPath, debridTorrent.Name) torrentSymlinkPath, err = q.createSymlinks(debridTorrent, rclonePath, debridTorrent.Name) } else { @@ -147,7 +131,7 @@ func (q *QBit) ProcessFiles(torrent *Torrent, debridTorrent *debrid.Torrent, arr } if err != nil { q.MarkAsFailed(torrent) - go client.DeleteTorrent(debridTorrent) + go client.DeleteTorrent(debridTorrent.Id) q.logger.Info().Msgf("Error: %v", err) return } diff --git a/pkg/repair/repair.go b/pkg/repair/repair.go index 9e6f0f6..6c38cbf 100644 --- a/pkg/repair/repair.go +++ b/pkg/repair/repair.go @@ -10,7 +10,7 @@ import ( "github.com/sirrobot01/debrid-blackhole/internal/logger" "github.com/sirrobot01/debrid-blackhole/internal/request" "github.com/sirrobot01/debrid-blackhole/pkg/arr" - "github.com/sirrobot01/debrid-blackhole/pkg/debrid/types" + "github.com/sirrobot01/debrid-blackhole/pkg/debrid/debrid" "golang.org/x/sync/errgroup" "net" "net/http" @@ -29,7 +29,7 @@ import ( type Repair struct { Jobs map[string]*Job arrs *arr.Storage - deb types.Client + deb *debrid.Engine duration time.Duration runOnStart bool ZurgURL string @@ -39,7 +39,7 @@ type Repair struct { filename string } -func New(arrs *arr.Storage) *Repair { +func New(arrs *arr.Storage, engine *debrid.Engine) *Repair { cfg := config.GetConfig() duration, err := parseSchedule(cfg.Repair.Interval) if err != nil { @@ -53,6 +53,7 @@ func New(arrs *arr.Storage) *Repair { ZurgURL: cfg.Repair.ZurgURL, autoProcess: cfg.Repair.AutoProcess, filename: filepath.Join(cfg.Path, "repair.json"), + deb: engine, } if r.ZurgURL != "" { r.IsZurg = true @@ -66,10 +67,11 @@ func New(arrs *arr.Storage) *Repair { type JobStatus string const ( - JobStarted JobStatus = "started" - JobPending JobStatus = "pending" - JobFailed JobStatus = "failed" - JobCompleted JobStatus = "completed" + JobStarted JobStatus = "started" + JobPending JobStatus = "pending" + JobFailed JobStatus = "failed" + JobCompleted JobStatus = "completed" + JobProcessing JobStatus = "processing" ) type Job struct { @@ -185,12 +187,21 @@ func (r *Repair) AddJob(arrsNames []string, mediaIDs []string, autoProcess, recu r.reset(job) r.Jobs[key] = job go r.saveToFile() - err := r.repair(job) - go r.saveToFile() - return err + go func() { + if err := r.repair(job); err != nil { + r.logger.Error().Err(err).Msg("Error running repair") + r.logger.Error().Err(err).Msg("Error running repair") + job.FailedAt = time.Now() + job.Error = err.Error() + job.Status = JobFailed + job.CompletedAt = time.Now() + } + }() + return nil } func (r *Repair) repair(job *Job) error { + defer r.saveToFile() if err := r.preRunChecks(); err != nil { return err } @@ -331,6 +342,161 @@ func (r *Repair) Start(ctx context.Context) error { } } +func (r *Repair) getUniquePaths(media arr.Content) map[string]string { + // Use zurg setup to check file availability with zurg + // This reduces bandwidth usage significantly + + uniqueParents := make(map[string]string) + files := media.Files + for _, file := range files { + target := getSymlinkTarget(file.Path) + if target != "" { + file.IsSymlink = true + dir, f := filepath.Split(target) + parent := filepath.Base(filepath.Clean(dir)) + // Set target path folder/file.mkv + file.TargetPath = f + uniqueParents[parent] = target + } + } + return uniqueParents +} + +func (r *Repair) clean(job *Job) error { + // Create a new error group + g, ctx := errgroup.WithContext(context.Background()) + + uniqueItems := make(map[string]string) + mu := sync.Mutex{} + + // Limit concurrent goroutines + g.SetLimit(runtime.NumCPU() * 4) + + for _, a := range job.Arrs { + a := a // Capture range variable + g.Go(func() error { + // Check if context was canceled + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + items, err := r.cleanArr(job, a, "") + if err != nil { + r.logger.Error().Err(err).Msgf("Error cleaning %s", a) + return err + } + + // Safely append the found items to the shared slice + if len(items) > 0 { + mu.Lock() + for k, v := range items { + uniqueItems[k] = v + } + mu.Unlock() + } + + return nil + }) + } + + if err := g.Wait(); err != nil { + return err + } + + if len(uniqueItems) == 0 { + job.CompletedAt = time.Now() + job.Status = JobCompleted + + go func() { + if err := request.SendDiscordMessage("repair_clean_complete", "success", job.discordContext()); err != nil { + r.logger.Error().Msgf("Error sending discord message: %v", err) + } + }() + + return nil + } + + cache := r.deb.Caches["realdebrid"] + if cache == nil { + return fmt.Errorf("cache not found") + } + torrents := cache.GetTorrents() + + dangling := make([]string, 0) + for _, t := range torrents { + if _, ok := uniqueItems[t.Name]; !ok { + dangling = append(dangling, t.Id) + } + } + + r.logger.Info().Msgf("Found %d delapitated items", len(dangling)) + + if len(dangling) == 0 { + job.CompletedAt = time.Now() + job.Status = JobCompleted + return nil + } + + client := r.deb.Clients["realdebrid"] + if client == nil { + return fmt.Errorf("client not found") + } + for _, id := range dangling { + client.DeleteTorrent(id) + } + + return nil +} + +func (r *Repair) cleanArr(j *Job, _arr string, tmdbId string) (map[string]string, error) { + uniqueItems := make(map[string]string) + a := r.arrs.Get(_arr) + + r.logger.Info().Msgf("Starting repair for %s", a.Name) + media, err := a.GetMedia(tmdbId) + if err != nil { + r.logger.Info().Msgf("Failed to get %s media: %v", a.Name, err) + return uniqueItems, err + } + + // Create a new error group + g, ctx := errgroup.WithContext(context.Background()) + + mu := sync.Mutex{} + + // Limit concurrent goroutines + g.SetLimit(runtime.NumCPU() * 4) + + for _, m := range media { + m := m // Create a new variable scoped to the loop iteration + g.Go(func() error { + // Check if context was canceled + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + u := r.getUniquePaths(m) + for k, v := range u { + mu.Lock() + uniqueItems[k] = v + mu.Unlock() + } + return nil + }) + } + + if err := g.Wait(); err != nil { + return uniqueItems, err + } + + r.logger.Info().Msgf("Repair completed for %s. %d unique items", a.Name, len(uniqueItems)) + return uniqueItems, nil +} + func (r *Repair) repairArr(j *Job, _arr string, tmdbId string) ([]arr.ContentFile, error) { brokenItems := make([]arr.ContentFile, 0) a := r.arrs.Get(_arr) @@ -575,6 +741,7 @@ func (r *Repair) ProcessJob(id string) error { if job == nil { return fmt.Errorf("job %s not found", id) } + // All validation checks remain the same if job.Status != JobPending { return fmt.Errorf("job %s not pending", id) } @@ -598,6 +765,7 @@ func (r *Repair) ProcessJob(id string) error { // Create a new error group g := new(errgroup.Group) + g.SetLimit(runtime.NumCPU() * 4) for arrName, items := range brokenItems { items := items @@ -612,7 +780,6 @@ func (r *Repair) ProcessJob(id string) error { if err := a.DeleteFiles(items); err != nil { r.logger.Error().Err(err).Msgf("Failed to delete broken items for %s", arrName) return nil - } // Search for missing items if err := a.SearchMissing(items); err != nil { @@ -620,20 +787,29 @@ func (r *Repair) ProcessJob(id string) error { return nil } return nil - }) } - if err := g.Wait(); err != nil { - job.FailedAt = time.Now() - job.Error = err.Error() - job.CompletedAt = time.Now() - job.Status = JobFailed - return err - } + // Update job status to in-progress + job.Status = JobProcessing + r.saveToFile() - job.CompletedAt = time.Now() - job.Status = JobCompleted + // Launch a goroutine to wait for completion and update the job + go func() { + if err := g.Wait(); err != nil { + job.FailedAt = time.Now() + job.Error = err.Error() + job.CompletedAt = time.Now() + job.Status = JobFailed + r.logger.Error().Err(err).Msgf("Job %s failed", id) + } else { + job.CompletedAt = time.Now() + job.Status = JobCompleted + r.logger.Info().Msgf("Job %s completed successfully", id) + } + + r.saveToFile() + }() return nil } diff --git a/pkg/service/service.go b/pkg/service/service.go index a5e0c12..7574f06 100644 --- a/pkg/service/service.go +++ b/pkg/service/service.go @@ -23,7 +23,7 @@ func New() *Service { arrs := arr.NewStorage() deb := debrid.NewEngine() instance = &Service{ - Repair: repair.New(arrs), + Repair: repair.New(arrs, deb), Arr: arrs, Debrid: deb, } @@ -43,7 +43,7 @@ func Update() *Service { arrs := arr.NewStorage() deb := debrid.NewEngine() instance = &Service{ - Repair: repair.New(arrs), + Repair: repair.New(arrs, deb), Arr: arrs, Debrid: deb, } diff --git a/pkg/web/server.go b/pkg/web/server.go index ba87d09..ddb2bc9 100644 --- a/pkg/web/server.go +++ b/pkg/web/server.go @@ -375,15 +375,20 @@ func (ui *Handler) handleRepairMedia(w http.ResponseWriter, r *http.Request) { svc := service.GetService() - _arr := svc.Arr.Get(req.ArrName) - if _arr == nil { - http.Error(w, "No Arrs found to repair", http.StatusNotFound) - return + var arrs []string + + if req.ArrName != "" { + _arr := svc.Arr.Get(req.ArrName) + if _arr == nil { + http.Error(w, "No Arrs found to repair", http.StatusNotFound) + return + } + arrs = append(arrs, req.ArrName) } if req.Async { go func() { - if err := svc.Repair.AddJob([]string{req.ArrName}, req.MediaIds, req.AutoProcess, false); err != nil { + if err := svc.Repair.AddJob(arrs, req.MediaIds, req.AutoProcess, false); err != nil { ui.logger.Error().Err(err).Msg("Failed to repair media") } }() @@ -459,12 +464,10 @@ func (ui *Handler) handleProcessRepairJob(w http.ResponseWriter, r *http.Request http.Error(w, "No job ID provided", http.StatusBadRequest) return } - go func() { - svc := service.GetService() - if err := svc.Repair.ProcessJob(id); err != nil { - ui.logger.Error().Err(err).Msg("Failed to process repair job") - } - }() + svc := service.GetService() + if err := svc.Repair.ProcessJob(id); err != nil { + ui.logger.Error().Err(err).Msg("Failed to process repair job") + } w.WriteHeader(http.StatusOK) } diff --git a/pkg/web/web/repair.html b/pkg/web/web/repair.html index f99a45d..68f9e86 100644 --- a/pkg/web/web/repair.html +++ b/pkg/web/web/repair.html @@ -8,7 +8,7 @@
-
@@ -174,12 +174,6 @@ submitBtn.innerHTML = 'Repairing...'; let mediaIds = document.getElementById('mediaIds').value.split(',').map(id => id.trim()); let arr = document.getElementById('arrSelect').value; - if (!arr) { - createToast('Please select an Arr instance', 'warning'); - submitBtn.disabled = false; - submitBtn.innerHTML = originalText; - return; - } try { const response = await fetch('/internal/repair', { method: 'POST', @@ -187,7 +181,7 @@ 'Content-Type': 'application/json' }, body: JSON.stringify({ - arr: document.getElementById('arrSelect').value, + arr: arr, mediaIds: mediaIds, async: document.getElementById('isAsync').checked, autoProcess: document.getElementById('autoProcess').checked, @@ -262,17 +256,15 @@ // Determine status let status = 'In Progress'; let statusClass = 'text-primary'; - let canDelete = false; + let canDelete = job.status !== "started"; let totalItems = job.broken_items ? Object.values(job.broken_items).reduce((sum, arr) => sum + arr.length, 0) : 0; if (job.status === 'failed') { status = 'Failed'; statusClass = 'text-danger'; - canDelete = true; } else if (job.status === 'completed') { status = 'Completed'; statusClass = 'text-success'; - canDelete = true; } else if (job.status === 'pending') { status = 'Pending'; statusClass = 'text-warning'; diff --git a/pkg/webdav/file.go b/pkg/webdav/file.go index 6c8b84a..ca5d596 100644 --- a/pkg/webdav/file.go +++ b/pkg/webdav/file.go @@ -1,6 +1,7 @@ package webdav import ( + "bufio" "fmt" "github.com/sirrobot01/debrid-blackhole/pkg/debrid/debrid" "io" @@ -11,11 +12,11 @@ import ( var sharedClient = &http.Client{ Transport: &http.Transport{ - // These settings help maintain persistent connections. MaxIdleConns: 100, IdleConnTimeout: 90 * time.Second, DisableCompression: false, DisableKeepAlives: false, + Proxy: http.ProxyFromEnvironment, }, Timeout: 0, } @@ -39,6 +40,24 @@ type File struct { link string } +type bufferedReadCloser struct { + *bufio.Reader + closer io.Closer +} + +// Create a new bufferedReadCloser with a larger buffer +func newBufferedReadCloser(rc io.ReadCloser) *bufferedReadCloser { + return &bufferedReadCloser{ + Reader: bufio.NewReaderSize(rc, 64*1024), // Increase to 1MB buffer + closer: rc, + } +} + +// Close implements ReadCloser interface +func (brc *bufferedReadCloser) Close() error { + return brc.closer.Close() +} + // File interface implementations for File func (f *File) Close() error { @@ -82,40 +101,48 @@ func (f *File) Read(p []byte) (n int, err error) { return n, nil } - // If we haven't started streaming or a seek was requested, - // close the existing stream and start a new HTTP GET request. + // If we haven't started streaming the file yet or need to reposition if f.reader == nil || f.seekPending { + // Close existing reader if we're repositioning if f.reader != nil && f.seekPending { f.reader.Close() f.reader = nil } - // Create a new HTTP GET request for the file's URL. - req, err := http.NewRequest("GET", f.GetDownloadLink(), nil) + downloadLink := f.GetDownloadLink() + if downloadLink == "" { + return 0, fmt.Errorf("failed to get download link for file") + } + + // Create an HTTP GET request to the file's URL. + req, err := http.NewRequest("GET", downloadLink, nil) if err != nil { return 0, fmt.Errorf("failed to create HTTP request: %w", err) } - // If we've already read some data, request only the remaining bytes. + // Request only the bytes starting from our current offset if f.offset > 0 { req.Header.Set("Range", fmt.Sprintf("bytes=%d-", f.offset)) } - // Execute the HTTP request. + // Add important headers for streaming + req.Header.Set("Connection", "keep-alive") + req.Header.Set("Accept", "*/*") + req.Header.Set("User-Agent", "Infuse/7.0.2 (iOS)") + req.Header.Set("Accept-Encoding", "gzip, deflate, br") + resp, err := sharedClient.Do(req) if err != nil { return 0, fmt.Errorf("HTTP request error: %w", err) } - // Accept a 200 (OK) or 206 (Partial Content) status. + // Check response codes more carefully if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusPartialContent { resp.Body.Close() return 0, fmt.Errorf("unexpected HTTP status: %d", resp.StatusCode) } - // Store the response body as our reader. - f.reader = resp.Body - // Reset the seek pending flag now that we've reinitialized the reader. + f.reader = newBufferedReadCloser(resp.Body) f.seekPending = false } @@ -123,10 +150,12 @@ func (f *File) Read(p []byte) (n int, err error) { n, err = f.reader.Read(p) f.offset += int64(n) - // When we reach the end of the stream, close the reader. if err == io.EOF { f.reader.Close() f.reader = nil + } else if err != nil { + f.reader.Close() + f.reader = nil } return n, err @@ -137,12 +166,12 @@ func (f *File) Seek(offset int64, whence int) (int64, error) { return 0, os.ErrInvalid } - var newOffset int64 + newOffset := f.offset switch whence { case io.SeekStart: newOffset = offset case io.SeekCurrent: - newOffset = f.offset + offset + newOffset += offset case io.SeekEnd: newOffset = f.size + offset default: @@ -156,7 +185,7 @@ func (f *File) Seek(offset int64, whence int) (int64, error) { newOffset = f.size } - // If we're seeking to a new position, mark the reader for reset. + // Only mark seek as pending if position actually changed if newOffset != f.offset { f.offset = newOffset f.seekPending = true @@ -184,6 +213,24 @@ func (f *File) Stat() (os.FileInfo, error) { }, nil } +func (f *File) ReadAt(p []byte, off int64) (n int, err error) { + // Save current position + + // Seek to requested position + _, err = f.Seek(off, io.SeekStart) + if err != nil { + return 0, err + } + + // Read the data + n, err = f.Read(p) + + // Don't restore position for Infuse compatibility + // Infuse expects sequential reads after the initial seek + + return n, err +} + func (f *File) Write(p []byte) (n int, err error) { return 0, os.ErrPermission } diff --git a/pkg/webdav/handler.go b/pkg/webdav/handler.go index b17acb2..c4c3954 100644 --- a/pkg/webdav/handler.go +++ b/pkg/webdav/handler.go @@ -2,7 +2,6 @@ package webdav import ( "bytes" - "compress/gzip" "context" "errors" "fmt" @@ -68,7 +67,7 @@ func (h *Handler) RemoveAll(ctx context.Context, name string) error { } if filename == "" { - h.cache.GetClient().DeleteTorrent(cachedTorrent.Torrent) + h.cache.GetClient().DeleteTorrent(cachedTorrent.Torrent.Id) h.cache.OnRemove(cachedTorrent.Id) return nil } @@ -259,7 +258,7 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { // - Otherwise, for deeper (torrent folder) paths, use a longer TTL. ttl := 30 * time.Minute if h.isParentPath(r.URL.Path) { - ttl = 20 * time.Second + ttl = 30 * time.Second } if served := h.serveFromCacheIfValid(w, r, cacheKey, ttl); served { @@ -281,22 +280,12 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { responseData := responseRecorder.Body.Bytes() // Create compressed version - var gzippedData []byte - if len(responseData) > 0 { - var buf bytes.Buffer - gzw := gzip.NewWriter(&buf) - if _, err := gzw.Write(responseData); err == nil { - if err := gzw.Close(); err == nil { - gzippedData = buf.Bytes() - } - } - } - h.cache.PropfindResp.Store(cacheKey, debrid.PropfindResponse{ - Data: responseData, - GzippedData: gzippedData, - Ts: time.Now(), - }) + //h.cache.PropfindResp.Store(cacheKey, debrid.PropfindResponse{ + // Data: responseData, + // GzippedData: request.Gzip(responseData), + // Ts: time.Now(), + //}) // Forward the captured response to the client. for k, v := range responseRecorder.Header() { @@ -417,7 +406,6 @@ func (h *Handler) serveFromCacheIfValid(w http.ResponseWriter, r *http.Request, if time.Since(respCache.Ts) >= ttl { // Remove expired cache entry - h.cache.PropfindResp.Delete(cacheKey) return false } w.Header().Set("Content-Type", "application/xml; charset=utf-8")