fix mounts; backward compatibility

This commit is contained in:
Mukhtar Akere
2025-02-13 05:07:14 +01:00
parent 6f4f72d781
commit bfd2596367
21 changed files with 71 additions and 466 deletions
-11
View File
@@ -29,10 +29,6 @@ type AllDebrid struct {
CheckCached bool
}
func (ad *AllDebrid) GetMountPath() string {
return ad.MountPath
}
func (ad *AllDebrid) GetName() string {
return ad.Name
}
@@ -77,7 +73,6 @@ func (ad *AllDebrid) SubmitMagnet(torrent *torrent.Torrent) (*torrent.Torrent, e
}
magnet := magnets[0]
torrentId := strconv.Itoa(magnet.ID)
ad.logger.Info().Msgf("Torrent: %s added with id: %s", torrent.Name, torrentId)
torrent.Id = torrentId
return torrent, nil
@@ -170,12 +165,6 @@ func (ad *AllDebrid) GetTorrent(id string) (*torrent.Torrent, error) {
t.Seeders = data.Seeders
index := -1
files := flattenFiles(data.Files, "", &index)
parentFolder := data.Filename
if data.NbLinks == 1 {
// All debrid doesn't return the parent folder for single file torrents
parentFolder = ""
}
t.OriginalFilename = parentFolder
t.Files = files
}
return t, nil
-360
View File
@@ -1,360 +0,0 @@
package cache
import (
"bufio"
"encoding/json"
"fmt"
"github.com/rs/zerolog"
"github.com/sirrobot01/debrid-blackhole/internal/logger"
"os"
"path/filepath"
"runtime"
"sync"
"time"
"github.com/sirrobot01/debrid-blackhole/internal/config"
"github.com/sirrobot01/debrid-blackhole/pkg/debrid/engine"
"github.com/sirrobot01/debrid-blackhole/pkg/debrid/torrent"
)
type DownloadLinkCache struct {
Link string `json:"download_link"`
}
type CachedTorrent struct {
*torrent.Torrent
LastRead time.Time `json:"last_read"`
IsComplete bool `json:"is_complete"`
DownloadLinks map[string]DownloadLinkCache `json:"download_links"`
}
var (
_logInstance zerolog.Logger
once sync.Once
)
func getLogger() zerolog.Logger {
once.Do(func() {
_logInstance = logger.NewLogger("cache", "info", os.Stdout)
})
return _logInstance
}
type Cache struct {
dir string
client engine.Service
torrents *sync.Map // key: torrent.Id, value: *CachedTorrent
torrentsNames *sync.Map // key: torrent.Name, value: torrent.Id
LastUpdated time.Time `json:"last_updated"`
}
type Manager struct {
caches map[string]*Cache
}
func NewManager(debridService *engine.Engine) *Manager {
cfg := config.GetConfig()
cm := &Manager{
caches: make(map[string]*Cache),
}
for _, debrid := range debridService.GetDebrids() {
c := New(debrid, cfg.Path)
cm.caches[debrid.GetName()] = c
}
return cm
}
func (m *Manager) GetCaches() map[string]*Cache {
return m.caches
}
func (m *Manager) GetCache(debridName string) *Cache {
return m.caches[debridName]
}
func New(debridService engine.Service, basePath string) *Cache {
return &Cache{
dir: filepath.Join(basePath, "cache", debridService.GetName(), "torrents"),
torrents: &sync.Map{},
torrentsNames: &sync.Map{},
client: debridService,
}
}
func (c *Cache) Start() error {
_logger := getLogger()
_logger.Info().Msg("Starting cache for: " + c.client.GetName())
if err := c.Load(); err != nil {
return fmt.Errorf("failed to load cache: %v", err)
}
if err := c.Sync(); err != nil {
return fmt.Errorf("failed to sync cache: %v", err)
}
return nil
}
func (c *Cache) Load() error {
_logger := getLogger()
if err := os.MkdirAll(c.dir, 0755); err != nil {
return fmt.Errorf("failed to create cache directory: %w", err)
}
files, err := os.ReadDir(c.dir)
if err != nil {
return fmt.Errorf("failed to read cache directory: %w", err)
}
for _, file := range files {
if file.IsDir() || filepath.Ext(file.Name()) != ".json" {
continue
}
filePath := filepath.Join(c.dir, file.Name())
data, err := os.ReadFile(filePath)
if err != nil {
_logger.Debug().Err(err).Msgf("Failed to read file: %s", filePath)
continue
}
var ct CachedTorrent
if err := json.Unmarshal(data, &ct); err != nil {
_logger.Debug().Err(err).Msgf("Failed to unmarshal file: %s", filePath)
continue
}
if len(ct.Files) > 0 {
c.torrents.Store(ct.Torrent.Id, &ct)
c.torrentsNames.Store(ct.Torrent.Name, ct.Torrent.Id)
}
}
return nil
}
func (c *Cache) GetTorrent(id string) *CachedTorrent {
if value, ok := c.torrents.Load(id); ok {
return value.(*CachedTorrent)
}
return nil
}
func (c *Cache) GetTorrentByName(name string) *CachedTorrent {
if id, ok := c.torrentsNames.Load(name); ok {
return c.GetTorrent(id.(string))
}
return nil
}
func (c *Cache) SaveTorrent(ct *CachedTorrent) error {
data, err := json.MarshalIndent(ct, "", " ")
if err != nil {
return fmt.Errorf("failed to marshal torrent: %w", err)
}
fileName := ct.Torrent.Id + ".json"
filePath := filepath.Join(c.dir, fileName)
tmpFile := filePath + ".tmp"
f, err := os.Create(tmpFile)
if err != nil {
return fmt.Errorf("failed to create temp file: %w", err)
}
defer f.Close()
w := bufio.NewWriter(f)
if _, err := w.Write(data); err != nil {
return fmt.Errorf("failed to write data: %w", err)
}
if err := w.Flush(); err != nil {
return fmt.Errorf("failed to flush data: %w", err)
}
return os.Rename(tmpFile, filePath)
}
func (c *Cache) SaveAll() error {
const batchSize = 100
var wg sync.WaitGroup
_logger := getLogger()
tasks := make(chan *CachedTorrent, batchSize)
for i := 0; i < runtime.NumCPU(); i++ {
wg.Add(1)
go func() {
defer wg.Done()
for ct := range tasks {
if err := c.SaveTorrent(ct); err != nil {
_logger.Error().Err(err).Msg("failed to save torrent")
}
}
}()
}
c.torrents.Range(func(_, value interface{}) bool {
tasks <- value.(*CachedTorrent)
return true
})
close(tasks)
wg.Wait()
c.LastUpdated = time.Now()
return nil
}
func (c *Cache) Sync() error {
_logger := getLogger()
torrents, err := c.client.GetTorrents()
if err != nil {
return fmt.Errorf("failed to sync torrents: %v", err)
}
workers := runtime.NumCPU() * 200
workChan := make(chan *torrent.Torrent, len(torrents))
errChan := make(chan error, len(torrents))
var wg sync.WaitGroup
for i := 0; i < workers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for t := range workChan {
if err := c.processTorrent(t); err != nil {
errChan <- err
}
}
}()
}
for _, t := range torrents {
workChan <- t
}
close(workChan)
wg.Wait()
close(errChan)
for err := range errChan {
_logger.Error().Err(err).Msg("sync error")
}
_logger.Info().Msgf("Synced %d torrents", len(torrents))
return nil
}
func (c *Cache) processTorrent(t *torrent.Torrent) error {
if existing, ok := c.torrents.Load(t.Id); ok {
ct := existing.(*CachedTorrent)
if ct.IsComplete {
return nil
}
}
c.AddTorrent(t)
return nil
}
func (c *Cache) AddTorrent(t *torrent.Torrent) {
_logger := getLogger()
if len(t.Files) == 0 {
tNew, err := c.client.GetTorrent(t.Id)
_logger.Debug().Msgf("Getting torrent files for %s", t.Id)
if err != nil {
_logger.Debug().Msgf("Failed to get torrent files for %s: %v", t.Id, err)
return
}
t = tNew
}
if len(t.Files) == 0 {
_logger.Debug().Msgf("No files found for %s", t.Id)
return
}
ct := &CachedTorrent{
Torrent: t,
LastRead: time.Now(),
IsComplete: len(t.Files) > 0,
DownloadLinks: make(map[string]DownloadLinkCache),
}
c.torrents.Store(t.Id, ct)
c.torrentsNames.Store(t.Name, t.Id)
go func() {
if err := c.SaveTorrent(ct); err != nil {
_logger.Debug().Err(err).Msgf("Failed to save torrent %s", t.Id)
}
}()
}
func (c *Cache) RefreshTorrent(torrentId string) *CachedTorrent {
_logger := getLogger()
t, err := c.client.GetTorrent(torrentId)
if err != nil {
_logger.Debug().Msgf("Failed to get torrent files for %s: %v", torrentId, err)
return nil
}
if len(t.Files) == 0 {
return nil
}
ct := &CachedTorrent{
Torrent: t,
LastRead: time.Now(),
IsComplete: len(t.Files) > 0,
DownloadLinks: make(map[string]DownloadLinkCache),
}
c.torrents.Store(t.Id, ct)
c.torrentsNames.Store(t.Name, t.Id)
go func() {
if err := c.SaveTorrent(ct); err != nil {
_logger.Debug().Err(err).Msgf("Failed to save torrent %s", t.Id)
}
}()
return ct
}
func (c *Cache) GetFileDownloadLink(t *CachedTorrent, file *torrent.File) (string, error) {
_logger := getLogger()
if linkCache, ok := t.DownloadLinks[file.Id]; ok {
return linkCache.Link, nil
}
if file.Link == "" {
t = c.RefreshTorrent(t.Id)
if t == nil {
return "", fmt.Errorf("torrent not found")
}
file = t.Torrent.GetFile(file.Id)
}
_logger.Debug().Msgf("Getting download link for %s", t.Name)
link := c.client.GetDownloadLink(t.Torrent, file)
if link == nil {
return "", fmt.Errorf("download link not found")
}
t.DownloadLinks[file.Id] = DownloadLinkCache{
Link: link.DownloadLink,
}
go func() {
if err := c.SaveTorrent(t); err != nil {
_logger.Debug().Err(err).Msgf("Failed to save torrent %s", t.Id)
}
}()
return link.DownloadLink, nil
}
func (c *Cache) GetTorrents() *sync.Map {
return c.torrents
}
+1 -1
View File
@@ -81,7 +81,7 @@ func ProcessTorrent(d *engine.Engine, magnet *utils.Magnet, a *arr.Arr, isSymlin
errs = append(errs, err)
continue
}
logger.Info().Msgf("Torrent: %s submitted to %s", dbt.Name, db.GetName())
logger.Info().Msgf("Torrent: %s(id=%s) submitted to %s", dbt.Name, dbt.Id, db.GetName())
d.LastUsed = index
return db.CheckStatus(dbt, isSymlink)
}
-6
View File
@@ -11,7 +11,6 @@ import (
"github.com/sirrobot01/debrid-blackhole/internal/request"
"github.com/sirrobot01/debrid-blackhole/pkg/debrid/torrent"
"log"
"net/http"
"os"
"strings"
@@ -29,10 +28,6 @@ type DebridLink struct {
CheckCached bool
}
func (dl *DebridLink) GetMountPath() string {
return dl.MountPath
}
func (dl *DebridLink) GetName() string {
return dl.Name
}
@@ -176,7 +171,6 @@ func (dl *DebridLink) SubmitMagnet(t *torrent.Torrent) (*torrent.Torrent, error)
}
data := *res.Value
status := "downloading"
log.Printf("Torrent: %s added with id: %s", t.Name, data.ID)
name := common.RemoveInvalidChars(data.Name)
t.Id = data.ID
t.Name = name
-1
View File
@@ -12,7 +12,6 @@ type Service interface {
GetDownloadLink(tr *torrent.Torrent, file *torrent.File) *torrent.DownloadLinks
DeleteTorrent(tr *torrent.Torrent)
IsAvailable(infohashes []string) map[string]bool
GetMountPath() string
GetCheckCached() bool
GetTorrent(id string) (*torrent.Torrent, error)
GetTorrents() ([]*torrent.Torrent, error)
+4 -6
View File
@@ -31,10 +31,6 @@ type RealDebrid struct {
CheckCached bool
}
func (r *RealDebrid) GetMountPath() string {
return r.MountPath
}
func (r *RealDebrid) GetName() string {
return r.Name
}
@@ -156,9 +152,9 @@ func (r *RealDebrid) SubmitMagnet(t *torrent.Torrent) (*torrent.Torrent, error)
return nil, err
}
err = json.Unmarshal(resp, &data)
r.logger.Info().Msgf("Torrent: %s added with id: %s", t.Name, data.Id)
t.Id = data.Id
t.Debrid = r.Name
t.MountPath = r.MountPath
return t, nil
}
@@ -218,6 +214,8 @@ func (r *RealDebrid) CheckStatus(t *torrent.Torrent, isSymlink bool) (*torrent.T
t.Seeders = data.Seeders
t.Links = data.Links
t.Status = status
t.Debrid = r.Name
t.MountPath = r.MountPath
downloadingStatus := []string{"downloading", "magnet_conversion", "queued", "compressing", "uploading"}
if status == "waiting_files_selection" {
files := GetTorrentFiles(data, true) // Validate files to be selected
+2 -6
View File
@@ -11,7 +11,6 @@ import (
"github.com/sirrobot01/debrid-blackhole/internal/request"
"github.com/sirrobot01/debrid-blackhole/pkg/debrid/torrent"
"log"
"mime/multipart"
"net/http"
gourl "net/url"
@@ -35,10 +34,6 @@ type Torbox struct {
CheckCached bool
}
func (tb *Torbox) GetMountPath() string {
return tb.MountPath
}
func (tb *Torbox) GetName() string {
return tb.Name
}
@@ -130,8 +125,9 @@ func (tb *Torbox) SubmitMagnet(torrent *torrent.Torrent) (*torrent.Torrent, erro
}
dt := *data.Data
torrentId := strconv.Itoa(dt.Id)
log.Printf("Torrent: %s added with id: %s", torrent.Name, torrentId)
torrent.Id = torrentId
torrent.MountPath = tb.MountPath
torrent.Debrid = tb.Name
return torrent, nil
}
+15 -16
View File
@@ -52,14 +52,13 @@ Loop:
func (q *QBit) ProcessManualFile(torrent *Torrent) (string, error) {
debridTorrent := torrent.DebridTorrent
q.logger.Info().Msgf("Downloading %d files...", len(debridTorrent.DownloadLinks))
torrentPath := common.RemoveExtension(debridTorrent.OriginalFilename)
parent := common.RemoveInvalidChars(filepath.Join(q.DownloadFolder, debridTorrent.Arr.Name, torrentPath))
err := os.MkdirAll(parent, os.ModePerm)
torrentPath := common.RemoveInvalidChars(filepath.Join(q.DownloadFolder, debridTorrent.Arr.Name, common.RemoveExtension(debridTorrent.OriginalFilename)))
err := os.MkdirAll(torrentPath, os.ModePerm)
if err != nil {
// add previous error to the error and return
return "", fmt.Errorf("failed to create directory: %s: %v", parent, err)
return "", fmt.Errorf("failed to create directory: %s: %v", torrentPath, err)
}
q.downloadFiles(torrent, parent)
q.downloadFiles(torrent, torrentPath)
return torrentPath, nil
}
@@ -140,25 +139,26 @@ func (q *QBit) ProcessSymlink(torrent *Torrent) (string, error) {
if len(files) == 0 {
return "", fmt.Errorf("no video files found")
}
q.logger.Info().Msgf("Checking %d files...", len(files))
q.logger.Info().Msgf("Checking symlinks for %d files...", len(files))
rCloneBase := debridTorrent.MountPath
torrentPath, err := q.getTorrentPath(rCloneBase, debridTorrent) // /MyTVShow/
// This returns filename.ext for alldebrid instead of the parent folder filename/
torrentFolder := torrentPath
if err != nil {
return "", fmt.Errorf("failed to get torrent path: %v", err)
}
// Fix for alldebrid
newTorrentPath := torrentPath
if newTorrentPath == "" {
// Alldebrid at times doesn't return the parent folder for single file torrents
newTorrentPath = common.RemoveExtension(debridTorrent.Name) // MyTVShow
// Check if the torrent path is a file
torrentRclonePath := filepath.Join(rCloneBase, torrentPath) // leave it as is
if debridTorrent.Debrid == "alldebrid" && len(files) == 1 {
// Alldebrid hotfix for single file torrents
torrentFolder = common.RemoveExtension(torrentFolder)
torrentRclonePath = rCloneBase // /mnt/rclone/magnets/ // Remove the filename since it's in the root folder
}
torrentSymlinkPath := filepath.Join(q.DownloadFolder, debridTorrent.Arr.Name, newTorrentPath) // /mnt/symlinks/{category}/MyTVShow/
torrentSymlinkPath := filepath.Join(q.DownloadFolder, debridTorrent.Arr.Name, torrentFolder) // /mnt/symlinks/{category}/MyTVShow/
err = os.MkdirAll(torrentSymlinkPath, os.ModePerm)
if err != nil {
return "", fmt.Errorf("failed to create directory: %s: %v", torrentSymlinkPath, err)
}
torrentRclonePath := filepath.Join(rCloneBase, torrentPath) // leave it as is
q.logger.Debug().Msgf("Debrid torrent path: %s\nSymlink Path: %s", torrentRclonePath, torrentSymlinkPath)
for _, file := range files {
wg.Add(1)
go checkFileLoop(&wg, torrentRclonePath, file, ready)
@@ -173,12 +173,11 @@ func (q *QBit) ProcessSymlink(torrent *Torrent) (string, error) {
q.logger.Info().Msgf("File is ready: %s", f.Path)
q.createSymLink(torrentSymlinkPath, torrentRclonePath, f)
}
return torrentPath, nil
return torrentSymlinkPath, nil
}
func (q *QBit) getTorrentPath(rclonePath string, debridTorrent *debrid.Torrent) (string, error) {
for {
q.logger.Debug().Msgf("Checking for torrent path: %s", rclonePath)
torrentPath, err := debridTorrent.GetMountFolder(rclonePath)
if err == nil {
q.logger.Debug().Msgf("Found torrent path: %s", torrentPath)
+1
View File
@@ -74,6 +74,7 @@ func (i *ImportRequest) Process(q *QBit) (err error) {
torrent := CreateTorrentFromMagnet(magnet, i.Arr.Name, "manual")
debridTorrent, err := debrid.ProcessTorrent(svc.Debrid, magnet, i.Arr, i.IsSymlink)
if err != nil || debridTorrent == nil {
fmt.Println("Error deleting torrent: ", err)
if debridTorrent != nil {
dbClient := service.GetDebrid().GetByName(debridTorrent.Debrid)
go dbClient.DeleteTorrent(debridTorrent)
+4 -2
View File
@@ -6,6 +6,7 @@ import (
"github.com/sirrobot01/debrid-blackhole/internal/config"
"github.com/sirrobot01/debrid-blackhole/internal/logger"
"os"
"path/filepath"
)
type QBit struct {
@@ -22,7 +23,8 @@ type QBit struct {
}
func New() *QBit {
cfg := config.GetConfig().QBitTorrent
_cfg := config.GetConfig()
cfg := _cfg.QBitTorrent
port := cmp.Or(cfg.Port, os.Getenv("QBIT_PORT"), "8282")
refreshInterval := cmp.Or(cfg.RefreshInterval, 10)
return &QBit{
@@ -31,7 +33,7 @@ func New() *QBit {
Port: port,
DownloadFolder: cfg.DownloadFolder,
Categories: cfg.Categories,
Storage: NewTorrentStorage(cmp.Or(os.Getenv("TORRENT_FILE"), "/data/qbit_torrents.json")),
Storage: NewTorrentStorage(filepath.Join(_cfg.Path, "torrents.json")),
logger: logger.NewLogger("qbit", cfg.LogLevel, os.Stdout),
RefreshInterval: refreshInterval,
}
-4
View File
@@ -2,15 +2,11 @@ package qbit
import (
"github.com/go-chi/chi/v5"
"github.com/go-chi/chi/v5/middleware"
"net/http"
)
func (q *QBit) Routes() http.Handler {
r := chi.NewRouter()
if q.logger.GetLevel().String() == "debug" {
r.Use(middleware.Logger)
}
r.Use(q.CategoryContext)
r.Post("/auth/login", q.handleLogin)
+7 -14
View File
@@ -90,14 +90,14 @@ func (q *QBit) ProcessFiles(torrent *Torrent, debridTorrent *debrid.Torrent, arr
torrent = q.UpdateTorrentMin(torrent, debridTorrent)
}
var (
torrentPath string
err error
torrentSymlinkPath string
err error
)
debridTorrent.Arr = arr
if isSymlink {
torrentPath, err = q.ProcessSymlink(torrent)
torrentSymlinkPath, err = q.ProcessSymlink(torrent) // /mnt/symlinks/{category}/MyTVShow/
} else {
torrentPath, err = q.ProcessManualFile(torrent)
torrentSymlinkPath, err = q.ProcessManualFile(torrent)
}
if err != nil {
q.MarkAsFailed(torrent)
@@ -105,7 +105,7 @@ func (q *QBit) ProcessFiles(torrent *Torrent, debridTorrent *debrid.Torrent, arr
q.logger.Info().Msgf("Error: %v", err)
return
}
torrent.TorrentPath = filepath.Base(torrentPath)
torrent.TorrentPath = torrentSymlinkPath
q.UpdateTorrent(torrent, debridTorrent)
_ = arr.Refresh()
}
@@ -161,7 +161,6 @@ func (q *QBit) UpdateTorrentMin(t *Torrent, debridTorrent *debrid.Torrent) *Torr
func (q *QBit) UpdateTorrent(t *Torrent, debridTorrent *debrid.Torrent) *Torrent {
_db := service.GetDebrid().GetByName(debridTorrent.Debrid)
rcLoneMount := _db.GetMountPath()
if debridTorrent == nil && t.ID != "" {
debridTorrent, _ = _db.GetTorrent(t.ID)
}
@@ -172,15 +171,9 @@ func (q *QBit) UpdateTorrent(t *Torrent, debridTorrent *debrid.Torrent) *Torrent
if debridTorrent.Status != "downloaded" {
debridTorrent, _ = _db.GetTorrent(t.ID)
}
if t.TorrentPath == "" {
tPath, _ := debridTorrent.GetMountFolder(rcLoneMount)
t.TorrentPath = filepath.Base(tPath)
}
savePath := filepath.Join(q.DownloadFolder, t.Category) + string(os.PathSeparator)
torrentPath := filepath.Join(savePath, t.TorrentPath) + string(os.PathSeparator)
t = q.UpdateTorrentMin(t, debridTorrent)
t.ContentPath = torrentPath
t.ContentPath = t.TorrentPath + string(os.PathSeparator)
t.SavePath = t.ContentPath
if t.IsReady() {
t.State = "pausedUP"
+6 -9
View File
@@ -3,17 +3,15 @@ package service
import (
"github.com/sirrobot01/debrid-blackhole/pkg/arr"
"github.com/sirrobot01/debrid-blackhole/pkg/debrid"
"github.com/sirrobot01/debrid-blackhole/pkg/debrid/cache"
"github.com/sirrobot01/debrid-blackhole/pkg/debrid/engine"
"github.com/sirrobot01/debrid-blackhole/pkg/repair"
"sync"
)
type Service struct {
Repair *repair.Repair
Arr *arr.Storage
Debrid *engine.Engine
DebridCache *cache.Manager
Repair *repair.Repair
Arr *arr.Storage
Debrid *engine.Engine
}
var (
@@ -26,10 +24,9 @@ func New() *Service {
arrs := arr.NewStorage()
deb := debrid.New()
instance = &Service{
Repair: repair.New(deb, arrs),
Arr: arrs,
Debrid: deb,
DebridCache: cache.NewManager(deb),
Repair: repair.New(deb, arrs),
Arr: arrs,
Debrid: deb,
}
})
return instance