- Speed up repairs when checking links \n
- Remove run on start for repairs since it causes issues \n - Add support for arr-specific debrid - Support for queuing system - Support for no-op when sending torrents to debrid
This commit is contained in:
+99
-52
@@ -2,9 +2,11 @@ package store
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"cmp"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/google/uuid"
|
||||
"github.com/sirrobot01/decypharr/internal/request"
|
||||
"github.com/sirrobot01/decypharr/internal/utils"
|
||||
"github.com/sirrobot01/decypharr/pkg/arr"
|
||||
@@ -23,11 +25,12 @@ const (
|
||||
)
|
||||
|
||||
type ImportRequest struct {
|
||||
Id string `json:"id"`
|
||||
DownloadFolder string `json:"downloadFolder"`
|
||||
SelectedDebrid string `json:"debrid"`
|
||||
Magnet *utils.Magnet `json:"magnet"`
|
||||
Arr *arr.Arr `json:"arr"`
|
||||
IsSymlink bool `json:"isSymlink"`
|
||||
Action string `json:"action"`
|
||||
DownloadUncached bool `json:"downloadUncached"`
|
||||
CallBackUrl string `json:"callBackUrl"`
|
||||
|
||||
@@ -39,14 +42,15 @@ type ImportRequest struct {
|
||||
Async bool `json:"async"`
|
||||
}
|
||||
|
||||
func NewImportRequest(debrid string, downloadFolder string, magnet *utils.Magnet, arr *arr.Arr, isSymlink, downloadUncached bool, callBackUrl string, importType ImportType) *ImportRequest {
|
||||
func NewImportRequest(debrid string, downloadFolder string, magnet *utils.Magnet, arr *arr.Arr, action string, downloadUncached bool, callBackUrl string, importType ImportType) *ImportRequest {
|
||||
return &ImportRequest{
|
||||
Id: uuid.New().String(),
|
||||
Status: "started",
|
||||
DownloadFolder: downloadFolder,
|
||||
SelectedDebrid: debrid,
|
||||
SelectedDebrid: cmp.Or(arr.SelectedDebrid, debrid), // Use debrid from arr if available
|
||||
Magnet: magnet,
|
||||
Arr: arr,
|
||||
IsSymlink: isSymlink,
|
||||
Action: action,
|
||||
DownloadUncached: downloadUncached,
|
||||
CallBackUrl: callBackUrl,
|
||||
Type: importType,
|
||||
@@ -106,21 +110,22 @@ func (i *ImportRequest) markAsCompleted(torrent *Torrent, debridTorrent *debridT
|
||||
}
|
||||
|
||||
type ImportQueue struct {
|
||||
queue map[string]chan *ImportRequest // Map to hold queues for different debrid services
|
||||
mu sync.RWMutex // Mutex to protect access to the queue map
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
capacity int // Capacity of each channel in the queue
|
||||
queue []*ImportRequest
|
||||
mu sync.RWMutex
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
cond *sync.Cond // For blocking operations
|
||||
}
|
||||
|
||||
func NewImportQueue(ctx context.Context, capacity int) *ImportQueue {
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
return &ImportQueue{
|
||||
queue: make(map[string]chan *ImportRequest),
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
capacity: capacity,
|
||||
iq := &ImportQueue{
|
||||
queue: make([]*ImportRequest, 0, capacity),
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
}
|
||||
iq.cond = sync.NewCond(&iq.mu)
|
||||
return iq
|
||||
}
|
||||
|
||||
func (iq *ImportQueue) Push(req *ImportRequest) error {
|
||||
@@ -131,62 +136,104 @@ func (iq *ImportQueue) Push(req *ImportRequest) error {
|
||||
iq.mu.Lock()
|
||||
defer iq.mu.Unlock()
|
||||
|
||||
if _, exists := iq.queue[req.SelectedDebrid]; !exists {
|
||||
iq.queue[req.SelectedDebrid] = make(chan *ImportRequest, iq.capacity) // Create a new channel for the debrid service
|
||||
select {
|
||||
case <-iq.ctx.Done():
|
||||
return fmt.Errorf("queue is shutting down")
|
||||
default:
|
||||
}
|
||||
|
||||
if len(iq.queue) >= cap(iq.queue) {
|
||||
return fmt.Errorf("queue is full")
|
||||
}
|
||||
|
||||
iq.queue = append(iq.queue, req)
|
||||
iq.cond.Signal() // Wake up any waiting Pop()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (iq *ImportQueue) Pop() (*ImportRequest, error) {
|
||||
iq.mu.Lock()
|
||||
defer iq.mu.Unlock()
|
||||
|
||||
select {
|
||||
case iq.queue[req.SelectedDebrid] <- req:
|
||||
return nil
|
||||
case <-iq.ctx.Done():
|
||||
return fmt.Errorf("retry queue is shutting down")
|
||||
return nil, fmt.Errorf("queue is shutting down")
|
||||
default:
|
||||
}
|
||||
|
||||
if len(iq.queue) == 0 {
|
||||
return nil, fmt.Errorf("no import requests available")
|
||||
}
|
||||
|
||||
req := iq.queue[0]
|
||||
iq.queue = iq.queue[1:]
|
||||
return req, nil
|
||||
}
|
||||
|
||||
func (iq *ImportQueue) TryPop(selectedDebrid string) (*ImportRequest, error) {
|
||||
iq.mu.RLock()
|
||||
defer iq.mu.RUnlock()
|
||||
// Delete specific request by ID
|
||||
func (iq *ImportQueue) Delete(requestID string) bool {
|
||||
iq.mu.Lock()
|
||||
defer iq.mu.Unlock()
|
||||
|
||||
if ch, exists := iq.queue[selectedDebrid]; exists {
|
||||
select {
|
||||
case req := <-ch:
|
||||
return req, nil
|
||||
case <-iq.ctx.Done():
|
||||
return nil, fmt.Errorf("queue is shutting down")
|
||||
default:
|
||||
return nil, fmt.Errorf("no import request available for %s", selectedDebrid)
|
||||
for i, req := range iq.queue {
|
||||
if req.Id == requestID {
|
||||
// Remove from slice
|
||||
iq.queue = append(iq.queue[:i], iq.queue[i+1:]...)
|
||||
return true
|
||||
}
|
||||
}
|
||||
return nil, fmt.Errorf("no queue exists for %s", selectedDebrid)
|
||||
return false
|
||||
}
|
||||
|
||||
func (iq *ImportQueue) Size(selectedDebrid string) int {
|
||||
// DeleteWhere requests matching a condition
|
||||
func (iq *ImportQueue) DeleteWhere(predicate func(*ImportRequest) bool) int {
|
||||
iq.mu.Lock()
|
||||
defer iq.mu.Unlock()
|
||||
|
||||
deleted := 0
|
||||
for i := len(iq.queue) - 1; i >= 0; i-- {
|
||||
if predicate(iq.queue[i]) {
|
||||
iq.queue = append(iq.queue[:i], iq.queue[i+1:]...)
|
||||
deleted++
|
||||
}
|
||||
}
|
||||
return deleted
|
||||
}
|
||||
|
||||
// Find request without removing it
|
||||
func (iq *ImportQueue) Find(requestID string) *ImportRequest {
|
||||
iq.mu.RLock()
|
||||
defer iq.mu.RUnlock()
|
||||
|
||||
if ch, exists := iq.queue[selectedDebrid]; exists {
|
||||
return len(ch)
|
||||
for _, req := range iq.queue {
|
||||
if req.Id == requestID {
|
||||
return req
|
||||
}
|
||||
}
|
||||
return 0
|
||||
return nil
|
||||
}
|
||||
|
||||
func (iq *ImportQueue) Size() int {
|
||||
iq.mu.RLock()
|
||||
defer iq.mu.RUnlock()
|
||||
return len(iq.queue)
|
||||
}
|
||||
|
||||
func (iq *ImportQueue) IsEmpty() bool {
|
||||
return iq.Size() == 0
|
||||
}
|
||||
|
||||
// List all requests (copy to avoid race conditions)
|
||||
func (iq *ImportQueue) List() []*ImportRequest {
|
||||
iq.mu.RLock()
|
||||
defer iq.mu.RUnlock()
|
||||
|
||||
result := make([]*ImportRequest, len(iq.queue))
|
||||
copy(result, iq.queue)
|
||||
return result
|
||||
}
|
||||
|
||||
func (iq *ImportQueue) Close() {
|
||||
iq.cancel()
|
||||
iq.mu.Lock()
|
||||
defer iq.mu.Unlock()
|
||||
|
||||
for _, ch := range iq.queue {
|
||||
// Drain remaining items before closing
|
||||
for {
|
||||
select {
|
||||
case <-ch:
|
||||
// Discard remaining items
|
||||
default:
|
||||
close(ch)
|
||||
goto nextChannel
|
||||
}
|
||||
}
|
||||
nextChannel:
|
||||
}
|
||||
iq.queue = make(map[string]chan *ImportRequest)
|
||||
iq.cond.Broadcast()
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user