Update repair; fix minor naming bugs

This commit is contained in:
Mukhtar Akere
2025-03-21 04:10:16 +01:00
parent 0c68364a6a
commit 8d494fc277
21 changed files with 455 additions and 151 deletions

1
go.mod
View File

@@ -24,6 +24,7 @@ require (
require (
github.com/anacrolix/missinggo v1.3.0 // indirect
github.com/anacrolix/missinggo/v2 v2.7.3 // indirect
github.com/beevik/etree v1.5.0 // indirect
github.com/bradfitz/iter v0.0.0-20191230175014-e8f45d346db8 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect

2
go.sum
View File

@@ -36,6 +36,8 @@ github.com/anacrolix/tagflag v1.1.0/go.mod h1:Scxs9CV10NQatSmbyjqmqmeQNwGzlNe0CM
github.com/anacrolix/torrent v1.55.0 h1:s9yh/YGdPmbN9dTa+0Inh2dLdrLQRvEAj1jdFW/Hdd8=
github.com/anacrolix/torrent v1.55.0/go.mod h1:sBdZHBSZNj4de0m+EbYg7vvs/G/STubxu/GzzNbojsE=
github.com/apache/thrift v0.12.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
github.com/beevik/etree v1.5.0 h1:iaQZFSDS+3kYZiGoc9uKeOkUY3nYMXOKLl6KIJxiJWs=
github.com/beevik/etree v1.5.0/go.mod h1:gPNJNaBGVZ9AwsidazFZyygnd+0pAU38N4D+WemwKNs=
github.com/benbjohnson/immutable v0.2.0/go.mod h1:uc6OHo6PN2++n98KHLxW8ef4W42ylHiQSENghE1ezxI=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=

View File

@@ -73,6 +73,7 @@ func NewLogger(prefix string) zerolog.Logger {
Level(zerolog.InfoLevel)
// Set the log level
level = strings.ToLower(level)
switch level {
case "debug":
logger = logger.Level(zerolog.DebugLevel)
@@ -82,6 +83,8 @@ func NewLogger(prefix string) zerolog.Logger {
logger = logger.Level(zerolog.WarnLevel)
case "error":
logger = logger.Level(zerolog.ErrorLevel)
case "trace":
logger = logger.Level(zerolog.TraceLevel)
}
return logger
}

View File

@@ -2,6 +2,7 @@ package request
import (
"bytes"
"compress/gzip"
"context"
"crypto/tls"
"fmt"
@@ -288,3 +289,21 @@ func JSONResponse(w http.ResponseWriter, data interface{}, code int) {
return
}
}
// Gzip compresses body with gzip and returns the compressed bytes.
// It returns nil when body is empty or when compression fails.
func Gzip(body []byte) []byte {
	if len(body) == 0 {
		return nil
	}
	var buf bytes.Buffer
	w := gzip.NewWriter(&buf)
	if _, err := w.Write(body); err != nil {
		return nil
	}
	// Close flushes the remaining compressed data and the gzip footer.
	if err := w.Close(); err != nil {
		return nil
	}
	return buf.Bytes()
}

View File

@@ -204,12 +204,12 @@ func (ad *AllDebrid) CheckStatus(torrent *types.Torrent, isSymlink bool) (*types
return torrent, nil
}
func (ad *AllDebrid) DeleteTorrent(torrent *types.Torrent) {
url := fmt.Sprintf("%s/magnet/delete?id=%s", ad.Host, torrent.Id)
func (ad *AllDebrid) DeleteTorrent(torrentId string) {
url := fmt.Sprintf("%s/magnet/delete?id=%s", ad.Host, torrentId)
req, _ := http.NewRequest(http.MethodGet, url, nil)
_, err := ad.client.MakeRequest(req)
if err == nil {
ad.logger.Info().Msgf("Torrent: %s deleted", torrent.Name)
ad.logger.Info().Msgf("Torrent: %s deleted", torrentId)
} else {
ad.logger.Info().Msgf("Error deleting torrent: %s", err)
}

View File

@@ -208,7 +208,7 @@ func (c *Cache) load() (map[string]*CachedTorrent, error) {
if len(ct.Files) != 0 {
// We can assume the torrent is complete
ct.IsComplete = true
ct.Torrent.Name = utils.RemoveExtension(ct.Torrent.Filename) // Update the name
ct.Torrent.Name = utils.RemoveExtension(ct.Torrent.OriginalFilename) // Update the name
torrents[ct.Id] = &ct
}
}
@@ -403,9 +403,6 @@ func (c *Cache) ProcessTorrent(t *types.Torrent, refreshRclone bool) error {
IsComplete: len(t.Files) > 0,
}
c.setTorrent(ct)
if err := c.RefreshRclone(); err != nil {
c.logger.Debug().Err(err).Msg("Failed to refresh rclone")
}
return nil
}

View File

@@ -7,8 +7,6 @@ import (
"github.com/sirrobot01/debrid-blackhole/pkg/debrid/types"
"net/http"
"os"
"path"
"path/filepath"
"sort"
"sync"
"time"
@@ -42,7 +40,7 @@ func (c *Cache) refreshListings() {
}
// Atomic store of the complete ready-to-use slice
c.listings.Store(files)
c.resetPropfindResponse()
_ = c.RefreshXml()
if err := c.RefreshRclone(); err != nil {
c.logger.Debug().Err(err).Msg("Failed to refresh rclone")
}
@@ -179,27 +177,3 @@ func (c *Cache) refreshDownloadLinks() {
c.downloadLinks[k] = v.DownloadLink
}
}
func (c *Cache) resetPropfindResponse() {
// Right now, parents are hardcoded
parents := []string{"__all__", "torrents"}
// Reset only the parent directories
// Convert the parents to a keys
// This is a bit hacky, but it works
// Instead of deleting all the keys, we only delete the parent keys, e.g __all__/ or torrents/
keys := make([]string, 0, len(parents))
for _, p := range parents {
// Construct the key
// construct url
url := filepath.Join("/webdav", c.client.GetName(), p)
url = path.Clean(url)
key0 := fmt.Sprintf("propfind:%s:0", url)
key1 := fmt.Sprintf("propfind:%s:1", url)
keys = append(keys, key0, key1)
}
// Delete the keys
for _, k := range keys {
c.PropfindResp.Delete(k)
}
}

118
pkg/debrid/debrid/xml.go Normal file
View File

@@ -0,0 +1,118 @@
package debrid
import (
"fmt"
"github.com/beevik/etree"
"github.com/sirrobot01/debrid-blackhole/internal/request"
"net/http"
"net/url"
path "path/filepath"
"time"
)
// RefreshXml regenerates the cached PROPFIND XML for every hardcoded
// parent directory ("__all__" and "torrents") of this debrid client.
// It stops and returns on the first parent that fails to refresh.
func (c *Cache) RefreshXml() error {
	for _, dir := range []string{"__all__", "torrents"} {
		err := c.refreshParentXml(dir)
		if err == nil {
			continue
		}
		return fmt.Errorf("failed to refresh XML for %s: %v", dir, err)
	}
	c.logger.Debug().Msgf("Refreshed XML cache for %s", c.client.GetName())
	return nil
}
// refreshParentXml rebuilds the WebDAV PROPFIND multistatus XML for a single
// parent directory (e.g. "__all__" or "torrents") listing every cached
// torrent as a child collection, then stores the raw and gzipped payloads in
// the propfind cache under both the depth-0 and depth-1 keys.
func (c *Cache) refreshParentXml(parent string) error {
	// Define the WebDAV namespace
	davNS := "DAV:"

	// Create the root multistatus element
	doc := etree.NewDocument()
	doc.CreateProcInst("xml", `version="1.0" encoding="UTF-8"`)
	multistatus := doc.CreateElement("D:multistatus")
	multistatus.CreateAttr("xmlns:D", davNS)

	// Get the current timestamp in RFC1123 format (WebDAV format)
	currentTime := time.Now().UTC().Format(http.TimeFormat)

	// Add the parent directory
	parentPath := fmt.Sprintf("/webdav/%s/%s/", c.client.GetName(), parent)
	addDirectoryResponse(multistatus, parentPath, parent, currentTime)

	// Add each torrent as a child collection of the parent directory.
	torrents := c.GetListing()
	for _, torrent := range torrents {
		torrentName := torrent.Name()
		// FIX: the parent segment must precede the torrent name so the entry
		// lives under parentPath (/webdav/<client>/<parent>/<name>/); the
		// arguments were previously swapped.
		torrentPath := fmt.Sprintf("/webdav/%s/%s/%s/",
			c.client.GetName(),
			parent,
			url.PathEscape(torrentName),
		)
		addDirectoryResponse(multistatus, torrentPath, torrentName, currentTime)
	}

	// Convert to XML bytes
	xmlData, err := doc.WriteToBytes()
	if err != nil {
		return fmt.Errorf("failed to generate XML: %v", err)
	}

	// Store in cache.
	// FIX: the format string has two %s verbs but the parent argument was
	// missing, producing keys like "propfind:/webdav/<client>/%!s(MISSING)".
	baseUrl := path.Clean(fmt.Sprintf("/webdav/%s/%s", c.client.GetName(), parent))
	key0 := fmt.Sprintf("propfind:%s:0", baseUrl)
	key1 := fmt.Sprintf("propfind:%s:1", baseUrl)
	res := PropfindResponse{
		Data:        xmlData,
		GzippedData: request.Gzip(xmlData),
		Ts:          time.Now(),
	}
	c.PropfindResp.Store(key0, res)
	c.PropfindResp.Store(key1, res)
	return nil
}
// addDirectoryResponse appends a D:response entry describing a collection
// (directory) to the given multistatus element and returns the new element.
// href is the WebDAV path, displayName the visible name, and modTime the
// RFC1123 timestamp used for getlastmodified.
func addDirectoryResponse(multistatus *etree.Element, href, displayName, modTime string) *etree.Element {
	response := multistatus.CreateElement("D:response")
	response.CreateElement("D:href").SetText(href)

	propstat := response.CreateElement("D:propstat")
	prop := propstat.CreateElement("D:prop")

	// An empty <D:collection/> inside resourcetype marks the entry as a directory.
	prop.CreateElement("D:resourcetype").CreateElement("D:collection")
	prop.CreateElement("D:displayname").SetText(displayName)
	prop.CreateElement("D:getlastmodified").SetText(modTime)

	// Advertise support for exclusive write locks.
	lockEntry := prop.CreateElement("D:supportedlock").CreateElement("D:lockentry")
	lockEntry.CreateElement("D:lockscope").CreateElement("D:exclusive")
	lockEntry.CreateElement("D:locktype").CreateElement("D:write")

	propstat.CreateElement("D:status").SetText("HTTP/1.1 200 OK")
	return response
}

View File

@@ -223,12 +223,12 @@ func (dl *DebridLink) CheckStatus(torrent *types.Torrent, isSymlink bool) (*type
return torrent, nil
}
func (dl *DebridLink) DeleteTorrent(torrent *types.Torrent) {
url := fmt.Sprintf("%s/seedbox/%s/remove", dl.Host, torrent.Id)
func (dl *DebridLink) DeleteTorrent(torrentId string) {
url := fmt.Sprintf("%s/seedbox/%s/remove", dl.Host, torrentId)
req, _ := http.NewRequest(http.MethodDelete, url, nil)
_, err := dl.client.MakeRequest(req)
if err == nil {
dl.logger.Info().Msgf("Torrent: %s deleted", torrent.Name)
dl.logger.Info().Msgf("Torrent: %s deleted", torrentId)
} else {
dl.logger.Info().Msgf("Error deleting torrent: %s", err)
}

View File

@@ -173,7 +173,7 @@ func (r *RealDebrid) UpdateTorrent(t *types.Torrent) error {
if err != nil {
return err
}
name := utils.RemoveInvalidChars(data.OriginalFilename)
name := utils.RemoveExtension(data.OriginalFilename)
t.Name = name
t.Bytes = data.Bytes
t.Folder = name
@@ -182,7 +182,7 @@ func (r *RealDebrid) UpdateTorrent(t *types.Torrent) error {
t.Speed = data.Speed
t.Seeders = data.Seeders
t.Filename = data.Filename
t.OriginalFilename = data.OriginalFilename
t.OriginalFilename = name
t.Links = data.Links
t.MountPath = r.MountPath
t.Debrid = r.Name
@@ -208,7 +208,7 @@ func (r *RealDebrid) CheckStatus(t *types.Torrent, isSymlink bool) (*types.Torre
t.Name = name // Important because some magnet changes the name
t.Folder = name
t.Filename = data.Filename
t.OriginalFilename = data.OriginalFilename
t.OriginalFilename = name
t.Bytes = data.Bytes
t.Progress = data.Progress
t.Speed = data.Speed
@@ -257,12 +257,12 @@ func (r *RealDebrid) CheckStatus(t *types.Torrent, isSymlink bool) (*types.Torre
return t, nil
}
func (r *RealDebrid) DeleteTorrent(torrent *types.Torrent) {
url := fmt.Sprintf("%s/torrents/delete/%s", r.Host, torrent.Id)
func (r *RealDebrid) DeleteTorrent(torrentId string) {
url := fmt.Sprintf("%s/torrents/delete/%s", r.Host, torrentId)
req, _ := http.NewRequest(http.MethodDelete, url, nil)
_, err := r.client.MakeRequest(req)
if err == nil {
r.logger.Info().Msgf("Torrent: %s deleted", torrent.Name)
r.logger.Info().Msgf("Torrent: %s deleted", torrentId)
} else {
r.logger.Info().Msgf("Error deleting torrent: %s", err)
}
@@ -382,7 +382,7 @@ func (r *RealDebrid) getTorrents(offset int, limit int) (int, []*types.Torrent,
}
torrents = append(torrents, &types.Torrent{
Id: t.Id,
Name: utils.RemoveInvalidChars(t.Filename),
Name: utils.RemoveInvalidChars(t.Filename), // This changes when we get the files
Bytes: t.Bytes,
Progress: t.Progress,
Status: t.Status,

View File

@@ -232,14 +232,14 @@ func (tb *Torbox) CheckStatus(torrent *types.Torrent, isSymlink bool) (*types.To
return torrent, nil
}
func (tb *Torbox) DeleteTorrent(torrent *types.Torrent) {
url := fmt.Sprintf("%s/api/torrents/controltorrent/%s", tb.Host, torrent.Id)
payload := map[string]string{"torrent_id": torrent.Id, "action": "Delete"}
func (tb *Torbox) DeleteTorrent(torrentId string) {
url := fmt.Sprintf("%s/api/torrents/controltorrent/%s", tb.Host, torrentId)
payload := map[string]string{"torrent_id": torrentId, "action": "Delete"}
jsonPayload, _ := json.Marshal(payload)
req, _ := http.NewRequest(http.MethodDelete, url, bytes.NewBuffer(jsonPayload))
_, err := tb.client.MakeRequest(req)
if err == nil {
tb.logger.Info().Msgf("Torrent: %s deleted", torrent.Name)
tb.logger.Info().Msgf("Torrent: %s deleted", torrentId)
} else {
tb.logger.Info().Msgf("Error deleting torrent: %s", err)
}

View File

@@ -10,7 +10,7 @@ type Client interface {
GenerateDownloadLinks(tr *Torrent) error
GetDownloadLink(tr *Torrent, file *File) *File
ConvertLinksToFiles(links []string) []File
DeleteTorrent(tr *Torrent)
DeleteTorrent(torrentId string)
IsAvailable(infohashes []string) map[string]bool
GetCheckCached() bool
GetDownloadUncached() bool

View File

@@ -78,7 +78,7 @@ func (i *ImportRequest) Process(q *QBit) (err error) {
if err != nil || debridTorrent == nil {
if debridTorrent != nil {
dbClient := service.GetDebrid().GetByName(debridTorrent.Debrid)
go dbClient.DeleteTorrent(debridTorrent)
go dbClient.DeleteTorrent(debridTorrent.Id)
}
if err == nil {
err = fmt.Errorf("failed to process torrent")

View File

@@ -60,7 +60,7 @@ func (q *QBit) Process(ctx context.Context, magnet *utils.Magnet, category strin
if err != nil || debridTorrent == nil {
if debridTorrent != nil {
dbClient := service.GetDebrid().GetByName(debridTorrent.Debrid)
go dbClient.DeleteTorrent(debridTorrent)
go dbClient.DeleteTorrent(debridTorrent.Id)
}
if err == nil {
err = fmt.Errorf("failed to process torrent")
@@ -81,7 +81,7 @@ func (q *QBit) ProcessFiles(torrent *Torrent, debridTorrent *debrid.Torrent, arr
dbT, err := client.CheckStatus(debridTorrent, isSymlink)
if err != nil {
q.logger.Error().Msgf("Error checking status: %v", err)
go client.DeleteTorrent(debridTorrent)
go client.DeleteTorrent(debridTorrent.Id)
q.MarkAsFailed(torrent)
if err := arr.Refresh(); err != nil {
q.logger.Error().Msgf("Error refreshing arr: %v", err)
@@ -116,26 +116,10 @@ func (q *QBit) ProcessFiles(torrent *Torrent, debridTorrent *debrid.Torrent, arr
if err != nil {
return
}
if err := cache.RefreshRclone(); err != nil {
q.logger.Trace().Msgf("Error refreshing rclone: %v", err)
}
rclonePath := filepath.Join(debridTorrent.MountPath, debridTorrent.Name)
// Check if folder exists here
if _, err := os.Stat(rclonePath); os.IsNotExist(err) {
q.logger.Debug().Msgf("Folder does not exist: %s", rclonePath)
// Check if torrent is in the listing
listing := cache.GetListing()
for _, t := range listing {
if t.Name() == debridTorrent.Name {
q.logger.Debug().Msgf("Torrent found in listing: %s", debridTorrent.Name)
}
}
// Check if torrent is in the webdav
if t := cache.GetTorrentByName(debridTorrent.Name); t == nil {
q.logger.Debug().Msgf("Torrent not found in webdav: %s", debridTorrent.Name)
}
}
torrentSymlinkPath, err = q.createSymlinks(debridTorrent, rclonePath, debridTorrent.Name)
} else {
@@ -147,7 +131,7 @@ func (q *QBit) ProcessFiles(torrent *Torrent, debridTorrent *debrid.Torrent, arr
}
if err != nil {
q.MarkAsFailed(torrent)
go client.DeleteTorrent(debridTorrent)
go client.DeleteTorrent(debridTorrent.Id)
q.logger.Info().Msgf("Error: %v", err)
return
}

View File

@@ -10,7 +10,7 @@ import (
"github.com/sirrobot01/debrid-blackhole/internal/logger"
"github.com/sirrobot01/debrid-blackhole/internal/request"
"github.com/sirrobot01/debrid-blackhole/pkg/arr"
"github.com/sirrobot01/debrid-blackhole/pkg/debrid/types"
"github.com/sirrobot01/debrid-blackhole/pkg/debrid/debrid"
"golang.org/x/sync/errgroup"
"net"
"net/http"
@@ -29,7 +29,7 @@ import (
type Repair struct {
Jobs map[string]*Job
arrs *arr.Storage
deb types.Client
deb *debrid.Engine
duration time.Duration
runOnStart bool
ZurgURL string
@@ -39,7 +39,7 @@ type Repair struct {
filename string
}
func New(arrs *arr.Storage) *Repair {
func New(arrs *arr.Storage, engine *debrid.Engine) *Repair {
cfg := config.GetConfig()
duration, err := parseSchedule(cfg.Repair.Interval)
if err != nil {
@@ -53,6 +53,7 @@ func New(arrs *arr.Storage) *Repair {
ZurgURL: cfg.Repair.ZurgURL,
autoProcess: cfg.Repair.AutoProcess,
filename: filepath.Join(cfg.Path, "repair.json"),
deb: engine,
}
if r.ZurgURL != "" {
r.IsZurg = true
@@ -70,6 +71,7 @@ const (
JobPending JobStatus = "pending"
JobFailed JobStatus = "failed"
JobCompleted JobStatus = "completed"
JobProcessing JobStatus = "processing"
)
type Job struct {
@@ -185,12 +187,21 @@ func (r *Repair) AddJob(arrsNames []string, mediaIDs []string, autoProcess, recu
r.reset(job)
r.Jobs[key] = job
go r.saveToFile()
err := r.repair(job)
go r.saveToFile()
return err
go func() {
if err := r.repair(job); err != nil {
r.logger.Error().Err(err).Msg("Error running repair")
r.logger.Error().Err(err).Msg("Error running repair")
job.FailedAt = time.Now()
job.Error = err.Error()
job.Status = JobFailed
job.CompletedAt = time.Now()
}
}()
return nil
}
func (r *Repair) repair(job *Job) error {
defer r.saveToFile()
if err := r.preRunChecks(); err != nil {
return err
}
@@ -331,6 +342,161 @@ func (r *Repair) Start(ctx context.Context) error {
}
}
// getUniquePaths maps the parent directory name of each symlinked media file
// to its symlink target. Using symlink targets lets the zurg setup check
// file availability without re-downloading, reducing bandwidth usage
// significantly. Files are also marked as symlinks in place.
func (r *Repair) getUniquePaths(media arr.Content) map[string]string {
	uniqueParents := make(map[string]string)
	files := media.Files
	for i := range files {
		target := getSymlinkTarget(files[i].Path)
		if target == "" {
			continue
		}
		// FIX: mutate the slice element by index; the previous range-value
		// copy silently discarded the IsSymlink/TargetPath updates.
		files[i].IsSymlink = true
		dir, f := filepath.Split(target)
		parent := filepath.Base(filepath.Clean(dir))
		// Set target path folder/file.mkv
		files[i].TargetPath = f
		uniqueParents[parent] = target
	}
	return uniqueParents
}
// clean deletes debrid torrents that no arr instance references anymore.
// It concurrently collects every known symlink target across the job's arrs,
// compares them against the cached realdebrid torrents, and removes the
// dangling ones, updating the job status on completion.
func (r *Repair) clean(job *Job) error {
	// Create a new error group
	g, ctx := errgroup.WithContext(context.Background())
	uniqueItems := make(map[string]string)
	mu := sync.Mutex{}

	// Limit concurrent goroutines
	g.SetLimit(runtime.NumCPU() * 4)

	for _, a := range job.Arrs {
		a := a // Capture range variable
		g.Go(func() error {
			// Check if context was canceled
			select {
			case <-ctx.Done():
				return ctx.Err()
			default:
			}
			items, err := r.cleanArr(job, a, "")
			if err != nil {
				r.logger.Error().Err(err).Msgf("Error cleaning %s", a)
				return err
			}
			// Safely merge the found items into the shared map
			if len(items) > 0 {
				mu.Lock()
				for k, v := range items {
					uniqueItems[k] = v
				}
				mu.Unlock()
			}
			return nil
		})
	}
	if err := g.Wait(); err != nil {
		return err
	}

	if len(uniqueItems) == 0 {
		job.CompletedAt = time.Now()
		job.Status = JobCompleted
		go func() {
			if err := request.SendDiscordMessage("repair_clean_complete", "success", job.discordContext()); err != nil {
				r.logger.Error().Msgf("Error sending discord message: %v", err)
			}
		}()
		return nil
	}

	// TODO(review): the debrid backend is hardcoded to realdebrid here;
	// consider iterating over all configured clients.
	cache := r.deb.Caches["realdebrid"]
	if cache == nil {
		return fmt.Errorf("cache not found")
	}
	torrents := cache.GetTorrents()
	dangling := make([]string, 0)
	for _, t := range torrents {
		if _, ok := uniqueItems[t.Name]; !ok {
			dangling = append(dangling, t.Id)
		}
	}
	// FIX: log message previously misspelled "delapitated".
	r.logger.Info().Msgf("Found %d dangling items", len(dangling))
	if len(dangling) == 0 {
		job.CompletedAt = time.Now()
		job.Status = JobCompleted
		return nil
	}
	client := r.deb.Clients["realdebrid"]
	if client == nil {
		return fmt.Errorf("client not found")
	}
	for _, id := range dangling {
		client.DeleteTorrent(id)
	}
	// FIX: mark the job as completed after deleting; this path previously
	// left the job in its running state forever.
	job.CompletedAt = time.Now()
	job.Status = JobCompleted
	return nil
}
// cleanArr collects the unique symlink targets for every media item of one
// arr instance. tmdbId optionally narrows the lookup to a single item.
// Returns a map of parent directory name -> symlink target.
func (r *Repair) cleanArr(j *Job, _arr string, tmdbId string) (map[string]string, error) {
	uniqueItems := make(map[string]string)
	a := r.arrs.Get(_arr)
	// FIX: guard against an unknown arr name; Get returns nil in that case
	// and the dereferences below would panic. (The HTTP handler performs the
	// same nil check.)
	if a == nil {
		return uniqueItems, fmt.Errorf("arr %s not found", _arr)
	}
	r.logger.Info().Msgf("Starting repair for %s", a.Name)
	media, err := a.GetMedia(tmdbId)
	if err != nil {
		r.logger.Info().Msgf("Failed to get %s media: %v", a.Name, err)
		return uniqueItems, err
	}
	// Create a new error group
	g, ctx := errgroup.WithContext(context.Background())
	mu := sync.Mutex{}

	// Limit concurrent goroutines
	g.SetLimit(runtime.NumCPU() * 4)

	for _, m := range media {
		m := m // Create a new variable scoped to the loop iteration
		g.Go(func() error {
			// Check if context was canceled
			select {
			case <-ctx.Done():
				return ctx.Err()
			default:
			}
			u := r.getUniquePaths(m)
			for k, v := range u {
				mu.Lock()
				uniqueItems[k] = v
				mu.Unlock()
			}
			return nil
		})
	}
	if err := g.Wait(); err != nil {
		return uniqueItems, err
	}
	r.logger.Info().Msgf("Repair completed for %s. %d unique items", a.Name, len(uniqueItems))
	return uniqueItems, nil
}
func (r *Repair) repairArr(j *Job, _arr string, tmdbId string) ([]arr.ContentFile, error) {
brokenItems := make([]arr.ContentFile, 0)
a := r.arrs.Get(_arr)
@@ -575,6 +741,7 @@ func (r *Repair) ProcessJob(id string) error {
if job == nil {
return fmt.Errorf("job %s not found", id)
}
// All validation checks remain the same
if job.Status != JobPending {
return fmt.Errorf("job %s not pending", id)
}
@@ -598,6 +765,7 @@ func (r *Repair) ProcessJob(id string) error {
// Create a new error group
g := new(errgroup.Group)
g.SetLimit(runtime.NumCPU() * 4)
for arrName, items := range brokenItems {
items := items
@@ -612,7 +780,6 @@ func (r *Repair) ProcessJob(id string) error {
if err := a.DeleteFiles(items); err != nil {
r.logger.Error().Err(err).Msgf("Failed to delete broken items for %s", arrName)
return nil
}
// Search for missing items
if err := a.SearchMissing(items); err != nil {
@@ -620,20 +787,29 @@ func (r *Repair) ProcessJob(id string) error {
return nil
}
return nil
})
}
// Update job status to in-progress
job.Status = JobProcessing
r.saveToFile()
// Launch a goroutine to wait for completion and update the job
go func() {
if err := g.Wait(); err != nil {
job.FailedAt = time.Now()
job.Error = err.Error()
job.CompletedAt = time.Now()
job.Status = JobFailed
return err
}
r.logger.Error().Err(err).Msgf("Job %s failed", id)
} else {
job.CompletedAt = time.Now()
job.Status = JobCompleted
r.logger.Info().Msgf("Job %s completed successfully", id)
}
r.saveToFile()
}()
return nil
}

View File

@@ -23,7 +23,7 @@ func New() *Service {
arrs := arr.NewStorage()
deb := debrid.NewEngine()
instance = &Service{
Repair: repair.New(arrs),
Repair: repair.New(arrs, deb),
Arr: arrs,
Debrid: deb,
}
@@ -43,7 +43,7 @@ func Update() *Service {
arrs := arr.NewStorage()
deb := debrid.NewEngine()
instance = &Service{
Repair: repair.New(arrs),
Repair: repair.New(arrs, deb),
Arr: arrs,
Debrid: deb,
}

View File

@@ -375,15 +375,20 @@ func (ui *Handler) handleRepairMedia(w http.ResponseWriter, r *http.Request) {
svc := service.GetService()
var arrs []string
if req.ArrName != "" {
_arr := svc.Arr.Get(req.ArrName)
if _arr == nil {
http.Error(w, "No Arrs found to repair", http.StatusNotFound)
return
}
arrs = append(arrs, req.ArrName)
}
if req.Async {
go func() {
if err := svc.Repair.AddJob([]string{req.ArrName}, req.MediaIds, req.AutoProcess, false); err != nil {
if err := svc.Repair.AddJob(arrs, req.MediaIds, req.AutoProcess, false); err != nil {
ui.logger.Error().Err(err).Msg("Failed to repair media")
}
}()
@@ -459,12 +464,10 @@ func (ui *Handler) handleProcessRepairJob(w http.ResponseWriter, r *http.Request
http.Error(w, "No job ID provided", http.StatusBadRequest)
return
}
go func() {
svc := service.GetService()
if err := svc.Repair.ProcessJob(id); err != nil {
ui.logger.Error().Err(err).Msg("Failed to process repair job")
}
}()
w.WriteHeader(http.StatusOK)
}

View File

@@ -8,7 +8,7 @@
<form id="repairForm">
<div class="mb-3">
<label for="arrSelect" class="form-label">Select Arr Instance</label>
<select class="form-select" id="arrSelect" required>
<select class="form-select" id="arrSelect">
<option value="">Select an Arr instance</option>
</select>
</div>
@@ -174,12 +174,6 @@
submitBtn.innerHTML = '<span class="spinner-border spinner-border-sm me-2"></span>Repairing...';
let mediaIds = document.getElementById('mediaIds').value.split(',').map(id => id.trim());
let arr = document.getElementById('arrSelect').value;
if (!arr) {
createToast('Please select an Arr instance', 'warning');
submitBtn.disabled = false;
submitBtn.innerHTML = originalText;
return;
}
try {
const response = await fetch('/internal/repair', {
method: 'POST',
@@ -187,7 +181,7 @@
'Content-Type': 'application/json'
},
body: JSON.stringify({
arr: document.getElementById('arrSelect').value,
arr: arr,
mediaIds: mediaIds,
async: document.getElementById('isAsync').checked,
autoProcess: document.getElementById('autoProcess').checked,
@@ -262,17 +256,15 @@
// Determine status
let status = 'In Progress';
let statusClass = 'text-primary';
let canDelete = false;
let canDelete = job.status !== "started";
let totalItems = job.broken_items ? Object.values(job.broken_items).reduce((sum, arr) => sum + arr.length, 0) : 0;
if (job.status === 'failed') {
status = 'Failed';
statusClass = 'text-danger';
canDelete = true;
} else if (job.status === 'completed') {
status = 'Completed';
statusClass = 'text-success';
canDelete = true;
} else if (job.status === 'pending') {
status = 'Pending';
statusClass = 'text-warning';

View File

@@ -1,6 +1,7 @@
package webdav
import (
"bufio"
"fmt"
"github.com/sirrobot01/debrid-blackhole/pkg/debrid/debrid"
"io"
@@ -11,11 +12,11 @@ import (
var sharedClient = &http.Client{
Transport: &http.Transport{
// These settings help maintain persistent connections.
MaxIdleConns: 100,
IdleConnTimeout: 90 * time.Second,
DisableCompression: false,
DisableKeepAlives: false,
Proxy: http.ProxyFromEnvironment,
},
Timeout: 0,
}
@@ -39,6 +40,24 @@ type File struct {
link string
}
// bufferedReadCloser wraps an io.ReadCloser with a buffered reader so that
// many small Read calls are served from memory instead of hitting the
// underlying stream each time; Close still closes the original stream.
type bufferedReadCloser struct {
	*bufio.Reader
	closer io.Closer
}

// newBufferedReadCloser returns rc wrapped in a 64 KiB buffered reader.
// NOTE(review): a previous comment claimed a 1 MB buffer; the actual size
// below is 64 KiB.
func newBufferedReadCloser(rc io.ReadCloser) *bufferedReadCloser {
	return &bufferedReadCloser{
		Reader: bufio.NewReaderSize(rc, 64*1024), // 64 KiB read buffer
		closer: rc,
	}
}

// Close implements io.Closer by closing the wrapped ReadCloser.
func (brc *bufferedReadCloser) Close() error {
	return brc.closer.Close()
}
// File interface implementations for File
func (f *File) Close() error {
@@ -82,40 +101,48 @@ func (f *File) Read(p []byte) (n int, err error) {
return n, nil
}
// If we haven't started streaming or a seek was requested,
// close the existing stream and start a new HTTP GET request.
// If we haven't started streaming the file yet or need to reposition
if f.reader == nil || f.seekPending {
// Close existing reader if we're repositioning
if f.reader != nil && f.seekPending {
f.reader.Close()
f.reader = nil
}
// Create a new HTTP GET request for the file's URL.
req, err := http.NewRequest("GET", f.GetDownloadLink(), nil)
downloadLink := f.GetDownloadLink()
if downloadLink == "" {
return 0, fmt.Errorf("failed to get download link for file")
}
// Create an HTTP GET request to the file's URL.
req, err := http.NewRequest("GET", downloadLink, nil)
if err != nil {
return 0, fmt.Errorf("failed to create HTTP request: %w", err)
}
// If we've already read some data, request only the remaining bytes.
// Request only the bytes starting from our current offset
if f.offset > 0 {
req.Header.Set("Range", fmt.Sprintf("bytes=%d-", f.offset))
}
// Execute the HTTP request.
// Add important headers for streaming
req.Header.Set("Connection", "keep-alive")
req.Header.Set("Accept", "*/*")
req.Header.Set("User-Agent", "Infuse/7.0.2 (iOS)")
req.Header.Set("Accept-Encoding", "gzip, deflate, br")
resp, err := sharedClient.Do(req)
if err != nil {
return 0, fmt.Errorf("HTTP request error: %w", err)
}
// Accept a 200 (OK) or 206 (Partial Content) status.
// Check response codes more carefully
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusPartialContent {
resp.Body.Close()
return 0, fmt.Errorf("unexpected HTTP status: %d", resp.StatusCode)
}
// Store the response body as our reader.
f.reader = resp.Body
// Reset the seek pending flag now that we've reinitialized the reader.
f.reader = newBufferedReadCloser(resp.Body)
f.seekPending = false
}
@@ -123,10 +150,12 @@ func (f *File) Read(p []byte) (n int, err error) {
n, err = f.reader.Read(p)
f.offset += int64(n)
// When we reach the end of the stream, close the reader.
if err == io.EOF {
f.reader.Close()
f.reader = nil
} else if err != nil {
f.reader.Close()
f.reader = nil
}
return n, err
@@ -137,12 +166,12 @@ func (f *File) Seek(offset int64, whence int) (int64, error) {
return 0, os.ErrInvalid
}
var newOffset int64
newOffset := f.offset
switch whence {
case io.SeekStart:
newOffset = offset
case io.SeekCurrent:
newOffset = f.offset + offset
newOffset += offset
case io.SeekEnd:
newOffset = f.size + offset
default:
@@ -156,7 +185,7 @@ func (f *File) Seek(offset int64, whence int) (int64, error) {
newOffset = f.size
}
// If we're seeking to a new position, mark the reader for reset.
// Only mark seek as pending if position actually changed
if newOffset != f.offset {
f.offset = newOffset
f.seekPending = true
@@ -184,6 +213,24 @@ func (f *File) Stat() (os.FileInfo, error) {
}, nil
}
// ReadAt reads into p starting at offset off by seeking and then performing
// a single sequential Read.
//
// NOTE(review): this deviates from the io.ReaderAt contract — it does not
// retry short reads and deliberately does not restore the previous offset,
// because Infuse issues sequential reads after the initial seek.
func (f *File) ReadAt(p []byte, off int64) (n int, err error) {
	// Seek to requested position
	_, err = f.Seek(off, io.SeekStart)
	if err != nil {
		return 0, err
	}
	// Read the data
	n, err = f.Read(p)
	// Don't restore position for Infuse compatibility
	// Infuse expects sequential reads after the initial seek
	return n, err
}
// Write always fails with os.ErrPermission: files served over this WebDAV
// layer are read-only.
func (f *File) Write(p []byte) (n int, err error) {
	return 0, os.ErrPermission
}

View File

@@ -2,7 +2,6 @@ package webdav
import (
"bytes"
"compress/gzip"
"context"
"errors"
"fmt"
@@ -68,7 +67,7 @@ func (h *Handler) RemoveAll(ctx context.Context, name string) error {
}
if filename == "" {
h.cache.GetClient().DeleteTorrent(cachedTorrent.Torrent)
h.cache.GetClient().DeleteTorrent(cachedTorrent.Torrent.Id)
h.cache.OnRemove(cachedTorrent.Id)
return nil
}
@@ -259,7 +258,7 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// - Otherwise, for deeper (torrent folder) paths, use a longer TTL.
ttl := 30 * time.Minute
if h.isParentPath(r.URL.Path) {
ttl = 20 * time.Second
ttl = 30 * time.Second
}
if served := h.serveFromCacheIfValid(w, r, cacheKey, ttl); served {
@@ -281,22 +280,12 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
responseData := responseRecorder.Body.Bytes()
// Create compressed version
var gzippedData []byte
if len(responseData) > 0 {
var buf bytes.Buffer
gzw := gzip.NewWriter(&buf)
if _, err := gzw.Write(responseData); err == nil {
if err := gzw.Close(); err == nil {
gzippedData = buf.Bytes()
}
}
}
h.cache.PropfindResp.Store(cacheKey, debrid.PropfindResponse{
Data: responseData,
GzippedData: gzippedData,
Ts: time.Now(),
})
//h.cache.PropfindResp.Store(cacheKey, debrid.PropfindResponse{
// Data: responseData,
// GzippedData: request.Gzip(responseData),
// Ts: time.Now(),
//})
// Forward the captured response to the client.
for k, v := range responseRecorder.Header() {
@@ -417,7 +406,6 @@ func (h *Handler) serveFromCacheIfValid(w http.ResponseWriter, r *http.Request,
if time.Since(respCache.Ts) >= ttl {
// Remove expired cache entry
h.cache.PropfindResp.Delete(cacheKey)
return false
}
w.Header().Set("Content-Type", "application/xml; charset=utf-8")