Adds Support for Downloader
This commit is contained in:
@@ -10,8 +10,8 @@ import (
|
||||
|
||||
type Service interface {
|
||||
SubmitMagnet(torrent *Torrent) (*Torrent, error)
|
||||
CheckStatus(torrent *Torrent) (*Torrent, error)
|
||||
DownloadLink(torrent *Torrent) error
|
||||
CheckStatus(torrent *Torrent, isSymlink bool) (*Torrent, error)
|
||||
GetDownloadLinks(torrent *Torrent) error
|
||||
DeleteTorrent(torrent *Torrent)
|
||||
IsAvailable(infohashes []string) map[string]bool
|
||||
GetMountPath() string
|
||||
@@ -120,7 +120,7 @@ func GetLocalCache(infohashes []string, cache *common.Cache) ([]string, map[stri
|
||||
return hashes, result
|
||||
}
|
||||
|
||||
func ProcessQBitTorrent(d Service, magnet *common.Magnet, arr *Arr) (*Torrent, error) {
|
||||
func ProcessQBitTorrent(d Service, magnet *common.Magnet, arr *Arr, isSymlink bool) (*Torrent, error) {
|
||||
debridTorrent := &Torrent{
|
||||
InfoHash: magnet.InfoHash,
|
||||
Magnet: magnet,
|
||||
@@ -144,5 +144,5 @@ func ProcessQBitTorrent(d Service, magnet *common.Magnet, arr *Arr) (*Torrent, e
|
||||
logger.Printf("Error submitting magnet: %s", err)
|
||||
return nil, err
|
||||
}
|
||||
return d.CheckStatus(debridTorrent)
|
||||
return d.CheckStatus(debridTorrent, isSymlink)
|
||||
}
|
||||
|
||||
@@ -40,7 +40,9 @@ func GetTorrentFiles(data structs.RealDebridTorrentInfo) []TorrentFile {
|
||||
files := make([]TorrentFile, 0)
|
||||
for _, f := range data.Files {
|
||||
name := filepath.Base(f.Path)
|
||||
if (!common.RegexMatch(common.VIDEOMATCH, name) && !common.RegexMatch(common.SUBMATCH, name)) || common.RegexMatch(common.SAMPLEMATCH, name) {
|
||||
if (!common.RegexMatch(common.VIDEOMATCH, name) &&
|
||||
!common.RegexMatch(common.SUBMATCH, name) &&
|
||||
!common.RegexMatch(common.MUSICMATCH, name)) || common.RegexMatch(common.SAMPLEMATCH, name) {
|
||||
continue
|
||||
}
|
||||
fileId := f.ID
|
||||
@@ -149,12 +151,13 @@ func (r *RealDebrid) GetTorrent(id string) (*Torrent, error) {
|
||||
torrent.Seeders = data.Seeders
|
||||
torrent.Filename = data.Filename
|
||||
torrent.OriginalFilename = data.OriginalFilename
|
||||
torrent.Links = data.Links
|
||||
files := GetTorrentFiles(data)
|
||||
torrent.Files = files
|
||||
return torrent, nil
|
||||
}
|
||||
|
||||
func (r *RealDebrid) CheckStatus(torrent *Torrent) (*Torrent, error) {
|
||||
func (r *RealDebrid) CheckStatus(torrent *Torrent, isSymlink bool) (*Torrent, error) {
|
||||
url := fmt.Sprintf("%s/torrents/info/%s", r.Host, torrent.Id)
|
||||
for {
|
||||
resp, err := r.client.MakeRequest(http.MethodGet, url, nil)
|
||||
@@ -174,6 +177,8 @@ func (r *RealDebrid) CheckStatus(torrent *Torrent) (*Torrent, error) {
|
||||
torrent.Progress = data.Progress
|
||||
torrent.Speed = data.Speed
|
||||
torrent.Seeders = data.Seeders
|
||||
torrent.Links = data.Links
|
||||
torrent.Status = status
|
||||
if status == "error" || status == "dead" || status == "magnet_error" {
|
||||
return torrent, fmt.Errorf("torrent: %s has error", torrent.Name)
|
||||
} else if status == "waiting_files_selection" {
|
||||
@@ -195,11 +200,16 @@ func (r *RealDebrid) CheckStatus(torrent *Torrent) (*Torrent, error) {
|
||||
return torrent, err
|
||||
}
|
||||
} else if status == "downloaded" {
|
||||
log.Printf("Torrent: %s downloaded\n", torrent.Name)
|
||||
err = r.DownloadLink(torrent)
|
||||
if err != nil {
|
||||
return torrent, err
|
||||
files := GetTorrentFiles(data)
|
||||
torrent.Files = files
|
||||
log.Printf("Torrent: %s downloaded to RD\n", torrent.Name)
|
||||
if !isSymlink {
|
||||
err = r.GetDownloadLinks(torrent)
|
||||
if err != nil {
|
||||
return torrent, err
|
||||
}
|
||||
}
|
||||
|
||||
break
|
||||
} else if status == "downloading" {
|
||||
if !r.DownloadUncached {
|
||||
@@ -225,7 +235,32 @@ func (r *RealDebrid) DeleteTorrent(torrent *Torrent) {
|
||||
}
|
||||
}
|
||||
|
||||
func (r *RealDebrid) DownloadLink(torrent *Torrent) error {
|
||||
func (r *RealDebrid) GetDownloadLinks(torrent *Torrent) error {
|
||||
url := fmt.Sprintf("%s/unrestrict/link/", r.Host)
|
||||
downloadLinks := make([]TorrentDownloadLinks, 0)
|
||||
for _, link := range torrent.Links {
|
||||
if link == "" {
|
||||
continue
|
||||
}
|
||||
payload := gourl.Values{
|
||||
"link": {link},
|
||||
}
|
||||
resp, err := r.client.MakeRequest(http.MethodPost, url, strings.NewReader(payload.Encode()))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
var data structs.RealDebridUnrestrictResponse
|
||||
if err = json.Unmarshal(resp, &data); err != nil {
|
||||
return err
|
||||
}
|
||||
download := TorrentDownloadLinks{
|
||||
Link: data.Link,
|
||||
Filename: data.Filename,
|
||||
DownloadLink: data.Download,
|
||||
}
|
||||
downloadLinks = append(downloadLinks, download)
|
||||
}
|
||||
torrent.DownloadLinks = downloadLinks
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@@ -93,4 +93,15 @@ type RealDebridTorrentInfo struct {
|
||||
Seeders int `json:"seeders,omitempty"`
|
||||
}
|
||||
|
||||
// 5e6e2e77fd3921a7903a41336c844cc409bf8788/14527C07BDFDDFC642963238BB6E7507B9742947/66A1CD1A5C7F4014877A51AC2620E857E3BB4D16
|
||||
type RealDebridUnrestrictResponse struct {
|
||||
Id string `json:"id"`
|
||||
Filename string `json:"filename"`
|
||||
MimeType string `json:"mimeType"`
|
||||
Filesize int64 `json:"filesize"`
|
||||
Link string `json:"link"`
|
||||
Host string `json:"host"`
|
||||
Chunks int64 `json:"chunks"`
|
||||
Crc int64 `json:"crc"`
|
||||
Download string `json:"download"`
|
||||
Streamable int `json:"streamable"`
|
||||
}
|
||||
|
||||
+22
-14
@@ -25,25 +25,33 @@ type ArrHistorySchema struct {
|
||||
}
|
||||
|
||||
type Torrent struct {
|
||||
Id string `json:"id"`
|
||||
InfoHash string `json:"info_hash"`
|
||||
Name string `json:"name"`
|
||||
Folder string `json:"folder"`
|
||||
Filename string `json:"filename"`
|
||||
OriginalFilename string `json:"original_filename"`
|
||||
Size int64 `json:"size"`
|
||||
Bytes int64 `json:"bytes"` // Size of only the files that are downloaded
|
||||
Magnet *common.Magnet `json:"magnet"`
|
||||
Files []TorrentFile `json:"files"`
|
||||
Status string `json:"status"`
|
||||
Progress float64 `json:"progress"`
|
||||
Speed int64 `json:"speed"`
|
||||
Seeders int `json:"seeders"`
|
||||
Id string `json:"id"`
|
||||
InfoHash string `json:"info_hash"`
|
||||
Name string `json:"name"`
|
||||
Folder string `json:"folder"`
|
||||
Filename string `json:"filename"`
|
||||
OriginalFilename string `json:"original_filename"`
|
||||
Size int64 `json:"size"`
|
||||
Bytes int64 `json:"bytes"` // Size of only the files that are downloaded
|
||||
Magnet *common.Magnet `json:"magnet"`
|
||||
Files []TorrentFile `json:"files"`
|
||||
Status string `json:"status"`
|
||||
Progress float64 `json:"progress"`
|
||||
Speed int64 `json:"speed"`
|
||||
Seeders int `json:"seeders"`
|
||||
Links []string `json:"links"`
|
||||
DownloadLinks []TorrentDownloadLinks `json:"download_links"`
|
||||
|
||||
Debrid *Debrid
|
||||
Arr *Arr
|
||||
}
|
||||
|
||||
type TorrentDownloadLinks struct {
|
||||
Filename string `json:"filename"`
|
||||
Link string `json:"link"`
|
||||
DownloadLink string `json:"download_link"`
|
||||
}
|
||||
|
||||
func (t *Torrent) GetSymlinkFolder(parent string) string {
|
||||
return filepath.Join(parent, t.Arr.Name, t.Folder)
|
||||
}
|
||||
|
||||
@@ -0,0 +1,162 @@
|
||||
package qbit
|
||||
|
||||
import (
|
||||
"github.com/cavaliergopher/grab/v3"
|
||||
"goBlack/common"
|
||||
"goBlack/pkg/debrid"
|
||||
"goBlack/pkg/qbit/downloaders"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
func (q *QBit) processManualFiles(torrent *Torrent, debridTorrent *debrid.Torrent, arr *debrid.Arr) {
|
||||
q.logger.Printf("Downloading %d files...", len(debridTorrent.DownloadLinks))
|
||||
parent := common.RemoveInvalidChars(filepath.Join(q.DownloadFolder, debridTorrent.Arr.Name, torrent.Name))
|
||||
err := os.MkdirAll(parent, os.ModePerm)
|
||||
if err != nil {
|
||||
q.logger.Printf("Failed to create directory: %s\n", parent)
|
||||
q.MarkAsFailed(torrent)
|
||||
return
|
||||
}
|
||||
q.downloadFiles(debridTorrent, parent)
|
||||
q.UpdateTorrent(torrent, debridTorrent)
|
||||
q.RefreshArr(arr)
|
||||
}
|
||||
|
||||
func (q *QBit) downloadFile(client *grab.Client, link debrid.TorrentDownloadLinks, parent string, wg *sync.WaitGroup, semaphore chan struct{}) {
|
||||
url := link.DownloadLink
|
||||
defer wg.Done()
|
||||
defer func() { <-semaphore }()
|
||||
req, _ := grab.NewRequest(parent, url)
|
||||
resp := client.Do(req)
|
||||
//t := time.NewTicker(5 * time.Second)
|
||||
//defer t.Stop()
|
||||
//Loop:
|
||||
// for {
|
||||
// select {
|
||||
// case <-t.C:
|
||||
// fmt.Printf(" %s: transferred %d / %d bytes (%.2f%%)\n",
|
||||
// resp.Filename,
|
||||
// resp.BytesComplete(),
|
||||
// resp.Size,
|
||||
// 100*resp.Progress())
|
||||
//
|
||||
// case <-resp.Done:
|
||||
// // download is complete
|
||||
// break Loop
|
||||
// }
|
||||
// }
|
||||
|
||||
// Check for errors
|
||||
if err := resp.Err(); err != nil {
|
||||
q.logger.Printf("Error downloading %v: %v\n", url, err)
|
||||
return
|
||||
}
|
||||
q.logger.Printf("Downloaded %s successfully\n", link.DownloadLink)
|
||||
}
|
||||
|
||||
func (q *QBit) downloadFiles(debridTorrent *debrid.Torrent, parent string) {
|
||||
var wg sync.WaitGroup
|
||||
client := downloaders.GetFastHTTPClient()
|
||||
for _, link := range debridTorrent.DownloadLinks {
|
||||
if link.DownloadLink == "" {
|
||||
q.logger.Printf("No download link found for %s\n", link.Filename)
|
||||
continue
|
||||
}
|
||||
wg.Add(1)
|
||||
go func(link debrid.TorrentDownloadLinks) {
|
||||
defer wg.Done()
|
||||
err := downloaders.NormalFastHTTP(client, link.DownloadLink, filepath.Join(parent, link.Filename))
|
||||
if err != nil {
|
||||
q.logger.Printf("Error downloading %s: %v\n", link.DownloadLink, err)
|
||||
} else {
|
||||
q.logger.Printf("Downloaded %s successfully\n", link.DownloadLink)
|
||||
}
|
||||
}(link)
|
||||
}
|
||||
wg.Wait()
|
||||
q.logger.Printf("Downloaded all files for %s\n", debridTorrent.Name)
|
||||
}
|
||||
|
||||
func (q *QBit) processSymlink(torrent *Torrent, debridTorrent *debrid.Torrent, arr *debrid.Arr) {
|
||||
var wg sync.WaitGroup
|
||||
files := debridTorrent.Files
|
||||
ready := make(chan debrid.TorrentFile, len(files))
|
||||
|
||||
q.logger.Printf("Checking %d files...", len(files))
|
||||
rCloneBase := q.debrid.GetMountPath()
|
||||
torrentPath, err := q.getTorrentPath(rCloneBase, debridTorrent) // /MyTVShow/
|
||||
if err != nil {
|
||||
q.MarkAsFailed(torrent)
|
||||
q.logger.Printf("Error: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
torrentSymlinkPath := filepath.Join(q.DownloadFolder, debridTorrent.Arr.Name, torrentPath) // /mnt/symlinks/{category}/MyTVShow/
|
||||
err = os.MkdirAll(torrentSymlinkPath, os.ModePerm)
|
||||
if err != nil {
|
||||
q.logger.Printf("Failed to create directory: %s\n", torrentSymlinkPath)
|
||||
q.MarkAsFailed(torrent)
|
||||
return
|
||||
}
|
||||
torrentRclonePath := filepath.Join(rCloneBase, torrentPath)
|
||||
for _, file := range files {
|
||||
wg.Add(1)
|
||||
go checkFileLoop(&wg, torrentRclonePath, file, ready)
|
||||
}
|
||||
|
||||
go func() {
|
||||
wg.Wait()
|
||||
close(ready)
|
||||
}()
|
||||
|
||||
for f := range ready {
|
||||
q.logger.Println("File is ready:", f.Path)
|
||||
q.createSymLink(torrentSymlinkPath, torrentRclonePath, f)
|
||||
}
|
||||
// Update the torrent when all files are ready
|
||||
torrent.TorrentPath = filepath.Base(torrentPath) // Quite important
|
||||
q.UpdateTorrent(torrent, debridTorrent)
|
||||
q.RefreshArr(arr)
|
||||
}
|
||||
|
||||
func (q *QBit) getTorrentPath(rclonePath string, debridTorrent *debrid.Torrent) (string, error) {
|
||||
pathChan := make(chan string)
|
||||
errChan := make(chan error)
|
||||
|
||||
go func() {
|
||||
for {
|
||||
torrentPath := debridTorrent.GetMountFolder(rclonePath)
|
||||
if torrentPath != "" {
|
||||
pathChan <- torrentPath
|
||||
return
|
||||
}
|
||||
time.Sleep(time.Second)
|
||||
}
|
||||
}()
|
||||
|
||||
select {
|
||||
case path := <-pathChan:
|
||||
return path, nil
|
||||
case err := <-errChan:
|
||||
return "", err
|
||||
}
|
||||
}
|
||||
|
||||
func (q *QBit) createSymLink(path string, torrentMountPath string, file debrid.TorrentFile) {
|
||||
|
||||
// Combine the directory and filename to form a full path
|
||||
fullPath := filepath.Join(path, file.Name) // /mnt/symlinks/{category}/MyTVShow/MyTVShow.S01E01.720p.mkv
|
||||
// Create a symbolic link if file doesn't exist
|
||||
torrentFilePath := filepath.Join(torrentMountPath, file.Name) // debridFolder/MyTVShow/MyTVShow.S01E01.720p.mkv
|
||||
err := os.Symlink(torrentFilePath, fullPath)
|
||||
if err != nil {
|
||||
q.logger.Printf("Failed to create symlink: %s\n", fullPath)
|
||||
}
|
||||
// Check if the file exists
|
||||
if !common.FileReady(fullPath) {
|
||||
q.logger.Printf("Symlink not ready: %s\n", fullPath)
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,53 @@
|
||||
package downloaders
|
||||
|
||||
import (
|
||||
"crypto/tls"
|
||||
"fmt"
|
||||
"github.com/valyala/fasthttp"
|
||||
"io"
|
||||
"os"
|
||||
)
|
||||
|
||||
func GetFastHTTPClient() *fasthttp.Client {
|
||||
return &fasthttp.Client{
|
||||
TLSConfig: &tls.Config{InsecureSkipVerify: true},
|
||||
}
|
||||
}
|
||||
|
||||
func NormalFastHTTP(client *fasthttp.Client, url, filename string) error {
|
||||
req := fasthttp.AcquireRequest()
|
||||
resp := fasthttp.AcquireResponse()
|
||||
defer fasthttp.ReleaseRequest(req)
|
||||
defer fasthttp.ReleaseResponse(resp)
|
||||
|
||||
req.SetRequestURI(url)
|
||||
req.Header.SetMethod(fasthttp.MethodGet)
|
||||
|
||||
if err := client.Do(req, resp); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Check the response status code
|
||||
if resp.StatusCode() != fasthttp.StatusOK {
|
||||
return fmt.Errorf("unexpected status code: %d", resp.StatusCode())
|
||||
}
|
||||
file, err := os.Create(filename)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer file.Close()
|
||||
bodyStream := resp.BodyStream()
|
||||
if bodyStream == nil {
|
||||
return fmt.Errorf("bodyStream is nil")
|
||||
}
|
||||
defer func() {
|
||||
if rc, ok := bodyStream.(io.Closer); ok {
|
||||
rc.Close()
|
||||
}
|
||||
}()
|
||||
|
||||
if _, err := io.Copy(file, bodyStream); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@@ -0,0 +1,44 @@
|
||||
package downloaders
|
||||
|
||||
import (
|
||||
"crypto/tls"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"os"
|
||||
)
|
||||
|
||||
func GetHTTPClient() *http.Client {
|
||||
tr := &http.Transport{
|
||||
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
|
||||
}
|
||||
return &http.Client{Transport: tr}
|
||||
}
|
||||
|
||||
func NormalHTTP(client *http.Client, url, filename string) error {
|
||||
file, err := os.Create(filename)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer file.Close()
|
||||
|
||||
// Send the HTTP GET request
|
||||
resp, err := client.Get(url)
|
||||
if err != nil {
|
||||
fmt.Println("Error downloading file:", err)
|
||||
return err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
// Check server response
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return fmt.Errorf("server returned non-200 status: %d %s", resp.StatusCode, resp.Status)
|
||||
}
|
||||
|
||||
// Write the response body to file
|
||||
_, err = io.Copy(file, resp.Body)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@@ -1,6 +1,7 @@
|
||||
package qbit
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
@@ -36,6 +37,8 @@ func (q *QBit) handleTorrentsAdd(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
}
|
||||
|
||||
isSymlink := strings.ToLower(r.FormValue("sequentialDownload")) != "true"
|
||||
q.logger.Printf("isSymlink: %v\n", isSymlink)
|
||||
urls := r.FormValue("urls")
|
||||
category := r.FormValue("category")
|
||||
|
||||
@@ -44,6 +47,8 @@ func (q *QBit) handleTorrentsAdd(w http.ResponseWriter, r *http.Request) {
|
||||
urlList = strings.Split(urls, "\n")
|
||||
}
|
||||
|
||||
ctx = context.WithValue(ctx, "isSymlink", isSymlink)
|
||||
|
||||
for _, url := range urlList {
|
||||
if err := q.AddMagnet(ctx, url, category); err != nil {
|
||||
q.logger.Printf("Error adding magnet: %v\n", err)
|
||||
|
||||
+3
-2
@@ -11,6 +11,7 @@ import (
|
||||
"log"
|
||||
"net/http"
|
||||
"os"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
@@ -34,7 +35,7 @@ type QBit struct {
|
||||
storage *TorrentStorage
|
||||
debug bool
|
||||
logger *log.Logger
|
||||
arrs map[string]string // host:token (Used for refreshing in worker)
|
||||
arrs sync.Map // host:token (Used for refreshing in worker)
|
||||
RefreshInterval int
|
||||
}
|
||||
|
||||
@@ -54,7 +55,7 @@ func NewQBit(config *common.Config, deb debrid.Service, cache *common.Cache) *QB
|
||||
debug: cfg.Debug,
|
||||
storage: storage,
|
||||
logger: common.NewLogger("QBit", os.Stdout),
|
||||
arrs: make(map[string]string),
|
||||
arrs: sync.Map{},
|
||||
RefreshInterval: refreshInterval,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -52,7 +52,7 @@ func (q *QBit) authContext(next http.Handler) http.Handler {
|
||||
if err == nil {
|
||||
ctx = context.WithValue(r.Context(), "host", host)
|
||||
ctx = context.WithValue(ctx, "token", token)
|
||||
q.arrs[host] = token
|
||||
q.arrs.Store(host, token)
|
||||
next.ServeHTTP(w, r.WithContext(ctx))
|
||||
return
|
||||
}
|
||||
|
||||
+19
-82
@@ -8,10 +8,7 @@ import (
|
||||
"goBlack/pkg/debrid"
|
||||
"io"
|
||||
"mime/multipart"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
@@ -53,7 +50,8 @@ func (q *QBit) Process(ctx context.Context, magnet *common.Magnet, category stri
|
||||
Token: ctx.Value("token").(string),
|
||||
Host: ctx.Value("host").(string),
|
||||
}
|
||||
debridTorrent, err := debrid.ProcessQBitTorrent(q.debrid, magnet, arr)
|
||||
isSymlink := ctx.Value("isSymlink").(bool)
|
||||
debridTorrent, err := debrid.ProcessQBitTorrent(q.debrid, magnet, arr, isSymlink)
|
||||
if err != nil || debridTorrent == nil {
|
||||
if err == nil {
|
||||
err = fmt.Errorf("failed to process torrent")
|
||||
@@ -64,7 +62,7 @@ func (q *QBit) Process(ctx context.Context, magnet *common.Magnet, category stri
|
||||
torrent.DebridTorrent = debridTorrent
|
||||
torrent.Name = debridTorrent.Name
|
||||
q.storage.AddOrUpdate(torrent)
|
||||
go q.processFiles(torrent, debridTorrent, arr) // We can send async for file processing not to delay the response
|
||||
go q.processFiles(torrent, debridTorrent, arr, isSymlink) // We can send async for file processing not to delay the response
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -97,83 +95,22 @@ func (q *QBit) CreateTorrentFromMagnet(magnet *common.Magnet, category string) *
|
||||
return torrent
|
||||
}
|
||||
|
||||
func (q *QBit) processFiles(torrent *Torrent, debridTorrent *debrid.Torrent, arr *debrid.Arr) {
|
||||
var wg sync.WaitGroup
|
||||
files := debridTorrent.Files
|
||||
ready := make(chan debrid.TorrentFile, len(files))
|
||||
|
||||
q.logger.Printf("Checking %d files...", len(files))
|
||||
rCloneBase := q.debrid.GetMountPath()
|
||||
torrentPath, err := q.getTorrentPath(rCloneBase, debridTorrent) // /MyTVShow/
|
||||
if err != nil {
|
||||
q.MarkAsFailed(torrent)
|
||||
q.logger.Printf("Error: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
torrentSymlinkPath := filepath.Join(q.DownloadFolder, debridTorrent.Arr.Name, torrentPath) // /mnt/symlinks/{category}/MyTVShow/
|
||||
err = os.MkdirAll(torrentSymlinkPath, os.ModePerm)
|
||||
if err != nil {
|
||||
q.logger.Printf("Failed to create directory: %s\n", torrentSymlinkPath)
|
||||
q.MarkAsFailed(torrent)
|
||||
return
|
||||
}
|
||||
torrentRclonePath := filepath.Join(rCloneBase, torrentPath)
|
||||
for _, file := range files {
|
||||
wg.Add(1)
|
||||
go checkFileLoop(&wg, torrentRclonePath, file, ready)
|
||||
}
|
||||
|
||||
go func() {
|
||||
wg.Wait()
|
||||
close(ready)
|
||||
}()
|
||||
|
||||
for f := range ready {
|
||||
q.logger.Println("File is ready:", f.Path)
|
||||
q.createSymLink(torrentSymlinkPath, torrentRclonePath, f)
|
||||
}
|
||||
// Update the torrent when all files are ready
|
||||
torrent.TorrentPath = filepath.Base(torrentPath) // Quite important
|
||||
q.UpdateTorrent(torrent, debridTorrent)
|
||||
q.RefreshArr(arr)
|
||||
}
|
||||
|
||||
func (q *QBit) getTorrentPath(rclonePath string, debridTorrent *debrid.Torrent) (string, error) {
|
||||
pathChan := make(chan string)
|
||||
errChan := make(chan error)
|
||||
|
||||
go func() {
|
||||
for {
|
||||
torrentPath := debridTorrent.GetMountFolder(rclonePath)
|
||||
if torrentPath != "" {
|
||||
pathChan <- torrentPath
|
||||
return
|
||||
}
|
||||
time.Sleep(time.Second)
|
||||
func (q *QBit) processFiles(torrent *Torrent, debridTorrent *debrid.Torrent, arr *debrid.Arr, isSymlink bool) {
|
||||
for debridTorrent.Status != "downloaded" {
|
||||
progress := debridTorrent.Progress
|
||||
q.logger.Printf("Progress: %.2f%%", progress)
|
||||
time.Sleep(5 * time.Second)
|
||||
dbT, err := q.debrid.CheckStatus(debridTorrent, isSymlink)
|
||||
if err != nil {
|
||||
q.logger.Printf("Error checking status: %v", err)
|
||||
q.MarkAsFailed(torrent)
|
||||
return
|
||||
}
|
||||
}()
|
||||
|
||||
select {
|
||||
case path := <-pathChan:
|
||||
return path, nil
|
||||
case err := <-errChan:
|
||||
return "", err
|
||||
}
|
||||
}
|
||||
|
||||
func (q *QBit) createSymLink(path string, torrentMountPath string, file debrid.TorrentFile) {
|
||||
|
||||
// Combine the directory and filename to form a full path
|
||||
fullPath := filepath.Join(path, file.Name) // /mnt/symlinks/{category}/MyTVShow/MyTVShow.S01E01.720p.mkv
|
||||
// Create a symbolic link if file doesn't exist
|
||||
torrentFilePath := filepath.Join(torrentMountPath, file.Name) // debridFolder/MyTVShow/MyTVShow.S01E01.720p.mkv
|
||||
err := os.Symlink(torrentFilePath, fullPath)
|
||||
if err != nil {
|
||||
q.logger.Printf("Failed to create symlink: %s\n", fullPath)
|
||||
}
|
||||
// Check if the file exists
|
||||
if !common.FileReady(fullPath) {
|
||||
q.logger.Printf("Symlink not ready: %s\n", fullPath)
|
||||
debridTorrent = dbT
|
||||
}
|
||||
if isSymlink {
|
||||
q.processSymlink(torrent, debridTorrent, arr)
|
||||
} else {
|
||||
q.processManualFiles(torrent, debridTorrent, arr)
|
||||
}
|
||||
}
|
||||
|
||||
+9
-2
@@ -30,12 +30,19 @@ func (q *QBit) RefreshArrs() {
|
||||
if len(torrents) == 0 {
|
||||
return
|
||||
}
|
||||
for host, token := range q.arrs {
|
||||
|
||||
q.arrs.Range(func(key, value interface{}) bool {
|
||||
host, ok := key.(string)
|
||||
token, ok2 := value.(string)
|
||||
if !ok || !ok2 {
|
||||
return true
|
||||
}
|
||||
arr := &debrid.Arr{
|
||||
Name: "",
|
||||
Token: token,
|
||||
Host: host,
|
||||
}
|
||||
q.RefreshArr(arr)
|
||||
}
|
||||
return true
|
||||
})
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user