Add more rclone flags, fix minor issues
This commit is contained in:
@@ -0,0 +1,317 @@
|
||||
package wire
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/sirrobot01/decypharr/pkg/debrid/types"
|
||||
|
||||
"github.com/cavaliergopher/grab/v3"
|
||||
"github.com/sirrobot01/decypharr/internal/utils"
|
||||
)
|
||||
|
||||
// grabber downloads url into filename using the given grab client, reporting
// transfer progress via progressCallback roughly every two seconds.
//
// progressCallback receives (bytesSinceLastReport, bytesPerSecond); callers
// are expected to accumulate the deltas. byterange, when non-nil, is sent as
// an HTTP Range header ("bytes=start-end"). Returns the transfer's final
// error (nil on success).
func grabber(client *grab.Client, url, filename string, byterange *[2]int64, progressCallback func(int64, int64)) error {
	req, err := grab.NewRequest(filename, url)
	if err != nil {
		return err
	}

	// Set byte range if specified
	if byterange != nil {
		byterangeStr := fmt.Sprintf("%d-%d", byterange[0], byterange[1])
		req.HTTPRequest.Header.Set("Range", "bytes="+byterangeStr)
	}

	// Do returns immediately; the transfer runs asynchronously and is
	// observed below via resp.
	resp := client.Do(req)

	t := time.NewTicker(time.Second * 2)
	defer t.Stop()

	// lastReported tracks the byte count at the previous callback so only
	// deltas are reported.
	var lastReported int64
Loop:
	for {
		select {
		case <-t.C:
			current := resp.BytesComplete()
			speed := int64(resp.BytesPerSecond())
			if current != lastReported {
				if progressCallback != nil {
					progressCallback(current-lastReported, speed)
				}
				lastReported = current
			}
		case <-resp.Done:
			break Loop
		}
	}

	// Report final bytes
	if progressCallback != nil {
		progressCallback(resp.BytesComplete()-lastReported, 0)
	}

	return resp.Err()
}
|
||||
|
||||
func (s *Store) processDownload(torrent *Torrent, debridTorrent *types.Torrent) (string, error) {
|
||||
s.logger.Info().Msgf("Downloading %d files...", len(debridTorrent.Files))
|
||||
torrentPath := filepath.Join(torrent.SavePath, utils.RemoveExtension(debridTorrent.OriginalFilename))
|
||||
torrentPath = utils.RemoveInvalidChars(torrentPath)
|
||||
err := os.MkdirAll(torrentPath, os.ModePerm)
|
||||
if err != nil {
|
||||
// add the previous error to the error and return
|
||||
return "", fmt.Errorf("failed to create directory: %s: %v", torrentPath, err)
|
||||
}
|
||||
s.downloadFiles(torrent, debridTorrent, torrentPath)
|
||||
return torrentPath, nil
|
||||
}
|
||||
|
||||
func (s *Store) downloadFiles(torrent *Torrent, debridTorrent *types.Torrent, parent string) {
|
||||
var wg sync.WaitGroup
|
||||
|
||||
totalSize := int64(0)
|
||||
for _, file := range debridTorrent.GetFiles() {
|
||||
totalSize += file.Size
|
||||
}
|
||||
debridTorrent.Lock()
|
||||
debridTorrent.SizeDownloaded = 0 // Reset downloaded bytes
|
||||
debridTorrent.Progress = 0 // Reset progress
|
||||
debridTorrent.Unlock()
|
||||
progressCallback := func(downloaded int64, speed int64) {
|
||||
debridTorrent.Lock()
|
||||
defer debridTorrent.Unlock()
|
||||
torrent.Lock()
|
||||
defer torrent.Unlock()
|
||||
|
||||
// Update total downloaded bytes
|
||||
debridTorrent.SizeDownloaded += downloaded
|
||||
debridTorrent.Speed = speed
|
||||
|
||||
// Calculate overall progress
|
||||
if totalSize > 0 {
|
||||
debridTorrent.Progress = float64(debridTorrent.SizeDownloaded) / float64(totalSize) * 100
|
||||
}
|
||||
s.partialTorrentUpdate(torrent, debridTorrent)
|
||||
}
|
||||
client := &grab.Client{
|
||||
UserAgent: "Decypharr[QBitTorrent]",
|
||||
HTTPClient: &http.Client{
|
||||
Transport: &http.Transport{
|
||||
Proxy: http.ProxyFromEnvironment,
|
||||
},
|
||||
},
|
||||
}
|
||||
errChan := make(chan error, len(debridTorrent.Files))
|
||||
for _, file := range debridTorrent.GetFiles() {
|
||||
if file.DownloadLink == nil {
|
||||
s.logger.Info().Msgf("No download link found for %s", file.Name)
|
||||
continue
|
||||
}
|
||||
wg.Add(1)
|
||||
s.downloadSemaphore <- struct{}{}
|
||||
go func(file types.File) {
|
||||
defer wg.Done()
|
||||
defer func() { <-s.downloadSemaphore }()
|
||||
filename := file.Name
|
||||
|
||||
err := grabber(
|
||||
client,
|
||||
file.DownloadLink.DownloadLink,
|
||||
filepath.Join(parent, filename),
|
||||
file.ByteRange,
|
||||
progressCallback,
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
s.logger.Error().Msgf("Failed to download %s: %v", filename, err)
|
||||
errChan <- err
|
||||
} else {
|
||||
s.logger.Info().Msgf("Downloaded %s", filename)
|
||||
}
|
||||
}(file)
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
close(errChan)
|
||||
var errors []error
|
||||
for err := range errChan {
|
||||
if err != nil {
|
||||
errors = append(errors, err)
|
||||
}
|
||||
}
|
||||
if len(errors) > 0 {
|
||||
s.logger.Error().Msgf("Errors occurred during download: %v", errors)
|
||||
return
|
||||
}
|
||||
s.logger.Info().Msgf("Downloaded all files for %s", debridTorrent.Name)
|
||||
}
|
||||
|
||||
// processSymlink waits for the torrent's files to appear under the rclone
// mount and symlinks each one into the torrent's save path, returning the
// directory that holds the created symlinks.
//
// It polls every 200ms for up to 30 minutes; on timeout it returns the
// (partially populated) symlink directory together with an error. Unless
// skipPreCache is set, the symlinked files are pre-cached in the background.
func (s *Store) processSymlink(torrent *Torrent, debridTorrent *types.Torrent) (string, error) {
	files := debridTorrent.Files
	if len(files) == 0 {
		return "", fmt.Errorf("no valid files found")
	}
	s.logger.Info().Msgf("Checking symlinks for %d files...", len(files))
	rCloneBase := debridTorrent.MountPath
	torrentPath, err := s.getTorrentPath(rCloneBase, debridTorrent) // /MyTVShow/
	// This returns filename.ext for alldebrid instead of the parent folder filename/
	torrentFolder := torrentPath
	if err != nil {
		return "", fmt.Errorf("failed to get torrent path: %v", err)
	}
	// Check if the torrent path is a file
	torrentRclonePath := filepath.Join(rCloneBase, torrentPath) // leave it as is
	if debridTorrent.Debrid == "alldebrid" && utils.IsMediaFile(torrentPath) {
		// Alldebrid hotfix for single file torrents
		torrentFolder = utils.RemoveExtension(torrentFolder)
		torrentRclonePath = rCloneBase // /mnt/rclone/magnets/ // Remove the filename since it's in the root folder
	}
	torrentSymlinkPath := filepath.Join(torrent.SavePath, torrentFolder) // /mnt/symlinks/{category}/MyTVShow/
	err = os.MkdirAll(torrentSymlinkPath, os.ModePerm)
	if err != nil {
		return "", fmt.Errorf("failed to create directory: %s: %v", torrentSymlinkPath, err)
	}

	// Map basename -> path relative to the rclone torrent dir so files that
	// live in subfolders can be matched to the flat debrid file list.
	// NOTE(review): duplicate basenames in different subfolders overwrite
	// each other in this map — confirm that cannot occur for real torrents.
	realPaths := make(map[string]string)
	err = filepath.WalkDir(torrentRclonePath, func(path string, d os.DirEntry, err error) error {
		if err != nil {
			// Walk errors are ignored; missing entries are retried by the
			// polling loop below.
			return nil
		}
		if !d.IsDir() {
			filename := d.Name()
			rel, _ := filepath.Rel(torrentRclonePath, path)
			realPaths[filename] = rel
		}
		return nil
	})
	if err != nil {
		s.logger.Warn().Msgf("Error while scanning rclone path: %v", err)
	}

	// Files still awaiting their symlink, keyed by (possibly corrected)
	// relative path.
	pending := make(map[string]types.File)
	for _, file := range files {
		if realRelPath, ok := realPaths[file.Name]; ok {
			file.Path = realRelPath
		}
		pending[file.Path] = file
	}
	ticker := time.NewTicker(200 * time.Millisecond)
	defer ticker.Stop()

	timeout := time.After(30 * time.Minute)
	filePaths := make([]string, 0, len(pending))

	for len(pending) > 0 {
		select {
		case <-ticker.C:
			for path, file := range pending {
				fullFilePath := filepath.Join(torrentRclonePath, file.Path)
				if _, err := os.Stat(fullFilePath); !os.IsNotExist(err) {
					fileSymlinkPath := filepath.Join(torrentSymlinkPath, file.Name)
					if err := os.Symlink(fullFilePath, fileSymlinkPath); err != nil && !os.IsExist(err) {
						// Symlink failed: the file stays pending and is
						// retried on the next tick (until the timeout).
						s.logger.Warn().Msgf("Failed to create symlink: %s: %v", fileSymlinkPath, err)
					} else {
						filePaths = append(filePaths, fileSymlinkPath)
						delete(pending, path)
						s.logger.Info().Msgf("File is ready: %s", file.Name)
					}
				}
			}
		case <-timeout:
			s.logger.Warn().Msgf("Timeout waiting for files, %d files still pending", len(pending))
			return torrentSymlinkPath, fmt.Errorf("timeout waiting for files: %d files still pending", len(pending))
		}
	}
	if s.skipPreCache {
		return torrentSymlinkPath, nil
	}

	// Warm the cache for the symlinked files in the background; failures
	// are logged only.
	go func() {
		s.logger.Debug().Msgf("Pre-caching %s", debridTorrent.Name)
		if err := utils.PreCacheFile(filePaths); err != nil {
			s.logger.Error().Msgf("Failed to pre-cache file: %s", err)
		} else {
			s.logger.Trace().Msgf("Pre-cached %d files", len(filePaths))
		}
	}()
	return torrentSymlinkPath, nil
}
|
||||
|
||||
// createSymlinksWebdav symlinks every file of debridTorrent from the
// internal-webdav rclone path into <savePath>/<torrentFolder>, polling the
// mount directory (100ms interval, 30 minute timeout) until each file shows
// up. Returns the directory holding the symlinks.
func (s *Store) createSymlinksWebdav(torrent *Torrent, debridTorrent *types.Torrent, rclonePath, torrentFolder string) (string, error) {
	files := debridTorrent.Files
	symlinkPath := filepath.Join(torrent.SavePath, torrentFolder) // /mnt/symlinks/{category}/MyTVShow/
	err := os.MkdirAll(symlinkPath, os.ModePerm)
	if err != nil {
		return "", fmt.Errorf("failed to create directory: %s: %v", symlinkPath, err)
	}

	// Files still waiting to appear on the mount, keyed by basename.
	remainingFiles := make(map[string]types.File)
	for _, file := range files {
		remainingFiles[file.Name] = file
	}

	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()
	timeout := time.After(30 * time.Minute)
	filePaths := make([]string, 0, len(files))

	for len(remainingFiles) > 0 {
		select {
		case <-ticker.C:
			entries, err := os.ReadDir(rclonePath)
			if err != nil {
				// Mount not readable yet; retry on the next tick.
				continue
			}

			// Check which files exist in this batch
			for _, entry := range entries {
				filename := entry.Name()
				if file, exists := remainingFiles[filename]; exists {
					fullFilePath := filepath.Join(rclonePath, filename)
					fileSymlinkPath := filepath.Join(symlinkPath, file.Name)

					if err := os.Symlink(fullFilePath, fileSymlinkPath); err != nil && !os.IsExist(err) {
						// Failed symlinks stay in remainingFiles and are
						// retried until the timeout fires.
						s.logger.Debug().Msgf("Failed to create symlink: %s: %v", fileSymlinkPath, err)
					} else {
						filePaths = append(filePaths, fileSymlinkPath)
						delete(remainingFiles, filename)
						s.logger.Info().Msgf("File is ready: %s", file.Name)
					}
				}
			}

		case <-timeout:
			s.logger.Warn().Msgf("Timeout waiting for files, %d files still pending", len(remainingFiles))
			return symlinkPath, fmt.Errorf("timeout waiting for files")
		}
	}

	if s.skipPreCache {
		return symlinkPath, nil
	}

	go func() {
		s.logger.Debug().Msgf("Pre-caching %s", debridTorrent.Name)
		if err := utils.PreCacheFile(filePaths); err != nil {
			s.logger.Error().Msgf("Failed to pre-cache file: %s", err)
		} else {
			s.logger.Debug().Msgf("Pre-cached %d files", len(filePaths))
		}
	}() // Pre-cache the files in the background
	// Pre-cache the first 256KB and 1MB of the file
	return symlinkPath, nil
}
|
||||
|
||||
// getTorrentPath polls debridTorrent.GetMountFolder every 100ms until the
// torrent's folder becomes visible under the rclone mount, then returns it.
//
// NOTE(review): this loop has no timeout or cancellation — if the mount
// never materializes it spins forever. processSymlink depends on it
// eventually returning; consider bounding it with a context or deadline.
func (s *Store) getTorrentPath(rclonePath string, debridTorrent *types.Torrent) (string, error) {
	for {
		torrentPath, err := debridTorrent.GetMountFolder(rclonePath)
		if err == nil {
			// err is nil here, so this is effectively (torrentPath, nil).
			return torrentPath, err
		}
		time.Sleep(100 * time.Millisecond)
	}
}
|
||||
@@ -0,0 +1,31 @@
|
||||
package wire
|
||||
|
||||
import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
)
|
||||
|
||||
func createTorrentFromMagnet(req *ImportRequest) *Torrent {
|
||||
magnet := req.Magnet
|
||||
arrName := req.Arr.Name
|
||||
torrent := &Torrent{
|
||||
ID: req.Id,
|
||||
Hash: strings.ToLower(magnet.InfoHash),
|
||||
Name: magnet.Name,
|
||||
Size: magnet.Size,
|
||||
Category: arrName,
|
||||
Source: string(req.Type),
|
||||
State: "downloading",
|
||||
MagnetUri: magnet.Link,
|
||||
|
||||
Tracker: "udp://tracker.opentrackr.org:1337",
|
||||
UpLimit: -1,
|
||||
DlLimit: -1,
|
||||
AutoTmm: false,
|
||||
Ratio: 1,
|
||||
RatioLimit: 1,
|
||||
SavePath: filepath.Join(req.DownloadFolder, arrName) + string(os.PathSeparator),
|
||||
}
|
||||
return torrent
|
||||
}
|
||||
@@ -0,0 +1,141 @@
|
||||
package wire
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/go-co-op/gocron/v2"
|
||||
"github.com/sirrobot01/decypharr/internal/utils"
|
||||
"time"
|
||||
)
|
||||
|
||||
func (s *Store) addToQueue(importReq *ImportRequest) error {
|
||||
if importReq.Magnet == nil {
|
||||
return fmt.Errorf("magnet is required")
|
||||
}
|
||||
|
||||
if importReq.Arr == nil {
|
||||
return fmt.Errorf("arr is required")
|
||||
}
|
||||
|
||||
importReq.Status = "queued"
|
||||
importReq.CompletedAt = time.Time{}
|
||||
importReq.Error = nil
|
||||
err := s.importsQueue.Push(importReq)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Store) StartQueueWorkers(ctx context.Context) error {
|
||||
// This function is responsible for starting the scheduled tasks
|
||||
if ctx == nil {
|
||||
ctx = context.Background()
|
||||
}
|
||||
|
||||
s.scheduler.RemoveByTags("decypharr-store")
|
||||
|
||||
if jd, err := utils.ConvertToJobDef("30s"); err != nil {
|
||||
s.logger.Error().Err(err).Msg("Failed to convert slots tracking interval to job definition")
|
||||
} else {
|
||||
// Schedule the job
|
||||
if _, err := s.scheduler.NewJob(jd, gocron.NewTask(func() {
|
||||
s.trackAvailableSlots(ctx)
|
||||
}), gocron.WithContext(ctx)); err != nil {
|
||||
s.logger.Error().Err(err).Msg("Failed to create slots tracking job")
|
||||
} else {
|
||||
s.logger.Trace().Msgf("Slots tracking job scheduled for every %s", "30s")
|
||||
}
|
||||
}
|
||||
|
||||
if s.removeStalledAfter > 0 {
|
||||
// Stalled torrents removal job
|
||||
if jd, err := utils.ConvertToJobDef("1m"); err != nil {
|
||||
s.logger.Error().Err(err).Msg("Failed to convert remove stalled torrents interval to job definition")
|
||||
} else {
|
||||
// Schedule the job
|
||||
if _, err := s.scheduler.NewJob(jd, gocron.NewTask(func() {
|
||||
err := s.removeStalledTorrents(ctx)
|
||||
if err != nil {
|
||||
s.logger.Error().Err(err).Msg("Failed to process remove stalled torrents")
|
||||
}
|
||||
}), gocron.WithContext(ctx)); err != nil {
|
||||
s.logger.Error().Err(err).Msg("Failed to create remove stalled torrents job")
|
||||
} else {
|
||||
s.logger.Trace().Msgf("Remove stalled torrents job scheduled for every %s", "1m")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Start the scheduler
|
||||
s.scheduler.Start()
|
||||
s.logger.Debug().Msg("Store worker started")
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Store) trackAvailableSlots(ctx context.Context) {
|
||||
// This function tracks the available slots for each debrid client
|
||||
availableSlots := make(map[string]int)
|
||||
|
||||
for name, deb := range s.debrid.Debrids() {
|
||||
slots, err := deb.Client().GetAvailableSlots()
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
availableSlots[name] = slots
|
||||
}
|
||||
|
||||
if len(availableSlots) == 0 {
|
||||
s.logger.Debug().Msg("No debrid clients available or no slots found")
|
||||
return // No debrid clients or slots available, nothing to process
|
||||
}
|
||||
|
||||
if s.importsQueue.Size() <= 0 {
|
||||
// Queue is empty, no need to process
|
||||
return
|
||||
}
|
||||
|
||||
for name, slots := range availableSlots {
|
||||
s.logger.Debug().Msgf("Available slots for %s: %d", name, slots)
|
||||
// If slots are available, process the next import request from the queue
|
||||
for slots > 0 {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return // Exit if context is done
|
||||
default:
|
||||
if err := s.processFromQueue(ctx); err != nil {
|
||||
s.logger.Error().Err(err).Msg("Error processing from queue")
|
||||
return // Exit on error
|
||||
}
|
||||
slots-- // Decrease the available slots after processing
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Store) processFromQueue(ctx context.Context) error {
|
||||
// Pop the next import request from the queue
|
||||
importReq, err := s.importsQueue.Pop()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if importReq == nil {
|
||||
return nil
|
||||
}
|
||||
return s.AddTorrent(ctx, importReq)
|
||||
}
|
||||
|
||||
func (s *Store) removeStalledTorrents(ctx context.Context) error {
|
||||
// This function checks for stalled torrents and removes them
|
||||
stalledTorrents := s.torrents.GetStalledTorrents(s.removeStalledAfter)
|
||||
if len(stalledTorrents) == 0 {
|
||||
return nil // No stalled torrents to remove
|
||||
}
|
||||
|
||||
for _, torrent := range stalledTorrents {
|
||||
s.logger.Warn().Msgf("Removing stalled torrent: %s", torrent.Name)
|
||||
s.torrents.Delete(torrent.Hash, torrent.Category, true) // Remove from store and delete from debrid
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -0,0 +1,239 @@
|
||||
package wire
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"cmp"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/google/uuid"
|
||||
"github.com/sirrobot01/decypharr/internal/request"
|
||||
"github.com/sirrobot01/decypharr/internal/utils"
|
||||
"github.com/sirrobot01/decypharr/pkg/arr"
|
||||
debridTypes "github.com/sirrobot01/decypharr/pkg/debrid/types"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
// ImportType identifies which interface an import request arrived through.
type ImportType string

const (
	ImportTypeQBitTorrent ImportType = "qbit" // submitted via the qBittorrent-compatible interface
	ImportTypeAPI         ImportType = "api"  // submitted via the plain API
)
|
||||
|
||||
// ImportRequest describes a single torrent import job as it moves through
// the store: creation, optional queueing (on too_many_active_downloads),
// processing, and completion/failure.
type ImportRequest struct {
	Id               string        `json:"id"`
	DownloadFolder   string        `json:"downloadFolder"`
	SelectedDebrid   string        `json:"debrid"`
	Magnet           *utils.Magnet `json:"magnet"`
	Arr              *arr.Arr      `json:"arr"`
	Action           string        `json:"action"`
	DownloadUncached bool          `json:"downloadUncached"`
	CallBackUrl      string        `json:"callBackUrl"`

	// Processing state, mutated as the request advances
	// ("started" -> "queued" / "failed" / "completed").
	Status      string    `json:"status"`
	CompletedAt time.Time `json:"completedAt,omitempty"`
	// NOTE(review): error values generally do not marshal to useful JSON
	// (most concrete error types encode as {}), so this field is close to
	// opaque in callback payloads — consider an error-string field instead.
	Error error `json:"error,omitempty"`

	Type  ImportType `json:"type"`
	Async bool       `json:"async"`
}
|
||||
|
||||
// NewImportRequest builds an ImportRequest with a fresh UUID and status
// "started". The debrid selected on the arr takes precedence over the
// debrid argument.
//
// NOTE(review): the arr parameter shadows the imported arr package inside
// this function, and it is dereferenced unconditionally — callers must pass
// a non-nil *arr.Arr (addToQueue only validates it later).
func NewImportRequest(debrid string, downloadFolder string, magnet *utils.Magnet, arr *arr.Arr, action string, downloadUncached bool, callBackUrl string, importType ImportType) *ImportRequest {
	return &ImportRequest{
		Id:               uuid.New().String(),
		Status:           "started",
		DownloadFolder:   downloadFolder,
		SelectedDebrid:   cmp.Or(arr.SelectedDebrid, debrid), // Use debrid from arr if available
		Magnet:           magnet,
		Arr:              arr,
		Action:           action,
		DownloadUncached: downloadUncached,
		CallBackUrl:      callBackUrl,
		Type:             importType,
	}
}
|
||||
|
||||
// importResponse is the JSON payload POSTed to an ImportRequest's callback
// URL when the import completes or fails.
type importResponse struct {
	Status      string    `json:"status"`
	CompletedAt time.Time `json:"completedAt"`
	// NOTE(review): see ImportRequest.Error — error values rarely marshal
	// to meaningful JSON.
	Error   error                `json:"error"`
	Torrent *Torrent             `json:"torrent"`
	Debrid  *debridTypes.Torrent `json:"debrid"`
}
|
||||
|
||||
func (i *ImportRequest) sendCallback(torrent *Torrent, debridTorrent *debridTypes.Torrent) {
|
||||
if i.CallBackUrl == "" {
|
||||
return
|
||||
}
|
||||
|
||||
// Check if the callback URL is valid
|
||||
if _, err := url.ParseRequestURI(i.CallBackUrl); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
client := request.New()
|
||||
payload, err := json.Marshal(&importResponse{
|
||||
Status: i.Status,
|
||||
Error: i.Error,
|
||||
CompletedAt: i.CompletedAt,
|
||||
Torrent: torrent,
|
||||
Debrid: debridTorrent,
|
||||
})
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
req, err := http.NewRequest("POST", i.CallBackUrl, bytes.NewReader(payload))
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
_, _ = client.Do(req)
|
||||
|
||||
}
|
||||
|
||||
// markAsFailed records a failed import (status, error, completion time) and
// notifies the callback URL, if any.
func (i *ImportRequest) markAsFailed(err error, torrent *Torrent, debridTorrent *debridTypes.Torrent) {
	i.Status = "failed"
	i.Error = err
	i.CompletedAt = time.Now()
	i.sendCallback(torrent, debridTorrent)
}
|
||||
|
||||
// markAsCompleted records a successful import (status, cleared error,
// completion time) and notifies the callback URL, if any.
func (i *ImportRequest) markAsCompleted(torrent *Torrent, debridTorrent *debridTypes.Torrent) {
	i.Status = "completed"
	i.Error = nil
	i.CompletedAt = time.Now()
	i.sendCallback(torrent, debridTorrent)
}
|
||||
|
||||
// ImportQueue is a bounded, mutex-guarded FIFO of import requests whose
// lifetime is tied to a context (cancelled by Close).
type ImportQueue struct {
	queue  []*ImportRequest
	mu     sync.RWMutex
	ctx    context.Context
	cancel context.CancelFunc
	// cond is signalled by Push and broadcast by Close, but no method in
	// this file ever calls cond.Wait — the blocking mechanism is currently
	// unused.
	cond *sync.Cond // For blocking operations
}
|
||||
|
||||
func NewImportQueue(ctx context.Context, capacity int) *ImportQueue {
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
iq := &ImportQueue{
|
||||
queue: make([]*ImportRequest, 0, capacity),
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
}
|
||||
iq.cond = sync.NewCond(&iq.mu)
|
||||
return iq
|
||||
}
|
||||
|
||||
// Push appends req to the queue. It fails when req is nil, the queue has
// been closed, or the queue is at capacity.
func (iq *ImportQueue) Push(req *ImportRequest) error {
	if req == nil {
		return fmt.Errorf("import request cannot be nil")
	}

	iq.mu.Lock()
	defer iq.mu.Unlock()

	// Non-blocking shutdown check.
	select {
	case <-iq.ctx.Done():
		return fmt.Errorf("queue is shutting down")
	default:
	}

	// Capacity bound. NOTE(review): this relies on cap(iq.queue) staying at
	// its initial value, but Pop's re-slice (iq.queue = iq.queue[1:])
	// shrinks the capacity by one per pop, so the effective bound degrades
	// over the queue's lifetime — verify and fix the Pop side.
	if len(iq.queue) >= cap(iq.queue) {
		return fmt.Errorf("queue is full")
	}

	iq.queue = append(iq.queue, req)
	iq.cond.Signal() // Wake up any waiting Pop()
	return nil
}
|
||||
|
||||
func (iq *ImportQueue) Pop() (*ImportRequest, error) {
|
||||
iq.mu.Lock()
|
||||
defer iq.mu.Unlock()
|
||||
|
||||
select {
|
||||
case <-iq.ctx.Done():
|
||||
return nil, fmt.Errorf("queue is shutting down")
|
||||
default:
|
||||
}
|
||||
|
||||
if len(iq.queue) == 0 {
|
||||
return nil, fmt.Errorf("no import requests available")
|
||||
}
|
||||
|
||||
req := iq.queue[0]
|
||||
iq.queue = iq.queue[1:]
|
||||
return req, nil
|
||||
}
|
||||
|
||||
// Delete specific request by ID
|
||||
func (iq *ImportQueue) Delete(requestID string) bool {
|
||||
iq.mu.Lock()
|
||||
defer iq.mu.Unlock()
|
||||
|
||||
for i, req := range iq.queue {
|
||||
if req.Id == requestID {
|
||||
// Remove from slice
|
||||
iq.queue = append(iq.queue[:i], iq.queue[i+1:]...)
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// DeleteWhere requests matching a condition
|
||||
func (iq *ImportQueue) DeleteWhere(predicate func(*ImportRequest) bool) int {
|
||||
iq.mu.Lock()
|
||||
defer iq.mu.Unlock()
|
||||
|
||||
deleted := 0
|
||||
for i := len(iq.queue) - 1; i >= 0; i-- {
|
||||
if predicate(iq.queue[i]) {
|
||||
iq.queue = append(iq.queue[:i], iq.queue[i+1:]...)
|
||||
deleted++
|
||||
}
|
||||
}
|
||||
return deleted
|
||||
}
|
||||
|
||||
// Find request without removing it
|
||||
func (iq *ImportQueue) Find(requestID string) *ImportRequest {
|
||||
iq.mu.RLock()
|
||||
defer iq.mu.RUnlock()
|
||||
|
||||
for _, req := range iq.queue {
|
||||
if req.Id == requestID {
|
||||
return req
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (iq *ImportQueue) Size() int {
|
||||
iq.mu.RLock()
|
||||
defer iq.mu.RUnlock()
|
||||
return len(iq.queue)
|
||||
}
|
||||
|
||||
// IsEmpty reports whether the queue currently holds no requests.
func (iq *ImportQueue) IsEmpty() bool {
	return iq.Size() == 0
}
|
||||
|
||||
// List all requests (copy to avoid race conditions)
|
||||
func (iq *ImportQueue) List() []*ImportRequest {
|
||||
iq.mu.RLock()
|
||||
defer iq.mu.RUnlock()
|
||||
|
||||
result := make([]*ImportRequest, len(iq.queue))
|
||||
copy(result, iq.queue)
|
||||
return result
|
||||
}
|
||||
|
||||
// Close shuts the queue down: it cancels the queue's context (making
// subsequent Push/Pop calls fail) and wakes anything blocked on the
// condition variable.
func (iq *ImportQueue) Close() {
	iq.cancel()
	iq.cond.Broadcast()
}
|
||||
@@ -0,0 +1,130 @@
|
||||
package wire
|
||||
|
||||
import (
|
||||
"cmp"
|
||||
"context"
|
||||
"github.com/go-co-op/gocron/v2"
|
||||
"github.com/rs/zerolog"
|
||||
"github.com/sirrobot01/decypharr/internal/config"
|
||||
"github.com/sirrobot01/decypharr/internal/logger"
|
||||
"github.com/sirrobot01/decypharr/pkg/arr"
|
||||
"github.com/sirrobot01/decypharr/pkg/debrid"
|
||||
"github.com/sirrobot01/decypharr/pkg/rclone"
|
||||
"github.com/sirrobot01/decypharr/pkg/repair"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Store is the central wiring hub: it owns the torrent storage, arr and
// debrid storages, the optional rclone manager, the bounded import queue,
// and the scheduler driving the background jobs.
type Store struct {
	repair          *repair.Repair
	arr             *arr.Storage
	debrid          *debrid.Storage
	rcloneManager   *rclone.Manager
	importsQueue    *ImportQueue // Queued import requests(probably from too_many_active_downloads)
	torrents        *TorrentStorage
	logger          zerolog.Logger
	refreshInterval time.Duration
	skipPreCache    bool
	// downloadSemaphore bounds the number of concurrent file downloads;
	// sized from config in Get.
	downloadSemaphore  chan struct{}
	removeStalledAfter time.Duration // Duration after which stalled torrents are removed
	scheduler          gocron.Scheduler
}
|
||||
|
||||
// Package-level singleton state, created lazily by Get and torn down by
// Reset.
var (
	instance *Store
	once     sync.Once
)
|
||||
|
||||
// Get returns the singleton instance
//
// On first call it builds the Store from the global config: the rclone
// manager (only when enabled), arr/debrid storages, the gocron scheduler
// (tagged "decypharr-store"), torrent storage, and the bounded download
// semaphore.
func Get() *Store {
	once.Do(func() {
		cfg := config.Get()
		qbitCfg := cfg.QBitTorrent

		// Create rclone manager if enabled
		var rcManager *rclone.Manager
		if cfg.Rclone.Enabled {
			rcManager = rclone.NewManager()
		}

		// Create services with dependencies
		arrs := arr.NewStorage()
		deb := debrid.NewStorage(rcManager)

		scheduler, err := gocron.NewScheduler(gocron.WithLocation(time.Local), gocron.WithGlobalJobOptions(gocron.WithTags("decypharr-store")))
		if err != nil {
			// Fall back to a scheduler without the explicit local timezone.
			scheduler, _ = gocron.NewScheduler(gocron.WithGlobalJobOptions(gocron.WithTags("decypharr-store")))
		}

		instance = &Store{
			repair:            repair.New(arrs, deb),
			arr:               arrs,
			debrid:            deb,
			rcloneManager:     rcManager,
			torrents:          newTorrentStorage(cfg.TorrentsFile()),
			logger:            logger.Default(), // Use default logger [decypharr]
			refreshInterval:   time.Duration(cmp.Or(qbitCfg.RefreshInterval, 30)) * time.Second,
			skipPreCache:      qbitCfg.SkipPreCache,
			downloadSemaphore: make(chan struct{}, cmp.Or(qbitCfg.MaxDownloads, 5)),
			importsQueue:      NewImportQueue(context.Background(), 1000),
			scheduler:         scheduler,
		}
		if cfg.RemoveStalledAfter != "" {
			// An unparseable duration is silently ignored, leaving the
			// stalled-removal feature disabled (zero value).
			removeStalledAfter, err := time.ParseDuration(cfg.RemoveStalledAfter)
			if err == nil {
				instance.removeStalledAfter = removeStalledAfter
			}
		}
	})
	return instance
}
|
||||
|
||||
// Reset tears the singleton down — debrid clients, rclone manager, import
// queue, download semaphore, and scheduler — then clears the instance so the
// next Get rebuilds everything from config.
func Reset() {
	if instance != nil {
		if instance.debrid != nil {
			instance.debrid.Reset()
		}

		if instance.rcloneManager != nil {
			err := instance.rcloneManager.Stop()
			if err != nil {
				instance.logger.Error().Err(err).Msg("Failed to stop rclone manager")
			}
		}

		if instance.importsQueue != nil {
			instance.importsQueue.Close()
		}
		if instance.downloadSemaphore != nil {
			// Close the semaphore channel to
			// NOTE(review): closing this channel while a download goroutine
			// is still sending on it (downloadFiles does a blocking send)
			// would panic — confirm Reset only runs after downloads drain.
			close(instance.downloadSemaphore)
		}

		if instance.scheduler != nil {
			// Errors deliberately ignored: shutdown is best-effort.
			_ = instance.scheduler.StopJobs()
			_ = instance.scheduler.Shutdown()
		}
	}
	once = sync.Once{}
	instance = nil
}
|
||||
|
||||
// Arr returns the store's arr storage.
func (s *Store) Arr() *arr.Storage {
	return s.arr
}

// Debrid returns the store's debrid storage.
func (s *Store) Debrid() *debrid.Storage {
	return s.debrid
}

// Repair returns the store's repair service.
func (s *Store) Repair() *repair.Repair {
	return s.repair
}

// Torrents returns the store's torrent storage.
func (s *Store) Torrents() *TorrentStorage {
	return s.torrents
}

// RcloneManager returns the store's rclone manager (nil when rclone is
// disabled in config; see Get).
func (s *Store) RcloneManager() *rclone.Manager {
	return s.rcloneManager
}

// Scheduler returns the store's gocron scheduler.
func (s *Store) Scheduler() gocron.Scheduler {
	return s.scheduler
}
|
||||
@@ -0,0 +1,296 @@
|
||||
package wire
|
||||
|
||||
import (
|
||||
"cmp"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"math"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
||||
"github.com/sirrobot01/decypharr/internal/request"
|
||||
"github.com/sirrobot01/decypharr/internal/utils"
|
||||
debridTypes "github.com/sirrobot01/decypharr/pkg/debrid"
|
||||
"github.com/sirrobot01/decypharr/pkg/debrid/types"
|
||||
)
|
||||
|
||||
// AddTorrent registers an import request: it builds the local torrent
// record, submits the magnet to the selected debrid, and kicks off file
// processing asynchronously.
//
// A "too_many_active_downloads" HTTP error from the debrid is not fatal: the
// request is pushed onto the imports queue and the torrent kept in the
// "queued" state. Any other error is returned for the caller to log.
func (s *Store) AddTorrent(ctx context.Context, importReq *ImportRequest) error {
	torrent := createTorrentFromMagnet(importReq)
	debridTorrent, err := debridTypes.Process(ctx, s.debrid, importReq.SelectedDebrid, importReq.Magnet, importReq.Arr, importReq.Action, importReq.DownloadUncached)

	if err != nil {
		var httpErr *utils.HTTPError
		if ok := errors.As(err, &httpErr); ok {
			switch httpErr.Code {
			case "too_many_active_downloads":
				// Handle too much active downloads error
				s.logger.Warn().Msgf("Too many active downloads for %s, adding to queue", importReq.Magnet.Name)

				if err := s.addToQueue(importReq); err != nil {
					s.logger.Error().Err(err).Msgf("Failed to add %s to queue", importReq.Magnet.Name)
					return err
				}
				torrent.State = "queued"
			default:
				// Unhandled error, return it, caller logs it
				return err
			}
		} else {
			// Unhandled error, return it, caller logs it
			return err
		}
	}
	// On the queued path debridTorrent is nil here. processFiles visibly
	// early-returns on nil; NOTE(review): confirm partialTorrentUpdate also
	// tolerates a nil debrid torrent.
	torrent = s.partialTorrentUpdate(torrent, debridTorrent)
	s.torrents.AddOrUpdate(torrent)
	go s.processFiles(torrent, debridTorrent, importReq) // We can send async for file processing not to delay the response
	return nil
}
|
||||
|
||||
func (s *Store) processFiles(torrent *Torrent, debridTorrent *types.Torrent, importReq *ImportRequest) {
|
||||
|
||||
if debridTorrent == nil {
|
||||
// Early return if debridTorrent is nil
|
||||
return
|
||||
}
|
||||
|
||||
deb := s.debrid.Debrid(debridTorrent.Debrid)
|
||||
client := deb.Client()
|
||||
downloadingStatuses := client.GetDownloadingStatus()
|
||||
_arr := importReq.Arr
|
||||
backoff := time.NewTimer(s.refreshInterval)
|
||||
defer backoff.Stop()
|
||||
for debridTorrent.Status != "downloaded" {
|
||||
s.logger.Debug().Msgf("%s <- (%s) Download Progress: %.2f%%", debridTorrent.Debrid, debridTorrent.Name, debridTorrent.Progress)
|
||||
dbT, err := client.CheckStatus(debridTorrent)
|
||||
if err != nil {
|
||||
s.logger.Error().
|
||||
Str("torrent_id", debridTorrent.Id).
|
||||
Str("torrent_name", debridTorrent.Name).
|
||||
Err(err).
|
||||
Msg("Error checking torrent status")
|
||||
if dbT != nil && dbT.Id != "" {
|
||||
// Delete the torrent if it was not downloaded
|
||||
go func() {
|
||||
_ = client.DeleteTorrent(dbT.Id)
|
||||
}()
|
||||
}
|
||||
s.logger.Error().Msgf("Error checking status: %v", err)
|
||||
s.markTorrentAsFailed(torrent)
|
||||
go func() {
|
||||
_arr.Refresh()
|
||||
}()
|
||||
importReq.markAsFailed(err, torrent, debridTorrent)
|
||||
return
|
||||
}
|
||||
|
||||
debridTorrent = dbT
|
||||
torrent = s.partialTorrentUpdate(torrent, debridTorrent)
|
||||
|
||||
// Exit the loop for downloading statuses to prevent memory buildup
|
||||
if debridTorrent.Status == "downloaded" || !utils.Contains(downloadingStatuses, debridTorrent.Status) {
|
||||
break
|
||||
}
|
||||
select {
|
||||
case <-backoff.C:
|
||||
// Increase interval gradually, cap at max
|
||||
nextInterval := min(s.refreshInterval*2, 30*time.Second)
|
||||
backoff.Reset(nextInterval)
|
||||
}
|
||||
}
|
||||
var torrentSymlinkPath string
|
||||
var err error
|
||||
debridTorrent.Arr = _arr
|
||||
|
||||
// Check if debrid supports webdav by checking cache
|
||||
timer := time.Now()
|
||||
|
||||
onFailed := func(err error) {
|
||||
s.markTorrentAsFailed(torrent)
|
||||
go func() {
|
||||
if deleteErr := client.DeleteTorrent(debridTorrent.Id); deleteErr != nil {
|
||||
s.logger.Warn().Err(deleteErr).Msgf("Failed to delete torrent %s", debridTorrent.Id)
|
||||
}
|
||||
}()
|
||||
s.logger.Error().Err(err).Msgf("Error occured while processing torrent %s", debridTorrent.Name)
|
||||
importReq.markAsFailed(err, torrent, debridTorrent)
|
||||
return
|
||||
}
|
||||
|
||||
onSuccess := func(torrentSymlinkPath string) {
|
||||
torrent.TorrentPath = torrentSymlinkPath
|
||||
s.updateTorrent(torrent, debridTorrent)
|
||||
s.logger.Info().Msgf("Adding %s took %s", debridTorrent.Name, time.Since(timer))
|
||||
|
||||
go importReq.markAsCompleted(torrent, debridTorrent) // Mark the import request as completed, send callback if needed
|
||||
go func() {
|
||||
if err := request.SendDiscordMessage("download_complete", "success", torrent.discordContext()); err != nil {
|
||||
s.logger.Error().Msgf("Error sending discord message: %v", err)
|
||||
}
|
||||
}()
|
||||
go func() {
|
||||
_arr.Refresh()
|
||||
}()
|
||||
}
|
||||
|
||||
switch importReq.Action {
|
||||
case "symlink":
|
||||
// Symlink action, we will create a symlink to the torrent
|
||||
s.logger.Debug().Msgf("Post-Download Action: Symlink")
|
||||
cache := deb.Cache()
|
||||
if cache != nil {
|
||||
s.logger.Info().Msgf("Using internal webdav for %s", debridTorrent.Debrid)
|
||||
// Use webdav to download the file
|
||||
if err := cache.Add(debridTorrent); err != nil {
|
||||
onFailed(err)
|
||||
return
|
||||
}
|
||||
|
||||
rclonePath := filepath.Join(debridTorrent.MountPath, cache.GetTorrentFolder(debridTorrent)) // /mnt/remote/realdebrid/MyTVShow
|
||||
torrentFolderNoExt := utils.RemoveExtension(debridTorrent.Name)
|
||||
torrentSymlinkPath, err = s.createSymlinksWebdav(torrent, debridTorrent, rclonePath, torrentFolderNoExt) // /mnt/symlinks/{category}/MyTVShow/
|
||||
} else {
|
||||
// User is using either zurg or debrid webdav
|
||||
torrentSymlinkPath, err = s.processSymlink(torrent, debridTorrent) // /mnt/symlinks/{category}/MyTVShow/
|
||||
}
|
||||
if err != nil {
|
||||
onFailed(err)
|
||||
return
|
||||
}
|
||||
if torrentSymlinkPath == "" {
|
||||
err = fmt.Errorf("symlink path is empty for %s", debridTorrent.Name)
|
||||
onFailed(err)
|
||||
}
|
||||
onSuccess(torrentSymlinkPath)
|
||||
return
|
||||
case "download":
|
||||
// Download action, we will download the torrent to the specified folder
|
||||
// Generate download links
|
||||
s.logger.Debug().Msgf("Post-Download Action: Download")
|
||||
if err := client.GetFileDownloadLinks(debridTorrent); err != nil {
|
||||
onFailed(err)
|
||||
return
|
||||
}
|
||||
torrentSymlinkPath, err = s.processDownload(torrent, debridTorrent)
|
||||
if err != nil {
|
||||
onFailed(err)
|
||||
return
|
||||
}
|
||||
if torrentSymlinkPath == "" {
|
||||
err = fmt.Errorf("download path is empty for %s", debridTorrent.Name)
|
||||
onFailed(err)
|
||||
return
|
||||
}
|
||||
onSuccess(torrentSymlinkPath)
|
||||
case "none":
|
||||
s.logger.Debug().Msgf("Post-Download Action: None")
|
||||
// No action, just update the torrent and mark it as completed
|
||||
onSuccess(torrent.TorrentPath)
|
||||
default:
|
||||
// Action is none, do nothing, fallthrough
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Store) markTorrentAsFailed(t *Torrent) *Torrent {
|
||||
t.State = "error"
|
||||
s.torrents.AddOrUpdate(t)
|
||||
go func() {
|
||||
if err := request.SendDiscordMessage("download_failed", "error", t.discordContext()); err != nil {
|
||||
s.logger.Error().Msgf("Error sending discord message: %v", err)
|
||||
}
|
||||
}()
|
||||
return t
|
||||
}
|
||||
|
||||
func (s *Store) partialTorrentUpdate(t *Torrent, debridTorrent *types.Torrent) *Torrent {
|
||||
if debridTorrent == nil {
|
||||
return t
|
||||
}
|
||||
|
||||
addedOn, err := time.Parse(time.RFC3339, debridTorrent.Added)
|
||||
if err != nil {
|
||||
addedOn = time.Now()
|
||||
}
|
||||
totalSize := debridTorrent.Bytes
|
||||
progress := (cmp.Or(debridTorrent.Progress, 0.0)) / 100.0
|
||||
if math.IsNaN(progress) || math.IsInf(progress, 0) {
|
||||
progress = 0
|
||||
}
|
||||
sizeCompleted := int64(float64(totalSize) * progress)
|
||||
|
||||
var speed int64
|
||||
if debridTorrent.Speed != 0 {
|
||||
speed = debridTorrent.Speed
|
||||
}
|
||||
var eta int
|
||||
if speed != 0 {
|
||||
eta = int((totalSize - sizeCompleted) / speed)
|
||||
}
|
||||
files := make([]*File, 0, len(debridTorrent.Files))
|
||||
for index, file := range debridTorrent.GetFiles() {
|
||||
files = append(files, &File{
|
||||
Index: index,
|
||||
Name: file.Path,
|
||||
Size: file.Size,
|
||||
})
|
||||
}
|
||||
t.DebridID = debridTorrent.Id
|
||||
t.Name = debridTorrent.Name
|
||||
t.AddedOn = addedOn.Unix()
|
||||
t.Files = files
|
||||
t.Debrid = debridTorrent.Debrid
|
||||
t.Size = totalSize
|
||||
t.Completed = sizeCompleted
|
||||
t.NumSeeds = debridTorrent.Seeders
|
||||
t.Downloaded = sizeCompleted
|
||||
t.DownloadedSession = sizeCompleted
|
||||
t.Uploaded = sizeCompleted
|
||||
t.UploadedSession = sizeCompleted
|
||||
t.AmountLeft = totalSize - sizeCompleted
|
||||
t.Progress = progress
|
||||
t.Eta = eta
|
||||
t.Dlspeed = speed
|
||||
t.Upspeed = speed
|
||||
t.ContentPath = filepath.Join(t.SavePath, t.Name) + string(os.PathSeparator)
|
||||
return t
|
||||
}
|
||||
|
||||
func (s *Store) updateTorrent(t *Torrent, debridTorrent *types.Torrent) *Torrent {
|
||||
if debridTorrent == nil {
|
||||
return t
|
||||
}
|
||||
|
||||
if debridClient := s.debrid.Clients()[debridTorrent.Debrid]; debridClient != nil {
|
||||
if debridTorrent.Status != "downloaded" {
|
||||
_ = debridClient.UpdateTorrent(debridTorrent)
|
||||
}
|
||||
}
|
||||
t = s.partialTorrentUpdate(t, debridTorrent)
|
||||
t.ContentPath = t.TorrentPath + string(os.PathSeparator)
|
||||
|
||||
if t.IsReady() {
|
||||
t.State = "pausedUP"
|
||||
s.torrents.Update(t)
|
||||
return t
|
||||
}
|
||||
|
||||
ticker := time.NewTicker(100 * time.Millisecond)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
if t.IsReady() {
|
||||
t.State = "pausedUP"
|
||||
s.torrents.Update(t)
|
||||
return t
|
||||
}
|
||||
updatedT := s.updateTorrent(t, debridTorrent)
|
||||
t = updatedT
|
||||
|
||||
case <-time.After(10 * time.Minute): // Add a timeout
|
||||
return t
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,310 @@
|
||||
package wire
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"sort"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
// keyPair builds the composite map key used to index torrents by
// infohash and category ("<hash>|<category>").
func keyPair(hash, category string) string {
	return hash + "|" + category
}
|
||||
|
||||
// Torrents maps the composite keyPair(hash, category) key to its torrent.
type Torrents = map[string]*Torrent
|
||||
|
||||
// TorrentStorage is a mutex-guarded, JSON-file-backed collection of torrents
// keyed by keyPair(hash, category). Mutating methods persist the collection
// to the backing file asynchronously.
type TorrentStorage struct {
	torrents Torrents       // live collection, guarded by mu
	mu       sync.RWMutex   // guards torrents
	filename string // Added to store the filename for persistence
}
|
||||
|
||||
func loadTorrentsFromJSON(filename string) (Torrents, error) {
|
||||
data, err := os.ReadFile(filename)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
torrents := make(Torrents)
|
||||
if err := json.Unmarshal(data, &torrents); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return torrents, nil
|
||||
}
|
||||
|
||||
func newTorrentStorage(filename string) *TorrentStorage {
|
||||
// Open the JSON file and read the data
|
||||
torrents, err := loadTorrentsFromJSON(filename)
|
||||
if err != nil {
|
||||
torrents = make(Torrents)
|
||||
}
|
||||
// Create a new Storage
|
||||
return &TorrentStorage{
|
||||
torrents: torrents,
|
||||
filename: filename,
|
||||
}
|
||||
}
|
||||
|
||||
func (ts *TorrentStorage) Add(torrent *Torrent) {
|
||||
ts.mu.Lock()
|
||||
defer ts.mu.Unlock()
|
||||
ts.torrents[keyPair(torrent.Hash, torrent.Category)] = torrent
|
||||
go func() {
|
||||
err := ts.saveToFile()
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (ts *TorrentStorage) AddOrUpdate(torrent *Torrent) {
|
||||
ts.mu.Lock()
|
||||
defer ts.mu.Unlock()
|
||||
ts.torrents[keyPair(torrent.Hash, torrent.Category)] = torrent
|
||||
go func() {
|
||||
err := ts.saveToFile()
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (ts *TorrentStorage) Get(hash, category string) *Torrent {
|
||||
ts.mu.RLock()
|
||||
defer ts.mu.RUnlock()
|
||||
torrent, exists := ts.torrents[keyPair(hash, category)]
|
||||
if !exists && category == "" {
|
||||
// Try to find the torrent without knowing the category
|
||||
for _, t := range ts.torrents {
|
||||
if t.Hash == hash {
|
||||
return t
|
||||
}
|
||||
}
|
||||
}
|
||||
return torrent
|
||||
}
|
||||
|
||||
func (ts *TorrentStorage) GetAll(category string, filter string, hashes []string) []*Torrent {
|
||||
ts.mu.RLock()
|
||||
defer ts.mu.RUnlock()
|
||||
torrents := make([]*Torrent, 0)
|
||||
for _, torrent := range ts.torrents {
|
||||
if category != "" && torrent.Category != category {
|
||||
continue
|
||||
}
|
||||
if filter != "" && torrent.State != filter {
|
||||
continue
|
||||
}
|
||||
torrents = append(torrents, torrent)
|
||||
}
|
||||
|
||||
if len(hashes) > 0 {
|
||||
filtered := make([]*Torrent, 0)
|
||||
for _, hash := range hashes {
|
||||
for _, torrent := range torrents {
|
||||
if torrent.Hash == hash {
|
||||
filtered = append(filtered, torrent)
|
||||
}
|
||||
}
|
||||
}
|
||||
torrents = filtered
|
||||
}
|
||||
return torrents
|
||||
}
|
||||
|
||||
func (ts *TorrentStorage) GetAllSorted(category string, filter string, hashes []string, sortBy string, ascending bool) []*Torrent {
|
||||
torrents := ts.GetAll(category, filter, hashes)
|
||||
if sortBy != "" {
|
||||
sort.Slice(torrents, func(i, j int) bool {
|
||||
// If ascending is false, swap i and j to get descending order
|
||||
if !ascending {
|
||||
i, j = j, i
|
||||
}
|
||||
|
||||
switch sortBy {
|
||||
case "name":
|
||||
return torrents[i].Name < torrents[j].Name
|
||||
case "size":
|
||||
return torrents[i].Size < torrents[j].Size
|
||||
case "added_on":
|
||||
return torrents[i].AddedOn < torrents[j].AddedOn
|
||||
case "completed":
|
||||
return torrents[i].Completed < torrents[j].Completed
|
||||
case "progress":
|
||||
return torrents[i].Progress < torrents[j].Progress
|
||||
case "state":
|
||||
return torrents[i].State < torrents[j].State
|
||||
case "category":
|
||||
return torrents[i].Category < torrents[j].Category
|
||||
case "dlspeed":
|
||||
return torrents[i].Dlspeed < torrents[j].Dlspeed
|
||||
case "upspeed":
|
||||
return torrents[i].Upspeed < torrents[j].Upspeed
|
||||
case "ratio":
|
||||
return torrents[i].Ratio < torrents[j].Ratio
|
||||
default:
|
||||
// Default sort by added_on
|
||||
return torrents[i].AddedOn < torrents[j].AddedOn
|
||||
}
|
||||
})
|
||||
}
|
||||
return torrents
|
||||
}
|
||||
|
||||
func (ts *TorrentStorage) Update(torrent *Torrent) {
|
||||
ts.mu.Lock()
|
||||
defer ts.mu.Unlock()
|
||||
ts.torrents[keyPair(torrent.Hash, torrent.Category)] = torrent
|
||||
go func() {
|
||||
err := ts.saveToFile()
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (ts *TorrentStorage) Delete(hash, category string, removeFromDebrid bool) {
|
||||
ts.mu.Lock()
|
||||
defer ts.mu.Unlock()
|
||||
key := keyPair(hash, category)
|
||||
torrent, exists := ts.torrents[key]
|
||||
if !exists && category == "" {
|
||||
// Remove the torrent without knowing the category
|
||||
for k, t := range ts.torrents {
|
||||
if t.Hash == hash {
|
||||
key = k
|
||||
torrent = t
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if torrent == nil {
|
||||
return
|
||||
}
|
||||
st := Get()
|
||||
// Check if torrent is queued for download
|
||||
|
||||
if torrent.State == "queued" && torrent.ID != "" {
|
||||
// Remove the torrent from the import queue if it exists
|
||||
st.importsQueue.Delete(torrent.ID)
|
||||
}
|
||||
|
||||
if removeFromDebrid && torrent.DebridID != "" && torrent.Debrid != "" {
|
||||
dbClient := st.debrid.Client(torrent.Debrid)
|
||||
if dbClient != nil {
|
||||
_ = dbClient.DeleteTorrent(torrent.DebridID)
|
||||
}
|
||||
}
|
||||
|
||||
delete(ts.torrents, key)
|
||||
|
||||
// Delete the torrent folder
|
||||
if torrent.ContentPath != "" {
|
||||
err := os.RemoveAll(torrent.ContentPath)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
go func() {
|
||||
err := ts.saveToFile()
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (ts *TorrentStorage) DeleteMultiple(hashes []string, removeFromDebrid bool) {
|
||||
ts.mu.Lock()
|
||||
defer ts.mu.Unlock()
|
||||
toDelete := make(map[string]string)
|
||||
|
||||
st := Get()
|
||||
|
||||
for _, hash := range hashes {
|
||||
for key, torrent := range ts.torrents {
|
||||
if torrent == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
if torrent.State == "queued" && torrent.ID != "" {
|
||||
// Remove the torrent from the import queue if it exists
|
||||
st.importsQueue.Delete(torrent.ID)
|
||||
}
|
||||
if torrent.Hash == hash {
|
||||
if removeFromDebrid && torrent.DebridID != "" && torrent.Debrid != "" {
|
||||
toDelete[torrent.DebridID] = torrent.Debrid
|
||||
}
|
||||
delete(ts.torrents, key)
|
||||
if torrent.ContentPath != "" {
|
||||
err := os.RemoveAll(torrent.ContentPath)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
go func() {
|
||||
err := ts.saveToFile()
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
}
|
||||
}()
|
||||
|
||||
clients := st.debrid.Clients()
|
||||
|
||||
go func() {
|
||||
for id, debrid := range toDelete {
|
||||
dbClient, ok := clients[debrid]
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
err := dbClient.DeleteTorrent(id)
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// Save synchronously persists the current torrent collection to the backing
// JSON file.
func (ts *TorrentStorage) Save() error {
	return ts.saveToFile()
}
|
||||
|
||||
// saveToFile is a helper function to write the current state to the JSON file
|
||||
func (ts *TorrentStorage) saveToFile() error {
|
||||
ts.mu.RLock()
|
||||
data, err := json.MarshalIndent(ts.torrents, "", " ")
|
||||
ts.mu.RUnlock()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return os.WriteFile(ts.filename, data, 0644)
|
||||
}
|
||||
|
||||
func (ts *TorrentStorage) Reset() {
|
||||
ts.mu.Lock()
|
||||
defer ts.mu.Unlock()
|
||||
ts.torrents = make(Torrents)
|
||||
}
|
||||
|
||||
// GetStalledTorrents returns a list of torrents that are stalled
|
||||
// A torrent is considered stalled if it has no seeds, no progress, and has been downloading for longer than removeStalledAfter
|
||||
// The torrent must have a DebridID and be in the "downloading" state
|
||||
func (ts *TorrentStorage) GetStalledTorrents(removeAfter time.Duration) []*Torrent {
|
||||
ts.mu.RLock()
|
||||
defer ts.mu.RUnlock()
|
||||
stalled := make([]*Torrent, 0)
|
||||
currentTime := time.Now()
|
||||
for _, torrent := range ts.torrents {
|
||||
if torrent.DebridID != "" && torrent.State == "downloading" && torrent.NumSeeds == 0 && torrent.Progress == 0 {
|
||||
addedOn := time.Unix(torrent.AddedOn, 0)
|
||||
if currentTime.Sub(addedOn) > removeAfter {
|
||||
stalled = append(stalled, torrent)
|
||||
}
|
||||
}
|
||||
}
|
||||
return stalled
|
||||
}
|
||||
@@ -0,0 +1,88 @@
|
||||
package wire
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
)
|
||||
|
||||
// File is the qBittorrent-Web-API-shaped representation of a single file
// inside a torrent; field names and JSON tags follow that API's schema.
type File struct {
	Index int `json:"index,omitempty"`        // position within the torrent's file list
	Name string `json:"name,omitempty"`       // file path (populated from the debrid file path)
	Size int64 `json:"size,omitempty"`        // size in bytes
	Progress int `json:"progress,omitempty"`
	Priority int `json:"priority,omitempty"`
	IsSeed bool `json:"is_seed,omitempty"`
	PieceRange []int `json:"piece_range,omitempty"`
	Availability float64 `json:"availability,omitempty"`
}
|
||||
|
||||
// Torrent mirrors the torrent object of the qBittorrent Web API, extended
// with decypharr-specific fields (debrid identifiers, local paths, source).
// Hash and Category together form the storage key (see keyPair).
// It embeds sync.Mutex, so values must not be copied after first use and
// should always be handled through a pointer.
type Torrent struct {
	// decypharr-specific fields.
	ID string `json:"id"`               // internal import/queue identifier
	DebridID string `json:"debrid_id"`  // torrent ID on the debrid provider
	Debrid string `json:"debrid"`       // name of the debrid provider
	TorrentPath string `json:"-"`       // local symlink/download path; never serialized
	Files []*File `json:"files,omitempty"`

	// qBittorrent-Web-API-compatible fields.
	AddedOn int64 `json:"added_on,omitempty"` // Unix timestamp the torrent was added
	AmountLeft int64 `json:"amount_left"`
	AutoTmm bool `json:"auto_tmm"`
	Availability float64 `json:"availability,omitempty"`
	Category string `json:"category,omitempty"`
	Completed int64 `json:"completed"`
	CompletionOn int `json:"completion_on,omitempty"`
	ContentPath string `json:"content_path"`
	DlLimit int `json:"dl_limit"`
	Dlspeed int64 `json:"dlspeed"`
	Downloaded int64 `json:"downloaded"`
	DownloadedSession int64 `json:"downloaded_session"`
	Eta int `json:"eta"`
	FlPiecePrio bool `json:"f_l_piece_prio,omitempty"`
	ForceStart bool `json:"force_start,omitempty"`
	Hash string `json:"hash"`
	LastActivity int64 `json:"last_activity,omitempty"`
	MagnetUri string `json:"magnet_uri,omitempty"`
	MaxRatio int `json:"max_ratio,omitempty"`
	MaxSeedingTime int `json:"max_seeding_time,omitempty"`
	Name string `json:"name,omitempty"`
	NumComplete int `json:"num_complete,omitempty"`
	NumIncomplete int `json:"num_incomplete,omitempty"`
	NumLeechs int `json:"num_leechs,omitempty"`
	NumSeeds int `json:"num_seeds,omitempty"`
	Priority int `json:"priority,omitempty"`
	Progress float64 `json:"progress"` // fraction in [0, 1]
	Ratio int `json:"ratio,omitempty"`
	RatioLimit int `json:"ratio_limit,omitempty"`
	SavePath string `json:"save_path"`
	SeedingTimeLimit int `json:"seeding_time_limit,omitempty"`
	SeenComplete int64 `json:"seen_complete,omitempty"`
	SeqDl bool `json:"seq_dl"`
	Size int64 `json:"size,omitempty"`
	State string `json:"state,omitempty"` // qBittorrent state string, e.g. "queued", "downloading", "pausedUP", "error"
	SuperSeeding bool `json:"super_seeding"`
	Tags string `json:"tags,omitempty"`
	TimeActive int `json:"time_active,omitempty"`
	TotalSize int64 `json:"total_size,omitempty"`
	Tracker string `json:"tracker,omitempty"`
	UpLimit int64 `json:"up_limit,omitempty"`
	Uploaded int64 `json:"uploaded,omitempty"`
	UploadedSession int64 `json:"uploaded_session,omitempty"`
	Upspeed int64 `json:"upspeed,omitempty"`
	Source string `json:"source,omitempty"`

	sync.Mutex
}
|
||||
|
||||
func (t *Torrent) IsReady() bool {
|
||||
return (t.AmountLeft <= 0 || t.Progress == 1) && t.TorrentPath != ""
|
||||
}
|
||||
|
||||
// discordContext renders the torrent's details as the markdown body used in
// Discord notifications. Note: the "Arr:" line is filled from t.Category —
// presumably the category doubles as the Arr instance name; verify against
// how categories are assigned.
func (t *Torrent) discordContext() string {
	format := `
**Name:** %s
**Arr:** %s
**Hash:** %s
**MagnetURI:** %s
**Debrid:** %s
`
	return fmt.Sprintf(format, t.Name, t.Category, t.Hash, t.MagnetUri, t.Debrid)
}
|
||||
@@ -0,0 +1,44 @@
|
||||
package wire
|
||||
|
||||
import "context"
|
||||
|
||||
func (s *Store) StartWorkers(ctx context.Context) {
|
||||
if ctx == nil {
|
||||
ctx = context.Background()
|
||||
}
|
||||
|
||||
// Start debrid workers
|
||||
if err := s.Debrid().StartWorker(ctx); err != nil {
|
||||
s.logger.Error().Err(err).Msg("Failed to start debrid worker")
|
||||
} else {
|
||||
s.logger.Debug().Msg("Started debrid worker")
|
||||
}
|
||||
|
||||
// Cache workers
|
||||
for _, cache := range s.Debrid().Caches() {
|
||||
if cache == nil {
|
||||
continue
|
||||
}
|
||||
go func() {
|
||||
if err := cache.StartWorker(ctx); err != nil {
|
||||
s.logger.Error().Err(err).Msg("Failed to start debrid cache worker")
|
||||
} else {
|
||||
s.logger.Debug().Msgf("Started debrid cache worker for %s", cache.GetConfig().Name)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// Store queue workers
|
||||
if err := s.StartQueueWorkers(ctx); err != nil {
|
||||
s.logger.Error().Err(err).Msg("Failed to start store worker")
|
||||
} else {
|
||||
s.logger.Debug().Msg("Started store worker")
|
||||
}
|
||||
|
||||
// Arr workers
|
||||
if err := s.Arr().StartWorker(ctx); err != nil {
|
||||
s.logger.Error().Err(err).Msg("Failed to start Arr worker")
|
||||
} else {
|
||||
s.logger.Debug().Msg("Started Arr worker")
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user